From d98411f53939b7e308f186e5b61791677f3b0ad9 Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Sun, 8 Dec 2019 14:17:53 +0100 Subject: Add native Windows port of the multi-threaded data writer Signed-off-by: David Oberhollenzer --- lib/sqfs/Makemodule.am | 4 + lib/sqfs/data_writer/internal.h | 9 +- lib/sqfs/data_writer/windows.c | 333 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 345 insertions(+), 1 deletion(-) create mode 100644 lib/sqfs/data_writer/windows.c (limited to 'lib') diff --git a/lib/sqfs/Makemodule.am b/lib/sqfs/Makemodule.am index dffca44..ad3b4de 100644 --- a/lib/sqfs/Makemodule.am +++ b/lib/sqfs/Makemodule.am @@ -45,8 +45,12 @@ if HAVE_PTHREAD libsquashfs_la_SOURCES += lib/sqfs/data_writer/pthread.c libsquashfs_la_CPPFLAGS += -DWITH_PTHREAD else +if WINDOWS +libsquashfs_la_SOURCES += lib/sqfs/data_writer/windows.c +else libsquashfs_la_SOURCES += lib/sqfs/data_writer/serial.c endif +endif if WITH_GZIP libsquashfs_la_SOURCES += lib/sqfs/comp/gzip.c diff --git a/lib/sqfs/data_writer/internal.h b/lib/sqfs/data_writer/internal.h index fab6c50..312922b 100644 --- a/lib/sqfs/data_writer/internal.h +++ b/lib/sqfs/data_writer/internal.h @@ -25,6 +25,9 @@ #ifdef WITH_PTHREAD #include #include +#elif defined(_WIN32) || defined(__WINDOWS__) +#define WIN32_LEAN_AND_MEAN +#include #endif @@ -53,6 +56,10 @@ struct sqfs_data_writer_t { pthread_mutex_t mtx; pthread_cond_t queue_cond; pthread_cond_t done_cond; +#elif defined(_WIN32) || defined(__WINDOWS__) + CRITICAL_SECTION mtx; + HANDLE queue_cond; + HANDLE done_cond; #endif /* needs rw access by worker and main thread */ @@ -102,7 +109,7 @@ struct sqfs_data_writer_t { /* used only by workers */ size_t max_block_size; -#ifdef WITH_PTHREAD +#if defined(WITH_PTHREAD) || defined(_WIN32) || defined(__WINDOWS__) compress_worker_t *workers[]; #else sqfs_u8 scratch[]; diff --git a/lib/sqfs/data_writer/windows.c b/lib/sqfs/data_writer/windows.c new file mode 100644 index 0000000..b0459a5 --- /dev/null +++ b/lib/sqfs/data_writer/windows.c @@ -0,0 +1,333 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * windows.c + * + * Copyright (C) 2019 David Oberhollenzer + */ +#define SQFS_BUILDING_DLL +#include "internal.h" + +struct compress_worker_t { + sqfs_data_writer_t *shared; + sqfs_compressor_t *cmp; + HANDLE thread; + sqfs_u8 scratch[]; +}; + +static DWORD WINAPI worker_proc(LPVOID arg) +{ + compress_worker_t *worker = arg; + sqfs_data_writer_t *shared = worker->shared; + sqfs_block_t *blk = NULL; + int status = 0; + + for (;;) { + EnterCriticalSection(&shared->mtx); + if (blk != NULL) { + data_writer_store_done(shared, blk, status); + SetEvent(shared->done_cond); + } + + while (shared->queue == NULL && shared->status == 0) { + LeaveCriticalSection(&shared->mtx); + WaitForSingleObject(shared->queue_cond, INFINITE); + EnterCriticalSection(&shared->mtx); + } + + blk = data_writer_next_work_item(shared); + LeaveCriticalSection(&shared->mtx); + + if (blk == NULL) + break; + + status = data_writer_do_block(blk, worker->cmp, + worker->scratch, + shared->max_block_size); + } + + return 0; +} + +sqfs_data_writer_t *sqfs_data_writer_create(size_t max_block_size, + sqfs_compressor_t *cmp, + unsigned int num_workers, + size_t max_backlog, + size_t devblksz, + sqfs_file_t *file) +{ + sqfs_data_writer_t *proc; + unsigned int i; + + if (num_workers < 1) + num_workers = 1; + + proc = alloc_flex(sizeof(*proc), + sizeof(proc->workers[0]), num_workers); + if (proc == NULL) + return NULL; + + InitializeCriticalSection(&proc->mtx); + + proc->queue_cond = CreateEvent(NULL, FALSE, FALSE, NULL); + if (proc->queue_cond == NULL) + goto fail; + + proc->done_cond = CreateEvent(NULL, FALSE, FALSE, NULL); + if (proc->done_cond == NULL) + goto fail; + + if (data_writer_init(proc, max_block_size, cmp, num_workers, + max_backlog, devblksz, file)) { + goto fail; + } + + for (i = 0; i < num_workers; ++i) { + proc->workers[i] = alloc_flex(sizeof(compress_worker_t), + 1, max_block_size); + + if (proc->workers[i] == NULL) + goto fail; + + proc->workers[i]->shared = proc; + proc->workers[i]->cmp = cmp->create_copy(cmp); + + if (proc->workers[i]->cmp == NULL) + goto fail; + + proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc, + proc->workers[i], 0, 0); + if (proc->workers[i]->thread == NULL) + goto fail; + } + + return proc; +fail: + sqfs_data_writer_destroy(proc); + return NULL; +} + +void sqfs_data_writer_destroy(sqfs_data_writer_t *proc) +{ + unsigned int i; + + EnterCriticalSection(&proc->mtx); + proc->status = -1; + if (proc->queue_cond != NULL) + SetEvent(proc->queue_cond); + LeaveCriticalSection(&proc->mtx); + + for (i = 0; i < proc->num_workers; ++i) { + if (proc->workers[i] == NULL) + continue; + + if (proc->workers[i]->thread != NULL) { + WaitForSingleObject(proc->workers[i]->thread, INFINITE); + CloseHandle(proc->workers[i]->thread); + } + + if (proc->workers[i]->cmp != NULL) + proc->workers[i]->cmp->destroy(proc->workers[i]->cmp); + + free(proc->workers[i]); + } + + if (proc->queue_cond != NULL) + CloseHandle(proc->queue_cond); + + if (proc->queue_cond != NULL) + CloseHandle(proc->done_cond); + + DeleteCriticalSection(&proc->mtx); + data_writer_cleanup(proc); +} + +static void append_to_work_queue(sqfs_data_writer_t *proc, + sqfs_block_t *block) +{ + if (proc->queue_last == NULL) { + proc->queue = proc->queue_last = block; + } else { + proc->queue_last->next = block; + proc->queue_last = block; + } + + block->sequence_number = proc->enqueue_id++; + block->next = NULL; + proc->backlog += 1; + SetEvent(proc->queue_cond); +} + +static sqfs_block_t *try_dequeue(sqfs_data_writer_t *proc) +{ + sqfs_block_t *queue, *it, *prev; + + it = proc->done; + prev = NULL; + + while (it != NULL && it->sequence_number == proc->dequeue_id) { + prev = it; + it = it->next; + proc->dequeue_id += 1; + } + + if (prev == NULL) { + queue = NULL; + } else { + queue = proc->done; + prev->next = NULL; + proc->done = it; + } + + return queue; +} + +static sqfs_block_t *queue_merge(sqfs_block_t *lhs, sqfs_block_t *rhs) +{ + sqfs_block_t *it, *head = NULL, **next_ptr = &head; + + while (lhs != NULL && rhs != NULL) { + if (lhs->sequence_number <= rhs->sequence_number) { + it = lhs; + lhs = lhs->next; + } else { + it = rhs; + rhs = rhs->next; + } + + *next_ptr = it; + next_ptr = &it->next; + } + + it = (lhs != NULL ? lhs : rhs); + *next_ptr = it; + return head; +} + +static int process_done_queue(sqfs_data_writer_t *proc, sqfs_block_t *queue) +{ + sqfs_block_t *it, *block = NULL; + int status = 0; + + while (queue != NULL && status == 0) { + it = queue; + queue = it->next; + + if (it->flags & SQFS_BLK_IS_FRAGMENT) { + block = NULL; + status = process_completed_fragment(proc, it, &block); + + if (block != NULL && status == 0) { + EnterCriticalSection(&proc->mtx); + proc->dequeue_id = it->sequence_number; + block->sequence_number = it->sequence_number; + + if (proc->queue == NULL) { + proc->queue = block; + proc->queue_last = block; + } else { + block->next = proc->queue; + proc->queue = block; + } + + proc->backlog += 1; + proc->done = queue_merge(queue, proc->done); + SetEvent(proc->queue_cond); + LeaveCriticalSection(&proc->mtx); + + queue = NULL; + } else { + free(block); + } + } else { + status = process_completed_block(proc, it); + } + + free(it); + } + + free_blk_list(queue); + return status; +} + +int test_and_set_status(sqfs_data_writer_t *proc, int status) +{ + EnterCriticalSection(&proc->mtx); + if (proc->status == 0) { + proc->status = status; + } else { + status = proc->status; + } + SetEvent(proc->queue_cond); + LeaveCriticalSection(&proc->mtx); + return status; +} + +int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block) +{ + sqfs_block_t *queue; + int status; + + EnterCriticalSection(&proc->mtx); + while (proc->backlog > proc->max_backlog && proc->status == 0) { + LeaveCriticalSection(&proc->mtx); + WaitForSingleObject(proc->done_cond, INFINITE); + EnterCriticalSection(&proc->mtx); + } + + if (proc->status != 0) { + status = proc->status; + LeaveCriticalSection(&proc->mtx); + free(block); + return status; + } + + append_to_work_queue(proc, block); + block = NULL; + + queue = try_dequeue(proc); + LeaveCriticalSection(&proc->mtx); + + status = process_done_queue(proc, queue); + + return status ? test_and_set_status(proc, status) : status; +} + +int sqfs_data_writer_finish(sqfs_data_writer_t *proc) +{ + sqfs_block_t *queue; + int status = 0; + + for (;;) { + EnterCriticalSection(&proc->mtx); + while (proc->backlog > 0 && proc->status == 0) { + LeaveCriticalSection(&proc->mtx); + WaitForSingleObject(proc->done_cond, INFINITE); + EnterCriticalSection(&proc->mtx); + } + + if (proc->status != 0) { + status = proc->status; + LeaveCriticalSection(&proc->mtx); + return status; + } + + queue = proc->done; + proc->done = NULL; + LeaveCriticalSection(&proc->mtx); + + if (queue == NULL) { + if (proc->frag_block != NULL) { + append_to_work_queue(proc, proc->frag_block); + proc->frag_block = NULL; + continue; + } + break; + } + + status = process_done_queue(proc, queue); + if (status != 0) + return status; + } + + return 0; +} -- cgit v1.2.3