aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2020-02-19 15:32:47 +0100
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2020-02-20 17:04:22 +0100
commit3215679c2c87f3f40809f5eb75b5c679bcb4879d (patch)
treeed0e725fc884c721eeae847e3f54bfb0cb86072e
parent754e4f1bdeb2f89a04058c9f830a903a2a70c627 (diff)
Restructure thread pool block processor
Implement the io-queue based design as outline in doc/parallelism.txt Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r--lib/sqfs/block_processor/common.c20
-rw-r--r--lib/sqfs/block_processor/internal.h4
-rw-r--r--lib/sqfs/block_processor/winpthread.c291
3 files changed, 212 insertions, 103 deletions
diff --git a/lib/sqfs/block_processor/common.c b/lib/sqfs/block_processor/common.c
index 674e4cf..ab5d1bc 100644
--- a/lib/sqfs/block_processor/common.c
+++ b/lib/sqfs/block_processor/common.c
@@ -21,8 +21,10 @@ int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk)
if (blk->flags & SQFS_BLK_IS_SPARSE) {
sqfs_inode_make_extended(blk->inode);
blk->inode->data.file_ext.sparse += blk->size;
- blk->inode->extra[blk->inode->num_file_blocks] = 0;
- blk->inode->num_file_blocks += 1;
+ blk->inode->extra[blk->index] = 0;
+
+ if (blk->index >= blk->inode->num_file_blocks)
+ blk->inode->num_file_blocks = blk->index + 1;
proc->stats.sparse_block_count += 1;
} else if (blk->size != 0) {
@@ -36,8 +38,10 @@ int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk)
if (err)
return err;
} else {
- blk->inode->extra[blk->inode->num_file_blocks] = size;
- blk->inode->num_file_blocks += 1;
+ blk->inode->extra[blk->index] = size;
+
+ if (blk->index >= blk->inode->num_file_blocks)
+ blk->inode->num_file_blocks = blk->index + 1;
proc->stats.data_block_count += 1;
}
@@ -95,8 +99,10 @@ int process_completed_fragment(sqfs_block_processor_t *proc, sqfs_block_t *frag,
if (frag->flags & SQFS_BLK_IS_SPARSE) {
sqfs_inode_make_extended(frag->inode);
frag->inode->data.file_ext.sparse += frag->size;
- frag->inode->extra[frag->inode->num_file_blocks] = 0;
- frag->inode->num_file_blocks += 1;
+ frag->inode->extra[frag->index] = 0;
+
+ if (frag->index >= frag->inode->num_file_blocks)
+ frag->inode->num_file_blocks = frag->index + 1;
proc->stats.sparse_block_count += 1;
return 0;
@@ -187,6 +193,7 @@ static int flush_block(sqfs_block_processor_t *proc)
proc->blk_flags &= ~SQFS_BLK_FIRST_BLOCK;
}
+ block->index = proc->blk_index++;
return append_to_work_queue(proc, block);
}
@@ -201,6 +208,7 @@ int sqfs_block_processor_begin_file(sqfs_block_processor_t *proc,
proc->inode = inode;
proc->blk_flags = flags | SQFS_BLK_FIRST_BLOCK;
+ proc->blk_index = 0;
return 0;
}
diff --git a/lib/sqfs/block_processor/internal.h b/lib/sqfs/block_processor/internal.h
index 3c6570e..fb9c4a0 100644
--- a/lib/sqfs/block_processor/internal.h
+++ b/lib/sqfs/block_processor/internal.h
@@ -27,7 +27,8 @@ typedef struct sqfs_block_t {
struct sqfs_block_t *next;
sqfs_inode_generic_t *inode;
- sqfs_u32 sequence_number;
+ sqfs_u32 proc_seq_num;
+ sqfs_u32 io_seq_num;
sqfs_u32 flags;
sqfs_u32 size;
sqfs_u32 checksum;
@@ -51,6 +52,7 @@ struct sqfs_block_processor_t {
sqfs_inode_generic_t *inode;
sqfs_block_t *blk_current;
sqfs_u32 blk_flags;
+ sqfs_u32 blk_index;
size_t max_block_size;
};
diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c
index 0cb6d02..da69cb7 100644
--- a/lib/sqfs/block_processor/winpthread.c
+++ b/lib/sqfs/block_processor/winpthread.c
@@ -52,15 +52,19 @@ struct thread_pool_processor_t {
CONDITION_TYPE queue_cond;
CONDITION_TYPE done_cond;
- sqfs_block_t *queue;
- sqfs_block_t *queue_last;
+ sqfs_block_t *proc_queue;
+ sqfs_block_t *proc_queue_last;
+ sqfs_block_t *io_queue;
sqfs_block_t *done;
size_t backlog;
int status;
- sqfs_u32 enqueue_id;
- sqfs_u32 dequeue_id;
+ sqfs_u32 proc_enq_id;
+ sqfs_u32 proc_deq_id;
+
+ sqfs_u32 io_enq_id;
+ sqfs_u32 io_deq_id;
unsigned int num_workers;
size_t max_backlog;
@@ -83,16 +87,16 @@ static sqfs_block_t *get_next_work_item(thread_pool_processor_t *shared)
{
sqfs_block_t *blk = NULL;
- while (shared->queue == NULL && shared->status == 0)
+ while (shared->proc_queue == NULL && shared->status == 0)
AWAIT(&shared->queue_cond, &shared->mtx);
if (shared->status == 0) {
- blk = shared->queue;
- shared->queue = blk->next;
+ blk = shared->proc_queue;
+ shared->proc_queue = blk->next;
blk->next = NULL;
- if (shared->queue == NULL)
- shared->queue_last = NULL;
+ if (shared->proc_queue == NULL)
+ shared->proc_queue_last = NULL;
}
return blk;
@@ -104,7 +108,7 @@ static void store_completed_block(thread_pool_processor_t *shared,
sqfs_block_t *it = shared->done, *prev = NULL;
while (it != NULL) {
- if (it->sequence_number >= blk->sequence_number)
+ if (it->proc_seq_num >= blk->proc_seq_num)
break;
prev = it;
it = it->next;
@@ -177,7 +181,7 @@ static void block_processor_destroy(sqfs_object_t *obj)
}
DeleteCriticalSection(&proc->mtx);
- free_blk_list(proc->queue);
+ free_blk_list(proc->proc_queue);
free_blk_list(proc->done);
free(proc->base.blk_current);
free(proc->base.frag_block);
@@ -261,7 +265,7 @@ static void block_processor_destroy(sqfs_object_t *obj)
pthread_cond_destroy(&proc->queue_cond);
pthread_mutex_destroy(&proc->mtx);
- free_blk_list(proc->queue);
+ free_blk_list(proc->proc_queue);
free_blk_list(proc->done);
free(proc->base.blk_current);
free(proc->base.frag_block);
@@ -357,149 +361,244 @@ fail_init:
}
#endif
-static sqfs_block_t *try_dequeue(thread_pool_processor_t *proc)
+static void store_io_block(thread_pool_processor_t *proc, sqfs_block_t *blk)
+{
+ sqfs_block_t *it = proc->io_queue, *prev = NULL;
+
+ while (it != NULL && it->io_seq_num < blk->io_seq_num) {
+ prev = it;
+ it = it->next;
+ }
+
+ if (prev == NULL) {
+ blk->next = proc->io_queue;
+ proc->io_queue = blk;
+ } else {
+ blk->next = prev->next;
+ prev->next = blk;
+ }
+
+ proc->backlog += 1;
+}
+
+static sqfs_block_t *try_dequeue_io(thread_pool_processor_t *proc)
+{
+ sqfs_block_t *out;
+
+ if (proc->io_queue == NULL)
+ return NULL;
+
+ if (proc->io_queue->io_seq_num != proc->io_deq_id)
+ return NULL;
+
+ out = proc->io_queue;
+ proc->io_queue = out->next;
+ out->next = NULL;
+ proc->io_deq_id += 1;
+ proc->backlog -= 1;
+ return out;
+}
+
+static sqfs_block_t *try_dequeue_done(thread_pool_processor_t *proc)
{
sqfs_block_t *out;
if (proc->done == NULL)
return NULL;
- if (proc->done->sequence_number != proc->dequeue_id)
+ if (proc->done->proc_seq_num != proc->proc_deq_id)
return NULL;
out = proc->done;
proc->done = out->next;
out->next = NULL;
- proc->dequeue_id += 1;
+ proc->proc_deq_id += 1;
proc->backlog -= 1;
return out;
}
-static int process_done_block(thread_pool_processor_t *proc, sqfs_block_t *it)
+static void append_block(thread_pool_processor_t *proc, sqfs_block_t *block)
{
- sqfs_block_t *fragblk = NULL;
- int status = 0;
+ if (proc->proc_queue_last == NULL) {
+ proc->proc_queue = proc->proc_queue_last = block;
+ } else {
+ proc->proc_queue_last->next = block;
+ proc->proc_queue_last = block;
+ }
- if (it->flags & SQFS_BLK_IS_FRAGMENT) {
- status = process_completed_fragment(&proc->base, it, &fragblk);
+ block->proc_seq_num = proc->proc_enq_id++;
+ block->next = NULL;
+ proc->backlog += 1;
+}
- if (fragblk != NULL && status == 0) {
- LOCK(&proc->mtx);
- proc->dequeue_id = it->sequence_number;
- fragblk->sequence_number = it->sequence_number;
+static int handle_io_queue(thread_pool_processor_t *proc, sqfs_block_t *list)
+{
+ sqfs_block_t *it = list;
+ int status = 0;
- if (proc->queue == NULL) {
- proc->queue = fragblk;
- proc->queue_last = fragblk;
- } else {
- fragblk->next = proc->queue;
- proc->queue = fragblk;
- }
+ while (status == 0 && it != NULL) {
+ status = process_completed_block(&proc->base, it);
+ it = it->next;
- proc->backlog += 1;
+ if (status != 0) {
+ LOCK(&proc->mtx);
+ if (proc->status == 0)
+ proc->status = status;
SIGNAL_ALL(&proc->queue_cond);
UNLOCK(&proc->mtx);
- } else {
- free(fragblk);
}
- } else {
- status = process_completed_block(&proc->base, it);
}
- free(it);
return status;
}
-static int wait_completed(thread_pool_processor_t *proc)
+int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)
{
- sqfs_block_t *blk;
+ thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc;
+ sqfs_block_t *io_list = NULL, *io_list_last = NULL;
+ sqfs_block_t *blk, *fragblk, *free_list = NULL;
int status;
- LOCK(&proc->mtx);
+ LOCK(&thproc->mtx);
for (;;) {
- blk = try_dequeue(proc);
- status = proc->status;
-
- if (blk != NULL || status != 0)
+ status = thproc->status;
+ if (status != 0)
break;
- AWAIT(&proc->done_cond, &proc->mtx);
- }
- UNLOCK(&proc->mtx);
+ if (thproc->backlog < thproc->max_backlog) {
+ append_block(thproc, block);
+ block = NULL;
+ break;
+ }
- if (status != 0) {
- free(blk);
- return status;
- }
+ blk = try_dequeue_io(thproc);
+ if (blk != NULL) {
+ if (io_list_last == NULL) {
+ io_list = io_list_last = blk;
+ } else {
+ io_list_last->next = blk;
+ io_list_last = blk;
+ }
+ continue;
+ }
- status = process_done_block(proc, blk);
+ blk = try_dequeue_done(thproc);
+ if (blk == NULL) {
+ AWAIT(&thproc->done_cond, &thproc->mtx);
+ continue;
+ }
- if (status != 0) {
- LOCK(&proc->mtx);
- if (proc->status == 0) {
- proc->status = status;
+ if (blk->flags & SQFS_BLK_IS_FRAGMENT) {
+ fragblk = NULL;
+ thproc->status = process_completed_fragment(proc, blk,
+ &fragblk);
+ blk->next = free_list;
+ free_list = blk;
+
+ if (fragblk != NULL) {
+ fragblk->io_seq_num = thproc->io_enq_id++;
+ append_block(thproc, fragblk);
+ SIGNAL_ALL(&thproc->queue_cond);
+ }
} else {
- status = proc->status;
+ if (!(blk->flags & SQFS_BLK_FRAGMENT_BLOCK))
+ blk->io_seq_num = thproc->io_enq_id++;
+ store_io_block(thproc, blk);
}
- SIGNAL_ALL(&proc->queue_cond);
- UNLOCK(&proc->mtx);
}
+ SIGNAL_ALL(&thproc->queue_cond);
+ UNLOCK(&thproc->mtx);
+ free(block);
+
+ if (status == 0)
+ status = handle_io_queue(thproc, io_list);
+
+ free_blk_list(io_list);
+ free_blk_list(free_list);
return status;
}
-int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)
+int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
{
thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc;
+ sqfs_block_t *io_list = NULL, *io_list_last = NULL;
+ sqfs_block_t *blk, *fragblk, *free_list = NULL;
int status;
- while (thproc->backlog > thproc->max_backlog) {
- status = wait_completed(thproc);
- if (status)
- return status;
- }
-
LOCK(&thproc->mtx);
- status = thproc->status;
- if (status != 0)
- goto out;
+ for (;;) {
+ status = thproc->status;
+ if (status != 0)
+ break;
- if (block != NULL) {
- if (thproc->queue_last == NULL) {
- thproc->queue = thproc->queue_last = block;
- } else {
- thproc->queue_last->next = block;
- thproc->queue_last = block;
+ if (thproc->backlog == 0)
+ break;
+
+ blk = try_dequeue_io(thproc);
+ if (blk != NULL) {
+ if (io_list_last == NULL) {
+ io_list = io_list_last = blk;
+ } else {
+ io_list_last->next = blk;
+ io_list_last = blk;
+ }
+ continue;
+ }
+
+ blk = try_dequeue_done(thproc);
+ if (blk == NULL) {
+ AWAIT(&thproc->done_cond, &thproc->mtx);
+ continue;
}
- block->sequence_number = thproc->enqueue_id++;
- block->next = NULL;
- thproc->backlog += 1;
- block = NULL;
+ if (blk->flags & SQFS_BLK_IS_FRAGMENT) {
+ fragblk = NULL;
+ thproc->status = process_completed_fragment(proc, blk,
+ &fragblk);
+ blk->next = free_list;
+ free_list = blk;
+
+ if (fragblk != NULL) {
+ fragblk->io_seq_num = thproc->io_enq_id++;
+ append_block(thproc, fragblk);
+ SIGNAL_ALL(&thproc->queue_cond);
+ }
+ } else {
+ if (!(blk->flags & SQFS_BLK_FRAGMENT_BLOCK))
+ blk->io_seq_num = thproc->io_enq_id++;
+ store_io_block(thproc, blk);
+ }
}
-out:
- SIGNAL_ALL(&thproc->queue_cond);
UNLOCK(&thproc->mtx);
- free(block);
- return 0;
-}
-
-int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
-{
- thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc;
- int status = 0;
-
- while (status == 0 && thproc->backlog > 0)
- status = wait_completed(thproc);
if (status == 0 && proc->frag_block != NULL) {
- status = append_to_work_queue(proc, proc->frag_block);
+ blk = proc->frag_block;
proc->frag_block = NULL;
- if (status)
- return status;
+ status = block_processor_do_block(blk, proc->cmp,
+ thproc->workers[0]->scratch,
+ proc->max_block_size);
- status = wait_completed(thproc);
+ if (io_list_last == NULL) {
+ io_list = io_list_last = blk;
+ } else {
+ io_list_last->next = blk;
+ io_list_last = blk;
+ }
+
+ if (status != 0) {
+ LOCK(&thproc->mtx);
+ if (thproc->status == 0)
+ thproc->status = status;
+ SIGNAL_ALL(&thproc->queue_cond);
+ UNLOCK(&thproc->mtx);
+ }
}
+ if (status == 0)
+ status = handle_io_queue(thproc, io_list);
+
+ free_blk_list(io_list);
+ free_blk_list(free_list);
return status;
}