aboutsummaryrefslogtreecommitdiff
path: root/lib/sqfs/data_writer
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-12-13 02:27:17 +0100
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-12-13 02:27:17 +0100
commit2d2772341fa65e3d412e76c6e9d4a8815756c0ec (patch)
tree8bee446efe0fb6e4939a2bf4cf865b6897e50e1a /lib/sqfs/data_writer
parent514e9e500abdd8ea91ea3b2fca214587ee24a342 (diff)
Merge windows and pthread thread pool implementations
Since they are both structured the same way using condition variables, they are only a few defines away from removing code duplication. Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib/sqfs/data_writer')
-rw-r--r--lib/sqfs/data_writer/windows.c294
-rw-r--r--lib/sqfs/data_writer/winpthread.c (renamed from lib/sqfs/data_writer/pthread.c)153
2 files changed, 128 insertions, 319 deletions
diff --git a/lib/sqfs/data_writer/windows.c b/lib/sqfs/data_writer/windows.c
deleted file mode 100644
index d790f79..0000000
--- a/lib/sqfs/data_writer/windows.c
+++ /dev/null
@@ -1,294 +0,0 @@
-/* SPDX-License-Identifier: LGPL-3.0-or-later */
-/*
- * windows.c
- *
- * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at>
- */
-#define SQFS_BUILDING_DLL
-#include "internal.h"
-
-struct compress_worker_t {
- sqfs_data_writer_t *shared;
- sqfs_compressor_t *cmp;
- HANDLE thread;
- sqfs_u8 scratch[];
-};
-
-static DWORD WINAPI worker_proc(LPVOID arg)
-{
- compress_worker_t *worker = arg;
- sqfs_data_writer_t *shared = worker->shared;
- sqfs_block_t *blk = NULL;
- int status = 0;
-
- for (;;) {
- EnterCriticalSection(&shared->mtx);
- if (blk != NULL) {
- data_writer_store_done(shared, blk, status);
- WakeConditionVariable(&shared->done_cond);
- }
-
- while (shared->queue == NULL && shared->status == 0) {
- SleepConditionVariableCS(&shared->queue_cond,
- &shared->mtx, INFINITE);
- }
-
- blk = data_writer_next_work_item(shared);
- LeaveCriticalSection(&shared->mtx);
-
- if (blk == NULL)
- break;
-
- status = data_writer_do_block(blk, worker->cmp,
- worker->scratch,
- shared->max_block_size);
- }
-
- return 0;
-}
-
-sqfs_data_writer_t *sqfs_data_writer_create(size_t max_block_size,
- sqfs_compressor_t *cmp,
- unsigned int num_workers,
- size_t max_backlog,
- size_t devblksz,
- sqfs_file_t *file)
-{
- sqfs_data_writer_t *proc;
- unsigned int i;
-
- if (num_workers < 1)
- num_workers = 1;
-
- proc = alloc_flex(sizeof(*proc),
- sizeof(proc->workers[0]), num_workers);
- if (proc == NULL)
- return NULL;
-
- InitializeCriticalSection(&proc->mtx);
- InitializeConditionVariable(&proc->queue_cond);
- InitializeConditionVariable(&proc->done_cond);
-
- if (data_writer_init(proc, max_block_size, cmp, num_workers,
- max_backlog, devblksz, file)) {
- goto fail;
- }
-
- for (i = 0; i < num_workers; ++i) {
- proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
- 1, max_block_size);
-
- if (proc->workers[i] == NULL)
- goto fail;
-
- proc->workers[i]->shared = proc;
- proc->workers[i]->cmp = cmp->create_copy(cmp);
-
- if (proc->workers[i]->cmp == NULL)
- goto fail;
-
- proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc,
- proc->workers[i], 0, 0);
- if (proc->workers[i]->thread == NULL)
- goto fail;
- }
-
- return proc;
-fail:
- sqfs_data_writer_destroy(proc);
- return NULL;
-}
-
-void sqfs_data_writer_destroy(sqfs_data_writer_t *proc)
-{
- unsigned int i;
-
- EnterCriticalSection(&proc->mtx);
- proc->status = -1;
- WakeAllConditionVariable(&proc->queue_cond);
- LeaveCriticalSection(&proc->mtx);
-
- for (i = 0; i < proc->num_workers; ++i) {
- if (proc->workers[i] == NULL)
- continue;
-
- if (proc->workers[i]->thread != NULL) {
- WaitForSingleObject(proc->workers[i]->thread, INFINITE);
- CloseHandle(proc->workers[i]->thread);
- }
-
- if (proc->workers[i]->cmp != NULL)
- proc->workers[i]->cmp->destroy(proc->workers[i]->cmp);
-
- free(proc->workers[i]);
- }
-
- DeleteCriticalSection(&proc->mtx);
- data_writer_cleanup(proc);
-}
-
-int append_to_work_queue(sqfs_data_writer_t *proc, sqfs_block_t *block,
- bool signal_threads)
-{
- int status;
-
- EnterCriticalSection(&proc->mtx);
- status = proc->status;
-
- if (status != 0) {
- free(block);
- LeaveCriticalSection(&proc->mtx);
- return status;
- }
-
- if (block != NULL) {
- if (proc->queue_last == NULL) {
- proc->queue = proc->queue_last = block;
- } else {
- proc->queue_last->next = block;
- proc->queue_last = block;
- }
-
- block->sequence_number = proc->enqueue_id++;
- block->next = NULL;
- proc->backlog += 1;
- }
-
- if (signal_threads)
- WakeAllConditionVariable(&proc->queue_cond);
-
- LeaveCriticalSection(&proc->mtx);
- return 0;
-}
-
-static sqfs_block_t *try_dequeue(sqfs_data_writer_t *proc)
-{
- sqfs_block_t *queue, *it, *prev;
-
- it = proc->done;
- prev = NULL;
-
- while (it != NULL && it->sequence_number == proc->dequeue_id) {
- prev = it;
- it = it->next;
- proc->dequeue_id += 1;
- }
-
- if (prev == NULL) {
- queue = NULL;
- } else {
- queue = proc->done;
- prev->next = NULL;
- proc->done = it;
- }
-
- return queue;
-}
-
-static sqfs_block_t *queue_merge(sqfs_block_t *lhs, sqfs_block_t *rhs)
-{
- sqfs_block_t *it, *head = NULL, **next_ptr = &head;
-
- while (lhs != NULL && rhs != NULL) {
- if (lhs->sequence_number <= rhs->sequence_number) {
- it = lhs;
- lhs = lhs->next;
- } else {
- it = rhs;
- rhs = rhs->next;
- }
-
- *next_ptr = it;
- next_ptr = &it->next;
- }
-
- it = (lhs != NULL ? lhs : rhs);
- *next_ptr = it;
- return head;
-}
-
-static int process_done_queue(sqfs_data_writer_t *proc, sqfs_block_t *queue)
-{
- sqfs_block_t *it, *block = NULL;
- int status = 0;
-
- while (queue != NULL && status == 0) {
- it = queue;
- queue = it->next;
-
- if (it->flags & SQFS_BLK_IS_FRAGMENT) {
- block = NULL;
- status = process_completed_fragment(proc, it, &block);
-
- if (block != NULL && status == 0) {
- EnterCriticalSection(&proc->mtx);
- proc->dequeue_id = it->sequence_number;
- block->sequence_number = it->sequence_number;
-
- if (proc->queue == NULL) {
- proc->queue = block;
- proc->queue_last = block;
- } else {
- block->next = proc->queue;
- proc->queue = block;
- }
-
- proc->backlog += 1;
- proc->done = queue_merge(queue, proc->done);
- WakeAllConditionVariable(&proc->queue_cond);
- LeaveCriticalSection(&proc->mtx);
-
- queue = NULL;
- } else {
- free(block);
- }
- } else {
- status = process_completed_block(proc, it);
- }
-
- free(it);
- }
-
- free_blk_list(queue);
- return status;
-}
-
-int test_and_set_status(sqfs_data_writer_t *proc, int status)
-{
- EnterCriticalSection(&proc->mtx);
- if (proc->status == 0) {
- proc->status = status;
- } else {
- status = proc->status;
- }
- WakeAllConditionVariable(&proc->queue_cond);
- LeaveCriticalSection(&proc->mtx);
- return status;
-}
-
-int wait_completed(sqfs_data_writer_t *proc)
-{
- sqfs_block_t *queue;
- int status;
-
- EnterCriticalSection(&proc->mtx);
- for (;;) {
- queue = try_dequeue(proc);
- status = proc->status;
-
- if (queue != NULL || status != 0)
- break;
-
- SleepConditionVariableCS(&proc->done_cond, &proc->mtx,
- INFINITE);
- }
- LeaveCriticalSection(&proc->mtx);
-
- if (status != 0) {
- free_blk_list(queue);
- return status;
- }
-
- status = process_done_queue(proc, queue);
- return status ? test_and_set_status(proc, status) : status;
-}
diff --git a/lib/sqfs/data_writer/pthread.c b/lib/sqfs/data_writer/winpthread.c
index 8196641..bba5894 100644
--- a/lib/sqfs/data_writer/pthread.c
+++ b/lib/sqfs/data_writer/winpthread.c
@@ -1,20 +1,40 @@
/* SPDX-License-Identifier: LGPL-3.0-or-later */
/*
- * pthread.c
+ * winpthread.c
*
* Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at>
*/
#define SQFS_BUILDING_DLL
#include "internal.h"
+#if defined(_WIN32) || defined(__WINDOWS__)
+# define LOCK(mtx) EnterCriticalSection(mtx)
+# define UNLOCK(mtx) LeaveCriticalSection(mtx)
+# define AWAIT(cond, mtx) SleepConditionVariableCS(cond, mtx, INFINITE)
+# define SIGNAL_ALL(cond) WakeAllConditionVariable(cond)
+# define THREAD_EXIT_SUCCESS 0
+# define THREAD_TYPE DWORD WINAPI
+# define THREAD_ARG LPVOID
+# define THREAD_HANDLE HANDLE
+#else
+# define LOCK(mtx) pthread_mutex_lock(mtx)
+# define UNLOCK(mtx) pthread_mutex_unlock(mtx)
+# define AWAIT(cond, mtx) pthread_cond_wait(cond, mtx)
+# define SIGNAL_ALL(cond) pthread_cond_broadcast(cond)
+# define THREAD_EXIT_SUCCESS NULL
+# define THREAD_TYPE void *
+# define THREAD_ARG void *
+# define THREAD_HANDLE pthread_t
+#endif
+
struct compress_worker_t {
sqfs_data_writer_t *shared;
sqfs_compressor_t *cmp;
- pthread_t thread;
+ THREAD_HANDLE thread;
sqfs_u8 scratch[];
};
-static void *worker_proc(void *arg)
+static THREAD_TYPE worker_proc(THREAD_ARG arg)
{
compress_worker_t *worker = arg;
sqfs_data_writer_t *shared = worker->shared;
@@ -22,17 +42,17 @@ static void *worker_proc(void *arg)
int status = 0;
for (;;) {
- pthread_mutex_lock(&shared->mtx);
+ LOCK(&shared->mtx);
if (blk != NULL) {
data_writer_store_done(shared, blk, status);
- pthread_cond_broadcast(&shared->done_cond);
+ SIGNAL_ALL(&shared->done_cond);
}
while (shared->queue == NULL && shared->status == 0)
- pthread_cond_wait(&shared->queue_cond, &shared->mtx);
+ AWAIT(&shared->queue_cond, &shared->mtx);
blk = data_writer_next_work_item(shared);
- pthread_mutex_unlock(&shared->mtx);
+ UNLOCK(&shared->mtx);
if (blk == NULL)
break;
@@ -41,9 +61,91 @@ static void *worker_proc(void *arg)
worker->scratch,
shared->max_block_size);
}
+
+ return THREAD_EXIT_SUCCESS;
+}
+
+#if defined(_WIN32) || defined(__WINDOWS__)
+sqfs_data_writer_t *sqfs_data_writer_create(size_t max_block_size,
+ sqfs_compressor_t *cmp,
+ unsigned int num_workers,
+ size_t max_backlog,
+ size_t devblksz,
+ sqfs_file_t *file)
+{
+ sqfs_data_writer_t *proc;
+ unsigned int i;
+
+ if (num_workers < 1)
+ num_workers = 1;
+
+ proc = alloc_flex(sizeof(*proc),
+ sizeof(proc->workers[0]), num_workers);
+ if (proc == NULL)
+ return NULL;
+
+ InitializeCriticalSection(&proc->mtx);
+ InitializeConditionVariable(&proc->queue_cond);
+ InitializeConditionVariable(&proc->done_cond);
+
+ if (data_writer_init(proc, max_block_size, cmp, num_workers,
+ max_backlog, devblksz, file)) {
+ goto fail;
+ }
+
+ for (i = 0; i < num_workers; ++i) {
+ proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
+ 1, max_block_size);
+
+ if (proc->workers[i] == NULL)
+ goto fail;
+
+ proc->workers[i]->shared = proc;
+ proc->workers[i]->cmp = cmp->create_copy(cmp);
+
+ if (proc->workers[i]->cmp == NULL)
+ goto fail;
+
+ proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc,
+ proc->workers[i], 0, 0);
+ if (proc->workers[i]->thread == NULL)
+ goto fail;
+ }
+
+ return proc;
+fail:
+ sqfs_data_writer_destroy(proc);
return NULL;
}
+void sqfs_data_writer_destroy(sqfs_data_writer_t *proc)
+{
+ unsigned int i;
+
+ EnterCriticalSection(&proc->mtx);
+ proc->status = -1;
+ WakeAllConditionVariable(&proc->queue_cond);
+ LeaveCriticalSection(&proc->mtx);
+
+ for (i = 0; i < proc->num_workers; ++i) {
+ if (proc->workers[i] == NULL)
+ continue;
+
+ if (proc->workers[i]->thread != NULL) {
+ WaitForSingleObject(proc->workers[i]->thread, INFINITE);
+ CloseHandle(proc->workers[i]->thread);
+ }
+
+ if (proc->workers[i]->cmp != NULL)
+ proc->workers[i]->cmp->destroy(proc->workers[i]->cmp);
+
+ free(proc->workers[i]);
+ }
+
+ DeleteCriticalSection(&proc->mtx);
+ data_writer_cleanup(proc);
+}
+#else
sqfs_data_writer_t *sqfs_data_writer_create(size_t max_block_size,
sqfs_compressor_t *cmp,
unsigned int num_workers,
@@ -153,19 +255,17 @@ void sqfs_data_writer_destroy(sqfs_data_writer_t *proc)
data_writer_cleanup(proc);
}
+#endif
int append_to_work_queue(sqfs_data_writer_t *proc, sqfs_block_t *block,
bool signal_threads)
{
int status;
- pthread_mutex_lock(&proc->mtx);
+ LOCK(&proc->mtx);
status = proc->status;
- if (status != 0) {
- free(block);
- pthread_mutex_unlock(&proc->mtx);
- return status;
- }
+ if (status != 0)
+ goto out;
if (block != NULL) {
if (proc->queue_last == NULL) {
@@ -178,11 +278,14 @@ int append_to_work_queue(sqfs_data_writer_t *proc, sqfs_block_t *block,
block->sequence_number = proc->enqueue_id++;
block->next = NULL;
proc->backlog += 1;
+ block = NULL;
}
-
+out:
if (signal_threads)
- pthread_cond_broadcast(&proc->queue_cond);
- pthread_mutex_unlock(&proc->mtx);
+ SIGNAL_ALL(&proc->queue_cond);
+
+ UNLOCK(&proc->mtx);
+ free(block);
return 0;
}
@@ -247,7 +350,7 @@ static int process_done_queue(sqfs_data_writer_t *proc, sqfs_block_t *queue)
status = process_completed_fragment(proc, it, &block);
if (block != NULL && status == 0) {
- pthread_mutex_lock(&proc->mtx);
+ LOCK(&proc->mtx);
proc->dequeue_id = it->sequence_number;
block->sequence_number = it->sequence_number;
@@ -261,8 +364,8 @@ static int process_done_queue(sqfs_data_writer_t *proc, sqfs_block_t *queue)
proc->backlog += 1;
proc->done = queue_merge(queue, proc->done);
- pthread_cond_broadcast(&proc->queue_cond);
- pthread_mutex_unlock(&proc->mtx);
+ SIGNAL_ALL(&proc->queue_cond);
+ UNLOCK(&proc->mtx);
queue = NULL;
} else {
@@ -281,14 +384,14 @@ static int process_done_queue(sqfs_data_writer_t *proc, sqfs_block_t *queue)
int test_and_set_status(sqfs_data_writer_t *proc, int status)
{
- pthread_mutex_lock(&proc->mtx);
+ LOCK(&proc->mtx);
if (proc->status == 0) {
proc->status = status;
} else {
status = proc->status;
}
- pthread_cond_broadcast(&proc->queue_cond);
- pthread_mutex_unlock(&proc->mtx);
+ SIGNAL_ALL(&proc->queue_cond);
+ UNLOCK(&proc->mtx);
return status;
}
@@ -297,7 +400,7 @@ int wait_completed(sqfs_data_writer_t *proc)
sqfs_block_t *queue;
int status;
- pthread_mutex_lock(&proc->mtx);
+ LOCK(&proc->mtx);
for (;;) {
queue = try_dequeue(proc);
status = proc->status;
@@ -305,9 +408,9 @@ int wait_completed(sqfs_data_writer_t *proc)
if (queue != NULL || status != 0)
break;
- pthread_cond_wait(&proc->done_cond, &proc->mtx);
+ AWAIT(&proc->done_cond, &proc->mtx);
}
- pthread_mutex_unlock(&proc->mtx);
+ UNLOCK(&proc->mtx);
if (status != 0) {
free_blk_list(queue);