#include "thread_pool.h" namespace sccl { ThreadPool::ThreadPool(size_t threads_num) : stop(false) { pthread_mutex_init(&queue_mutex, nullptr); pthread_cond_init(&condition, nullptr); for(size_t i = 0; i < threads_num; ++i) { pthread_t worker; pthread_create(&worker, nullptr, ThreadPool::run, this); workers.push_back(worker); } } ThreadPool::~ThreadPool() { { pthread_mutex_lock(&queue_mutex); stop = true; pthread_mutex_unlock(&queue_mutex); pthread_cond_broadcast(&condition); } for(size_t i = 0; i < workers.size(); ++i) { pthread_join(workers[i], nullptr); } pthread_mutex_destroy(&queue_mutex); pthread_cond_destroy(&condition); } /** * @brief 线程池中工作线程的执行函数 * * 该函数作为线程池中每个工作线程的入口点,不断从任务队列中获取并执行任务。 * 使用互斥锁和条件变量实现线程安全的任务队列访问。 * 当线程池停止且任务队列为空时,线程退出。 * * @param arg 指向ThreadPool实例的指针 * @return void* 总是返回nullptr */ void* ThreadPool::run(void* arg) { ThreadPool* pool = static_cast(arg); while(true) { std::function task; { pthread_mutex_lock(&pool->queue_mutex); while(pool->tasks.empty() && !pool->stop) { pthread_cond_wait(&pool->condition, &pool->queue_mutex); } if(pool->stop && pool->tasks.empty()) { pthread_mutex_unlock(&pool->queue_mutex); return nullptr; } task = pool->tasks.front(); pool->tasks.pop(); pthread_mutex_unlock(&pool->queue_mutex); } task(); // 执行任务 { pthread_mutex_lock(&pool->queue_mutex); pool->active_tasks--; // 任务完成减少活动任务计数 pthread_mutex_unlock(&pool->queue_mutex); } } } /** * 检查线程池中所有任务是否已完成 * * @return 如果活动任务数为0且任务队列为空则返回true,否则返回false * @note 此操作是线程安全的,通过互斥锁保护共享数据 */ bool ThreadPool::allTasksCompleted() { pthread_mutex_lock(&queue_mutex); bool completed = (active_tasks == 0) && tasks.empty(); pthread_mutex_unlock(&queue_mutex); return completed; } } // namespace sccl