diff options
-rw-r--r-- | include/threadpool.h | 125 | ||||
-rw-r--r-- | include/w32threadwrap.h | 105 | ||||
-rw-r--r-- | lib/util/Makemodule.am | 18 | ||||
-rw-r--r-- | lib/util/threadpool.c | 366 | ||||
-rw-r--r-- | lib/util/threadpool_serial.c | 162 | ||||
-rw-r--r-- | tests/libutil/Makemodule.am | 6 | ||||
-rw-r--r-- | tests/libutil/threadpool.c | 111 |
7 files changed, 892 insertions, 1 deletions
diff --git a/include/threadpool.h b/include/threadpool.h new file mode 100644 index 0000000..f25c497 --- /dev/null +++ b/include/threadpool.h @@ -0,0 +1,125 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * threadpool.h + * + * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at> + */ +#ifndef THREADPOOL_H +#define THREADPOOL_H + +#include "sqfs/predef.h" + +typedef int (*thread_pool_worker_t)(void *user, void *work_item); + +/** + * @struct thread_pool_t + * + * @brief A thread pool with a ticket number based work item ordering. + * + * While the order in which items are non-deterministic, the thread pool + * implementation internally uses a ticket system to ensure the completed + * items are deqeueued in the same order that they were enqueued. + */ +typedef struct thread_pool_t { + /** + * @brief Shutdown and destroy a thread pool. + * + * @param pool A pointer to a pool returned by thread_pool_create + */ + void (*destroy)(struct thread_pool_t *pool); + + /** + * @brief Get the actual number of worker threads available. + * + * @return A number greater or equal to 1. + */ + size_t (*get_worker_count)(struct thread_pool_t *pool); + + /** + * @brief Change the user data pointer for a thread pool worker + * by index. + * + * @param idx A zero-based index into the worker list. + * @param ptr A user pointer that this specific worker thread should + * pass to the worker callback. + */ + void (*set_worker_ptr)(struct thread_pool_t *pool, size_t idx, + void *ptr); + + /** + * @brief Submit a work item to a thread pool. + * + * This function will fail on allocation failure or if the internal + * error state is set was set by one of the workers. + * + * @param ptr A pointer to a work object to enqueue. + * + * @return Zero on success. + */ + int (*submit)(struct thread_pool_t *pool, void *ptr); + + /** + * @brief Wait for a work item to be completed. + * + * This function dequeues a single completed work item. It may block + * until one of the worker threads signals completion of an additional + * item. + * + * This function guarantees to return the items in the same order as + * they were submitted, so the function can actually block longer than + * necessary, because it has to wait until the next item in sequence + * is finished. + * + * @return A pointer to a new work item or NULL if there are none + * in the pipeline. + */ + void *(*dequeue)(struct thread_pool_t *pool); + + /** + * @brief Get the internal worker return status value. + * + * If the worker functions returns a non-zero exit status in one of the + * worker threads, the thread pool stors the value internally and shuts + * down. This function can be used to retrieve the value. + * + * @return A non-zero value returned by the worker callback or zero if + * everything is A-OK. + */ + int (*get_status)(struct thread_pool_t *pool); +} thread_pool_t; + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * @brief Create a thread pool instance. + * + * @param num_jobs The number of worker threads to launch. + * @param worker A function to call from the worker threads to process + * the work items. + * + * @return A pointer to a thread pool on success, NULL on failure. + */ +SQFS_INTERNAL thread_pool_t *thread_pool_create(size_t num_jobs, + thread_pool_worker_t worker); + +/** + * @brief Create a serial mockup thread pool implementation. + * + * This returns a @ref thread_pool_t implementation that, instead of running a + * thread pool actually does the work in-situ when dequeueing. + * + * @param worker A function to call from the worker threads to process + * the work items. + * + * @return A pointer to a thread pool on success, NULL on failure. + */ +SQFS_INTERNAL +thread_pool_t *thread_pool_create_serial(thread_pool_worker_t worker); + +#ifdef __cplusplus +} +#endif + +#endif /* THREADPOOL_H */ diff --git a/include/w32threadwrap.h b/include/w32threadwrap.h new file mode 100644 index 0000000..6b7344c --- /dev/null +++ b/include/w32threadwrap.h @@ -0,0 +1,105 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * w32threadwrap.h + * + * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at> + */ +#ifndef W32THREADWRAP_H +#define W32THREADWRAP_H + +#include "sqfs/predef.h" + +#define WIN32_LEAN_AND_MEAN +#define VC_EXTRALEAN +#include <windows.h> + +typedef unsigned int sigset_t; +typedef HANDLE pthread_t; +typedef CRITICAL_SECTION pthread_mutex_t; +typedef CONDITION_VARIABLE pthread_cond_t; + +static inline int pthread_create(pthread_t *thread, const void *attr, + LPTHREAD_START_ROUTINE proc, LPVOID arg) +{ + HANDLE hnd = CreateThread(NULL, 0, proc, arg, 0, 0); + (void)attr; + + if (hnd == NULL) + return -1; + + *thread = hnd; + return 0; +} + +static int pthread_join(pthread_t thread, void **retval) +{ + WaitForSingleObject(thread, INFINITE); + CloseHandle(thread); + if (retval != NULL) + *retval = NULL; + return 0; +} + +static inline int pthread_mutex_init(pthread_mutex_t *mutex, const void *attr) +{ + (void)attr; + InitializeCriticalSection(mutex); + return 0; +} + +static inline int pthread_mutex_lock(pthread_mutex_t *mutex) +{ + EnterCriticalSection(mutex); + return 0; +} + +static inline int pthread_mutex_unlock(pthread_mutex_t *mutex) +{ + LeaveCriticalSection(mutex); + return 0; +} + +static inline void pthread_mutex_destroy(pthread_mutex_t *mutex) +{ + DeleteCriticalSection(mutex); +} + +static inline int pthread_cond_init(pthread_cond_t *cond, const void *attr) +{ + (void)attr; + InitializeConditionVariable(cond); + return 0; +} + +static inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mtx) +{ + return SleepConditionVariableCS(cond, mtx, INFINITE) != 0; +} + +static inline int pthread_cond_broadcast(pthread_cond_t *cond) +{ + WakeAllConditionVariable(cond); + return 0; +} + +static inline void pthread_cond_destroy(pthread_cond_t *cond) +{ + (void)cond; +} + +#define SIG_SETMASK (0) + +static inline int sigfillset(sigset_t *set) +{ + *set = 0xFFFFFFFF; + return 0; +} + +static inline int pthread_sigmask(int how, const sigset_t *set, + const sigset_t *old) +{ + (void)how; (void)set; (void)old; + return 0; +} + +#endif /* W32THREADWRAP_H */ diff --git a/lib/util/Makemodule.am b/lib/util/Makemodule.am index 39352f4..113855c 100644 --- a/lib/util/Makemodule.am +++ b/lib/util/Makemodule.am @@ -4,9 +4,27 @@ libutil_a_SOURCES += lib/util/rbtree.c include/rbtree.h libutil_a_SOURCES += lib/util/array.c include/array.h libutil_a_SOURCES += lib/util/xxhash.c lib/util/hash_table.c libutil_a_SOURCES += lib/util/fast_urem_by_const.h +libutil_a_SOURCES += include/threadpool.h +libutil_a_SOURCES += include/w32threadwrap.h +libutil_a_SOURCES += lib/util/threadpool_serial.c libutil_a_CFLAGS = $(AM_CFLAGS) libutil_a_CPPFLAGS = $(AM_CPPFLAGS) +if WINDOWS +libutil_a_CFLAGS += -DWINVER=0x0600 -D_WIN32_WINNT=0x0600 +endif + +if HAVE_PTHREAD +libutil_a_SOURCES += lib/util/threadpool.c +libutil_a_CFLAGS += $(PTHREAD_CFLAGS) +else +if WINDOWS +libutil_a_SOURCES += lib/util/threadpool.c +else +libutil_a_CPPFLAGS += -DNO_THREAD_IMPL +endif +endif + if CUSTOM_ALLOC libutil_a_SOURCES += lib/util/mempool.c include/mempool.h endif diff --git a/lib/util/threadpool.c b/lib/util/threadpool.c new file mode 100644 index 0000000..245efe9 --- /dev/null +++ b/lib/util/threadpool.c @@ -0,0 +1,366 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * threadpool.c + * + * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at> + */ +#include "threadpool.h" +#include "util.h" + +#include <stdlib.h> +#include <string.h> + +#if defined(_WIN32) || defined(__WINDOWS__) +#include "w32threadwrap.h" + +#define THREAD_FUN(funname, argname) DWORD WINAPI funname(LPVOID argname) +#define THREAD_EXIT_SUCCESS (0) +#else +#include <pthread.h> +#include <signal.h> + +#define THREAD_FUN(funname, argname) void *funname(void *argname) +#define THREAD_EXIT_SUCCESS NULL +#endif + +typedef struct thread_pool_impl_t thread_pool_impl_t; + +typedef struct work_item_t { + struct work_item_t *next; + size_t ticket_number; + + void *data; +} work_item_t; + +typedef struct { + pthread_t thread; + thread_pool_impl_t *pool; + + thread_pool_worker_t fun; + void *user; +} worker_t; + +struct thread_pool_impl_t { + thread_pool_t base; + + pthread_mutex_t mtx; + pthread_cond_t queue_cond; + pthread_cond_t done_cond; + + size_t next_ticket; + size_t next_dequeue_ticket; + + size_t item_count; + + work_item_t *queue; + work_item_t *queue_last; + + work_item_t *done; + + work_item_t *recycle; + + int status; + + size_t num_workers; + worker_t workers[]; +}; + +/*****************************************************************************/ + +static void store_completed(thread_pool_impl_t *pool, work_item_t *done, + int status) +{ + work_item_t *it = pool->done, *prev = NULL; + + while (it != NULL) { + if (it->ticket_number >= done->ticket_number) + break; + + prev = it; + it = it->next; + } + + if (prev == NULL) { + done->next = pool->done; + pool->done = done; + } else { + done->next = prev->next; + prev->next = done; + } + + if (status != 0 && pool->status == 0) + pool->status = status; + + pthread_cond_broadcast(&pool->done_cond); +} + +static work_item_t *get_next_work_item(thread_pool_impl_t *pool) +{ + work_item_t *item = NULL; + + while (pool->queue == NULL && pool->status == 0) + pthread_cond_wait(&pool->queue_cond, &pool->mtx); + + if (pool->status == 0) { + item = pool->queue; + pool->queue = item->next; + item->next = NULL; + + if (pool->queue == NULL) + pool->queue_last = NULL; + } + + return item; +} + +static THREAD_FUN(worker_proc, arg) +{ + work_item_t *item = NULL; + worker_t *worker = arg; + int status = 0; + + for (;;) { + pthread_mutex_lock(&worker->pool->mtx); + if (item != NULL) + store_completed(worker->pool, item, status); + + item = get_next_work_item(worker->pool); + pthread_mutex_unlock(&worker->pool->mtx); + + if (item == NULL) + break; + + status = worker->fun(worker->user, item->data); + } + + return THREAD_EXIT_SUCCESS; +} + +/*****************************************************************************/ + +static void destroy(thread_pool_t *interface) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + size_t i; + + pthread_mutex_lock(&pool->mtx); + pool->status = -1; + pthread_cond_broadcast(&pool->queue_cond); + pthread_mutex_unlock(&pool->mtx); + + for (i = 0; i < pool->num_workers; ++i) + pthread_join(pool->workers[i].thread, NULL); + + pthread_cond_destroy(&pool->done_cond); + pthread_cond_destroy(&pool->queue_cond); + pthread_mutex_destroy(&pool->mtx); + + while (pool->queue != NULL) { + work_item_t *item = pool->queue; + pool->queue = item->next; + free(item); + } + + while (pool->done != NULL) { + work_item_t *item = pool->done; + pool->done = item->next; + free(item); + } + + while (pool->recycle != NULL) { + work_item_t *item = pool->recycle; + pool->recycle = item->next; + free(item); + } + + free(pool); +} + +static size_t get_worker_count(thread_pool_t *interface) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + + return pool->num_workers; +} + +static void set_worker_ptr(thread_pool_t *interface, size_t idx, void *ptr) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + + if (idx >= pool->num_workers) + return; + + pthread_mutex_lock(&pool->mtx); + pool->workers[idx].user = ptr; + pthread_mutex_unlock(&pool->mtx); +} + +static int submit(thread_pool_t *interface, void *ptr) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + work_item_t *item = NULL; + int status; + + if (pool->recycle != NULL) { + item = pool->recycle; + pool->recycle = item->next; + item->next = NULL; + } + + if (item == NULL) { + item = calloc(1, sizeof(*item)); + if (item == NULL) + return -1; + } + + pthread_mutex_lock(&pool->mtx); + status = pool->status; + + if (status == 0) { + item->data = ptr; + item->ticket_number = pool->next_ticket++; + + if (pool->queue_last == NULL) { + pool->queue = item; + } else { + pool->queue_last->next = item; + } + + pool->queue_last = item; + pool->item_count += 1; + } + + pthread_cond_broadcast(&pool->queue_cond); + pthread_mutex_unlock(&pool->mtx); + + if (status != 0) { + memset(item, 0, sizeof(*item)); + item->next = pool->recycle; + pool->recycle = item; + } + + return status; +} + +static void *dequeue(thread_pool_t *interface) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + work_item_t *out = NULL; + void *ptr = NULL; + + if (pool->item_count == 0) + return NULL; + + /* dequeue */ + pthread_mutex_lock(&pool->mtx); + while (out == NULL) { + if (pool->done == NULL) { + pthread_cond_wait(&pool->done_cond, &pool->mtx); + continue; + } + + if (pool->done->ticket_number != pool->next_dequeue_ticket) { + pthread_cond_wait(&pool->done_cond, &pool->mtx); + continue; + } + + out = pool->done; + pool->done = out->next; + out->next = NULL; + pool->next_dequeue_ticket += 1; + } + pthread_mutex_unlock(&pool->mtx); + + /* get the data pointer, put the container in the recycle bin */ + ptr = out->data; + + out->ticket_number = 0; + out->data = NULL; + out->next = pool->recycle; + pool->recycle = out; + + pool->item_count -= 1; + return ptr; +} + +static int get_status(thread_pool_t *interface) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + int status; + + pthread_mutex_lock(&pool->mtx); + status = pool->status; + pthread_mutex_unlock(&pool->mtx); + + return status; +} + +thread_pool_t *thread_pool_create(size_t num_jobs, thread_pool_worker_t worker) +{ + thread_pool_impl_t *pool; + thread_pool_t *interface; + sigset_t set, oldset; + size_t i, j; + int ret; + + if (num_jobs < 1) + num_jobs = 1; + + pool = alloc_flex(sizeof(*pool), sizeof(pool->workers[0]), num_jobs); + if (pool == NULL) + return NULL; + + if (pthread_mutex_init(&pool->mtx, NULL) != 0) + goto fail_free; + + if (pthread_cond_init(&pool->queue_cond, NULL) != 0) + goto fail_mtx; + + if (pthread_cond_init(&pool->done_cond, NULL) != 0) + goto fail_qcond; + + sigfillset(&set); + pthread_sigmask(SIG_SETMASK, &set, &oldset); + + pool->num_workers = num_jobs; + + for (i = 0; i < num_jobs; ++i) { + pool->workers[i].fun = worker; + pool->workers[i].pool = pool; + + ret = pthread_create(&pool->workers[i].thread, NULL, + worker_proc, pool->workers + i); + + if (ret != 0) + goto fail; + } + + pthread_sigmask(SIG_SETMASK, &oldset, NULL); + + interface = (thread_pool_t *)pool; + interface->destroy = destroy; + interface->get_worker_count = get_worker_count; + interface->set_worker_ptr = set_worker_ptr; + interface->submit = submit; + interface->dequeue = dequeue; + interface->get_status = get_status; + return interface; +fail: + pthread_mutex_lock(&pool->mtx); + pool->status = -1; + pthread_cond_broadcast(&pool->queue_cond); + pthread_mutex_unlock(&pool->mtx); + + for (j = 0; j < i; ++j) + pthread_join(pool->workers[j].thread, NULL); + + pthread_sigmask(SIG_SETMASK, &oldset, NULL); + pthread_cond_destroy(&pool->done_cond); +fail_qcond: + pthread_cond_destroy(&pool->queue_cond); +fail_mtx: + pthread_mutex_destroy(&pool->mtx); +fail_free: + free(pool); + return NULL; +} diff --git a/lib/util/threadpool_serial.c b/lib/util/threadpool_serial.c new file mode 100644 index 0000000..86eade8 --- /dev/null +++ b/lib/util/threadpool_serial.c @@ -0,0 +1,162 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * threadpool_serial.c + * + * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at> + */ +#include "threadpool.h" +#include "util.h" + +#include <stdlib.h> +#include <string.h> + +typedef struct work_item_t { + struct work_item_t *next; + void *data; +} work_item_t; + +typedef struct { + thread_pool_t base; + + work_item_t *queue; + work_item_t *queue_last; + + work_item_t *recycle; + + thread_pool_worker_t fun; + void *user; + int status; +} thread_pool_impl_t; + +static void destroy(thread_pool_t *interface) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + + while (pool->queue != NULL) { + work_item_t *item = pool->queue; + pool->queue = item->next; + free(item); + } + + while (pool->recycle != NULL) { + work_item_t *item = pool->recycle; + pool->recycle = item->next; + free(item); + } + + free(pool); +} + +static size_t get_worker_count(thread_pool_t *pool) +{ + (void)pool; + return 1; +} + +static void set_worker_ptr(thread_pool_t *interface, size_t idx, void *ptr) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + + if (idx >= 1) + return; + + pool->user = ptr; +} + +static int submit(thread_pool_t *interface, void *ptr) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + work_item_t *item = NULL; + + if (pool->status != 0) + return pool->status; + + if (pool->recycle != NULL) { + item = pool->recycle; + pool->recycle = item->next; + item->next = NULL; + } + + if (item == NULL) { + item = calloc(1, sizeof(*item)); + if (item == NULL) + return -1; + } + + item->data = ptr; + + if (pool->queue_last == NULL) { + pool->queue = item; + } else { + pool->queue_last->next = item; + } + + pool->queue_last = item; + return 0; +} + +static void *dequeue(thread_pool_t *interface) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + work_item_t *item; + void *ptr; + int ret; + + if (pool->queue == NULL) + return NULL; + + item = pool->queue; + pool->queue = item->next; + + if (pool->queue == NULL) + pool->queue_last = NULL; + + ptr = item->data; + + memset(item, 0, sizeof(*item)); + item->next = pool->recycle; + pool->recycle = item; + + ret = pool->fun(pool->user, ptr); + + if (ret != 0 && pool->status == 0) + pool->status = ret; + + return ptr; +} + +static int get_status(thread_pool_t *interface) +{ + thread_pool_impl_t *pool = (thread_pool_impl_t *)interface; + + return pool->status; +} + +thread_pool_t *thread_pool_create_serial(thread_pool_worker_t worker) +{ + thread_pool_impl_t *pool = calloc(1, sizeof(*pool)); + thread_pool_t *interface = (thread_pool_t *)pool; + + if (pool == NULL) + return NULL; + + pool->fun = worker; + + interface = (thread_pool_t *)pool; + interface->destroy = destroy; + interface->get_worker_count = get_worker_count; + interface->set_worker_ptr = set_worker_ptr; + interface->submit = submit; + interface->dequeue = dequeue; + interface->get_status = get_status; + return interface; + +} + +#ifdef NO_THREAD_IMPL +thread_pool_t *thread_pool_create(size_t num_jobs, thread_pool_worker_t worker) +{ + (void)num_jobs; + return thread_pool_create_serial(worker); +} +#endif diff --git a/tests/libutil/Makemodule.am b/tests/libutil/Makemodule.am index 1153db4..1fe4ebf 100644 --- a/tests/libutil/Makemodule.am +++ b/tests/libutil/Makemodule.am @@ -8,8 +8,12 @@ test_rbtree_LDADD = libutil.a libcompat.a test_xxhash_SOURCES = tests/libutil/xxhash.c test_xxhash_LDADD = libutil.a libcompat.a +test_threadpool_SOURCES = tests/libutil/threadpool.c +test_threadpool_CFLAGS = $(AM_CFLAGS) $(PTHREAD_CFLAGS) +test_threadpool_LDADD = libutil.a libcompat.a $(PTHREAD_LIBS) + LIBUTIL_TESTS = \ - test_str_table test_rbtree test_xxhash + test_str_table test_rbtree test_xxhash test_threadpool check_PROGRAMS += $(LIBUTIL_TESTS) TESTS += $(LIBUTIL_TESTS) diff --git a/tests/libutil/threadpool.c b/tests/libutil/threadpool.c new file mode 100644 index 0000000..566239f --- /dev/null +++ b/tests/libutil/threadpool.c @@ -0,0 +1,111 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * threadpool.c + * + * Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at> + */ +#include "config.h" + +#include "threadpool.h" +#include "../test.h" + +#if defined(_WIN32) || defined(__WINDOWS__) +#define WIN32_LEAN_AND_MEAN +#define VC_EXTRALEAN +#include <windows.h> +#endif + +#include <time.h> + +static int worker(void *user, void *work_item) +{ + unsigned int value = *((unsigned int *)work_item); + (void)user; + +#if defined(_WIN32) || defined(__WINDOWS__) + Sleep(100 * value); +#else + { + struct timespec sp; + + sp.tv_sec = value / 10; + sp.tv_nsec = 100000000; + sp.tv_nsec *= (long)(value % 10); + + nanosleep(&sp, NULL); + } +#endif + + *((unsigned int *)work_item) = 42; + return 0; +} + +int main(void) +{ + unsigned int values[10]; + thread_pool_t *pool; + unsigned int *ptr; + size_t i, count; + int ret; + + pool = thread_pool_create(10, worker); + TEST_NOT_NULL(pool); + + count = pool->get_worker_count(pool); + TEST_ASSERT(count >= 1); + + /* dequeue on empty pool MUST NOT deadlock */ + ptr = pool->dequeue(pool); + TEST_NULL(ptr); + + for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) { + values[i] = sizeof(values) / sizeof(values[0]) - i; + + ret = pool->submit(pool, values + i); + TEST_EQUAL_I(ret, 0); + } + + for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) { + ptr = pool->dequeue(pool); + + TEST_NOT_NULL(ptr); + TEST_ASSERT(ptr == (values + i)); + TEST_EQUAL_UI(*ptr, 42); + } + + ptr = pool->dequeue(pool); + TEST_NULL(ptr); + + pool->destroy(pool); + + /* redo the same test with the serial implementation */ + pool = thread_pool_create_serial(worker); + TEST_NOT_NULL(pool); + + ptr = pool->dequeue(pool); + TEST_NULL(ptr); + + count = pool->get_worker_count(pool); + TEST_EQUAL_UI(count, 1); + + for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) { + values[i] = sizeof(values) / sizeof(values[0]) - i; + + ret = pool->submit(pool, values + i); + TEST_EQUAL_I(ret, 0); + } + + for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) { + ptr = pool->dequeue(pool); + + TEST_NOT_NULL(ptr); + TEST_ASSERT(ptr == (values + i)); + TEST_EQUAL_UI(*ptr, 42); + } + + ptr = pool->dequeue(pool); + TEST_NULL(ptr); + + pool->destroy(pool); + return EXIT_SUCCESS; +} |