From 4249e123d321650050259fb602f06497519077d0 Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Thu, 14 Jan 2021 04:38:33 +0100 Subject: libsqfs: block processor: backport exact fragment matching This commit is an amalgamation of the commits on master that implement exact matching of fragment blocks during deduplication. Signed-off-by: David Oberhollenzer --- include/hash_table.h | 9 +-- include/sqfs/block_processor.h | 98 ++++++++++++++++++++++++- include/sqfs/predef.h | 1 + lib/sqfs/block_processor/common.c | 130 ++++++++++++++++++++++++++++++---- lib/sqfs/block_processor/internal.h | 19 +++-- lib/sqfs/block_processor/serial.c | 43 +++++++---- lib/sqfs/block_processor/winpthread.c | 127 +++++++++++++++++++++++++++------ lib/util/hash_table.c | 18 ++--- 8 files changed, 379 insertions(+), 66 deletions(-) diff --git a/include/hash_table.h b/include/hash_table.h index 6f377c9..813e059 100644 --- a/include/hash_table.h +++ b/include/hash_table.h @@ -42,9 +42,10 @@ struct hash_entry { struct hash_table { struct hash_entry *table; - sqfs_u32 (*key_hash_function)(const void *key); - bool (*key_equals_function)(const void *a, const void *b); + sqfs_u32 (*key_hash_function)(void *user, const void *key); + bool (*key_equals_function)(void *user, const void *a, const void *b); const void *deleted_key; + void *user; sqfs_u32 size; sqfs_u32 rehash; sqfs_u64 size_magic; @@ -56,8 +57,8 @@ struct hash_table { }; SQFS_INTERNAL struct hash_table * -hash_table_create(sqfs_u32 (*key_hash_function)(const void *key), - bool (*key_equals_function)(const void *a, +hash_table_create(sqfs_u32 (*key_hash_function)(void *user, const void *key), + bool (*key_equals_function)(void *user, const void *a, const void *b)); SQFS_INTERNAL struct hash_table * diff --git a/include/sqfs/block_processor.h b/include/sqfs/block_processor.h index 1846069..a5eed33 100644 --- a/include/sqfs/block_processor.h +++ b/include/sqfs/block_processor.h @@ -110,12 +110,90 @@ struct 
sqfs_block_processor_stats_t { sqfs_u64 actual_frag_count; }; +/** + * @struct sqfs_block_processor_desc_t + * + * @brief Encapsulates a description for an @ref sqfs_block_processor_t + * + * An instance of this struct is used by @ref sqfs_block_processor_create_ex to + * instantiate block processor objects. + */ +struct sqfs_block_processor_desc_t { + /** + * @brief Holds the size of the structure. + * + * If a later version of libsquashfs expands this structure, the value + * of this field can be used to check at runtime whether the newer + * fields are available or not. + * + * If @ref sqfs_block_processor_create_ex is given a struct whose size + * it does not recognize, it returns @ref SQFS_ERROR_ARG_INVALID. + */ + sqfs_u32 size; + + /** + * @brief The maximum size of a data block. + */ + sqfs_u32 max_block_size; + + /** + * @brief The number of worker threads to create. + */ + sqfs_u32 num_workers; + + /** + * @brief The maximum number of blocks currently in flight. + * + * When trying to add more, the enqueue operation blocks until the + * in-flight block count drops below the threshold. + */ + sqfs_u32 max_backlog; + + /** + * @brief A pointer to a compressor. + * + * If multiple worker threads are used, the deep copy function of the + * compressor is used to create several instances that don't interfere + * with each other. This means the compressor implementation must be + * able to create copies of itself that can be used independently and + * concurrently. + */ + sqfs_compressor_t *cmp; + + /** + * @brief A block writer to send finished blocks to. + */ + sqfs_block_writer_t *wr; + + /** + * @brief A fragment table to use for storing block locations. + */ + sqfs_frag_table_t *tbl; + + /** + * @brief Pointer to a file to read back fragment blocks from. + * + * If file and uncmp are not NULL, the file is used to read back + * fragment blocks during fragment deduplication and verify possible + * matches.
If either of them is NULL, the deduplication relies on + * fragment size and hash alone. + */ + sqfs_file_t *file; + + /** + * @brief A pointer to a compressor that decompresses data. + * + * @copydoc file + */ + sqfs_compressor_t *uncmp; +}; + #ifdef __cplusplus extern "C" { #endif /** - * @brief Create a data block writer. + * @brief Create a data block processor. * * @memberof sqfs_block_processor_t * @@ -132,7 +210,7 @@ extern "C" { * @param tbl A fragment table to use for storing fragment and fragment block * locations. * - * @return A pointer to a data writer object on success, NULL on allocation + * @return A pointer to a block processor object on success, NULL on allocation * failure or on failure to create and initialize the worker threads. */ SQFS_API @@ -143,6 +221,22 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, sqfs_block_writer_t *wr, sqfs_frag_table_t *tbl); +/** + * @brief Create a data block processor. + * + * @memberof sqfs_block_processor_t + * + * @param desc A pointer to an extensible structure that holds the description + * of the block processor. + * @param out On success, returns the pointer to the newly created block + * processor object. + * + * @return Zero on success, an @ref SQFS_ERROR value on failure. + */ +SQFS_API +int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc, + sqfs_block_processor_t **out); + /** * @brief Start writing a file.
* diff --git a/include/sqfs/predef.h b/include/sqfs/predef.h index e0afac4..55ccc86 100644 --- a/include/sqfs/predef.h +++ b/include/sqfs/predef.h @@ -91,6 +91,7 @@ typedef struct sqfs_frag_table_t sqfs_frag_table_t; typedef struct sqfs_block_writer_t sqfs_block_writer_t; typedef struct sqfs_block_writer_stats_t sqfs_block_writer_stats_t; typedef struct sqfs_block_processor_stats_t sqfs_block_processor_stats_t; +typedef struct sqfs_block_processor_desc_t sqfs_block_processor_desc_t; typedef struct sqfs_fragment_t sqfs_fragment_t; typedef struct sqfs_dir_header_t sqfs_dir_header_t; diff --git a/lib/sqfs/block_processor/common.c b/lib/sqfs/block_processor/common.c index b2657e6..d6c0889 100644 --- a/lib/sqfs/block_processor/common.c +++ b/lib/sqfs/block_processor/common.c @@ -176,8 +176,11 @@ static int process_completed_fragment(sqfs_block_processor_t *proc, search.hash = frag->checksum; search.size = frag->size; + proc->frag_cmp_current = frag; entry = hash_table_search_pre_hashed(proc->frag_ht, search.hash, &search); + proc->frag_cmp_current = NULL; + if (entry != NULL) { if (frag->inode != NULL) { chunk = entry->data; @@ -238,8 +241,11 @@ static int process_completed_fragment(sqfs_block_processor_t *proc, chunk->size = frag->size; chunk->hash = frag->checksum; + proc->frag_cmp_current = frag; entry = hash_table_insert_pre_hashed(proc->frag_ht, chunk->hash, chunk, chunk); + proc->frag_cmp_current = NULL; + if (entry == NULL) { free(chunk); goto fail_outblk; @@ -261,17 +267,76 @@ fail_outblk: return err; } -static uint32_t chunk_info_hash(const void *key) +static uint32_t chunk_info_hash(void *user, const void *key) { const chunk_info_t *chunk = key; + (void)user; return chunk->hash; } -static bool chunk_info_equals(const void *a, const void *b) +static bool chunk_info_equals(void *user, const void *k, const void *c) { - const chunk_info_t *a_ = a, *b_ = b; - return a_->size == b_->size && - a_->hash == b_->hash; + const chunk_info_t *key = k, *cmp = c; + 
sqfs_block_processor_t *proc = user; + sqfs_fragment_t frag; + unsigned char *temp; + size_t size; + int ret; + + if (key->size != cmp->size || key->hash != cmp->hash) + return false; + + if (proc->file == NULL || proc->uncmp == NULL) + return true; + + ret = proc->compare_frag_in_flight(proc, proc->frag_cmp_current, + cmp->index, cmp->offset); + if (ret == 0) + return true; + + if (proc->buffered_index != cmp->index || + proc->buffered_blk_size == 0) { + if (sqfs_frag_table_lookup(proc->frag_tbl, cmp->index, &frag)) + return false; + + proc->buffered_index = 0xFFFFFFFF; + size = SQFS_ON_DISK_BLOCK_SIZE(frag.size); + + if (SQFS_IS_BLOCK_COMPRESSED(frag.size)) { + temp = proc->frag_buffer + proc->max_block_size; + + ret = proc->file->read_at(proc->file, frag.start_offset, + temp, size); + if (ret != 0) + return false; + + ret = proc->uncmp->do_block(proc->uncmp, temp, size, + proc->frag_buffer, + proc->max_block_size); + if (ret <= 0) + return false; + + size = ret; + } else { + ret = proc->file->read_at(proc->file, frag.start_offset, + proc->frag_buffer, size); + if (ret != 0) + return false; + } + + proc->buffered_index = cmp->index; + proc->buffered_blk_size = size; + } + + if (cmp->offset >= proc->buffered_blk_size) + return false; + + if (cmp->size > (proc->buffered_blk_size - cmp->offset)) + return false; + + return memcmp(proc->frag_buffer + cmp->offset, + proc->frag_cmp_current->data, + cmp->size) == 0; } static void ht_delete_function(struct hash_entry *entry) @@ -287,6 +352,7 @@ void block_processor_cleanup(sqfs_block_processor_t *base) release_old_block(base, base->frag_block); free(base->blk_current); + free(base->frag_buffer); while (base->free_list != NULL) { it = base->free_list; @@ -297,22 +363,58 @@ void block_processor_cleanup(sqfs_block_processor_t *base) hash_table_destroy(base->frag_ht, ht_delete_function); } -int block_processor_init(sqfs_block_processor_t *base, size_t max_block_size, - sqfs_compressor_t *cmp, sqfs_block_writer_t *wr, - 
sqfs_frag_table_t *tbl) +int block_processor_init(sqfs_block_processor_t *base, + const sqfs_block_processor_desc_t *desc) { base->process_completed_block = process_completed_block; base->process_completed_fragment = process_completed_fragment; base->process_block = process_block; - base->max_block_size = max_block_size; - base->cmp = cmp; - base->frag_tbl = tbl; - base->wr = wr; + base->max_block_size = desc->max_block_size; + base->cmp = desc->cmp; + base->frag_tbl = desc->tbl; + base->wr = desc->wr; + base->file = desc->file; + base->uncmp = desc->uncmp; + base->buffered_index = 0xFFFFFFFF; base->stats.size = sizeof(base->stats); + if (desc->file != NULL && desc->uncmp != NULL && desc->tbl != NULL) { + base->frag_buffer = malloc(2 * desc->max_block_size); + if (base->frag_buffer == NULL) + return SQFS_ERROR_ALLOC; + } + base->frag_ht = hash_table_create(chunk_info_hash, chunk_info_equals); - if (base->frag_ht == NULL) - return -1; + if (base->frag_ht == NULL) { + free(base->frag_buffer); + return SQFS_ERROR_ALLOC; + } + base->frag_ht->user = base; return 0; } + +sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, + sqfs_compressor_t *cmp, + unsigned int num_workers, + size_t max_backlog, + sqfs_block_writer_t *wr, + sqfs_frag_table_t *tbl) +{ + sqfs_block_processor_desc_t desc; + sqfs_block_processor_t *out; + + memset(&desc, 0, sizeof(desc)); + desc.size = sizeof(desc); + desc.max_block_size = max_block_size; + desc.num_workers = num_workers; + desc.max_backlog = max_backlog; + desc.cmp = cmp; + desc.wr = wr; + desc.tbl = tbl; + + if (sqfs_block_processor_create_ex(&desc, &out) != 0) + return NULL; + + return out; +} diff --git a/lib/sqfs/block_processor/internal.h b/lib/sqfs/block_processor/internal.h index ee76946..4699dc6 100644 --- a/lib/sqfs/block_processor/internal.h +++ b/lib/sqfs/block_processor/internal.h @@ -26,8 +26,7 @@ #include #include -typedef struct chunk_info_t { - struct chunk_info_t *next; +typedef struct { sqfs_u32 
index; sqfs_u32 offset; sqfs_u32 size; @@ -82,6 +81,13 @@ struct sqfs_block_processor_t { bool begin_called; + sqfs_file_t *file; + sqfs_compressor_t *uncmp; + sqfs_block_t *frag_cmp_current; + sqfs_u8 *frag_buffer; + sqfs_u32 buffered_index; + sqfs_u32 buffered_blk_size; + int (*process_completed_block)(sqfs_block_processor_t *proc, sqfs_block_t *block); @@ -92,6 +98,10 @@ struct sqfs_block_processor_t { int (*process_block)(sqfs_block_t *block, sqfs_compressor_t *cmp, sqfs_u8 *scratch, size_t scratch_size); + int (*compare_frag_in_flight)(sqfs_block_processor_t *proc, + sqfs_block_t *frag, sqfs_u32 index, + sqfs_u32 offset); + int (*append_to_work_queue)(sqfs_block_processor_t *proc, sqfs_block_t *block); @@ -101,9 +111,6 @@ struct sqfs_block_processor_t { SQFS_INTERNAL void block_processor_cleanup(sqfs_block_processor_t *base); SQFS_INTERNAL int block_processor_init(sqfs_block_processor_t *base, - size_t max_block_size, - sqfs_compressor_t *cmp, - sqfs_block_writer_t *wr, - sqfs_frag_table_t *tbl); + const sqfs_block_processor_desc_t *desc); #endif /* INTERNAL_H */ diff --git a/lib/sqfs/block_processor/serial.c b/lib/sqfs/block_processor/serial.c index 4d6b3ec..b20d5b0 100644 --- a/lib/sqfs/block_processor/serial.c +++ b/lib/sqfs/block_processor/serial.c @@ -61,27 +61,46 @@ static int block_processor_sync(sqfs_block_processor_t *proc) return ((serial_block_processor_t *)proc)->status; } -sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, - sqfs_compressor_t *cmp, - unsigned int num_workers, - size_t max_backlog, - sqfs_block_writer_t *wr, - sqfs_frag_table_t *tbl) +static int compare_frag_in_flight(sqfs_block_processor_t *proc, + sqfs_block_t *frag, sqfs_u32 index, + sqfs_u32 offset) +{ + if (proc->frag_block == NULL || index != proc->frag_block->index) + return -1; + + if (offset >= proc->frag_block->size) + return -1; + + if (frag->size > (proc->frag_block->size - offset)) + return -1; + + return memcmp(proc->frag_block->data + offset, 
frag->data, frag->size); +} + +int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc, + sqfs_block_processor_t **out) { serial_block_processor_t *proc; - (void)num_workers; (void)max_backlog; + int ret; - proc = alloc_flex(sizeof(*proc), 1, max_block_size); + if (desc->size != sizeof(sqfs_block_processor_desc_t)) + return SQFS_ERROR_ARG_INVALID; + + proc = alloc_flex(sizeof(*proc), 1, desc->max_block_size); if (proc == NULL) - return NULL; + return SQFS_ERROR_ALLOC; - if (block_processor_init(&proc->base, max_block_size, cmp, wr, tbl)) { + ret = block_processor_init(&proc->base, desc); + if (ret != 0) { free(proc); - return NULL; + return ret; } proc->base.sync = block_processor_sync; proc->base.append_to_work_queue = append_to_work_queue; + proc->base.compare_frag_in_flight = compare_frag_in_flight; ((sqfs_object_t *)proc)->destroy = block_processor_destroy; - return (sqfs_block_processor_t *)proc; + + *out = (sqfs_block_processor_t *)proc; + return 0; } diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c index 3531a3b..806c749 100644 --- a/lib/sqfs/block_processor/winpthread.c +++ b/lib/sqfs/block_processor/winpthread.c @@ -78,6 +78,8 @@ struct compress_worker_t { thread_pool_processor_t *shared; sqfs_compressor_t *cmp; THREAD_HANDLE thread; + sqfs_u32 frag_blk_index; + sqfs_u32 frag_blk_size; sqfs_u8 scratch[]; }; @@ -178,6 +180,14 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg) store_completed_block(shared, blk, status); blk = get_next_work_item(shared); + + if (blk != NULL && (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) && + shared->base.uncmp != NULL && shared->base.file != NULL) { + memcpy(worker->scratch + shared->base.max_block_size, + blk->data, blk->size); + worker->frag_blk_index = blk->index; + worker->frag_blk_size = blk->size; + } UNLOCK(&shared->mtx); if (blk == NULL) @@ -388,40 +398,105 @@ static int append_to_work_queue(sqfs_block_processor_t *proc, return status; } +static sqfs_block_t 
*find_frag_blk_in_queue(sqfs_block_t *q, sqfs_u32 index) +{ + while (q != NULL) { + if ((q->flags & SQFS_BLK_FRAGMENT_BLOCK) && q->index == index) + break; + q = q->next; + } + return q; +} + +static int compare_frag_in_flight(sqfs_block_processor_t *proc, + sqfs_block_t *frag, sqfs_u32 index, + sqfs_u32 offset) +{ + thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc; + sqfs_block_t *it = NULL; + void *blockbuf = NULL; + size_t i, size = 0; + int ret; + + if (proc->frag_block != NULL && proc->frag_block->index == index) + it = proc->frag_block; + + if (it == NULL) + it = find_frag_blk_in_queue(thproc->proc_queue, index); + + if (it == NULL) + it = find_frag_blk_in_queue(thproc->io_queue, index); + + if (it == NULL) + it = find_frag_blk_in_queue(thproc->done, index); + + if (it == NULL) { + for (i = 0; i < thproc->num_workers; ++i) { + if (thproc->workers[i]->frag_blk_index == index) { + size = thproc->workers[i]->frag_blk_size; + blockbuf = thproc->workers[i]->scratch + + proc->max_block_size; + break; + } + } + } else if (it->flags & SQFS_BLK_IS_COMPRESSED) { + proc->buffered_index = 0xFFFFFFFF; + blockbuf = proc->frag_buffer; + ret = proc->uncmp->do_block(proc->uncmp, it->data, it->size, + blockbuf, proc->max_block_size); + if (ret <= 0) + return -1; + proc->buffered_index = it->index; + size = ret; + } else { + blockbuf = it->data; + size = it->size; + } + + if (blockbuf == NULL || size == 0) + return -1; + + if (offset >= size || frag->size > (size - offset)) + return -1; + + return memcmp((const char *)blockbuf + offset, frag->data, frag->size); +} + static int block_processor_sync(sqfs_block_processor_t *proc) { return append_to_work_queue(proc, NULL); } -sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, - sqfs_compressor_t *cmp, - unsigned int num_workers, - size_t max_backlog, - sqfs_block_writer_t *wr, - sqfs_frag_table_t *tbl) +int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc, + 
sqfs_block_processor_t **out) { thread_pool_processor_t *proc; + unsigned int i, num_workers; + size_t scratch_size; sigset_t oldset; - unsigned int i; int ret; - if (num_workers < 1) - num_workers = 1; + if (desc->size != sizeof(sqfs_block_processor_desc_t)) + return SQFS_ERROR_ARG_INVALID; + + num_workers = desc->num_workers < 1 ? 1 : desc->num_workers; proc = alloc_flex(sizeof(*proc), sizeof(proc->workers[0]), num_workers); if (proc == NULL) - return NULL; + return SQFS_ERROR_ALLOC; - if (block_processor_init(&proc->base, max_block_size, cmp, wr, tbl)) { + ret = block_processor_init(&proc->base, desc); + if (ret != 0) { free(proc); - return NULL; + return ret; } proc->base.sync = block_processor_sync; proc->base.append_to_work_queue = append_to_work_queue; + proc->base.compare_frag_in_flight = compare_frag_in_flight; proc->num_workers = num_workers; - proc->max_backlog = max_backlog; + proc->max_backlog = desc->max_backlog; ((sqfs_object_t *)proc)->destroy = block_processor_destroy; MUTEX_INIT(&proc->mtx); @@ -431,28 +506,40 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, SIGNAL_DISABLE(&oldset); for (i = 0; i < num_workers; ++i) { + scratch_size = desc->max_block_size; + if (desc->uncmp != NULL && desc->file != NULL) + scratch_size *= 2; + proc->workers[i] = alloc_flex(sizeof(compress_worker_t), - 1, max_block_size); + 1, scratch_size); - if (proc->workers[i] == NULL) + if (proc->workers[i] == NULL) { + ret = SQFS_ERROR_ALLOC; goto fail; + } proc->workers[i]->shared = proc; - proc->workers[i]->cmp = sqfs_copy(cmp); + proc->workers[i]->cmp = sqfs_copy(desc->cmp); + proc->workers[i]->frag_blk_index = 0xFFFFFFFF; - if (proc->workers[i]->cmp == NULL) + if (proc->workers[i]->cmp == NULL) { + ret = SQFS_ERROR_ALLOC; goto fail; + } ret = THREAD_CREATE(&proc->workers[i]->thread, worker_proc, proc->workers[i]); - if (ret != 0) + if (ret != 0) { + ret = SQFS_ERROR_INTERNAL; goto fail; + } } SIGNAL_ENABLE(&oldset); - return 
(sqfs_block_processor_t *)proc; + *out = (sqfs_block_processor_t *)proc; + return 0; fail: SIGNAL_ENABLE(&oldset); block_processor_destroy((sqfs_object_t *)proc); - return NULL; + return ret; } diff --git a/lib/util/hash_table.c b/lib/util/hash_table.c index 63882f3..a78aeee 100644 --- a/lib/util/hash_table.c +++ b/lib/util/hash_table.c @@ -123,8 +123,8 @@ entry_is_present(const struct hash_table *ht, struct hash_entry *entry) static bool hash_table_init(struct hash_table *ht, - sqfs_u32 (*key_hash_function)(const void *key), - bool (*key_equals_function)(const void *a, + sqfs_u32 (*key_hash_function)(void *user, const void *key), + bool (*key_equals_function)(void *user, const void *a, const void *b)) { ht->size_index = 0; @@ -144,8 +144,8 @@ hash_table_init(struct hash_table *ht, } struct hash_table * -hash_table_create(sqfs_u32 (*key_hash_function)(const void *key), - bool (*key_equals_function)(const void *a, +hash_table_create(sqfs_u32 (*key_hash_function)(void *user, const void *key), + bool (*key_equals_function)(void *user, const void *a, const void *b)) { struct hash_table *ht; @@ -220,7 +220,7 @@ hash_table_search(struct hash_table *ht, sqfs_u32 hash, const void *key) if (entry_is_free(entry)) { return NULL; } else if (entry_is_present(ht, entry) && entry->hash == hash) { - if (ht->key_equals_function(key, entry->key)) { + if (ht->key_equals_function(ht->user, key, entry->key)) { return entry; } } @@ -243,7 +243,8 @@ struct hash_entry * hash_table_search_pre_hashed(struct hash_table *ht, sqfs_u32 hash, const void *key) { - assert(ht->key_hash_function == NULL || hash == ht->key_hash_function(key)); + assert(ht->key_hash_function == NULL || + hash == ht->key_hash_function(ht->user, key)); return hash_table_search(ht, hash, key); } @@ -349,7 +350,7 @@ hash_table_insert(struct hash_table *ht, sqfs_u32 hash, */ if (!entry_is_deleted(ht, entry) && entry->hash == hash && - ht->key_equals_function(key, entry->key)) { + ht->key_equals_function(ht->user, key, 
entry->key)) { entry->key = key; entry->data = data; return entry; @@ -386,7 +387,8 @@ struct hash_entry * hash_table_insert_pre_hashed(struct hash_table *ht, sqfs_u32 hash, const void *key, void *data) { - assert(ht->key_hash_function == NULL || hash == ht->key_hash_function(key)); + assert(ht->key_hash_function == NULL || + hash == ht->key_hash_function(ht->user, key)); return hash_table_insert(ht, hash, key, data); } -- cgit v1.2.3