diff options
Diffstat (limited to 'lib/util/threadpool_serial.c')
-rw-r--r-- | lib/util/threadpool_serial.c | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/lib/util/threadpool_serial.c b/lib/util/threadpool_serial.c new file mode 100644 index 0000000..86eade8 --- /dev/null +++ b/lib/util/threadpool_serial.c @@ -0,0 +1,162 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * threadpool_serial.c + * + * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at> + */ +#include "threadpool.h" +#include "util.h" + +#include <stdlib.h> +#include <string.h> + +typedef struct work_item_t { + struct work_item_t *next; + void *data; +} work_item_t; + +typedef struct { + thread_pool_t base; + + work_item_t *queue; + work_item_t *queue_last; + + work_item_t *recycle; + + thread_pool_worker_t fun; + void *user; + int status; +} thread_pool_impl_t; + +static void destroy(thread_pool_t *interface) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + + while (pool->queue != NULL) { + work_item_t *item = pool->queue; + pool->queue = item->next; + free(item); + } + + while (pool->recycle != NULL) { + work_item_t *item = pool->recycle; + pool->recycle = item->next; + free(item); + } + + free(pool); +} + +static size_t get_worker_count(thread_pool_t *pool) +{ + (void)pool; + return 1; +} + +static void set_worker_ptr(thread_pool_t *interface, size_t idx, void *ptr) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + + if (idx >= 1) + return; + + pool->user = ptr; +} + +static int submit(thread_pool_t *interface, void *ptr) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + work_item_t *item = NULL; + + if (pool->status != 0) + return pool->status; + + if (pool->recycle != NULL) { + item = pool->recycle; + pool->recycle = item->next; + item->next = NULL; + } + + if (item == NULL) { + item = calloc(1, sizeof(*item)); + if (item == NULL) + return -1; + } + + item->data = ptr; + + if (pool->queue_last == NULL) { + pool->queue = item; + } else { + pool->queue_last->next = item; + } + + pool->queue_last = item; + return 0; +} + +static void *dequeue(thread_pool_t *interface) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + work_item_t *item; + void *ptr; + int ret; + + if (pool->queue == NULL) + return NULL; + + item = pool->queue; + pool->queue = item->next; + + if (pool->queue == NULL) + pool->queue_last = NULL; + + ptr = item->data; + + memset(item, 0, sizeof(*item)); + item->next = pool->recycle; + pool->recycle = item; + + ret = pool->fun(pool->user, ptr); + + if (ret != 0 && pool->status == 0) + pool->status = ret; + + return ptr; +} + +static int get_status(thread_pool_t *interface) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + + return pool->status; +} + +thread_pool_t *thread_pool_create_serial(thread_pool_worker_t worker) +{ + thread_pool_impl_t *pool = calloc(1, sizeof(*pool)); + thread_pool_t *interface = (thread_pool_t *)pool; + + if (pool == NULL) + return NULL; + + pool->fun = worker; + + interface = (thread_pool_t *)pool; + interface->destroy = destroy; + interface->get_worker_count = get_worker_count; + interface->set_worker_ptr = set_worker_ptr; + interface->submit = submit; + interface->dequeue = dequeue; + interface->get_status = get_status; + return interface; + +} + +#ifdef NO_THREAD_IMPL +thread_pool_t *thread_pool_create(size_t num_jobs, thread_pool_worker_t worker) +{ + (void)num_jobs; + return thread_pool_create_serial(worker); +} +#endif |