aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-09-24 02:56:42 +0200
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-09-24 03:04:38 +0200
commitc56c830ae96ed000c999fb93c23bbaad0303acf9 (patch)
tree589d038e56f6000e55b3cfce5cc74e64304958c9
parent87d577a66eb3b1aaca91c4841445cccaf151ee81 (diff)
Cleanup pthread based block processing code
Break convoluted, long functions up into smaller ones where the control flow (especially locking and signalling) is more easily readable and remove some copy and paste clean up code. Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r--lib/sqfs/blk_proc/process_block.c5
-rw-r--r--lib/sqfs/blk_proc/pthread.c223
-rw-r--r--lib/sqfs/blk_proc/serial.c14
3 files changed, 120 insertions, 122 deletions
diff --git a/lib/sqfs/blk_proc/process_block.c b/lib/sqfs/blk_proc/process_block.c
index b303278..ccc6df0 100644
--- a/lib/sqfs/blk_proc/process_block.c
+++ b/lib/sqfs/blk_proc/process_block.c
@@ -15,6 +15,11 @@ int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp,
{
ssize_t ret;
+ if (block->size == 0) {
+ block->checksum = 0;
+ return 0;
+ }
+
block->checksum = crc32(0, block->data, block->size);
if (!(block->flags & SQFS_BLK_DONT_COMPRESS)) {
diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c
index ee73e52..a3dc01f 100644
--- a/lib/sqfs/blk_proc/pthread.c
+++ b/lib/sqfs/blk_proc/pthread.c
@@ -7,10 +7,21 @@
#define SQFS_BUILDING_DLL
#include "internal.h"
-static void store_completed_block(sqfs_block_processor_t *shared,
- sqfs_block_t *blk)
+static void free_blk_list(sqfs_block_t *list)
{
- sqfs_block_t *it = shared->done, *prev = NULL;
+ sqfs_block_t *it;
+
+ while (list != NULL) {
+ it = list;
+ list = list->next;
+ free(it);
+ }
+}
+
+static void store_completed_block(sqfs_block_processor_t *proc,
+ sqfs_block_t *blk, int status)
+{
+ sqfs_block_t *it = proc->done, *prev = NULL;
while (it != NULL) {
if (it->sequence_number >= blk->sequence_number)
@@ -20,12 +31,39 @@ static void store_completed_block(sqfs_block_processor_t *shared,
}
if (prev == NULL) {
- blk->next = shared->done;
- shared->done = blk;
+ blk->next = proc->done;
+ proc->done = blk;
} else {
blk->next = prev->next;
prev->next = blk;
}
+
+ if (status != 0 && proc->status == 0)
+ proc->status = status;
+
+ proc->backlog -= 1;
+ pthread_cond_broadcast(&proc->done_cond);
+}
+
+static sqfs_block_t *get_next_work_item(sqfs_block_processor_t *proc)
+{
+ sqfs_block_t *blk;
+
+ while (proc->queue == NULL) {
+ if (proc->terminate || proc->status != 0)
+ return NULL;
+
+ pthread_cond_wait(&proc->queue_cond, &proc->mtx);
+ }
+
+ blk = proc->queue;
+ proc->queue = blk->next;
+ blk->next = NULL;
+
+ if (proc->queue == NULL)
+ proc->queue_last = NULL;
+
+ return blk;
}
static void *worker_proc(void *arg)
@@ -37,33 +75,14 @@ static void *worker_proc(void *arg)
for (;;) {
pthread_mutex_lock(&shared->mtx);
- if (blk != NULL) {
- store_completed_block(shared, blk);
- shared->backlog -= 1;
-
- if (status != 0 && shared->status == 0)
- shared->status = status;
- pthread_cond_broadcast(&shared->done_cond);
- }
+ if (blk != NULL)
+ store_completed_block(shared, blk, status);
- while (shared->queue == NULL && !shared->terminate &&
- shared->status == 0) {
- pthread_cond_wait(&shared->queue_cond,
- &shared->mtx);
- }
+ blk = get_next_work_item(shared);
+ pthread_mutex_unlock(&shared->mtx);
- if (shared->terminate || shared->status != 0) {
- pthread_mutex_unlock(&shared->mtx);
+ if (blk == NULL)
break;
- }
-
- blk = shared->queue;
- shared->queue = blk->next;
- blk->next = NULL;
-
- if (shared->queue == NULL)
- shared->queue_last = NULL;
- pthread_mutex_unlock(&shared->mtx);
status = sqfs_block_process(blk, worker->cmp, worker->scratch,
shared->max_block_size);
@@ -161,7 +180,6 @@ fail_init:
void sqfs_block_processor_destroy(sqfs_block_processor_t *proc)
{
- sqfs_block_t *blk;
unsigned int i;
pthread_mutex_lock(&proc->mtx);
@@ -180,26 +198,47 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc)
pthread_cond_destroy(&proc->queue_cond);
pthread_mutex_destroy(&proc->mtx);
- while (proc->queue != NULL) {
- blk = proc->queue;
- proc->queue = blk->next;
- free(blk);
+ free_blk_list(proc->queue);
+ free_blk_list(proc->done);
+ free(proc->fragments);
+ free(proc->blocks);
+ free(proc);
+}
+
+static void append_to_work_queue(sqfs_block_processor_t *proc,
+ sqfs_block_t *block)
+{
+ if (proc->queue_last == NULL) {
+ proc->queue = proc->queue_last = block;
+ } else {
+ proc->queue_last->next = block;
+ proc->queue_last = block;
}
- while (proc->done != NULL) {
- blk = proc->done;
- proc->done = blk->next;
- free(blk);
+ block->sequence_number = proc->enqueue_id++;
+ block->next = NULL;
+ proc->backlog += 1;
+ pthread_cond_broadcast(&proc->queue_cond);
+}
+
+static sqfs_block_t *get_completed_if_avail(sqfs_block_processor_t *proc)
+{
+ sqfs_block_t *block = NULL;
+
+ if (proc->done != NULL &&
+ proc->done->sequence_number == proc->dequeue_id) {
+ block = proc->done;
+ proc->done = proc->done->next;
+ proc->dequeue_id += 1;
}
- free(proc->fragments);
- free(proc->blocks);
- free(proc);
+ return block;
}
int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
sqfs_block_t *block)
{
+ sqfs_block_t *completed = NULL;
int status;
pthread_mutex_lock(&proc->mtx);
@@ -215,60 +254,38 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
goto fail;
}
- block->sequence_number = proc->enqueue_id++;
- block->next = NULL;
-
- if (block->size == 0) {
- block->checksum = 0;
- store_completed_block(proc, block);
- } else {
- while (proc->backlog > proc->max_backlog)
- pthread_cond_wait(&proc->done_cond, &proc->mtx);
-
- if (proc->queue_last == NULL) {
- proc->queue = proc->queue_last = block;
- } else {
- proc->queue_last->next = block;
- proc->queue_last = block;
- }
-
- proc->backlog += 1;
- }
+ while (proc->backlog > proc->max_backlog)
+ pthread_cond_wait(&proc->done_cond, &proc->mtx);
- if (proc->done != NULL &&
- proc->done->sequence_number == proc->dequeue_id) {
- block = proc->done;
- proc->done = proc->done->next;
- proc->dequeue_id += 1;
- } else {
- block = NULL;
- }
+ completed = get_completed_if_avail(proc);
- pthread_cond_broadcast(&proc->queue_cond);
+ append_to_work_queue(proc, block);
+ block = NULL;
pthread_mutex_unlock(&proc->mtx);
- if (block == NULL)
- return 0;
+ if (completed != NULL) {
+ status = process_completed_block(proc, completed);
- status = process_completed_block(proc, block);
- if (status != 0) {
- pthread_mutex_lock(&proc->mtx);
- proc->status = status;
- goto fail;
+ if (status != 0) {
+ pthread_mutex_lock(&proc->mtx);
+ proc->status = status;
+ goto fail;
+ }
}
- free(block);
+ free(completed);
return 0;
fail:
pthread_mutex_unlock(&proc->mtx);
free(block);
+ free(completed);
return status;
}
int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
{
- sqfs_block_t *queue, *it;
- int status;
+ sqfs_block_t *it;
+ int status = 0;
pthread_mutex_lock(&proc->mtx);
while (proc->backlog > 0 && proc->status == 0)
@@ -276,50 +293,30 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
if (proc->status != 0) {
status = proc->status;
- goto fail;
+ goto out;
}
- for (it = proc->done; it != NULL; it = it->next) {
- if (it->sequence_number != proc->dequeue_id++) {
+ while (proc->done != NULL) {
+ it = get_completed_if_avail(proc);
+
+ if (it == NULL) {
status = SQFS_ERROR_INTERNAL;
- proc->status = status;
- goto fail;
+ } else {
+ status = process_completed_block(proc, it);
+ free(it);
}
- }
-
- queue = proc->done;
- proc->done = NULL;
- pthread_mutex_unlock(&proc->mtx);
-
- while (queue != NULL) {
- it = queue;
- queue = queue->next;
- it->next = NULL;
- status = process_completed_block(proc, it);
- free(it);
if (status != 0) {
- pthread_mutex_lock(&proc->mtx);
proc->status = status;
pthread_cond_broadcast(&proc->queue_cond);
- goto fail;
+ goto out;
}
}
-
- return 0;
-fail:
- while (proc->queue != NULL) {
- it = proc->queue;
- proc->queue = it->next;
- free(it);
- }
-
- while (proc->done != NULL) {
- it = proc->done;
- proc->done = it->next;
- free(it);
- }
-
+out:
+ free_blk_list(proc->queue);
+ free_blk_list(proc->done);
+ proc->queue = NULL;
+ proc->done = NULL;
pthread_mutex_unlock(&proc->mtx);
return status;
}
diff --git a/lib/sqfs/blk_proc/serial.c b/lib/sqfs/blk_proc/serial.c
index 850c33d..06c811e 100644
--- a/lib/sqfs/blk_proc/serial.c
+++ b/lib/sqfs/blk_proc/serial.c
@@ -59,16 +59,12 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
return proc->status;
}
- if (block->size == 0) {
- block->checksum = 0;
- } else {
- proc->status = sqfs_block_process(block, proc->cmp,
- proc->scratch,
- proc->max_block_size);
- }
+ proc->status = sqfs_block_process(block, proc->cmp, proc->scratch,
+ proc->max_block_size);
+
+ if (proc->status == 0)
+ proc->status = process_completed_block(proc, block);
- block->next = NULL;
- proc->status = process_completed_block(proc, block);
free(block);
return proc->status;
}