From 544575bd09378dcf573e33a315628ecc193925e7 Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Sat, 21 Sep 2019 00:04:12 +0200 Subject: Merge some of serial & pthread block processor code paths/declarations Signed-off-by: David Oberhollenzer --- lib/sqfs/Makemodule.am | 1 + lib/sqfs/blk_proc/internal.h | 56 ++++++++++++++++++++++++++++++++++ lib/sqfs/blk_proc/process_block.c | 20 ++++++++++++ lib/sqfs/blk_proc/pthread.c | 64 --------------------------------------- lib/sqfs/blk_proc/serial.c | 36 +++++----------------- 5 files changed, 85 insertions(+), 92 deletions(-) diff --git a/lib/sqfs/Makemodule.am b/lib/sqfs/Makemodule.am index cb003c0..16a07e9 100644 --- a/lib/sqfs/Makemodule.am +++ b/lib/sqfs/Makemodule.am @@ -28,6 +28,7 @@ libsquashfs_la_LIBADD += $(ZSTD_LIBS) $(PTHREAD_LIBS) libutil.la if HAVE_PTHREAD libsquashfs_la_SOURCES += lib/sqfs/blk_proc/pthread.c +libsquashfs_la_CPPFLAGS += -DWITH_PTHREAD else libsquashfs_la_SOURCES += lib/sqfs/blk_proc/serial.c endif diff --git a/lib/sqfs/blk_proc/internal.h b/lib/sqfs/blk_proc/internal.h index 1c33999..85c7783 100644 --- a/lib/sqfs/blk_proc/internal.h +++ b/lib/sqfs/blk_proc/internal.h @@ -9,8 +9,64 @@ #include "sqfs/error.h" #include "util.h" +#include +#include + +#ifdef WITH_PTHREAD +#include +#endif + +#ifdef WITH_PTHREAD +typedef struct { + sqfs_block_processor_t *shared; + sqfs_compressor_t *cmp; + pthread_t thread; + uint8_t scratch[]; +} compress_worker_t; +#endif + +struct sqfs_block_processor_t { + /* synchronization primitives */ +#ifdef WITH_PTHREAD + pthread_mutex_t mtx; + pthread_cond_t queue_cond; + pthread_cond_t done_cond; +#endif + + /* needs rw access by worker and main thread */ + sqfs_block_t *queue; + sqfs_block_t *queue_last; + + sqfs_block_t *done; + bool terminate; + size_t backlog; + + /* used by main thread only */ + uint32_t enqueue_id; + uint32_t dequeue_id; + + unsigned int num_workers; + sqfs_block_cb cb; + void *user; + int status; + size_t max_backlog; + + /* used only by workers */ + size_t max_block_size; + +#ifdef WITH_PTHREAD + compress_worker_t *workers[]; +#else + sqfs_compressor_t *cmp; + uint8_t scratch[]; +#endif +}; + SQFS_INTERNAL int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp, uint8_t *scratch, size_t scratch_size); +SQFS_INTERNAL int process_completed_blocks(sqfs_block_processor_t *proc, + sqfs_block_t *queue); + #endif /* INTERNAL_H */ diff --git a/lib/sqfs/blk_proc/process_block.c b/lib/sqfs/blk_proc/process_block.c index 0eae244..1b6ee29 100644 --- a/lib/sqfs/blk_proc/process_block.c +++ b/lib/sqfs/blk_proc/process_block.c @@ -36,3 +36,23 @@ int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp, return 0; } + +int process_completed_blocks(sqfs_block_processor_t *proc, sqfs_block_t *queue) +{ + sqfs_block_t *it; + + while (queue != NULL) { + it = queue; + queue = queue->next; + + if (it->flags & SQFS_BLK_COMPRESS_ERROR) { + proc->status = SQFS_ERROR_COMRPESSOR; + } else if (proc->status == 0) { + proc->status = proc->cb(proc->user, it); + } + + free(it); + } + + return proc->status; +} diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c index f56dfe1..565bad2 100644 --- a/lib/sqfs/blk_proc/pthread.c +++ b/lib/sqfs/blk_proc/pthread.c @@ -7,46 +7,6 @@ #define SQFS_BUILDING_DLL #include "internal.h" -#include -#include -#include - -typedef struct { - sqfs_block_processor_t *shared; - sqfs_compressor_t *cmp; - pthread_t thread; - uint8_t scratch[]; -} compress_worker_t; - -struct sqfs_block_processor_t { - pthread_mutex_t mtx; - pthread_cond_t queue_cond; - pthread_cond_t done_cond; - - /* needs rw access by worker and main thread */ - sqfs_block_t *queue; - sqfs_block_t *queue_last; - - sqfs_block_t *done; - bool terminate; - size_t backlog; - - /* used by main thread only */ - uint32_t enqueue_id; - uint32_t dequeue_id; - - unsigned int num_workers; - sqfs_block_cb cb; - void *user; - int status; - size_t max_backlog; - - /* used only by workers */ - size_t max_block_size; - - compress_worker_t *workers[]; -}; - static void store_completed_block(sqfs_block_processor_t *shared, sqfs_block_t *blk) { @@ -222,30 +182,6 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) free(proc); } -static int process_completed_blocks(sqfs_block_processor_t *proc, - sqfs_block_t *queue) -{ - sqfs_block_t *it; - int ret; - - while (queue != NULL) { - it = queue; - queue = queue->next; - - if (it->flags & SQFS_BLK_COMPRESS_ERROR) { - proc->status = SQFS_ERROR_COMRPESSOR; - } else { - ret = proc->cb(proc->user, it); - if (ret) - proc->status = ret; - } - - free(it); - } - - return proc->status; -} - int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, sqfs_block_t *block) { diff --git a/lib/sqfs/blk_proc/serial.c b/lib/sqfs/blk_proc/serial.c index 85f39fe..ee172de 100644 --- a/lib/sqfs/blk_proc/serial.c +++ b/lib/sqfs/blk_proc/serial.c @@ -7,19 +7,6 @@ #define SQFS_BUILDING_DLL #include "internal.h" -#include -#include - -struct sqfs_block_processor_t { - size_t max_block_size; - sqfs_compressor_t *cmp; - sqfs_block_cb cb; - void *user; - int status; - - uint8_t scratch[]; -}; - sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, sqfs_compressor_t *cmp, unsigned int num_workers, @@ -51,23 +38,16 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, sqfs_block_t *block) { - int ret; - - ret = sqfs_block_process(block, proc->cmp, - proc->scratch, proc->max_block_size); - if (ret) - goto fail; + if (proc->status != 0) { + free(block); + return proc->status; + } - ret = proc->cb(proc->user, block); - if (ret) - goto fail; + proc->status = sqfs_block_process(block, proc->cmp, + proc->scratch, proc->max_block_size); - free(block); - return 0; -fail: - free(block); - proc->status = ret; - return ret; + block->next = NULL; + return process_completed_blocks(proc, block); } int sqfs_block_processor_finish(sqfs_block_processor_t *proc) -- cgit v1.2.3