diff options
Diffstat (limited to 'lib/sqfs/block_processor/winpthread.c')
| -rw-r--r-- | lib/sqfs/block_processor/winpthread.c | 267 | 
1 files changed, 135 insertions, 132 deletions
diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c index 281eb6b..44c62f1 100644 --- a/lib/sqfs/block_processor/winpthread.c +++ b/lib/sqfs/block_processor/winpthread.c @@ -142,6 +142,7 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg)  {  	compress_worker_t *worker = arg;  	thread_pool_processor_t *shared = worker->shared; +	sqfs_block_processor_t *proc = (sqfs_block_processor_t *)shared;  	sqfs_block_t *blk = NULL;  	int status = 0; @@ -156,9 +157,8 @@ static THREAD_TYPE worker_proc(THREAD_ARG arg)  		if (blk == NULL)  			break; -		status = block_processor_do_block(blk, worker->cmp, -						  worker->scratch, -						  shared->base.max_block_size); +		status = proc->process_block(blk, worker->cmp, worker->scratch, +					     proc->max_block_size);  	}  	return THREAD_EXIT_SUCCESS; @@ -198,127 +198,6 @@ static void block_processor_destroy(sqfs_object_t *obj)  	free(proc);  } -static thread_pool_processor_t *block_processor_create(size_t max_block_size, -						       sqfs_compressor_t *cmp, -						       unsigned int num_workers, -						       size_t max_backlog, -						       sqfs_block_writer_t *wr, -						       sqfs_frag_table_t *tbl) -{ -	thread_pool_processor_t *proc; -	unsigned int i; - -	if (num_workers < 1) -		num_workers = 1; - -	proc = alloc_flex(sizeof(*proc), -			  sizeof(proc->workers[0]), num_workers); -	if (proc == NULL) -		return NULL; - -	proc->num_workers = num_workers; -	proc->max_backlog = max_backlog; -	proc->base.max_block_size = max_block_size; -	proc->base.cmp = cmp; -	proc->base.frag_tbl = tbl; -	proc->base.wr = wr; -	proc->base.stats.size = sizeof(proc->base.stats); -	((sqfs_object_t *)proc)->destroy = block_processor_destroy; - -	for (i = 0; i < num_workers; ++i) { -		proc->workers[i] = alloc_flex(sizeof(compress_worker_t), -					      1, max_block_size); - -		if (proc->workers[i] == NULL) -			goto fail; - -		proc->workers[i]->shared = proc; -		proc->workers[i]->cmp = sqfs_copy(cmp); - -		if (proc->workers[i]->cmp == NULL) -			goto fail; -	} - -	return proc; -fail: -	block_processor_destroy((sqfs_object_t *)proc); -	return NULL; -} - -#if defined(_WIN32) || defined(__WINDOWS__) -sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, -						    sqfs_compressor_t *cmp, -						    unsigned int num_workers, -						    size_t max_backlog, -						    sqfs_block_writer_t *wr, -						    sqfs_frag_table_t *tbl) -{ -	thread_pool_processor_t *proc; -	unsigned int i; - -	proc = block_processor_create(max_block_size, cmp, num_workers, -				      max_backlog, wr, tbl); -	if (proc == NULL) -		return NULL; - -	InitializeCriticalSection(&proc->mtx); -	InitializeConditionVariable(&proc->queue_cond); -	InitializeConditionVariable(&proc->done_cond); - -	for (i = 0; i < num_workers; ++i) { -		proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc, -							proc->workers[i], 0, 0); -		if (proc->workers[i]->thread == NULL) -			goto fail; -	} - -	return (sqfs_block_processor_t *)proc; -fail: -	block_processor_destroy((sqfs_object_t *)proc); -	return NULL; -} -#else -sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, -						    sqfs_compressor_t *cmp, -						    unsigned int num_workers, -						    size_t max_backlog, -						    sqfs_block_writer_t *wr, -						    sqfs_frag_table_t *tbl) -{ -	thread_pool_processor_t *proc; -	sigset_t set, oldset; -	unsigned int i; -	int ret; - -	proc = block_processor_create(max_block_size, cmp, num_workers, -				      max_backlog, wr, tbl); -	if (proc == NULL) -		return NULL; - -	proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; -	proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; -	proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; - -	sigfillset(&set); -	pthread_sigmask(SIG_SETMASK, &set, &oldset); - -	for (i = 0; i < num_workers; ++i) { -		ret = pthread_create(&proc->workers[i]->thread, NULL, -				     worker_proc, proc->workers[i]); - -		if (ret != 0) -			goto fail; -	} - -	pthread_sigmask(SIG_SETMASK, &oldset, NULL); -	return (sqfs_block_processor_t *)proc; -fail: -	pthread_sigmask(SIG_SETMASK, &oldset, NULL); -	block_processor_destroy((sqfs_object_t *)proc); -	return NULL; -} -#endif -  static void store_io_block(thread_pool_processor_t *proc, sqfs_block_t *blk)  {  	sqfs_block_t *it = proc->io_queue, *prev = NULL; @@ -397,7 +276,7 @@ static int handle_io_queue(thread_pool_processor_t *proc, sqfs_block_t *list)  	while (status == 0 && list != NULL) {  		it = list;  		list = list->next; -		status = process_completed_block(&proc->base, it); +		status = proc->base.process_completed_block(&proc->base, it);  		if (status != 0) {  			LOCK(&proc->mtx); @@ -411,7 +290,8 @@ static int handle_io_queue(thread_pool_processor_t *proc, sqfs_block_t *list)  	return status;  } -int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block) +static int append_to_work_queue(sqfs_block_processor_t *proc, +				sqfs_block_t *block)  {  	thread_pool_processor_t *thproc = (thread_pool_processor_t *)proc;  	sqfs_block_t *io_list = NULL, *io_list_last = NULL; @@ -454,8 +334,9 @@ int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)  		if (blk->flags & SQFS_BLK_IS_FRAGMENT) {  			fragblk = NULL; -			thproc->status = process_completed_fragment(proc, blk, -								    &fragblk); +			thproc->status = +				proc->process_completed_fragment(proc, blk, +								 &fragblk);  			if (fragblk != NULL) {  				fragblk->io_seq_num = thproc->io_enq_id++; @@ -481,6 +362,54 @@ int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)  	return status;  } +static thread_pool_processor_t *block_processor_create(size_t max_block_size, +						       sqfs_compressor_t *cmp, +						       unsigned int num_workers, +						       size_t max_backlog, +						       sqfs_block_writer_t *wr, +						       sqfs_frag_table_t *tbl) +{ +	thread_pool_processor_t *proc; +	unsigned int i; + +	if (num_workers < 1) +		num_workers = 1; + +	proc = alloc_flex(sizeof(*proc), +			  sizeof(proc->workers[0]), num_workers); +	if (proc == NULL) +		return NULL; + +	if (block_processor_init(&proc->base, max_block_size, cmp, wr, tbl)) { +		free(proc); +		return NULL; +	} + +	proc->base.append_to_work_queue = append_to_work_queue; +	proc->num_workers = num_workers; +	proc->max_backlog = max_backlog; +	((sqfs_object_t *)proc)->destroy = block_processor_destroy; + +	for (i = 0; i < num_workers; ++i) { +		proc->workers[i] = alloc_flex(sizeof(compress_worker_t), +					      1, max_block_size); + +		if (proc->workers[i] == NULL) +			goto fail; + +		proc->workers[i]->shared = proc; +		proc->workers[i]->cmp = sqfs_copy(cmp); + +		if (proc->workers[i]->cmp == NULL) +			goto fail; +	} + +	return proc; +fail: +	block_processor_destroy((sqfs_object_t *)proc); +	return NULL; +} +  int sqfs_block_processor_sync(sqfs_block_processor_t *proc)  {  	return append_to_work_queue(proc, NULL); @@ -492,16 +421,16 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc)  	sqfs_block_t *blk;  	int status; -	status = append_to_work_queue(proc, NULL); +	status = sqfs_block_processor_sync(proc);  	if (status == 0 && proc->frag_block != NULL) {  		blk = proc->frag_block;  		blk->next = NULL;  		proc->frag_block = NULL; -		status = block_processor_do_block(blk, proc->cmp, -						  thproc->workers[0]->scratch, -						  proc->max_block_size); +		status = proc->process_block(blk, proc->cmp, +					     thproc->workers[0]->scratch, +					     proc->max_block_size);  		if (status == 0)  			status = handle_io_queue(thproc, blk); @@ -517,3 +446,77 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc)  	return status;  } + +#if defined(_WIN32) || defined(__WINDOWS__) +sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, +						    sqfs_compressor_t *cmp, +						    unsigned int num_workers, +						    size_t max_backlog, +						    sqfs_block_writer_t *wr, +						    sqfs_frag_table_t *tbl) +{ +	thread_pool_processor_t *proc; +	unsigned int i; + +	proc = block_processor_create(max_block_size, cmp, num_workers, +				      max_backlog, wr, tbl); +	if (proc == NULL) +		return NULL; + +	InitializeCriticalSection(&proc->mtx); +	InitializeConditionVariable(&proc->queue_cond); +	InitializeConditionVariable(&proc->done_cond); + +	for (i = 0; i < num_workers; ++i) { +		proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc, +							proc->workers[i], 0, 0); +		if (proc->workers[i]->thread == NULL) +			goto fail; +	} + +	return (sqfs_block_processor_t *)proc; +fail: +	block_processor_destroy((sqfs_object_t *)proc); +	return NULL; +} +#else +sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, +						    sqfs_compressor_t *cmp, +						    unsigned int num_workers, +						    size_t max_backlog, +						    sqfs_block_writer_t *wr, +						    sqfs_frag_table_t *tbl) +{ +	thread_pool_processor_t *proc; +	sigset_t set, oldset; +	unsigned int i; +	int ret; + +	proc = block_processor_create(max_block_size, cmp, num_workers, +				      max_backlog, wr, tbl); +	if (proc == NULL) +		return NULL; + +	proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; +	proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; +	proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; + +	sigfillset(&set); +	pthread_sigmask(SIG_SETMASK, &set, &oldset); + +	for (i = 0; i < num_workers; ++i) { +		ret = pthread_create(&proc->workers[i]->thread, NULL, +				     worker_proc, proc->workers[i]); + +		if (ret != 0) +			goto fail; +	} + +	pthread_sigmask(SIG_SETMASK, &oldset, NULL); +	return (sqfs_block_processor_t *)proc; +fail: +	pthread_sigmask(SIG_SETMASK, &oldset, NULL); +	block_processor_destroy((sqfs_object_t *)proc); +	return NULL; +} +#endif  | 
