diff options
author | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2022-11-27 03:07:28 +0100 |
---|---|---|
committer | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2022-11-27 03:08:33 +0100 |
commit | 62e598c500b53902e3d97b62d879e6e9c7785d00 (patch) | |
tree | cc3469f8637dab9957aff0fcad4be55b21677f1c /tests/libutil | |
parent | 2ff19f084f3a50272606583c27eb8e2bb01fe75f (diff) |
Improve thread pool test
Instead of sleeping in the worker thread, busy-loop-wait on a ticket
counter to try and serialize the workers in backward order.
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'tests/libutil')
-rw-r--r-- | tests/libutil/Makemodule.am | 5 | ||||
-rw-r--r-- | tests/libutil/threadpool.c | 154 |
2 files changed, 110 insertions, 49 deletions
diff --git a/tests/libutil/Makemodule.am b/tests/libutil/Makemodule.am index 724af50..c84f722 100644 --- a/tests/libutil/Makemodule.am +++ b/tests/libutil/Makemodule.am @@ -10,8 +10,13 @@ test_xxhash_LDADD = libutil.a libcompat.a test_threadpool_SOURCES = tests/libutil/threadpool.c test_threadpool_CFLAGS = $(AM_CFLAGS) $(PTHREAD_CFLAGS) +test_threadpool_CPPFLAGS = $(AM_CPPFLAGS) test_threadpool_LDADD = libutil.a libcompat.a $(PTHREAD_LIBS) +if HAVE_PTHREAD +test_threadpool_CPPFLAGS += -DHAVE_PTHREAD +endif + test_ismemzero_SOURCES = tests/libutil/is_memory_zero.c test_ismemzero_LDADD = libutil.a libcompat.a diff --git a/tests/libutil/threadpool.c b/tests/libutil/threadpool.c index 8d1bda5..cf54484 100644 --- a/tests/libutil/threadpool.c +++ b/tests/libutil/threadpool.c @@ -13,59 +13,126 @@ #define WIN32_LEAN_AND_MEAN #define VC_EXTRALEAN #include <windows.h> -#endif -#include <time.h> +static CRITICAL_SECTION mutex; +static unsigned int ticket; -static int worker(void *user, void *work_item) +static void ticket_init(void) { - unsigned int value = *((unsigned int *)work_item); - (void)user; + InitializeCriticalSection(&mutex); + ticket = 0; +} -#if defined(_WIN32) || defined(__WINDOWS__) - Sleep(100 * value); -#else - { - struct timespec sp; +static void ticket_cleanup(void) +{ + DeleteCriticalSection(&mutex); + ticket = 0; +} + +static void ticket_wait(unsigned int value) +{ + for (;;) { + EnterCriticalSection(&mutex); - sp.tv_sec = value / 10; - sp.tv_nsec = 100000000; - sp.tv_nsec *= (long)(value % 10); + if (value == ticket) { + ticket += 1; + LeaveCriticalSection(&mutex); + break; + } - nanosleep(&sp, NULL); + LeaveCriticalSection(&mutex); + SwitchToThread(); } +} +#elif defined(HAVE_PTHREAD) +#include <pthread.h> + +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; +static unsigned int ticket; + +static void ticket_init(void) +{ + ticket = 0; +} + +static void ticket_cleanup(void) +{ +} + +static void ticket_wait(unsigned int value) +{ + for (;;) { + pthread_mutex_lock(&mutex); + + if (value == ticket) { + ticket += 1; + pthread_mutex_unlock(&mutex); + break; + } + + pthread_mutex_unlock(&mutex); + sched_yield(); + } +} +#else +static void ticket_init(void) +{ +} + +static void ticket_cleanup(void) +{ +} + +static void ticket_wait(unsigned int value) +{ + (void)value; +} #endif +static int worker(void *user, void *work_item) +{ + unsigned int value = *((unsigned int *)work_item); + (void)user; + + ticket_wait(value); + *((unsigned int *)work_item) = 42; return 0; } -int main(int argc, char **argv) +static int worker_serial(void *user, void *work_item) +{ + (void)user; + *((unsigned int *)work_item) = 42; + return 0; +} + +static void test_case(thread_pool_t *pool) { unsigned int values[10]; - thread_pool_t *pool; unsigned int *ptr; size_t i, count; int ret; - (void)argc; (void)argv; - - pool = thread_pool_create(10, worker); - TEST_NOT_NULL(pool); + /* must return a sane value */ count = pool->get_worker_count(pool); TEST_ASSERT(count >= 1); - /* dequeue on empty pool MUST NOT deadlock */ + /* dequeue on empty pool MUST NOT lock up */ ptr = pool->dequeue(pool); TEST_NULL(ptr); + /* submit work items in reverse order */ + ticket_init(); + for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) { - values[i] = sizeof(values) / sizeof(values[0]) - i; + values[i] = (sizeof(values) / sizeof(values[0]) - 1) - i; ret = pool->submit(pool, values + i); TEST_EQUAL_I(ret, 0); } + /* items must dequeue in the same order */ for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) { ptr = pool->dequeue(pool); @@ -74,39 +141,28 @@ int main(int argc, char **argv) TEST_EQUAL_UI(*ptr, 42); } - ptr = pool->dequeue(pool); - TEST_NULL(ptr); - - pool->destroy(pool); - - /* redo the same test with the serial implementation */ - pool = thread_pool_create_serial(worker); - TEST_NOT_NULL(pool); + ticket_cleanup(); + /* queue now empty */ ptr = pool->dequeue(pool); TEST_NULL(ptr); +} - count = pool->get_worker_count(pool); - TEST_EQUAL_UI(count, 1); - - for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) { - values[i] = sizeof(values) / sizeof(values[0]) - i; - - ret = pool->submit(pool, values + i); - TEST_EQUAL_I(ret, 0); - } - - for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) { - ptr = pool->dequeue(pool); - - TEST_NOT_NULL(ptr); - TEST_ASSERT(ptr == (values + i)); - TEST_EQUAL_UI(*ptr, 42); - } +int main(int argc, char **argv) +{ + thread_pool_t *pool; + (void)argc; (void)argv; - ptr = pool->dequeue(pool); - TEST_NULL(ptr); + /* test the actual parallel implementation */ + pool = thread_pool_create(10, worker); + TEST_NOT_NULL(pool); + test_case(pool); + pool->destroy(pool); + /* repeate the test with the serial reference implementation */ + pool = thread_pool_create_serial(worker_serial); + TEST_NOT_NULL(pool); + test_case(pool); pool->destroy(pool); return EXIT_SUCCESS; } |