aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-08-14 14:56:21 +0200
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-08-18 22:44:39 +0200
commit1a95478b8d340c8b6b9dbff4f38f9293388fd1a3 (patch)
treeb62599b67c54132e1a7bd2559e422e3c5163563b
parent7ca19d23cb4913b5dabfdf3469852ec9f2c0f8d7 (diff)
Add pthread based, parallel block processor implementation
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r--.gitignore2
-rw-r--r--configure.ac4
-rw-r--r--lib/Makemodule.am10
-rw-r--r--lib/comp/block_processor_parallel.c333
-rw-r--r--mkfs/Makemodule.am4
-rw-r--r--tar/Makemodule.am2
6 files changed, 350 insertions, 5 deletions
diff --git a/.gitignore b/.gitignore
index 83e68ac..d03be1e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,7 +14,7 @@ depcomp
install-sh
missing
stamp-h1
-config.h
+config.*
*.o
*.a
*~
diff --git a/configure.ac b/configure.ac
index 19ee8d4..78ede8f 100644
--- a/configure.ac
+++ b/configure.ac
@@ -143,6 +143,10 @@ yes) AM_COND_IF([WITH_SELINUX], [], [AC_MSG_ERROR([cannot find selinux])]) ;;
no) AM_CONDITIONAL([WITH_SELINUX], [false]) ;;
esac
+have_pthread="no"
+AX_PTHREAD([have_pthread="yes"], [])
+AM_CONDITIONAL([HAVE_PTHREAD], [test "x$have_pthread" != "xno"])
+
##### sanity check #####
have_compressor="no"
diff --git a/lib/Makemodule.am b/lib/Makemodule.am
index 7c2c99d..3c206d6 100644
--- a/lib/Makemodule.am
+++ b/lib/Makemodule.am
@@ -18,9 +18,8 @@ libtar_a_CFLAGS = $(AM_CFLAGS)
libtar_a_CPPFLAGS = $(AM_CPPFLAGS)
libcompress_a_SOURCES = lib/comp/compressor.c lib/comp/internal.h
-libcompress_a_SOURCES += lib/comp/block_processor.c include/block_processor.h
+libcompress_a_SOURCES += include/block_processor.h include/compress.h
libcompress_a_SOURCES += lib/comp/create_block.c lib/comp/process_block.c
-libcompress_a_SOURCES += include/compress.h
libcompress_a_CFLAGS = $(AM_CFLAGS)
libcompress_a_CPPFLAGS = $(AM_CPPFLAGS)
@@ -51,6 +50,13 @@ libutil_a_SOURCES += lib/util/dirstack.c lib/util/padd_file.c
libutil_a_SOURCES += lib/util/read_data_at.c lib/util/crc32.c
libutil_a_SOURCES += lib/util/source_date_epoch.c
+if HAVE_PTHREAD
+libcompress_a_SOURCES += lib/comp/block_processor_parallel.c
+libcompress_a_CFLAGS += $(PTHREAD_CFLAGS)
+else
+libcompress_a_SOURCES += lib/comp/block_processor.c
+endif
+
if WITH_GZIP
libcompress_a_SOURCES += lib/comp/gzip.c
diff --git a/lib/comp/block_processor_parallel.c b/lib/comp/block_processor_parallel.c
new file mode 100644
index 0000000..617900f
--- /dev/null
+++ b/lib/comp/block_processor_parallel.c
@@ -0,0 +1,333 @@
+/* SPDX-License-Identifier: GPL-3.0-or-later */
+/*
+ * block_processor.c
+ *
+ * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at>
+ */
+#include "config.h"
+
+#include "block_processor.h"
+#include "util.h"
+
+#include <pthread.h>
+#include <string.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#define MAX_BACKLOG_FACTOR (10)
+
+typedef struct {
+ block_processor_t *shared;
+ compressor_t *cmp;
+ pthread_t thread;
+ uint8_t scratch[];
+} compress_worker_t;
+
+struct block_processor_t {
+ pthread_mutex_t mtx;
+ pthread_cond_t queue_cond;
+ pthread_cond_t done_cond;
+
+ /* needs rw access by worker and main thread */
+ block_t *queue;
+ block_t *done;
+ bool terminate;
+ size_t backlog;
+
+ /* used by main thread only */
+ uint32_t enqueue_id;
+ uint32_t dequeue_id;
+
+ unsigned int num_workers;
+ block_cb cb;
+ void *user;
+ int status;
+
+ /* used only by workers */
+ size_t max_block_size;
+
+ compress_worker_t *workers[];
+};
+
+static void store_completed_block(block_processor_t *shared, block_t *blk)
+{
+ block_t *it = shared->done, *prev = NULL;
+
+ while (it != NULL) {
+ if (it->sequence_number >= blk->sequence_number)
+ break;
+ prev = it;
+ it = it->next;
+ }
+
+ if (prev == NULL) {
+ blk->next = shared->done;
+ shared->done = blk;
+ } else {
+ blk->next = prev->next;
+ prev->next = blk;
+ }
+}
+
+static void *worker_proc(void *arg)
+{
+ compress_worker_t *worker = arg;
+ block_processor_t *shared = worker->shared;
+ block_t *blk;
+
+ for (;;) {
+ pthread_mutex_lock(&shared->mtx);
+ while (shared->queue == NULL && !shared->terminate) {
+ pthread_cond_wait(&shared->queue_cond,
+ &shared->mtx);
+ }
+
+ if (shared->terminate) {
+ pthread_mutex_unlock(&shared->mtx);
+ break;
+ }
+
+ blk = shared->queue;
+ shared->queue = blk->next;
+ blk->next = NULL;
+ pthread_mutex_unlock(&shared->mtx);
+
+ if (process_block(blk, worker->cmp, worker->scratch,
+ shared->max_block_size)) {
+ blk->flags |= BLK_COMPRESS_ERROR;
+ }
+
+ pthread_mutex_lock(&shared->mtx);
+ store_completed_block(shared, blk);
+ shared->backlog -= 1;
+ pthread_cond_broadcast(&shared->done_cond);
+ pthread_mutex_unlock(&shared->mtx);
+ }
+ return NULL;
+}
+
+block_processor_t *block_processor_create(size_t max_block_size,
+ compressor_t *cmp,
+ unsigned int num_workers,
+ void *user,
+ block_cb callback)
+{
+ block_processor_t *proc;
+ unsigned int i;
+ size_t size;
+ int ret;
+
+ if (num_workers < 1)
+ num_workers = 1;
+
+ size = sizeof(proc->workers[0]) * num_workers;
+
+ proc = calloc(1, sizeof(*proc) + size);
+ if (proc == NULL) {
+ perror("Creating block processor");
+ return NULL;
+ }
+
+ proc->max_block_size = max_block_size;
+ proc->cb = callback;
+ proc->user = user;
+ proc->num_workers = num_workers;
+
+ if (pthread_mutex_init(&proc->mtx, NULL)) {
+ perror("Creating block processor mutex");
+ goto fail_free;
+ }
+
+ if (pthread_cond_init(&proc->queue_cond, NULL)) {
+ perror("Creating block processor conditional");
+ goto fail_mtx;
+ }
+
+ if (pthread_cond_init(&proc->done_cond, NULL)) {
+ perror("Creating block processor completion conditional");
+ goto fail_cond;
+ }
+
+ size = sizeof(compress_worker_t) + max_block_size;
+
+ for (i = 0; i < num_workers; ++i) {
+ proc->workers[i] = calloc(1, size);
+
+ if (proc->workers[i] == NULL) {
+ perror("Creating block worker data");
+ goto fail_init;
+ }
+
+ proc->workers[i]->shared = proc;
+ proc->workers[i]->cmp = cmp->create_copy(cmp);
+
+ if (proc->workers[i]->cmp == NULL)
+ goto fail_init;
+ }
+
+ for (i = 0; i < num_workers; ++i) {
+ ret = pthread_create(&proc->workers[i]->thread, NULL,
+ worker_proc, proc->workers[i]);
+
+ if (ret != 0) {
+ perror("Creating block processor thread");
+ goto fail_thread;
+ }
+ }
+
+ return proc;
+fail_thread:
+ pthread_mutex_lock(&proc->mtx);
+ proc->terminate = true;
+ pthread_cond_broadcast(&proc->queue_cond);
+ pthread_mutex_unlock(&proc->mtx);
+
+ for (i = 0; i < num_workers; ++i) {
+ if (proc->workers[i]->thread > 0) {
+ pthread_join(proc->workers[i]->thread, NULL);
+ }
+ }
+fail_init:
+ for (i = 0; i < num_workers; ++i) {
+ if (proc->workers[i] != NULL) {
+ if (proc->workers[i]->cmp != NULL) {
+ proc->workers[i]->cmp->
+ destroy(proc->workers[i]->cmp);
+ }
+
+ free(proc->workers[i]);
+ }
+ }
+ pthread_cond_destroy(&proc->done_cond);
+fail_cond:
+ pthread_cond_destroy(&proc->queue_cond);
+fail_mtx:
+ pthread_mutex_destroy(&proc->mtx);
+fail_free:
+ free(proc);
+ return NULL;
+}
+
+void block_processor_destroy(block_processor_t *proc)
+{
+ unsigned int i;
+ block_t *blk;
+
+ pthread_mutex_lock(&proc->mtx);
+ proc->terminate = true;
+ pthread_cond_broadcast(&proc->queue_cond);
+ pthread_mutex_unlock(&proc->mtx);
+
+ for (i = 0; i < proc->num_workers; ++i) {
+ pthread_join(proc->workers[i]->thread, NULL);
+
+ proc->workers[i]->cmp->destroy(proc->workers[i]->cmp);
+ free(proc->workers[i]);
+ }
+
+ pthread_cond_destroy(&proc->done_cond);
+ pthread_cond_destroy(&proc->queue_cond);
+ pthread_mutex_destroy(&proc->mtx);
+
+ while (proc->queue != NULL) {
+ blk = proc->queue;
+ proc->queue = blk->next;
+ free(blk);
+ }
+
+ while (proc->done != NULL) {
+ blk = proc->done;
+ proc->done = blk->next;
+ free(blk);
+ }
+
+ free(proc);
+}
+
+static int process_completed_blocks(block_processor_t *proc, block_t *queue)
+{
+ block_t *it;
+
+ while (queue != NULL) {
+ it = queue;
+ queue = queue->next;
+
+ if (it->flags & BLK_COMPRESS_ERROR) {
+ proc->status = -1;
+ } else {
+ if (proc->cb(proc->user, it))
+ proc->status = -1;
+ }
+
+ free(it);
+ }
+
+ return proc->status;
+}
+
+int block_processor_enqueue(block_processor_t *proc, block_t *block)
+{
+ block_t *queue = NULL, *it, *prev;
+
+ block->sequence_number = proc->enqueue_id++;
+ block->next = NULL;
+
+ pthread_mutex_lock(&proc->mtx);
+ while (proc->backlog > proc->num_workers * MAX_BACKLOG_FACTOR)
+ pthread_cond_wait(&proc->done_cond, &proc->mtx);
+
+ if (proc->queue == NULL) {
+ proc->queue = block;
+ } else {
+ for (it = proc->queue; it->next != NULL; it = it->next)
+ ;
+ it->next = block;
+ }
+
+ proc->backlog += 1;
+ it = proc->done;
+ prev = NULL;
+
+ while (it != NULL && it->sequence_number == proc->dequeue_id) {
+ prev = it;
+ it = it->next;
+ proc->dequeue_id += 1;
+ }
+
+ if (prev != NULL) {
+ queue = proc->done;
+ prev->next = NULL;
+ proc->done = it;
+ }
+
+ pthread_cond_broadcast(&proc->queue_cond);
+ pthread_mutex_unlock(&proc->mtx);
+
+ return process_completed_blocks(proc, queue);
+}
+
+int block_processor_finish(block_processor_t *proc)
+{
+ block_t *queue, *it;
+
+ pthread_mutex_lock(&proc->mtx);
+ while (proc->backlog > 0)
+ pthread_cond_wait(&proc->done_cond, &proc->mtx);
+
+ for (it = proc->done; it != NULL; it = it->next) {
+ if (it->sequence_number != proc->dequeue_id++) {
+ pthread_mutex_unlock(&proc->mtx);
+ goto bug_seqnum;
+ }
+ }
+
+ queue = proc->done;
+ proc->done = NULL;
+ pthread_mutex_unlock(&proc->mtx);
+
+ return process_completed_blocks(proc, queue);
+bug_seqnum:
+ fputs("[BUG][parallel block processor] "
+ "gap in sequence numbers!\n", stderr);
+ return -1;
+}
diff --git a/mkfs/Makemodule.am b/mkfs/Makemodule.am
index f8e3e2c..1316c3d 100644
--- a/mkfs/Makemodule.am
+++ b/mkfs/Makemodule.am
@@ -1,9 +1,9 @@
gensquashfs_SOURCES = mkfs/mkfs.c mkfs/mkfs.h mkfs/options.c
gensquashfs_LDADD = libsquashfs.a libfstree.a libcompress.a libutil.a
gensquashfs_CPPFLAGS = $(AM_CPPFLAGS)
+gensquashfs_CFLAGS = $(AM_CFLAGS) $(PTHREAD_CFLAGS)
gensquashfs_LDADD += $(XZ_LIBS) $(ZLIB_LIBS) $(LZO_LIBS) $(LZ4_LIBS)
-gensquashfs_LDADD += $(ZSTD_LIBS)
-
+gensquashfs_LDADD += $(ZSTD_LIBS) $(PTHREAD_LIBS)
if WITH_SELINUX
gensquashfs_CPPFLAGS += -DWITH_SELINUX
gensquashfs_LDADD += $(LIBSELINUX_LIBS)
diff --git a/tar/Makemodule.am b/tar/Makemodule.am
index a10dc01..b8674c2 100644
--- a/tar/Makemodule.am
+++ b/tar/Makemodule.am
@@ -4,6 +4,8 @@ sqfs2tar_LDADD += $(XZ_LIBS) $(ZLIB_LIBS) $(LZO_LIBS) $(LZ4_LIBS) $(ZSTD_LIBS)
tar2sqfs_SOURCES = tar/tar2sqfs.c
tar2sqfs_LDADD = libsquashfs.a libtar.a libfstree.a libcompress.a libutil.a
+tar2sqfs_CFLAGS = $(AM_CFLAGS) $(PTHREAD_CFLAGS)
tar2sqfs_LDADD += $(XZ_LIBS) $(ZLIB_LIBS) $(LZO_LIBS) $(LZ4_LIBS) $(ZSTD_LIBS)
+tar2sqfs_LDADD += $(PTHREAD_LIBS)
bin_PROGRAMS += sqfs2tar tar2sqfs