From 8d9f24a65ef27a52615b3225776632de08462eba Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Tue, 24 Sep 2019 03:45:30 +0200 Subject: Move entire fragment processing from data writer to block processor So far, this is mostly a direct port from the block processor. The actual fragment checksumming is not done through the thread pool. Signed-off-by: David Oberhollenzer --- lib/sqfs/blk_proc/fragtbl.c | 110 ++++++++++++++++++++++++++++++++++---- lib/sqfs/blk_proc/internal.h | 17 +++++- lib/sqfs/blk_proc/process_block.c | 7 --- lib/sqfs/blk_proc/pthread.c | 52 +++++++++++++++--- lib/sqfs/blk_proc/serial.c | 41 ++++++++++++++ 5 files changed, 200 insertions(+), 27 deletions(-) (limited to 'lib/sqfs') diff --git a/lib/sqfs/blk_proc/fragtbl.c b/lib/sqfs/blk_proc/fragtbl.c index 3f6c644..84f5ed6 100644 --- a/lib/sqfs/blk_proc/fragtbl.c +++ b/lib/sqfs/blk_proc/fragtbl.c @@ -35,24 +35,112 @@ int sqfs_block_processor_write_fragment_table(sqfs_block_processor_t *proc, return 0; } -int grow_fragment_table(sqfs_block_processor_t *proc, size_t index) +static int grow_fragment_table(sqfs_block_processor_t *proc) { size_t newsz; void *new; - if (index < proc->max_fragments) - return 0; - - do { + if (proc->num_fragments >= proc->max_fragments) { newsz = proc->max_fragments ? proc->max_fragments * 2 : 16; - } while (index >= newsz); - new = realloc(proc->fragments, sizeof(proc->fragments[0]) * newsz); + new = realloc(proc->fragments, + sizeof(proc->fragments[0]) * newsz); + + if (new == NULL) + return SQFS_ERROR_ALLOC; + + proc->max_fragments = newsz; + proc->fragments = new; + } + + return 0; +} + +static int store_fragment(sqfs_block_processor_t *proc, sqfs_block_t *frag, + uint64_t signature) +{ + size_t new_sz; + void *new; + + if (proc->frag_list_num == proc->frag_list_max) { + new_sz = proc->frag_list_max * 2; + new = realloc(proc->frag_list, + sizeof(proc->frag_list[0]) * new_sz); + + if (new == NULL) + return SQFS_ERROR_ALLOC; + + proc->frag_list = new; + proc->frag_list_max = new_sz; + } + + proc->frag_list[proc->frag_list_num].index = proc->frag_block->index; + proc->frag_list[proc->frag_list_num].offset = proc->frag_block->size; + proc->frag_list[proc->frag_list_num].signature = signature; + proc->frag_list_num += 1; + + sqfs_inode_set_frag_location(frag->inode, proc->frag_block->index, + proc->frag_block->size); + + memcpy(proc->frag_block->data + proc->frag_block->size, + frag->data, frag->size); + + proc->frag_block->flags |= (frag->flags & SQFS_BLK_DONT_COMPRESS); + proc->frag_block->size += frag->size; + return 0; +} + +int handle_fragment(sqfs_block_processor_t *proc, sqfs_block_t *frag, + sqfs_block_t **blk_out) +{ + uint64_t signature; + size_t i, size; + int err; + + signature = MK_BLK_SIG(frag->checksum, frag->size); + + for (i = 0; i < proc->frag_list_num; ++i) { + if (proc->frag_list[i].signature == signature) { + sqfs_inode_set_frag_location(frag->inode, + proc->frag_list[i].index, + proc->frag_list[i].offset); + return 0; + } + } + + if (proc->frag_block != NULL) { + size = proc->frag_block->size + frag->size; + + if (size > proc->max_block_size) { + *blk_out = proc->frag_block; + proc->frag_block = NULL; + } + } + + if (proc->frag_block == NULL) { + size = sizeof(sqfs_block_t) + proc->max_block_size; + + err = grow_fragment_table(proc); + if (err) + goto fail; + + proc->frag_block = calloc(1, size); + if (proc->frag_block == NULL) { + err = SQFS_ERROR_ALLOC; + goto fail; + } + + proc->frag_block->index = proc->num_fragments++; + proc->frag_block->flags = SQFS_BLK_FRAGMENT_BLOCK; + } - if (new == NULL) - return SQFS_ERROR_ALLOC; + err = store_fragment(proc, frag, signature); + if (err) + goto fail; - proc->max_fragments = newsz; - proc->fragments = new; return 0; +fail: + free(*blk_out); + *blk_out = NULL; + return err; } diff --git a/lib/sqfs/blk_proc/internal.h b/lib/sqfs/blk_proc/internal.h index 90a9d91..2e9980b 100644 --- a/lib/sqfs/blk_proc/internal.h +++ b/lib/sqfs/blk_proc/internal.h @@ -15,6 +15,7 @@ #include #include +#include #ifdef WITH_PTHREAD #include @@ -34,6 +35,12 @@ typedef struct { uint64_t signature; } blk_info_t; +typedef struct { + uint32_t index; + uint32_t offset; + uint64_t signature; +} frag_info_t; + #ifdef WITH_PTHREAD typedef struct { @@ -83,6 +90,11 @@ struct sqfs_block_processor_t { blk_info_t *blocks; sqfs_compressor_t *cmp; + sqfs_block_t *frag_block; + frag_info_t *frag_list; + size_t frag_list_num; + size_t frag_list_max; + /* used only by workers */ size_t max_block_size; @@ -100,8 +112,9 @@ int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp, SQFS_INTERNAL int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *block); -SQFS_INTERNAL int grow_fragment_table(sqfs_block_processor_t *proc, - size_t index); +SQFS_INTERNAL +int handle_fragment(sqfs_block_processor_t *proc, sqfs_block_t *frag, + sqfs_block_t **blk_out); SQFS_INTERNAL size_t deduplicate_blocks(sqfs_block_processor_t *proc, size_t count); diff --git a/lib/sqfs/blk_proc/process_block.c b/lib/sqfs/blk_proc/process_block.c index 78de31b..b4ed904 100644 --- a/lib/sqfs/blk_proc/process_block.c +++ b/lib/sqfs/blk_proc/process_block.c @@ -8,7 +8,6 @@ #include "internal.h" #include -#include int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp, uint8_t *scratch, size_t scratch_size) @@ -71,16 +70,10 @@ int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) offset = proc->file->get_size(proc->file); if (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) { - if (grow_fragment_table(proc, blk->index)) - return 0; - offset = htole64(offset); proc->fragments[blk->index].start_offset = offset; proc->fragments[blk->index].pad0 = 0; proc->fragments[blk->index].size = htole32(out); - - if (blk->index >= proc->num_fragments) - proc->num_fragments = blk->index + 1; } else { blk->inode->block_sizes[blk->index] = out; } diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c index a3dc01f..4f01bb3 100644 --- a/lib/sqfs/blk_proc/pthread.c +++ b/lib/sqfs/blk_proc/pthread.c @@ -119,11 +119,17 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, proc->cmp = cmp; proc->file = file; proc->max_blocks = INIT_BLOCK_COUNT; + proc->frag_list_max = INIT_BLOCK_COUNT; proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks); if (proc->blocks == NULL) goto fail_init; + proc->frag_list = alloc_array(sizeof(proc->frag_list[0]), + proc->frag_list_max); + if (proc->frag_list == NULL) + goto fail_init; + for (i = 0; i < num_workers; ++i) { proc->workers[i] = alloc_flex(sizeof(compress_worker_t), 1, max_block_size); @@ -172,6 +178,7 @@ fail_init: pthread_cond_destroy(&proc->done_cond); pthread_cond_destroy(&proc->queue_cond); pthread_mutex_destroy(&proc->mtx); + free(proc->frag_list); free(proc->fragments); free(proc->blocks); free(proc); @@ -200,6 +207,8 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) free_blk_list(proc->queue); free_blk_list(proc->done); + free(proc->frag_block); + free(proc->frag_list); free(proc->fragments); free(proc->blocks); free(proc); @@ -241,16 +250,40 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, sqfs_block_t *completed = NULL; int status; - pthread_mutex_lock(&proc->mtx); - if (proc->status != 0) { - status = proc->status; + if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) { + status = SQFS_ERROR_UNSUPPORTED; + + pthread_mutex_lock(&proc->mtx); + if (proc->status == 0) { + proc->status = status; + pthread_cond_broadcast(&proc->queue_cond); + } goto fail; } - if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) { - status = SQFS_ERROR_UNSUPPORTED; - proc->status = status; - pthread_cond_broadcast(&proc->queue_cond); + if (block->flags & SQFS_BLK_IS_FRAGMENT) { + block->checksum = crc32(0, block->data, block->size); + + completed = NULL; + status = handle_fragment(proc, block, &completed); + + if (status != 0) { + pthread_mutex_lock(&proc->mtx); + proc->status = status; + goto fail; + } + + free(block); + if (completed == NULL) + return 0; + + block = completed; + completed = NULL; + } + + pthread_mutex_lock(&proc->mtx); + if (proc->status != 0) { + status = proc->status; goto fail; } @@ -288,6 +321,11 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc) int status = 0; pthread_mutex_lock(&proc->mtx); + if (proc->frag_block != NULL) { + append_to_work_queue(proc, proc->frag_block); + proc->frag_block = NULL; + } + while (proc->backlog > 0 && proc->status == 0) pthread_cond_wait(&proc->done_cond, &proc->mtx); diff --git a/lib/sqfs/blk_proc/serial.c b/lib/sqfs/blk_proc/serial.c index 06c811e..c7ec366 100644 --- a/lib/sqfs/blk_proc/serial.c +++ b/lib/sqfs/blk_proc/serial.c @@ -28,6 +28,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, proc->devblksz = devblksz; proc->file = file; proc->max_blocks = INIT_BLOCK_COUNT; + proc->frag_list_max = INIT_BLOCK_COUNT; proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks); if (proc->blocks == NULL) { @@ -35,11 +36,21 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, return NULL; } + proc->frag_list = alloc_array(sizeof(proc->frag_list[0]), + proc->frag_list_max); + if (proc->frag_list == NULL) { + free(proc->blocks); + free(proc); + return NULL; + } + return proc; } void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) { + free(proc->frag_block); + free(proc->frag_list); free(proc->fragments); free(proc->blocks); free(proc); @@ -48,6 +59,8 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, sqfs_block_t *block) { + sqfs_block_t *fragblk = NULL; + if (proc->status != 0) { free(block); return proc->status; @@ -59,6 +72,23 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, return proc->status; } + if (block->flags & SQFS_BLK_IS_FRAGMENT) { + block->checksum = crc32(0, block->data, block->size); + + proc->status = handle_fragment(proc, block, &fragblk); + free(block); + + if (proc->status != 0) { + free(fragblk); + return proc->status; + } + + if (fragblk == NULL) + return 0; + + block = fragblk; + } + proc->status = sqfs_block_process(block, proc->cmp, proc->scratch, proc->max_block_size); @@ -71,5 +101,16 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, int sqfs_block_processor_finish(sqfs_block_processor_t *proc) { + if (proc->status != 0 || proc->frag_block == NULL) + return proc->status; + + proc->status = sqfs_block_process(proc->frag_block, proc->cmp, + proc->scratch, proc->max_block_size); + + if (proc->status == 0) + proc->status = process_completed_block(proc, proc->frag_block); + + free(proc->frag_block); + proc->frag_block = NULL; return proc->status; } -- cgit v1.2.3