diff options
-rw-r--r-- | include/sqfs/block_processor.h | 7 | ||||
-rw-r--r-- | lib/sqfs/blk_proc/internal.h | 2 | ||||
-rw-r--r-- | lib/sqfs/blk_proc/process_block.c | 24 | ||||
-rw-r--r-- | lib/sqfs/blk_proc/pthread.c | 77 | ||||
-rw-r--r-- | lib/sqfs/blk_proc/serial.c | 3 |
5 files changed, 82 insertions, 31 deletions
diff --git a/include/sqfs/block_processor.h b/include/sqfs/block_processor.h index 2086a3d..5b7b3e0 100644 --- a/include/sqfs/block_processor.h +++ b/include/sqfs/block_processor.h @@ -86,12 +86,7 @@ typedef enum { /** * @brief Set by compressor worker if the block was actually compressed. */ - SQFS_BLK_IS_COMPRESSED = 0x4000, - - /** - * @brief Set by compressor worker if compression failed. - */ - SQFS_BLK_COMPRESS_ERROR = 0x8000, + SQFS_BLK_IS_COMPRESSED = 0x8000, /** * @brief The combination of all flags that are user settable. diff --git a/lib/sqfs/blk_proc/internal.h b/lib/sqfs/blk_proc/internal.h index b5af751..8cb1a23 100644 --- a/lib/sqfs/blk_proc/internal.h +++ b/lib/sqfs/blk_proc/internal.h @@ -59,13 +59,13 @@ struct sqfs_block_processor_t { sqfs_block_t *done; bool terminate; size_t backlog; + int status; /* used by main thread only */ uint32_t enqueue_id; uint32_t dequeue_id; unsigned int num_workers; - int status; size_t max_backlog; size_t devblksz; diff --git a/lib/sqfs/blk_proc/process_block.c b/lib/sqfs/blk_proc/process_block.c index 5d1fa58..643d964 100644 --- a/lib/sqfs/blk_proc/process_block.c +++ b/lib/sqfs/blk_proc/process_block.c @@ -20,11 +20,8 @@ int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp, if (!(block->flags & SQFS_BLK_DONT_COMPRESS)) { ret = cmp->do_block(cmp, block->data, block->size, scratch, scratch_size); - - if (ret < 0) { - block->flags |= SQFS_BLK_COMPRESS_ERROR; + if (ret < 0) return ret; - } if (ret > 0) { memcpy(block->data, scratch, ret); @@ -188,19 +185,24 @@ static int handle_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) int process_completed_blocks(sqfs_block_processor_t *proc, sqfs_block_t *queue) { sqfs_block_t *it; + int err; while (queue != NULL) { it = queue; queue = queue->next; - if (it->flags & SQFS_BLK_COMPRESS_ERROR) { - proc->status = SQFS_ERROR_COMRPESSOR; - } else if (proc->status == 0) { - proc->status = handle_block(proc, it); - } - + err = handle_block(proc, it); free(it); + + if (err) { + while (queue != NULL) { + it = queue; + queue = it->next; + free(it); + } + return err; + } } - return proc->status; + return 0; } diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c index f1874c0..fb83bbe 100644 --- a/lib/sqfs/blk_proc/pthread.c +++ b/lib/sqfs/blk_proc/pthread.c @@ -33,21 +33,26 @@ static void *worker_proc(void *arg) compress_worker_t *worker = arg; sqfs_block_processor_t *shared = worker->shared; sqfs_block_t *blk = NULL; + int status = 0; for (;;) { pthread_mutex_lock(&shared->mtx); if (blk != NULL) { store_completed_block(shared, blk); shared->backlog -= 1; + + if (status != 0 && shared->status == 0) + shared->status = status; pthread_cond_broadcast(&shared->done_cond); } - while (shared->queue == NULL && !shared->terminate) { + while (shared->queue == NULL && !shared->terminate && + shared->status == 0) { pthread_cond_wait(&shared->queue_cond, &shared->mtx); } - if (shared->terminate) { + if (shared->terminate || shared->status != 0) { pthread_mutex_unlock(&shared->mtx); break; } @@ -60,8 +65,8 @@ static void *worker_proc(void *arg) shared->queue_last = NULL; pthread_mutex_unlock(&shared->mtx); - sqfs_block_process(blk, worker->cmp, worker->scratch, - shared->max_block_size); + status = sqfs_block_process(blk, worker->cmp, worker->scratch, + shared->max_block_size); } return NULL; } @@ -148,6 +153,8 @@ fail_init: pthread_cond_destroy(&proc->done_cond); pthread_cond_destroy(&proc->queue_cond); pthread_mutex_destroy(&proc->mtx); + free(proc->fragments); + free(proc->blocks); free(proc); return NULL; } @@ -185,6 +192,8 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) free(blk); } + free(proc->fragments); + free(proc->blocks); free(proc); } @@ -192,14 +201,24 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, sqfs_block_t *block) { sqfs_block_t *queue = NULL, *it, *prev; + int status; - if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) - return SQFS_ERROR_UNSUPPORTED; + pthread_mutex_lock(&proc->mtx); + if (proc->status != 0) { + status = proc->status; + goto fail; + } + + if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) { + status = SQFS_ERROR_UNSUPPORTED; + proc->status = status; + pthread_cond_broadcast(&proc->queue_cond); + goto fail; + } block->sequence_number = proc->enqueue_id++; block->next = NULL; - pthread_mutex_lock(&proc->mtx); if (block->size == 0) { block->checksum = 0; store_completed_block(proc, block); @@ -236,22 +255,31 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, pthread_mutex_unlock(&proc->mtx); return process_completed_blocks(proc, queue); +fail: + pthread_mutex_unlock(&proc->mtx); + free(block); + return status; } int sqfs_block_processor_finish(sqfs_block_processor_t *proc) { sqfs_block_t *queue, *it; + int status; pthread_mutex_lock(&proc->mtx); - while (proc->backlog > 0) + while (proc->backlog > 0 && proc->status == 0) pthread_cond_wait(&proc->done_cond, &proc->mtx); + if (proc->status != 0) { + status = proc->status; + goto fail; + } + for (it = proc->done; it != NULL; it = it->next) { if (it->sequence_number != proc->dequeue_id++) { - pthread_mutex_unlock(&proc->mtx); - - /* XXX: this would actually be a BUG */ - return SQFS_ERROR_INTERNAL; + status = SQFS_ERROR_INTERNAL; + proc->status = status; + goto fail; } } @@ -259,5 +287,28 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc) proc->done = NULL; pthread_mutex_unlock(&proc->mtx); - return process_completed_blocks(proc, queue); + status = process_completed_blocks(proc, queue); + if (status != 0) { + pthread_mutex_lock(&proc->mtx); + proc->status = status; + pthread_cond_broadcast(&proc->queue_cond); + goto fail; + } + + return 0; +fail: + while (proc->queue != NULL) { + it = proc->queue; + proc->queue = it->next; + free(it); + } + + while (proc->done != NULL) { + it = proc->done; + proc->done = it->next; + free(it); + } + + pthread_mutex_unlock(&proc->mtx); + return status; } diff --git a/lib/sqfs/blk_proc/serial.c b/lib/sqfs/blk_proc/serial.c index b7bc545..8e842fb 100644 --- a/lib/sqfs/blk_proc/serial.c +++ b/lib/sqfs/blk_proc/serial.c @@ -40,6 +40,8 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) { + free(proc->fragments); + free(proc->blocks); free(proc); } @@ -53,6 +55,7 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) { proc->status = SQFS_ERROR_UNSUPPORTED; + free(block); return proc->status; } |