summaryrefslogtreecommitdiff
path: root/lib/util/threadpool_serial.c
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2021-03-20 16:46:22 +0100
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2021-03-21 17:29:18 +0100
commita18f724aa3bf57aeed285b5f61eca4a0ba891c21 (patch)
treece90fb7fd9494f340efc3416ab769353d480f82b /lib/util/threadpool_serial.c
parent977aa1d0d29b5b48b31279d7709a7209001ee309 (diff)
Add a thread pool implementation to libutil
The thread pool enforces ordering of items during dequeue similar to the already existing implementation in libsqfs. The idea is to eventually pull this functionality out of the block processor and turn it into a cleaner, separately tested module. The thread pool is implemented as an abstract interface, so we can have multiple implementations around, including the serial fallback implementation which we can then *always* test, irregardless of the compile config and run through static analysis as well. Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib/util/threadpool_serial.c')
-rw-r--r--lib/util/threadpool_serial.c162
1 files changed, 162 insertions, 0 deletions
diff --git a/lib/util/threadpool_serial.c b/lib/util/threadpool_serial.c
new file mode 100644
index 0000000..86eade8
--- /dev/null
+++ b/lib/util/threadpool_serial.c
@@ -0,0 +1,162 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * threadpool_serial.c
+ *
+ * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at>
+ */
+#include "threadpool.h"
+#include "util.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+typedef struct work_item_t {
+ struct work_item_t *next;
+ void *data;
+} work_item_t;
+
+typedef struct {
+ thread_pool_t base;
+
+ work_item_t *queue;
+ work_item_t *queue_last;
+
+ work_item_t *recycle;
+
+ thread_pool_worker_t fun;
+ void *user;
+ int status;
+} thread_pool_impl_t;
+
+static void destroy(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ while (pool->queue != NULL) {
+ work_item_t *item = pool->queue;
+ pool->queue = item->next;
+ free(item);
+ }
+
+ while (pool->recycle != NULL) {
+ work_item_t *item = pool->recycle;
+ pool->recycle = item->next;
+ free(item);
+ }
+
+ free(pool);
+}
+
+static size_t get_worker_count(thread_pool_t *pool)
+{
+ (void)pool;
+ return 1;
+}
+
+static void set_worker_ptr(thread_pool_t *interface, size_t idx, void *ptr)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ if (idx >= 1)
+ return;
+
+ pool->user = ptr;
+}
+
+static int submit(thread_pool_t *interface, void *ptr)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ work_item_t *item = NULL;
+
+ if (pool->status != 0)
+ return pool->status;
+
+ if (pool->recycle != NULL) {
+ item = pool->recycle;
+ pool->recycle = item->next;
+ item->next = NULL;
+ }
+
+ if (item == NULL) {
+ item = calloc(1, sizeof(*item));
+ if (item == NULL)
+ return -1;
+ }
+
+ item->data = ptr;
+
+ if (pool->queue_last == NULL) {
+ pool->queue = item;
+ } else {
+ pool->queue_last->next = item;
+ }
+
+ pool->queue_last = item;
+ return 0;
+}
+
+static void *dequeue(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ work_item_t *item;
+ void *ptr;
+ int ret;
+
+ if (pool->queue == NULL)
+ return NULL;
+
+ item = pool->queue;
+ pool->queue = item->next;
+
+ if (pool->queue == NULL)
+ pool->queue_last = NULL;
+
+ ptr = item->data;
+
+ memset(item, 0, sizeof(*item));
+ item->next = pool->recycle;
+ pool->recycle = item;
+
+ ret = pool->fun(pool->user, ptr);
+
+ if (ret != 0 && pool->status == 0)
+ pool->status = ret;
+
+ return ptr;
+}
+
+static int get_status(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ return pool->status;
+}
+
+thread_pool_t *thread_pool_create_serial(thread_pool_worker_t worker)
+{
+ thread_pool_impl_t *pool = calloc(1, sizeof(*pool));
+ thread_pool_t *interface = (thread_pool_t *)pool;
+
+ if (pool == NULL)
+ return NULL;
+
+ pool->fun = worker;
+
+ interface = (thread_pool_t *)pool;
+ interface->destroy = destroy;
+ interface->get_worker_count = get_worker_count;
+ interface->set_worker_ptr = set_worker_ptr;
+ interface->submit = submit;
+ interface->dequeue = dequeue;
+ interface->get_status = get_status;
+ return interface;
+
+}
+
+#ifdef NO_THREAD_IMPL
+thread_pool_t *thread_pool_create(size_t num_jobs, thread_pool_worker_t worker)
+{
+ (void)num_jobs;
+ return thread_pool_create_serial(worker);
+}
+#endif