From 3215679c2c87f3f40809f5eb75b5c679bcb4879d Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Wed, 19 Feb 2020 15:32:47 +0100 Subject: Restructure thread pool block processor Implement the io-queue based design as outline in doc/parallelism.txt Signed-off-by: David Oberhollenzer --- lib/sqfs/block_processor/common.c | 20 ++- lib/sqfs/block_processor/internal.h | 4 +- lib/sqfs/block_processor/winpthread.c | 291 +++++++++++++++++++++++----------- 3 files changed, 212 insertions(+), 103 deletions(-) (limited to 'lib') diff --git a/lib/sqfs/block_processor/common.c b/lib/sqfs/block_processor/common.c index 674e4cf..ab5d1bc 100644 --- a/lib/sqfs/block_processor/common.c +++ b/lib/sqfs/block_processor/common.c @@ -21,8 +21,10 @@ int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) if (blk->flags & SQFS_BLK_IS_SPARSE) { sqfs_inode_make_extended(blk->inode); blk->inode->data.file_ext.sparse += blk->size; - blk->inode->extra[blk->inode->num_file_blocks] = 0; - blk->inode->num_file_blocks += 1; + blk->inode->extra[blk->index] = 0; + + if (blk->index >= blk->inode->num_file_blocks) + blk->inode->num_file_blocks = blk->index + 1; proc->stats.sparse_block_count += 1; } else if (blk->size != 0) { @@ -36,8 +38,10 @@ int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) if (err) return err; } else { - blk->inode->extra[blk->inode->num_file_blocks] = size; - blk->inode->num_file_blocks += 1; + blk->inode->extra[blk->index] = size; + + if (blk->index >= blk->inode->num_file_blocks) + blk->inode->num_file_blocks = blk->index + 1; proc->stats.data_block_count += 1; } @@ -95,8 +99,10 @@ int process_completed_fragment(sqfs_block_processor_t *proc, sqfs_block_t *frag, if (frag->flags & SQFS_BLK_IS_SPARSE) { sqfs_inode_make_extended(frag->inode); frag->inode->data.file_ext.sparse += frag->size; - frag->inode->extra[frag->inode->num_file_blocks] = 0; - frag->inode->num_file_blocks += 1; + frag->inode->extra[frag->index] = 0; + + if (frag->index >= frag->inode->num_file_blocks) + frag->inode->num_file_blocks = frag->index + 1; proc->stats.sparse_block_count += 1; return 0; @@ -187,6 +193,7 @@ static int flush_block(sqfs_block_processor_t *proc) proc->blk_flags &= ~SQFS_BLK_FIRST_BLOCK; } + block->index = proc->blk_index++; return append_to_work_queue(proc, block); } @@ -201,6 +208,7 @@ int sqfs_block_processor_begin_file(sqfs_block_processor_t *proc, proc->inode = inode; proc->blk_flags = flags | SQFS_BLK_FIRST_BLOCK; + proc->blk_index = 0; return 0; } diff --git a/lib/sqfs/block_processor/internal.h b/lib/sqfs/block_processor/internal.h index 3c6570e..fb9c4a0 100644 --- a/lib/sqfs/block_processor/internal.h +++ b/lib/sqfs/block_processor/internal.h @@ -27,7 +27,8 @@ typedef struct sqfs_block_t { struct sqfs_block_t *next; sqfs_inode_generic_t *inode; - sqfs_u32 sequence_number; + sqfs_u32 proc_seq_num; + sqfs_u32 io_seq_num; sqfs_u32 flags; sqfs_u32 size; sqfs_u32 checksum; @@ -51,6 +52,7 @@ struct sqfs_block_processor_t { sqfs_inode_generic_t *inode; sqfs_block_t *blk_current; sqfs_u32 blk_flags; + sqfs_u32 blk_index; size_t max_block_size; }; diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c index 0cb6d02..da69cb7 100644 --- a/lib/sqfs/block_processor/winpthread.c +++ b/lib/sqfs/block_processor/winpthread.c @@ -52,15 +52,19 @@ struct thread_pool_processor_t { CONDITION_TYPE queue_cond; CONDITION_TYPE done_cond; - sqfs_block_t *queue; - sqfs_block_t *queue_last; + sqfs_block_t *proc_queue; + sqfs_block_t *proc_queue_last; + sqfs_block_t *io_queue; sqfs_block_t *done; size_t backlog; int status; - sqfs_u32 enqueue_id; - sqfs_u32 dequeue_id; + sqfs_u32 proc_enq_id; + sqfs_u32 proc_deq_id; + + sqfs_u32 io_enq_id; + sqfs_u32 io_deq_id; unsigned int num_workers; size_t max_backlog; @@ -83,16 +87,16 @@ static sqfs_block_t *get_next_work_item(thread_pool_processor_t *shared) { sqfs_block_t *blk = NULL; - while (shared->queue == NULL && shared->status == 0) + while (shared->proc_queue == NULL && shared->status == 0) AWAIT(&shared->queue_cond, &shared->mtx); if (shared->status == 0) { - blk = shared->queue; - shared->queue = blk->next; + blk = shared->proc_queue; + shared->proc_queue = blk->next; blk->next = NULL; - if (shared->queue == NULL) - shared->queue_last = NULL; + if (shared->proc_queue == NULL) + shared->proc_queue_last = NULL; } return blk; @@ -104,7 +108,7 @@ static void store_completed_block(thread_pool_processor_t *shared, sqfs_block_t *it = shared->done, *prev = NULL; while (it != NULL) { - if (it->sequence_number >= blk->sequence_number) + if (it->proc_seq_num >= blk->proc_seq_num) break; prev = it; it = it->next; @@ -177,7 +181,7 @@ static void block_processor_destroy(sqfs_object_t *obj) } DeleteCriticalSection(&proc->mtx); - free_blk_list(proc->queue); + free_blk_list(proc->proc_queue); free_blk_list(proc->done); free(proc->base.blk_current); free(proc->base.frag_block); @@ -261,7 +265,7 @@ static void block_processor_destroy(sqfs_object_t *obj) pthread_cond_destroy(&proc->queue_cond); pthread_mutex_destroy(&proc->mtx); - free_blk_list(proc->queue); + free_blk_list(proc->proc_queue); free_blk_list(proc->done); free(proc->base.blk_current); free(proc->base.frag_block); @@ -357,149 +361,244 @@ fail_init: } #endif -static sqfs_block_t *try_dequeue(thread_pool_processor_t *proc) +static void store_io_block(thread_pool_processor_t *proc, sqfs_block_t *blk) +{ + sqfs_block_t *it = proc->io_queue, *prev = NULL; + + while (it != NULL && it->io_seq_num < blk->io_seq_num) { + prev = it; + it = it->next; + } + + if (prev == NULL) { + blk->next = proc->io_queue; + proc->io_queue = blk; + } else { + blk->next = prev->next; + prev->next = blk; + } + + proc->backlog += 1; +} + +static sqfs_block_t *try_dequeue_io(thread_pool_processor_t *proc) +{ + sqfs_block_t *out; + + if (proc->io_queue == NULL) + return NULL; + + if (proc->io_queue->io_seq_num != proc->io_deq_id) + return NULL; + + out = proc->io_queue; + proc->io_queue = out->next; + out->next = NULL; + proc->io_deq_id += 1; + proc->backlog -= 1; + return out; +} + +static sqfs_block_t *try_dequeue_done(thread_pool_processor_t *proc) { sqfs_block_t *out; if (proc->done == NULL) return NULL; - if (proc->done->sequence_number != proc->dequeue_id) + if (proc->done->proc_seq_num != proc->proc_deq_id) return NULL; out = proc->done; proc->done = out->next; out->next = NULL; - proc->dequeue_id += 1; + proc->proc_deq_id += 1; proc->backlog -= 1; return out; } -static int process_done_block(thread_pool_processor_t *proc, sqfs_block_t *it) +static void append_block(thread_pool_processor_t *proc, sqfs_block_t *block) { - sqfs_block_t *fragblk = NULL; - int status = 0; + if (proc->proc_queue_last == NULL) { + proc->proc_queue = proc->proc_queue_last = block; + } else { + proc->proc_queue_last->next = block; + proc->proc_queue_last = block; + } - if (it->flags & SQFS_BLK_IS_FRAGMENT) { - status = process_completed_fragment(&proc->base, it, &fragblk); + block->proc_seq_num = proc->proc_enq_id++; + block->next = NULL; + proc->backlog += 1; +} - if (fragblk != NULL && status == 0) { - LOCK(&proc->mtx); - proc->dequeue_id = it->sequence_number; - fragblk->sequence_number = it->sequence_number; +static int handle_io_queue(thread_pool_processor_t *proc, sqfs_block_t *list) +{ + sqfs_block_t *it = list; + int status = 0; - if (proc->queue == NULL) { - proc->queue = fragblk; - proc->queue_last = fragblk; - } else { - fragblk->next = proc->queue; - proc->queue = fragblk; - } + while (status == 0 && it != NULL) { + status = process_completed_block(&proc->base, it); + it = it->next; - proc->backlog += 1; + if (status != 0) { + LOCK(&proc->mtx); + if (proc->status == 0) + proc->status = status; SIGNAL_ALL(&proc->queue_cond); UNLOCK(&proc->mtx); - } else { - free(fragblk); } - } else { - status = process_completed_block(&proc->base, it); } - free(it); return status; } -static int wait_completed(thread_pool_processor_t *proc) +int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block) { - sqfs_block_t *blk; + thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc; + sqfs_block_t *io_list = NULL, *io_list_last = NULL; + sqfs_block_t *blk, *fragblk, *free_list = NULL; int status; - LOCK(&proc->mtx); + LOCK(&thproc->mtx); for (;;) { - blk = try_dequeue(proc); - status = proc->status; - - if (blk != NULL || status != 0) + status = thproc->status; + if (status != 0) break; - AWAIT(&proc->done_cond, &proc->mtx); - } - UNLOCK(&proc->mtx); + if (thproc->backlog < thproc->max_backlog) { + append_block(thproc, block); + block = NULL; + break; + } - if (status != 0) { - free(blk); - return status; - } + blk = try_dequeue_io(thproc); + if (blk != NULL) { + if (io_list_last == NULL) { + io_list = io_list_last = blk; + } else { + io_list_last->next = blk; + io_list_last = blk; + } + continue; + } - status = process_done_block(proc, blk); + blk = try_dequeue_done(thproc); + if (blk == NULL) { + AWAIT(&thproc->done_cond, &thproc->mtx); + continue; + } - if (status != 0) { - LOCK(&proc->mtx); - if (proc->status == 0) { - proc->status = status; + if (blk->flags & SQFS_BLK_IS_FRAGMENT) { + fragblk = NULL; + thproc->status = process_completed_fragment(proc, blk, + &fragblk); + blk->next = free_list; + free_list = blk; + + if (fragblk != NULL) { + fragblk->io_seq_num = thproc->io_enq_id++; + append_block(thproc, fragblk); + SIGNAL_ALL(&thproc->queue_cond); + } } else { - status = proc->status; + if (!(blk->flags & SQFS_BLK_FRAGMENT_BLOCK)) + blk->io_seq_num = thproc->io_enq_id++; + store_io_block(thproc, blk); } - SIGNAL_ALL(&proc->queue_cond); - UNLOCK(&proc->mtx); } + SIGNAL_ALL(&thproc->queue_cond); + UNLOCK(&thproc->mtx); + free(block); + + if (status == 0) + status = handle_io_queue(thproc, io_list); + + free_blk_list(io_list); + free_blk_list(free_list); return status; } -int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block) +int sqfs_block_processor_finish(sqfs_block_processor_t *proc) { thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc; + sqfs_block_t *io_list = NULL, *io_list_last = NULL; + sqfs_block_t *blk, *fragblk, *free_list = NULL; int status; - while (thproc->backlog > thproc->max_backlog) { - status = wait_completed(thproc); - if (status) - return status; - } - LOCK(&thproc->mtx); - status = thproc->status; - if (status != 0) - goto out; + for (;;) { + status = thproc->status; + if (status != 0) + break; - if (block != NULL) { - if (thproc->queue_last == NULL) { - thproc->queue = thproc->queue_last = block; - } else { - thproc->queue_last->next = block; - thproc->queue_last = block; + if (thproc->backlog == 0) + break; + + blk = try_dequeue_io(thproc); + if (blk != NULL) { + if (io_list_last == NULL) { + io_list = io_list_last = blk; + } else { + io_list_last->next = blk; + io_list_last = blk; + } + continue; + } + + blk = try_dequeue_done(thproc); + if (blk == NULL) { + AWAIT(&thproc->done_cond, &thproc->mtx); + continue; } - block->sequence_number = thproc->enqueue_id++; - block->next = NULL; - thproc->backlog += 1; - block = NULL; + if (blk->flags & SQFS_BLK_IS_FRAGMENT) { + fragblk = NULL; + thproc->status = process_completed_fragment(proc, blk, + &fragblk); + blk->next = free_list; + free_list = blk; + + if (fragblk != NULL) { + fragblk->io_seq_num = thproc->io_enq_id++; + append_block(thproc, fragblk); + SIGNAL_ALL(&thproc->queue_cond); + } + } else { + if (!(blk->flags & SQFS_BLK_FRAGMENT_BLOCK)) + blk->io_seq_num = thproc->io_enq_id++; + store_io_block(thproc, blk); + } } -out: - SIGNAL_ALL(&thproc->queue_cond); UNLOCK(&thproc->mtx); - free(block); - return 0; -} - -int sqfs_block_processor_finish(sqfs_block_processor_t *proc) -{ - thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc; - int status = 0; - - while (status == 0 && thproc->backlog > 0) - status = wait_completed(thproc); if (status == 0 && proc->frag_block != NULL) { - status = append_to_work_queue(proc, proc->frag_block); + blk = proc->frag_block; proc->frag_block = NULL; - if (status) - return status; + status = block_processor_do_block(blk, proc->cmp, + thproc->workers[0]->scratch, + proc->max_block_size); - status = wait_completed(thproc); + if (io_list_last == NULL) { + io_list = io_list_last = blk; + } else { + io_list_last->next = blk; + io_list_last = blk; + } + + if (status != 0) { + LOCK(&thproc->mtx); + if (thproc->status == 0) + thproc->status = status; + SIGNAL_ALL(&thproc->queue_cond); + UNLOCK(&thproc->mtx); + } } + if (status == 0) + status = handle_io_queue(thproc, io_list); + + free_blk_list(io_list); + free_blk_list(free_list); return status; } -- cgit v1.2.3