From 514e9e500abdd8ea91ea3b2fca214587ee24a342 Mon Sep 17 00:00:00 2001 From: David Oberhollenzer Date: Fri, 13 Dec 2019 02:00:36 +0100 Subject: Cleanup data writer Signed-off-by: David Oberhollenzer --- lib/sqfs/data_writer/fileapi.c | 45 +++++++++++++++++- lib/sqfs/data_writer/internal.h | 5 +- lib/sqfs/data_writer/pthread.c | 56 ++-------------------- lib/sqfs/data_writer/serial.c | 20 ++------ lib/sqfs/data_writer/windows.c | 103 ++++++++++++++++------------------------ 5 files changed, 96 insertions(+), 133 deletions(-) diff --git a/lib/sqfs/data_writer/fileapi.c b/lib/sqfs/data_writer/fileapi.c index 98bdf5c..de77690 100644 --- a/lib/sqfs/data_writer/fileapi.c +++ b/lib/sqfs/data_writer/fileapi.c @@ -12,6 +12,22 @@ static bool is_zero_block(unsigned char *ptr, size_t size) return ptr[0] == 0 && memcmp(ptr, ptr + 1, size - 1) == 0; } +static int enqueue_block(sqfs_data_writer_t *proc, sqfs_block_t *block) +{ + int status; + + while (proc->backlog > proc->max_backlog) { + status = wait_completed(proc); + if (status) + return status; + } + + if (proc->backlog == proc->max_backlog) + proc->notify_threads = true; + + return append_to_work_queue(proc, block, proc->notify_threads); +} + static int add_sentinel_block(sqfs_data_writer_t *proc) { sqfs_block_t *blk = calloc(1, sizeof(*blk)); @@ -22,7 +38,7 @@ static int add_sentinel_block(sqfs_data_writer_t *proc) blk->inode = proc->inode; blk->flags = proc->blk_flags | SQFS_BLK_LAST_BLOCK; - return data_writer_enqueue(proc, blk); + return enqueue_block(proc, blk); } int sqfs_data_writer_begin_file(sqfs_data_writer_t *proc, @@ -64,7 +80,7 @@ static int flush_block(sqfs_data_writer_t *proc, sqfs_block_t *block) proc->blk_flags &= ~SQFS_BLK_FIRST_BLOCK; } - return data_writer_enqueue(proc, block); + return enqueue_block(proc, block); } int sqfs_data_writer_append(sqfs_data_writer_t *proc, const void *data, @@ -145,3 +161,28 @@ int sqfs_data_writer_end_file(sqfs_data_writer_t *proc) proc->blk_index = 0; return 0; } + +int sqfs_data_writer_finish(sqfs_data_writer_t *proc) +{ + int status = 0; + + append_to_work_queue(proc, NULL, true); + + while (proc->backlog > 0) { + status = wait_completed(proc); + if (status) + return status; + } + + if (proc->frag_block != NULL) { + status = append_to_work_queue(proc, proc->frag_block, true); + proc->frag_block = NULL; + + if (status) + return status; + + status = wait_completed(proc); + } + + return status; +} diff --git a/lib/sqfs/data_writer/internal.h b/lib/sqfs/data_writer/internal.h index 24f520e..1a85644 100644 --- a/lib/sqfs/data_writer/internal.h +++ b/lib/sqfs/data_writer/internal.h @@ -148,6 +148,9 @@ SQFS_INTERNAL int test_and_set_status(sqfs_data_writer_t *proc, int status); SQFS_INTERNAL -int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block); +int append_to_work_queue(sqfs_data_writer_t *proc, sqfs_block_t *block, + bool notify_threads); + +SQFS_INTERNAL int wait_completed(sqfs_data_writer_t *proc); #endif /* INTERNAL_H */ diff --git a/lib/sqfs/data_writer/pthread.c b/lib/sqfs/data_writer/pthread.c index cfd2d3e..8196641 100644 --- a/lib/sqfs/data_writer/pthread.c +++ b/lib/sqfs/data_writer/pthread.c @@ -154,8 +154,8 @@ void sqfs_data_writer_destroy(sqfs_data_writer_t *proc) data_writer_cleanup(proc); } -static int append_to_work_queue(sqfs_data_writer_t *proc, - sqfs_block_t *block, bool signal_threads) +int append_to_work_queue(sqfs_data_writer_t *proc, sqfs_block_t *block, + bool signal_threads) { int status; @@ -292,7 +292,7 @@ int test_and_set_status(sqfs_data_writer_t *proc, int status) return status; } -static int wait_completed(sqfs_data_writer_t *proc) +int wait_completed(sqfs_data_writer_t *proc) { sqfs_block_t *queue; int status; @@ -315,53 +315,5 @@ static int wait_completed(sqfs_data_writer_t *proc) } status = process_done_queue(proc, queue); - if (status != 0) - return test_and_set_status(proc, status); - - return status; -} - -int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block) -{ - int status; - - while (proc->backlog > proc->max_backlog) { - status = wait_completed(proc); - if (status) - return status; - } - - if (proc->backlog == proc->max_backlog) - proc->notify_threads = true; - - return append_to_work_queue(proc, block, proc->notify_threads); -} - -int sqfs_data_writer_finish(sqfs_data_writer_t *proc) -{ - int status; - - append_to_work_queue(proc, NULL, true); - - while (proc->backlog > 0) { - status = wait_completed(proc); - if (status) - return status; - } - - if (proc->frag_block != NULL) { - status = data_writer_do_block(proc->frag_block, - proc->workers[0]->cmp, - proc->workers[0]->scratch, - proc->max_block_size); - if (status) - return status; - - status = process_done_queue(proc, proc->frag_block); - proc->frag_block = NULL; - if (status) - return status; - } - - return 0; + return status ? test_and_set_status(proc, status) : status; } diff --git a/lib/sqfs/data_writer/serial.c b/lib/sqfs/data_writer/serial.c index 916f497..82f7836 100644 --- a/lib/sqfs/data_writer/serial.c +++ b/lib/sqfs/data_writer/serial.c @@ -43,11 +43,13 @@ int test_and_set_status(sqfs_data_writer_t *proc, int status) return proc->status; } -int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block) +int append_to_work_queue(sqfs_data_writer_t *proc, sqfs_block_t *block, + bool signal_threads) { sqfs_block_t *fragblk = NULL; + (void)signal_threads; - if (proc->status != 0) { + if (proc->status != 0 || block == NULL) { free(block); return proc->status; } @@ -80,19 +82,7 @@ int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block) return proc->status; } -int sqfs_data_writer_finish(sqfs_data_writer_t *proc) +int wait_completed(sqfs_data_writer_t *proc) { - if (proc->status != 0 || proc->frag_block == NULL) - return proc->status; - - proc->status = data_writer_do_block(proc->frag_block, proc->cmp, - proc->scratch, - proc->max_block_size); - - if (proc->status == 0) - proc->status = process_completed_block(proc, proc->frag_block); - - free(proc->frag_block); - proc->frag_block = NULL; return proc->status; } diff --git a/lib/sqfs/data_writer/windows.c b/lib/sqfs/data_writer/windows.c index 0f1bef1..d790f79 100644 --- a/lib/sqfs/data_writer/windows.c +++ b/lib/sqfs/data_writer/windows.c @@ -127,20 +127,38 @@ void sqfs_data_writer_destroy(sqfs_data_writer_t *proc) data_writer_cleanup(proc); } -static void append_to_work_queue(sqfs_data_writer_t *proc, - sqfs_block_t *block) +int append_to_work_queue(sqfs_data_writer_t *proc, sqfs_block_t *block, + bool signal_threads) { - if (proc->queue_last == NULL) { - proc->queue = proc->queue_last = block; - } else { - proc->queue_last->next = block; - proc->queue_last = block; + int status; + + EnterCriticalSection(&proc->mtx); + status = proc->status; + + if (status != 0) { + free(block); + LeaveCriticalSection(&proc->mtx); + return status; } - block->sequence_number = proc->enqueue_id++; - block->next = NULL; - proc->backlog += 1; - WakeAllConditionVariable(&proc->queue_cond); + if (block != NULL) { + if (proc->queue_last == NULL) { + proc->queue = proc->queue_last = block; + } else { + proc->queue_last->next = block; + proc->queue_last = block; + } + + block->sequence_number = proc->enqueue_id++; + block->next = NULL; + proc->backlog += 1; + } + + if (signal_threads) + WakeAllConditionVariable(&proc->queue_cond); + + LeaveCriticalSection(&proc->mtx); + return 0; } static sqfs_block_t *try_dequeue(sqfs_data_writer_t *proc) @@ -248,70 +266,29 @@ int test_and_set_status(sqfs_data_writer_t *proc, int status) return status; } -int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block) +int wait_completed(sqfs_data_writer_t *proc) { sqfs_block_t *queue; int status; EnterCriticalSection(&proc->mtx); - while (proc->backlog > proc->max_backlog && proc->status == 0) { + for (;;) { + queue = try_dequeue(proc); + status = proc->status; + + if (queue != NULL || status != 0) + break; + SleepConditionVariableCS(&proc->done_cond, &proc->mtx, INFINITE); } + LeaveCriticalSection(&proc->mtx); - if (proc->status != 0) { - status = proc->status; - LeaveCriticalSection(&proc->mtx); - free(block); + if (status != 0) { + free_blk_list(queue); return status; } - append_to_work_queue(proc, block); - block = NULL; - - queue = try_dequeue(proc); - LeaveCriticalSection(&proc->mtx); - status = process_done_queue(proc, queue); - return status ? test_and_set_status(proc, status) : status; } - -int sqfs_data_writer_finish(sqfs_data_writer_t *proc) -{ - sqfs_block_t *queue; - int status = 0; - - for (;;) { - EnterCriticalSection(&proc->mtx); - while (proc->backlog > 0 && proc->status == 0) { - SleepConditionVariableCS(&proc->done_cond, &proc->mtx, - INFINITE); - } - - if (proc->status != 0) { - status = proc->status; - LeaveCriticalSection(&proc->mtx); - return status; - } - - queue = proc->done; - proc->done = NULL; - LeaveCriticalSection(&proc->mtx); - - if (queue == NULL) { - if (proc->frag_block != NULL) { - append_to_work_queue(proc, proc->frag_block); - proc->frag_block = NULL; - continue; - } - break; - } - - status = process_done_queue(proc, queue); - if (status != 0) - return status; - } - - return 0; -} -- cgit v1.2.3