diff options
author | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2023-01-31 11:21:30 +0100 |
---|---|---|
committer | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2023-01-31 13:51:49 +0100 |
commit | cdccc69c62579b0c13b35fad0728079652b8f3c9 (patch) | |
tree | 9fa54c710f73c5e08a9c8466e7a712eb63ee07ac /lib/sqfs/src/block_processor | |
parent | 2182129c8f359c4fa1390eaba7a65b595ccd4182 (diff) |
Move library source into src sub-directory
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib/sqfs/src/block_processor')
-rw-r--r-- | lib/sqfs/src/block_processor/backend.c | 335 | ||||
-rw-r--r-- | lib/sqfs/src/block_processor/block_processor.c | 358 | ||||
-rw-r--r-- | lib/sqfs/src/block_processor/frontend.c | 243 | ||||
-rw-r--r-- | lib/sqfs/src/block_processor/internal.h | 115 |
4 files changed, 1051 insertions, 0 deletions
diff --git a/lib/sqfs/src/block_processor/backend.c b/lib/sqfs/src/block_processor/backend.c new file mode 100644 index 0000000..b443c9d --- /dev/null +++ b/lib/sqfs/src/block_processor/backend.c @@ -0,0 +1,335 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * backend.c + * + * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at> + */ +#define SQFS_BUILDING_DLL +#include "internal.h" + +static int set_block_size(sqfs_inode_generic_t **inode, + sqfs_u32 index, sqfs_u32 size) +{ + size_t min_size = (index + 1) * sizeof(sqfs_u32); + size_t avail = (*inode)->payload_bytes_available; + sqfs_inode_generic_t *new; + size_t newsz; + + if (avail < min_size) { + newsz = avail ? avail : (sizeof(sqfs_u32) * 4); + while (newsz < min_size) + newsz *= 2; + + if (SZ_ADD_OV(newsz, sizeof(**inode), &newsz)) + return SQFS_ERROR_OVERFLOW; + + if (sizeof(size_t) > sizeof(sqfs_u32)) { + if ((newsz - sizeof(**inode)) > 0x0FFFFFFFFUL) + return SQFS_ERROR_OVERFLOW; + } + + new = realloc((*inode), newsz); + if (new == NULL) + return SQFS_ERROR_ALLOC; + + (*inode) = new; + (*inode)->payload_bytes_available = newsz - sizeof(**inode); + } + + (*inode)->extra[index] = size; + + if (min_size >= (*inode)->payload_bytes_used) + (*inode)->payload_bytes_used = min_size; + + return 0; +} + +static void release_old_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) +{ + blk->next = proc->free_list; + proc->free_list = blk; + + proc->backlog -= 1; +} + +static int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) +{ + sqfs_u64 location; + sqfs_u32 size; + int err; + + if (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) { + sqfs_block_t *it = proc->fblk_in_flight, *prev = NULL; + + while (it != NULL && it->index != blk->index) { + prev = it; + it = it->next; + } + + if (it != NULL) { + if (prev == NULL) { + proc->fblk_in_flight = it->next; + } else { + prev->next = it->next; + } + free(it); + } + } + + err = proc->wr->write_data_block(proc->wr, blk->user, blk->size, + blk->checksum, + blk->flags & ~BLK_FLAG_INTERNAL, + blk->data, &location); + if (err) + goto out; + + proc->stats.output_bytes_generated += blk->size; + + if (blk->flags & SQFS_BLK_IS_SPARSE) { + if (blk->inode != NULL) { + sqfs_inode_make_extended(*(blk->inode)); + (*(blk->inode))->data.file_ext.sparse += blk->size; + + err = set_block_size(blk->inode, blk->index, 0); + if (err) + goto out; + } + proc->stats.sparse_block_count += 1; + } else if (blk->size != 0) { + size = blk->size; + if (!(blk->flags & SQFS_BLK_IS_COMPRESSED)) + size |= 1 << 24; + + if (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) { + if (proc->frag_tbl != NULL) { + err = sqfs_frag_table_set(proc->frag_tbl, + blk->index, location, + size); + if (err) + goto out; + } + proc->stats.frag_block_count += 1; + } else { + if (blk->inode != NULL) { + err = set_block_size(blk->inode, blk->index, + size); + if (err) + goto out; + } + proc->stats.data_block_count += 1; + } + } + + if (blk->flags & SQFS_BLK_LAST_BLOCK && blk->inode != NULL) + sqfs_inode_set_file_block_start(*(blk->inode), location); +out: + release_old_block(proc, blk); + return err; +} + +static int process_completed_fragment(sqfs_block_processor_t *proc, + sqfs_block_t *frag) +{ + chunk_info_t *chunk = NULL, search; + struct hash_entry *entry; + sqfs_u32 index, offset; + int err; + + if (frag->flags & SQFS_BLK_IS_SPARSE) { + if (frag->inode != NULL) { + sqfs_inode_make_extended(*(frag->inode)); + set_block_size(frag->inode, frag->index, 0); + (*(frag->inode))->data.file_ext.sparse += frag->size; + } + proc->stats.sparse_block_count += 1; + release_old_block(proc, frag); + return 0; + } + + proc->stats.total_frag_count += 1; + + if (!(frag->flags & SQFS_BLK_DONT_DEDUPLICATE)) { + search.hash = frag->checksum; + search.size = frag->size; + + proc->current_frag = frag; + proc->fblk_lookup_error = 0; + entry = hash_table_search_pre_hashed(proc->frag_ht, + search.hash, &search); + proc->current_frag = NULL; + + if (proc->fblk_lookup_error != 0) { + err = proc->fblk_lookup_error; + goto fail; + } + + if (entry != NULL) { + if (frag->inode != NULL) { + chunk = entry->data; + sqfs_inode_set_frag_location(*(frag->inode), + chunk->index, + chunk->offset); + } + release_old_block(proc, frag); + return 0; + } + } + + if (proc->frag_block != NULL) { + size_t size = proc->frag_block->size + frag->size; + + if (size > proc->max_block_size) { + proc->frag_block->io_seq_num = proc->io_seq_num++; + + err = enqueue_block(proc, proc->frag_block); + proc->frag_block = NULL; + + if (err) + goto fail; + } + } + + if (proc->frag_block == NULL) { + if (proc->frag_tbl == NULL) { + index = 0; + } else { + err = sqfs_frag_table_append(proc->frag_tbl, + 0, 0, &index); + if (err) + goto fail; + } + + offset = 0; + proc->frag_block = frag; + proc->frag_block->index = index; + proc->frag_block->flags &= + (SQFS_BLK_DONT_COMPRESS | SQFS_BLK_ALIGN); + proc->frag_block->flags |= SQFS_BLK_FRAGMENT_BLOCK; + } else { + index = proc->frag_block->index; + offset = proc->frag_block->size; + + memcpy(proc->frag_block->data + proc->frag_block->size, + frag->data, frag->size); + + proc->frag_block->size += frag->size; + proc->frag_block->flags |= + (frag->flags & + (SQFS_BLK_DONT_COMPRESS | SQFS_BLK_ALIGN)); + } + + if (proc->frag_tbl != NULL) { + err = SQFS_ERROR_ALLOC; + chunk = calloc(1, sizeof(*chunk)); + if (chunk == NULL) + goto fail; + + chunk->index = index; + chunk->offset = offset; + chunk->size = frag->size; + chunk->hash = frag->checksum; + + proc->current_frag = frag; + proc->fblk_lookup_error = 0; + entry = hash_table_insert_pre_hashed(proc->frag_ht, chunk->hash, + chunk, chunk); + proc->current_frag = NULL; + + if (proc->fblk_lookup_error != 0) { + err = proc->fblk_lookup_error; + goto fail; + } + + if (entry == NULL) + goto fail; + } + + if (frag->inode != NULL) + sqfs_inode_set_frag_location(*(frag->inode), index, offset); + + if (frag != proc->frag_block) + release_old_block(proc, frag); + + proc->stats.actual_frag_count += 1; + return 0; +fail: + free(chunk); + if (frag != proc->frag_block) + release_old_block(proc, frag); + return err; +} + +static void store_io_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) +{ + sqfs_block_t *prev = NULL, *it = proc->io_queue; + + while (it != NULL && (it->io_seq_num < blk->io_seq_num)) { + prev = it; + it = it->next; + } + + if (prev == NULL) { + proc->io_queue = blk; + } else { + prev->next = blk; + } + + blk->next = it; +} + +int dequeue_block(sqfs_block_processor_t *proc) +{ + size_t backlog_old = proc->backlog; + sqfs_block_t *blk; + int status; + + do { + while (proc->io_queue != NULL) { + if (proc->io_queue->io_seq_num != proc->io_deq_seq_num) + break; + + blk = proc->io_queue; + proc->io_queue = blk->next; + proc->io_deq_seq_num += 1; + + status = process_completed_block(proc, blk); + if (status != 0) + return status; + } + + if (proc->backlog < backlog_old) + break; + + if ((proc->backlog == 1) && + (proc->frag_block != NULL || proc->blk_current != NULL)) { + break; + } + + if ((proc->backlog == 2) && + proc->frag_block != NULL && proc->blk_current != NULL) { + break; + } + + blk = proc->pool->dequeue(proc->pool); + + if (blk == NULL) { + status = proc->pool->get_status(proc->pool); + return status ? status : SQFS_ERROR_INTERNAL; + } + + if (blk->flags & SQFS_BLK_IS_FRAGMENT) { + status = process_completed_fragment(proc, blk); + if (status != 0) + return status; + } else { + if (!(blk->flags & SQFS_BLK_FRAGMENT_BLOCK) || + (blk->flags & BLK_FLAG_MANUAL_SUBMISSION)) { + blk->io_seq_num = proc->io_seq_num++; + } + + store_io_block(proc, blk); + } + } while (proc->backlog >= backlog_old); + + return 0; +} diff --git a/lib/sqfs/src/block_processor/block_processor.c b/lib/sqfs/src/block_processor/block_processor.c new file mode 100644 index 0000000..d607437 --- /dev/null +++ b/lib/sqfs/src/block_processor/block_processor.c @@ -0,0 +1,358 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * block_processor.c + * + * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at> + */ +#define SQFS_BUILDING_DLL +#include "internal.h" + +static int process_block(void *userptr, void *workitem) +{ + worker_data_t *worker = userptr; + sqfs_block_t *block = workitem; + sqfs_s32 ret; + + if (block->size == 0) + return 0; + + if (!(block->flags & SQFS_BLK_IGNORE_SPARSE) && + is_memory_zero(block->data, block->size)) { + block->flags |= SQFS_BLK_IS_SPARSE; + return 0; + } + + if (block->flags & SQFS_BLK_DONT_HASH) { + block->checksum = 0; + } else { + block->checksum = xxh32(block->data, block->size); + } + + if (block->flags & (SQFS_BLK_IS_FRAGMENT | SQFS_BLK_DONT_COMPRESS)) + return 0; + + ret = worker->cmp->do_block(worker->cmp, block->data, block->size, + worker->scratch, worker->scratch_size); + if (ret < 0) + return ret; + + if (ret > 0) { + memcpy(block->data, worker->scratch, ret); + block->size = ret; + block->flags |= SQFS_BLK_IS_COMPRESSED; + } + return 0; +} + +static int load_frag_block(sqfs_block_processor_t *proc, sqfs_u32 index) +{ + sqfs_fragment_t info; + size_t size; + int ret; + + if (proc->cached_frag_blk == NULL) { + size = sizeof(*proc->cached_frag_blk); + + proc->cached_frag_blk = alloc_flex(size, 1, + proc->max_block_size); + + if (proc->cached_frag_blk == NULL) + return SQFS_ERROR_ALLOC; + } else { + if (proc->cached_frag_blk->index == index) + return 0; + } + + ret = sqfs_frag_table_lookup(proc->frag_tbl, index, &info); + if (ret != 0) + return ret; + + size = SQFS_ON_DISK_BLOCK_SIZE(info.size); + if (size > proc->max_block_size) + return SQFS_ERROR_CORRUPTED; + + if (SQFS_IS_BLOCK_COMPRESSED(info.size)) { + ret = proc->file->read_at(proc->file, info.start_offset, + proc->scratch, size); + if (ret != 0) + return ret; + + ret = proc->uncmp->do_block(proc->uncmp, proc->scratch, size, + proc->cached_frag_blk->data, + proc->max_block_size); + if (ret <= 0) + return ret ? ret : SQFS_ERROR_OVERFLOW; + + size = ret; + } else { + ret = proc->file->read_at(proc->file, info.start_offset, + proc->cached_frag_blk->data, size); + if (ret != 0) + return ret; + } + + proc->cached_frag_blk->size = size; + proc->cached_frag_blk->index = index; + return 0; +} + +static bool chunk_info_equals(void *user, const void *k, const void *c) +{ + const chunk_info_t *key = k, *cmp = c; + sqfs_block_processor_t *proc = user; + sqfs_block_t *it; + int ret; + + if (key->size != cmp->size || key->hash != cmp->hash) + return false; + + if (proc->uncmp == NULL || proc->file == NULL) + return true; + + if (proc->current_frag == NULL || proc->frag_tbl == NULL) + return true; + + if (proc->fblk_lookup_error != 0) + return false; + + for (it = proc->fblk_in_flight; it != NULL; it = it->next) { + if (it->index == cmp->index) + break; + } + + if (it == NULL && proc->frag_block != NULL) { + if (proc->frag_block->index == cmp->index) + it = proc->frag_block; + } + + if (it == NULL) { + ret = load_frag_block(proc, cmp->index); + if (ret != 0) { + proc->fblk_lookup_error = ret; + return false; + } + + it = proc->cached_frag_blk; + } + + if (cmp->offset >= it->size || (it->size - cmp->offset) < cmp->size) { + proc->fblk_lookup_error = SQFS_ERROR_CORRUPTED; + return false; + } + + if (cmp->size != proc->current_frag->size) { + proc->fblk_lookup_error = SQFS_ERROR_CORRUPTED; + return false; + } + + return memcmp(it->data + cmp->offset, + proc->current_frag->data, cmp->size) == 0; +} + +static void ht_delete_function(struct hash_entry *entry) +{ + free(entry->data); +} + +static void free_block_list(sqfs_block_t *list) +{ + while (list != NULL) { + sqfs_block_t *it = list; + list = it->next; + free(it); + } +} + +static void block_processor_destroy(sqfs_object_t *base) +{ + sqfs_block_processor_t *proc = (sqfs_block_processor_t *)base; + + free(proc->frag_block); + free(proc->blk_current); + free(proc->cached_frag_blk); + + free_block_list(proc->free_list); + free_block_list(proc->io_queue); + free_block_list(proc->fblk_in_flight); + + if (proc->frag_ht != NULL) + hash_table_destroy(proc->frag_ht, ht_delete_function); + + /* XXX: shut down the pool first before cleaning up the worker data */ + if (proc->pool != NULL) + proc->pool->destroy(proc->pool); + + while (proc->workers != NULL) { + worker_data_t *worker = proc->workers; + proc->workers = worker->next; + + sqfs_drop(worker->cmp); + free(worker); + } + + sqfs_drop(proc->frag_tbl); + sqfs_drop(proc->wr); + sqfs_drop(proc->file); + sqfs_drop(proc->uncmp); + free(proc); +} + +int sqfs_block_processor_sync(sqfs_block_processor_t *proc) +{ + int ret; + + for (;;) { + if (proc->backlog == 0) + break; + + if ((proc->backlog == 1) && + (proc->frag_block != NULL || proc->blk_current != NULL)) { + break; + } + + if ((proc->backlog == 2) && + proc->frag_block != NULL && proc->blk_current != NULL) { + break; + } + + ret = dequeue_block(proc); + if (ret != 0) + return ret; + } + + return 0; +} + +int sqfs_block_processor_finish(sqfs_block_processor_t *proc) +{ + sqfs_block_t *blk; + int status; + + status = sqfs_block_processor_sync(proc); + if (status != 0) + return status; + + if (proc->frag_block != NULL) { + blk = proc->frag_block; + blk->next = NULL; + proc->frag_block = NULL; + + blk->io_seq_num = proc->io_seq_num++; + + status = enqueue_block(proc, blk); + if (status != 0) + return status; + + status = sqfs_block_processor_sync(proc); + } + + return status; +} + +const sqfs_block_processor_stats_t +*sqfs_block_processor_get_stats(const sqfs_block_processor_t *proc) +{ + return &proc->stats; +} + +int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc, + sqfs_block_processor_t **out) +{ + size_t i, count, scratch_size = 0; + sqfs_block_processor_t *proc; + int ret; + + if (desc->size != sizeof(sqfs_block_processor_desc_t)) + return SQFS_ERROR_ARG_INVALID; + + if (desc->file != NULL && desc->uncmp != NULL) + scratch_size = desc->max_block_size; + + proc = alloc_flex(sizeof(*proc), 1, scratch_size); + if (proc == NULL) + return SQFS_ERROR_ALLOC; + + sqfs_object_init(proc, block_processor_destroy, NULL); + + proc->max_backlog = desc->max_backlog; + proc->max_block_size = desc->max_block_size; + proc->frag_tbl = sqfs_grab(desc->tbl); + proc->wr = sqfs_grab(desc->wr); + proc->file = sqfs_grab(desc->file); + proc->uncmp = sqfs_grab(desc->uncmp); + proc->stats.size = sizeof(proc->stats); + + /* we need at least one current data block + one fragment block */ + if (proc->max_backlog < 3) + proc->max_backlog = 3; + + /* create the thread pool */ + proc->pool = thread_pool_create(desc->num_workers, process_block); + if (proc->pool == NULL) { + ret = SQFS_ERROR_INTERNAL; + goto fail_pool; + } + + /* create the worker compressors & scratch buffer */ + count = proc->pool->get_worker_count(proc->pool); + + for (i = 0; i < count; ++i) { + worker_data_t *worker = alloc_flex(sizeof(*worker), 1, + desc->max_block_size); + if (worker == NULL) { + ret = SQFS_ERROR_ALLOC; + goto fail_pool; + } + + worker->scratch_size = desc->max_block_size; + worker->next = proc->workers; + proc->workers = worker; + + worker->cmp = sqfs_copy(desc->cmp); + if (worker->cmp == NULL) { + ret = SQFS_ERROR_ALLOC; + goto fail_pool; + } + + proc->pool->set_worker_ptr(proc->pool, i, worker); + } + + /* create the fragment hash table */ + proc->frag_ht = hash_table_create(NULL, chunk_info_equals); + if (proc->frag_ht == NULL) { + ret = SQFS_ERROR_ALLOC; + goto fail_pool; + } + + proc->frag_ht->user = proc; + *out = proc; + return 0; +fail_pool: + block_processor_destroy((sqfs_object_t *)proc); + return ret; +} + +sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, + sqfs_compressor_t *cmp, + unsigned int num_workers, + size_t max_backlog, + sqfs_block_writer_t *wr, + sqfs_frag_table_t *tbl) +{ + sqfs_block_processor_desc_t desc; + sqfs_block_processor_t *out; + + memset(&desc, 0, sizeof(desc)); + desc.size = sizeof(desc); + desc.max_block_size = max_block_size; + desc.num_workers = num_workers; + desc.max_backlog = max_backlog; + desc.cmp = cmp; + desc.wr = wr; + desc.tbl = tbl; + + if (sqfs_block_processor_create_ex(&desc, &out) != 0) + return NULL; + + return out; +} diff --git a/lib/sqfs/src/block_processor/frontend.c b/lib/sqfs/src/block_processor/frontend.c new file mode 100644 index 0000000..e8a4207 --- /dev/null +++ b/lib/sqfs/src/block_processor/frontend.c @@ -0,0 +1,243 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * frontend.c + * + * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at> + */ +#define SQFS_BUILDING_DLL +#include "internal.h" + +static int get_new_block(sqfs_block_processor_t *proc, sqfs_block_t **out) +{ + sqfs_block_t *blk; + + while (proc->backlog >= proc->max_backlog) { + int ret = dequeue_block(proc); + if (ret != 0) + return ret; + } + + if (proc->free_list != NULL) { + blk = proc->free_list; + proc->free_list = blk->next; + } else { + blk = malloc(sizeof(*blk) + proc->max_block_size); + if (blk == NULL) + return SQFS_ERROR_ALLOC; + } + + memset(blk, 0, sizeof(*blk)); + *out = blk; + + proc->backlog += 1; + return 0; +} + +static int add_sentinel_block(sqfs_block_processor_t *proc) +{ + sqfs_block_t *blk; + int ret; + + ret = get_new_block(proc, &blk); + if (ret != 0) + return ret; + + blk->inode = proc->inode; + blk->flags = proc->blk_flags | SQFS_BLK_LAST_BLOCK; + + return enqueue_block(proc, blk); +} + +int enqueue_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) +{ + int status; + + if ((blk->flags & SQFS_BLK_FRAGMENT_BLOCK) && + proc->file != NULL && proc->uncmp != NULL) { + sqfs_block_t *copy = alloc_flex(sizeof(*copy), 1, blk->size); + + if (copy == NULL) + return SQFS_ERROR_ALLOC; + + copy->size = blk->size; + copy->index = blk->index; + memcpy(copy->data, blk->data, blk->size); + + copy->next = proc->fblk_in_flight; + proc->fblk_in_flight = copy; + } + + if (proc->pool->submit(proc->pool, blk) != 0) { + status = proc->pool->get_status(proc->pool); + + if (status == 0) + status = SQFS_ERROR_ALLOC; + + blk->next = proc->free_list; + proc->free_list = blk; + return status; + } + + return 0; +} + +int sqfs_block_processor_begin_file(sqfs_block_processor_t *proc, + sqfs_inode_generic_t **inode, + void *user, sqfs_u32 flags) +{ + if (proc->begin_called) + return SQFS_ERROR_SEQUENCE; + + if (flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) + return SQFS_ERROR_UNSUPPORTED; + + if (inode != NULL) { + (*inode) = calloc(1, sizeof(sqfs_inode_generic_t)); + if ((*inode) == NULL) + return SQFS_ERROR_ALLOC; + + (*inode)->base.type = SQFS_INODE_FILE; + sqfs_inode_set_frag_location(*inode, 0xFFFFFFFF, 0xFFFFFFFF); + } + + proc->begin_called = true; + proc->inode = inode; + proc->blk_flags = flags | SQFS_BLK_FIRST_BLOCK; + proc->blk_index = 0; + proc->user = user; + return 0; +} + +int sqfs_block_processor_append(sqfs_block_processor_t *proc, const void *data, + size_t size) +{ + sqfs_block_t *new; + sqfs_u64 filesize; + size_t diff; + int err; + + if (!proc->begin_called) + return SQFS_ERROR_SEQUENCE; + + if (proc->inode != NULL) { + sqfs_inode_get_file_size(*(proc->inode), &filesize); + sqfs_inode_set_file_size(*(proc->inode), filesize + size); + } + + while (size > 0) { + if (proc->blk_current == NULL) { + err = get_new_block(proc, &new); + if (err != 0) + return err; + + proc->blk_current = new; + proc->blk_current->flags = proc->blk_flags; + proc->blk_current->inode = proc->inode; + proc->blk_current->user = proc->user; + proc->blk_current->index = proc->blk_index++; + proc->blk_flags &= ~SQFS_BLK_FIRST_BLOCK; + } + + diff = proc->max_block_size - proc->blk_current->size; + + if (diff == 0) { + err = enqueue_block(proc, proc->blk_current); + proc->blk_current = NULL; + + if (err) + return err; + continue; + } + + if (diff > size) + diff = size; + + memcpy(proc->blk_current->data + proc->blk_current->size, + data, diff); + + size -= diff; + proc->blk_current->size += diff; + data = (const char *)data + diff; + + proc->stats.input_bytes_read += diff; + } + + if (proc->blk_current->size == proc->max_block_size) { + err = enqueue_block(proc, proc->blk_current); + proc->blk_current = NULL; + + if (err) + return err; + } + + return 0; +} + +int sqfs_block_processor_end_file(sqfs_block_processor_t *proc) +{ + int err; + + if (!proc->begin_called) + return SQFS_ERROR_SEQUENCE; + + if (proc->blk_current == NULL) { + if (!(proc->blk_flags & SQFS_BLK_FIRST_BLOCK)) { + err = add_sentinel_block(proc); + if (err) + return err; + } + } else { + if (proc->blk_flags & SQFS_BLK_DONT_FRAGMENT) { + proc->blk_current->flags |= SQFS_BLK_LAST_BLOCK; + } else { + if (!(proc->blk_current->flags & + SQFS_BLK_FIRST_BLOCK)) { + err = add_sentinel_block(proc); + if (err) + return err; + } + + proc->blk_current->flags |= SQFS_BLK_IS_FRAGMENT; + } + + err = enqueue_block(proc, proc->blk_current); + proc->blk_current = NULL; + + if (err) + return err; + } + + proc->begin_called = false; + proc->inode = NULL; + proc->user = NULL; + proc->blk_flags = 0; + return 0; +} + +int sqfs_block_processor_submit_block(sqfs_block_processor_t *proc, void *user, + sqfs_u32 flags, const void *data, + size_t size) +{ + sqfs_block_t *blk; + int ret; + + if (proc->begin_called) + return SQFS_ERROR_SEQUENCE; + + if (size > proc->max_block_size) + return SQFS_ERROR_OVERFLOW; + + if (flags & ~SQFS_BLK_FLAGS_ALL) + return SQFS_ERROR_UNSUPPORTED; + + ret = get_new_block(proc, &blk); + if (ret != 0) + return ret; + + blk->flags = flags | BLK_FLAG_MANUAL_SUBMISSION; + blk->user = user; + blk->size = size; + memcpy(blk->data, data, size); + + return enqueue_block(proc, blk); +} diff --git a/lib/sqfs/src/block_processor/internal.h b/lib/sqfs/src/block_processor/internal.h new file mode 100644 index 0000000..0b2c88d --- /dev/null +++ b/lib/sqfs/src/block_processor/internal.h @@ -0,0 +1,115 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * internal.h + * + * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at> + */ +#ifndef INTERNAL_H +#define INTERNAL_H + +#include "config.h" + +#include "sqfs/block_processor.h" +#include "sqfs/block_writer.h" +#include "sqfs/frag_table.h" +#include "sqfs/compressor.h" +#include "sqfs/inode.h" +#include "sqfs/table.h" +#include "sqfs/error.h" +#include "sqfs/block.h" +#include "sqfs/io.h" + +#include "util/hash_table.h" +#include "util/threadpool.h" +#include "util/util.h" + +#include <string.h> +#include <stdlib.h> + +typedef struct { + sqfs_u32 index; + sqfs_u32 offset; + sqfs_u32 size; + sqfs_u32 hash; +} chunk_info_t; + +enum { + BLK_FLAG_MANUAL_SUBMISSION = 0x10000000, + BLK_FLAG_INTERNAL = 0x10000000, +}; + +typedef struct sqfs_block_t { + struct sqfs_block_t *next; + sqfs_inode_generic_t **inode; + + sqfs_u32 io_seq_num; + sqfs_u32 flags; + sqfs_u32 size; + sqfs_u32 checksum; + + /* For data blocks: index within the inode. + For fragment fragment blocks: fragment table index. */ + sqfs_u32 index; + + /* User data pointer */ + void *user; + + sqfs_u8 data[]; +} sqfs_block_t; + +typedef struct worker_data_t { + struct worker_data_t *next; + sqfs_compressor_t *cmp; + + size_t scratch_size; + sqfs_u8 scratch[]; +} worker_data_t; + +struct sqfs_block_processor_t { + sqfs_object_t obj; + + sqfs_frag_table_t *frag_tbl; + sqfs_block_t *frag_block; + sqfs_block_writer_t *wr; + + sqfs_block_processor_stats_t stats; + + sqfs_inode_generic_t **inode; + sqfs_block_t *blk_current; + sqfs_u32 blk_flags; + sqfs_u32 blk_index; + void *user; + + struct hash_table *frag_ht; + sqfs_block_t *free_list; + + size_t max_block_size; + size_t max_backlog; + size_t backlog; + + bool begin_called; + + sqfs_file_t *file; + sqfs_compressor_t *uncmp; + + thread_pool_t *pool; + worker_data_t *workers; + + sqfs_block_t *io_queue; + sqfs_u32 io_seq_num; + sqfs_u32 io_deq_seq_num; + + sqfs_block_t *current_frag; + sqfs_block_t *cached_frag_blk; + sqfs_block_t *fblk_in_flight; + int fblk_lookup_error; + + sqfs_u8 scratch[]; +}; + +SQFS_INTERNAL int enqueue_block(sqfs_block_processor_t *proc, + sqfs_block_t *blk); + +SQFS_INTERNAL int dequeue_block(sqfs_block_processor_t *proc); + +#endif /* INTERNAL_H */ |