aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2020-05-30 17:37:25 +0200
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2020-05-30 17:49:40 +0200
commit59631ccfde23d175e97bda016923bf9cbabbd92a (patch)
treef028a86bd213774ce1bd8c8b93e240b46af8b893
parentefb6ab552c5cdc9d12585c7c24b98667e212cb4c (diff)
Block processor: cleanup macros, merge windows & pthread initialization
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r--lib/sqfs/block_processor/winpthread.c168
1 files changed, 68 insertions, 100 deletions
diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c
index e39f795..1519257 100644
--- a/lib/sqfs/block_processor/winpthread.c
+++ b/lib/sqfs/block_processor/winpthread.c
@@ -10,39 +10,65 @@
#if defined(_WIN32) || defined(__WINDOWS__)
# define WIN32_LEAN_AND_MEAN
# include <windows.h>
-# define LOCK(mtx) EnterCriticalSection(mtx)
-# define UNLOCK(mtx) LeaveCriticalSection(mtx)
+# define LOCK EnterCriticalSection
+# define UNLOCK LeaveCriticalSection
# define AWAIT(cond, mtx) SleepConditionVariableCS(cond, mtx, INFINITE)
-# define SIGNAL_ALL(cond) WakeAllConditionVariable(cond)
+# define SIGNAL_ALL WakeAllConditionVariable
+# define THREAD_CREATE(hndptr, proc, arg) \
+ ((*hndptr) = CreateThread(NULL, 0, proc, arg, 0, 0), \
+ ((*hndptr) == NULL) ? -1 : 0)
# define THREAD_JOIN(t) \
- if (t != NULL) { \
+ do { \
WaitForSingleObject(t, INFINITE); \
CloseHandle(t); \
- }
-# define MUTEX_DESTROY(mtx) DeleteCriticalSection(mtx)
+ } while (0)
+# define MUTEX_INIT InitializeCriticalSection
+# define MUTEX_DESTROY DeleteCriticalSection
+# define CONDITION_INIT InitializeConditionVariable
# define CONDITION_DESTROY(cond)
# define THREAD_EXIT_SUCCESS 0
+# define THREAD_INVALID NULL
# define THREAD_TYPE DWORD WINAPI
-# define THREAD_ARG LPVOID
-# define THREAD_HANDLE HANDLE
-# define MUTEX_TYPE CRITICAL_SECTION
-# define CONDITION_TYPE CONDITION_VARIABLE
+# define SIGNAL_DISABLE(oldset) (void)oldset
+# define SIGNAL_ENABLE(oldset) (void)oldset
+
+typedef int sigset_t;
+typedef LPVOID THREAD_ARG;
+typedef HANDLE THREAD_HANDLE;
+typedef CRITICAL_SECTION MUTEX_TYPE;
+typedef CONDITION_VARIABLE CONDITION_TYPE;
#else
# include <pthread.h>
# include <signal.h>
-# define LOCK(mtx) pthread_mutex_lock(mtx)
-# define UNLOCK(mtx) pthread_mutex_unlock(mtx)
-# define AWAIT(cond, mtx) pthread_cond_wait(cond, mtx)
-# define SIGNAL_ALL(cond) pthread_cond_broadcast(cond)
-# define THREAD_JOIN(t) if (t != (pthread_t)0) { pthread_join(t, NULL); }
-# define MUTEX_DESTROY(mtx) pthread_mutex_destroy(mtx)
-# define CONDITION_DESTROY(cond) pthread_cond_destroy(cond)
+# define LOCK pthread_mutex_lock
+# define UNLOCK pthread_mutex_unlock
+# define AWAIT pthread_cond_wait
+# define SIGNAL_ALL pthread_cond_broadcast
+# define THREAD_CREATE(hndptr, proc, arg) \
+ pthread_create((hndptr), NULL, (proc), (arg))
+# define THREAD_JOIN(t) pthread_join((t), NULL)
+# define MUTEX_INIT(mtx) \
+ *(mtx) = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER
+# define MUTEX_DESTROY pthread_mutex_destroy
+# define CONDITION_INIT(cond) \
+ *(cond) = (pthread_cond_t)PTHREAD_COND_INITIALIZER
+# define CONDITION_DESTROY pthread_cond_destroy
# define THREAD_EXIT_SUCCESS NULL
-# define THREAD_TYPE void *
-# define THREAD_ARG void *
-# define THREAD_HANDLE pthread_t
-# define MUTEX_TYPE pthread_mutex_t
-# define CONDITION_TYPE pthread_cond_t
+# define THREAD_INVALID (pthread_t)0
+# define SIGNAL_ENABLE(oldset) pthread_sigmask(SIG_SETMASK, oldset, NULL)
+
+typedef void *THREAD_TYPE;
+typedef void *THREAD_ARG;
+typedef pthread_t THREAD_HANDLE;
+typedef pthread_mutex_t MUTEX_TYPE;
+typedef pthread_cond_t CONDITION_TYPE;
+
+static inline void SIGNAL_DISABLE(sigset_t *oldset)
+{
+ sigset_t set;
+ sigfillset(&set);
+ pthread_sigmask(SIG_SETMASK, &set, oldset);
+}
#endif
typedef struct compress_worker_t compress_worker_t;
@@ -176,7 +202,8 @@ static void block_processor_destroy(sqfs_object_t *obj)
for (i = 0; i < proc->num_workers; ++i) {
if (proc->workers[i] != NULL) {
- THREAD_JOIN(proc->workers[i]->thread);
+ if (proc->workers[i]->thread != THREAD_INVALID)
+ THREAD_JOIN(proc->workers[i]->thread);
if (proc->workers[i]->cmp != NULL)
sqfs_destroy(proc->workers[i]->cmp);
@@ -368,15 +395,17 @@ static int block_processor_sync(sqfs_block_processor_t *proc)
return append_to_work_queue(proc, NULL);
}
-static thread_pool_processor_t *block_processor_create(size_t max_block_size,
- sqfs_compressor_t *cmp,
- unsigned int num_workers,
- size_t max_backlog,
- sqfs_block_writer_t *wr,
- sqfs_frag_table_t *tbl)
+sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
+ sqfs_compressor_t *cmp,
+ unsigned int num_workers,
+ size_t max_backlog,
+ sqfs_block_writer_t *wr,
+ sqfs_frag_table_t *tbl)
{
thread_pool_processor_t *proc;
+ sigset_t oldset;
unsigned int i;
+ int ret;
if (num_workers < 1)
num_workers = 1;
@@ -397,6 +426,12 @@ static thread_pool_processor_t *block_processor_create(size_t max_block_size,
proc->max_backlog = max_backlog;
((sqfs_object_t *)proc)->destroy = block_processor_destroy;
+ MUTEX_INIT(&proc->mtx);
+ CONDITION_INIT(&proc->queue_cond);
+ CONDITION_INIT(&proc->done_cond);
+
+ SIGNAL_DISABLE(&oldset);
+
for (i = 0; i < num_workers; ++i) {
proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
1, max_block_size);
@@ -409,84 +444,17 @@ static thread_pool_processor_t *block_processor_create(size_t max_block_size,
if (proc->workers[i]->cmp == NULL)
goto fail;
- }
-
- return proc;
-fail:
- block_processor_destroy((sqfs_object_t *)proc);
- return NULL;
-}
-
-#if defined(_WIN32) || defined(__WINDOWS__)
-sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
- sqfs_compressor_t *cmp,
- unsigned int num_workers,
- size_t max_backlog,
- sqfs_block_writer_t *wr,
- sqfs_frag_table_t *tbl)
-{
- thread_pool_processor_t *proc;
- unsigned int i;
-
- proc = block_processor_create(max_block_size, cmp, num_workers,
- max_backlog, wr, tbl);
- if (proc == NULL)
- return NULL;
-
- InitializeCriticalSection(&proc->mtx);
- InitializeConditionVariable(&proc->queue_cond);
- InitializeConditionVariable(&proc->done_cond);
-
- for (i = 0; i < num_workers; ++i) {
- proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc,
- proc->workers[i], 0, 0);
- if (proc->workers[i]->thread == NULL)
- goto fail;
- }
-
- return (sqfs_block_processor_t *)proc;
-fail:
- block_processor_destroy((sqfs_object_t *)proc);
- return NULL;
-}
-#else
-sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
- sqfs_compressor_t *cmp,
- unsigned int num_workers,
- size_t max_backlog,
- sqfs_block_writer_t *wr,
- sqfs_frag_table_t *tbl)
-{
- thread_pool_processor_t *proc;
- sigset_t set, oldset;
- unsigned int i;
- int ret;
-
- proc = block_processor_create(max_block_size, cmp, num_workers,
- max_backlog, wr, tbl);
- if (proc == NULL)
- return NULL;
-
- proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
- proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
- proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
-
- sigfillset(&set);
- pthread_sigmask(SIG_SETMASK, &set, &oldset);
-
- for (i = 0; i < num_workers; ++i) {
- ret = pthread_create(&proc->workers[i]->thread, NULL,
- worker_proc, proc->workers[i]);
+ ret = THREAD_CREATE(&proc->workers[i]->thread,
+ worker_proc, proc->workers[i]);
if (ret != 0)
goto fail;
}
- pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+ SIGNAL_ENABLE(&oldset);
return (sqfs_block_processor_t *)proc;
fail:
- pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+ SIGNAL_ENABLE(&oldset);
block_processor_destroy((sqfs_object_t *)proc);
return NULL;
}
-#endif