aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-12-12 14:23:17 +0100
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-12-12 16:43:01 +0100
commitf274c6a208bb329298f83f05d0f9fe8e1a8b5423 (patch)
treef709fc72cdb07a0f059bf69e6db2439bf330c780 /lib
parent6d4faedcb53f54253160f1717fac609f922ae0c7 (diff)
Don't kick off the threads until the queue is full
The idea is as follows: Initially let the user submit blocks until the queue is filled, then kick of the threads. Every thread will end up getting a block without any waits until they completely deplete the queue. Assuming the threads take longer to process the data than it takes the main thread to do I/O and submit new blocks, the queue should stay mostly filled with minimal wait times. Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib')
-rw-r--r--lib/sqfs/data_writer/internal.h1
-rw-r--r--lib/sqfs/data_writer/pthread.c31
2 files changed, 21 insertions, 11 deletions
diff --git a/lib/sqfs/data_writer/internal.h b/lib/sqfs/data_writer/internal.h
index ea8fe79..24f520e 100644
--- a/lib/sqfs/data_writer/internal.h
+++ b/lib/sqfs/data_writer/internal.h
@@ -99,6 +99,7 @@ struct sqfs_data_writer_t {
const sqfs_block_hooks_t *hooks;
void *user_ptr;
+ bool notify_threads;
/* file API */
sqfs_inode_generic_t *inode;
diff --git a/lib/sqfs/data_writer/pthread.c b/lib/sqfs/data_writer/pthread.c
index 97114c5..cfd2d3e 100644
--- a/lib/sqfs/data_writer/pthread.c
+++ b/lib/sqfs/data_writer/pthread.c
@@ -155,7 +155,7 @@ void sqfs_data_writer_destroy(sqfs_data_writer_t *proc)
}
static int append_to_work_queue(sqfs_data_writer_t *proc,
- sqfs_block_t *block)
+ sqfs_block_t *block, bool signal_threads)
{
int status;
@@ -167,17 +167,21 @@ static int append_to_work_queue(sqfs_data_writer_t *proc,
return status;
}
- if (proc->queue_last == NULL) {
- proc->queue = proc->queue_last = block;
- } else {
- proc->queue_last->next = block;
- proc->queue_last = block;
+ if (block != NULL) {
+ if (proc->queue_last == NULL) {
+ proc->queue = proc->queue_last = block;
+ } else {
+ proc->queue_last->next = block;
+ proc->queue_last = block;
+ }
+
+ block->sequence_number = proc->enqueue_id++;
+ block->next = NULL;
+ proc->backlog += 1;
}
- block->sequence_number = proc->enqueue_id++;
- block->next = NULL;
- proc->backlog += 1;
- pthread_cond_broadcast(&proc->queue_cond);
+ if (signal_threads)
+ pthread_cond_broadcast(&proc->queue_cond);
pthread_mutex_unlock(&proc->mtx);
return 0;
}
@@ -327,13 +331,18 @@ int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block)
return status;
}
- return append_to_work_queue(proc, block);
+ if (proc->backlog == proc->max_backlog)
+ proc->notify_threads = true;
+
+ return append_to_work_queue(proc, block, proc->notify_threads);
}
int sqfs_data_writer_finish(sqfs_data_writer_t *proc)
{
int status;
+ append_to_work_queue(proc, NULL, true);
+
while (proc->backlog > 0) {
status = wait_completed(proc);
if (status)