From bb0ef9e0eec5c27610fe381b905ef46b3f5f09c6 Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Sun, 21 Mar 2021 16:59:08 +0100 Subject: Cleanup: Rewrite block processor to use the libutil thread_pool_t Throw out the messy thread pool implementation and temporarily also remove the exact fragment matching for simplicity. Signed-off-by: David Oberhollenzer --- lib/sqfs/block_processor/common.c | 201 +++++++++++++++++++------------------- 1 file changed, 100 insertions(+), 101 deletions(-) (limited to 'lib/sqfs/block_processor/common.c') diff --git a/lib/sqfs/block_processor/common.c b/lib/sqfs/block_processor/common.c index f1aca1e..62c355b 100644 --- a/lib/sqfs/block_processor/common.c +++ b/lib/sqfs/block_processor/common.c @@ -50,8 +50,7 @@ static void release_old_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) proc->free_list = blk; } -static int process_completed_block(sqfs_block_processor_t *proc, - sqfs_block_t *blk) +int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) { sqfs_u64 location; sqfs_u32 size; @@ -113,9 +112,12 @@ static bool is_zero_block(unsigned char *ptr, size_t size) return ptr[0] == 0 && memcmp(ptr, ptr + 1, size - 1) == 0; } -static int process_block(sqfs_block_t *block, sqfs_compressor_t *cmp, - sqfs_u8 *scratch, size_t scratch_size) +static int process_block(void *userptr, void *workitem) { + sqfs_compressor_t *cmp = ((worker_data_t *)userptr)->cmp; + sqfs_u8 *scratch = ((worker_data_t *)userptr)->scratch; + size_t scratch_size = ((worker_data_t *)userptr)->scratch_size; + sqfs_block_t *block = workitem; sqfs_s32 ret; if (block->size == 0) @@ -149,9 +151,9 @@ static int process_block(sqfs_block_t *block, sqfs_compressor_t *cmp, return 0; } -static int process_completed_fragment(sqfs_block_processor_t *proc, - sqfs_block_t *frag, - sqfs_block_t **blk_out) +int process_completed_fragment(sqfs_block_processor_t *proc, + sqfs_block_t *frag, + sqfs_block_t **blk_out) { chunk_info_t *chunk, search; struct hash_entry *entry; @@ -176,10 +178,8 @@ static int process_completed_fragment(sqfs_block_processor_t *proc, search.hash = frag->checksum; search.size = frag->size; - proc->frag_cmp_current = frag; entry = hash_table_search_pre_hashed(proc->frag_ht, search.hash, &search); - proc->frag_cmp_current = NULL; if (entry != NULL) { if (frag->inode != NULL) { @@ -241,10 +241,8 @@ static int process_completed_fragment(sqfs_block_processor_t *proc, chunk->size = frag->size; chunk->hash = frag->checksum; - proc->frag_cmp_current = frag; entry = hash_table_insert_pre_hashed(proc->frag_ht, chunk->hash, chunk, chunk); - proc->frag_cmp_current = NULL; if (entry == NULL) { free(chunk); @@ -277,121 +275,122 @@ static uint32_t chunk_info_hash(void *user, const void *key) static bool chunk_info_equals(void *user, const void *k, const void *c) { const chunk_info_t *key = k, *cmp = c; - sqfs_block_processor_t *proc = user; - sqfs_fragment_t frag; - unsigned char *temp; - size_t size; - int ret; - - if (key->size != cmp->size || key->hash != cmp->hash) - return false; + (void)user; + return key->size == cmp->size && key->hash == cmp->hash; +} - if (proc->file == NULL || proc->uncmp == NULL) - return true; +static void ht_delete_function(struct hash_entry *entry) +{ + free(entry->data); +} - ret = proc->compare_frag_in_flight(proc, proc->frag_cmp_current, - cmp->index, cmp->offset); - if (ret == 0) - return true; +static void block_processor_destroy(sqfs_object_t *base) +{ + sqfs_block_processor_t *proc = (sqfs_block_processor_t *)base; + sqfs_block_t *it; - if (proc->buffered_index != cmp->index || - proc->buffered_blk_size == 0) { - if (sqfs_frag_table_lookup(proc->frag_tbl, cmp->index, &frag)) - return false; + if (proc->frag_block != NULL) + release_old_block(proc, proc->frag_block); - proc->buffered_index = 0xFFFFFFFF; - size = SQFS_ON_DISK_BLOCK_SIZE(frag.size); + free(proc->blk_current); - if (SQFS_IS_BLOCK_COMPRESSED(frag.size)) { - temp = proc->frag_buffer + proc->max_block_size; + while (proc->free_list != NULL) { + it = proc->free_list; + proc->free_list = it->next; + free(it); + } - ret = proc->file->read_at(proc->file, frag.start_offset, - temp, size); - if (ret != 0) - return false; + hash_table_destroy(proc->frag_ht, ht_delete_function); - ret = proc->uncmp->do_block(proc->uncmp, temp, size, - proc->frag_buffer, - proc->max_block_size); - if (ret <= 0) - return false; + /* XXX: shut down the pool first before cleaning up the worker data */ + proc->pool->destroy(proc->pool); - size = ret; - } else { - ret = proc->file->read_at(proc->file, frag.start_offset, - proc->frag_buffer, size); - if (ret != 0) - return false; - } + while (proc->workers != NULL) { + worker_data_t *worker = proc->workers; + proc->workers = worker->next; - proc->buffered_index = cmp->index; - proc->buffered_blk_size = size; + sqfs_destroy(worker->cmp); + free(worker); } - if (cmp->offset >= proc->buffered_blk_size) - return false; - - if (cmp->size > (proc->buffered_blk_size - cmp->offset)) - return false; - - return memcmp(proc->frag_buffer + cmp->offset, - proc->frag_cmp_current->data, - cmp->size) == 0; + free(proc); } -static void ht_delete_function(struct hash_entry *entry) +int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc, + sqfs_block_processor_t **out) { - free(entry->data); -} - -void block_processor_cleanup(sqfs_block_processor_t *base) -{ - sqfs_block_t *it; + sqfs_block_processor_t *proc; + size_t i, count; + int ret; - if (base->frag_block != NULL) - release_old_block(base, base->frag_block); + if (desc->size != sizeof(sqfs_block_processor_desc_t)) + return SQFS_ERROR_ARG_INVALID; - free(base->blk_current); - free(base->frag_buffer); + proc = calloc(1, sizeof(*proc)); + if (proc == NULL) + return SQFS_ERROR_ALLOC; - while (base->free_list != NULL) { - it = base->free_list; - base->free_list = it->next; - free(it); + proc->max_backlog = desc->max_backlog; + proc->max_block_size = desc->max_block_size; + proc->frag_tbl = desc->tbl; + proc->wr = desc->wr; + proc->file = desc->file; + proc->uncmp = desc->uncmp; + proc->stats.size = sizeof(proc->stats); + ((sqfs_object_t *)proc)->destroy = block_processor_destroy; + + /* create the thread pool */ + proc->pool = thread_pool_create(desc->num_workers, process_block); + if (proc->pool == NULL) { + free(proc); + return SQFS_ERROR_INTERNAL; } - hash_table_destroy(base->frag_ht, ht_delete_function); -} + /* create the worker compressors & scratch buffer */ + count = proc->pool->get_worker_count(proc->pool); -int block_processor_init(sqfs_block_processor_t *base, - const sqfs_block_processor_desc_t *desc) -{ - base->process_completed_block = process_completed_block; - base->process_completed_fragment = process_completed_fragment; - base->process_block = process_block; - base->max_block_size = desc->max_block_size; - base->cmp = desc->cmp; - base->frag_tbl = desc->tbl; - base->wr = desc->wr; - base->file = desc->file; - base->uncmp = desc->uncmp; - base->buffered_index = 0xFFFFFFFF; - base->stats.size = sizeof(base->stats); - - if (desc->file != NULL && desc->uncmp != NULL && desc->tbl != NULL) { - base->frag_buffer = malloc(2 * desc->max_block_size); - if (base->frag_buffer == NULL) - return SQFS_ERROR_ALLOC; + for (i = 0; i < count; ++i) { + worker_data_t *worker = alloc_flex(sizeof(*worker), 1, + desc->max_block_size); + if (worker == NULL) { + ret = SQFS_ERROR_ALLOC; + goto fail_pool; + } + + worker->scratch_size = desc->max_block_size; + worker->next = proc->workers; + proc->workers = worker; + + worker->cmp = sqfs_copy(desc->cmp); + if (worker->cmp == NULL) + goto fail_pool; + + proc->pool->set_worker_ptr(proc->pool, i, worker); } - base->frag_ht = hash_table_create(chunk_info_hash, chunk_info_equals); - if (base->frag_ht == NULL) { - free(base->frag_buffer); - return SQFS_ERROR_ALLOC; + /* create the fragment hash table */ + proc->frag_ht = hash_table_create(chunk_info_hash, chunk_info_equals); + if (proc->frag_ht == NULL) { + ret = SQFS_ERROR_ALLOC; + goto fail_pool; } - base->frag_ht->user = base; + proc->frag_ht->user = proc; + *out = proc; return 0; +fail_pool: + proc->pool->destroy(proc->pool); + while (proc->workers != NULL) { + worker_data_t *worker = proc->workers; + proc->workers = worker->next; + + if (worker->cmp != NULL) + sqfs_destroy(worker->cmp); + + free(worker); + } + free(proc); + return ret; } sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, -- cgit v1.2.3