diff options
Diffstat (limited to 'lib/sqfs/block_processor')
| -rw-r--r-- | lib/sqfs/block_processor/common.c | 130 | ||||
| -rw-r--r-- | lib/sqfs/block_processor/internal.h | 19 | ||||
| -rw-r--r-- | lib/sqfs/block_processor/serial.c | 43 | ||||
| -rw-r--r-- | lib/sqfs/block_processor/winpthread.c | 127 | 
4 files changed, 267 insertions, 52 deletions
| diff --git a/lib/sqfs/block_processor/common.c b/lib/sqfs/block_processor/common.c index b2657e6..d6c0889 100644 --- a/lib/sqfs/block_processor/common.c +++ b/lib/sqfs/block_processor/common.c @@ -176,8 +176,11 @@ static int process_completed_fragment(sqfs_block_processor_t *proc,  		search.hash = frag->checksum;  		search.size = frag->size; +		proc->frag_cmp_current = frag;  		entry = hash_table_search_pre_hashed(proc->frag_ht,  						     search.hash, &search); +		proc->frag_cmp_current = NULL; +  		if (entry != NULL) {  			if (frag->inode != NULL) {  				chunk = entry->data; @@ -238,8 +241,11 @@ static int process_completed_fragment(sqfs_block_processor_t *proc,  		chunk->size = frag->size;  		chunk->hash = frag->checksum; +		proc->frag_cmp_current = frag;  		entry = hash_table_insert_pre_hashed(proc->frag_ht, chunk->hash,  						     chunk, chunk); +		proc->frag_cmp_current = NULL; +  		if (entry == NULL) {  			free(chunk);  			goto fail_outblk; @@ -261,17 +267,76 @@ fail_outblk:  	return err;  } -static uint32_t chunk_info_hash(const void *key) +static uint32_t chunk_info_hash(void *user, const void *key)  {  	const chunk_info_t *chunk = key; +	(void)user;  	return chunk->hash;  } -static bool chunk_info_equals(const void *a, const void *b) +static bool chunk_info_equals(void *user, const void *k, const void *c)  { -	const chunk_info_t *a_ = a, *b_ = b; -	return a_->size == b_->size && -	       a_->hash == b_->hash; +	const chunk_info_t *key = k, *cmp = c; +	sqfs_block_processor_t *proc = user; +	sqfs_fragment_t frag; +	unsigned char *temp; +	size_t size; +	int ret; + +	if (key->size != cmp->size || key->hash != cmp->hash) +		return false; + +	if (proc->file == NULL || proc->uncmp == NULL) +		return true; + +	ret = proc->compare_frag_in_flight(proc, proc->frag_cmp_current, +					   cmp->index, cmp->offset); +	if (ret == 0) +		return true; + +	if (proc->buffered_index != cmp->index || +	    proc->buffered_blk_size == 0) { +		if (sqfs_frag_table_lookup(proc->frag_tbl, cmp->index, &frag)) +			return false; + +		proc->buffered_index = 0xFFFFFFFF; +		size = SQFS_ON_DISK_BLOCK_SIZE(frag.size); + +		if (SQFS_IS_BLOCK_COMPRESSED(frag.size)) { +			temp = proc->frag_buffer + proc->max_block_size; + +			ret = proc->file->read_at(proc->file, frag.start_offset, +						  temp, size); +			if (ret != 0) +				return false; + +			ret = proc->uncmp->do_block(proc->uncmp, temp, size, +						    proc->frag_buffer, +						    proc->max_block_size); +			if (ret <= 0) +				return false; + +			size = ret; +		} else { +			ret = proc->file->read_at(proc->file, frag.start_offset, +						  proc->frag_buffer, size); +			if (ret != 0) +				return false; +		} + +		proc->buffered_index = cmp->index; +		proc->buffered_blk_size = size; +	} + +	if (cmp->offset >= proc->buffered_blk_size) +		return false; + +	if (cmp->size > (proc->buffered_blk_size - cmp->offset)) +		return false; + +	return memcmp(proc->frag_buffer + cmp->offset, +		      proc->frag_cmp_current->data, +		      cmp->size) == 0;  }  static void ht_delete_function(struct hash_entry *entry) @@ -287,6 +352,7 @@ void block_processor_cleanup(sqfs_block_processor_t *base)  		release_old_block(base, base->frag_block);  	free(base->blk_current); +	free(base->frag_buffer);  	while (base->free_list != NULL) {  		it = base->free_list; @@ -297,22 +363,58 @@ void block_processor_cleanup(sqfs_block_processor_t *base)  	hash_table_destroy(base->frag_ht, ht_delete_function);  } -int block_processor_init(sqfs_block_processor_t *base, size_t max_block_size, -			 sqfs_compressor_t *cmp, sqfs_block_writer_t *wr, -			 sqfs_frag_table_t *tbl) +int block_processor_init(sqfs_block_processor_t *base, +			 const sqfs_block_processor_desc_t *desc)  {  	base->process_completed_block = process_completed_block;  	base->process_completed_fragment = process_completed_fragment;  	base->process_block = process_block; -	base->max_block_size = max_block_size; -	base->cmp = cmp; -	base->frag_tbl = tbl; -	base->wr = wr; +	base->max_block_size = desc->max_block_size; +	base->cmp = desc->cmp; +	base->frag_tbl = desc->tbl; +	base->wr = desc->wr; +	base->file = desc->file; +	base->uncmp = desc->uncmp; +	base->buffered_index = 0xFFFFFFFF;  	base->stats.size = sizeof(base->stats); +	if (desc->file != NULL && desc->uncmp != NULL && desc->tbl != NULL) { +		base->frag_buffer = malloc(2 * desc->max_block_size); +		if (base->frag_buffer == NULL) +			return SQFS_ERROR_ALLOC; +	} +  	base->frag_ht = hash_table_create(chunk_info_hash, chunk_info_equals); -	if (base->frag_ht == NULL) -		return -1; +	if (base->frag_ht == NULL) { +		free(base->frag_buffer); +		return SQFS_ERROR_ALLOC; +	} +	base->frag_ht->user = base;  	return 0;  } + +sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, +						    sqfs_compressor_t *cmp, +						    unsigned int num_workers, +						    size_t max_backlog, +						    sqfs_block_writer_t *wr, +						    sqfs_frag_table_t *tbl) +{ +	sqfs_block_processor_desc_t desc; +	sqfs_block_processor_t *out; + +	memset(&desc, 0, sizeof(desc)); +	desc.size = sizeof(desc); +	desc.max_block_size = max_block_size; +	desc.num_workers = num_workers; +	desc.max_backlog = max_backlog; +	desc.cmp = cmp; +	desc.wr = wr; +	desc.tbl = tbl; + +	if (sqfs_block_processor_create_ex(&desc, &out) != 0) +		return NULL; + +	return out; +} diff --git a/lib/sqfs/block_processor/internal.h b/lib/sqfs/block_processor/internal.h index ee76946..4699dc6 100644 --- a/lib/sqfs/block_processor/internal.h +++ b/lib/sqfs/block_processor/internal.h @@ -26,8 +26,7 @@  #include <stdlib.h>  #include <assert.h> -typedef struct chunk_info_t { -	struct chunk_info_t *next; +typedef struct {  	sqfs_u32 index;  	sqfs_u32 offset;  	sqfs_u32 size; @@ -82,6 +81,13 @@ struct sqfs_block_processor_t {  	bool begin_called; +	sqfs_file_t *file; +	sqfs_compressor_t *uncmp; +	sqfs_block_t *frag_cmp_current; +	sqfs_u8 *frag_buffer; +	sqfs_u32 buffered_index; +	sqfs_u32 buffered_blk_size; +  	int (*process_completed_block)(sqfs_block_processor_t *proc,  				       sqfs_block_t *block); @@ -92,6 +98,10 @@ struct sqfs_block_processor_t {  	int (*process_block)(sqfs_block_t *block, sqfs_compressor_t *cmp,  			     sqfs_u8 *scratch, size_t scratch_size); +	int (*compare_frag_in_flight)(sqfs_block_processor_t *proc, +				      sqfs_block_t *frag, sqfs_u32 index, +				      sqfs_u32 offset); +  	int (*append_to_work_queue)(sqfs_block_processor_t *proc,  				    sqfs_block_t *block); @@ -101,9 +111,6 @@ struct sqfs_block_processor_t {  SQFS_INTERNAL void block_processor_cleanup(sqfs_block_processor_t *base);  SQFS_INTERNAL int block_processor_init(sqfs_block_processor_t *base, -				       size_t max_block_size, -				       sqfs_compressor_t *cmp, -				       sqfs_block_writer_t *wr, -				       sqfs_frag_table_t *tbl); +				       const sqfs_block_processor_desc_t *desc);  #endif /* INTERNAL_H */ diff --git a/lib/sqfs/block_processor/serial.c b/lib/sqfs/block_processor/serial.c index 4d6b3ec..b20d5b0 100644 --- a/lib/sqfs/block_processor/serial.c +++ b/lib/sqfs/block_processor/serial.c @@ -61,27 +61,46 @@ static int block_processor_sync(sqfs_block_processor_t *proc)  	return ((serial_block_processor_t *)proc)->status;  } -sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, -						    sqfs_compressor_t *cmp, -						    unsigned int num_workers, -						    size_t max_backlog, -						    sqfs_block_writer_t *wr, -						    sqfs_frag_table_t *tbl) +static int compare_frag_in_flight(sqfs_block_processor_t *proc, +				  sqfs_block_t *frag, sqfs_u32 index, +				  sqfs_u32 offset) +{ +	if (proc->frag_block == NULL || index != proc->frag_block->index) +		return -1; + +	if (offset >= proc->frag_block->size) +		return -1; + +	if (frag->size > (proc->frag_block->size - offset)) +		return -1; + +	return memcmp(proc->frag_block->data + offset, frag->data, frag->size); +} + +int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc, +				   sqfs_block_processor_t **out)  {  	serial_block_processor_t *proc; -	(void)num_workers; (void)max_backlog; +	int ret; -	proc = alloc_flex(sizeof(*proc), 1, max_block_size); +	if (desc->size != sizeof(sqfs_block_processor_desc_t)) +		return SQFS_ERROR_ARG_INVALID; + +	proc = alloc_flex(sizeof(*proc), 1, desc->max_block_size);  	if (proc == NULL) -		return NULL; +		return SQFS_ERROR_ALLOC; -	if (block_processor_init(&proc->base, max_block_size, cmp, wr, tbl)) { +	ret = block_processor_init(&proc->base, desc); +	if (ret != 0) {  		free(proc); -		return NULL; +		return ret;  	}  	proc->base.sync = block_processor_sync;  	proc->base.append_to_work_queue = append_to_work_queue; +	proc->base.compare_frag_in_flight = compare_frag_in_flight;  	((sqfs_object_t *)proc)->destroy = block_processor_destroy; -	return (sqfs_block_processor_t *)proc; + +	*out = (sqfs_block_processor_t *)proc; +	return 0;  } diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c index 3531a3b..806c749 100644 --- a/lib/sqfs/block_processor/winpthread.c +++ b/lib/sqfs/block_processor/winpthread.c @@ -78,6 +78,8 @@ struct compress_worker_t {  	thread_pool_processor_t *shared;  	sqfs_compressor_t *cmp;  	THREAD_HANDLE thread; +	sqfs_u32 frag_blk_index; +	sqfs_u32 frag_blk_size;  	sqfs_u8 scratch[];  }; @@ -178,6 +180,14 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg)  			store_completed_block(shared, blk, status);  		blk = get_next_work_item(shared); + +		if (blk != NULL && (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) && +		    shared->base.uncmp != NULL && shared->base.file != NULL) { +			memcpy(worker->scratch + shared->base.max_block_size, +			       blk->data, blk->size); +			worker->frag_blk_index = blk->index; +			worker->frag_blk_size = blk->size; +		}  		UNLOCK(&shared->mtx);  		if (blk == NULL) @@ -388,40 +398,105 @@ static int append_to_work_queue(sqfs_block_processor_t *proc,  	return status;  } +static sqfs_block_t *find_frag_blk_in_queue(sqfs_block_t *q, sqfs_u32 index) +{ +	while (q != NULL) { +		if ((q->flags & SQFS_BLK_FRAGMENT_BLOCK) && q->index == index) +			break; +		q = q->next; +	} +	return q; +} + +static int compare_frag_in_flight(sqfs_block_processor_t *proc, +				  sqfs_block_t *frag, sqfs_u32 index, +				  sqfs_u32 offset) +{ +	thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc; +	sqfs_block_t *it = NULL; +	void *blockbuf = NULL; +	size_t i, size = 0; +	int ret; + +	if (proc->frag_block != NULL && proc->frag_block->index == index) +		it = proc->frag_block; + +	if (it == NULL) +		it = find_frag_blk_in_queue(thproc->proc_queue, index); + +	if (it == NULL) +		it = find_frag_blk_in_queue(thproc->io_queue, index); + +	if (it == NULL) +		it = find_frag_blk_in_queue(thproc->done, index); + +	if (it == NULL) { +		for (i = 0; i < thproc->num_workers; ++i) { +			if (thproc->workers[i]->frag_blk_index == index) { +				size = thproc->workers[i]->frag_blk_size; +				blockbuf = thproc->workers[i]->scratch + +					proc->max_block_size; +				break; +			} +		} +	} else if (it->flags & SQFS_BLK_IS_COMPRESSED) { +		proc->buffered_index = 0xFFFFFFFF; +		blockbuf = proc->frag_buffer; +		ret = proc->uncmp->do_block(proc->uncmp, it->data, it->size, +					    blockbuf, proc->max_block_size); +		if (ret <= 0) +			return -1; +		proc->buffered_index = it->index; +		size = ret; +	} else { +		blockbuf = it->data; +		size = it->size; +	} + +	if (blockbuf == NULL || size == 0) +		return -1; + +	if (offset >= size || frag->size > (size - offset)) +		return -1; + +	return memcmp((const char *)blockbuf + offset, frag->data, frag->size); +} +  static int block_processor_sync(sqfs_block_processor_t *proc)  {  	return append_to_work_queue(proc, NULL);  } -sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, -						    sqfs_compressor_t *cmp, -						    unsigned int num_workers, -						    size_t max_backlog, -						    sqfs_block_writer_t *wr, -						    sqfs_frag_table_t *tbl) +int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc, +				   sqfs_block_processor_t **out)  {  	thread_pool_processor_t *proc; +	unsigned int i, num_workers; +	size_t scratch_size;  	sigset_t oldset; -	unsigned int i;  	int ret; -	if (num_workers < 1) -		num_workers = 1; +	if (desc->size != sizeof(sqfs_block_processor_desc_t)) +		return SQFS_ERROR_ARG_INVALID; + +	num_workers = desc->num_workers < 1 ? 1 : desc->num_workers;  	proc = alloc_flex(sizeof(*proc),  			  sizeof(proc->workers[0]), num_workers);  	if (proc == NULL) -		return NULL; +		return SQFS_ERROR_ALLOC; -	if (block_processor_init(&proc->base, max_block_size, cmp, wr, tbl)) { +	ret = block_processor_init(&proc->base, desc); +	if (ret != 0) {  		free(proc); -		return NULL; +		return ret;  	}  	proc->base.sync = block_processor_sync;  	proc->base.append_to_work_queue = append_to_work_queue; +	proc->base.compare_frag_in_flight = compare_frag_in_flight;  	proc->num_workers = num_workers; -	proc->max_backlog = max_backlog; +	proc->max_backlog = desc->max_backlog;  	((sqfs_object_t *)proc)->destroy = block_processor_destroy;  	MUTEX_INIT(&proc->mtx); @@ -431,28 +506,40 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,  	SIGNAL_DISABLE(&oldset);  	for (i = 0; i < num_workers; ++i) { +		scratch_size = desc->max_block_size; +		if (desc->uncmp != NULL && desc->file != NULL) +			scratch_size *= 2; +  		proc->workers[i] = alloc_flex(sizeof(compress_worker_t), -					      1, max_block_size); +					      1, scratch_size); -		if (proc->workers[i] == NULL) +		if (proc->workers[i] == NULL) { +			ret = SQFS_ERROR_ALLOC;  			goto fail; +		}  		proc->workers[i]->shared = proc; -		proc->workers[i]->cmp = sqfs_copy(cmp); +		proc->workers[i]->cmp = sqfs_copy(desc->cmp); +		proc->workers[i]->frag_blk_index = 0xFFFFFFFF; -		if (proc->workers[i]->cmp == NULL) +		if (proc->workers[i]->cmp == NULL) { +			ret = SQFS_ERROR_ALLOC;  			goto fail; +		}  		ret = THREAD_CREATE(&proc->workers[i]->thread,  				    worker_proc, proc->workers[i]); -		if (ret != 0) +		if (ret != 0) { +			ret = SQFS_ERROR_INTERNAL;  			goto fail; +		}  	}  	SIGNAL_ENABLE(&oldset); -	return (sqfs_block_processor_t *)proc; +	*out = (sqfs_block_processor_t *)proc; +	return 0;  fail:  	SIGNAL_ENABLE(&oldset);  	block_processor_destroy((sqfs_object_t *)proc); -	return NULL; +	return ret;  } | 
