diff options
Diffstat (limited to 'lib/sqfs/blk_proc/pthread.c')
-rw-r--r-- | lib/sqfs/blk_proc/pthread.c | 223 |
1 files changed, 110 insertions, 113 deletions
diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c index ee73e52..a3dc01f 100644 --- a/lib/sqfs/blk_proc/pthread.c +++ b/lib/sqfs/blk_proc/pthread.c @@ -7,10 +7,21 @@ #define SQFS_BUILDING_DLL #include "internal.h" -static void store_completed_block(sqfs_block_processor_t *shared, - sqfs_block_t *blk) +static void free_blk_list(sqfs_block_t *list) { - sqfs_block_t *it = shared->done, *prev = NULL; + sqfs_block_t *it; + + while (list != NULL) { + it = list; + list = list->next; + free(it); + } +} + +static void store_completed_block(sqfs_block_processor_t *proc, + sqfs_block_t *blk, int status) +{ + sqfs_block_t *it = proc->done, *prev = NULL; while (it != NULL) { if (it->sequence_number >= blk->sequence_number) @@ -20,12 +31,39 @@ static void store_completed_block(sqfs_block_processor_t *shared, } if (prev == NULL) { - blk->next = shared->done; - shared->done = blk; + blk->next = proc->done; + proc->done = blk; } else { blk->next = prev->next; prev->next = blk; } + + if (status != 0 && proc->status == 0) + proc->status = status; + + proc->backlog -= 1; + pthread_cond_broadcast(&proc->done_cond); +} + +static sqfs_block_t *get_next_work_item(sqfs_block_processor_t *proc) +{ + sqfs_block_t *blk; + + while (proc->queue == NULL) { + if (proc->terminate || proc->status != 0) + return NULL; + + pthread_cond_wait(&proc->queue_cond, &proc->mtx); + } + + blk = proc->queue; + proc->queue = blk->next; + blk->next = NULL; + + if (proc->queue == NULL) + proc->queue_last = NULL; + + return blk; } static void *worker_proc(void *arg) @@ -37,33 +75,14 @@ static void *worker_proc(void *arg) for (;;) { pthread_mutex_lock(&shared->mtx); - if (blk != NULL) { - store_completed_block(shared, blk); - shared->backlog -= 1; - - if (status != 0 && shared->status == 0) - shared->status = status; - pthread_cond_broadcast(&shared->done_cond); - } + if (blk != NULL) + store_completed_block(shared, blk, status); - while (shared->queue == NULL && !shared->terminate && - shared->status == 0) { - pthread_cond_wait(&shared->queue_cond, - &shared->mtx); - } + blk = get_next_work_item(shared); + pthread_mutex_unlock(&shared->mtx); - if (shared->terminate || shared->status != 0) { - pthread_mutex_unlock(&shared->mtx); + if (blk == NULL) break; - } - - blk = shared->queue; - shared->queue = blk->next; - blk->next = NULL; - - if (shared->queue == NULL) - shared->queue_last = NULL; - pthread_mutex_unlock(&shared->mtx); status = sqfs_block_process(blk, worker->cmp, worker->scratch, shared->max_block_size); @@ -161,7 +180,6 @@ fail_init: void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) { - sqfs_block_t *blk; unsigned int i; pthread_mutex_lock(&proc->mtx); @@ -180,26 +198,47 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) pthread_cond_destroy(&proc->queue_cond); pthread_mutex_destroy(&proc->mtx); - while (proc->queue != NULL) { - blk = proc->queue; - proc->queue = blk->next; - free(blk); + free_blk_list(proc->queue); + free_blk_list(proc->done); + free(proc->fragments); + free(proc->blocks); + free(proc); +} + +static void append_to_work_queue(sqfs_block_processor_t *proc, + sqfs_block_t *block) +{ + if (proc->queue_last == NULL) { + proc->queue = proc->queue_last = block; + } else { + proc->queue_last->next = block; + proc->queue_last = block; } - while (proc->done != NULL) { - blk = proc->done; - proc->done = blk->next; - free(blk); + block->sequence_number = proc->enqueue_id++; + block->next = NULL; + proc->backlog += 1; + pthread_cond_broadcast(&proc->queue_cond); +} + +static sqfs_block_t *get_completed_if_avail(sqfs_block_processor_t *proc) +{ + sqfs_block_t *block = NULL; + + if (proc->done != NULL && + proc->done->sequence_number == proc->dequeue_id) { + block = proc->done; + proc->done = proc->done->next; + proc->dequeue_id += 1; } - free(proc->fragments); - free(proc->blocks); - free(proc); + return block; } int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, sqfs_block_t *block) { + sqfs_block_t *completed = NULL; int status; pthread_mutex_lock(&proc->mtx); @@ -215,60 +254,38 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, goto fail; } - block->sequence_number = proc->enqueue_id++; - block->next = NULL; - - if (block->size == 0) { - block->checksum = 0; - store_completed_block(proc, block); - } else { - while (proc->backlog > proc->max_backlog) - pthread_cond_wait(&proc->done_cond, &proc->mtx); - - if (proc->queue_last == NULL) { - proc->queue = proc->queue_last = block; - } else { - proc->queue_last->next = block; - proc->queue_last = block; - } - - proc->backlog += 1; - } + while (proc->backlog > proc->max_backlog) + pthread_cond_wait(&proc->done_cond, &proc->mtx); - if (proc->done != NULL && - proc->done->sequence_number == proc->dequeue_id) { - block = proc->done; - proc->done = proc->done->next; - proc->dequeue_id += 1; - } else { - block = NULL; - } + completed = get_completed_if_avail(proc); - pthread_cond_broadcast(&proc->queue_cond); + append_to_work_queue(proc, block); + block = NULL; pthread_mutex_unlock(&proc->mtx); - if (block == NULL) - return 0; + if (completed != NULL) { + status = process_completed_block(proc, completed); - status = process_completed_block(proc, block); - if (status != 0) { - pthread_mutex_lock(&proc->mtx); - proc->status = status; - goto fail; + if (status != 0) { + pthread_mutex_lock(&proc->mtx); + proc->status = status; + goto fail; + } } - free(block); + free(completed); return 0; fail: pthread_mutex_unlock(&proc->mtx); free(block); + free(completed); return status; } int sqfs_block_processor_finish(sqfs_block_processor_t *proc) { - sqfs_block_t *queue, *it; - int status; + sqfs_block_t *it; + int status = 0; pthread_mutex_lock(&proc->mtx); while (proc->backlog > 0 && proc->status == 0) @@ -276,50 +293,30 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc) if (proc->status != 0) { status = proc->status; - goto fail; + goto out; } - for (it = proc->done; it != NULL; it = it->next) { - if (it->sequence_number != proc->dequeue_id++) { + while (proc->done != NULL) { + it = get_completed_if_avail(proc); + + if (it == NULL) { status = SQFS_ERROR_INTERNAL; - proc->status = status; - goto fail; + } else { + status = process_completed_block(proc, it); + free(it); } - } - - queue = proc->done; - proc->done = NULL; - pthread_mutex_unlock(&proc->mtx); - - while (queue != NULL) { - it = queue; - queue = queue->next; - it->next = NULL; - status = process_completed_block(proc, it); - free(it); if (status != 0) { - pthread_mutex_lock(&proc->mtx); proc->status = status; pthread_cond_broadcast(&proc->queue_cond); - goto fail; + goto out; } } - - return 0; -fail: - while (proc->queue != NULL) { - it = proc->queue; - proc->queue = it->next; - free(it); - } - - while (proc->done != NULL) { - it = proc->done; - proc->done = it->next; - free(it); - } - +out: + free_blk_list(proc->queue); + free_blk_list(proc->done); + proc->queue = NULL; + proc->done = NULL; pthread_mutex_unlock(&proc->mtx); return status; } |