diff options
author | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2021-03-20 16:46:22 +0100 |
---|---|---|
committer | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2021-03-21 17:29:18 +0100 |
commit | a18f724aa3bf57aeed285b5f61eca4a0ba891c21 (patch) | |
tree | ce90fb7fd9494f340efc3416ab769353d480f82b /lib | |
parent | 977aa1d0d29b5b48b31279d7709a7209001ee309 (diff) |
Add a thread pool implementation to libutil
The thread pool enforces ordering of items during dequeue similar
to the already existing implementation in libsqfs. The idea is to
eventually pull this functionality out of the block processor and
turn it into a cleaner, separately tested module.
The thread pool is implemented as an abstract interface, so we can
have multiple implementations around, including the serial fallback
implementation which we can then *always* test, irregardless of the
compile config and run through static analysis as well.
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib')
-rw-r--r-- | lib/util/Makemodule.am | 18 | ||||
-rw-r--r-- | lib/util/threadpool.c | 366 | ||||
-rw-r--r-- | lib/util/threadpool_serial.c | 162 |
3 files changed, 546 insertions, 0 deletions
diff --git a/lib/util/Makemodule.am b/lib/util/Makemodule.am index 39352f4..113855c 100644 --- a/lib/util/Makemodule.am +++ b/lib/util/Makemodule.am @@ -4,9 +4,27 @@ libutil_a_SOURCES += lib/util/rbtree.c include/rbtree.h libutil_a_SOURCES += lib/util/array.c include/array.h libutil_a_SOURCES += lib/util/xxhash.c lib/util/hash_table.c libutil_a_SOURCES += lib/util/fast_urem_by_const.h +libutil_a_SOURCES += include/threadpool.h +libutil_a_SOURCES += include/w32threadwrap.h +libutil_a_SOURCES += lib/util/threadpool_serial.c libutil_a_CFLAGS = $(AM_CFLAGS) libutil_a_CPPFLAGS = $(AM_CPPFLAGS) +if WINDOWS +libutil_a_CFLAGS += -DWINVER=0x0600 -D_WIN32_WINNT=0x0600 +endif + +if HAVE_PTHREAD +libutil_a_SOURCES += lib/util/threadpool.c +libutil_a_CFLAGS += $(PTHREAD_CFLAGS) +else +if WINDOWS +libutil_a_SOURCES += lib/util/threadpool.c +else +libutil_a_CPPFLAGS += -DNO_THREAD_IMPL +endif +endif + if CUSTOM_ALLOC libutil_a_SOURCES += lib/util/mempool.c include/mempool.h endif diff --git a/lib/util/threadpool.c b/lib/util/threadpool.c new file mode 100644 index 0000000..245efe9 --- /dev/null +++ b/lib/util/threadpool.c @@ -0,0 +1,366 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * threadpool.c + * + * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at> + */ +#include "threadpool.h" +#include "util.h" + +#include <stdlib.h> +#include <string.h> + +#if defined(_WIN32) || defined(__WINDOWS__) +#include "w32threadwrap.h" + +#define THREAD_FUN(funname, argname) DWORD WINAPI funname(LPVOID argname) +#define THREAD_EXIT_SUCCESS (0) +#else +#include <pthread.h> +#include <signal.h> + +#define THREAD_FUN(funname, argname) void *funname(void *argname) +#define THREAD_EXIT_SUCCESS NULL +#endif + +typedef struct thread_pool_impl_t thread_pool_impl_t; + +typedef struct work_item_t { + struct work_item_t *next; + size_t ticket_number; + + void *data; +} work_item_t; + +typedef struct { + pthread_t thread; + thread_pool_impl_t *pool; + + thread_pool_worker_t fun; + void *user; +} worker_t; + +struct thread_pool_impl_t { + thread_pool_t base; + + pthread_mutex_t mtx; + pthread_cond_t queue_cond; + pthread_cond_t done_cond; + + size_t next_ticket; + size_t next_dequeue_ticket; + + size_t item_count; + + work_item_t *queue; + work_item_t *queue_last; + + work_item_t *done; + + work_item_t *recycle; + + int status; + + size_t num_workers; + worker_t workers[]; +}; + +/*****************************************************************************/ + +static void store_completed(thread_pool_impl_t *pool, work_item_t *done, + int status) +{ + work_item_t *it = pool->done, *prev = NULL; + + while (it != NULL) { + if (it->ticket_number >= done->ticket_number) + break; + + prev = it; + it = it->next; + } + + if (prev == NULL) { + done->next = pool->done; + pool->done = done; + } else { + done->next = prev->next; + prev->next = done; + } + + if (status != 0 && pool->status == 0) + pool->status = status; + + pthread_cond_broadcast(&pool->done_cond); +} + +static work_item_t *get_next_work_item(thread_pool_impl_t *pool) +{ + work_item_t *item = NULL; + + while (pool->queue == NULL && pool->status == 0) + pthread_cond_wait(&pool->queue_cond, &pool->mtx); + + if (pool->status == 0) { + item = pool->queue; + pool->queue = item->next; + item->next = NULL; + + if (pool->queue == NULL) + pool->queue_last = NULL; + } + + return item; +} + +static THREAD_FUN(worker_proc, arg) +{ + work_item_t *item = NULL; + worker_t *worker = arg; + int status = 0; + + for (;;) { + pthread_mutex_lock(&worker->pool->mtx); + if (item != NULL) + store_completed(worker->pool, item, status); + + item = get_next_work_item(worker->pool); + pthread_mutex_unlock(&worker->pool->mtx); + + if (item == NULL) + break; + + status = worker->fun(worker->user, item->data); + } + + return THREAD_EXIT_SUCCESS; +} + +/*****************************************************************************/ + +static void destroy(thread_pool_t *interface) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + size_t i; + + pthread_mutex_lock(&pool->mtx); + pool->status = -1; + pthread_cond_broadcast(&pool->queue_cond); + pthread_mutex_unlock(&pool->mtx); + + for (i = 0; i < pool->num_workers; ++i) + pthread_join(pool->workers[i].thread, NULL); + + pthread_cond_destroy(&pool->done_cond); + pthread_cond_destroy(&pool->queue_cond); + pthread_mutex_destroy(&pool->mtx); + + while (pool->queue != NULL) { + work_item_t *item = pool->queue; + pool->queue = item->next; + free(item); + } + + while (pool->done != NULL) { + work_item_t *item = pool->done; + pool->done = item->next; + free(item); + } + + while (pool->recycle != NULL) { + work_item_t *item = pool->recycle; + pool->recycle = item->next; + free(item); + } + + free(pool); +} + +static size_t get_worker_count(thread_pool_t *interface) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + + return pool->num_workers; +} + +static void set_worker_ptr(thread_pool_t *interface, size_t idx, void *ptr) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + + if (idx >= pool->num_workers) + return; + + pthread_mutex_lock(&pool->mtx); + pool->workers[idx].user = ptr; + pthread_mutex_unlock(&pool->mtx); +} + +static int submit(thread_pool_t *interface, void *ptr) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + work_item_t *item = NULL; + int status; + + if (pool->recycle != NULL) { + item = pool->recycle; + pool->recycle = item->next; + item->next = NULL; + } + + if (item == NULL) { + item = calloc(1, sizeof(*item)); + if (item == NULL) + return -1; + } + + pthread_mutex_lock(&pool->mtx); + status = pool->status; + + if (status == 0) { + item->data = ptr; + item->ticket_number = pool->next_ticket++; + + if (pool->queue_last == NULL) { + pool->queue = item; + } else { + pool->queue_last->next = item; + } + + pool->queue_last = item; + pool->item_count += 1; + } + + pthread_cond_broadcast(&pool->queue_cond); + pthread_mutex_unlock(&pool->mtx); + + if (status != 0) { + memset(item, 0, sizeof(*item)); + item->next = pool->recycle; + pool->recycle = item; + } + + return status; +} + +static void *dequeue(thread_pool_t *interface) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + work_item_t *out = NULL; + void *ptr = NULL; + + if (pool->item_count == 0) + return NULL; + + /* dequeue */ + pthread_mutex_lock(&pool->mtx); + while (out == NULL) { + if (pool->done == NULL) { + pthread_cond_wait(&pool->done_cond, &pool->mtx); + continue; + } + + if (pool->done->ticket_number != pool->next_dequeue_ticket) { + pthread_cond_wait(&pool->done_cond, &pool->mtx); + continue; + } + + out = pool->done; + pool->done = out->next; + out->next = NULL; + pool->next_dequeue_ticket += 1; + } + pthread_mutex_unlock(&pool->mtx); + + /* get the data pointer, put the container in the recycle bin */ + ptr = out->data; + + out->ticket_number = 0; + out->data = NULL; + out->next = pool->recycle; + pool->recycle = out; + + pool->item_count -= 1; + return ptr; +} + +static int get_status(thread_pool_t *interface) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + int status; + + pthread_mutex_lock(&pool->mtx); + status = pool->status; + pthread_mutex_unlock(&pool->mtx); + + return status; +} + +thread_pool_t *thread_pool_create(size_t num_jobs, thread_pool_worker_t worker) +{ + thread_pool_impl_t *pool; + thread_pool_t *interface; + sigset_t set, oldset; + size_t i, j; + int ret; + + if (num_jobs < 1) + num_jobs = 1; + + pool = alloc_flex(sizeof(*pool), sizeof(pool->workers[0]), num_jobs); + if (pool == NULL) + return NULL; + + if (pthread_mutex_init(&pool->mtx, NULL) != 0) + goto fail_free; + + if (pthread_cond_init(&pool->queue_cond, NULL) != 0) + goto fail_mtx; + + if (pthread_cond_init(&pool->done_cond, NULL) != 0) + goto fail_qcond; + + sigfillset(&set); + pthread_sigmask(SIG_SETMASK, &set, &oldset); + + pool->num_workers = num_jobs; + + for (i = 0; i < num_jobs; ++i) { + pool->workers[i].fun = worker; + pool->workers[i].pool = pool; + + ret = pthread_create(&pool->workers[i].thread, NULL, + worker_proc, pool->workers + i); + + if (ret != 0) + goto fail; + } + + pthread_sigmask(SIG_SETMASK, &oldset, NULL); + + interface = (thread_pool_t *)pool; + interface->destroy = destroy; + interface->get_worker_count = get_worker_count; + interface->set_worker_ptr = set_worker_ptr; + interface->submit = submit; + interface->dequeue = dequeue; + interface->get_status = get_status; + return interface; +fail: + pthread_mutex_lock(&pool->mtx); + pool->status = -1; + pthread_cond_broadcast(&pool->queue_cond); + pthread_mutex_unlock(&pool->mtx); + + for (j = 0; j < i; ++j) + pthread_join(pool->workers[j].thread, NULL); + + pthread_sigmask(SIG_SETMASK, &oldset, NULL); + pthread_cond_destroy(&pool->done_cond); +fail_qcond: + pthread_cond_destroy(&pool->queue_cond); +fail_mtx: + pthread_mutex_destroy(&pool->mtx); +fail_free: + free(pool); + return NULL; +} diff --git a/lib/util/threadpool_serial.c b/lib/util/threadpool_serial.c new file mode 100644 index 0000000..86eade8 --- /dev/null +++ b/lib/util/threadpool_serial.c @@ -0,0 +1,162 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * threadpool_serial.c + * + * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at> + */ +#include "threadpool.h" +#include "util.h" + +#include <stdlib.h> +#include <string.h> + +typedef struct work_item_t { + struct work_item_t *next; + void *data; +} work_item_t; + +typedef struct { + thread_pool_t base; + + work_item_t *queue; + work_item_t *queue_last; + + work_item_t *recycle; + + thread_pool_worker_t fun; + void *user; + int status; +} thread_pool_impl_t; + +static void destroy(thread_pool_t *interface) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + + while (pool->queue != NULL) { + work_item_t *item = pool->queue; + pool->queue = item->next; + free(item); + } + + while (pool->recycle != NULL) { + work_item_t *item = pool->recycle; + pool->recycle = item->next; + free(item); + } + + free(pool); +} + +static size_t get_worker_count(thread_pool_t *pool) +{ + (void)pool; + return 1; +} + +static void set_worker_ptr(thread_pool_t *interface, size_t idx, void *ptr) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + + if (idx >= 1) + return; + + pool->user = ptr; +} + +static int submit(thread_pool_t *interface, void *ptr) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + work_item_t *item = NULL; + + if (pool->status != 0) + return pool->status; + + if (pool->recycle != NULL) { + item = pool->recycle; + pool->recycle = item->next; + item->next = NULL; + } + + if (item == NULL) { + item = calloc(1, sizeof(*item)); + if (item == NULL) + return -1; + } + + item->data = ptr; + + if (pool->queue_last == NULL) { + pool->queue = item; + } else { + pool->queue_last->next = item; + } + + pool->queue_last = item; + return 0; +} + +static void *dequeue(thread_pool_t *interface) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + work_item_t *item; + void *ptr; + int ret; + + if (pool->queue == NULL) + return NULL; + + item = pool->queue; + pool->queue = item->next; + + if (pool->queue == NULL) + pool->queue_last = NULL; + + ptr = item->data; + + memset(item, 0, sizeof(*item)); + item->next = pool->recycle; + pool->recycle = item; + + ret = pool->fun(pool->user, ptr); + + if (ret != 0 && pool->status == 0) + pool->status = ret; + + return ptr; +} + +static int get_status(thread_pool_t *interface) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + + return pool->status; +} + +thread_pool_t *thread_pool_create_serial(thread_pool_worker_t worker) +{ + thread_pool_impl_t *pool = calloc(1, sizeof(*pool)); + thread_pool_t *interface = (thread_pool_t *)pool; + + if (pool == NULL) + return NULL; + + pool->fun = worker; + + interface = (thread_pool_t *)pool; + interface->destroy = destroy; + interface->get_worker_count = get_worker_count; + interface->set_worker_ptr = set_worker_ptr; + interface->submit = submit; + interface->dequeue = dequeue; + interface->get_status = get_status; + return interface; + +} + +#ifdef NO_THREAD_IMPL +thread_pool_t *thread_pool_create(size_t num_jobs, thread_pool_worker_t worker) +{ + (void)num_jobs; + return thread_pool_create_serial(worker); +} +#endif |