#include #include "thread_pool.h" namespace sccl { // 设置线程池最大线程 static constexpr int THREADS_POOL_MAX_SIZE = 128; /** * @brief 线程池构造函数 * * 初始化线程池,创建指定数量的工作线程并设置CPU亲和性。 * * @param threads_num 线程池中线程的数量 * @param cpu_cord_offset CPU亲和性设置的起始偏移量(跳过核心0) * * @note 线程数量会被限制在THREADS_POOL_MAX_SIZE以内 * @note 每个工作线程会被绑定到不同的CPU核心,从cpu_cord_offset开始 */ ThreadPool::ThreadPool(size_t threads_num, int cpu_cord_offset) : stop(false), active_tasks(0) { threads_num = min(THREADS_POOL_MAX_SIZE, threads_num); pthread_mutex_init(&queue_mutex, nullptr); pthread_cond_init(&condition, nullptr); workers.reserve(threads_num); for(size_t i = 0; i < threads_num; ++i) { pthread_t worker; pthread_create(&worker, nullptr, ThreadPool::run, this); workers.push_back(worker); // 设置工作线程的CPU亲和性,跳过核心0 setThreadAffinity(worker, i + cpu_cord_offset); } } /** * @brief 线程池析构函数 * * 负责安全地停止所有工作线程并清理资源: * 1. 设置停止标志并通知所有等待的线程 * 2. 等待所有工作线程结束 * 3. 销毁互斥锁和条件变量 */ ThreadPool::~ThreadPool() { { pthread_mutex_lock(&queue_mutex); stop = true; pthread_mutex_unlock(&queue_mutex); pthread_cond_broadcast(&condition); } for(size_t i = 0; i < workers.size(); ++i) { pthread_join(workers[i], nullptr); } pthread_mutex_destroy(&queue_mutex); pthread_cond_destroy(&condition); } /** * @brief 线程池中工作线程的执行函数 * * 该函数作为线程池中每个工作线程的入口点,不断从任务队列中获取并执行任务。 * 使用互斥锁和条件变量实现线程安全的任务队列访问。 * 当线程池停止且任务队列为空时,线程退出。 * * @param arg 指向ThreadPool实例的指针 * @return void* 总是返回nullptr */ void* ThreadPool::run(void* arg) { ThreadPool* pool = static_cast(arg); while(true) { std::function task; { pthread_mutex_lock(&pool->queue_mutex); while(pool->tasks.empty() && !pool->stop) { pthread_cond_wait(&pool->condition, &pool->queue_mutex); } if(pool->stop && pool->tasks.empty()) { pthread_mutex_unlock(&pool->queue_mutex); return nullptr; } task = pool->tasks.front(); pool->tasks.pop(); pthread_mutex_unlock(&pool->queue_mutex); } task(); // 执行任务 { pthread_mutex_lock(&pool->queue_mutex); pool->active_tasks--; // 任务完成减少活动任务计数 pthread_mutex_unlock(&pool->queue_mutex); } } } /** * 检查线程池中所有任务是否已完成 * * @return 如果活动任务数为0且任务队列为空则返回true,否则返回false * @note 此操作是线程安全的,通过互斥锁保护共享数据 */ bool ThreadPool::allTasksCompleted() { pthread_mutex_lock(&queue_mutex); bool completed = (active_tasks == 0) && tasks.empty(); pthread_mutex_unlock(&queue_mutex); return completed; } /** * 设置指定线程的CPU亲和性,将其绑定到指定的核心上 * @param thread 需要设置亲和性的线程 * @param core_id 要绑定的CPU核心ID */ void ThreadPool::setThreadAffinity(pthread_t thread, int core_id) { cpu_set_t cpuset; CPU_ZERO(&cpuset); CPU_SET(core_id, &cpuset); pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset); } } // namespace sccl