thread_pool.cpp 2.56 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#include "thread_pool.h"
#include <iostream>

/**
 * @brief 线程池构造函数
 *
 * 初始化线程池,创建指定数量的工作线程并启动。
 *
 * @param thread_cnt 要创建的线程数量
 *
 * @note 每个线程将执行run_thread函数,并共享任务队列和关闭标志
 */
Pthpool::Pthpool(int thread_cnt) : para(this->task_queue, this->shutdown) {
    this->thread_cnt  = thread_cnt;
    this->thread_pool = new pthread_t[thread_cnt];
    this->task_queue  = Task_Queue();
    this->shutdown    = false;
    for(int i = 0; i < thread_cnt; ++i) {
        pthread_create(&this->thread_pool[i], NULL, this->run_thread, &this->para);
    }
}
/**
 * @brief 线程池析构函数
 *
 * 负责清理线程池资源,包括:
 * 1. 销毁任务队列
 * 2. 设置关闭标志位
 * 3. 等待所有线程执行完成
 * 4. 释放线程池数组内存
 *
 * @note 会阻塞等待所有线程退出
 */
Pthpool::~Pthpool() {
    // 等待所有线程执行完
    this->task_queue.destroy();
    this->shutdown = true;
    for(int i = 0; i < this->thread_cnt; ++i) {
        pthread_join(this->thread_pool[i], NULL);
    }
    // 执行某种操作
    // std::cout<<"remain task queue size: "<<this->task_queue.size()<<"\n";
    // std::cout<<"end"<<std::endl;
    delete[] thread_pool;
}
/**
 * @brief 向线程池的任务队列中添加一个新任务
 *
 * @param f 要执行的任务函数指针,函数形式为 void* func(void*)
 * @param arg 传递给任务函数的参数指针
 */
void Pthpool::add_task(void* f(void*), void* arg) { this->task_queue.add_task(f, arg); }

/**
 * @brief 线程池工作线程的主循环函数
 *
 * 该函数作为线程池中每个工作线程的入口点,不断从任务队列中提取并执行任务。
 * 当收到关闭信号时,会先完成队列中剩余任务再退出。
 *
 * @param arg 指向包含任务队列和关闭标志的节点指针
 * @return void* 线程退出状态(总是返回0)
 */
void* Pthpool::run_thread(void* arg) {
    node* node_arg         = (node*)arg;
    Task_Queue& task_queue = node_arg->task_queue;
    bool& shutdown         = node_arg->shutdown;

    while(true) {
        if(shutdown) {
            auto temp = task_queue.try_extract_task(); // 把剩下的任务跑玩
            while(temp.second) {
                temp.first.f(temp.first.arg);
                temp = task_queue.try_extract_task();
            }
            return (void*)0;
        }
        auto temp = task_queue.extract_task();
        if(!temp.second)
            continue;
        temp.first.f(temp.first.arg);
    }
    return (void*)0;
}