diff options
author | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2021-03-21 16:59:08 +0100 |
---|---|---|
committer | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2021-03-21 17:29:18 +0100 |
commit | bb0ef9e0eec5c27610fe381b905ef46b3f5f09c6 (patch) | |
tree | 62ec813c654f0962adc7048e849e6bb196b22430 /lib/sqfs/block_processor/frontend.c | |
parent | a18f724aa3bf57aeed285b5f61eca4a0ba891c21 (diff) |
Cleanup: Rewrite block processor to use the libutil thread_pool_t
Throw out the messy thread pool implementation and temporarily also
remove the exact fragment matching for simplicity.
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib/sqfs/block_processor/frontend.c')
-rw-r--r-- | lib/sqfs/block_processor/frontend.c | 189 |
1 files changed, 177 insertions, 12 deletions
diff --git a/lib/sqfs/block_processor/frontend.c b/lib/sqfs/block_processor/frontend.c index f6aeffa..8bd6cf2 100644 --- a/lib/sqfs/block_processor/frontend.c +++ b/lib/sqfs/block_processor/frontend.c @@ -24,26 +24,159 @@ static sqfs_block_t *get_new_block(sqfs_block_processor_t *proc) return blk; } +static int dequeue_block(sqfs_block_processor_t *proc) +{ + sqfs_block_t *blk, *fragblk, *it, *prev; + bool have_dequeued = false; + int status; +retry: + while (proc->io_queue != NULL) { + if (proc->io_queue->io_seq_num != proc->io_deq_seq_num) + break; + + blk = proc->io_queue; + proc->io_queue = blk->next; + proc->io_deq_seq_num += 1; + proc->backlog -= 1; + have_dequeued = true; + + status = process_completed_block(proc, blk); + if (status != 0) + return status; + } + + if (have_dequeued) + return 0; + + blk = proc->pool->dequeue(proc->pool); + + if (blk == NULL) { + status = proc->pool->get_status(proc->pool); + if (status == 0) + status = SQFS_ERROR_INTERNAL; + + return status; + } + + proc->backlog -= 1; + have_dequeued = true; + + if (blk->flags & SQFS_BLK_IS_FRAGMENT) { + fragblk = NULL; + status = process_completed_fragment(proc, blk, &fragblk); + + if (status != 0) { + free(fragblk); + return status; + } + + if (fragblk != NULL) { + fragblk->io_seq_num = proc->io_seq_num++; + + if (proc->pool->submit(proc->pool, fragblk) != 0) { + free(fragblk); + + if (status == 0) { + status = proc->pool-> + get_status(proc->pool); + + if (status == 0) + status = SQFS_ERROR_ALLOC; + } + + return status; + } + + proc->backlog += 1; + have_dequeued = false; + } + } else { + if (!(blk->flags & SQFS_BLK_FRAGMENT_BLOCK)) + blk->io_seq_num = proc->io_seq_num++; + + prev = NULL; + it = proc->io_queue; + + while (it != NULL) { + if (it->io_seq_num >= blk->io_seq_num) + break; + + prev = it; + it = it->next; + } + + if (prev == NULL) { + blk->next = proc->io_queue; + proc->io_queue = blk; + } else { + blk->next = prev->next; + prev->next = blk; + } + + proc->backlog += 1; + have_dequeued = false; + } + + if (!have_dequeued) + goto retry; + + return 0; +} + +static int enqueue_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) +{ + int status; + + if (proc->pool->submit(proc->pool, blk) != 0) { + status = proc->pool->get_status(proc->pool); + + if (status == 0) + status = SQFS_ERROR_ALLOC; + + free(blk); + return status; + } + + proc->backlog += 1; + return 0; +} + static int add_sentinel_block(sqfs_block_processor_t *proc) { - sqfs_block_t *blk = get_new_block(proc); + sqfs_block_t *blk; + int ret; + + if (proc->backlog == proc->max_backlog) { + ret = dequeue_block(proc); + if (ret != 0) + return ret; + } + blk = get_new_block(proc); if (blk == NULL) return SQFS_ERROR_ALLOC; blk->inode = proc->inode; blk->flags = proc->blk_flags | SQFS_BLK_LAST_BLOCK; - return proc->append_to_work_queue(proc, blk); + return enqueue_block(proc, blk); } static int flush_block(sqfs_block_processor_t *proc) { - sqfs_block_t *block = proc->blk_current; + sqfs_block_t *block; + int ret; + + if (proc->backlog == proc->max_backlog) { + ret = dequeue_block(proc); + if (ret != 0) + return ret; + } + block = proc->blk_current; proc->blk_current = NULL; - return proc->append_to_work_queue(proc, block); + return enqueue_block(proc, block); } int sqfs_block_processor_begin_file(sqfs_block_processor_t *proc, @@ -177,6 +310,7 @@ int sqfs_block_processor_submit_block(sqfs_block_processor_t *proc, void *user, size_t size) { sqfs_block_t *blk; + int ret; if (proc->begin_called) return SQFS_ERROR_SEQUENCE; @@ -187,6 +321,12 @@ int sqfs_block_processor_submit_block(sqfs_block_processor_t *proc, void *user, if (flags & ~SQFS_BLK_FLAGS_ALL) return SQFS_ERROR_UNSUPPORTED; + if (proc->backlog == proc->max_backlog) { + ret = dequeue_block(proc); + if (ret != 0) + return ret; + } + blk = get_new_block(proc); if (blk == NULL) return SQFS_ERROR_ALLOC; @@ -196,12 +336,25 @@ int sqfs_block_processor_submit_block(sqfs_block_processor_t *proc, void *user, blk->size = size; memcpy(blk->data, data, size); - return proc->append_to_work_queue(proc, blk); + ret = proc->pool->submit(proc->pool, blk); + if (ret != 0) + free(blk); + + proc->backlog += 1; + return ret; } int sqfs_block_processor_sync(sqfs_block_processor_t *proc) { - return proc->sync(proc); + int ret; + + while (proc->backlog > 0) { + ret = dequeue_block(proc); + if (ret != 0) + return ret; + } + + return 0; } int sqfs_block_processor_finish(sqfs_block_processor_t *proc) @@ -209,18 +362,30 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc) sqfs_block_t *blk; int status; - status = proc->sync(proc); + status = sqfs_block_processor_sync(proc); + if (status != 0) + return status; - if (status == 0 && proc->frag_block != NULL) { + if (proc->frag_block != NULL) { blk = proc->frag_block; blk->next = NULL; - blk->flags |= BLK_FLAG_MANUAL_SUBMISSION; proc->frag_block = NULL; - status = proc->append_to_work_queue(proc, blk); + blk->io_seq_num = proc->io_seq_num++; - if (status == 0) - status = proc->sync(proc); + status = proc->pool->submit(proc->pool, blk); + if (status != 0) { + status = proc->pool->get_status(proc->pool); + + if (status == 0) + status = SQFS_ERROR_ALLOC; + + free(blk); + return status; + } + + proc->backlog += 1; + status = sqfs_block_processor_sync(proc); } return status; |