#include "thread_pool.h" #include /** * @brief 线程池构造函数 * * 初始化线程池,创建指定数量的工作线程并启动。 * * @param thread_cnt 要创建的线程数量 * * @note 每个线程将执行run_thread函数,并共享任务队列和关闭标志 */ Pthpool::Pthpool(int thread_cnt) : para(this->task_queue, this->shutdown) { this->thread_cnt = thread_cnt; this->thread_pool = new pthread_t[thread_cnt]; this->task_queue = Task_Queue(); this->shutdown = false; for(int i = 0; i < thread_cnt; ++i) { pthread_create(&this->thread_pool[i], NULL, this->run_thread, &this->para); } } /** * @brief 线程池析构函数 * * 负责清理线程池资源,包括: * 1. 销毁任务队列 * 2. 设置关闭标志位 * 3. 等待所有线程执行完成 * 4. 释放线程池数组内存 * * @note 会阻塞等待所有线程退出 */ Pthpool::~Pthpool() { // 等待所有线程执行完 this->task_queue.destroy(); this->shutdown = true; for(int i = 0; i < this->thread_cnt; ++i) { pthread_join(this->thread_pool[i], NULL); } // 执行某种操作 // std::cout<<"remain task queue size: "<task_queue.size()<<"\n"; // std::cout<<"end"<task_queue.add_task(f, arg); } /** * @brief 线程池工作线程的主循环函数 * * 该函数作为线程池中每个工作线程的入口点,不断从任务队列中提取并执行任务。 * 当收到关闭信号时,会先完成队列中剩余任务再退出。 * * @param arg 指向包含任务队列和关闭标志的节点指针 * @return void* 线程退出状态(总是返回0) */ void* Pthpool::run_thread(void* arg) { node* node_arg = (node*)arg; Task_Queue& task_queue = node_arg->task_queue; bool& shutdown = node_arg->shutdown; while(true) { if(shutdown) { auto temp = task_queue.try_extract_task(); // 把剩下的任务跑玩 while(temp.second) { temp.first.f(temp.first.arg); temp = task_queue.try_extract_task(); } return (void*)0; } auto temp = task_queue.extract_task(); if(!temp.second) continue; temp.first.f(temp.first.arg); } return (void*)0; }