summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-12-13 02:00:36 +0100
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-12-13 02:05:42 +0100
commit514e9e500abdd8ea91ea3b2fca214587ee24a342 (patch)
tree6a6ae1efad3cbd073725b4dbc507259121083139
parent74f25ae0f3ebc1cd435f29c21ab164a9028af980 (diff)
Cleanup data writer
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r--lib/sqfs/data_writer/fileapi.c45
-rw-r--r--lib/sqfs/data_writer/internal.h5
-rw-r--r--lib/sqfs/data_writer/pthread.c56
-rw-r--r--lib/sqfs/data_writer/serial.c20
-rw-r--r--lib/sqfs/data_writer/windows.c103
5 files changed, 96 insertions, 133 deletions
diff --git a/lib/sqfs/data_writer/fileapi.c b/lib/sqfs/data_writer/fileapi.c
index 98bdf5c..de77690 100644
--- a/lib/sqfs/data_writer/fileapi.c
+++ b/lib/sqfs/data_writer/fileapi.c
@@ -12,6 +12,22 @@ static bool is_zero_block(unsigned char *ptr, size_t size)
return ptr[0] == 0 && memcmp(ptr, ptr + 1, size - 1) == 0;
}
+static int enqueue_block(sqfs_data_writer_t *proc, sqfs_block_t *block)
+{
+ int status;
+
+ while (proc->backlog > proc->max_backlog) {
+ status = wait_completed(proc);
+ if (status)
+ return status;
+ }
+
+ if (proc->backlog == proc->max_backlog)
+ proc->notify_threads = true;
+
+ return append_to_work_queue(proc, block, proc->notify_threads);
+}
+
static int add_sentinel_block(sqfs_data_writer_t *proc)
{
sqfs_block_t *blk = calloc(1, sizeof(*blk));
@@ -22,7 +38,7 @@ static int add_sentinel_block(sqfs_data_writer_t *proc)
blk->inode = proc->inode;
blk->flags = proc->blk_flags | SQFS_BLK_LAST_BLOCK;
- return data_writer_enqueue(proc, blk);
+ return enqueue_block(proc, blk);
}
int sqfs_data_writer_begin_file(sqfs_data_writer_t *proc,
@@ -64,7 +80,7 @@ static int flush_block(sqfs_data_writer_t *proc, sqfs_block_t *block)
proc->blk_flags &= ~SQFS_BLK_FIRST_BLOCK;
}
- return data_writer_enqueue(proc, block);
+ return enqueue_block(proc, block);
}
int sqfs_data_writer_append(sqfs_data_writer_t *proc, const void *data,
@@ -145,3 +161,28 @@ int sqfs_data_writer_end_file(sqfs_data_writer_t *proc)
proc->blk_index = 0;
return 0;
}
+
+int sqfs_data_writer_finish(sqfs_data_writer_t *proc)
+{
+ int status = 0;
+
+ append_to_work_queue(proc, NULL, true);
+
+ while (proc->backlog > 0) {
+ status = wait_completed(proc);
+ if (status)
+ return status;
+ }
+
+ if (proc->frag_block != NULL) {
+ status = append_to_work_queue(proc, proc->frag_block, true);
+ proc->frag_block = NULL;
+
+ if (status)
+ return status;
+
+ status = wait_completed(proc);
+ }
+
+ return status;
+}
diff --git a/lib/sqfs/data_writer/internal.h b/lib/sqfs/data_writer/internal.h
index 24f520e..1a85644 100644
--- a/lib/sqfs/data_writer/internal.h
+++ b/lib/sqfs/data_writer/internal.h
@@ -148,6 +148,9 @@ SQFS_INTERNAL
int test_and_set_status(sqfs_data_writer_t *proc, int status);
SQFS_INTERNAL
-int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block);
+int append_to_work_queue(sqfs_data_writer_t *proc, sqfs_block_t *block,
+ bool notify_threads);
+
+SQFS_INTERNAL int wait_completed(sqfs_data_writer_t *proc);
#endif /* INTERNAL_H */
diff --git a/lib/sqfs/data_writer/pthread.c b/lib/sqfs/data_writer/pthread.c
index cfd2d3e..8196641 100644
--- a/lib/sqfs/data_writer/pthread.c
+++ b/lib/sqfs/data_writer/pthread.c
@@ -154,8 +154,8 @@ void sqfs_data_writer_destroy(sqfs_data_writer_t *proc)
data_writer_cleanup(proc);
}
-static int append_to_work_queue(sqfs_data_writer_t *proc,
- sqfs_block_t *block, bool signal_threads)
+int append_to_work_queue(sqfs_data_writer_t *proc, sqfs_block_t *block,
+ bool signal_threads)
{
int status;
@@ -292,7 +292,7 @@ int test_and_set_status(sqfs_data_writer_t *proc, int status)
return status;
}
-static int wait_completed(sqfs_data_writer_t *proc)
+int wait_completed(sqfs_data_writer_t *proc)
{
sqfs_block_t *queue;
int status;
@@ -315,53 +315,5 @@ static int wait_completed(sqfs_data_writer_t *proc)
}
status = process_done_queue(proc, queue);
- if (status != 0)
- return test_and_set_status(proc, status);
-
- return status;
-}
-
-int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block)
-{
- int status;
-
- while (proc->backlog > proc->max_backlog) {
- status = wait_completed(proc);
- if (status)
- return status;
- }
-
- if (proc->backlog == proc->max_backlog)
- proc->notify_threads = true;
-
- return append_to_work_queue(proc, block, proc->notify_threads);
-}
-
-int sqfs_data_writer_finish(sqfs_data_writer_t *proc)
-{
- int status;
-
- append_to_work_queue(proc, NULL, true);
-
- while (proc->backlog > 0) {
- status = wait_completed(proc);
- if (status)
- return status;
- }
-
- if (proc->frag_block != NULL) {
- status = data_writer_do_block(proc->frag_block,
- proc->workers[0]->cmp,
- proc->workers[0]->scratch,
- proc->max_block_size);
- if (status)
- return status;
-
- status = process_done_queue(proc, proc->frag_block);
- proc->frag_block = NULL;
- if (status)
- return status;
- }
-
- return 0;
+ return status ? test_and_set_status(proc, status) : status;
}
diff --git a/lib/sqfs/data_writer/serial.c b/lib/sqfs/data_writer/serial.c
index 916f497..82f7836 100644
--- a/lib/sqfs/data_writer/serial.c
+++ b/lib/sqfs/data_writer/serial.c
@@ -43,11 +43,13 @@ int test_and_set_status(sqfs_data_writer_t *proc, int status)
return proc->status;
}
-int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block)
+int append_to_work_queue(sqfs_data_writer_t *proc, sqfs_block_t *block,
+ bool signal_threads)
{
sqfs_block_t *fragblk = NULL;
+ (void)signal_threads;
- if (proc->status != 0) {
+ if (proc->status != 0 || block == NULL) {
free(block);
return proc->status;
}
@@ -80,19 +82,7 @@ int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block)
return proc->status;
}
-int sqfs_data_writer_finish(sqfs_data_writer_t *proc)
+int wait_completed(sqfs_data_writer_t *proc)
{
- if (proc->status != 0 || proc->frag_block == NULL)
- return proc->status;
-
- proc->status = data_writer_do_block(proc->frag_block, proc->cmp,
- proc->scratch,
- proc->max_block_size);
-
- if (proc->status == 0)
- proc->status = process_completed_block(proc, proc->frag_block);
-
- free(proc->frag_block);
- proc->frag_block = NULL;
return proc->status;
}
diff --git a/lib/sqfs/data_writer/windows.c b/lib/sqfs/data_writer/windows.c
index 0f1bef1..d790f79 100644
--- a/lib/sqfs/data_writer/windows.c
+++ b/lib/sqfs/data_writer/windows.c
@@ -127,20 +127,38 @@ void sqfs_data_writer_destroy(sqfs_data_writer_t *proc)
data_writer_cleanup(proc);
}
-static void append_to_work_queue(sqfs_data_writer_t *proc,
- sqfs_block_t *block)
+int append_to_work_queue(sqfs_data_writer_t *proc, sqfs_block_t *block,
+ bool signal_threads)
{
- if (proc->queue_last == NULL) {
- proc->queue = proc->queue_last = block;
- } else {
- proc->queue_last->next = block;
- proc->queue_last = block;
+ int status;
+
+ EnterCriticalSection(&proc->mtx);
+ status = proc->status;
+
+ if (status != 0) {
+ free(block);
+ LeaveCriticalSection(&proc->mtx);
+ return status;
}
- block->sequence_number = proc->enqueue_id++;
- block->next = NULL;
- proc->backlog += 1;
- WakeAllConditionVariable(&proc->queue_cond);
+ if (block != NULL) {
+ if (proc->queue_last == NULL) {
+ proc->queue = proc->queue_last = block;
+ } else {
+ proc->queue_last->next = block;
+ proc->queue_last = block;
+ }
+
+ block->sequence_number = proc->enqueue_id++;
+ block->next = NULL;
+ proc->backlog += 1;
+ }
+
+ if (signal_threads)
+ WakeAllConditionVariable(&proc->queue_cond);
+
+ LeaveCriticalSection(&proc->mtx);
+ return 0;
}
static sqfs_block_t *try_dequeue(sqfs_data_writer_t *proc)
@@ -248,70 +266,29 @@ int test_and_set_status(sqfs_data_writer_t *proc, int status)
return status;
}
-int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block)
+int wait_completed(sqfs_data_writer_t *proc)
{
sqfs_block_t *queue;
int status;
EnterCriticalSection(&proc->mtx);
- while (proc->backlog > proc->max_backlog && proc->status == 0) {
+ for (;;) {
+ queue = try_dequeue(proc);
+ status = proc->status;
+
+ if (queue != NULL || status != 0)
+ break;
+
SleepConditionVariableCS(&proc->done_cond, &proc->mtx,
INFINITE);
}
+ LeaveCriticalSection(&proc->mtx);
- if (proc->status != 0) {
- status = proc->status;
- LeaveCriticalSection(&proc->mtx);
- free(block);
+ if (status != 0) {
+ free_blk_list(queue);
return status;
}
- append_to_work_queue(proc, block);
- block = NULL;
-
- queue = try_dequeue(proc);
- LeaveCriticalSection(&proc->mtx);
-
status = process_done_queue(proc, queue);
-
return status ? test_and_set_status(proc, status) : status;
}
-
-int sqfs_data_writer_finish(sqfs_data_writer_t *proc)
-{
- sqfs_block_t *queue;
- int status = 0;
-
- for (;;) {
- EnterCriticalSection(&proc->mtx);
- while (proc->backlog > 0 && proc->status == 0) {
- SleepConditionVariableCS(&proc->done_cond, &proc->mtx,
- INFINITE);
- }
-
- if (proc->status != 0) {
- status = proc->status;
- LeaveCriticalSection(&proc->mtx);
- return status;
- }
-
- queue = proc->done;
- proc->done = NULL;
- LeaveCriticalSection(&proc->mtx);
-
- if (queue == NULL) {
- if (proc->frag_block != NULL) {
- append_to_work_queue(proc, proc->frag_block);
- proc->frag_block = NULL;
- continue;
- }
- break;
- }
-
- status = process_done_queue(proc, queue);
- if (status != 0)
- return status;
- }
-
- return 0;
-}