thread_pool.cpp 2.42 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#include "thread_pool.h"

namespace sccl {

ThreadPool::ThreadPool(size_t threads_num) : stop(false) {
    pthread_mutex_init(&queue_mutex, nullptr);
    pthread_cond_init(&condition, nullptr);

    for(size_t i = 0; i < threads_num; ++i) {
        pthread_t worker;
        pthread_create(&worker, nullptr, ThreadPool::run, this);
        workers.push_back(worker);
    }
}

ThreadPool::~ThreadPool() {
    {
        pthread_mutex_lock(&queue_mutex);
        stop = true;
        pthread_mutex_unlock(&queue_mutex);
        pthread_cond_broadcast(&condition);
    }

    for(size_t i = 0; i < workers.size(); ++i) {
        pthread_join(workers[i], nullptr);
    }

    pthread_mutex_destroy(&queue_mutex);
    pthread_cond_destroy(&condition);
}
 
/**
 * @brief 线程池中工作线程的执行函数
 *
 * 该函数作为线程池中每个工作线程的入口点,不断从任务队列中获取并执行任务。
 * 使用互斥锁和条件变量实现线程安全的任务队列访问。
 * 当线程池停止且任务队列为空时,线程退出。
 *
 * @param arg 指向ThreadPool实例的指针
 * @return void* 总是返回nullptr
 */
void* ThreadPool::run(void* arg) {
    ThreadPool* pool = static_cast<ThreadPool*>(arg);
    while(true) {
        std::function<void()> task;

        {
            pthread_mutex_lock(&pool->queue_mutex);
            while(pool->tasks.empty() && !pool->stop) {
                pthread_cond_wait(&pool->condition, &pool->queue_mutex);
            }
            if(pool->stop && pool->tasks.empty()) {
                pthread_mutex_unlock(&pool->queue_mutex);
                return nullptr;
            }
            task = pool->tasks.front();
            pool->tasks.pop();
            pthread_mutex_unlock(&pool->queue_mutex);
        }

        task(); // 执行任务
        {
            pthread_mutex_lock(&pool->queue_mutex);
            pool->active_tasks--; // 任务完成减少活动任务计数
            pthread_mutex_unlock(&pool->queue_mutex);
        }
    }
}

/**
 * 检查线程池中所有任务是否已完成
 *
 * @return 如果活动任务数为0且任务队列为空则返回true,否则返回false
 * @note 此操作是线程安全的,通过互斥锁保护共享数据
 */
bool ThreadPool::allTasksCompleted() {
    pthread_mutex_lock(&queue_mutex);
    bool completed = (active_tasks == 0) && tasks.empty();
    pthread_mutex_unlock(&queue_mutex);
    return completed;
}

} // namespace sccl