summaryrefslogtreecommitdiff
path: root/lib/sqfs/block_processor/winpthread.c
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2020-02-19 15:32:47 +0100
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2020-02-20 17:04:22 +0100
commit3215679c2c87f3f40809f5eb75b5c679bcb4879d (patch)
treeed0e725fc884c721eeae847e3f54bfb0cb86072e /lib/sqfs/block_processor/winpthread.c
parent754e4f1bdeb2f89a04058c9f830a903a2a70c627 (diff)
Restructure thread pool block processor
Implement the io-queue based design as outline in doc/parallelism.txt Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib/sqfs/block_processor/winpthread.c')
-rw-r--r--lib/sqfs/block_processor/winpthread.c291
1 files changed, 195 insertions, 96 deletions
diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c
index 0cb6d02..da69cb7 100644
--- a/lib/sqfs/block_processor/winpthread.c
+++ b/lib/sqfs/block_processor/winpthread.c
@@ -52,15 +52,19 @@ struct thread_pool_processor_t {
CONDITION_TYPE queue_cond;
CONDITION_TYPE done_cond;
- sqfs_block_t *queue;
- sqfs_block_t *queue_last;
+ sqfs_block_t *proc_queue;
+ sqfs_block_t *proc_queue_last;
+ sqfs_block_t *io_queue;
sqfs_block_t *done;
size_t backlog;
int status;
- sqfs_u32 enqueue_id;
- sqfs_u32 dequeue_id;
+ sqfs_u32 proc_enq_id;
+ sqfs_u32 proc_deq_id;
+
+ sqfs_u32 io_enq_id;
+ sqfs_u32 io_deq_id;
unsigned int num_workers;
size_t max_backlog;
@@ -83,16 +87,16 @@ static sqfs_block_t *get_next_work_item(thread_pool_processor_t *shared)
{
sqfs_block_t *blk = NULL;
- while (shared->queue == NULL && shared->status == 0)
+ while (shared->proc_queue == NULL && shared->status == 0)
AWAIT(&shared->queue_cond, &shared->mtx);
if (shared->status == 0) {
- blk = shared->queue;
- shared->queue = blk->next;
+ blk = shared->proc_queue;
+ shared->proc_queue = blk->next;
blk->next = NULL;
- if (shared->queue == NULL)
- shared->queue_last = NULL;
+ if (shared->proc_queue == NULL)
+ shared->proc_queue_last = NULL;
}
return blk;
@@ -104,7 +108,7 @@ static void store_completed_block(thread_pool_processor_t *shared,
sqfs_block_t *it = shared->done, *prev = NULL;
while (it != NULL) {
- if (it->sequence_number >= blk->sequence_number)
+ if (it->proc_seq_num >= blk->proc_seq_num)
break;
prev = it;
it = it->next;
@@ -177,7 +181,7 @@ static void block_processor_destroy(sqfs_object_t *obj)
}
DeleteCriticalSection(&proc->mtx);
- free_blk_list(proc->queue);
+ free_blk_list(proc->proc_queue);
free_blk_list(proc->done);
free(proc->base.blk_current);
free(proc->base.frag_block);
@@ -261,7 +265,7 @@ static void block_processor_destroy(sqfs_object_t *obj)
pthread_cond_destroy(&proc->queue_cond);
pthread_mutex_destroy(&proc->mtx);
- free_blk_list(proc->queue);
+ free_blk_list(proc->proc_queue);
free_blk_list(proc->done);
free(proc->base.blk_current);
free(proc->base.frag_block);
@@ -357,149 +361,244 @@ fail_init:
}
#endif
-static sqfs_block_t *try_dequeue(thread_pool_processor_t *proc)
+static void store_io_block(thread_pool_processor_t *proc, sqfs_block_t *blk)
+{
+ sqfs_block_t *it = proc->io_queue, *prev = NULL;
+
+ while (it != NULL && it->io_seq_num < blk->io_seq_num) {
+ prev = it;
+ it = it->next;
+ }
+
+ if (prev == NULL) {
+ blk->next = proc->io_queue;
+ proc->io_queue = blk;
+ } else {
+ blk->next = prev->next;
+ prev->next = blk;
+ }
+
+ proc->backlog += 1;
+}
+
+static sqfs_block_t *try_dequeue_io(thread_pool_processor_t *proc)
+{
+ sqfs_block_t *out;
+
+ if (proc->io_queue == NULL)
+ return NULL;
+
+ if (proc->io_queue->io_seq_num != proc->io_deq_id)
+ return NULL;
+
+ out = proc->io_queue;
+ proc->io_queue = out->next;
+ out->next = NULL;
+ proc->io_deq_id += 1;
+ proc->backlog -= 1;
+ return out;
+}
+
+static sqfs_block_t *try_dequeue_done(thread_pool_processor_t *proc)
{
sqfs_block_t *out;
if (proc->done == NULL)
return NULL;
- if (proc->done->sequence_number != proc->dequeue_id)
+ if (proc->done->proc_seq_num != proc->proc_deq_id)
return NULL;
out = proc->done;
proc->done = out->next;
out->next = NULL;
- proc->dequeue_id += 1;
+ proc->proc_deq_id += 1;
proc->backlog -= 1;
return out;
}
-static int process_done_block(thread_pool_processor_t *proc, sqfs_block_t *it)
+static void append_block(thread_pool_processor_t *proc, sqfs_block_t *block)
{
- sqfs_block_t *fragblk = NULL;
- int status = 0;
+ if (proc->proc_queue_last == NULL) {
+ proc->proc_queue = proc->proc_queue_last = block;
+ } else {
+ proc->proc_queue_last->next = block;
+ proc->proc_queue_last = block;
+ }
- if (it->flags & SQFS_BLK_IS_FRAGMENT) {
- status = process_completed_fragment(&proc->base, it, &fragblk);
+ block->proc_seq_num = proc->proc_enq_id++;
+ block->next = NULL;
+ proc->backlog += 1;
+}
- if (fragblk != NULL && status == 0) {
- LOCK(&proc->mtx);
- proc->dequeue_id = it->sequence_number;
- fragblk->sequence_number = it->sequence_number;
+static int handle_io_queue(thread_pool_processor_t *proc, sqfs_block_t *list)
+{
+ sqfs_block_t *it = list;
+ int status = 0;
- if (proc->queue == NULL) {
- proc->queue = fragblk;
- proc->queue_last = fragblk;
- } else {
- fragblk->next = proc->queue;
- proc->queue = fragblk;
- }
+ while (status == 0 && it != NULL) {
+ status = process_completed_block(&proc->base, it);
+ it = it->next;
- proc->backlog += 1;
+ if (status != 0) {
+ LOCK(&proc->mtx);
+ if (proc->status == 0)
+ proc->status = status;
SIGNAL_ALL(&proc->queue_cond);
UNLOCK(&proc->mtx);
- } else {
- free(fragblk);
}
- } else {
- status = process_completed_block(&proc->base, it);
}
- free(it);
return status;
}
-static int wait_completed(thread_pool_processor_t *proc)
+int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)
{
- sqfs_block_t *blk;
+ thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc;
+ sqfs_block_t *io_list = NULL, *io_list_last = NULL;
+ sqfs_block_t *blk, *fragblk, *free_list = NULL;
int status;
- LOCK(&proc->mtx);
+ LOCK(&thproc->mtx);
for (;;) {
- blk = try_dequeue(proc);
- status = proc->status;
-
- if (blk != NULL || status != 0)
+ status = thproc->status;
+ if (status != 0)
break;
- AWAIT(&proc->done_cond, &proc->mtx);
- }
- UNLOCK(&proc->mtx);
+ if (thproc->backlog < thproc->max_backlog) {
+ append_block(thproc, block);
+ block = NULL;
+ break;
+ }
- if (status != 0) {
- free(blk);
- return status;
- }
+ blk = try_dequeue_io(thproc);
+ if (blk != NULL) {
+ if (io_list_last == NULL) {
+ io_list = io_list_last = blk;
+ } else {
+ io_list_last->next = blk;
+ io_list_last = blk;
+ }
+ continue;
+ }
- status = process_done_block(proc, blk);
+ blk = try_dequeue_done(thproc);
+ if (blk == NULL) {
+ AWAIT(&thproc->done_cond, &thproc->mtx);
+ continue;
+ }
- if (status != 0) {
- LOCK(&proc->mtx);
- if (proc->status == 0) {
- proc->status = status;
+ if (blk->flags & SQFS_BLK_IS_FRAGMENT) {
+ fragblk = NULL;
+ thproc->status = process_completed_fragment(proc, blk,
+ &fragblk);
+ blk->next = free_list;
+ free_list = blk;
+
+ if (fragblk != NULL) {
+ fragblk->io_seq_num = thproc->io_enq_id++;
+ append_block(thproc, fragblk);
+ SIGNAL_ALL(&thproc->queue_cond);
+ }
} else {
- status = proc->status;
+ if (!(blk->flags & SQFS_BLK_FRAGMENT_BLOCK))
+ blk->io_seq_num = thproc->io_enq_id++;
+ store_io_block(thproc, blk);
}
- SIGNAL_ALL(&proc->queue_cond);
- UNLOCK(&proc->mtx);
}
+ SIGNAL_ALL(&thproc->queue_cond);
+ UNLOCK(&thproc->mtx);
+ free(block);
+
+ if (status == 0)
+ status = handle_io_queue(thproc, io_list);
+
+ free_blk_list(io_list);
+ free_blk_list(free_list);
return status;
}
-int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)
+int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
{
thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc;
+ sqfs_block_t *io_list = NULL, *io_list_last = NULL;
+ sqfs_block_t *blk, *fragblk, *free_list = NULL;
int status;
- while (thproc->backlog > thproc->max_backlog) {
- status = wait_completed(thproc);
- if (status)
- return status;
- }
-
LOCK(&thproc->mtx);
- status = thproc->status;
- if (status != 0)
- goto out;
+ for (;;) {
+ status = thproc->status;
+ if (status != 0)
+ break;
- if (block != NULL) {
- if (thproc->queue_last == NULL) {
- thproc->queue = thproc->queue_last = block;
- } else {
- thproc->queue_last->next = block;
- thproc->queue_last = block;
+ if (thproc->backlog == 0)
+ break;
+
+ blk = try_dequeue_io(thproc);
+ if (blk != NULL) {
+ if (io_list_last == NULL) {
+ io_list = io_list_last = blk;
+ } else {
+ io_list_last->next = blk;
+ io_list_last = blk;
+ }
+ continue;
+ }
+
+ blk = try_dequeue_done(thproc);
+ if (blk == NULL) {
+ AWAIT(&thproc->done_cond, &thproc->mtx);
+ continue;
}
- block->sequence_number = thproc->enqueue_id++;
- block->next = NULL;
- thproc->backlog += 1;
- block = NULL;
+ if (blk->flags & SQFS_BLK_IS_FRAGMENT) {
+ fragblk = NULL;
+ thproc->status = process_completed_fragment(proc, blk,
+ &fragblk);
+ blk->next = free_list;
+ free_list = blk;
+
+ if (fragblk != NULL) {
+ fragblk->io_seq_num = thproc->io_enq_id++;
+ append_block(thproc, fragblk);
+ SIGNAL_ALL(&thproc->queue_cond);
+ }
+ } else {
+ if (!(blk->flags & SQFS_BLK_FRAGMENT_BLOCK))
+ blk->io_seq_num = thproc->io_enq_id++;
+ store_io_block(thproc, blk);
+ }
}
-out:
- SIGNAL_ALL(&thproc->queue_cond);
UNLOCK(&thproc->mtx);
- free(block);
- return 0;
-}
-
-int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
-{
- thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc;
- int status = 0;
-
- while (status == 0 && thproc->backlog > 0)
- status = wait_completed(thproc);
if (status == 0 && proc->frag_block != NULL) {
- status = append_to_work_queue(proc, proc->frag_block);
+ blk = proc->frag_block;
proc->frag_block = NULL;
- if (status)
- return status;
+ status = block_processor_do_block(blk, proc->cmp,
+ thproc->workers[0]->scratch,
+ proc->max_block_size);
- status = wait_completed(thproc);
+ if (io_list_last == NULL) {
+ io_list = io_list_last = blk;
+ } else {
+ io_list_last->next = blk;
+ io_list_last = blk;
+ }
+
+ if (status != 0) {
+ LOCK(&thproc->mtx);
+ if (thproc->status == 0)
+ thproc->status = status;
+ SIGNAL_ALL(&thproc->queue_cond);
+ UNLOCK(&thproc->mtx);
+ }
}
+ if (status == 0)
+ status = handle_io_queue(thproc, io_list);
+
+ free_blk_list(io_list);
+ free_blk_list(free_list);
return status;
}