From 4249e123d321650050259fb602f06497519077d0 Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Thu, 14 Jan 2021 04:38:33 +0100 Subject: libsqfs: block processor: backport exact fragment matching This commit is an amalgamation of the commits on master that implement exact matching of fragment blocks during deduplication. Signed-off-by: David Oberhollenzer --- lib/sqfs/block_processor/winpthread.c | 127 ++++++++++++++++++++++++++++------ 1 file changed, 107 insertions(+), 20 deletions(-) (limited to 'lib/sqfs/block_processor/winpthread.c') diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c index 3531a3b..806c749 100644 --- a/lib/sqfs/block_processor/winpthread.c +++ b/lib/sqfs/block_processor/winpthread.c @@ -78,6 +78,8 @@ struct compress_worker_t { thread_pool_processor_t *shared; sqfs_compressor_t *cmp; THREAD_HANDLE thread; + sqfs_u32 frag_blk_index; + sqfs_u32 frag_blk_size; sqfs_u8 scratch[]; }; @@ -178,6 +180,14 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg) store_completed_block(shared, blk, status); blk = get_next_work_item(shared); + + if (blk != NULL && (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) && + shared->base.uncmp != NULL && shared->base.file != NULL) { + memcpy(worker->scratch + shared->base.max_block_size, + blk->data, blk->size); + worker->frag_blk_index = blk->index; + worker->frag_blk_size = blk->size; + } UNLOCK(&shared->mtx); if (blk == NULL) @@ -388,40 +398,105 @@ static int append_to_work_queue(sqfs_block_processor_t *proc, return status; } +static sqfs_block_t *find_frag_blk_in_queue(sqfs_block_t *q, sqfs_u32 index) +{ + while (q != NULL) { + if ((q->flags & SQFS_BLK_FRAGMENT_BLOCK) && q->index == index) + break; + q = q->next; + } + return q; +} + +static int compare_frag_in_flight(sqfs_block_processor_t *proc, + sqfs_block_t *frag, sqfs_u32 index, + sqfs_u32 offset) +{ + thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc; + sqfs_block_t *it = NULL; + void *blockbuf = NULL; + size_t i, size = 0; + int ret; + + if (proc->frag_block != NULL && proc->frag_block->index == index) + it = proc->frag_block; + + if (it == NULL) + it = find_frag_blk_in_queue(thproc->proc_queue, index); + + if (it == NULL) + it = find_frag_blk_in_queue(thproc->io_queue, index); + + if (it == NULL) + it = find_frag_blk_in_queue(thproc->done, index); + + if (it == NULL) { + for (i = 0; i < thproc->num_workers; ++i) { + if (thproc->workers[i]->frag_blk_index == index) { + size = thproc->workers[i]->frag_blk_size; + blockbuf = thproc->workers[i]->scratch + + proc->max_block_size; + break; + } + } + } else if (it->flags & SQFS_BLK_IS_COMPRESSED) { + proc->buffered_index = 0xFFFFFFFF; + blockbuf = proc->frag_buffer; + ret = proc->uncmp->do_block(proc->uncmp, it->data, it->size, + blockbuf, proc->max_block_size); + if (ret <= 0) + return -1; + proc->buffered_index = it->index; + size = ret; + } else { + blockbuf = it->data; + size = it->size; + } + + if (blockbuf == NULL || size == 0) + return -1; + + if (offset >= size || frag->size > (size - offset)) + return -1; + + return memcmp((const char *)blockbuf + offset, frag->data, frag->size); +} + static int block_processor_sync(sqfs_block_processor_t *proc) { return append_to_work_queue(proc, NULL); } -sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, - sqfs_compressor_t *cmp, - unsigned int num_workers, - size_t max_backlog, - sqfs_block_writer_t *wr, - sqfs_frag_table_t *tbl) +int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc, + sqfs_block_processor_t **out) { thread_pool_processor_t *proc; + unsigned int i, num_workers; + size_t scratch_size; sigset_t oldset; - unsigned int i; int ret; - if (num_workers < 1) - num_workers = 1; + if (desc->size != sizeof(sqfs_block_processor_desc_t)) + return SQFS_ERROR_ARG_INVALID; + + num_workers = desc->num_workers < 1 ? 1 : desc->num_workers; proc = alloc_flex(sizeof(*proc), sizeof(proc->workers[0]), num_workers); if (proc == NULL) - return NULL; + return SQFS_ERROR_ALLOC; - if (block_processor_init(&proc->base, max_block_size, cmp, wr, tbl)) { + ret = block_processor_init(&proc->base, desc); + if (ret != 0) { free(proc); - return NULL; + return ret; } proc->base.sync = block_processor_sync; proc->base.append_to_work_queue = append_to_work_queue; + proc->base.compare_frag_in_flight = compare_frag_in_flight; proc->num_workers = num_workers; - proc->max_backlog = max_backlog; + proc->max_backlog = desc->max_backlog; ((sqfs_object_t *)proc)->destroy = block_processor_destroy; MUTEX_INIT(&proc->mtx); @@ -431,28 +506,40 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, SIGNAL_DISABLE(&oldset); for (i = 0; i < num_workers; ++i) { + scratch_size = desc->max_block_size; + if (desc->uncmp != NULL && desc->file != NULL) + scratch_size *= 2; + proc->workers[i] = alloc_flex(sizeof(compress_worker_t), - 1, max_block_size); + 1, scratch_size); - if (proc->workers[i] == NULL) + if (proc->workers[i] == NULL) { + ret = SQFS_ERROR_ALLOC; goto fail; + } proc->workers[i]->shared = proc; - proc->workers[i]->cmp = sqfs_copy(cmp); + proc->workers[i]->cmp = sqfs_copy(desc->cmp); + proc->workers[i]->frag_blk_index = 0xFFFFFFFF; - if (proc->workers[i]->cmp == NULL) + if (proc->workers[i]->cmp == NULL) { + ret = SQFS_ERROR_ALLOC; goto fail; + } ret = THREAD_CREATE(&proc->workers[i]->thread, worker_proc, proc->workers[i]); - if (ret != 0) + if (ret != 0) { + ret = SQFS_ERROR_INTERNAL; goto fail; + } } SIGNAL_ENABLE(&oldset); - return (sqfs_block_processor_t *)proc; + *out = (sqfs_block_processor_t *)proc; + return 0; fail: SIGNAL_ENABLE(&oldset); block_processor_destroy((sqfs_object_t *)proc); - return NULL; + return ret; } -- cgit v1.2.3