From 62e598c500b53902e3d97b62d879e6e9c7785d00 Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Sun, 27 Nov 2022 03:07:28 +0100 Subject: Improve thread pool test Instead of sleeping in the worker thread, busy-loop-wait on a ticket counter to try and serialize the workers in backward order. Signed-off-by: David Oberhollenzer --- tests/libutil/Makemodule.am | 5 ++ tests/libutil/threadpool.c | 154 ++++++++++++++++++++++++++++++-------------- 2 files changed, 110 insertions(+), 49 deletions(-) (limited to 'tests/libutil') diff --git a/tests/libutil/Makemodule.am b/tests/libutil/Makemodule.am index 724af50..c84f722 100644 --- a/tests/libutil/Makemodule.am +++ b/tests/libutil/Makemodule.am @@ -10,8 +10,13 @@ test_xxhash_LDADD = libutil.a libcompat.a test_threadpool_SOURCES = tests/libutil/threadpool.c test_threadpool_CFLAGS = $(AM_CFLAGS) $(PTHREAD_CFLAGS) +test_threadpool_CPPFLAGS = $(AM_CPPFLAGS) test_threadpool_LDADD = libutil.a libcompat.a $(PTHREAD_LIBS) +if HAVE_PTHREAD +test_threadpool_CPPFLAGS += -DHAVE_PTHREAD +endif + test_ismemzero_SOURCES = tests/libutil/is_memory_zero.c test_ismemzero_LDADD = libutil.a libcompat.a diff --git a/tests/libutil/threadpool.c b/tests/libutil/threadpool.c index 8d1bda5..cf54484 100644 --- a/tests/libutil/threadpool.c +++ b/tests/libutil/threadpool.c @@ -13,59 +13,126 @@ #define WIN32_LEAN_AND_MEAN #define VC_EXTRALEAN #include -#endif -#include +static CRITICAL_SECTION mutex; +static unsigned int ticket; -static int worker(void *user, void *work_item) +static void ticket_init(void) { - unsigned int value = *((unsigned int *)work_item); - (void)user; + InitializeCriticalSection(&mutex); + ticket = 0; +} -#if defined(_WIN32) || defined(__WINDOWS__) - Sleep(100 * value); -#else - { - struct timespec sp; +static void ticket_cleanup(void) +{ + DeleteCriticalSection(&mutex); + ticket = 0; +} + +static void ticket_wait(unsigned int value) +{ + for (;;) { + EnterCriticalSection(&mutex); - sp.tv_sec = value / 10; - sp.tv_nsec = 100000000; - sp.tv_nsec *= (long)(value % 10); + if (value == ticket) { + ticket += 1; + LeaveCriticalSection(&mutex); + break; + } - nanosleep(&sp, NULL); + LeaveCriticalSection(&mutex); + SwitchToThread(); } +} +#elif defined(HAVE_PTHREAD) +#include + +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; +static unsigned int ticket; + +static void ticket_init(void) +{ + ticket = 0; +} + +static void ticket_cleanup(void) +{ +} + +static void ticket_wait(unsigned int value) +{ + for (;;) { + pthread_mutex_lock(&mutex); + + if (value == ticket) { + ticket += 1; + pthread_mutex_unlock(&mutex); + break; + } + + pthread_mutex_unlock(&mutex); + sched_yield(); + } +} +#else +static void ticket_init(void) +{ +} + +static void ticket_cleanup(void) +{ +} + +static void ticket_wait(unsigned int value) +{ + (void)value; +} #endif +static int worker(void *user, void *work_item) +{ + unsigned int value = *((unsigned int *)work_item); + (void)user; + + ticket_wait(value); + *((unsigned int *)work_item) = 42; return 0; } -int main(int argc, char **argv) +static int worker_serial(void *user, void *work_item) +{ + (void)user; + *((unsigned int *)work_item) = 42; + return 0; +} + +static void test_case(thread_pool_t *pool) { unsigned int values[10]; - thread_pool_t *pool; unsigned int *ptr; size_t i, count; int ret; - (void)argc; (void)argv; - - pool = thread_pool_create(10, worker); - TEST_NOT_NULL(pool); + /* must return a sane value */ count = pool->get_worker_count(pool); TEST_ASSERT(count >= 1); - /* dequeue on empty pool MUST NOT deadlock */ + /* dequeue on empty pool MUST NOT lock up */ ptr = pool->dequeue(pool); TEST_NULL(ptr); + /* submit work items in reverse order */ + ticket_init(); + for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) { - values[i] = sizeof(values) / sizeof(values[0]) - i; + values[i] = (sizeof(values) / sizeof(values[0]) - 1) - i; ret = pool->submit(pool, values + i); TEST_EQUAL_I(ret, 0); } + /* items must dequeue in the same order */ for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) { ptr = pool->dequeue(pool); @@ -74,39 +141,28 @@ int main(int argc, char **argv) TEST_EQUAL_UI(*ptr, 42); } - ptr = pool->dequeue(pool); - TEST_NULL(ptr); - - pool->destroy(pool); - - /* redo the same test with the serial implementation */ - pool = thread_pool_create_serial(worker); - TEST_NOT_NULL(pool); + ticket_cleanup(); + /* queue now empty */ ptr = pool->dequeue(pool); TEST_NULL(ptr); +} - count = pool->get_worker_count(pool); - TEST_EQUAL_UI(count, 1); - - for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) { - values[i] = sizeof(values) / sizeof(values[0]) - i; - - ret = pool->submit(pool, values + i); - TEST_EQUAL_I(ret, 0); - } - - for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) { - ptr = pool->dequeue(pool); - - TEST_NOT_NULL(ptr); - TEST_ASSERT(ptr == (values + i)); - TEST_EQUAL_UI(*ptr, 42); - } +int main(int argc, char **argv) +{ + thread_pool_t *pool; + (void)argc; (void)argv; - ptr = pool->dequeue(pool); - TEST_NULL(ptr); + /* test the actual parallel implementation */ + pool = thread_pool_create(10, worker); + TEST_NOT_NULL(pool); + test_case(pool); + pool->destroy(pool); + /* repeate the test with the serial reference implementation */ + pool = thread_pool_create_serial(worker_serial); + TEST_NOT_NULL(pool); + test_case(pool); pool->destroy(pool); return EXIT_SUCCESS; } -- cgit v1.2.3