aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/sqfs/block_processor.h7
-rw-r--r--lib/sqfs/blk_proc/internal.h2
-rw-r--r--lib/sqfs/blk_proc/process_block.c24
-rw-r--r--lib/sqfs/blk_proc/pthread.c77
-rw-r--r--lib/sqfs/blk_proc/serial.c3
5 files changed, 82 insertions, 31 deletions
diff --git a/include/sqfs/block_processor.h b/include/sqfs/block_processor.h
index 2086a3d..5b7b3e0 100644
--- a/include/sqfs/block_processor.h
+++ b/include/sqfs/block_processor.h
@@ -86,12 +86,7 @@ typedef enum {
/**
* @brief Set by compressor worker if the block was actually compressed.
*/
- SQFS_BLK_IS_COMPRESSED = 0x4000,
-
- /**
- * @brief Set by compressor worker if compression failed.
- */
- SQFS_BLK_COMPRESS_ERROR = 0x8000,
+ SQFS_BLK_IS_COMPRESSED = 0x8000,
/**
* @brief The combination of all flags that are user settable.
diff --git a/lib/sqfs/blk_proc/internal.h b/lib/sqfs/blk_proc/internal.h
index b5af751..8cb1a23 100644
--- a/lib/sqfs/blk_proc/internal.h
+++ b/lib/sqfs/blk_proc/internal.h
@@ -59,13 +59,13 @@ struct sqfs_block_processor_t {
sqfs_block_t *done;
bool terminate;
size_t backlog;
+ int status;
/* used by main thread only */
uint32_t enqueue_id;
uint32_t dequeue_id;
unsigned int num_workers;
- int status;
size_t max_backlog;
size_t devblksz;
diff --git a/lib/sqfs/blk_proc/process_block.c b/lib/sqfs/blk_proc/process_block.c
index 5d1fa58..643d964 100644
--- a/lib/sqfs/blk_proc/process_block.c
+++ b/lib/sqfs/blk_proc/process_block.c
@@ -20,11 +20,8 @@ int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp,
if (!(block->flags & SQFS_BLK_DONT_COMPRESS)) {
ret = cmp->do_block(cmp, block->data, block->size,
scratch, scratch_size);
-
- if (ret < 0) {
- block->flags |= SQFS_BLK_COMPRESS_ERROR;
+ if (ret < 0)
return ret;
- }
if (ret > 0) {
memcpy(block->data, scratch, ret);
@@ -188,19 +185,24 @@ static int handle_block(sqfs_block_processor_t *proc, sqfs_block_t *blk)
int process_completed_blocks(sqfs_block_processor_t *proc, sqfs_block_t *queue)
{
sqfs_block_t *it;
+ int err;
while (queue != NULL) {
it = queue;
queue = queue->next;
- if (it->flags & SQFS_BLK_COMPRESS_ERROR) {
- proc->status = SQFS_ERROR_COMRPESSOR;
- } else if (proc->status == 0) {
- proc->status = handle_block(proc, it);
- }
-
+ err = handle_block(proc, it);
free(it);
+
+ if (err) {
+ while (queue != NULL) {
+ it = queue;
+ queue = it->next;
+ free(it);
+ }
+ return err;
+ }
}
- return proc->status;
+ return 0;
}
diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c
index f1874c0..fb83bbe 100644
--- a/lib/sqfs/blk_proc/pthread.c
+++ b/lib/sqfs/blk_proc/pthread.c
@@ -33,21 +33,26 @@ static void *worker_proc(void *arg)
compress_worker_t *worker = arg;
sqfs_block_processor_t *shared = worker->shared;
sqfs_block_t *blk = NULL;
+ int status = 0;
for (;;) {
pthread_mutex_lock(&shared->mtx);
if (blk != NULL) {
store_completed_block(shared, blk);
shared->backlog -= 1;
+
+ if (status != 0 && shared->status == 0)
+ shared->status = status;
pthread_cond_broadcast(&shared->done_cond);
}
- while (shared->queue == NULL && !shared->terminate) {
+ while (shared->queue == NULL && !shared->terminate &&
+ shared->status == 0) {
pthread_cond_wait(&shared->queue_cond,
&shared->mtx);
}
- if (shared->terminate) {
+ if (shared->terminate || shared->status != 0) {
pthread_mutex_unlock(&shared->mtx);
break;
}
@@ -60,8 +65,8 @@ static void *worker_proc(void *arg)
shared->queue_last = NULL;
pthread_mutex_unlock(&shared->mtx);
- sqfs_block_process(blk, worker->cmp, worker->scratch,
- shared->max_block_size);
+ status = sqfs_block_process(blk, worker->cmp, worker->scratch,
+ shared->max_block_size);
}
return NULL;
}
@@ -148,6 +153,8 @@ fail_init:
pthread_cond_destroy(&proc->done_cond);
pthread_cond_destroy(&proc->queue_cond);
pthread_mutex_destroy(&proc->mtx);
+ free(proc->fragments);
+ free(proc->blocks);
free(proc);
return NULL;
}
@@ -185,6 +192,8 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc)
free(blk);
}
+ free(proc->fragments);
+ free(proc->blocks);
free(proc);
}
@@ -192,14 +201,24 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
sqfs_block_t *block)
{
sqfs_block_t *queue = NULL, *it, *prev;
+ int status;
- if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS)
- return SQFS_ERROR_UNSUPPORTED;
+ pthread_mutex_lock(&proc->mtx);
+ if (proc->status != 0) {
+ status = proc->status;
+ goto fail;
+ }
+
+ if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) {
+ status = SQFS_ERROR_UNSUPPORTED;
+ proc->status = status;
+ pthread_cond_broadcast(&proc->queue_cond);
+ goto fail;
+ }
block->sequence_number = proc->enqueue_id++;
block->next = NULL;
- pthread_mutex_lock(&proc->mtx);
if (block->size == 0) {
block->checksum = 0;
store_completed_block(proc, block);
@@ -236,22 +255,31 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
pthread_mutex_unlock(&proc->mtx);
return process_completed_blocks(proc, queue);
+fail:
+ pthread_mutex_unlock(&proc->mtx);
+ free(block);
+ return status;
}
int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
{
sqfs_block_t *queue, *it;
+ int status;
pthread_mutex_lock(&proc->mtx);
- while (proc->backlog > 0)
+ while (proc->backlog > 0 && proc->status == 0)
pthread_cond_wait(&proc->done_cond, &proc->mtx);
+ if (proc->status != 0) {
+ status = proc->status;
+ goto fail;
+ }
+
for (it = proc->done; it != NULL; it = it->next) {
if (it->sequence_number != proc->dequeue_id++) {
- pthread_mutex_unlock(&proc->mtx);
-
- /* XXX: this would actually be a BUG */
- return SQFS_ERROR_INTERNAL;
+ status = SQFS_ERROR_INTERNAL;
+ proc->status = status;
+ goto fail;
}
}
@@ -259,5 +287,28 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
proc->done = NULL;
pthread_mutex_unlock(&proc->mtx);
- return process_completed_blocks(proc, queue);
+ status = process_completed_blocks(proc, queue);
+ if (status != 0) {
+ pthread_mutex_lock(&proc->mtx);
+ proc->status = status;
+ pthread_cond_broadcast(&proc->queue_cond);
+ goto fail;
+ }
+
+ return 0;
+fail:
+ while (proc->queue != NULL) {
+ it = proc->queue;
+ proc->queue = it->next;
+ free(it);
+ }
+
+ while (proc->done != NULL) {
+ it = proc->done;
+ proc->done = it->next;
+ free(it);
+ }
+
+ pthread_mutex_unlock(&proc->mtx);
+ return status;
}
diff --git a/lib/sqfs/blk_proc/serial.c b/lib/sqfs/blk_proc/serial.c
index b7bc545..8e842fb 100644
--- a/lib/sqfs/blk_proc/serial.c
+++ b/lib/sqfs/blk_proc/serial.c
@@ -40,6 +40,8 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
void sqfs_block_processor_destroy(sqfs_block_processor_t *proc)
{
+ free(proc->fragments);
+ free(proc->blocks);
free(proc);
}
@@ -53,6 +55,7 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) {
proc->status = SQFS_ERROR_UNSUPPORTED;
+ free(block);
return proc->status;
}