From 754e4f1bdeb2f89a04058c9f830a903a2a70c627 Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Tue, 18 Feb 2020 18:32:33 +0100 Subject: Simplify the thread pool block processor somewhat - Split the worker function up into smaller functions that are a little more readable. - Only dequeue one block at a time. Makes the dequeueing a lot more readable and understandable. Signed-off-by: David Oberhollenzer --- lib/sqfs/block_processor/winpthread.c | 209 ++++++++++++++-------------------- 1 file changed, 88 insertions(+), 121 deletions(-) diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c index aa1a99f..0cb6d02 100644 --- a/lib/sqfs/block_processor/winpthread.c +++ b/lib/sqfs/block_processor/winpthread.c @@ -79,52 +79,64 @@ static void free_blk_list(sqfs_block_t *list) } } +static sqfs_block_t *get_next_work_item(thread_pool_processor_t *shared) +{ + sqfs_block_t *blk = NULL; + + while (shared->queue == NULL && shared->status == 0) + AWAIT(&shared->queue_cond, &shared->mtx); + + if (shared->status == 0) { + blk = shared->queue; + shared->queue = blk->next; + blk->next = NULL; + + if (shared->queue == NULL) + shared->queue_last = NULL; + } + + return blk; +} + +static void store_completed_block(thread_pool_processor_t *shared, + sqfs_block_t *blk, int status) +{ + sqfs_block_t *it = shared->done, *prev = NULL; + + while (it != NULL) { + if (it->sequence_number >= blk->sequence_number) + break; + prev = it; + it = it->next; + } + + if (prev == NULL) { + blk->next = shared->done; + shared->done = blk; + } else { + blk->next = prev->next; + prev->next = blk; + } + + if (status != 0 && shared->status == 0) + shared->status = status; + + SIGNAL_ALL(&shared->done_cond); +} + static THREAD_TYPE worker_proc(THREAD_ARG arg) { compress_worker_t *worker = arg; thread_pool_processor_t *shared = worker->shared; - sqfs_block_t *it, *prev, *blk = NULL; + sqfs_block_t *blk = NULL; int status = 0; for (;;) { LOCK(&shared->mtx); - if (blk != NULL) { - it = shared->done; - prev = NULL; - - while (it != NULL) { - if (it->sequence_number >= blk->sequence_number) - break; - prev = it; - it = it->next; - } + if (blk != NULL) + store_completed_block(shared, blk, status); - if (prev == NULL) { - blk->next = shared->done; - shared->done = blk; - } else { - blk->next = prev->next; - prev->next = blk; - } - - if (status != 0 && shared->status == 0) - shared->status = status; - SIGNAL_ALL(&shared->done_cond); - } - - while (shared->queue == NULL && shared->status == 0) - AWAIT(&shared->queue_cond, &shared->mtx); - - if (shared->status == 0) { - blk = shared->queue; - shared->queue = blk->next; - blk->next = NULL; - - if (shared->queue == NULL) - shared->queue_last = NULL; - } else { - blk = NULL; - } + blk = get_next_work_item(shared); UNLOCK(&shared->mtx); if (blk == NULL) @@ -347,108 +359,68 @@ fail_init: static sqfs_block_t *try_dequeue(thread_pool_processor_t *proc) { - sqfs_block_t *queue, *it, *prev; - - it = proc->done; - prev = NULL; + sqfs_block_t *out; - while (it != NULL && it->sequence_number == proc->dequeue_id) { - prev = it; - it = it->next; - proc->dequeue_id += 1; - } + if (proc->done == NULL) + return NULL; - if (prev == NULL) { - queue = NULL; - } else { - queue = proc->done; - prev->next = NULL; - proc->done = it; - } + if (proc->done->sequence_number != proc->dequeue_id) + return NULL; - return queue; + out = proc->done; + proc->done = out->next; + out->next = NULL; + proc->dequeue_id += 1; + proc->backlog -= 1; + return out; } -static sqfs_block_t *queue_merge(sqfs_block_t *lhs, sqfs_block_t *rhs) +static int process_done_block(thread_pool_processor_t *proc, sqfs_block_t *it) { - sqfs_block_t *it, *head = NULL, **next_ptr = &head; - - while (lhs != NULL && rhs != NULL) { - if (lhs->sequence_number <= rhs->sequence_number) { - it = lhs; - lhs = lhs->next; - } else { - it = rhs; - rhs = rhs->next; - } + sqfs_block_t *fragblk = NULL; + int status = 0; - *next_ptr = it; - next_ptr = &it->next; - } + if (it->flags & SQFS_BLK_IS_FRAGMENT) { + status = process_completed_fragment(&proc->base, it, &fragblk); - it = (lhs != NULL ? lhs : rhs); - *next_ptr = it; - return head; -} + if (fragblk != NULL && status == 0) { + LOCK(&proc->mtx); + proc->dequeue_id = it->sequence_number; + fragblk->sequence_number = it->sequence_number; -static int process_done_queue(thread_pool_processor_t *proc, sqfs_block_t *queue) -{ - sqfs_block_t *it, *block = NULL; - int status = 0; - - while (queue != NULL && status == 0) { - it = queue; - queue = it->next; - proc->backlog -= 1; - - if (it->flags & SQFS_BLK_IS_FRAGMENT) { - block = NULL; - status = process_completed_fragment(&proc->base, it, &block); - - if (block != NULL && status == 0) { - LOCK(&proc->mtx); - proc->dequeue_id = it->sequence_number; - block->sequence_number = it->sequence_number; - - if (proc->queue == NULL) { - proc->queue = block; - proc->queue_last = block; - } else { - block->next = proc->queue; - proc->queue = block; - } - - proc->backlog += 1; - proc->done = queue_merge(queue, proc->done); - SIGNAL_ALL(&proc->queue_cond); - UNLOCK(&proc->mtx); - - queue = NULL; + if (proc->queue == NULL) { + proc->queue = fragblk; + proc->queue_last = fragblk; } else { - free(block); + fragblk->next = proc->queue; + proc->queue = fragblk; } + + proc->backlog += 1; + SIGNAL_ALL(&proc->queue_cond); + UNLOCK(&proc->mtx); } else { - status = process_completed_block(&proc->base, it); + free(fragblk); } - - free(it); + } else { + status = process_completed_block(&proc->base, it); } - free_blk_list(queue); + free(it); return status; } static int wait_completed(thread_pool_processor_t *proc) { - sqfs_block_t *queue; + sqfs_block_t *blk; int status; LOCK(&proc->mtx); for (;;) { - queue = try_dequeue(proc); + blk = try_dequeue(proc); status = proc->status; - if (queue != NULL || status != 0) + if (blk != NULL || status != 0) break; AWAIT(&proc->done_cond, &proc->mtx); @@ -456,11 +428,11 @@ static int wait_completed(thread_pool_processor_t *proc) UNLOCK(&proc->mtx); if (status != 0) { - free_blk_list(queue); + free(blk); return status; } - status = process_done_queue(proc, queue); + status = process_done_block(proc, blk); if (status != 0) { LOCK(&proc->mtx); @@ -514,12 +486,7 @@ out: int sqfs_block_processor_finish(sqfs_block_processor_t *proc) { thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc; - int status; - - LOCK(&thproc->mtx); - status = thproc->status; - SIGNAL_ALL(&thproc->queue_cond); - UNLOCK(&thproc->mtx); + int status = 0; while (status == 0 && thproc->backlog > 0) status = wait_completed(thproc); -- cgit v1.2.3