summaryrefslogtreecommitdiff
path: root/lib/sqfs/block_processor/common.c
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2021-03-21 16:59:08 +0100
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2021-03-21 17:29:18 +0100
commitbb0ef9e0eec5c27610fe381b905ef46b3f5f09c6 (patch)
tree62ec813c654f0962adc7048e849e6bb196b22430 /lib/sqfs/block_processor/common.c
parenta18f724aa3bf57aeed285b5f61eca4a0ba891c21 (diff)
Cleanup: Rewrite block processor to use the libutil thread_pool_t
Throw out the messy thread pool implementation and temporarily also remove the exact fragment matching for simplicity. Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib/sqfs/block_processor/common.c')
-rw-r--r--lib/sqfs/block_processor/common.c201
1 files changed, 100 insertions, 101 deletions
diff --git a/lib/sqfs/block_processor/common.c b/lib/sqfs/block_processor/common.c
index f1aca1e..62c355b 100644
--- a/lib/sqfs/block_processor/common.c
+++ b/lib/sqfs/block_processor/common.c
@@ -50,8 +50,7 @@ static void release_old_block(sqfs_block_processor_t *proc, sqfs_block_t *blk)
proc->free_list = blk;
}
-static int process_completed_block(sqfs_block_processor_t *proc,
- sqfs_block_t *blk)
+int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk)
{
sqfs_u64 location;
sqfs_u32 size;
@@ -113,9 +112,12 @@ static bool is_zero_block(unsigned char *ptr, size_t size)
return ptr[0] == 0 && memcmp(ptr, ptr + 1, size - 1) == 0;
}
-static int process_block(sqfs_block_t *block, sqfs_compressor_t *cmp,
- sqfs_u8 *scratch, size_t scratch_size)
+static int process_block(void *userptr, void *workitem)
{
+ sqfs_compressor_t *cmp = ((worker_data_t *)userptr)->cmp;
+ sqfs_u8 *scratch = ((worker_data_t *)userptr)->scratch;
+ size_t scratch_size = ((worker_data_t *)userptr)->scratch_size;
+ sqfs_block_t *block = workitem;
sqfs_s32 ret;
if (block->size == 0)
@@ -149,9 +151,9 @@ static int process_block(sqfs_block_t *block, sqfs_compressor_t *cmp,
return 0;
}
-static int process_completed_fragment(sqfs_block_processor_t *proc,
- sqfs_block_t *frag,
- sqfs_block_t **blk_out)
+int process_completed_fragment(sqfs_block_processor_t *proc,
+ sqfs_block_t *frag,
+ sqfs_block_t **blk_out)
{
chunk_info_t *chunk, search;
struct hash_entry *entry;
@@ -176,10 +178,8 @@ static int process_completed_fragment(sqfs_block_processor_t *proc,
search.hash = frag->checksum;
search.size = frag->size;
- proc->frag_cmp_current = frag;
entry = hash_table_search_pre_hashed(proc->frag_ht,
search.hash, &search);
- proc->frag_cmp_current = NULL;
if (entry != NULL) {
if (frag->inode != NULL) {
@@ -241,10 +241,8 @@ static int process_completed_fragment(sqfs_block_processor_t *proc,
chunk->size = frag->size;
chunk->hash = frag->checksum;
- proc->frag_cmp_current = frag;
entry = hash_table_insert_pre_hashed(proc->frag_ht, chunk->hash,
chunk, chunk);
- proc->frag_cmp_current = NULL;
if (entry == NULL) {
free(chunk);
@@ -277,121 +275,122 @@ static uint32_t chunk_info_hash(void *user, const void *key)
static bool chunk_info_equals(void *user, const void *k, const void *c)
{
const chunk_info_t *key = k, *cmp = c;
- sqfs_block_processor_t *proc = user;
- sqfs_fragment_t frag;
- unsigned char *temp;
- size_t size;
- int ret;
-
- if (key->size != cmp->size || key->hash != cmp->hash)
- return false;
+ (void)user;
+ return key->size == cmp->size && key->hash == cmp->hash;
+}
- if (proc->file == NULL || proc->uncmp == NULL)
- return true;
+static void ht_delete_function(struct hash_entry *entry)
+{
+ free(entry->data);
+}
- ret = proc->compare_frag_in_flight(proc, proc->frag_cmp_current,
- cmp->index, cmp->offset);
- if (ret == 0)
- return true;
+static void block_processor_destroy(sqfs_object_t *base)
+{
+ sqfs_block_processor_t *proc = (sqfs_block_processor_t *)base;
+ sqfs_block_t *it;
- if (proc->buffered_index != cmp->index ||
- proc->buffered_blk_size == 0) {
- if (sqfs_frag_table_lookup(proc->frag_tbl, cmp->index, &frag))
- return false;
+ if (proc->frag_block != NULL)
+ release_old_block(proc, proc->frag_block);
- proc->buffered_index = 0xFFFFFFFF;
- size = SQFS_ON_DISK_BLOCK_SIZE(frag.size);
+ free(proc->blk_current);
- if (SQFS_IS_BLOCK_COMPRESSED(frag.size)) {
- temp = proc->frag_buffer + proc->max_block_size;
+ while (proc->free_list != NULL) {
+ it = proc->free_list;
+ proc->free_list = it->next;
+ free(it);
+ }
- ret = proc->file->read_at(proc->file, frag.start_offset,
- temp, size);
- if (ret != 0)
- return false;
+ hash_table_destroy(proc->frag_ht, ht_delete_function);
- ret = proc->uncmp->do_block(proc->uncmp, temp, size,
- proc->frag_buffer,
- proc->max_block_size);
- if (ret <= 0)
- return false;
+ /* XXX: shut down the pool first before cleaning up the worker data */
+ proc->pool->destroy(proc->pool);
- size = ret;
- } else {
- ret = proc->file->read_at(proc->file, frag.start_offset,
- proc->frag_buffer, size);
- if (ret != 0)
- return false;
- }
+ while (proc->workers != NULL) {
+ worker_data_t *worker = proc->workers;
+ proc->workers = worker->next;
- proc->buffered_index = cmp->index;
- proc->buffered_blk_size = size;
+ sqfs_destroy(worker->cmp);
+ free(worker);
}
- if (cmp->offset >= proc->buffered_blk_size)
- return false;
-
- if (cmp->size > (proc->buffered_blk_size - cmp->offset))
- return false;
-
- return memcmp(proc->frag_buffer + cmp->offset,
- proc->frag_cmp_current->data,
- cmp->size) == 0;
+ free(proc);
}
-static void ht_delete_function(struct hash_entry *entry)
+int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc,
+ sqfs_block_processor_t **out)
{
- free(entry->data);
-}
-
-void block_processor_cleanup(sqfs_block_processor_t *base)
-{
- sqfs_block_t *it;
+ sqfs_block_processor_t *proc;
+ size_t i, count;
+ int ret;
- if (base->frag_block != NULL)
- release_old_block(base, base->frag_block);
+ if (desc->size != sizeof(sqfs_block_processor_desc_t))
+ return SQFS_ERROR_ARG_INVALID;
- free(base->blk_current);
- free(base->frag_buffer);
+ proc = calloc(1, sizeof(*proc));
+ if (proc == NULL)
+ return SQFS_ERROR_ALLOC;
- while (base->free_list != NULL) {
- it = base->free_list;
- base->free_list = it->next;
- free(it);
+ proc->max_backlog = desc->max_backlog;
+ proc->max_block_size = desc->max_block_size;
+ proc->frag_tbl = desc->tbl;
+ proc->wr = desc->wr;
+ proc->file = desc->file;
+ proc->uncmp = desc->uncmp;
+ proc->stats.size = sizeof(proc->stats);
+ ((sqfs_object_t *)proc)->destroy = block_processor_destroy;
+
+ /* create the thread pool */
+ proc->pool = thread_pool_create(desc->num_workers, process_block);
+ if (proc->pool == NULL) {
+ free(proc);
+ return SQFS_ERROR_INTERNAL;
}
- hash_table_destroy(base->frag_ht, ht_delete_function);
-}
+ /* create the worker compressors & scratch buffer */
+ count = proc->pool->get_worker_count(proc->pool);
-int block_processor_init(sqfs_block_processor_t *base,
- const sqfs_block_processor_desc_t *desc)
-{
- base->process_completed_block = process_completed_block;
- base->process_completed_fragment = process_completed_fragment;
- base->process_block = process_block;
- base->max_block_size = desc->max_block_size;
- base->cmp = desc->cmp;
- base->frag_tbl = desc->tbl;
- base->wr = desc->wr;
- base->file = desc->file;
- base->uncmp = desc->uncmp;
- base->buffered_index = 0xFFFFFFFF;
- base->stats.size = sizeof(base->stats);
-
- if (desc->file != NULL && desc->uncmp != NULL && desc->tbl != NULL) {
- base->frag_buffer = malloc(2 * desc->max_block_size);
- if (base->frag_buffer == NULL)
- return SQFS_ERROR_ALLOC;
+ for (i = 0; i < count; ++i) {
+ worker_data_t *worker = alloc_flex(sizeof(*worker), 1,
+ desc->max_block_size);
+ if (worker == NULL) {
+ ret = SQFS_ERROR_ALLOC;
+ goto fail_pool;
+ }
+
+ worker->scratch_size = desc->max_block_size;
+ worker->next = proc->workers;
+ proc->workers = worker;
+
+ worker->cmp = sqfs_copy(desc->cmp);
+ if (worker->cmp == NULL)
+ goto fail_pool;
+
+ proc->pool->set_worker_ptr(proc->pool, i, worker);
}
- base->frag_ht = hash_table_create(chunk_info_hash, chunk_info_equals);
- if (base->frag_ht == NULL) {
- free(base->frag_buffer);
- return SQFS_ERROR_ALLOC;
+ /* create the fragment hash table */
+ proc->frag_ht = hash_table_create(chunk_info_hash, chunk_info_equals);
+ if (proc->frag_ht == NULL) {
+ ret = SQFS_ERROR_ALLOC;
+ goto fail_pool;
}
- base->frag_ht->user = base;
+ proc->frag_ht->user = proc;
+ *out = proc;
return 0;
+fail_pool:
+ proc->pool->destroy(proc->pool);
+ while (proc->workers != NULL) {
+ worker_data_t *worker = proc->workers;
+ proc->workers = worker->next;
+
+ if (worker->cmp != NULL)
+ sqfs_destroy(worker->cmp);
+
+ free(worker);
+ }
+ free(proc);
+ return ret;
}
sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,