aboutsummaryrefslogtreecommitdiff
path: root/lib/sqfs/block_processor/winpthread.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqfs/block_processor/winpthread.c')
-rw-r--r--lib/sqfs/block_processor/winpthread.c130
1 files changed, 83 insertions, 47 deletions
diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c
index 0ad8475..aa1a99f 100644
--- a/lib/sqfs/block_processor/winpthread.c
+++ b/lib/sqfs/block_processor/winpthread.c
@@ -8,6 +8,8 @@
#include "internal.h"
#if defined(_WIN32) || defined(__WINDOWS__)
+# define WIN32_LEAN_AND_MEAN
+# include <windows.h>
# define LOCK(mtx) EnterCriticalSection(mtx)
# define UNLOCK(mtx) LeaveCriticalSection(mtx)
# define AWAIT(cond, mtx) SleepConditionVariableCS(cond, mtx, INFINITE)
@@ -16,7 +18,11 @@
# define THREAD_TYPE DWORD WINAPI
# define THREAD_ARG LPVOID
# define THREAD_HANDLE HANDLE
+# define MUTEX_TYPE CRITICAL_SECTION
+# define CONDITION_TYPE CONDITION_VARIABLE
#else
+# include <pthread.h>
+# include <signal.h>
# define LOCK(mtx) pthread_mutex_lock(mtx)
# define UNLOCK(mtx) pthread_mutex_unlock(mtx)
# define AWAIT(cond, mtx) pthread_cond_wait(cond, mtx)
@@ -25,15 +31,43 @@
# define THREAD_TYPE void *
# define THREAD_ARG void *
# define THREAD_HANDLE pthread_t
+# define MUTEX_TYPE pthread_mutex_t
+# define CONDITION_TYPE pthread_cond_t
#endif
+typedef struct compress_worker_t compress_worker_t;
+typedef struct thread_pool_processor_t thread_pool_processor_t;
+
struct compress_worker_t {
- sqfs_block_processor_t *shared;
+ thread_pool_processor_t *shared;
sqfs_compressor_t *cmp;
THREAD_HANDLE thread;
sqfs_u8 scratch[];
};
+struct thread_pool_processor_t {
+ sqfs_block_processor_t base;
+
+ MUTEX_TYPE mtx;
+ CONDITION_TYPE queue_cond;
+ CONDITION_TYPE done_cond;
+
+ sqfs_block_t *queue;
+ sqfs_block_t *queue_last;
+
+ sqfs_block_t *done;
+ size_t backlog;
+ int status;
+
+ sqfs_u32 enqueue_id;
+ sqfs_u32 dequeue_id;
+
+ unsigned int num_workers;
+ size_t max_backlog;
+
+ compress_worker_t *workers[];
+};
+
static void free_blk_list(sqfs_block_t *list)
{
sqfs_block_t *it;
@@ -48,7 +82,7 @@ static void free_blk_list(sqfs_block_t *list)
static THREAD_TYPE worker_proc(THREAD_ARG arg)
{
compress_worker_t *worker = arg;
- sqfs_block_processor_t *shared = worker->shared;
+ thread_pool_processor_t *shared = worker->shared;
sqfs_block_t *it, *prev, *blk = NULL;
int status = 0;
@@ -98,7 +132,7 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg)
status = block_processor_do_block(blk, worker->cmp,
worker->scratch,
- shared->max_block_size);
+ shared->base.max_block_size);
}
return THREAD_EXIT_SUCCESS;
@@ -107,7 +141,7 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg)
#if defined(_WIN32) || defined(__WINDOWS__)
static void block_processor_destroy(sqfs_object_t *obj)
{
- sqfs_block_processor_t *proc = (sqfs_block_processor_t *)obj;
+ thread_pool_processor_t *proc = (thread_pool_processor_t *)obj;
unsigned int i;
EnterCriticalSection(&proc->mtx);
@@ -133,8 +167,8 @@ static void block_processor_destroy(sqfs_object_t *obj)
DeleteCriticalSection(&proc->mtx);
free_blk_list(proc->queue);
free_blk_list(proc->done);
- free(proc->blk_current);
- free(proc->frag_block);
+ free(proc->base.blk_current);
+ free(proc->base.frag_block);
free(proc);
}
@@ -145,7 +179,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
sqfs_block_writer_t *wr,
sqfs_frag_table_t *tbl)
{
- sqfs_block_processor_t *proc;
+ thread_pool_processor_t *proc;
unsigned int i;
if (num_workers < 1)
@@ -156,13 +190,13 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
if (proc == NULL)
return NULL;
- proc->max_block_size = max_block_size;
proc->num_workers = num_workers;
proc->max_backlog = max_backlog;
- proc->cmp = cmp;
- proc->frag_tbl = tbl;
- proc->wr = wr;
- proc->stats.size = sizeof(proc->stats);
+ proc->base.max_block_size = max_block_size;
+ proc->base.cmp = cmp;
+ proc->base.frag_tbl = tbl;
+ proc->base.wr = wr;
+ proc->base.stats.size = sizeof(proc->base.stats);
((sqfs_object_t *)obj)->destroy = block_processor_destroy;
InitializeCriticalSection(&proc->mtx);
@@ -188,7 +222,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
goto fail;
}
- return proc;
+ return (sqfs_block_processor_t *)proc;
fail:
block_processor_destroy(proc);
return NULL;
@@ -196,7 +230,7 @@ fail:
#else
static void block_processor_destroy(sqfs_object_t *obj)
{
- sqfs_block_processor_t *proc = (sqfs_block_processor_t *)obj;
+ thread_pool_processor_t *proc = (thread_pool_processor_t *)obj;
unsigned int i;
pthread_mutex_lock(&proc->mtx);
@@ -217,8 +251,8 @@ static void block_processor_destroy(sqfs_object_t *obj)
free_blk_list(proc->queue);
free_blk_list(proc->done);
- free(proc->blk_current);
- free(proc->frag_block);
+ free(proc->base.blk_current);
+ free(proc->base.frag_block);
free(proc);
}
@@ -229,7 +263,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
sqfs_block_writer_t *wr,
sqfs_frag_table_t *tbl)
{
- sqfs_block_processor_t *proc;
+ thread_pool_processor_t *proc;
sigset_t set, oldset;
unsigned int i;
int ret;
@@ -246,13 +280,13 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
- proc->max_block_size = max_block_size;
proc->num_workers = num_workers;
proc->max_backlog = max_backlog;
- proc->cmp = cmp;
- proc->frag_tbl = tbl;
- proc->wr = wr;
- proc->stats.size = sizeof(proc->stats);
+ proc->base.max_block_size = max_block_size;
+ proc->base.cmp = cmp;
+ proc->base.frag_tbl = tbl;
+ proc->base.wr = wr;
+ proc->base.stats.size = sizeof(proc->base.stats);
for (i = 0; i < num_workers; ++i) {
proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
@@ -281,7 +315,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
pthread_sigmask(SIG_SETMASK, &oldset, NULL);
- return proc;
+ return (sqfs_block_processor_t *)proc;
fail_thread:
pthread_mutex_lock(&proc->mtx);
proc->status = -1;
@@ -311,7 +345,7 @@ fail_init:
}
#endif
-static sqfs_block_t *try_dequeue(sqfs_block_processor_t *proc)
+static sqfs_block_t *try_dequeue(thread_pool_processor_t *proc)
{
sqfs_block_t *queue, *it, *prev;
@@ -357,7 +391,7 @@ static sqfs_block_t *queue_merge(sqfs_block_t *lhs, sqfs_block_t *rhs)
return head;
}
-static int process_done_queue(sqfs_block_processor_t *proc, sqfs_block_t *queue)
+static int process_done_queue(thread_pool_processor_t *proc, sqfs_block_t *queue)
{
sqfs_block_t *it, *block = NULL;
int status = 0;
@@ -369,7 +403,7 @@ static int process_done_queue(sqfs_block_processor_t *proc, sqfs_block_t *queue)
if (it->flags & SQFS_BLK_IS_FRAGMENT) {
block = NULL;
- status = process_completed_fragment(proc, it, &block);
+ status = process_completed_fragment(&proc->base, it, &block);
if (block != NULL && status == 0) {
LOCK(&proc->mtx);
@@ -394,7 +428,7 @@ static int process_done_queue(sqfs_block_processor_t *proc, sqfs_block_t *queue)
free(block);
}
} else {
- status = process_completed_block(proc, it);
+ status = process_completed_block(&proc->base, it);
}
free(it);
@@ -404,7 +438,7 @@ static int process_done_queue(sqfs_block_processor_t *proc, sqfs_block_t *queue)
return status;
}
-static int wait_completed(sqfs_block_processor_t *proc)
+static int wait_completed(thread_pool_processor_t *proc)
{
sqfs_block_t *queue;
int status;
@@ -443,50 +477,52 @@ static int wait_completed(sqfs_block_processor_t *proc)
int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)
{
+ thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc;
int status;
- while (proc->backlog > proc->max_backlog) {
- status = wait_completed(proc);
+ while (thproc->backlog > thproc->max_backlog) {
+ status = wait_completed(thproc);
if (status)
return status;
}
- LOCK(&proc->mtx);
- status = proc->status;
+ LOCK(&thproc->mtx);
+ status = thproc->status;
if (status != 0)
goto out;
if (block != NULL) {
- if (proc->queue_last == NULL) {
- proc->queue = proc->queue_last = block;
+ if (thproc->queue_last == NULL) {
+ thproc->queue = thproc->queue_last = block;
} else {
- proc->queue_last->next = block;
- proc->queue_last = block;
+ thproc->queue_last->next = block;
+ thproc->queue_last = block;
}
- block->sequence_number = proc->enqueue_id++;
+ block->sequence_number = thproc->enqueue_id++;
block->next = NULL;
- proc->backlog += 1;
+ thproc->backlog += 1;
block = NULL;
}
out:
- SIGNAL_ALL(&proc->queue_cond);
- UNLOCK(&proc->mtx);
+ SIGNAL_ALL(&thproc->queue_cond);
+ UNLOCK(&thproc->mtx);
free(block);
return 0;
}
int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
{
+ thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc;
int status;
- LOCK(&proc->mtx);
- status = proc->status;
- SIGNAL_ALL(&proc->queue_cond);
- UNLOCK(&proc->mtx);
+ LOCK(&thproc->mtx);
+ status = thproc->status;
+ SIGNAL_ALL(&thproc->queue_cond);
+ UNLOCK(&thproc->mtx);
- while (status == 0 && proc->backlog > 0)
- status = wait_completed(proc);
+ while (status == 0 && thproc->backlog > 0)
+ status = wait_completed(thproc);
if (status == 0 && proc->frag_block != NULL) {
status = append_to_work_queue(proc, proc->frag_block);
@@ -495,7 +531,7 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
if (status)
return status;
- status = wait_completed(proc);
+ status = wait_completed(thproc);
}
return status;