aboutsummaryrefslogtreecommitdiff
path: root/lib/sqfs/block_processor
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2021-01-14 04:38:33 +0100
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2021-01-19 10:45:59 +0100
commit4249e123d321650050259fb602f06497519077d0 (patch)
tree9638c0fb1d5735d2e3b0d044e991641754e97d36 /lib/sqfs/block_processor
parent854119c62621e017c13be5192a9494c0eea2fe2f (diff)
libsqfs: block processor: backport exact fragment matching
This commit is an amalgamation of the commits on master that implement exact matching of fragment blocks during deduplication. Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib/sqfs/block_processor')
-rw-r--r--lib/sqfs/block_processor/common.c130
-rw-r--r--lib/sqfs/block_processor/internal.h19
-rw-r--r--lib/sqfs/block_processor/serial.c43
-rw-r--r--lib/sqfs/block_processor/winpthread.c127
4 files changed, 267 insertions, 52 deletions
diff --git a/lib/sqfs/block_processor/common.c b/lib/sqfs/block_processor/common.c
index b2657e6..d6c0889 100644
--- a/lib/sqfs/block_processor/common.c
+++ b/lib/sqfs/block_processor/common.c
@@ -176,8 +176,11 @@ static int process_completed_fragment(sqfs_block_processor_t *proc,
search.hash = frag->checksum;
search.size = frag->size;
+ proc->frag_cmp_current = frag;
entry = hash_table_search_pre_hashed(proc->frag_ht,
search.hash, &search);
+ proc->frag_cmp_current = NULL;
+
if (entry != NULL) {
if (frag->inode != NULL) {
chunk = entry->data;
@@ -238,8 +241,11 @@ static int process_completed_fragment(sqfs_block_processor_t *proc,
chunk->size = frag->size;
chunk->hash = frag->checksum;
+ proc->frag_cmp_current = frag;
entry = hash_table_insert_pre_hashed(proc->frag_ht, chunk->hash,
chunk, chunk);
+ proc->frag_cmp_current = NULL;
+
if (entry == NULL) {
free(chunk);
goto fail_outblk;
@@ -261,17 +267,76 @@ fail_outblk:
return err;
}
-static uint32_t chunk_info_hash(const void *key)
+static uint32_t chunk_info_hash(void *user, const void *key)
{
const chunk_info_t *chunk = key;
+ (void)user;
return chunk->hash;
}
-static bool chunk_info_equals(const void *a, const void *b)
+static bool chunk_info_equals(void *user, const void *k, const void *c)
{
- const chunk_info_t *a_ = a, *b_ = b;
- return a_->size == b_->size &&
- a_->hash == b_->hash;
+ const chunk_info_t *key = k, *cmp = c;
+ sqfs_block_processor_t *proc = user;
+ sqfs_fragment_t frag;
+ unsigned char *temp;
+ size_t size;
+ int ret;
+
+ if (key->size != cmp->size || key->hash != cmp->hash)
+ return false;
+
+ if (proc->file == NULL || proc->uncmp == NULL)
+ return true;
+
+ ret = proc->compare_frag_in_flight(proc, proc->frag_cmp_current,
+ cmp->index, cmp->offset);
+ if (ret == 0)
+ return true;
+
+ if (proc->buffered_index != cmp->index ||
+ proc->buffered_blk_size == 0) {
+ if (sqfs_frag_table_lookup(proc->frag_tbl, cmp->index, &frag))
+ return false;
+
+ proc->buffered_index = 0xFFFFFFFF;
+ size = SQFS_ON_DISK_BLOCK_SIZE(frag.size);
+
+ if (SQFS_IS_BLOCK_COMPRESSED(frag.size)) {
+ temp = proc->frag_buffer + proc->max_block_size;
+
+ ret = proc->file->read_at(proc->file, frag.start_offset,
+ temp, size);
+ if (ret != 0)
+ return false;
+
+ ret = proc->uncmp->do_block(proc->uncmp, temp, size,
+ proc->frag_buffer,
+ proc->max_block_size);
+ if (ret <= 0)
+ return false;
+
+ size = ret;
+ } else {
+ ret = proc->file->read_at(proc->file, frag.start_offset,
+ proc->frag_buffer, size);
+ if (ret != 0)
+ return false;
+ }
+
+ proc->buffered_index = cmp->index;
+ proc->buffered_blk_size = size;
+ }
+
+ if (cmp->offset >= proc->buffered_blk_size)
+ return false;
+
+ if (cmp->size > (proc->buffered_blk_size - cmp->offset))
+ return false;
+
+ return memcmp(proc->frag_buffer + cmp->offset,
+ proc->frag_cmp_current->data,
+ cmp->size) == 0;
}
static void ht_delete_function(struct hash_entry *entry)
@@ -287,6 +352,7 @@ void block_processor_cleanup(sqfs_block_processor_t *base)
release_old_block(base, base->frag_block);
free(base->blk_current);
+ free(base->frag_buffer);
while (base->free_list != NULL) {
it = base->free_list;
@@ -297,22 +363,58 @@ void block_processor_cleanup(sqfs_block_processor_t *base)
hash_table_destroy(base->frag_ht, ht_delete_function);
}
-int block_processor_init(sqfs_block_processor_t *base, size_t max_block_size,
- sqfs_compressor_t *cmp, sqfs_block_writer_t *wr,
- sqfs_frag_table_t *tbl)
+int block_processor_init(sqfs_block_processor_t *base,
+ const sqfs_block_processor_desc_t *desc)
{
base->process_completed_block = process_completed_block;
base->process_completed_fragment = process_completed_fragment;
base->process_block = process_block;
- base->max_block_size = max_block_size;
- base->cmp = cmp;
- base->frag_tbl = tbl;
- base->wr = wr;
+ base->max_block_size = desc->max_block_size;
+ base->cmp = desc->cmp;
+ base->frag_tbl = desc->tbl;
+ base->wr = desc->wr;
+ base->file = desc->file;
+ base->uncmp = desc->uncmp;
+ base->buffered_index = 0xFFFFFFFF;
base->stats.size = sizeof(base->stats);
+ if (desc->file != NULL && desc->uncmp != NULL && desc->tbl != NULL) {
+ base->frag_buffer = malloc(2 * desc->max_block_size);
+ if (base->frag_buffer == NULL)
+ return SQFS_ERROR_ALLOC;
+ }
+
base->frag_ht = hash_table_create(chunk_info_hash, chunk_info_equals);
- if (base->frag_ht == NULL)
- return -1;
+ if (base->frag_ht == NULL) {
+ free(base->frag_buffer);
+ return SQFS_ERROR_ALLOC;
+ }
+ base->frag_ht->user = base;
return 0;
}
+
+sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
+ sqfs_compressor_t *cmp,
+ unsigned int num_workers,
+ size_t max_backlog,
+ sqfs_block_writer_t *wr,
+ sqfs_frag_table_t *tbl)
+{
+ sqfs_block_processor_desc_t desc;
+ sqfs_block_processor_t *out;
+
+ memset(&desc, 0, sizeof(desc));
+ desc.size = sizeof(desc);
+ desc.max_block_size = max_block_size;
+ desc.num_workers = num_workers;
+ desc.max_backlog = max_backlog;
+ desc.cmp = cmp;
+ desc.wr = wr;
+ desc.tbl = tbl;
+
+ if (sqfs_block_processor_create_ex(&desc, &out) != 0)
+ return NULL;
+
+ return out;
+}
diff --git a/lib/sqfs/block_processor/internal.h b/lib/sqfs/block_processor/internal.h
index ee76946..4699dc6 100644
--- a/lib/sqfs/block_processor/internal.h
+++ b/lib/sqfs/block_processor/internal.h
@@ -26,8 +26,7 @@
#include <stdlib.h>
#include <assert.h>
-typedef struct chunk_info_t {
- struct chunk_info_t *next;
+typedef struct {
sqfs_u32 index;
sqfs_u32 offset;
sqfs_u32 size;
@@ -82,6 +81,13 @@ struct sqfs_block_processor_t {
bool begin_called;
+ sqfs_file_t *file;
+ sqfs_compressor_t *uncmp;
+ sqfs_block_t *frag_cmp_current;
+ sqfs_u8 *frag_buffer;
+ sqfs_u32 buffered_index;
+ sqfs_u32 buffered_blk_size;
+
int (*process_completed_block)(sqfs_block_processor_t *proc,
sqfs_block_t *block);
@@ -92,6 +98,10 @@ struct sqfs_block_processor_t {
int (*process_block)(sqfs_block_t *block, sqfs_compressor_t *cmp,
sqfs_u8 *scratch, size_t scratch_size);
+ int (*compare_frag_in_flight)(sqfs_block_processor_t *proc,
+ sqfs_block_t *frag, sqfs_u32 index,
+ sqfs_u32 offset);
+
int (*append_to_work_queue)(sqfs_block_processor_t *proc,
sqfs_block_t *block);
@@ -101,9 +111,6 @@ struct sqfs_block_processor_t {
SQFS_INTERNAL void block_processor_cleanup(sqfs_block_processor_t *base);
SQFS_INTERNAL int block_processor_init(sqfs_block_processor_t *base,
- size_t max_block_size,
- sqfs_compressor_t *cmp,
- sqfs_block_writer_t *wr,
- sqfs_frag_table_t *tbl);
+ const sqfs_block_processor_desc_t *desc);
#endif /* INTERNAL_H */
diff --git a/lib/sqfs/block_processor/serial.c b/lib/sqfs/block_processor/serial.c
index 4d6b3ec..b20d5b0 100644
--- a/lib/sqfs/block_processor/serial.c
+++ b/lib/sqfs/block_processor/serial.c
@@ -61,27 +61,46 @@ static int block_processor_sync(sqfs_block_processor_t *proc)
return ((serial_block_processor_t *)proc)->status;
}
-sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
- sqfs_compressor_t *cmp,
- unsigned int num_workers,
- size_t max_backlog,
- sqfs_block_writer_t *wr,
- sqfs_frag_table_t *tbl)
+static int compare_frag_in_flight(sqfs_block_processor_t *proc,
+ sqfs_block_t *frag, sqfs_u32 index,
+ sqfs_u32 offset)
+{
+ if (proc->frag_block == NULL || index != proc->frag_block->index)
+ return -1;
+
+ if (offset >= proc->frag_block->size)
+ return -1;
+
+ if (frag->size > (proc->frag_block->size - offset))
+ return -1;
+
+ return memcmp(proc->frag_block->data + offset, frag->data, frag->size);
+}
+
+int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc,
+ sqfs_block_processor_t **out)
{
serial_block_processor_t *proc;
- (void)num_workers; (void)max_backlog;
+ int ret;
- proc = alloc_flex(sizeof(*proc), 1, max_block_size);
+ if (desc->size != sizeof(sqfs_block_processor_desc_t))
+ return SQFS_ERROR_ARG_INVALID;
+
+ proc = alloc_flex(sizeof(*proc), 1, desc->max_block_size);
if (proc == NULL)
- return NULL;
+ return SQFS_ERROR_ALLOC;
- if (block_processor_init(&proc->base, max_block_size, cmp, wr, tbl)) {
+ ret = block_processor_init(&proc->base, desc);
+ if (ret != 0) {
free(proc);
- return NULL;
+ return ret;
}
proc->base.sync = block_processor_sync;
proc->base.append_to_work_queue = append_to_work_queue;
+ proc->base.compare_frag_in_flight = compare_frag_in_flight;
((sqfs_object_t *)proc)->destroy = block_processor_destroy;
- return (sqfs_block_processor_t *)proc;
+
+ *out = (sqfs_block_processor_t *)proc;
+ return 0;
}
diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c
index 3531a3b..806c749 100644
--- a/lib/sqfs/block_processor/winpthread.c
+++ b/lib/sqfs/block_processor/winpthread.c
@@ -78,6 +78,8 @@ struct compress_worker_t {
thread_pool_processor_t *shared;
sqfs_compressor_t *cmp;
THREAD_HANDLE thread;
+ sqfs_u32 frag_blk_index;
+ sqfs_u32 frag_blk_size;
sqfs_u8 scratch[];
};
@@ -178,6 +180,14 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg)
store_completed_block(shared, blk, status);
blk = get_next_work_item(shared);
+
+ if (blk != NULL && (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) &&
+ shared->base.uncmp != NULL && shared->base.file != NULL) {
+ memcpy(worker->scratch + shared->base.max_block_size,
+ blk->data, blk->size);
+ worker->frag_blk_index = blk->index;
+ worker->frag_blk_size = blk->size;
+ }
UNLOCK(&shared->mtx);
if (blk == NULL)
@@ -388,40 +398,105 @@ static int append_to_work_queue(sqfs_block_processor_t *proc,
return status;
}
+static sqfs_block_t *find_frag_blk_in_queue(sqfs_block_t *q, sqfs_u32 index)
+{
+ while (q != NULL) {
+ if ((q->flags & SQFS_BLK_FRAGMENT_BLOCK) && q->index == index)
+ break;
+ q = q->next;
+ }
+ return q;
+}
+
+static int compare_frag_in_flight(sqfs_block_processor_t *proc,
+ sqfs_block_t *frag, sqfs_u32 index,
+ sqfs_u32 offset)
+{
+ thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc;
+ sqfs_block_t *it = NULL;
+ void *blockbuf = NULL;
+ size_t i, size = 0;
+ int ret;
+
+ if (proc->frag_block != NULL && proc->frag_block->index == index)
+ it = proc->frag_block;
+
+ if (it == NULL)
+ it = find_frag_blk_in_queue(thproc->proc_queue, index);
+
+ if (it == NULL)
+ it = find_frag_blk_in_queue(thproc->io_queue, index);
+
+ if (it == NULL)
+ it = find_frag_blk_in_queue(thproc->done, index);
+
+ if (it == NULL) {
+ for (i = 0; i < thproc->num_workers; ++i) {
+ if (thproc->workers[i]->frag_blk_index == index) {
+ size = thproc->workers[i]->frag_blk_size;
+ blockbuf = thproc->workers[i]->scratch +
+ proc->max_block_size;
+ break;
+ }
+ }
+ } else if (it->flags & SQFS_BLK_IS_COMPRESSED) {
+ proc->buffered_index = 0xFFFFFFFF;
+ blockbuf = proc->frag_buffer;
+ ret = proc->uncmp->do_block(proc->uncmp, it->data, it->size,
+ blockbuf, proc->max_block_size);
+ if (ret <= 0)
+ return -1;
+ proc->buffered_index = it->index;
+ size = ret;
+ } else {
+ blockbuf = it->data;
+ size = it->size;
+ }
+
+ if (blockbuf == NULL || size == 0)
+ return -1;
+
+ if (offset >= size || frag->size > (size - offset))
+ return -1;
+
+ return memcmp((const char *)blockbuf + offset, frag->data, frag->size);
+}
+
static int block_processor_sync(sqfs_block_processor_t *proc)
{
return append_to_work_queue(proc, NULL);
}
-sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
- sqfs_compressor_t *cmp,
- unsigned int num_workers,
- size_t max_backlog,
- sqfs_block_writer_t *wr,
- sqfs_frag_table_t *tbl)
+int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc,
+ sqfs_block_processor_t **out)
{
thread_pool_processor_t *proc;
+ unsigned int i, num_workers;
+ size_t scratch_size;
sigset_t oldset;
- unsigned int i;
int ret;
- if (num_workers < 1)
- num_workers = 1;
+ if (desc->size != sizeof(sqfs_block_processor_desc_t))
+ return SQFS_ERROR_ARG_INVALID;
+
+ num_workers = desc->num_workers < 1 ? 1 : desc->num_workers;
proc = alloc_flex(sizeof(*proc),
sizeof(proc->workers[0]), num_workers);
if (proc == NULL)
- return NULL;
+ return SQFS_ERROR_ALLOC;
- if (block_processor_init(&proc->base, max_block_size, cmp, wr, tbl)) {
+ ret = block_processor_init(&proc->base, desc);
+ if (ret != 0) {
free(proc);
- return NULL;
+ return ret;
}
proc->base.sync = block_processor_sync;
proc->base.append_to_work_queue = append_to_work_queue;
+ proc->base.compare_frag_in_flight = compare_frag_in_flight;
proc->num_workers = num_workers;
- proc->max_backlog = max_backlog;
+ proc->max_backlog = desc->max_backlog;
((sqfs_object_t *)proc)->destroy = block_processor_destroy;
MUTEX_INIT(&proc->mtx);
@@ -431,28 +506,40 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
SIGNAL_DISABLE(&oldset);
for (i = 0; i < num_workers; ++i) {
+ scratch_size = desc->max_block_size;
+ if (desc->uncmp != NULL && desc->file != NULL)
+ scratch_size *= 2;
+
proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
- 1, max_block_size);
+ 1, scratch_size);
- if (proc->workers[i] == NULL)
+ if (proc->workers[i] == NULL) {
+ ret = SQFS_ERROR_ALLOC;
goto fail;
+ }
proc->workers[i]->shared = proc;
- proc->workers[i]->cmp = sqfs_copy(cmp);
+ proc->workers[i]->cmp = sqfs_copy(desc->cmp);
+ proc->workers[i]->frag_blk_index = 0xFFFFFFFF;
- if (proc->workers[i]->cmp == NULL)
+ if (proc->workers[i]->cmp == NULL) {
+ ret = SQFS_ERROR_ALLOC;
goto fail;
+ }
ret = THREAD_CREATE(&proc->workers[i]->thread,
worker_proc, proc->workers[i]);
- if (ret != 0)
+ if (ret != 0) {
+ ret = SQFS_ERROR_INTERNAL;
goto fail;
+ }
}
SIGNAL_ENABLE(&oldset);
- return (sqfs_block_processor_t *)proc;
+ *out = (sqfs_block_processor_t *)proc;
+ return 0;
fail:
SIGNAL_ENABLE(&oldset);
block_processor_destroy((sqfs_object_t *)proc);
- return NULL;
+ return ret;
}