summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-09-24 03:45:30 +0200
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-09-24 04:01:10 +0200
commit8d9f24a65ef27a52615b3225776632de08462eba (patch)
tree76d64437a98e94b2749cbb2f48117f9d1b4af947
parent035cbdfe1bc5aea16cbcada5e3c00ecf5ac08c96 (diff)
Move entire fragment processing from data writer to block processor
So far, this is mostly a direct port from the block processor. The actual fragment checksumming is not done through the thread pool. Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r--include/sqfs/block_processor.h13
-rw-r--r--lib/sqfs/blk_proc/fragtbl.c110
-rw-r--r--lib/sqfs/blk_proc/internal.h17
-rw-r--r--lib/sqfs/blk_proc/process_block.c7
-rw-r--r--lib/sqfs/blk_proc/pthread.c52
-rw-r--r--lib/sqfs/blk_proc/serial.c41
-rw-r--r--lib/sqfshelper/data_writer.c134
7 files changed, 211 insertions, 163 deletions
diff --git a/include/sqfs/block_processor.h b/include/sqfs/block_processor.h
index 5b7b3e0..f0f7145 100644
--- a/include/sqfs/block_processor.h
+++ b/include/sqfs/block_processor.h
@@ -77,11 +77,16 @@ typedef enum {
SQFS_BLK_ALLIGN = 0x0008,
/**
- * @brief Indicates that a block is not part of a file but contains
- * file tail ends and an entry in the fragment table has to be
- * added.
+ * @brief Indicates that a block is a tail end of a file and the block
+ * processor should take care of fragment packing and accounting.
*/
- SQFS_BLK_FRAGMENT_BLOCK = 0x0010,
+ SQFS_BLK_IS_FRAGMENT = 0x0010,
+
+ /**
+ * @brief Set by the block processor on fragment blocks that
+ * it generates.
+ */
+ SQFS_BLK_FRAGMENT_BLOCK = 0x4000,
/**
* @brief Set by compressor worker if the block was actually compressed.
diff --git a/lib/sqfs/blk_proc/fragtbl.c b/lib/sqfs/blk_proc/fragtbl.c
index 3f6c644..84f5ed6 100644
--- a/lib/sqfs/blk_proc/fragtbl.c
+++ b/lib/sqfs/blk_proc/fragtbl.c
@@ -35,24 +35,112 @@ int sqfs_block_processor_write_fragment_table(sqfs_block_processor_t *proc,
return 0;
}
-int grow_fragment_table(sqfs_block_processor_t *proc, size_t index)
+static int grow_fragment_table(sqfs_block_processor_t *proc)
{
size_t newsz;
void *new;
- if (index < proc->max_fragments)
- return 0;
-
- do {
+ if (proc->num_fragments >= proc->max_fragments) {
newsz = proc->max_fragments ? proc->max_fragments * 2 : 16;
- } while (index >= newsz);
- new = realloc(proc->fragments, sizeof(proc->fragments[0]) * newsz);
+ new = realloc(proc->fragments,
+ sizeof(proc->fragments[0]) * newsz);
+
+ if (new == NULL)
+ return SQFS_ERROR_ALLOC;
+
+ proc->max_fragments = newsz;
+ proc->fragments = new;
+ }
+
+ return 0;
+}
+
+static int store_fragment(sqfs_block_processor_t *proc, sqfs_block_t *frag,
+ uint64_t signature)
+{
+ size_t new_sz;
+ void *new;
+
+ if (proc->frag_list_num == proc->frag_list_max) {
+ new_sz = proc->frag_list_max * 2;
+ new = realloc(proc->frag_list,
+ sizeof(proc->frag_list[0]) * new_sz);
+
+ if (new == NULL)
+ return SQFS_ERROR_ALLOC;
+
+ proc->frag_list = new;
+ proc->frag_list_max = new_sz;
+ }
+
+ proc->frag_list[proc->frag_list_num].index = proc->frag_block->index;
+ proc->frag_list[proc->frag_list_num].offset = proc->frag_block->size;
+ proc->frag_list[proc->frag_list_num].signature = signature;
+ proc->frag_list_num += 1;
+
+ sqfs_inode_set_frag_location(frag->inode, proc->frag_block->index,
+ proc->frag_block->size);
+
+ memcpy(proc->frag_block->data + proc->frag_block->size,
+ frag->data, frag->size);
+
+ proc->frag_block->flags |= (frag->flags & SQFS_BLK_DONT_COMPRESS);
+ proc->frag_block->size += frag->size;
+ return 0;
+}
+
+int handle_fragment(sqfs_block_processor_t *proc, sqfs_block_t *frag,
+ sqfs_block_t **blk_out)
+{
+ uint64_t signature;
+ size_t i, size;
+ int err;
+
+ signature = MK_BLK_SIG(frag->checksum, frag->size);
+
+ for (i = 0; i < proc->frag_list_num; ++i) {
+ if (proc->frag_list[i].signature == signature) {
+ sqfs_inode_set_frag_location(frag->inode,
+ proc->frag_list[i].index,
+ proc->frag_list[i].offset);
+ return 0;
+ }
+ }
+
+ if (proc->frag_block != NULL) {
+ size = proc->frag_block->size + frag->size;
+
+ if (size > proc->max_block_size) {
+ *blk_out = proc->frag_block;
+ proc->frag_block = NULL;
+ }
+ }
+
+ if (proc->frag_block == NULL) {
+ size = sizeof(sqfs_block_t) + proc->max_block_size;
+
+ err = grow_fragment_table(proc);
+ if (err)
+ goto fail;
+
+ proc->frag_block = calloc(1, size);
+ if (proc->frag_block == NULL) {
+ err = SQFS_ERROR_ALLOC;
+ goto fail;
+ }
+
+ proc->frag_block->index = proc->num_fragments++;
+ proc->frag_block->flags = SQFS_BLK_FRAGMENT_BLOCK;
+ }
- if (new == NULL)
- return SQFS_ERROR_ALLOC;
+ err = store_fragment(proc, frag, signature);
+ if (err)
+ goto fail;
- proc->max_fragments = newsz;
- proc->fragments = new;
return 0;
+fail:
+ free(*blk_out);
+ *blk_out = NULL;
+ return err;
}
diff --git a/lib/sqfs/blk_proc/internal.h b/lib/sqfs/blk_proc/internal.h
index 90a9d91..2e9980b 100644
--- a/lib/sqfs/blk_proc/internal.h
+++ b/lib/sqfs/blk_proc/internal.h
@@ -15,6 +15,7 @@
#include <string.h>
#include <stdlib.h>
+#include <zlib.h>
#ifdef WITH_PTHREAD
#include <pthread.h>
@@ -34,6 +35,12 @@ typedef struct {
uint64_t signature;
} blk_info_t;
+typedef struct {
+ uint32_t index;
+ uint32_t offset;
+ uint64_t signature;
+} frag_info_t;
+
#ifdef WITH_PTHREAD
typedef struct {
@@ -83,6 +90,11 @@ struct sqfs_block_processor_t {
blk_info_t *blocks;
sqfs_compressor_t *cmp;
+ sqfs_block_t *frag_block;
+ frag_info_t *frag_list;
+ size_t frag_list_num;
+ size_t frag_list_max;
+
/* used only by workers */
size_t max_block_size;
@@ -100,8 +112,9 @@ int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp,
SQFS_INTERNAL int process_completed_block(sqfs_block_processor_t *proc,
sqfs_block_t *block);
-SQFS_INTERNAL int grow_fragment_table(sqfs_block_processor_t *proc,
- size_t index);
+SQFS_INTERNAL
+int handle_fragment(sqfs_block_processor_t *proc, sqfs_block_t *frag,
+ sqfs_block_t **blk_out);
SQFS_INTERNAL size_t deduplicate_blocks(sqfs_block_processor_t *proc,
size_t count);
diff --git a/lib/sqfs/blk_proc/process_block.c b/lib/sqfs/blk_proc/process_block.c
index 78de31b..b4ed904 100644
--- a/lib/sqfs/blk_proc/process_block.c
+++ b/lib/sqfs/blk_proc/process_block.c
@@ -8,7 +8,6 @@
#include "internal.h"
#include <string.h>
-#include <zlib.h>
int sqfs_block_process(sqfs_block_t *block, sqfs_compressor_t *cmp,
uint8_t *scratch, size_t scratch_size)
@@ -71,16 +70,10 @@ int process_completed_block(sqfs_block_processor_t *proc, sqfs_block_t *blk)
offset = proc->file->get_size(proc->file);
if (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) {
- if (grow_fragment_table(proc, blk->index))
- return 0;
-
offset = htole64(offset);
proc->fragments[blk->index].start_offset = offset;
proc->fragments[blk->index].pad0 = 0;
proc->fragments[blk->index].size = htole32(out);
-
- if (blk->index >= proc->num_fragments)
- proc->num_fragments = blk->index + 1;
} else {
blk->inode->block_sizes[blk->index] = out;
}
diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c
index a3dc01f..4f01bb3 100644
--- a/lib/sqfs/blk_proc/pthread.c
+++ b/lib/sqfs/blk_proc/pthread.c
@@ -119,11 +119,17 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
proc->cmp = cmp;
proc->file = file;
proc->max_blocks = INIT_BLOCK_COUNT;
+ proc->frag_list_max = INIT_BLOCK_COUNT;
proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks);
if (proc->blocks == NULL)
goto fail_init;
+ proc->frag_list = alloc_array(sizeof(proc->frag_list[0]),
+ proc->frag_list_max);
+ if (proc->frag_list == NULL)
+ goto fail_init;
+
for (i = 0; i < num_workers; ++i) {
proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
1, max_block_size);
@@ -172,6 +178,7 @@ fail_init:
pthread_cond_destroy(&proc->done_cond);
pthread_cond_destroy(&proc->queue_cond);
pthread_mutex_destroy(&proc->mtx);
+ free(proc->frag_list);
free(proc->fragments);
free(proc->blocks);
free(proc);
@@ -200,6 +207,8 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc)
free_blk_list(proc->queue);
free_blk_list(proc->done);
+ free(proc->frag_block);
+ free(proc->frag_list);
free(proc->fragments);
free(proc->blocks);
free(proc);
@@ -241,16 +250,40 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
sqfs_block_t *completed = NULL;
int status;
- pthread_mutex_lock(&proc->mtx);
- if (proc->status != 0) {
- status = proc->status;
+ if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) {
+ status = SQFS_ERROR_UNSUPPORTED;
+
+ pthread_mutex_lock(&proc->mtx);
+ if (proc->status == 0) {
+ proc->status = status;
+ pthread_cond_broadcast(&proc->queue_cond);
+ }
goto fail;
}
- if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) {
- status = SQFS_ERROR_UNSUPPORTED;
- proc->status = status;
- pthread_cond_broadcast(&proc->queue_cond);
+ if (block->flags & SQFS_BLK_IS_FRAGMENT) {
+ block->checksum = crc32(0, block->data, block->size);
+
+ completed = NULL;
+ status = handle_fragment(proc, block, &completed);
+
+ if (status != 0) {
+ pthread_mutex_lock(&proc->mtx);
+ proc->status = status;
+ goto fail;
+ }
+
+ free(block);
+ if (completed == NULL)
+ return 0;
+
+ block = completed;
+ completed = NULL;
+ }
+
+ pthread_mutex_lock(&proc->mtx);
+ if (proc->status != 0) {
+ status = proc->status;
goto fail;
}
@@ -288,6 +321,11 @@ int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
int status = 0;
pthread_mutex_lock(&proc->mtx);
+ if (proc->frag_block != NULL) {
+ append_to_work_queue(proc, proc->frag_block);
+ proc->frag_block = NULL;
+ }
+
while (proc->backlog > 0 && proc->status == 0)
pthread_cond_wait(&proc->done_cond, &proc->mtx);
diff --git a/lib/sqfs/blk_proc/serial.c b/lib/sqfs/blk_proc/serial.c
index 06c811e..c7ec366 100644
--- a/lib/sqfs/blk_proc/serial.c
+++ b/lib/sqfs/blk_proc/serial.c
@@ -28,6 +28,7 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
proc->devblksz = devblksz;
proc->file = file;
proc->max_blocks = INIT_BLOCK_COUNT;
+ proc->frag_list_max = INIT_BLOCK_COUNT;
proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks);
if (proc->blocks == NULL) {
@@ -35,11 +36,21 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
return NULL;
}
+ proc->frag_list = alloc_array(sizeof(proc->frag_list[0]),
+ proc->frag_list_max);
+ if (proc->frag_list == NULL) {
+ free(proc->blocks);
+ free(proc);
+ return NULL;
+ }
+
return proc;
}
void sqfs_block_processor_destroy(sqfs_block_processor_t *proc)
{
+ free(proc->frag_block);
+ free(proc->frag_list);
free(proc->fragments);
free(proc->blocks);
free(proc);
@@ -48,6 +59,8 @@ void sqfs_block_processor_destroy(sqfs_block_processor_t *proc)
int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
sqfs_block_t *block)
{
+ sqfs_block_t *fragblk = NULL;
+
if (proc->status != 0) {
free(block);
return proc->status;
@@ -59,6 +72,23 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
return proc->status;
}
+ if (block->flags & SQFS_BLK_IS_FRAGMENT) {
+ block->checksum = crc32(0, block->data, block->size);
+
+ proc->status = handle_fragment(proc, block, &fragblk);
+ free(block);
+
+ if (proc->status != 0) {
+ free(fragblk);
+ return proc->status;
+ }
+
+ if (fragblk == NULL)
+ return 0;
+
+ block = fragblk;
+ }
+
proc->status = sqfs_block_process(block, proc->cmp, proc->scratch,
proc->max_block_size);
@@ -71,5 +101,16 @@ int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
{
+ if (proc->status != 0 || proc->frag_block == NULL)
+ return proc->status;
+
+ proc->status = sqfs_block_process(proc->frag_block, proc->cmp,
+ proc->scratch, proc->max_block_size);
+
+ if (proc->status == 0)
+ proc->status = process_completed_block(proc, proc->frag_block);
+
+ free(proc->frag_block);
+ proc->frag_block = NULL;
return proc->status;
}
diff --git a/lib/sqfshelper/data_writer.c b/lib/sqfshelper/data_writer.c
index 5bce527..9f9631b 100644
--- a/lib/sqfshelper/data_writer.c
+++ b/lib/sqfshelper/data_writer.c
@@ -18,128 +18,14 @@
#include <errno.h>
#include <zlib.h>
-#define MK_BLK_SIG(chksum, size) \
- (((uint64_t)(size) << 32) | (uint64_t)(chksum))
-
-#define BLK_SIZE(sig) ((sig) >> 32)
-
-#define INIT_BLOCK_COUNT (128)
-
-typedef struct {
- uint32_t index;
- uint32_t offset;
- uint64_t signature;
-} frag_info_t;
-
struct data_writer_t {
- sqfs_block_t *frag_block;
- size_t num_fragments;
-
sqfs_block_processor_t *proc;
sqfs_compressor_t *cmp;
sqfs_super_t *super;
- size_t frag_list_num;
- size_t frag_list_max;
- frag_info_t *frag_list;
-
data_writer_stats_t stats;
};
-static int flush_fragment_block(data_writer_t *data)
-{
- int ret;
-
- ret = sqfs_block_processor_enqueue(data->proc, data->frag_block);
- data->frag_block = NULL;
- return ret;
-}
-
-static int store_fragment(data_writer_t *data, sqfs_block_t *frag,
- uint64_t signature)
-{
- size_t new_sz;
- void *new;
-
- if (data->frag_list_num == data->frag_list_max) {
- new_sz = data->frag_list_max * 2;
- new = realloc(data->frag_list,
- sizeof(data->frag_list[0]) * new_sz);
-
- if (new == NULL) {
- perror("growing fragment checksum table");
- return -1;
- }
-
- data->frag_list = new;
- data->frag_list_max = new_sz;
- }
-
- data->frag_list[data->frag_list_num].index = data->frag_block->index;
- data->frag_list[data->frag_list_num].offset = data->frag_block->size;
- data->frag_list[data->frag_list_num].signature = signature;
- data->frag_list_num += 1;
-
- sqfs_inode_set_frag_location(frag->inode, data->frag_block->index,
- data->frag_block->size);
-
- memcpy(data->frag_block->data + data->frag_block->size,
- frag->data, frag->size);
-
- data->frag_block->flags |= (frag->flags & SQFS_BLK_DONT_COMPRESS);
- data->frag_block->size += frag->size;
- return 0;
-}
-
-static int handle_fragment(data_writer_t *data, sqfs_block_t *frag)
-{
- uint64_t signature;
- size_t i, size;
-
- signature = MK_BLK_SIG(frag->checksum, frag->size);
-
- for (i = 0; i < data->frag_list_num; ++i) {
- if (data->frag_list[i].signature == signature) {
- sqfs_inode_set_frag_location(frag->inode,
- data->frag_list[i].index,
- data->frag_list[i].offset);
- free(frag);
- return 0;
- }
- }
-
- if (data->frag_block != NULL) {
- size = data->frag_block->size + frag->size;
-
- if (size > data->super->block_size) {
- if (flush_fragment_block(data))
- goto fail;
- }
- }
-
- if (data->frag_block == NULL) {
- size = sizeof(sqfs_block_t) + data->super->block_size;
-
- data->frag_block = calloc(1, size);
- if (data->frag_block == NULL) {
- perror("creating fragment block");
- goto fail;
- }
-
- data->frag_block->index = data->num_fragments++;
- data->frag_block->flags = SQFS_BLK_FRAGMENT_BLOCK;
- }
-
- if (store_fragment(data, frag, signature))
- goto fail;
-
- free(frag);
- return 0;
-fail:
- free(frag);
- return -1;
-}
-
static bool is_zero_block(unsigned char *ptr, size_t size)
{
return ptr[0] == 0 && memcmp(ptr, ptr + 1, size - 1) == 0;
@@ -225,9 +111,9 @@ int write_data_from_file_condensed(data_writer_t *data, sqfs_file_t *file,
}
}
- blk->checksum = crc32(0, blk->data, blk->size);
+ blk->flags |= SQFS_BLK_IS_FRAGMENT;
- if (handle_fragment(data, blk))
+ if (sqfs_block_processor_enqueue(data->proc, blk))
return -1;
} else {
if (sqfs_block_processor_enqueue(data->proc, blk))
@@ -269,21 +155,11 @@ data_writer_t *data_writer_create(sqfs_super_t *super, sqfs_compressor_t *cmp,
return NULL;
}
- data->frag_list_max = INIT_BLOCK_COUNT;
- data->frag_list = alloc_array(sizeof(data->frag_list[0]),
- data->frag_list_max);
- if (data->frag_list == NULL) {
- perror("creating data writer");
- free(data);
- return NULL;
- }
-
data->proc = sqfs_block_processor_create(super->block_size, cmp,
num_jobs, max_backlog,
devblksize, file);
if (data->proc == NULL) {
perror("creating data block processor");
- free(data->frag_list);
free(data);
return NULL;
}
@@ -296,7 +172,6 @@ data_writer_t *data_writer_create(sqfs_super_t *super, sqfs_compressor_t *cmp,
void data_writer_destroy(data_writer_t *data)
{
sqfs_block_processor_destroy(data->proc);
- free(data->frag_list);
free(data);
}
@@ -312,11 +187,6 @@ int data_writer_write_fragment_table(data_writer_t *data)
int data_writer_sync(data_writer_t *data)
{
- if (data->frag_block != NULL) {
- if (flush_fragment_block(data))
- return -1;
- }
-
return sqfs_block_processor_finish(data->proc);
}