diff options
Diffstat (limited to 'lib/sqfs/blk_proc/pthread.c')
-rw-r--r-- | lib/sqfs/blk_proc/pthread.c | 162 |
1 files changed, 98 insertions, 64 deletions
diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c index 4f01bb3..b1755ce 100644 --- a/lib/sqfs/blk_proc/pthread.c +++ b/lib/sqfs/blk_proc/pthread.c @@ -244,94 +244,98 @@ static sqfs_block_t *get_completed_if_avail(sqfs_block_processor_t *proc) return block; } -int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, - sqfs_block_t *block) +static int test_and_set_status(sqfs_block_processor_t *proc, int status) { - sqfs_block_t *completed = NULL; - int status; - - if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) { - status = SQFS_ERROR_UNSUPPORTED; - - pthread_mutex_lock(&proc->mtx); - if (proc->status == 0) { - proc->status = status; - pthread_cond_broadcast(&proc->queue_cond); - } - goto fail; + pthread_mutex_lock(&proc->mtx); + if (proc->status == 0) { + proc->status = status; + } else { + status = proc->status; } + pthread_cond_broadcast(&proc->queue_cond); + return status; +} - if (block->flags & SQFS_BLK_IS_FRAGMENT) { - block->checksum = crc32(0, block->data, block->size); - - completed = NULL; - status = handle_fragment(proc, block, &completed); - - if (status != 0) { - pthread_mutex_lock(&proc->mtx); - proc->status = status; - goto fail; - } - - free(block); - if (completed == NULL) - return 0; - - block = completed; - completed = NULL; - } +static int queue_pump(sqfs_block_processor_t *proc, sqfs_block_t *block) +{ + sqfs_block_t *completed = NULL; + int status; pthread_mutex_lock(&proc->mtx); + while (proc->backlog > proc->max_backlog && proc->status == 0) + pthread_cond_wait(&proc->done_cond, &proc->mtx); + if (proc->status != 0) { status = proc->status; - goto fail; + pthread_mutex_unlock(&proc->mtx); + return status; } - while (proc->backlog > proc->max_backlog) - pthread_cond_wait(&proc->done_cond, &proc->mtx); - completed = get_completed_if_avail(proc); append_to_work_queue(proc, block); block = NULL; pthread_mutex_unlock(&proc->mtx); - if (completed != NULL) { - status = process_completed_block(proc, completed); + if (completed != NULL && (completed->flags & SQFS_BLK_IS_FRAGMENT)) { + status = handle_fragment(proc, completed, &block); if (status != 0) { + free(block); + return test_and_set_status(proc, status); + } + + if (block != NULL) { pthread_mutex_lock(&proc->mtx); - proc->status = status; - goto fail; + proc->dequeue_id = completed->sequence_number; + block->sequence_number = proc->dequeue_id; + + if (proc->queue == NULL) { + proc->queue = block; + proc->queue_last = block; + } else { + block->next = proc->queue; + proc->queue = block; + } + + proc->backlog += 1; + pthread_cond_broadcast(&proc->queue_cond); + pthread_mutex_unlock(&proc->mtx); } + } else if (completed != NULL) { + status = process_completed_block(proc, completed); + + if (status != 0) + status = test_and_set_status(proc, status); } free(completed); - return 0; -fail: - pthread_mutex_unlock(&proc->mtx); - free(block); - free(completed); return status; } +int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, + sqfs_block_t *block) +{ + if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) + return test_and_set_status(proc, SQFS_ERROR_UNSUPPORTED); + + return queue_pump(proc, block); +} + int sqfs_block_processor_finish(sqfs_block_processor_t *proc) { - sqfs_block_t *it; + sqfs_block_t *it, *block; int status = 0; pthread_mutex_lock(&proc->mtx); - if (proc->frag_block != NULL) { - append_to_work_queue(proc, proc->frag_block); - proc->frag_block = NULL; - } - +restart: while (proc->backlog > 0 && proc->status == 0) pthread_cond_wait(&proc->done_cond, &proc->mtx); if (proc->status != 0) { status = proc->status; - goto out; + pthread_mutex_unlock(&proc->mtx); + return status; } while (proc->done != NULL) { @@ -339,22 +343,52 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc) if (it == NULL) { status = SQFS_ERROR_INTERNAL; + } else if (it->flags & SQFS_BLK_IS_FRAGMENT) { + block = NULL; + status = handle_fragment(proc, it, &block); + + if (status != 0) { + proc->status = status; + pthread_mutex_unlock(&proc->mtx); + free(it); + return status; + } + + if (block != NULL) { + proc->dequeue_id = it->sequence_number; + block->sequence_number = proc->dequeue_id; + free(it); + + if (proc->queue == NULL) { + proc->queue = block; + proc->queue_last = block; + } else { + block->next = proc->queue; + proc->queue = block; + } + + proc->backlog += 1; + pthread_cond_broadcast(&proc->queue_cond); + goto restart; + } } else { status = process_completed_block(proc, it); free(it); - } - if (status != 0) { - proc->status = status; - pthread_cond_broadcast(&proc->queue_cond); - goto out; + if (status != 0) { + proc->status = status; + pthread_mutex_unlock(&proc->mtx); + return status; + } } } -out: - free_blk_list(proc->queue); - free_blk_list(proc->done); - proc->queue = NULL; - proc->done = NULL; + + if (proc->frag_block != NULL) { + append_to_work_queue(proc, proc->frag_block); + proc->frag_block = NULL; + goto restart; + } + pthread_mutex_unlock(&proc->mtx); - return status; + return 0; } |