diff options
| author | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2019-08-14 14:56:21 +0200 | 
|---|---|---|
| committer | David Oberhollenzer <david.oberhollenzer@sigma-star.at> | 2019-08-18 22:44:39 +0200 | 
| commit | 1a95478b8d340c8b6b9dbff4f38f9293388fd1a3 (patch) | |
| tree | b62599b67c54132e1a7bd2559e422e3c5163563b | |
| parent | 7ca19d23cb4913b5dabfdf3469852ec9f2c0f8d7 (diff) | |
Add pthread based, parallel block processor implementation
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
| -rw-r--r-- | .gitignore | 2 | ||||
| -rw-r--r-- | configure.ac | 4 | ||||
| -rw-r--r-- | lib/Makemodule.am | 10 | ||||
| -rw-r--r-- | lib/comp/block_processor_parallel.c | 333 | ||||
| -rw-r--r-- | mkfs/Makemodule.am | 4 | ||||
| -rw-r--r-- | tar/Makemodule.am | 2 | 
6 files changed, 350 insertions, 5 deletions
| @@ -14,7 +14,7 @@ depcomp  install-sh  missing  stamp-h1 -config.h +config.*  *.o  *.a  *~ diff --git a/configure.ac b/configure.ac index 19ee8d4..78ede8f 100644 --- a/configure.ac +++ b/configure.ac @@ -143,6 +143,10 @@ yes) AM_COND_IF([WITH_SELINUX], [], [AC_MSG_ERROR([cannot find selinux])]) ;;  no)  AM_CONDITIONAL([WITH_SELINUX], [false]) ;;  esac +have_pthread="no" +AX_PTHREAD([have_pthread="yes"], []) +AM_CONDITIONAL([HAVE_PTHREAD], [test "x$have_pthread" != "xno"]) +  ##### sanity check #####  have_compressor="no" diff --git a/lib/Makemodule.am b/lib/Makemodule.am index 7c2c99d..3c206d6 100644 --- a/lib/Makemodule.am +++ b/lib/Makemodule.am @@ -18,9 +18,8 @@ libtar_a_CFLAGS = $(AM_CFLAGS)  libtar_a_CPPFLAGS = $(AM_CPPFLAGS)  libcompress_a_SOURCES = lib/comp/compressor.c lib/comp/internal.h -libcompress_a_SOURCES += lib/comp/block_processor.c include/block_processor.h +libcompress_a_SOURCES += include/block_processor.h include/compress.h  libcompress_a_SOURCES += lib/comp/create_block.c lib/comp/process_block.c -libcompress_a_SOURCES += include/compress.h  libcompress_a_CFLAGS = $(AM_CFLAGS)  libcompress_a_CPPFLAGS = $(AM_CPPFLAGS) @@ -51,6 +50,13 @@ libutil_a_SOURCES += lib/util/dirstack.c lib/util/padd_file.c  libutil_a_SOURCES += lib/util/read_data_at.c lib/util/crc32.c  libutil_a_SOURCES += lib/util/source_date_epoch.c +if HAVE_PTHREAD +libcompress_a_SOURCES += lib/comp/block_processor_parallel.c +libcompress_a_CFLAGS += $(PTHREAD_CFLAGS) +else +libcompress_a_SOURCES += lib/comp/block_processor.c +endif +  if WITH_GZIP  libcompress_a_SOURCES += lib/comp/gzip.c diff --git a/lib/comp/block_processor_parallel.c b/lib/comp/block_processor_parallel.c new file mode 100644 index 0000000..617900f --- /dev/null +++ b/lib/comp/block_processor_parallel.c @@ -0,0 +1,333 @@ +/* SPDX-License-Identifier: GPL-3.0-or-later */ +/* + * block_processor.c + * + * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at> + */ +#include "config.h" + +#include "block_processor.h" +#include "util.h" + +#include <pthread.h> +#include <string.h> +#include <stdlib.h> +#include <stdio.h> + +#define MAX_BACKLOG_FACTOR (10) + +typedef struct { +	block_processor_t *shared; +	compressor_t *cmp; +	pthread_t thread; +	uint8_t scratch[]; +} compress_worker_t; + +struct block_processor_t { +	pthread_mutex_t mtx; +	pthread_cond_t queue_cond; +	pthread_cond_t done_cond; + +	/* needs rw access by worker and main thread */ +	block_t *queue; +	block_t *done; +	bool terminate; +	size_t backlog; + +	/* used by main thread only */ +	uint32_t enqueue_id; +	uint32_t dequeue_id; + +	unsigned int num_workers; +	block_cb cb; +	void *user; +	int status; + +	/* used only by workers */ +	size_t max_block_size; + +	compress_worker_t *workers[]; +}; + +static void store_completed_block(block_processor_t *shared, block_t *blk) +{ +	block_t *it = shared->done, *prev = NULL; + +	while (it != NULL) { +		if (it->sequence_number >= blk->sequence_number) +			break; +		prev = it; +		it = it->next; +	} + +	if (prev == NULL) { +		blk->next = shared->done; +		shared->done = blk; +	} else { +		blk->next = prev->next; +		prev->next = blk; +	} +} + +static void *worker_proc(void *arg) +{ +	compress_worker_t *worker = arg; +	block_processor_t *shared = worker->shared; +	block_t *blk; + +	for (;;) { +		pthread_mutex_lock(&shared->mtx); +		while (shared->queue == NULL && !shared->terminate) { +			pthread_cond_wait(&shared->queue_cond, +					  &shared->mtx); +		} + +		if (shared->terminate) { +			pthread_mutex_unlock(&shared->mtx); +			break; +		} + +		blk = shared->queue; +		shared->queue = blk->next; +		blk->next = NULL; +		pthread_mutex_unlock(&shared->mtx); + +		if (process_block(blk, worker->cmp, worker->scratch, +				  shared->max_block_size)) { +			blk->flags |= BLK_COMPRESS_ERROR; +		} + +		pthread_mutex_lock(&shared->mtx); +		store_completed_block(shared, blk); +		shared->backlog -= 1; +		pthread_cond_broadcast(&shared->done_cond); +		pthread_mutex_unlock(&shared->mtx); +	} +	return NULL; +} + +block_processor_t *block_processor_create(size_t max_block_size, +					  compressor_t *cmp, +					  unsigned int num_workers, +					  void *user, +					  block_cb callback) +{ +	block_processor_t *proc; +	unsigned int i; +	size_t size; +	int ret; + +	if (num_workers < 1) +		num_workers = 1; + +	size = sizeof(proc->workers[0]) * num_workers; + +	proc = calloc(1, sizeof(*proc) + size); +	if (proc == NULL) { +		perror("Creating block processor"); +		return NULL; +	} + +	proc->max_block_size = max_block_size; +	proc->cb = callback; +	proc->user = user; +	proc->num_workers = num_workers; + +	if (pthread_mutex_init(&proc->mtx, NULL)) { +		perror("Creating block processor mutex"); +		goto fail_free; +	} + +	if (pthread_cond_init(&proc->queue_cond, NULL)) { +		perror("Creating block processor conditional"); +		goto fail_mtx; +	} + +	if (pthread_cond_init(&proc->done_cond, NULL)) { +		perror("Creating block processor completion conditional"); +		goto fail_cond; +	} + +	size = sizeof(compress_worker_t) + max_block_size; + +	for (i = 0; i < num_workers; ++i) { +		proc->workers[i] = calloc(1, size); + +		if (proc->workers[i] == NULL) { +			perror("Creating block worker data"); +			goto fail_init; +		} + +		proc->workers[i]->shared = proc; +		proc->workers[i]->cmp = cmp->create_copy(cmp); + +		if (proc->workers[i]->cmp == NULL) +			goto fail_init; +	} + +	for (i = 0; i < num_workers; ++i) { +		ret = pthread_create(&proc->workers[i]->thread, NULL, +				     worker_proc, proc->workers[i]); + +		if (ret != 0) { +			perror("Creating block processor thread"); +			goto fail_thread; +		} +	} + +	return proc; +fail_thread: +	pthread_mutex_lock(&proc->mtx); +	proc->terminate = true; +	pthread_cond_broadcast(&proc->queue_cond); +	pthread_mutex_unlock(&proc->mtx); + +	for (i = 0; i < num_workers; ++i) { +		if (proc->workers[i]->thread > 0) { +			pthread_join(proc->workers[i]->thread, NULL); +		} +	} +fail_init: +	for (i = 0; i < num_workers; ++i) { +		if (proc->workers[i] != NULL) { +			if (proc->workers[i]->cmp != NULL) { +				proc->workers[i]->cmp-> +					destroy(proc->workers[i]->cmp); +			} + +			free(proc->workers[i]); +		} +	} +	pthread_cond_destroy(&proc->done_cond); +fail_cond: +	pthread_cond_destroy(&proc->queue_cond); +fail_mtx: +	pthread_mutex_destroy(&proc->mtx); +fail_free: +	free(proc); +	return NULL; +} + +void block_processor_destroy(block_processor_t *proc) +{ +	unsigned int i; +	block_t *blk; + +	pthread_mutex_lock(&proc->mtx); +	proc->terminate = true; +	pthread_cond_broadcast(&proc->queue_cond); +	pthread_mutex_unlock(&proc->mtx); + +	for (i = 0; i < proc->num_workers; ++i) { +		pthread_join(proc->workers[i]->thread, NULL); + +		proc->workers[i]->cmp->destroy(proc->workers[i]->cmp); +		free(proc->workers[i]); +	} + +	pthread_cond_destroy(&proc->done_cond); +	pthread_cond_destroy(&proc->queue_cond); +	pthread_mutex_destroy(&proc->mtx); + +	while (proc->queue != NULL) { +		blk = proc->queue; +		proc->queue = blk->next; +		free(blk); +	} + +	while (proc->done != NULL) { +		blk = proc->done; +		proc->done = blk->next; +		free(blk); +	} + +	free(proc); +} + +static int process_completed_blocks(block_processor_t *proc, block_t *queue) +{ +	block_t *it; + +	while (queue != NULL) { +		it = queue; +		queue = queue->next; + +		if (it->flags & BLK_COMPRESS_ERROR) { +			proc->status = -1; +		} else { +			if (proc->cb(proc->user, it)) +				proc->status = -1; +		} + +		free(it); +	} + +	return proc->status; +} + +int block_processor_enqueue(block_processor_t *proc, block_t *block) +{ +	block_t *queue = NULL, *it, *prev; + +	block->sequence_number = proc->enqueue_id++; +	block->next = NULL; + +	pthread_mutex_lock(&proc->mtx); +	while (proc->backlog > proc->num_workers * MAX_BACKLOG_FACTOR) +		pthread_cond_wait(&proc->done_cond, &proc->mtx); + +	if (proc->queue == NULL) { +		proc->queue = block; +	} else { +		for (it = proc->queue; it->next != NULL; it = it->next) +			; +		it->next = block; +	} + +	proc->backlog += 1; +	it = proc->done; +	prev = NULL; + +	while (it != NULL && it->sequence_number == proc->dequeue_id) { +		prev = it; +		it = it->next; +		proc->dequeue_id += 1; +	} + +	if (prev != NULL) { +		queue = proc->done; +		prev->next = NULL; +		proc->done = it; +	} + +	pthread_cond_broadcast(&proc->queue_cond); +	pthread_mutex_unlock(&proc->mtx); + +	return process_completed_blocks(proc, queue); +} + +int block_processor_finish(block_processor_t *proc) +{ +	block_t *queue, *it; + +	pthread_mutex_lock(&proc->mtx); +	while (proc->backlog > 0) +		pthread_cond_wait(&proc->done_cond, &proc->mtx); + +	for (it = proc->done; it != NULL; it = it->next) { +		if (it->sequence_number != proc->dequeue_id++) { +			pthread_mutex_unlock(&proc->mtx); +			goto bug_seqnum; +		} +	} + +	queue = proc->done; +	proc->done = NULL; +	pthread_mutex_unlock(&proc->mtx); + +	return process_completed_blocks(proc, queue); +bug_seqnum: +	fputs("[BUG][parallel block processor] " +	      "gap in sequence numbers!\n", stderr); +	return -1; +} diff --git a/mkfs/Makemodule.am b/mkfs/Makemodule.am index f8e3e2c..1316c3d 100644 --- a/mkfs/Makemodule.am +++ b/mkfs/Makemodule.am @@ -1,9 +1,9 @@  gensquashfs_SOURCES = mkfs/mkfs.c mkfs/mkfs.h mkfs/options.c  gensquashfs_LDADD = libsquashfs.a libfstree.a libcompress.a libutil.a  gensquashfs_CPPFLAGS = $(AM_CPPFLAGS) +gensquashfs_CFLAGS = $(AM_CFLAGS) $(PTHREAD_CFLAGS)  gensquashfs_LDADD += $(XZ_LIBS) $(ZLIB_LIBS) $(LZO_LIBS) $(LZ4_LIBS) -gensquashfs_LDADD += $(ZSTD_LIBS) - +gensquashfs_LDADD += $(ZSTD_LIBS) $(PTHREAD_LIBS)  if WITH_SELINUX  gensquashfs_CPPFLAGS += -DWITH_SELINUX  gensquashfs_LDADD += $(LIBSELINUX_LIBS) diff --git a/tar/Makemodule.am b/tar/Makemodule.am index a10dc01..b8674c2 100644 --- a/tar/Makemodule.am +++ b/tar/Makemodule.am @@ -4,6 +4,8 @@ sqfs2tar_LDADD += $(XZ_LIBS) $(ZLIB_LIBS) $(LZO_LIBS) $(LZ4_LIBS) $(ZSTD_LIBS)  tar2sqfs_SOURCES = tar/tar2sqfs.c  tar2sqfs_LDADD = libsquashfs.a libtar.a libfstree.a libcompress.a libutil.a +tar2sqfs_CFLAGS = $(AM_CFLAGS) $(PTHREAD_CFLAGS)  tar2sqfs_LDADD += $(XZ_LIBS) $(ZLIB_LIBS) $(LZO_LIBS) $(LZ4_LIBS) $(ZSTD_LIBS) +tar2sqfs_LDADD += $(PTHREAD_LIBS)  bin_PROGRAMS += sqfs2tar tar2sqfs | 
