aboutsummaryrefslogtreecommitdiff
path: root/lib/sqfs/block_processor
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2020-05-29 03:38:10 +0200
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2020-05-29 03:40:12 +0200
commitcada7ca7d9386e68c38fc504c01bd2cac33dac44 (patch)
tree3fa6272a42a0504263d350ba82638579ad6d25ed /lib/sqfs/block_processor
parent4b20b555bd9813ce85cacf78f0c194fa66ad5172 (diff)
Block processor: turn internal functions into interface entry points
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib/sqfs/block_processor')
-rw-r--r--lib/sqfs/block_processor/common.c27
-rw-r--r--lib/sqfs/block_processor/frontend.c4
-rw-r--r--lib/sqfs/block_processor/internal.h25
-rw-r--r--lib/sqfs/block_processor/serial.c75
-rw-r--r--lib/sqfs/block_processor/winpthread.c267
5 files changed, 212 insertions, 186 deletions
diff --git a/lib/sqfs/block_processor/common.c b/lib/sqfs/block_processor/common.c
index 6fb8f6f..6a62d4f 100644
--- a/lib/sqfs/block_processor/common.c
+++ b/lib/sqfs/block_processor/common.c
@@ -62,7 +62,8 @@ static void release_old_block(sqfs_block_processor_t *proc, sqfs_block_t *blk)
proc->free_list = blk;
}
-int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk)
+static int process_completed_block(sqfs_block_processor_t *proc,
+ sqfs_block_t *blk)
{
sqfs_u64 location;
sqfs_u32 size;
@@ -119,8 +120,8 @@ static bool is_zero_block(unsigned char *ptr, size_t size)
return ptr[0] == 0 && memcmp(ptr, ptr + 1, size - 1) == 0;
}
-int block_processor_do_block(sqfs_block_t *block, sqfs_compressor_t *cmp,
- sqfs_u8 *scratch, size_t scratch_size)
+static int process_block(sqfs_block_t *block, sqfs_compressor_t *cmp,
+ sqfs_u8 *scratch, size_t scratch_size)
{
sqfs_block_t *it;
size_t offset;
@@ -165,8 +166,9 @@ int block_processor_do_block(sqfs_block_t *block, sqfs_compressor_t *cmp,
return 0;
}
-int process_completed_fragment(sqfs_block_processor_t *proc, sqfs_block_t *frag,
- sqfs_block_t **blk_out)
+static int process_completed_fragment(sqfs_block_processor_t *proc,
+ sqfs_block_t *frag,
+ sqfs_block_t **blk_out)
{
sqfs_u32 index, offset;
size_t size;
@@ -247,3 +249,18 @@ fail_outblk:
}
return err;
}
+
+int block_processor_init(sqfs_block_processor_t *base, size_t max_block_size,
+ sqfs_compressor_t *cmp, sqfs_block_writer_t *wr,
+ sqfs_frag_table_t *tbl)
+{
+ base->process_completed_block = process_completed_block;
+ base->process_completed_fragment = process_completed_fragment;
+ base->process_block = process_block;
+ base->max_block_size = max_block_size;
+ base->cmp = cmp;
+ base->frag_tbl = tbl;
+ base->wr = wr;
+ base->stats.size = sizeof(base->stats);
+ return 0;
+}
diff --git a/lib/sqfs/block_processor/frontend.c b/lib/sqfs/block_processor/frontend.c
index 0865df8..8381dd8 100644
--- a/lib/sqfs/block_processor/frontend.c
+++ b/lib/sqfs/block_processor/frontend.c
@@ -34,7 +34,7 @@ static int add_sentinel_block(sqfs_block_processor_t *proc)
blk->inode = proc->inode;
blk->flags = proc->blk_flags | SQFS_BLK_LAST_BLOCK;
- return append_to_work_queue(proc, blk);
+ return proc->append_to_work_queue(proc, blk);
}
static int flush_block(sqfs_block_processor_t *proc)
@@ -51,7 +51,7 @@ static int flush_block(sqfs_block_processor_t *proc)
}
block->index = proc->blk_index++;
- return append_to_work_queue(proc, block);
+ return proc->append_to_work_queue(proc, block);
}
int sqfs_block_processor_begin_file(sqfs_block_processor_t *proc,
diff --git a/lib/sqfs/block_processor/internal.h b/lib/sqfs/block_processor/internal.h
index a71e6d7..61e13d7 100644
--- a/lib/sqfs/block_processor/internal.h
+++ b/lib/sqfs/block_processor/internal.h
@@ -65,20 +65,25 @@ struct sqfs_block_processor_t {
size_t max_block_size;
bool begin_called;
-};
-SQFS_INTERNAL int process_completed_block(sqfs_block_processor_t *proc,
- sqfs_block_t *block);
+ int (*process_completed_block)(sqfs_block_processor_t *proc,
+ sqfs_block_t *block);
-SQFS_INTERNAL
-int process_completed_fragment(sqfs_block_processor_t *proc, sqfs_block_t *frag,
- sqfs_block_t **blk_out);
+ int (*process_completed_fragment)(sqfs_block_processor_t *proc,
+ sqfs_block_t *frag,
+ sqfs_block_t **blk_out);
-SQFS_INTERNAL
-int block_processor_do_block(sqfs_block_t *block, sqfs_compressor_t *cmp,
+ int (*process_block)(sqfs_block_t *block, sqfs_compressor_t *cmp,
sqfs_u8 *scratch, size_t scratch_size);
-SQFS_INTERNAL
-int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block);
+ int (*append_to_work_queue)(sqfs_block_processor_t *proc,
+ sqfs_block_t *block);
+};
+
+SQFS_INTERNAL int block_processor_init(sqfs_block_processor_t *base,
+ size_t max_block_size,
+ sqfs_compressor_t *cmp,
+ sqfs_block_writer_t *wr,
+ sqfs_frag_table_t *tbl);
#endif /* INTERNAL_H */
diff --git a/lib/sqfs/block_processor/serial.c b/lib/sqfs/block_processor/serial.c
index 54edda1..9768a7f 100644
--- a/lib/sqfs/block_processor/serial.c
+++ b/lib/sqfs/block_processor/serial.c
@@ -39,30 +39,8 @@ static void block_processor_destroy(sqfs_object_t *obj)
free(proc);
}
-sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
- sqfs_compressor_t *cmp,
- unsigned int num_workers,
- size_t max_backlog,
- sqfs_block_writer_t *wr,
- sqfs_frag_table_t *tbl)
-{
- serial_block_processor_t *proc;
- (void)num_workers; (void)max_backlog;
-
- proc = alloc_flex(sizeof(*proc), 1, max_block_size);
- if (proc == NULL)
- return NULL;
-
- proc->base.max_block_size = max_block_size;
- proc->base.cmp = cmp;
- proc->base.frag_tbl = tbl;
- proc->base.wr = wr;
- proc->base.stats.size = sizeof(proc->base.stats);
- ((sqfs_object_t *)proc)->destroy = block_processor_destroy;
- return (sqfs_block_processor_t *)proc;
-}
-
-int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)
+static int append_to_work_queue(sqfs_block_processor_t *proc,
+ sqfs_block_t *block)
{
serial_block_processor_t *sproc = (serial_block_processor_t *)proc;
sqfs_block_t *fragblk = NULL;
@@ -70,27 +48,26 @@ int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)
if (sproc->status != 0)
goto fail;
- sproc->status = block_processor_do_block(block, proc->cmp,
- sproc->scratch,
- proc->max_block_size);
+ sproc->status = proc->process_block(block, proc->cmp, sproc->scratch,
+ proc->max_block_size);
if (sproc->status != 0)
goto fail;
if (block->flags & SQFS_BLK_IS_FRAGMENT) {
- sproc->status = process_completed_fragment(proc, block,
- &fragblk);
+ sproc->status = proc->process_completed_fragment(proc, block,
+ &fragblk);
if (fragblk == NULL)
return sproc->status;
block = fragblk;
- sproc->status = block_processor_do_block(block, proc->cmp,
- sproc->scratch,
- proc->max_block_size);
+ sproc->status = proc->process_block(block, proc->cmp,
+ sproc->scratch,
+ proc->max_block_size);
if (sproc->status != 0)
goto fail;
}
- sproc->status = process_completed_block(proc, block);
+ sproc->status = proc->process_completed_block(proc, block);
return sproc->status;
fail:
free_block_list(block->frag_list);
@@ -98,6 +75,30 @@ fail:
return sproc->status;
}
+sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
+ sqfs_compressor_t *cmp,
+ unsigned int num_workers,
+ size_t max_backlog,
+ sqfs_block_writer_t *wr,
+ sqfs_frag_table_t *tbl)
+{
+ serial_block_processor_t *proc;
+ (void)num_workers; (void)max_backlog;
+
+ proc = alloc_flex(sizeof(*proc), 1, max_block_size);
+ if (proc == NULL)
+ return NULL;
+
+ if (block_processor_init(&proc->base, max_block_size, cmp, wr, tbl)) {
+ free(proc);
+ return NULL;
+ }
+
+ proc->base.append_to_work_queue = append_to_work_queue;
+ ((sqfs_object_t *)proc)->destroy = block_processor_destroy;
+ return (sqfs_block_processor_t *)proc;
+}
+
int sqfs_block_processor_sync(sqfs_block_processor_t *proc)
{
return ((serial_block_processor_t *)proc)->status;
@@ -110,13 +111,13 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
if (proc->frag_block == NULL || sproc->status != 0)
goto fail;
- sproc->status = block_processor_do_block(proc->frag_block, proc->cmp,
- sproc->scratch,
- proc->max_block_size);
+ sproc->status = proc->process_block(proc->frag_block, proc->cmp,
+ sproc->scratch,
+ proc->max_block_size);
if (sproc->status != 0)
goto fail;
- sproc->status = process_completed_block(proc, proc->frag_block);
+ sproc->status = proc->process_completed_block(proc, proc->frag_block);
proc->frag_block = NULL;
return sproc->status;
fail:
diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c
index 281eb6b..44c62f1 100644
--- a/lib/sqfs/block_processor/winpthread.c
+++ b/lib/sqfs/block_processor/winpthread.c
@@ -142,6 +142,7 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg)
{
compress_worker_t *worker = arg;
thread_pool_processor_t *shared = worker->shared;
+ sqfs_block_processor_t *proc = (sqfs_block_processor_t *)shared;
sqfs_block_t *blk = NULL;
int status = 0;
@@ -156,9 +157,8 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg)
if (blk == NULL)
break;
- status = block_processor_do_block(blk, worker->cmp,
- worker->scratch,
- shared->base.max_block_size);
+ status = proc->process_block(blk, worker->cmp, worker->scratch,
+ proc->max_block_size);
}
return THREAD_EXIT_SUCCESS;
@@ -198,127 +198,6 @@ static void block_processor_destroy(sqfs_object_t *obj)
free(proc);
}
-static thread_pool_processor_t *block_processor_create(size_t max_block_size,
- sqfs_compressor_t *cmp,
- unsigned int num_workers,
- size_t max_backlog,
- sqfs_block_writer_t *wr,
- sqfs_frag_table_t *tbl)
-{
- thread_pool_processor_t *proc;
- unsigned int i;
-
- if (num_workers < 1)
- num_workers = 1;
-
- proc = alloc_flex(sizeof(*proc),
- sizeof(proc->workers[0]), num_workers);
- if (proc == NULL)
- return NULL;
-
- proc->num_workers = num_workers;
- proc->max_backlog = max_backlog;
- proc->base.max_block_size = max_block_size;
- proc->base.cmp = cmp;
- proc->base.frag_tbl = tbl;
- proc->base.wr = wr;
- proc->base.stats.size = sizeof(proc->base.stats);
- ((sqfs_object_t *)proc)->destroy = block_processor_destroy;
-
- for (i = 0; i < num_workers; ++i) {
- proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
- 1, max_block_size);
-
- if (proc->workers[i] == NULL)
- goto fail;
-
- proc->workers[i]->shared = proc;
- proc->workers[i]->cmp = sqfs_copy(cmp);
-
- if (proc->workers[i]->cmp == NULL)
- goto fail;
- }
-
- return proc;
-fail:
- block_processor_destroy((sqfs_object_t *)proc);
- return NULL;
-}
-
-#if defined(_WIN32) || defined(__WINDOWS__)
-sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
- sqfs_compressor_t *cmp,
- unsigned int num_workers,
- size_t max_backlog,
- sqfs_block_writer_t *wr,
- sqfs_frag_table_t *tbl)
-{
- thread_pool_processor_t *proc;
- unsigned int i;
-
- proc = block_processor_create(max_block_size, cmp, num_workers,
- max_backlog, wr, tbl);
- if (proc == NULL)
- return NULL;
-
- InitializeCriticalSection(&proc->mtx);
- InitializeConditionVariable(&proc->queue_cond);
- InitializeConditionVariable(&proc->done_cond);
-
- for (i = 0; i < num_workers; ++i) {
- proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc,
- proc->workers[i], 0, 0);
- if (proc->workers[i]->thread == NULL)
- goto fail;
- }
-
- return (sqfs_block_processor_t *)proc;
-fail:
- block_processor_destroy((sqfs_object_t *)proc);
- return NULL;
-}
-#else
-sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
- sqfs_compressor_t *cmp,
- unsigned int num_workers,
- size_t max_backlog,
- sqfs_block_writer_t *wr,
- sqfs_frag_table_t *tbl)
-{
- thread_pool_processor_t *proc;
- sigset_t set, oldset;
- unsigned int i;
- int ret;
-
- proc = block_processor_create(max_block_size, cmp, num_workers,
- max_backlog, wr, tbl);
- if (proc == NULL)
- return NULL;
-
- proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
- proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
- proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
-
- sigfillset(&set);
- pthread_sigmask(SIG_SETMASK, &set, &oldset);
-
- for (i = 0; i < num_workers; ++i) {
- ret = pthread_create(&proc->workers[i]->thread, NULL,
- worker_proc, proc->workers[i]);
-
- if (ret != 0)
- goto fail;
- }
-
- pthread_sigmask(SIG_SETMASK, &oldset, NULL);
- return (sqfs_block_processor_t *)proc;
-fail:
- pthread_sigmask(SIG_SETMASK, &oldset, NULL);
- block_processor_destroy((sqfs_object_t *)proc);
- return NULL;
-}
-#endif
-
static void store_io_block(thread_pool_processor_t *proc, sqfs_block_t *blk)
{
sqfs_block_t *it = proc->io_queue, *prev = NULL;
@@ -397,7 +276,7 @@ static int handle_io_queue(thread_pool_processor_t *proc, sqfs_block_t *list)
while (status == 0 && list != NULL) {
it = list;
list = list->next;
- status = process_completed_block(&proc->base, it);
+ status = proc->base.process_completed_block(&proc->base, it);
if (status != 0) {
LOCK(&proc->mtx);
@@ -411,7 +290,8 @@ static int handle_io_queue(thread_pool_processor_t *proc, sqfs_block_t *list)
return status;
}
-int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)
+static int append_to_work_queue(sqfs_block_processor_t *proc,
+ sqfs_block_t *block)
{
thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc;
sqfs_block_t *io_list = NULL, *io_list_last = NULL;
@@ -454,8 +334,9 @@ int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)
if (blk->flags & SQFS_BLK_IS_FRAGMENT) {
fragblk = NULL;
- thproc->status = process_completed_fragment(proc, blk,
- &fragblk);
+ thproc->status =
+ proc->process_completed_fragment(proc, blk,
+ &fragblk);
if (fragblk != NULL) {
fragblk->io_seq_num = thproc->io_enq_id++;
@@ -481,6 +362,54 @@ int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)
return status;
}
+static thread_pool_processor_t *block_processor_create(size_t max_block_size,
+ sqfs_compressor_t *cmp,
+ unsigned int num_workers,
+ size_t max_backlog,
+ sqfs_block_writer_t *wr,
+ sqfs_frag_table_t *tbl)
+{
+ thread_pool_processor_t *proc;
+ unsigned int i;
+
+ if (num_workers < 1)
+ num_workers = 1;
+
+ proc = alloc_flex(sizeof(*proc),
+ sizeof(proc->workers[0]), num_workers);
+ if (proc == NULL)
+ return NULL;
+
+ if (block_processor_init(&proc->base, max_block_size, cmp, wr, tbl)) {
+ free(proc);
+ return NULL;
+ }
+
+ proc->base.append_to_work_queue = append_to_work_queue;
+ proc->num_workers = num_workers;
+ proc->max_backlog = max_backlog;
+ ((sqfs_object_t *)proc)->destroy = block_processor_destroy;
+
+ for (i = 0; i < num_workers; ++i) {
+ proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
+ 1, max_block_size);
+
+ if (proc->workers[i] == NULL)
+ goto fail;
+
+ proc->workers[i]->shared = proc;
+ proc->workers[i]->cmp = sqfs_copy(cmp);
+
+ if (proc->workers[i]->cmp == NULL)
+ goto fail;
+ }
+
+ return proc;
+fail:
+ block_processor_destroy((sqfs_object_t *)proc);
+ return NULL;
+}
+
int sqfs_block_processor_sync(sqfs_block_processor_t *proc)
{
return append_to_work_queue(proc, NULL);
@@ -492,16 +421,16 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
sqfs_block_t *blk;
int status;
- status = append_to_work_queue(proc, NULL);
+ status = sqfs_block_processor_sync(proc);
if (status == 0 && proc->frag_block != NULL) {
blk = proc->frag_block;
blk->next = NULL;
proc->frag_block = NULL;
- status = block_processor_do_block(blk, proc->cmp,
- thproc->workers[0]->scratch,
- proc->max_block_size);
+ status = proc->process_block(blk, proc->cmp,
+ thproc->workers[0]->scratch,
+ proc->max_block_size);
if (status == 0)
status = handle_io_queue(thproc, blk);
@@ -517,3 +446,77 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
return status;
}
+
+#if defined(_WIN32) || defined(__WINDOWS__)
+sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
+ sqfs_compressor_t *cmp,
+ unsigned int num_workers,
+ size_t max_backlog,
+ sqfs_block_writer_t *wr,
+ sqfs_frag_table_t *tbl)
+{
+ thread_pool_processor_t *proc;
+ unsigned int i;
+
+ proc = block_processor_create(max_block_size, cmp, num_workers,
+ max_backlog, wr, tbl);
+ if (proc == NULL)
+ return NULL;
+
+ InitializeCriticalSection(&proc->mtx);
+ InitializeConditionVariable(&proc->queue_cond);
+ InitializeConditionVariable(&proc->done_cond);
+
+ for (i = 0; i < num_workers; ++i) {
+ proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc,
+ proc->workers[i], 0, 0);
+ if (proc->workers[i]->thread == NULL)
+ goto fail;
+ }
+
+ return (sqfs_block_processor_t *)proc;
+fail:
+ block_processor_destroy((sqfs_object_t *)proc);
+ return NULL;
+}
+#else
+sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
+ sqfs_compressor_t *cmp,
+ unsigned int num_workers,
+ size_t max_backlog,
+ sqfs_block_writer_t *wr,
+ sqfs_frag_table_t *tbl)
+{
+ thread_pool_processor_t *proc;
+ sigset_t set, oldset;
+ unsigned int i;
+ int ret;
+
+ proc = block_processor_create(max_block_size, cmp, num_workers,
+ max_backlog, wr, tbl);
+ if (proc == NULL)
+ return NULL;
+
+ proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
+ proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
+ proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
+
+ sigfillset(&set);
+ pthread_sigmask(SIG_SETMASK, &set, &oldset);
+
+ for (i = 0; i < num_workers; ++i) {
+ ret = pthread_create(&proc->workers[i]->thread, NULL,
+ worker_proc, proc->workers[i]);
+
+ if (ret != 0)
+ goto fail;
+ }
+
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+ return (sqfs_block_processor_t *)proc;
+fail:
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+ block_processor_destroy((sqfs_object_t *)proc);
+ return NULL;
+}
+#endif