diff options
author | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2019-12-12 12:48:08 +0100 |
---|---|---|
committer | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2019-12-12 12:48:33 +0100 |
commit | 6d4faedcb53f54253160f1717fac609f922ae0c7 (patch) | |
tree | a082bc838a8d08df2769e1da11e600946f7354a9 /lib/sqfs/data_writer | |
parent | 40075fe0751a06c3373f53f36d44a27e79c2cca7 (diff) |
Fix thread pool queue accounting
- ONLY manipulate the back log counter in the main thread.
- Fix the order of operations when submitting blocks.
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib/sqfs/data_writer')
-rw-r--r-- | lib/sqfs/data_writer/common.c | 2 | ||||
-rw-r--r-- | lib/sqfs/data_writer/pthread.c | 96 |
2 files changed, 56 insertions, 42 deletions
diff --git a/lib/sqfs/data_writer/common.c b/lib/sqfs/data_writer/common.c index 5fe70ac..d316a6f 100644 --- a/lib/sqfs/data_writer/common.c +++ b/lib/sqfs/data_writer/common.c @@ -77,8 +77,6 @@ void data_writer_store_done(sqfs_data_writer_t *proc, sqfs_block_t *blk, if (status != 0 && proc->status == 0) proc->status = status; - - proc->backlog -= 1; } sqfs_block_t *data_writer_next_work_item(sqfs_data_writer_t *proc) diff --git a/lib/sqfs/data_writer/pthread.c b/lib/sqfs/data_writer/pthread.c index 1830d07..97114c5 100644 --- a/lib/sqfs/data_writer/pthread.c +++ b/lib/sqfs/data_writer/pthread.c @@ -154,9 +154,19 @@ void sqfs_data_writer_destroy(sqfs_data_writer_t *proc) data_writer_cleanup(proc); } -static void append_to_work_queue(sqfs_data_writer_t *proc, - sqfs_block_t *block) +static int append_to_work_queue(sqfs_data_writer_t *proc, + sqfs_block_t *block) { + int status; + + pthread_mutex_lock(&proc->mtx); + status = proc->status; + if (status != 0) { + free(block); + pthread_mutex_unlock(&proc->mtx); + return status; + } + if (proc->queue_last == NULL) { proc->queue = proc->queue_last = block; } else { @@ -168,6 +178,8 @@ static void append_to_work_queue(sqfs_data_writer_t *proc, block->next = NULL; proc->backlog += 1; pthread_cond_broadcast(&proc->queue_cond); + pthread_mutex_unlock(&proc->mtx); + return 0; } static sqfs_block_t *try_dequeue(sqfs_data_writer_t *proc) @@ -224,6 +236,7 @@ static int process_done_queue(sqfs_data_writer_t *proc, sqfs_block_t *queue) while (queue != NULL && status == 0) { it = queue; queue = it->next; + proc->backlog -= 1; if (it->flags & SQFS_BLK_IS_FRAGMENT) { block = NULL; @@ -275,66 +288,69 @@ int test_and_set_status(sqfs_data_writer_t *proc, int status) return status; } -int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block) +static int wait_completed(sqfs_data_writer_t *proc) { sqfs_block_t *queue; int status; pthread_mutex_lock(&proc->mtx); - while (proc->backlog > proc->max_backlog && proc->status == 0) - pthread_cond_wait(&proc->done_cond, &proc->mtx); - - if (proc->status != 0) { + for (;;) { + queue = try_dequeue(proc); status = proc->status; - pthread_mutex_unlock(&proc->mtx); - free(block); - return status; - } - append_to_work_queue(proc, block); - block = NULL; + if (queue != NULL || status != 0) + break; - queue = try_dequeue(proc); + pthread_cond_wait(&proc->done_cond, &proc->mtx); + } pthread_mutex_unlock(&proc->mtx); + if (status != 0) { + free_blk_list(queue); + return status; + } + status = process_done_queue(proc, queue); if (status != 0) return test_and_set_status(proc, status); - return 0; + return status; } -int sqfs_data_writer_finish(sqfs_data_writer_t *proc) +int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block) { - sqfs_block_t *queue; - int status = 0; - - for (;;) { - pthread_mutex_lock(&proc->mtx); - while (proc->backlog > 0 && proc->status == 0) - pthread_cond_wait(&proc->done_cond, &proc->mtx); + int status; - if (proc->status != 0) { - status = proc->status; - pthread_mutex_unlock(&proc->mtx); + while (proc->backlog > proc->max_backlog) { + status = wait_completed(proc); + if (status) return status; - } + } - queue = proc->done; - proc->done = NULL; - pthread_mutex_unlock(&proc->mtx); + return append_to_work_queue(proc, block); +} - if (queue == NULL) { - if (proc->frag_block != NULL) { - append_to_work_queue(proc, proc->frag_block); - proc->frag_block = NULL; - continue; - } - break; - } +int sqfs_data_writer_finish(sqfs_data_writer_t *proc) +{ + int status; + + while (proc->backlog > 0) { + status = wait_completed(proc); + if (status) + return status; + } + + if (proc->frag_block != NULL) { + status = data_writer_do_block(proc->frag_block, + proc->workers[0]->cmp, + proc->workers[0]->scratch, + proc->max_block_size); + if (status) + return status; - status = process_done_queue(proc, queue); - if (status != 0) + status = process_done_queue(proc, proc->frag_block); + proc->frag_block = NULL; + if (status) return status; } |