From 2354f7b5cfc289fdec95eeecb8d441c20513712b Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Tue, 24 Sep 2019 17:29:40 +0200 Subject: Bring back batched dequeuing Instead of doing piece-wise block dequeuing, take as many consequitive blocks as we can and in the comparetively rare* case that we produce a fragment block, put everything back for the next run. *rare because a lot of fragments are eaten by deduplication and even after that, a number of them are required to fill a block. Signed-off-by: David Oberhollenzer --- lib/sqfs/blk_proc/pthread.c | 154 +++++++++++++++++++++++++++++++------------- 1 file changed, 110 insertions(+), 44 deletions(-) diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c index 85146da..de499f7 100644 --- a/lib/sqfs/blk_proc/pthread.c +++ b/lib/sqfs/blk_proc/pthread.c @@ -256,74 +256,140 @@ static int test_and_set_status(sqfs_block_processor_t *proc, int status) return status; } -static int queue_pump(sqfs_block_processor_t *proc, sqfs_block_t *block) +static sqfs_block_t *try_dequeue(sqfs_block_processor_t *proc) { - sqfs_block_t *completed = NULL; - int status; + sqfs_block_t *queue, *it, *prev; - pthread_mutex_lock(&proc->mtx); - while (proc->backlog > proc->max_backlog && proc->status == 0) - pthread_cond_wait(&proc->done_cond, &proc->mtx); + it = proc->done; + prev = NULL; - if (proc->status != 0) { - status = proc->status; - pthread_mutex_unlock(&proc->mtx); - free(block); - return status; + while (it != NULL && it->sequence_number == proc->dequeue_id) { + prev = it; + it = it->next; + proc->dequeue_id += 1; + } + + if (prev == NULL) { + queue = NULL; + } else { + queue = proc->done; + prev->next = NULL; + proc->done = it; } - completed = get_completed_if_avail(proc); + return queue; +} - append_to_work_queue(proc, block); - block = NULL; - pthread_mutex_unlock(&proc->mtx); +static sqfs_block_t *queue_merge(sqfs_block_t *lhs, sqfs_block_t *rhs) +{ + sqfs_block_t *it, *head = NULL, **next_ptr = &head; + + while (lhs != NULL && rhs != NULL) { + if (lhs->sequence_number <= rhs->sequence_number) { + it = lhs; + lhs = lhs->next; + } else { + it = rhs; + rhs = rhs->next; + } + + *next_ptr = it; + next_ptr = &it->next; + } + + it = (lhs != NULL ? lhs : rhs); + *next_ptr = it; + return head; +} + +static int process_done_queue(sqfs_block_processor_t *proc, + sqfs_block_t *queue) +{ + sqfs_block_t *it, *block = NULL; + int status = 0; + + while (queue != NULL) { + it = queue; + queue = it->next; + it->next = NULL; + + if (it->flags & SQFS_BLK_IS_FRAGMENT) { + status = handle_fragment(proc, it, &block); + + if (status != 0) { + free(it); + free(block); + free_blk_list(queue); + status = test_and_set_status(proc, status); + break; + } + + if (block != NULL) { + pthread_mutex_lock(&proc->mtx); + proc->dequeue_id = it->sequence_number; + block->sequence_number = proc->dequeue_id; + + if (proc->queue == NULL) { + proc->queue = block; + proc->queue_last = block; + } else { + block->next = proc->queue; + proc->queue = block; + } + + proc->backlog += 1; + proc->done = queue_merge(queue, proc->done); + pthread_cond_broadcast(&proc->queue_cond); + pthread_mutex_unlock(&proc->mtx); - if (completed == NULL) - return 0; - - if (completed->flags & SQFS_BLK_IS_FRAGMENT) { - status = handle_fragment(proc, completed, &block); - - if (status != 0) { - free(block); - status = test_and_set_status(proc, status); - } else if (block != NULL) { - pthread_mutex_lock(&proc->mtx); - proc->dequeue_id = completed->sequence_number; - block->sequence_number = proc->dequeue_id; - - if (proc->queue == NULL) { - proc->queue = block; - proc->queue_last = block; - } else { - block->next = proc->queue; - proc->queue = block; + queue = NULL; } + } else { + status = process_completed_block(proc, it); - proc->backlog += 1; - pthread_cond_broadcast(&proc->queue_cond); - pthread_mutex_unlock(&proc->mtx); + if (status != 0) { + status = test_and_set_status(proc, status); + free_blk_list(queue); + free(it); + break; + } } - } else { - status = process_completed_block(proc, completed); - if (status != 0) - status = test_and_set_status(proc, status); + free(it); } - free(completed); return status; } int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, sqfs_block_t *block) { + sqfs_block_t *queue; + int status; + if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) { free(block); return test_and_set_status(proc, SQFS_ERROR_UNSUPPORTED); } - return queue_pump(proc, block); + pthread_mutex_lock(&proc->mtx); + while (proc->backlog > proc->max_backlog && proc->status == 0) + pthread_cond_wait(&proc->done_cond, &proc->mtx); + + if (proc->status != 0) { + status = proc->status; + pthread_mutex_unlock(&proc->mtx); + free(block); + return status; + } + + append_to_work_queue(proc, block); + block = NULL; + + queue = try_dequeue(proc); + pthread_mutex_unlock(&proc->mtx); + + return process_done_queue(proc, queue); } int sqfs_block_processor_finish(sqfs_block_processor_t *proc) -- cgit v1.2.3