summaryrefslogtreecommitdiff
path: root/lib/sqfs/blk_proc/pthread.c
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-09-24 02:56:42 +0200
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-09-24 03:04:38 +0200
commitc56c830ae96ed000c999fb93c23bbaad0303acf9 (patch)
tree589d038e56f6000e55b3cfce5cc74e64304958c9 /lib/sqfs/blk_proc/pthread.c
parent87d577a66eb3b1aaca91c4841445cccaf151ee81 (diff)
Cleanup pthread based block processing code
Break convoluted, long functions up into smaller ones where the control flow (especially locking and signalling) is more easily readable and remove some copy and paste clean up code. Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib/sqfs/blk_proc/pthread.c')
-rw-r--r--lib/sqfs/blk_proc/pthread.c223
1 files changed, 110 insertions, 113 deletions
diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c
index ee73e52..a3dc01f 100644
--- a/lib/sqfs/blk_proc/pthread.c
+++ b/lib/sqfs/blk_proc/pthread.c
@@ -7,10 +7,21 @@
#define SQFS_BUILDING_DLL
#include "internal.h"
-static void store_completed_block(sqfs_block_processor_t *shared,
- sqfs_block_t *blk)
+static void free_blk_list(sqfs_block_t *list)
{
- sqfs_block_t *it = shared->done, *prev = NULL;
+ sqfs_block_t *it;
+
+ while (list != NULL) {
+ it = list;
+ list = list->next;
+ free(it);
+ }
+}
+
+static void store_completed_block(sqfs_block_processor_t *proc,
+ sqfs_block_t *blk, int status)
+{
+ sqfs_block_t *it = proc->done, *prev = NULL;
while (it != NULL) {
if (it->sequence_number >= blk->sequence_number)
@@ -20,12 +31,39 @@ static void store_completed_block(sqfs_block_processor_t *shared,
}
if (prev == NULL) {
- blk->next = shared->done;
- shared->done = blk;
+ blk->next = proc->done;
+ proc->done = blk;
} else {
blk->next = prev->next;
prev->next = blk;
}
+
+ if (status != 0 && proc->status == 0)
+ proc->status = status;
+
+ proc->backlog -= 1;
+ pthread_cond_broadcast(&proc->done_cond);
+}
+
+static sqfs_block_t *get_next_work_item(sqfs_block_processor_t *proc)
+{
+ sqfs_block_t *blk;
+
+ while (proc->queue == NULL) {
+ if (proc->terminate || proc->status != 0)
+ return NULL;
+
+ pthread_cond_wait(&proc->queue_cond, &proc->mtx);
+ }
+
+ blk = proc->queue;
+ proc->queue = blk->next;
+ blk->next = NULL;
+
+ if (proc->queue == NULL)
+ proc->queue_last = NULL;
+
+ return blk;
}
static void *worker_proc(void *arg)
@@ -37,33 +75,14 @@ static void *worker_proc(void *arg)
for (;;) {
pthread_mutex_lock(&shared->mtx);
- if (blk != NULL) {
- store_completed_block(shared, blk);
- shared->backlog -= 1;
-
- if (status != 0 && shared->status == 0)
- shared->status = status;
- pthread_cond_broadcast(&shared->done_cond);
- }
+ if (blk != NULL)
+ store_completed_block(shared, blk, status);
- while (shared->queue == NULL && !shared->terminate &&
- shared->status == 0) {
- pthread_cond_wait(&shared->queue_cond,
- &shared->mtx);
- }
+ blk = get_next_work_item(shared);
+ pthread_mutex_unlock(&shared->mtx);
- if (shared->terminate || shared->status != 0) {
- pthread_mutex_unlock(&shared->mtx);
+ if (blk == NULL)
break;
- }
-
- blk = shared->queue;
- shared->queue = blk->next;
- blk->next = NULL;
-
- if (shared->queue == NULL)
- shared->queue_last = NULL;
- pthread_mutex_unlock(&shared->mtx);
status = sqfs_block_process(blk, worker->cmp, worker->scratch,
shared->max_block_size);
@@ -161,7 +180,6 @@ fail_init:
void sqfs_block_processor_destroy(sqfs_block_processor_t *proc)
{
- sqfs_block_t *blk;
unsigned int i;
pthread_mutex_lock(&proc->mtx);
@@ -180,26 +198,47 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc)
pthread_cond_destroy(&proc->queue_cond);
pthread_mutex_destroy(&proc->mtx);
- while (proc->queue != NULL) {
- blk = proc->queue;
- proc->queue = blk->next;
- free(blk);
+ free_blk_list(proc->queue);
+ free_blk_list(proc->done);
+ free(proc->fragments);
+ free(proc->blocks);
+ free(proc);
+}
+
+static void append_to_work_queue(sqfs_block_processor_t *proc,
+ sqfs_block_t *block)
+{
+ if (proc->queue_last == NULL) {
+ proc->queue = proc->queue_last = block;
+ } else {
+ proc->queue_last->next = block;
+ proc->queue_last = block;
}
- while (proc->done != NULL) {
- blk = proc->done;
- proc->done = blk->next;
- free(blk);
+ block->sequence_number = proc->enqueue_id++;
+ block->next = NULL;
+ proc->backlog += 1;
+ pthread_cond_broadcast(&proc->queue_cond);
+}
+
+static sqfs_block_t *get_completed_if_avail(sqfs_block_processor_t *proc)
+{
+ sqfs_block_t *block = NULL;
+
+ if (proc->done != NULL &&
+ proc->done->sequence_number == proc->dequeue_id) {
+ block = proc->done;
+ proc->done = proc->done->next;
+ proc->dequeue_id += 1;
}
- free(proc->fragments);
- free(proc->blocks);
- free(proc);
+ return block;
}
int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
sqfs_block_t *block)
{
+ sqfs_block_t *completed = NULL;
int status;
pthread_mutex_lock(&proc->mtx);
@@ -215,60 +254,38 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
goto fail;
}
- block->sequence_number = proc->enqueue_id++;
- block->next = NULL;
-
- if (block->size == 0) {
- block->checksum = 0;
- store_completed_block(proc, block);
- } else {
- while (proc->backlog > proc->max_backlog)
- pthread_cond_wait(&proc->done_cond, &proc->mtx);
-
- if (proc->queue_last == NULL) {
- proc->queue = proc->queue_last = block;
- } else {
- proc->queue_last->next = block;
- proc->queue_last = block;
- }
-
- proc->backlog += 1;
- }
+ while (proc->backlog > proc->max_backlog)
+ pthread_cond_wait(&proc->done_cond, &proc->mtx);
- if (proc->done != NULL &&
- proc->done->sequence_number == proc->dequeue_id) {
- block = proc->done;
- proc->done = proc->done->next;
- proc->dequeue_id += 1;
- } else {
- block = NULL;
- }
+ completed = get_completed_if_avail(proc);
- pthread_cond_broadcast(&proc->queue_cond);
+ append_to_work_queue(proc, block);
+ block = NULL;
pthread_mutex_unlock(&proc->mtx);
- if (block == NULL)
- return 0;
+ if (completed != NULL) {
+ status = process_completed_block(proc, completed);
- status = process_completed_block(proc, block);
- if (status != 0) {
- pthread_mutex_lock(&proc->mtx);
- proc->status = status;
- goto fail;
+ if (status != 0) {
+ pthread_mutex_lock(&proc->mtx);
+ proc->status = status;
+ goto fail;
+ }
}
- free(block);
+ free(completed);
return 0;
fail:
pthread_mutex_unlock(&proc->mtx);
free(block);
+ free(completed);
return status;
}
int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
{
- sqfs_block_t *queue, *it;
- int status;
+ sqfs_block_t *it;
+ int status = 0;
pthread_mutex_lock(&proc->mtx);
while (proc->backlog > 0 && proc->status == 0)
@@ -276,50 +293,30 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
if (proc->status != 0) {
status = proc->status;
- goto fail;
+ goto out;
}
- for (it = proc->done; it != NULL; it = it->next) {
- if (it->sequence_number != proc->dequeue_id++) {
+ while (proc->done != NULL) {
+ it = get_completed_if_avail(proc);
+
+ if (it == NULL) {
status = SQFS_ERROR_INTERNAL;
- proc->status = status;
- goto fail;
+ } else {
+ status = process_completed_block(proc, it);
+ free(it);
}
- }
-
- queue = proc->done;
- proc->done = NULL;
- pthread_mutex_unlock(&proc->mtx);
-
- while (queue != NULL) {
- it = queue;
- queue = queue->next;
- it->next = NULL;
- status = process_completed_block(proc, it);
- free(it);
if (status != 0) {
- pthread_mutex_lock(&proc->mtx);
proc->status = status;
pthread_cond_broadcast(&proc->queue_cond);
- goto fail;
+ goto out;
}
}
-
- return 0;
-fail:
- while (proc->queue != NULL) {
- it = proc->queue;
- proc->queue = it->next;
- free(it);
- }
-
- while (proc->done != NULL) {
- it = proc->done;
- proc->done = it->next;
- free(it);
- }
-
+out:
+ free_blk_list(proc->queue);
+ free_blk_list(proc->done);
+ proc->queue = NULL;
+ proc->done = NULL;
pthread_mutex_unlock(&proc->mtx);
return status;
}