diff options
| author | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2019-12-13 02:27:17 +0100 | 
|---|---|---|
| committer | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2019-12-13 02:27:17 +0100 | 
| commit | 2d2772341fa65e3d412e76c6e9d4a8815756c0ec (patch) | |
| tree | 8bee446efe0fb6e4939a2bf4cf865b6897e50e1a /lib/sqfs | |
| parent | 514e9e500abdd8ea91ea3b2fca214587ee24a342 (diff) | |
Merge windows and pthread thread pool implementations
Since they are both structured the same way using condition variables,
they are only a few defines away from removing code duplication.
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib/sqfs')
| -rw-r--r-- | lib/sqfs/Makemodule.am | 4 | ||||
| -rw-r--r-- | lib/sqfs/data_writer/windows.c | 294 | ||||
| -rw-r--r-- | lib/sqfs/data_writer/winpthread.c (renamed from lib/sqfs/data_writer/pthread.c) | 153 | 
3 files changed, 130 insertions, 321 deletions
| diff --git a/lib/sqfs/Makemodule.am b/lib/sqfs/Makemodule.am index 228d3e9..2bc0fa3 100644 --- a/lib/sqfs/Makemodule.am +++ b/lib/sqfs/Makemodule.am @@ -43,11 +43,11 @@ libsquashfs_la_SOURCES += lib/sqfs/unix/io_file.c  endif  if HAVE_PTHREAD -libsquashfs_la_SOURCES += lib/sqfs/data_writer/pthread.c +libsquashfs_la_SOURCES += lib/sqfs/data_writer/winpthread.c  libsquashfs_la_CPPFLAGS += -DWITH_PTHREAD  else  if WINDOWS -libsquashfs_la_SOURCES += lib/sqfs/data_writer/windows.c +libsquashfs_la_SOURCES += lib/sqfs/data_writer/winpthread.c  else  libsquashfs_la_SOURCES += lib/sqfs/data_writer/serial.c  endif diff --git a/lib/sqfs/data_writer/windows.c b/lib/sqfs/data_writer/windows.c deleted file mode 100644 index d790f79..0000000 --- a/lib/sqfs/data_writer/windows.c +++ /dev/null @@ -1,294 +0,0 @@ -/* SPDX-License-Identifier: LGPL-3.0-or-later */ -/* - * windows.c - * - * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at> - */ -#define SQFS_BUILDING_DLL -#include "internal.h" - -struct compress_worker_t { -	sqfs_data_writer_t *shared; -	sqfs_compressor_t *cmp; -	HANDLE thread; -	sqfs_u8 scratch[]; -}; - -static DWORD WINAPI worker_proc(LPVOID arg) -{ -	compress_worker_t *worker = arg; -	sqfs_data_writer_t *shared = worker->shared; -	sqfs_block_t *blk = NULL; -	int status = 0; - -	for (;;) { -		EnterCriticalSection(&shared->mtx); -		if (blk != NULL) { -			data_writer_store_done(shared, blk, status); -			WakeConditionVariable(&shared->done_cond); -		} - -		while (shared->queue == NULL && shared->status == 0) { -			SleepConditionVariableCS(&shared->queue_cond, -						 &shared->mtx, INFINITE); -		} - -		blk = data_writer_next_work_item(shared); -		LeaveCriticalSection(&shared->mtx); - -		if (blk == NULL) -			break; - -		status = data_writer_do_block(blk, worker->cmp, -					      worker->scratch, -					      shared->max_block_size); -	} - -	return 0; -} - -sqfs_data_writer_t *sqfs_data_writer_create(size_t max_block_size, -					    sqfs_compressor_t *cmp, -					    unsigned int num_workers, -					    size_t max_backlog, -					    size_t devblksz, -					    sqfs_file_t *file) -{ -	sqfs_data_writer_t *proc; -	unsigned int i; - -	if (num_workers < 1) -		num_workers = 1; - -	proc = alloc_flex(sizeof(*proc), -			  sizeof(proc->workers[0]), num_workers); -	if (proc == NULL) -		return NULL; - -	InitializeCriticalSection(&proc->mtx); -	InitializeConditionVariable(&proc->queue_cond); -	InitializeConditionVariable(&proc->done_cond); - -	if (data_writer_init(proc, max_block_size, cmp, num_workers, -			     max_backlog, devblksz, file)) { -		goto fail; -	} - -	for (i = 0; i < num_workers; ++i) { -		proc->workers[i] = alloc_flex(sizeof(compress_worker_t), -					      1, max_block_size); - -		if (proc->workers[i] == NULL) -			goto fail; - -		proc->workers[i]->shared = proc; -		proc->workers[i]->cmp = cmp->create_copy(cmp); - -		if (proc->workers[i]->cmp == NULL) -			goto fail; - -		proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc, -							proc->workers[i], 0, 0); -		if (proc->workers[i]->thread == NULL) -			goto fail; -	} - -	return proc; -fail: -	sqfs_data_writer_destroy(proc); -	return NULL; -} - -void sqfs_data_writer_destroy(sqfs_data_writer_t *proc) -{ -	unsigned int i; - -	EnterCriticalSection(&proc->mtx); -	proc->status = -1; -	WakeAllConditionVariable(&proc->queue_cond); -	LeaveCriticalSection(&proc->mtx); - -	for (i = 0; i < proc->num_workers; ++i) { -		if (proc->workers[i] == NULL) -			continue; - -		if (proc->workers[i]->thread != NULL) { -			WaitForSingleObject(proc->workers[i]->thread, INFINITE); -			CloseHandle(proc->workers[i]->thread); -		} - -		if (proc->workers[i]->cmp != NULL) -			proc->workers[i]->cmp->destroy(proc->workers[i]->cmp); - -		free(proc->workers[i]); -	} - -	DeleteCriticalSection(&proc->mtx); -	data_writer_cleanup(proc); -} - -int append_to_work_queue(sqfs_data_writer_t *proc, sqfs_block_t *block, -			 bool signal_threads) -{ -	int status; - -	EnterCriticalSection(&proc->mtx); -	status = proc->status; - -	if (status != 0) { -		free(block); -		LeaveCriticalSection(&proc->mtx); -		return status; -	} - -	if (block != NULL) { -		if (proc->queue_last == NULL) { -			proc->queue = proc->queue_last = block; -		} else { -			proc->queue_last->next = block; -			proc->queue_last = block; -		} - -		block->sequence_number = proc->enqueue_id++; -		block->next = NULL; -		proc->backlog += 1; -	} - -	if (signal_threads) -		WakeAllConditionVariable(&proc->queue_cond); - -	LeaveCriticalSection(&proc->mtx); -	return 0; -} - -static sqfs_block_t *try_dequeue(sqfs_data_writer_t *proc) -{ -	sqfs_block_t *queue, *it, *prev; - -	it = proc->done; -	prev = NULL; - -	while (it != NULL && it->sequence_number == proc->dequeue_id) { -		prev = it; -		it = it->next; -		proc->dequeue_id += 1; -	} - -	if (prev == NULL) { -		queue = NULL; -	} else { -		queue = proc->done; -		prev->next = NULL; -		proc->done = it; -	} - -	return queue; -} - -static sqfs_block_t *queue_merge(sqfs_block_t *lhs, sqfs_block_t *rhs) -{ -	sqfs_block_t *it, *head = NULL, **next_ptr = &head; - -	while (lhs != NULL && rhs != NULL) { -		if (lhs->sequence_number <= rhs->sequence_number) { -			it = lhs; -			lhs = lhs->next; -		} else { -			it = rhs; -			rhs = rhs->next; -		} - -		*next_ptr = it; -		next_ptr = &it->next; -	} - -	it = (lhs != NULL ? lhs : rhs); -	*next_ptr = it; -	return head; -} - -static int process_done_queue(sqfs_data_writer_t *proc, sqfs_block_t *queue) -{ -	sqfs_block_t *it, *block = NULL; -	int status = 0; - -	while (queue != NULL && status == 0) { -		it = queue; -		queue = it->next; - -		if (it->flags & SQFS_BLK_IS_FRAGMENT) { -			block = NULL; -			status = process_completed_fragment(proc, it, &block); - -			if (block != NULL && status == 0) { -				EnterCriticalSection(&proc->mtx); -				proc->dequeue_id = it->sequence_number; -				block->sequence_number = it->sequence_number; - -				if (proc->queue == NULL) { -					proc->queue = block; -					proc->queue_last = block; -				} else { -					block->next = proc->queue; -					proc->queue = block; -				} - -				proc->backlog += 1; -				proc->done = queue_merge(queue, proc->done); -				WakeAllConditionVariable(&proc->queue_cond); -				LeaveCriticalSection(&proc->mtx); - -				queue = NULL; -			} else { -				free(block); -			} -		} else { -			status = process_completed_block(proc, it); -		} - -		free(it); -	} - -	free_blk_list(queue); -	return status; -} - -int test_and_set_status(sqfs_data_writer_t *proc, int status) -{ -	EnterCriticalSection(&proc->mtx); -	if (proc->status == 0) { -		proc->status = status; -	} else { -		status = proc->status; -	} -	WakeAllConditionVariable(&proc->queue_cond); -	LeaveCriticalSection(&proc->mtx); -	return status; -} - -int wait_completed(sqfs_data_writer_t *proc) -{ -	sqfs_block_t *queue; -	int status; - -	EnterCriticalSection(&proc->mtx); -	for (;;) { -		queue = try_dequeue(proc); -		status = proc->status; - -		if (queue != NULL || status != 0) -			break; - -		SleepConditionVariableCS(&proc->done_cond, &proc->mtx, -					 INFINITE); -	} -	LeaveCriticalSection(&proc->mtx); - -	if (status != 0) { -		free_blk_list(queue); -		return status; -	} - -	status = process_done_queue(proc, queue); -	return status ? test_and_set_status(proc, status) : status; -} diff --git a/lib/sqfs/data_writer/pthread.c b/lib/sqfs/data_writer/winpthread.c index 8196641..bba5894 100644 --- a/lib/sqfs/data_writer/pthread.c +++ b/lib/sqfs/data_writer/winpthread.c @@ -1,20 +1,40 @@  /* SPDX-License-Identifier: LGPL-3.0-or-later */  /* - * pthread.c + * winpthread.c   *   * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at>   */  #define SQFS_BUILDING_DLL  #include "internal.h" +#if defined(_WIN32) || defined(__WINDOWS__) +#	define LOCK(mtx) EnterCriticalSection(mtx) +#	define UNLOCK(mtx) LeaveCriticalSection(mtx) +#	define AWAIT(cond, mtx) SleepConditionVariableCS(cond, mtx, INFINITE) +#	define SIGNAL_ALL(cond) WakeAllConditionVariable(cond) +#	define THREAD_EXIT_SUCCESS 0 +#	define THREAD_TYPE DWORD WINAPI +#	define THREAD_ARG LPVOID +#	define THREAD_HANDLE HANDLE +#else +#	define LOCK(mtx) pthread_mutex_lock(mtx) +#	define UNLOCK(mtx) pthread_mutex_unlock(mtx) +#	define AWAIT(cond, mtx) pthread_cond_wait(cond, mtx) +#	define SIGNAL_ALL(cond) pthread_cond_broadcast(cond) +#	define THREAD_EXIT_SUCCESS NULL +#	define THREAD_TYPE void * +#	define THREAD_ARG void * +#	define THREAD_HANDLE pthread_t +#endif +  struct compress_worker_t {  	sqfs_data_writer_t *shared;  	sqfs_compressor_t *cmp; -	pthread_t thread; +	THREAD_HANDLE thread;  	sqfs_u8 scratch[];  }; -static void *worker_proc(void *arg) +static THREAD_TYPE worker_proc(THREAD_ARG arg)  {  	compress_worker_t *worker = arg;  	sqfs_data_writer_t *shared = worker->shared; @@ -22,17 +42,17 @@ static void *worker_proc(void *arg)  	int status = 0;  	for (;;) { -		pthread_mutex_lock(&shared->mtx); +		LOCK(&shared->mtx);  		if (blk != NULL) {  			data_writer_store_done(shared, blk, status); -			pthread_cond_broadcast(&shared->done_cond); +			SIGNAL_ALL(&shared->done_cond);  		}  		while (shared->queue == NULL && shared->status == 0) -			pthread_cond_wait(&shared->queue_cond, &shared->mtx); +			AWAIT(&shared->queue_cond, &shared->mtx);  		blk = data_writer_next_work_item(shared); -		pthread_mutex_unlock(&shared->mtx); +		UNLOCK(&shared->mtx);  		if (blk == NULL)  			break; @@ -41,9 +61,91 @@ static void *worker_proc(void *arg)  					      worker->scratch,  					      shared->max_block_size);  	} + +	return THREAD_EXIT_SUCCESS; +} + +#if defined(_WIN32) || defined(__WINDOWS__) +sqfs_data_writer_t *sqfs_data_writer_create(size_t max_block_size, +					    sqfs_compressor_t *cmp, +					    unsigned int num_workers, +					    size_t max_backlog, +					    size_t devblksz, +					    sqfs_file_t *file) +{ +	sqfs_data_writer_t *proc; +	unsigned int i; + +	if (num_workers < 1) +		num_workers = 1; + +	proc = alloc_flex(sizeof(*proc), +			  sizeof(proc->workers[0]), num_workers); +	if (proc == NULL) +		return NULL; + +	InitializeCriticalSection(&proc->mtx); +	InitializeConditionVariable(&proc->queue_cond); +	InitializeConditionVariable(&proc->done_cond); + +	if (data_writer_init(proc, max_block_size, cmp, num_workers, +			     max_backlog, devblksz, file)) { +		goto fail; +	} + +	for (i = 0; i < num_workers; ++i) { +		proc->workers[i] = alloc_flex(sizeof(compress_worker_t), +					      1, max_block_size); + +		if (proc->workers[i] == NULL) +			goto fail; + +		proc->workers[i]->shared = proc; +		proc->workers[i]->cmp = cmp->create_copy(cmp); + +		if (proc->workers[i]->cmp == NULL) +			goto fail; + +		proc->workers[i]->thread = CreateThread(NULL, 0, worker_proc, +							proc->workers[i], 0, 0); +		if (proc->workers[i]->thread == NULL) +			goto fail; +	} + +	return proc; +fail: +	sqfs_data_writer_destroy(proc);  	return NULL;  } +void sqfs_data_writer_destroy(sqfs_data_writer_t *proc) +{ +	unsigned int i; + +	EnterCriticalSection(&proc->mtx); +	proc->status = -1; +	WakeAllConditionVariable(&proc->queue_cond); +	LeaveCriticalSection(&proc->mtx); + +	for (i = 0; i < proc->num_workers; ++i) { +		if (proc->workers[i] == NULL) +			continue; + +		if (proc->workers[i]->thread != NULL) { +			WaitForSingleObject(proc->workers[i]->thread, INFINITE); +			CloseHandle(proc->workers[i]->thread); +		} + +		if (proc->workers[i]->cmp != NULL) +			proc->workers[i]->cmp->destroy(proc->workers[i]->cmp); + +		free(proc->workers[i]); +	} + +	DeleteCriticalSection(&proc->mtx); +	data_writer_cleanup(proc); +} +#else  sqfs_data_writer_t *sqfs_data_writer_create(size_t max_block_size,  					    sqfs_compressor_t *cmp,  					    unsigned int num_workers, @@ -153,19 +255,17 @@ void sqfs_data_writer_destroy(sqfs_data_writer_t *proc)  	data_writer_cleanup(proc);  } +#endif  int append_to_work_queue(sqfs_data_writer_t *proc, sqfs_block_t *block,  			 bool signal_threads)  {  	int status; -	pthread_mutex_lock(&proc->mtx); +	LOCK(&proc->mtx);  	status = proc->status; -	if (status != 0) { -		free(block); -		pthread_mutex_unlock(&proc->mtx); -		return status; -	} +	if (status != 0) +		goto out;  	if (block != NULL) {  		if (proc->queue_last == NULL) { @@ -178,11 +278,14 @@ int append_to_work_queue(sqfs_data_writer_t *proc, sqfs_block_t *block,  		block->sequence_number = proc->enqueue_id++;  		block->next = NULL;  		proc->backlog += 1; +		block = NULL;  	} - +out:  	if (signal_threads) -		pthread_cond_broadcast(&proc->queue_cond); -	pthread_mutex_unlock(&proc->mtx); +		SIGNAL_ALL(&proc->queue_cond); + +	UNLOCK(&proc->mtx); +	free(block);  	return 0;  } @@ -247,7 +350,7 @@ static int process_done_queue(sqfs_data_writer_t *proc, sqfs_block_t *queue)  			status = process_completed_fragment(proc, it, &block);  			if (block != NULL && status == 0) { -				pthread_mutex_lock(&proc->mtx); +				LOCK(&proc->mtx);  				proc->dequeue_id = it->sequence_number;  				block->sequence_number = it->sequence_number; @@ -261,8 +364,8 @@ static int process_done_queue(sqfs_data_writer_t *proc, sqfs_block_t *queue)  				proc->backlog += 1;  				proc->done = queue_merge(queue, proc->done); -				pthread_cond_broadcast(&proc->queue_cond); -				pthread_mutex_unlock(&proc->mtx); +				SIGNAL_ALL(&proc->queue_cond); +				UNLOCK(&proc->mtx);  				queue = NULL;  			} else { @@ -281,14 +384,14 @@ static int process_done_queue(sqfs_data_writer_t *proc, sqfs_block_t *queue)  int test_and_set_status(sqfs_data_writer_t *proc, int status)  { -	pthread_mutex_lock(&proc->mtx); +	LOCK(&proc->mtx);  	if (proc->status == 0) {  		proc->status = status;  	} else {  		status = proc->status;  	} -	pthread_cond_broadcast(&proc->queue_cond); -	pthread_mutex_unlock(&proc->mtx); +	SIGNAL_ALL(&proc->queue_cond); +	UNLOCK(&proc->mtx);  	return status;  } @@ -297,7 +400,7 @@ int wait_completed(sqfs_data_writer_t *proc)  	sqfs_block_t *queue;  	int status; -	pthread_mutex_lock(&proc->mtx); +	LOCK(&proc->mtx);  	for (;;) {  		queue = try_dequeue(proc);  		status = proc->status; @@ -305,9 +408,9 @@ int wait_completed(sqfs_data_writer_t *proc)  		if (queue != NULL || status != 0)  			break; -		pthread_cond_wait(&proc->done_cond, &proc->mtx); +		AWAIT(&proc->done_cond, &proc->mtx);  	} -	pthread_mutex_unlock(&proc->mtx); +	UNLOCK(&proc->mtx);  	if (status != 0) {  		free_blk_list(queue); | 
