summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-09-23 15:28:25 +0200
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-09-23 15:35:36 +0200
commitb86cbb04c9cc72506783e175e871338b1f0e6750 (patch)
treedcb4f92b6962aa35cacb2db09219fd4dfbaf3d60
parent8ee4a30a0f3545dac5b9a93b40c172f4fb4d44a1 (diff)
Move the bulk of the work from the data writer to the block processor
Instead of calling a callback, the block processor now takes care of writing the data blocks to the file in the correct order, keeping track of fragment blocks and deduplicating blocks. Some cleanup work remains to be done and the statistics have to be repaired (yet again). Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r--include/sqfs/block_processor.h46
-rw-r--r--lib/sqfs/Makemodule.am2
-rw-r--r--lib/sqfs/blk_proc/fragtbl.c36
-rw-r--r--lib/sqfs/blk_proc/internal.h37
-rw-r--r--lib/sqfs/blk_proc/process_block.c151
-rw-r--r--lib/sqfs/blk_proc/pthread.c14
-rw-r--r--lib/sqfs/blk_proc/serial.c16
-rw-r--r--lib/sqfshelper/data_writer.c217
8 files changed, 275 insertions, 244 deletions
diff --git a/include/sqfs/block_processor.h b/include/sqfs/block_processor.h
index 5e55908..562c4d2 100644
--- a/include/sqfs/block_processor.h
+++ b/include/sqfs/block_processor.h
@@ -151,23 +151,6 @@ struct sqfs_block_t {
uint8_t data[];
};
-/**
- * @brief Signature of a callback function that can is called for each block.
- *
- * Gets called for each processed block. May be called from a different thread
- * than the one that calls enqueue or from the same thread, but only from one
- * thread at a time.
- *
- * Guaranteed to be called on blocks in the order that they are submitted
- * to enqueue.
- *
- * @param user The user pointer passed to @ref sqfs_block_processor_create.
- * @param blk The finished block.
- *
- * @return A non-zero return value is interpreted as fatal error.
- */
-typedef int (*sqfs_block_cb)(void *user, sqfs_block_t *blk);
-
#ifdef __cplusplus
extern "C" {
#endif
@@ -186,8 +169,9 @@ extern "C" {
* @param max_backlog The maximum number of blocks currently in flight. When
* trying to add more, enqueueing blocks until the in-flight
* block count drops below the threshold.
- * @param user An arbitrary user pointer to pass to the block callback.
- * @param callback A function to call for each finished data block.
+ * @param devblksz The device block size to allign files to if they have the
+ * apropriate flag set.
+ * @param file The output file to write the finished blocks to.
*
* @return A pointer to a block processor object on success, NULL on allocation
* failure or on failure to create the worker threads and
@@ -198,8 +182,8 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
sqfs_compressor_t *cmp,
unsigned int num_workers,
size_t max_backlog,
- void *user,
- sqfs_block_cb callback);
+ size_t devblksz,
+ sqfs_file_t *file);
/**
* @brief Destroy a block processor and free all memory used by it.
@@ -244,6 +228,26 @@ SQFS_API int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
*/
SQFS_API int sqfs_block_processor_finish(sqfs_block_processor_t *proc);
+/**
+ * @brief Write the completed fragment table to disk.
+ *
+ * @memberof sqfs_block_processor_t
+ *
+ * Call this after producing the inode and directory table to generate
+ * the fragment table for the squashfs image.
+ *
+ * @param proc A pointer to a block processor object.
+ * @param super A pointer to a super block to write information about the
+ * fragment table to.
+ *
+ * @return Zero on success, an @ref E_SQFS_ERROR value on failure. The failure
+ * return value can either be an error encountered during enqueueing,
+ * processing or a failure return status from the block callback.
+ */
+SQFS_API
+int sqfs_block_processor_write_fragment_table(sqfs_block_processor_t *proc,
+ sqfs_super_t *super);
+
#ifdef __cplusplus
}
#endif
diff --git a/lib/sqfs/Makemodule.am b/lib/sqfs/Makemodule.am
index 44b5531..90c83cc 100644
--- a/lib/sqfs/Makemodule.am
+++ b/lib/sqfs/Makemodule.am
@@ -17,7 +17,7 @@ libsquashfs_la_SOURCES += lib/sqfs/dir_writer.c lib/sqfs/xattr_reader.c
libsquashfs_la_SOURCES += lib/sqfs/read_table.c lib/sqfs/comp/compressor.c
libsquashfs_la_SOURCES += lib/sqfs/io_stdin.c lib/sqfs/comp/internal.h
libsquashfs_la_SOURCES += lib/sqfs/dir_reader.c lib/sqfs/read_tree.c
-libsquashfs_la_SOURCES += lib/sqfs/inode.c
+libsquashfs_la_SOURCES += lib/sqfs/inode.c lib/sqfs/blk_proc/fragtbl.c
libsquashfs_la_SOURCES += lib/sqfs/blk_proc/process_block.c lib/sqfs/io.c
libsquashfs_la_SOURCES += lib/sqfs/blk_proc/internal.h lib/sqfs/data_reader.c
libsquashfs_la_CPPFLAGS = $(AM_CPPFLAGS)
diff --git a/lib/sqfs/blk_proc/fragtbl.c b/lib/sqfs/blk_proc/fragtbl.c
new file mode 100644
index 0000000..81baf09
--- /dev/null
+++ b/lib/sqfs/blk_proc/fragtbl.c
@@ -0,0 +1,36 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * fragtbl.c
+ *
+ * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at>
+ */
+#define SQFS_BUILDING_DLL
+#include "internal.h"
+
+int sqfs_block_processor_write_fragment_table(sqfs_block_processor_t *proc,
+ sqfs_super_t *super)
+{
+ uint64_t start;
+ size_t size;
+ int ret;
+
+ if (proc->num_fragments == 0) {
+ super->fragment_entry_count = 0;
+ super->fragment_table_start = 0xFFFFFFFFFFFFFFFFUL;
+ super->flags &= ~SQFS_FLAG_ALWAYS_FRAGMENTS;
+ super->flags |= SQFS_FLAG_NO_FRAGMENTS;
+ return 0;
+ }
+
+ size = sizeof(proc->fragments[0]) * proc->num_fragments;
+ ret = sqfs_write_table(proc->file, proc->cmp,
+ proc->fragments, size, &start);
+ if (ret)
+ return ret;
+
+ super->flags &= ~SQFS_FLAG_NO_FRAGMENTS;
+ super->flags |= SQFS_FLAG_ALWAYS_FRAGMENTS;
+ super->fragment_entry_count = proc->num_fragments;
+ super->fragment_table_start = start;
+ return 0;
+}
diff --git a/lib/sqfs/blk_proc/internal.h b/lib/sqfs/blk_proc/internal.h
index 85c7783..b5af751 100644
--- a/lib/sqfs/blk_proc/internal.h
+++ b/lib/sqfs/blk_proc/internal.h
@@ -6,7 +6,11 @@
#include "sqfs/block_processor.h"
#include "sqfs/compress.h"
+#include "sqfs/inode.h"
+#include "sqfs/table.h"
#include "sqfs/error.h"
+#include "sqfs/data.h"
+#include "sqfs/io.h"
#include "util.h"
#include <string.h>
@@ -16,6 +20,21 @@
#include <pthread.h>
#endif
+
+#define MK_BLK_SIG(chksum, size) \
+ (((uint64_t)(size) << 32) | (uint64_t)(chksum))
+
+#define BLK_SIZE(sig) ((sig) >> 32)
+
+#define INIT_BLOCK_COUNT (128)
+
+
+typedef struct {
+ uint64_t offset;
+ uint64_t signature;
+} blk_info_t;
+
+
#ifdef WITH_PTHREAD
typedef struct {
sqfs_block_processor_t *shared;
@@ -46,18 +65,30 @@ struct sqfs_block_processor_t {
uint32_t dequeue_id;
unsigned int num_workers;
- sqfs_block_cb cb;
- void *user;
int status;
size_t max_backlog;
+ size_t devblksz;
+ sqfs_file_t *file;
+
+ sqfs_fragment_t *fragments;
+ size_t num_fragments;
+ size_t max_fragments;
+
+ uint64_t start;
+
+ size_t file_start;
+ size_t num_blocks;
+ size_t max_blocks;
+ blk_info_t *blocks;
+ sqfs_compressor_t *cmp;
+
/* used only by workers */
size_t max_block_size;
#ifdef WITH_PTHREAD
compress_worker_t *workers[];
#else
- sqfs_compressor_t *cmp;
uint8_t scratch[];
#endif
};
diff --git a/lib/sqfs/blk_proc/process_block.c b/lib/sqfs/blk_proc/process_block.c
index 1b6ee29..0747bc5 100644
--- a/lib/sqfs/blk_proc/process_block.c
+++ b/lib/sqfs/blk_proc/process_block.c
@@ -37,6 +37,155 @@ int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp,
return 0;
}
+static int allign_file(sqfs_block_processor_t *proc, sqfs_block_t *blk)
+{
+ if (!(blk->flags & SQFS_BLK_ALLIGN))
+ return 0;
+
+ return padd_sqfs(proc->file, proc->file->get_size(proc->file),
+ proc->devblksz);
+}
+
+static int store_block_location(sqfs_block_processor_t *proc, uint64_t offset,
+ uint32_t size, uint32_t chksum)
+{
+ size_t new_sz;
+ void *new;
+
+ if (proc->num_blocks == proc->max_blocks) {
+ new_sz = proc->max_blocks * 2;
+ new = realloc(proc->blocks, sizeof(proc->blocks[0]) * new_sz);
+
+ if (new == NULL)
+ return SQFS_ERROR_ALLOC;
+
+ proc->blocks = new;
+ proc->max_blocks = new_sz;
+ }
+
+ proc->blocks[proc->num_blocks].offset = offset;
+ proc->blocks[proc->num_blocks].signature = MK_BLK_SIG(chksum, size);
+ proc->num_blocks += 1;
+ return 0;
+}
+
+static size_t deduplicate_blocks(sqfs_block_processor_t *proc, size_t count)
+{
+ size_t i, j;
+
+ for (i = 0; i < proc->file_start; ++i) {
+ for (j = 0; j < count; ++j) {
+ if (proc->blocks[i + j].signature !=
+ proc->blocks[proc->file_start + j].signature)
+ break;
+ }
+
+ if (j == count)
+ break;
+ }
+
+ return i;
+}
+
+static size_t grow_fragment_table(sqfs_block_processor_t *proc, size_t index)
+{
+ size_t newsz;
+ void *new;
+
+ if (index < proc->max_fragments)
+ return 0;
+
+ do {
+ newsz = proc->max_fragments ? proc->max_fragments * 2 : 16;
+ } while (index >= newsz);
+
+ new = realloc(proc->fragments, sizeof(proc->fragments[0]) * newsz);
+
+ if (new == NULL)
+ return SQFS_ERROR_ALLOC;
+
+ proc->max_fragments = newsz;
+ proc->fragments = new;
+ return 0;
+}
+
+static int handle_block(sqfs_block_processor_t *proc, sqfs_block_t *blk)
+{
+ size_t start, count;
+ uint64_t offset;
+ uint32_t out;
+ int err;
+
+ if (blk->flags & SQFS_BLK_FIRST_BLOCK) {
+ proc->start = proc->file->get_size(proc->file);
+ proc->file_start = proc->num_blocks;
+
+ err = allign_file(proc, blk);
+ if (err)
+ return err;
+ }
+
+ if (blk->size != 0) {
+ out = blk->size;
+ if (!(blk->flags & SQFS_BLK_IS_COMPRESSED))
+ out |= 1 << 24;
+
+ offset = proc->file->get_size(proc->file);
+
+ if (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) {
+ if (grow_fragment_table(proc, blk->index))
+ return 0;
+
+ offset = htole64(offset);
+ proc->fragments[blk->index].start_offset = offset;
+ proc->fragments[blk->index].pad0 = 0;
+ proc->fragments[blk->index].size = htole32(out);
+
+ if (blk->index >= proc->num_fragments)
+ proc->num_fragments = blk->index + 1;
+ } else {
+ blk->inode->block_sizes[blk->index] = out;
+ }
+
+ err = store_block_location(proc, offset, out, blk->checksum);
+ if (err)
+ return err;
+
+ err = proc->file->write_at(proc->file, offset,
+ blk->data, blk->size);
+ if (err)
+ return err;
+ }
+
+ if (blk->flags & SQFS_BLK_LAST_BLOCK) {
+ err = allign_file(proc, blk);
+ if (err)
+ return err;
+
+ count = proc->num_blocks - proc->file_start;
+ start = deduplicate_blocks(proc, count);
+ offset = proc->blocks[start].offset;
+
+ sqfs_inode_set_file_block_start(blk->inode, offset);
+
+ if (start < proc->file_start) {
+ offset = start + count;
+
+ if (offset >= proc->file_start) {
+ proc->num_blocks = offset;
+ } else {
+ proc->num_blocks = proc->file_start;
+ }
+
+ err = proc->file->truncate(proc->file, proc->start);
+ if (err)
+ return err;
+ }
+ }
+
+ return 0;
+}
+
int process_completed_blocks(sqfs_block_processor_t *proc, sqfs_block_t *queue)
{
sqfs_block_t *it;
@@ -48,7 +197,7 @@ int process_completed_blocks(sqfs_block_processor_t *proc, sqfs_block_t *queue)
if (it->flags & SQFS_BLK_COMPRESS_ERROR) {
proc->status = SQFS_ERROR_COMRPESSOR;
} else if (proc->status == 0) {
- proc->status = proc->cb(proc->user, it);
+ proc->status = handle_block(proc, it);
}
free(it);
diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c
index 565bad2..a9cffd0 100644
--- a/lib/sqfs/blk_proc/pthread.c
+++ b/lib/sqfs/blk_proc/pthread.c
@@ -70,8 +70,8 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
sqfs_compressor_t *cmp,
unsigned int num_workers,
size_t max_backlog,
- void *user,
- sqfs_block_cb callback)
+ size_t devblksz,
+ sqfs_file_t *file)
{
sqfs_block_processor_t *proc;
unsigned int i;
@@ -86,13 +86,19 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
return NULL;
proc->max_block_size = max_block_size;
- proc->cb = callback;
- proc->user = user;
proc->num_workers = num_workers;
proc->max_backlog = max_backlog;
proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
+ proc->devblksz = devblksz;
+ proc->cmp = cmp;
+ proc->file = file;
+ proc->max_blocks = INIT_BLOCK_COUNT;
+
+ proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks);
+ if (proc->blocks == NULL)
+ goto fail_init;
for (i = 0; i < num_workers; ++i) {
proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
diff --git a/lib/sqfs/blk_proc/serial.c b/lib/sqfs/blk_proc/serial.c
index ee172de..b6c17fb 100644
--- a/lib/sqfs/blk_proc/serial.c
+++ b/lib/sqfs/blk_proc/serial.c
@@ -11,8 +11,8 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
sqfs_compressor_t *cmp,
unsigned int num_workers,
size_t max_backlog,
- void *user,
- sqfs_block_cb callback)
+ size_t devblksz,
+ sqfs_file_t *file)
{
sqfs_block_processor_t *proc;
(void)num_workers;
@@ -25,8 +25,16 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
proc->max_block_size = max_block_size;
proc->cmp = cmp;
- proc->cb = callback;
- proc->user = user;
+ proc->devblksz = devblksz;
+ proc->file = file;
+ proc->max_blocks = INIT_BLOCK_COUNT;
+
+ proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks);
+ if (proc->blocks == NULL) {
+ free(proc);
+ return NULL;
+ }
+
return proc;
}
diff --git a/lib/sqfshelper/data_writer.c b/lib/sqfshelper/data_writer.c
index 461cb55..f5e22f3 100644
--- a/lib/sqfshelper/data_writer.c
+++ b/lib/sqfshelper/data_writer.c
@@ -26,11 +26,6 @@
#define INIT_BLOCK_COUNT (128)
typedef struct {
- uint64_t offset;
- uint64_t signature;
-} blk_info_t;
-
-typedef struct {
uint32_t index;
uint32_t offset;
uint64_t signature;
@@ -38,22 +33,11 @@ typedef struct {
struct data_writer_t {
sqfs_block_t *frag_block;
- sqfs_fragment_t *fragments;
size_t num_fragments;
- size_t max_fragments;
-
- size_t devblksz;
- uint64_t start;
sqfs_block_processor_t *proc;
sqfs_compressor_t *cmp;
sqfs_super_t *super;
- sqfs_file_t *file;
-
- size_t file_start;
- size_t num_blocks;
- size_t max_blocks;
- blk_info_t *blocks;
size_t frag_list_num;
size_t frag_list_max;
@@ -62,163 +46,6 @@ struct data_writer_t {
data_writer_stats_t stats;
};
-static int allign_file(data_writer_t *data)
-{
- return padd_sqfs(data->file, data->file->get_size(data->file),
- data->devblksz);
-}
-
-static int store_block_location(data_writer_t *data, uint64_t offset,
- uint32_t size, uint32_t chksum)
-{
- size_t new_sz;
- void *new;
-
- if (data->num_blocks == data->max_blocks) {
- new_sz = data->max_blocks * 2;
- new = realloc(data->blocks, sizeof(data->blocks[0]) * new_sz);
-
- if (new == NULL) {
- perror("growing data block checksum table");
- return -1;
- }
-
- data->blocks = new;
- data->max_blocks = new_sz;
- }
-
- data->blocks[data->num_blocks].offset = offset;
- data->blocks[data->num_blocks].signature = MK_BLK_SIG(chksum, size);
- data->num_blocks += 1;
- return 0;
-}
-
-static size_t deduplicate_blocks(data_writer_t *data, size_t count)
-{
- size_t i, j;
-
- for (i = 0; i < data->file_start; ++i) {
- for (j = 0; j < count; ++j) {
- if (data->blocks[i + j].signature !=
- data->blocks[data->file_start + j].signature)
- break;
- }
-
- if (j == count)
- break;
- }
-
- return i;
-}
-
-static size_t grow_fragment_table(data_writer_t *data, size_t index)
-{
- size_t newsz;
- void *new;
-
- if (index < data->max_fragments)
- return 0;
-
- do {
- newsz = data->max_fragments ? data->max_fragments * 2 : 16;
- } while (index >= newsz);
-
- new = realloc(data->fragments, sizeof(data->fragments[0]) * newsz);
-
- if (new == NULL) {
- perror("appending to fragment table");
- return -1;
- }
-
- data->max_fragments = newsz;
- data->fragments = new;
- return 0;
-}
-
-static int block_callback(void *user, sqfs_block_t *blk)
-{
- data_writer_t *data = user;
- size_t start, count;
- uint64_t offset;
- uint32_t out;
-
- if (blk->flags & SQFS_BLK_FIRST_BLOCK) {
- data->start = data->file->get_size(data->file);
- data->file_start = data->num_blocks;
-
- if ((blk->flags & SQFS_BLK_ALLIGN) && allign_file(data) != 0)
- return -1;
- }
-
- if (blk->size != 0) {
- out = blk->size;
- if (!(blk->flags & SQFS_BLK_IS_COMPRESSED))
- out |= 1 << 24;
-
- offset = data->file->get_size(data->file);
-
- if (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) {
- if (grow_fragment_table(data, blk->index))
- return 0;
-
- data->fragments[blk->index].start_offset = htole64(offset);
- data->fragments[blk->index].pad0 = 0;
- data->fragments[blk->index].size = htole32(out);
-
- data->super->flags &= ~SQFS_FLAG_NO_FRAGMENTS;
- data->super->flags |= SQFS_FLAG_ALWAYS_FRAGMENTS;
-
- data->stats.frag_blocks_written += 1;
- } else {
- blk->inode->block_sizes[blk->index] = htole32(out);
-
- data->stats.blocks_written += 1;
- }
-
- if (store_block_location(data, offset, out, blk->checksum))
- return -1;
-
- if (data->file->write_at(data->file, offset,
- blk->data, blk->size)) {
- return -1;
- }
- }
-
- if (blk->flags & SQFS_BLK_LAST_BLOCK) {
- if ((blk->flags & SQFS_BLK_ALLIGN) && allign_file(data) != 0)
- return -1;
-
- count = data->num_blocks - data->file_start;
- start = deduplicate_blocks(data, count);
- offset = data->blocks[start].offset;
-
- sqfs_inode_set_file_block_start(blk->inode, offset);
-
- if (start < data->file_start) {
- offset = start + count;
-
- if (offset >= data->file_start) {
- data->num_blocks = offset;
- data->stats.duplicate_blocks +=
- offset - data->num_blocks;
- } else {
- data->num_blocks = data->file_start;
- data->stats.duplicate_blocks += count;
- }
-
- if (data->file->truncate(data->file, data->start)) {
- perror("truncating squashfs image after "
- "file deduplication");
- return -1;
- }
- }
- }
-
- return 0;
-}
-
-/*****************************************************************************/
-
static int flush_fragment_block(data_writer_t *data)
{
int ret;
@@ -435,74 +262,44 @@ data_writer_t *data_writer_create(sqfs_super_t *super, sqfs_compressor_t *cmp,
return NULL;
}
- data->max_blocks = INIT_BLOCK_COUNT;
data->frag_list_max = INIT_BLOCK_COUNT;
-
- data->blocks = alloc_array(sizeof(data->blocks[0]),
- data->max_blocks);
-
- if (data->blocks == NULL) {
- perror("creating data writer");
- free(data);
- return NULL;
- }
-
data->frag_list = alloc_array(sizeof(data->frag_list[0]),
data->frag_list_max);
-
if (data->frag_list == NULL) {
perror("creating data writer");
- free(data->blocks);
free(data);
return NULL;
}
data->proc = sqfs_block_processor_create(super->block_size, cmp,
- num_jobs, max_backlog, data,
- block_callback);
+ num_jobs, max_backlog,
+ devblksize, file);
if (data->proc == NULL) {
perror("creating data block processor");
free(data->frag_list);
- free(data->blocks);
free(data);
return NULL;
}
data->cmp = cmp;
data->super = super;
- data->file = file;
- data->devblksz = devblksize;
return data;
}
void data_writer_destroy(data_writer_t *data)
{
sqfs_block_processor_destroy(data->proc);
- free(data->fragments);
- free(data->blocks);
+ free(data->frag_list);
free(data);
}
int data_writer_write_fragment_table(data_writer_t *data)
{
- uint64_t start;
- size_t size;
- int ret;
-
- if (data->num_fragments == 0) {
- data->super->fragment_entry_count = 0;
- data->super->fragment_table_start = 0xFFFFFFFFFFFFFFFFUL;
- return 0;
- }
-
- size = sizeof(data->fragments[0]) * data->num_fragments;
- ret = sqfs_write_table(data->file, data->cmp,
- data->fragments, size, &start);
- if (ret)
+ if (sqfs_block_processor_write_fragment_table(data->proc,
+ data->super)) {
+ fputs("error storing fragment table\n", stderr);
return -1;
-
- data->super->fragment_entry_count = data->num_fragments;
- data->super->fragment_table_start = start;
+ }
return 0;
}