diff options
| author | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2019-08-19 11:08:25 +0200 | 
|---|---|---|
| committer | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2019-08-19 11:08:25 +0200 | 
| commit | 4943dd85a9190f4477bd5e98a83be1f5e2790d8d (patch) | |
| tree | 08cb786a444450b1f8dd4a05e2103f6fd7a56e28 /lib/comp | |
| parent | 8b9eeb43c4f7d958972b8bf1fd39d19da570224b (diff) | |
Minor improvements for parallel block processor
 - Fewer lock aquires in worker function
   - There is no point in locking/unlocking for inserting the completed
     block if we are going to lock again immediately in the next iteration
   -> Merge those two critical sections into one
 - Constant time queue insertion
 - Bypass queue entirely if there is nothing to do for a block
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib/comp')
| -rw-r--r-- | lib/comp/block_processor_parallel.c | 42 | 
1 files changed, 26 insertions, 16 deletions
| diff --git a/lib/comp/block_processor_parallel.c b/lib/comp/block_processor_parallel.c index 617900f..985c900 100644 --- a/lib/comp/block_processor_parallel.c +++ b/lib/comp/block_processor_parallel.c @@ -30,6 +30,8 @@ struct block_processor_t {  	/* needs rw access by worker and main thread */  	block_t *queue; +	block_t *queue_last; +  	block_t *done;  	bool terminate;  	size_t backlog; @@ -73,10 +75,16 @@ static void *worker_proc(void *arg)  {  	compress_worker_t *worker = arg;  	block_processor_t *shared = worker->shared; -	block_t *blk; +	block_t *blk = NULL;  	for (;;) {  		pthread_mutex_lock(&shared->mtx); +		if (blk != NULL) { +			store_completed_block(shared, blk); +			shared->backlog -= 1; +			pthread_cond_broadcast(&shared->done_cond); +		} +  		while (shared->queue == NULL && !shared->terminate) {  			pthread_cond_wait(&shared->queue_cond,  					  &shared->mtx); @@ -90,18 +98,15 @@ static void *worker_proc(void *arg)  		blk = shared->queue;  		shared->queue = blk->next;  		blk->next = NULL; + +		if (shared->queue == NULL) +			shared->queue_last = NULL;  		pthread_mutex_unlock(&shared->mtx);  		if (process_block(blk, worker->cmp, worker->scratch,  				  shared->max_block_size)) {  			blk->flags |= BLK_COMPRESS_ERROR;  		} - -		pthread_mutex_lock(&shared->mtx); -		store_completed_block(shared, blk); -		shared->backlog -= 1; -		pthread_cond_broadcast(&shared->done_cond); -		pthread_mutex_unlock(&shared->mtx);  	}  	return NULL;  } @@ -273,18 +278,23 @@ int block_processor_enqueue(block_processor_t *proc, block_t *block)  	block->next = NULL;  	pthread_mutex_lock(&proc->mtx); -	while (proc->backlog > proc->num_workers * MAX_BACKLOG_FACTOR) -		pthread_cond_wait(&proc->done_cond, &proc->mtx); - -	if (proc->queue == NULL) { -		proc->queue = block; +	if ((block->flags & BLK_DONT_COMPRESS) && +	    (block->flags & BLK_DONT_CHECKSUM)) { +		store_completed_block(proc, block);  	} else { -		for (it = proc->queue; it->next != NULL; it = it->next) -			; -		it->next = block; +		while (proc->backlog > proc->num_workers * MAX_BACKLOG_FACTOR) +			pthread_cond_wait(&proc->done_cond, &proc->mtx); + +		if (proc->queue_last == NULL) { +			proc->queue = proc->queue_last = block; +		} else { +			proc->queue_last->next = block; +			proc->queue_last = block; +		} + +		proc->backlog += 1;  	} -	proc->backlog += 1;  	it = proc->done;  	prev = NULL; | 
