diff options
Diffstat (limited to 'lib/sqfs/blk_proc')
-rw-r--r-- | lib/sqfs/blk_proc/pthread.c | 154 |
1 files changed, 110 insertions, 44 deletions
diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c index 85146da..de499f7 100644 --- a/lib/sqfs/blk_proc/pthread.c +++ b/lib/sqfs/blk_proc/pthread.c @@ -256,74 +256,140 @@ static int test_and_set_status(sqfs_block_processor_t *proc, int status) return status; } -static int queue_pump(sqfs_block_processor_t *proc, sqfs_block_t *block) +static sqfs_block_t *try_dequeue(sqfs_block_processor_t *proc) { - sqfs_block_t *completed = NULL; - int status; + sqfs_block_t *queue, *it, *prev; - pthread_mutex_lock(&proc->mtx); - while (proc->backlog > proc->max_backlog && proc->status == 0) - pthread_cond_wait(&proc->done_cond, &proc->mtx); + it = proc->done; + prev = NULL; - if (proc->status != 0) { - status = proc->status; - pthread_mutex_unlock(&proc->mtx); - free(block); - return status; + while (it != NULL && it->sequence_number == proc->dequeue_id) { + prev = it; + it = it->next; + proc->dequeue_id += 1; + } + + if (prev == NULL) { + queue = NULL; + } else { + queue = proc->done; + prev->next = NULL; + proc->done = it; } - completed = get_completed_if_avail(proc); + return queue; +} - append_to_work_queue(proc, block); - block = NULL; - pthread_mutex_unlock(&proc->mtx); +static sqfs_block_t *queue_merge(sqfs_block_t *lhs, sqfs_block_t *rhs) +{ + sqfs_block_t *it, *head = NULL, **next_ptr = &head; + + while (lhs != NULL && rhs != NULL) { + if (lhs->sequence_number <= rhs->sequence_number) { + it = lhs; + lhs = lhs->next; + } else { + it = rhs; + rhs = rhs->next; + } + + *next_ptr = it; + next_ptr = &it->next; + } + + it = (lhs != NULL ? lhs : rhs); + *next_ptr = it; + return head; +} + +static int process_done_queue(sqfs_block_processor_t *proc, + sqfs_block_t *queue) +{ + sqfs_block_t *it, *block = NULL; + int status = 0; + + while (queue != NULL) { + it = queue; + queue = it->next; + it->next = NULL; + + if (it->flags & SQFS_BLK_IS_FRAGMENT) { + status = handle_fragment(proc, it, &block); + + if (status != 0) { + free(it); + free(block); + free_blk_list(queue); + status = test_and_set_status(proc, status); + break; + } + + if (block != NULL) { + pthread_mutex_lock(&proc->mtx); + proc->dequeue_id = it->sequence_number; + block->sequence_number = proc->dequeue_id; + + if (proc->queue == NULL) { + proc->queue = block; + proc->queue_last = block; + } else { + block->next = proc->queue; + proc->queue = block; + } + + proc->backlog += 1; + proc->done = queue_merge(queue, proc->done); + pthread_cond_broadcast(&proc->queue_cond); + pthread_mutex_unlock(&proc->mtx); - if (completed == NULL) - return 0; - - if (completed->flags & SQFS_BLK_IS_FRAGMENT) { - status = handle_fragment(proc, completed, &block); - - if (status != 0) { - free(block); - status = test_and_set_status(proc, status); - } else if (block != NULL) { - pthread_mutex_lock(&proc->mtx); - proc->dequeue_id = completed->sequence_number; - block->sequence_number = proc->dequeue_id; - - if (proc->queue == NULL) { - proc->queue = block; - proc->queue_last = block; - } else { - block->next = proc->queue; - proc->queue = block; + queue = NULL; } + } else { + status = process_completed_block(proc, it); - proc->backlog += 1; - pthread_cond_broadcast(&proc->queue_cond); - pthread_mutex_unlock(&proc->mtx); + if (status != 0) { + status = test_and_set_status(proc, status); + free_blk_list(queue); + free(it); + break; + } } - } else { - status = process_completed_block(proc, completed); - if (status != 0) - status = test_and_set_status(proc, status); + free(it); } - free(completed); return status; } int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, sqfs_block_t *block) { + sqfs_block_t *queue; + int status; + if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) { free(block); return test_and_set_status(proc, SQFS_ERROR_UNSUPPORTED); } - return queue_pump(proc, block); + pthread_mutex_lock(&proc->mtx); + while (proc->backlog > proc->max_backlog && proc->status == 0) + pthread_cond_wait(&proc->done_cond, &proc->mtx); + + if (proc->status != 0) { + status = proc->status; + pthread_mutex_unlock(&proc->mtx); + free(block); + return status; + } + + append_to_work_queue(proc, block); + block = NULL; + + queue = try_dequeue(proc); + pthread_mutex_unlock(&proc->mtx); + + return process_done_queue(proc, queue); } int sqfs_block_processor_finish(sqfs_block_processor_t *proc) |