summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2021-03-21 16:59:08 +0100
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2021-03-21 17:29:18 +0100
commitbb0ef9e0eec5c27610fe381b905ef46b3f5f09c6 (patch)
tree62ec813c654f0962adc7048e849e6bb196b22430
parenta18f724aa3bf57aeed285b5f61eca4a0ba891c21 (diff)
Cleanup: Rewrite block processor to use the libutil thread_pool_t
Throw out the messy thread pool implementation and temporarily also remove the exact fragment matching for simplicity. Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r--lib/sqfs/Makemodule.am12
-rw-r--r--lib/sqfs/block_processor/common.c201
-rw-r--r--lib/sqfs/block_processor/frontend.c189
-rw-r--r--lib/sqfs/block_processor/internal.h46
-rw-r--r--lib/sqfs/block_processor/serial.c106
-rw-r--r--lib/sqfs/block_processor/winpthread.c545
6 files changed, 304 insertions, 795 deletions
diff --git a/lib/sqfs/Makemodule.am b/lib/sqfs/Makemodule.am
index b770fd4..5a62894 100644
--- a/lib/sqfs/Makemodule.am
+++ b/lib/sqfs/Makemodule.am
@@ -43,6 +43,7 @@ libsquashfs_la_SOURCES += lib/util/xxhash.c
libsquashfs_la_SOURCES += lib/util/hash_table.c include/hash_table.h
libsquashfs_la_SOURCES += lib/util/rbtree.c include/rbtree.h
libsquashfs_la_SOURCES += lib/util/array.c include/array.h
+libsquashfs_la_SOURCES += include/threadpool.h
if CUSTOM_ALLOC
libsquashfs_la_SOURCES += lib/util/mempool.c include/mempool.h
@@ -50,6 +51,7 @@ endif
if WINDOWS
libsquashfs_la_SOURCES += lib/sqfs/win32/io_file.c
+libsquashfs_la_SOURCES += lib/util/threadpool.c
libsquashfs_la_CFLAGS += -DWINVER=0x0600 -D_WIN32_WINNT=0x0600
libsquashfs_la_CFLAGS += -Wc,-static-libgcc
libsquashfs_la_LDFLAGS += -no-undefined -avoid-version
@@ -58,14 +60,10 @@ libsquashfs_la_SOURCES += lib/sqfs/unix/io_file.c
endif
if HAVE_PTHREAD
-libsquashfs_la_SOURCES += lib/sqfs/block_processor/winpthread.c
-libsquashfs_la_CPPFLAGS += -DWITH_PTHREAD
+libsquashfs_la_SOURCES += lib/util/threadpool.c
else
-if WINDOWS
-libsquashfs_la_SOURCES += lib/sqfs/block_processor/winpthread.c
-else
-libsquashfs_la_SOURCES += lib/sqfs/block_processor/serial.c
-endif
+libsquashfs_la_SOURCES += lib/util/threadpool_serial.c
+libsquashfs_la_CPPFLAGS += -DNO_THREAD_IMPL
endif
if WITH_GZIP
diff --git a/lib/sqfs/block_processor/common.c b/lib/sqfs/block_processor/common.c
index f1aca1e..62c355b 100644
--- a/lib/sqfs/block_processor/common.c
+++ b/lib/sqfs/block_processor/common.c
@@ -50,8 +50,7 @@ static void release_old_block(sqfs_block_processor_t *proc, sqfs_block_t *blk)
proc->free_list = blk;
}
-static int process_completed_block(sqfs_block_processor_t *proc,
- sqfs_block_t *blk)
+int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk)
{
sqfs_u64 location;
sqfs_u32 size;
@@ -113,9 +112,12 @@ static bool is_zero_block(unsigned char *ptr, size_t size)
return ptr[0] == 0 && memcmp(ptr, ptr + 1, size - 1) == 0;
}
-static int process_block(sqfs_block_t *block, sqfs_compressor_t *cmp,
- sqfs_u8 *scratch, size_t scratch_size)
+static int process_block(void *userptr, void *workitem)
{
+ sqfs_compressor_t *cmp = ((worker_data_t *)userptr)->cmp;
+ sqfs_u8 *scratch = ((worker_data_t *)userptr)->scratch;
+ size_t scratch_size = ((worker_data_t *)userptr)->scratch_size;
+ sqfs_block_t *block = workitem;
sqfs_s32 ret;
if (block->size == 0)
@@ -149,9 +151,9 @@ static int process_block(sqfs_block_t *block, sqfs_compressor_t *cmp,
return 0;
}
-static int process_completed_fragment(sqfs_block_processor_t *proc,
- sqfs_block_t *frag,
- sqfs_block_t **blk_out)
+int process_completed_fragment(sqfs_block_processor_t *proc,
+ sqfs_block_t *frag,
+ sqfs_block_t **blk_out)
{
chunk_info_t *chunk, search;
struct hash_entry *entry;
@@ -176,10 +178,8 @@ static int process_completed_fragment(sqfs_block_processor_t *proc,
search.hash = frag->checksum;
search.size = frag->size;
- proc->frag_cmp_current = frag;
entry = hash_table_search_pre_hashed(proc->frag_ht,
search.hash, &search);
- proc->frag_cmp_current = NULL;
if (entry != NULL) {
if (frag->inode != NULL) {
@@ -241,10 +241,8 @@ static int process_completed_fragment(sqfs_block_processor_t *proc,
chunk->size = frag->size;
chunk->hash = frag->checksum;
- proc->frag_cmp_current = frag;
entry = hash_table_insert_pre_hashed(proc->frag_ht, chunk->hash,
chunk, chunk);
- proc->frag_cmp_current = NULL;
if (entry == NULL) {
free(chunk);
@@ -277,121 +275,122 @@ static uint32_t chunk_info_hash(void *user, const void *key)
static bool chunk_info_equals(void *user, const void *k, const void *c)
{
const chunk_info_t *key = k, *cmp = c;
- sqfs_block_processor_t *proc = user;
- sqfs_fragment_t frag;
- unsigned char *temp;
- size_t size;
- int ret;
-
- if (key->size != cmp->size || key->hash != cmp->hash)
- return false;
+ (void)user;
+ return key->size == cmp->size && key->hash == cmp->hash;
+}
- if (proc->file == NULL || proc->uncmp == NULL)
- return true;
+static void ht_delete_function(struct hash_entry *entry)
+{
+ free(entry->data);
+}
- ret = proc->compare_frag_in_flight(proc, proc->frag_cmp_current,
- cmp->index, cmp->offset);
- if (ret == 0)
- return true;
+static void block_processor_destroy(sqfs_object_t *base)
+{
+ sqfs_block_processor_t *proc = (sqfs_block_processor_t *)base;
+ sqfs_block_t *it;
- if (proc->buffered_index != cmp->index ||
- proc->buffered_blk_size == 0) {
- if (sqfs_frag_table_lookup(proc->frag_tbl, cmp->index, &frag))
- return false;
+ if (proc->frag_block != NULL)
+ release_old_block(proc, proc->frag_block);
- proc->buffered_index = 0xFFFFFFFF;
- size = SQFS_ON_DISK_BLOCK_SIZE(frag.size);
+ free(proc->blk_current);
- if (SQFS_IS_BLOCK_COMPRESSED(frag.size)) {
- temp = proc->frag_buffer + proc->max_block_size;
+ while (proc->free_list != NULL) {
+ it = proc->free_list;
+ proc->free_list = it->next;
+ free(it);
+ }
- ret = proc->file->read_at(proc->file, frag.start_offset,
- temp, size);
- if (ret != 0)
- return false;
+ hash_table_destroy(proc->frag_ht, ht_delete_function);
- ret = proc->uncmp->do_block(proc->uncmp, temp, size,
- proc->frag_buffer,
- proc->max_block_size);
- if (ret <= 0)
- return false;
+ /* XXX: shut down the pool first before cleaning up the worker data */
+ proc->pool->destroy(proc->pool);
- size = ret;
- } else {
- ret = proc->file->read_at(proc->file, frag.start_offset,
- proc->frag_buffer, size);
- if (ret != 0)
- return false;
- }
+ while (proc->workers != NULL) {
+ worker_data_t *worker = proc->workers;
+ proc->workers = worker->next;
- proc->buffered_index = cmp->index;
- proc->buffered_blk_size = size;
+ sqfs_destroy(worker->cmp);
+ free(worker);
}
- if (cmp->offset >= proc->buffered_blk_size)
- return false;
-
- if (cmp->size > (proc->buffered_blk_size - cmp->offset))
- return false;
-
- return memcmp(proc->frag_buffer + cmp->offset,
- proc->frag_cmp_current->data,
- cmp->size) == 0;
+ free(proc);
}
-static void ht_delete_function(struct hash_entry *entry)
+int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc,
+ sqfs_block_processor_t **out)
{
- free(entry->data);
-}
-
-void block_processor_cleanup(sqfs_block_processor_t *base)
-{
- sqfs_block_t *it;
+ sqfs_block_processor_t *proc;
+ size_t i, count;
+ int ret;
- if (base->frag_block != NULL)
- release_old_block(base, base->frag_block);
+ if (desc->size != sizeof(sqfs_block_processor_desc_t))
+ return SQFS_ERROR_ARG_INVALID;
- free(base->blk_current);
- free(base->frag_buffer);
+ proc = calloc(1, sizeof(*proc));
+ if (proc == NULL)
+ return SQFS_ERROR_ALLOC;
- while (base->free_list != NULL) {
- it = base->free_list;
- base->free_list = it->next;
- free(it);
+ proc->max_backlog = desc->max_backlog;
+ proc->max_block_size = desc->max_block_size;
+ proc->frag_tbl = desc->tbl;
+ proc->wr = desc->wr;
+ proc->file = desc->file;
+ proc->uncmp = desc->uncmp;
+ proc->stats.size = sizeof(proc->stats);
+ ((sqfs_object_t *)proc)->destroy = block_processor_destroy;
+
+ /* create the thread pool */
+ proc->pool = thread_pool_create(desc->num_workers, process_block);
+ if (proc->pool == NULL) {
+ free(proc);
+ return SQFS_ERROR_INTERNAL;
}
- hash_table_destroy(base->frag_ht, ht_delete_function);
-}
+ /* create the worker compressors & scratch buffer */
+ count = proc->pool->get_worker_count(proc->pool);
-int block_processor_init(sqfs_block_processor_t *base,
- const sqfs_block_processor_desc_t *desc)
-{
- base->process_completed_block = process_completed_block;
- base->process_completed_fragment = process_completed_fragment;
- base->process_block = process_block;
- base->max_block_size = desc->max_block_size;
- base->cmp = desc->cmp;
- base->frag_tbl = desc->tbl;
- base->wr = desc->wr;
- base->file = desc->file;
- base->uncmp = desc->uncmp;
- base->buffered_index = 0xFFFFFFFF;
- base->stats.size = sizeof(base->stats);
-
- if (desc->file != NULL && desc->uncmp != NULL && desc->tbl != NULL) {
- base->frag_buffer = malloc(2 * desc->max_block_size);
- if (base->frag_buffer == NULL)
- return SQFS_ERROR_ALLOC;
+ for (i = 0; i < count; ++i) {
+ worker_data_t *worker = alloc_flex(sizeof(*worker), 1,
+ desc->max_block_size);
+ if (worker == NULL) {
+ ret = SQFS_ERROR_ALLOC;
+ goto fail_pool;
+ }
+
+ worker->scratch_size = desc->max_block_size;
+ worker->next = proc->workers;
+ proc->workers = worker;
+
+ worker->cmp = sqfs_copy(desc->cmp);
+ if (worker->cmp == NULL)
+ goto fail_pool;
+
+ proc->pool->set_worker_ptr(proc->pool, i, worker);
}
- base->frag_ht = hash_table_create(chunk_info_hash, chunk_info_equals);
- if (base->frag_ht == NULL) {
- free(base->frag_buffer);
- return SQFS_ERROR_ALLOC;
+ /* create the fragment hash table */
+ proc->frag_ht = hash_table_create(chunk_info_hash, chunk_info_equals);
+ if (proc->frag_ht == NULL) {
+ ret = SQFS_ERROR_ALLOC;
+ goto fail_pool;
}
- base->frag_ht->user = base;
+ proc->frag_ht->user = proc;
+ *out = proc;
return 0;
+fail_pool:
+ proc->pool->destroy(proc->pool);
+ while (proc->workers != NULL) {
+ worker_data_t *worker = proc->workers;
+ proc->workers = worker->next;
+
+ if (worker->cmp != NULL)
+ sqfs_destroy(worker->cmp);
+
+ free(worker);
+ }
+ free(proc);
+ return ret;
}
sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
diff --git a/lib/sqfs/block_processor/frontend.c b/lib/sqfs/block_processor/frontend.c
index f6aeffa..8bd6cf2 100644
--- a/lib/sqfs/block_processor/frontend.c
+++ b/lib/sqfs/block_processor/frontend.c
@@ -24,26 +24,159 @@ static sqfs_block_t *get_new_block(sqfs_block_processor_t *proc)
return blk;
}
+static int dequeue_block(sqfs_block_processor_t *proc)
+{
+ sqfs_block_t *blk, *fragblk, *it, *prev;
+ bool have_dequeued = false;
+ int status;
+retry:
+ while (proc->io_queue != NULL) {
+ if (proc->io_queue->io_seq_num != proc->io_deq_seq_num)
+ break;
+
+ blk = proc->io_queue;
+ proc->io_queue = blk->next;
+ proc->io_deq_seq_num += 1;
+ proc->backlog -= 1;
+ have_dequeued = true;
+
+ status = process_completed_block(proc, blk);
+ if (status != 0)
+ return status;
+ }
+
+ if (have_dequeued)
+ return 0;
+
+ blk = proc->pool->dequeue(proc->pool);
+
+ if (blk == NULL) {
+ status = proc->pool->get_status(proc->pool);
+ if (status == 0)
+ status = SQFS_ERROR_INTERNAL;
+
+ return status;
+ }
+
+ proc->backlog -= 1;
+ have_dequeued = true;
+
+ if (blk->flags & SQFS_BLK_IS_FRAGMENT) {
+ fragblk = NULL;
+ status = process_completed_fragment(proc, blk, &fragblk);
+
+ if (status != 0) {
+ free(fragblk);
+ return status;
+ }
+
+ if (fragblk != NULL) {
+ fragblk->io_seq_num = proc->io_seq_num++;
+
+ if (proc->pool->submit(proc->pool, fragblk) != 0) {
+ free(fragblk);
+
+ if (status == 0) {
+ status = proc->pool->
+ get_status(proc->pool);
+
+ if (status == 0)
+ status = SQFS_ERROR_ALLOC;
+ }
+
+ return status;
+ }
+
+ proc->backlog += 1;
+ have_dequeued = false;
+ }
+ } else {
+ if (!(blk->flags & SQFS_BLK_FRAGMENT_BLOCK))
+ blk->io_seq_num = proc->io_seq_num++;
+
+ prev = NULL;
+ it = proc->io_queue;
+
+ while (it != NULL) {
+ if (it->io_seq_num >= blk->io_seq_num)
+ break;
+
+ prev = it;
+ it = it->next;
+ }
+
+ if (prev == NULL) {
+ blk->next = proc->io_queue;
+ proc->io_queue = blk;
+ } else {
+ blk->next = prev->next;
+ prev->next = blk;
+ }
+
+ proc->backlog += 1;
+ have_dequeued = false;
+ }
+
+ if (!have_dequeued)
+ goto retry;
+
+ return 0;
+}
+
+static int enqueue_block(sqfs_block_processor_t *proc, sqfs_block_t *blk)
+{
+ int status;
+
+ if (proc->pool->submit(proc->pool, blk) != 0) {
+ status = proc->pool->get_status(proc->pool);
+
+ if (status == 0)
+ status = SQFS_ERROR_ALLOC;
+
+ free(blk);
+ return status;
+ }
+
+ proc->backlog += 1;
+ return 0;
+}
+
static int add_sentinel_block(sqfs_block_processor_t *proc)
{
- sqfs_block_t *blk = get_new_block(proc);
+ sqfs_block_t *blk;
+ int ret;
+
+ if (proc->backlog == proc->max_backlog) {
+ ret = dequeue_block(proc);
+ if (ret != 0)
+ return ret;
+ }
+ blk = get_new_block(proc);
if (blk == NULL)
return SQFS_ERROR_ALLOC;
blk->inode = proc->inode;
blk->flags = proc->blk_flags | SQFS_BLK_LAST_BLOCK;
- return proc->append_to_work_queue(proc, blk);
+ return enqueue_block(proc, blk);
}
static int flush_block(sqfs_block_processor_t *proc)
{
- sqfs_block_t *block = proc->blk_current;
+ sqfs_block_t *block;
+ int ret;
+
+ if (proc->backlog == proc->max_backlog) {
+ ret = dequeue_block(proc);
+ if (ret != 0)
+ return ret;
+ }
+ block = proc->blk_current;
proc->blk_current = NULL;
- return proc->append_to_work_queue(proc, block);
+ return enqueue_block(proc, block);
}
int sqfs_block_processor_begin_file(sqfs_block_processor_t *proc,
@@ -177,6 +310,7 @@ int sqfs_block_processor_submit_block(sqfs_block_processor_t *proc, void *user,
size_t size)
{
sqfs_block_t *blk;
+ int ret;
if (proc->begin_called)
return SQFS_ERROR_SEQUENCE;
@@ -187,6 +321,12 @@ int sqfs_block_processor_submit_block(sqfs_block_processor_t *proc, void *user,
if (flags & ~SQFS_BLK_FLAGS_ALL)
return SQFS_ERROR_UNSUPPORTED;
+ if (proc->backlog == proc->max_backlog) {
+ ret = dequeue_block(proc);
+ if (ret != 0)
+ return ret;
+ }
+
blk = get_new_block(proc);
if (blk == NULL)
return SQFS_ERROR_ALLOC;
@@ -196,12 +336,25 @@ int sqfs_block_processor_submit_block(sqfs_block_processor_t *proc, void *user,
blk->size = size;
memcpy(blk->data, data, size);
- return proc->append_to_work_queue(proc, blk);
+ ret = proc->pool->submit(proc->pool, blk);
+ if (ret != 0)
+ free(blk);
+
+ proc->backlog += 1;
+ return ret;
}
int sqfs_block_processor_sync(sqfs_block_processor_t *proc)
{
- return proc->sync(proc);
+ int ret;
+
+ while (proc->backlog > 0) {
+ ret = dequeue_block(proc);
+ if (ret != 0)
+ return ret;
+ }
+
+ return 0;
}
int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
@@ -209,18 +362,30 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
sqfs_block_t *blk;
int status;
- status = proc->sync(proc);
+ status = sqfs_block_processor_sync(proc);
+ if (status != 0)
+ return status;
- if (status == 0 && proc->frag_block != NULL) {
+ if (proc->frag_block != NULL) {
blk = proc->frag_block;
blk->next = NULL;
- blk->flags |= BLK_FLAG_MANUAL_SUBMISSION;
proc->frag_block = NULL;
- status = proc->append_to_work_queue(proc, blk);
+ blk->io_seq_num = proc->io_seq_num++;
- if (status == 0)
- status = proc->sync(proc);
+ status = proc->pool->submit(proc->pool, blk);
+ if (status != 0) {
+ status = proc->pool->get_status(proc->pool);
+
+ if (status == 0)
+ status = SQFS_ERROR_ALLOC;
+
+ free(blk);
+ return status;
+ }
+
+ proc->backlog += 1;
+ status = sqfs_block_processor_sync(proc);
}
return status;
diff --git a/lib/sqfs/block_processor/internal.h b/lib/sqfs/block_processor/internal.h
index 4699dc6..2393380 100644
--- a/lib/sqfs/block_processor/internal.h
+++ b/lib/sqfs/block_processor/internal.h
@@ -20,6 +20,7 @@
#include "sqfs/io.h"
#include "hash_table.h"
+#include "threadpool.h"
#include "util.h"
#include <string.h>
@@ -42,7 +43,6 @@ typedef struct sqfs_block_t {
struct sqfs_block_t *next;
sqfs_inode_generic_t **inode;
- sqfs_u32 proc_seq_num;
sqfs_u32 io_seq_num;
sqfs_u32 flags;
sqfs_u32 size;
@@ -58,11 +58,18 @@ typedef struct sqfs_block_t {
sqfs_u8 data[];
} sqfs_block_t;
+typedef struct worker_data_t {
+ struct worker_data_t *next;
+ sqfs_compressor_t *cmp;
+
+ size_t scratch_size;
+ sqfs_u8 scratch[];
+} worker_data_t;
+
struct sqfs_block_processor_t {
sqfs_object_t obj;
sqfs_frag_table_t *frag_tbl;
- sqfs_compressor_t *cmp;
sqfs_block_t *frag_block;
sqfs_block_writer_t *wr;
@@ -78,39 +85,30 @@ struct sqfs_block_processor_t {
sqfs_block_t *free_list;
size_t max_block_size;
+ size_t max_backlog;
+ size_t backlog;
bool begin_called;
sqfs_file_t *file;
sqfs_compressor_t *uncmp;
- sqfs_block_t *frag_cmp_current;
- sqfs_u8 *frag_buffer;
- sqfs_u32 buffered_index;
- sqfs_u32 buffered_blk_size;
- int (*process_completed_block)(sqfs_block_processor_t *proc,
- sqfs_block_t *block);
+ thread_pool_t *pool;
+ worker_data_t *workers;
- int (*process_completed_fragment)(sqfs_block_processor_t *proc,
- sqfs_block_t *frag,
- sqfs_block_t **blk_out);
- int (*process_block)(sqfs_block_t *block, sqfs_compressor_t *cmp,
- sqfs_u8 *scratch, size_t scratch_size);
- int (*compare_frag_in_flight)(sqfs_block_processor_t *proc,
- sqfs_block_t *frag, sqfs_u32 index,
- sqfs_u32 offset);
-
- int (*append_to_work_queue)(sqfs_block_processor_t *proc,
- sqfs_block_t *block);
-
- int (*sync)(sqfs_block_processor_t *proc);
+ sqfs_block_t *io_queue;
+ sqfs_u32 io_seq_num;
+ sqfs_u32 io_deq_seq_num;
};
-SQFS_INTERNAL void block_processor_cleanup(sqfs_block_processor_t *base);
+SQFS_INTERNAL
+int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk);
-SQFS_INTERNAL int block_processor_init(sqfs_block_processor_t *base,
- const sqfs_block_processor_desc_t *desc);
+SQFS_INTERNAL
+int process_completed_fragment(sqfs_block_processor_t *proc,
+ sqfs_block_t *frag,
+ sqfs_block_t **blk_out);
#endif /* INTERNAL_H */
diff --git a/lib/sqfs/block_processor/serial.c b/lib/sqfs/block_processor/serial.c
deleted file mode 100644
index b20d5b0..0000000
--- a/lib/sqfs/block_processor/serial.c
+++ /dev/null
@@ -1,106 +0,0 @@
-/* SPDX-License-Identifier: LGPL-3.0-or-later */
-/*
- * serial.c
- *
- * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at>
- */
-#define SQFS_BUILDING_DLL
-#include "internal.h"
-
-typedef struct {
- sqfs_block_processor_t base;
- int status;
- sqfs_u8 scratch[];
-} serial_block_processor_t;
-
-static void block_processor_destroy(sqfs_object_t *obj)
-{
- sqfs_block_processor_t *proc = (sqfs_block_processor_t *)obj;
-
- block_processor_cleanup(proc);
- free(proc);
-}
-
-static int append_to_work_queue(sqfs_block_processor_t *proc,
- sqfs_block_t *block)
-{
- serial_block_processor_t *sproc = (serial_block_processor_t *)proc;
- sqfs_block_t *fragblk = NULL;
-
- if (sproc->status != 0)
- goto fail;
-
- sproc->status = proc->process_block(block, proc->cmp, sproc->scratch,
- proc->max_block_size);
- if (sproc->status != 0)
- goto fail;
-
- if (block->flags & SQFS_BLK_IS_FRAGMENT) {
- sproc->status = proc->process_completed_fragment(proc, block,
- &fragblk);
- if (fragblk == NULL)
- return sproc->status;
-
- block = fragblk;
- sproc->status = proc->process_block(block, proc->cmp,
- sproc->scratch,
- proc->max_block_size);
- if (sproc->status != 0)
- goto fail;
- }
-
- sproc->status = proc->process_completed_block(proc, block);
- return sproc->status;
-fail:
- free(block);
- return sproc->status;
-}
-
-static int block_processor_sync(sqfs_block_processor_t *proc)
-{
- return ((serial_block_processor_t *)proc)->status;
-}
-
-static int compare_frag_in_flight(sqfs_block_processor_t *proc,
- sqfs_block_t *frag, sqfs_u32 index,
- sqfs_u32 offset)
-{
- if (proc->frag_block == NULL || index != proc->frag_block->index)
- return -1;
-
- if (offset >= proc->frag_block->size)
- return -1;
-
- if (frag->size > (proc->frag_block->size - offset))
- return -1;
-
- return memcmp(proc->frag_block->data + offset, frag->data, frag->size);
-}
-
-int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc,
- sqfs_block_processor_t **out)
-{
- serial_block_processor_t *proc;
- int ret;
-
- if (desc->size != sizeof(sqfs_block_processor_desc_t))
- return SQFS_ERROR_ARG_INVALID;
-
- proc = alloc_flex(sizeof(*proc), 1, desc->max_block_size);
- if (proc == NULL)
- return SQFS_ERROR_ALLOC;
-
- ret = block_processor_init(&proc->base, desc);
- if (ret != 0) {
- free(proc);
- return ret;
- }
-
- proc->base.sync = block_processor_sync;
- proc->base.append_to_work_queue = append_to_work_queue;
- proc->base.compare_frag_in_flight = compare_frag_in_flight;
- ((sqfs_object_t *)proc)->destroy = block_processor_destroy;
-
- *out = (sqfs_block_processor_t *)proc;
- return 0;
-}
diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c
deleted file mode 100644
index 806c749..0000000
--- a/lib/sqfs/block_processor/winpthread.c
+++ /dev/null
@@ -1,545 +0,0 @@
-/* SPDX-License-Identifier: LGPL-3.0-or-later */
-/*
- * winpthread.c
- *
- * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at>
- */
-#define SQFS_BUILDING_DLL
-#include "internal.h"
-
-#if defined(_WIN32) || defined(__WINDOWS__)
-# define WIN32_LEAN_AND_MEAN
-# include <windows.h>
-# define LOCK EnterCriticalSection
-# define UNLOCK LeaveCriticalSection
-# define AWAIT(cond, mtx) SleepConditionVariableCS(cond, mtx, INFINITE)
-# define SIGNAL_ALL WakeAllConditionVariable
-# define THREAD_CREATE(hndptr, proc, arg) \
- ((*hndptr) = CreateThread(NULL, 0, proc, arg, 0, 0), \
- ((*hndptr) == NULL) ? -1 : 0)
-# define THREAD_JOIN(t) \
- do { \
- WaitForSingleObject(t, INFINITE); \
- CloseHandle(t); \
- } while (0)
-# define MUTEX_INIT InitializeCriticalSection
-# define MUTEX_DESTROY DeleteCriticalSection
-# define CONDITION_INIT InitializeConditionVariable
-# define CONDITION_DESTROY(cond)
-# define THREAD_EXIT_SUCCESS 0
-# define THREAD_INVALID NULL
-# define THREAD_TYPE DWORD WINAPI
-# define SIGNAL_DISABLE(oldset) (void)oldset
-# define SIGNAL_ENABLE(oldset) (void)oldset
-
-typedef int sigset_t;
-typedef LPVOID THREAD_ARG;
-typedef HANDLE THREAD_HANDLE;
-typedef CRITICAL_SECTION MUTEX_TYPE;
-typedef CONDITION_VARIABLE CONDITION_TYPE;
-#else
-# include <pthread.h>
-# include <signal.h>
-# define LOCK pthread_mutex_lock
-# define UNLOCK pthread_mutex_unlock
-# define AWAIT pthread_cond_wait
-# define SIGNAL_ALL pthread_cond_broadcast
-# define THREAD_CREATE(hndptr, proc, arg) \
- pthread_create((hndptr), NULL, (proc), (arg))
-# define THREAD_JOIN(t) pthread_join((t), NULL)
-# define MUTEX_INIT(mtx) \
- *(mtx) = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER
-# define MUTEX_DESTROY pthread_mutex_destroy
-# define CONDITION_INIT(cond) \
- *(cond) = (pthread_cond_t)PTHREAD_COND_INITIALIZER
-# define CONDITION_DESTROY pthread_cond_destroy
-# define THREAD_EXIT_SUCCESS NULL
-# define THREAD_INVALID (pthread_t)0
-# define SIGNAL_ENABLE(oldset) pthread_sigmask(SIG_SETMASK, oldset, NULL)
-
-typedef void *THREAD_TYPE;
-typedef void *THREAD_ARG;
-typedef pthread_t THREAD_HANDLE;
-typedef pthread_mutex_t MUTEX_TYPE;
-typedef pthread_cond_t CONDITION_TYPE;
-
-static inline void SIGNAL_DISABLE(sigset_t *oldset)
-{
- sigset_t set;
- sigfillset(&set);
- pthread_sigmask(SIG_SETMASK, &set, oldset);
-}
-#endif
-
-typedef struct compress_worker_t compress_worker_t;
-typedef struct thread_pool_processor_t thread_pool_processor_t;
-
-struct compress_worker_t {
- thread_pool_processor_t *shared;
- sqfs_compressor_t *cmp;
- THREAD_HANDLE thread;
- sqfs_u32 frag_blk_index;
- sqfs_u32 frag_blk_size;
- sqfs_u8 scratch[];
-};
-
-struct thread_pool_processor_t {
- sqfs_block_processor_t base;
-
- MUTEX_TYPE mtx;
- CONDITION_TYPE queue_cond;
- CONDITION_TYPE done_cond;
-
- sqfs_block_t *proc_queue;
- sqfs_block_t *proc_queue_last;
-
- sqfs_block_t *io_queue;
- sqfs_block_t *done;
- size_t backlog;
- int status;
-
- sqfs_u32 proc_enq_id;
- sqfs_u32 proc_deq_id;
-
- sqfs_u32 io_enq_id;
- sqfs_u32 io_deq_id;
-
- unsigned int num_workers;
- size_t max_backlog;
-
- compress_worker_t *workers[];
-};
-
-static void free_blk_list(sqfs_block_t *list)
-{
- sqfs_block_t *it;
-
- while (list != NULL) {
- it = list;
- list = list->next;
- free(it);
- }
-}
-
-static sqfs_block_t *get_next_work_item(thread_pool_processor_t *shared)
-{
- sqfs_block_t *blk = NULL;
-
- while (shared->proc_queue == NULL && shared->status == 0)
- AWAIT(&shared->queue_cond, &shared->mtx);
-
- if (shared->status == 0) {
- blk = shared->proc_queue;
- shared->proc_queue = blk->next;
- blk->next = NULL;
-
- if (shared->proc_queue == NULL)
- shared->proc_queue_last = NULL;
- }
-
- return blk;
-}
-
-static void store_completed_block(thread_pool_processor_t *shared,
- sqfs_block_t *blk, int status)
-{
- sqfs_block_t *it = shared->done, *prev = NULL;
-
- while (it != NULL) {
- if (it->proc_seq_num >= blk->proc_seq_num)
- break;
- prev = it;
- it = it->next;
- }
-
- if (prev == NULL) {
- blk->next = shared->done;
- shared->done = blk;
- } else {
- blk->next = prev->next;
- prev->next = blk;
- }
-
- if (status != 0 && shared->status == 0)
- shared->status = status;
-
- SIGNAL_ALL(&shared->done_cond);
-}
-
-static THREAD_TYPE worker_proc(THREAD_ARG arg)
-{
- compress_worker_t *worker = arg;
- thread_pool_processor_t *shared = worker->shared;
- sqfs_block_processor_t *proc = (sqfs_block_processor_t *)shared;
- sqfs_block_t *blk = NULL;
- int status = 0;
-
- for (;;) {
- LOCK(&shared->mtx);
- if (blk != NULL)
- store_completed_block(shared, blk, status);
-
- blk = get_next_work_item(shared);
-
- if (blk != NULL && (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) &&
- shared->base.uncmp != NULL && shared->base.file != NULL) {
- memcpy(worker->scratch + shared->base.max_block_size,
- blk->data, blk->size);
- worker->frag_blk_index = blk->index;
- worker->frag_blk_size = blk->size;
- }
- UNLOCK(&shared->mtx);
-
- if (blk == NULL)
- break;
-
- status = proc->process_block(blk, worker->cmp, worker->scratch,
- proc->max_block_size);
- }
-
- return THREAD_EXIT_SUCCESS;
-}
-
-static void block_processor_destroy(sqfs_object_t *obj)
-{
- thread_pool_processor_t *proc = (thread_pool_processor_t *)obj;
- unsigned int i;
-
- LOCK(&proc->mtx);
- proc->status = -1;
- SIGNAL_ALL(&proc->queue_cond);
- UNLOCK(&proc->mtx);
-
- for (i = 0; i < proc->num_workers; ++i) {
- if (proc->workers[i] != NULL) {
- if (proc->workers[i]->thread != THREAD_INVALID)
- THREAD_JOIN(proc->workers[i]->thread);
-
- if (proc->workers[i]->cmp != NULL)
- sqfs_destroy(proc->workers[i]->cmp);
-
- free(proc->workers[i]);
- }
- }
-
- CONDITION_DESTROY(&proc->done_cond);
- CONDITION_DESTROY(&proc->queue_cond);
- MUTEX_DESTROY(&proc->mtx);
-
- free_blk_list(proc->proc_queue);
- free_blk_list(proc->io_queue);
- free_blk_list(proc->done);
- block_processor_cleanup(&proc->base);
- free(proc);
-}
-
-static void store_io_block(thread_pool_processor_t *proc, sqfs_block_t *blk)
-{
- sqfs_block_t *it = proc->io_queue, *prev = NULL;
-
- while (it != NULL && it->io_seq_num < blk->io_seq_num) {
- prev = it;
- it = it->next;
- }
-
- if (prev == NULL) {
- blk->next = proc->io_queue;
- proc->io_queue = blk;
- } else {
- blk->next = prev->next;
- prev->next = blk;
- }
-
- proc->backlog += 1;
-}
-
-static sqfs_block_t *try_dequeue_io(thread_pool_processor_t *proc)
-{
- sqfs_block_t *out;
-
- if (proc->io_queue == NULL)
- return NULL;
-
- if (proc->io_queue->io_seq_num != proc->io_deq_id)
- return NULL;
-
- out = proc->io_queue;
- proc->io_queue = out->next;
- out->next = NULL;
- proc->io_deq_id += 1;
- proc->backlog -= 1;
- return out;
-}
-
-static sqfs_block_t *try_dequeue_done(thread_pool_processor_t *proc)
-{
- sqfs_block_t *out;
-
- if (proc->done == NULL)
- return NULL;
-
- if (proc->done->proc_seq_num != proc->proc_deq_id)
- return NULL;
-
- out = proc->done;
- proc->done = out->next;
- out->next = NULL;
- proc->proc_deq_id += 1;
- proc->backlog -= 1;
- return out;
-}
-
-static void append_block(thread_pool_processor_t *proc, sqfs_block_t *block)
-{
- if (proc->proc_queue_last == NULL) {
- proc->proc_queue = proc->proc_queue_last = block;
- } else {
- proc->proc_queue_last->next = block;
- proc->proc_queue_last = block;
- }
-
- block->proc_seq_num = proc->proc_enq_id++;
- block->next = NULL;
- proc->backlog += 1;
-}
-
-static int handle_io_queue(thread_pool_processor_t *proc, sqfs_block_t *list)
-{
- sqfs_block_t *it;
- int status = 0;
-
- while (status == 0 && list != NULL) {
- it = list;
- list = list->next;
- status = proc->base.process_completed_block(&proc->base, it);
-
- if (status != 0) {
- LOCK(&proc->mtx);
- if (proc->status == 0)
- proc->status = status;
- SIGNAL_ALL(&proc->queue_cond);
- UNLOCK(&proc->mtx);
- }
- }
-
- return status;
-}
-
-static int append_to_work_queue(sqfs_block_processor_t *proc,
- sqfs_block_t *block)
-{
- thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc;
- sqfs_block_t *io_list = NULL, *io_list_last = NULL;
- sqfs_block_t *blk, *fragblk;
- int status;
-
- LOCK(&thproc->mtx);
- for (;;) {
- status = thproc->status;
- if (status != 0)
- break;
-
- if (block == NULL) {
- if (thproc->backlog == 0)
- break;
- } else {
- if (thproc->backlog < thproc->max_backlog) {
- append_block(thproc, block);
- block = NULL;
- break;
- }
- }
-
- blk = try_dequeue_io(thproc);
- if (blk != NULL) {
- if (io_list_last == NULL) {
- io_list = io_list_last = blk;
- } else {
- io_list_last->next = blk;
- io_list_last = blk;
- }
- continue;
- }
-
- blk = try_dequeue_done(thproc);
- if (blk == NULL) {
- AWAIT(&thproc->done_cond, &thproc->mtx);
- continue;
- }
-
- if (blk->flags & SQFS_BLK_IS_FRAGMENT) {
- fragblk = NULL;
- thproc->status =
- proc->process_completed_fragment(proc, blk,
- &fragblk);
-
- if (fragblk != NULL) {
- fragblk->io_seq_num = thproc->io_enq_id++;
- append_block(thproc, fragblk);
- SIGNAL_ALL(&thproc->queue_cond);
- }
- } else {
- if (!(blk->flags & SQFS_BLK_FRAGMENT_BLOCK) ||
- blk->flags & BLK_FLAG_MANUAL_SUBMISSION)
- blk->io_seq_num = thproc->io_enq_id++;
- store_io_block(thproc, blk);
- }
- }
- SIGNAL_ALL(&thproc->queue_cond);
- UNLOCK(&thproc->mtx);
- free(block);
-
- if (status == 0) {
- status = handle_io_queue(thproc, io_list);
- } else {
- free_blk_list(io_list);
- }
-
- return status;
-}
-
-static sqfs_block_t *find_frag_blk_in_queue(sqfs_block_t *q, sqfs_u32 index)
-{
- while (q != NULL) {
- if ((q->flags & SQFS_BLK_FRAGMENT_BLOCK) && q->index == index)
- break;
- q = q->next;
- }
- return q;
-}
-
-static int compare_frag_in_flight(sqfs_block_processor_t *proc,
- sqfs_block_t *frag, sqfs_u32 index,
- sqfs_u32 offset)
-{
- thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc;
- sqfs_block_t *it = NULL;
- void *blockbuf = NULL;
- size_t i, size = 0;
- int ret;
-
- if (proc->frag_block != NULL && proc->frag_block->index == index)
- it = proc->frag_block;
-
- if (it == NULL)
- it = find_frag_blk_in_queue(thproc->proc_queue, index);
-
- if (it == NULL)
- it = find_frag_blk_in_queue(thproc->io_queue, index);
-
- if (it == NULL)
- it = find_frag_blk_in_queue(thproc->done, index);
-
- if (it == NULL) {
- for (i = 0; i < thproc->num_workers; ++i) {
- if (thproc->workers[i]->frag_blk_index == index) {
- size = thproc->workers[i]->frag_blk_size;
- blockbuf = thproc->workers[i]->scratch +
- proc->max_block_size;
- break;
- }
- }
- } else if (it->flags & SQFS_BLK_IS_COMPRESSED) {
- proc->buffered_index = 0xFFFFFFFF;
- blockbuf = proc->frag_buffer;
- ret = proc->uncmp->do_block(proc->uncmp, it->data, it->size,
- blockbuf, proc->max_block_size);
- if (ret <= 0)
- return -1;
- proc->buffered_index = it->index;
- size = ret;
- } else {
- blockbuf = it->data;
- size = it->size;
- }
-
- if (blockbuf == NULL || size == 0)
- return -1;
-
- if (offset >= size || frag->size > (size - offset))
- return -1;
-
- return memcmp((const char *)blockbuf + offset, frag->data, frag->size);
-}
-
-static int block_processor_sync(sqfs_block_processor_t *proc)
-{
- return append_to_work_queue(proc, NULL);
-}
-
-int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc,
- sqfs_block_processor_t **out)
-{
- thread_pool_processor_t *proc;
- unsigned int i, num_workers;
- size_t scratch_size;
- sigset_t oldset;
- int ret;
-
- if (desc->size != sizeof(sqfs_block_processor_desc_t))
- return SQFS_ERROR_ARG_INVALID;
-
- num_workers = desc->num_workers < 1 ? 1 : desc->num_workers;
-
- proc = alloc_flex(sizeof(*proc),
- sizeof(proc->workers[0]), num_workers);
- if (proc == NULL)
- return SQFS_ERROR_ALLOC;
-
- ret = block_processor_init(&proc->base, desc);
- if (ret != 0) {
- free(proc);
- return ret;
- }
-
- proc->base.sync = block_processor_sync;
- proc->base.append_to_work_queue = append_to_work_queue;
- proc->base.compare_frag_in_flight = compare_frag_in_flight;
- proc->num_workers = num_workers;
- proc->max_backlog = desc->max_backlog;
- ((sqfs_object_t *)proc)->destroy = block_processor_destroy;
-
- MUTEX_INIT(&proc->mtx);
- CONDITION_INIT(&proc->queue_cond);
- CONDITION_INIT(&proc->done_cond);
-
- SIGNAL_DISABLE(&oldset);
-
- for (i = 0; i < num_workers; ++i) {
- scratch_size = desc->max_block_size;
- if (desc->uncmp != NULL && desc->file != NULL)
- scratch_size *= 2;
-
- proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
- 1, scratch_size);
-
- if (proc->workers[i] == NULL) {
- ret = SQFS_ERROR_ALLOC;
- goto fail;
- }
-
- proc->workers[i]->shared = proc;
- proc->workers[i]->cmp = sqfs_copy(desc->cmp);
- proc->workers[i]->frag_blk_index = 0xFFFFFFFF;
-
- if (proc->workers[i]->cmp == NULL) {
- ret = SQFS_ERROR_ALLOC;
- goto fail;
- }
-
- ret = THREAD_CREATE(&proc->workers[i]->thread,
- worker_proc, proc->workers[i]);
- if (ret != 0) {
- ret = SQFS_ERROR_INTERNAL;
- goto fail;
- }
- }
-
- SIGNAL_ENABLE(&oldset);
- *out = (sqfs_block_processor_t *)proc;
- return 0;
-fail:
- SIGNAL_ENABLE(&oldset);
- block_processor_destroy((sqfs_object_t *)proc);
- return ret;
-}