diff options
| -rw-r--r-- | lib/sqfs/blk_proc/pthread.c | 154 | 
1 files changed, 110 insertions, 44 deletions
| diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c index 85146da..de499f7 100644 --- a/lib/sqfs/blk_proc/pthread.c +++ b/lib/sqfs/blk_proc/pthread.c @@ -256,74 +256,140 @@ static int test_and_set_status(sqfs_block_processor_t *proc, int status)  	return status;  } -static int queue_pump(sqfs_block_processor_t *proc, sqfs_block_t *block) +static sqfs_block_t *try_dequeue(sqfs_block_processor_t *proc)  { -	sqfs_block_t *completed = NULL; -	int status; +	sqfs_block_t *queue, *it, *prev; -	pthread_mutex_lock(&proc->mtx); -	while (proc->backlog > proc->max_backlog && proc->status == 0) -		pthread_cond_wait(&proc->done_cond, &proc->mtx); +	it = proc->done; +	prev = NULL; -	if (proc->status != 0) { -		status = proc->status; -		pthread_mutex_unlock(&proc->mtx); -		free(block); -		return status; +	while (it != NULL && it->sequence_number == proc->dequeue_id) { +		prev = it; +		it = it->next; +		proc->dequeue_id += 1; +	} + +	if (prev == NULL) { +		queue = NULL; +	} else { +		queue = proc->done; +		prev->next = NULL; +		proc->done = it;  	} -	completed = get_completed_if_avail(proc); +	return queue; +} -	append_to_work_queue(proc, block); -	block = NULL; -	pthread_mutex_unlock(&proc->mtx); +static sqfs_block_t *queue_merge(sqfs_block_t *lhs, sqfs_block_t *rhs) +{ +	sqfs_block_t *it, *head = NULL, **next_ptr = &head; + +	while (lhs != NULL && rhs != NULL) { +		if (lhs->sequence_number <= rhs->sequence_number) { +			it = lhs; +			lhs = lhs->next; +		} else { +			it = rhs; +			rhs = rhs->next; +		} + +		*next_ptr = it; +		next_ptr = &it->next; +	} + +	it = (lhs != NULL ? lhs : rhs); +	*next_ptr = it; +	return head; +} + +static int process_done_queue(sqfs_block_processor_t *proc, +			      sqfs_block_t *queue) +{ +	sqfs_block_t *it, *block = NULL; +	int status = 0; + +	while (queue != NULL) { +		it = queue; +		queue = it->next; +		it->next = NULL; + +		if (it->flags & SQFS_BLK_IS_FRAGMENT) { +			status = handle_fragment(proc, it, &block); + +			if (status != 0) { +				free(it); +				free(block); +				free_blk_list(queue); +				status = test_and_set_status(proc, status); +				break; +			} + +			if (block != NULL) { +				pthread_mutex_lock(&proc->mtx); +				proc->dequeue_id = it->sequence_number; +				block->sequence_number = proc->dequeue_id; + +				if (proc->queue == NULL) { +					proc->queue = block; +					proc->queue_last = block; +				} else { +					block->next = proc->queue; +					proc->queue = block; +				} + +				proc->backlog += 1; +				proc->done = queue_merge(queue, proc->done); +				pthread_cond_broadcast(&proc->queue_cond); +				pthread_mutex_unlock(&proc->mtx); -	if (completed == NULL) -		return 0; - -	if (completed->flags & SQFS_BLK_IS_FRAGMENT) { -		status = handle_fragment(proc, completed, &block); - -		if (status != 0) { -			free(block); -			status = test_and_set_status(proc, status); -		} else if (block != NULL) { -			pthread_mutex_lock(&proc->mtx); -			proc->dequeue_id = completed->sequence_number; -			block->sequence_number = proc->dequeue_id; - -			if (proc->queue == NULL) { -				proc->queue = block; -				proc->queue_last = block; -			} else { -				block->next = proc->queue; -				proc->queue = block; +				queue = NULL;  			} +		} else { +			status = process_completed_block(proc, it); -			proc->backlog += 1; -			pthread_cond_broadcast(&proc->queue_cond); -			pthread_mutex_unlock(&proc->mtx); +			if (status != 0) { +				status = test_and_set_status(proc, status); +				free_blk_list(queue); +				free(it); +				break; +			}  		} -	} else { -		status = process_completed_block(proc, completed); -		if (status != 0) -			status = test_and_set_status(proc, status); +		free(it);  	} -	free(completed);  	return status;  }  int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,  				 sqfs_block_t *block)  { +	sqfs_block_t *queue; +	int status; +  	if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) {  		free(block);  		return test_and_set_status(proc, SQFS_ERROR_UNSUPPORTED);  	} -	return queue_pump(proc, block); +	pthread_mutex_lock(&proc->mtx); +	while (proc->backlog > proc->max_backlog && proc->status == 0) +		pthread_cond_wait(&proc->done_cond, &proc->mtx); + +	if (proc->status != 0) { +		status = proc->status; +		pthread_mutex_unlock(&proc->mtx); +		free(block); +		return status; +	} + +	append_to_work_queue(proc, block); +	block = NULL; + +	queue = try_dequeue(proc); +	pthread_mutex_unlock(&proc->mtx); + +	return process_done_queue(proc, queue);  }  int sqfs_block_processor_finish(sqfs_block_processor_t *proc) | 
