diff options
author | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2020-05-29 03:38:10 +0200 |
---|---|---|
committer | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2020-05-29 03:40:12 +0200 |
commit | cada7ca7d9386e68c38fc504c01bd2cac33dac44 (patch) | |
tree | 3fa6272a42a0504263d350ba82638579ad6d25ed /lib/sqfs/block_processor/winpthread.c | |
parent | 4b20b555bd9813ce85cacf78f0c194fa66ad5172 (diff) |
Block processor: turn internal functions into interface entry points
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib/sqfs/block_processor/winpthread.c')
-rw-r--r-- | lib/sqfs/block_processor/winpthread.c | 267 |
1 files changed, 135 insertions, 132 deletions
diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c index 281eb6b..44c62f1 100644 --- a/lib/sqfs/block_processor/winpthread.c +++ b/lib/sqfs/block_processor/winpthread.c @@ -142,6 +142,7 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg) { compress_worker_t *worker = arg; thread_pool_processor_t *shared = worker->shared; + sqfs_block_processor_t *proc = (sqfs_block_processor_t *)shared; sqfs_block_t *blk = NULL; int status = 0; @@ -156,9 +157,8 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg) if (blk == NULL) break; - status = block_processor_do_block(blk, worker->cmp, - worker->scratch, - shared->base.max_block_size); + status = proc->process_block(blk, worker->cmp, worker->scratch, + proc->max_block_size); } return THREAD_EXIT_SUCCESS; @@ -198,127 +198,6 @@ static void block_processor_destroy(sqfs_object_t *obj) free(proc); } -static thread_pool_processor_t *block_processor_create(size_t max_block_size, - sqfs_compressor_t *cmp, - unsigned int num_workers, - size_t max_backlog, - sqfs_block_writer_t *wr, - sqfs_frag_table_t *tbl) -{ - thread_pool_processor_t *proc; - unsigned int i; - - if (num_workers < 1) - num_workers = 1; - - proc = alloc_flex(sizeof(*proc), - sizeof(proc->workers[0]), num_workers); - if (proc == NULL) - return NULL; - - proc->num_workers = num_workers; - proc->max_backlog = max_backlog; - proc->base.max_block_size = max_block_size; - proc->base.cmp = cmp; - proc->base.frag_tbl = tbl; - proc->base.wr = wr; - proc->base.stats.size = sizeof(proc->base.stats); - ((sqfs_object_t *)proc)->destroy = block_processor_destroy; - - for (i = 0; i < num_workers; ++i) { - proc->workers[i] = alloc_flex(sizeof(compress_worker_t), - 1, max_block_size); - - if (proc->workers[i] == NULL) - goto fail; - - proc->workers[i]->shared = proc; - proc->workers[i]->cmp = sqfs_copy(cmp); - - if (proc->workers[i]->cmp == NULL) - goto fail; - } - - return proc; -fail: - block_processor_destroy((sqfs_object_t *)proc); - return NULL; -} - -#if defined(_WIN32) || defined(__WINDOWS__) -sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, - sqfs_compressor_t *cmp, - unsigned int num_workers, - size_t max_backlog, - sqfs_block_writer_t *wr, - sqfs_frag_table_t *tbl) -{ - thread_pool_processor_t *proc; - unsigned int i; - - proc = block_processor_create(max_block_size, cmp, num_workers, - max_backlog, wr, tbl); - if (proc == NULL) - return NULL; - - InitializeCriticalSection(&proc->mtx); - InitializeConditionVariable(&proc->queue_cond); - InitializeConditionVariable(&proc->done_cond); - - for (i = 0; i < num_workers; ++i) { - proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc, - proc->workers[i], 0, 0); - if (proc->workers[i]->thread == NULL) - goto fail; - } - - return (sqfs_block_processor_t *)proc; -fail: - block_processor_destroy((sqfs_object_t *)proc); - return NULL; -} -#else -sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, - sqfs_compressor_t *cmp, - unsigned int num_workers, - size_t max_backlog, - sqfs_block_writer_t *wr, - sqfs_frag_table_t *tbl) -{ - thread_pool_processor_t *proc; - sigset_t set, oldset; - unsigned int i; - int ret; - - proc = block_processor_create(max_block_size, cmp, num_workers, - max_backlog, wr, tbl); - if (proc == NULL) - return NULL; - - proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; - proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; - proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; - - sigfillset(&set); - pthread_sigmask(SIG_SETMASK, &set, &oldset); - - for (i = 0; i < num_workers; ++i) { - ret = pthread_create(&proc->workers[i]->thread, NULL, - worker_proc, proc->workers[i]); - - if (ret != 0) - goto fail; - } - - pthread_sigmask(SIG_SETMASK, &oldset, NULL); - return (sqfs_block_processor_t *)proc; -fail: - pthread_sigmask(SIG_SETMASK, &oldset, NULL); - block_processor_destroy((sqfs_object_t *)proc); - return NULL; -} -#endif - static void store_io_block(thread_pool_processor_t *proc, sqfs_block_t *blk) { sqfs_block_t *it = proc->io_queue, *prev = NULL; @@ -397,7 +276,7 @@ static int handle_io_queue(thread_pool_processor_t *proc, sqfs_block_t *list) while (status == 0 && list != NULL) { it = list; list = list->next; - status = process_completed_block(&proc->base, it); + status = proc->base.process_completed_block(&proc->base, it); if (status != 0) { LOCK(&proc->mtx); @@ -411,7 +290,8 @@ static int handle_io_queue(thread_pool_processor_t *proc, sqfs_block_t *list) return status; } -int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block) +static int append_to_work_queue(sqfs_block_processor_t *proc, + sqfs_block_t *block) { thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc; sqfs_block_t *io_list = NULL, *io_list_last = NULL; @@ -454,8 +334,9 @@ int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block) if (blk->flags & SQFS_BLK_IS_FRAGMENT) { fragblk = NULL; - thproc->status = process_completed_fragment(proc, blk, - &fragblk); + thproc->status = + proc->process_completed_fragment(proc, blk, + &fragblk); if (fragblk != NULL) { fragblk->io_seq_num = thproc->io_enq_id++; @@ -481,6 +362,54 @@ int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block) return status; } +static thread_pool_processor_t *block_processor_create(size_t max_block_size, + sqfs_compressor_t *cmp, + unsigned int num_workers, + size_t max_backlog, + sqfs_block_writer_t *wr, + sqfs_frag_table_t *tbl) +{ + thread_pool_processor_t *proc; + unsigned int i; + + if (num_workers < 1) + num_workers = 1; + + proc = alloc_flex(sizeof(*proc), + sizeof(proc->workers[0]), num_workers); + if (proc == NULL) + return NULL; + + if (block_processor_init(&proc->base, max_block_size, cmp, wr, tbl)) { + free(proc); + return NULL; + } + + proc->base.append_to_work_queue = append_to_work_queue; + proc->num_workers = num_workers; + proc->max_backlog = max_backlog; + ((sqfs_object_t *)proc)->destroy = block_processor_destroy; + + for (i = 0; i < num_workers; ++i) { + proc->workers[i] = alloc_flex(sizeof(compress_worker_t), + 1, max_block_size); + + if (proc->workers[i] == NULL) + goto fail; + + proc->workers[i]->shared = proc; + proc->workers[i]->cmp = sqfs_copy(cmp); + + if (proc->workers[i]->cmp == NULL) + goto fail; + } + + return proc; +fail: + block_processor_destroy((sqfs_object_t *)proc); + return NULL; +} + int sqfs_block_processor_sync(sqfs_block_processor_t *proc) { return append_to_work_queue(proc, NULL); @@ -492,16 +421,16 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc) sqfs_block_t *blk; int status; - status = append_to_work_queue(proc, NULL); + status = sqfs_block_processor_sync(proc); if (status == 0 && proc->frag_block != NULL) { blk = proc->frag_block; blk->next = NULL; proc->frag_block = NULL; - status = block_processor_do_block(blk, proc->cmp, - thproc->workers[0]->scratch, - proc->max_block_size); + status = proc->process_block(blk, proc->cmp, + thproc->workers[0]->scratch, + proc->max_block_size); if (status == 0) status = handle_io_queue(thproc, blk); @@ -517,3 +446,77 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc) return status; } + +#if defined(_WIN32) || defined(__WINDOWS__) +sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, + sqfs_compressor_t *cmp, + unsigned int num_workers, + size_t max_backlog, + sqfs_block_writer_t *wr, + sqfs_frag_table_t *tbl) +{ + thread_pool_processor_t *proc; + unsigned int i; + + proc = block_processor_create(max_block_size, cmp, num_workers, + max_backlog, wr, tbl); + if (proc == NULL) + return NULL; + + InitializeCriticalSection(&proc->mtx); + InitializeConditionVariable(&proc->queue_cond); + InitializeConditionVariable(&proc->done_cond); + + for (i = 0; i < num_workers; ++i) { + proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc, + proc->workers[i], 0, 0); + if (proc->workers[i]->thread == NULL) + goto fail; + } + + return (sqfs_block_processor_t *)proc; +fail: + block_processor_destroy((sqfs_object_t *)proc); + return NULL; +} +#else +sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, + sqfs_compressor_t *cmp, + unsigned int num_workers, + size_t max_backlog, + sqfs_block_writer_t *wr, + sqfs_frag_table_t *tbl) +{ + thread_pool_processor_t *proc; + sigset_t set, oldset; + unsigned int i; + int ret; + + proc = block_processor_create(max_block_size, cmp, num_workers, + max_backlog, wr, tbl); + if (proc == NULL) + return NULL; + + proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; + proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; + proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; + + sigfillset(&set); + pthread_sigmask(SIG_SETMASK, &set, &oldset); + + for (i = 0; i < num_workers; ++i) { + ret = pthread_create(&proc->workers[i]->thread, NULL, + worker_proc, proc->workers[i]); + + if (ret != 0) + goto fail; + } + + pthread_sigmask(SIG_SETMASK, &oldset, NULL); + return (sqfs_block_processor_t *)proc; +fail: + pthread_sigmask(SIG_SETMASK, &oldset, NULL); + block_processor_destroy((sqfs_object_t *)proc); + return NULL; +} +#endif |