summaryrefslogtreecommitdiff
path: root/lib/util/threadpool.c
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2021-03-22 20:59:32 +0100
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2021-03-22 21:09:54 +0100
commit13c804062e9acc7f74850477679265e19704e65a (patch)
treecee1797ede03bfe9e6feb6e794c4d5a509b1840c /lib/util/threadpool.c
parente8cc53ad5fbac2cb2f578091125dc506f136bfde (diff)
Threadpool: pre-emtively dequeue items after enqueing
When we already hold the mutex, try to pre-emtively dequeue items into a "safe queue". When actually asked to dequeue, take blocks from there first and avoid having to enter the critical section if possible. Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib/util/threadpool.c')
-rw-r--r--lib/util/threadpool.c94
1 files changed, 60 insertions, 34 deletions
diff --git a/lib/util/threadpool.c b/lib/util/threadpool.c
index 245efe9..d20fe8b 100644
--- a/lib/util/threadpool.c
+++ b/lib/util/threadpool.c
@@ -57,6 +57,9 @@ struct thread_pool_impl_t {
work_item_t *done;
+ work_item_t *safe_done;
+ work_item_t *safe_done_last;
+
work_item_t *recycle;
int status;
@@ -138,6 +141,32 @@ static THREAD_FUN(worker_proc, arg)
/*****************************************************************************/
+static work_item_t *try_dequeue_done(thread_pool_impl_t *pool)
+{
+ work_item_t *out;
+
+ if (pool->done == NULL)
+ return NULL;
+
+ if (pool->done->ticket_number != pool->next_dequeue_ticket)
+ return NULL;
+
+ out = pool->done;
+ pool->done = out->next;
+ out->next = NULL;
+ pool->next_dequeue_ticket += 1;
+ return out;
+}
+
+static void free_item_list(work_item_t *list)
+{
+ while (list != NULL) {
+ work_item_t *item = list;
+ list = list->next;
+ free(item);
+ }
+}
+
static void destroy(thread_pool_t *interface)
{
thread_pool_impl_t *pool = (thread_pool_impl_t *)interface;
@@ -155,24 +184,10 @@ static void destroy(thread_pool_t *interface)
pthread_cond_destroy(&pool->queue_cond);
pthread_mutex_destroy(&pool->mtx);
- while (pool->queue != NULL) {
- work_item_t *item = pool->queue;
- pool->queue = item->next;
- free(item);
- }
-
- while (pool->done != NULL) {
- work_item_t *item = pool->done;
- pool->done = item->next;
- free(item);
- }
-
- while (pool->recycle != NULL) {
- work_item_t *item = pool->recycle;
- pool->recycle = item->next;
- free(item);
- }
-
+ free_item_list(pool->queue);
+ free_item_list(pool->done);
+ free_item_list(pool->recycle);
+ free_item_list(pool->safe_done);
free(pool);
}
@@ -230,6 +245,20 @@ static int submit(thread_pool_t *interface, void *ptr)
pool->item_count += 1;
}
+ for (;;) {
+ work_item_t *done = try_dequeue_done(pool);
+ if (done == NULL)
+ break;
+
+ if (pool->safe_done_last == NULL) {
+ pool->safe_done = done;
+ } else {
+ pool->safe_done_last->next = done;
+ }
+
+ pool->safe_done_last = done;
+ }
+
pthread_cond_broadcast(&pool->queue_cond);
pthread_mutex_unlock(&pool->mtx);
@@ -251,27 +280,24 @@ static void *dequeue(thread_pool_t *interface)
if (pool->item_count == 0)
return NULL;
- /* dequeue */
- pthread_mutex_lock(&pool->mtx);
- while (out == NULL) {
- if (pool->done == NULL) {
- pthread_cond_wait(&pool->done_cond, &pool->mtx);
- continue;
- }
+ if (pool->safe_done != NULL) {
+ out = pool->safe_done;
+
+ pool->safe_done = pool->safe_done->next;
+ if (pool->safe_done == NULL)
+ pool->safe_done_last = NULL;
+ } else {
+ pthread_mutex_lock(&pool->mtx);
+ for (;;) {
+ out = try_dequeue_done(pool);
+ if (out != NULL)
+ break;
- if (pool->done->ticket_number != pool->next_dequeue_ticket) {
pthread_cond_wait(&pool->done_cond, &pool->mtx);
- continue;
}
-
- out = pool->done;
- pool->done = out->next;
- out->next = NULL;
- pool->next_dequeue_ticket += 1;
+ pthread_mutex_unlock(&pool->mtx);
}
- pthread_mutex_unlock(&pool->mtx);
- /* get the data pointer, put the container in the recycle bin */
ptr = out->data;
out->ticket_number = 0;