diff options
author | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2019-09-24 03:45:30 +0200 |
---|---|---|
committer | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2019-09-24 04:01:10 +0200 |
commit | 8d9f24a65ef27a52615b3225776632de08462eba (patch) | |
tree | 76d64437a98e94b2749cbb2f48117f9d1b4af947 | |
parent | 035cbdfe1bc5aea16cbcada5e3c00ecf5ac08c96 (diff) |
Move entire fragment processing from data writer to block processor
So far, this is mostly a direct port from the block processor. The
actual fragment checksumming is not done through the thread pool.
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r-- | include/sqfs/block_processor.h | 13 | ||||
-rw-r--r-- | lib/sqfs/blk_proc/fragtbl.c | 110 | ||||
-rw-r--r-- | lib/sqfs/blk_proc/internal.h | 17 | ||||
-rw-r--r-- | lib/sqfs/blk_proc/process_block.c | 7 | ||||
-rw-r--r-- | lib/sqfs/blk_proc/pthread.c | 52 | ||||
-rw-r--r-- | lib/sqfs/blk_proc/serial.c | 41 | ||||
-rw-r--r-- | lib/sqfshelper/data_writer.c | 134 |
7 files changed, 211 insertions, 163 deletions
diff --git a/include/sqfs/block_processor.h b/include/sqfs/block_processor.h index 5b7b3e0..f0f7145 100644 --- a/include/sqfs/block_processor.h +++ b/include/sqfs/block_processor.h @@ -77,11 +77,16 @@ typedef enum { SQFS_BLK_ALLIGN = 0x0008, /** - * @brief Indicates that a block is not part of a file but contains - * file tail ends and an entry in the fragment table has to be - * added. + * @brief Indicates that a block is a tail end of a file and the block + * processor should take care of fragment packing and accounting. */ - SQFS_BLK_FRAGMENT_BLOCK = 0x0010, + SQFS_BLK_IS_FRAGMENT = 0x0010, + + /** + * @brief Set by the block processor on fragment blocks that + * it generates. + */ + SQFS_BLK_FRAGMENT_BLOCK = 0x4000, /** * @brief Set by compressor worker if the block was actually compressed. diff --git a/lib/sqfs/blk_proc/fragtbl.c b/lib/sqfs/blk_proc/fragtbl.c index 3f6c644..84f5ed6 100644 --- a/lib/sqfs/blk_proc/fragtbl.c +++ b/lib/sqfs/blk_proc/fragtbl.c @@ -35,24 +35,112 @@ int sqfs_block_processor_write_fragment_table(sqfs_block_processor_t *proc, return 0; } -int grow_fragment_table(sqfs_block_processor_t *proc, size_t index) +static int grow_fragment_table(sqfs_block_processor_t *proc) { size_t newsz; void *new; - if (index < proc->max_fragments) - return 0; - - do { + if (proc->num_fragments >= proc->max_fragments) { newsz = proc->max_fragments ? proc->max_fragments * 2 : 16; - } while (index >= newsz); - new = realloc(proc->fragments, sizeof(proc->fragments[0]) * newsz); + new = realloc(proc->fragments, + sizeof(proc->fragments[0]) * newsz); + + if (new == NULL) + return SQFS_ERROR_ALLOC; + + proc->max_fragments = newsz; + proc->fragments = new; + } + + return 0; +} + +static int store_fragment(sqfs_block_processor_t *proc, sqfs_block_t *frag, + uint64_t signature) +{ + size_t new_sz; + void *new; + + if (proc->frag_list_num == proc->frag_list_max) { + new_sz = proc->frag_list_max * 2; + new = realloc(proc->frag_list, + sizeof(proc->frag_list[0]) * new_sz); + + if (new == NULL) + return SQFS_ERROR_ALLOC; + + proc->frag_list = new; + proc->frag_list_max = new_sz; + } + + proc->frag_list[proc->frag_list_num].index = proc->frag_block->index; + proc->frag_list[proc->frag_list_num].offset = proc->frag_block->size; + proc->frag_list[proc->frag_list_num].signature = signature; + proc->frag_list_num += 1; + + sqfs_inode_set_frag_location(frag->inode, proc->frag_block->index, + proc->frag_block->size); + + memcpy(proc->frag_block->data + proc->frag_block->size, + frag->data, frag->size); + + proc->frag_block->flags |= (frag->flags & SQFS_BLK_DONT_COMPRESS); + proc->frag_block->size += frag->size; + return 0; +} + +int handle_fragment(sqfs_block_processor_t *proc, sqfs_block_t *frag, + sqfs_block_t **blk_out) +{ + uint64_t signature; + size_t i, size; + int err; + + signature = MK_BLK_SIG(frag->checksum, frag->size); + + for (i = 0; i < proc->frag_list_num; ++i) { + if (proc->frag_list[i].signature == signature) { + sqfs_inode_set_frag_location(frag->inode, + proc->frag_list[i].index, + proc->frag_list[i].offset); + return 0; + } + } + + if (proc->frag_block != NULL) { + size = proc->frag_block->size + frag->size; + + if (size > proc->max_block_size) { + *blk_out = proc->frag_block; + proc->frag_block = NULL; + } + } + + if (proc->frag_block == NULL) { + size = sizeof(sqfs_block_t) + proc->max_block_size; + + err = grow_fragment_table(proc); + if (err) + goto fail; + + proc->frag_block = calloc(1, size); + if (proc->frag_block == NULL) { + err = SQFS_ERROR_ALLOC; + goto fail; + } + + proc->frag_block->index = proc->num_fragments++; + proc->frag_block->flags = SQFS_BLK_FRAGMENT_BLOCK; + } - if (new == NULL) - return SQFS_ERROR_ALLOC; + err = store_fragment(proc, frag, signature); + if (err) + goto fail; - proc->max_fragments = newsz; - proc->fragments = new; return 0; +fail: + free(*blk_out); + *blk_out = NULL; + return err; } diff --git a/lib/sqfs/blk_proc/internal.h b/lib/sqfs/blk_proc/internal.h index 90a9d91..2e9980b 100644 --- a/lib/sqfs/blk_proc/internal.h +++ b/lib/sqfs/blk_proc/internal.h @@ -15,6 +15,7 @@ #include <string.h> #include <stdlib.h> +#include <zlib.h> #ifdef WITH_PTHREAD #include <pthread.h> @@ -34,6 +35,12 @@ typedef struct { uint64_t signature; } blk_info_t; +typedef struct { + uint32_t index; + uint32_t offset; + uint64_t signature; +} frag_info_t; + #ifdef WITH_PTHREAD typedef struct { @@ -83,6 +90,11 @@ struct sqfs_block_processor_t { blk_info_t *blocks; sqfs_compressor_t *cmp; + sqfs_block_t *frag_block; + frag_info_t *frag_list; + size_t frag_list_num; + size_t frag_list_max; + /* used only by workers */ size_t max_block_size; @@ -100,8 +112,9 @@ int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp, SQFS_INTERNAL int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *block); -SQFS_INTERNAL int grow_fragment_table(sqfs_block_processor_t *proc, - size_t index); +SQFS_INTERNAL +int handle_fragment(sqfs_block_processor_t *proc, sqfs_block_t *frag, + sqfs_block_t **blk_out); SQFS_INTERNAL size_t deduplicate_blocks(sqfs_block_processor_t *proc, size_t count); diff --git a/lib/sqfs/blk_proc/process_block.c b/lib/sqfs/blk_proc/process_block.c index 78de31b..b4ed904 100644 --- a/lib/sqfs/blk_proc/process_block.c +++ b/lib/sqfs/blk_proc/process_block.c @@ -8,7 +8,6 @@ #include "internal.h" #include <string.h> -#include <zlib.h> int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp, uint8_t *scratch, size_t scratch_size) @@ -71,16 +70,10 @@ int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) offset = proc->file->get_size(proc->file); if (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) { - if (grow_fragment_table(proc, blk->index)) - return 0; - offset = htole64(offset); proc->fragments[blk->index].start_offset = offset; proc->fragments[blk->index].pad0 = 0; proc->fragments[blk->index].size = htole32(out); - - if (blk->index >= proc->num_fragments) - proc->num_fragments = blk->index + 1; } else { blk->inode->block_sizes[blk->index] = out; } diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c index a3dc01f..4f01bb3 100644 --- a/lib/sqfs/blk_proc/pthread.c +++ b/lib/sqfs/blk_proc/pthread.c @@ -119,11 +119,17 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, proc->cmp = cmp; proc->file = file; proc->max_blocks = INIT_BLOCK_COUNT; + proc->frag_list_max = INIT_BLOCK_COUNT; proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks); if (proc->blocks == NULL) goto fail_init; + proc->frag_list = alloc_array(sizeof(proc->frag_list[0]), + proc->frag_list_max); + if (proc->frag_list == NULL) + goto fail_init; + for (i = 0; i < num_workers; ++i) { proc->workers[i] = alloc_flex(sizeof(compress_worker_t), 1, max_block_size); @@ -172,6 +178,7 @@ fail_init: pthread_cond_destroy(&proc->done_cond); pthread_cond_destroy(&proc->queue_cond); pthread_mutex_destroy(&proc->mtx); + free(proc->frag_list); free(proc->fragments); free(proc->blocks); free(proc); @@ -200,6 +207,8 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) free_blk_list(proc->queue); free_blk_list(proc->done); + free(proc->frag_block); + free(proc->frag_list); free(proc->fragments); free(proc->blocks); free(proc); @@ -241,16 +250,40 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, sqfs_block_t *completed = NULL; int status; - pthread_mutex_lock(&proc->mtx); - if (proc->status != 0) { - status = proc->status; + if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) { + status = SQFS_ERROR_UNSUPPORTED; + + pthread_mutex_lock(&proc->mtx); + if (proc->status == 0) { + proc->status = status; + pthread_cond_broadcast(&proc->queue_cond); + } goto fail; } - if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) { - status = SQFS_ERROR_UNSUPPORTED; - proc->status = status; - pthread_cond_broadcast(&proc->queue_cond); + if (block->flags & SQFS_BLK_IS_FRAGMENT) { + block->checksum = crc32(0, block->data, block->size); + + completed = NULL; + status = handle_fragment(proc, block, &completed); + + if (status != 0) { + pthread_mutex_lock(&proc->mtx); + proc->status = status; + goto fail; + } + + free(block); + if (completed == NULL) + return 0; + + block = completed; + completed = NULL; + } + + pthread_mutex_lock(&proc->mtx); + if (proc->status != 0) { + status = proc->status; goto fail; } @@ -288,6 +321,11 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc) int status = 0; pthread_mutex_lock(&proc->mtx); + if (proc->frag_block != NULL) { + append_to_work_queue(proc, proc->frag_block); + proc->frag_block = NULL; + } + while (proc->backlog > 0 && proc->status == 0) pthread_cond_wait(&proc->done_cond, &proc->mtx); diff --git a/lib/sqfs/blk_proc/serial.c b/lib/sqfs/blk_proc/serial.c index 06c811e..c7ec366 100644 --- a/lib/sqfs/blk_proc/serial.c +++ b/lib/sqfs/blk_proc/serial.c @@ -28,6 +28,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, proc->devblksz = devblksz; proc->file = file; proc->max_blocks = INIT_BLOCK_COUNT; + proc->frag_list_max = INIT_BLOCK_COUNT; proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks); if (proc->blocks == NULL) { @@ -35,11 +36,21 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, return NULL; } + proc->frag_list = alloc_array(sizeof(proc->frag_list[0]), + proc->frag_list_max); + if (proc->frag_list == NULL) { + free(proc->blocks); + free(proc); + return NULL; + } + return proc; } void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) { + free(proc->frag_block); + free(proc->frag_list); free(proc->fragments); free(proc->blocks); free(proc); @@ -48,6 +59,8 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, sqfs_block_t *block) { + sqfs_block_t *fragblk = NULL; + if (proc->status != 0) { free(block); return proc->status; @@ -59,6 +72,23 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, return proc->status; } + if (block->flags & SQFS_BLK_IS_FRAGMENT) { + block->checksum = crc32(0, block->data, block->size); + + proc->status = handle_fragment(proc, block, &fragblk); + free(block); + + if (proc->status != 0) { + free(fragblk); + return proc->status; + } + + if (fragblk == NULL) + return 0; + + block = fragblk; + } + proc->status = sqfs_block_process(block, proc->cmp, proc->scratch, proc->max_block_size); @@ -71,5 +101,16 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, int sqfs_block_processor_finish(sqfs_block_processor_t *proc) { + if (proc->status != 0 || proc->frag_block == NULL) + return proc->status; + + proc->status = sqfs_block_process(proc->frag_block, proc->cmp, + proc->scratch, proc->max_block_size); + + if (proc->status == 0) + proc->status = process_completed_block(proc, proc->frag_block); + + free(proc->frag_block); + proc->frag_block = NULL; return proc->status; } diff --git a/lib/sqfshelper/data_writer.c b/lib/sqfshelper/data_writer.c index 5bce527..9f9631b 100644 --- a/lib/sqfshelper/data_writer.c +++ b/lib/sqfshelper/data_writer.c @@ -18,128 +18,14 @@ #include <errno.h> #include <zlib.h> -#define MK_BLK_SIG(chksum, size) \ - (((uint64_t)(size) << 32) | (uint64_t)(chksum)) - -#define BLK_SIZE(sig) ((sig) >> 32) - -#define INIT_BLOCK_COUNT (128) - -typedef struct { - uint32_t index; - uint32_t offset; - uint64_t signature; -} frag_info_t; - struct data_writer_t { - sqfs_block_t *frag_block; - size_t num_fragments; - sqfs_block_processor_t *proc; sqfs_compressor_t *cmp; sqfs_super_t *super; - size_t frag_list_num; - size_t frag_list_max; - frag_info_t *frag_list; - data_writer_stats_t stats; }; -static int flush_fragment_block(data_writer_t *data) -{ - int ret; - - ret = sqfs_block_processor_enqueue(data->proc, data->frag_block); - data->frag_block = NULL; - return ret; -} - -static int store_fragment(data_writer_t *data, sqfs_block_t *frag, - uint64_t signature) -{ - size_t new_sz; - void *new; - - if (data->frag_list_num == data->frag_list_max) { - new_sz = data->frag_list_max * 2; - new = realloc(data->frag_list, - sizeof(data->frag_list[0]) * new_sz); - - if (new == NULL) { - perror("growing fragment checksum table"); - return -1; - } - - data->frag_list = new; - data->frag_list_max = new_sz; - } - - data->frag_list[data->frag_list_num].index = data->frag_block->index; - data->frag_list[data->frag_list_num].offset = data->frag_block->size; - data->frag_list[data->frag_list_num].signature = signature; - data->frag_list_num += 1; - - sqfs_inode_set_frag_location(frag->inode, data->frag_block->index, - data->frag_block->size); - - memcpy(data->frag_block->data + data->frag_block->size, - frag->data, frag->size); - - data->frag_block->flags |= (frag->flags & SQFS_BLK_DONT_COMPRESS); - data->frag_block->size += frag->size; - return 0; -} - -static int handle_fragment(data_writer_t *data, sqfs_block_t *frag) -{ - uint64_t signature; - size_t i, size; - - signature = MK_BLK_SIG(frag->checksum, frag->size); - - for (i = 0; i < data->frag_list_num; ++i) { - if (data->frag_list[i].signature == signature) { - sqfs_inode_set_frag_location(frag->inode, - data->frag_list[i].index, - data->frag_list[i].offset); - free(frag); - return 0; - } - } - - if (data->frag_block != NULL) { - size = data->frag_block->size + frag->size; - - if (size > data->super->block_size) { - if (flush_fragment_block(data)) - goto fail; - } - } - - if (data->frag_block == NULL) { - size = sizeof(sqfs_block_t) + data->super->block_size; - - data->frag_block = calloc(1, size); - if (data->frag_block == NULL) { - perror("creating fragment block"); - goto fail; - } - - data->frag_block->index = data->num_fragments++; - data->frag_block->flags = SQFS_BLK_FRAGMENT_BLOCK; - } - - if (store_fragment(data, frag, signature)) - goto fail; - - free(frag); - return 0; -fail: - free(frag); - return -1; -} - static bool is_zero_block(unsigned char *ptr, size_t size) { return ptr[0] == 0 && memcmp(ptr, ptr + 1, size - 1) == 0; @@ -225,9 +111,9 @@ int write_data_from_file_condensed(data_writer_t *data, sqfs_file_t *file, } } - blk->checksum = crc32(0, blk->data, blk->size); + blk->flags |= SQFS_BLK_IS_FRAGMENT; - if (handle_fragment(data, blk)) + if (sqfs_block_processor_enqueue(data->proc, blk)) return -1; } else { if (sqfs_block_processor_enqueue(data->proc, blk)) @@ -269,21 +155,11 @@ data_writer_t *data_writer_create(sqfs_super_t *super, sqfs_compressor_t *cmp, return NULL; } - data->frag_list_max = INIT_BLOCK_COUNT; - data->frag_list = alloc_array(sizeof(data->frag_list[0]), - data->frag_list_max); - if (data->frag_list == NULL) { - perror("creating data writer"); - free(data); - return NULL; - } - data->proc = sqfs_block_processor_create(super->block_size, cmp, num_jobs, max_backlog, devblksize, file); if (data->proc == NULL) { perror("creating data block processor"); - free(data->frag_list); free(data); return NULL; } @@ -296,7 +172,6 @@ data_writer_t *data_writer_create(sqfs_super_t *super, sqfs_compressor_t *cmp, void data_writer_destroy(data_writer_t *data) { sqfs_block_processor_destroy(data->proc); - free(data->frag_list); free(data); } @@ -312,11 +187,6 @@ int data_writer_write_fragment_table(data_writer_t *data) int data_writer_sync(data_writer_t *data) { - if (data->frag_block != NULL) { - if (flush_fragment_block(data)) - return -1; - } - return sqfs_block_processor_finish(data->proc); } |