From cada7ca7d9386e68c38fc504c01bd2cac33dac44 Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Fri, 29 May 2020 03:38:10 +0200 Subject: Block processor: turn internal functions into interface entry points Signed-off-by: David Oberhollenzer --- lib/sqfs/block_processor/common.c | 27 +++- lib/sqfs/block_processor/frontend.c | 4 +- lib/sqfs/block_processor/internal.h | 25 ++-- lib/sqfs/block_processor/serial.c | 75 +++++----- lib/sqfs/block_processor/winpthread.c | 267 +++++++++++++++++----------------- 5 files changed, 212 insertions(+), 186 deletions(-) (limited to 'lib/sqfs/block_processor') diff --git a/lib/sqfs/block_processor/common.c b/lib/sqfs/block_processor/common.c index 6fb8f6f..6a62d4f 100644 --- a/lib/sqfs/block_processor/common.c +++ b/lib/sqfs/block_processor/common.c @@ -62,7 +62,8 @@ static void release_old_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) proc->free_list = blk; } -int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) +static int process_completed_block(sqfs_block_processor_t *proc, + sqfs_block_t *blk) { sqfs_u64 location; sqfs_u32 size; @@ -119,8 +120,8 @@ static bool is_zero_block(unsigned char *ptr, size_t size) return ptr[0] == 0 && memcmp(ptr, ptr + 1, size - 1) == 0; } -int block_processor_do_block(sqfs_block_t *block, sqfs_compressor_t *cmp, - sqfs_u8 *scratch, size_t scratch_size) +static int process_block(sqfs_block_t *block, sqfs_compressor_t *cmp, + sqfs_u8 *scratch, size_t scratch_size) { sqfs_block_t *it; size_t offset; @@ -165,8 +166,9 @@ int block_processor_do_block(sqfs_block_t *block, sqfs_compressor_t *cmp, return 0; } -int process_completed_fragment(sqfs_block_processor_t *proc, sqfs_block_t *frag, - sqfs_block_t **blk_out) +static int process_completed_fragment(sqfs_block_processor_t *proc, + sqfs_block_t *frag, + sqfs_block_t **blk_out) { sqfs_u32 index, offset; size_t size; @@ -247,3 +249,18 @@ fail_outblk: } return err; } + +int block_processor_init(sqfs_block_processor_t *base, size_t max_block_size, + sqfs_compressor_t *cmp, sqfs_block_writer_t *wr, + sqfs_frag_table_t *tbl) +{ + base->process_completed_block = process_completed_block; + base->process_completed_fragment = process_completed_fragment; + base->process_block = process_block; + base->max_block_size = max_block_size; + base->cmp = cmp; + base->frag_tbl = tbl; + base->wr = wr; + base->stats.size = sizeof(base->stats); + return 0; +} diff --git a/lib/sqfs/block_processor/frontend.c b/lib/sqfs/block_processor/frontend.c index 0865df8..8381dd8 100644 --- a/lib/sqfs/block_processor/frontend.c +++ b/lib/sqfs/block_processor/frontend.c @@ -34,7 +34,7 @@ static int add_sentinel_block(sqfs_block_processor_t *proc) blk->inode = proc->inode; blk->flags = proc->blk_flags | SQFS_BLK_LAST_BLOCK; - return append_to_work_queue(proc, blk); + return proc->append_to_work_queue(proc, blk); } static int flush_block(sqfs_block_processor_t *proc) @@ -51,7 +51,7 @@ static int flush_block(sqfs_block_processor_t *proc) } block->index = proc->blk_index++; - return append_to_work_queue(proc, block); + return proc->append_to_work_queue(proc, block); } int sqfs_block_processor_begin_file(sqfs_block_processor_t *proc, diff --git a/lib/sqfs/block_processor/internal.h b/lib/sqfs/block_processor/internal.h index a71e6d7..61e13d7 100644 --- a/lib/sqfs/block_processor/internal.h +++ b/lib/sqfs/block_processor/internal.h @@ -65,20 +65,25 @@ struct sqfs_block_processor_t { size_t max_block_size; bool begin_called; -}; -SQFS_INTERNAL int process_completed_block(sqfs_block_processor_t *proc, - sqfs_block_t *block); + int (*process_completed_block)(sqfs_block_processor_t *proc, + sqfs_block_t *block); -SQFS_INTERNAL -int process_completed_fragment(sqfs_block_processor_t *proc, sqfs_block_t *frag, - sqfs_block_t **blk_out); + int (*process_completed_fragment)(sqfs_block_processor_t *proc, + sqfs_block_t *frag, + sqfs_block_t **blk_out); -SQFS_INTERNAL -int block_processor_do_block(sqfs_block_t *block, sqfs_compressor_t *cmp, + int (*process_block)(sqfs_block_t *block, sqfs_compressor_t *cmp, sqfs_u8 *scratch, size_t scratch_size); -SQFS_INTERNAL -int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block); + int (*append_to_work_queue)(sqfs_block_processor_t *proc, + sqfs_block_t *block); +}; + +SQFS_INTERNAL int block_processor_init(sqfs_block_processor_t *base, + size_t max_block_size, + sqfs_compressor_t *cmp, + sqfs_block_writer_t *wr, + sqfs_frag_table_t *tbl); #endif /* INTERNAL_H */ diff --git a/lib/sqfs/block_processor/serial.c b/lib/sqfs/block_processor/serial.c index 54edda1..9768a7f 100644 --- a/lib/sqfs/block_processor/serial.c +++ b/lib/sqfs/block_processor/serial.c @@ -39,30 +39,8 @@ static void block_processor_destroy(sqfs_object_t *obj) free(proc); } -sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, - sqfs_compressor_t *cmp, - unsigned int num_workers, - size_t max_backlog, - sqfs_block_writer_t *wr, - sqfs_frag_table_t *tbl) -{ - serial_block_processor_t *proc; - (void)num_workers; (void)max_backlog; - - proc = alloc_flex(sizeof(*proc), 1, max_block_size); - if (proc == NULL) - return NULL; - - proc->base.max_block_size = max_block_size; - proc->base.cmp = cmp; - proc->base.frag_tbl = tbl; - proc->base.wr = wr; - proc->base.stats.size = sizeof(proc->base.stats); - ((sqfs_object_t *)proc)->destroy = block_processor_destroy; - return (sqfs_block_processor_t *)proc; -} - -int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block) +static int append_to_work_queue(sqfs_block_processor_t *proc, + sqfs_block_t *block) { serial_block_processor_t *sproc = (serial_block_processor_t *)proc; sqfs_block_t *fragblk = NULL; @@ -70,27 +48,26 @@ int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block) if (sproc->status != 0) goto fail; - sproc->status = block_processor_do_block(block, proc->cmp, - sproc->scratch, - proc->max_block_size); + sproc->status = proc->process_block(block, proc->cmp, sproc->scratch, + proc->max_block_size); if (sproc->status != 0) goto fail; if (block->flags & SQFS_BLK_IS_FRAGMENT) { - sproc->status = process_completed_fragment(proc, block, - &fragblk); + sproc->status = proc->process_completed_fragment(proc, block, + &fragblk); if (fragblk == NULL) return sproc->status; block = fragblk; - sproc->status = block_processor_do_block(block, proc->cmp, - sproc->scratch, - proc->max_block_size); + sproc->status = proc->process_block(block, proc->cmp, + sproc->scratch, + proc->max_block_size); if (sproc->status != 0) goto fail; } - sproc->status = process_completed_block(proc, block); + sproc->status = proc->process_completed_block(proc, block); return sproc->status; fail: free_block_list(block->frag_list); @@ -98,6 +75,30 @@ fail: return sproc->status; } +sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, + sqfs_compressor_t *cmp, + unsigned int num_workers, + size_t max_backlog, + sqfs_block_writer_t *wr, + sqfs_frag_table_t *tbl) +{ + serial_block_processor_t *proc; + (void)num_workers; (void)max_backlog; + + proc = alloc_flex(sizeof(*proc), 1, max_block_size); + if (proc == NULL) + return NULL; + + if (block_processor_init(&proc->base, max_block_size, cmp, wr, tbl)) { + free(proc); + return NULL; + } + + proc->base.append_to_work_queue = append_to_work_queue; + ((sqfs_object_t *)proc)->destroy = block_processor_destroy; + return (sqfs_block_processor_t *)proc; +} + int sqfs_block_processor_sync(sqfs_block_processor_t *proc) { return ((serial_block_processor_t *)proc)->status; @@ -110,13 +111,13 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc) if (proc->frag_block == NULL || sproc->status != 0) goto fail; - sproc->status = block_processor_do_block(proc->frag_block, proc->cmp, - sproc->scratch, - proc->max_block_size); + sproc->status = proc->process_block(proc->frag_block, proc->cmp, + sproc->scratch, + proc->max_block_size); if (sproc->status != 0) goto fail; - sproc->status = process_completed_block(proc, proc->frag_block); + sproc->status = proc->process_completed_block(proc, proc->frag_block); proc->frag_block = NULL; return sproc->status; fail: diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c index 281eb6b..44c62f1 100644 --- a/lib/sqfs/block_processor/winpthread.c +++ b/lib/sqfs/block_processor/winpthread.c @@ -142,6 +142,7 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg) { compress_worker_t *worker = arg; thread_pool_processor_t *shared = worker->shared; + sqfs_block_processor_t *proc = (sqfs_block_processor_t *)shared; sqfs_block_t *blk = NULL; int status = 0; @@ -156,9 +157,8 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg) if (blk == NULL) break; - status = block_processor_do_block(blk, worker->cmp, - worker->scratch, - shared->base.max_block_size); + status = proc->process_block(blk, worker->cmp, worker->scratch, + proc->max_block_size); } return THREAD_EXIT_SUCCESS; @@ -198,127 +198,6 @@ static void block_processor_destroy(sqfs_object_t *obj) free(proc); } -static thread_pool_processor_t *block_processor_create(size_t max_block_size, - sqfs_compressor_t *cmp, - unsigned int num_workers, - size_t max_backlog, - sqfs_block_writer_t *wr, - sqfs_frag_table_t *tbl) -{ - thread_pool_processor_t *proc; - unsigned int i; - - if (num_workers < 1) - num_workers = 1; - - proc = alloc_flex(sizeof(*proc), - sizeof(proc->workers[0]), num_workers); - if (proc == NULL) - return NULL; - - proc->num_workers = num_workers; - proc->max_backlog = max_backlog; - proc->base.max_block_size = max_block_size; - proc->base.cmp = cmp; - proc->base.frag_tbl = tbl; - proc->base.wr = wr; - proc->base.stats.size = sizeof(proc->base.stats); - ((sqfs_object_t *)proc)->destroy = block_processor_destroy; - - for (i = 0; i < num_workers; ++i) { - proc->workers[i] = alloc_flex(sizeof(compress_worker_t), - 1, max_block_size); - - if (proc->workers[i] == NULL) - goto fail; - - proc->workers[i]->shared = proc; - proc->workers[i]->cmp = sqfs_copy(cmp); - - if (proc->workers[i]->cmp == NULL) - goto fail; - } - - return proc; -fail: - block_processor_destroy((sqfs_object_t *)proc); - return NULL; -} - -#if defined(_WIN32) || defined(__WINDOWS__) -sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, - sqfs_compressor_t *cmp, - unsigned int num_workers, - size_t max_backlog, - sqfs_block_writer_t *wr, - sqfs_frag_table_t *tbl) -{ - thread_pool_processor_t *proc; - unsigned int i; - - proc = block_processor_create(max_block_size, cmp, num_workers, - max_backlog, wr, tbl); - if (proc == NULL) - return NULL; - - InitializeCriticalSection(&proc->mtx); - InitializeConditionVariable(&proc->queue_cond); - InitializeConditionVariable(&proc->done_cond); - - for (i = 0; i < num_workers; ++i) { - proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc, - proc->workers[i], 0, 0); - if (proc->workers[i]->thread == NULL) - goto fail; - } - - return (sqfs_block_processor_t *)proc; -fail: - block_processor_destroy((sqfs_object_t *)proc); - return NULL; -} -#else -sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, - sqfs_compressor_t *cmp, - unsigned int num_workers, - size_t max_backlog, - sqfs_block_writer_t *wr, - sqfs_frag_table_t *tbl) -{ - thread_pool_processor_t *proc; - sigset_t set, oldset; - unsigned int i; - int ret; - - proc = block_processor_create(max_block_size, cmp, num_workers, - max_backlog, wr, tbl); - if (proc == NULL) - return NULL; - - proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; - proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; - proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; - - sigfillset(&set); - pthread_sigmask(SIG_SETMASK, &set, &oldset); - - for (i = 0; i < num_workers; ++i) { - ret = pthread_create(&proc->workers[i]->thread, NULL, - worker_proc, proc->workers[i]); - - if (ret != 0) - goto fail; - } - - pthread_sigmask(SIG_SETMASK, &oldset, NULL); - return (sqfs_block_processor_t *)proc; -fail: - pthread_sigmask(SIG_SETMASK, &oldset, NULL); - block_processor_destroy((sqfs_object_t *)proc); - return NULL; -} -#endif - static void store_io_block(thread_pool_processor_t *proc, sqfs_block_t *blk) { sqfs_block_t *it = proc->io_queue, *prev = NULL; @@ -397,7 +276,7 @@ static int handle_io_queue(thread_pool_processor_t *proc, sqfs_block_t *list) while (status == 0 && list != NULL) { it = list; list = list->next; - status = process_completed_block(&proc->base, it); + status = proc->base.process_completed_block(&proc->base, it); if (status != 0) { LOCK(&proc->mtx); @@ -411,7 +290,8 @@ static int handle_io_queue(thread_pool_processor_t *proc, sqfs_block_t *list) return status; } -int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block) +static int append_to_work_queue(sqfs_block_processor_t *proc, + sqfs_block_t *block) { thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc; sqfs_block_t *io_list = NULL, *io_list_last = NULL; @@ -454,8 +334,9 @@ int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block) if (blk->flags & SQFS_BLK_IS_FRAGMENT) { fragblk = NULL; - thproc->status = process_completed_fragment(proc, blk, - &fragblk); + thproc->status = + proc->process_completed_fragment(proc, blk, + &fragblk); if (fragblk != NULL) { fragblk->io_seq_num = thproc->io_enq_id++; @@ -481,6 +362,54 @@ int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block) return status; } +static thread_pool_processor_t *block_processor_create(size_t max_block_size, + sqfs_compressor_t *cmp, + unsigned int num_workers, + size_t max_backlog, + sqfs_block_writer_t *wr, + sqfs_frag_table_t *tbl) +{ + thread_pool_processor_t *proc; + unsigned int i; + + if (num_workers < 1) + num_workers = 1; + + proc = alloc_flex(sizeof(*proc), + sizeof(proc->workers[0]), num_workers); + if (proc == NULL) + return NULL; + + if (block_processor_init(&proc->base, max_block_size, cmp, wr, tbl)) { + free(proc); + return NULL; + } + + proc->base.append_to_work_queue = append_to_work_queue; + proc->num_workers = num_workers; + proc->max_backlog = max_backlog; + ((sqfs_object_t *)proc)->destroy = block_processor_destroy; + + for (i = 0; i < num_workers; ++i) { + proc->workers[i] = alloc_flex(sizeof(compress_worker_t), + 1, max_block_size); + + if (proc->workers[i] == NULL) + goto fail; + + proc->workers[i]->shared = proc; + proc->workers[i]->cmp = sqfs_copy(cmp); + + if (proc->workers[i]->cmp == NULL) + goto fail; + } + + return proc; +fail: + block_processor_destroy((sqfs_object_t *)proc); + return NULL; +} + int sqfs_block_processor_sync(sqfs_block_processor_t *proc) { return append_to_work_queue(proc, NULL); @@ -492,16 +421,16 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc) sqfs_block_t *blk; int status; - status = append_to_work_queue(proc, NULL); + status = sqfs_block_processor_sync(proc); if (status == 0 && proc->frag_block != NULL) { blk = proc->frag_block; blk->next = NULL; proc->frag_block = NULL; - status = block_processor_do_block(blk, proc->cmp, - thproc->workers[0]->scratch, - proc->max_block_size); + status = proc->process_block(blk, proc->cmp, + thproc->workers[0]->scratch, + proc->max_block_size); if (status == 0) status = handle_io_queue(thproc, blk); @@ -517,3 +446,77 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc) return status; } + +#if defined(_WIN32) || defined(__WINDOWS__) +sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, + sqfs_compressor_t *cmp, + unsigned int num_workers, + size_t max_backlog, + sqfs_block_writer_t *wr, + sqfs_frag_table_t *tbl) +{ + thread_pool_processor_t *proc; + unsigned int i; + + proc = block_processor_create(max_block_size, cmp, num_workers, + max_backlog, wr, tbl); + if (proc == NULL) + return NULL; + + InitializeCriticalSection(&proc->mtx); + InitializeConditionVariable(&proc->queue_cond); + InitializeConditionVariable(&proc->done_cond); + + for (i = 0; i < num_workers; ++i) { + proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc, + proc->workers[i], 0, 0); + if (proc->workers[i]->thread == NULL) + goto fail; + } + + return (sqfs_block_processor_t *)proc; +fail: + block_processor_destroy((sqfs_object_t *)proc); + return NULL; +} +#else +sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, + sqfs_compressor_t *cmp, + unsigned int num_workers, + size_t max_backlog, + sqfs_block_writer_t *wr, + sqfs_frag_table_t *tbl) +{ + thread_pool_processor_t *proc; + sigset_t set, oldset; + unsigned int i; + int ret; + + proc = block_processor_create(max_block_size, cmp, num_workers, + max_backlog, wr, tbl); + if (proc == NULL) + return NULL; + + proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; + proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; + proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; + + sigfillset(&set); + pthread_sigmask(SIG_SETMASK, &set, &oldset); + + for (i = 0; i < num_workers; ++i) { + ret = pthread_create(&proc->workers[i]->thread, NULL, + worker_proc, proc->workers[i]); + + if (ret != 0) + goto fail; + } + + pthread_sigmask(SIG_SETMASK, &oldset, NULL); + return (sqfs_block_processor_t *)proc; +fail: + pthread_sigmask(SIG_SETMASK, &oldset, NULL); + block_processor_destroy((sqfs_object_t *)proc); + return NULL; +} +#endif -- cgit v1.2.3