标签搜索

目 录CONTENT

文章目录

需要初始化的多线程任务

小小城
2021-12-22 / 0 评论 / 0 点赞 / 7 阅读 / 7,159 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-05-02,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

需要初始化的多线程任务

[toc]

一、背景

  1. 想使用多线程完成任务,但是发现每个线程执行任务需要初始化一个句柄才能进行,那么就需要设计一下,避免在每个线程内频繁的初始化、卸载

二、代码

  1. 线程安全的任务队列: engine_work_queue.h
#pragma once
#include <pthread.h>
#include <list>
#include <iostream>
#include <memory>
#include <unistd.h>
#include <signal.h>

#define DISALLOW_COPY_AND_ASSIGN(TypeName) \
  TypeName(const TypeName&) = delete;               \
  void operator=(const TypeName&) = delete

namespace etts {

// 任务类,需要自己实现 
class BaseTask {
public:
    BaseTask(char* sz_text_bufs, int text_buf_len, FILE* fp, 
            char* sz_voice_path_files, char* sz_voice_path_wav_files) {
         // engine_param = engine_params;
        snprintf(sz_text_buf, 1024, "%s", sz_text_bufs);
        // memcpy(sz_text_buf, sz_text_bufs, text_buf_len);
        n_text_buf_len = text_buf_len;
        fp_pcm = fp;
        // memcpy(sz_voice_path_file, sz_voice_path_files, strlen(sz_voice_path_files));
        // memcpy(sz_voice_path_wav_file, sz_voice_path_wav_files, strlen(sz_voice_path_wav_files));
        snprintf(sz_voice_path_file, 1024, "%s", sz_voice_path_files);
        snprintf(sz_voice_path_wav_file, 1024, "%s", sz_voice_path_wav_files);
    }

    ~BaseTask() {}
// private:
public:
    char sz_text_buf[1024];
    int n_text_buf_len;
    FILE* fp_pcm;
    char sz_voice_path_file[1024];
    char sz_voice_path_wav_file[1024];
};

typedef std::shared_ptr<BaseTask> Task;

class EngineWorkQueue {
public:
    static EngineWorkQueue &instance() {
        return _s_instance;
    }

    /**
     * @fn signal_stop()
     * @brief send a signal to all worker threads observing this queue
     * to stop and exit. Someone should join all the threads to ensure
     * they have exitted before quiting.
     */
    int signal_stop();

    /**
     * @fn push_work(std::shared_ptr<TTSWork>&)
     * @brief push a new task to the work queue and signal the state change
         */
    int push_work(const Task &work);

    Task pop_work(bool &out_stop_flag);
    
    EngineWorkQueue();
    
    ~EngineWorkQueue();
    
    int queue_size(int& size);

private:
    DISALLOW_COPY_AND_ASSIGN(EngineWorkQueue);
    static EngineWorkQueue _s_instance;
    bool _stop_flag;
    std::list<Task> _queue;
    pthread_cond_t _queue_cond;
    pthread_mutex_t _queue_mutex;
};

EngineWorkQueue EngineWorkQueue::_s_instance;

    EngineWorkQueue::EngineWorkQueue() {
        _stop_flag = false;
        if (0 != pthread_cond_init(&_queue_cond, NULL)) {
            std::cout << "[EngineWorkQueue::EngineWorkQueue] failed init queue condition!" << std::endl;
            kill(getpid(), SIGABRT);
        }
        if (0 != pthread_mutex_init(&_queue_mutex, NULL)) {
            std::cout << "[EngineWorkQueue::EngineWorkQueue] failed init queue mutex!" << std::endl;
            kill(getpid(), SIGABRT);
        }
    }

    EngineWorkQueue::~EngineWorkQueue() {
        pthread_cond_destroy(&_queue_cond);
        pthread_mutex_destroy(&_queue_mutex);
    }

    int EngineWorkQueue::signal_stop() {
        pthread_mutex_lock(&_queue_mutex);
        _stop_flag = true;
        pthread_cond_broadcast(&_queue_cond);
        pthread_mutex_unlock(&_queue_mutex);
        return 0;
    }

    int EngineWorkQueue::queue_size(int& size) {

        pthread_mutex_lock(&_queue_mutex);
        size = _queue.size();
        pthread_mutex_unlock(&_queue_mutex);
        return 0;
    }

    int EngineWorkQueue::push_work(const Task &work) {
        pthread_mutex_lock(&_queue_mutex);
        printf("push_work success\n");
        _queue.push_back(work);
        pthread_cond_signal(&_queue_cond);
        pthread_mutex_unlock(&_queue_mutex);
        return 0;
    }

    Task EngineWorkQueue::pop_work(bool &out_stop_flag) {
        Task ret;
        pthread_mutex_lock(&_queue_mutex);
        while (0 == _queue.size() && false == _stop_flag) {
            pthread_cond_wait(&_queue_cond, &_queue_mutex);
        }
        
        out_stop_flag = _stop_flag;
        if (out_stop_flag) {
            pthread_mutex_unlock(&_queue_mutex);
            return ret;
        }
        ret = _queue.front();
        _queue.erase(_queue.begin());
        std::cout << "EngineWorkQueue::pop_work queue_size:" << _queue.size() << std::endl;
        pthread_mutex_unlock(&_queue_mutex);
        return ret;
    }
}

  1. 类线程池:engine_new_work.h
#pragma once
#include <memory>
#include <unistd.h>
#include "engine_work_queue.h"

namespace etts {
enum EngineWorkState {
    STATE_INIT = 0,
    STATE_RUNNING,
    STATE_UNINIT,
};

class EngineNewWorker {
public:
    EngineNewWorker(int index);
    ~EngineNewWorker();

    static int run(std::shared_ptr<EngineNewWorker> &worker);
    void join();

    void set_worker_state(EngineWorkState state);
    EngineWorkState get_worker_state();

private:
    DISALLOW_COPY_AND_ASSIGN(EngineNewWorker);
    //about work thread
    static void *tts_synth_handler(void *arg);
    void main_loop();

private:
    // 工作线程tid
    pthread_t _handle;
    // 用来控制线程读写state
    pthread_rwlock_t _rwlock;
    // 工作线程状态
    EngineWorkState _state;

    // 当前工作线程    
    std::shared_ptr<EngineNewWorker> _self;
    // 当前工作线程的任务
    Task _current_task;
    // 当前工作线程的线程id,用户创建的时id
    int _thread_index;
};

EngineNewWorker::EngineNewWorker(int index) : _thread_index(index) { }

EngineNewWorker::~EngineNewWorker() {}

int EngineNewWorker::run(std::shared_ptr<EngineNewWorker> &worker) {
    worker->_self = worker;
    int res = pthread_create(
            &worker->_handle,
            NULL,
            EngineNewWorker::tts_synth_handler,
            static_cast<void *>(worker.get())
    );
    return res;
}

void *EngineNewWorker::tts_synth_handler(void *arg) {
    printf("[EngineNewWorker::tts_synth_handler] Worker starting...\n");
    EngineNewWorker *instance = static_cast<EngineNewWorker *>(arg);
    instance->main_loop();
    printf("[EngineNewWorker::tts_synth_handler] Worker exit\n");
    return NULL;
}

void EngineNewWorker::main_loop() {
    set_worker_state(STATE_INIT);
    //1. 初始化线程数据
    //ret = init_engine(h_tts);

    //2. 设置线程状态
    set_worker_state(STATE_RUNNING);

    // 退出标志
    bool quit = false;
    while (!quit) {
        // 3. 获取任务
        Task task = etts::EngineWorkQueue::instance().pop_work(quit);
        
        // 4. 退出
        if (quit) {
            printf("thread[%d] exit;\n", _thread_index);
            break;
        }

        //  5. 检测任务
        if (NULL == task.get()) {
            printf("[TTSNewWorker::main_loop] NULL task for worker!\n");
            continue;
        }
        printf("[EngineNewWorker::main_loop] get one task success");
        
        // 6. 开始合成
        //ret = synthesis(h_tts, task);
    }//end while

    // 7. 改变当前线程状态
    set_worker_state(STATE_UNINIT);

    // 8. 卸载引擎
    // ret = uninit_engine(h_tts);
    return;
}

void EngineNewWorker::set_worker_state(EngineWorkState state) {
    pthread_rwlock_wrlock(&_rwlock);
    _state = state;
    pthread_rwlock_unlock(&_rwlock);
}

//这个涉及到多个线程
EngineWorkState EngineNewWorker::get_worker_state() {
    pthread_rwlock_rdlock(&_rwlock);
    EngineWorkState ret = _state;
    pthread_rwlock_unlock(&_rwlock);
    return ret;
}

void EngineNewWorker::join() {
    pthread_join(_handle, NULL);
}

三、测试

  1. 创建工作线程
std::vector<std::shared_ptr<EngineNewWorker>> workers;
int thread_num = 10;
for (int i = 0; i < engine_num; ++i) {
    std::shared_ptr<EngineNewWorker> worker(new (std::nothrow)
                                                EngineNewWorker(i));
    worker->set_engine_param(engine_param);
    if (NULL == worker.get()) {
        printf("[Memory][Failed to new a Worker]\n");
        return ;
    }

    // 2. 为当前工作线程初始化,并等待执行任务
    int res = EngineNewWorker::run(worker);
    if (0 != res) {
        printf("[Thread][Failed to launch a Worker]\n");
        return ;
    }
    workers.emplace_back(worker);
}
  1. 检查工作线程的状态
for (auto it = workers.begin(); it != workers.end(); it++) {
    while (true) {
        int state = (*it)->get_worker_state();
        if (state > 0) {
            std::cout << "worker is ready,state:" << state << std::endl;
            break;
        } else {
            std::cout << "worker not ready,need wait,state:" << state << std::endl;
            sleep(1);
            continue;
        }
    }
}
  1. add Task
std::shared_ptr<BaseTask> task(new BaseTask);
EngineWorkQueue::instance().push_work(task);
  1. 等待退出
while (true) {
    int task_size = -1;
    EngineWorkQueue::instance().queue_size(task_size);
    if (task_size == 0) {
        if (0 != EngineWorkQueue::instance().signal_stop()) {
            printf("[Thread][Failed to signal Workers to stop]\n");
            continue;
        }
        break;
    }
    sleep(5);
}
  1. 等待线程退出
for (auto it = workers.begin(); it != workers.end(); ++it) {
    (*it)->join();
}
0

评论区