From 1fe6b2aa4158516f9c6cf5751cc68aafef1af620 Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Tue, 10 Sep 2019 13:08:26 +0200 Subject: Make the thread pool queue backlog user configurable Signed-off-by: David Oberhollenzer --- doc/gensquashfs.1 | 5 +++++ doc/tar2sqfs.1 | 5 +++++ include/data_writer.h | 2 +- include/sqfs/block_processor.h | 4 ++++ lib/sqfs/block_processor.c | 2 ++ lib/sqfs/block_processor_parallel.c | 7 ++++--- lib/sqfshelper/data_writer.c | 4 ++-- mkfs/mkfs.c | 2 +- mkfs/mkfs.h | 1 + mkfs/options.c | 17 ++++++++++++++++- tar/tar2sqfs.c | 20 ++++++++++++++++++-- 11 files changed, 59 insertions(+), 10 deletions(-) diff --git a/doc/gensquashfs.1 b/doc/gensquashfs.1 index f2150cf..c364c98 100644 --- a/doc/gensquashfs.1 +++ b/doc/gensquashfs.1 @@ -34,6 +34,11 @@ If gensquashfs was compiled with a built in pthread based parallel data compressor, this option can be used to set the number of compressor threads. If not set, the default is 1. .TP +\fB\-\-queue\-backlog\fR, \fB\-Q\fR +Maximum number of data blocks in the thread worker queue before the packer +starts waiting for the block processors to catch up. Higher values result +in higher memory consumption. Defaults to 10 times the number of workers. +.TP \fB\-\-block\-size\fR, \fB\-b\fR Block size to use for Squashfs image. Defaults to 131072. diff --git a/doc/tar2sqfs.1 b/doc/tar2sqfs.1 index abbde6a..346d7e7 100644 --- a/doc/tar2sqfs.1 +++ b/doc/tar2sqfs.1 @@ -28,6 +28,11 @@ If tar2sqfs was compiled with a built in pthread based parallel data compressor, this option can be used to set the number of compressor threads. If not set, the default is 1. .TP +\fB\-\-queue\-backlog\fR, \fB\-Q\fR +Maximum number of data blocks in the thread worker queue before the packer +starts waiting for the block processors to catch up. Higher values result +in higher memory consumption. Defaults to 10 times the number of workers. +.TP \fB\-\-block\-size\fR, \fB\-b\fR Block size to use for Squashfs image. Defaults to 131072. diff --git a/include/data_writer.h b/include/data_writer.h index 4d86bbf..9e82d11 100644 --- a/include/data_writer.h +++ b/include/data_writer.h @@ -37,7 +37,7 @@ enum { */ data_writer_t *data_writer_create(sqfs_super_t *super, sqfs_compressor_t *cmp, sqfs_file_t *file, size_t devblksize, - unsigned int num_jobs); + unsigned int num_jobs, size_t max_backlog); void data_writer_destroy(data_writer_t *data); diff --git a/include/sqfs/block_processor.h b/include/sqfs/block_processor.h index ed38be6..1032a35 100644 --- a/include/sqfs/block_processor.h +++ b/include/sqfs/block_processor.h @@ -161,6 +161,9 @@ extern "C" { * the deep copy function of the compressor is used to create * several instances that don't interfere with each other. * @param num_workers The number of worker threads to create. + * @param max_backlog The maximum number of blocks currently in flight. When + * trying to add more, enqueueing blocks until the in-flight + * block count drops below the threshold. * @param user An arbitrary user pointer to pass to the block callback. * @param callback A function to call for each finished data block. * @@ -172,6 +175,7 @@ SQFS_API sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, sqfs_compressor_t *cmp, unsigned int num_workers, + size_t max_backlog, void *user, sqfs_block_cb callback); diff --git a/lib/sqfs/block_processor.c b/lib/sqfs/block_processor.c index 3fa37fd..ef71f9e 100644 --- a/lib/sqfs/block_processor.c +++ b/lib/sqfs/block_processor.c @@ -27,11 +27,13 @@ struct sqfs_block_processor_t { sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, sqfs_compressor_t *cmp, unsigned int num_workers, + size_t max_backlog, void *user, sqfs_block_cb callback) { sqfs_block_processor_t *proc; (void)num_workers; + (void)max_backlog; proc = alloc_flex(sizeof(*proc), 1, max_block_size); diff --git a/lib/sqfs/block_processor_parallel.c b/lib/sqfs/block_processor_parallel.c index 05c07f1..a63135a 100644 --- a/lib/sqfs/block_processor_parallel.c +++ b/lib/sqfs/block_processor_parallel.c @@ -16,8 +16,6 @@ #include #include -#define MAX_BACKLOG_FACTOR (10) - typedef struct { sqfs_block_processor_t *shared; sqfs_compressor_t *cmp; @@ -46,6 +44,7 @@ struct sqfs_block_processor_t { sqfs_block_cb cb; void *user; int status; + size_t max_backlog; /* used only by workers */ size_t max_block_size; @@ -117,6 +116,7 @@ static void *worker_proc(void *arg) sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, sqfs_compressor_t *cmp, unsigned int num_workers, + size_t max_backlog, void *user, sqfs_block_cb callback) { @@ -136,6 +136,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, proc->cb = callback; proc->user = user; proc->num_workers = num_workers; + proc->max_backlog = max_backlog; if (pthread_mutex_init(&proc->mtx, NULL)) goto fail_free; @@ -274,7 +275,7 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, (block->flags & SQFS_BLK_DONT_CHECKSUM)) { store_completed_block(proc, block); } else { - while (proc->backlog > proc->num_workers * MAX_BACKLOG_FACTOR) + while (proc->backlog > proc->max_backlog) pthread_cond_wait(&proc->done_cond, &proc->mtx); if (proc->queue_last == NULL) { diff --git a/lib/sqfshelper/data_writer.c b/lib/sqfshelper/data_writer.c index 463eb60..d6a935b 100644 --- a/lib/sqfshelper/data_writer.c +++ b/lib/sqfshelper/data_writer.c @@ -452,7 +452,7 @@ int write_data_from_fd_condensed(data_writer_t *data, file_info_t *fi, data_writer_t *data_writer_create(sqfs_super_t *super, sqfs_compressor_t *cmp, sqfs_file_t *file, size_t devblksize, - unsigned int num_jobs) + unsigned int num_jobs, size_t max_backlog) { data_writer_t *data = calloc(1, sizeof(*data)); @@ -462,7 +462,7 @@ data_writer_t *data_writer_create(sqfs_super_t *super, sqfs_compressor_t *cmp, } data->proc = sqfs_block_processor_create(super->block_size, cmp, - num_jobs, data, + num_jobs, max_backlog, data, block_callback); data->cmp = cmp; data->super = super; diff --git a/mkfs/mkfs.c b/mkfs/mkfs.c index 919ec20..fda9831 100644 --- a/mkfs/mkfs.c +++ b/mkfs/mkfs.c @@ -165,7 +165,7 @@ int main(int argc, char **argv) super.flags |= SQFS_FLAG_COMPRESSOR_OPTIONS; data = data_writer_create(&super, cmp, outfile, - opt.devblksz, opt.num_jobs); + opt.devblksz, opt.num_jobs, opt.max_backlog); if (data == NULL) goto out_cmp; diff --git a/mkfs/mkfs.h b/mkfs/mkfs.h index 7ad9460..9d34d02 100644 --- a/mkfs/mkfs.h +++ b/mkfs/mkfs.h @@ -39,6 +39,7 @@ typedef struct { int devblksz; unsigned int dirscan_flags; unsigned int num_jobs; + size_t max_backlog; bool exportable; bool quiet; const char *infile; diff --git a/mkfs/options.c b/mkfs/options.c index 7b83ec4..f15569a 100644 --- a/mkfs/options.c +++ b/mkfs/options.c @@ -15,6 +15,7 @@ static struct option long_opts[] = { { "pack-file", required_argument, NULL, 'F' }, { "pack-dir", required_argument, NULL, 'D' }, { "num-jobs", required_argument, NULL, 'j' }, + { "queue-backlog", required_argument, NULL, 'Q' }, { "keep-time", no_argument, NULL, 'k' }, #ifdef HAVE_SYS_XATTR_H { "keep-xattr", no_argument, NULL, 'x' }, @@ -30,7 +31,7 @@ static struct option long_opts[] = { { "help", no_argument, NULL, 'h' }, }; -static const char *short_opts = "F:D:X:c:b:B:d:j:kxoefqhV" +static const char *short_opts = "F:D:X:c:b:B:d:j:Q:kxoefqhV" #ifdef WITH_SELINUX "s:" #endif @@ -65,6 +66,10 @@ static const char *help_string = " the selected compressor. Specify 'help' to\n" " get a list of available options.\n" " --num-jobs, -j Number of compressor jobs to create.\n" +" --queue-backlog, -Q Maximum number of data blocks in the thread\n" +" worker queue before the packer starts waiting\n" +" for the block processors to catch up.\n" +" Defaults to 10 times the number of jobs.\n" " --block-size, -b Block size to use for Squashfs image.\n" " Defaults to %u.\n" " --dev-block-size, -B Device block size to padd the image to.\n" @@ -151,6 +156,7 @@ void process_command_line(options_t *opt, int argc, char **argv) opt->blksz = SQFS_DEFAULT_BLOCK_SIZE; opt->devblksz = SQFS_DEVBLK_SIZE; opt->num_jobs = 1; + opt->max_backlog = 0; for (;;) { i = getopt_long(argc, argv, short_opts, long_opts, NULL); @@ -181,6 +187,9 @@ void process_command_line(options_t *opt, int argc, char **argv) case 'j': opt->num_jobs = strtol(optarg, NULL, 0); break; + case 'Q': + opt->max_backlog = strtol(optarg, NULL, 0); + break; case 'B': opt->devblksz = strtol(optarg, NULL, 0); if (opt->devblksz < 1024) { @@ -240,6 +249,12 @@ void process_command_line(options_t *opt, int argc, char **argv) } } + if (opt->num_jobs < 1) + opt->num_jobs = 1; + + if (opt->max_backlog < 1) + opt->max_backlog = 10 * opt->num_jobs; + if (opt->comp_extra != NULL && strcmp(opt->comp_extra, "help") == 0) { compressor_print_help(opt->compressor); exit(EXIT_SUCCESS); diff --git a/tar/tar2sqfs.c b/tar/tar2sqfs.c index a6f5224..5f53547 100644 --- a/tar/tar2sqfs.c +++ b/tar/tar2sqfs.c @@ -34,6 +34,7 @@ static struct option long_opts[] = { { "dev-block-size", required_argument, NULL, 'B' }, { "defaults", required_argument, NULL, 'd' }, { "num-jobs", required_argument, NULL, 'j' }, + { "queue-backlog", required_argument, NULL, 'Q' }, { "comp-extra", required_argument, NULL, 'X' }, { "no-skip", no_argument, NULL, 's' }, { "no-xattr", no_argument, NULL, 'x' }, @@ -45,7 +46,7 @@ static struct option long_opts[] = { { "version", no_argument, NULL, 'V' }, }; -static const char *short_opts = "c:b:B:d:X:j:sxekfqhV"; +static const char *short_opts = "c:b:B:d:X:j:Q:sxekfqhV"; static const char *usagestr = "Usage: tar2sqfs [OPTIONS...] \n" @@ -61,6 +62,10 @@ static const char *usagestr = " the selected compressor. Specify 'help' to\n" " get a list of available options.\n" " --num-jobs, -j Number of compressor jobs to create.\n" +" --queue-backlog, -Q Maximum number of data blocks in the thread\n" +" worker queue before the packer starts waiting\n" +" for the block processors to catch up.\n" +" Defaults to 10 times the number of jobs.\n" " --block-size, -b Block size to use for Squashfs image.\n" " Defaults to %u.\n" " --dev-block-size, -B Device block size to padd the image to.\n" @@ -98,6 +103,7 @@ static size_t devblksize = SQFS_DEVBLK_SIZE; static bool quiet = false; static int outmode = 0; static unsigned int num_jobs = 1; +static size_t max_backlog = 0; static E_SQFS_COMPRESSOR comp_id; static char *comp_extra = NULL; static char *fs_defaults = NULL; @@ -148,6 +154,9 @@ static void process_args(int argc, char **argv) case 'j': num_jobs = strtol(optarg, NULL, 0); break; + case 'Q': + max_backlog = strtol(optarg, NULL, 0); + break; case 'X': comp_extra = optarg; break; @@ -185,6 +194,12 @@ static void process_args(int argc, char **argv) } } + if (num_jobs < 1) + num_jobs = 1; + + if (max_backlog < 1) + max_backlog = 10 * num_jobs; + if (comp_extra != NULL && strcmp(comp_extra, "help") == 0) { compressor_print_help(comp_id); exit(EXIT_SUCCESS); @@ -401,7 +416,8 @@ int main(int argc, char **argv) if (ret > 0) super.flags |= SQFS_FLAG_COMPRESSOR_OPTIONS; - data = data_writer_create(&super, cmp, outfile, devblksize, num_jobs); + data = data_writer_create(&super, cmp, outfile, devblksize, + num_jobs, max_backlog); if (data == NULL) goto out_cmp; -- cgit v1.2.3