summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/sqfs/blk_proc/process_block.c3
-rw-r--r--lib/sqfs/blk_proc/pthread.c162
2 files changed, 101 insertions, 64 deletions
diff --git a/lib/sqfs/blk_proc/process_block.c b/lib/sqfs/blk_proc/process_block.c
index b4ed904..f154a14 100644
--- a/lib/sqfs/blk_proc/process_block.c
+++ b/lib/sqfs/blk_proc/process_block.c
@@ -21,6 +21,9 @@ int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp,
block->checksum = crc32(0, block->data, block->size);
+ if (block->flags & SQFS_BLK_IS_FRAGMENT)
+ return 0;
+
if (!(block->flags & SQFS_BLK_DONT_COMPRESS)) {
ret = cmp->do_block(cmp, block->data, block->size,
scratch, scratch_size);
diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c
index 4f01bb3..b1755ce 100644
--- a/lib/sqfs/blk_proc/pthread.c
+++ b/lib/sqfs/blk_proc/pthread.c
@@ -244,94 +244,98 @@ static sqfs_block_t *get_completed_if_avail(sqfs_block_processor_t *proc)
return block;
}
-int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
- sqfs_block_t *block)
+static int test_and_set_status(sqfs_block_processor_t *proc, int status)
{
- sqfs_block_t *completed = NULL;
- int status;
-
- if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) {
- status = SQFS_ERROR_UNSUPPORTED;
-
- pthread_mutex_lock(&proc->mtx);
- if (proc->status == 0) {
- proc->status = status;
- pthread_cond_broadcast(&proc->queue_cond);
- }
- goto fail;
+ pthread_mutex_lock(&proc->mtx);
+ if (proc->status == 0) {
+ proc->status = status;
+ } else {
+ status = proc->status;
}
+ pthread_cond_broadcast(&proc->queue_cond);
+ return status;
+}
- if (block->flags & SQFS_BLK_IS_FRAGMENT) {
- block->checksum = crc32(0, block->data, block->size);
-
- completed = NULL;
- status = handle_fragment(proc, block, &completed);
-
- if (status != 0) {
- pthread_mutex_lock(&proc->mtx);
- proc->status = status;
- goto fail;
- }
-
- free(block);
- if (completed == NULL)
- return 0;
-
- block = completed;
- completed = NULL;
- }
+static int queue_pump(sqfs_block_processor_t *proc, sqfs_block_t *block)
+{
+ sqfs_block_t *completed = NULL;
+ int status;
pthread_mutex_lock(&proc->mtx);
+ while (proc->backlog > proc->max_backlog && proc->status == 0)
+ pthread_cond_wait(&proc->done_cond, &proc->mtx);
+
if (proc->status != 0) {
status = proc->status;
- goto fail;
+ pthread_mutex_unlock(&proc->mtx);
+ return status;
}
- while (proc->backlog > proc->max_backlog)
- pthread_cond_wait(&proc->done_cond, &proc->mtx);
-
completed = get_completed_if_avail(proc);
append_to_work_queue(proc, block);
block = NULL;
pthread_mutex_unlock(&proc->mtx);
- if (completed != NULL) {
- status = process_completed_block(proc, completed);
+ if (completed != NULL && (completed->flags & SQFS_BLK_IS_FRAGMENT)) {
+ status = handle_fragment(proc, completed, &block);
if (status != 0) {
+ free(block);
+ return test_and_set_status(proc, status);
+ }
+
+ if (block != NULL) {
pthread_mutex_lock(&proc->mtx);
- proc->status = status;
- goto fail;
+ proc->dequeue_id = completed->sequence_number;
+ block->sequence_number = proc->dequeue_id;
+
+ if (proc->queue == NULL) {
+ proc->queue = block;
+ proc->queue_last = block;
+ } else {
+ block->next = proc->queue;
+ proc->queue = block;
+ }
+
+ proc->backlog += 1;
+ pthread_cond_broadcast(&proc->queue_cond);
+ pthread_mutex_unlock(&proc->mtx);
}
+ } else if (completed != NULL) {
+ status = process_completed_block(proc, completed);
+
+ if (status != 0)
+ status = test_and_set_status(proc, status);
}
free(completed);
- return 0;
-fail:
- pthread_mutex_unlock(&proc->mtx);
- free(block);
- free(completed);
return status;
}
+int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
+ sqfs_block_t *block)
+{
+ if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS)
+ return test_and_set_status(proc, SQFS_ERROR_UNSUPPORTED);
+
+ return queue_pump(proc, block);
+}
+
int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
{
- sqfs_block_t *it;
+ sqfs_block_t *it, *block;
int status = 0;
pthread_mutex_lock(&proc->mtx);
- if (proc->frag_block != NULL) {
- append_to_work_queue(proc, proc->frag_block);
- proc->frag_block = NULL;
- }
-
+restart:
while (proc->backlog > 0 && proc->status == 0)
pthread_cond_wait(&proc->done_cond, &proc->mtx);
if (proc->status != 0) {
status = proc->status;
- goto out;
+ pthread_mutex_unlock(&proc->mtx);
+ return status;
}
while (proc->done != NULL) {
@@ -339,22 +343,52 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
if (it == NULL) {
status = SQFS_ERROR_INTERNAL;
+ } else if (it->flags & SQFS_BLK_IS_FRAGMENT) {
+ block = NULL;
+ status = handle_fragment(proc, it, &block);
+
+ if (status != 0) {
+ proc->status = status;
+ pthread_mutex_unlock(&proc->mtx);
+ free(it);
+ return status;
+ }
+
+ if (block != NULL) {
+ proc->dequeue_id = it->sequence_number;
+ block->sequence_number = proc->dequeue_id;
+ free(it);
+
+ if (proc->queue == NULL) {
+ proc->queue = block;
+ proc->queue_last = block;
+ } else {
+ block->next = proc->queue;
+ proc->queue = block;
+ }
+
+ proc->backlog += 1;
+ pthread_cond_broadcast(&proc->queue_cond);
+ goto restart;
+ }
} else {
status = process_completed_block(proc, it);
free(it);
- }
- if (status != 0) {
- proc->status = status;
- pthread_cond_broadcast(&proc->queue_cond);
- goto out;
+ if (status != 0) {
+ proc->status = status;
+ pthread_mutex_unlock(&proc->mtx);
+ return status;
+ }
}
}
-out:
- free_blk_list(proc->queue);
- free_blk_list(proc->done);
- proc->queue = NULL;
- proc->done = NULL;
+
+ if (proc->frag_block != NULL) {
+ append_to_work_queue(proc, proc->frag_block);
+ proc->frag_block = NULL;
+ goto restart;
+ }
+
pthread_mutex_unlock(&proc->mtx);
- return status;
+ return 0;
}