aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2020-02-16 01:28:43 +0100
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2020-02-16 01:28:43 +0100
commit81f9ddf58b4024d51b24123788c860bb08a85b04 (patch)
tree4e39ad789ae31de683592e4638eb37a6fbcd9a4f /lib
parentd6143ce54787f70eda9ad2492823e6142e8c5652 (diff)
Move all the queue-waiting logic to the thread pool implemenation
Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
Diffstat (limited to 'lib')
-rw-r--r--lib/sqfs/block_processor/fileapi.c45
-rw-r--r--lib/sqfs/block_processor/internal.h2
-rw-r--r--lib/sqfs/block_processor/serial.c5
-rw-r--r--lib/sqfs/block_processor/winpthread.c78
4 files changed, 58 insertions, 72 deletions
diff --git a/lib/sqfs/block_processor/fileapi.c b/lib/sqfs/block_processor/fileapi.c
index 098f724..073d92f 100644
--- a/lib/sqfs/block_processor/fileapi.c
+++ b/lib/sqfs/block_processor/fileapi.c
@@ -7,19 +7,6 @@
#define SQFS_BUILDING_DLL
#include "internal.h"
-static int enqueue_block(sqfs_block_processor_t *proc, sqfs_block_t *block)
-{
- int status;
-
- while (proc->backlog > proc->max_backlog) {
- status = wait_completed(proc);
- if (status)
- return status;
- }
-
- return append_to_work_queue(proc, block);
-}
-
static int add_sentinel_block(sqfs_block_processor_t *proc)
{
sqfs_block_t *blk = calloc(1, sizeof(*blk));
@@ -30,21 +17,7 @@ static int add_sentinel_block(sqfs_block_processor_t *proc)
blk->inode = proc->inode;
blk->flags = proc->blk_flags | SQFS_BLK_LAST_BLOCK;
- return enqueue_block(proc, blk);
-}
-
-int sqfs_block_processor_begin_file(sqfs_block_processor_t *proc,
- sqfs_inode_generic_t *inode, sqfs_u32 flags)
-{
- if (proc->inode != NULL)
- return SQFS_ERROR_SEQUENCE;
-
- if (flags & ~SQFS_BLK_USER_SETTABLE_FLAGS)
- return SQFS_ERROR_UNSUPPORTED;
-
- proc->inode = inode;
- proc->blk_flags = flags | SQFS_BLK_FIRST_BLOCK;
- return 0;
+ return append_to_work_queue(proc, blk);
}
static int flush_block(sqfs_block_processor_t *proc)
@@ -60,7 +33,21 @@ static int flush_block(sqfs_block_processor_t *proc)
proc->blk_flags &= ~SQFS_BLK_FIRST_BLOCK;
}
- return enqueue_block(proc, block);
+ return append_to_work_queue(proc, block);
+}
+
+int sqfs_block_processor_begin_file(sqfs_block_processor_t *proc,
+ sqfs_inode_generic_t *inode, sqfs_u32 flags)
+{
+ if (proc->inode != NULL)
+ return SQFS_ERROR_SEQUENCE;
+
+ if (flags & ~SQFS_BLK_USER_SETTABLE_FLAGS)
+ return SQFS_ERROR_UNSUPPORTED;
+
+ proc->inode = inode;
+ proc->blk_flags = flags | SQFS_BLK_FIRST_BLOCK;
+ return 0;
}
int sqfs_block_processor_append(sqfs_block_processor_t *proc, const void *data,
diff --git a/lib/sqfs/block_processor/internal.h b/lib/sqfs/block_processor/internal.h
index cea820f..f26d58e 100644
--- a/lib/sqfs/block_processor/internal.h
+++ b/lib/sqfs/block_processor/internal.h
@@ -115,6 +115,4 @@ int block_processor_do_block(sqfs_block_t *block, sqfs_compressor_t *cmp,
SQFS_INTERNAL
int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block);
-SQFS_INTERNAL int wait_completed(sqfs_block_processor_t *proc);
-
#endif /* INTERNAL_H */
diff --git a/lib/sqfs/block_processor/serial.c b/lib/sqfs/block_processor/serial.c
index 1d2c2db..e44ce17 100644
--- a/lib/sqfs/block_processor/serial.c
+++ b/lib/sqfs/block_processor/serial.c
@@ -74,11 +74,6 @@ done:
return proc->status;
}
-int wait_completed(sqfs_block_processor_t *proc)
-{
- return proc->status;
-}
-
int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
{
if (proc->frag_block == NULL || proc->status != 0)
diff --git a/lib/sqfs/block_processor/winpthread.c b/lib/sqfs/block_processor/winpthread.c
index fcbb8b4..0ad8475 100644
--- a/lib/sqfs/block_processor/winpthread.c
+++ b/lib/sqfs/block_processor/winpthread.c
@@ -311,35 +311,6 @@ fail_init:
}
#endif
-int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)
-{
- int status;
-
- LOCK(&proc->mtx);
- status = proc->status;
- if (status != 0)
- goto out;
-
- if (block != NULL) {
- if (proc->queue_last == NULL) {
- proc->queue = proc->queue_last = block;
- } else {
- proc->queue_last->next = block;
- proc->queue_last = block;
- }
-
- block->sequence_number = proc->enqueue_id++;
- block->next = NULL;
- proc->backlog += 1;
- block = NULL;
- }
-out:
- SIGNAL_ALL(&proc->queue_cond);
- UNLOCK(&proc->mtx);
- free(block);
- return 0;
-}
-
static sqfs_block_t *try_dequeue(sqfs_block_processor_t *proc)
{
sqfs_block_t *queue, *it, *prev;
@@ -433,7 +404,7 @@ static int process_done_queue(sqfs_block_processor_t *proc, sqfs_block_t *queue)
return status;
}
-int wait_completed(sqfs_block_processor_t *proc)
+static int wait_completed(sqfs_block_processor_t *proc)
{
sqfs_block_t *queue;
int status;
@@ -470,19 +441,54 @@ int wait_completed(sqfs_block_processor_t *proc)
return status;
}
-int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
+int append_to_work_queue(sqfs_block_processor_t *proc, sqfs_block_t *block)
{
- int status = 0;
-
- append_to_work_queue(proc, NULL);
+ int status;
- while (proc->backlog > 0) {
+ while (proc->backlog > proc->max_backlog) {
status = wait_completed(proc);
if (status)
return status;
}
- if (proc->frag_block != NULL) {
+ LOCK(&proc->mtx);
+ status = proc->status;
+ if (status != 0)
+ goto out;
+
+ if (block != NULL) {
+ if (proc->queue_last == NULL) {
+ proc->queue = proc->queue_last = block;
+ } else {
+ proc->queue_last->next = block;
+ proc->queue_last = block;
+ }
+
+ block->sequence_number = proc->enqueue_id++;
+ block->next = NULL;
+ proc->backlog += 1;
+ block = NULL;
+ }
+out:
+ SIGNAL_ALL(&proc->queue_cond);
+ UNLOCK(&proc->mtx);
+ free(block);
+ return 0;
+}
+
+int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
+{
+ int status;
+
+ LOCK(&proc->mtx);
+ status = proc->status;
+ SIGNAL_ALL(&proc->queue_cond);
+ UNLOCK(&proc->mtx);
+
+ while (status == 0 && proc->backlog > 0)
+ status = wait_completed(proc);
+
+ if (status == 0 && proc->frag_block != NULL) {
status = append_to_work_queue(proc, proc->frag_block);
proc->frag_block = NULL;