thread_pool.cpp 3.77 KB
Newer Older
1
#include <algorithm>
2
3
4
5
#include "thread_pool.h"

namespace sccl {

6
7
8
9
10
11
// 设置线程池最大线程
static constexpr int THREADS_POOL_MAX_SIZE = 128;

/**
 * @brief 线程池构造函数
 *
12
 * 初始化线程池,创建指定数量的工作线程并设置CPU亲和性。
13
 *
14
15
 * @param threads_num 线程池中线程的数量
 * @param cpu_cord_offset CPU亲和性设置的起始偏移量(跳过核心0)
16
 *
17
18
 * @note 线程数量会被限制在THREADS_POOL_MAX_SIZE以内
 * @note 每个工作线程会被绑定到不同的CPU核心,从cpu_cord_offset开始
19
20
21
22
 */
ThreadPool::ThreadPool(size_t threads_num, int cpu_cord_offset) : stop(false), active_tasks(0) {
    threads_num = min(THREADS_POOL_MAX_SIZE, threads_num);

23
24
    pthread_mutex_init(&queue_mutex, nullptr);
    pthread_cond_init(&condition, nullptr);
25
    workers.reserve(threads_num);
26
27
28
29
30

    for(size_t i = 0; i < threads_num; ++i) {
        pthread_t worker;
        pthread_create(&worker, nullptr, ThreadPool::run, this);
        workers.push_back(worker);
31
32
        // 设置工作线程的CPU亲和性,跳过核心0
        setThreadAffinity(worker, i + cpu_cord_offset);
33
34
35
    }
}

36
37
38
39
40
41
42
43
/**
 * @brief 线程池析构函数
 *
 * 负责安全地停止所有工作线程并清理资源:
 * 1. 设置停止标志并通知所有等待的线程
 * 2. 等待所有工作线程结束
 * 3. 销毁互斥锁和条件变量
 */
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
ThreadPool::~ThreadPool() {
    {
        pthread_mutex_lock(&queue_mutex);
        stop = true;
        pthread_mutex_unlock(&queue_mutex);
        pthread_cond_broadcast(&condition);
    }

    for(size_t i = 0; i < workers.size(); ++i) {
        pthread_join(workers[i], nullptr);
    }

    pthread_mutex_destroy(&queue_mutex);
    pthread_cond_destroy(&condition);
}
59

60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/**
 * @brief 线程池中工作线程的执行函数
 *
 * 该函数作为线程池中每个工作线程的入口点,不断从任务队列中获取并执行任务。
 * 使用互斥锁和条件变量实现线程安全的任务队列访问。
 * 当线程池停止且任务队列为空时,线程退出。
 *
 * @param arg 指向ThreadPool实例的指针
 * @return void* 总是返回nullptr
 */
void* ThreadPool::run(void* arg) {
    ThreadPool* pool = static_cast<ThreadPool*>(arg);
    while(true) {
        std::function<void()> task;

        {
            pthread_mutex_lock(&pool->queue_mutex);
            while(pool->tasks.empty() && !pool->stop) {
                pthread_cond_wait(&pool->condition, &pool->queue_mutex);
            }
            if(pool->stop && pool->tasks.empty()) {
                pthread_mutex_unlock(&pool->queue_mutex);
                return nullptr;
            }
            task = pool->tasks.front();
            pool->tasks.pop();
            pthread_mutex_unlock(&pool->queue_mutex);
        }

        task(); // 执行任务
        {
            pthread_mutex_lock(&pool->queue_mutex);
            pool->active_tasks--; // 任务完成减少活动任务计数
            pthread_mutex_unlock(&pool->queue_mutex);
        }
    }
}

/**
 * 检查线程池中所有任务是否已完成
 *
 * @return 如果活动任务数为0且任务队列为空则返回true,否则返回false
 * @note 此操作是线程安全的,通过互斥锁保护共享数据
 */
bool ThreadPool::allTasksCompleted() {
    pthread_mutex_lock(&queue_mutex);
    bool completed = (active_tasks == 0) && tasks.empty();
    pthread_mutex_unlock(&queue_mutex);
    return completed;
}

111
112
113
114
115
116
117
118
119
120
121
122
/**
 * 设置指定线程的CPU亲和性,将其绑定到指定的核心上
 * @param thread 需要设置亲和性的线程
 * @param core_id 要绑定的CPU核心ID
 */
void ThreadPool::setThreadAffinity(pthread_t thread, int core_id) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(core_id, &cpuset);
    pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
}

123
} // namespace sccl