From 1c82a9f2fa98e68b68e6868298fba1830d91d1db Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Sun, 16 Feb 2020 17:34:06 +0100 Subject: block processor: move the internals to the respective implementations Signed-off-by: David Oberhollenzer --- lib/sqfs/block_processor/internal.h | 48 +------------ lib/sqfs/block_processor/serial.c | 63 +++++++++------- lib/sqfs/block_processor/winpthread.c | 130 ++++++++++++++++++++++------------ 3 files changed, 120 insertions(+), 121 deletions(-) diff --git a/lib/sqfs/block_processor/internal.h b/lib/sqfs/block_processor/internal.h index b7fbef0..3c6570e 100644 --- a/lib/sqfs/block_processor/internal.h +++ b/lib/sqfs/block_processor/internal.h @@ -22,17 +22,6 @@ #include #include -#include - -#ifdef WITH_PTHREAD -#include -#include -#elif defined(_WIN32) || defined(__WINDOWS__) -#define WIN32_LEAN_AND_MEAN -#include -#endif - -typedef struct compress_worker_t compress_worker_t; typedef struct sqfs_block_t { struct sqfs_block_t *next; @@ -52,53 +41,18 @@ typedef struct sqfs_block_t { struct sqfs_block_processor_t { sqfs_object_t obj; - /* synchronization primitives */ -#ifdef WITH_PTHREAD - pthread_mutex_t mtx; - pthread_cond_t queue_cond; - pthread_cond_t done_cond; -#elif defined(_WIN32) || defined(__WINDOWS__) - CRITICAL_SECTION mtx; - CONDITION_VARIABLE queue_cond; - CONDITION_VARIABLE done_cond; -#endif - - /* needs rw access by worker and main thread */ - sqfs_block_t *queue; - sqfs_block_t *queue_last; - - sqfs_block_t *done; - size_t backlog; - int status; - - /* used by main thread only */ - sqfs_u32 enqueue_id; - sqfs_u32 dequeue_id; - - unsigned int num_workers; - size_t max_backlog; - sqfs_frag_table_t *frag_tbl; sqfs_compressor_t *cmp; - sqfs_block_t *frag_block; - sqfs_block_writer_t *wr; + sqfs_block_processor_stats_t stats; - /* file API */ sqfs_inode_generic_t *inode; sqfs_block_t *blk_current; sqfs_u32 blk_flags; - /* used only by workers */ size_t max_block_size; - -#if defined(WITH_PTHREAD) || defined(_WIN32) || defined(__WINDOWS__) - compress_worker_t *workers[]; -#else - sqfs_u8 scratch[]; -#endif }; SQFS_INTERNAL int process_completed_block(sqfs_block_processor_t *proc, diff --git a/lib/sqfs/block_processor/serial.c b/lib/sqfs/block_processor/serial.c index e44ce17..2e00198 100644 --- a/lib/sqfs/block_processor/serial.c +++ b/lib/sqfs/block_processor/serial.c @@ -7,6 +7,12 @@ #define SQFS_BUILDING_DLL #include "internal.h" +typedef struct { + sqfs_block_processor_t base; + int status; + sqfs_u8 scratch[]; +} serial_block_processor_t; + static void block_processor_destroy(sqfs_object_t *obj) { sqfs_block_processor_t *proc = (sqfs_block_processor_t *)obj; @@ -23,71 +29,74 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, sqfs_block_writer_t *wr, sqfs_frag_table_t *tbl) { - sqfs_block_processor_t *proc; + serial_block_processor_t *proc; (void)num_workers; (void)max_backlog; proc = alloc_flex(sizeof(*proc), 1, max_block_size); if (proc == NULL) return NULL; - proc->max_block_size = max_block_size; - proc->cmp = cmp; - proc->frag_tbl = tbl; - proc->wr = wr; - proc->stats.size = sizeof(proc->stats); + proc->base.max_block_size = max_block_size; + proc->base.cmp = cmp; + proc->base.frag_tbl = tbl; + proc->base.wr = wr; + proc->base.stats.size = sizeof(proc->base.stats); ((sqfs_object_t *)proc)->destroy = block_processor_destroy; - return proc; + return (sqfs_block_processor_t *)proc; } int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block) { + serial_block_processor_t *sproc = (serial_block_processor_t *)proc; sqfs_block_t *fragblk = NULL; - if (proc->status != 0) + if (sproc->status != 0) goto done; - proc->status = block_processor_do_block(block, proc->cmp, - proc->scratch, - proc->max_block_size); - if (proc->status != 0) + sproc->status = block_processor_do_block(block, proc->cmp, + sproc->scratch, + proc->max_block_size); + if (sproc->status != 0) goto done; if (block->flags & SQFS_BLK_IS_FRAGMENT) { - proc->status = process_completed_fragment(proc, block, - &fragblk); + sproc->status = process_completed_fragment(proc, block, + &fragblk); if (fragblk == NULL) goto done; free(block); block = fragblk; - proc->status = block_processor_do_block(block, proc->cmp, - proc->scratch, - proc->max_block_size); - if (proc->status != 0) + sproc->status = block_processor_do_block(block, proc->cmp, + sproc->scratch, + proc->max_block_size); + if (sproc->status != 0) goto done; } - proc->status = process_completed_block(proc, block); + sproc->status = process_completed_block(proc, block); done: free(block); - return proc->status; + return sproc->status; } int sqfs_block_processor_finish(sqfs_block_processor_t *proc) { - if (proc->frag_block == NULL || proc->status != 0) + serial_block_processor_t *sproc = (serial_block_processor_t *)proc; + + if (proc->frag_block == NULL || sproc->status != 0) goto out; - proc->status = block_processor_do_block(proc->frag_block, proc->cmp, - proc->scratch, - proc->max_block_size); - if (proc->status != 0) + sproc->status = block_processor_do_block(proc->frag_block, proc->cmp, + sproc->scratch, + proc->max_block_size); + if (sproc->status != 0) goto out; - proc->status = process_completed_block(proc, proc->frag_block); + sproc->status = process_completed_block(proc, proc->frag_block); out: free(proc->frag_block); proc->frag_block = NULL; - return proc->status; + return sproc->status; } diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c index 0ad8475..aa1a99f 100644 --- a/lib/sqfs/block_processor/winpthread.c +++ b/lib/sqfs/block_processor/winpthread.c @@ -8,6 +8,8 @@ #include "internal.h" #if defined(_WIN32) || defined(__WINDOWS__) +# define WIN32_LEAN_AND_MEAN +# include # define LOCK(mtx) EnterCriticalSection(mtx) # define UNLOCK(mtx) LeaveCriticalSection(mtx) # define AWAIT(cond, mtx) SleepConditionVariableCS(cond, mtx, INFINITE) @@ -16,7 +18,11 @@ # define THREAD_TYPE DWORD WINAPI # define THREAD_ARG LPVOID # define THREAD_HANDLE HANDLE +# define MUTEX_TYPE CRITICAL_SECTION +# define CONDITION_TYPE CONDITION_VARIABLE #else +# include +# include # define LOCK(mtx) pthread_mutex_lock(mtx) # define UNLOCK(mtx) pthread_mutex_unlock(mtx) # define AWAIT(cond, mtx) pthread_cond_wait(cond, mtx) @@ -25,15 +31,43 @@ # define THREAD_TYPE void * # define THREAD_ARG void * # define THREAD_HANDLE pthread_t +# define MUTEX_TYPE pthread_mutex_t +# define CONDITION_TYPE pthread_cond_t #endif +typedef struct compress_worker_t compress_worker_t; +typedef struct thread_pool_processor_t thread_pool_processor_t; + struct compress_worker_t { - sqfs_block_processor_t *shared; + thread_pool_processor_t *shared; sqfs_compressor_t *cmp; THREAD_HANDLE thread; sqfs_u8 scratch[]; }; +struct thread_pool_processor_t { + sqfs_block_processor_t base; + + MUTEX_TYPE mtx; + CONDITION_TYPE queue_cond; + CONDITION_TYPE done_cond; + + sqfs_block_t *queue; + sqfs_block_t *queue_last; + + sqfs_block_t *done; + size_t backlog; + int status; + + sqfs_u32 enqueue_id; + sqfs_u32 dequeue_id; + + unsigned int num_workers; + size_t max_backlog; + + compress_worker_t *workers[]; +}; + static void free_blk_list(sqfs_block_t *list) { sqfs_block_t *it; @@ -48,7 +82,7 @@ static void free_blk_list(sqfs_block_t *list) static THREAD_TYPE worker_proc(THREAD_ARG arg) { compress_worker_t *worker = arg; - sqfs_block_processor_t *shared = worker->shared; + thread_pool_processor_t *shared = worker->shared; sqfs_block_t *it, *prev, *blk = NULL; int status = 0; @@ -98,7 +132,7 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg) status = block_processor_do_block(blk, worker->cmp, worker->scratch, - shared->max_block_size); + shared->base.max_block_size); } return THREAD_EXIT_SUCCESS; @@ -107,7 +141,7 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg) #if defined(_WIN32) || defined(__WINDOWS__) static void block_processor_destroy(sqfs_object_t *obj) { - sqfs_block_processor_t *proc = (sqfs_block_processor_t *)obj; + thread_pool_processor_t *proc = (thread_pool_processor_t *)obj; unsigned int i; EnterCriticalSection(&proc->mtx); @@ -133,8 +167,8 @@ static void block_processor_destroy(sqfs_object_t *obj) DeleteCriticalSection(&proc->mtx); free_blk_list(proc->queue); free_blk_list(proc->done); - free(proc->blk_current); - free(proc->frag_block); + free(proc->base.blk_current); + free(proc->base.frag_block); free(proc); } @@ -145,7 +179,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, sqfs_block_writer_t *wr, sqfs_frag_table_t *tbl) { - sqfs_block_processor_t *proc; + thread_pool_processor_t *proc; unsigned int i; if (num_workers < 1) @@ -156,13 +190,13 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, if (proc == NULL) return NULL; - proc->max_block_size = max_block_size; proc->num_workers = num_workers; proc->max_backlog = max_backlog; - proc->cmp = cmp; - proc->frag_tbl = tbl; - proc->wr = wr; - proc->stats.size = sizeof(proc->stats); + proc->base.max_block_size = max_block_size; + proc->base.cmp = cmp; + proc->base.frag_tbl = tbl; + proc->base.wr = wr; + proc->base.stats.size = sizeof(proc->base.stats); ((sqfs_object_t *)obj)->destroy = block_processor_destroy; InitializeCriticalSection(&proc->mtx); @@ -188,7 +222,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, goto fail; } - return proc; + return (sqfs_block_processor_t *)proc; fail: block_processor_destroy(proc); return NULL; @@ -196,7 +230,7 @@ fail: #else static void block_processor_destroy(sqfs_object_t *obj) { - sqfs_block_processor_t *proc = (sqfs_block_processor_t *)obj; + thread_pool_processor_t *proc = (thread_pool_processor_t *)obj; unsigned int i; pthread_mutex_lock(&proc->mtx); @@ -217,8 +251,8 @@ static void block_processor_destroy(sqfs_object_t *obj) free_blk_list(proc->queue); free_blk_list(proc->done); - free(proc->blk_current); - free(proc->frag_block); + free(proc->base.blk_current); + free(proc->base.frag_block); free(proc); } @@ -229,7 +263,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, sqfs_block_writer_t *wr, sqfs_frag_table_t *tbl) { - sqfs_block_processor_t *proc; + thread_pool_processor_t *proc; sigset_t set, oldset; unsigned int i; int ret; @@ -246,13 +280,13 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; - proc->max_block_size = max_block_size; proc->num_workers = num_workers; proc->max_backlog = max_backlog; - proc->cmp = cmp; - proc->frag_tbl = tbl; - proc->wr = wr; - proc->stats.size = sizeof(proc->stats); + proc->base.max_block_size = max_block_size; + proc->base.cmp = cmp; + proc->base.frag_tbl = tbl; + proc->base.wr = wr; + proc->base.stats.size = sizeof(proc->base.stats); for (i = 0; i < num_workers; ++i) { proc->workers[i] = alloc_flex(sizeof(compress_worker_t), @@ -281,7 +315,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, pthread_sigmask(SIG_SETMASK, &oldset, NULL); - return proc; + return (sqfs_block_processor_t *)proc; fail_thread: pthread_mutex_lock(&proc->mtx); proc->status = -1; @@ -311,7 +345,7 @@ fail_init: } #endif -static sqfs_block_t *try_dequeue(sqfs_block_processor_t *proc) +static sqfs_block_t *try_dequeue(thread_pool_processor_t *proc) { sqfs_block_t *queue, *it, *prev; @@ -357,7 +391,7 @@ static sqfs_block_t *queue_merge(sqfs_block_t *lhs, sqfs_block_t *rhs) return head; } -static int process_done_queue(sqfs_block_processor_t *proc, sqfs_block_t *queue) +static int process_done_queue(thread_pool_processor_t *proc, sqfs_block_t *queue) { sqfs_block_t *it, *block = NULL; int status = 0; @@ -369,7 +403,7 @@ static int process_done_queue(sqfs_block_processor_t *proc, sqfs_block_t *queue) if (it->flags & SQFS_BLK_IS_FRAGMENT) { block = NULL; - status = process_completed_fragment(proc, it, &block); + status = process_completed_fragment(&proc->base, it, &block); if (block != NULL && status == 0) { LOCK(&proc->mtx); @@ -394,7 +428,7 @@ static int process_done_queue(sqfs_block_processor_t *proc, sqfs_block_t *queue) free(block); } } else { - status = process_completed_block(proc, it); + status = process_completed_block(&proc->base, it); } free(it); @@ -404,7 +438,7 @@ static int process_done_queue(sqfs_block_processor_t *proc, sqfs_block_t *queue) return status; } -static int wait_completed(sqfs_block_processor_t *proc) +static int wait_completed(thread_pool_processor_t *proc) { sqfs_block_t *queue; int status; @@ -443,50 +477,52 @@ static int wait_completed(sqfs_block_processor_t *proc) int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block) { + thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc; int status; - while (proc->backlog > proc->max_backlog) { - status = wait_completed(proc); + while (thproc->backlog > thproc->max_backlog) { + status = wait_completed(thproc); if (status) return status; } - LOCK(&proc->mtx); - status = proc->status; + LOCK(&thproc->mtx); + status = thproc->status; if (status != 0) goto out; if (block != NULL) { - if (proc->queue_last == NULL) { - proc->queue = proc->queue_last = block; + if (thproc->queue_last == NULL) { + thproc->queue = thproc->queue_last = block; } else { - proc->queue_last->next = block; - proc->queue_last = block; + thproc->queue_last->next = block; + thproc->queue_last = block; } - block->sequence_number = proc->enqueue_id++; + block->sequence_number = thproc->enqueue_id++; block->next = NULL; - proc->backlog += 1; + thproc->backlog += 1; block = NULL; } out: - SIGNAL_ALL(&proc->queue_cond); - UNLOCK(&proc->mtx); + SIGNAL_ALL(&thproc->queue_cond); + UNLOCK(&thproc->mtx); free(block); return 0; } int sqfs_block_processor_finish(sqfs_block_processor_t *proc) { + thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc; int status; - LOCK(&proc->mtx); - status = proc->status; - SIGNAL_ALL(&proc->queue_cond); - UNLOCK(&proc->mtx); + LOCK(&thproc->mtx); + status = thproc->status; + SIGNAL_ALL(&thproc->queue_cond); + UNLOCK(&thproc->mtx); - while (status == 0 && proc->backlog > 0) - status = wait_completed(proc); + while (status == 0 && thproc->backlog > 0) + status = wait_completed(thproc); if (status == 0 && proc->frag_block != NULL) { status = append_to_work_queue(proc, proc->frag_block); @@ -495,7 +531,7 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc) if (status) return status; - status = wait_completed(proc); + status = wait_completed(thproc); } return status; -- cgit v1.2.3