From 2d2772341fa65e3d412e76c6e9d4a8815756c0ec Mon Sep 17 00:00:00 2001
From: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Date: Fri, 13 Dec 2019 02:27:17 +0100
Subject: Merge windows and pthread thread pool implementations

Since they are both structured the same way using condition variables,
they are only a few defines away from removing code duplication.

Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
---
 lib/sqfs/Makemodule.am            |   4 +-
 lib/sqfs/data_writer/pthread.c    | 319 ----------------------------
 lib/sqfs/data_writer/windows.c    | 294 --------------------------
 lib/sqfs/data_writer/winpthread.c | 422 ++++++++++++++++++++++++++++++++++++++
 4 files changed, 424 insertions(+), 615 deletions(-)
 delete mode 100644 lib/sqfs/data_writer/pthread.c
 delete mode 100644 lib/sqfs/data_writer/windows.c
 create mode 100644 lib/sqfs/data_writer/winpthread.c

(limited to 'lib/sqfs')

diff --git a/lib/sqfs/Makemodule.am b/lib/sqfs/Makemodule.am
index 228d3e9..2bc0fa3 100644
--- a/lib/sqfs/Makemodule.am
+++ b/lib/sqfs/Makemodule.am
@@ -43,11 +43,11 @@ libsquashfs_la_SOURCES += lib/sqfs/unix/io_file.c
 endif
 
 if HAVE_PTHREAD
-libsquashfs_la_SOURCES += lib/sqfs/data_writer/pthread.c
+libsquashfs_la_SOURCES += lib/sqfs/data_writer/winpthread.c
 libsquashfs_la_CPPFLAGS += -DWITH_PTHREAD
 else
 if WINDOWS
-libsquashfs_la_SOURCES += lib/sqfs/data_writer/windows.c
+libsquashfs_la_SOURCES += lib/sqfs/data_writer/winpthread.c
 else
 libsquashfs_la_SOURCES += lib/sqfs/data_writer/serial.c
 endif
diff --git a/lib/sqfs/data_writer/pthread.c b/lib/sqfs/data_writer/pthread.c
deleted file mode 100644
index 8196641..0000000
--- a/lib/sqfs/data_writer/pthread.c
+++ /dev/null
@@ -1,319 +0,0 @@
-/* SPDX-License-Identifier: LGPL-3.0-or-later */
-/*
- * pthread.c
- *
- * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at>
- */
-#define SQFS_BUILDING_DLL
-#include "internal.h"
-
-struct compress_worker_t {
-	sqfs_data_writer_t *shared;
-	sqfs_compressor_t *cmp;
-	pthread_t thread;
-	sqfs_u8 scratch[];
-};
-
-static void *worker_proc(void *arg)
-{
-	compress_worker_t *worker = arg;
-	sqfs_data_writer_t *shared = worker->shared;
-	sqfs_block_t *blk = NULL;
-	int status = 0;
-
-	for (;;) {
-		pthread_mutex_lock(&shared->mtx);
-		if (blk != NULL) {
-			data_writer_store_done(shared, blk, status);
-			pthread_cond_broadcast(&shared->done_cond);
-		}
-
-		while (shared->queue == NULL && shared->status == 0)
-			pthread_cond_wait(&shared->queue_cond, &shared->mtx);
-
-		blk = data_writer_next_work_item(shared);
-		pthread_mutex_unlock(&shared->mtx);
-
-		if (blk == NULL)
-			break;
-
-		status = data_writer_do_block(blk, worker->cmp,
-					      worker->scratch,
-					      shared->max_block_size);
-	}
-	return NULL;
-}
-
-sqfs_data_writer_t *sqfs_data_writer_create(size_t max_block_size,
-					    sqfs_compressor_t *cmp,
-					    unsigned int num_workers,
-					    size_t max_backlog,
-					    size_t devblksz,
-					    sqfs_file_t *file)
-{
-	sqfs_data_writer_t *proc;
-	sigset_t set, oldset;
-	unsigned int i;
-	int ret;
-
-	if (num_workers < 1)
-		num_workers = 1;
-
-	proc = alloc_flex(sizeof(*proc),
-			  sizeof(proc->workers[0]), num_workers);
-	if (proc == NULL)
-		return NULL;
-
-	proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
-	proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
-	proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
-
-	if (data_writer_init(proc, max_block_size, cmp, num_workers,
-			     max_backlog, devblksz, file)) {
-		goto fail_init;
-	}
-
-	for (i = 0; i < num_workers; ++i) {
-		proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
-					      1, max_block_size);
-
-		if (proc->workers[i] == NULL)
-			goto fail_init;
-
-		proc->workers[i]->shared = proc;
-		proc->workers[i]->cmp = cmp->create_copy(cmp);
-
-		if (proc->workers[i]->cmp == NULL)
-			goto fail_init;
-	}
-
-	sigfillset(&set);
-	pthread_sigmask(SIG_SETMASK, &set, &oldset);
-
-	for (i = 0; i < num_workers; ++i) {
-		ret = pthread_create(&proc->workers[i]->thread, NULL,
-				     worker_proc, proc->workers[i]);
-
-		if (ret != 0)
-			goto fail_thread;
-	}
-
-	pthread_sigmask(SIG_SETMASK, &oldset, NULL);
-
-	return proc;
-fail_thread:
-	pthread_mutex_lock(&proc->mtx);
-	proc->status = -1;
-	pthread_cond_broadcast(&proc->queue_cond);
-	pthread_mutex_unlock(&proc->mtx);
-
-	for (i = 0; i < num_workers; ++i) {
-		if (proc->workers[i]->thread > 0) {
-			pthread_join(proc->workers[i]->thread, NULL);
-		}
-	}
-	pthread_sigmask(SIG_SETMASK, &oldset, NULL);
-fail_init:
-	for (i = 0; i < num_workers; ++i) {
-		if (proc->workers[i] != NULL) {
-			if (proc->workers[i]->cmp != NULL) {
-				proc->workers[i]->cmp->
-					destroy(proc->workers[i]->cmp);
-			}
-
-			free(proc->workers[i]);
-		}
-	}
-	pthread_cond_destroy(&proc->done_cond);
-	pthread_cond_destroy(&proc->queue_cond);
-	pthread_mutex_destroy(&proc->mtx);
-	data_writer_cleanup(proc);
-	return NULL;
-}
-
-void sqfs_data_writer_destroy(sqfs_data_writer_t *proc)
-{
-	unsigned int i;
-
-	pthread_mutex_lock(&proc->mtx);
-	proc->status = -1;
-	pthread_cond_broadcast(&proc->queue_cond);
-	pthread_mutex_unlock(&proc->mtx);
-
-	for (i = 0; i < proc->num_workers; ++i) {
-		pthread_join(proc->workers[i]->thread, NULL);
-
-		proc->workers[i]->cmp->destroy(proc->workers[i]->cmp);
-		free(proc->workers[i]);
-	}
-
-	pthread_cond_destroy(&proc->done_cond);
-	pthread_cond_destroy(&proc->queue_cond);
-	pthread_mutex_destroy(&proc->mtx);
-
-	data_writer_cleanup(proc);
-}
-
-int append_to_work_queue(sqfs_data_writer_t *proc, sqfs_block_t *block,
-			 bool signal_threads)
-{
-	int status;
-
-	pthread_mutex_lock(&proc->mtx);
-	status = proc->status;
-	if (status != 0) {
-		free(block);
-		pthread_mutex_unlock(&proc->mtx);
-		return status;
-	}
-
-	if (block != NULL) {
-		if (proc->queue_last == NULL) {
-			proc->queue = proc->queue_last = block;
-		} else {
-			proc->queue_last->next = block;
-			proc->queue_last = block;
-		}
-
-		block->sequence_number = proc->enqueue_id++;
-		block->next = NULL;
-		proc->backlog += 1;
-	}
-
-	if (signal_threads)
-		pthread_cond_broadcast(&proc->queue_cond);
-	pthread_mutex_unlock(&proc->mtx);
-	return 0;
-}
-
-static sqfs_block_t *try_dequeue(sqfs_data_writer_t *proc)
-{
-	sqfs_block_t *queue, *it, *prev;
-
-	it = proc->done;
-	prev = NULL;
-
-	while (it != NULL && it->sequence_number == proc->dequeue_id) {
-		prev = it;
-		it = it->next;
-		proc->dequeue_id += 1;
-	}
-
-	if (prev == NULL) {
-		queue = NULL;
-	} else {
-		queue = proc->done;
-		prev->next = NULL;
-		proc->done = it;
-	}
-
-	return queue;
-}
-
-static sqfs_block_t *queue_merge(sqfs_block_t *lhs, sqfs_block_t *rhs)
-{
-	sqfs_block_t *it, *head = NULL, **next_ptr = &head;
-
-	while (lhs != NULL && rhs != NULL) {
-		if (lhs->sequence_number <= rhs->sequence_number) {
-			it = lhs;
-			lhs = lhs->next;
-		} else {
-			it = rhs;
-			rhs = rhs->next;
-		}
-
-		*next_ptr = it;
-		next_ptr = &it->next;
-	}
-
-	it = (lhs != NULL ? lhs : rhs);
-	*next_ptr = it;
-	return head;
-}
-
-static int process_done_queue(sqfs_data_writer_t *proc, sqfs_block_t *queue)
-{
-	sqfs_block_t *it, *block = NULL;
-	int status = 0;
-
-	while (queue != NULL && status == 0) {
-		it = queue;
-		queue = it->next;
-		proc->backlog -= 1;
-
-		if (it->flags & SQFS_BLK_IS_FRAGMENT) {
-			block = NULL;
-			status = process_completed_fragment(proc, it, &block);
-
-			if (block != NULL && status == 0) {
-				pthread_mutex_lock(&proc->mtx);
-				proc->dequeue_id = it->sequence_number;
-				block->sequence_number = it->sequence_number;
-
-				if (proc->queue == NULL) {
-					proc->queue = block;
-					proc->queue_last = block;
-				} else {
-					block->next = proc->queue;
-					proc->queue = block;
-				}
-
-				proc->backlog += 1;
-				proc->done = queue_merge(queue, proc->done);
-				pthread_cond_broadcast(&proc->queue_cond);
-				pthread_mutex_unlock(&proc->mtx);
-
-				queue = NULL;
-			} else {
-				free(block);
-			}
-		} else {
-			status = process_completed_block(proc, it);
-		}
-
-		free(it);
-	}
-
-	free_blk_list(queue);
-	return status;
-}
-
-int test_and_set_status(sqfs_data_writer_t *proc, int status)
-{
-	pthread_mutex_lock(&proc->mtx);
-	if (proc->status == 0) {
-		proc->status = status;
-	} else {
-		status = proc->status;
-	}
-	pthread_cond_broadcast(&proc->queue_cond);
-	pthread_mutex_unlock(&proc->mtx);
-	return status;
-}
-
-int wait_completed(sqfs_data_writer_t *proc)
-{
-	sqfs_block_t *queue;
-	int status;
-
-	pthread_mutex_lock(&proc->mtx);
-	for (;;) {
-		queue = try_dequeue(proc);
-		status = proc->status;
-
-		if (queue != NULL || status != 0)
-			break;
-
-		pthread_cond_wait(&proc->done_cond, &proc->mtx);
-	}
-	pthread_mutex_unlock(&proc->mtx);
-
-	if (status != 0) {
-		free_blk_list(queue);
-		return status;
-	}
-
-	status = process_done_queue(proc, queue);
-	return status ? test_and_set_status(proc, status) : status;
-}
diff --git a/lib/sqfs/data_writer/windows.c b/lib/sqfs/data_writer/windows.c
deleted file mode 100644
index d790f79..0000000
--- a/lib/sqfs/data_writer/windows.c
+++ /dev/null
@@ -1,294 +0,0 @@
-/* SPDX-License-Identifier: LGPL-3.0-or-later */
-/*
- * windows.c
- *
- * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at>
- */
-#define SQFS_BUILDING_DLL
-#include "internal.h"
-
-struct compress_worker_t {
-	sqfs_data_writer_t *shared;
-	sqfs_compressor_t *cmp;
-	HANDLE thread;
-	sqfs_u8 scratch[];
-};
-
-static DWORD WINAPI worker_proc(LPVOID arg)
-{
-	compress_worker_t *worker = arg;
-	sqfs_data_writer_t *shared = worker->shared;
-	sqfs_block_t *blk = NULL;
-	int status = 0;
-
-	for (;;) {
-		EnterCriticalSection(&shared->mtx);
-		if (blk != NULL) {
-			data_writer_store_done(shared, blk, status);
-			WakeConditionVariable(&shared->done_cond);
-		}
-
-		while (shared->queue == NULL && shared->status == 0) {
-			SleepConditionVariableCS(&shared->queue_cond,
-						 &shared->mtx, INFINITE);
-		}
-
-		blk = data_writer_next_work_item(shared);
-		LeaveCriticalSection(&shared->mtx);
-
-		if (blk == NULL)
-			break;
-
-		status = data_writer_do_block(blk, worker->cmp,
-					      worker->scratch,
-					      shared->max_block_size);
-	}
-
-	return 0;
-}
-
-sqfs_data_writer_t *sqfs_data_writer_create(size_t max_block_size,
-					    sqfs_compressor_t *cmp,
-					    unsigned int num_workers,
-					    size_t max_backlog,
-					    size_t devblksz,
-					    sqfs_file_t *file)
-{
-	sqfs_data_writer_t *proc;
-	unsigned int i;
-
-	if (num_workers < 1)
-		num_workers = 1;
-
-	proc = alloc_flex(sizeof(*proc),
-			  sizeof(proc->workers[0]), num_workers);
-	if (proc == NULL)
-		return NULL;
-
-	InitializeCriticalSection(&proc->mtx);
-	InitializeConditionVariable(&proc->queue_cond);
-	InitializeConditionVariable(&proc->done_cond);
-
-	if (data_writer_init(proc, max_block_size, cmp, num_workers,
-			     max_backlog, devblksz, file)) {
-		goto fail;
-	}
-
-	for (i = 0; i < num_workers; ++i) {
-		proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
-					      1, max_block_size);
-
-		if (proc->workers[i] == NULL)
-			goto fail;
-
-		proc->workers[i]->shared = proc;
-		proc->workers[i]->cmp = cmp->create_copy(cmp);
-
-		if (proc->workers[i]->cmp == NULL)
-			goto fail;
-
-		proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc,
-							proc->workers[i], 0, 0);
-		if (proc->workers[i]->thread == NULL)
-			goto fail;
-	}
-
-	return proc;
-fail:
-	sqfs_data_writer_destroy(proc);
-	return NULL;
-}
-
-void sqfs_data_writer_destroy(sqfs_data_writer_t *proc)
-{
-	unsigned int i;
-
-	EnterCriticalSection(&proc->mtx);
-	proc->status = -1;
-	WakeAllConditionVariable(&proc->queue_cond);
-	LeaveCriticalSection(&proc->mtx);
-
-	for (i = 0; i < proc->num_workers; ++i) {
-		if (proc->workers[i] == NULL)
-			continue;
-
-		if (proc->workers[i]->thread != NULL) {
-			WaitForSingleObject(proc->workers[i]->thread, INFINITE);
-			CloseHandle(proc->workers[i]->thread);
-		}
-
-		if (proc->workers[i]->cmp != NULL)
-			proc->workers[i]->cmp->destroy(proc->workers[i]->cmp);
-
-		free(proc->workers[i]);
-	}
-
-	DeleteCriticalSection(&proc->mtx);
-	data_writer_cleanup(proc);
-}
-
-int append_to_work_queue(sqfs_data_writer_t *proc, sqfs_block_t *block,
-			 bool signal_threads)
-{
-	int status;
-
-	EnterCriticalSection(&proc->mtx);
-	status = proc->status;
-
-	if (status != 0) {
-		free(block);
-		LeaveCriticalSection(&proc->mtx);
-		return status;
-	}
-
-	if (block != NULL) {
-		if (proc->queue_last == NULL) {
-			proc->queue = proc->queue_last = block;
-		} else {
-			proc->queue_last->next = block;
-			proc->queue_last = block;
-		}
-
-		block->sequence_number = proc->enqueue_id++;
-		block->next = NULL;
-		proc->backlog += 1;
-	}
-
-	if (signal_threads)
-		WakeAllConditionVariable(&proc->queue_cond);
-
-	LeaveCriticalSection(&proc->mtx);
-	return 0;
-}
-
-static sqfs_block_t *try_dequeue(sqfs_data_writer_t *proc)
-{
-	sqfs_block_t *queue, *it, *prev;
-
-	it = proc->done;
-	prev = NULL;
-
-	while (it != NULL && it->sequence_number == proc->dequeue_id) {
-		prev = it;
-		it = it->next;
-		proc->dequeue_id += 1;
-	}
-
-	if (prev == NULL) {
-		queue = NULL;
-	} else {
-		queue = proc->done;
-		prev->next = NULL;
-		proc->done = it;
-	}
-
-	return queue;
-}
-
-static sqfs_block_t *queue_merge(sqfs_block_t *lhs, sqfs_block_t *rhs)
-{
-	sqfs_block_t *it, *head = NULL, **next_ptr = &head;
-
-	while (lhs != NULL && rhs != NULL) {
-		if (lhs->sequence_number <= rhs->sequence_number) {
-			it = lhs;
-			lhs = lhs->next;
-		} else {
-			it = rhs;
-			rhs = rhs->next;
-		}
-
-		*next_ptr = it;
-		next_ptr = &it->next;
-	}
-
-	it = (lhs != NULL ? lhs : rhs);
-	*next_ptr = it;
-	return head;
-}
-
-static int process_done_queue(sqfs_data_writer_t *proc, sqfs_block_t *queue)
-{
-	sqfs_block_t *it, *block = NULL;
-	int status = 0;
-
-	while (queue != NULL && status == 0) {
-		it = queue;
-		queue = it->next;
-
-		if (it->flags & SQFS_BLK_IS_FRAGMENT) {
-			block = NULL;
-			status = process_completed_fragment(proc, it, &block);
-
-			if (block != NULL && status == 0) {
-				EnterCriticalSection(&proc->mtx);
-				proc->dequeue_id = it->sequence_number;
-				block->sequence_number = it->sequence_number;
-
-				if (proc->queue == NULL) {
-					proc->queue = block;
-					proc->queue_last = block;
-				} else {
-					block->next = proc->queue;
-					proc->queue = block;
-				}
-
-				proc->backlog += 1;
-				proc->done = queue_merge(queue, proc->done);
-				WakeAllConditionVariable(&proc->queue_cond);
-				LeaveCriticalSection(&proc->mtx);
-
-				queue = NULL;
-			} else {
-				free(block);
-			}
-		} else {
-			status = process_completed_block(proc, it);
-		}
-
-		free(it);
-	}
-
-	free_blk_list(queue);
-	return status;
-}
-
-int test_and_set_status(sqfs_data_writer_t *proc, int status)
-{
-	EnterCriticalSection(&proc->mtx);
-	if (proc->status == 0) {
-		proc->status = status;
-	} else {
-		status = proc->status;
-	}
-	WakeAllConditionVariable(&proc->queue_cond);
-	LeaveCriticalSection(&proc->mtx);
-	return status;
-}
-
-int wait_completed(sqfs_data_writer_t *proc)
-{
-	sqfs_block_t *queue;
-	int status;
-
-	EnterCriticalSection(&proc->mtx);
-	for (;;) {
-		queue = try_dequeue(proc);
-		status = proc->status;
-
-		if (queue != NULL || status != 0)
-			break;
-
-		SleepConditionVariableCS(&proc->done_cond, &proc->mtx,
-					 INFINITE);
-	}
-	LeaveCriticalSection(&proc->mtx);
-
-	if (status != 0) {
-		free_blk_list(queue);
-		return status;
-	}
-
-	status = process_done_queue(proc, queue);
-	return status ? test_and_set_status(proc, status) : status;
-}
diff --git a/lib/sqfs/data_writer/winpthread.c b/lib/sqfs/data_writer/winpthread.c
new file mode 100644
index 0000000..bba5894
--- /dev/null
+++ b/lib/sqfs/data_writer/winpthread.c
@@ -0,0 +1,422 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * winpthread.c
+ *
+ * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at>
+ */
+#define SQFS_BUILDING_DLL
+#include "internal.h"
+
+#if defined(_WIN32) || defined(__WINDOWS__)
+#	define LOCK(mtx) EnterCriticalSection(mtx)
+#	define UNLOCK(mtx) LeaveCriticalSection(mtx)
+#	define AWAIT(cond, mtx) SleepConditionVariableCS(cond, mtx, INFINITE)
+#	define SIGNAL_ALL(cond) WakeAllConditionVariable(cond)
+#	define THREAD_EXIT_SUCCESS 0
+#	define THREAD_TYPE DWORD WINAPI
+#	define THREAD_ARG LPVOID
+#	define THREAD_HANDLE HANDLE
+#else
+#	define LOCK(mtx) pthread_mutex_lock(mtx)
+#	define UNLOCK(mtx) pthread_mutex_unlock(mtx)
+#	define AWAIT(cond, mtx) pthread_cond_wait(cond, mtx)
+#	define SIGNAL_ALL(cond) pthread_cond_broadcast(cond)
+#	define THREAD_EXIT_SUCCESS NULL
+#	define THREAD_TYPE void *
+#	define THREAD_ARG void *
+#	define THREAD_HANDLE pthread_t
+#endif
+
+struct compress_worker_t {
+	sqfs_data_writer_t *shared;
+	sqfs_compressor_t *cmp;
+	THREAD_HANDLE thread;
+	sqfs_u8 scratch[];
+};
+
+static THREAD_TYPE worker_proc(THREAD_ARG arg)
+{
+	compress_worker_t *worker = arg;
+	sqfs_data_writer_t *shared = worker->shared;
+	sqfs_block_t *blk = NULL;
+	int status = 0;
+
+	for (;;) {
+		LOCK(&shared->mtx);
+		if (blk != NULL) {
+			data_writer_store_done(shared, blk, status);
+			SIGNAL_ALL(&shared->done_cond);
+		}
+
+		while (shared->queue == NULL && shared->status == 0)
+			AWAIT(&shared->queue_cond, &shared->mtx);
+
+		blk = data_writer_next_work_item(shared);
+		UNLOCK(&shared->mtx);
+
+		if (blk == NULL)
+			break;
+
+		status = data_writer_do_block(blk, worker->cmp,
+					      worker->scratch,
+					      shared->max_block_size);
+	}
+
+	return THREAD_EXIT_SUCCESS;
+}
+
+#if defined(_WIN32) || defined(__WINDOWS__)
+sqfs_data_writer_t *sqfs_data_writer_create(size_t max_block_size,
+					    sqfs_compressor_t *cmp,
+					    unsigned int num_workers,
+					    size_t max_backlog,
+					    size_t devblksz,
+					    sqfs_file_t *file)
+{
+	sqfs_data_writer_t *proc;
+	unsigned int i;
+
+	if (num_workers < 1)
+		num_workers = 1;
+
+	proc = alloc_flex(sizeof(*proc),
+			  sizeof(proc->workers[0]), num_workers);
+	if (proc == NULL)
+		return NULL;
+
+	InitializeCriticalSection(&proc->mtx);
+	InitializeConditionVariable(&proc->queue_cond);
+	InitializeConditionVariable(&proc->done_cond);
+
+	if (data_writer_init(proc, max_block_size, cmp, num_workers,
+			     max_backlog, devblksz, file)) {
+		goto fail;
+	}
+
+	for (i = 0; i < num_workers; ++i) {
+		proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
+					      1, max_block_size);
+
+		if (proc->workers[i] == NULL)
+			goto fail;
+
+		proc->workers[i]->shared = proc;
+		proc->workers[i]->cmp = cmp->create_copy(cmp);
+
+		if (proc->workers[i]->cmp == NULL)
+			goto fail;
+
+		proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc,
+							proc->workers[i], 0, 0);
+		if (proc->workers[i]->thread == NULL)
+			goto fail;
+	}
+
+	return proc;
+fail:
+	sqfs_data_writer_destroy(proc);
+	return NULL;
+}
+
+void sqfs_data_writer_destroy(sqfs_data_writer_t *proc)
+{
+	unsigned int i;
+
+	EnterCriticalSection(&proc->mtx);
+	proc->status = -1;
+	WakeAllConditionVariable(&proc->queue_cond);
+	LeaveCriticalSection(&proc->mtx);
+
+	for (i = 0; i < proc->num_workers; ++i) {
+		if (proc->workers[i] == NULL)
+			continue;
+
+		if (proc->workers[i]->thread != NULL) {
+			WaitForSingleObject(proc->workers[i]->thread, INFINITE);
+			CloseHandle(proc->workers[i]->thread);
+		}
+
+		if (proc->workers[i]->cmp != NULL)
+			proc->workers[i]->cmp->destroy(proc->workers[i]->cmp);
+
+		free(proc->workers[i]);
+	}
+
+	DeleteCriticalSection(&proc->mtx);
+	data_writer_cleanup(proc);
+}
+#else
+sqfs_data_writer_t *sqfs_data_writer_create(size_t max_block_size,
+					    sqfs_compressor_t *cmp,
+					    unsigned int num_workers,
+					    size_t max_backlog,
+					    size_t devblksz,
+					    sqfs_file_t *file)
+{
+	sqfs_data_writer_t *proc;
+	sigset_t set, oldset;
+	unsigned int i;
+	int ret;
+
+	if (num_workers < 1)
+		num_workers = 1;
+
+	proc = alloc_flex(sizeof(*proc),
+			  sizeof(proc->workers[0]), num_workers);
+	if (proc == NULL)
+		return NULL;
+
+	proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
+	proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
+	proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
+
+	if (data_writer_init(proc, max_block_size, cmp, num_workers,
+			     max_backlog, devblksz, file)) {
+		goto fail_init;
+	}
+
+	for (i = 0; i < num_workers; ++i) {
+		proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
+					      1, max_block_size);
+
+		if (proc->workers[i] == NULL)
+			goto fail_init;
+
+		proc->workers[i]->shared = proc;
+		proc->workers[i]->cmp = cmp->create_copy(cmp);
+
+		if (proc->workers[i]->cmp == NULL)
+			goto fail_init;
+	}
+
+	sigfillset(&set);
+	pthread_sigmask(SIG_SETMASK, &set, &oldset);
+
+	for (i = 0; i < num_workers; ++i) {
+		ret = pthread_create(&proc->workers[i]->thread, NULL,
+				     worker_proc, proc->workers[i]);
+
+		if (ret != 0)
+			goto fail_thread;
+	}
+
+	pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+
+	return proc;
+fail_thread:
+	pthread_mutex_lock(&proc->mtx);
+	proc->status = -1;
+	pthread_cond_broadcast(&proc->queue_cond);
+	pthread_mutex_unlock(&proc->mtx);
+
+	for (i = 0; i < num_workers; ++i) {
+		if (proc->workers[i]->thread > 0) {
+			pthread_join(proc->workers[i]->thread, NULL);
+		}
+	}
+	pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+fail_init:
+	for (i = 0; i < num_workers; ++i) {
+		if (proc->workers[i] != NULL) {
+			if (proc->workers[i]->cmp != NULL) {
+				proc->workers[i]->cmp->
+					destroy(proc->workers[i]->cmp);
+			}
+
+			free(proc->workers[i]);
+		}
+	}
+	pthread_cond_destroy(&proc->done_cond);
+	pthread_cond_destroy(&proc->queue_cond);
+	pthread_mutex_destroy(&proc->mtx);
+	data_writer_cleanup(proc);
+	return NULL;
+}
+
+void sqfs_data_writer_destroy(sqfs_data_writer_t *proc)
+{
+	unsigned int i;
+
+	pthread_mutex_lock(&proc->mtx);
+	proc->status = -1;
+	pthread_cond_broadcast(&proc->queue_cond);
+	pthread_mutex_unlock(&proc->mtx);
+
+	for (i = 0; i < proc->num_workers; ++i) {
+		pthread_join(proc->workers[i]->thread, NULL);
+
+		proc->workers[i]->cmp->destroy(proc->workers[i]->cmp);
+		free(proc->workers[i]);
+	}
+
+	pthread_cond_destroy(&proc->done_cond);
+	pthread_cond_destroy(&proc->queue_cond);
+	pthread_mutex_destroy(&proc->mtx);
+
+	data_writer_cleanup(proc);
+}
+#endif
+
+int append_to_work_queue(sqfs_data_writer_t *proc, sqfs_block_t *block,
+			 bool signal_threads)
+{
+	int status;
+
+	LOCK(&proc->mtx);
+	status = proc->status;
+	if (status != 0)
+		goto out;
+
+	if (block != NULL) {
+		if (proc->queue_last == NULL) {
+			proc->queue = proc->queue_last = block;
+		} else {
+			proc->queue_last->next = block;
+			proc->queue_last = block;
+		}
+
+		block->sequence_number = proc->enqueue_id++;
+		block->next = NULL;
+		proc->backlog += 1;
+		block = NULL;
+	}
+out:
+	if (signal_threads)
+		SIGNAL_ALL(&proc->queue_cond);
+
+	UNLOCK(&proc->mtx);
+	free(block);
+	return 0;
+}
+
+static sqfs_block_t *try_dequeue(sqfs_data_writer_t *proc)
+{
+	sqfs_block_t *queue, *it, *prev;
+
+	it = proc->done;
+	prev = NULL;
+
+	while (it != NULL && it->sequence_number == proc->dequeue_id) {
+		prev = it;
+		it = it->next;
+		proc->dequeue_id += 1;
+	}
+
+	if (prev == NULL) {
+		queue = NULL;
+	} else {
+		queue = proc->done;
+		prev->next = NULL;
+		proc->done = it;
+	}
+
+	return queue;
+}
+
+static sqfs_block_t *queue_merge(sqfs_block_t *lhs, sqfs_block_t *rhs)
+{
+	sqfs_block_t *it, *head = NULL, **next_ptr = &head;
+
+	while (lhs != NULL && rhs != NULL) {
+		if (lhs->sequence_number <= rhs->sequence_number) {
+			it = lhs;
+			lhs = lhs->next;
+		} else {
+			it = rhs;
+			rhs = rhs->next;
+		}
+
+		*next_ptr = it;
+		next_ptr = &it->next;
+	}
+
+	it = (lhs != NULL ? lhs : rhs);
+	*next_ptr = it;
+	return head;
+}
+
+static int process_done_queue(sqfs_data_writer_t *proc, sqfs_block_t *queue)
+{
+	sqfs_block_t *it, *block = NULL;
+	int status = 0;
+
+	while (queue != NULL && status == 0) {
+		it = queue;
+		queue = it->next;
+		proc->backlog -= 1;
+
+		if (it->flags & SQFS_BLK_IS_FRAGMENT) {
+			block = NULL;
+			status = process_completed_fragment(proc, it, &block);
+
+			if (block != NULL && status == 0) {
+				LOCK(&proc->mtx);
+				proc->dequeue_id = it->sequence_number;
+				block->sequence_number = it->sequence_number;
+
+				if (proc->queue == NULL) {
+					proc->queue = block;
+					proc->queue_last = block;
+				} else {
+					block->next = proc->queue;
+					proc->queue = block;
+				}
+
+				proc->backlog += 1;
+				proc->done = queue_merge(queue, proc->done);
+				SIGNAL_ALL(&proc->queue_cond);
+				UNLOCK(&proc->mtx);
+
+				queue = NULL;
+			} else {
+				free(block);
+			}
+		} else {
+			status = process_completed_block(proc, it);
+		}
+
+		free(it);
+	}
+
+	free_blk_list(queue);
+	return status;
+}
+
+int test_and_set_status(sqfs_data_writer_t *proc, int status)
+{
+	LOCK(&proc->mtx);
+	if (proc->status == 0) {
+		proc->status = status;
+	} else {
+		status = proc->status;
+	}
+	SIGNAL_ALL(&proc->queue_cond);
+	UNLOCK(&proc->mtx);
+	return status;
+}
+
+int wait_completed(sqfs_data_writer_t *proc)
+{
+	sqfs_block_t *queue;
+	int status;
+
+	LOCK(&proc->mtx);
+	for (;;) {
+		queue = try_dequeue(proc);
+		status = proc->status;
+
+		if (queue != NULL || status != 0)
+			break;
+
+		AWAIT(&proc->done_cond, &proc->mtx);
+	}
+	UNLOCK(&proc->mtx);
+
+	if (status != 0) {
+		free_blk_list(queue);
+		return status;
+	}
+
+	status = process_done_queue(proc, queue);
+	return status ? test_and_set_status(proc, status) : status;
+}
-- 
cgit v1.2.3