diff options
author | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2019-09-23 15:28:25 +0200 |
---|---|---|
committer | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2019-09-23 15:35:36 +0200 |
commit | b86cbb04c9cc72506783e175e871338b1f0e6750 (patch) | |
tree | dcb4f92b6962aa35cacb2db09219fd4dfbaf3d60 /lib/sqfs/blk_proc | |
parent | 8ee4a30a0f3545dac5b9a93b40c172f4fb4d44a1 (diff) |
Move the bulk of the work from the data writer to the block processor
Instead of calling a callback, the block processor now takes care of
writing the data blocks to the file in the correct order, keeping
track of fragment blocks and deduplicating blocks.
Some cleanup work remains to be done and the statistics have to be
repaired (yet again).
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib/sqfs/blk_proc')
-rw-r--r-- | lib/sqfs/blk_proc/fragtbl.c | 36 | ||||
-rw-r--r-- | lib/sqfs/blk_proc/internal.h | 37 | ||||
-rw-r--r-- | lib/sqfs/blk_proc/process_block.c | 151 | ||||
-rw-r--r-- | lib/sqfs/blk_proc/pthread.c | 14 | ||||
-rw-r--r-- | lib/sqfs/blk_proc/serial.c | 16 |
5 files changed, 242 insertions, 12 deletions
diff --git a/lib/sqfs/blk_proc/fragtbl.c b/lib/sqfs/blk_proc/fragtbl.c new file mode 100644 index 0000000..81baf09 --- /dev/null +++ b/lib/sqfs/blk_proc/fragtbl.c @@ -0,0 +1,36 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * fragtbl.c + * + * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at> + */ +#define SQFS_BUILDING_DLL +#include "internal.h" + +int sqfs_block_processor_write_fragment_table(sqfs_block_processor_t *proc, + sqfs_super_t *super) +{ + uint64_t start; + size_t size; + int ret; + + if (proc->num_fragments == 0) { + super->fragment_entry_count = 0; + super->fragment_table_start = 0xFFFFFFFFFFFFFFFFUL; + super->flags &= ~SQFS_FLAG_ALWAYS_FRAGMENTS; + super->flags |= SQFS_FLAG_NO_FRAGMENTS; + return 0; + } + + size = sizeof(proc->fragments[0]) * proc->num_fragments; + ret = sqfs_write_table(proc->file, proc->cmp, + proc->fragments, size, &start); + if (ret) + return ret; + + super->flags &= ~SQFS_FLAG_NO_FRAGMENTS; + super->flags |= SQFS_FLAG_ALWAYS_FRAGMENTS; + super->fragment_entry_count = proc->num_fragments; + super->fragment_table_start = start; + return 0; +} diff --git a/lib/sqfs/blk_proc/internal.h b/lib/sqfs/blk_proc/internal.h index 85c7783..b5af751 100644 --- a/lib/sqfs/blk_proc/internal.h +++ b/lib/sqfs/blk_proc/internal.h @@ -6,7 +6,11 @@ #include "sqfs/block_processor.h" #include "sqfs/compress.h" +#include "sqfs/inode.h" +#include "sqfs/table.h" #include "sqfs/error.h" +#include "sqfs/data.h" +#include "sqfs/io.h" #include "util.h" #include <string.h> @@ -16,6 +20,21 @@ #include <pthread.h> #endif + +#define MK_BLK_SIG(chksum, size) \ + (((uint64_t)(size) << 32) | (uint64_t)(chksum)) + +#define BLK_SIZE(sig) ((sig) >> 32) + +#define INIT_BLOCK_COUNT (128) + + +typedef struct { + uint64_t offset; + uint64_t signature; +} blk_info_t; + + #ifdef WITH_PTHREAD typedef struct { sqfs_block_processor_t *shared; @@ -46,18 +65,30 @@ struct sqfs_block_processor_t { uint32_t dequeue_id; unsigned int num_workers; - sqfs_block_cb cb; - void *user; int status; size_t max_backlog; + size_t devblksz; + sqfs_file_t *file; + + sqfs_fragment_t *fragments; + size_t num_fragments; + size_t max_fragments; + + uint64_t start; + + size_t file_start; + size_t num_blocks; + size_t max_blocks; + blk_info_t *blocks; + sqfs_compressor_t *cmp; + /* used only by workers */ size_t max_block_size; #ifdef WITH_PTHREAD compress_worker_t *workers[]; #else - sqfs_compressor_t *cmp; uint8_t scratch[]; #endif }; diff --git a/lib/sqfs/blk_proc/process_block.c b/lib/sqfs/blk_proc/process_block.c index 1b6ee29..0747bc5 100644 --- a/lib/sqfs/blk_proc/process_block.c +++ b/lib/sqfs/blk_proc/process_block.c @@ -37,6 +37,155 @@ int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp, return 0; } +static int allign_file(sqfs_block_processor_t *proc, sqfs_block_t *blk) +{ + if (!(blk->flags & SQFS_BLK_ALLIGN)) + return 0; + + return padd_sqfs(proc->file, proc->file->get_size(proc->file), + proc->devblksz); +} + +static int store_block_location(sqfs_block_processor_t *proc, uint64_t offset, + uint32_t size, uint32_t chksum) +{ + size_t new_sz; + void *new; + + if (proc->num_blocks == proc->max_blocks) { + new_sz = proc->max_blocks * 2; + new = realloc(proc->blocks, sizeof(proc->blocks[0]) * new_sz); + + if (new == NULL) + return SQFS_ERROR_ALLOC; + + proc->blocks = new; + proc->max_blocks = new_sz; + } + + proc->blocks[proc->num_blocks].offset = offset; + proc->blocks[proc->num_blocks].signature = MK_BLK_SIG(chksum, size); + proc->num_blocks += 1; + return 0; +} + +static size_t deduplicate_blocks(sqfs_block_processor_t *proc, size_t count) +{ + size_t i, j; + + for (i = 0; i < proc->file_start; ++i) { + for (j = 0; j < count; ++j) { + if (proc->blocks[i + j].signature != + proc->blocks[proc->file_start + j].signature) + break; + } + + if (j == count) + break; + } + + return i; +} + +static size_t grow_fragment_table(sqfs_block_processor_t *proc, size_t index) +{ + size_t newsz; + void *new; + + if (index < proc->max_fragments) + return 0; + + do { + newsz = proc->max_fragments ? proc->max_fragments * 2 : 16; + } while (index >= newsz); + + new = realloc(proc->fragments, sizeof(proc->fragments[0]) * newsz); + + if (new == NULL) + return SQFS_ERROR_ALLOC; + + proc->max_fragments = newsz; + proc->fragments = new; + return 0; +} + +static int handle_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) +{ + size_t start, count; + uint64_t offset; + uint32_t out; + int err; + + if (blk->flags & SQFS_BLK_FIRST_BLOCK) { + proc->start = proc->file->get_size(proc->file); + proc->file_start = proc->num_blocks; + + err = allign_file(proc, blk); + if (err) + return err; + } + + if (blk->size != 0) { + out = blk->size; + if (!(blk->flags & SQFS_BLK_IS_COMPRESSED)) + out |= 1 << 24; + + offset = proc->file->get_size(proc->file); + + if (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) { + if (grow_fragment_table(proc, blk->index)) + return 0; + + offset = htole64(offset); + proc->fragments[blk->index].start_offset = offset; + proc->fragments[blk->index].pad0 = 0; + proc->fragments[blk->index].size = htole32(out); + + if (blk->index >= proc->num_fragments) + proc->num_fragments = blk->index + 1; + } else { + blk->inode->block_sizes[blk->index] = out; + } + + err = store_block_location(proc, offset, out, blk->checksum); + if (err) + return err; + + err = proc->file->write_at(proc->file, offset, + blk->data, blk->size); + if (err) + return err; + } + + if (blk->flags & SQFS_BLK_LAST_BLOCK) { + err = allign_file(proc, blk); + if (err) + return err; + + count = proc->num_blocks - proc->file_start; + start = deduplicate_blocks(proc, count); + offset = proc->blocks[start].offset; + + sqfs_inode_set_file_block_start(blk->inode, offset); + + if (start < proc->file_start) { + offset = start + count; + + if (offset >= proc->file_start) { + proc->num_blocks = offset; + } else { + proc->num_blocks = proc->file_start; + } + + err = proc->file->truncate(proc->file, proc->start); + if (err) + return err; + } + } + + return 0; +} + int process_completed_blocks(sqfs_block_processor_t *proc, sqfs_block_t *queue) { sqfs_block_t *it; @@ -48,7 +197,7 @@ int process_completed_blocks(sqfs_block_processor_t *proc, sqfs_block_t *queue) if (it->flags & SQFS_BLK_COMPRESS_ERROR) { proc->status = SQFS_ERROR_COMRPESSOR; } else if (proc->status == 0) { - proc->status = proc->cb(proc->user, it); + proc->status = handle_block(proc, it); } free(it); diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c index 565bad2..a9cffd0 100644 --- a/lib/sqfs/blk_proc/pthread.c +++ b/lib/sqfs/blk_proc/pthread.c @@ -70,8 +70,8 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, sqfs_compressor_t *cmp, unsigned int num_workers, size_t max_backlog, - void *user, - sqfs_block_cb callback) + size_t devblksz, + sqfs_file_t *file) { sqfs_block_processor_t *proc; unsigned int i; @@ -86,13 +86,19 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, return NULL; proc->max_block_size = max_block_size; - proc->cb = callback; - proc->user = user; proc->num_workers = num_workers; proc->max_backlog = max_backlog; proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; + proc->devblksz = devblksz; + proc->cmp = cmp; + proc->file = file; + proc->max_blocks = INIT_BLOCK_COUNT; + + proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks); + if (proc->blocks == NULL) + goto fail_init; for (i = 0; i < num_workers; ++i) { proc->workers[i] = alloc_flex(sizeof(compress_worker_t), diff --git a/lib/sqfs/blk_proc/serial.c b/lib/sqfs/blk_proc/serial.c index ee172de..b6c17fb 100644 --- a/lib/sqfs/blk_proc/serial.c +++ b/lib/sqfs/blk_proc/serial.c @@ -11,8 +11,8 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, sqfs_compressor_t *cmp, unsigned int num_workers, size_t max_backlog, - void *user, - sqfs_block_cb callback) + size_t devblksz, + sqfs_file_t *file) { sqfs_block_processor_t *proc; (void)num_workers; @@ -25,8 +25,16 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, proc->max_block_size = max_block_size; proc->cmp = cmp; - proc->cb = callback; - proc->user = user; + proc->devblksz = devblksz; + proc->file = file; + proc->max_blocks = INIT_BLOCK_COUNT; + + proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks); + if (proc->blocks == NULL) { + free(proc); + return NULL; + } + return proc; } |