diff options
| author | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2021-03-20 16:46:22 +0100 | 
|---|---|---|
| committer | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2021-03-21 17:29:18 +0100 | 
| commit | a18f724aa3bf57aeed285b5f61eca4a0ba891c21 (patch) | |
| tree | ce90fb7fd9494f340efc3416ab769353d480f82b /include | |
| parent | 977aa1d0d29b5b48b31279d7709a7209001ee309 (diff) | |
Add a thread pool implementation to libutil
The thread pool enforces ordering of items during dequeue similar
to the already existing implementation in libsqfs. The idea is to
eventually pull this functionality out of the block processor and
turn it into a cleaner, separately tested module.
The thread pool is implemented as an abstract interface, so we can
have multiple implementations around, including the serial fallback
implementation which we can then *always* test, irregardless of the
compile config and run through static analysis as well.
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'include')
| -rw-r--r-- | include/threadpool.h | 125 | ||||
| -rw-r--r-- | include/w32threadwrap.h | 105 | 
2 files changed, 230 insertions, 0 deletions
| diff --git a/include/threadpool.h b/include/threadpool.h new file mode 100644 index 0000000..f25c497 --- /dev/null +++ b/include/threadpool.h @@ -0,0 +1,125 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * threadpool.h + * + * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at> + */ +#ifndef THREADPOOL_H +#define THREADPOOL_H + +#include "sqfs/predef.h" + +typedef int (*thread_pool_worker_t)(void *user, void *work_item); + +/** + * @struct thread_pool_t + * + * @brief A thread pool with a ticket number based work item ordering. + * + * While the order in which items are non-deterministic, the thread pool + * implementation internally uses a ticket system to ensure the completed + * items are deqeueued in the same order that they were enqueued. + */ +typedef struct thread_pool_t { +	/** +	 * @brief Shutdown and destroy a thread pool. +	 * +	 * @param pool A pointer to a pool returned by thread_pool_create +	 */ +	void (*destroy)(struct thread_pool_t *pool); + +	/** +	 * @brief Get the actual number of worker threads available. +	 * +	 * @return A number greater or equal to 1. +	 */ +	size_t (*get_worker_count)(struct thread_pool_t *pool); + +	/** +	 * @brief Change the user data pointer for a thread pool worker +	 *        by index. +	 * +	 * @param idx A zero-based index into the worker list. +	 * @param ptr A user pointer that this specific worker thread should +	 *            pass to the worker callback. +	 */ +	void (*set_worker_ptr)(struct thread_pool_t *pool, size_t idx, +			       void *ptr); + +	/** +	 * @brief Submit a work item to a thread pool. +	 * +	 * This function will fail on allocation failure or if the internal +	 * error state is set was set by one of the workers. +	 * +	 * @param ptr A pointer to a work object to enqueue. +	 * +	 * @return Zero on success. +	 */ +	int (*submit)(struct thread_pool_t *pool, void *ptr); + +	/** +	 * @brief Wait for a work item to be completed. +	 * +	 * This function dequeues a single completed work item. It may block +	 * until one of the worker threads signals completion of an additional +	 * item. +	 * +	 * This function guarantees to return the items in the same order as +	 * they were submitted, so the function can actually block longer than +	 * necessary, because it has to wait until the next item in sequence +	 * is finished. +	 * +	 * @return A pointer to a new work item or NULL if there are none +	 *         in the pipeline. +	 */ +	void *(*dequeue)(struct thread_pool_t *pool); + +	/** +	 * @brief Get the internal worker return status value. +	 * +	 * If the worker functions returns a non-zero exit status in one of the +	 * worker threads, the thread pool stors the value internally and shuts +	 * down. This function can be used to retrieve the value. +	 * +	 * @return A non-zero value returned by the worker callback or zero if +	 *         everything is A-OK. +	 */ +	int (*get_status)(struct thread_pool_t *pool); +} thread_pool_t; + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * @brief Create a thread pool instance. + * + * @param num_jobs The number of worker threads to launch. + * @param worker A function to call from the worker threads to process + *               the work items. + * + * @return A pointer to a thread pool on success, NULL on failure. + */ +SQFS_INTERNAL thread_pool_t *thread_pool_create(size_t num_jobs, +						thread_pool_worker_t worker); + +/** + * @brief Create a serial mockup thread pool implementation. + * + * This returns a @ref thread_pool_t implementation that, instead of running a + * thread pool actually does the work in-situ when dequeueing. + * + * @param worker A function to call from the worker threads to process + *               the work items. + * + * @return A pointer to a thread pool on success, NULL on failure. + */ +SQFS_INTERNAL +thread_pool_t *thread_pool_create_serial(thread_pool_worker_t worker); + +#ifdef __cplusplus +} +#endif + +#endif /* THREADPOOL_H */ diff --git a/include/w32threadwrap.h b/include/w32threadwrap.h new file mode 100644 index 0000000..6b7344c --- /dev/null +++ b/include/w32threadwrap.h @@ -0,0 +1,105 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * w32threadwrap.h + * + * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at> + */ +#ifndef W32THREADWRAP_H +#define W32THREADWRAP_H + +#include "sqfs/predef.h" + +#define WIN32_LEAN_AND_MEAN +#define VC_EXTRALEAN +#include <windows.h> + +typedef unsigned int sigset_t; +typedef HANDLE pthread_t; +typedef CRITICAL_SECTION pthread_mutex_t; +typedef CONDITION_VARIABLE pthread_cond_t; + +static inline int pthread_create(pthread_t *thread, const void *attr, +				 LPTHREAD_START_ROUTINE proc, LPVOID arg) +{ +	HANDLE hnd = CreateThread(NULL, 0, proc, arg, 0, 0); +	(void)attr; + +	if (hnd == NULL) +		return -1; + +	*thread = hnd; +	return 0; +} + +static int pthread_join(pthread_t thread, void **retval) +{ +	WaitForSingleObject(thread, INFINITE); +	CloseHandle(thread); +	if (retval != NULL) +		*retval = NULL; +	return 0; +} + +static inline int pthread_mutex_init(pthread_mutex_t *mutex, const void *attr) +{ +	(void)attr; +	InitializeCriticalSection(mutex); +	return 0; +} + +static inline int pthread_mutex_lock(pthread_mutex_t *mutex) +{ +	EnterCriticalSection(mutex); +	return 0; +} + +static inline int pthread_mutex_unlock(pthread_mutex_t *mutex) +{ +	LeaveCriticalSection(mutex); +	return 0; +} + +static inline void pthread_mutex_destroy(pthread_mutex_t *mutex) +{ +	DeleteCriticalSection(mutex); +} + +static inline int pthread_cond_init(pthread_cond_t *cond, const void *attr) +{ +	(void)attr; +	InitializeConditionVariable(cond); +	return 0; +} + +static inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mtx) +{ +	return SleepConditionVariableCS(cond, mtx, INFINITE) != 0; +} + +static inline int pthread_cond_broadcast(pthread_cond_t *cond) +{ +	WakeAllConditionVariable(cond); +	return 0; +} + +static inline void pthread_cond_destroy(pthread_cond_t *cond) +{ +	(void)cond; +} + +#define SIG_SETMASK (0) + +static inline int sigfillset(sigset_t *set) +{ +	*set = 0xFFFFFFFF; +	return 0; +} + +static inline int pthread_sigmask(int how, const sigset_t *set, +				  const sigset_t *old) +{ +	(void)how; (void)set; (void)old; +	return 0; +} + +#endif /* W32THREADWRAP_H */ | 
