aboutsummaryrefslogtreecommitdiff
path: root/lib/util/threadpool_serial.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/util/threadpool_serial.c')
-rw-r--r--lib/util/threadpool_serial.c162
1 files changed, 162 insertions, 0 deletions
diff --git a/lib/util/threadpool_serial.c b/lib/util/threadpool_serial.c
new file mode 100644
index 0000000..86eade8
--- /dev/null
+++ b/lib/util/threadpool_serial.c
@@ -0,0 +1,162 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * threadpool_serial.c
+ *
+ * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at>
+ */
+#include "threadpool.h"
+#include "util.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+typedef struct work_item_t {
+ struct work_item_t *next;
+ void *data;
+} work_item_t;
+
+typedef struct {
+ thread_pool_t base;
+
+ work_item_t *queue;
+ work_item_t *queue_last;
+
+ work_item_t *recycle;
+
+ thread_pool_worker_t fun;
+ void *user;
+ int status;
+} thread_pool_impl_t;
+
+static void destroy(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ while (pool->queue != NULL) {
+ work_item_t *item = pool->queue;
+ pool->queue = item->next;
+ free(item);
+ }
+
+ while (pool->recycle != NULL) {
+ work_item_t *item = pool->recycle;
+ pool->recycle = item->next;
+ free(item);
+ }
+
+ free(pool);
+}
+
+static size_t get_worker_count(thread_pool_t *pool)
+{
+ (void)pool;
+ return 1;
+}
+
+static void set_worker_ptr(thread_pool_t *interface, size_t idx, void *ptr)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ if (idx >= 1)
+ return;
+
+ pool->user = ptr;
+}
+
+static int submit(thread_pool_t *interface, void *ptr)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ work_item_t *item = NULL;
+
+ if (pool->status != 0)
+ return pool->status;
+
+ if (pool->recycle != NULL) {
+ item = pool->recycle;
+ pool->recycle = item->next;
+ item->next = NULL;
+ }
+
+ if (item == NULL) {
+ item = calloc(1, sizeof(*item));
+ if (item == NULL)
+ return -1;
+ }
+
+ item->data = ptr;
+
+ if (pool->queue_last == NULL) {
+ pool->queue = item;
+ } else {
+ pool->queue_last->next = item;
+ }
+
+ pool->queue_last = item;
+ return 0;
+}
+
+static void *dequeue(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+ work_item_t *item;
+ void *ptr;
+ int ret;
+
+ if (pool->queue == NULL)
+ return NULL;
+
+ item = pool->queue;
+ pool->queue = item->next;
+
+ if (pool->queue == NULL)
+ pool->queue_last = NULL;
+
+ ptr = item->data;
+
+ memset(item, 0, sizeof(*item));
+ item->next = pool->recycle;
+ pool->recycle = item;
+
+ ret = pool->fun(pool->user, ptr);
+
+ if (ret != 0 && pool->status == 0)
+ pool->status = ret;
+
+ return ptr;
+}
+
+static int get_status(thread_pool_t *interface)
+{
+ thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
+
+ return pool->status;
+}
+
+thread_pool_t *thread_pool_create_serial(thread_pool_worker_t worker)
+{
+ thread_pool_impl_t *pool = calloc(1, sizeof(*pool));
+ thread_pool_t *interface = (thread_pool_t *)pool;
+
+ if (pool == NULL)
+ return NULL;
+
+ pool->fun = worker;
+
+ interface = (thread_pool_t *)pool;
+ interface->destroy = destroy;
+ interface->get_worker_count = get_worker_count;
+ interface->set_worker_ptr = set_worker_ptr;
+ interface->submit = submit;
+ interface->dequeue = dequeue;
+ interface->get_status = get_status;
+ return interface;
+
+}
+
+#ifdef NO_THREAD_IMPL
+thread_pool_t *thread_pool_create(size_t num_jobs, thread_pool_worker_t worker)
+{
+ (void)num_jobs;
+ return thread_pool_create_serial(worker);
+}
+#endif