From 6d4faedcb53f54253160f1717fac609f922ae0c7 Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Thu, 12 Dec 2019 12:48:08 +0100 Subject: Fix thread pool queue accounting - ONLY manipulate the back log counter in the main thread. - Fix the order of operations when submitting blocks. Signed-off-by: David Oberhollenzer --- lib/sqfs/data_writer/common.c | 2 - lib/sqfs/data_writer/pthread.c | 96 ++++++++++++++++++++++++------------------ 2 files changed, 56 insertions(+), 42 deletions(-) (limited to 'lib/sqfs') diff --git a/lib/sqfs/data_writer/common.c b/lib/sqfs/data_writer/common.c index 5fe70ac..d316a6f 100644 --- a/lib/sqfs/data_writer/common.c +++ b/lib/sqfs/data_writer/common.c @@ -77,8 +77,6 @@ void data_writer_store_done(sqfs_data_writer_t *proc, sqfs_block_t *blk, if (status != 0 && proc->status == 0) proc->status = status; - - proc->backlog -= 1; } sqfs_block_t *data_writer_next_work_item(sqfs_data_writer_t *proc) diff --git a/lib/sqfs/data_writer/pthread.c b/lib/sqfs/data_writer/pthread.c index 1830d07..97114c5 100644 --- a/lib/sqfs/data_writer/pthread.c +++ b/lib/sqfs/data_writer/pthread.c @@ -154,9 +154,19 @@ void sqfs_data_writer_destroy(sqfs_data_writer_t *proc) data_writer_cleanup(proc); } -static void append_to_work_queue(sqfs_data_writer_t *proc, - sqfs_block_t *block) +static int append_to_work_queue(sqfs_data_writer_t *proc, + sqfs_block_t *block) { + int status; + + pthread_mutex_lock(&proc->mtx); + status = proc->status; + if (status != 0) { + free(block); + pthread_mutex_unlock(&proc->mtx); + return status; + } + if (proc->queue_last == NULL) { proc->queue = proc->queue_last = block; } else { @@ -168,6 +178,8 @@ static void append_to_work_queue(sqfs_data_writer_t *proc, block->next = NULL; proc->backlog += 1; pthread_cond_broadcast(&proc->queue_cond); + pthread_mutex_unlock(&proc->mtx); + return 0; } static sqfs_block_t *try_dequeue(sqfs_data_writer_t *proc) @@ -224,6 +236,7 @@ static int process_done_queue(sqfs_data_writer_t *proc, sqfs_block_t *queue) while (queue != NULL && status == 0) { it = queue; queue = it->next; + proc->backlog -= 1; if (it->flags & SQFS_BLK_IS_FRAGMENT) { block = NULL; @@ -275,66 +288,69 @@ int test_and_set_status(sqfs_data_writer_t *proc, int status) return status; } -int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block) +static int wait_completed(sqfs_data_writer_t *proc) { sqfs_block_t *queue; int status; pthread_mutex_lock(&proc->mtx); - while (proc->backlog > proc->max_backlog && proc->status == 0) - pthread_cond_wait(&proc->done_cond, &proc->mtx); - - if (proc->status != 0) { + for (;;) { + queue = try_dequeue(proc); status = proc->status; - pthread_mutex_unlock(&proc->mtx); - free(block); - return status; - } - append_to_work_queue(proc, block); - block = NULL; + if (queue != NULL || status != 0) + break; - queue = try_dequeue(proc); + pthread_cond_wait(&proc->done_cond, &proc->mtx); + } pthread_mutex_unlock(&proc->mtx); + if (status != 0) { + free_blk_list(queue); + return status; + } + status = process_done_queue(proc, queue); if (status != 0) return test_and_set_status(proc, status); - return 0; + return status; } -int sqfs_data_writer_finish(sqfs_data_writer_t *proc) +int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block) { - sqfs_block_t *queue; - int status = 0; - - for (;;) { - pthread_mutex_lock(&proc->mtx); - while (proc->backlog > 0 && proc->status == 0) - pthread_cond_wait(&proc->done_cond, &proc->mtx); + int status; - if (proc->status != 0) { - status = proc->status; - pthread_mutex_unlock(&proc->mtx); + while (proc->backlog > proc->max_backlog) { + status = wait_completed(proc); + if (status) return status; - } + } - queue = proc->done; - proc->done = NULL; - pthread_mutex_unlock(&proc->mtx); + return append_to_work_queue(proc, block); +} - if (queue == NULL) { - if (proc->frag_block != NULL) { - append_to_work_queue(proc, proc->frag_block); - proc->frag_block = NULL; - continue; - } - break; - } +int sqfs_data_writer_finish(sqfs_data_writer_t *proc) +{ + int status; + + while (proc->backlog > 0) { + status = wait_completed(proc); + if (status) + return status; + } + + if (proc->frag_block != NULL) { + status = data_writer_do_block(proc->frag_block, + proc->workers[0]->cmp, + proc->workers[0]->scratch, + proc->max_block_size); + if (status) + return status; - status = process_done_queue(proc, queue); - if (status != 0) + status = process_done_queue(proc, proc->frag_block); + proc->frag_block = NULL; + if (status) return status; } -- cgit v1.2.3