summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-08-19 11:08:25 +0200
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-08-19 11:08:25 +0200
commit4943dd85a9190f4477bd5e98a83be1f5e2790d8d (patch)
tree08cb786a444450b1f8dd4a05e2103f6fd7a56e28
parent8b9eeb43c4f7d958972b8bf1fd39d19da570224b (diff)
Minor improvements for parallel block processor
- Fewer lock aquires in worker function - There is no point in locking/unlocking for inserting the completed block if we are going to lock again immediately in the next iteration -> Merge those two critical sections into one - Constant time queue insertion - Bypass queue entirely if there is nothing to do for a block Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r--lib/comp/block_processor_parallel.c42
1 files changed, 26 insertions, 16 deletions
diff --git a/lib/comp/block_processor_parallel.c b/lib/comp/block_processor_parallel.c
index 617900f..985c900 100644
--- a/lib/comp/block_processor_parallel.c
+++ b/lib/comp/block_processor_parallel.c
@@ -30,6 +30,8 @@ struct block_processor_t {
/* needs rw access by worker and main thread */
block_t *queue;
+ block_t *queue_last;
+
block_t *done;
bool terminate;
size_t backlog;
@@ -73,10 +75,16 @@ static void *worker_proc(void *arg)
{
compress_worker_t *worker = arg;
block_processor_t *shared = worker->shared;
- block_t *blk;
+ block_t *blk = NULL;
for (;;) {
pthread_mutex_lock(&shared->mtx);
+ if (blk != NULL) {
+ store_completed_block(shared, blk);
+ shared->backlog -= 1;
+ pthread_cond_broadcast(&shared->done_cond);
+ }
+
while (shared->queue == NULL && !shared->terminate) {
pthread_cond_wait(&shared->queue_cond,
&shared->mtx);
@@ -90,18 +98,15 @@ static void *worker_proc(void *arg)
blk = shared->queue;
shared->queue = blk->next;
blk->next = NULL;
+
+ if (shared->queue == NULL)
+ shared->queue_last = NULL;
pthread_mutex_unlock(&shared->mtx);
if (process_block(blk, worker->cmp, worker->scratch,
shared->max_block_size)) {
blk->flags |= BLK_COMPRESS_ERROR;
}
-
- pthread_mutex_lock(&shared->mtx);
- store_completed_block(shared, blk);
- shared->backlog -= 1;
- pthread_cond_broadcast(&shared->done_cond);
- pthread_mutex_unlock(&shared->mtx);
}
return NULL;
}
@@ -273,18 +278,23 @@ int block_processor_enqueue(block_processor_t *proc, block_t *block)
block->next = NULL;
pthread_mutex_lock(&proc->mtx);
- while (proc->backlog > proc->num_workers * MAX_BACKLOG_FACTOR)
- pthread_cond_wait(&proc->done_cond, &proc->mtx);
-
- if (proc->queue == NULL) {
- proc->queue = block;
+ if ((block->flags & BLK_DONT_COMPRESS) &&
+ (block->flags & BLK_DONT_CHECKSUM)) {
+ store_completed_block(proc, block);
} else {
- for (it = proc->queue; it->next != NULL; it = it->next)
- ;
- it->next = block;
+ while (proc->backlog > proc->num_workers * MAX_BACKLOG_FACTOR)
+ pthread_cond_wait(&proc->done_cond, &proc->mtx);
+
+ if (proc->queue_last == NULL) {
+ proc->queue = proc->queue_last = block;
+ } else {
+ proc->queue_last->next = block;
+ proc->queue_last = block;
+ }
+
+ proc->backlog += 1;
}
- proc->backlog += 1;
it = proc->done;
prev = NULL;