aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2020-02-18 18:32:33 +0100
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2020-02-18 20:24:19 +0100
commit754e4f1bdeb2f89a04058c9f830a903a2a70c627 (patch)
tree7b0e663f39f294df54c61411fa0da2c20c77f519
parent7bfb01489bdc11ceb7a79f38e2c90b61232f605f (diff)
Simplify the thread pool block processor somewhat
- Split the worker function up into smaller functions that are a little more readable. - Only dequeue one block at a time. Makes the dequeueing a lot more readable and understandable. Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r--lib/sqfs/block_processor/winpthread.c209
1 files changed, 88 insertions, 121 deletions
diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c
index aa1a99f..0cb6d02 100644
--- a/lib/sqfs/block_processor/winpthread.c
+++ b/lib/sqfs/block_processor/winpthread.c
@@ -79,52 +79,64 @@ static void free_blk_list(sqfs_block_t *list)
}
}
+static sqfs_block_t *get_next_work_item(thread_pool_processor_t *shared)
+{
+ sqfs_block_t *blk = NULL;
+
+ while (shared->queue == NULL && shared->status == 0)
+ AWAIT(&shared->queue_cond, &shared->mtx);
+
+ if (shared->status == 0) {
+ blk = shared->queue;
+ shared->queue = blk->next;
+ blk->next = NULL;
+
+ if (shared->queue == NULL)
+ shared->queue_last = NULL;
+ }
+
+ return blk;
+}
+
+static void store_completed_block(thread_pool_processor_t *shared,
+ sqfs_block_t *blk, int status)
+{
+ sqfs_block_t *it = shared->done, *prev = NULL;
+
+ while (it != NULL) {
+ if (it->sequence_number >= blk->sequence_number)
+ break;
+ prev = it;
+ it = it->next;
+ }
+
+ if (prev == NULL) {
+ blk->next = shared->done;
+ shared->done = blk;
+ } else {
+ blk->next = prev->next;
+ prev->next = blk;
+ }
+
+ if (status != 0 && shared->status == 0)
+ shared->status = status;
+
+ SIGNAL_ALL(&shared->done_cond);
+}
+
static THREAD_TYPE worker_proc(THREAD_ARG arg)
{
compress_worker_t *worker = arg;
thread_pool_processor_t *shared = worker->shared;
- sqfs_block_t *it, *prev, *blk = NULL;
+ sqfs_block_t *blk = NULL;
int status = 0;
for (;;) {
LOCK(&shared->mtx);
- if (blk != NULL) {
- it = shared->done;
- prev = NULL;
-
- while (it != NULL) {
- if (it->sequence_number >= blk->sequence_number)
- break;
- prev = it;
- it = it->next;
- }
+ if (blk != NULL)
+ store_completed_block(shared, blk, status);
- if (prev == NULL) {
- blk->next = shared->done;
- shared->done = blk;
- } else {
- blk->next = prev->next;
- prev->next = blk;
- }
-
- if (status != 0 && shared->status == 0)
- shared->status = status;
- SIGNAL_ALL(&shared->done_cond);
- }
-
- while (shared->queue == NULL && shared->status == 0)
- AWAIT(&shared->queue_cond, &shared->mtx);
-
- if (shared->status == 0) {
- blk = shared->queue;
- shared->queue = blk->next;
- blk->next = NULL;
-
- if (shared->queue == NULL)
- shared->queue_last = NULL;
- } else {
- blk = NULL;
- }
+ blk = get_next_work_item(shared);
UNLOCK(&shared->mtx);
if (blk == NULL)
@@ -347,108 +359,68 @@ fail_init:
static sqfs_block_t *try_dequeue(thread_pool_processor_t *proc)
{
- sqfs_block_t *queue, *it, *prev;
-
- it = proc->done;
- prev = NULL;
+ sqfs_block_t *out;
- while (it != NULL && it->sequence_number == proc->dequeue_id) {
- prev = it;
- it = it->next;
- proc->dequeue_id += 1;
- }
+ if (proc->done == NULL)
+ return NULL;
- if (prev == NULL) {
- queue = NULL;
- } else {
- queue = proc->done;
- prev->next = NULL;
- proc->done = it;
- }
+ if (proc->done->sequence_number != proc->dequeue_id)
+ return NULL;
- return queue;
+ out = proc->done;
+ proc->done = out->next;
+ out->next = NULL;
+ proc->dequeue_id += 1;
+ proc->backlog -= 1;
+ return out;
}
-static sqfs_block_t *queue_merge(sqfs_block_t *lhs, sqfs_block_t *rhs)
+static int process_done_block(thread_pool_processor_t *proc, sqfs_block_t *it)
{
- sqfs_block_t *it, *head = NULL, **next_ptr = &head;
-
- while (lhs != NULL && rhs != NULL) {
- if (lhs->sequence_number <= rhs->sequence_number) {
- it = lhs;
- lhs = lhs->next;
- } else {
- it = rhs;
- rhs = rhs->next;
- }
+ sqfs_block_t *fragblk = NULL;
+ int status = 0;
- *next_ptr = it;
- next_ptr = &it->next;
- }
+ if (it->flags & SQFS_BLK_IS_FRAGMENT) {
+ status = process_completed_fragment(&proc->base, it, &fragblk);
- it = (lhs != NULL ? lhs : rhs);
- *next_ptr = it;
- return head;
-}
+ if (fragblk != NULL && status == 0) {
+ LOCK(&proc->mtx);
+ proc->dequeue_id = it->sequence_number;
+ fragblk->sequence_number = it->sequence_number;
-static int process_done_queue(thread_pool_processor_t *proc, sqfs_block_t *queue)
-{
- sqfs_block_t *it, *block = NULL;
- int status = 0;
-
- while (queue != NULL && status == 0) {
- it = queue;
- queue = it->next;
- proc->backlog -= 1;
-
- if (it->flags & SQFS_BLK_IS_FRAGMENT) {
- block = NULL;
- status = process_completed_fragment(&proc->base, it, &block);
-
- if (block != NULL && status == 0) {
- LOCK(&proc->mtx);
- proc->dequeue_id = it->sequence_number;
- block->sequence_number = it->sequence_number;
-
- if (proc->queue == NULL) {
- proc->queue = block;
- proc->queue_last = block;
- } else {
- block->next = proc->queue;
- proc->queue = block;
- }
-
- proc->backlog += 1;
- proc->done = queue_merge(queue, proc->done);
- SIGNAL_ALL(&proc->queue_cond);
- UNLOCK(&proc->mtx);
-
- queue = NULL;
+ if (proc->queue == NULL) {
+ proc->queue = fragblk;
+ proc->queue_last = fragblk;
} else {
- free(block);
+ fragblk->next = proc->queue;
+ proc->queue = fragblk;
}
+
+ proc->backlog += 1;
+ SIGNAL_ALL(&proc->queue_cond);
+ UNLOCK(&proc->mtx);
} else {
- status = process_completed_block(&proc->base, it);
+ free(fragblk);
}
-
- free(it);
+ } else {
+ status = process_completed_block(&proc->base, it);
}
- free_blk_list(queue);
+ free(it);
return status;
}
static int wait_completed(thread_pool_processor_t *proc)
{
- sqfs_block_t *queue;
+ sqfs_block_t *blk;
int status;
LOCK(&proc->mtx);
for (;;) {
- queue = try_dequeue(proc);
+ blk = try_dequeue(proc);
status = proc->status;
- if (queue != NULL || status != 0)
+ if (blk != NULL || status != 0)
break;
AWAIT(&proc->done_cond, &proc->mtx);
@@ -456,11 +428,11 @@ static int wait_completed(thread_pool_processor_t *proc)
UNLOCK(&proc->mtx);
if (status != 0) {
- free_blk_list(queue);
+ free(blk);
return status;
}
- status = process_done_queue(proc, queue);
+ status = process_done_block(proc, blk);
if (status != 0) {
LOCK(&proc->mtx);
@@ -514,12 +486,7 @@ out:
int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
{
thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc;
- int status;
-
- LOCK(&thproc->mtx);
- status = thproc->status;
- SIGNAL_ALL(&thproc->queue_cond);
- UNLOCK(&thproc->mtx);
+ int status = 0;
while (status == 0 && thproc->backlog > 0)
status = wait_completed(thproc);