aboutsummaryrefslogtreecommitdiff
path: root/lib/util
diff options
context:
space:
mode:
Diffstat (limited to 'lib/util')
-rw-r--r--lib/util/Makemodule.am18
-rw-r--r--lib/util/threadpool.c366
-rw-r--r--lib/util/threadpool_serial.c162
3 files changed, 546 insertions, 0 deletions
diff --git a/lib/util/Makemodule.am b/lib/util/Makemodule.am
index 39352f4..113855c 100644
--- a/lib/util/Makemodule.am
+++ b/lib/util/Makemodule.am
@@ -4,9 +4,27 @@ libutil_a_SOURCES += lib/util/rbtree.c include/rbtree.h
libutil_a_SOURCES += lib/util/array.c include/array.h
libutil_a_SOURCES += lib/util/xxhash.c lib/util/hash_table.c
libutil_a_SOURCES += lib/util/fast_urem_by_const.h
+libutil_a_SOURCES += include/threadpool.h
+libutil_a_SOURCES += include/w32threadwrap.h
+libutil_a_SOURCES += lib/util/threadpool_serial.c
libutil_a_CFLAGS = $(AM_CFLAGS)
libutil_a_CPPFLAGS = $(AM_CPPFLAGS)
+if WINDOWS
+libutil_a_CFLAGS += -DWINVER=0x0600 -D_WIN32_WINNT=0x0600
+endif
+
+if HAVE_PTHREAD
+libutil_a_SOURCES += lib/util/threadpool.c
+libutil_a_CFLAGS += $(PTHREAD_CFLAGS)
+else
+if WINDOWS
+libutil_a_SOURCES += lib/util/threadpool.c
+else
+libutil_a_CPPFLAGS += -DNO_THREAD_IMPL
+endif
+endif
+
if CUSTOM_ALLOC
libutil_a_SOURCES += lib/util/mempool.c include/mempool.h
endif
diff --git a/lib/util/threadpool.c b/lib/util/threadpool.c
new file mode 100644
index 0000000..245efe9
--- /dev/null
+++ b/lib/util/threadpool.c
@@ -0,0 +1,366 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * threadpool.c
+ *
+ * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at>
+ */
+#include "threadpool.h"
+#include "util.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+#if defined(_WIN32) || defined(__WINDOWS__)
+#include "w32threadwrap.h"
+
+#define THREAD_FUN(funname, argname) DWORD WINAPI funname(LPVOID argname)
+#define THREAD_EXIT_SUCCESS (0)
+#else
+#include <pthread.h>
+#include <signal.h>
+
+#define THREAD_FUN(funname, argname) void *funname(void *argname)
+#define THREAD_EXIT_SUCCESS NULL
+#endif
+
+typedef struct thread_pool_impl_t thread_pool_impl_t;
+
+typedef struct work_item_t {
+ struct work_item_t *next;
+ size_t ticket_number;
+
+ void *data;
+} work_item_t;
+
+typedef struct {
+ pthread_t thread;
+ thread_pool_impl_t *pool;
+
+ thread_pool_worker_t fun;
+ void *user;
+} worker_t;
+
+struct thread_pool_impl_t {
+ thread_pool_t base;
+
+ pthread_mutex_t mtx;
+ pthread_cond_t queue_cond;
+ pthread_cond_t done_cond;
+
+ size_t next_ticket;
+ size_t next_dequeue_ticket;
+
+ size_t item_count;
+
+ work_item_t *queue;
+ work_item_t *queue_last;
+
+ work_item_t *done;
+
+ work_item_t *recycle;
+
+ int status;
+
+ size_t num_workers;
+ worker_t workers[];
+};
+
+/*****************************************************************************/
+
+static void store_completed(thread_pool_impl_t *pool, work_item_t *done,
+ int status)
+{
+ work_item_t *it = pool->done, *prev = NULL;
+
+ while (it != NULL) {
+ if (it->ticket_number >= done->ticket_number)
+ break;
+
+ prev = it;
+ it = it->next;
+ }
+
+ if (prev == NULL) {
+ done->next = pool->done;
+ pool->done = done;
+ } else {
+ done->next = prev->next;
+ prev->next = done;
+ }
+
+ if (status != 0 && pool->status == 0)
+ pool->status = status;
+
+ pthread_cond_broadcast(&pool->done_cond);
+}
+
+static work_item_t *get_next_work_item(thread_pool_impl_t *pool)
+{
+ work_item_t *item = NULL;
+
+ while (pool->queue == NULL && pool->status == 0)
+ pthread_cond_wait(&pool->queue_cond, &pool->mtx);
+
+ if (pool->status == 0) {
+ item = pool->queue;
+ pool->queue = item->next;
+ item->next = NULL;
+
+ if (pool->queue == NULL)
+ pool->queue_last = NULL;
+ }
+
+ return item;
+}
+
+static THREAD_FUN(worker_proc, arg)
+{
+ work_item_t *item = NULL;
+ worker_t *worker = arg;
+ int status = 0;
+
+ for (;;) {
+ pthread_mutex_lock(&worker->pool->mtx);
+ if (item != NULL)
+ store_completed(worker->pool, item, status);
+
+ item = get_next_work_item(worker->pool);
+ pthread_mutex_unlock(&worker->pool->mtx);
+
+ if (item == NULL)
+ break;
+
+ status = worker->fun(worker->user, item->data);
+ }
+
+ return THREAD_EXIT_SUCCESS;
+}
+
+/*****************************************************************************/
+
+static void destroy(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ size_t i;
+
+ pthread_mutex_lock(&pool->mtx);
+ pool->status = -1;
+ pthread_cond_broadcast(&pool->queue_cond);
+ pthread_mutex_unlock(&pool->mtx);
+
+ for (i = 0; i < pool->num_workers; ++i)
+ pthread_join(pool->workers[i].thread, NULL);
+
+ pthread_cond_destroy(&pool->done_cond);
+ pthread_cond_destroy(&pool->queue_cond);
+ pthread_mutex_destroy(&pool->mtx);
+
+ while (pool->queue != NULL) {
+ work_item_t *item = pool->queue;
+ pool->queue = item->next;
+ free(item);
+ }
+
+ while (pool->done != NULL) {
+ work_item_t *item = pool->done;
+ pool->done = item->next;
+ free(item);
+ }
+
+ while (pool->recycle != NULL) {
+ work_item_t *item = pool->recycle;
+ pool->recycle = item->next;
+ free(item);
+ }
+
+ free(pool);
+}
+
+static size_t get_worker_count(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ return pool->num_workers;
+}
+
+static void set_worker_ptr(thread_pool_t *interface, size_t idx, void *ptr)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ if (idx >= pool->num_workers)
+ return;
+
+ pthread_mutex_lock(&pool->mtx);
+ pool->workers[idx].user = ptr;
+ pthread_mutex_unlock(&pool->mtx);
+}
+
+static int submit(thread_pool_t *interface, void *ptr)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ work_item_t *item = NULL;
+ int status;
+
+ if (pool->recycle != NULL) {
+ item = pool->recycle;
+ pool->recycle = item->next;
+ item->next = NULL;
+ }
+
+ if (item == NULL) {
+ item = calloc(1, sizeof(*item));
+ if (item == NULL)
+ return -1;
+ }
+
+ pthread_mutex_lock(&pool->mtx);
+ status = pool->status;
+
+ if (status == 0) {
+ item->data = ptr;
+ item->ticket_number = pool->next_ticket++;
+
+ if (pool->queue_last == NULL) {
+ pool->queue = item;
+ } else {
+ pool->queue_last->next = item;
+ }
+
+ pool->queue_last = item;
+ pool->item_count += 1;
+ }
+
+ pthread_cond_broadcast(&pool->queue_cond);
+ pthread_mutex_unlock(&pool->mtx);
+
+ if (status != 0) {
+ memset(item, 0, sizeof(*item));
+ item->next = pool->recycle;
+ pool->recycle = item;
+ }
+
+ return status;
+}
+
+static void *dequeue(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ work_item_t *out = NULL;
+ void *ptr = NULL;
+
+ if (pool->item_count == 0)
+ return NULL;
+
+ /* dequeue */
+ pthread_mutex_lock(&pool->mtx);
+ while (out == NULL) {
+ if (pool->done == NULL) {
+ pthread_cond_wait(&pool->done_cond, &pool->mtx);
+ continue;
+ }
+
+ if (pool->done->ticket_number != pool->next_dequeue_ticket) {
+ pthread_cond_wait(&pool->done_cond, &pool->mtx);
+ continue;
+ }
+
+ out = pool->done;
+ pool->done = out->next;
+ out->next = NULL;
+ pool->next_dequeue_ticket += 1;
+ }
+ pthread_mutex_unlock(&pool->mtx);
+
+ /* get the data pointer, put the container in the recycle bin */
+ ptr = out->data;
+
+ out->ticket_number = 0;
+ out->data = NULL;
+ out->next = pool->recycle;
+ pool->recycle = out;
+
+ pool->item_count -= 1;
+ return ptr;
+}
+
+static int get_status(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ int status;
+
+ pthread_mutex_lock(&pool->mtx);
+ status = pool->status;
+ pthread_mutex_unlock(&pool->mtx);
+
+ return status;
+}
+
+thread_pool_t *thread_pool_create(size_t num_jobs, thread_pool_worker_t worker)
+{
+ thread_pool_impl_t *pool;
+ thread_pool_t *interface;
+ sigset_t set, oldset;
+ size_t i, j;
+ int ret;
+
+ if (num_jobs < 1)
+ num_jobs = 1;
+
+ pool = alloc_flex(sizeof(*pool), sizeof(pool->workers[0]), num_jobs);
+ if (pool == NULL)
+ return NULL;
+
+ if (pthread_mutex_init(&pool->mtx, NULL) != 0)
+ goto fail_free;
+
+ if (pthread_cond_init(&pool->queue_cond, NULL) != 0)
+ goto fail_mtx;
+
+ if (pthread_cond_init(&pool->done_cond, NULL) != 0)
+ goto fail_qcond;
+
+ sigfillset(&set);
+ pthread_sigmask(SIG_SETMASK, &set, &oldset);
+
+ pool->num_workers = num_jobs;
+
+ for (i = 0; i < num_jobs; ++i) {
+ pool->workers[i].fun = worker;
+ pool->workers[i].pool = pool;
+
+ ret = pthread_create(&pool->workers[i].thread, NULL,
+ worker_proc, pool->workers + i);
+
+ if (ret != 0)
+ goto fail;
+ }
+
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+
+ interface = (thread_pool_t *)pool;
+ interface->destroy = destroy;
+ interface->get_worker_count = get_worker_count;
+ interface->set_worker_ptr = set_worker_ptr;
+ interface->submit = submit;
+ interface->dequeue = dequeue;
+ interface->get_status = get_status;
+ return interface;
+fail:
+ pthread_mutex_lock(&pool->mtx);
+ pool->status = -1;
+ pthread_cond_broadcast(&pool->queue_cond);
+ pthread_mutex_unlock(&pool->mtx);
+
+ for (j = 0; j < i; ++j)
+ pthread_join(pool->workers[j].thread, NULL);
+
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+ pthread_cond_destroy(&pool->done_cond);
+fail_qcond:
+ pthread_cond_destroy(&pool->queue_cond);
+fail_mtx:
+ pthread_mutex_destroy(&pool->mtx);
+fail_free:
+ free(pool);
+ return NULL;
+}
diff --git a/lib/util/threadpool_serial.c b/lib/util/threadpool_serial.c
new file mode 100644
index 0000000..86eade8
--- /dev/null
+++ b/lib/util/threadpool_serial.c
@@ -0,0 +1,162 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * threadpool_serial.c
+ *
+ * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at>
+ */
+#include "threadpool.h"
+#include "util.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+typedef struct work_item_t {
+ struct work_item_t *next;
+ void *data;
+} work_item_t;
+
+typedef struct {
+ thread_pool_t base;
+
+ work_item_t *queue;
+ work_item_t *queue_last;
+
+ work_item_t *recycle;
+
+ thread_pool_worker_t fun;
+ void *user;
+ int status;
+} thread_pool_impl_t;
+
+static void destroy(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ while (pool->queue != NULL) {
+ work_item_t *item = pool->queue;
+ pool->queue = item->next;
+ free(item);
+ }
+
+ while (pool->recycle != NULL) {
+ work_item_t *item = pool->recycle;
+ pool->recycle = item->next;
+ free(item);
+ }
+
+ free(pool);
+}
+
+static size_t get_worker_count(thread_pool_t *pool)
+{
+ (void)pool;
+ return 1;
+}
+
+static void set_worker_ptr(thread_pool_t *interface, size_t idx, void *ptr)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ if (idx >= 1)
+ return;
+
+ pool->user = ptr;
+}
+
+static int submit(thread_pool_t *interface, void *ptr)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ work_item_t *item = NULL;
+
+ if (pool->status != 0)
+ return pool->status;
+
+ if (pool->recycle != NULL) {
+ item = pool->recycle;
+ pool->recycle = item->next;
+ item->next = NULL;
+ }
+
+ if (item == NULL) {
+ item = calloc(1, sizeof(*item));
+ if (item == NULL)
+ return -1;
+ }
+
+ item->data = ptr;
+
+ if (pool->queue_last == NULL) {
+ pool->queue = item;
+ } else {
+ pool->queue_last->next = item;
+ }
+
+ pool->queue_last = item;
+ return 0;
+}
+
+static void *dequeue(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ work_item_t *item;
+ void *ptr;
+ int ret;
+
+ if (pool->queue == NULL)
+ return NULL;
+
+ item = pool->queue;
+ pool->queue = item->next;
+
+ if (pool->queue == NULL)
+ pool->queue_last = NULL;
+
+ ptr = item->data;
+
+ memset(item, 0, sizeof(*item));
+ item->next = pool->recycle;
+ pool->recycle = item;
+
+ ret = pool->fun(pool->user, ptr);
+
+ if (ret != 0 && pool->status == 0)
+ pool->status = ret;
+
+ return ptr;
+}
+
+static int get_status(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ return pool->status;
+}
+
+thread_pool_t *thread_pool_create_serial(thread_pool_worker_t worker)
+{
+ thread_pool_impl_t *pool = calloc(1, sizeof(*pool));
+ thread_pool_t *interface = (thread_pool_t *)pool;
+
+ if (pool == NULL)
+ return NULL;
+
+ pool->fun = worker;
+
+ interface = (thread_pool_t *)pool;
+ interface->destroy = destroy;
+ interface->get_worker_count = get_worker_count;
+ interface->set_worker_ptr = set_worker_ptr;
+ interface->submit = submit;
+ interface->dequeue = dequeue;
+ interface->get_status = get_status;
+ return interface;
+
+}
+
+#ifdef NO_THREAD_IMPL
+thread_pool_t *thread_pool_create(size_t num_jobs, thread_pool_worker_t worker)
+{
+ (void)num_jobs;
+ return thread_pool_create_serial(worker);
+}
+#endif