aboutsummaryrefslogtreecommitdiff
path: root/lib/sqfs/blk_proc/pthread.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqfs/blk_proc/pthread.c')
-rw-r--r--lib/sqfs/blk_proc/pthread.c110
1 files changed, 18 insertions, 92 deletions
diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c
index 1c1088a..57b61c1 100644
--- a/lib/sqfs/blk_proc/pthread.c
+++ b/lib/sqfs/blk_proc/pthread.c
@@ -7,65 +7,6 @@
#define SQFS_BUILDING_DLL
#include "internal.h"
-static void free_blk_list(sqfs_block_t *list)
-{
- sqfs_block_t *it;
-
- while (list != NULL) {
- it = list;
- list = list->next;
- free(it);
- }
-}
-
-static void store_completed_block(sqfs_block_processor_t *proc,
- sqfs_block_t *blk, int status)
-{
- sqfs_block_t *it = proc->done, *prev = NULL;
-
- while (it != NULL) {
- if (it->sequence_number >= blk->sequence_number)
- break;
- prev = it;
- it = it->next;
- }
-
- if (prev == NULL) {
- blk->next = proc->done;
- proc->done = blk;
- } else {
- blk->next = prev->next;
- prev->next = blk;
- }
-
- if (status != 0 && proc->status == 0)
- proc->status = status;
-
- proc->backlog -= 1;
- pthread_cond_broadcast(&proc->done_cond);
-}
-
-static sqfs_block_t *get_next_work_item(sqfs_block_processor_t *proc)
-{
- sqfs_block_t *blk;
-
- while (proc->queue == NULL) {
- if (proc->terminate || proc->status != 0)
- return NULL;
-
- pthread_cond_wait(&proc->queue_cond, &proc->mtx);
- }
-
- blk = proc->queue;
- proc->queue = blk->next;
- blk->next = NULL;
-
- if (proc->queue == NULL)
- proc->queue_last = NULL;
-
- return blk;
-}
-
static void *worker_proc(void *arg)
{
compress_worker_t *worker = arg;
@@ -75,17 +16,23 @@ static void *worker_proc(void *arg)
for (;;) {
pthread_mutex_lock(&shared->mtx);
- if (blk != NULL)
- store_completed_block(shared, blk, status);
+ if (blk != NULL) {
+ block_processor_store_done(shared, blk, status);
+ pthread_cond_broadcast(&shared->done_cond);
+ }
+
+ while (shared->queue == NULL && shared->status == 0)
+ pthread_cond_wait(&shared->queue_cond, &shared->mtx);
- blk = get_next_work_item(shared);
+ blk = block_processor_next_work_item(shared);
pthread_mutex_unlock(&shared->mtx);
if (blk == NULL)
break;
- status = sqfs_block_process(blk, worker->cmp, worker->scratch,
- shared->max_block_size);
+ status = block_processor_do_block(blk, worker->cmp,
+ worker->scratch,
+ shared->max_block_size);
}
return NULL;
}
@@ -109,26 +56,14 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
if (proc == NULL)
return NULL;
- proc->max_block_size = max_block_size;
- proc->num_workers = num_workers;
- proc->max_backlog = max_backlog;
proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
- proc->devblksz = devblksz;
- proc->cmp = cmp;
- proc->file = file;
- proc->max_blocks = INIT_BLOCK_COUNT;
- proc->frag_list_max = INIT_BLOCK_COUNT;
-
- proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks);
- if (proc->blocks == NULL)
- goto fail_init;
- proc->frag_list = alloc_array(sizeof(proc->frag_list[0]),
- proc->frag_list_max);
- if (proc->frag_list == NULL)
+ if (block_processor_init(proc, max_block_size, cmp, num_workers,
+ max_backlog, devblksz, file)) {
goto fail_init;
+ }
for (i = 0; i < num_workers; ++i) {
proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
@@ -155,7 +90,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
return proc;
fail_thread:
pthread_mutex_lock(&proc->mtx);
- proc->terminate = true;
+ proc->status = -1;
pthread_cond_broadcast(&proc->queue_cond);
pthread_mutex_unlock(&proc->mtx);
@@ -178,10 +113,7 @@ fail_init:
pthread_cond_destroy(&proc->done_cond);
pthread_cond_destroy(&proc->queue_cond);
pthread_mutex_destroy(&proc->mtx);
- free(proc->frag_list);
- free(proc->fragments);
- free(proc->blocks);
- free(proc);
+ block_processor_cleanup(proc);
return NULL;
}
@@ -190,7 +122,7 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc)
unsigned int i;
pthread_mutex_lock(&proc->mtx);
- proc->terminate = true;
+ proc->status = -1;
pthread_cond_broadcast(&proc->queue_cond);
pthread_mutex_unlock(&proc->mtx);
@@ -205,13 +137,7 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc)
pthread_cond_destroy(&proc->queue_cond);
pthread_mutex_destroy(&proc->mtx);
- free_blk_list(proc->queue);
- free_blk_list(proc->done);
- free(proc->frag_block);
- free(proc->frag_list);
- free(proc->fragments);
- free(proc->blocks);
- free(proc);
+ block_processor_cleanup(proc);
}
static void append_to_work_queue(sqfs_block_processor_t *proc,