aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-09-25 06:03:36 +0200
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-09-25 06:35:01 +0200
commit3da78a56df43360520f8007bdb4e11fa25f712cc (patch)
treef1dfd2ca166909ac3a9c37d8ec5196b11a548bb0
parentb6400d20cc64afff22d2805c58dc04f2234d38a5 (diff)
Refactor out common code of the block processors
Code that already is shared between the pthread and the serial processor as well as code that can be re-used by other threading API implementations. Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r--lib/sqfs/Makemodule.am1
-rw-r--r--lib/sqfs/blk_proc/common.c158
-rw-r--r--lib/sqfs/blk_proc/fragtbl.c28
-rw-r--r--lib/sqfs/blk_proc/internal.h26
-rw-r--r--lib/sqfs/blk_proc/process_block.c31
-rw-r--r--lib/sqfs/blk_proc/pthread.c110
-rw-r--r--lib/sqfs/blk_proc/serial.c39
7 files changed, 208 insertions, 185 deletions
diff --git a/lib/sqfs/Makemodule.am b/lib/sqfs/Makemodule.am
index 28f5a93..7a02fa4 100644
--- a/lib/sqfs/Makemodule.am
+++ b/lib/sqfs/Makemodule.am
@@ -21,6 +21,7 @@ libsquashfs_la_SOURCES += lib/sqfs/inode.c lib/sqfs/blk_proc/fragtbl.c
libsquashfs_la_SOURCES += lib/sqfs/blk_proc/process_block.c lib/sqfs/io.c
libsquashfs_la_SOURCES += lib/sqfs/blk_proc/internal.h lib/sqfs/data_reader.c
libsquashfs_la_SOURCES += lib/sqfs/blk_proc/deduplicate.c
+libsquashfs_la_SOURCES += lib/sqfs/blk_proc/common.c
libsquashfs_la_CPPFLAGS = $(AM_CPPFLAGS)
libsquashfs_la_CFLAGS = $(AM_CFLAGS) $(PTHREAD_CFLAGS) $(ZLIB_CFLAGS)
libsquashfs_la_CFLAGS += $(XZ_CFLAGS) $(LZO_CFLAGS) $(LZ4_CFLAGS)
diff --git a/lib/sqfs/blk_proc/common.c b/lib/sqfs/blk_proc/common.c
new file mode 100644
index 0000000..a3c91eb
--- /dev/null
+++ b/lib/sqfs/blk_proc/common.c
@@ -0,0 +1,158 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * common.c
+ *
+ * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at>
+ */
+#define SQFS_BUILDING_DLL
+#include "internal.h"
+
+void free_blk_list(sqfs_block_t *list)
+{
+ sqfs_block_t *it;
+
+ while (list != NULL) {
+ it = list;
+ list = list->next;
+ free(it);
+ }
+}
+
+int block_processor_init(sqfs_block_processor_t *proc, size_t max_block_size,
+ sqfs_compressor_t *cmp, unsigned int num_workers,
+ size_t max_backlog, size_t devblksz,
+ sqfs_file_t *file)
+{
+ proc->max_block_size = max_block_size;
+ proc->num_workers = num_workers;
+ proc->max_backlog = max_backlog;
+ proc->devblksz = devblksz;
+ proc->cmp = cmp;
+ proc->file = file;
+ proc->max_blocks = INIT_BLOCK_COUNT;
+ proc->frag_list_max = INIT_BLOCK_COUNT;
+
+ proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks);
+ if (proc->blocks == NULL)
+ return -1;
+
+ proc->frag_list = alloc_array(sizeof(proc->frag_list[0]),
+ proc->frag_list_max);
+ if (proc->frag_list == NULL)
+ return -1;
+
+ return 0;
+}
+
+void block_processor_cleanup(sqfs_block_processor_t *proc)
+{
+ free_blk_list(proc->queue);
+ free_blk_list(proc->done);
+ free(proc->frag_block);
+ free(proc->frag_list);
+ free(proc->fragments);
+ free(proc->blocks);
+ free(proc);
+}
+
+void block_processor_store_done(sqfs_block_processor_t *proc,
+ sqfs_block_t *blk, int status)
+{
+ sqfs_block_t *it = proc->done, *prev = NULL;
+
+ while (it != NULL) {
+ if (it->sequence_number >= blk->sequence_number)
+ break;
+ prev = it;
+ it = it->next;
+ }
+
+ if (prev == NULL) {
+ blk->next = proc->done;
+ proc->done = blk;
+ } else {
+ blk->next = prev->next;
+ prev->next = blk;
+ }
+
+ if (status != 0 && proc->status == 0)
+ proc->status = status;
+
+ proc->backlog -= 1;
+}
+
+sqfs_block_t *block_processor_next_work_item(sqfs_block_processor_t *proc)
+{
+ sqfs_block_t *blk;
+
+ if (proc->status != 0)
+ return NULL;
+
+ blk = proc->queue;
+ proc->queue = blk->next;
+ blk->next = NULL;
+
+ if (proc->queue == NULL)
+ proc->queue_last = NULL;
+
+ return blk;
+}
+
+int block_processor_do_block(sqfs_block_t *block, sqfs_compressor_t *cmp,
+ uint8_t *scratch, size_t scratch_size)
+{
+ ssize_t ret;
+
+ if (block->size == 0) {
+ block->checksum = 0;
+ return 0;
+ }
+
+ block->checksum = crc32(0, block->data, block->size);
+
+ if (block->flags & SQFS_BLK_IS_FRAGMENT)
+ return 0;
+
+ if (!(block->flags & SQFS_BLK_DONT_COMPRESS)) {
+ ret = cmp->do_block(cmp, block->data, block->size,
+ scratch, scratch_size);
+ if (ret < 0)
+ return ret;
+
+ if (ret > 0) {
+ memcpy(block->data, scratch, ret);
+ block->size = ret;
+ block->flags |= SQFS_BLK_IS_COMPRESSED;
+ }
+ }
+
+ return 0;
+}
+
+int sqfs_block_processor_write_fragment_table(sqfs_block_processor_t *proc,
+ sqfs_super_t *super)
+{
+ uint64_t start;
+ size_t size;
+ int ret;
+
+ if (proc->num_fragments == 0) {
+ super->fragment_entry_count = 0;
+ super->fragment_table_start = 0xFFFFFFFFFFFFFFFFUL;
+ super->flags &= ~SQFS_FLAG_ALWAYS_FRAGMENTS;
+ super->flags |= SQFS_FLAG_NO_FRAGMENTS;
+ return 0;
+ }
+
+ size = sizeof(proc->fragments[0]) * proc->num_fragments;
+ ret = sqfs_write_table(proc->file, proc->cmp,
+ proc->fragments, size, &start);
+ if (ret)
+ return ret;
+
+ super->flags &= ~SQFS_FLAG_NO_FRAGMENTS;
+ super->flags |= SQFS_FLAG_ALWAYS_FRAGMENTS;
+ super->fragment_entry_count = proc->num_fragments;
+ super->fragment_table_start = start;
+ return 0;
+}
diff --git a/lib/sqfs/blk_proc/fragtbl.c b/lib/sqfs/blk_proc/fragtbl.c
index 84f5ed6..c8a3397 100644
--- a/lib/sqfs/blk_proc/fragtbl.c
+++ b/lib/sqfs/blk_proc/fragtbl.c
@@ -7,34 +7,6 @@
#define SQFS_BUILDING_DLL
#include "internal.h"
-int sqfs_block_processor_write_fragment_table(sqfs_block_processor_t *proc,
- sqfs_super_t *super)
-{
- uint64_t start;
- size_t size;
- int ret;
-
- if (proc->num_fragments == 0) {
- super->fragment_entry_count = 0;
- super->fragment_table_start = 0xFFFFFFFFFFFFFFFFUL;
- super->flags &= ~SQFS_FLAG_ALWAYS_FRAGMENTS;
- super->flags |= SQFS_FLAG_NO_FRAGMENTS;
- return 0;
- }
-
- size = sizeof(proc->fragments[0]) * proc->num_fragments;
- ret = sqfs_write_table(proc->file, proc->cmp,
- proc->fragments, size, &start);
- if (ret)
- return ret;
-
- super->flags &= ~SQFS_FLAG_NO_FRAGMENTS;
- super->flags |= SQFS_FLAG_ALWAYS_FRAGMENTS;
- super->fragment_entry_count = proc->num_fragments;
- super->fragment_table_start = start;
- return 0;
-}
-
static int grow_fragment_table(sqfs_block_processor_t *proc)
{
size_t newsz;
diff --git a/lib/sqfs/blk_proc/internal.h b/lib/sqfs/blk_proc/internal.h
index 2e9980b..5fdbc3e 100644
--- a/lib/sqfs/blk_proc/internal.h
+++ b/lib/sqfs/blk_proc/internal.h
@@ -64,7 +64,6 @@ struct sqfs_block_processor_t {
sqfs_block_t *queue_last;
sqfs_block_t *done;
- bool terminate;
size_t backlog;
int status;
@@ -105,10 +104,6 @@ struct sqfs_block_processor_t {
#endif
};
-SQFS_INTERNAL
-int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp,
- uint8_t *scratch, size_t scratch_size);
-
SQFS_INTERNAL int process_completed_block(sqfs_block_processor_t *proc,
sqfs_block_t *block);
@@ -123,4 +118,25 @@ SQFS_INTERNAL int store_block_location(sqfs_block_processor_t *proc,
uint64_t offset, uint32_t size,
uint32_t chksum);
+SQFS_INTERNAL void free_blk_list(sqfs_block_t *list);
+
+SQFS_INTERNAL
+int block_processor_init(sqfs_block_processor_t *proc, size_t max_block_size,
+ sqfs_compressor_t *cmp, unsigned int num_workers,
+ size_t max_backlog, size_t devblksz,
+ sqfs_file_t *file);
+
+SQFS_INTERNAL void block_processor_cleanup(sqfs_block_processor_t *proc);
+
+SQFS_INTERNAL
+void block_processor_store_done(sqfs_block_processor_t *proc,
+ sqfs_block_t *blk, int status);
+
+SQFS_INTERNAL
+sqfs_block_t *block_processor_next_work_item(sqfs_block_processor_t *proc);
+
+SQFS_INTERNAL
+int block_processor_do_block(sqfs_block_t *block, sqfs_compressor_t *cmp,
+ uint8_t *scratch, size_t scratch_size);
+
#endif /* INTERNAL_H */
diff --git a/lib/sqfs/blk_proc/process_block.c b/lib/sqfs/blk_proc/process_block.c
index f154a14..2951406 100644
--- a/lib/sqfs/blk_proc/process_block.c
+++ b/lib/sqfs/blk_proc/process_block.c
@@ -9,37 +9,6 @@
#include <string.h>
-int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp,
- uint8_t *scratch, size_t scratch_size)
-{
- ssize_t ret;
-
- if (block->size == 0) {
- block->checksum = 0;
- return 0;
- }
-
- block->checksum = crc32(0, block->data, block->size);
-
- if (block->flags & SQFS_BLK_IS_FRAGMENT)
- return 0;
-
- if (!(block->flags & SQFS_BLK_DONT_COMPRESS)) {
- ret = cmp->do_block(cmp, block->data, block->size,
- scratch, scratch_size);
- if (ret < 0)
- return ret;
-
- if (ret > 0) {
- memcpy(block->data, scratch, ret);
- block->size = ret;
- block->flags |= SQFS_BLK_IS_COMPRESSED;
- }
- }
-
- return 0;
-}
-
static int allign_file(sqfs_block_processor_t *proc, sqfs_block_t *blk)
{
if (!(blk->flags & SQFS_BLK_ALLIGN))
diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c
index 1c1088a..57b61c1 100644
--- a/lib/sqfs/blk_proc/pthread.c
+++ b/lib/sqfs/blk_proc/pthread.c
@@ -7,65 +7,6 @@
#define SQFS_BUILDING_DLL
#include "internal.h"
-static void free_blk_list(sqfs_block_t *list)
-{
- sqfs_block_t *it;
-
- while (list != NULL) {
- it = list;
- list = list->next;
- free(it);
- }
-}
-
-static void store_completed_block(sqfs_block_processor_t *proc,
- sqfs_block_t *blk, int status)
-{
- sqfs_block_t *it = proc->done, *prev = NULL;
-
- while (it != NULL) {
- if (it->sequence_number >= blk->sequence_number)
- break;
- prev = it;
- it = it->next;
- }
-
- if (prev == NULL) {
- blk->next = proc->done;
- proc->done = blk;
- } else {
- blk->next = prev->next;
- prev->next = blk;
- }
-
- if (status != 0 && proc->status == 0)
- proc->status = status;
-
- proc->backlog -= 1;
- pthread_cond_broadcast(&proc->done_cond);
-}
-
-static sqfs_block_t *get_next_work_item(sqfs_block_processor_t *proc)
-{
- sqfs_block_t *blk;
-
- while (proc->queue == NULL) {
- if (proc->terminate || proc->status != 0)
- return NULL;
-
- pthread_cond_wait(&proc->queue_cond, &proc->mtx);
- }
-
- blk = proc->queue;
- proc->queue = blk->next;
- blk->next = NULL;
-
- if (proc->queue == NULL)
- proc->queue_last = NULL;
-
- return blk;
-}
-
static void *worker_proc(void *arg)
{
compress_worker_t *worker = arg;
@@ -75,17 +16,23 @@ static void *worker_proc(void *arg)
for (;;) {
pthread_mutex_lock(&shared->mtx);
- if (blk != NULL)
- store_completed_block(shared, blk, status);
+ if (blk != NULL) {
+ block_processor_store_done(shared, blk, status);
+ pthread_cond_broadcast(&shared->done_cond);
+ }
+
+ while (shared->queue == NULL && shared->status == 0)
+ pthread_cond_wait(&shared->queue_cond, &shared->mtx);
- blk = get_next_work_item(shared);
+ blk = block_processor_next_work_item(shared);
pthread_mutex_unlock(&shared->mtx);
if (blk == NULL)
break;
- status = sqfs_block_process(blk, worker->cmp, worker->scratch,
- shared->max_block_size);
+ status = block_processor_do_block(blk, worker->cmp,
+ worker->scratch,
+ shared->max_block_size);
}
return NULL;
}
@@ -109,26 +56,14 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
if (proc == NULL)
return NULL;
- proc->max_block_size = max_block_size;
- proc->num_workers = num_workers;
- proc->max_backlog = max_backlog;
proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
- proc->devblksz = devblksz;
- proc->cmp = cmp;
- proc->file = file;
- proc->max_blocks = INIT_BLOCK_COUNT;
- proc->frag_list_max = INIT_BLOCK_COUNT;
-
- proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks);
- if (proc->blocks == NULL)
- goto fail_init;
- proc->frag_list = alloc_array(sizeof(proc->frag_list[0]),
- proc->frag_list_max);
- if (proc->frag_list == NULL)
+ if (block_processor_init(proc, max_block_size, cmp, num_workers,
+ max_backlog, devblksz, file)) {
goto fail_init;
+ }
for (i = 0; i < num_workers; ++i) {
proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
@@ -155,7 +90,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
return proc;
fail_thread:
pthread_mutex_lock(&proc->mtx);
- proc->terminate = true;
+ proc->status = -1;
pthread_cond_broadcast(&proc->queue_cond);
pthread_mutex_unlock(&proc->mtx);
@@ -178,10 +113,7 @@ fail_init:
pthread_cond_destroy(&proc->done_cond);
pthread_cond_destroy(&proc->queue_cond);
pthread_mutex_destroy(&proc->mtx);
- free(proc->frag_list);
- free(proc->fragments);
- free(proc->blocks);
- free(proc);
+ block_processor_cleanup(proc);
return NULL;
}
@@ -190,7 +122,7 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc)
unsigned int i;
pthread_mutex_lock(&proc->mtx);
- proc->terminate = true;
+ proc->status = -1;
pthread_cond_broadcast(&proc->queue_cond);
pthread_mutex_unlock(&proc->mtx);
@@ -205,13 +137,7 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc)
pthread_cond_destroy(&proc->queue_cond);
pthread_mutex_destroy(&proc->mtx);
- free_blk_list(proc->queue);
- free_blk_list(proc->done);
- free(proc->frag_block);
- free(proc->frag_list);
- free(proc->fragments);
- free(proc->blocks);
- free(proc);
+ block_processor_cleanup(proc);
}
static void append_to_work_queue(sqfs_block_processor_t *proc,
diff --git a/lib/sqfs/blk_proc/serial.c b/lib/sqfs/blk_proc/serial.c
index c7ec366..67272a3 100644
--- a/lib/sqfs/blk_proc/serial.c
+++ b/lib/sqfs/blk_proc/serial.c
@@ -15,32 +15,15 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
sqfs_file_t *file)
{
sqfs_block_processor_t *proc;
- (void)num_workers;
- (void)max_backlog;
proc = alloc_flex(sizeof(*proc), 1, max_block_size);
if (proc == NULL)
return NULL;
- proc->max_block_size = max_block_size;
- proc->cmp = cmp;
- proc->devblksz = devblksz;
- proc->file = file;
- proc->max_blocks = INIT_BLOCK_COUNT;
- proc->frag_list_max = INIT_BLOCK_COUNT;
-
- proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks);
- if (proc->blocks == NULL) {
- free(proc);
- return NULL;
- }
-
- proc->frag_list = alloc_array(sizeof(proc->frag_list[0]),
- proc->frag_list_max);
- if (proc->frag_list == NULL) {
- free(proc->blocks);
- free(proc);
+ if (block_processor_init(proc, max_block_size, cmp, num_workers,
+ max_backlog, devblksz, file)) {
+ block_processor_cleanup(proc);
return NULL;
}
@@ -49,11 +32,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
void sqfs_block_processor_destroy(sqfs_block_processor_t *proc)
{
- free(proc->frag_block);
- free(proc->frag_list);
- free(proc->fragments);
- free(proc->blocks);
- free(proc);
+ block_processor_cleanup(proc);
}
int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
@@ -89,8 +68,9 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
block = fragblk;
}
- proc->status = sqfs_block_process(block, proc->cmp, proc->scratch,
- proc->max_block_size);
+ proc->status = block_processor_do_block(block, proc->cmp,
+ proc->scratch,
+ proc->max_block_size);
if (proc->status == 0)
proc->status = process_completed_block(proc, block);
@@ -104,8 +84,9 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
if (proc->status != 0 || proc->frag_block == NULL)
return proc->status;
- proc->status = sqfs_block_process(proc->frag_block, proc->cmp,
- proc->scratch, proc->max_block_size);
+ proc->status = block_processor_do_block(proc->frag_block, proc->cmp,
+ proc->scratch,
+ proc->max_block_size);
if (proc->status == 0)
proc->status = process_completed_block(proc, proc->frag_block);