thread_pool.cpp 3.79 KB
Newer Older
1
#include <algorithm>
2
3
4
5
#include "thread_pool.h"

namespace sccl {

6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 设置线程池最大线程
static constexpr int THREADS_POOL_MAX_SIZE = 128;

/**
 * @brief 线程池构造函数
 *
 * 初始化线程池,创建指定数量的工作线程。
 *
 * @param threads_num 线程池中初始线程数量,不超过THREADS_POOL_MAX_SIZE限制
 *
 * @note 会初始化互斥锁和条件变量,并启动工作线程执行ThreadPool::run函数
 */
ThreadPool::ThreadPool(size_t threads_num, int cpu_cord_offset) : stop(false), active_tasks(0) {
    threads_num = min(THREADS_POOL_MAX_SIZE, threads_num);

21
22
    pthread_mutex_init(&queue_mutex, nullptr);
    pthread_cond_init(&condition, nullptr);
23
    // printf("ThreadPool 构造函数");
24
25
26
27
28

    for(size_t i = 0; i < threads_num; ++i) {
        pthread_t worker;
        pthread_create(&worker, nullptr, ThreadPool::run, this);
        workers.push_back(worker);
29
30
        // 设置工作线程的CPU亲和性,跳过核心0
        setThreadAffinity(worker, i + cpu_cord_offset);
31
32
33
    }
}

34
35
36
37
38
39
40
41
/**
 * @brief 线程池析构函数
 *
 * 负责安全地停止所有工作线程并清理资源:
 * 1. 设置停止标志并通知所有等待的线程
 * 2. 等待所有工作线程结束
 * 3. 销毁互斥锁和条件变量
 */
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
ThreadPool::~ThreadPool() {
    {
        pthread_mutex_lock(&queue_mutex);
        stop = true;
        pthread_mutex_unlock(&queue_mutex);
        pthread_cond_broadcast(&condition);
    }

    for(size_t i = 0; i < workers.size(); ++i) {
        pthread_join(workers[i], nullptr);
    }

    pthread_mutex_destroy(&queue_mutex);
    pthread_cond_destroy(&condition);
}
57

58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/**
 * @brief 线程池中工作线程的执行函数
 *
 * 该函数作为线程池中每个工作线程的入口点,不断从任务队列中获取并执行任务。
 * 使用互斥锁和条件变量实现线程安全的任务队列访问。
 * 当线程池停止且任务队列为空时,线程退出。
 *
 * @param arg 指向ThreadPool实例的指针
 * @return void* 总是返回nullptr
 */
void* ThreadPool::run(void* arg) {
    ThreadPool* pool = static_cast<ThreadPool*>(arg);
    while(true) {
        std::function<void()> task;

        {
            pthread_mutex_lock(&pool->queue_mutex);
            while(pool->tasks.empty() && !pool->stop) {
                pthread_cond_wait(&pool->condition, &pool->queue_mutex);
            }
            if(pool->stop && pool->tasks.empty()) {
                pthread_mutex_unlock(&pool->queue_mutex);
                return nullptr;
            }
            task = pool->tasks.front();
            pool->tasks.pop();
            pthread_mutex_unlock(&pool->queue_mutex);
        }

        task(); // 执行任务
        {
            pthread_mutex_lock(&pool->queue_mutex);
90
91
            printf("ThreadPool active_tasks--");

92
93
94
95
96
97
98
99
100
101
102
103
104
105
            pool->active_tasks--; // 任务完成减少活动任务计数
            pthread_mutex_unlock(&pool->queue_mutex);
        }
    }
}

/**
 * 检查线程池中所有任务是否已完成
 *
 * @return 如果活动任务数为0且任务队列为空则返回true,否则返回false
 * @note 此操作是线程安全的,通过互斥锁保护共享数据
 */
bool ThreadPool::allTasksCompleted() {
    pthread_mutex_lock(&queue_mutex);
106
    printf("active_tasks: %d, tasks.size(): %lu\n", active_tasks, tasks.size());
107
108
109
110
111
    bool completed = (active_tasks == 0) && tasks.empty();
    pthread_mutex_unlock(&queue_mutex);
    return completed;
}

112
113
114
115
116
117
118
119
120
121
122
123
/**
 * 设置指定线程的CPU亲和性,将其绑定到指定的核心上
 * @param thread 需要设置亲和性的线程
 * @param core_id 要绑定的CPU核心ID
 */
void ThreadPool::setThreadAffinity(pthread_t thread, int core_id) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(core_id, &cpuset);
    pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
}

124
} // namespace sccl