aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2022-11-27 03:07:28 +0100
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2022-11-27 03:08:33 +0100
commit62e598c500b53902e3d97b62d879e6e9c7785d00 (patch)
treecc3469f8637dab9957aff0fcad4be55b21677f1c
parent2ff19f084f3a50272606583c27eb8e2bb01fe75f (diff)
Improve thread pool test
Instead of sleeping in the worker thread, busy-loop-wait on a ticket counter to try and serialize the workers in backward order. Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r--tests/libutil/Makemodule.am5
-rw-r--r--tests/libutil/threadpool.c154
2 files changed, 110 insertions, 49 deletions
diff --git a/tests/libutil/Makemodule.am b/tests/libutil/Makemodule.am
index 724af50..c84f722 100644
--- a/tests/libutil/Makemodule.am
+++ b/tests/libutil/Makemodule.am
@@ -10,8 +10,13 @@ test_xxhash_LDADD = libutil.a libcompat.a
test_threadpool_SOURCES = tests/libutil/threadpool.c
test_threadpool_CFLAGS = $(AM_CFLAGS) $(PTHREAD_CFLAGS)
+test_threadpool_CPPFLAGS = $(AM_CPPFLAGS)
test_threadpool_LDADD = libutil.a libcompat.a $(PTHREAD_LIBS)
+if HAVE_PTHREAD
+test_threadpool_CPPFLAGS += -DHAVE_PTHREAD
+endif
+
test_ismemzero_SOURCES = tests/libutil/is_memory_zero.c
test_ismemzero_LDADD = libutil.a libcompat.a
diff --git a/tests/libutil/threadpool.c b/tests/libutil/threadpool.c
index 8d1bda5..cf54484 100644
--- a/tests/libutil/threadpool.c
+++ b/tests/libutil/threadpool.c
@@ -13,59 +13,126 @@
#define WIN32_LEAN_AND_MEAN
#define VC_EXTRALEAN
#include <windows.h>
-#endif
-#include <time.h>
+static CRITICAL_SECTION mutex;
+static unsigned int ticket;
-static int worker(void *user, void *work_item)
+static void ticket_init(void)
{
- unsigned int value = *((unsigned int *)work_item);
- (void)user;
+ InitializeCriticalSection(&mutex);
+ ticket = 0;
+}
-#if defined(_WIN32) || defined(__WINDOWS__)
- Sleep(100 * value);
-#else
- {
- struct timespec sp;
+static void ticket_cleanup(void)
+{
+ DeleteCriticalSection(&mutex);
+ ticket = 0;
+}
+
+static void ticket_wait(unsigned int value)
+{
+ for (;;) {
+ EnterCriticalSection(&mutex);
- sp.tv_sec = value / 10;
- sp.tv_nsec = 100000000;
- sp.tv_nsec *= (long)(value % 10);
+ if (value == ticket) {
+ ticket += 1;
+ LeaveCriticalSection(&mutex);
+ break;
+ }
- nanosleep(&sp, NULL);
+ LeaveCriticalSection(&mutex);
+ SwitchToThread();
}
+}
+#elif defined(HAVE_PTHREAD)
+#include <pthread.h>
+
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+static unsigned int ticket;
+
+static void ticket_init(void)
+{
+ ticket = 0;
+}
+
+static void ticket_cleanup(void)
+{
+}
+
+static void ticket_wait(unsigned int value)
+{
+ for (;;) {
+ pthread_mutex_lock(&mutex);
+
+ if (value == ticket) {
+ ticket += 1;
+ pthread_mutex_unlock(&mutex);
+ break;
+ }
+
+ pthread_mutex_unlock(&mutex);
+ sched_yield();
+ }
+}
+#else
+static void ticket_init(void)
+{
+}
+
+static void ticket_cleanup(void)
+{
+}
+
+static void ticket_wait(unsigned int value)
+{
+ (void)value;
+}
#endif
+static int worker(void *user, void *work_item)
+{
+ unsigned int value = *((unsigned int *)work_item);
+ (void)user;
+
+ ticket_wait(value);
+
*((unsigned int *)work_item) = 42;
return 0;
}
-int main(int argc, char **argv)
+static int worker_serial(void *user, void *work_item)
+{
+ (void)user;
+ *((unsigned int *)work_item) = 42;
+ return 0;
+}
+
+static void test_case(thread_pool_t *pool)
{
unsigned int values[10];
- thread_pool_t *pool;
unsigned int *ptr;
size_t i, count;
int ret;
- (void)argc; (void)argv;
-
- pool = thread_pool_create(10, worker);
- TEST_NOT_NULL(pool);
+ /* must return a sane value */
count = pool->get_worker_count(pool);
TEST_ASSERT(count >= 1);
- /* dequeue on empty pool MUST NOT deadlock */
+ /* dequeue on empty pool MUST NOT lock up */
ptr = pool->dequeue(pool);
TEST_NULL(ptr);
+ /* submit work items in reverse order */
+ ticket_init();
+
for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) {
- values[i] = sizeof(values) / sizeof(values[0]) - i;
+ values[i] = (sizeof(values) / sizeof(values[0]) - 1) - i;
ret = pool->submit(pool, values + i);
TEST_EQUAL_I(ret, 0);
}
+ /* items must dequeue in the same order */
for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) {
ptr = pool->dequeue(pool);
@@ -74,39 +141,28 @@ int main(int argc, char **argv)
TEST_EQUAL_UI(*ptr, 42);
}
- ptr = pool->dequeue(pool);
- TEST_NULL(ptr);
-
- pool->destroy(pool);
-
- /* redo the same test with the serial implementation */
- pool = thread_pool_create_serial(worker);
- TEST_NOT_NULL(pool);
+ ticket_cleanup();
+ /* queue now empty */
ptr = pool->dequeue(pool);
TEST_NULL(ptr);
+}
- count = pool->get_worker_count(pool);
- TEST_EQUAL_UI(count, 1);
-
- for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) {
- values[i] = sizeof(values) / sizeof(values[0]) - i;
-
- ret = pool->submit(pool, values + i);
- TEST_EQUAL_I(ret, 0);
- }
-
- for (i = 0; i < sizeof(values) / sizeof(values[0]); ++i) {
- ptr = pool->dequeue(pool);
-
- TEST_NOT_NULL(ptr);
- TEST_ASSERT(ptr == (values + i));
- TEST_EQUAL_UI(*ptr, 42);
- }
+int main(int argc, char **argv)
+{
+ thread_pool_t *pool;
+ (void)argc; (void)argv;
- ptr = pool->dequeue(pool);
- TEST_NULL(ptr);
+ /* test the actual parallel implementation */
+ pool = thread_pool_create(10, worker);
+ TEST_NOT_NULL(pool);
+ test_case(pool);
+ pool->destroy(pool);
+ /* repeate the test with the serial reference implementation */
+ pool = thread_pool_create_serial(worker_serial);
+ TEST_NOT_NULL(pool);
+ test_case(pool);
pool->destroy(pool);
return EXIT_SUCCESS;
}