From 4943dd85a9190f4477bd5e98a83be1f5e2790d8d Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Mon, 19 Aug 2019 11:08:25 +0200 Subject: Minor improvements for parallel block processor - Fewer lock aquires in worker function - There is no point in locking/unlocking for inserting the completed block if we are going to lock again immediately in the next iteration -> Merge those two critical sections into one - Constant time queue insertion - Bypass queue entirely if there is nothing to do for a block Signed-off-by: David Oberhollenzer --- lib/comp/block_processor_parallel.c | 42 +++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 16 deletions(-) (limited to 'lib/comp') diff --git a/lib/comp/block_processor_parallel.c b/lib/comp/block_processor_parallel.c index 617900f..985c900 100644 --- a/lib/comp/block_processor_parallel.c +++ b/lib/comp/block_processor_parallel.c @@ -30,6 +30,8 @@ struct block_processor_t { /* needs rw access by worker and main thread */ block_t *queue; + block_t *queue_last; + block_t *done; bool terminate; size_t backlog; @@ -73,10 +75,16 @@ static void *worker_proc(void *arg) { compress_worker_t *worker = arg; block_processor_t *shared = worker->shared; - block_t *blk; + block_t *blk = NULL; for (;;) { pthread_mutex_lock(&shared->mtx); + if (blk != NULL) { + store_completed_block(shared, blk); + shared->backlog -= 1; + pthread_cond_broadcast(&shared->done_cond); + } + while (shared->queue == NULL && !shared->terminate) { pthread_cond_wait(&shared->queue_cond, &shared->mtx); @@ -90,18 +98,15 @@ static void *worker_proc(void *arg) blk = shared->queue; shared->queue = blk->next; blk->next = NULL; + + if (shared->queue == NULL) + shared->queue_last = NULL; pthread_mutex_unlock(&shared->mtx); if (process_block(blk, worker->cmp, worker->scratch, shared->max_block_size)) { blk->flags |= BLK_COMPRESS_ERROR; } - - pthread_mutex_lock(&shared->mtx); - store_completed_block(shared, blk); - shared->backlog -= 1; - pthread_cond_broadcast(&shared->done_cond); - pthread_mutex_unlock(&shared->mtx); } return NULL; } @@ -273,18 +278,23 @@ int block_processor_enqueue(block_processor_t *proc, block_t *block) block->next = NULL; pthread_mutex_lock(&proc->mtx); - while (proc->backlog > proc->num_workers * MAX_BACKLOG_FACTOR) - pthread_cond_wait(&proc->done_cond, &proc->mtx); - - if (proc->queue == NULL) { - proc->queue = block; + if ((block->flags & BLK_DONT_COMPRESS) && + (block->flags & BLK_DONT_CHECKSUM)) { + store_completed_block(proc, block); } else { - for (it = proc->queue; it->next != NULL; it = it->next) - ; - it->next = block; + while (proc->backlog > proc->num_workers * MAX_BACKLOG_FACTOR) + pthread_cond_wait(&proc->done_cond, &proc->mtx); + + if (proc->queue_last == NULL) { + proc->queue = proc->queue_last = block; + } else { + proc->queue_last->next = block; + proc->queue_last = block; + } + + proc->backlog += 1; } - proc->backlog += 1; it = proc->done; prev = NULL; -- cgit v1.2.3