From bb0ef9e0eec5c27610fe381b905ef46b3f5f09c6 Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Sun, 21 Mar 2021 16:59:08 +0100 Subject: Cleanup: Rewrite block processor to use the libutil thread_pool_t Throw out the messy thread pool implementation and temporarily also remove the exact fragment matching for simplicity. Signed-off-by: David Oberhollenzer --- lib/sqfs/block_processor/winpthread.c | 545 ---------------------------------- 1 file changed, 545 deletions(-) delete mode 100644 lib/sqfs/block_processor/winpthread.c (limited to 'lib/sqfs/block_processor/winpthread.c') diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c deleted file mode 100644 index 806c749..0000000 --- a/lib/sqfs/block_processor/winpthread.c +++ /dev/null @@ -1,545 +0,0 @@ -/* SPDX-License-Identifier: LGPL-3.0-or-later */ -/* - * winpthread.c - * - * Copyright (C) 2019 David Oberhollenzer - */ -#define SQFS_BUILDING_DLL -#include "internal.h" - -#if defined(_WIN32) || defined(__WINDOWS__) -# define WIN32_LEAN_AND_MEAN -# include -# define LOCK EnterCriticalSection -# define UNLOCK LeaveCriticalSection -# define AWAIT(cond, mtx) SleepConditionVariableCS(cond, mtx, INFINITE) -# define SIGNAL_ALL WakeAllConditionVariable -# define THREAD_CREATE(hndptr, proc, arg) \ - ((*hndptr) = CreateThread(NULL, 0, proc, arg, 0, 0), \ - ((*hndptr) == NULL) ? -1 : 0) -# define THREAD_JOIN(t) \ - do { \ - WaitForSingleObject(t, INFINITE); \ - CloseHandle(t); \ - } while (0) -# define MUTEX_INIT InitializeCriticalSection -# define MUTEX_DESTROY DeleteCriticalSection -# define CONDITION_INIT InitializeConditionVariable -# define CONDITION_DESTROY(cond) -# define THREAD_EXIT_SUCCESS 0 -# define THREAD_INVALID NULL -# define THREAD_TYPE DWORD WINAPI -# define SIGNAL_DISABLE(oldset) (void)oldset -# define SIGNAL_ENABLE(oldset) (void)oldset - -typedef int sigset_t; -typedef LPVOID THREAD_ARG; -typedef HANDLE THREAD_HANDLE; -typedef CRITICAL_SECTION MUTEX_TYPE; -typedef CONDITION_VARIABLE CONDITION_TYPE; -#else -# include -# include -# define LOCK pthread_mutex_lock -# define UNLOCK pthread_mutex_unlock -# define AWAIT pthread_cond_wait -# define SIGNAL_ALL pthread_cond_broadcast -# define THREAD_CREATE(hndptr, proc, arg) \ - pthread_create((hndptr), NULL, (proc), (arg)) -# define THREAD_JOIN(t) pthread_join((t), NULL) -# define MUTEX_INIT(mtx) \ - *(mtx) = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER -# define MUTEX_DESTROY pthread_mutex_destroy -# define CONDITION_INIT(cond) \ - *(cond) = (pthread_cond_t)PTHREAD_COND_INITIALIZER -# define CONDITION_DESTROY pthread_cond_destroy -# define THREAD_EXIT_SUCCESS NULL -# define THREAD_INVALID (pthread_t)0 -# define SIGNAL_ENABLE(oldset) pthread_sigmask(SIG_SETMASK, oldset, NULL) - -typedef void *THREAD_TYPE; -typedef void *THREAD_ARG; -typedef pthread_t THREAD_HANDLE; -typedef pthread_mutex_t MUTEX_TYPE; -typedef pthread_cond_t CONDITION_TYPE; - -static inline void SIGNAL_DISABLE(sigset_t *oldset) -{ - sigset_t set; - sigfillset(&set); - pthread_sigmask(SIG_SETMASK, &set, oldset); -} -#endif - -typedef struct compress_worker_t compress_worker_t; -typedef struct thread_pool_processor_t thread_pool_processor_t; - -struct compress_worker_t { - thread_pool_processor_t *shared; - sqfs_compressor_t *cmp; - THREAD_HANDLE thread; - sqfs_u32 frag_blk_index; - sqfs_u32 frag_blk_size; - sqfs_u8 scratch[]; -}; - -struct thread_pool_processor_t { - sqfs_block_processor_t base; - - MUTEX_TYPE mtx; - CONDITION_TYPE queue_cond; - CONDITION_TYPE done_cond; - - sqfs_block_t *proc_queue; - sqfs_block_t *proc_queue_last; - - sqfs_block_t *io_queue; - sqfs_block_t *done; - size_t backlog; - int status; - - sqfs_u32 proc_enq_id; - sqfs_u32 proc_deq_id; - - sqfs_u32 io_enq_id; - sqfs_u32 io_deq_id; - - unsigned int num_workers; - size_t max_backlog; - - compress_worker_t *workers[]; -}; - -static void free_blk_list(sqfs_block_t *list) -{ - sqfs_block_t *it; - - while (list != NULL) { - it = list; - list = list->next; - free(it); - } -} - -static sqfs_block_t *get_next_work_item(thread_pool_processor_t *shared) -{ - sqfs_block_t *blk = NULL; - - while (shared->proc_queue == NULL && shared->status == 0) - AWAIT(&shared->queue_cond, &shared->mtx); - - if (shared->status == 0) { - blk = shared->proc_queue; - shared->proc_queue = blk->next; - blk->next = NULL; - - if (shared->proc_queue == NULL) - shared->proc_queue_last = NULL; - } - - return blk; -} - -static void store_completed_block(thread_pool_processor_t *shared, - sqfs_block_t *blk, int status) -{ - sqfs_block_t *it = shared->done, *prev = NULL; - - while (it != NULL) { - if (it->proc_seq_num >= blk->proc_seq_num) - break; - prev = it; - it = it->next; - } - - if (prev == NULL) { - blk->next = shared->done; - shared->done = blk; - } else { - blk->next = prev->next; - prev->next = blk; - } - - if (status != 0 && shared->status == 0) - shared->status = status; - - SIGNAL_ALL(&shared->done_cond); -} - -static THREAD_TYPE worker_proc(THREAD_ARG arg) -{ - compress_worker_t *worker = arg; - thread_pool_processor_t *shared = worker->shared; - sqfs_block_processor_t *proc = (sqfs_block_processor_t *)shared; - sqfs_block_t *blk = NULL; - int status = 0; - - for (;;) { - LOCK(&shared->mtx); - if (blk != NULL) - store_completed_block(shared, blk, status); - - blk = get_next_work_item(shared); - - if (blk != NULL && (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) && - shared->base.uncmp != NULL && shared->base.file != NULL) { - memcpy(worker->scratch + shared->base.max_block_size, - blk->data, blk->size); - worker->frag_blk_index = blk->index; - worker->frag_blk_size = blk->size; - } - UNLOCK(&shared->mtx); - - if (blk == NULL) - break; - - status = proc->process_block(blk, worker->cmp, worker->scratch, - proc->max_block_size); - } - - return THREAD_EXIT_SUCCESS; -} - -static void block_processor_destroy(sqfs_object_t *obj) -{ - thread_pool_processor_t *proc = (thread_pool_processor_t *)obj; - unsigned int i; - - LOCK(&proc->mtx); - proc->status = -1; - SIGNAL_ALL(&proc->queue_cond); - UNLOCK(&proc->mtx); - - for (i = 0; i < proc->num_workers; ++i) { - if (proc->workers[i] != NULL) { - if (proc->workers[i]->thread != THREAD_INVALID) - THREAD_JOIN(proc->workers[i]->thread); - - if (proc->workers[i]->cmp != NULL) - sqfs_destroy(proc->workers[i]->cmp); - - free(proc->workers[i]); - } - } - - CONDITION_DESTROY(&proc->done_cond); - CONDITION_DESTROY(&proc->queue_cond); - MUTEX_DESTROY(&proc->mtx); - - free_blk_list(proc->proc_queue); - free_blk_list(proc->io_queue); - free_blk_list(proc->done); - block_processor_cleanup(&proc->base); - free(proc); -} - -static void store_io_block(thread_pool_processor_t *proc, sqfs_block_t *blk) -{ - sqfs_block_t *it = proc->io_queue, *prev = NULL; - - while (it != NULL && it->io_seq_num < blk->io_seq_num) { - prev = it; - it = it->next; - } - - if (prev == NULL) { - blk->next = proc->io_queue; - proc->io_queue = blk; - } else { - blk->next = prev->next; - prev->next = blk; - } - - proc->backlog += 1; -} - -static sqfs_block_t *try_dequeue_io(thread_pool_processor_t *proc) -{ - sqfs_block_t *out; - - if (proc->io_queue == NULL) - return NULL; - - if (proc->io_queue->io_seq_num != proc->io_deq_id) - return NULL; - - out = proc->io_queue; - proc->io_queue = out->next; - out->next = NULL; - proc->io_deq_id += 1; - proc->backlog -= 1; - return out; -} - -static sqfs_block_t *try_dequeue_done(thread_pool_processor_t *proc) -{ - sqfs_block_t *out; - - if (proc->done == NULL) - return NULL; - - if (proc->done->proc_seq_num != proc->proc_deq_id) - return NULL; - - out = proc->done; - proc->done = out->next; - out->next = NULL; - proc->proc_deq_id += 1; - proc->backlog -= 1; - return out; -} - -static void append_block(thread_pool_processor_t *proc, sqfs_block_t *block) -{ - if (proc->proc_queue_last == NULL) { - proc->proc_queue = proc->proc_queue_last = block; - } else { - proc->proc_queue_last->next = block; - proc->proc_queue_last = block; - } - - block->proc_seq_num = proc->proc_enq_id++; - block->next = NULL; - proc->backlog += 1; -} - -static int handle_io_queue(thread_pool_processor_t *proc, sqfs_block_t *list) -{ - sqfs_block_t *it; - int status = 0; - - while (status == 0 && list != NULL) { - it = list; - list = list->next; - status = proc->base.process_completed_block(&proc->base, it); - - if (status != 0) { - LOCK(&proc->mtx); - if (proc->status == 0) - proc->status = status; - SIGNAL_ALL(&proc->queue_cond); - UNLOCK(&proc->mtx); - } - } - - return status; -} - -static int append_to_work_queue(sqfs_block_processor_t *proc, - sqfs_block_t *block) -{ - thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc; - sqfs_block_t *io_list = NULL, *io_list_last = NULL; - sqfs_block_t *blk, *fragblk; - int status; - - LOCK(&thproc->mtx); - for (;;) { - status = thproc->status; - if (status != 0) - break; - - if (block == NULL) { - if (thproc->backlog == 0) - break; - } else { - if (thproc->backlog < thproc->max_backlog) { - append_block(thproc, block); - block = NULL; - break; - } - } - - blk = try_dequeue_io(thproc); - if (blk != NULL) { - if (io_list_last == NULL) { - io_list = io_list_last = blk; - } else { - io_list_last->next = blk; - io_list_last = blk; - } - continue; - } - - blk = try_dequeue_done(thproc); - if (blk == NULL) { - AWAIT(&thproc->done_cond, &thproc->mtx); - continue; - } - - if (blk->flags & SQFS_BLK_IS_FRAGMENT) { - fragblk = NULL; - thproc->status = - proc->process_completed_fragment(proc, blk, - &fragblk); - - if (fragblk != NULL) { - fragblk->io_seq_num = thproc->io_enq_id++; - append_block(thproc, fragblk); - SIGNAL_ALL(&thproc->queue_cond); - } - } else { - if (!(blk->flags & SQFS_BLK_FRAGMENT_BLOCK) || - blk->flags & BLK_FLAG_MANUAL_SUBMISSION) - blk->io_seq_num = thproc->io_enq_id++; - store_io_block(thproc, blk); - } - } - SIGNAL_ALL(&thproc->queue_cond); - UNLOCK(&thproc->mtx); - free(block); - - if (status == 0) { - status = handle_io_queue(thproc, io_list); - } else { - free_blk_list(io_list); - } - - return status; -} - -static sqfs_block_t *find_frag_blk_in_queue(sqfs_block_t *q, sqfs_u32 index) -{ - while (q != NULL) { - if ((q->flags & SQFS_BLK_FRAGMENT_BLOCK) && q->index == index) - break; - q = q->next; - } - return q; -} - -static int compare_frag_in_flight(sqfs_block_processor_t *proc, - sqfs_block_t *frag, sqfs_u32 index, - sqfs_u32 offset) -{ - thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc; - sqfs_block_t *it = NULL; - void *blockbuf = NULL; - size_t i, size = 0; - int ret; - - if (proc->frag_block != NULL && proc->frag_block->index == index) - it = proc->frag_block; - - if (it == NULL) - it = find_frag_blk_in_queue(thproc->proc_queue, index); - - if (it == NULL) - it = find_frag_blk_in_queue(thproc->io_queue, index); - - if (it == NULL) - it = find_frag_blk_in_queue(thproc->done, index); - - if (it == NULL) { - for (i = 0; i < thproc->num_workers; ++i) { - if (thproc->workers[i]->frag_blk_index == index) { - size = thproc->workers[i]->frag_blk_size; - blockbuf = thproc->workers[i]->scratch + - proc->max_block_size; - break; - } - } - } else if (it->flags & SQFS_BLK_IS_COMPRESSED) { - proc->buffered_index = 0xFFFFFFFF; - blockbuf = proc->frag_buffer; - ret = proc->uncmp->do_block(proc->uncmp, it->data, it->size, - blockbuf, proc->max_block_size); - if (ret <= 0) - return -1; - proc->buffered_index = it->index; - size = ret; - } else { - blockbuf = it->data; - size = it->size; - } - - if (blockbuf == NULL || size == 0) - return -1; - - if (offset >= size || frag->size > (size - offset)) - return -1; - - return memcmp((const char *)blockbuf + offset, frag->data, frag->size); -} - -static int block_processor_sync(sqfs_block_processor_t *proc) -{ - return append_to_work_queue(proc, NULL); -} - -int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc, - sqfs_block_processor_t **out) -{ - thread_pool_processor_t *proc; - unsigned int i, num_workers; - size_t scratch_size; - sigset_t oldset; - int ret; - - if (desc->size != sizeof(sqfs_block_processor_desc_t)) - return SQFS_ERROR_ARG_INVALID; - - num_workers = desc->num_workers < 1 ? 1 : desc->num_workers; - - proc = alloc_flex(sizeof(*proc), - sizeof(proc->workers[0]), num_workers); - if (proc == NULL) - return SQFS_ERROR_ALLOC; - - ret = block_processor_init(&proc->base, desc); - if (ret != 0) { - free(proc); - return ret; - } - - proc->base.sync = block_processor_sync; - proc->base.append_to_work_queue = append_to_work_queue; - proc->base.compare_frag_in_flight = compare_frag_in_flight; - proc->num_workers = num_workers; - proc->max_backlog = desc->max_backlog; - ((sqfs_object_t *)proc)->destroy = block_processor_destroy; - - MUTEX_INIT(&proc->mtx); - CONDITION_INIT(&proc->queue_cond); - CONDITION_INIT(&proc->done_cond); - - SIGNAL_DISABLE(&oldset); - - for (i = 0; i < num_workers; ++i) { - scratch_size = desc->max_block_size; - if (desc->uncmp != NULL && desc->file != NULL) - scratch_size *= 2; - - proc->workers[i] = alloc_flex(sizeof(compress_worker_t), - 1, scratch_size); - - if (proc->workers[i] == NULL) { - ret = SQFS_ERROR_ALLOC; - goto fail; - } - - proc->workers[i]->shared = proc; - proc->workers[i]->cmp = sqfs_copy(desc->cmp); - proc->workers[i]->frag_blk_index = 0xFFFFFFFF; - - if (proc->workers[i]->cmp == NULL) { - ret = SQFS_ERROR_ALLOC; - goto fail; - } - - ret = THREAD_CREATE(&proc->workers[i]->thread, - worker_proc, proc->workers[i]); - if (ret != 0) { - ret = SQFS_ERROR_INTERNAL; - goto fail; - } - } - - SIGNAL_ENABLE(&oldset); - *out = (sqfs_block_processor_t *)proc; - return 0; -fail: - SIGNAL_ENABLE(&oldset); - block_processor_destroy((sqfs_object_t *)proc); - return ret; -} -- cgit v1.2.3