diff options
author | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2019-08-19 11:08:25 +0200 |
---|---|---|
committer | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2019-08-19 11:08:25 +0200 |
commit | 4943dd85a9190f4477bd5e98a83be1f5e2790d8d (patch) | |
tree | 08cb786a444450b1f8dd4a05e2103f6fd7a56e28 | |
parent | 8b9eeb43c4f7d958972b8bf1fd39d19da570224b (diff) |
Minor improvements for parallel block processor
- Fewer lock aquires in worker function
- There is no point in locking/unlocking for inserting the completed
block if we are going to lock again immediately in the next iteration
-> Merge those two critical sections into one
- Constant time queue insertion
- Bypass queue entirely if there is nothing to do for a block
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r-- | lib/comp/block_processor_parallel.c | 42 |
1 files changed, 26 insertions, 16 deletions
diff --git a/lib/comp/block_processor_parallel.c b/lib/comp/block_processor_parallel.c index 617900f..985c900 100644 --- a/lib/comp/block_processor_parallel.c +++ b/lib/comp/block_processor_parallel.c @@ -30,6 +30,8 @@ struct block_processor_t { /* needs rw access by worker and main thread */ block_t *queue; + block_t *queue_last; + block_t *done; bool terminate; size_t backlog; @@ -73,10 +75,16 @@ static void *worker_proc(void *arg) { compress_worker_t *worker = arg; block_processor_t *shared = worker->shared; - block_t *blk; + block_t *blk = NULL; for (;;) { pthread_mutex_lock(&shared->mtx); + if (blk != NULL) { + store_completed_block(shared, blk); + shared->backlog -= 1; + pthread_cond_broadcast(&shared->done_cond); + } + while (shared->queue == NULL && !shared->terminate) { pthread_cond_wait(&shared->queue_cond, &shared->mtx); @@ -90,18 +98,15 @@ static void *worker_proc(void *arg) blk = shared->queue; shared->queue = blk->next; blk->next = NULL; + + if (shared->queue == NULL) + shared->queue_last = NULL; pthread_mutex_unlock(&shared->mtx); if (process_block(blk, worker->cmp, worker->scratch, shared->max_block_size)) { blk->flags |= BLK_COMPRESS_ERROR; } - - pthread_mutex_lock(&shared->mtx); - store_completed_block(shared, blk); - shared->backlog -= 1; - pthread_cond_broadcast(&shared->done_cond); - pthread_mutex_unlock(&shared->mtx); } return NULL; } @@ -273,18 +278,23 @@ int block_processor_enqueue(block_processor_t *proc, block_t *block) block->next = NULL; pthread_mutex_lock(&proc->mtx); - while (proc->backlog > proc->num_workers * MAX_BACKLOG_FACTOR) - pthread_cond_wait(&proc->done_cond, &proc->mtx); - - if (proc->queue == NULL) { - proc->queue = block; + if ((block->flags & BLK_DONT_COMPRESS) && + (block->flags & BLK_DONT_CHECKSUM)) { + store_completed_block(proc, block); } else { - for (it = proc->queue; it->next != NULL; it = it->next) - ; - it->next = block; + while (proc->backlog > proc->num_workers * MAX_BACKLOG_FACTOR) + pthread_cond_wait(&proc->done_cond, &proc->mtx); + + if (proc->queue_last == NULL) { + proc->queue = proc->queue_last = block; + } else { + proc->queue_last->next = block; + proc->queue_last = block; + } + + proc->backlog += 1; } - proc->backlog += 1; it = proc->done; prev = NULL; |