summaryrefslogtreecommitdiff
path: root/lib/sqfs/blk_proc/pthread.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqfs/blk_proc/pthread.c')
-rw-r--r--lib/sqfs/blk_proc/pthread.c334
1 files changed, 0 insertions, 334 deletions
diff --git a/lib/sqfs/blk_proc/pthread.c b/lib/sqfs/blk_proc/pthread.c
deleted file mode 100644
index 7b95c7c..0000000
--- a/lib/sqfs/blk_proc/pthread.c
+++ /dev/null
@@ -1,334 +0,0 @@
-/* SPDX-License-Identifier: LGPL-3.0-or-later */
-/*
- * block_processor.c
- *
- * Copyright (C) 2019 David Oberhollenzer <goliath@infraroot.at>
- */
-#define SQFS_BUILDING_DLL
-#include "internal.h"
-
-static void *worker_proc(void *arg)
-{
- compress_worker_t *worker = arg;
- sqfs_block_processor_t *shared = worker->shared;
- sqfs_block_t *blk = NULL;
- int status = 0;
-
- for (;;) {
- pthread_mutex_lock(&shared->mtx);
- if (blk != NULL) {
- block_processor_store_done(shared, blk, status);
- pthread_cond_broadcast(&shared->done_cond);
- }
-
- while (shared->queue == NULL && shared->status == 0)
- pthread_cond_wait(&shared->queue_cond, &shared->mtx);
-
- blk = block_processor_next_work_item(shared);
- pthread_mutex_unlock(&shared->mtx);
-
- if (blk == NULL)
- break;
-
- status = block_processor_do_block(blk, worker->cmp,
- worker->scratch,
- shared->max_block_size);
- }
- return NULL;
-}
-
-sqfs_block_processor_t *sqfs_block_processor_create(size_t max_block_size,
- sqfs_compressor_t *cmp,
- unsigned int num_workers,
- size_t max_backlog,
- size_t devblksz,
- sqfs_file_t *file)
-{
- sqfs_block_processor_t *proc;
- unsigned int i;
- int ret;
-
- if (num_workers < 1)
- num_workers = 1;
-
- proc = alloc_flex(sizeof(*proc),
- sizeof(proc->workers[0]), num_workers);
- if (proc == NULL)
- return NULL;
-
- proc->mtx = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
- proc->queue_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
- proc->done_cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
-
- if (block_processor_init(proc, max_block_size, cmp, num_workers,
- max_backlog, devblksz, file)) {
- goto fail_init;
- }
-
- for (i = 0; i < num_workers; ++i) {
- proc->workers[i] = alloc_flex(sizeof(compress_worker_t),
- 1, max_block_size);
-
- if (proc->workers[i] == NULL)
- goto fail_init;
-
- proc->workers[i]->shared = proc;
- proc->workers[i]->cmp = cmp->create_copy(cmp);
-
- if (proc->workers[i]->cmp == NULL)
- goto fail_init;
- }
-
- for (i = 0; i < num_workers; ++i) {
- ret = pthread_create(&proc->workers[i]->thread, NULL,
- worker_proc, proc->workers[i]);
-
- if (ret != 0)
- goto fail_thread;
- }
-
- return proc;
-fail_thread:
- pthread_mutex_lock(&proc->mtx);
- proc->status = -1;
- pthread_cond_broadcast(&proc->queue_cond);
- pthread_mutex_unlock(&proc->mtx);
-
- for (i = 0; i < num_workers; ++i) {
- if (proc->workers[i]->thread > 0) {
- pthread_join(proc->workers[i]->thread, NULL);
- }
- }
-fail_init:
- for (i = 0; i < num_workers; ++i) {
- if (proc->workers[i] != NULL) {
- if (proc->workers[i]->cmp != NULL) {
- proc->workers[i]->cmp->
- destroy(proc->workers[i]->cmp);
- }
-
- free(proc->workers[i]);
- }
- }
- pthread_cond_destroy(&proc->done_cond);
- pthread_cond_destroy(&proc->queue_cond);
- pthread_mutex_destroy(&proc->mtx);
- block_processor_cleanup(proc);
- return NULL;
-}
-
-void sqfs_block_processor_destroy(sqfs_block_processor_t *proc)
-{
- unsigned int i;
-
- pthread_mutex_lock(&proc->mtx);
- proc->status = -1;
- pthread_cond_broadcast(&proc->queue_cond);
- pthread_mutex_unlock(&proc->mtx);
-
- for (i = 0; i < proc->num_workers; ++i) {
- pthread_join(proc->workers[i]->thread, NULL);
-
- proc->workers[i]->cmp->destroy(proc->workers[i]->cmp);
- free(proc->workers[i]);
- }
-
- pthread_cond_destroy(&proc->done_cond);
- pthread_cond_destroy(&proc->queue_cond);
- pthread_mutex_destroy(&proc->mtx);
-
- block_processor_cleanup(proc);
-}
-
-static void append_to_work_queue(sqfs_block_processor_t *proc,
- sqfs_block_t *block)
-{
- if (proc->queue_last == NULL) {
- proc->queue = proc->queue_last = block;
- } else {
- proc->queue_last->next = block;
- proc->queue_last = block;
- }
-
- block->sequence_number = proc->enqueue_id++;
- block->next = NULL;
- proc->backlog += 1;
- pthread_cond_broadcast(&proc->queue_cond);
-}
-
-static int test_and_set_status(sqfs_block_processor_t *proc, int status)
-{
- pthread_mutex_lock(&proc->mtx);
- if (proc->status == 0) {
- proc->status = status;
- } else {
- status = proc->status;
- }
- pthread_cond_broadcast(&proc->queue_cond);
- return status;
-}
-
-static sqfs_block_t *try_dequeue(sqfs_block_processor_t *proc)
-{
- sqfs_block_t *queue, *it, *prev;
-
- it = proc->done;
- prev = NULL;
-
- while (it != NULL && it->sequence_number == proc->dequeue_id) {
- prev = it;
- it = it->next;
- proc->dequeue_id += 1;
- }
-
- if (prev == NULL) {
- queue = NULL;
- } else {
- queue = proc->done;
- prev->next = NULL;
- proc->done = it;
- }
-
- return queue;
-}
-
-static sqfs_block_t *queue_merge(sqfs_block_t *lhs, sqfs_block_t *rhs)
-{
- sqfs_block_t *it, *head = NULL, **next_ptr = &head;
-
- while (lhs != NULL && rhs != NULL) {
- if (lhs->sequence_number <= rhs->sequence_number) {
- it = lhs;
- lhs = lhs->next;
- } else {
- it = rhs;
- rhs = rhs->next;
- }
-
- *next_ptr = it;
- next_ptr = &it->next;
- }
-
- it = (lhs != NULL ? lhs : rhs);
- *next_ptr = it;
- return head;
-}
-
-static int process_done_queue(sqfs_block_processor_t *proc,
- sqfs_block_t *queue)
-{
- sqfs_block_t *it, *block = NULL;
- int status = 0;
-
- while (queue != NULL && status == 0) {
- it = queue;
- queue = it->next;
-
- if (it->flags & SQFS_BLK_IS_FRAGMENT) {
- block = NULL;
- status = process_completed_fragment(proc, it, &block);
-
- if (block != NULL && status == 0) {
- pthread_mutex_lock(&proc->mtx);
- proc->dequeue_id = it->sequence_number;
- block->sequence_number = it->sequence_number;
-
- if (proc->queue == NULL) {
- proc->queue = block;
- proc->queue_last = block;
- } else {
- block->next = proc->queue;
- proc->queue = block;
- }
-
- proc->backlog += 1;
- proc->done = queue_merge(queue, proc->done);
- pthread_cond_broadcast(&proc->queue_cond);
- pthread_mutex_unlock(&proc->mtx);
-
- queue = NULL;
- } else {
- free(block);
- }
- } else {
- status = process_completed_block(proc, it);
- }
-
- free(it);
- }
-
- free_blk_list(queue);
- return status;
-}
-
-int sqfs_block_processor_enqueue(sqfs_block_processor_t *proc,
- sqfs_block_t *block)
-{
- sqfs_block_t *queue;
- int status;
-
- if (block->flags & ~SQFS_BLK_USER_SETTABLE_FLAGS) {
- free(block);
- return test_and_set_status(proc, SQFS_ERROR_UNSUPPORTED);
- }
-
- pthread_mutex_lock(&proc->mtx);
- while (proc->backlog > proc->max_backlog && proc->status == 0)
- pthread_cond_wait(&proc->done_cond, &proc->mtx);
-
- if (proc->status != 0) {
- status = proc->status;
- pthread_mutex_unlock(&proc->mtx);
- free(block);
- return status;
- }
-
- append_to_work_queue(proc, block);
- block = NULL;
-
- queue = try_dequeue(proc);
- pthread_mutex_unlock(&proc->mtx);
-
- status = process_done_queue(proc, queue);
- if (status != 0)
- return test_and_set_status(proc, status);
-
- return 0;
-}
-
-int sqfs_block_processor_finish(sqfs_block_processor_t *proc)
-{
- sqfs_block_t *queue;
- int status = 0;
-
- for (;;) {
- pthread_mutex_lock(&proc->mtx);
- while (proc->backlog > 0 && proc->status == 0)
- pthread_cond_wait(&proc->done_cond, &proc->mtx);
-
- if (proc->status != 0) {
- status = proc->status;
- pthread_mutex_unlock(&proc->mtx);
- return status;
- }
-
- queue = proc->done;
- proc->done = NULL;
- pthread_mutex_unlock(&proc->mtx);
-
- if (queue == NULL) {
- if (proc->frag_block != NULL) {
- append_to_work_queue(proc, proc->frag_block);
- proc->frag_block = NULL;
- continue;
- }
- break;
- }
-
- status = process_done_queue(proc, queue);
- if (status != 0)
- return status;
- }
-
- return 0;
-}