From bb0ef9e0eec5c27610fe381b905ef46b3f5f09c6 Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Sun, 21 Mar 2021 16:59:08 +0100 Subject: Cleanup: Rewrite block processor to use the libutil thread_pool_t Throw out the messy thread pool implementation and temporarily also remove the exact fragment matching for simplicity. Signed-off-by: David Oberhollenzer --- lib/sqfs/Makemodule.am | 12 +- lib/sqfs/block_processor/common.c | 201 +++++++------ lib/sqfs/block_processor/frontend.c | 189 +++++++++++- lib/sqfs/block_processor/internal.h | 46 ++- lib/sqfs/block_processor/serial.c | 106 ------- lib/sqfs/block_processor/winpthread.c | 545 ---------------------------------- 6 files changed, 304 insertions(+), 795 deletions(-) delete mode 100644 lib/sqfs/block_processor/serial.c delete mode 100644 lib/sqfs/block_processor/winpthread.c diff --git a/lib/sqfs/Makemodule.am b/lib/sqfs/Makemodule.am index b770fd4..5a62894 100644 --- a/lib/sqfs/Makemodule.am +++ b/lib/sqfs/Makemodule.am @@ -43,6 +43,7 @@ libsquashfs_la_SOURCES += lib/util/xxhash.c libsquashfs_la_SOURCES += lib/util/hash_table.c include/hash_table.h libsquashfs_la_SOURCES += lib/util/rbtree.c include/rbtree.h libsquashfs_la_SOURCES += lib/util/array.c include/array.h +libsquashfs_la_SOURCES += include/threadpool.h if CUSTOM_ALLOC libsquashfs_la_SOURCES += lib/util/mempool.c include/mempool.h @@ -50,6 +51,7 @@ endif if WINDOWS libsquashfs_la_SOURCES += lib/sqfs/win32/io_file.c +libsquashfs_la_SOURCES += lib/util/threadpool.c libsquashfs_la_CFLAGS += -DWINVER=0x0600 -D_WIN32_WINNT=0x0600 libsquashfs_la_CFLAGS += -Wc,-static-libgcc libsquashfs_la_LDFLAGS += -no-undefined -avoid-version @@ -58,14 +60,10 @@ libsquashfs_la_SOURCES += lib/sqfs/unix/io_file.c endif if HAVE_PTHREAD -libsquashfs_la_SOURCES += lib/sqfs/block_processor/winpthread.c -libsquashfs_la_CPPFLAGS += -DWITH_PTHREAD +libsquashfs_la_SOURCES += lib/util/threadpool.c else -if WINDOWS -libsquashfs_la_SOURCES += lib/sqfs/block_processor/winpthread.c -else -libsquashfs_la_SOURCES += lib/sqfs/block_processor/serial.c -endif +libsquashfs_la_SOURCES += lib/util/threadpool_serial.c +libsquashfs_la_CPPFLAGS += -DNO_THREAD_IMPL endif if WITH_GZIP diff --git a/lib/sqfs/block_processor/common.c b/lib/sqfs/block_processor/common.c index f1aca1e..62c355b 100644 --- a/lib/sqfs/block_processor/common.c +++ b/lib/sqfs/block_processor/common.c @@ -50,8 +50,7 @@ static void release_old_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) proc->free_list = blk; } -static int process_completed_block(sqfs_block_processor_t *proc, - sqfs_block_t *blk) +int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) { sqfs_u64 location; sqfs_u32 size; @@ -113,9 +112,12 @@ static bool is_zero_block(unsigned char *ptr, size_t size) return ptr[0] == 0 && memcmp(ptr, ptr + 1, size - 1) == 0; } -static int process_block(sqfs_block_t *block, sqfs_compressor_t *cmp, - sqfs_u8 *scratch, size_t scratch_size) +static int process_block(void *userptr, void *workitem) { + sqfs_compressor_t *cmp = ((worker_data_t *)userptr)->cmp; + sqfs_u8 *scratch = ((worker_data_t *)userptr)->scratch; + size_t scratch_size = ((worker_data_t *)userptr)->scratch_size; + sqfs_block_t *block = workitem; sqfs_s32 ret; if (block->size == 0) @@ -149,9 +151,9 @@ static int process_block(sqfs_block_t *block, sqfs_compressor_t *cmp, return 0; } -static int process_completed_fragment(sqfs_block_processor_t *proc, - sqfs_block_t *frag, - sqfs_block_t **blk_out) +int process_completed_fragment(sqfs_block_processor_t *proc, + sqfs_block_t *frag, + sqfs_block_t **blk_out) { chunk_info_t *chunk, search; struct hash_entry *entry; @@ -176,10 +178,8 @@ static int process_completed_fragment(sqfs_block_processor_t *proc, search.hash = frag->checksum; search.size = frag->size; - proc->frag_cmp_current = frag; entry = hash_table_search_pre_hashed(proc->frag_ht, search.hash, &search); - proc->frag_cmp_current = NULL; if (entry != NULL) { if (frag->inode != NULL) { @@ -241,10 +241,8 @@ static int process_completed_fragment(sqfs_block_processor_t *proc, chunk->size = frag->size; chunk->hash = frag->checksum; - proc->frag_cmp_current = frag; entry = hash_table_insert_pre_hashed(proc->frag_ht, chunk->hash, chunk, chunk); - proc->frag_cmp_current = NULL; if (entry == NULL) { free(chunk); @@ -277,121 +275,122 @@ static uint32_t chunk_info_hash(void *user, const void *key) static bool chunk_info_equals(void *user, const void *k, const void *c) { const chunk_info_t *key = k, *cmp = c; - sqfs_block_processor_t *proc = user; - sqfs_fragment_t frag; - unsigned char *temp; - size_t size; - int ret; - - if (key->size != cmp->size || key->hash != cmp->hash) - return false; + (void)user; + return key->size == cmp->size && key->hash == cmp->hash; +} - if (proc->file == NULL || proc->uncmp == NULL) - return true; +static void ht_delete_function(struct hash_entry *entry) +{ + free(entry->data); +} - ret = proc->compare_frag_in_flight(proc, proc->frag_cmp_current, - cmp->index, cmp->offset); - if (ret == 0) - return true; +static void block_processor_destroy(sqfs_object_t *base) +{ + sqfs_block_processor_t *proc = (sqfs_block_processor_t *)base; + sqfs_block_t *it; - if (proc->buffered_index != cmp->index || - proc->buffered_blk_size == 0) { - if (sqfs_frag_table_lookup(proc->frag_tbl, cmp->index, &frag)) - return false; + if (proc->frag_block != NULL) + release_old_block(proc, proc->frag_block); - proc->buffered_index = 0xFFFFFFFF; - size = SQFS_ON_DISK_BLOCK_SIZE(frag.size); + free(proc->blk_current); - if (SQFS_IS_BLOCK_COMPRESSED(frag.size)) { - temp = proc->frag_buffer + proc->max_block_size; + while (proc->free_list != NULL) { + it = proc->free_list; + proc->free_list = it->next; + free(it); + } - ret = proc->file->read_at(proc->file, frag.start_offset, - temp, size); - if (ret != 0) - return false; + hash_table_destroy(proc->frag_ht, ht_delete_function); - ret = proc->uncmp->do_block(proc->uncmp, temp, size, - proc->frag_buffer, - proc->max_block_size); - if (ret <= 0) - return false; + /* XXX: shut down the pool first before cleaning up the worker data */ + proc->pool->destroy(proc->pool); - size = ret; - } else { - ret = proc->file->read_at(proc->file, frag.start_offset, - proc->frag_buffer, size); - if (ret != 0) - return false; - } + while (proc->workers != NULL) { + worker_data_t *worker = proc->workers; + proc->workers = worker->next; - proc->buffered_index = cmp->index; - proc->buffered_blk_size = size; + sqfs_destroy(worker->cmp); + free(worker); } - if (cmp->offset >= proc->buffered_blk_size) - return false; - - if (cmp->size > (proc->buffered_blk_size - cmp->offset)) - return false; - - return memcmp(proc->frag_buffer + cmp->offset, - proc->frag_cmp_current->data, - cmp->size) == 0; + free(proc); } -static void ht_delete_function(struct hash_entry *entry) +int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc, + sqfs_block_processor_t **out) { - free(entry->data); -} - -void block_processor_cleanup(sqfs_block_processor_t *base) -{ - sqfs_block_t *it; + sqfs_block_processor_t *proc; + size_t i, count; + int ret; - if (base->frag_block != NULL) - release_old_block(base, base->frag_block); + if (desc->size != sizeof(sqfs_block_processor_desc_t)) + return SQFS_ERROR_ARG_INVALID; - free(base->blk_current); - free(base->frag_buffer); + proc = calloc(1, sizeof(*proc)); + if (proc == NULL) + return SQFS_ERROR_ALLOC; - while (base->free_list != NULL) { - it = base->free_list; - base->free_list = it->next; - free(it); + proc->max_backlog = desc->max_backlog; + proc->max_block_size = desc->max_block_size; + proc->frag_tbl = desc->tbl; + proc->wr = desc->wr; + proc->file = desc->file; + proc->uncmp = desc->uncmp; + proc->stats.size = sizeof(proc->stats); + ((sqfs_object_t *)proc)->destroy = block_processor_destroy; + + /* create the thread pool */ + proc->pool = thread_pool_create(desc->num_workers, process_block); + if (proc->pool == NULL) { + free(proc); + return SQFS_ERROR_INTERNAL; } - hash_table_destroy(base->frag_ht, ht_delete_function); -} + /* create the worker compressors & scratch buffer */ + count = proc->pool->get_worker_count(proc->pool); -int block_processor_init(sqfs_block_processor_t *base, - const sqfs_block_processor_desc_t *desc) -{ - base->process_completed_block = process_completed_block; - base->process_completed_fragment = process_completed_fragment; - base->process_block = process_block; - base->max_block_size = desc->max_block_size; - base->cmp = desc->cmp; - base->frag_tbl = desc->tbl; - base->wr = desc->wr; - base->file = desc->file; - base->uncmp = desc->uncmp; - base->buffered_index = 0xFFFFFFFF; - base->stats.size = sizeof(base->stats); - - if (desc->file != NULL && desc->uncmp != NULL && desc->tbl != NULL) { - base->frag_buffer = malloc(2 * desc->max_block_size); - if (base->frag_buffer == NULL) - return SQFS_ERROR_ALLOC; + for (i = 0; i < count; ++i) { + worker_data_t *worker = alloc_flex(sizeof(*worker), 1, + desc->max_block_size); + if (worker == NULL) { + ret = SQFS_ERROR_ALLOC; + goto fail_pool; + } + + worker->scratch_size = desc->max_block_size; + worker->next = proc->workers; + proc->workers = worker; + + worker->cmp = sqfs_copy(desc->cmp); + if (worker->cmp == NULL) + goto fail_pool; + + proc->pool->set_worker_ptr(proc->pool, i, worker); } - base->frag_ht = hash_table_create(chunk_info_hash, chunk_info_equals); - if (base->frag_ht == NULL) { - free(base->frag_buffer); - return SQFS_ERROR_ALLOC; + /* create the fragment hash table */ + proc->frag_ht = hash_table_create(chunk_info_hash, chunk_info_equals); + if (proc->frag_ht == NULL) { + ret = SQFS_ERROR_ALLOC; + goto fail_pool; } - base->frag_ht->user = base; + proc->frag_ht->user = proc; + *out = proc; return 0; +fail_pool: + proc->pool->destroy(proc->pool); + while (proc->workers != NULL) { + worker_data_t *worker = proc->workers; + proc->workers = worker->next; + + if (worker->cmp != NULL) + sqfs_destroy(worker->cmp); + + free(worker); + } + free(proc); + return ret; } sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, diff --git a/lib/sqfs/block_processor/frontend.c b/lib/sqfs/block_processor/frontend.c index f6aeffa..8bd6cf2 100644 --- a/lib/sqfs/block_processor/frontend.c +++ b/lib/sqfs/block_processor/frontend.c @@ -24,26 +24,159 @@ static sqfs_block_t *get_new_block(sqfs_block_processor_t *proc) return blk; } +static int dequeue_block(sqfs_block_processor_t *proc) +{ + sqfs_block_t *blk, *fragblk, *it, *prev; + bool have_dequeued = false; + int status; +retry: + while (proc->io_queue != NULL) { + if (proc->io_queue->io_seq_num != proc->io_deq_seq_num) + break; + + blk = proc->io_queue; + proc->io_queue = blk->next; + proc->io_deq_seq_num += 1; + proc->backlog -= 1; + have_dequeued = true; + + status = process_completed_block(proc, blk); + if (status != 0) + return status; + } + + if (have_dequeued) + return 0; + + blk = proc->pool->dequeue(proc->pool); + + if (blk == NULL) { + status = proc->pool->get_status(proc->pool); + if (status == 0) + status = SQFS_ERROR_INTERNAL; + + return status; + } + + proc->backlog -= 1; + have_dequeued = true; + + if (blk->flags & SQFS_BLK_IS_FRAGMENT) { + fragblk = NULL; + status = process_completed_fragment(proc, blk, &fragblk); + + if (status != 0) { + free(fragblk); + return status; + } + + if (fragblk != NULL) { + fragblk->io_seq_num = proc->io_seq_num++; + + if (proc->pool->submit(proc->pool, fragblk) != 0) { + free(fragblk); + + if (status == 0) { + status = proc->pool-> + get_status(proc->pool); + + if (status == 0) + status = SQFS_ERROR_ALLOC; + } + + return status; + } + + proc->backlog += 1; + have_dequeued = false; + } + } else { + if (!(blk->flags & SQFS_BLK_FRAGMENT_BLOCK)) + blk->io_seq_num = proc->io_seq_num++; + + prev = NULL; + it = proc->io_queue; + + while (it != NULL) { + if (it->io_seq_num >= blk->io_seq_num) + break; + + prev = it; + it = it->next; + } + + if (prev == NULL) { + blk->next = proc->io_queue; + proc->io_queue = blk; + } else { + blk->next = prev->next; + prev->next = blk; + } + + proc->backlog += 1; + have_dequeued = false; + } + + if (!have_dequeued) + goto retry; + + return 0; +} + +static int enqueue_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) +{ + int status; + + if (proc->pool->submit(proc->pool, blk) != 0) { + status = proc->pool->get_status(proc->pool); + + if (status == 0) + status = SQFS_ERROR_ALLOC; + + free(blk); + return status; + } + + proc->backlog += 1; + return 0; +} + static int add_sentinel_block(sqfs_block_processor_t *proc) { - sqfs_block_t *blk = get_new_block(proc); + sqfs_block_t *blk; + int ret; + + if (proc->backlog == proc->max_backlog) { + ret = dequeue_block(proc); + if (ret != 0) + return ret; + } + blk = get_new_block(proc); if (blk == NULL) return SQFS_ERROR_ALLOC; blk->inode = proc->inode; blk->flags = proc->blk_flags | SQFS_BLK_LAST_BLOCK; - return proc->append_to_work_queue(proc, blk); + return enqueue_block(proc, blk); } static int flush_block(sqfs_block_processor_t *proc) { - sqfs_block_t *block = proc->blk_current; + sqfs_block_t *block; + int ret; + + if (proc->backlog == proc->max_backlog) { + ret = dequeue_block(proc); + if (ret != 0) + return ret; + } + block = proc->blk_current; proc->blk_current = NULL; - return proc->append_to_work_queue(proc, block); + return enqueue_block(proc, block); } int sqfs_block_processor_begin_file(sqfs_block_processor_t *proc, @@ -177,6 +310,7 @@ int sqfs_block_processor_submit_block(sqfs_block_processor_t *proc, void *user, size_t size) { sqfs_block_t *blk; + int ret; if (proc->begin_called) return SQFS_ERROR_SEQUENCE; @@ -187,6 +321,12 @@ int sqfs_block_processor_submit_block(sqfs_block_processor_t *proc, void *user, if (flags & ~SQFS_BLK_FLAGS_ALL) return SQFS_ERROR_UNSUPPORTED; + if (proc->backlog == proc->max_backlog) { + ret = dequeue_block(proc); + if (ret != 0) + return ret; + } + blk = get_new_block(proc); if (blk == NULL) return SQFS_ERROR_ALLOC; @@ -196,12 +336,25 @@ int sqfs_block_processor_submit_block(sqfs_block_processor_t *proc, void *user, blk->size = size; memcpy(blk->data, data, size); - return proc->append_to_work_queue(proc, blk); + ret = proc->pool->submit(proc->pool, blk); + if (ret != 0) + free(blk); + + proc->backlog += 1; + return ret; } int sqfs_block_processor_sync(sqfs_block_processor_t *proc) { - return proc->sync(proc); + int ret; + + while (proc->backlog > 0) { + ret = dequeue_block(proc); + if (ret != 0) + return ret; + } + + return 0; } int sqfs_block_processor_finish(sqfs_block_processor_t *proc) @@ -209,18 +362,30 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc) sqfs_block_t *blk; int status; - status = proc->sync(proc); + status = sqfs_block_processor_sync(proc); + if (status != 0) + return status; - if (status == 0 && proc->frag_block != NULL) { + if (proc->frag_block != NULL) { blk = proc->frag_block; blk->next = NULL; - blk->flags |= BLK_FLAG_MANUAL_SUBMISSION; proc->frag_block = NULL; - status = proc->append_to_work_queue(proc, blk); + blk->io_seq_num = proc->io_seq_num++; - if (status == 0) - status = proc->sync(proc); + status = proc->pool->submit(proc->pool, blk); + if (status != 0) { + status = proc->pool->get_status(proc->pool); + + if (status == 0) + status = SQFS_ERROR_ALLOC; + + free(blk); + return status; + } + + proc->backlog += 1; + status = sqfs_block_processor_sync(proc); } return status; diff --git a/lib/sqfs/block_processor/internal.h b/lib/sqfs/block_processor/internal.h index 4699dc6..2393380 100644 --- a/lib/sqfs/block_processor/internal.h +++ b/lib/sqfs/block_processor/internal.h @@ -20,6 +20,7 @@ #include "sqfs/io.h" #include "hash_table.h" +#include "threadpool.h" #include "util.h" #include @@ -42,7 +43,6 @@ typedef struct sqfs_block_t { struct sqfs_block_t *next; sqfs_inode_generic_t **inode; - sqfs_u32 proc_seq_num; sqfs_u32 io_seq_num; sqfs_u32 flags; sqfs_u32 size; @@ -58,11 +58,18 @@ typedef struct sqfs_block_t { sqfs_u8 data[]; } sqfs_block_t; +typedef struct worker_data_t { + struct worker_data_t *next; + sqfs_compressor_t *cmp; + + size_t scratch_size; + sqfs_u8 scratch[]; +} worker_data_t; + struct sqfs_block_processor_t { sqfs_object_t obj; sqfs_frag_table_t *frag_tbl; - sqfs_compressor_t *cmp; sqfs_block_t *frag_block; sqfs_block_writer_t *wr; @@ -78,39 +85,30 @@ struct sqfs_block_processor_t { sqfs_block_t *free_list; size_t max_block_size; + size_t max_backlog; + size_t backlog; bool begin_called; sqfs_file_t *file; sqfs_compressor_t *uncmp; - sqfs_block_t *frag_cmp_current; - sqfs_u8 *frag_buffer; - sqfs_u32 buffered_index; - sqfs_u32 buffered_blk_size; - int (*process_completed_block)(sqfs_block_processor_t *proc, - sqfs_block_t *block); + thread_pool_t *pool; + worker_data_t *workers; - int (*process_completed_fragment)(sqfs_block_processor_t *proc, - sqfs_block_t *frag, - sqfs_block_t **blk_out); - int (*process_block)(sqfs_block_t *block, sqfs_compressor_t *cmp, - sqfs_u8 *scratch, size_t scratch_size); - int (*compare_frag_in_flight)(sqfs_block_processor_t *proc, - sqfs_block_t *frag, sqfs_u32 index, - sqfs_u32 offset); - - int (*append_to_work_queue)(sqfs_block_processor_t *proc, - sqfs_block_t *block); - - int (*sync)(sqfs_block_processor_t *proc); + sqfs_block_t *io_queue; + sqfs_u32 io_seq_num; + sqfs_u32 io_deq_seq_num; }; -SQFS_INTERNAL void block_processor_cleanup(sqfs_block_processor_t *base); +SQFS_INTERNAL +int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk); -SQFS_INTERNAL int block_processor_init(sqfs_block_processor_t *base, - const sqfs_block_processor_desc_t *desc); +SQFS_INTERNAL +int process_completed_fragment(sqfs_block_processor_t *proc, + sqfs_block_t *frag, + sqfs_block_t **blk_out); #endif /* INTERNAL_H */ diff --git a/lib/sqfs/block_processor/serial.c b/lib/sqfs/block_processor/serial.c deleted file mode 100644 index b20d5b0..0000000 --- a/lib/sqfs/block_processor/serial.c +++ /dev/null @@ -1,106 +0,0 @@ -/* SPDX-License-Identifier: LGPL-3.0-or-later */ -/* - * serial.c - * - * Copyright (C) 2019 David Oberhollenzer - */ -#define SQFS_BUILDING_DLL -#include "internal.h" - -typedef struct { - sqfs_block_processor_t base; - int status; - sqfs_u8 scratch[]; -} serial_block_processor_t; - -static void block_processor_destroy(sqfs_object_t *obj) -{ - sqfs_block_processor_t *proc = (sqfs_block_processor_t *)obj; - - block_processor_cleanup(proc); - free(proc); -} - -static int append_to_work_queue(sqfs_block_processor_t *proc, - sqfs_block_t *block) -{ - serial_block_processor_t *sproc = (serial_block_processor_t *)proc; - sqfs_block_t *fragblk = NULL; - - if (sproc->status != 0) - goto fail; - - sproc->status = proc->process_block(block, proc->cmp, sproc->scratch, - proc->max_block_size); - if (sproc->status != 0) - goto fail; - - if (block->flags & SQFS_BLK_IS_FRAGMENT) { - sproc->status = proc->process_completed_fragment(proc, block, - &fragblk); - if (fragblk == NULL) - return sproc->status; - - block = fragblk; - sproc->status = proc->process_block(block, proc->cmp, - sproc->scratch, - proc->max_block_size); - if (sproc->status != 0) - goto fail; - } - - sproc->status = proc->process_completed_block(proc, block); - return sproc->status; -fail: - free(block); - return sproc->status; -} - -static int block_processor_sync(sqfs_block_processor_t *proc) -{ - return ((serial_block_processor_t *)proc)->status; -} - -static int compare_frag_in_flight(sqfs_block_processor_t *proc, - sqfs_block_t *frag, sqfs_u32 index, - sqfs_u32 offset) -{ - if (proc->frag_block == NULL || index != proc->frag_block->index) - return -1; - - if (offset >= proc->frag_block->size) - return -1; - - if (frag->size > (proc->frag_block->size - offset)) - return -1; - - return memcmp(proc->frag_block->data + offset, frag->data, frag->size); -} - -int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc, - sqfs_block_processor_t **out) -{ - serial_block_processor_t *proc; - int ret; - - if (desc->size != sizeof(sqfs_block_processor_desc_t)) - return SQFS_ERROR_ARG_INVALID; - - proc = alloc_flex(sizeof(*proc), 1, desc->max_block_size); - if (proc == NULL) - return SQFS_ERROR_ALLOC; - - ret = block_processor_init(&proc->base, desc); - if (ret != 0) { - free(proc); - return ret; - } - - proc->base.sync = block_processor_sync; - proc->base.append_to_work_queue = append_to_work_queue; - proc->base.compare_frag_in_flight = compare_frag_in_flight; - ((sqfs_object_t *)proc)->destroy = block_processor_destroy; - - *out = (sqfs_block_processor_t *)proc; - return 0; -} diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c deleted file mode 100644 index 806c749..0000000 --- a/lib/sqfs/block_processor/winpthread.c +++ /dev/null @@ -1,545 +0,0 @@ -/* SPDX-License-Identifier: LGPL-3.0-or-later */ -/* - * winpthread.c - * - * Copyright (C) 2019 David Oberhollenzer - */ -#define SQFS_BUILDING_DLL -#include "internal.h" - -#if defined(_WIN32) || defined(__WINDOWS__) -# define WIN32_LEAN_AND_MEAN -# include -# define LOCK EnterCriticalSection -# define UNLOCK LeaveCriticalSection -# define AWAIT(cond, mtx) SleepConditionVariableCS(cond, mtx, INFINITE) -# define SIGNAL_ALL WakeAllConditionVariable -# define THREAD_CREATE(hndptr, proc, arg) \ - ((*hndptr) = CreateThread(NULL, 0, proc, arg, 0, 0), \ - ((*hndptr) == NULL) ? -1 : 0) -# define THREAD_JOIN(t) \ - do { \ - WaitForSingleObject(t, INFINITE); \ - CloseHandle(t); \ - } while (0) -# define MUTEX_INIT InitializeCriticalSection -# define MUTEX_DESTROY DeleteCriticalSection -# define CONDITION_INIT InitializeConditionVariable -# define CONDITION_DESTROY(cond) -# define THREAD_EXIT_SUCCESS 0 -# define THREAD_INVALID NULL -# define THREAD_TYPE DWORD WINAPI -# define SIGNAL_DISABLE(oldset) (void)oldset -# define SIGNAL_ENABLE(oldset) (void)oldset - -typedef int sigset_t; -typedef LPVOID THREAD_ARG; -typedef HANDLE THREAD_HANDLE; -typedef CRITICAL_SECTION MUTEX_TYPE; -typedef CONDITION_VARIABLE CONDITION_TYPE; -#else -# include -# include -# define LOCK pthread_mutex_lock -# define UNLOCK pthread_mutex_unlock -# define AWAIT pthread_cond_wait -# define SIGNAL_ALL pthread_cond_broadcast -# define THREAD_CREATE(hndptr, proc, arg) \ - pthread_create((hndptr), NULL, (proc), (arg)) -# define THREAD_JOIN(t) pthread_join((t), NULL) -# define MUTEX_INIT(mtx) \ - *(mtx) = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER -# define MUTEX_DESTROY pthread_mutex_destroy -# define CONDITION_INIT(cond) \ - *(cond) = (pthread_cond_t)PTHREAD_COND_INITIALIZER -# define CONDITION_DESTROY pthread_cond_destroy -# define THREAD_EXIT_SUCCESS NULL -# define THREAD_INVALID (pthread_t)0 -# define SIGNAL_ENABLE(oldset) pthread_sigmask(SIG_SETMASK, oldset, NULL) - -typedef void *THREAD_TYPE; -typedef void *THREAD_ARG; -typedef pthread_t THREAD_HANDLE; -typedef pthread_mutex_t MUTEX_TYPE; -typedef pthread_cond_t CONDITION_TYPE; - -static inline void SIGNAL_DISABLE(sigset_t *oldset) -{ - sigset_t set; - sigfillset(&set); - pthread_sigmask(SIG_SETMASK, &set, oldset); -} -#endif - -typedef struct compress_worker_t compress_worker_t; -typedef struct thread_pool_processor_t thread_pool_processor_t; - -struct compress_worker_t { - thread_pool_processor_t *shared; - sqfs_compressor_t *cmp; - THREAD_HANDLE thread; - sqfs_u32 frag_blk_index; - sqfs_u32 frag_blk_size; - sqfs_u8 scratch[]; -}; - -struct thread_pool_processor_t { - sqfs_block_processor_t base; - - MUTEX_TYPE mtx; - CONDITION_TYPE queue_cond; - CONDITION_TYPE done_cond; - - sqfs_block_t *proc_queue; - sqfs_block_t *proc_queue_last; - - sqfs_block_t *io_queue; - sqfs_block_t *done; - size_t backlog; - int status; - - sqfs_u32 proc_enq_id; - sqfs_u32 proc_deq_id; - - sqfs_u32 io_enq_id; - sqfs_u32 io_deq_id; - - unsigned int num_workers; - size_t max_backlog; - - compress_worker_t *workers[]; -}; - -static void free_blk_list(sqfs_block_t *list) -{ - sqfs_block_t *it; - - while (list != NULL) { - it = list; - list = list->next; - free(it); - } -} - -static sqfs_block_t *get_next_work_item(thread_pool_processor_t *shared) -{ - sqfs_block_t *blk = NULL; - - while (shared->proc_queue == NULL && shared->status == 0) - AWAIT(&shared->queue_cond, &shared->mtx); - - if (shared->status == 0) { - blk = shared->proc_queue; - shared->proc_queue = blk->next; - blk->next = NULL; - - if (shared->proc_queue == NULL) - shared->proc_queue_last = NULL; - } - - return blk; -} - -static void store_completed_block(thread_pool_processor_t *shared, - sqfs_block_t *blk, int status) -{ - sqfs_block_t *it = shared->done, *prev = NULL; - - while (it != NULL) { - if (it->proc_seq_num >= blk->proc_seq_num) - break; - prev = it; - it = it->next; - } - - if (prev == NULL) { - blk->next = shared->done; - shared->done = blk; - } else { - blk->next = prev->next; - prev->next = blk; - } - - if (status != 0 && shared->status == 0) - shared->status = status; - - SIGNAL_ALL(&shared->done_cond); -} - -static THREAD_TYPE worker_proc(THREAD_ARG arg) -{ - compress_worker_t *worker = arg; - thread_pool_processor_t *shared = worker->shared; - sqfs_block_processor_t *proc = (sqfs_block_processor_t *)shared; - sqfs_block_t *blk = NULL; - int status = 0; - - for (;;) { - LOCK(&shared->mtx); - if (blk != NULL) - store_completed_block(shared, blk, status); - - blk = get_next_work_item(shared); - - if (blk != NULL && (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) && - shared->base.uncmp != NULL && shared->base.file != NULL) { - memcpy(worker->scratch + shared->base.max_block_size, - blk->data, blk->size); - worker->frag_blk_index = blk->index; - worker->frag_blk_size = blk->size; - } - UNLOCK(&shared->mtx); - - if (blk == NULL) - break; - - status = proc->process_block(blk, worker->cmp, worker->scratch, - proc->max_block_size); - } - - return THREAD_EXIT_SUCCESS; -} - -static void block_processor_destroy(sqfs_object_t *obj) -{ - thread_pool_processor_t *proc = (thread_pool_processor_t *)obj; - unsigned int i; - - LOCK(&proc->mtx); - proc->status = -1; - SIGNAL_ALL(&proc->queue_cond); - UNLOCK(&proc->mtx); - - for (i = 0; i < proc->num_workers; ++i) { - if (proc->workers[i] != NULL) { - if (proc->workers[i]->thread != THREAD_INVALID) - THREAD_JOIN(proc->workers[i]->thread); - - if (proc->workers[i]->cmp != NULL) - sqfs_destroy(proc->workers[i]->cmp); - - free(proc->workers[i]); - } - } - - CONDITION_DESTROY(&proc->done_cond); - CONDITION_DESTROY(&proc->queue_cond); - MUTEX_DESTROY(&proc->mtx); - - free_blk_list(proc->proc_queue); - free_blk_list(proc->io_queue); - free_blk_list(proc->done); - block_processor_cleanup(&proc->base); - free(proc); -} - -static void store_io_block(thread_pool_processor_t *proc, sqfs_block_t *blk) -{ - sqfs_block_t *it = proc->io_queue, *prev = NULL; - - while (it != NULL && it->io_seq_num < blk->io_seq_num) { - prev = it; - it = it->next; - } - - if (prev == NULL) { - blk->next = proc->io_queue; - proc->io_queue = blk; - } else { - blk->next = prev->next; - prev->next = blk; - } - - proc->backlog += 1; -} - -static sqfs_block_t *try_dequeue_io(thread_pool_processor_t *proc) -{ - sqfs_block_t *out; - - if (proc->io_queue == NULL) - return NULL; - - if (proc->io_queue->io_seq_num != proc->io_deq_id) - return NULL; - - out = proc->io_queue; - proc->io_queue = out->next; - out->next = NULL; - proc->io_deq_id += 1; - proc->backlog -= 1; - return out; -} - -static sqfs_block_t *try_dequeue_done(thread_pool_processor_t *proc) -{ - sqfs_block_t *out; - - if (proc->done == NULL) - return NULL; - - if (proc->done->proc_seq_num != proc->proc_deq_id) - return NULL; - - out = proc->done; - proc->done = out->next; - out->next = NULL; - proc->proc_deq_id += 1; - proc->backlog -= 1; - return out; -} - -static void append_block(thread_pool_processor_t *proc, sqfs_block_t *block) -{ - if (proc->proc_queue_last == NULL) { - proc->proc_queue = proc->proc_queue_last = block; - } else { - proc->proc_queue_last->next = block; - proc->proc_queue_last = block; - } - - block->proc_seq_num = proc->proc_enq_id++; - block->next = NULL; - proc->backlog += 1; -} - -static int handle_io_queue(thread_pool_processor_t *proc, sqfs_block_t *list) -{ - sqfs_block_t *it; - int status = 0; - - while (status == 0 && list != NULL) { - it = list; - list = list->next; - status = proc->base.process_completed_block(&proc->base, it); - - if (status != 0) { - LOCK(&proc->mtx); - if (proc->status == 0) - proc->status = status; - SIGNAL_ALL(&proc->queue_cond); - UNLOCK(&proc->mtx); - } - } - - return status; -} - -static int append_to_work_queue(sqfs_block_processor_t *proc, - sqfs_block_t *block) -{ - thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc; - sqfs_block_t *io_list = NULL, *io_list_last = NULL; - sqfs_block_t *blk, *fragblk; - int status; - - LOCK(&thproc->mtx); - for (;;) { - status = thproc->status; - if (status != 0) - break; - - if (block == NULL) { - if (thproc->backlog == 0) - break; - } else { - if (thproc->backlog < thproc->max_backlog) { - append_block(thproc, block); - block = NULL; - break; - } - } - - blk = try_dequeue_io(thproc); - if (blk != NULL) { - if (io_list_last == NULL) { - io_list = io_list_last = blk; - } else { - io_list_last->next = blk; - io_list_last = blk; - } - continue; - } - - blk = try_dequeue_done(thproc); - if (blk == NULL) { - AWAIT(&thproc->done_cond, &thproc->mtx); - continue; - } - - if (blk->flags & SQFS_BLK_IS_FRAGMENT) { - fragblk = NULL; - thproc->status = - proc->process_completed_fragment(proc, blk, - &fragblk); - - if (fragblk != NULL) { - fragblk->io_seq_num = thproc->io_enq_id++; - append_block(thproc, fragblk); - SIGNAL_ALL(&thproc->queue_cond); - } - } else { - if (!(blk->flags & SQFS_BLK_FRAGMENT_BLOCK) || - blk->flags & BLK_FLAG_MANUAL_SUBMISSION) - blk->io_seq_num = thproc->io_enq_id++; - store_io_block(thproc, blk); - } - } - SIGNAL_ALL(&thproc->queue_cond); - UNLOCK(&thproc->mtx); - free(block); - - if (status == 0) { - status = handle_io_queue(thproc, io_list); - } else { - free_blk_list(io_list); - } - - return status; -} - -static sqfs_block_t *find_frag_blk_in_queue(sqfs_block_t *q, sqfs_u32 index) -{ - while (q != NULL) { - if ((q->flags & SQFS_BLK_FRAGMENT_BLOCK) && q->index == index) - break; - q = q->next; - } - return q; -} - -static int compare_frag_in_flight(sqfs_block_processor_t *proc, - sqfs_block_t *frag, sqfs_u32 index, - sqfs_u32 offset) -{ - thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc; - sqfs_block_t *it = NULL; - void *blockbuf = NULL; - size_t i, size = 0; - int ret; - - if (proc->frag_block != NULL && proc->frag_block->index == index) - it = proc->frag_block; - - if (it == NULL) - it = find_frag_blk_in_queue(thproc->proc_queue, index); - - if (it == NULL) - it = find_frag_blk_in_queue(thproc->io_queue, index); - - if (it == NULL) - it = find_frag_blk_in_queue(thproc->done, index); - - if (it == NULL) { - for (i = 0; i < thproc->num_workers; ++i) { - if (thproc->workers[i]->frag_blk_index == index) { - size = thproc->workers[i]->frag_blk_size; - blockbuf = thproc->workers[i]->scratch + - proc->max_block_size; - break; - } - } - } else if (it->flags & SQFS_BLK_IS_COMPRESSED) { - proc->buffered_index = 0xFFFFFFFF; - blockbuf = proc->frag_buffer; - ret = proc->uncmp->do_block(proc->uncmp, it->data, it->size, - blockbuf, proc->max_block_size); - if (ret <= 0) - return -1; - proc->buffered_index = it->index; - size = ret; - } else { - blockbuf = it->data; - size = it->size; - } - - if (blockbuf == NULL || size == 0) - return -1; - - if (offset >= size || frag->size > (size - offset)) - return -1; - - return memcmp((const char *)blockbuf + offset, frag->data, frag->size); -} - -static int block_processor_sync(sqfs_block_processor_t *proc) -{ - return append_to_work_queue(proc, NULL); -} - -int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc, - sqfs_block_processor_t **out) -{ - thread_pool_processor_t *proc; - unsigned int i, num_workers; - size_t scratch_size; - sigset_t oldset; - int ret; - - if (desc->size != sizeof(sqfs_block_processor_desc_t)) - return SQFS_ERROR_ARG_INVALID; - - num_workers = desc->num_workers < 1 ? 1 : desc->num_workers; - - proc = alloc_flex(sizeof(*proc), - sizeof(proc->workers[0]), num_workers); - if (proc == NULL) - return SQFS_ERROR_ALLOC; - - ret = block_processor_init(&proc->base, desc); - if (ret != 0) { - free(proc); - return ret; - } - - proc->base.sync = block_processor_sync; - proc->base.append_to_work_queue = append_to_work_queue; - proc->base.compare_frag_in_flight = compare_frag_in_flight; - proc->num_workers = num_workers; - proc->max_backlog = desc->max_backlog; - ((sqfs_object_t *)proc)->destroy = block_processor_destroy; - - MUTEX_INIT(&proc->mtx); - CONDITION_INIT(&proc->queue_cond); - CONDITION_INIT(&proc->done_cond); - - SIGNAL_DISABLE(&oldset); - - for (i = 0; i < num_workers; ++i) { - scratch_size = desc->max_block_size; - if (desc->uncmp != NULL && desc->file != NULL) - scratch_size *= 2; - - proc->workers[i] = alloc_flex(sizeof(compress_worker_t), - 1, scratch_size); - - if (proc->workers[i] == NULL) { - ret = SQFS_ERROR_ALLOC; - goto fail; - } - - proc->workers[i]->shared = proc; - proc->workers[i]->cmp = sqfs_copy(desc->cmp); - proc->workers[i]->frag_blk_index = 0xFFFFFFFF; - - if (proc->workers[i]->cmp == NULL) { - ret = SQFS_ERROR_ALLOC; - goto fail; - } - - ret = THREAD_CREATE(&proc->workers[i]->thread, - worker_proc, proc->workers[i]); - if (ret != 0) { - ret = SQFS_ERROR_INTERNAL; - goto fail; - } - } - - SIGNAL_ENABLE(&oldset); - *out = (sqfs_block_processor_t *)proc; - return 0; -fail: - SIGNAL_ENABLE(&oldset); - block_processor_destroy((sqfs_object_t *)proc); - return ret; -} -- cgit v1.2.3