From edac31f28863ae9f1b310d971fb194959c42c916 Mon Sep 17 00:00:00 2001
From: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Date: Wed, 25 Sep 2019 07:28:04 +0200
Subject: Rename block processor to sqfs_data_writer_t

Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
---
 lib/sqfs/data_writer/block.c    | 131 ++++++++++++++++
 lib/sqfs/data_writer/common.c   | 157 +++++++++++++++++++
 lib/sqfs/data_writer/fragment.c | 128 ++++++++++++++++
 lib/sqfs/data_writer/internal.h | 137 +++++++++++++++++
 lib/sqfs/data_writer/pthread.c  | 332 ++++++++++++++++++++++++++++++++++++++++
 lib/sqfs/data_writer/serial.c   |  96 ++++++++++++
 6 files changed, 981 insertions(+)
 create mode 100644 lib/sqfs/data_writer/block.c
 create mode 100644 lib/sqfs/data_writer/common.c
 create mode 100644 lib/sqfs/data_writer/fragment.c
 create mode 100644 lib/sqfs/data_writer/internal.h
 create mode 100644 lib/sqfs/data_writer/pthread.c
 create mode 100644 lib/sqfs/data_writer/serial.c

(limited to 'lib/sqfs/data_writer')

diff --git a/lib/sqfs/data_writer/block.c b/lib/sqfs/data_writer/block.c
new file mode 100644
index 0000000..9461737
--- /dev/null
+++ b/lib/sqfs/data_writer/block.c
@@ -0,0 +1,131 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * process_block.c
+ *
+ * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at>
+ */
+#define SQFS_BUILDING_DLL
+#include "internal.h"
+
+#include <string.h>
+
+static int store_block_location(sqfs_data_writer_t *proc, uint64_t offset,
+				uint32_t size, uint32_t chksum)
+{
+	size_t new_sz;
+	void *new;
+
+	if (proc->num_blocks == proc->max_blocks) {
+		new_sz = proc->max_blocks * 2;
+		new = realloc(proc->blocks, sizeof(proc->blocks[0]) * new_sz);
+
+		if (new == NULL)
+			return SQFS_ERROR_ALLOC;
+
+		proc->blocks = new;
+		proc->max_blocks = new_sz;
+	}
+
+	proc->blocks[proc->num_blocks].offset = offset;
+	proc->blocks[proc->num_blocks].hash = MK_BLK_HASH(chksum, size);
+	proc->num_blocks += 1;
+	return 0;
+}
+
+static size_t deduplicate_blocks(sqfs_data_writer_t *proc, size_t count)
+{
+	size_t i, j;
+
+	for (i = 0; i < proc->file_start; ++i) {
+		for (j = 0; j < count; ++j) {
+			if (proc->blocks[i + j].hash !=
+			    proc->blocks[proc->file_start + j].hash)
+				break;
+		}
+
+		if (j == count)
+			break;
+	}
+
+	return i;
+}
+
+static int allign_file(sqfs_data_writer_t *proc, sqfs_block_t *blk)
+{
+	if (!(blk->flags & SQFS_BLK_ALLIGN))
+		return 0;
+
+	return padd_sqfs(proc->file, proc->file->get_size(proc->file),
+			 proc->devblksz);
+}
+
+int process_completed_block(sqfs_data_writer_t *proc, sqfs_block_t *blk)
+{
+	size_t start, count;
+	uint64_t offset;
+	uint32_t out;
+	int err;
+
+	if (blk->flags & SQFS_BLK_FIRST_BLOCK) {
+		proc->start = proc->file->get_size(proc->file);
+		proc->file_start = proc->num_blocks;
+
+		err = allign_file(proc, blk);
+		if (err)
+			return err;
+	}
+
+	if (blk->size != 0) {
+		out = blk->size;
+		if (!(blk->flags & SQFS_BLK_IS_COMPRESSED))
+			out |= 1 << 24;
+
+		offset = proc->file->get_size(proc->file);
+
+		if (blk->flags & SQFS_BLK_FRAGMENT_BLOCK) {
+			offset = htole64(offset);
+			proc->fragments[blk->index].start_offset = offset;
+			proc->fragments[blk->index].pad0 = 0;
+			proc->fragments[blk->index].size = htole32(out);
+		} else {
+			blk->inode->block_sizes[blk->index] = out;
+		}
+
+		err = store_block_location(proc, offset, out, blk->checksum);
+		if (err)
+			return err;
+
+		err = proc->file->write_at(proc->file, offset,
+					   blk->data, blk->size);
+		if (err)
+			return err;
+	}
+
+	if (blk->flags & SQFS_BLK_LAST_BLOCK) {
+		err = allign_file(proc, blk);
+		if (err)
+			return err;
+
+		count = proc->num_blocks - proc->file_start;
+		start = deduplicate_blocks(proc, count);
+		offset = proc->blocks[start].offset;
+
+		sqfs_inode_set_file_block_start(blk->inode, offset);
+
+		if (start < proc->file_start) {
+			offset = start + count;
+
+			if (offset >= proc->file_start) {
+				proc->num_blocks = offset;
+			} else {
+				proc->num_blocks = proc->file_start;
+			}
+
+			err = proc->file->truncate(proc->file, proc->start);
+			if (err)
+				return err;
+		}
+	}
+
+	return 0;
+}
diff --git a/lib/sqfs/data_writer/common.c b/lib/sqfs/data_writer/common.c
new file mode 100644
index 0000000..51acc1e
--- /dev/null
+++ b/lib/sqfs/data_writer/common.c
@@ -0,0 +1,157 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * common.c
+ *
+ * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at>
+ */
+#define SQFS_BUILDING_DLL
+#include "internal.h"
+
+void free_blk_list(sqfs_block_t *list)
+{
+	sqfs_block_t *it;
+
+	while (list != NULL) {
+		it = list;
+		list = list->next;
+		free(it);
+	}
+}
+
+int data_writer_init(sqfs_data_writer_t *proc, size_t max_block_size,
+		     sqfs_compressor_t *cmp, unsigned int num_workers,
+		     size_t max_backlog, size_t devblksz, sqfs_file_t *file)
+{
+	proc->max_block_size = max_block_size;
+	proc->num_workers = num_workers;
+	proc->max_backlog = max_backlog;
+	proc->devblksz = devblksz;
+	proc->cmp = cmp;
+	proc->file = file;
+	proc->max_blocks = INIT_BLOCK_COUNT;
+	proc->frag_list_max = INIT_BLOCK_COUNT;
+
+	proc->blocks = alloc_array(sizeof(proc->blocks[0]), proc->max_blocks);
+	if (proc->blocks == NULL)
+		return -1;
+
+	proc->frag_list = alloc_array(sizeof(proc->frag_list[0]),
+				      proc->frag_list_max);
+	if (proc->frag_list == NULL)
+		return -1;
+
+	return 0;
+}
+
+void data_writer_cleanup(sqfs_data_writer_t *proc)
+{
+	free_blk_list(proc->queue);
+	free_blk_list(proc->done);
+	free(proc->frag_block);
+	free(proc->frag_list);
+	free(proc->fragments);
+	free(proc->blocks);
+	free(proc);
+}
+
+void data_writer_store_done(sqfs_data_writer_t *proc, sqfs_block_t *blk,
+			    int status)
+{
+	sqfs_block_t *it = proc->done, *prev = NULL;
+
+	while (it != NULL) {
+		if (it->sequence_number >= blk->sequence_number)
+			break;
+		prev = it;
+		it = it->next;
+	}
+
+	if (prev == NULL) {
+		blk->next = proc->done;
+		proc->done = blk;
+	} else {
+		blk->next = prev->next;
+		prev->next = blk;
+	}
+
+	if (status != 0 && proc->status == 0)
+		proc->status = status;
+
+	proc->backlog -= 1;
+}
+
+sqfs_block_t *data_writer_next_work_item(sqfs_data_writer_t *proc)
+{
+	sqfs_block_t *blk;
+
+	if (proc->status != 0)
+		return NULL;
+
+	blk = proc->queue;
+	proc->queue = blk->next;
+	blk->next = NULL;
+
+	if (proc->queue == NULL)
+		proc->queue_last = NULL;
+
+	return blk;
+}
+
+int data_writer_do_block(sqfs_block_t *block, sqfs_compressor_t *cmp,
+			 uint8_t *scratch, size_t scratch_size)
+{
+	ssize_t ret;
+
+	if (block->size == 0) {
+		block->checksum = 0;
+		return 0;
+	}
+
+	block->checksum = crc32(0, block->data, block->size);
+
+	if (block->flags & SQFS_BLK_IS_FRAGMENT)
+		return 0;
+
+	if (!(block->flags & SQFS_BLK_DONT_COMPRESS)) {
+		ret = cmp->do_block(cmp, block->data, block->size,
+				    scratch, scratch_size);
+		if (ret < 0)
+			return ret;
+
+		if (ret > 0) {
+			memcpy(block->data, scratch, ret);
+			block->size = ret;
+			block->flags |= SQFS_BLK_IS_COMPRESSED;
+		}
+	}
+
+	return 0;
+}
+
+int sqfs_data_writer_write_fragment_table(sqfs_data_writer_t *proc,
+					  sqfs_super_t *super)
+{
+	uint64_t start;
+	size_t size;
+	int ret;
+
+	if (proc->num_fragments == 0) {
+		super->fragment_entry_count = 0;
+		super->fragment_table_start = 0xFFFFFFFFFFFFFFFFUL;
+		super->flags &= ~SQFS_FLAG_ALWAYS_FRAGMENTS;
+		super->flags |= SQFS_FLAG_NO_FRAGMENTS;
+		return 0;
+	}
+
+	size = sizeof(proc->fragments[0]) * proc->num_fragments;
+	ret = sqfs_write_table(proc->file, proc->cmp,
+			       proc->fragments, size, &start);
+	if (ret)
+		return ret;
+
+	super->flags &= ~SQFS_FLAG_NO_FRAGMENTS;
+	super->flags |= SQFS_FLAG_ALWAYS_FRAGMENTS;
+	super->fragment_entry_count = proc->num_fragments;
+	super->fragment_table_start = start;
+	return 0;
+}
diff --git a/lib/sqfs/data_writer/fragment.c b/lib/sqfs/data_writer/fragment.c
new file mode 100644
index 0000000..e4fe9b4
--- /dev/null
+++ b/lib/sqfs/data_writer/fragment.c
@@ -0,0 +1,128 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * fragtbl.c
+ *
+ * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at>
+ */
+#define SQFS_BUILDING_DLL
+#include "internal.h"
+
+static int grow_fragment_table(sqfs_data_writer_t *proc)
+{
+	size_t newsz;
+	void *new;
+
+	if (proc->num_fragments >= proc->max_fragments) {
+		newsz = proc->max_fragments ? proc->max_fragments * 2 : 16;
+
+		new = realloc(proc->fragments,
+			      sizeof(proc->fragments[0]) * newsz);
+
+		if (new == NULL)
+			return SQFS_ERROR_ALLOC;
+
+		proc->max_fragments = newsz;
+		proc->fragments = new;
+	}
+
+	return 0;
+}
+
+static int grow_deduplication_list(sqfs_data_writer_t *proc)
+{
+	size_t new_sz;
+	void *new;
+
+	if (proc->frag_list_num == proc->frag_list_max) {
+		new_sz = proc->frag_list_max * 2;
+		new = realloc(proc->frag_list,
+			      sizeof(proc->frag_list[0]) * new_sz);
+
+		if (new == NULL)
+			return SQFS_ERROR_ALLOC;
+
+		proc->frag_list = new;
+		proc->frag_list_max = new_sz;
+	}
+
+	return 0;
+}
+
+static int store_fragment(sqfs_data_writer_t *proc, sqfs_block_t *frag,
+			  uint64_t hash)
+{
+	int err = grow_deduplication_list(proc);
+
+	if (err)
+		return err;
+
+	proc->frag_list[proc->frag_list_num].index = proc->frag_block->index;
+	proc->frag_list[proc->frag_list_num].offset = proc->frag_block->size;
+	proc->frag_list[proc->frag_list_num].hash = hash;
+	proc->frag_list_num += 1;
+
+	sqfs_inode_set_frag_location(frag->inode, proc->frag_block->index,
+				     proc->frag_block->size);
+
+	memcpy(proc->frag_block->data + proc->frag_block->size,
+	       frag->data, frag->size);
+
+	proc->frag_block->flags |= (frag->flags & SQFS_BLK_DONT_COMPRESS);
+	proc->frag_block->size += frag->size;
+	return 0;
+}
+
+int process_completed_fragment(sqfs_data_writer_t *proc, sqfs_block_t *frag,
+			       sqfs_block_t **blk_out)
+{
+	uint64_t hash;
+	size_t i, size;
+	int err;
+
+	hash = MK_BLK_HASH(frag->checksum, frag->size);
+
+	for (i = 0; i < proc->frag_list_num; ++i) {
+		if (proc->frag_list[i].hash == hash) {
+			sqfs_inode_set_frag_location(frag->inode,
+						     proc->frag_list[i].index,
+						     proc->frag_list[i].offset);
+			return 0;
+		}
+	}
+
+	if (proc->frag_block != NULL) {
+		size = proc->frag_block->size + frag->size;
+
+		if (size > proc->max_block_size) {
+			*blk_out = proc->frag_block;
+			proc->frag_block = NULL;
+		}
+	}
+
+	if (proc->frag_block == NULL) {
+		size = sizeof(sqfs_block_t) + proc->max_block_size;
+
+		err = grow_fragment_table(proc);
+		if (err)
+			goto fail;
+
+		proc->frag_block = calloc(1, size);
+		if (proc->frag_block == NULL) {
+			err = SQFS_ERROR_ALLOC;
+			goto fail;
+		}
+
+		proc->frag_block->index = proc->num_fragments++;
+		proc->frag_block->flags = SQFS_BLK_FRAGMENT_BLOCK;
+	}
+
+	err = store_fragment(proc, frag, hash);
+	if (err)
+		goto fail;
+
+	return 0;
+fail:
+	free(*blk_out);
+	*blk_out = NULL;
+	return err;
+}
diff --git a/lib/sqfs/data_writer/internal.h b/lib/sqfs/data_writer/internal.h
new file mode 100644
index 0000000..10cc82c
--- /dev/null
+++ b/lib/sqfs/data_writer/internal.h
@@ -0,0 +1,137 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * internal.h
+ *
+ * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at>
+ */
+#ifndef INTERNAL_H
+#define INTERNAL_H
+
+#include "config.h"
+
+#include "sqfs/data_writer.h"
+#include "sqfs/compress.h"
+#include "sqfs/inode.h"
+#include "sqfs/table.h"
+#include "sqfs/error.h"
+#include "sqfs/data.h"
+#include "sqfs/io.h"
+#include "util.h"
+
+#include <string.h>
+#include <stdlib.h>
+#include <zlib.h>
+
+#ifdef WITH_PTHREAD
+#include <pthread.h>
+#endif
+
+
+#define MK_BLK_HASH(chksum, size) \
+	(((uint64_t)(size) << 32) | (uint64_t)(chksum))
+
+#define INIT_BLOCK_COUNT (128)
+
+
+typedef struct {
+	uint64_t offset;
+	uint64_t hash;
+} blk_info_t;
+
+typedef struct {
+	uint32_t index;
+	uint32_t offset;
+	uint64_t hash;
+} frag_info_t;
+
+
+#ifdef WITH_PTHREAD
+typedef struct {
+	sqfs_data_writer_t *shared;
+	sqfs_compressor_t *cmp;
+	pthread_t thread;
+	uint8_t scratch[];
+} compress_worker_t;
+#endif
+
+struct sqfs_data_writer_t {
+	/* synchronization primitives */
+#ifdef WITH_PTHREAD
+	pthread_mutex_t mtx;
+	pthread_cond_t queue_cond;
+	pthread_cond_t done_cond;
+#endif
+
+	/* needs rw access by worker and main thread */
+	sqfs_block_t *queue;
+	sqfs_block_t *queue_last;
+
+	sqfs_block_t *done;
+	size_t backlog;
+	int status;
+
+	/* used by main thread only */
+	uint32_t enqueue_id;
+	uint32_t dequeue_id;
+
+	unsigned int num_workers;
+	size_t max_backlog;
+
+	size_t devblksz;
+	sqfs_file_t *file;
+
+	sqfs_fragment_t *fragments;
+	size_t num_fragments;
+	size_t max_fragments;
+
+	uint64_t start;
+
+	size_t file_start;
+	size_t num_blocks;
+	size_t max_blocks;
+	blk_info_t *blocks;
+	sqfs_compressor_t *cmp;
+
+	sqfs_block_t *frag_block;
+	frag_info_t *frag_list;
+	size_t frag_list_num;
+	size_t frag_list_max;
+
+	/* used only by workers */
+	size_t max_block_size;
+
+#ifdef WITH_PTHREAD
+	compress_worker_t *workers[];
+#else
+	uint8_t scratch[];
+#endif
+};
+
+SQFS_INTERNAL int process_completed_block(sqfs_data_writer_t *proc,
+					  sqfs_block_t *block);
+
+SQFS_INTERNAL
+int process_completed_fragment(sqfs_data_writer_t *proc, sqfs_block_t *frag,
+			       sqfs_block_t **blk_out);
+
+SQFS_INTERNAL void free_blk_list(sqfs_block_t *list);
+
+SQFS_INTERNAL
+int data_writer_init(sqfs_data_writer_t *proc, size_t max_block_size,
+		     sqfs_compressor_t *cmp, unsigned int num_workers,
+		     size_t max_backlog, size_t devblksz, sqfs_file_t *file);
+
+SQFS_INTERNAL void data_writer_cleanup(sqfs_data_writer_t *proc);
+
+SQFS_INTERNAL
+void data_writer_store_done(sqfs_data_writer_t *proc, sqfs_block_t *blk,
+			    int status);
+
+SQFS_INTERNAL
+sqfs_block_t *data_writer_next_work_item(sqfs_data_writer_t *proc);
+
+SQFS_INTERNAL
+int data_writer_do_block(sqfs_block_t *block, sqfs_compressor_t *cmp,
+			 uint8_t *scratch, size_t scratch_size);
+
+#endif /* INTERNAL_H */
diff --git a/lib/sqfs/data_writer/pthread.c b/lib/sqfs/data_writer/pthread.c
new file mode 100644
index 0000000..3170c11
--- /dev/null
+++ b/lib/sqfs/data_writer/pthread.c
@@ -0,0 +1,332 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * pthread.c
+ *
+ * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at>
+ */
+#define SQFS_BUILDING_DLL
+#include "internal.h"
+
+static void *worker_proc(void *arg)
+{
+	compress_worker_t *worker = arg;
+	sqfs_data_writer_t *shared = worker->shared;
+	sqfs_block_t *blk = NULL;
+	int status = 0;
+
+	for (;;) {
+		pthread_mutex_lock(&shared->mtx);
+		if (blk != NULL) {
+			data_writer_store_done(shared, blk, status);
+			pthread_cond_broadcast(&shared->done_cond);
+		}
+
+		while (shared->queue == NULL && shared->status == 0)
+			pthread_cond_wait(&shared->queue_cond, &shared->mtx);
+
+		blk = data_writer_next_work_item(shared);
+		pthread_mutex_unlock(&shared->mtx);
+
+		if (blk == NULL)
+			break;
+
+		status = data_writer_do_block(blk, worker->cmp,
+					      worker->scratch,
+					      shared->max_block_size);
+	}
+	return NULL;
+}
+
+sqfs_data_writer_t *sqfs_data_writer_create(size_t max_block_size,
+					    sqfs_compressor_t *cmp,
+					    unsigned int num_workers,
+					    size_t max_backlog,
+					    size_t devblksz,
+					    sqfs_file_t *file)
+{
+	sqfs_data_writer_t *proc;
+	unsigned int i;
+	int ret;
+
+	if (num_workers < 1)
+		num_workers = 1;
+
+	proc = alloc_flex(sizeof(*proc),
+			  sizeof(proc->workers[0]), num_workers);
+	if (proc == NULL)
+		return NULL;
+
+	proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
+	proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
+	proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
+
+	if (data_writer_init(proc, max_block_size, cmp, num_workers,
+			     max_backlog, devblksz, file)) {
+		goto fail_init;
+	}
+
+	for (i = 0; i < num_workers; ++i) {
+		proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
+					      1, max_block_size);
+
+		if (proc->workers[i] == NULL)
+			goto fail_init;
+
+		proc->workers[i]->shared = proc;
+		proc->workers[i]->cmp = cmp->create_copy(cmp);
+
+		if (proc->workers[i]->cmp == NULL)
+			goto fail_init;
+	}
+
+	for (i = 0; i < num_workers; ++i) {
+		ret = pthread_create(&proc->workers[i]->thread, NULL,
+				     worker_proc, proc->workers[i]);
+
+		if (ret != 0)
+			goto fail_thread;
+	}
+
+	return proc;
+fail_thread:
+	pthread_mutex_lock(&proc->mtx);
+	proc->status = -1;
+	pthread_cond_broadcast(&proc->queue_cond);
+	pthread_mutex_unlock(&proc->mtx);
+
+	for (i = 0; i < num_workers; ++i) {
+		if (proc->workers[i]->thread > 0) {
+			pthread_join(proc->workers[i]->thread, NULL);
+		}
+	}
+fail_init:
+	for (i = 0; i < num_workers; ++i) {
+		if (proc->workers[i] != NULL) {
+			if (proc->workers[i]->cmp != NULL) {
+				proc->workers[i]->cmp->
+					destroy(proc->workers[i]->cmp);
+			}
+
+			free(proc->workers[i]);
+		}
+	}
+	pthread_cond_destroy(&proc->done_cond);
+	pthread_cond_destroy(&proc->queue_cond);
+	pthread_mutex_destroy(&proc->mtx);
+	data_writer_cleanup(proc);
+	return NULL;
+}
+
+void sqfs_data_writer_destroy(sqfs_data_writer_t *proc)
+{
+	unsigned int i;
+
+	pthread_mutex_lock(&proc->mtx);
+	proc->status = -1;
+	pthread_cond_broadcast(&proc->queue_cond);
+	pthread_mutex_unlock(&proc->mtx);
+
+	for (i = 0; i < proc->num_workers; ++i) {
+		pthread_join(proc->workers[i]->thread, NULL);
+
+		proc->workers[i]->cmp->destroy(proc->workers[i]->cmp);
+		free(proc->workers[i]);
+	}
+
+	pthread_cond_destroy(&proc->done_cond);
+	pthread_cond_destroy(&proc->queue_cond);
+	pthread_mutex_destroy(&proc->mtx);
+
+	data_writer_cleanup(proc);
+}
+
+static void append_to_work_queue(sqfs_data_writer_t *proc,
+				 sqfs_block_t *block)
+{
+	if (proc->queue_last == NULL) {
+		proc->queue = proc->queue_last = block;
+	} else {
+		proc->queue_last->next = block;
+		proc->queue_last = block;
+	}
+
+	block->sequence_number = proc->enqueue_id++;
+	block->next = NULL;
+	proc->backlog += 1;
+	pthread_cond_broadcast(&proc->queue_cond);
+}
+
+static int test_and_set_status(sqfs_data_writer_t *proc, int status)
+{
+	pthread_mutex_lock(&proc->mtx);
+	if (proc->status == 0) {
+		proc->status = status;
+	} else {
+		status = proc->status;
+	}
+	pthread_cond_broadcast(&proc->queue_cond);
+	return status;
+}
+
+static sqfs_block_t *try_dequeue(sqfs_data_writer_t *proc)
+{
+	sqfs_block_t *queue, *it, *prev;
+
+	it = proc->done;
+	prev = NULL;
+
+	while (it != NULL && it->sequence_number == proc->dequeue_id) {
+		prev = it;
+		it = it->next;
+		proc->dequeue_id += 1;
+	}
+
+	if (prev == NULL) {
+		queue = NULL;
+	} else {
+		queue = proc->done;
+		prev->next = NULL;
+		proc->done = it;
+	}
+
+	return queue;
+}
+
+static sqfs_block_t *queue_merge(sqfs_block_t *lhs, sqfs_block_t *rhs)
+{
+	sqfs_block_t *it, *head = NULL, **next_ptr = &head;
+
+	while (lhs != NULL && rhs != NULL) {
+		if (lhs->sequence_number <= rhs->sequence_number) {
+			it = lhs;
+			lhs = lhs->next;
+		} else {
+			it = rhs;
+			rhs = rhs->next;
+		}
+
+		*next_ptr = it;
+		next_ptr = &it->next;
+	}
+
+	it = (lhs != NULL ? lhs : rhs);
+	*next_ptr = it;
+	return head;
+}
+
+static int process_done_queue(sqfs_data_writer_t *proc, sqfs_block_t *queue)
+{
+	sqfs_block_t *it, *block = NULL;
+	int status = 0;
+
+	while (queue != NULL && status == 0) {
+		it = queue;
+		queue = it->next;
+
+		if (it->flags & SQFS_BLK_IS_FRAGMENT) {
+			block = NULL;
+			status = process_completed_fragment(proc, it, &block);
+
+			if (block != NULL && status == 0) {
+				pthread_mutex_lock(&proc->mtx);
+				proc->dequeue_id = it->sequence_number;
+				block->sequence_number = it->sequence_number;
+
+				if (proc->queue == NULL) {
+					proc->queue = block;
+					proc->queue_last = block;
+				} else {
+					block->next = proc->queue;
+					proc->queue = block;
+				}
+
+				proc->backlog += 1;
+				proc->done = queue_merge(queue, proc->done);
+				pthread_cond_broadcast(&proc->queue_cond);
+				pthread_mutex_unlock(&proc->mtx);
+
+				queue = NULL;
+			} else {
+				free(block);
+			}
+		} else {
+			status = process_completed_block(proc, it);
+		}
+
+		free(it);
+	}
+
+	free_blk_list(queue);
+	return status;
+}
+
+int sqfs_data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block)
+{
+	sqfs_block_t *queue;
+	int status;
+
+	if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) {
+		free(block);
+		return test_and_set_status(proc, SQFS_ERROR_UNSUPPORTED);
+	}
+
+	pthread_mutex_lock(&proc->mtx);
+	while (proc->backlog > proc->max_backlog && proc->status == 0)
+		pthread_cond_wait(&proc->done_cond, &proc->mtx);
+
+	if (proc->status != 0) {
+		status = proc->status;
+		pthread_mutex_unlock(&proc->mtx);
+		free(block);
+		return status;
+	}
+
+	append_to_work_queue(proc, block);
+	block = NULL;
+
+	queue = try_dequeue(proc);
+	pthread_mutex_unlock(&proc->mtx);
+
+	status = process_done_queue(proc, queue);
+	if (status != 0)
+		return test_and_set_status(proc, status);
+
+	return 0;
+}
+
+int sqfs_data_writer_finish(sqfs_data_writer_t *proc)
+{
+	sqfs_block_t *queue;
+	int status = 0;
+
+	for (;;) {
+		pthread_mutex_lock(&proc->mtx);
+		while (proc->backlog > 0 && proc->status == 0)
+			pthread_cond_wait(&proc->done_cond, &proc->mtx);
+
+		if (proc->status != 0) {
+			status = proc->status;
+			pthread_mutex_unlock(&proc->mtx);
+			return status;
+		}
+
+		queue = proc->done;
+		proc->done = NULL;
+		pthread_mutex_unlock(&proc->mtx);
+
+		if (queue == NULL) {
+			if (proc->frag_block != NULL) {
+				append_to_work_queue(proc, proc->frag_block);
+				proc->frag_block = NULL;
+				continue;
+			}
+			break;
+		}
+
+		status = process_done_queue(proc, queue);
+		if (status != 0)
+			return status;
+	}
+
+	return 0;
+}
diff --git a/lib/sqfs/data_writer/serial.c b/lib/sqfs/data_writer/serial.c
new file mode 100644
index 0000000..38dcc58
--- /dev/null
+++ b/lib/sqfs/data_writer/serial.c
@@ -0,0 +1,96 @@
+/* SPDX-License-Identifier: LGPL-3.0-or-later */
+/*
+ * serial.c
+ *
+ * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at>
+ */
+#define SQFS_BUILDING_DLL
+#include "internal.h"
+
+sqfs_data_writer_t *sqfs_data_writer_create(size_t max_block_size,
+					    sqfs_compressor_t *cmp,
+					    unsigned int num_workers,
+					    size_t max_backlog,
+					    size_t devblksz,
+					    sqfs_file_t *file)
+{
+	sqfs_data_writer_t *proc;
+
+	proc = alloc_flex(sizeof(*proc), 1, max_block_size);
+
+	if (proc == NULL)
+		return NULL;
+
+	if (data_writer_init(proc, max_block_size, cmp, num_workers,
+			     max_backlog, devblksz, file)) {
+		data_writer_cleanup(proc);
+		return NULL;
+	}
+
+	return proc;
+}
+
+void sqfs_data_writer_destroy(sqfs_data_writer_t *proc)
+{
+	data_writer_cleanup(proc);
+}
+
+int sqfs_data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block)
+{
+	sqfs_block_t *fragblk = NULL;
+
+	if (proc->status != 0) {
+		free(block);
+		return proc->status;
+	}
+
+	if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) {
+		proc->status = SQFS_ERROR_UNSUPPORTED;
+		free(block);
+		return proc->status;
+	}
+
+	if (block->flags & SQFS_BLK_IS_FRAGMENT) {
+		block->checksum = crc32(0, block->data, block->size);
+
+		proc->status = process_completed_fragment(proc, block,
+							  &fragblk);
+		free(block);
+
+		if (proc->status != 0) {
+			free(fragblk);
+			return proc->status;
+		}
+
+		if (fragblk == NULL)
+			return 0;
+
+		block = fragblk;
+	}
+
+	proc->status = data_writer_do_block(block, proc->cmp, proc->scratch,
+					    proc->max_block_size);
+
+	if (proc->status == 0)
+		proc->status = process_completed_block(proc, block);
+
+	free(block);
+	return proc->status;
+}
+
+int sqfs_data_writer_finish(sqfs_data_writer_t *proc)
+{
+	if (proc->status != 0 || proc->frag_block == NULL)
+		return proc->status;
+
+	proc->status = data_writer_do_block(proc->frag_block, proc->cmp,
+					    proc->scratch,
+					    proc->max_block_size);
+
+	if (proc->status == 0)
+		proc->status = process_completed_block(proc, proc->frag_block);
+
+	free(proc->frag_block);
+	proc->frag_block = NULL;
+	return proc->status;
+}
-- 
cgit v1.2.3