aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2020-02-16 17:34:06 +0100
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2020-02-16 17:34:06 +0100
commit1c82a9f2fa98e68b68e6868298fba1830d91d1db (patch)
tree10d0943535cf2c52ac808234b810200d69202172
parentcab741413084e1f6b2bd9288273c7b14dfd4a8d7 (diff)
block processor: move the internals to the respective implementations
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r--lib/sqfs/block_processor/internal.h48
-rw-r--r--lib/sqfs/block_processor/serial.c63
-rw-r--r--lib/sqfs/block_processor/winpthread.c130
3 files changed, 120 insertions, 121 deletions
diff --git a/lib/sqfs/block_processor/internal.h b/lib/sqfs/block_processor/internal.h
index b7fbef0..3c6570e 100644
--- a/lib/sqfs/block_processor/internal.h
+++ b/lib/sqfs/block_processor/internal.h
@@ -22,17 +22,6 @@
#include <string.h>
#include <stdlib.h>
-#include <zlib.h>
-
-#ifdef WITH_PTHREAD
-#include <pthread.h>
-#include <signal.h>
-#elif defined(_WIN32) || defined(__WINDOWS__)
-#define WIN32_LEAN_AND_MEAN
-#include <windows.h>
-#endif
-
-typedef struct compress_worker_t compress_worker_t;
typedef struct sqfs_block_t {
struct sqfs_block_t *next;
@@ -52,53 +41,18 @@ typedef struct sqfs_block_t {
struct sqfs_block_processor_t {
sqfs_object_t obj;
- /* synchronization primitives */
-#ifdef WITH_PTHREAD
- pthread_mutex_t mtx;
- pthread_cond_t queue_cond;
- pthread_cond_t done_cond;
-#elif defined(_WIN32) || defined(__WINDOWS__)
- CRITICAL_SECTION mtx;
- CONDITION_VARIABLE queue_cond;
- CONDITION_VARIABLE done_cond;
-#endif
-
- /* needs rw access by worker and main thread */
- sqfs_block_t *queue;
- sqfs_block_t *queue_last;
-
- sqfs_block_t *done;
- size_t backlog;
- int status;
-
- /* used by main thread only */
- sqfs_u32 enqueue_id;
- sqfs_u32 dequeue_id;
-
- unsigned int num_workers;
- size_t max_backlog;
-
sqfs_frag_table_t *frag_tbl;
sqfs_compressor_t *cmp;
-
sqfs_block_t *frag_block;
-
sqfs_block_writer_t *wr;
+
sqfs_block_processor_stats_t stats;
- /* file API */
sqfs_inode_generic_t *inode;
sqfs_block_t *blk_current;
sqfs_u32 blk_flags;
- /* used only by workers */
size_t max_block_size;
-
-#if defined(WITH_PTHREAD) || defined(_WIN32) || defined(__WINDOWS__)
- compress_worker_t *workers[];
-#else
- sqfs_u8 scratch[];
-#endif
};
SQFS_INTERNAL int process_completed_block(sqfs_block_processor_t *proc,
diff --git a/lib/sqfs/block_processor/serial.c b/lib/sqfs/block_processor/serial.c
index e44ce17..2e00198 100644
--- a/lib/sqfs/block_processor/serial.c
+++ b/lib/sqfs/block_processor/serial.c
@@ -7,6 +7,12 @@
#define SQFS_BUILDING_DLL
#include "internal.h"
+typedef struct {
+ sqfs_block_processor_t base;
+ int status;
+ sqfs_u8 scratch[];
+} serial_block_processor_t;
+
static void block_processor_destroy(sqfs_object_t *obj)
{
sqfs_block_processor_t *proc = (sqfs_block_processor_t *)obj;
@@ -23,71 +29,74 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
sqfs_block_writer_t *wr,
sqfs_frag_table_t *tbl)
{
- sqfs_block_processor_t *proc;
+ serial_block_processor_t *proc;
(void)num_workers; (void)max_backlog;
proc = alloc_flex(sizeof(*proc), 1, max_block_size);
if (proc == NULL)
return NULL;
- proc->max_block_size = max_block_size;
- proc->cmp = cmp;
- proc->frag_tbl = tbl;
- proc->wr = wr;
- proc->stats.size = sizeof(proc->stats);
+ proc->base.max_block_size = max_block_size;
+ proc->base.cmp = cmp;
+ proc->base.frag_tbl = tbl;
+ proc->base.wr = wr;
+ proc->base.stats.size = sizeof(proc->base.stats);
((sqfs_object_t *)proc)->destroy = block_processor_destroy;
- return proc;
+ return (sqfs_block_processor_t *)proc;
}
int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)
{
+ serial_block_processor_t *sproc = (serial_block_processor_t *)proc;
sqfs_block_t *fragblk = NULL;
- if (proc->status != 0)
+ if (sproc->status != 0)
goto done;
- proc->status = block_processor_do_block(block, proc->cmp,
- proc->scratch,
- proc->max_block_size);
- if (proc->status != 0)
+ sproc->status = block_processor_do_block(block, proc->cmp,
+ sproc->scratch,
+ proc->max_block_size);
+ if (sproc->status != 0)
goto done;
if (block->flags & SQFS_BLK_IS_FRAGMENT) {
- proc->status = process_completed_fragment(proc, block,
- &fragblk);
+ sproc->status = process_completed_fragment(proc, block,
+ &fragblk);
if (fragblk == NULL)
goto done;
free(block);
block = fragblk;
- proc->status = block_processor_do_block(block, proc->cmp,
- proc->scratch,
- proc->max_block_size);
- if (proc->status != 0)
+ sproc->status = block_processor_do_block(block, proc->cmp,
+ sproc->scratch,
+ proc->max_block_size);
+ if (sproc->status != 0)
goto done;
}
- proc->status = process_completed_block(proc, block);
+ sproc->status = process_completed_block(proc, block);
done:
free(block);
- return proc->status;
+ return sproc->status;
}
int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
{
- if (proc->frag_block == NULL || proc->status != 0)
+ serial_block_processor_t *sproc = (serial_block_processor_t *)proc;
+
+ if (proc->frag_block == NULL || sproc->status != 0)
goto out;
- proc->status = block_processor_do_block(proc->frag_block, proc->cmp,
- proc->scratch,
- proc->max_block_size);
- if (proc->status != 0)
+ sproc->status = block_processor_do_block(proc->frag_block, proc->cmp,
+ sproc->scratch,
+ proc->max_block_size);
+ if (sproc->status != 0)
goto out;
- proc->status = process_completed_block(proc, proc->frag_block);
+ sproc->status = process_completed_block(proc, proc->frag_block);
out:
free(proc->frag_block);
proc->frag_block = NULL;
- return proc->status;
+ return sproc->status;
}
diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c
index 0ad8475..aa1a99f 100644
--- a/lib/sqfs/block_processor/winpthread.c
+++ b/lib/sqfs/block_processor/winpthread.c
@@ -8,6 +8,8 @@
#include "internal.h"
#if defined(_WIN32) || defined(__WINDOWS__)
+# define WIN32_LEAN_AND_MEAN
+# include <windows.h>
# define LOCK(mtx) EnterCriticalSection(mtx)
# define UNLOCK(mtx) LeaveCriticalSection(mtx)
# define AWAIT(cond, mtx) SleepConditionVariableCS(cond, mtx, INFINITE)
@@ -16,7 +18,11 @@
# define THREAD_TYPE DWORD WINAPI
# define THREAD_ARG LPVOID
# define THREAD_HANDLE HANDLE
+# define MUTEX_TYPE CRITICAL_SECTION
+# define CONDITION_TYPE CONDITION_VARIABLE
#else
+# include <pthread.h>
+# include <signal.h>
# define LOCK(mtx) pthread_mutex_lock(mtx)
# define UNLOCK(mtx) pthread_mutex_unlock(mtx)
# define AWAIT(cond, mtx) pthread_cond_wait(cond, mtx)
@@ -25,15 +31,43 @@
# define THREAD_TYPE void *
# define THREAD_ARG void *
# define THREAD_HANDLE pthread_t
+# define MUTEX_TYPE pthread_mutex_t
+# define CONDITION_TYPE pthread_cond_t
#endif
+typedef struct compress_worker_t compress_worker_t;
+typedef struct thread_pool_processor_t thread_pool_processor_t;
+
struct compress_worker_t {
- sqfs_block_processor_t *shared;
+ thread_pool_processor_t *shared;
sqfs_compressor_t *cmp;
THREAD_HANDLE thread;
sqfs_u8 scratch[];
};
+struct thread_pool_processor_t {
+ sqfs_block_processor_t base;
+
+ MUTEX_TYPE mtx;
+ CONDITION_TYPE queue_cond;
+ CONDITION_TYPE done_cond;
+
+ sqfs_block_t *queue;
+ sqfs_block_t *queue_last;
+
+ sqfs_block_t *done;
+ size_t backlog;
+ int status;
+
+ sqfs_u32 enqueue_id;
+ sqfs_u32 dequeue_id;
+
+ unsigned int num_workers;
+ size_t max_backlog;
+
+ compress_worker_t *workers[];
+};
+
static void free_blk_list(sqfs_block_t *list)
{
sqfs_block_t *it;
@@ -48,7 +82,7 @@ static void free_blk_list(sqfs_block_t *list)
static THREAD_TYPE worker_proc(THREAD_ARG arg)
{
compress_worker_t *worker = arg;
- sqfs_block_processor_t *shared = worker->shared;
+ thread_pool_processor_t *shared = worker->shared;
sqfs_block_t *it, *prev, *blk = NULL;
int status = 0;
@@ -98,7 +132,7 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg)
status = block_processor_do_block(blk, worker->cmp,
worker->scratch,
- shared->max_block_size);
+ shared->base.max_block_size);
}
return THREAD_EXIT_SUCCESS;
@@ -107,7 +141,7 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg)
#if defined(_WIN32) || defined(__WINDOWS__)
static void block_processor_destroy(sqfs_object_t *obj)
{
- sqfs_block_processor_t *proc = (sqfs_block_processor_t *)obj;
+ thread_pool_processor_t *proc = (thread_pool_processor_t *)obj;
unsigned int i;
EnterCriticalSection(&proc->mtx);
@@ -133,8 +167,8 @@ static void block_processor_destroy(sqfs_object_t *obj)
DeleteCriticalSection(&proc->mtx);
free_blk_list(proc->queue);
free_blk_list(proc->done);
- free(proc->blk_current);
- free(proc->frag_block);
+ free(proc->base.blk_current);
+ free(proc->base.frag_block);
free(proc);
}
@@ -145,7 +179,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
sqfs_block_writer_t *wr,
sqfs_frag_table_t *tbl)
{
- sqfs_block_processor_t *proc;
+ thread_pool_processor_t *proc;
unsigned int i;
if (num_workers < 1)
@@ -156,13 +190,13 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
if (proc == NULL)
return NULL;
- proc->max_block_size = max_block_size;
proc->num_workers = num_workers;
proc->max_backlog = max_backlog;
- proc->cmp = cmp;
- proc->frag_tbl = tbl;
- proc->wr = wr;
- proc->stats.size = sizeof(proc->stats);
+ proc->base.max_block_size = max_block_size;
+ proc->base.cmp = cmp;
+ proc->base.frag_tbl = tbl;
+ proc->base.wr = wr;
+ proc->base.stats.size = sizeof(proc->base.stats);
((sqfs_object_t *)obj)->destroy = block_processor_destroy;
InitializeCriticalSection(&proc->mtx);
@@ -188,7 +222,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
goto fail;
}
- return proc;
+ return (sqfs_block_processor_t *)proc;
fail:
block_processor_destroy(proc);
return NULL;
@@ -196,7 +230,7 @@ fail:
#else
static void block_processor_destroy(sqfs_object_t *obj)
{
- sqfs_block_processor_t *proc = (sqfs_block_processor_t *)obj;
+ thread_pool_processor_t *proc = (thread_pool_processor_t *)obj;
unsigned int i;
pthread_mutex_lock(&proc->mtx);
@@ -217,8 +251,8 @@ static void block_processor_destroy(sqfs_object_t *obj)
free_blk_list(proc->queue);
free_blk_list(proc->done);
- free(proc->blk_current);
- free(proc->frag_block);
+ free(proc->base.blk_current);
+ free(proc->base.frag_block);
free(proc);
}
@@ -229,7 +263,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
sqfs_block_writer_t *wr,
sqfs_frag_table_t *tbl)
{
- sqfs_block_processor_t *proc;
+ thread_pool_processor_t *proc;
sigset_t set, oldset;
unsigned int i;
int ret;
@@ -246,13 +280,13 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
- proc->max_block_size = max_block_size;
proc->num_workers = num_workers;
proc->max_backlog = max_backlog;
- proc->cmp = cmp;
- proc->frag_tbl = tbl;
- proc->wr = wr;
- proc->stats.size = sizeof(proc->stats);
+ proc->base.max_block_size = max_block_size;
+ proc->base.cmp = cmp;
+ proc->base.frag_tbl = tbl;
+ proc->base.wr = wr;
+ proc->base.stats.size = sizeof(proc->base.stats);
for (i = 0; i < num_workers; ++i) {
proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
@@ -281,7 +315,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
pthread_sigmask(SIG_SETMASK, &oldset, NULL);
- return proc;
+ return (sqfs_block_processor_t *)proc;
fail_thread:
pthread_mutex_lock(&proc->mtx);
proc->status = -1;
@@ -311,7 +345,7 @@ fail_init:
}
#endif
-static sqfs_block_t *try_dequeue(sqfs_block_processor_t *proc)
+static sqfs_block_t *try_dequeue(thread_pool_processor_t *proc)
{
sqfs_block_t *queue, *it, *prev;
@@ -357,7 +391,7 @@ static sqfs_block_t *queue_merge(sqfs_block_t *lhs, sqfs_block_t *rhs)
return head;
}
-static int process_done_queue(sqfs_block_processor_t *proc, sqfs_block_t *queue)
+static int process_done_queue(thread_pool_processor_t *proc, sqfs_block_t *queue)
{
sqfs_block_t *it, *block = NULL;
int status = 0;
@@ -369,7 +403,7 @@ static int process_done_queue(sqfs_block_processor_t *proc, sqfs_block_t *queue)
if (it->flags & SQFS_BLK_IS_FRAGMENT) {
block = NULL;
- status = process_completed_fragment(proc, it, &block);
+ status = process_completed_fragment(&proc->base, it, &block);
if (block != NULL && status == 0) {
LOCK(&proc->mtx);
@@ -394,7 +428,7 @@ static int process_done_queue(sqfs_block_processor_t *proc, sqfs_block_t *queue)
free(block);
}
} else {
- status = process_completed_block(proc, it);
+ status = process_completed_block(&proc->base, it);
}
free(it);
@@ -404,7 +438,7 @@ static int process_done_queue(sqfs_block_processor_t *proc, sqfs_block_t *queue)
return status;
}
-static int wait_completed(sqfs_block_processor_t *proc)
+static int wait_completed(thread_pool_processor_t *proc)
{
sqfs_block_t *queue;
int status;
@@ -443,50 +477,52 @@ static int wait_completed(sqfs_block_processor_t *proc)
int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)
{
+ thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc;
int status;
- while (proc->backlog > proc->max_backlog) {
- status = wait_completed(proc);
+ while (thproc->backlog > thproc->max_backlog) {
+ status = wait_completed(thproc);
if (status)
return status;
}
- LOCK(&proc->mtx);
- status = proc->status;
+ LOCK(&thproc->mtx);
+ status = thproc->status;
if (status != 0)
goto out;
if (block != NULL) {
- if (proc->queue_last == NULL) {
- proc->queue = proc->queue_last = block;
+ if (thproc->queue_last == NULL) {
+ thproc->queue = thproc->queue_last = block;
} else {
- proc->queue_last->next = block;
- proc->queue_last = block;
+ thproc->queue_last->next = block;
+ thproc->queue_last = block;
}
- block->sequence_number = proc->enqueue_id++;
+ block->sequence_number = thproc->enqueue_id++;
block->next = NULL;
- proc->backlog += 1;
+ thproc->backlog += 1;
block = NULL;
}
out:
- SIGNAL_ALL(&proc->queue_cond);
- UNLOCK(&proc->mtx);
+ SIGNAL_ALL(&thproc->queue_cond);
+ UNLOCK(&thproc->mtx);
free(block);
return 0;
}
int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
{
+ thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc;
int status;
- LOCK(&proc->mtx);
- status = proc->status;
- SIGNAL_ALL(&proc->queue_cond);
- UNLOCK(&proc->mtx);
+ LOCK(&thproc->mtx);
+ status = thproc->status;
+ SIGNAL_ALL(&thproc->queue_cond);
+ UNLOCK(&thproc->mtx);
- while (status == 0 && proc->backlog > 0)
- status = wait_completed(proc);
+ while (status == 0 && thproc->backlog > 0)
+ status = wait_completed(thproc);
if (status == 0 && proc->frag_block != NULL) {
status = append_to_work_queue(proc, proc->frag_block);
@@ -495,7 +531,7 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
if (status)
return status;
- status = wait_completed(proc);
+ status = wait_completed(thproc);
}
return status;