diff options
author | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2019-09-24 02:56:42 +0200 |
---|---|---|
committer | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2019-09-24 03:04:38 +0200 |
commit | c56c830ae96ed000c999fb93c23bbaad0303acf9 (patch) | |
tree | 589d038e56f6000e55b3cfce5cc74e64304958c9 | |
parent | 87d577a66eb3b1aaca91c4841445cccaf151ee81 (diff) |
Cleanup pthread based block processing code
Break convoluted, long functions up into smaller ones where the control
flow (especially locking and signalling) is more easily readable and
remove some copy and paste clean up code.
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r-- | lib/sqfs/blk_proc/process_block.c | 5 | ||||
-rw-r--r-- | lib/sqfs/blk_proc/pthread.c | 223 | ||||
-rw-r--r-- | lib/sqfs/blk_proc/serial.c | 14 |
3 files changed, 120 insertions, 122 deletions
diff --git a/lib/sqfs/blk_proc/process_block.c b/lib/sqfs/blk_proc/process_block.c index b303278..ccc6df0 100644 --- a/lib/sqfs/blk_proc/process_block.c +++ b/lib/sqfs/blk_proc/process_block.c @@ -15,6 +15,11 @@ int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp, { ssize_t ret; + if (block->size == 0) { + block->checksum = 0; + return 0; + } + block->checksum = crc32(0, block->data, block->size); if (!(block->flags & SQFS_BLK_DONT_COMPRESS)) { diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c index ee73e52..a3dc01f 100644 --- a/lib/sqfs/blk_proc/pthread.c +++ b/lib/sqfs/blk_proc/pthread.c @@ -7,10 +7,21 @@ #define SQFS_BUILDING_DLL #include "internal.h" -static void store_completed_block(sqfs_block_processor_t *shared, - sqfs_block_t *blk) +static void free_blk_list(sqfs_block_t *list) { - sqfs_block_t *it = shared->done, *prev = NULL; + sqfs_block_t *it; + + while (list != NULL) { + it = list; + list = list->next; + free(it); + } +} + +static void store_completed_block(sqfs_block_processor_t *proc, + sqfs_block_t *blk, int status) +{ + sqfs_block_t *it = proc->done, *prev = NULL; while (it != NULL) { if (it->sequence_number >= blk->sequence_number) @@ -20,12 +31,39 @@ static void store_completed_block(sqfs_block_processor_t *shared, } if (prev == NULL) { - blk->next = shared->done; - shared->done = blk; + blk->next = proc->done; + proc->done = blk; } else { blk->next = prev->next; prev->next = blk; } + + if (status != 0 && proc->status == 0) + proc->status = status; + + proc->backlog -= 1; + pthread_cond_broadcast(&proc->done_cond); +} + +static sqfs_block_t *get_next_work_item(sqfs_block_processor_t *proc) +{ + sqfs_block_t *blk; + + while (proc->queue == NULL) { + if (proc->terminate || proc->status != 0) + return NULL; + + pthread_cond_wait(&proc->queue_cond, &proc->mtx); + } + + blk = proc->queue; + proc->queue = blk->next; + blk->next = NULL; + + if (proc->queue == NULL) + proc->queue_last = NULL; + + return blk; } static void *worker_proc(void *arg) @@ -37,33 +75,14 @@ static void *worker_proc(void *arg) for (;;) { pthread_mutex_lock(&shared->mtx); - if (blk != NULL) { - store_completed_block(shared, blk); - shared->backlog -= 1; - - if (status != 0 && shared->status == 0) - shared->status = status; - pthread_cond_broadcast(&shared->done_cond); - } + if (blk != NULL) + store_completed_block(shared, blk, status); - while (shared->queue == NULL && !shared->terminate && - shared->status == 0) { - pthread_cond_wait(&shared->queue_cond, - &shared->mtx); - } + blk = get_next_work_item(shared); + pthread_mutex_unlock(&shared->mtx); - if (shared->terminate || shared->status != 0) { - pthread_mutex_unlock(&shared->mtx); + if (blk == NULL) break; - } - - blk = shared->queue; - shared->queue = blk->next; - blk->next = NULL; - - if (shared->queue == NULL) - shared->queue_last = NULL; - pthread_mutex_unlock(&shared->mtx); status = sqfs_block_process(blk, worker->cmp, worker->scratch, shared->max_block_size); @@ -161,7 +180,6 @@ fail_init: void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) { - sqfs_block_t *blk; unsigned int i; pthread_mutex_lock(&proc->mtx); @@ -180,26 +198,47 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) pthread_cond_destroy(&proc->queue_cond); pthread_mutex_destroy(&proc->mtx); - while (proc->queue != NULL) { - blk = proc->queue; - proc->queue = blk->next; - free(blk); + free_blk_list(proc->queue); + free_blk_list(proc->done); + free(proc->fragments); + free(proc->blocks); + free(proc); +} + +static void append_to_work_queue(sqfs_block_processor_t *proc, + sqfs_block_t *block) +{ + if (proc->queue_last == NULL) { + proc->queue = proc->queue_last = block; + } else { + proc->queue_last->next = block; + proc->queue_last = block; } - while (proc->done != NULL) { - blk = proc->done; - proc->done = blk->next; - free(blk); + block->sequence_number = proc->enqueue_id++; + block->next = NULL; + proc->backlog += 1; + pthread_cond_broadcast(&proc->queue_cond); +} + +static sqfs_block_t *get_completed_if_avail(sqfs_block_processor_t *proc) +{ + sqfs_block_t *block = NULL; + + if (proc->done != NULL && + proc->done->sequence_number == proc->dequeue_id) { + block = proc->done; + proc->done = proc->done->next; + proc->dequeue_id += 1; } - free(proc->fragments); - free(proc->blocks); - free(proc); + return block; } int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, sqfs_block_t *block) { + sqfs_block_t *completed = NULL; int status; pthread_mutex_lock(&proc->mtx); @@ -215,60 +254,38 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, goto fail; } - block->sequence_number = proc->enqueue_id++; - block->next = NULL; - - if (block->size == 0) { - block->checksum = 0; - store_completed_block(proc, block); - } else { - while (proc->backlog > proc->max_backlog) - pthread_cond_wait(&proc->done_cond, &proc->mtx); - - if (proc->queue_last == NULL) { - proc->queue = proc->queue_last = block; - } else { - proc->queue_last->next = block; - proc->queue_last = block; - } - - proc->backlog += 1; - } + while (proc->backlog > proc->max_backlog) + pthread_cond_wait(&proc->done_cond, &proc->mtx); - if (proc->done != NULL && - proc->done->sequence_number == proc->dequeue_id) { - block = proc->done; - proc->done = proc->done->next; - proc->dequeue_id += 1; - } else { - block = NULL; - } + completed = get_completed_if_avail(proc); - pthread_cond_broadcast(&proc->queue_cond); + append_to_work_queue(proc, block); + block = NULL; pthread_mutex_unlock(&proc->mtx); - if (block == NULL) - return 0; + if (completed != NULL) { + status = process_completed_block(proc, completed); - status = process_completed_block(proc, block); - if (status != 0) { - pthread_mutex_lock(&proc->mtx); - proc->status = status; - goto fail; + if (status != 0) { + pthread_mutex_lock(&proc->mtx); + proc->status = status; + goto fail; + } } - free(block); + free(completed); return 0; fail: pthread_mutex_unlock(&proc->mtx); free(block); + free(completed); return status; } int sqfs_block_processor_finish(sqfs_block_processor_t *proc) { - sqfs_block_t *queue, *it; - int status; + sqfs_block_t *it; + int status = 0; pthread_mutex_lock(&proc->mtx); while (proc->backlog > 0 && proc->status == 0) @@ -276,50 +293,30 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc) if (proc->status != 0) { status = proc->status; - goto fail; + goto out; } - for (it = proc->done; it != NULL; it = it->next) { - if (it->sequence_number != proc->dequeue_id++) { + while (proc->done != NULL) { + it = get_completed_if_avail(proc); + + if (it == NULL) { status = SQFS_ERROR_INTERNAL; - proc->status = status; - goto fail; + } else { + status = process_completed_block(proc, it); + free(it); } - } - - queue = proc->done; - proc->done = NULL; - pthread_mutex_unlock(&proc->mtx); - - while (queue != NULL) { - it = queue; - queue = queue->next; - it->next = NULL; - status = process_completed_block(proc, it); - free(it); if (status != 0) { - pthread_mutex_lock(&proc->mtx); proc->status = status; pthread_cond_broadcast(&proc->queue_cond); - goto fail; + goto out; } } - - return 0; -fail: - while (proc->queue != NULL) { - it = proc->queue; - proc->queue = it->next; - free(it); - } - - while (proc->done != NULL) { - it = proc->done; - proc->done = it->next; - free(it); - } - +out: + free_blk_list(proc->queue); + free_blk_list(proc->done); + proc->queue = NULL; + proc->done = NULL; pthread_mutex_unlock(&proc->mtx); return status; } diff --git a/lib/sqfs/blk_proc/serial.c b/lib/sqfs/blk_proc/serial.c index 850c33d..06c811e 100644 --- a/lib/sqfs/blk_proc/serial.c +++ b/lib/sqfs/blk_proc/serial.c @@ -59,16 +59,12 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, return proc->status; } - if (block->size == 0) { - block->checksum = 0; - } else { - proc->status = sqfs_block_process(block, proc->cmp, - proc->scratch, - proc->max_block_size); - } + proc->status = sqfs_block_process(block, proc->cmp, proc->scratch, + proc->max_block_size); + + if (proc->status == 0) + proc->status = process_completed_block(proc, block); - block->next = NULL; - proc->status = process_completed_block(proc, block); free(block); return proc->status; } |