aboutsummaryrefslogtreecommitdiff
path: root/lib/sqfs/block_processor/frontend.c
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2021-03-21 16:59:08 +0100
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2021-03-21 17:29:18 +0100
commitbb0ef9e0eec5c27610fe381b905ef46b3f5f09c6 (patch)
tree62ec813c654f0962adc7048e849e6bb196b22430 /lib/sqfs/block_processor/frontend.c
parenta18f724aa3bf57aeed285b5f61eca4a0ba891c21 (diff)
Cleanup: Rewrite block processor to use the libutil thread_pool_t
Throw out the messy thread pool implementation and temporarily also remove the exact fragment matching for simplicity. Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib/sqfs/block_processor/frontend.c')
-rw-r--r--lib/sqfs/block_processor/frontend.c189
1 files changed, 177 insertions, 12 deletions
diff --git a/lib/sqfs/block_processor/frontend.c b/lib/sqfs/block_processor/frontend.c
index f6aeffa..8bd6cf2 100644
--- a/lib/sqfs/block_processor/frontend.c
+++ b/lib/sqfs/block_processor/frontend.c
@@ -24,26 +24,159 @@ static sqfs_block_t *get_new_block(sqfs_block_processor_t *proc)
return blk;
}
+static int dequeue_block(sqfs_block_processor_t *proc)
+{
+ sqfs_block_t *blk, *fragblk, *it, *prev;
+ bool have_dequeued = false;
+ int status;
+retry:
+ while (proc->io_queue != NULL) {
+ if (proc->io_queue->io_seq_num != proc->io_deq_seq_num)
+ break;
+
+ blk = proc->io_queue;
+ proc->io_queue = blk->next;
+ proc->io_deq_seq_num += 1;
+ proc->backlog -= 1;
+ have_dequeued = true;
+
+ status = process_completed_block(proc, blk);
+ if (status != 0)
+ return status;
+ }
+
+ if (have_dequeued)
+ return 0;
+
+ blk = proc->pool->dequeue(proc->pool);
+
+ if (blk == NULL) {
+ status = proc->pool->get_status(proc->pool);
+ if (status == 0)
+ status = SQFS_ERROR_INTERNAL;
+
+ return status;
+ }
+
+ proc->backlog -= 1;
+ have_dequeued = true;
+
+ if (blk->flags & SQFS_BLK_IS_FRAGMENT) {
+ fragblk = NULL;
+ status = process_completed_fragment(proc, blk, &fragblk);
+
+ if (status != 0) {
+ free(fragblk);
+ return status;
+ }
+
+ if (fragblk != NULL) {
+ fragblk->io_seq_num = proc->io_seq_num++;
+
+ if (proc->pool->submit(proc->pool, fragblk) != 0) {
+ free(fragblk);
+
+ if (status == 0) {
+ status = proc->pool->
+ get_status(proc->pool);
+
+ if (status == 0)
+ status = SQFS_ERROR_ALLOC;
+ }
+
+ return status;
+ }
+
+ proc->backlog += 1;
+ have_dequeued = false;
+ }
+ } else {
+ if (!(blk->flags & SQFS_BLK_FRAGMENT_BLOCK))
+ blk->io_seq_num = proc->io_seq_num++;
+
+ prev = NULL;
+ it = proc->io_queue;
+
+ while (it != NULL) {
+ if (it->io_seq_num >= blk->io_seq_num)
+ break;
+
+ prev = it;
+ it = it->next;
+ }
+
+ if (prev == NULL) {
+ blk->next = proc->io_queue;
+ proc->io_queue = blk;
+ } else {
+ blk->next = prev->next;
+ prev->next = blk;
+ }
+
+ proc->backlog += 1;
+ have_dequeued = false;
+ }
+
+ if (!have_dequeued)
+ goto retry;
+
+ return 0;
+}
+
+static int enqueue_block(sqfs_block_processor_t *proc, sqfs_block_t *blk)
+{
+ int status;
+
+ if (proc->pool->submit(proc->pool, blk) != 0) {
+ status = proc->pool->get_status(proc->pool);
+
+ if (status == 0)
+ status = SQFS_ERROR_ALLOC;
+
+ free(blk);
+ return status;
+ }
+
+ proc->backlog += 1;
+ return 0;
+}
+
static int add_sentinel_block(sqfs_block_processor_t *proc)
{
- sqfs_block_t *blk = get_new_block(proc);
+ sqfs_block_t *blk;
+ int ret;
+
+ if (proc->backlog == proc->max_backlog) {
+ ret = dequeue_block(proc);
+ if (ret != 0)
+ return ret;
+ }
+ blk = get_new_block(proc);
if (blk == NULL)
return SQFS_ERROR_ALLOC;
blk->inode = proc->inode;
blk->flags = proc->blk_flags | SQFS_BLK_LAST_BLOCK;
- return proc->append_to_work_queue(proc, blk);
+ return enqueue_block(proc, blk);
}
static int flush_block(sqfs_block_processor_t *proc)
{
- sqfs_block_t *block = proc->blk_current;
+ sqfs_block_t *block;
+ int ret;
+
+ if (proc->backlog == proc->max_backlog) {
+ ret = dequeue_block(proc);
+ if (ret != 0)
+ return ret;
+ }
+ block = proc->blk_current;
proc->blk_current = NULL;
- return proc->append_to_work_queue(proc, block);
+ return enqueue_block(proc, block);
}
int sqfs_block_processor_begin_file(sqfs_block_processor_t *proc,
@@ -177,6 +310,7 @@ int sqfs_block_processor_submit_block(sqfs_block_processor_t *proc, void *user,
size_t size)
{
sqfs_block_t *blk;
+ int ret;
if (proc->begin_called)
return SQFS_ERROR_SEQUENCE;
@@ -187,6 +321,12 @@ int sqfs_block_processor_submit_block(sqfs_block_processor_t *proc, void *user,
if (flags & ~SQFS_BLK_FLAGS_ALL)
return SQFS_ERROR_UNSUPPORTED;
+ if (proc->backlog == proc->max_backlog) {
+ ret = dequeue_block(proc);
+ if (ret != 0)
+ return ret;
+ }
+
blk = get_new_block(proc);
if (blk == NULL)
return SQFS_ERROR_ALLOC;
@@ -196,12 +336,25 @@ int sqfs_block_processor_submit_block(sqfs_block_processor_t *proc, void *user,
blk->size = size;
memcpy(blk->data, data, size);
- return proc->append_to_work_queue(proc, blk);
+ ret = proc->pool->submit(proc->pool, blk);
+ if (ret != 0)
+ free(blk);
+
+ proc->backlog += 1;
+ return ret;
}
int sqfs_block_processor_sync(sqfs_block_processor_t *proc)
{
- return proc->sync(proc);
+ int ret;
+
+ while (proc->backlog > 0) {
+ ret = dequeue_block(proc);
+ if (ret != 0)
+ return ret;
+ }
+
+ return 0;
}
int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
@@ -209,18 +362,30 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
sqfs_block_t *blk;
int status;
- status = proc->sync(proc);
+ status = sqfs_block_processor_sync(proc);
+ if (status != 0)
+ return status;
- if (status == 0 && proc->frag_block != NULL) {
+ if (proc->frag_block != NULL) {
blk = proc->frag_block;
blk->next = NULL;
- blk->flags |= BLK_FLAG_MANUAL_SUBMISSION;
proc->frag_block = NULL;
- status = proc->append_to_work_queue(proc, blk);
+ blk->io_seq_num = proc->io_seq_num++;
- if (status == 0)
- status = proc->sync(proc);
+ status = proc->pool->submit(proc->pool, blk);
+ if (status != 0) {
+ status = proc->pool->get_status(proc->pool);
+
+ if (status == 0)
+ status = SQFS_ERROR_ALLOC;
+
+ free(blk);
+ return status;
+ }
+
+ proc->backlog += 1;
+ status = sqfs_block_processor_sync(proc);
}
return status;