aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/sqfs/block_processor.h98
-rw-r--r--include/sqfs/predef.h1
-rw-r--r--lib/sqfs/block_processor/common.c42
-rw-r--r--lib/sqfs/block_processor/internal.h8
-rw-r--r--lib/sqfs/block_processor/serial.c26
-rw-r--r--lib/sqfs/block_processor/winpthread.c46
6 files changed, 175 insertions, 46 deletions
diff --git a/include/sqfs/block_processor.h b/include/sqfs/block_processor.h
index 1846069..a5eed33 100644
--- a/include/sqfs/block_processor.h
+++ b/include/sqfs/block_processor.h
@@ -110,12 +110,90 @@ struct sqfs_block_processor_stats_t {
sqfs_u64 actual_frag_count;
};
+/**
+ * @struct sqfs_block_processor_desc_t
+ *
+ * @brief Encapsulates a description for an @ref sqfs_block_processor_t
+ *
+ * An instance of this struct is used by @ref sqfs_block_processor_create_ex to
+ * instantiate block processor objects.
+ */
+struct sqfs_block_processor_desc_t {
+ /**
+ * @brief Holds the size of the structure.
+ *
+ * If a later version of libsquashfs expands this structure, the value
+ * of this field can be used to check at runtime whether the newer
+ * fields are avaialable or not.
+ *
+ * If @ref sqfs_block_processor_create_ex is given a struct whose size
+ * it does not recognize, it returns @ref SQFS_ERROR_ARG_INVALID.
+ */
+ sqfs_u32 size;
+
+ /**
+ * @brief The maximum size of a data block.
+ */
+ sqfs_u32 max_block_size;
+
+ /**
+ * @brief The number of worker threads to create.
+ */
+ sqfs_u32 num_workers;
+
+ /**
+ * @brief The maximum number of blocks currently in flight.
+ *
+ * When trying to add more, enqueueing blocks until the
+ * in-flight block count drops below the threshold.
+ */
+ sqfs_u32 max_backlog;
+
+ /**
+ * @brief A pointer to a compressor.
+ *
+ * If multiple worker threads are used, the deep copy function of the
+ * compressor is used to create several instances that don't interfere
+ * with each other. This means, the compressor implementation must be
+ * able to create copies of itself that can be used independendly and
+ * concurrently.
+ */
+ sqfs_compressor_t *cmp;
+
+ /**
+ * @brief A block writer to send to finished blocks to.
+ */
+ sqfs_block_writer_t *wr;
+
+ /**
+ * @brief A fragment table to use for storing block locations.
+ */
+ sqfs_frag_table_t *tbl;
+
+ /**
+ * @brief Pointer to a file to read back fragment blocks from.
+ *
+ * If file and uncmp are not NULL, the file is used to read back
+ * fragment blocks during fragment deduplication and verify possible
+ * matches. If either of them are NULL, the deduplication relies on
+ * fragment size and hash alone.
+ */
+ sqfs_file_t *file;
+
+ /**
+ * @brief A pointer to a compressor the decompresses data.
+ *
+ * @copydoc file
+ */
+ sqfs_compressor_t *uncmp;
+};
+
#ifdef __cplusplus
extern "C" {
#endif
/**
- * @brief Create a data block writer.
+ * @brief Create a data block processor.
*
* @memberof sqfs_block_processor_t
*
@@ -132,7 +210,7 @@ extern "C" {
* @param tbl A fragment table to use for storing fragment and fragment block
* locations.
*
- * @return A pointer to a data writer object on success, NULL on allocation
+ * @return A pointer to a block processor object on success, NULL on allocation
* failure or on failure to create and initialize the worker threads.
*/
SQFS_API
@@ -144,6 +222,22 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
sqfs_frag_table_t *tbl);
/**
+ * @brief Create a data block processor.
+ *
+ * @memberof sqfs_block_processor_t
+ *
+ * @param desc A pointer to an extensible structure that holds the description
+ * of the block processor.
+ * @param out On success, returns the pointer to the newly created block
+ * processor object.
+ *
+ * @return Zero on success, an @ref SQFS_ERROR value on failure.
+ */
+SQFS_API
+int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc,
+ sqfs_block_processor_t **out);
+
+/**
* @brief Start writing a file.
*
* @memberof sqfs_block_processor_t
diff --git a/include/sqfs/predef.h b/include/sqfs/predef.h
index e0afac4..55ccc86 100644
--- a/include/sqfs/predef.h
+++ b/include/sqfs/predef.h
@@ -91,6 +91,7 @@ typedef struct sqfs_frag_table_t sqfs_frag_table_t;
typedef struct sqfs_block_writer_t sqfs_block_writer_t;
typedef struct sqfs_block_writer_stats_t sqfs_block_writer_stats_t;
typedef struct sqfs_block_processor_stats_t sqfs_block_processor_stats_t;
+typedef struct sqfs_block_processor_desc_t sqfs_block_processor_desc_t;
typedef struct sqfs_fragment_t sqfs_fragment_t;
typedef struct sqfs_dir_header_t sqfs_dir_header_t;
diff --git a/lib/sqfs/block_processor/common.c b/lib/sqfs/block_processor/common.c
index b2657e6..1e42042 100644
--- a/lib/sqfs/block_processor/common.c
+++ b/lib/sqfs/block_processor/common.c
@@ -297,22 +297,48 @@ void block_processor_cleanup(sqfs_block_processor_t *base)
hash_table_destroy(base->frag_ht, ht_delete_function);
}
-int block_processor_init(sqfs_block_processor_t *base, size_t max_block_size,
- sqfs_compressor_t *cmp, sqfs_block_writer_t *wr,
- sqfs_frag_table_t *tbl)
+int block_processor_init(sqfs_block_processor_t *base,
+ const sqfs_block_processor_desc_t *desc)
{
base->process_completed_block = process_completed_block;
base->process_completed_fragment = process_completed_fragment;
base->process_block = process_block;
- base->max_block_size = max_block_size;
- base->cmp = cmp;
- base->frag_tbl = tbl;
- base->wr = wr;
+ base->max_block_size = desc->max_block_size;
+ base->cmp = desc->cmp;
+ base->frag_tbl = desc->tbl;
+ base->wr = desc->wr;
+ base->file = desc->file;
+ base->uncmp = desc->uncmp;
base->stats.size = sizeof(base->stats);
base->frag_ht = hash_table_create(chunk_info_hash, chunk_info_equals);
if (base->frag_ht == NULL)
- return -1;
+ return SQFS_ERROR_ALLOC;
return 0;
}
+
+sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
+ sqfs_compressor_t *cmp,
+ unsigned int num_workers,
+ size_t max_backlog,
+ sqfs_block_writer_t *wr,
+ sqfs_frag_table_t *tbl)
+{
+ sqfs_block_processor_desc_t desc;
+ sqfs_block_processor_t *out;
+
+ memset(&desc, 0, sizeof(desc));
+ desc.size = sizeof(desc);
+ desc.max_block_size = max_block_size;
+ desc.num_workers = num_workers;
+ desc.max_backlog = max_backlog;
+ desc.cmp = cmp;
+ desc.wr = wr;
+ desc.tbl = tbl;
+
+ if (sqfs_block_processor_create_ex(&desc, &out) != 0)
+ return NULL;
+
+ return out;
+}
diff --git a/lib/sqfs/block_processor/internal.h b/lib/sqfs/block_processor/internal.h
index 883385b..efea3d4 100644
--- a/lib/sqfs/block_processor/internal.h
+++ b/lib/sqfs/block_processor/internal.h
@@ -81,6 +81,9 @@ struct sqfs_block_processor_t {
bool begin_called;
+ sqfs_file_t *file;
+ sqfs_compressor_t *uncmp;
+
int (*process_completed_block)(sqfs_block_processor_t *proc,
sqfs_block_t *block);
@@ -100,9 +103,6 @@ struct sqfs_block_processor_t {
SQFS_INTERNAL void block_processor_cleanup(sqfs_block_processor_t *base);
SQFS_INTERNAL int block_processor_init(sqfs_block_processor_t *base,
- size_t max_block_size,
- sqfs_compressor_t *cmp,
- sqfs_block_writer_t *wr,
- sqfs_frag_table_t *tbl);
+ const sqfs_block_processor_desc_t *desc);
#endif /* INTERNAL_H */
diff --git a/lib/sqfs/block_processor/serial.c b/lib/sqfs/block_processor/serial.c
index 4d6b3ec..daa11fe 100644
--- a/lib/sqfs/block_processor/serial.c
+++ b/lib/sqfs/block_processor/serial.c
@@ -61,27 +61,29 @@ static int block_processor_sync(sqfs_block_processor_t *proc)
return ((serial_block_processor_t *)proc)->status;
}
-sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
- sqfs_compressor_t *cmp,
- unsigned int num_workers,
- size_t max_backlog,
- sqfs_block_writer_t *wr,
- sqfs_frag_table_t *tbl)
+int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc,
+ sqfs_block_processor_t **out)
{
serial_block_processor_t *proc;
- (void)num_workers; (void)max_backlog;
+ int ret;
- proc = alloc_flex(sizeof(*proc), 1, max_block_size);
+ if (desc->size != sizeof(sqfs_block_processor_desc_t))
+ return SQFS_ERROR_ARG_INVALID;
+
+ proc = alloc_flex(sizeof(*proc), 1, desc->max_block_size);
if (proc == NULL)
- return NULL;
+ return SQFS_ERROR_ALLOC;
- if (block_processor_init(&proc->base, max_block_size, cmp, wr, tbl)) {
+ ret = block_processor_init(&proc->base, desc);
+ if (ret != 0) {
free(proc);
- return NULL;
+ return ret;
}
proc->base.sync = block_processor_sync;
proc->base.append_to_work_queue = append_to_work_queue;
((sqfs_object_t *)proc)->destroy = block_processor_destroy;
- return (sqfs_block_processor_t *)proc;
+
+ *out = (sqfs_block_processor_t *)proc;
+ return 0;
}
diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c
index 3531a3b..c3e3d13 100644
--- a/lib/sqfs/block_processor/winpthread.c
+++ b/lib/sqfs/block_processor/winpthread.c
@@ -393,35 +393,34 @@ static int block_processor_sync(sqfs_block_processor_t *proc)
return append_to_work_queue(proc, NULL);
}
-sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
- sqfs_compressor_t *cmp,
- unsigned int num_workers,
- size_t max_backlog,
- sqfs_block_writer_t *wr,
- sqfs_frag_table_t *tbl)
+int sqfs_block_processor_create_ex(const sqfs_block_processor_desc_t *desc,
+ sqfs_block_processor_t **out)
{
thread_pool_processor_t *proc;
+ unsigned int i, num_workers;
sigset_t oldset;
- unsigned int i;
int ret;
- if (num_workers < 1)
- num_workers = 1;
+ if (desc->size != sizeof(sqfs_block_processor_desc_t))
+ return SQFS_ERROR_ARG_INVALID;
+
+ num_workers = desc->num_workers < 1 ? 1 : desc->num_workers;
proc = alloc_flex(sizeof(*proc),
sizeof(proc->workers[0]), num_workers);
if (proc == NULL)
- return NULL;
+ return SQFS_ERROR_ALLOC;
- if (block_processor_init(&proc->base, max_block_size, cmp, wr, tbl)) {
+ ret = block_processor_init(&proc->base, desc);
+ if (ret != 0) {
free(proc);
- return NULL;
+ return ret;
}
proc->base.sync = block_processor_sync;
proc->base.append_to_work_queue = append_to_work_queue;
proc->num_workers = num_workers;
- proc->max_backlog = max_backlog;
+ proc->max_backlog = desc->max_backlog;
((sqfs_object_t *)proc)->destroy = block_processor_destroy;
MUTEX_INIT(&proc->mtx);
@@ -432,27 +431,34 @@ sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
for (i = 0; i < num_workers; ++i) {
proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
- 1, max_block_size);
+ 1, desc->max_block_size);
- if (proc->workers[i] == NULL)
+ if (proc->workers[i] == NULL) {
+ ret = SQFS_ERROR_ALLOC;
goto fail;
+ }
proc->workers[i]->shared = proc;
- proc->workers[i]->cmp = sqfs_copy(cmp);
+ proc->workers[i]->cmp = sqfs_copy(desc->cmp);
- if (proc->workers[i]->cmp == NULL)
+ if (proc->workers[i]->cmp == NULL) {
+ ret = SQFS_ERROR_ALLOC;
goto fail;
+ }
ret = THREAD_CREATE(&proc->workers[i]->thread,
worker_proc, proc->workers[i]);
- if (ret != 0)
+ if (ret != 0) {
+ ret = SQFS_ERROR_INTERNAL;
goto fail;
+ }
}
SIGNAL_ENABLE(&oldset);
- return (sqfs_block_processor_t *)proc;
+ *out = (sqfs_block_processor_t *)proc;
+ return 0;
fail:
SIGNAL_ENABLE(&oldset);
block_processor_destroy((sqfs_object_t *)proc);
- return NULL;
+ return ret;
}