summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2021-03-20 16:46:22 +0100
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2021-03-21 17:29:18 +0100
commita18f724aa3bf57aeed285b5f61eca4a0ba891c21 (patch)
treece90fb7fd9494f340efc3416ab769353d480f82b
parent977aa1d0d29b5b48b31279d7709a7209001ee309 (diff)
Add a thread pool implementation to libutil
The thread pool enforces ordering of items during dequeue similar to the already existing implementation in libsqfs. The idea is to eventually pull this functionality out of the block processor and turn it into a cleaner, separately tested module. The thread pool is implemented as an abstract interface, so we can have multiple implementations around, including the serial fallback implementation which we can then *always* test, irregardless of the compile config and run through static analysis as well. Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r--include/threadpool.h125
-rw-r--r--include/w32threadwrap.h105
-rw-r--r--lib/util/Makemodule.am18
-rw-r--r--lib/util/threadpool.c366
-rw-r--r--lib/util/threadpool_serial.c162
-rw-r--r--tests/libutil/Makemodule.am6
-rw-r--r--tests/libutil/threadpool.c111
7 files changed, 892 insertions, 1 deletions
diff --git a/include/threadpool.h b/include/threadpool.h
new file mode 100644
index 0000000..f25c497
--- /dev/null
+++ b/include/threadpool.h
@@ -0,0 +1,125 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * threadpool.h
+ *
+ * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at>
+ */
+#ifndef THREADPOOL_H
+#define THREADPOOL_H
+
+#include "sqfs/predef.h"
+
+typedef int (*thread_pool_worker_t)(void *user, void *work_item);
+
+/**
+ * @struct thread_pool_t
+ *
+ * @brief A thread pool with a ticket number based work item ordering.
+ *
+ * While the order in which items are non-deterministic, the thread pool
+ * implementation internally uses a ticket system to ensure the completed
+ * items are deqeueued in the same order that they were enqueued.
+ */
+typedef struct thread_pool_t {
+ /**
+ * @brief Shutdown and destroy a thread pool.
+ *
+ * @param pool A pointer to a pool returned by thread_pool_create
+ */
+ void (*destroy)(struct thread_pool_t *pool);
+
+ /**
+ * @brief Get the actual number of worker threads available.
+ *
+ * @return A number greater or equal to 1.
+ */
+ size_t (*get_worker_count)(struct thread_pool_t *pool);
+
+ /**
+ * @brief Change the user data pointer for a thread pool worker
+ * by index.
+ *
+ * @param idx A zero-based index into the worker list.
+ * @param ptr A user pointer that this specific worker thread should
+ * pass to the worker callback.
+ */
+ void (*set_worker_ptr)(struct thread_pool_t *pool, size_t idx,
+ void *ptr);
+
+ /**
+ * @brief Submit a work item to a thread pool.
+ *
+ * This function will fail on allocation failure or if the internal
+ * error state is set was set by one of the workers.
+ *
+ * @param ptr A pointer to a work object to enqueue.
+ *
+ * @return Zero on success.
+ */
+ int (*submit)(struct thread_pool_t *pool, void *ptr);
+
+ /**
+ * @brief Wait for a work item to be completed.
+ *
+ * This function dequeues a single completed work item. It may block
+ * until one of the worker threads signals completion of an additional
+ * item.
+ *
+ * This function guarantees to return the items in the same order as
+ * they were submitted, so the function can actually block longer than
+ * necessary, because it has to wait until the next item in sequence
+ * is finished.
+ *
+ * @return A pointer to a new work item or NULL if there are none
+ * in the pipeline.
+ */
+ void *(*dequeue)(struct thread_pool_t *pool);
+
+ /**
+ * @brief Get the internal worker return status value.
+ *
+ * If the worker functions returns a non-zero exit status in one of the
+ * worker threads, the thread pool stors the value internally and shuts
+ * down. This function can be used to retrieve the value.
+ *
+ * @return A non-zero value returned by the worker callback or zero if
+ * everything is A-OK.
+ */
+ int (*get_status)(struct thread_pool_t *pool);
+} thread_pool_t;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * @brief Create a thread pool instance.
+ *
+ * @param num_jobs The number of worker threads to launch.
+ * @param worker A function to call from the worker threads to process
+ * the work items.
+ *
+ * @return A pointer to a thread pool on success, NULL on failure.
+ */
+SQFS_INTERNAL thread_pool_t *thread_pool_create(size_t num_jobs,
+ thread_pool_worker_t worker);
+
+/**
+ * @brief Create a serial mockup thread pool implementation.
+ *
+ * This returns a @ref thread_pool_t implementation that, instead of running a
+ * thread pool actually does the work in-situ when dequeueing.
+ *
+ * @param worker A function to call from the worker threads to process
+ * the work items.
+ *
+ * @return A pointer to a thread pool on success, NULL on failure.
+ */
+SQFS_INTERNAL
+thread_pool_t *thread_pool_create_serial(thread_pool_worker_t worker);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* THREADPOOL_H */
diff --git a/include/w32threadwrap.h b/include/w32threadwrap.h
new file mode 100644
index 0000000..6b7344c
--- /dev/null
+++ b/include/w32threadwrap.h
@@ -0,0 +1,105 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * w32threadwrap.h
+ *
+ * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at>
+ */
+#ifndef W32THREADWRAP_H
+#define W32THREADWRAP_H
+
+#include "sqfs/predef.h"
+
+#define WIN32_LEAN_AND_MEAN
+#define VC_EXTRALEAN
+#include <windows.h>
+
+typedef unsigned int sigset_t;
+typedef HANDLE pthread_t;
+typedef CRITICAL_SECTION pthread_mutex_t;
+typedef CONDITION_VARIABLE pthread_cond_t;
+
+static inline int pthread_create(pthread_t *thread, const void *attr,
+ LPTHREAD_START_ROUTINE proc, LPVOID arg)
+{
+ HANDLE hnd = CreateThread(NULL, 0, proc, arg, 0, 0);
+ (void)attr;
+
+ if (hnd == NULL)
+ return -1;
+
+ *thread = hnd;
+ return 0;
+}
+
+static int pthread_join(pthread_t thread, void **retval)
+{
+ WaitForSingleObject(thread, INFINITE);
+ CloseHandle(thread);
+ if (retval != NULL)
+ *retval = NULL;
+ return 0;
+}
+
+static inline int pthread_mutex_init(pthread_mutex_t *mutex, const void *attr)
+{
+ (void)attr;
+ InitializeCriticalSection(mutex);
+ return 0;
+}
+
+static inline int pthread_mutex_lock(pthread_mutex_t *mutex)
+{
+ EnterCriticalSection(mutex);
+ return 0;
+}
+
+static inline int pthread_mutex_unlock(pthread_mutex_t *mutex)
+{
+ LeaveCriticalSection(mutex);
+ return 0;
+}
+
+static inline void pthread_mutex_destroy(pthread_mutex_t *mutex)
+{
+ DeleteCriticalSection(mutex);
+}
+
+static inline int pthread_cond_init(pthread_cond_t *cond, const void *attr)
+{
+ (void)attr;
+ InitializeConditionVariable(cond);
+ return 0;
+}
+
+static inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mtx)
+{
+ return SleepConditionVariableCS(cond, mtx, INFINITE) != 0;
+}
+
+static inline int pthread_cond_broadcast(pthread_cond_t *cond)
+{
+ WakeAllConditionVariable(cond);
+ return 0;
+}
+
+static inline void pthread_cond_destroy(pthread_cond_t *cond)
+{
+ (void)cond;
+}
+
+#define SIG_SETMASK (0)
+
+static inline int sigfillset(sigset_t *set)
+{
+ *set = 0xFFFFFFFF;
+ return 0;
+}
+
+static inline int pthread_sigmask(int how, const sigset_t *set,
+ const sigset_t *old)
+{
+ (void)how; (void)set; (void)old;
+ return 0;
+}
+
+#endif /* W32THREADWRAP_H */
diff --git a/lib/util/Makemodule.am b/lib/util/Makemodule.am
index 39352f4..113855c 100644
--- a/lib/util/Makemodule.am
+++ b/lib/util/Makemodule.am
@@ -4,9 +4,27 @@ libutil_a_SOURCES += lib/util/rbtree.c include/rbtree.h
libutil_a_SOURCES += lib/util/array.c include/array.h
libutil_a_SOURCES += lib/util/xxhash.c lib/util/hash_table.c
libutil_a_SOURCES += lib/util/fast_urem_by_const.h
+libutil_a_SOURCES += include/threadpool.h
+libutil_a_SOURCES += include/w32threadwrap.h
+libutil_a_SOURCES += lib/util/threadpool_serial.c
libutil_a_CFLAGS = $(AM_CFLAGS)
libutil_a_CPPFLAGS = $(AM_CPPFLAGS)
+if WINDOWS
+libutil_a_CFLAGS += -DWINVER=0x0600 -D_WIN32_WINNT=0x0600
+endif
+
+if HAVE_PTHREAD
+libutil_a_SOURCES += lib/util/threadpool.c
+libutil_a_CFLAGS += $(PTHREAD_CFLAGS)
+else
+if WINDOWS
+libutil_a_SOURCES += lib/util/threadpool.c
+else
+libutil_a_CPPFLAGS += -DNO_THREAD_IMPL
+endif
+endif
+
if CUSTOM_ALLOC
libutil_a_SOURCES += lib/util/mempool.c include/mempool.h
endif
diff --git a/lib/util/threadpool.c b/lib/util/threadpool.c
new file mode 100644
index 0000000..245efe9
--- /dev/null
+++ b/lib/util/threadpool.c
@@ -0,0 +1,366 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * threadpool.c
+ *
+ * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at>
+ */
+#include "threadpool.h"
+#include "util.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+#if defined(_WIN32) || defined(__WINDOWS__)
+#include "w32threadwrap.h"
+
+#define THREAD_FUN(funname, argname) DWORD WINAPI funname(LPVOID argname)
+#define THREAD_EXIT_SUCCESS (0)
+#else
+#include <pthread.h>
+#include <signal.h>
+
+#define THREAD_FUN(funname, argname) void *funname(void *argname)
+#define THREAD_EXIT_SUCCESS NULL
+#endif
+
+typedef struct thread_pool_impl_t thread_pool_impl_t;
+
+typedef struct work_item_t {
+ struct work_item_t *next;
+ size_t ticket_number;
+
+ void *data;
+} work_item_t;
+
+typedef struct {
+ pthread_t thread;
+ thread_pool_impl_t *pool;
+
+ thread_pool_worker_t fun;
+ void *user;
+} worker_t;
+
+struct thread_pool_impl_t {
+ thread_pool_t base;
+
+ pthread_mutex_t mtx;
+ pthread_cond_t queue_cond;
+ pthread_cond_t done_cond;
+
+ size_t next_ticket;
+ size_t next_dequeue_ticket;
+
+ size_t item_count;
+
+ work_item_t *queue;
+ work_item_t *queue_last;
+
+ work_item_t *done;
+
+ work_item_t *recycle;
+
+ int status;
+
+ size_t num_workers;
+ worker_t workers[];
+};
+
+/*****************************************************************************/
+
+static void store_completed(thread_pool_impl_t *pool, work_item_t *done,
+ int status)
+{
+ work_item_t *it = pool->done, *prev = NULL;
+
+ while (it != NULL) {
+ if (it->ticket_number >= done->ticket_number)
+ break;
+
+ prev = it;
+ it = it->next;
+ }
+
+ if (prev == NULL) {
+ done->next = pool->done;
+ pool->done = done;
+ } else {
+ done->next = prev->next;
+ prev->next = done;
+ }
+
+ if (status != 0 && pool->status == 0)
+ pool->status = status;
+
+ pthread_cond_broadcast(&pool->done_cond);
+}
+
+static work_item_t *get_next_work_item(thread_pool_impl_t *pool)
+{
+ work_item_t *item = NULL;
+
+ while (pool->queue == NULL && pool->status == 0)
+ pthread_cond_wait(&pool->queue_cond, &pool->mtx);
+
+ if (pool->status == 0) {
+ item = pool->queue;
+ pool->queue = item->next;
+ item->next = NULL;
+
+ if (pool->queue == NULL)
+ pool->queue_last = NULL;
+ }
+
+ return item;
+}
+
+static THREAD_FUN(worker_proc, arg)
+{
+ work_item_t *item = NULL;
+ worker_t *worker = arg;
+ int status = 0;
+
+ for (;;) {
+ pthread_mutex_lock(&worker->pool->mtx);
+ if (item != NULL)
+ store_completed(worker->pool, item, status);
+
+ item = get_next_work_item(worker->pool);
+ pthread_mutex_unlock(&worker->pool->mtx);
+
+ if (item == NULL)
+ break;
+
+ status = worker->fun(worker->user, item->data);
+ }
+
+ return THREAD_EXIT_SUCCESS;
+}
+
+/*****************************************************************************/
+
+static void destroy(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ size_t i;
+
+ pthread_mutex_lock(&pool->mtx);
+ pool->status = -1;
+ pthread_cond_broadcast(&pool->queue_cond);
+ pthread_mutex_unlock(&pool->mtx);
+
+ for (i = 0; i < pool->num_workers; ++i)
+ pthread_join(pool->workers[i].thread, NULL);
+
+ pthread_cond_destroy(&pool->done_cond);
+ pthread_cond_destroy(&pool->queue_cond);
+ pthread_mutex_destroy(&pool->mtx);
+
+ while (pool->queue != NULL) {
+ work_item_t *item = pool->queue;
+ pool->queue = item->next;
+ free(item);
+ }
+
+ while (pool->done != NULL) {
+ work_item_t *item = pool->done;
+ pool->done = item->next;
+ free(item);
+ }
+
+ while (pool->recycle != NULL) {
+ work_item_t *item = pool->recycle;
+ pool->recycle = item->next;
+ free(item);
+ }
+
+ free(pool);
+}
+
+static size_t get_worker_count(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ return pool->num_workers;
+}
+
+static void set_worker_ptr(thread_pool_t *interface, size_t idx, void *ptr)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ if (idx >= pool->num_workers)
+ return;
+
+ pthread_mutex_lock(&pool->mtx);
+ pool->workers[idx].user = ptr;
+ pthread_mutex_unlock(&pool->mtx);
+}
+
+static int submit(thread_pool_t *interface, void *ptr)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ work_item_t *item = NULL;
+ int status;
+
+ if (pool->recycle != NULL) {
+ item = pool->recycle;
+ pool->recycle = item->next;
+ item->next = NULL;
+ }
+
+ if (item == NULL) {
+ item = calloc(1, sizeof(*item));
+ if (item == NULL)
+ return -1;
+ }
+
+ pthread_mutex_lock(&pool->mtx);
+ status = pool->status;
+
+ if (status == 0) {
+ item->data = ptr;
+ item->ticket_number = pool->next_ticket++;
+
+ if (pool->queue_last == NULL) {
+ pool->queue = item;
+ } else {
+ pool->queue_last->next = item;
+ }
+
+ pool->queue_last = item;
+ pool->item_count += 1;
+ }
+
+ pthread_cond_broadcast(&pool->queue_cond);
+ pthread_mutex_unlock(&pool->mtx);
+
+ if (status != 0) {
+ memset(item, 0, sizeof(*item));
+ item->next = pool->recycle;
+ pool->recycle = item;
+ }
+
+ return status;
+}
+
+static void *dequeue(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ work_item_t *out = NULL;
+ void *ptr = NULL;
+
+ if (pool->item_count == 0)
+ return NULL;
+
+ /* dequeue */
+ pthread_mutex_lock(&pool->mtx);
+ while (out == NULL) {
+ if (pool->done == NULL) {
+ pthread_cond_wait(&pool->done_cond, &pool->mtx);
+ continue;
+ }
+
+ if (pool->done->ticket_number != pool->next_dequeue_ticket) {
+ pthread_cond_wait(&pool->done_cond, &pool->mtx);
+ continue;
+ }
+
+ out = pool->done;
+ pool->done = out->next;
+ out->next = NULL;
+ pool->next_dequeue_ticket += 1;
+ }
+ pthread_mutex_unlock(&pool->mtx);
+
+ /* get the data pointer, put the container in the recycle bin */
+ ptr = out->data;
+
+ out->ticket_number = 0;
+ out->data = NULL;
+ out->next = pool->recycle;
+ pool->recycle = out;
+
+ pool->item_count -= 1;
+ return ptr;
+}
+
+static int get_status(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ int status;
+
+ pthread_mutex_lock(&pool->mtx);
+ status = pool->status;
+ pthread_mutex_unlock(&pool->mtx);
+
+ return status;
+}
+
+thread_pool_t *thread_pool_create(size_t num_jobs, thread_pool_worker_t worker)
+{
+ thread_pool_impl_t *pool;
+ thread_pool_t *interface;
+ sigset_t set, oldset;
+ size_t i, j;
+ int ret;
+
+ if (num_jobs < 1)
+ num_jobs = 1;
+
+ pool = alloc_flex(sizeof(*pool), sizeof(pool->workers[0]), num_jobs);
+ if (pool == NULL)
+ return NULL;
+
+ if (pthread_mutex_init(&pool->mtx, NULL) != 0)
+ goto fail_free;
+
+ if (pthread_cond_init(&pool->queue_cond, NULL) != 0)
+ goto fail_mtx;
+
+ if (pthread_cond_init(&pool->done_cond, NULL) != 0)
+ goto fail_qcond;
+
+ sigfillset(&set);
+ pthread_sigmask(SIG_SETMASK, &set, &oldset);
+
+ pool->num_workers = num_jobs;
+
+ for (i = 0; i < num_jobs; ++i) {
+ pool->workers[i].fun = worker;
+ pool->workers[i].pool = pool;
+
+ ret = pthread_create(&pool->workers[i].thread, NULL,
+ worker_proc, pool->workers + i);
+
+ if (ret != 0)
+ goto fail;
+ }
+
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+
+ interface = (thread_pool_t *)pool;
+ interface->destroy = destroy;
+ interface->get_worker_count = get_worker_count;
+ interface->set_worker_ptr = set_worker_ptr;
+ interface->submit = submit;
+ interface->dequeue = dequeue;
+ interface->get_status = get_status;
+ return interface;
+fail:
+ pthread_mutex_lock(&pool->mtx);
+ pool->status = -1;
+ pthread_cond_broadcast(&pool->queue_cond);
+ pthread_mutex_unlock(&pool->mtx);
+
+ for (j = 0; j < i; ++j)
+ pthread_join(pool->workers[j].thread, NULL);
+
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+ pthread_cond_destroy(&pool->done_cond);
+fail_qcond:
+ pthread_cond_destroy(&pool->queue_cond);
+fail_mtx:
+ pthread_mutex_destroy(&pool->mtx);
+fail_free:
+ free(pool);
+ return NULL;
+}
diff --git a/lib/util/threadpool_serial.c b/lib/util/threadpool_serial.c
new file mode 100644
index 0000000..86eade8
--- /dev/null
+++ b/lib/util/threadpool_serial.c
@@ -0,0 +1,162 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * threadpool_serial.c
+ *
+ * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at>
+ */
+#include "threadpool.h"
+#include "util.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+typedef struct work_item_t {
+ struct work_item_t *next;
+ void *data;
+} work_item_t;
+
+typedef struct {
+ thread_pool_t base;
+
+ work_item_t *queue;
+ work_item_t *queue_last;
+
+ work_item_t *recycle;
+
+ thread_pool_worker_t fun;
+ void *user;
+ int status;
+} thread_pool_impl_t;
+
+static void destroy(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ while (pool->queue != NULL) {
+ work_item_t *item = pool->queue;
+ pool->queue = item->next;
+ free(item);
+ }
+
+ while (pool->recycle != NULL) {
+ work_item_t *item = pool->recycle;
+ pool->recycle = item->next;
+ free(item);
+ }
+
+ free(pool);
+}
+
+static size_t get_worker_count(thread_pool_t *pool)
+{
+ (void)pool;
+ return 1;
+}
+
+static void set_worker_ptr(thread_pool_t *interface, size_t idx, void *ptr)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ if (idx >= 1)
+ return;
+
+ pool->user = ptr;
+}
+
+static int submit(thread_pool_t *interface, void *ptr)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ work_item_t *item = NULL;
+
+ if (pool->status != 0)
+ return pool->status;
+
+ if (pool->recycle != NULL) {
+ item = pool->recycle;
+ pool->recycle = item->next;
+ item->next = NULL;
+ }
+
+ if (item == NULL) {
+ item = calloc(1, sizeof(*item));
+ if (item == NULL)
+ return -1;
+ }
+
+ item->data = ptr;
+
+ if (pool->queue_last == NULL) {
+ pool->queue = item;
+ } else {
+ pool->queue_last->next = item;
+ }
+
+ pool->queue_last = item;
+ return 0;
+}
+
+static void *dequeue(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ work_item_t *item;
+ void *ptr;
+ int ret;
+
+ if (pool->queue == NULL)
+ return NULL;
+
+ item = pool->queue;
+ pool->queue = item->next;
+
+ if (pool->queue == NULL)
+ pool->queue_last = NULL;
+
+ ptr = item->data;
+
+ memset(item, 0, sizeof(*item));
+ item->next = pool->recycle;
+ pool->recycle = item;
+
+ ret = pool->fun(pool->user, ptr);
+
+ if (ret != 0 && pool->status == 0)
+ pool->status = ret;
+
+ return ptr;
+}
+
+static int get_status(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ return pool->status;
+}
+
+thread_pool_t *thread_pool_create_serial(thread_pool_worker_t worker)
+{
+ thread_pool_impl_t *pool = calloc(1, sizeof(*pool));
+ thread_pool_t *interface = (thread_pool_t *)pool;
+
+ if (pool == NULL)
+ return NULL;
+
+ pool->fun = worker;
+
+ interface = (thread_pool_t *)pool;
+ interface->destroy = destroy;
+ interface->get_worker_count = get_worker_count;
+ interface->set_worker_ptr = set_worker_ptr;
+ interface->submit = submit;
+ interface->dequeue = dequeue;
+ interface->get_status = get_status;
+ return interface;
+
+}
+
+#ifdef NO_THREAD_IMPL
+thread_pool_t *thread_pool_create(size_t num_jobs, thread_pool_worker_t worker)
+{
+ (void)num_jobs;
+ return thread_pool_create_serial(worker);
+}
+#endif
diff --git a/tests/libutil/Makemodule.am b/tests/libutil/Makemodule.am
index 1153db4..1fe4ebf 100644
--- a/tests/libutil/Makemodule.am
+++ b/tests/libutil/Makemodule.am
@@ -8,8 +8,12 @@ test_rbtree_LDADD = libutil.a libcompat.a
test_xxhash_SOURCES = tests/libutil/xxhash.c
test_xxhash_LDADD = libutil.a libcompat.a
+test_threadpool_SOURCES = tests/libutil/threadpool.c
+test_threadpool_CFLAGS = $(AM_CFLAGS) $(PTHREAD_CFLAGS)
+test_threadpool_LDADD = libutil.a libcompat.a $(PTHREAD_LIBS)
+
LIBUTIL_TESTS = \
- test_str_table test_rbtree test_xxhash
+ test_str_table test_rbtree test_xxhash test_threadpool
check_PROGRAMS += $(LIBUTIL_TESTS)
TESTS += $(LIBUTIL_TESTS)
diff --git a/tests/libutil/threadpool.c b/tests/libutil/threadpool.c
new file mode 100644
index 0000000..566239f
--- /dev/null
+++ b/tests/libutil/threadpool.c
@@ -0,0 +1,111 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * threadpool.c
+ *
+ * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at>
+ */
+#include "config.h"
+
+#include "threadpool.h"
+#include "../test.h"
+
+#if defined(_WIN32) || defined(__WINDOWS__)
+#define WIN32_LEAN_AND_MEAN
+#define VC_EXTRALEAN
+#include <windows.h>
+#endif
+
+#include <time.h>
+
+static int worker(void *user, void *work_item)
+{
+ unsigned int value = *((unsigned int *)work_item);
+ (void)user;
+
+#if defined(_WIN32) || defined(__WINDOWS__)
+ Sleep(100 * value);
+#else
+ {
+ struct timespec sp;
+
+ sp.tv_sec = value / 10;
+ sp.tv_nsec = 100000000;
+ sp.tv_nsec *= (long)(value % 10);
+
+ nanosleep(&sp, NULL);
+ }
+#endif
+
+ *((unsigned int *)work_item) = 42;
+ return 0;
+}
+
+int main(void)
+{
+ unsigned int values[10];
+ thread_pool_t *pool;
+ unsigned int *ptr;
+ size_t i, count;
+ int ret;
+
+ pool = thread_pool_create(10, worker);
+ TEST_NOT_NULL(pool);
+
+ count = pool->get_worker_count(pool);
+ TEST_ASSERT(count >= 1);
+
+ /* dequeue on empty pool MUST NOT deadlock */
+ ptr = pool->dequeue(pool);
+ TEST_NULL(ptr);
+
+ for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) {
+ values[i] = sizeof(values) / sizeof(values[0]) - i;
+
+ ret = pool->submit(pool, values + i);
+ TEST_EQUAL_I(ret, 0);
+ }
+
+ for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) {
+ ptr = pool->dequeue(pool);
+
+ TEST_NOT_NULL(ptr);
+ TEST_ASSERT(ptr == (values + i));
+ TEST_EQUAL_UI(*ptr, 42);
+ }
+
+ ptr = pool->dequeue(pool);
+ TEST_NULL(ptr);
+
+ pool->destroy(pool);
+
+ /* redo the same test with the serial implementation */
+ pool = thread_pool_create_serial(worker);
+ TEST_NOT_NULL(pool);
+
+ ptr = pool->dequeue(pool);
+ TEST_NULL(ptr);
+
+ count = pool->get_worker_count(pool);
+ TEST_EQUAL_UI(count, 1);
+
+ for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) {
+ values[i] = sizeof(values) / sizeof(values[0]) - i;
+
+ ret = pool->submit(pool, values + i);
+ TEST_EQUAL_I(ret, 0);
+ }
+
+ for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) {
+ ptr = pool->dequeue(pool);
+
+ TEST_NOT_NULL(ptr);
+ TEST_ASSERT(ptr == (values + i));
+ TEST_EQUAL_UI(*ptr, 42);
+ }
+
+ ptr = pool->dequeue(pool);
+ TEST_NULL(ptr);
+
+ pool->destroy(pool);
+ return EXIT_SUCCESS;
+}