From 59631ccfde23d175e97bda016923bf9cbabbd92a Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Sat, 30 May 2020 17:37:25 +0200 Subject: Block processor: cleanup macros, merge windows & pthread initialization Signed-off-by: David Oberhollenzer --- lib/sqfs/block_processor/winpthread.c | 168 ++++++++++++++-------------------- 1 file changed, 68 insertions(+), 100 deletions(-) diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c index e39f795..1519257 100644 --- a/lib/sqfs/block_processor/winpthread.c +++ b/lib/sqfs/block_processor/winpthread.c @@ -10,39 +10,65 @@ #if defined(_WIN32) || defined(__WINDOWS__) # define WIN32_LEAN_AND_MEAN # include -# define LOCK(mtx) EnterCriticalSection(mtx) -# define UNLOCK(mtx) LeaveCriticalSection(mtx) +# define LOCK EnterCriticalSection +# define UNLOCK LeaveCriticalSection # define AWAIT(cond, mtx) SleepConditionVariableCS(cond, mtx, INFINITE) -# define SIGNAL_ALL(cond) WakeAllConditionVariable(cond) +# define SIGNAL_ALL WakeAllConditionVariable +# define THREAD_CREATE(hndptr, proc, arg) \ + ((*hndptr) = CreateThread(NULL, 0, proc, arg, 0, 0), \ + ((*hndptr) == NULL) ? -1 : 0) # define THREAD_JOIN(t) \ - if (t != NULL) { \ + do { \ WaitForSingleObject(t, INFINITE); \ CloseHandle(t); \ - } -# define MUTEX_DESTROY(mtx) DeleteCriticalSection(mtx) + } while (0) +# define MUTEX_INIT InitializeCriticalSection +# define MUTEX_DESTROY DeleteCriticalSection +# define CONDITION_INIT InitializeConditionVariable # define CONDITION_DESTROY(cond) # define THREAD_EXIT_SUCCESS 0 +# define THREAD_INVALID NULL # define THREAD_TYPE DWORD WINAPI -# define THREAD_ARG LPVOID -# define THREAD_HANDLE HANDLE -# define MUTEX_TYPE CRITICAL_SECTION -# define CONDITION_TYPE CONDITION_VARIABLE +# define SIGNAL_DISABLE(oldset) (void)oldset +# define SIGNAL_ENABLE(oldset) (void)oldset + +typedef int sigset_t; +typedef LPVOID THREAD_ARG; +typedef HANDLE THREAD_HANDLE; +typedef CRITICAL_SECTION MUTEX_TYPE; +typedef CONDITION_VARIABLE CONDITION_TYPE; #else # include # include -# define LOCK(mtx) pthread_mutex_lock(mtx) -# define UNLOCK(mtx) pthread_mutex_unlock(mtx) -# define AWAIT(cond, mtx) pthread_cond_wait(cond, mtx) -# define SIGNAL_ALL(cond) pthread_cond_broadcast(cond) -# define THREAD_JOIN(t) if (t != (pthread_t)0) { pthread_join(t, NULL); } -# define MUTEX_DESTROY(mtx) pthread_mutex_destroy(mtx) -# define CONDITION_DESTROY(cond) pthread_cond_destroy(cond) +# define LOCK pthread_mutex_lock +# define UNLOCK pthread_mutex_unlock +# define AWAIT pthread_cond_wait +# define SIGNAL_ALL pthread_cond_broadcast +# define THREAD_CREATE(hndptr, proc, arg) \ + pthread_create((hndptr), NULL, (proc), (arg)) +# define THREAD_JOIN(t) pthread_join((t), NULL) +# define MUTEX_INIT(mtx) \ + *(mtx) = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER +# define MUTEX_DESTROY pthread_mutex_destroy +# define CONDITION_INIT(cond) \ + *(cond) = (pthread_cond_t)PTHREAD_COND_INITIALIZER +# define CONDITION_DESTROY pthread_cond_destroy # define THREAD_EXIT_SUCCESS NULL -# define THREAD_TYPE void * -# define THREAD_ARG void * -# define THREAD_HANDLE pthread_t -# define MUTEX_TYPE pthread_mutex_t -# define CONDITION_TYPE pthread_cond_t +# define THREAD_INVALID (pthread_t)0 +# define SIGNAL_ENABLE(oldset) pthread_sigmask(SIG_SETMASK, oldset, NULL) + +typedef void *THREAD_TYPE; +typedef void *THREAD_ARG; +typedef pthread_t THREAD_HANDLE; +typedef pthread_mutex_t MUTEX_TYPE; +typedef pthread_cond_t CONDITION_TYPE; + +static inline void SIGNAL_DISABLE(sigset_t *oldset) +{ + sigset_t set; + sigfillset(&set); + pthread_sigmask(SIG_SETMASK, &set, oldset); +} #endif typedef struct compress_worker_t compress_worker_t; @@ -176,7 +202,8 @@ static void block_processor_destroy(sqfs_object_t *obj) for (i = 0; i < proc->num_workers; ++i) { if (proc->workers[i] != NULL) { - THREAD_JOIN(proc->workers[i]->thread); + if (proc->workers[i]->thread != THREAD_INVALID) + THREAD_JOIN(proc->workers[i]->thread); if (proc->workers[i]->cmp != NULL) sqfs_destroy(proc->workers[i]->cmp); @@ -368,15 +395,17 @@ static int block_processor_sync(sqfs_block_processor_t *proc) return append_to_work_queue(proc, NULL); } -static thread_pool_processor_t *block_processor_create(size_t max_block_size, - sqfs_compressor_t *cmp, - unsigned int num_workers, - size_t max_backlog, - sqfs_block_writer_t *wr, - sqfs_frag_table_t *tbl) +sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, + sqfs_compressor_t *cmp, + unsigned int num_workers, + size_t max_backlog, + sqfs_block_writer_t *wr, + sqfs_frag_table_t *tbl) { thread_pool_processor_t *proc; + sigset_t oldset; unsigned int i; + int ret; if (num_workers < 1) num_workers = 1; @@ -397,6 +426,12 @@ static thread_pool_processor_t *block_processor_create(size_t max_block_size, proc->max_backlog = max_backlog; ((sqfs_object_t *)proc)->destroy = block_processor_destroy; + MUTEX_INIT(&proc->mtx); + CONDITION_INIT(&proc->queue_cond); + CONDITION_INIT(&proc->done_cond); + + SIGNAL_DISABLE(&oldset); + for (i = 0; i < num_workers; ++i) { proc->workers[i] = alloc_flex(sizeof(compress_worker_t), 1, max_block_size); @@ -409,84 +444,17 @@ static thread_pool_processor_t *block_processor_create(size_t max_block_size, if (proc->workers[i]->cmp == NULL) goto fail; - } - - return proc; -fail: - block_processor_destroy((sqfs_object_t *)proc); - return NULL; -} - -#if defined(_WIN32) || defined(__WINDOWS__) -sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, - sqfs_compressor_t *cmp, - unsigned int num_workers, - size_t max_backlog, - sqfs_block_writer_t *wr, - sqfs_frag_table_t *tbl) -{ - thread_pool_processor_t *proc; - unsigned int i; - - proc = block_processor_create(max_block_size, cmp, num_workers, - max_backlog, wr, tbl); - if (proc == NULL) - return NULL; - - InitializeCriticalSection(&proc->mtx); - InitializeConditionVariable(&proc->queue_cond); - InitializeConditionVariable(&proc->done_cond); - - for (i = 0; i < num_workers; ++i) { - proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc, - proc->workers[i], 0, 0); - if (proc->workers[i]->thread == NULL) - goto fail; - } - - return (sqfs_block_processor_t *)proc; -fail: - block_processor_destroy((sqfs_object_t *)proc); - return NULL; -} -#else -sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, - sqfs_compressor_t *cmp, - unsigned int num_workers, - size_t max_backlog, - sqfs_block_writer_t *wr, - sqfs_frag_table_t *tbl) -{ - thread_pool_processor_t *proc; - sigset_t set, oldset; - unsigned int i; - int ret; - - proc = block_processor_create(max_block_size, cmp, num_workers, - max_backlog, wr, tbl); - if (proc == NULL) - return NULL; - - proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; - proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; - proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; - - sigfillset(&set); - pthread_sigmask(SIG_SETMASK, &set, &oldset); - - for (i = 0; i < num_workers; ++i) { - ret = pthread_create(&proc->workers[i]->thread, NULL, - worker_proc, proc->workers[i]); + ret = THREAD_CREATE(&proc->workers[i]->thread, + worker_proc, proc->workers[i]); if (ret != 0) goto fail; } - pthread_sigmask(SIG_SETMASK, &oldset, NULL); + SIGNAL_ENABLE(&oldset); return (sqfs_block_processor_t *)proc; fail: - pthread_sigmask(SIG_SETMASK, &oldset, NULL); + SIGNAL_ENABLE(&oldset); block_processor_destroy((sqfs_object_t *)proc); return NULL; } -#endif -- cgit v1.2.3