From c56c830ae96ed000c999fb93c23bbaad0303acf9 Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Tue, 24 Sep 2019 02:56:42 +0200 Subject: Cleanup pthread based block processing code Break convoluted, long functions up into smaller ones where the control flow (especially locking and signalling) is more easily readable and remove some copy and paste clean up code. Signed-off-by: David Oberhollenzer --- lib/sqfs/blk_proc/process_block.c | 5 + lib/sqfs/blk_proc/pthread.c | 223 +++++++++++++++++++------------------- lib/sqfs/blk_proc/serial.c | 14 +-- 3 files changed, 120 insertions(+), 122 deletions(-) (limited to 'lib/sqfs') diff --git a/lib/sqfs/blk_proc/process_block.c b/lib/sqfs/blk_proc/process_block.c index b303278..ccc6df0 100644 --- a/lib/sqfs/blk_proc/process_block.c +++ b/lib/sqfs/blk_proc/process_block.c @@ -15,6 +15,11 @@ int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp, { ssize_t ret; + if (block->size == 0) { + block->checksum = 0; + return 0; + } + block->checksum = crc32(0, block->data, block->size); if (!(block->flags & SQFS_BLK_DONT_COMPRESS)) { diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c index ee73e52..a3dc01f 100644 --- a/lib/sqfs/blk_proc/pthread.c +++ b/lib/sqfs/blk_proc/pthread.c @@ -7,10 +7,21 @@ #define SQFS_BUILDING_DLL #include "internal.h" -static void store_completed_block(sqfs_block_processor_t *shared, - sqfs_block_t *blk) +static void free_blk_list(sqfs_block_t *list) { - sqfs_block_t *it = shared->done, *prev = NULL; + sqfs_block_t *it; + + while (list != NULL) { + it = list; + list = list->next; + free(it); + } +} + +static void store_completed_block(sqfs_block_processor_t *proc, + sqfs_block_t *blk, int status) +{ + sqfs_block_t *it = proc->done, *prev = NULL; while (it != NULL) { if (it->sequence_number >= blk->sequence_number) @@ -20,12 +31,39 @@ static void store_completed_block(sqfs_block_processor_t *shared, } if (prev == NULL) { - blk->next = shared->done; - shared->done = blk; + blk->next = proc->done; + proc->done = blk; } else { blk->next = prev->next; prev->next = blk; } + + if (status != 0 && proc->status == 0) + proc->status = status; + + proc->backlog -= 1; + pthread_cond_broadcast(&proc->done_cond); +} + +static sqfs_block_t *get_next_work_item(sqfs_block_processor_t *proc) +{ + sqfs_block_t *blk; + + while (proc->queue == NULL) { + if (proc->terminate || proc->status != 0) + return NULL; + + pthread_cond_wait(&proc->queue_cond, &proc->mtx); + } + + blk = proc->queue; + proc->queue = blk->next; + blk->next = NULL; + + if (proc->queue == NULL) + proc->queue_last = NULL; + + return blk; } static void *worker_proc(void *arg) @@ -37,33 +75,14 @@ static void *worker_proc(void *arg) for (;;) { pthread_mutex_lock(&shared->mtx); - if (blk != NULL) { - store_completed_block(shared, blk); - shared->backlog -= 1; - - if (status != 0 && shared->status == 0) - shared->status = status; - pthread_cond_broadcast(&shared->done_cond); - } + if (blk != NULL) + store_completed_block(shared, blk, status); - while (shared->queue == NULL && !shared->terminate && - shared->status == 0) { - pthread_cond_wait(&shared->queue_cond, - &shared->mtx); - } + blk = get_next_work_item(shared); + pthread_mutex_unlock(&shared->mtx); - if (shared->terminate || shared->status != 0) { - pthread_mutex_unlock(&shared->mtx); + if (blk == NULL) break; - } - - blk = shared->queue; - shared->queue = blk->next; - blk->next = NULL; - - if (shared->queue == NULL) - shared->queue_last = NULL; - pthread_mutex_unlock(&shared->mtx); status = sqfs_block_process(blk, worker->cmp, worker->scratch, shared->max_block_size); @@ -161,7 +180,6 @@ fail_init: void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) { - sqfs_block_t *blk; unsigned int i; pthread_mutex_lock(&proc->mtx); @@ -180,26 +198,47 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) pthread_cond_destroy(&proc->queue_cond); pthread_mutex_destroy(&proc->mtx); - while (proc->queue != NULL) { - blk = proc->queue; - proc->queue = blk->next; - free(blk); + free_blk_list(proc->queue); + free_blk_list(proc->done); + free(proc->fragments); + free(proc->blocks); + free(proc); +} + +static void append_to_work_queue(sqfs_block_processor_t *proc, + sqfs_block_t *block) +{ + if (proc->queue_last == NULL) { + proc->queue = proc->queue_last = block; + } else { + proc->queue_last->next = block; + proc->queue_last = block; } - while (proc->done != NULL) { - blk = proc->done; - proc->done = blk->next; - free(blk); + block->sequence_number = proc->enqueue_id++; + block->next = NULL; + proc->backlog += 1; + pthread_cond_broadcast(&proc->queue_cond); +} + +static sqfs_block_t *get_completed_if_avail(sqfs_block_processor_t *proc) +{ + sqfs_block_t *block = NULL; + + if (proc->done != NULL && + proc->done->sequence_number == proc->dequeue_id) { + block = proc->done; + proc->done = proc->done->next; + proc->dequeue_id += 1; } - free(proc->fragments); - free(proc->blocks); - free(proc); + return block; } int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, sqfs_block_t *block) { + sqfs_block_t *completed = NULL; int status; pthread_mutex_lock(&proc->mtx); @@ -215,60 +254,38 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, goto fail; } - block->sequence_number = proc->enqueue_id++; - block->next = NULL; - - if (block->size == 0) { - block->checksum = 0; - store_completed_block(proc, block); - } else { - while (proc->backlog > proc->max_backlog) - pthread_cond_wait(&proc->done_cond, &proc->mtx); - - if (proc->queue_last == NULL) { - proc->queue = proc->queue_last = block; - } else { - proc->queue_last->next = block; - proc->queue_last = block; - } - - proc->backlog += 1; - } + while (proc->backlog > proc->max_backlog) + pthread_cond_wait(&proc->done_cond, &proc->mtx); - if (proc->done != NULL && - proc->done->sequence_number == proc->dequeue_id) { - block = proc->done; - proc->done = proc->done->next; - proc->dequeue_id += 1; - } else { - block = NULL; - } + completed = get_completed_if_avail(proc); - pthread_cond_broadcast(&proc->queue_cond); + append_to_work_queue(proc, block); + block = NULL; pthread_mutex_unlock(&proc->mtx); - if (block == NULL) - return 0; + if (completed != NULL) { + status = process_completed_block(proc, completed); - status = process_completed_block(proc, block); - if (status != 0) { - pthread_mutex_lock(&proc->mtx); - proc->status = status; - goto fail; + if (status != 0) { + pthread_mutex_lock(&proc->mtx); + proc->status = status; + goto fail; + } } - free(block); + free(completed); return 0; fail: pthread_mutex_unlock(&proc->mtx); free(block); + free(completed); return status; } int sqfs_block_processor_finish(sqfs_block_processor_t *proc) { - sqfs_block_t *queue, *it; - int status; + sqfs_block_t *it; + int status = 0; pthread_mutex_lock(&proc->mtx); while (proc->backlog > 0 && proc->status == 0) @@ -276,50 +293,30 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc) if (proc->status != 0) { status = proc->status; - goto fail; + goto out; } - for (it = proc->done; it != NULL; it = it->next) { - if (it->sequence_number != proc->dequeue_id++) { + while (proc->done != NULL) { + it = get_completed_if_avail(proc); + + if (it == NULL) { status = SQFS_ERROR_INTERNAL; - proc->status = status; - goto fail; + } else { + status = process_completed_block(proc, it); + free(it); } - } - - queue = proc->done; - proc->done = NULL; - pthread_mutex_unlock(&proc->mtx); - - while (queue != NULL) { - it = queue; - queue = queue->next; - it->next = NULL; - status = process_completed_block(proc, it); - free(it); if (status != 0) { - pthread_mutex_lock(&proc->mtx); proc->status = status; pthread_cond_broadcast(&proc->queue_cond); - goto fail; + goto out; } } - - return 0; -fail: - while (proc->queue != NULL) { - it = proc->queue; - proc->queue = it->next; - free(it); - } - - while (proc->done != NULL) { - it = proc->done; - proc->done = it->next; - free(it); - } - +out: + free_blk_list(proc->queue); + free_blk_list(proc->done); + proc->queue = NULL; + proc->done = NULL; pthread_mutex_unlock(&proc->mtx); return status; } diff --git a/lib/sqfs/blk_proc/serial.c b/lib/sqfs/blk_proc/serial.c index 850c33d..06c811e 100644 --- a/lib/sqfs/blk_proc/serial.c +++ b/lib/sqfs/blk_proc/serial.c @@ -59,16 +59,12 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, return proc->status; } - if (block->size == 0) { - block->checksum = 0; - } else { - proc->status = sqfs_block_process(block, proc->cmp, - proc->scratch, - proc->max_block_size); - } + proc->status = sqfs_block_process(block, proc->cmp, proc->scratch, + proc->max_block_size); + + if (proc->status == 0) + proc->status = process_completed_block(proc, block); - block->next = NULL; - proc->status = process_completed_block(proc, block); free(block); return proc->status; } -- cgit v1.2.3