aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-09-10 13:08:26 +0200
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-09-10 13:08:26 +0200
commit1fe6b2aa4158516f9c6cf5751cc68aafef1af620 (patch)
tree2047e37b6101f35c5dd55c59b885be22fe570407
parent49c2c4a8c8a8eb32a7d5fdbf4b1eba24bb23efe7 (diff)
Make the thread pool queue backlog user configurable
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r--doc/gensquashfs.15
-rw-r--r--doc/tar2sqfs.15
-rw-r--r--include/data_writer.h2
-rw-r--r--include/sqfs/block_processor.h4
-rw-r--r--lib/sqfs/block_processor.c2
-rw-r--r--lib/sqfs/block_processor_parallel.c7
-rw-r--r--lib/sqfshelper/data_writer.c4
-rw-r--r--mkfs/mkfs.c2
-rw-r--r--mkfs/mkfs.h1
-rw-r--r--mkfs/options.c17
-rw-r--r--tar/tar2sqfs.c20
11 files changed, 59 insertions, 10 deletions
diff --git a/doc/gensquashfs.1 b/doc/gensquashfs.1
index f2150cf..c364c98 100644
--- a/doc/gensquashfs.1
+++ b/doc/gensquashfs.1
@@ -34,6 +34,11 @@ If gensquashfs was compiled with a built in pthread based parallel data
compressor, this option can be used to set the number of compressor
threads. If not set, the default is 1.
.TP
+\fB\-\-queue\-backlog\fR, \fB\-Q\fR <count>
+Maximum number of data blocks in the thread worker queue before the packer
+starts waiting for the block processors to catch up. Higher values result
+in higher memory consumption. Defaults to 10 times the number of workers.
+.TP
\fB\-\-block\-size\fR, \fB\-b\fR <size>
Block size to use for Squashfs image.
Defaults to 131072.
diff --git a/doc/tar2sqfs.1 b/doc/tar2sqfs.1
index abbde6a..346d7e7 100644
--- a/doc/tar2sqfs.1
+++ b/doc/tar2sqfs.1
@@ -28,6 +28,11 @@ If tar2sqfs was compiled with a built in pthread based parallel data
compressor, this option can be used to set the number of compressor
threads. If not set, the default is 1.
.TP
+\fB\-\-queue\-backlog\fR, \fB\-Q\fR <count>
+Maximum number of data blocks in the thread worker queue before the packer
+starts waiting for the block processors to catch up. Higher values result
+in higher memory consumption. Defaults to 10 times the number of workers.
+.TP
\fB\-\-block\-size\fR, \fB\-b\fR <size>
Block size to use for Squashfs image.
Defaults to 131072.
diff --git a/include/data_writer.h b/include/data_writer.h
index 4d86bbf..9e82d11 100644
--- a/include/data_writer.h
+++ b/include/data_writer.h
@@ -37,7 +37,7 @@ enum {
*/
data_writer_t *data_writer_create(sqfs_super_t *super, sqfs_compressor_t *cmp,
sqfs_file_t *file, size_t devblksize,
- unsigned int num_jobs);
+ unsigned int num_jobs, size_t max_backlog);
void data_writer_destroy(data_writer_t *data);
diff --git a/include/sqfs/block_processor.h b/include/sqfs/block_processor.h
index ed38be6..1032a35 100644
--- a/include/sqfs/block_processor.h
+++ b/include/sqfs/block_processor.h
@@ -161,6 +161,9 @@ extern "C" {
* the deep copy function of the compressor is used to create
* several instances that don't interfere with each other.
* @param num_workers The number of worker threads to create.
+ * @param max_backlog The maximum number of blocks currently in flight. When
+ * trying to add more, enqueueing blocks until the in-flight
+ * block count drops below the threshold.
* @param user An arbitrary user pointer to pass to the block callback.
* @param callback A function to call for each finished data block.
*
@@ -172,6 +175,7 @@ SQFS_API
sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
sqfs_compressor_t *cmp,
unsigned int num_workers,
+ size_t max_backlog,
void *user,
sqfs_block_cb callback);
diff --git a/lib/sqfs/block_processor.c b/lib/sqfs/block_processor.c
index 3fa37fd..ef71f9e 100644
--- a/lib/sqfs/block_processor.c
+++ b/lib/sqfs/block_processor.c
@@ -27,11 +27,13 @@ struct sqfs_block_processor_t {
sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
sqfs_compressor_t *cmp,
unsigned int num_workers,
+ size_t max_backlog,
void *user,
sqfs_block_cb callback)
{
sqfs_block_processor_t *proc;
(void)num_workers;
+ (void)max_backlog;
proc = alloc_flex(sizeof(*proc), 1, max_block_size);
diff --git a/lib/sqfs/block_processor_parallel.c b/lib/sqfs/block_processor_parallel.c
index 05c07f1..a63135a 100644
--- a/lib/sqfs/block_processor_parallel.c
+++ b/lib/sqfs/block_processor_parallel.c
@@ -16,8 +16,6 @@
#include <string.h>
#include <stdlib.h>
-#define MAX_BACKLOG_FACTOR (10)
-
typedef struct {
sqfs_block_processor_t *shared;
sqfs_compressor_t *cmp;
@@ -46,6 +44,7 @@ struct sqfs_block_processor_t {
sqfs_block_cb cb;
void *user;
int status;
+ size_t max_backlog;
/* used only by workers */
size_t max_block_size;
@@ -117,6 +116,7 @@ static void *worker_proc(void *arg)
sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
sqfs_compressor_t *cmp,
unsigned int num_workers,
+ size_t max_backlog,
void *user,
sqfs_block_cb callback)
{
@@ -136,6 +136,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
proc->cb = callback;
proc->user = user;
proc->num_workers = num_workers;
+ proc->max_backlog = max_backlog;
if (pthread_mutex_init(&proc->mtx, NULL))
goto fail_free;
@@ -274,7 +275,7 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
(block->flags & SQFS_BLK_DONT_CHECKSUM)) {
store_completed_block(proc, block);
} else {
- while (proc->backlog > proc->num_workers * MAX_BACKLOG_FACTOR)
+ while (proc->backlog > proc->max_backlog)
pthread_cond_wait(&proc->done_cond, &proc->mtx);
if (proc->queue_last == NULL) {
diff --git a/lib/sqfshelper/data_writer.c b/lib/sqfshelper/data_writer.c
index 463eb60..d6a935b 100644
--- a/lib/sqfshelper/data_writer.c
+++ b/lib/sqfshelper/data_writer.c
@@ -452,7 +452,7 @@ int write_data_from_fd_condensed(data_writer_t *data, file_info_t *fi,
data_writer_t *data_writer_create(sqfs_super_t *super, sqfs_compressor_t *cmp,
sqfs_file_t *file, size_t devblksize,
- unsigned int num_jobs)
+ unsigned int num_jobs, size_t max_backlog)
{
data_writer_t *data = calloc(1, sizeof(*data));
@@ -462,7 +462,7 @@ data_writer_t *data_writer_create(sqfs_super_t *super, sqfs_compressor_t *cmp,
}
data->proc = sqfs_block_processor_create(super->block_size, cmp,
- num_jobs, data,
+ num_jobs, max_backlog, data,
block_callback);
data->cmp = cmp;
data->super = super;
diff --git a/mkfs/mkfs.c b/mkfs/mkfs.c
index 919ec20..fda9831 100644
--- a/mkfs/mkfs.c
+++ b/mkfs/mkfs.c
@@ -165,7 +165,7 @@ int main(int argc, char **argv)
super.flags |= SQFS_FLAG_COMPRESSOR_OPTIONS;
data = data_writer_create(&super, cmp, outfile,
- opt.devblksz, opt.num_jobs);
+ opt.devblksz, opt.num_jobs, opt.max_backlog);
if (data == NULL)
goto out_cmp;
diff --git a/mkfs/mkfs.h b/mkfs/mkfs.h
index 7ad9460..9d34d02 100644
--- a/mkfs/mkfs.h
+++ b/mkfs/mkfs.h
@@ -39,6 +39,7 @@ typedef struct {
int devblksz;
unsigned int dirscan_flags;
unsigned int num_jobs;
+ size_t max_backlog;
bool exportable;
bool quiet;
const char *infile;
diff --git a/mkfs/options.c b/mkfs/options.c
index 7b83ec4..f15569a 100644
--- a/mkfs/options.c
+++ b/mkfs/options.c
@@ -15,6 +15,7 @@ static struct option long_opts[] = {
{ "pack-file", required_argument, NULL, 'F' },
{ "pack-dir", required_argument, NULL, 'D' },
{ "num-jobs", required_argument, NULL, 'j' },
+ { "queue-backlog", required_argument, NULL, 'Q' },
{ "keep-time", no_argument, NULL, 'k' },
#ifdef HAVE_SYS_XATTR_H
{ "keep-xattr", no_argument, NULL, 'x' },
@@ -30,7 +31,7 @@ static struct option long_opts[] = {
{ "help", no_argument, NULL, 'h' },
};
-static const char *short_opts = "F:D:X:c:b:B:d:j:kxoefqhV"
+static const char *short_opts = "F:D:X:c:b:B:d:j:Q:kxoefqhV"
#ifdef WITH_SELINUX
"s:"
#endif
@@ -65,6 +66,10 @@ static const char *help_string =
" the selected compressor. Specify 'help' to\n"
" get a list of available options.\n"
" --num-jobs, -j <count> Number of compressor jobs to create.\n"
+" --queue-backlog, -Q <count> Maximum number of data blocks in the thread\n"
+" worker queue before the packer starts waiting\n"
+" for the block processors to catch up.\n"
+" Defaults to 10 times the number of jobs.\n"
" --block-size, -b <size> Block size to use for Squashfs image.\n"
" Defaults to %u.\n"
" --dev-block-size, -B <size> Device block size to padd the image to.\n"
@@ -151,6 +156,7 @@ void process_command_line(options_t *opt, int argc, char **argv)
opt->blksz = SQFS_DEFAULT_BLOCK_SIZE;
opt->devblksz = SQFS_DEVBLK_SIZE;
opt->num_jobs = 1;
+ opt->max_backlog = 0;
for (;;) {
i = getopt_long(argc, argv, short_opts, long_opts, NULL);
@@ -181,6 +187,9 @@ void process_command_line(options_t *opt, int argc, char **argv)
case 'j':
opt->num_jobs = strtol(optarg, NULL, 0);
break;
+ case 'Q':
+ opt->max_backlog = strtol(optarg, NULL, 0);
+ break;
case 'B':
opt->devblksz = strtol(optarg, NULL, 0);
if (opt->devblksz < 1024) {
@@ -240,6 +249,12 @@ void process_command_line(options_t *opt, int argc, char **argv)
}
}
+ if (opt->num_jobs < 1)
+ opt->num_jobs = 1;
+
+ if (opt->max_backlog < 1)
+ opt->max_backlog = 10 * opt->num_jobs;
+
if (opt->comp_extra != NULL && strcmp(opt->comp_extra, "help") == 0) {
compressor_print_help(opt->compressor);
exit(EXIT_SUCCESS);
diff --git a/tar/tar2sqfs.c b/tar/tar2sqfs.c
index a6f5224..5f53547 100644
--- a/tar/tar2sqfs.c
+++ b/tar/tar2sqfs.c
@@ -34,6 +34,7 @@ static struct option long_opts[] = {
{ "dev-block-size", required_argument, NULL, 'B' },
{ "defaults", required_argument, NULL, 'd' },
{ "num-jobs", required_argument, NULL, 'j' },
+ { "queue-backlog", required_argument, NULL, 'Q' },
{ "comp-extra", required_argument, NULL, 'X' },
{ "no-skip", no_argument, NULL, 's' },
{ "no-xattr", no_argument, NULL, 'x' },
@@ -45,7 +46,7 @@ static struct option long_opts[] = {
{ "version", no_argument, NULL, 'V' },
};
-static const char *short_opts = "c:b:B:d:X:j:sxekfqhV";
+static const char *short_opts = "c:b:B:d:X:j:Q:sxekfqhV";
static const char *usagestr =
"Usage: tar2sqfs [OPTIONS...] <sqfsfile>\n"
@@ -61,6 +62,10 @@ static const char *usagestr =
" the selected compressor. Specify 'help' to\n"
" get a list of available options.\n"
" --num-jobs, -j <count> Number of compressor jobs to create.\n"
+" --queue-backlog, -Q <count> Maximum number of data blocks in the thread\n"
+" worker queue before the packer starts waiting\n"
+" for the block processors to catch up.\n"
+" Defaults to 10 times the number of jobs.\n"
" --block-size, -b <size> Block size to use for Squashfs image.\n"
" Defaults to %u.\n"
" --dev-block-size, -B <size> Device block size to padd the image to.\n"
@@ -98,6 +103,7 @@ static size_t devblksize = SQFS_DEVBLK_SIZE;
static bool quiet = false;
static int outmode = 0;
static unsigned int num_jobs = 1;
+static size_t max_backlog = 0;
static E_SQFS_COMPRESSOR comp_id;
static char *comp_extra = NULL;
static char *fs_defaults = NULL;
@@ -148,6 +154,9 @@ static void process_args(int argc, char **argv)
case 'j':
num_jobs = strtol(optarg, NULL, 0);
break;
+ case 'Q':
+ max_backlog = strtol(optarg, NULL, 0);
+ break;
case 'X':
comp_extra = optarg;
break;
@@ -185,6 +194,12 @@ static void process_args(int argc, char **argv)
}
}
+ if (num_jobs < 1)
+ num_jobs = 1;
+
+ if (max_backlog < 1)
+ max_backlog = 10 * num_jobs;
+
if (comp_extra != NULL && strcmp(comp_extra, "help") == 0) {
compressor_print_help(comp_id);
exit(EXIT_SUCCESS);
@@ -401,7 +416,8 @@ int main(int argc, char **argv)
if (ret > 0)
super.flags |= SQFS_FLAG_COMPRESSOR_OPTIONS;
- data = data_writer_create(&super, cmp, outfile, devblksize, num_jobs);
+ data = data_writer_create(&super, cmp, outfile, devblksize,
+ num_jobs, max_backlog);
if (data == NULL)
goto out_cmp;