summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/sqfs/blk_proc/process_block.c5
-rw-r--r--lib/sqfs/blk_proc/pthread.c223
-rw-r--r--lib/sqfs/blk_proc/serial.c14
3 files changed, 120 insertions, 122 deletions
diff --git a/lib/sqfs/blk_proc/process_block.c b/lib/sqfs/blk_proc/process_block.c
index b303278..ccc6df0 100644
--- a/lib/sqfs/blk_proc/process_block.c
+++ b/lib/sqfs/blk_proc/process_block.c
@@ -15,6 +15,11 @@ int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp,
{
ssize_t ret;
+ if (block->size == 0) {
+ block->checksum = 0;
+ return 0;
+ }
+
block->checksum = crc32(0, block->data, block->size);
if (!(block->flags & SQFS_BLK_DONT_COMPRESS)) {
diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c
index ee73e52..a3dc01f 100644
--- a/lib/sqfs/blk_proc/pthread.c
+++ b/lib/sqfs/blk_proc/pthread.c
@@ -7,10 +7,21 @@
#define SQFS_BUILDING_DLL
#include "internal.h"
-static void store_completed_block(sqfs_block_processor_t *shared,
- sqfs_block_t *blk)
+static void free_blk_list(sqfs_block_t *list)
{
- sqfs_block_t *it = shared->done, *prev = NULL;
+ sqfs_block_t *it;
+
+ while (list != NULL) {
+ it = list;
+ list = list->next;
+ free(it);
+ }
+}
+
+static void store_completed_block(sqfs_block_processor_t *proc,
+ sqfs_block_t *blk, int status)
+{
+ sqfs_block_t *it = proc->done, *prev = NULL;
while (it != NULL) {
if (it->sequence_number >= blk->sequence_number)
@@ -20,12 +31,39 @@ static void store_completed_block(sqfs_block_processor_t *shared,
}
if (prev == NULL) {
- blk->next = shared->done;
- shared->done = blk;
+ blk->next = proc->done;
+ proc->done = blk;
} else {
blk->next = prev->next;
prev->next = blk;
}
+
+ if (status != 0 && proc->status == 0)
+ proc->status = status;
+
+ proc->backlog -= 1;
+ pthread_cond_broadcast(&proc->done_cond);
+}
+
+static sqfs_block_t *get_next_work_item(sqfs_block_processor_t *proc)
+{
+ sqfs_block_t *blk;
+
+ while (proc->queue == NULL) {
+ if (proc->terminate || proc->status != 0)
+ return NULL;
+
+ pthread_cond_wait(&proc->queue_cond, &proc->mtx);
+ }
+
+ blk = proc->queue;
+ proc->queue = blk->next;
+ blk->next = NULL;
+
+ if (proc->queue == NULL)
+ proc->queue_last = NULL;
+
+ return blk;
}
static void *worker_proc(void *arg)
@@ -37,33 +75,14 @@ static void *worker_proc(void *arg)
for (;;) {
pthread_mutex_lock(&shared->mtx);
- if (blk != NULL) {
- store_completed_block(shared, blk);
- shared->backlog -= 1;
-
- if (status != 0 && shared->status == 0)
- shared->status = status;
- pthread_cond_broadcast(&shared->done_cond);
- }
+ if (blk != NULL)
+ store_completed_block(shared, blk, status);
- while (shared->queue == NULL && !shared->terminate &&
- shared->status == 0) {
- pthread_cond_wait(&shared->queue_cond,
- &shared->mtx);
- }
+ blk = get_next_work_item(shared);
+ pthread_mutex_unlock(&shared->mtx);
- if (shared->terminate || shared->status != 0) {
- pthread_mutex_unlock(&shared->mtx);
+ if (blk == NULL)
break;
- }
-
- blk = shared->queue;
- shared->queue = blk->next;
- blk->next = NULL;
-
- if (shared->queue == NULL)
- shared->queue_last = NULL;
- pthread_mutex_unlock(&shared->mtx);
status = sqfs_block_process(blk, worker->cmp, worker->scratch,
shared->max_block_size);
@@ -161,7 +180,6 @@ fail_init:
void sqfs_block_processor_destroy(sqfs_block_processor_t *proc)
{
- sqfs_block_t *blk;
unsigned int i;
pthread_mutex_lock(&proc->mtx);
@@ -180,26 +198,47 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc)
pthread_cond_destroy(&proc->queue_cond);
pthread_mutex_destroy(&proc->mtx);
- while (proc->queue != NULL) {
- blk = proc->queue;
- proc->queue = blk->next;
- free(blk);
+ free_blk_list(proc->queue);
+ free_blk_list(proc->done);
+ free(proc->fragments);
+ free(proc->blocks);
+ free(proc);
+}
+
+static void append_to_work_queue(sqfs_block_processor_t *proc,
+ sqfs_block_t *block)
+{
+ if (proc->queue_last == NULL) {
+ proc->queue = proc->queue_last = block;
+ } else {
+ proc->queue_last->next = block;
+ proc->queue_last = block;
}
- while (proc->done != NULL) {
- blk = proc->done;
- proc->done = blk->next;
- free(blk);
+ block->sequence_number = proc->enqueue_id++;
+ block->next = NULL;
+ proc->backlog += 1;
+ pthread_cond_broadcast(&proc->queue_cond);
+}
+
+static sqfs_block_t *get_completed_if_avail(sqfs_block_processor_t *proc)
+{
+ sqfs_block_t *block = NULL;
+
+ if (proc->done != NULL &&
+ proc->done->sequence_number == proc->dequeue_id) {
+ block = proc->done;
+ proc->done = proc->done->next;
+ proc->dequeue_id += 1;
}
- free(proc->fragments);
- free(proc->blocks);
- free(proc);
+ return block;
}
int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
sqfs_block_t *block)
{
+ sqfs_block_t *completed = NULL;
int status;
pthread_mutex_lock(&proc->mtx);
@@ -215,60 +254,38 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
goto fail;
}
- block->sequence_number = proc->enqueue_id++;
- block->next = NULL;
-
- if (block->size == 0) {
- block->checksum = 0;
- store_completed_block(proc, block);
- } else {
- while (proc->backlog > proc->max_backlog)
- pthread_cond_wait(&proc->done_cond, &proc->mtx);
-
- if (proc->queue_last == NULL) {
- proc->queue = proc->queue_last = block;
- } else {
- proc->queue_last->next = block;
- proc->queue_last = block;
- }
-
- proc->backlog += 1;
- }
+ while (proc->backlog > proc->max_backlog)
+ pthread_cond_wait(&proc->done_cond, &proc->mtx);
- if (proc->done != NULL &&
- proc->done->sequence_number == proc->dequeue_id) {
- block = proc->done;
- proc->done = proc->done->next;
- proc->dequeue_id += 1;
- } else {
- block = NULL;
- }
+ completed = get_completed_if_avail(proc);
- pthread_cond_broadcast(&proc->queue_cond);
+ append_to_work_queue(proc, block);
+ block = NULL;
pthread_mutex_unlock(&proc->mtx);
- if (block == NULL)
- return 0;
+ if (completed != NULL) {
+ status = process_completed_block(proc, completed);
- status = process_completed_block(proc, block);
- if (status != 0) {
- pthread_mutex_lock(&proc->mtx);
- proc->status = status;
- goto fail;
+ if (status != 0) {
+ pthread_mutex_lock(&proc->mtx);
+ proc->status = status;
+ goto fail;
+ }
}
- free(block);
+ free(completed);
return 0;
fail:
pthread_mutex_unlock(&proc->mtx);
free(block);
+ free(completed);
return status;
}
int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
{
- sqfs_block_t *queue, *it;
- int status;
+ sqfs_block_t *it;
+ int status = 0;
pthread_mutex_lock(&proc->mtx);
while (proc->backlog > 0 && proc->status == 0)
@@ -276,50 +293,30 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
if (proc->status != 0) {
status = proc->status;
- goto fail;
+ goto out;
}
- for (it = proc->done; it != NULL; it = it->next) {
- if (it->sequence_number != proc->dequeue_id++) {
+ while (proc->done != NULL) {
+ it = get_completed_if_avail(proc);
+
+ if (it == NULL) {
status = SQFS_ERROR_INTERNAL;
- proc->status = status;
- goto fail;
+ } else {
+ status = process_completed_block(proc, it);
+ free(it);
}
- }
-
- queue = proc->done;
- proc->done = NULL;
- pthread_mutex_unlock(&proc->mtx);
-
- while (queue != NULL) {
- it = queue;
- queue = queue->next;
- it->next = NULL;
- status = process_completed_block(proc, it);
- free(it);
if (status != 0) {
- pthread_mutex_lock(&proc->mtx);
proc->status = status;
pthread_cond_broadcast(&proc->queue_cond);
- goto fail;
+ goto out;
}
}
-
- return 0;
-fail:
- while (proc->queue != NULL) {
- it = proc->queue;
- proc->queue = it->next;
- free(it);
- }
-
- while (proc->done != NULL) {
- it = proc->done;
- proc->done = it->next;
- free(it);
- }
-
+out:
+ free_blk_list(proc->queue);
+ free_blk_list(proc->done);
+ proc->queue = NULL;
+ proc->done = NULL;
pthread_mutex_unlock(&proc->mtx);
return status;
}
diff --git a/lib/sqfs/blk_proc/serial.c b/lib/sqfs/blk_proc/serial.c
index 850c33d..06c811e 100644
--- a/lib/sqfs/blk_proc/serial.c
+++ b/lib/sqfs/blk_proc/serial.c
@@ -59,16 +59,12 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
return proc->status;
}
- if (block->size == 0) {
- block->checksum = 0;
- } else {
- proc->status = sqfs_block_process(block, proc->cmp,
- proc->scratch,
- proc->max_block_size);
- }
+ proc->status = sqfs_block_process(block, proc->cmp, proc->scratch,
+ proc->max_block_size);
+
+ if (proc->status == 0)
+ proc->status = process_completed_block(proc, block);
- block->next = NULL;
- proc->status = process_completed_block(proc, block);
free(block);
return proc->status;
}