From 1c82a9f2fa98e68b68e6868298fba1830d91d1db Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Sun, 16 Feb 2020 17:34:06 +0100 Subject: block processor: move the internals to the respective implementations Signed-off-by: David Oberhollenzer --- lib/sqfs/block_processor/winpthread.c | 130 ++++++++++++++++++++++------------ 1 file changed, 83 insertions(+), 47 deletions(-) (limited to 'lib/sqfs/block_processor/winpthread.c') diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c index 0ad8475..aa1a99f 100644 --- a/lib/sqfs/block_processor/winpthread.c +++ b/lib/sqfs/block_processor/winpthread.c @@ -8,6 +8,8 @@ #include "internal.h" #if defined(_WIN32) || defined(__WINDOWS__) +# define WIN32_LEAN_AND_MEAN +# include # define LOCK(mtx) EnterCriticalSection(mtx) # define UNLOCK(mtx) LeaveCriticalSection(mtx) # define AWAIT(cond, mtx) SleepConditionVariableCS(cond, mtx, INFINITE) @@ -16,7 +18,11 @@ # define THREAD_TYPE DWORD WINAPI # define THREAD_ARG LPVOID # define THREAD_HANDLE HANDLE +# define MUTEX_TYPE CRITICAL_SECTION +# define CONDITION_TYPE CONDITION_VARIABLE #else +# include +# include # define LOCK(mtx) pthread_mutex_lock(mtx) # define UNLOCK(mtx) pthread_mutex_unlock(mtx) # define AWAIT(cond, mtx) pthread_cond_wait(cond, mtx) @@ -25,15 +31,43 @@ # define THREAD_TYPE void * # define THREAD_ARG void * # define THREAD_HANDLE pthread_t +# define MUTEX_TYPE pthread_mutex_t +# define CONDITION_TYPE pthread_cond_t #endif +typedef struct compress_worker_t compress_worker_t; +typedef struct thread_pool_processor_t thread_pool_processor_t; + struct compress_worker_t { - sqfs_block_processor_t *shared; + thread_pool_processor_t *shared; sqfs_compressor_t *cmp; THREAD_HANDLE thread; sqfs_u8 scratch[]; }; +struct thread_pool_processor_t { + sqfs_block_processor_t base; + + MUTEX_TYPE mtx; + CONDITION_TYPE queue_cond; + CONDITION_TYPE done_cond; + + sqfs_block_t *queue; + sqfs_block_t *queue_last; + + sqfs_block_t *done; + size_t backlog; + int status; + + sqfs_u32 enqueue_id; + sqfs_u32 dequeue_id; + + unsigned int num_workers; + size_t max_backlog; + + compress_worker_t *workers[]; +}; + static void free_blk_list(sqfs_block_t *list) { sqfs_block_t *it; @@ -48,7 +82,7 @@ static void free_blk_list(sqfs_block_t *list) static THREAD_TYPE worker_proc(THREAD_ARG arg) { compress_worker_t *worker = arg; - sqfs_block_processor_t *shared = worker->shared; + thread_pool_processor_t *shared = worker->shared; sqfs_block_t *it, *prev, *blk = NULL; int status = 0; @@ -98,7 +132,7 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg) status = block_processor_do_block(blk, worker->cmp, worker->scratch, - shared->max_block_size); + shared->base.max_block_size); } return THREAD_EXIT_SUCCESS; @@ -107,7 +141,7 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg) #if defined(_WIN32) || defined(__WINDOWS__) static void block_processor_destroy(sqfs_object_t *obj) { - sqfs_block_processor_t *proc = (sqfs_block_processor_t *)obj; + thread_pool_processor_t *proc = (thread_pool_processor_t *)obj; unsigned int i; EnterCriticalSection(&proc->mtx); @@ -133,8 +167,8 @@ static void block_processor_destroy(sqfs_object_t *obj) DeleteCriticalSection(&proc->mtx); free_blk_list(proc->queue); free_blk_list(proc->done); - free(proc->blk_current); - free(proc->frag_block); + free(proc->base.blk_current); + free(proc->base.frag_block); free(proc); } @@ -145,7 +179,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, sqfs_block_writer_t *wr, sqfs_frag_table_t *tbl) { - sqfs_block_processor_t *proc; + thread_pool_processor_t *proc; unsigned int i; if (num_workers < 1) @@ -156,13 +190,13 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, if (proc == NULL) return NULL; - proc->max_block_size = max_block_size; proc->num_workers = num_workers; proc->max_backlog = max_backlog; - proc->cmp = cmp; - proc->frag_tbl = tbl; - proc->wr = wr; - proc->stats.size = sizeof(proc->stats); + proc->base.max_block_size = max_block_size; + proc->base.cmp = cmp; + proc->base.frag_tbl = tbl; + proc->base.wr = wr; + proc->base.stats.size = sizeof(proc->base.stats); ((sqfs_object_t *)obj)->destroy = block_processor_destroy; InitializeCriticalSection(&proc->mtx); @@ -188,7 +222,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, goto fail; } - return proc; + return (sqfs_block_processor_t *)proc; fail: block_processor_destroy(proc); return NULL; @@ -196,7 +230,7 @@ fail: #else static void block_processor_destroy(sqfs_object_t *obj) { - sqfs_block_processor_t *proc = (sqfs_block_processor_t *)obj; + thread_pool_processor_t *proc = (thread_pool_processor_t *)obj; unsigned int i; pthread_mutex_lock(&proc->mtx); @@ -217,8 +251,8 @@ static void block_processor_destroy(sqfs_object_t *obj) free_blk_list(proc->queue); free_blk_list(proc->done); - free(proc->blk_current); - free(proc->frag_block); + free(proc->base.blk_current); + free(proc->base.frag_block); free(proc); } @@ -229,7 +263,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, sqfs_block_writer_t *wr, sqfs_frag_table_t *tbl) { - sqfs_block_processor_t *proc; + thread_pool_processor_t *proc; sigset_t set, oldset; unsigned int i; int ret; @@ -246,13 +280,13 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; - proc->max_block_size = max_block_size; proc->num_workers = num_workers; proc->max_backlog = max_backlog; - proc->cmp = cmp; - proc->frag_tbl = tbl; - proc->wr = wr; - proc->stats.size = sizeof(proc->stats); + proc->base.max_block_size = max_block_size; + proc->base.cmp = cmp; + proc->base.frag_tbl = tbl; + proc->base.wr = wr; + proc->base.stats.size = sizeof(proc->base.stats); for (i = 0; i < num_workers; ++i) { proc->workers[i] = alloc_flex(sizeof(compress_worker_t), @@ -281,7 +315,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, pthread_sigmask(SIG_SETMASK, &oldset, NULL); - return proc; + return (sqfs_block_processor_t *)proc; fail_thread: pthread_mutex_lock(&proc->mtx); proc->status = -1; @@ -311,7 +345,7 @@ fail_init: } #endif -static sqfs_block_t *try_dequeue(sqfs_block_processor_t *proc) +static sqfs_block_t *try_dequeue(thread_pool_processor_t *proc) { sqfs_block_t *queue, *it, *prev; @@ -357,7 +391,7 @@ static sqfs_block_t *queue_merge(sqfs_block_t *lhs, sqfs_block_t *rhs) return head; } -static int process_done_queue(sqfs_block_processor_t *proc, sqfs_block_t *queue) +static int process_done_queue(thread_pool_processor_t *proc, sqfs_block_t *queue) { sqfs_block_t *it, *block = NULL; int status = 0; @@ -369,7 +403,7 @@ static int process_done_queue(sqfs_block_processor_t *proc, sqfs_block_t *queue) if (it->flags & SQFS_BLK_IS_FRAGMENT) { block = NULL; - status = process_completed_fragment(proc, it, &block); + status = process_completed_fragment(&proc->base, it, &block); if (block != NULL && status == 0) { LOCK(&proc->mtx); @@ -394,7 +428,7 @@ static int process_done_queue(sqfs_block_processor_t *proc, sqfs_block_t *queue) free(block); } } else { - status = process_completed_block(proc, it); + status = process_completed_block(&proc->base, it); } free(it); @@ -404,7 +438,7 @@ static int process_done_queue(sqfs_block_processor_t *proc, sqfs_block_t *queue) return status; } -static int wait_completed(sqfs_block_processor_t *proc) +static int wait_completed(thread_pool_processor_t *proc) { sqfs_block_t *queue; int status; @@ -443,50 +477,52 @@ static int wait_completed(sqfs_block_processor_t *proc) int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block) { + thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc; int status; - while (proc->backlog > proc->max_backlog) { - status = wait_completed(proc); + while (thproc->backlog > thproc->max_backlog) { + status = wait_completed(thproc); if (status) return status; } - LOCK(&proc->mtx); - status = proc->status; + LOCK(&thproc->mtx); + status = thproc->status; if (status != 0) goto out; if (block != NULL) { - if (proc->queue_last == NULL) { - proc->queue = proc->queue_last = block; + if (thproc->queue_last == NULL) { + thproc->queue = thproc->queue_last = block; } else { - proc->queue_last->next = block; - proc->queue_last = block; + thproc->queue_last->next = block; + thproc->queue_last = block; } - block->sequence_number = proc->enqueue_id++; + block->sequence_number = thproc->enqueue_id++; block->next = NULL; - proc->backlog += 1; + thproc->backlog += 1; block = NULL; } out: - SIGNAL_ALL(&proc->queue_cond); - UNLOCK(&proc->mtx); + SIGNAL_ALL(&thproc->queue_cond); + UNLOCK(&thproc->mtx); free(block); return 0; } int sqfs_block_processor_finish(sqfs_block_processor_t *proc) { + thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc; int status; - LOCK(&proc->mtx); - status = proc->status; - SIGNAL_ALL(&proc->queue_cond); - UNLOCK(&proc->mtx); + LOCK(&thproc->mtx); + status = thproc->status; + SIGNAL_ALL(&thproc->queue_cond); + UNLOCK(&thproc->mtx); - while (status == 0 && proc->backlog > 0) - status = wait_completed(proc); + while (status == 0 && thproc->backlog > 0) + status = wait_completed(thproc); if (status == 0 && proc->frag_block != NULL) { status = append_to_work_queue(proc, proc->frag_block); @@ -495,7 +531,7 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc) if (status) return status; - status = wait_completed(proc); + status = wait_completed(thproc); } return status; -- cgit v1.2.3