aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-12-12 12:48:08 +0100
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-12-12 12:48:33 +0100
commit6d4faedcb53f54253160f1717fac609f922ae0c7 (patch)
treea082bc838a8d08df2769e1da11e600946f7354a9
parent40075fe0751a06c3373f53f36d44a27e79c2cca7 (diff)
Fix thread pool queue accounting
- ONLY manipulate the back log counter in the main thread. - Fix the order of operations when submitting blocks. Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r--lib/sqfs/data_writer/common.c2
-rw-r--r--lib/sqfs/data_writer/pthread.c96
2 files changed, 56 insertions, 42 deletions
diff --git a/lib/sqfs/data_writer/common.c b/lib/sqfs/data_writer/common.c
index 5fe70ac..d316a6f 100644
--- a/lib/sqfs/data_writer/common.c
+++ b/lib/sqfs/data_writer/common.c
@@ -77,8 +77,6 @@ void data_writer_store_done(sqfs_data_writer_t *proc, sqfs_block_t *blk,
if (status != 0 && proc->status == 0)
proc->status = status;
-
- proc->backlog -= 1;
}
sqfs_block_t *data_writer_next_work_item(sqfs_data_writer_t *proc)
diff --git a/lib/sqfs/data_writer/pthread.c b/lib/sqfs/data_writer/pthread.c
index 1830d07..97114c5 100644
--- a/lib/sqfs/data_writer/pthread.c
+++ b/lib/sqfs/data_writer/pthread.c
@@ -154,9 +154,19 @@ void sqfs_data_writer_destroy(sqfs_data_writer_t *proc)
data_writer_cleanup(proc);
}
-static void append_to_work_queue(sqfs_data_writer_t *proc,
- sqfs_block_t *block)
+static int append_to_work_queue(sqfs_data_writer_t *proc,
+ sqfs_block_t *block)
{
+ int status;
+
+ pthread_mutex_lock(&proc->mtx);
+ status = proc->status;
+ if (status != 0) {
+ free(block);
+ pthread_mutex_unlock(&proc->mtx);
+ return status;
+ }
+
if (proc->queue_last == NULL) {
proc->queue = proc->queue_last = block;
} else {
@@ -168,6 +178,8 @@ static void append_to_work_queue(sqfs_data_writer_t *proc,
block->next = NULL;
proc->backlog += 1;
pthread_cond_broadcast(&proc->queue_cond);
+ pthread_mutex_unlock(&proc->mtx);
+ return 0;
}
static sqfs_block_t *try_dequeue(sqfs_data_writer_t *proc)
@@ -224,6 +236,7 @@ static int process_done_queue(sqfs_data_writer_t *proc, sqfs_block_t *queue)
while (queue != NULL && status == 0) {
it = queue;
queue = it->next;
+ proc->backlog -= 1;
if (it->flags & SQFS_BLK_IS_FRAGMENT) {
block = NULL;
@@ -275,66 +288,69 @@ int test_and_set_status(sqfs_data_writer_t *proc, int status)
return status;
}
-int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block)
+static int wait_completed(sqfs_data_writer_t *proc)
{
sqfs_block_t *queue;
int status;
pthread_mutex_lock(&proc->mtx);
- while (proc->backlog > proc->max_backlog && proc->status == 0)
- pthread_cond_wait(&proc->done_cond, &proc->mtx);
-
- if (proc->status != 0) {
+ for (;;) {
+ queue = try_dequeue(proc);
status = proc->status;
- pthread_mutex_unlock(&proc->mtx);
- free(block);
- return status;
- }
- append_to_work_queue(proc, block);
- block = NULL;
+ if (queue != NULL || status != 0)
+ break;
- queue = try_dequeue(proc);
+ pthread_cond_wait(&proc->done_cond, &proc->mtx);
+ }
pthread_mutex_unlock(&proc->mtx);
+ if (status != 0) {
+ free_blk_list(queue);
+ return status;
+ }
+
status = process_done_queue(proc, queue);
if (status != 0)
return test_and_set_status(proc, status);
- return 0;
+ return status;
}
-int sqfs_data_writer_finish(sqfs_data_writer_t *proc)
+int data_writer_enqueue(sqfs_data_writer_t *proc, sqfs_block_t *block)
{
- sqfs_block_t *queue;
- int status = 0;
-
- for (;;) {
- pthread_mutex_lock(&proc->mtx);
- while (proc->backlog > 0 && proc->status == 0)
- pthread_cond_wait(&proc->done_cond, &proc->mtx);
+ int status;
- if (proc->status != 0) {
- status = proc->status;
- pthread_mutex_unlock(&proc->mtx);
+ while (proc->backlog > proc->max_backlog) {
+ status = wait_completed(proc);
+ if (status)
return status;
- }
+ }
- queue = proc->done;
- proc->done = NULL;
- pthread_mutex_unlock(&proc->mtx);
+ return append_to_work_queue(proc, block);
+}
- if (queue == NULL) {
- if (proc->frag_block != NULL) {
- append_to_work_queue(proc, proc->frag_block);
- proc->frag_block = NULL;
- continue;
- }
- break;
- }
+int sqfs_data_writer_finish(sqfs_data_writer_t *proc)
+{
+ int status;
+
+ while (proc->backlog > 0) {
+ status = wait_completed(proc);
+ if (status)
+ return status;
+ }
+
+ if (proc->frag_block != NULL) {
+ status = data_writer_do_block(proc->frag_block,
+ proc->workers[0]->cmp,
+ proc->workers[0]->scratch,
+ proc->max_block_size);
+ if (status)
+ return status;
- status = process_done_queue(proc, queue);
- if (status != 0)
+ status = process_done_queue(proc, proc->frag_block);
+ proc->frag_block = NULL;
+ if (status)
return status;
}