From 3da78a56df43360520f8007bdb4e11fa25f712cc Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Wed, 25 Sep 2019 06:03:36 +0200 Subject: Refactor out common code of the block processors Code that already is shared between the pthread and the serial processor as well as code that can be re-used by other threading API implementations. Signed-off-by: David Oberhollenzer --- lib/sqfs/blk_proc/pthread.c | 110 ++++++++------------------------------------ 1 file changed, 18 insertions(+), 92 deletions(-) (limited to 'lib/sqfs/blk_proc/pthread.c') diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c index 1c1088a..57b61c1 100644 --- a/lib/sqfs/blk_proc/pthread.c +++ b/lib/sqfs/blk_proc/pthread.c @@ -7,65 +7,6 @@ #define SQFS_BUILDING_DLL #include "internal.h" -static void free_blk_list(sqfs_block_t *list) -{ - sqfs_block_t *it; - - while (list != NULL) { - it = list; - list = list->next; - free(it); - } -} - -static void store_completed_block(sqfs_block_processor_t *proc, - sqfs_block_t *blk, int status) -{ - sqfs_block_t *it = proc->done, *prev = NULL; - - while (it != NULL) { - if (it->sequence_number >= blk->sequence_number) - break; - prev = it; - it = it->next; - } - - if (prev == NULL) { - blk->next = proc->done; - proc->done = blk; - } else { - blk->next = prev->next; - prev->next = blk; - } - - if (status != 0 && proc->status == 0) - proc->status = status; - - proc->backlog -= 1; - pthread_cond_broadcast(&proc->done_cond); -} - -static sqfs_block_t *get_next_work_item(sqfs_block_processor_t *proc) -{ - sqfs_block_t *blk; - - while (proc->queue == NULL) { - if (proc->terminate || proc->status != 0) - return NULL; - - pthread_cond_wait(&proc->queue_cond, &proc->mtx); - } - - blk = proc->queue; - proc->queue = blk->next; - blk->next = NULL; - - if (proc->queue == NULL) - proc->queue_last = NULL; - - return blk; -} - static void *worker_proc(void *arg) { compress_worker_t *worker = arg; @@ -75,17 +16,23 @@ static void *worker_proc(void *arg) for (;;) { pthread_mutex_lock(&shared->mtx); - if (blk != NULL) - store_completed_block(shared, blk, status); + if (blk != NULL) { + block_processor_store_done(shared, blk, status); + pthread_cond_broadcast(&shared->done_cond); + } + + while (shared->queue == NULL && shared->status == 0) + pthread_cond_wait(&shared->queue_cond, &shared->mtx); - blk = get_next_work_item(shared); + blk = block_processor_next_work_item(shared); pthread_mutex_unlock(&shared->mtx); if (blk == NULL) break; - status = sqfs_block_process(blk, worker->cmp, worker->scratch, - shared->max_block_size); + status = block_processor_do_block(blk, worker->cmp, + worker->scratch, + shared->max_block_size); } return NULL; } @@ -109,26 +56,14 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, if (proc == NULL) return NULL; - proc->max_block_size = max_block_size; - proc->num_workers = num_workers; - proc->max_backlog = max_backlog; proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; - proc->devblksz = devblksz; - proc->cmp = cmp; - proc->file = file; - proc->max_blocks = INIT_BLOCK_COUNT; - proc->frag_list_max = INIT_BLOCK_COUNT; - - proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks); - if (proc->blocks == NULL) - goto fail_init; - proc->frag_list = alloc_array(sizeof(proc->frag_list[0]), - proc->frag_list_max); - if (proc->frag_list == NULL) + if (block_processor_init(proc, max_block_size, cmp, num_workers, + max_backlog, devblksz, file)) { goto fail_init; + } for (i = 0; i < num_workers; ++i) { proc->workers[i] = alloc_flex(sizeof(compress_worker_t), @@ -155,7 +90,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, return proc; fail_thread: pthread_mutex_lock(&proc->mtx); - proc->terminate = true; + proc->status = -1; pthread_cond_broadcast(&proc->queue_cond); pthread_mutex_unlock(&proc->mtx); @@ -178,10 +113,7 @@ fail_init: pthread_cond_destroy(&proc->done_cond); pthread_cond_destroy(&proc->queue_cond); pthread_mutex_destroy(&proc->mtx); - free(proc->frag_list); - free(proc->fragments); - free(proc->blocks); - free(proc); + block_processor_cleanup(proc); return NULL; } @@ -190,7 +122,7 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) unsigned int i; pthread_mutex_lock(&proc->mtx); - proc->terminate = true; + proc->status = -1; pthread_cond_broadcast(&proc->queue_cond); pthread_mutex_unlock(&proc->mtx); @@ -205,13 +137,7 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) pthread_cond_destroy(&proc->queue_cond); pthread_mutex_destroy(&proc->mtx); - free_blk_list(proc->queue); - free_blk_list(proc->done); - free(proc->frag_block); - free(proc->frag_list); - free(proc->fragments); - free(proc->blocks); - free(proc); + block_processor_cleanup(proc); } static void append_to_work_queue(sqfs_block_processor_t *proc, -- cgit v1.2.3