summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-09-24 17:29:40 +0200
committerDavid Oberhollenzer <david.oberhollenzer@sigma-star.at>2019-09-24 17:29:40 +0200
commit2354f7b5cfc289fdec95eeecb8d441c20513712b (patch)
treef48e705b09d51cde8fca0cf55b7aa8b733a384dc
parentb2a7c2e3fd3ad9e4e3d5f475d8338ed30ede358f (diff)
Bring back batched dequeuing
Instead of doing piece-wise block dequeuing, take as many consequitive blocks as we can and in the comparetively rare* case that we produce a fragment block, put everything back for the next run. *rare because a lot of fragments are eaten by deduplication and even after that, a number of them are required to fill a block. Signed-off-by: David Oberhollenzer <david.oberhollenzer@sigma-star.at>
-rw-r--r--lib/sqfs/blk_proc/pthread.c154
1 files changed, 110 insertions, 44 deletions
diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c
index 85146da..de499f7 100644
--- a/lib/sqfs/blk_proc/pthread.c
+++ b/lib/sqfs/blk_proc/pthread.c
@@ -256,74 +256,140 @@ static int test_and_set_status(sqfs_block_processor_t *proc, int status)
return status;
}
-static int queue_pump(sqfs_block_processor_t *proc, sqfs_block_t *block)
+static sqfs_block_t *try_dequeue(sqfs_block_processor_t *proc)
{
- sqfs_block_t *completed = NULL;
- int status;
+ sqfs_block_t *queue, *it, *prev;
- pthread_mutex_lock(&proc->mtx);
- while (proc->backlog > proc->max_backlog && proc->status == 0)
- pthread_cond_wait(&proc->done_cond, &proc->mtx);
+ it = proc->done;
+ prev = NULL;
- if (proc->status != 0) {
- status = proc->status;
- pthread_mutex_unlock(&proc->mtx);
- free(block);
- return status;
+ while (it != NULL && it->sequence_number == proc->dequeue_id) {
+ prev = it;
+ it = it->next;
+ proc->dequeue_id += 1;
+ }
+
+ if (prev == NULL) {
+ queue = NULL;
+ } else {
+ queue = proc->done;
+ prev->next = NULL;
+ proc->done = it;
}
- completed = get_completed_if_avail(proc);
+ return queue;
+}
- append_to_work_queue(proc, block);
- block = NULL;
- pthread_mutex_unlock(&proc->mtx);
+static sqfs_block_t *queue_merge(sqfs_block_t *lhs, sqfs_block_t *rhs)
+{
+ sqfs_block_t *it, *head = NULL, **next_ptr = &head;
+
+ while (lhs != NULL && rhs != NULL) {
+ if (lhs->sequence_number <= rhs->sequence_number) {
+ it = lhs;
+ lhs = lhs->next;
+ } else {
+ it = rhs;
+ rhs = rhs->next;
+ }
+
+ *next_ptr = it;
+ next_ptr = &it->next;
+ }
+
+ it = (lhs != NULL ? lhs : rhs);
+ *next_ptr = it;
+ return head;
+}
+
+static int process_done_queue(sqfs_block_processor_t *proc,
+ sqfs_block_t *queue)
+{
+ sqfs_block_t *it, *block = NULL;
+ int status = 0;
+
+ while (queue != NULL) {
+ it = queue;
+ queue = it->next;
+ it->next = NULL;
+
+ if (it->flags & SQFS_BLK_IS_FRAGMENT) {
+ status = handle_fragment(proc, it, &block);
+
+ if (status != 0) {
+ free(it);
+ free(block);
+ free_blk_list(queue);
+ status = test_and_set_status(proc, status);
+ break;
+ }
+
+ if (block != NULL) {
+ pthread_mutex_lock(&proc->mtx);
+ proc->dequeue_id = it->sequence_number;
+ block->sequence_number = proc->dequeue_id;
+
+ if (proc->queue == NULL) {
+ proc->queue = block;
+ proc->queue_last = block;
+ } else {
+ block->next = proc->queue;
+ proc->queue = block;
+ }
+
+ proc->backlog += 1;
+ proc->done = queue_merge(queue, proc->done);
+ pthread_cond_broadcast(&proc->queue_cond);
+ pthread_mutex_unlock(&proc->mtx);
- if (completed == NULL)
- return 0;
-
- if (completed->flags & SQFS_BLK_IS_FRAGMENT) {
- status = handle_fragment(proc, completed, &block);
-
- if (status != 0) {
- free(block);
- status = test_and_set_status(proc, status);
- } else if (block != NULL) {
- pthread_mutex_lock(&proc->mtx);
- proc->dequeue_id = completed->sequence_number;
- block->sequence_number = proc->dequeue_id;
-
- if (proc->queue == NULL) {
- proc->queue = block;
- proc->queue_last = block;
- } else {
- block->next = proc->queue;
- proc->queue = block;
+ queue = NULL;
}
+ } else {
+ status = process_completed_block(proc, it);
- proc->backlog += 1;
- pthread_cond_broadcast(&proc->queue_cond);
- pthread_mutex_unlock(&proc->mtx);
+ if (status != 0) {
+ status = test_and_set_status(proc, status);
+ free_blk_list(queue);
+ free(it);
+ break;
+ }
}
- } else {
- status = process_completed_block(proc, completed);
- if (status != 0)
- status = test_and_set_status(proc, status);
+ free(it);
}
- free(completed);
return status;
}
int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
sqfs_block_t *block)
{
+ sqfs_block_t *queue;
+ int status;
+
if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) {
free(block);
return test_and_set_status(proc, SQFS_ERROR_UNSUPPORTED);
}
- return queue_pump(proc, block);
+ pthread_mutex_lock(&proc->mtx);
+ while (proc->backlog > proc->max_backlog && proc->status == 0)
+ pthread_cond_wait(&proc->done_cond, &proc->mtx);
+
+ if (proc->status != 0) {
+ status = proc->status;
+ pthread_mutex_unlock(&proc->mtx);
+ free(block);
+ return status;
+ }
+
+ append_to_work_queue(proc, block);
+ block = NULL;
+
+ queue = try_dequeue(proc);
+ pthread_mutex_unlock(&proc->mtx);
+
+ return process_done_queue(proc, queue);
}
int sqfs_block_processor_finish(sqfs_block_processor_t *proc)