diff options
| author | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2021-03-22 11:30:15 +0100 | 
|---|---|---|
| committer | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2021-03-22 15:26:47 +0100 | 
| commit | 5aa0f30173ecf3b6538b9136cb4783fc19266288 (patch) | |
| tree | e163429bb7ecf5c3cd343532e0fd60b5f1819d39 /lib/sqfs | |
| parent | c7056c1853b5defd5b933e651bf58dc94b4d3f8b (diff) | |
Cleanup the block processor file structure
A cleaner separation between common code, frontend code and backend
code is made.
The "is this byte blob zero" function is moved out to libutil (with
test case and everything) with a more optimized implementation.
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib/sqfs')
| -rw-r--r-- | lib/sqfs/Makemodule.am | 4 | ||||
| -rw-r--r-- | lib/sqfs/block_processor/backend.c | 288 | ||||
| -rw-r--r-- | lib/sqfs/block_processor/block_processor.c | 234 | ||||
| -rw-r--r-- | lib/sqfs/block_processor/common.c | 421 | ||||
| -rw-r--r-- | lib/sqfs/block_processor/frontend.c | 254 | ||||
| -rw-r--r-- | lib/sqfs/block_processor/internal.h | 12 | 
6 files changed, 569 insertions, 644 deletions
| diff --git a/lib/sqfs/Makemodule.am b/lib/sqfs/Makemodule.am index 5a62894..72eb6bc 100644 --- a/lib/sqfs/Makemodule.am +++ b/lib/sqfs/Makemodule.am @@ -25,8 +25,9 @@ libsquashfs_la_SOURCES += lib/sqfs/xattr/xattr_writer_record.c  libsquashfs_la_SOURCES += lib/sqfs/xattr/xattr_writer.h  libsquashfs_la_SOURCES += lib/sqfs/write_super.c lib/sqfs/data_reader.c  libsquashfs_la_SOURCES += lib/sqfs/block_processor/internal.h -libsquashfs_la_SOURCES += lib/sqfs/block_processor/common.c  libsquashfs_la_SOURCES += lib/sqfs/block_processor/frontend.c +libsquashfs_la_SOURCES += lib/sqfs/block_processor/block_processor.c +libsquashfs_la_SOURCES += lib/sqfs/block_processor/backend.c  libsquashfs_la_SOURCES += lib/sqfs/frag_table.c include/sqfs/frag_table.h  libsquashfs_la_SOURCES += lib/sqfs/block_writer.c include/sqfs/block_writer.h  libsquashfs_la_CPPFLAGS = $(AM_CPPFLAGS) @@ -43,6 +44,7 @@ libsquashfs_la_SOURCES += lib/util/xxhash.c  libsquashfs_la_SOURCES += lib/util/hash_table.c include/hash_table.h  libsquashfs_la_SOURCES += lib/util/rbtree.c include/rbtree.h  libsquashfs_la_SOURCES += lib/util/array.c include/array.h +libsquashfs_la_SOURCES += lib/util/is_memory_zero.c  libsquashfs_la_SOURCES += include/threadpool.h  if CUSTOM_ALLOC diff --git a/lib/sqfs/block_processor/backend.c b/lib/sqfs/block_processor/backend.c new file mode 100644 index 0000000..ff142c9 --- /dev/null +++ b/lib/sqfs/block_processor/backend.c @@ -0,0 +1,288 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * backend.c + * + * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at> + */ +#define SQFS_BUILDING_DLL +#include "internal.h" + +static int set_block_size(sqfs_inode_generic_t **inode, +			  sqfs_u32 index, sqfs_u32 size) +{ +	size_t min_size = (index + 1) * sizeof(sqfs_u32); +	size_t avail = (*inode)->payload_bytes_available; +	sqfs_inode_generic_t *new; +	size_t newsz; + +	if (avail < min_size) { +		newsz = avail ? avail : (sizeof(sqfs_u32) * 4); +		while (newsz < min_size) +			newsz *= 2; + +		if (SZ_ADD_OV(newsz, sizeof(**inode), &newsz)) +			return SQFS_ERROR_OVERFLOW; + +		if (sizeof(size_t) > sizeof(sqfs_u32)) { +			if ((newsz - sizeof(**inode)) > 0x0FFFFFFFFUL) +				return SQFS_ERROR_OVERFLOW; +		} + +		new = realloc((*inode), newsz); +		if (new == NULL) +			return SQFS_ERROR_ALLOC; + +		(*inode) = new; +		(*inode)->payload_bytes_available = newsz - sizeof(**inode); +	} + +	(*inode)->extra[index] = size; + +	if (min_size >= (*inode)->payload_bytes_used) +		(*inode)->payload_bytes_used = min_size; + +	return 0; +} + +static void release_old_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) +{ +	blk->next = proc->free_list; +	proc->free_list = blk; +} + +static int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) +{ +	sqfs_u64 location; +	sqfs_u32 size; +	int err; + +	err = proc->wr->write_data_block(proc->wr, blk->user, blk->size, +					 blk->checksum, +					 blk->flags & ~BLK_FLAG_INTERNAL, +					 blk->data, &location); +	if (err) +		goto out; + +	proc->stats.output_bytes_generated += blk->size; + +	if (blk->flags & SQFS_BLK_IS_SPARSE) { +		if (blk->inode != NULL) { +			sqfs_inode_make_extended(*(blk->inode)); +			(*(blk->inode))->data.file_ext.sparse += blk->size; + +			err = set_block_size(blk->inode, blk->index, 0); +			if (err) +				goto out; +		} +		proc->stats.sparse_block_count += 1; +	} else if (blk->size != 0) { +		size = blk->size; +		if (!(blk->flags & SQFS_BLK_IS_COMPRESSED)) +			size |= 1 << 24; + +		if (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) { +			if (proc->frag_tbl != NULL) { +				err = sqfs_frag_table_set(proc->frag_tbl, +							  blk->index, location, +							  size); +				if (err) +					goto out; +			} +			proc->stats.frag_block_count += 1; +		} else { +			if (blk->inode != NULL) { +				err = set_block_size(blk->inode, blk->index, +						     size); +				if (err) +					goto out; +			} +			proc->stats.data_block_count += 1; +		} +	} + +	if (blk->flags & SQFS_BLK_LAST_BLOCK && blk->inode != NULL) +		sqfs_inode_set_file_block_start(*(blk->inode), location); +out: +	release_old_block(proc, blk); +	return err; +} + +static int process_completed_fragment(sqfs_block_processor_t *proc, +				      sqfs_block_t *frag) +{ +	chunk_info_t *chunk = NULL, search; +	struct hash_entry *entry; +	sqfs_u32 index, offset; +	int err; + +	if (frag->flags & SQFS_BLK_IS_SPARSE) { +		if (frag->inode != NULL) { +			sqfs_inode_make_extended(*(frag->inode)); +			set_block_size(frag->inode, frag->index, 0); +			(*(frag->inode))->data.file_ext.sparse += frag->size; +		} +		proc->stats.sparse_block_count += 1; +		release_old_block(proc, frag); +		return 0; +	} + +	proc->stats.total_frag_count += 1; + +	if (!(frag->flags & SQFS_BLK_DONT_DEDUPLICATE)) { +		search.hash = frag->checksum; +		search.size = frag->size; + +		entry = hash_table_search_pre_hashed(proc->frag_ht, +						     search.hash, &search); + +		if (entry != NULL) { +			if (frag->inode != NULL) { +				chunk = entry->data; +				sqfs_inode_set_frag_location(*(frag->inode), +							     chunk->index, +							     chunk->offset); +			} +			release_old_block(proc, frag); +			return 0; +		} +	} + +	if (proc->frag_block != NULL) { +		size_t size = proc->frag_block->size + frag->size; + +		if (size > proc->max_block_size) { +			proc->frag_block->io_seq_num = proc->io_seq_num++; + +			err = enqueue_block(proc, proc->frag_block); +			proc->frag_block = NULL; + +			if (err) +				goto fail; +		} +	} + +	if (proc->frag_block == NULL) { +		if (proc->frag_tbl == NULL) { +			index = 0; +		} else { +			err = sqfs_frag_table_append(proc->frag_tbl, +						     0, 0, &index); +			if (err) +				goto fail; +		} + +		offset = 0; +		proc->frag_block = frag; +		proc->frag_block->index = index; +		proc->frag_block->flags &= SQFS_BLK_DONT_COMPRESS; +		proc->frag_block->flags |= SQFS_BLK_FRAGMENT_BLOCK; +	} else { +		index = proc->frag_block->index; +		offset = proc->frag_block->size; + +		memcpy(proc->frag_block->data + proc->frag_block->size, +		       frag->data, frag->size); + +		proc->frag_block->size += frag->size; +		proc->frag_block->flags |= +			(frag->flags & SQFS_BLK_DONT_COMPRESS); +	} + +	if (proc->frag_tbl != NULL) { +		err = SQFS_ERROR_ALLOC; +		chunk = calloc(1, sizeof(*chunk)); +		if (chunk == NULL) +			goto fail; + +		chunk->index = index; +		chunk->offset = offset; +		chunk->size = frag->size; +		chunk->hash = frag->checksum; + +		entry = hash_table_insert_pre_hashed(proc->frag_ht, chunk->hash, +						     chunk, chunk); + +		if (entry == NULL) +			goto fail; +	} + +	if (frag->inode != NULL) +		sqfs_inode_set_frag_location(*(frag->inode), index, offset); + +	if (frag != proc->frag_block) +		release_old_block(proc, frag); + +	proc->stats.actual_frag_count += 1; +	return 0; +fail: +	free(chunk); +	free(frag); +	return err; +} + +static void store_io_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) +{ +	sqfs_block_t *prev = NULL, *it = proc->io_queue; + +	while (it != NULL && (it->io_seq_num < blk->io_seq_num)) { +		prev = it; +		it = it->next; +	} + +	if (prev == NULL) { +		proc->io_queue = blk; +	} else { +		prev->next = blk; +	} + +	blk->next = it; +	proc->backlog += 1; +} + +int dequeue_block(sqfs_block_processor_t *proc) +{ +	size_t backlog_old = proc->backlog; +	sqfs_block_t *blk; +	int status; + +	do { +		while (proc->io_queue != NULL) { +			if (proc->io_queue->io_seq_num != proc->io_deq_seq_num) +				break; + +			blk = proc->io_queue; +			proc->io_queue = blk->next; +			proc->io_deq_seq_num += 1; +			proc->backlog -= 1; + +			status = process_completed_block(proc, blk); +			if (status != 0) +				return status; +		} + +		if (proc->backlog < backlog_old) +			break; + +		blk = proc->pool->dequeue(proc->pool); + +		if (blk == NULL) { +			status = proc->pool->get_status(proc->pool); +			return status ? status : SQFS_ERROR_INTERNAL; +		} + +		proc->backlog -= 1; + +		if (blk->flags & SQFS_BLK_IS_FRAGMENT) { +			status = process_completed_fragment(proc, blk); +			if (status != 0) +				return status; +		} else { +			if (!(blk->flags & SQFS_BLK_FRAGMENT_BLOCK)) +				blk->io_seq_num = proc->io_seq_num++; + +			store_io_block(proc, blk); +		} +	} while (proc->backlog >= backlog_old); + +	return 0; +} diff --git a/lib/sqfs/block_processor/block_processor.c b/lib/sqfs/block_processor/block_processor.c new file mode 100644 index 0000000..f9313db --- /dev/null +++ b/lib/sqfs/block_processor/block_processor.c @@ -0,0 +1,234 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * block_processor.c + * + * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at> + */ +#define SQFS_BUILDING_DLL +#include "internal.h" + +static int process_block(void *userptr, void *workitem) +{ +	worker_data_t *worker = userptr; +	sqfs_block_t *block = workitem; +	sqfs_s32 ret; + +	if (block->size == 0) +		return 0; + +	if (!(block->flags & SQFS_BLK_IGNORE_SPARSE) && +	    is_memory_zero(block->data, block->size)) { +		block->flags |= SQFS_BLK_IS_SPARSE; +		return 0; +	} + +	if (block->flags & SQFS_BLK_DONT_HASH) { +		block->checksum = 0; +	} else { +		block->checksum = xxh32(block->data, block->size); +	} + +	if (block->flags & (SQFS_BLK_IS_FRAGMENT | SQFS_BLK_DONT_COMPRESS)) +		return 0; + +	ret = worker->cmp->do_block(worker->cmp, block->data, block->size, +				    worker->scratch, worker->scratch_size); +	if (ret < 0) +		return ret; + +	if (ret > 0) { +		memcpy(block->data, worker->scratch, ret); +		block->size = ret; +		block->flags |= SQFS_BLK_IS_COMPRESSED; +	} +	return 0; +} + +static bool chunk_info_equals(void *user, const void *k, const void *c) +{ +	const chunk_info_t *key = k, *cmp = c; +	(void)user; +	return key->size == cmp->size && key->hash == cmp->hash; +} + +static void ht_delete_function(struct hash_entry *entry) +{ +	free(entry->data); +} + +static void block_processor_destroy(sqfs_object_t *base) +{ +	sqfs_block_processor_t *proc = (sqfs_block_processor_t *)base; +	sqfs_block_t *it; + +	free(proc->frag_block); +	free(proc->blk_current); + +	while (proc->free_list != NULL) { +		it = proc->free_list; +		proc->free_list = it->next; +		free(it); +	} + +	while (proc->io_queue != NULL) { +		it = proc->io_queue; +		proc->io_queue = it->next; +		free(it); +	} + +	if (proc->frag_ht != NULL) +		hash_table_destroy(proc->frag_ht, ht_delete_function); + +	/* XXX: shut down the pool first before cleaning up the worker data */ +	if (proc->pool != NULL) +		proc->pool->destroy(proc->pool); + +	while (proc->workers != NULL) { +		worker_data_t *worker = proc->workers; +		proc->workers = worker->next; + +		sqfs_destroy(worker->cmp); +		free(worker); +	} + +	free(proc); +} + +int sqfs_block_processor_sync(sqfs_block_processor_t *proc) +{ +	int ret; + +	while (proc->backlog > 0) { +		ret = dequeue_block(proc); +		if (ret != 0) +			return ret; +	} + +	return 0; +} + +int sqfs_block_processor_finish(sqfs_block_processor_t *proc) +{ +	sqfs_block_t *blk; +	int status; + +	status = sqfs_block_processor_sync(proc); +	if (status != 0) +		return status; + +	if (proc->frag_block != NULL) { +		blk = proc->frag_block; +		blk->next = NULL; +		proc->frag_block = NULL; + +		blk->io_seq_num = proc->io_seq_num++; + +		status = enqueue_block(proc, blk); +		if (status != 0) +			return status; + +		status = sqfs_block_processor_sync(proc); +	} + +	return status; +} + +const sqfs_block_processor_stats_t +*sqfs_block_processor_get_stats(const sqfs_block_processor_t *proc) +{ +	return &proc->stats; +} + +int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc, +				   sqfs_block_processor_t **out) +{ +	sqfs_block_processor_t *proc; +	size_t i, count; +	int ret; + +	if (desc->size != sizeof(sqfs_block_processor_desc_t)) +		return SQFS_ERROR_ARG_INVALID; + +	proc = calloc(1, sizeof(*proc)); +	if (proc == NULL) +		return SQFS_ERROR_ALLOC; + +	proc->max_backlog = desc->max_backlog; +	proc->max_block_size = desc->max_block_size; +	proc->frag_tbl = desc->tbl; +	proc->wr = desc->wr; +	proc->file = desc->file; +	proc->uncmp = desc->uncmp; +	proc->stats.size = sizeof(proc->stats); +	((sqfs_object_t *)proc)->destroy = block_processor_destroy; + +	/* create the thread pool */ +	proc->pool = thread_pool_create(desc->num_workers, process_block); +	if (proc->pool == NULL) { +		free(proc); +		return SQFS_ERROR_INTERNAL; +	} + +	/* create the worker compressors & scratch buffer */ +	count = proc->pool->get_worker_count(proc->pool); + +	for (i = 0; i < count; ++i) { +		worker_data_t *worker = alloc_flex(sizeof(*worker), 1, +						   desc->max_block_size); +		if (worker == NULL) { +			ret = SQFS_ERROR_ALLOC; +			goto fail_pool; +		} + +		worker->scratch_size = desc->max_block_size; +		worker->next = proc->workers; +		proc->workers = worker; + +		worker->cmp = sqfs_copy(desc->cmp); +		if (worker->cmp == NULL) { +			ret = SQFS_ERROR_ALLOC; +			goto fail_pool; +		} + +		proc->pool->set_worker_ptr(proc->pool, i, worker); +	} + +	/* create the fragment hash table */ +	proc->frag_ht = hash_table_create(NULL, chunk_info_equals); +	if (proc->frag_ht == NULL) { +		ret = SQFS_ERROR_ALLOC; +		goto fail_pool; +	} + +	proc->frag_ht->user = proc; +	*out = proc; +	return 0; +fail_pool: +	block_processor_destroy((sqfs_object_t *)proc); +	return ret; +} + +sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, +						    sqfs_compressor_t *cmp, +						    unsigned int num_workers, +						    size_t max_backlog, +						    sqfs_block_writer_t *wr, +						    sqfs_frag_table_t *tbl) +{ +	sqfs_block_processor_desc_t desc; +	sqfs_block_processor_t *out; + +	memset(&desc, 0, sizeof(desc)); +	desc.size = sizeof(desc); +	desc.max_block_size = max_block_size; +	desc.num_workers = num_workers; +	desc.max_backlog = max_backlog; +	desc.cmp = cmp; +	desc.wr = wr; +	desc.tbl = tbl; + +	if (sqfs_block_processor_create_ex(&desc, &out) != 0) +		return NULL; + +	return out; +} diff --git a/lib/sqfs/block_processor/common.c b/lib/sqfs/block_processor/common.c deleted file mode 100644 index 6a0015b..0000000 --- a/lib/sqfs/block_processor/common.c +++ /dev/null @@ -1,421 +0,0 @@ -/* SPDX-License-Identifier: LGPL-3.0-or-later */ -/* - * common.c - * - * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at> - */ -#define SQFS_BUILDING_DLL -#include "internal.h" - -static int set_block_size(sqfs_inode_generic_t **inode, -			  sqfs_u32 index, sqfs_u32 size) -{ -	size_t min_size = (index + 1) * sizeof(sqfs_u32); -	size_t avail = (*inode)->payload_bytes_available; -	sqfs_inode_generic_t *new; -	size_t newsz; - -	if (avail < min_size) { -		newsz = avail ? avail : (sizeof(sqfs_u32) * 4); -		while (newsz < min_size) -			newsz *= 2; - -		if (SZ_ADD_OV(newsz, sizeof(**inode), &newsz)) -			return SQFS_ERROR_OVERFLOW; - -		if (sizeof(size_t) > sizeof(sqfs_u32)) { -			if ((newsz - sizeof(**inode)) > 0x0FFFFFFFFUL) -				return SQFS_ERROR_OVERFLOW; -		} - -		new = realloc((*inode), newsz); -		if (new == NULL) -			return SQFS_ERROR_ALLOC; - -		(*inode) = new; -		(*inode)->payload_bytes_available = newsz - sizeof(**inode); -	} - -	(*inode)->extra[index] = size; - -	if (min_size >= (*inode)->payload_bytes_used) -		(*inode)->payload_bytes_used = min_size; - -	return 0; -} - -static void release_old_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) -{ -	blk->next = proc->free_list; -	proc->free_list = blk; -} - -int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) -{ -	sqfs_u64 location; -	sqfs_u32 size; -	int err; - -	err = proc->wr->write_data_block(proc->wr, blk->user, blk->size, -					 blk->checksum, -					 blk->flags & ~BLK_FLAG_INTERNAL, -					 blk->data, &location); -	if (err) -		goto out; - -	proc->stats.output_bytes_generated += blk->size; - -	if (blk->flags & SQFS_BLK_IS_SPARSE) { -		if (blk->inode != NULL) { -			sqfs_inode_make_extended(*(blk->inode)); -			(*(blk->inode))->data.file_ext.sparse += blk->size; - -			err = set_block_size(blk->inode, blk->index, 0); -			if (err) -				goto out; -		} -		proc->stats.sparse_block_count += 1; -	} else if (blk->size != 0) { -		size = blk->size; -		if (!(blk->flags & SQFS_BLK_IS_COMPRESSED)) -			size |= 1 << 24; - -		if (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) { -			if (proc->frag_tbl != NULL) { -				err = sqfs_frag_table_set(proc->frag_tbl, -							  blk->index, location, -							  size); -				if (err) -					goto out; -			} -			proc->stats.frag_block_count += 1; -		} else { -			if (blk->inode != NULL) { -				err = set_block_size(blk->inode, blk->index, -						     size); -				if (err) -					goto out; -			} -			proc->stats.data_block_count += 1; -		} -	} - -	if (blk->flags & SQFS_BLK_LAST_BLOCK && blk->inode != NULL) -		sqfs_inode_set_file_block_start(*(blk->inode), location); -out: -	release_old_block(proc, blk); -	return err; -} - -static bool is_zero_block(unsigned char *ptr, size_t size) -{ -	return ptr[0] == 0 && memcmp(ptr, ptr + 1, size - 1) == 0; -} - -static int process_block(void *userptr, void *workitem) -{ -	sqfs_compressor_t *cmp = ((worker_data_t *)userptr)->cmp; -	sqfs_u8 *scratch = ((worker_data_t *)userptr)->scratch; -	size_t scratch_size = ((worker_data_t *)userptr)->scratch_size; -	sqfs_block_t *block = workitem; -	sqfs_s32 ret; - -	if (block->size == 0) -		return 0; - -	if (!(block->flags & SQFS_BLK_IGNORE_SPARSE) && -	    is_zero_block(block->data, block->size)) { -		block->flags |= SQFS_BLK_IS_SPARSE; -		return 0; -	} - -	if (block->flags & SQFS_BLK_DONT_HASH) { -		block->checksum = 0; -	} else { -		block->checksum = xxh32(block->data, block->size); -	} - -	if (block->flags & (SQFS_BLK_IS_FRAGMENT | SQFS_BLK_DONT_COMPRESS)) -		return 0; - -	ret = cmp->do_block(cmp, block->data, block->size, -			    scratch, scratch_size); -	if (ret < 0) -		return ret; - -	if (ret > 0) { -		memcpy(block->data, scratch, ret); -		block->size = ret; -		block->flags |= SQFS_BLK_IS_COMPRESSED; -	} -	return 0; -} - -int process_completed_fragment(sqfs_block_processor_t *proc, -			       sqfs_block_t *frag, -			       sqfs_block_t **blk_out) -{ -	chunk_info_t *chunk, search; -	struct hash_entry *entry; -	sqfs_u32 index, offset; -	size_t size; -	int err; - -	if (frag->flags & SQFS_BLK_IS_SPARSE) { -		if (frag->inode != NULL) { -			sqfs_inode_make_extended(*(frag->inode)); -			set_block_size(frag->inode, frag->index, 0); -			(*(frag->inode))->data.file_ext.sparse += frag->size; -		} -		proc->stats.sparse_block_count += 1; -		release_old_block(proc, frag); -		return 0; -	} - -	proc->stats.total_frag_count += 1; - -	if (!(frag->flags & SQFS_BLK_DONT_DEDUPLICATE)) { -		search.hash = frag->checksum; -		search.size = frag->size; - -		entry = hash_table_search_pre_hashed(proc->frag_ht, -						     search.hash, &search); - -		if (entry != NULL) { -			if (frag->inode != NULL) { -				chunk = entry->data; -				sqfs_inode_set_frag_location(*(frag->inode), -							     chunk->index, -							     chunk->offset); -			} -			release_old_block(proc, frag); -			return 0; -		} -	} - -	if (proc->frag_block != NULL) { -		size = proc->frag_block->size + frag->size; - -		if (size > proc->max_block_size) { -			*blk_out = proc->frag_block; -			proc->frag_block = NULL; -		} -	} - -	if (proc->frag_block == NULL) { -		if (proc->frag_tbl == NULL) { -			index = 0; -		} else { -			err = sqfs_frag_table_append(proc->frag_tbl, -						     0, 0, &index); -			if (err) -				goto fail; -		} - -		offset = 0; -		proc->frag_block = frag; -		proc->frag_block->index = index; -		proc->frag_block->flags &= SQFS_BLK_DONT_COMPRESS; -		proc->frag_block->flags |= SQFS_BLK_FRAGMENT_BLOCK; -	} else { -		index = proc->frag_block->index; -		offset = proc->frag_block->size; - -		memcpy(proc->frag_block->data + proc->frag_block->size, -		       frag->data, frag->size); - -		proc->frag_block->size += frag->size; -		proc->frag_block->flags |= -			(frag->flags & SQFS_BLK_DONT_COMPRESS); -		release_old_block(proc, frag); -	} - -	if (proc->frag_tbl != NULL) { -		err = SQFS_ERROR_ALLOC; -		chunk = calloc(1, sizeof(*chunk)); -		if (chunk == NULL) -			goto fail_outblk; - -		chunk->index = index; -		chunk->offset = offset; -		chunk->size = frag->size; -		chunk->hash = frag->checksum; - -		entry = hash_table_insert_pre_hashed(proc->frag_ht, chunk->hash, -						     chunk, chunk); - -		if (entry == NULL) { -			free(chunk); -			goto fail_outblk; -		} -	} - -	if (frag->inode != NULL) -		sqfs_inode_set_frag_location(*(frag->inode), index, offset); - -	proc->stats.actual_frag_count += 1; -	return 0; -fail: -	release_old_block(proc, frag); -fail_outblk: -	if (*blk_out != NULL) { -		release_old_block(proc, *blk_out); -		*blk_out = NULL; -	} -	return err; -} - -static uint32_t chunk_info_hash(void *user, const void *key) -{ -	const chunk_info_t *chunk = key; -	(void)user; -	return chunk->hash; -} - -static bool chunk_info_equals(void *user, const void *k, const void *c) -{ -	const chunk_info_t *key = k, *cmp = c; -	(void)user; -	return key->size == cmp->size && key->hash == cmp->hash; -} - -static void ht_delete_function(struct hash_entry *entry) -{ -	free(entry->data); -} - -static void block_processor_destroy(sqfs_object_t *base) -{ -	sqfs_block_processor_t *proc = (sqfs_block_processor_t *)base; -	sqfs_block_t *it; - -	if (proc->frag_block != NULL) -		release_old_block(proc, proc->frag_block); - -	free(proc->blk_current); - -	while (proc->free_list != NULL) { -		it = proc->free_list; -		proc->free_list = it->next; -		free(it); -	} - -	hash_table_destroy(proc->frag_ht, ht_delete_function); - -	/* XXX: shut down the pool first before cleaning up the worker data */ -	proc->pool->destroy(proc->pool); - -	while (proc->workers != NULL) { -		worker_data_t *worker = proc->workers; -		proc->workers = worker->next; - -		sqfs_destroy(worker->cmp); -		free(worker); -	} - -	free(proc); -} - -int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc, -				   sqfs_block_processor_t **out) -{ -	sqfs_block_processor_t *proc; -	size_t i, count; -	int ret; - -	if (desc->size != sizeof(sqfs_block_processor_desc_t)) -		return SQFS_ERROR_ARG_INVALID; - -	proc = calloc(1, sizeof(*proc)); -	if (proc == NULL) -		return SQFS_ERROR_ALLOC; - -	proc->max_backlog = desc->max_backlog; -	proc->max_block_size = desc->max_block_size; -	proc->frag_tbl = desc->tbl; -	proc->wr = desc->wr; -	proc->file = desc->file; -	proc->uncmp = desc->uncmp; -	proc->stats.size = sizeof(proc->stats); -	((sqfs_object_t *)proc)->destroy = block_processor_destroy; - -	/* create the thread pool */ -	proc->pool = thread_pool_create(desc->num_workers, process_block); -	if (proc->pool == NULL) { -		free(proc); -		return SQFS_ERROR_INTERNAL; -	} - -	/* create the worker compressors & scratch buffer */ -	count = proc->pool->get_worker_count(proc->pool); - -	for (i = 0; i < count; ++i) { -		worker_data_t *worker = alloc_flex(sizeof(*worker), 1, -						   desc->max_block_size); -		if (worker == NULL) { -			ret = SQFS_ERROR_ALLOC; -			goto fail_pool; -		} - -		worker->scratch_size = desc->max_block_size; -		worker->next = proc->workers; -		proc->workers = worker; - -		worker->cmp = sqfs_copy(desc->cmp); -		if (worker->cmp == NULL) { -			ret = SQFS_ERROR_ALLOC; -			goto fail_pool; -		} - -		proc->pool->set_worker_ptr(proc->pool, i, worker); -	} - -	/* create the fragment hash table */ -	proc->frag_ht = hash_table_create(chunk_info_hash, chunk_info_equals); -	if (proc->frag_ht == NULL) { -		ret = SQFS_ERROR_ALLOC; -		goto fail_pool; -	} - -	proc->frag_ht->user = proc; -	*out = proc; -	return 0; -fail_pool: -	proc->pool->destroy(proc->pool); -	while (proc->workers != NULL) { -		worker_data_t *worker = proc->workers; -		proc->workers = worker->next; - -		if (worker->cmp != NULL) -			sqfs_destroy(worker->cmp); - -		free(worker); -	} -	free(proc); -	return ret; -} - -sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, -						    sqfs_compressor_t *cmp, -						    unsigned int num_workers, -						    size_t max_backlog, -						    sqfs_block_writer_t *wr, -						    sqfs_frag_table_t *tbl) -{ -	sqfs_block_processor_desc_t desc; -	sqfs_block_processor_t *out; - -	memset(&desc, 0, sizeof(desc)); -	desc.size = sizeof(desc); -	desc.max_block_size = max_block_size; -	desc.num_workers = num_workers; -	desc.max_backlog = max_backlog; -	desc.cmp = cmp; -	desc.wr = wr; -	desc.tbl = tbl; - -	if (sqfs_block_processor_create_ex(&desc, &out) != 0) -		return NULL; - -	return out; -} diff --git a/lib/sqfs/block_processor/frontend.c b/lib/sqfs/block_processor/frontend.c index 8bd6cf2..ccf54c0 100644 --- a/lib/sqfs/block_processor/frontend.c +++ b/lib/sqfs/block_processor/frontend.c @@ -7,123 +7,46 @@  #define SQFS_BUILDING_DLL  #include "internal.h" -static sqfs_block_t *get_new_block(sqfs_block_processor_t *proc) +static int get_new_block(sqfs_block_processor_t *proc, sqfs_block_t **out)  {  	sqfs_block_t *blk; +	while (proc->backlog >= proc->max_backlog) { +		int ret = dequeue_block(proc); +		if (ret != 0) +			return ret; +	} +  	if (proc->free_list != NULL) {  		blk = proc->free_list;  		proc->free_list = blk->next;  	} else {  		blk = malloc(sizeof(*blk) + proc->max_block_size); +		if (blk == NULL) +			return SQFS_ERROR_ALLOC;  	} -	if (blk != NULL) -		memset(blk, 0, sizeof(*blk)); - -	return blk; +	memset(blk, 0, sizeof(*blk)); +	*out = blk; +	return 0;  } -static int dequeue_block(sqfs_block_processor_t *proc) +static int add_sentinel_block(sqfs_block_processor_t *proc)  { -	sqfs_block_t *blk, *fragblk, *it, *prev; -	bool have_dequeued = false; -	int status; -retry: -	while (proc->io_queue != NULL) { -		if (proc->io_queue->io_seq_num != proc->io_deq_seq_num) -			break; - -		blk = proc->io_queue; -		proc->io_queue = blk->next; -		proc->io_deq_seq_num += 1; -		proc->backlog -= 1; -		have_dequeued = true; - -		status = process_completed_block(proc, blk); -		if (status != 0) -			return status; -	} - -	if (have_dequeued) -		return 0; - -	blk = proc->pool->dequeue(proc->pool); - -	if (blk == NULL) { -		status = proc->pool->get_status(proc->pool); -		if (status == 0) -			status = SQFS_ERROR_INTERNAL; - -		return status; -	} - -	proc->backlog -= 1; -	have_dequeued = true; - -	if (blk->flags & SQFS_BLK_IS_FRAGMENT) { -		fragblk = NULL; -		status = process_completed_fragment(proc, blk, &fragblk); - -		if (status != 0) { -			free(fragblk); -			return status; -		} - -		if (fragblk != NULL) { -			fragblk->io_seq_num = proc->io_seq_num++; - -			if (proc->pool->submit(proc->pool, fragblk) != 0) { -				free(fragblk); - -				if (status == 0) { -					status = proc->pool-> -						get_status(proc->pool); - -					if (status == 0) -						status = SQFS_ERROR_ALLOC; -				} - -				return status; -			} - -			proc->backlog += 1; -			have_dequeued = false; -		} -	} else { -		if (!(blk->flags & SQFS_BLK_FRAGMENT_BLOCK)) -			blk->io_seq_num = proc->io_seq_num++; - -		prev = NULL; -		it = proc->io_queue; - -		while (it != NULL) { -			if (it->io_seq_num >= blk->io_seq_num) -				break; - -			prev = it; -			it = it->next; -		} - -		if (prev == NULL) { -			blk->next = proc->io_queue; -			proc->io_queue = blk; -		} else { -			blk->next = prev->next; -			prev->next = blk; -		} +	sqfs_block_t *blk; +	int ret; -		proc->backlog += 1; -		have_dequeued = false; -	} +	ret = get_new_block(proc, &blk); +	if (ret != 0) +		return ret; -	if (!have_dequeued) -		goto retry; +	blk->inode = proc->inode; +	blk->flags = proc->blk_flags | SQFS_BLK_LAST_BLOCK; -	return 0; +	return enqueue_block(proc, blk);  } -static int enqueue_block(sqfs_block_processor_t *proc, sqfs_block_t *blk) +int enqueue_block(sqfs_block_processor_t *proc, sqfs_block_t *blk)  {  	int status; @@ -141,44 +64,6 @@ static int enqueue_block(sqfs_block_processor_t *proc, sqfs_block_t *blk)  	return 0;  } -static int add_sentinel_block(sqfs_block_processor_t *proc) -{ -	sqfs_block_t *blk; -	int ret; - -	if (proc->backlog == proc->max_backlog) { -		ret = dequeue_block(proc); -		if (ret != 0) -			return ret; -	} - -	blk = get_new_block(proc); -	if (blk == NULL) -		return SQFS_ERROR_ALLOC; - -	blk->inode = proc->inode; -	blk->flags = proc->blk_flags | SQFS_BLK_LAST_BLOCK; - -	return enqueue_block(proc, blk); -} - -static int flush_block(sqfs_block_processor_t *proc) -{ -	sqfs_block_t *block; -	int ret; - -	if (proc->backlog == proc->max_backlog) { -		ret = dequeue_block(proc); -		if (ret != 0) -			return ret; -	} - -	block = proc->blk_current; -	proc->blk_current = NULL; - -	return enqueue_block(proc, block); -} -  int sqfs_block_processor_begin_file(sqfs_block_processor_t *proc,  				    sqfs_inode_generic_t **inode,  				    void *user, sqfs_u32 flags) @@ -224,9 +109,9 @@ int sqfs_block_processor_append(sqfs_block_processor_t *proc, const void *data,  	while (size > 0) {  		if (proc->blk_current == NULL) { -			new = get_new_block(proc); -			if (new == NULL) -				return SQFS_ERROR_ALLOC; +			err = get_new_block(proc, &new); +			if (err != 0) +				return err;  			proc->blk_current = new;  			proc->blk_current->flags = proc->blk_flags; @@ -239,7 +124,9 @@ int sqfs_block_processor_append(sqfs_block_processor_t *proc, const void *data,  		diff = proc->max_block_size - proc->blk_current->size;  		if (diff == 0) { -			err = flush_block(proc); +			err = enqueue_block(proc, proc->blk_current); +			proc->blk_current = NULL; +  			if (err)  				return err;  			continue; @@ -258,9 +145,12 @@ int sqfs_block_processor_append(sqfs_block_processor_t *proc, const void *data,  		proc->stats.input_bytes_read += diff;  	} -	if (proc->blk_current != NULL && -	    proc->blk_current->size == proc->max_block_size) { -		return flush_block(proc); +	if (proc->blk_current->size == proc->max_block_size) { +		err = enqueue_block(proc, proc->blk_current); +		proc->blk_current = NULL; + +		if (err) +			return err;  	}  	return 0; @@ -293,7 +183,9 @@ int sqfs_block_processor_end_file(sqfs_block_processor_t *proc)  			proc->blk_current->flags |= SQFS_BLK_IS_FRAGMENT;  		} -		err = flush_block(proc); +		err = enqueue_block(proc, proc->blk_current); +		proc->blk_current = NULL; +  		if (err)  			return err;  	} @@ -321,78 +213,14 @@ int sqfs_block_processor_submit_block(sqfs_block_processor_t *proc, void *user,  	if (flags & ~SQFS_BLK_FLAGS_ALL)  		return SQFS_ERROR_UNSUPPORTED; -	if (proc->backlog == proc->max_backlog) { -		ret = dequeue_block(proc); -		if (ret != 0) -			return ret; -	} - -	blk = get_new_block(proc); -	if (blk == NULL) -		return SQFS_ERROR_ALLOC; +	ret = get_new_block(proc, &blk); +	if (ret != 0) +		return ret;  	blk->flags = flags | BLK_FLAG_MANUAL_SUBMISSION;  	blk->user = user;  	blk->size = size;  	memcpy(blk->data, data, size); -	ret = proc->pool->submit(proc->pool, blk); -	if (ret != 0) -		free(blk); - -	proc->backlog += 1; -	return ret; -} - -int sqfs_block_processor_sync(sqfs_block_processor_t *proc) -{ -	int ret; - -	while (proc->backlog > 0) { -		ret = dequeue_block(proc); -		if (ret != 0) -			return ret; -	} - -	return 0; -} - -int sqfs_block_processor_finish(sqfs_block_processor_t *proc) -{ -	sqfs_block_t *blk; -	int status; - -	status = sqfs_block_processor_sync(proc); -	if (status != 0) -		return status; - -	if (proc->frag_block != NULL) { -		blk = proc->frag_block; -		blk->next = NULL; -		proc->frag_block = NULL; - -		blk->io_seq_num = proc->io_seq_num++; - -		status = proc->pool->submit(proc->pool, blk); -		if (status != 0) { -			status = proc->pool->get_status(proc->pool); - -			if (status == 0) -				status = SQFS_ERROR_ALLOC; - -			free(blk); -			return status; -		} - -		proc->backlog += 1; -		status = sqfs_block_processor_sync(proc); -	} - -	return status; -} - -const sqfs_block_processor_stats_t -*sqfs_block_processor_get_stats(const sqfs_block_processor_t *proc) -{ -	return &proc->stats; +	return enqueue_block(proc, blk);  } diff --git a/lib/sqfs/block_processor/internal.h b/lib/sqfs/block_processor/internal.h index 2393380..ef8ff0f 100644 --- a/lib/sqfs/block_processor/internal.h +++ b/lib/sqfs/block_processor/internal.h @@ -25,7 +25,6 @@  #include <string.h>  #include <stdlib.h> -#include <assert.h>  typedef struct {  	sqfs_u32 index; @@ -96,19 +95,14 @@ struct sqfs_block_processor_t {  	thread_pool_t *pool;  	worker_data_t *workers; - -  	sqfs_block_t *io_queue;  	sqfs_u32 io_seq_num;  	sqfs_u32 io_deq_seq_num;  }; -SQFS_INTERNAL -int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk); +SQFS_INTERNAL int enqueue_block(sqfs_block_processor_t *proc, +				sqfs_block_t *blk); -SQFS_INTERNAL -int process_completed_fragment(sqfs_block_processor_t *proc, -			       sqfs_block_t *frag, -			       sqfs_block_t **blk_out); +SQFS_INTERNAL int dequeue_block(sqfs_block_processor_t *proc);  #endif /* INTERNAL_H */ | 
