summaryrefslogtreecommitdiff
path: root/lib/sqfs/block_processor/winpthread.c
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2020-05-29 03:38:10 +0200
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2020-05-29 03:40:12 +0200
commitcada7ca7d9386e68c38fc504c01bd2cac33dac44 (patch)
tree3fa6272a42a0504263d350ba82638579ad6d25ed /lib/sqfs/block_processor/winpthread.c
parent4b20b555bd9813ce85cacf78f0c194fa66ad5172 (diff)
Block processor: turn internal functions into interface entry points
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib/sqfs/block_processor/winpthread.c')
-rw-r--r--lib/sqfs/block_processor/winpthread.c267
1 files changed, 135 insertions, 132 deletions
diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c
index 281eb6b..44c62f1 100644
--- a/lib/sqfs/block_processor/winpthread.c
+++ b/lib/sqfs/block_processor/winpthread.c
@@ -142,6 +142,7 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg)
{
compress_worker_t *worker = arg;
thread_pool_processor_t *shared = worker->shared;
+ sqfs_block_processor_t *proc = (sqfs_block_processor_t *)shared;
sqfs_block_t *blk = NULL;
int status = 0;
@@ -156,9 +157,8 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg)
if (blk == NULL)
break;
- status = block_processor_do_block(blk, worker->cmp,
- worker->scratch,
- shared->base.max_block_size);
+ status = proc->process_block(blk, worker->cmp, worker->scratch,
+ proc->max_block_size);
}
return THREAD_EXIT_SUCCESS;
@@ -198,127 +198,6 @@ static void block_processor_destroy(sqfs_object_t *obj)
free(proc);
}
-static thread_pool_processor_t *block_processor_create(size_t max_block_size,
- sqfs_compressor_t *cmp,
- unsigned int num_workers,
- size_t max_backlog,
- sqfs_block_writer_t *wr,
- sqfs_frag_table_t *tbl)
-{
- thread_pool_processor_t *proc;
- unsigned int i;
-
- if (num_workers < 1)
- num_workers = 1;
-
- proc = alloc_flex(sizeof(*proc),
- sizeof(proc->workers[0]), num_workers);
- if (proc == NULL)
- return NULL;
-
- proc->num_workers = num_workers;
- proc->max_backlog = max_backlog;
- proc->base.max_block_size = max_block_size;
- proc->base.cmp = cmp;
- proc->base.frag_tbl = tbl;
- proc->base.wr = wr;
- proc->base.stats.size = sizeof(proc->base.stats);
- ((sqfs_object_t *)proc)->destroy = block_processor_destroy;
-
- for (i = 0; i < num_workers; ++i) {
- proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
- 1, max_block_size);
-
- if (proc->workers[i] == NULL)
- goto fail;
-
- proc->workers[i]->shared = proc;
- proc->workers[i]->cmp = sqfs_copy(cmp);
-
- if (proc->workers[i]->cmp == NULL)
- goto fail;
- }
-
- return proc;
-fail:
- block_processor_destroy((sqfs_object_t *)proc);
- return NULL;
-}
-
-#if defined(_WIN32) || defined(__WINDOWS__)
-sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
- sqfs_compressor_t *cmp,
- unsigned int num_workers,
- size_t max_backlog,
- sqfs_block_writer_t *wr,
- sqfs_frag_table_t *tbl)
-{
- thread_pool_processor_t *proc;
- unsigned int i;
-
- proc = block_processor_create(max_block_size, cmp, num_workers,
- max_backlog, wr, tbl);
- if (proc == NULL)
- return NULL;
-
- InitializeCriticalSection(&proc->mtx);
- InitializeConditionVariable(&proc->queue_cond);
- InitializeConditionVariable(&proc->done_cond);
-
- for (i = 0; i < num_workers; ++i) {
- proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc,
- proc->workers[i], 0, 0);
- if (proc->workers[i]->thread == NULL)
- goto fail;
- }
-
- return (sqfs_block_processor_t *)proc;
-fail:
- block_processor_destroy((sqfs_object_t *)proc);
- return NULL;
-}
-#else
-sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
- sqfs_compressor_t *cmp,
- unsigned int num_workers,
- size_t max_backlog,
- sqfs_block_writer_t *wr,
- sqfs_frag_table_t *tbl)
-{
- thread_pool_processor_t *proc;
- sigset_t set, oldset;
- unsigned int i;
- int ret;
-
- proc = block_processor_create(max_block_size, cmp, num_workers,
- max_backlog, wr, tbl);
- if (proc == NULL)
- return NULL;
-
- proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
- proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
- proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
-
- sigfillset(&set);
- pthread_sigmask(SIG_SETMASK, &set, &oldset);
-
- for (i = 0; i < num_workers; ++i) {
- ret = pthread_create(&proc->workers[i]->thread, NULL,
- worker_proc, proc->workers[i]);
-
- if (ret != 0)
- goto fail;
- }
-
- pthread_sigmask(SIG_SETMASK, &oldset, NULL);
- return (sqfs_block_processor_t *)proc;
-fail:
- pthread_sigmask(SIG_SETMASK, &oldset, NULL);
- block_processor_destroy((sqfs_object_t *)proc);
- return NULL;
-}
-#endif
-
static void store_io_block(thread_pool_processor_t *proc, sqfs_block_t *blk)
{
sqfs_block_t *it = proc->io_queue, *prev = NULL;
@@ -397,7 +276,7 @@ static int handle_io_queue(thread_pool_processor_t *proc, sqfs_block_t *list)
while (status == 0 && list != NULL) {
it = list;
list = list->next;
- status = process_completed_block(&proc->base, it);
+ status = proc->base.process_completed_block(&proc->base, it);
if (status != 0) {
LOCK(&proc->mtx);
@@ -411,7 +290,8 @@ static int handle_io_queue(thread_pool_processor_t *proc, sqfs_block_t *list)
return status;
}
-int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)
+static int append_to_work_queue(sqfs_block_processor_t *proc,
+ sqfs_block_t *block)
{
thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc;
sqfs_block_t *io_list = NULL, *io_list_last = NULL;
@@ -454,8 +334,9 @@ int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)
if (blk->flags & SQFS_BLK_IS_FRAGMENT) {
fragblk = NULL;
- thproc->status = process_completed_fragment(proc, blk,
- &fragblk);
+ thproc->status =
+ proc->process_completed_fragment(proc, blk,
+ &fragblk);
if (fragblk != NULL) {
fragblk->io_seq_num = thproc->io_enq_id++;
@@ -481,6 +362,54 @@ int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)
return status;
}
+static thread_pool_processor_t *block_processor_create(size_t max_block_size,
+ sqfs_compressor_t *cmp,
+ unsigned int num_workers,
+ size_t max_backlog,
+ sqfs_block_writer_t *wr,
+ sqfs_frag_table_t *tbl)
+{
+ thread_pool_processor_t *proc;
+ unsigned int i;
+
+ if (num_workers < 1)
+ num_workers = 1;
+
+ proc = alloc_flex(sizeof(*proc),
+ sizeof(proc->workers[0]), num_workers);
+ if (proc == NULL)
+ return NULL;
+
+ if (block_processor_init(&proc->base, max_block_size, cmp, wr, tbl)) {
+ free(proc);
+ return NULL;
+ }
+
+ proc->base.append_to_work_queue = append_to_work_queue;
+ proc->num_workers = num_workers;
+ proc->max_backlog = max_backlog;
+ ((sqfs_object_t *)proc)->destroy = block_processor_destroy;
+
+ for (i = 0; i < num_workers; ++i) {
+ proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
+ 1, max_block_size);
+
+ if (proc->workers[i] == NULL)
+ goto fail;
+
+ proc->workers[i]->shared = proc;
+ proc->workers[i]->cmp = sqfs_copy(cmp);
+
+ if (proc->workers[i]->cmp == NULL)
+ goto fail;
+ }
+
+ return proc;
+fail:
+ block_processor_destroy((sqfs_object_t *)proc);
+ return NULL;
+}
+
int sqfs_block_processor_sync(sqfs_block_processor_t *proc)
{
return append_to_work_queue(proc, NULL);
@@ -492,16 +421,16 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
sqfs_block_t *blk;
int status;
- status = append_to_work_queue(proc, NULL);
+ status = sqfs_block_processor_sync(proc);
if (status == 0 && proc->frag_block != NULL) {
blk = proc->frag_block;
blk->next = NULL;
proc->frag_block = NULL;
- status = block_processor_do_block(blk, proc->cmp,
- thproc->workers[0]->scratch,
- proc->max_block_size);
+ status = proc->process_block(blk, proc->cmp,
+ thproc->workers[0]->scratch,
+ proc->max_block_size);
if (status == 0)
status = handle_io_queue(thproc, blk);
@@ -517,3 +446,77 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
return status;
}
+
+#if defined(_WIN32) || defined(__WINDOWS__)
+sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
+ sqfs_compressor_t *cmp,
+ unsigned int num_workers,
+ size_t max_backlog,
+ sqfs_block_writer_t *wr,
+ sqfs_frag_table_t *tbl)
+{
+ thread_pool_processor_t *proc;
+ unsigned int i;
+
+ proc = block_processor_create(max_block_size, cmp, num_workers,
+ max_backlog, wr, tbl);
+ if (proc == NULL)
+ return NULL;
+
+ InitializeCriticalSection(&proc->mtx);
+ InitializeConditionVariable(&proc->queue_cond);
+ InitializeConditionVariable(&proc->done_cond);
+
+ for (i = 0; i < num_workers; ++i) {
+ proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc,
+ proc->workers[i], 0, 0);
+ if (proc->workers[i]->thread == NULL)
+ goto fail;
+ }
+
+ return (sqfs_block_processor_t *)proc;
+fail:
+ block_processor_destroy((sqfs_object_t *)proc);
+ return NULL;
+}
+#else
+sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
+ sqfs_compressor_t *cmp,
+ unsigned int num_workers,
+ size_t max_backlog,
+ sqfs_block_writer_t *wr,
+ sqfs_frag_table_t *tbl)
+{
+ thread_pool_processor_t *proc;
+ sigset_t set, oldset;
+ unsigned int i;
+ int ret;
+
+ proc = block_processor_create(max_block_size, cmp, num_workers,
+ max_backlog, wr, tbl);
+ if (proc == NULL)
+ return NULL;
+
+ proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
+ proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
+ proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
+
+ sigfillset(&set);
+ pthread_sigmask(SIG_SETMASK, &set, &oldset);
+
+ for (i = 0; i < num_workers; ++i) {
+ ret = pthread_create(&proc->workers[i]->thread, NULL,
+ worker_proc, proc->workers[i]);
+
+ if (ret != 0)
+ goto fail;
+ }
+
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+ return (sqfs_block_processor_t *)proc;
+fail:
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+ block_processor_destroy((sqfs_object_t *)proc);
+ return NULL;
+}
+#endif