From 3da78a56df43360520f8007bdb4e11fa25f712cc Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Wed, 25 Sep 2019 06:03:36 +0200 Subject: Refactor out common code of the block processors Code that already is shared between the pthread and the serial processor as well as code that can be re-used by other threading API implementations. Signed-off-by: David Oberhollenzer --- lib/sqfs/Makemodule.am | 1 + lib/sqfs/blk_proc/common.c | 158 ++++++++++++++++++++++++++++++++++++++ lib/sqfs/blk_proc/fragtbl.c | 28 ------- lib/sqfs/blk_proc/internal.h | 26 +++++-- lib/sqfs/blk_proc/process_block.c | 31 -------- lib/sqfs/blk_proc/pthread.c | 110 +++++--------------------- lib/sqfs/blk_proc/serial.c | 39 +++------- 7 files changed, 208 insertions(+), 185 deletions(-) create mode 100644 lib/sqfs/blk_proc/common.c diff --git a/lib/sqfs/Makemodule.am b/lib/sqfs/Makemodule.am index 28f5a93..7a02fa4 100644 --- a/lib/sqfs/Makemodule.am +++ b/lib/sqfs/Makemodule.am @@ -21,6 +21,7 @@ libsquashfs_la_SOURCES += lib/sqfs/inode.c lib/sqfs/blk_proc/fragtbl.c libsquashfs_la_SOURCES += lib/sqfs/blk_proc/process_block.c lib/sqfs/io.c libsquashfs_la_SOURCES += lib/sqfs/blk_proc/internal.h lib/sqfs/data_reader.c libsquashfs_la_SOURCES += lib/sqfs/blk_proc/deduplicate.c +libsquashfs_la_SOURCES += lib/sqfs/blk_proc/common.c libsquashfs_la_CPPFLAGS = $(AM_CPPFLAGS) libsquashfs_la_CFLAGS = $(AM_CFLAGS) $(PTHREAD_CFLAGS) $(ZLIB_CFLAGS) libsquashfs_la_CFLAGS += $(XZ_CFLAGS) $(LZO_CFLAGS) $(LZ4_CFLAGS) diff --git a/lib/sqfs/blk_proc/common.c b/lib/sqfs/blk_proc/common.c new file mode 100644 index 0000000..a3c91eb --- /dev/null +++ b/lib/sqfs/blk_proc/common.c @@ -0,0 +1,158 @@ +/* SPDX-License-Identifier: LGPL-3.0-or-later */ +/* + * common.c + * + * Copyright (C) 2019 David Oberhollenzer + */ +#define SQFS_BUILDING_DLL +#include "internal.h" + +void free_blk_list(sqfs_block_t *list) +{ + sqfs_block_t *it; + + while (list != NULL) { + it = list; + list = list->next; + free(it); + } +} + +int block_processor_init(sqfs_block_processor_t *proc, size_t max_block_size, + sqfs_compressor_t *cmp, unsigned int num_workers, + size_t max_backlog, size_t devblksz, + sqfs_file_t *file) +{ + proc->max_block_size = max_block_size; + proc->num_workers = num_workers; + proc->max_backlog = max_backlog; + proc->devblksz = devblksz; + proc->cmp = cmp; + proc->file = file; + proc->max_blocks = INIT_BLOCK_COUNT; + proc->frag_list_max = INIT_BLOCK_COUNT; + + proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks); + if (proc->blocks == NULL) + return -1; + + proc->frag_list = alloc_array(sizeof(proc->frag_list[0]), + proc->frag_list_max); + if (proc->frag_list == NULL) + return -1; + + return 0; +} + +void block_processor_cleanup(sqfs_block_processor_t *proc) +{ + free_blk_list(proc->queue); + free_blk_list(proc->done); + free(proc->frag_block); + free(proc->frag_list); + free(proc->fragments); + free(proc->blocks); + free(proc); +} + +void block_processor_store_done(sqfs_block_processor_t *proc, + sqfs_block_t *blk, int status) +{ + sqfs_block_t *it = proc->done, *prev = NULL; + + while (it != NULL) { + if (it->sequence_number >= blk->sequence_number) + break; + prev = it; + it = it->next; + } + + if (prev == NULL) { + blk->next = proc->done; + proc->done = blk; + } else { + blk->next = prev->next; + prev->next = blk; + } + + if (status != 0 && proc->status == 0) + proc->status = status; + + proc->backlog -= 1; +} + +sqfs_block_t *block_processor_next_work_item(sqfs_block_processor_t *proc) +{ + sqfs_block_t *blk; + + if (proc->status != 0) + return NULL; + + blk = proc->queue; + proc->queue = blk->next; + blk->next = NULL; + + if (proc->queue == NULL) + proc->queue_last = NULL; + + return blk; +} + +int block_processor_do_block(sqfs_block_t *block, sqfs_compressor_t *cmp, + uint8_t *scratch, size_t scratch_size) +{ + ssize_t ret; + + if (block->size == 0) { + block->checksum = 0; + return 0; + } + + block->checksum = crc32(0, block->data, block->size); + + if (block->flags & SQFS_BLK_IS_FRAGMENT) + return 0; + + if (!(block->flags & SQFS_BLK_DONT_COMPRESS)) { + ret = cmp->do_block(cmp, block->data, block->size, + scratch, scratch_size); + if (ret < 0) + return ret; + + if (ret > 0) { + memcpy(block->data, scratch, ret); + block->size = ret; + block->flags |= SQFS_BLK_IS_COMPRESSED; + } + } + + return 0; +} + +int sqfs_block_processor_write_fragment_table(sqfs_block_processor_t *proc, + sqfs_super_t *super) +{ + uint64_t start; + size_t size; + int ret; + + if (proc->num_fragments == 0) { + super->fragment_entry_count = 0; + super->fragment_table_start = 0xFFFFFFFFFFFFFFFFUL; + super->flags &= ~SQFS_FLAG_ALWAYS_FRAGMENTS; + super->flags |= SQFS_FLAG_NO_FRAGMENTS; + return 0; + } + + size = sizeof(proc->fragments[0]) * proc->num_fragments; + ret = sqfs_write_table(proc->file, proc->cmp, + proc->fragments, size, &start); + if (ret) + return ret; + + super->flags &= ~SQFS_FLAG_NO_FRAGMENTS; + super->flags |= SQFS_FLAG_ALWAYS_FRAGMENTS; + super->fragment_entry_count = proc->num_fragments; + super->fragment_table_start = start; + return 0; +} diff --git a/lib/sqfs/blk_proc/fragtbl.c b/lib/sqfs/blk_proc/fragtbl.c index 84f5ed6..c8a3397 100644 --- a/lib/sqfs/blk_proc/fragtbl.c +++ b/lib/sqfs/blk_proc/fragtbl.c @@ -7,34 +7,6 @@ #define SQFS_BUILDING_DLL #include "internal.h" -int sqfs_block_processor_write_fragment_table(sqfs_block_processor_t *proc, - sqfs_super_t *super) -{ - uint64_t start; - size_t size; - int ret; - - if (proc->num_fragments == 0) { - super->fragment_entry_count = 0; - super->fragment_table_start = 0xFFFFFFFFFFFFFFFFUL; - super->flags &= ~SQFS_FLAG_ALWAYS_FRAGMENTS; - super->flags |= SQFS_FLAG_NO_FRAGMENTS; - return 0; - } - - size = sizeof(proc->fragments[0]) * proc->num_fragments; - ret = sqfs_write_table(proc->file, proc->cmp, - proc->fragments, size, &start); - if (ret) - return ret; - - super->flags &= ~SQFS_FLAG_NO_FRAGMENTS; - super->flags |= SQFS_FLAG_ALWAYS_FRAGMENTS; - super->fragment_entry_count = proc->num_fragments; - super->fragment_table_start = start; - return 0; -} - static int grow_fragment_table(sqfs_block_processor_t *proc) { size_t newsz; diff --git a/lib/sqfs/blk_proc/internal.h b/lib/sqfs/blk_proc/internal.h index 2e9980b..5fdbc3e 100644 --- a/lib/sqfs/blk_proc/internal.h +++ b/lib/sqfs/blk_proc/internal.h @@ -64,7 +64,6 @@ struct sqfs_block_processor_t { sqfs_block_t *queue_last; sqfs_block_t *done; - bool terminate; size_t backlog; int status; @@ -105,10 +104,6 @@ struct sqfs_block_processor_t { #endif }; -SQFS_INTERNAL -int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp, - uint8_t *scratch, size_t scratch_size); - SQFS_INTERNAL int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *block); @@ -123,4 +118,25 @@ SQFS_INTERNAL int store_block_location(sqfs_block_processor_t *proc, uint64_t offset, uint32_t size, uint32_t chksum); +SQFS_INTERNAL void free_blk_list(sqfs_block_t *list); + +SQFS_INTERNAL +int block_processor_init(sqfs_block_processor_t *proc, size_t max_block_size, + sqfs_compressor_t *cmp, unsigned int num_workers, + size_t max_backlog, size_t devblksz, + sqfs_file_t *file); + +SQFS_INTERNAL void block_processor_cleanup(sqfs_block_processor_t *proc); + +SQFS_INTERNAL +void block_processor_store_done(sqfs_block_processor_t *proc, + sqfs_block_t *blk, int status); + +SQFS_INTERNAL +sqfs_block_t *block_processor_next_work_item(sqfs_block_processor_t *proc); + +SQFS_INTERNAL +int block_processor_do_block(sqfs_block_t *block, sqfs_compressor_t *cmp, + uint8_t *scratch, size_t scratch_size); + #endif /* INTERNAL_H */ diff --git a/lib/sqfs/blk_proc/process_block.c b/lib/sqfs/blk_proc/process_block.c index f154a14..2951406 100644 --- a/lib/sqfs/blk_proc/process_block.c +++ b/lib/sqfs/blk_proc/process_block.c @@ -9,37 +9,6 @@ #include -int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp, - uint8_t *scratch, size_t scratch_size) -{ - ssize_t ret; - - if (block->size == 0) { - block->checksum = 0; - return 0; - } - - block->checksum = crc32(0, block->data, block->size); - - if (block->flags & SQFS_BLK_IS_FRAGMENT) - return 0; - - if (!(block->flags & SQFS_BLK_DONT_COMPRESS)) { - ret = cmp->do_block(cmp, block->data, block->size, - scratch, scratch_size); - if (ret < 0) - return ret; - - if (ret > 0) { - memcpy(block->data, scratch, ret); - block->size = ret; - block->flags |= SQFS_BLK_IS_COMPRESSED; - } - } - - return 0; -} - static int allign_file(sqfs_block_processor_t *proc, sqfs_block_t *blk) { if (!(blk->flags & SQFS_BLK_ALLIGN)) diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c index 1c1088a..57b61c1 100644 --- a/lib/sqfs/blk_proc/pthread.c +++ b/lib/sqfs/blk_proc/pthread.c @@ -7,65 +7,6 @@ #define SQFS_BUILDING_DLL #include "internal.h" -static void free_blk_list(sqfs_block_t *list) -{ - sqfs_block_t *it; - - while (list != NULL) { - it = list; - list = list->next; - free(it); - } -} - -static void store_completed_block(sqfs_block_processor_t *proc, - sqfs_block_t *blk, int status) -{ - sqfs_block_t *it = proc->done, *prev = NULL; - - while (it != NULL) { - if (it->sequence_number >= blk->sequence_number) - break; - prev = it; - it = it->next; - } - - if (prev == NULL) { - blk->next = proc->done; - proc->done = blk; - } else { - blk->next = prev->next; - prev->next = blk; - } - - if (status != 0 && proc->status == 0) - proc->status = status; - - proc->backlog -= 1; - pthread_cond_broadcast(&proc->done_cond); -} - -static sqfs_block_t *get_next_work_item(sqfs_block_processor_t *proc) -{ - sqfs_block_t *blk; - - while (proc->queue == NULL) { - if (proc->terminate || proc->status != 0) - return NULL; - - pthread_cond_wait(&proc->queue_cond, &proc->mtx); - } - - blk = proc->queue; - proc->queue = blk->next; - blk->next = NULL; - - if (proc->queue == NULL) - proc->queue_last = NULL; - - return blk; -} - static void *worker_proc(void *arg) { compress_worker_t *worker = arg; @@ -75,17 +16,23 @@ static void *worker_proc(void *arg) for (;;) { pthread_mutex_lock(&shared->mtx); - if (blk != NULL) - store_completed_block(shared, blk, status); + if (blk != NULL) { + block_processor_store_done(shared, blk, status); + pthread_cond_broadcast(&shared->done_cond); + } + + while (shared->queue == NULL && shared->status == 0) + pthread_cond_wait(&shared->queue_cond, &shared->mtx); - blk = get_next_work_item(shared); + blk = block_processor_next_work_item(shared); pthread_mutex_unlock(&shared->mtx); if (blk == NULL) break; - status = sqfs_block_process(blk, worker->cmp, worker->scratch, - shared->max_block_size); + status = block_processor_do_block(blk, worker->cmp, + worker->scratch, + shared->max_block_size); } return NULL; } @@ -109,26 +56,14 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, if (proc == NULL) return NULL; - proc->max_block_size = max_block_size; - proc->num_workers = num_workers; - proc->max_backlog = max_backlog; proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; - proc->devblksz = devblksz; - proc->cmp = cmp; - proc->file = file; - proc->max_blocks = INIT_BLOCK_COUNT; - proc->frag_list_max = INIT_BLOCK_COUNT; - - proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks); - if (proc->blocks == NULL) - goto fail_init; - proc->frag_list = alloc_array(sizeof(proc->frag_list[0]), - proc->frag_list_max); - if (proc->frag_list == NULL) + if (block_processor_init(proc, max_block_size, cmp, num_workers, + max_backlog, devblksz, file)) { goto fail_init; + } for (i = 0; i < num_workers; ++i) { proc->workers[i] = alloc_flex(sizeof(compress_worker_t), @@ -155,7 +90,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, return proc; fail_thread: pthread_mutex_lock(&proc->mtx); - proc->terminate = true; + proc->status = -1; pthread_cond_broadcast(&proc->queue_cond); pthread_mutex_unlock(&proc->mtx); @@ -178,10 +113,7 @@ fail_init: pthread_cond_destroy(&proc->done_cond); pthread_cond_destroy(&proc->queue_cond); pthread_mutex_destroy(&proc->mtx); - free(proc->frag_list); - free(proc->fragments); - free(proc->blocks); - free(proc); + block_processor_cleanup(proc); return NULL; } @@ -190,7 +122,7 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) unsigned int i; pthread_mutex_lock(&proc->mtx); - proc->terminate = true; + proc->status = -1; pthread_cond_broadcast(&proc->queue_cond); pthread_mutex_unlock(&proc->mtx); @@ -205,13 +137,7 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) pthread_cond_destroy(&proc->queue_cond); pthread_mutex_destroy(&proc->mtx); - free_blk_list(proc->queue); - free_blk_list(proc->done); - free(proc->frag_block); - free(proc->frag_list); - free(proc->fragments); - free(proc->blocks); - free(proc); + block_processor_cleanup(proc); } static void append_to_work_queue(sqfs_block_processor_t *proc, diff --git a/lib/sqfs/blk_proc/serial.c b/lib/sqfs/blk_proc/serial.c index c7ec366..67272a3 100644 --- a/lib/sqfs/blk_proc/serial.c +++ b/lib/sqfs/blk_proc/serial.c @@ -15,32 +15,15 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, sqfs_file_t *file) { sqfs_block_processor_t *proc; - (void)num_workers; - (void)max_backlog; proc = alloc_flex(sizeof(*proc), 1, max_block_size); if (proc == NULL) return NULL; - proc->max_block_size = max_block_size; - proc->cmp = cmp; - proc->devblksz = devblksz; - proc->file = file; - proc->max_blocks = INIT_BLOCK_COUNT; - proc->frag_list_max = INIT_BLOCK_COUNT; - - proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks); - if (proc->blocks == NULL) { - free(proc); - return NULL; - } - - proc->frag_list = alloc_array(sizeof(proc->frag_list[0]), - proc->frag_list_max); - if (proc->frag_list == NULL) { - free(proc->blocks); - free(proc); + if (block_processor_init(proc, max_block_size, cmp, num_workers, + max_backlog, devblksz, file)) { + block_processor_cleanup(proc); return NULL; } @@ -49,11 +32,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size, void sqfs_block_processor_destroy(sqfs_block_processor_t *proc) { - free(proc->frag_block); - free(proc->frag_list); - free(proc->fragments); - free(proc->blocks); - free(proc); + block_processor_cleanup(proc); } int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, @@ -89,8 +68,9 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc, block = fragblk; } - proc->status = sqfs_block_process(block, proc->cmp, proc->scratch, - proc->max_block_size); + proc->status = block_processor_do_block(block, proc->cmp, + proc->scratch, + proc->max_block_size); if (proc->status == 0) proc->status = process_completed_block(proc, block); @@ -104,8 +84,9 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc) if (proc->status != 0 || proc->frag_block == NULL) return proc->status; - proc->status = sqfs_block_process(proc->frag_block, proc->cmp, - proc->scratch, proc->max_block_size); + proc->status = block_processor_do_block(proc->frag_block, proc->cmp, + proc->scratch, + proc->max_block_size); if (proc->status == 0) proc->status = process_completed_block(proc, proc->frag_block); -- cgit v1.2.3