From 1a95478b8d340c8b6b9dbff4f38f9293388fd1a3 Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Wed, 14 Aug 2019 14:56:21 +0200 Subject: Add pthread based, parallel block processor implementation Signed-off-by: David Oberhollenzer --- .gitignore | 2 +- configure.ac | 4 + lib/Makemodule.am | 10 +- lib/comp/block_processor_parallel.c | 333 ++++++++++++++++++++++++++++++++++++ mkfs/Makemodule.am | 4 +- tar/Makemodule.am | 2 + 6 files changed, 350 insertions(+), 5 deletions(-) create mode 100644 lib/comp/block_processor_parallel.c diff --git a/.gitignore b/.gitignore index 83e68ac..d03be1e 100644 --- a/.gitignore +++ b/.gitignore @@ -14,7 +14,7 @@ depcomp install-sh missing stamp-h1 -config.h +config.* *.o *.a *~ diff --git a/configure.ac b/configure.ac index 19ee8d4..78ede8f 100644 --- a/configure.ac +++ b/configure.ac @@ -143,6 +143,10 @@ yes) AM_COND_IF([WITH_SELINUX], [], [AC_MSG_ERROR([cannot find selinux])]) ;; no) AM_CONDITIONAL([WITH_SELINUX], [false]) ;; esac +have_pthread="no" +AX_PTHREAD([have_pthread="yes"], []) +AM_CONDITIONAL([HAVE_PTHREAD], [test "x$have_pthread" != "xno"]) + ##### sanity check ##### have_compressor="no" diff --git a/lib/Makemodule.am b/lib/Makemodule.am index 7c2c99d..3c206d6 100644 --- a/lib/Makemodule.am +++ b/lib/Makemodule.am @@ -18,9 +18,8 @@ libtar_a_CFLAGS = $(AM_CFLAGS) libtar_a_CPPFLAGS = $(AM_CPPFLAGS) libcompress_a_SOURCES = lib/comp/compressor.c lib/comp/internal.h -libcompress_a_SOURCES += lib/comp/block_processor.c include/block_processor.h +libcompress_a_SOURCES += include/block_processor.h include/compress.h libcompress_a_SOURCES += lib/comp/create_block.c lib/comp/process_block.c -libcompress_a_SOURCES += include/compress.h libcompress_a_CFLAGS = $(AM_CFLAGS) libcompress_a_CPPFLAGS = $(AM_CPPFLAGS) @@ -51,6 +50,13 @@ libutil_a_SOURCES += lib/util/dirstack.c lib/util/padd_file.c libutil_a_SOURCES += lib/util/read_data_at.c lib/util/crc32.c libutil_a_SOURCES += lib/util/source_date_epoch.c +if HAVE_PTHREAD +libcompress_a_SOURCES += lib/comp/block_processor_parallel.c +libcompress_a_CFLAGS += $(PTHREAD_CFLAGS) +else +libcompress_a_SOURCES += lib/comp/block_processor.c +endif + if WITH_GZIP libcompress_a_SOURCES += lib/comp/gzip.c diff --git a/lib/comp/block_processor_parallel.c b/lib/comp/block_processor_parallel.c new file mode 100644 index 0000000..617900f --- /dev/null +++ b/lib/comp/block_processor_parallel.c @@ -0,0 +1,333 @@ +/* SPDX-License-Identifier: GPL-3.0-or-later */ +/* + * block_processor.c + * + * Copyright (C) 2019 David Oberhollenzer + */ +#include "config.h" + +#include "block_processor.h" +#include "util.h" + +#include +#include +#include +#include + +#define MAX_BACKLOG_FACTOR (10) + +typedef struct { + block_processor_t *shared; + compressor_t *cmp; + pthread_t thread; + uint8_t scratch[]; +} compress_worker_t; + +struct block_processor_t { + pthread_mutex_t mtx; + pthread_cond_t queue_cond; + pthread_cond_t done_cond; + + /* needs rw access by worker and main thread */ + block_t *queue; + block_t *done; + bool terminate; + size_t backlog; + + /* used by main thread only */ + uint32_t enqueue_id; + uint32_t dequeue_id; + + unsigned int num_workers; + block_cb cb; + void *user; + int status; + + /* used only by workers */ + size_t max_block_size; + + compress_worker_t *workers[]; +}; + +static void store_completed_block(block_processor_t *shared, block_t *blk) +{ + block_t *it = shared->done, *prev = NULL; + + while (it != NULL) { + if (it->sequence_number >= blk->sequence_number) + break; + prev = it; + it = it->next; + } + + if (prev == NULL) { + blk->next = shared->done; + shared->done = blk; + } else { + blk->next = prev->next; + prev->next = blk; + } +} + +static void *worker_proc(void *arg) +{ + compress_worker_t *worker = arg; + block_processor_t *shared = worker->shared; + block_t *blk; + + for (;;) { + pthread_mutex_lock(&shared->mtx); + while (shared->queue == NULL && !shared->terminate) { + pthread_cond_wait(&shared->queue_cond, + &shared->mtx); + } + + if (shared->terminate) { + pthread_mutex_unlock(&shared->mtx); + break; + } + + blk = shared->queue; + shared->queue = blk->next; + blk->next = NULL; + pthread_mutex_unlock(&shared->mtx); + + if (process_block(blk, worker->cmp, worker->scratch, + shared->max_block_size)) { + blk->flags |= BLK_COMPRESS_ERROR; + } + + pthread_mutex_lock(&shared->mtx); + store_completed_block(shared, blk); + shared->backlog -= 1; + pthread_cond_broadcast(&shared->done_cond); + pthread_mutex_unlock(&shared->mtx); + } + return NULL; +} + +block_processor_t *block_processor_create(size_t max_block_size, + compressor_t *cmp, + unsigned int num_workers, + void *user, + block_cb callback) +{ + block_processor_t *proc; + unsigned int i; + size_t size; + int ret; + + if (num_workers < 1) + num_workers = 1; + + size = sizeof(proc->workers[0]) * num_workers; + + proc = calloc(1, sizeof(*proc) + size); + if (proc == NULL) { + perror("Creating block processor"); + return NULL; + } + + proc->max_block_size = max_block_size; + proc->cb = callback; + proc->user = user; + proc->num_workers = num_workers; + + if (pthread_mutex_init(&proc->mtx, NULL)) { + perror("Creating block processor mutex"); + goto fail_free; + } + + if (pthread_cond_init(&proc->queue_cond, NULL)) { + perror("Creating block processor conditional"); + goto fail_mtx; + } + + if (pthread_cond_init(&proc->done_cond, NULL)) { + perror("Creating block processor completion conditional"); + goto fail_cond; + } + + size = sizeof(compress_worker_t) + max_block_size; + + for (i = 0; i < num_workers; ++i) { + proc->workers[i] = calloc(1, size); + + if (proc->workers[i] == NULL) { + perror("Creating block worker data"); + goto fail_init; + } + + proc->workers[i]->shared = proc; + proc->workers[i]->cmp = cmp->create_copy(cmp); + + if (proc->workers[i]->cmp == NULL) + goto fail_init; + } + + for (i = 0; i < num_workers; ++i) { + ret = pthread_create(&proc->workers[i]->thread, NULL, + worker_proc, proc->workers[i]); + + if (ret != 0) { + perror("Creating block processor thread"); + goto fail_thread; + } + } + + return proc; +fail_thread: + pthread_mutex_lock(&proc->mtx); + proc->terminate = true; + pthread_cond_broadcast(&proc->queue_cond); + pthread_mutex_unlock(&proc->mtx); + + for (i = 0; i < num_workers; ++i) { + if (proc->workers[i]->thread > 0) { + pthread_join(proc->workers[i]->thread, NULL); + } + } +fail_init: + for (i = 0; i < num_workers; ++i) { + if (proc->workers[i] != NULL) { + if (proc->workers[i]->cmp != NULL) { + proc->workers[i]->cmp-> + destroy(proc->workers[i]->cmp); + } + + free(proc->workers[i]); + } + } + pthread_cond_destroy(&proc->done_cond); +fail_cond: + pthread_cond_destroy(&proc->queue_cond); +fail_mtx: + pthread_mutex_destroy(&proc->mtx); +fail_free: + free(proc); + return NULL; +} + +void block_processor_destroy(block_processor_t *proc) +{ + unsigned int i; + block_t *blk; + + pthread_mutex_lock(&proc->mtx); + proc->terminate = true; + pthread_cond_broadcast(&proc->queue_cond); + pthread_mutex_unlock(&proc->mtx); + + for (i = 0; i < proc->num_workers; ++i) { + pthread_join(proc->workers[i]->thread, NULL); + + proc->workers[i]->cmp->destroy(proc->workers[i]->cmp); + free(proc->workers[i]); + } + + pthread_cond_destroy(&proc->done_cond); + pthread_cond_destroy(&proc->queue_cond); + pthread_mutex_destroy(&proc->mtx); + + while (proc->queue != NULL) { + blk = proc->queue; + proc->queue = blk->next; + free(blk); + } + + while (proc->done != NULL) { + blk = proc->done; + proc->done = blk->next; + free(blk); + } + + free(proc); +} + +static int process_completed_blocks(block_processor_t *proc, block_t *queue) +{ + block_t *it; + + while (queue != NULL) { + it = queue; + queue = queue->next; + + if (it->flags & BLK_COMPRESS_ERROR) { + proc->status = -1; + } else { + if (proc->cb(proc->user, it)) + proc->status = -1; + } + + free(it); + } + + return proc->status; +} + +int block_processor_enqueue(block_processor_t *proc, block_t *block) +{ + block_t *queue = NULL, *it, *prev; + + block->sequence_number = proc->enqueue_id++; + block->next = NULL; + + pthread_mutex_lock(&proc->mtx); + while (proc->backlog > proc->num_workers * MAX_BACKLOG_FACTOR) + pthread_cond_wait(&proc->done_cond, &proc->mtx); + + if (proc->queue == NULL) { + proc->queue = block; + } else { + for (it = proc->queue; it->next != NULL; it = it->next) + ; + it->next = block; + } + + proc->backlog += 1; + it = proc->done; + prev = NULL; + + while (it != NULL && it->sequence_number == proc->dequeue_id) { + prev = it; + it = it->next; + proc->dequeue_id += 1; + } + + if (prev != NULL) { + queue = proc->done; + prev->next = NULL; + proc->done = it; + } + + pthread_cond_broadcast(&proc->queue_cond); + pthread_mutex_unlock(&proc->mtx); + + return process_completed_blocks(proc, queue); +} + +int block_processor_finish(block_processor_t *proc) +{ + block_t *queue, *it; + + pthread_mutex_lock(&proc->mtx); + while (proc->backlog > 0) + pthread_cond_wait(&proc->done_cond, &proc->mtx); + + for (it = proc->done; it != NULL; it = it->next) { + if (it->sequence_number != proc->dequeue_id++) { + pthread_mutex_unlock(&proc->mtx); + goto bug_seqnum; + } + } + + queue = proc->done; + proc->done = NULL; + pthread_mutex_unlock(&proc->mtx); + + return process_completed_blocks(proc, queue); +bug_seqnum: + fputs("[BUG][parallel block processor] " + "gap in sequence numbers!\n", stderr); + return -1; +} diff --git a/mkfs/Makemodule.am b/mkfs/Makemodule.am index f8e3e2c..1316c3d 100644 --- a/mkfs/Makemodule.am +++ b/mkfs/Makemodule.am @@ -1,9 +1,9 @@ gensquashfs_SOURCES = mkfs/mkfs.c mkfs/mkfs.h mkfs/options.c gensquashfs_LDADD = libsquashfs.a libfstree.a libcompress.a libutil.a gensquashfs_CPPFLAGS = $(AM_CPPFLAGS) +gensquashfs_CFLAGS = $(AM_CFLAGS) $(PTHREAD_CFLAGS) gensquashfs_LDADD += $(XZ_LIBS) $(ZLIB_LIBS) $(LZO_LIBS) $(LZ4_LIBS) -gensquashfs_LDADD += $(ZSTD_LIBS) - +gensquashfs_LDADD += $(ZSTD_LIBS) $(PTHREAD_LIBS) if WITH_SELINUX gensquashfs_CPPFLAGS += -DWITH_SELINUX gensquashfs_LDADD += $(LIBSELINUX_LIBS) diff --git a/tar/Makemodule.am b/tar/Makemodule.am index a10dc01..b8674c2 100644 --- a/tar/Makemodule.am +++ b/tar/Makemodule.am @@ -4,6 +4,8 @@ sqfs2tar_LDADD += $(XZ_LIBS) $(ZLIB_LIBS) $(LZO_LIBS) $(LZ4_LIBS) $(ZSTD_LIBS) tar2sqfs_SOURCES = tar/tar2sqfs.c tar2sqfs_LDADD = libsquashfs.a libtar.a libfstree.a libcompress.a libutil.a +tar2sqfs_CFLAGS = $(AM_CFLAGS) $(PTHREAD_CFLAGS) tar2sqfs_LDADD += $(XZ_LIBS) $(ZLIB_LIBS) $(LZO_LIBS) $(LZ4_LIBS) $(ZSTD_LIBS) +tar2sqfs_LDADD += $(PTHREAD_LIBS) bin_PROGRAMS += sqfs2tar tar2sqfs -- cgit v1.2.3