 include/threadpool.h         | 125 +
 include/w32threadwrap.h      | 105 +
 lib/util/Makemodule.am       |  18 +
 lib/util/threadpool.c        | 366 +
 lib/util/threadpool_serial.c | 162 +
 tests/libutil/Makemodule.am  |   6 +-
 tests/libutil/threadpool.c   | 111 +
 7 files changed, 892 insertions(+), 1 deletion(-)
diff --git a/include/threadpool.h b/include/threadpool.h
new file mode 100644
index 0000000..f25c497
--- /dev/null
+++ b/include/threadpool.h
@@ -0,0 +1,125 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * threadpool.h
+ *
+ * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at>
+ */
+#ifndef THREADPOOL_H
+#define THREADPOOL_H
+
+#include "sqfs/predef.h"
+
+typedef int (*thread_pool_worker_t)(void *user, void *work_item);
+
+/**
+ * @struct thread_pool_t
+ *
+ * @brief A thread pool with ticket-number-based work item ordering.
+ *
+ * While the order in which items are processed is non-deterministic, the
+ * thread pool internally uses a ticket system to ensure the completed
+ * items are dequeued in the same order that they were enqueued.
+ */
+typedef struct thread_pool_t {
+ /**
+ * @brief Shutdown and destroy a thread pool.
+ *
+ * @param pool A pointer to a pool returned by thread_pool_create
+ */
+ void (*destroy)(struct thread_pool_t *pool);
+
+ /**
+ * @brief Get the actual number of worker threads available.
+ *
+	 * @return A number greater than or equal to 1.
+ */
+ size_t (*get_worker_count)(struct thread_pool_t *pool);
+
+ /**
+ * @brief Change the user data pointer for a thread pool worker
+ * by index.
+ *
+ * @param idx A zero-based index into the worker list.
+ * @param ptr A user pointer that this specific worker thread should
+ * pass to the worker callback.
+ */
+ void (*set_worker_ptr)(struct thread_pool_t *pool, size_t idx,
+ void *ptr);
+
+ /**
+ * @brief Submit a work item to a thread pool.
+ *
+	 * This function will fail on allocation failure or if the internal
+	 * error state was set by one of the workers.
+ *
+ * @param ptr A pointer to a work object to enqueue.
+ *
+ * @return Zero on success.
+ */
+ int (*submit)(struct thread_pool_t *pool, void *ptr);
+
+ /**
+ * @brief Wait for a work item to be completed.
+ *
+ * This function dequeues a single completed work item. It may block
+ * until one of the worker threads signals completion of an additional
+ * item.
+ *
+	 * This function is guaranteed to return the items in the same order
+	 * in which they were submitted, so it can actually block for longer
+	 * than necessary, because it has to wait until the next item in the
+	 * sequence is finished.
+ *
+ * @return A pointer to a new work item or NULL if there are none
+ * in the pipeline.
+ */
+ void *(*dequeue)(struct thread_pool_t *pool);
+
+ /**
+ * @brief Get the internal worker return status value.
+ *
+	 * If the worker function returns a non-zero exit status in one of the
+	 * worker threads, the thread pool stores the value internally and
+	 * shuts down. This function can be used to retrieve the value.
+ *
+ * @return A non-zero value returned by the worker callback or zero if
+ * everything is A-OK.
+ */
+ int (*get_status)(struct thread_pool_t *pool);
+} thread_pool_t;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * @brief Create a thread pool instance.
+ *
+ * @param num_jobs The number of worker threads to launch.
+ * @param worker A function to call from the worker threads to process
+ * the work items.
+ *
+ * @return A pointer to a thread pool on success, NULL on failure.
+ */
+SQFS_INTERNAL thread_pool_t *thread_pool_create(size_t num_jobs,
+ thread_pool_worker_t worker);
+
+/**
+ * @brief Create a serial mockup thread pool implementation.
+ *
+ * This returns a @ref thread_pool_t implementation that, instead of running
+ * a thread pool, actually does the work in situ when dequeueing.
+ *
+ * @param worker A function that is called to process the work items as
+ *               they are dequeued.
+ *
+ * @return A pointer to a thread pool on success, NULL on failure.
+ */
+SQFS_INTERNAL
+thread_pool_t *thread_pool_create_serial(thread_pool_worker_t worker);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* THREADPOOL_H */
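
For context, here is a minimal sketch of how a caller is meant to drive this
interface. The chunk_t type and the process_chunk callback are hypothetical
examples, not part of this patch:

    #include <stddef.h>
    #include "threadpool.h"

    typedef struct {
    	unsigned int in;
    	unsigned int out;
    } chunk_t;

    static int process_chunk(void *user, void *work_item)
    {
    	chunk_t *c = work_item;
    	(void)user;

    	c->out = c->in * 2;
    	return 0;
    }

    static int run_all(chunk_t *chunks, size_t count)
    {
    	thread_pool_t *pool = thread_pool_create(4, process_chunk);
    	size_t i;
    	int status;

    	if (pool == NULL)
    		return -1;

    	for (i = 0; i < count; ++i) {
    		if (pool->submit(pool, chunks + i) != 0)
    			break;
    	}

    	/* completed items come back in submission order */
    	while (pool->dequeue(pool) != NULL)
    		;

    	status = pool->get_status(pool);
    	pool->destroy(pool);
    	return status;
    }

Note that dequeue blocks until the next item in submission order is finished,
so the drain loop above ends only once every submitted item was processed.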
diff --git a/include/w32threadwrap.h b/include/w32threadwrap.h
new file mode 100644
index 0000000..6b7344c
--- /dev/null
+++ b/include/w32threadwrap.h
@@ -0,0 +1,105 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * w32threadwrap.h
+ *
+ * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at>
+ */
+#ifndef W32THREADWRAP_H
+#define W32THREADWRAP_H
+
+#include "sqfs/predef.h"
+
+#define WIN32_LEAN_AND_MEAN
+#define VC_EXTRALEAN
+#include <windows.h>
+
+typedef unsigned int sigset_t;
+typedef HANDLE pthread_t;
+typedef CRITICAL_SECTION pthread_mutex_t;
+typedef CONDITION_VARIABLE pthread_cond_t;
+
+static inline int pthread_create(pthread_t *thread, const void *attr,
+ LPTHREAD_START_ROUTINE proc, LPVOID arg)
+{
+ HANDLE hnd = CreateThread(NULL, 0, proc, arg, 0, 0);
+ (void)attr;
+
+ if (hnd == NULL)
+ return -1;
+
+ *thread = hnd;
+ return 0;
+}
+
+static inline int pthread_join(pthread_t thread, void **retval)
+{
+ WaitForSingleObject(thread, INFINITE);
+ CloseHandle(thread);
+ if (retval != NULL)
+ *retval = NULL;
+ return 0;
+}
+
+static inline int pthread_mutex_init(pthread_mutex_t *mutex, const void *attr)
+{
+ (void)attr;
+ InitializeCriticalSection(mutex);
+ return 0;
+}
+
+static inline int pthread_mutex_lock(pthread_mutex_t *mutex)
+{
+ EnterCriticalSection(mutex);
+ return 0;
+}
+
+static inline int pthread_mutex_unlock(pthread_mutex_t *mutex)
+{
+ LeaveCriticalSection(mutex);
+ return 0;
+}
+
+static inline void pthread_mutex_destroy(pthread_mutex_t *mutex)
+{
+ DeleteCriticalSection(mutex);
+}
+
+static inline int pthread_cond_init(pthread_cond_t *cond, const void *attr)
+{
+ (void)attr;
+ InitializeConditionVariable(cond);
+ return 0;
+}
+
+static inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mtx)
+{
+	return SleepConditionVariableCS(cond, mtx, INFINITE) ? 0 : -1;
+}
+
+static inline int pthread_cond_broadcast(pthread_cond_t *cond)
+{
+ WakeAllConditionVariable(cond);
+ return 0;
+}
+
+static inline void pthread_cond_destroy(pthread_cond_t *cond)
+{
+ (void)cond;
+}
+
+#define SIG_SETMASK (0)
+
+static inline int sigfillset(sigset_t *set)
+{
+ *set = 0xFFFFFFFF;
+ return 0;
+}
+
+static inline int pthread_sigmask(int how, const sigset_t *set,
+ const sigset_t *old)
+{
+ (void)how; (void)set; (void)old;
+ return 0;
+}
+
+#endif /* W32THREADWRAP_H */
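
The point of this shim is that code written against the small pthread subset
above compiles unchanged on Windows. A minimal sketch, assuming the mutex is
set up once with pthread_mutex_init(&mtx, NULL) before any threads start:

    #if defined(_WIN32) || defined(__WINDOWS__)
    #include "w32threadwrap.h"
    #else
    #include <pthread.h>
    #endif

    static pthread_mutex_t mtx;
    static unsigned int counter;

    /* identical source on both platforms; on Windows the lock/unlock
     * calls map to EnterCriticalSection/LeaveCriticalSection */
    static void bump_counter(void)
    {
    	pthread_mutex_lock(&mtx);
    	counter += 1;
    	pthread_mutex_unlock(&mtx);
    }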
diff --git a/lib/util/Makemodule.am b/lib/util/Makemodule.am
index 39352f4..113855c 100644
--- a/lib/util/Makemodule.am
+++ b/lib/util/Makemodule.am
@@ -4,9 +4,27 @@ libutil_a_SOURCES += lib/util/rbtree.c include/rbtree.h
libutil_a_SOURCES += lib/util/array.c include/array.h
libutil_a_SOURCES += lib/util/xxhash.c lib/util/hash_table.c
libutil_a_SOURCES += lib/util/fast_urem_by_const.h
+libutil_a_SOURCES += include/threadpool.h
+libutil_a_SOURCES += include/w32threadwrap.h
+libutil_a_SOURCES += lib/util/threadpool_serial.c
libutil_a_CFLAGS = $(AM_CFLAGS)
libutil_a_CPPFLAGS = $(AM_CPPFLAGS)
+if WINDOWS
+libutil_a_CFLAGS += -DWINVER=0x0600 -D_WIN32_WINNT=0x0600
+endif
+
+if HAVE_PTHREAD
+libutil_a_SOURCES += lib/util/threadpool.c
+libutil_a_CFLAGS += $(PTHREAD_CFLAGS)
+else
+if WINDOWS
+libutil_a_SOURCES += lib/util/threadpool.c
+else
+libutil_a_CPPFLAGS += -DNO_THREAD_IMPL
+endif
+endif
+
if CUSTOM_ALLOC
libutil_a_SOURCES += lib/util/mempool.c include/mempool.h
endif
diff --git a/lib/util/threadpool.c b/lib/util/threadpool.c
new file mode 100644
index 0000000..245efe9
--- /dev/null
+++ b/lib/util/threadpool.c
@@ -0,0 +1,366 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * threadpool.c
+ *
+ * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at>
+ */
+#include "threadpool.h"
+#include "util.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+#if defined(_WIN32) || defined(__WINDOWS__)
+#include "w32threadwrap.h"
+
+#define THREAD_FUN(funname, argname) DWORD WINAPI funname(LPVOID argname)
+#define THREAD_EXIT_SUCCESS (0)
+#else
+#include <pthread.h>
+#include <signal.h>
+
+#define THREAD_FUN(funname, argname) void *funname(void *argname)
+#define THREAD_EXIT_SUCCESS NULL
+#endif
+
+typedef struct thread_pool_impl_t thread_pool_impl_t;
+
+typedef struct work_item_t {
+ struct work_item_t *next;
+ size_t ticket_number;
+
+ void *data;
+} work_item_t;
+
+typedef struct {
+ pthread_t thread;
+ thread_pool_impl_t *pool;
+
+ thread_pool_worker_t fun;
+ void *user;
+} worker_t;
+
+struct thread_pool_impl_t {
+ thread_pool_t base;
+
+ pthread_mutex_t mtx;
+ pthread_cond_t queue_cond;
+ pthread_cond_t done_cond;
+
+ size_t next_ticket;
+ size_t next_dequeue_ticket;
+
+ size_t item_count;
+
+ work_item_t *queue;
+ work_item_t *queue_last;
+
+ work_item_t *done;
+
+ work_item_t *recycle;
+
+ int status;
+
+ size_t num_workers;
+ worker_t workers[];
+};
+
+/*****************************************************************************/
+
+static void store_completed(thread_pool_impl_t *pool, work_item_t *done,
+ int status)
+{
+ work_item_t *it = pool->done, *prev = NULL;
+
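+	/* keep the done list sorted by ascending ticket number */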
+ while (it != NULL) {
+ if (it->ticket_number >= done->ticket_number)
+ break;
+
+ prev = it;
+ it = it->next;
+ }
+
+ if (prev == NULL) {
+ done->next = pool->done;
+ pool->done = done;
+ } else {
+ done->next = prev->next;
+ prev->next = done;
+ }
+
+ if (status != 0 && pool->status == 0)
+ pool->status = status;
+
+ pthread_cond_broadcast(&pool->done_cond);
+}
+
+static work_item_t *get_next_work_item(thread_pool_impl_t *pool)
+{
+ work_item_t *item = NULL;
+
+ while (pool->queue == NULL && pool->status == 0)
+ pthread_cond_wait(&pool->queue_cond, &pool->mtx);
+
+ if (pool->status == 0) {
+ item = pool->queue;
+ pool->queue = item->next;
+ item->next = NULL;
+
+ if (pool->queue == NULL)
+ pool->queue_last = NULL;
+ }
+
+ return item;
+}
+
+static THREAD_FUN(worker_proc, arg)
+{
+ work_item_t *item = NULL;
+ worker_t *worker = arg;
+ int status = 0;
+
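+	/* Per iteration: take the lock, report the previous item's result,
+	 * then fetch the next item. A NULL item means shutdown or error. */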
+ for (;;) {
+ pthread_mutex_lock(&worker->pool->mtx);
+ if (item != NULL)
+ store_completed(worker->pool, item, status);
+
+ item = get_next_work_item(worker->pool);
+ pthread_mutex_unlock(&worker->pool->mtx);
+
+ if (item == NULL)
+ break;
+
+ status = worker->fun(worker->user, item->data);
+ }
+
+ return THREAD_EXIT_SUCCESS;
+}
+
+/*****************************************************************************/
+
+static void destroy(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ size_t i;
+
+ pthread_mutex_lock(&pool->mtx);
+ pool->status = -1;
+ pthread_cond_broadcast(&pool->queue_cond);
+ pthread_mutex_unlock(&pool->mtx);
+
+ for (i = 0; i < pool->num_workers; ++i)
+ pthread_join(pool->workers[i].thread, NULL);
+
+ pthread_cond_destroy(&pool->done_cond);
+ pthread_cond_destroy(&pool->queue_cond);
+ pthread_mutex_destroy(&pool->mtx);
+
+ while (pool->queue != NULL) {
+ work_item_t *item = pool->queue;
+ pool->queue = item->next;
+ free(item);
+ }
+
+ while (pool->done != NULL) {
+ work_item_t *item = pool->done;
+ pool->done = item->next;
+ free(item);
+ }
+
+ while (pool->recycle != NULL) {
+ work_item_t *item = pool->recycle;
+ pool->recycle = item->next;
+ free(item);
+ }
+
+ free(pool);
+}
+
+static size_t get_worker_count(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ return pool->num_workers;
+}
+
+static void set_worker_ptr(thread_pool_t *interface, size_t idx, void *ptr)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ if (idx >= pool->num_workers)
+ return;
+
+ pthread_mutex_lock(&pool->mtx);
+ pool->workers[idx].user = ptr;
+ pthread_mutex_unlock(&pool->mtx);
+}
+
+static int submit(thread_pool_t *interface, void *ptr)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ work_item_t *item = NULL;
+ int status;
+
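+	/* the recycle list is never touched by the worker threads, only by
+	 * the submit/dequeue side, so it is accessed without the pool lock */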
+ if (pool->recycle != NULL) {
+ item = pool->recycle;
+ pool->recycle = item->next;
+ item->next = NULL;
+ }
+
+ if (item == NULL) {
+ item = calloc(1, sizeof(*item));
+ if (item == NULL)
+ return -1;
+ }
+
+ pthread_mutex_lock(&pool->mtx);
+ status = pool->status;
+
+ if (status == 0) {
+ item->data = ptr;
+ item->ticket_number = pool->next_ticket++;
+
+ if (pool->queue_last == NULL) {
+ pool->queue = item;
+ } else {
+ pool->queue_last->next = item;
+ }
+
+ pool->queue_last = item;
+ pool->item_count += 1;
+ }
+
+ pthread_cond_broadcast(&pool->queue_cond);
+ pthread_mutex_unlock(&pool->mtx);
+
+ if (status != 0) {
+ memset(item, 0, sizeof(*item));
+ item->next = pool->recycle;
+ pool->recycle = item;
+ }
+
+ return status;
+}
+
+static void *dequeue(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ work_item_t *out = NULL;
+ void *ptr = NULL;
+
+ if (pool->item_count == 0)
+ return NULL;
+
+	/* dequeue the completed item with the next sequential ticket number */
+ pthread_mutex_lock(&pool->mtx);
+ while (out == NULL) {
+ if (pool->done == NULL) {
+ pthread_cond_wait(&pool->done_cond, &pool->mtx);
+ continue;
+ }
+
+ if (pool->done->ticket_number != pool->next_dequeue_ticket) {
+ pthread_cond_wait(&pool->done_cond, &pool->mtx);
+ continue;
+ }
+
+ out = pool->done;
+ pool->done = out->next;
+ out->next = NULL;
+ pool->next_dequeue_ticket += 1;
+ }
+ pthread_mutex_unlock(&pool->mtx);
+
+ /* get the data pointer, put the container in the recycle bin */
+ ptr = out->data;
+
+ out->ticket_number = 0;
+ out->data = NULL;
+ out->next = pool->recycle;
+ pool->recycle = out;
+
+ pool->item_count -= 1;
+ return ptr;
+}
+
+static int get_status(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ int status;
+
+ pthread_mutex_lock(&pool->mtx);
+ status = pool->status;
+ pthread_mutex_unlock(&pool->mtx);
+
+ return status;
+}
+
+thread_pool_t *thread_pool_create(size_t num_jobs, thread_pool_worker_t worker)
+{
+ thread_pool_impl_t *pool;
+ thread_pool_t *interface;
+ sigset_t set, oldset;
+ size_t i, j;
+ int ret;
+
+ if (num_jobs < 1)
+ num_jobs = 1;
+
+ pool = alloc_flex(sizeof(*pool), sizeof(pool->workers[0]), num_jobs);
+ if (pool == NULL)
+ return NULL;
+
+ if (pthread_mutex_init(&pool->mtx, NULL) != 0)
+ goto fail_free;
+
+ if (pthread_cond_init(&pool->queue_cond, NULL) != 0)
+ goto fail_mtx;
+
+ if (pthread_cond_init(&pool->done_cond, NULL) != 0)
+ goto fail_qcond;
+
+ sigfillset(&set);
+ pthread_sigmask(SIG_SETMASK, &set, &oldset);
+
+ pool->num_workers = num_jobs;
+
+ for (i = 0; i < num_jobs; ++i) {
+ pool->workers[i].fun = worker;
+ pool->workers[i].pool = pool;
+
+ ret = pthread_create(&pool->workers[i].thread, NULL,
+ worker_proc, pool->workers + i);
+
+ if (ret != 0)
+ goto fail;
+ }
+
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+
+ interface = (thread_pool_t *)pool;
+ interface->destroy = destroy;
+ interface->get_worker_count = get_worker_count;
+ interface->set_worker_ptr = set_worker_ptr;
+ interface->submit = submit;
+ interface->dequeue = dequeue;
+ interface->get_status = get_status;
+ return interface;
+fail:
+ pthread_mutex_lock(&pool->mtx);
+ pool->status = -1;
+ pthread_cond_broadcast(&pool->queue_cond);
+ pthread_mutex_unlock(&pool->mtx);
+
+ for (j = 0; j < i; ++j)
+ pthread_join(pool->workers[j].thread, NULL);
+
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+ pthread_cond_destroy(&pool->done_cond);
+fail_qcond:
+ pthread_cond_destroy(&pool->queue_cond);
+fail_mtx:
+ pthread_mutex_destroy(&pool->mtx);
+fail_free:
+ free(pool);
+ return NULL;
+}
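
One part of the interface that threadpool.c itself never exercises is
set_worker_ptr. It exists so that every worker can be handed private scratch
state, meaning the callback never has to lock. A fragment, where worker_cb
and scratch_create are hypothetical:

    extern int worker_cb(void *user, void *work_item);   /* hypothetical */
    extern void *scratch_create(void);                   /* hypothetical */

    static thread_pool_t *make_pool(void)
    {
    	thread_pool_t *pool = thread_pool_create(4, worker_cb);
    	size_t i, n;

    	if (pool == NULL)
    		return NULL;

    	n = pool->get_worker_count(pool);

    	/* worker i receives its private pointer as the "user" argument */
    	for (i = 0; i < n; ++i)
    		pool->set_worker_ptr(pool, i, scratch_create());

    	return pool;
    }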
diff --git a/lib/util/threadpool_serial.c b/lib/util/threadpool_serial.c
new file mode 100644
index 0000000..86eade8
--- /dev/null
+++ b/lib/util/threadpool_serial.c
@@ -0,0 +1,162 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * threadpool_serial.c
+ *
+ * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at>
+ */
+#include "threadpool.h"
+#include "util.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+typedef struct work_item_t {
+ struct work_item_t *next;
+ void *data;
+} work_item_t;
+
+typedef struct {
+ thread_pool_t base;
+
+ work_item_t *queue;
+ work_item_t *queue_last;
+
+ work_item_t *recycle;
+
+ thread_pool_worker_t fun;
+ void *user;
+ int status;
+} thread_pool_impl_t;
+
+static void destroy(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ while (pool->queue != NULL) {
+ work_item_t *item = pool->queue;
+ pool->queue = item->next;
+ free(item);
+ }
+
+ while (pool->recycle != NULL) {
+ work_item_t *item = pool->recycle;
+ pool->recycle = item->next;
+ free(item);
+ }
+
+ free(pool);
+}
+
+static size_t get_worker_count(thread_pool_t *pool)
+{
+ (void)pool;
+ return 1;
+}
+
+static void set_worker_ptr(thread_pool_t *interface, size_t idx, void *ptr)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ if (idx >= 1)
+ return;
+
+ pool->user = ptr;
+}
+
+static int submit(thread_pool_t *interface, void *ptr)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ work_item_t *item = NULL;
+
+ if (pool->status != 0)
+ return pool->status;
+
+ if (pool->recycle != NULL) {
+ item = pool->recycle;
+ pool->recycle = item->next;
+ item->next = NULL;
+ }
+
+ if (item == NULL) {
+ item = calloc(1, sizeof(*item));
+ if (item == NULL)
+ return -1;
+ }
+
+ item->data = ptr;
+
+ if (pool->queue_last == NULL) {
+ pool->queue = item;
+ } else {
+ pool->queue_last->next = item;
+ }
+
+ pool->queue_last = item;
+ return 0;
+}
+
+static void *dequeue(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ work_item_t *item;
+ void *ptr;
+ int ret;
+
+ if (pool->queue == NULL)
+ return NULL;
+
+ item = pool->queue;
+ pool->queue = item->next;
+
+ if (pool->queue == NULL)
+ pool->queue_last = NULL;
+
+ ptr = item->data;
+
+ memset(item, 0, sizeof(*item));
+ item->next = pool->recycle;
+ pool->recycle = item;
+
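+	/* there are no worker threads; run the callback in-place, right here */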
+ ret = pool->fun(pool->user, ptr);
+
+ if (ret != 0 && pool->status == 0)
+ pool->status = ret;
+
+ return ptr;
+}
+
+static int get_status(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ return pool->status;
+}
+
+thread_pool_t *thread_pool_create_serial(thread_pool_worker_t worker)
+{
+	thread_pool_impl_t *pool = calloc(1, sizeof(*pool));
+	thread_pool_t *interface;
+
+ if (pool == NULL)
+ return NULL;
+
+ pool->fun = worker;
+
+ interface = (thread_pool_t *)pool;
+ interface->destroy = destroy;
+ interface->get_worker_count = get_worker_count;
+ interface->set_worker_ptr = set_worker_ptr;
+ interface->submit = submit;
+ interface->dequeue = dequeue;
+ interface->get_status = get_status;
+	return interface;
+}
+
+#ifdef NO_THREAD_IMPL
+thread_pool_t *thread_pool_create(size_t num_jobs, thread_pool_worker_t worker)
+{
+ (void)num_jobs;
+ return thread_pool_create_serial(worker);
+}
+#endif
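
Since the serial variant implements the same thread_pool_t interface, client
code does not change; the only observable difference is when the work happens.
Roughly, reusing the hypothetical process_chunk callback from the sketch
further up:

    thread_pool_t *pool = thread_pool_create_serial(process_chunk);
    chunk_t c = { 21, 0 };

    pool->submit(pool, &c);    /* only enqueues; nothing runs yet */
    pool->dequeue(pool);       /* process_chunk(&c) runs here, in this thread */
    pool->destroy(pool);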
diff --git a/tests/libutil/Makemodule.am b/tests/libutil/Makemodule.am
index 1153db4..1fe4ebf 100644
--- a/tests/libutil/Makemodule.am
+++ b/tests/libutil/Makemodule.am
@@ -8,8 +8,12 @@ test_rbtree_LDADD = libutil.a libcompat.a
test_xxhash_SOURCES = tests/libutil/xxhash.c
test_xxhash_LDADD = libutil.a libcompat.a
+test_threadpool_SOURCES = tests/libutil/threadpool.c
+test_threadpool_CFLAGS = $(AM_CFLAGS) $(PTHREAD_CFLAGS)
+test_threadpool_LDADD = libutil.a libcompat.a $(PTHREAD_LIBS)
+
LIBUTIL_TESTS = \
- test_str_table test_rbtree test_xxhash
+ test_str_table test_rbtree test_xxhash test_threadpool
check_PROGRAMS += $(LIBUTIL_TESTS)
TESTS += $(LIBUTIL_TESTS)
diff --git a/tests/libutil/threadpool.c b/tests/libutil/threadpool.c
new file mode 100644
index 0000000..566239f
--- /dev/null
+++ b/tests/libutil/threadpool.c
@@ -0,0 +1,111 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * threadpool.c
+ *
+ * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at>
+ */
+#include "config.h"
+
+#include "threadpool.h"
+#include "../test.h"
+
+#if defined(_WIN32) || defined(__WINDOWS__)
+#define WIN32_LEAN_AND_MEAN
+#define VC_EXTRALEAN
+#include <windows.h>
+#endif
+
+#include <time.h>
+
+static int worker(void *user, void *work_item)
+{
+ unsigned int value = *((unsigned int *)work_item);
+ (void)user;
+
+#if defined(_WIN32) || defined(__WINDOWS__)
+ Sleep(100 * value);
+#else
+ {
+ struct timespec sp;
+
+ sp.tv_sec = value / 10;
+ sp.tv_nsec = 100000000;
+ sp.tv_nsec *= (long)(value % 10);
+
+ nanosleep(&sp, NULL);
+ }
+#endif
+
+ *((unsigned int *)work_item) = 42;
+ return 0;
+}
+
+int main(void)
+{
+ unsigned int values[10];
+ thread_pool_t *pool;
+ unsigned int *ptr;
+ size_t i, count;
+ int ret;
+
+ pool = thread_pool_create(10, worker);
+ TEST_NOT_NULL(pool);
+
+ count = pool->get_worker_count(pool);
+ TEST_ASSERT(count >= 1);
+
+ /* dequeue on empty pool MUST NOT deadlock */
+ ptr = pool->dequeue(pool);
+ TEST_NULL(ptr);
+
+ for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) {
+ values[i] = sizeof(values) / sizeof(values[0]) - i;
+
+ ret = pool->submit(pool, values + i);
+ TEST_EQUAL_I(ret, 0);
+ }
+
+ for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) {
+ ptr = pool->dequeue(pool);
+
+ TEST_NOT_NULL(ptr);
+ TEST_ASSERT(ptr == (values + i));
+ TEST_EQUAL_UI(*ptr, 42);
+ }
+
+ ptr = pool->dequeue(pool);
+ TEST_NULL(ptr);
+
+ pool->destroy(pool);
+
+ /* redo the same test with the serial implementation */
+ pool = thread_pool_create_serial(worker);
+ TEST_NOT_NULL(pool);
+
+ ptr = pool->dequeue(pool);
+ TEST_NULL(ptr);
+
+ count = pool->get_worker_count(pool);
+ TEST_EQUAL_UI(count, 1);
+
+ for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) {
+ values[i] = sizeof(values) / sizeof(values[0]) - i;
+
+ ret = pool->submit(pool, values + i);
+ TEST_EQUAL_I(ret, 0);
+ }
+
+ for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) {
+ ptr = pool->dequeue(pool);
+
+ TEST_NOT_NULL(ptr);
+ TEST_ASSERT(ptr == (values + i));
+ TEST_EQUAL_UI(*ptr, 42);
+ }
+
+ ptr = pool->dequeue(pool);
+ TEST_NULL(ptr);
+
+ pool->destroy(pool);
+ return EXIT_SUCCESS;
+}