From 5aa0f30173ecf3b6538b9136cb4783fc19266288 Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Mon, 22 Mar 2021 11:30:15 +0100 Subject: Cleanup the block processor file structure A cleaner separation between common code, frontend code and backend code is made. The "is this byte blob zero" function is moved out to libutil (with test case and everything) with a more optimized implementation. Signed-off-by: David Oberhollenzer --- include/util.h | 5 + lib/sqfs/Makemodule.am | 4 +- lib/sqfs/block_processor/backend.c | 288 ++++++++++++++++++++ lib/sqfs/block_processor/block_processor.c | 234 ++++++++++++++++ lib/sqfs/block_processor/common.c | 421 ----------------------------- lib/sqfs/block_processor/frontend.c | 254 +++-------------- lib/sqfs/block_processor/internal.h | 12 +- lib/util/Makemodule.am | 1 + lib/util/is_memory_zero.c | 54 ++++ tests/libutil/Makemodule.am | 5 +- tests/libutil/is_memory_zero.c | 32 +++ 11 files changed, 665 insertions(+), 645 deletions(-) create mode 100644 lib/sqfs/block_processor/backend.c create mode 100644 lib/sqfs/block_processor/block_processor.c delete mode 100644 lib/sqfs/block_processor/common.c create mode 100644 lib/util/is_memory_zero.c create mode 100644 tests/libutil/is_memory_zero.c diff --git a/include/util.h b/include/util.h index 6774be2..4b05340 100644 --- a/include/util.h +++ b/include/util.h @@ -30,4 +30,9 @@ void *alloc_array(size_t item_size, size_t nmemb); SQFS_INTERNAL sqfs_u32 xxh32(const void *input, const size_t len); +/* + Returns true if the given region of memory is filled with zero-bytes only. + */ +SQFS_INTERNAL bool is_memory_zero(const void *blob, size_t size); + #endif /* SQFS_UTIL_H */ diff --git a/lib/sqfs/Makemodule.am b/lib/sqfs/Makemodule.am index 5a62894..72eb6bc 100644 --- a/lib/sqfs/Makemodule.am +++ b/lib/sqfs/Makemodule.am @@ -25,8 +25,9 @@ libsquashfs_la_SOURCES += lib/sqfs/xattr/xattr_writer_record.c libsquashfs_la_SOURCES += lib/sqfs/xattr/xattr_writer.h libsquashfs_la_SOURCES += lib/sqfs/write_super.c lib/sqfs/data_reader.c libsquashfs_la_SOURCES += lib/sqfs/block_processor/internal.h -libsquashfs_la_SOURCES += lib/sqfs/block_processor/common.c libsquashfs_la_SOURCES += lib/sqfs/block_processor/frontend.c +libsquashfs_la_SOURCES += lib/sqfs/block_processor/block_processor.c +libsquashfs_la_SOURCES += lib/sqfs/block_processor/backend.c libsquashfs_la_SOURCES += lib/sqfs/frag_table.c include/sqfs/frag_table.h libsquashfs_la_SOURCES += lib/sqfs/block_writer.c include/sqfs/block_writer.h libsquashfs_la_CPPFLAGS = $(AM_CPPFLAGS) @@ -43,6 +44,7 @@ libsquashfs_la_SOURCES += lib/util/xxhash.c libsquashfs_la_SOURCES += lib/util/hash_table.c include/hash_table.h libsquashfs_la_SOURCES += lib/util/rbtree.c include/rbtree.h libsquashfs_la_SOURCES += lib/util/array.c include/array.h +libsquashfs_la_SOURCES += lib/util/is_memory_zero.c libsquashfs_la_SOURCES += include/threadpool.h if CUSTOM_ALLOC diff --git a/lib/sqfs/block_processor/backend.c b/lib/sqfs/block_processor/backend.c new file mode 100644 index 0000000..ff142c9 --- /dev/null +++ b/lib/sqfs/block_processor/backend.c @@ -0,0 +1,288 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * backend.c + * + * Copyright (C) 2019 David Oberhollenzer + */ +#define SQFS_BUILDING_DLL +#include "internal.h" + +static int set_block_size(sqfs_inode_generic_t **inode, + sqfs_u32 index, sqfs_u32 size) +{ + size_t min_size = (index + 1) * sizeof(sqfs_u32); + size_t avail = (*inode)->payload_bytes_available; + sqfs_inode_generic_t *new; + size_t newsz; + + if (avail < min_size) { + newsz = avail ? avail : (sizeof(sqfs_u32) * 4); + while (newsz < min_size) + newsz *= 2; + + if (SZ_ADD_OV(newsz, sizeof(**inode), &newsz)) + return SQFS_ERROR_OVERFLOW; + + if (sizeof(size_t) > sizeof(sqfs_u32)) { + if ((newsz - sizeof(**inode)) > 0x0FFFFFFFFUL) + return SQFS_ERROR_OVERFLOW; + } + + new = realloc((*inode), newsz); + if (new == NULL) + return SQFS_ERROR_ALLOC; + + (*inode) = new; + (*inode)->payload_bytes_available = newsz - sizeof(**inode); + } + + (*inode)->extra[index] = size; + + if (min_size >= (*inode)->payload_bytes_used) + (*inode)->payload_bytes_used = min_size; + + return 0; +} + +static void release_old_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) +{ + blk->next = proc->free_list; + proc->free_list = blk; +} + +static int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) +{ + sqfs_u64 location; + sqfs_u32 size; + int err; + + err = proc->wr->write_data_block(proc->wr, blk->user, blk->size, + blk->checksum, + blk->flags & ~BLK_FLAG_INTERNAL, + blk->data, &location); + if (err) + goto out; + + proc->stats.output_bytes_generated += blk->size; + + if (blk->flags & SQFS_BLK_IS_SPARSE) { + if (blk->inode != NULL) { + sqfs_inode_make_extended(*(blk->inode)); + (*(blk->inode))->data.file_ext.sparse += blk->size; + + err = set_block_size(blk->inode, blk->index, 0); + if (err) + goto out; + } + proc->stats.sparse_block_count += 1; + } else if (blk->size != 0) { + size = blk->size; + if (!(blk->flags & SQFS_BLK_IS_COMPRESSED)) + size |= 1 << 24; + + if (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) { + if (proc->frag_tbl != NULL) { + err = sqfs_frag_table_set(proc->frag_tbl, + blk->index, location, + size); + if (err) + goto out; + } + proc->stats.frag_block_count += 1; + } else { + if (blk->inode != NULL) { + err = set_block_size(blk->inode, blk->index, + size); + if (err) + goto out; + } + proc->stats.data_block_count += 1; + } + } + + if (blk->flags & SQFS_BLK_LAST_BLOCK && blk->inode != NULL) + sqfs_inode_set_file_block_start(*(blk->inode), location); +out: + release_old_block(proc, blk); + return err; +} + +static int process_completed_fragment(sqfs_block_processor_t *proc, + sqfs_block_t *frag) +{ + chunk_info_t *chunk = NULL, search; + struct hash_entry *entry; + sqfs_u32 index, offset; + int err; + + if (frag->flags & SQFS_BLK_IS_SPARSE) { + if (frag->inode != NULL) { + sqfs_inode_make_extended(*(frag->inode)); + set_block_size(frag->inode, frag->index, 0); + (*(frag->inode))->data.file_ext.sparse += frag->size; + } + proc->stats.sparse_block_count += 1; + release_old_block(proc, frag); + return 0; + } + + proc->stats.total_frag_count += 1; + + if (!(frag->flags & SQFS_BLK_DONT_DEDUPLICATE)) { + search.hash = frag->checksum; + search.size = frag->size; + + entry = hash_table_search_pre_hashed(proc->frag_ht, + search.hash, &search); + + if (entry != NULL) { + if (frag->inode != NULL) { + chunk = entry->data; + sqfs_inode_set_frag_location(*(frag->inode), + chunk->index, + chunk->offset); + } + release_old_block(proc, frag); + return 0; + } + } + + if (proc->frag_block != NULL) { + size_t size = proc->frag_block->size + frag->size; + + if (size > proc->max_block_size) { + proc->frag_block->io_seq_num = proc->io_seq_num++; + + err = enqueue_block(proc, proc->frag_block); + proc->frag_block = NULL; + + if (err) + goto fail; + } + } + + if (proc->frag_block == NULL) { + if (proc->frag_tbl == NULL) { + index = 0; + } else { + err = sqfs_frag_table_append(proc->frag_tbl, + 0, 0, &index); + if (err) + goto fail; + } + + offset = 0; + proc->frag_block = frag; + proc->frag_block->index = index; + proc->frag_block->flags &= SQFS_BLK_DONT_COMPRESS; + proc->frag_block->flags |= SQFS_BLK_FRAGMENT_BLOCK; + } else { + index = proc->frag_block->index; + offset = proc->frag_block->size; + + memcpy(proc->frag_block->data + proc->frag_block->size, + frag->data, frag->size); + + proc->frag_block->size += frag->size; + proc->frag_block->flags |= + (frag->flags & SQFS_BLK_DONT_COMPRESS); + } + + if (proc->frag_tbl != NULL) { + err = SQFS_ERROR_ALLOC; + chunk = calloc(1, sizeof(*chunk)); + if (chunk == NULL) + goto fail; + + chunk->index = index; + chunk->offset = offset; + chunk->size = frag->size; + chunk->hash = frag->checksum; + + entry = hash_table_insert_pre_hashed(proc->frag_ht, chunk->hash, + chunk, chunk); + + if (entry == NULL) + goto fail; + } + + if (frag->inode != NULL) + sqfs_inode_set_frag_location(*(frag->inode), index, offset); + + if (frag != proc->frag_block) + release_old_block(proc, frag); + + proc->stats.actual_frag_count += 1; + return 0; +fail: + free(chunk); + free(frag); + return err; +} + +static void store_io_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) +{ + sqfs_block_t *prev = NULL, *it = proc->io_queue; + + while (it != NULL && (it->io_seq_num < blk->io_seq_num)) { + prev = it; + it = it->next; + } + + if (prev == NULL) { + proc->io_queue = blk; + } else { + prev->next = blk; + } + + blk->next = it; + proc->backlog += 1; +} + +int dequeue_block(sqfs_block_processor_t *proc) +{ + size_t backlog_old = proc->backlog; + sqfs_block_t *blk; + int status; + + do { + while (proc->io_queue != NULL) { + if (proc->io_queue->io_seq_num != proc->io_deq_seq_num) + break; + + blk = proc->io_queue; + proc->io_queue = blk->next; + proc->io_deq_seq_num += 1; + proc->backlog -= 1; + + status = process_completed_block(proc, blk); + if (status != 0) + return status; + } + + if (proc->backlog < backlog_old) + break; + + blk = proc->pool->dequeue(proc->pool); + + if (blk == NULL) { + status = proc->pool->get_status(proc->pool); + return status ? status : SQFS_ERROR_INTERNAL; + } + + proc->backlog -= 1; + + if (blk->flags & SQFS_BLK_IS_FRAGMENT) { + status = process_completed_fragment(proc, blk); + if (status != 0) + return status; + } else { + if (!(blk->flags & SQFS_BLK_FRAGMENT_BLOCK)) + blk->io_seq_num = proc->io_seq_num++; + + store_io_block(proc, blk); + } + } while (proc->backlog >= backlog_old); + + return 0; +} diff --git a/lib/sqfs/block_processor/block_processor.c b/lib/sqfs/block_processor/block_processor.c new file mode 100644 index 0000000..f9313db --- /dev/null +++ b/lib/sqfs/block_processor/block_processor.c @@ -0,0 +1,234 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * block_processor.c + * + * Copyright (C) 2019 David Oberhollenzer + */ +#define SQFS_BUILDING_DLL +#include "internal.h" + +static int process_block(void *userptr, void *workitem) +{ + worker_data_t *worker = userptr; + sqfs_block_t *block = workitem; + sqfs_s32 ret; + + if (block->size == 0) + return 0; + + if (!(block->flags & SQFS_BLK_IGNORE_SPARSE) && + is_memory_zero(block->data, block->size)) { + block->flags |= SQFS_BLK_IS_SPARSE; + return 0; + } + + if (block->flags & SQFS_BLK_DONT_HASH) { + block->checksum = 0; + } else { + block->checksum = xxh32(block->data, block->size); + } + + if (block->flags & (SQFS_BLK_IS_FRAGMENT | SQFS_BLK_DONT_COMPRESS)) + return 0; + + ret = worker->cmp->do_block(worker->cmp, block->data, block->size, + worker->scratch, worker->scratch_size); + if (ret < 0) + return ret; + + if (ret > 0) { + memcpy(block->data, worker->scratch, ret); + block->size = ret; + block->flags |= SQFS_BLK_IS_COMPRESSED; + } + return 0; +} + +static bool chunk_info_equals(void *user, const void *k, const void *c) +{ + const chunk_info_t *key = k, *cmp = c; + (void)user; + return key->size == cmp->size && key->hash == cmp->hash; +} + +static void ht_delete_function(struct hash_entry *entry) +{ + free(entry->data); +} + +static void block_processor_destroy(sqfs_object_t *base) +{ + sqfs_block_processor_t *proc = (sqfs_block_processor_t *)base; + sqfs_block_t *it; + + free(proc->frag_block); + free(proc->blk_current); + + while (proc->free_list != NULL) { + it = proc->free_list; + proc->free_list = it->next; + free(it); + } + + while (proc->io_queue != NULL) { + it = proc->io_queue; + proc->io_queue = it->next; + free(it); + } + + if (proc->frag_ht != NULL) + hash_table_destroy(proc->frag_ht, ht_delete_function); + + /* XXX: shut down the pool first before cleaning up the worker data */ + if (proc->pool != NULL) + proc->pool->destroy(proc->pool); + + while (proc->workers != NULL) { + worker_data_t *worker = proc->workers; + proc->workers = worker->next; + + sqfs_destroy(worker->cmp); + free(worker); + } + + free(proc); +} + +int sqfs_block_processor_sync(sqfs_block_processor_t *proc) +{ + int ret; + + while (proc->backlog > 0) { + ret = dequeue_block(proc); + if (ret != 0) + return ret; + } + + return 0; +} + +int sqfs_block_processor_finish(sqfs_block_processor_t *proc) +{ + sqfs_block_t *blk; + int status; + + status = sqfs_block_processor_sync(proc); + if (status != 0) + return status; + + if (proc->frag_block != NULL) { + blk = proc->frag_block; + blk->next = NULL; + proc->frag_block = NULL; + + blk->io_seq_num = proc->io_seq_num++; + + status = enqueue_block(proc, blk); + if (status != 0) + return status; + + status = sqfs_block_processor_sync(proc); + } + + return status; +} + +const sqfs_block_processor_stats_t +*sqfs_block_processor_get_stats(const sqfs_block_processor_t *proc) +{ + return &proc->stats; +} + +int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc, + sqfs_block_processor_t **out) +{ + sqfs_block_processor_t *proc; + size_t i, count; + int ret; + + if (desc->size != sizeof(sqfs_block_processor_desc_t)) + return SQFS_ERROR_ARG_INVALID; + + proc = calloc(1, sizeof(*proc)); + if (proc == NULL) + return SQFS_ERROR_ALLOC; + + proc->max_backlog = desc->max_backlog; + proc->max_block_size = desc->max_block_size; + proc->frag_tbl = desc->tbl; + proc->wr = desc->wr; + proc->file = desc->file; + proc->uncmp = desc->uncmp; + proc->stats.size = sizeof(proc->stats); + ((sqfs_object_t *)proc)->destroy = block_processor_destroy; + + /* create the thread pool */ + proc->pool = thread_pool_create(desc->num_workers, process_block); + if (proc->pool == NULL) { + free(proc); + return SQFS_ERROR_INTERNAL; + } + + /* create the worker compressors & scratch buffer */ + count = proc->pool->get_worker_count(proc->pool); + + for (i = 0; i < count; ++i) { + worker_data_t *worker = alloc_flex(sizeof(*worker), 1, + desc->max_block_size); + if (worker == NULL) { + ret = SQFS_ERROR_ALLOC; + goto fail_pool; + } + + worker->scratch_size = desc->max_block_size; + worker->next = proc->workers; + proc->workers = worker; + + worker->cmp = sqfs_copy(desc->cmp); + if (worker->cmp == NULL) { + ret = SQFS_ERROR_ALLOC; + goto fail_pool; + } + + proc->pool->set_worker_ptr(proc->pool, i, worker); + } + + /* create the fragment hash table */ + proc->frag_ht = hash_table_create(NULL, chunk_info_equals); + if (proc->frag_ht == NULL) { + ret = SQFS_ERROR_ALLOC; + goto fail_pool; + } + + proc->frag_ht->user = proc; + *out = proc; + return 0; +fail_pool: + block_processor_destroy((sqfs_object_t *)proc); + return ret; +} + +sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, + sqfs_compressor_t *cmp, + unsigned int num_workers, + size_t max_backlog, + sqfs_block_writer_t *wr, + sqfs_frag_table_t *tbl) +{ + sqfs_block_processor_desc_t desc; + sqfs_block_processor_t *out; + + memset(&desc, 0, sizeof(desc)); + desc.size = sizeof(desc); + desc.max_block_size = max_block_size; + desc.num_workers = num_workers; + desc.max_backlog = max_backlog; + desc.cmp = cmp; + desc.wr = wr; + desc.tbl = tbl; + + if (sqfs_block_processor_create_ex(&desc, &out) != 0) + return NULL; + + return out; +} diff --git a/lib/sqfs/block_processor/common.c b/lib/sqfs/block_processor/common.c deleted file mode 100644 index 6a0015b..0000000 --- a/lib/sqfs/block_processor/common.c +++ /dev/null @@ -1,421 +0,0 @@ -/* SPDX-License-Identifier: LGPL-3.0-or-later */ -/* - * common.c - * - * Copyright (C) 2019 David Oberhollenzer - */ -#define SQFS_BUILDING_DLL -#include "internal.h" - -static int set_block_size(sqfs_inode_generic_t **inode, - sqfs_u32 index, sqfs_u32 size) -{ - size_t min_size = (index + 1) * sizeof(sqfs_u32); - size_t avail = (*inode)->payload_bytes_available; - sqfs_inode_generic_t *new; - size_t newsz; - - if (avail < min_size) { - newsz = avail ? avail : (sizeof(sqfs_u32) * 4); - while (newsz < min_size) - newsz *= 2; - - if (SZ_ADD_OV(newsz, sizeof(**inode), &newsz)) - return SQFS_ERROR_OVERFLOW; - - if (sizeof(size_t) > sizeof(sqfs_u32)) { - if ((newsz - sizeof(**inode)) > 0x0FFFFFFFFUL) - return SQFS_ERROR_OVERFLOW; - } - - new = realloc((*inode), newsz); - if (new == NULL) - return SQFS_ERROR_ALLOC; - - (*inode) = new; - (*inode)->payload_bytes_available = newsz - sizeof(**inode); - } - - (*inode)->extra[index] = size; - - if (min_size >= (*inode)->payload_bytes_used) - (*inode)->payload_bytes_used = min_size; - - return 0; -} - -static void release_old_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) -{ - blk->next = proc->free_list; - proc->free_list = blk; -} - -int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) -{ - sqfs_u64 location; - sqfs_u32 size; - int err; - - err = proc->wr->write_data_block(proc->wr, blk->user, blk->size, - blk->checksum, - blk->flags & ~BLK_FLAG_INTERNAL, - blk->data, &location); - if (err) - goto out; - - proc->stats.output_bytes_generated += blk->size; - - if (blk->flags & SQFS_BLK_IS_SPARSE) { - if (blk->inode != NULL) { - sqfs_inode_make_extended(*(blk->inode)); - (*(blk->inode))->data.file_ext.sparse += blk->size; - - err = set_block_size(blk->inode, blk->index, 0); - if (err) - goto out; - } - proc->stats.sparse_block_count += 1; - } else if (blk->size != 0) { - size = blk->size; - if (!(blk->flags & SQFS_BLK_IS_COMPRESSED)) - size |= 1 << 24; - - if (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) { - if (proc->frag_tbl != NULL) { - err = sqfs_frag_table_set(proc->frag_tbl, - blk->index, location, - size); - if (err) - goto out; - } - proc->stats.frag_block_count += 1; - } else { - if (blk->inode != NULL) { - err = set_block_size(blk->inode, blk->index, - size); - if (err) - goto out; - } - proc->stats.data_block_count += 1; - } - } - - if (blk->flags & SQFS_BLK_LAST_BLOCK && blk->inode != NULL) - sqfs_inode_set_file_block_start(*(blk->inode), location); -out: - release_old_block(proc, blk); - return err; -} - -static bool is_zero_block(unsigned char *ptr, size_t size) -{ - return ptr[0] == 0 && memcmp(ptr, ptr + 1, size - 1) == 0; -} - -static int process_block(void *userptr, void *workitem) -{ - sqfs_compressor_t *cmp = ((worker_data_t *)userptr)->cmp; - sqfs_u8 *scratch = ((worker_data_t *)userptr)->scratch; - size_t scratch_size = ((worker_data_t *)userptr)->scratch_size; - sqfs_block_t *block = workitem; - sqfs_s32 ret; - - if (block->size == 0) - return 0; - - if (!(block->flags & SQFS_BLK_IGNORE_SPARSE) && - is_zero_block(block->data, block->size)) { - block->flags |= SQFS_BLK_IS_SPARSE; - return 0; - } - - if (block->flags & SQFS_BLK_DONT_HASH) { - block->checksum = 0; - } else { - block->checksum = xxh32(block->data, block->size); - } - - if (block->flags & (SQFS_BLK_IS_FRAGMENT | SQFS_BLK_DONT_COMPRESS)) - return 0; - - ret = cmp->do_block(cmp, block->data, block->size, - scratch, scratch_size); - if (ret < 0) - return ret; - - if (ret > 0) { - memcpy(block->data, scratch, ret); - block->size = ret; - block->flags |= SQFS_BLK_IS_COMPRESSED; - } - return 0; -} - -int process_completed_fragment(sqfs_block_processor_t *proc, - sqfs_block_t *frag, - sqfs_block_t **blk_out) -{ - chunk_info_t *chunk, search; - struct hash_entry *entry; - sqfs_u32 index, offset; - size_t size; - int err; - - if (frag->flags & SQFS_BLK_IS_SPARSE) { - if (frag->inode != NULL) { - sqfs_inode_make_extended(*(frag->inode)); - set_block_size(frag->inode, frag->index, 0); - (*(frag->inode))->data.file_ext.sparse += frag->size; - } - proc->stats.sparse_block_count += 1; - release_old_block(proc, frag); - return 0; - } - - proc->stats.total_frag_count += 1; - - if (!(frag->flags & SQFS_BLK_DONT_DEDUPLICATE)) { - search.hash = frag->checksum; - search.size = frag->size; - - entry = hash_table_search_pre_hashed(proc->frag_ht, - search.hash, &search); - - if (entry != NULL) { - if (frag->inode != NULL) { - chunk = entry->data; - sqfs_inode_set_frag_location(*(frag->inode), - chunk->index, - chunk->offset); - } - release_old_block(proc, frag); - return 0; - } - } - - if (proc->frag_block != NULL) { - size = proc->frag_block->size + frag->size; - - if (size > proc->max_block_size) { - *blk_out = proc->frag_block; - proc->frag_block = NULL; - } - } - - if (proc->frag_block == NULL) { - if (proc->frag_tbl == NULL) { - index = 0; - } else { - err = sqfs_frag_table_append(proc->frag_tbl, - 0, 0, &index); - if (err) - goto fail; - } - - offset = 0; - proc->frag_block = frag; - proc->frag_block->index = index; - proc->frag_block->flags &= SQFS_BLK_DONT_COMPRESS; - proc->frag_block->flags |= SQFS_BLK_FRAGMENT_BLOCK; - } else { - index = proc->frag_block->index; - offset = proc->frag_block->size; - - memcpy(proc->frag_block->data + proc->frag_block->size, - frag->data, frag->size); - - proc->frag_block->size += frag->size; - proc->frag_block->flags |= - (frag->flags & SQFS_BLK_DONT_COMPRESS); - release_old_block(proc, frag); - } - - if (proc->frag_tbl != NULL) { - err = SQFS_ERROR_ALLOC; - chunk = calloc(1, sizeof(*chunk)); - if (chunk == NULL) - goto fail_outblk; - - chunk->index = index; - chunk->offset = offset; - chunk->size = frag->size; - chunk->hash = frag->checksum; - - entry = hash_table_insert_pre_hashed(proc->frag_ht, chunk->hash, - chunk, chunk); - - if (entry == NULL) { - free(chunk); - goto fail_outblk; - } - } - - if (frag->inode != NULL) - sqfs_inode_set_frag_location(*(frag->inode), index, offset); - - proc->stats.actual_frag_count += 1; - return 0; -fail: - release_old_block(proc, frag); -fail_outblk: - if (*blk_out != NULL) { - release_old_block(proc, *blk_out); - *blk_out = NULL; - } - return err; -} - -static uint32_t chunk_info_hash(void *user, const void *key) -{ - const chunk_info_t *chunk = key; - (void)user; - return chunk->hash; -} - -static bool chunk_info_equals(void *user, const void *k, const void *c) -{ - const chunk_info_t *key = k, *cmp = c; - (void)user; - return key->size == cmp->size && key->hash == cmp->hash; -} - -static void ht_delete_function(struct hash_entry *entry) -{ - free(entry->data); -} - -static void block_processor_destroy(sqfs_object_t *base) -{ - sqfs_block_processor_t *proc = (sqfs_block_processor_t *)base; - sqfs_block_t *it; - - if (proc->frag_block != NULL) - release_old_block(proc, proc->frag_block); - - free(proc->blk_current); - - while (proc->free_list != NULL) { - it = proc->free_list; - proc->free_list = it->next; - free(it); - } - - hash_table_destroy(proc->frag_ht, ht_delete_function); - - /* XXX: shut down the pool first before cleaning up the worker data */ - proc->pool->destroy(proc->pool); - - while (proc->workers != NULL) { - worker_data_t *worker = proc->workers; - proc->workers = worker->next; - - sqfs_destroy(worker->cmp); - free(worker); - } - - free(proc); -} - -int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc, - sqfs_block_processor_t **out) -{ - sqfs_block_processor_t *proc; - size_t i, count; - int ret; - - if (desc->size != sizeof(sqfs_block_processor_desc_t)) - return SQFS_ERROR_ARG_INVALID; - - proc = calloc(1, sizeof(*proc)); - if (proc == NULL) - return SQFS_ERROR_ALLOC; - - proc->max_backlog = desc->max_backlog; - proc->max_block_size = desc->max_block_size; - proc->frag_tbl = desc->tbl; - proc->wr = desc->wr; - proc->file = desc->file; - proc->uncmp = desc->uncmp; - proc->stats.size = sizeof(proc->stats); - ((sqfs_object_t *)proc)->destroy = block_processor_destroy; - - /* create the thread pool */ - proc->pool = thread_pool_create(desc->num_workers, process_block); - if (proc->pool == NULL) { - free(proc); - return SQFS_ERROR_INTERNAL; - } - - /* create the worker compressors & scratch buffer */ - count = proc->pool->get_worker_count(proc->pool); - - for (i = 0; i < count; ++i) { - worker_data_t *worker = alloc_flex(sizeof(*worker), 1, - desc->max_block_size); - if (worker == NULL) { - ret = SQFS_ERROR_ALLOC; - goto fail_pool; - } - - worker->scratch_size = desc->max_block_size; - worker->next = proc->workers; - proc->workers = worker; - - worker->cmp = sqfs_copy(desc->cmp); - if (worker->cmp == NULL) { - ret = SQFS_ERROR_ALLOC; - goto fail_pool; - } - - proc->pool->set_worker_ptr(proc->pool, i, worker); - } - - /* create the fragment hash table */ - proc->frag_ht = hash_table_create(chunk_info_hash, chunk_info_equals); - if (proc->frag_ht == NULL) { - ret = SQFS_ERROR_ALLOC; - goto fail_pool; - } - - proc->frag_ht->user = proc; - *out = proc; - return 0; -fail_pool: - proc->pool->destroy(proc->pool); - while (proc->workers != NULL) { - worker_data_t *worker = proc->workers; - proc->workers = worker->next; - - if (worker->cmp != NULL) - sqfs_destroy(worker->cmp); - - free(worker); - } - free(proc); - return ret; -} - -sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, - sqfs_compressor_t *cmp, - unsigned int num_workers, - size_t max_backlog, - sqfs_block_writer_t *wr, - sqfs_frag_table_t *tbl) -{ - sqfs_block_processor_desc_t desc; - sqfs_block_processor_t *out; - - memset(&desc, 0, sizeof(desc)); - desc.size = sizeof(desc); - desc.max_block_size = max_block_size; - desc.num_workers = num_workers; - desc.max_backlog = max_backlog; - desc.cmp = cmp; - desc.wr = wr; - desc.tbl = tbl; - - if (sqfs_block_processor_create_ex(&desc, &out) != 0) - return NULL; - - return out; -} diff --git a/lib/sqfs/block_processor/frontend.c b/lib/sqfs/block_processor/frontend.c index 8bd6cf2..ccf54c0 100644 --- a/lib/sqfs/block_processor/frontend.c +++ b/lib/sqfs/block_processor/frontend.c @@ -7,123 +7,46 @@ #define SQFS_BUILDING_DLL #include "internal.h" -static sqfs_block_t *get_new_block(sqfs_block_processor_t *proc) +static int get_new_block(sqfs_block_processor_t *proc, sqfs_block_t **out) { sqfs_block_t *blk; + while (proc->backlog >= proc->max_backlog) { + int ret = dequeue_block(proc); + if (ret != 0) + return ret; + } + if (proc->free_list != NULL) { blk = proc->free_list; proc->free_list = blk->next; } else { blk = malloc(sizeof(*blk) + proc->max_block_size); + if (blk == NULL) + return SQFS_ERROR_ALLOC; } - if (blk != NULL) - memset(blk, 0, sizeof(*blk)); - - return blk; + memset(blk, 0, sizeof(*blk)); + *out = blk; + return 0; } -static int dequeue_block(sqfs_block_processor_t *proc) +static int add_sentinel_block(sqfs_block_processor_t *proc) { - sqfs_block_t *blk, *fragblk, *it, *prev; - bool have_dequeued = false; - int status; -retry: - while (proc->io_queue != NULL) { - if (proc->io_queue->io_seq_num != proc->io_deq_seq_num) - break; - - blk = proc->io_queue; - proc->io_queue = blk->next; - proc->io_deq_seq_num += 1; - proc->backlog -= 1; - have_dequeued = true; - - status = process_completed_block(proc, blk); - if (status != 0) - return status; - } - - if (have_dequeued) - return 0; - - blk = proc->pool->dequeue(proc->pool); - - if (blk == NULL) { - status = proc->pool->get_status(proc->pool); - if (status == 0) - status = SQFS_ERROR_INTERNAL; - - return status; - } - - proc->backlog -= 1; - have_dequeued = true; - - if (blk->flags & SQFS_BLK_IS_FRAGMENT) { - fragblk = NULL; - status = process_completed_fragment(proc, blk, &fragblk); - - if (status != 0) { - free(fragblk); - return status; - } - - if (fragblk != NULL) { - fragblk->io_seq_num = proc->io_seq_num++; - - if (proc->pool->submit(proc->pool, fragblk) != 0) { - free(fragblk); - - if (status == 0) { - status = proc->pool-> - get_status(proc->pool); - - if (status == 0) - status = SQFS_ERROR_ALLOC; - } - - return status; - } - - proc->backlog += 1; - have_dequeued = false; - } - } else { - if (!(blk->flags & SQFS_BLK_FRAGMENT_BLOCK)) - blk->io_seq_num = proc->io_seq_num++; - - prev = NULL; - it = proc->io_queue; - - while (it != NULL) { - if (it->io_seq_num >= blk->io_seq_num) - break; - - prev = it; - it = it->next; - } - - if (prev == NULL) { - blk->next = proc->io_queue; - proc->io_queue = blk; - } else { - blk->next = prev->next; - prev->next = blk; - } + sqfs_block_t *blk; + int ret; - proc->backlog += 1; - have_dequeued = false; - } + ret = get_new_block(proc, &blk); + if (ret != 0) + return ret; - if (!have_dequeued) - goto retry; + blk->inode = proc->inode; + blk->flags = proc->blk_flags | SQFS_BLK_LAST_BLOCK; - return 0; + return enqueue_block(proc, blk); } -static int enqueue_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) +int enqueue_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) { int status; @@ -141,44 +64,6 @@ static int enqueue_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) return 0; } -static int add_sentinel_block(sqfs_block_processor_t *proc) -{ - sqfs_block_t *blk; - int ret; - - if (proc->backlog == proc->max_backlog) { - ret = dequeue_block(proc); - if (ret != 0) - return ret; - } - - blk = get_new_block(proc); - if (blk == NULL) - return SQFS_ERROR_ALLOC; - - blk->inode = proc->inode; - blk->flags = proc->blk_flags | SQFS_BLK_LAST_BLOCK; - - return enqueue_block(proc, blk); -} - -static int flush_block(sqfs_block_processor_t *proc) -{ - sqfs_block_t *block; - int ret; - - if (proc->backlog == proc->max_backlog) { - ret = dequeue_block(proc); - if (ret != 0) - return ret; - } - - block = proc->blk_current; - proc->blk_current = NULL; - - return enqueue_block(proc, block); -} - int sqfs_block_processor_begin_file(sqfs_block_processor_t *proc, sqfs_inode_generic_t **inode, void *user, sqfs_u32 flags) @@ -224,9 +109,9 @@ int sqfs_block_processor_append(sqfs_block_processor_t *proc, const void *data, while (size > 0) { if (proc->blk_current == NULL) { - new = get_new_block(proc); - if (new == NULL) - return SQFS_ERROR_ALLOC; + err = get_new_block(proc, &new); + if (err != 0) + return err; proc->blk_current = new; proc->blk_current->flags = proc->blk_flags; @@ -239,7 +124,9 @@ int sqfs_block_processor_append(sqfs_block_processor_t *proc, const void *data, diff = proc->max_block_size - proc->blk_current->size; if (diff == 0) { - err = flush_block(proc); + err = enqueue_block(proc, proc->blk_current); + proc->blk_current = NULL; + if (err) return err; continue; @@ -258,9 +145,12 @@ int sqfs_block_processor_append(sqfs_block_processor_t *proc, const void *data, proc->stats.input_bytes_read += diff; } - if (proc->blk_current != NULL && - proc->blk_current->size == proc->max_block_size) { - return flush_block(proc); + if (proc->blk_current->size == proc->max_block_size) { + err = enqueue_block(proc, proc->blk_current); + proc->blk_current = NULL; + + if (err) + return err; } return 0; @@ -293,7 +183,9 @@ int sqfs_block_processor_end_file(sqfs_block_processor_t *proc) proc->blk_current->flags |= SQFS_BLK_IS_FRAGMENT; } - err = flush_block(proc); + err = enqueue_block(proc, proc->blk_current); + proc->blk_current = NULL; + if (err) return err; } @@ -321,78 +213,14 @@ int sqfs_block_processor_submit_block(sqfs_block_processor_t *proc, void *user, if (flags & ~SQFS_BLK_FLAGS_ALL) return SQFS_ERROR_UNSUPPORTED; - if (proc->backlog == proc->max_backlog) { - ret = dequeue_block(proc); - if (ret != 0) - return ret; - } - - blk = get_new_block(proc); - if (blk == NULL) - return SQFS_ERROR_ALLOC; + ret = get_new_block(proc, &blk); + if (ret != 0) + return ret; blk->flags = flags | BLK_FLAG_MANUAL_SUBMISSION; blk->user = user; blk->size = size; memcpy(blk->data, data, size); - ret = proc->pool->submit(proc->pool, blk); - if (ret != 0) - free(blk); - - proc->backlog += 1; - return ret; -} - -int sqfs_block_processor_sync(sqfs_block_processor_t *proc) -{ - int ret; - - while (proc->backlog > 0) { - ret = dequeue_block(proc); - if (ret != 0) - return ret; - } - - return 0; -} - -int sqfs_block_processor_finish(sqfs_block_processor_t *proc) -{ - sqfs_block_t *blk; - int status; - - status = sqfs_block_processor_sync(proc); - if (status != 0) - return status; - - if (proc->frag_block != NULL) { - blk = proc->frag_block; - blk->next = NULL; - proc->frag_block = NULL; - - blk->io_seq_num = proc->io_seq_num++; - - status = proc->pool->submit(proc->pool, blk); - if (status != 0) { - status = proc->pool->get_status(proc->pool); - - if (status == 0) - status = SQFS_ERROR_ALLOC; - - free(blk); - return status; - } - - proc->backlog += 1; - status = sqfs_block_processor_sync(proc); - } - - return status; -} - -const sqfs_block_processor_stats_t -*sqfs_block_processor_get_stats(const sqfs_block_processor_t *proc) -{ - return &proc->stats; + return enqueue_block(proc, blk); } diff --git a/lib/sqfs/block_processor/internal.h b/lib/sqfs/block_processor/internal.h index 2393380..ef8ff0f 100644 --- a/lib/sqfs/block_processor/internal.h +++ b/lib/sqfs/block_processor/internal.h @@ -25,7 +25,6 @@ #include #include -#include typedef struct { sqfs_u32 index; @@ -96,19 +95,14 @@ struct sqfs_block_processor_t { thread_pool_t *pool; worker_data_t *workers; - - sqfs_block_t *io_queue; sqfs_u32 io_seq_num; sqfs_u32 io_deq_seq_num; }; -SQFS_INTERNAL -int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk); +SQFS_INTERNAL int enqueue_block(sqfs_block_processor_t *proc, + sqfs_block_t *blk); -SQFS_INTERNAL -int process_completed_fragment(sqfs_block_processor_t *proc, - sqfs_block_t *frag, - sqfs_block_t **blk_out); +SQFS_INTERNAL int dequeue_block(sqfs_block_processor_t *proc); #endif /* INTERNAL_H */ diff --git a/lib/util/Makemodule.am b/lib/util/Makemodule.am index 113855c..c66c786 100644 --- a/lib/util/Makemodule.am +++ b/lib/util/Makemodule.am @@ -7,6 +7,7 @@ libutil_a_SOURCES += lib/util/fast_urem_by_const.h libutil_a_SOURCES += include/threadpool.h libutil_a_SOURCES += include/w32threadwrap.h libutil_a_SOURCES += lib/util/threadpool_serial.c +libutil_a_SOURCES += lib/util/is_memory_zero.c libutil_a_CFLAGS = $(AM_CFLAGS) libutil_a_CPPFLAGS = $(AM_CPPFLAGS) diff --git a/lib/util/is_memory_zero.c b/lib/util/is_memory_zero.c new file mode 100644 index 0000000..3974ee2 --- /dev/null +++ b/lib/util/is_memory_zero.c @@ -0,0 +1,54 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * is_memory_zero.c + * + * Copyright (C) 2021 David Oberhollenzer + */ +#include "config.h" +#include "util.h" + +#include + +#define U64THRESHOLD (128) + +static bool test_u8(const unsigned char *blob, size_t size) +{ + while (size--) { + if (*(blob++) != 0) + return false; + } + + return true; +} + +bool is_memory_zero(const void *blob, size_t size) +{ + const sqfs_u64 *u64ptr; + size_t diff; + + if (size < U64THRESHOLD) + return test_u8(blob, size); + + diff = (uintptr_t)blob % sizeof(sqfs_u64); + + if (diff != 0) { + diff = sizeof(sqfs_u64) - diff; + + if (!test_u8(blob, diff)) + return false; + + blob = (const char *)blob + diff; + size -= diff; + } + + u64ptr = blob; + + while (size >= sizeof(sqfs_u64)) { + if (*(u64ptr++) != 0) + return false; + + size -= sizeof(sqfs_u64); + } + + return test_u8((const unsigned char *)u64ptr, size); +} diff --git a/tests/libutil/Makemodule.am b/tests/libutil/Makemodule.am index 1fe4ebf..27d6341 100644 --- a/tests/libutil/Makemodule.am +++ b/tests/libutil/Makemodule.am @@ -12,8 +12,11 @@ test_threadpool_SOURCES = tests/libutil/threadpool.c test_threadpool_CFLAGS = $(AM_CFLAGS) $(PTHREAD_CFLAGS) test_threadpool_LDADD = libutil.a libcompat.a $(PTHREAD_LIBS) +test_ismemzero_SOURCES = tests/libutil/is_memory_zero.c +test_ismemzero_LDADD = libutil.a libcompat.a + LIBUTIL_TESTS = \ - test_str_table test_rbtree test_xxhash test_threadpool + test_str_table test_rbtree test_xxhash test_threadpool test_ismemzero check_PROGRAMS += $(LIBUTIL_TESTS) TESTS += $(LIBUTIL_TESTS) diff --git a/tests/libutil/is_memory_zero.c b/tests/libutil/is_memory_zero.c new file mode 100644 index 0000000..20bd93f --- /dev/null +++ b/tests/libutil/is_memory_zero.c @@ -0,0 +1,32 @@ +/* SPDX-License-Identifier: GPL-3.0-or-later */ +/* + * is_memory_zero.c + * + * Copyright (C) 2021 David Oberhollenzer + */ +#include "config.h" + +#include "../test.h" +#include "util.h" + +int main(void) +{ + unsigned char temp[1024]; + size_t i, j; + + memset(temp, 0, sizeof(temp)); + + for (i = 0; i < sizeof(temp); ++i) { + TEST_ASSERT(is_memory_zero(temp, i)); + + for (j = 0; j < i; ++j) { + TEST_ASSERT(is_memory_zero(temp, i)); + temp[j] = 42; + TEST_ASSERT(!is_memory_zero(temp, i)); + temp[j] = 0; + TEST_ASSERT(is_memory_zero(temp, i)); + } + } + + return EXIT_SUCCESS; +} -- cgit v1.2.3