From 81f9ddf58b4024d51b24123788c860bb08a85b04 Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Sun, 16 Feb 2020 01:28:43 +0100 Subject: Move all the queue-waiting logic to the thread pool implemenation Signed-off-by: David Oberhollenzer --- lib/sqfs/block_processor/fileapi.c | 45 +++++++------------- lib/sqfs/block_processor/internal.h | 2 - lib/sqfs/block_processor/serial.c | 5 --- lib/sqfs/block_processor/winpthread.c | 78 +++++++++++++++++++---------------- 4 files changed, 58 insertions(+), 72 deletions(-) diff --git a/lib/sqfs/block_processor/fileapi.c b/lib/sqfs/block_processor/fileapi.c index 098f724..073d92f 100644 --- a/lib/sqfs/block_processor/fileapi.c +++ b/lib/sqfs/block_processor/fileapi.c @@ -7,19 +7,6 @@ #define SQFS_BUILDING_DLL #include "internal.h" -static int enqueue_block(sqfs_block_processor_t *proc, sqfs_block_t *block) -{ - int status; - - while (proc->backlog > proc->max_backlog) { - status = wait_completed(proc); - if (status) - return status; - } - - return append_to_work_queue(proc, block); -} - static int add_sentinel_block(sqfs_block_processor_t *proc) { sqfs_block_t *blk = calloc(1, sizeof(*blk)); @@ -30,21 +17,7 @@ static int add_sentinel_block(sqfs_block_processor_t *proc) blk->inode = proc->inode; blk->flags = proc->blk_flags | SQFS_BLK_LAST_BLOCK; - return enqueue_block(proc, blk); -} - -int sqfs_block_processor_begin_file(sqfs_block_processor_t *proc, - sqfs_inode_generic_t *inode, sqfs_u32 flags) -{ - if (proc->inode != NULL) - return SQFS_ERROR_SEQUENCE; - - if (flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) - return SQFS_ERROR_UNSUPPORTED; - - proc->inode = inode; - proc->blk_flags = flags | SQFS_BLK_FIRST_BLOCK; - return 0; + return append_to_work_queue(proc, blk); } static int flush_block(sqfs_block_processor_t *proc) @@ -60,7 +33,21 @@ static int flush_block(sqfs_block_processor_t *proc) proc->blk_flags &= ~SQFS_BLK_FIRST_BLOCK; } - return enqueue_block(proc, block); + return append_to_work_queue(proc, block); +} + +int sqfs_block_processor_begin_file(sqfs_block_processor_t *proc, + sqfs_inode_generic_t *inode, sqfs_u32 flags) +{ + if (proc->inode != NULL) + return SQFS_ERROR_SEQUENCE; + + if (flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) + return SQFS_ERROR_UNSUPPORTED; + + proc->inode = inode; + proc->blk_flags = flags | SQFS_BLK_FIRST_BLOCK; + return 0; } int sqfs_block_processor_append(sqfs_block_processor_t *proc, const void *data, diff --git a/lib/sqfs/block_processor/internal.h b/lib/sqfs/block_processor/internal.h index cea820f..f26d58e 100644 --- a/lib/sqfs/block_processor/internal.h +++ b/lib/sqfs/block_processor/internal.h @@ -115,6 +115,4 @@ int block_processor_do_block(sqfs_block_t *block, sqfs_compressor_t *cmp, SQFS_INTERNAL int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block); -SQFS_INTERNAL int wait_completed(sqfs_block_processor_t *proc); - #endif /* INTERNAL_H */ diff --git a/lib/sqfs/block_processor/serial.c b/lib/sqfs/block_processor/serial.c index 1d2c2db..e44ce17 100644 --- a/lib/sqfs/block_processor/serial.c +++ b/lib/sqfs/block_processor/serial.c @@ -74,11 +74,6 @@ done: return proc->status; } -int wait_completed(sqfs_block_processor_t *proc) -{ - return proc->status; -} - int sqfs_block_processor_finish(sqfs_block_processor_t *proc) { if (proc->frag_block == NULL || proc->status != 0) diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c index fcbb8b4..0ad8475 100644 --- a/lib/sqfs/block_processor/winpthread.c +++ b/lib/sqfs/block_processor/winpthread.c @@ -311,35 +311,6 @@ fail_init: } #endif -int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block) -{ - int status; - - LOCK(&proc->mtx); - status = proc->status; - if (status != 0) - goto out; - - if (block != NULL) { - if (proc->queue_last == NULL) { - proc->queue = proc->queue_last = block; - } else { - proc->queue_last->next = block; - proc->queue_last = block; - } - - block->sequence_number = proc->enqueue_id++; - block->next = NULL; - proc->backlog += 1; - block = NULL; - } -out: - SIGNAL_ALL(&proc->queue_cond); - UNLOCK(&proc->mtx); - free(block); - return 0; -} - static sqfs_block_t *try_dequeue(sqfs_block_processor_t *proc) { sqfs_block_t *queue, *it, *prev; @@ -433,7 +404,7 @@ static int process_done_queue(sqfs_block_processor_t *proc, sqfs_block_t *queue) return status; } -int wait_completed(sqfs_block_processor_t *proc) +static int wait_completed(sqfs_block_processor_t *proc) { sqfs_block_t *queue; int status; @@ -470,19 +441,54 @@ int wait_completed(sqfs_block_processor_t *proc) return status; } -int sqfs_block_processor_finish(sqfs_block_processor_t *proc) +int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block) { - int status = 0; - - append_to_work_queue(proc, NULL); + int status; - while (proc->backlog > 0) { + while (proc->backlog > proc->max_backlog) { status = wait_completed(proc); if (status) return status; } - if (proc->frag_block != NULL) { + LOCK(&proc->mtx); + status = proc->status; + if (status != 0) + goto out; + + if (block != NULL) { + if (proc->queue_last == NULL) { + proc->queue = proc->queue_last = block; + } else { + proc->queue_last->next = block; + proc->queue_last = block; + } + + block->sequence_number = proc->enqueue_id++; + block->next = NULL; + proc->backlog += 1; + block = NULL; + } +out: + SIGNAL_ALL(&proc->queue_cond); + UNLOCK(&proc->mtx); + free(block); + return 0; +} + +int sqfs_block_processor_finish(sqfs_block_processor_t *proc) +{ + int status; + + LOCK(&proc->mtx); + status = proc->status; + SIGNAL_ALL(&proc->queue_cond); + UNLOCK(&proc->mtx); + + while (status == 0 && proc->backlog > 0) + status = wait_completed(proc); + + if (status == 0 && proc->frag_block != NULL) { status = append_to_work_queue(proc, proc->frag_block); proc->frag_block = NULL; -- cgit v1.2.3