From f274c6a208bb329298f83f05d0f9fe8e1a8b5423 Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Thu, 12 Dec 2019 14:23:17 +0100 Subject: Don't kick off the threads until the queue is full The idea is as follows: Initially let the user submit blocks until the queue is filled, then kick of the threads. Every thread will end up getting a block without any waits until they completely deplete the queue. Assuming the threads take longer to process the data than it takes the main thread to do I/O and submit new blocks, the queue should stay mostly filled with minimal wait times. Signed-off-by: David Oberhollenzer --- lib/sqfs/data_writer/internal.h | 1 + lib/sqfs/data_writer/pthread.c | 31 ++++++++++++++++++++----------- 2 files changed, 21 insertions(+), 11 deletions(-) (limited to 'lib/sqfs/data_writer') diff --git a/lib/sqfs/data_writer/internal.h b/lib/sqfs/data_writer/internal.h index ea8fe79..24f520e 100644 --- a/lib/sqfs/data_writer/internal.h +++ b/lib/sqfs/data_writer/internal.h @@ -99,6 +99,7 @@ struct sqfs_data_writer_t { const sqfs_block_hooks_t *hooks; void *user_ptr; + bool notify_threads; /* file API */ sqfs_inode_generic_t *inode; diff --git a/lib/sqfs/data_writer/pthread.c b/lib/sqfs/data_writer/pthread.c index 97114c5..cfd2d3e 100644 --- a/lib/sqfs/data_writer/pthread.c +++ b/lib/sqfs/data_writer/pthread.c @@ -155,7 +155,7 @@ void sqfs_data_writer_destroy(sqfs_data_writer_t *proc) } static int append_to_work_queue(sqfs_data_writer_t *proc, - sqfs_block_t *block) + sqfs_block_t *block, bool signal_threads) { int status; @@ -167,17 +167,21 @@ static int append_to_work_queue(sqfs_data_writer_t *proc, return status; } - if (proc->queue_last == NULL) { - proc->queue = proc->queue_last = block; - } else { - proc->queue_last->next = block; - proc->queue_last = block; + if (block != NULL) { + if (proc->queue_last == NULL) { + proc->queue = proc->queue_last = block; + } else { + proc->queue_last->next = block; + proc->queue_last = block; + } + + block->sequence_number = proc->enqueue_id++; + block->next = NULL; + proc->backlog += 1; } - block->sequence_number = proc->enqueue_id++; - block->next = NULL; - proc->backlog += 1; - pthread_cond_broadcast(&proc->queue_cond); + if (signal_threads) + pthread_cond_broadcast(&proc->queue_cond); pthread_mutex_unlock(&proc->mtx); return 0; } @@ -327,13 +331,18 @@ int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block) return status; } - return append_to_work_queue(proc, block); + if (proc->backlog == proc->max_backlog) + proc->notify_threads = true; + + return append_to_work_queue(proc, block, proc->notify_threads); } int sqfs_data_writer_finish(sqfs_data_writer_t *proc) { int status; + append_to_work_queue(proc, NULL, true); + while (proc->backlog > 0) { status = wait_completed(proc); if (status) -- cgit v1.2.3