thread_pool.h 1.65 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#pragma once

#include <iostream>
#include <vector>
#include <queue>
#include <pthread.h>
#include <functional>
#include <future>
#include <memory>

namespace sccl {

class ThreadPool {
public:
15
    ThreadPool(size_t, int cpu_cord_offset = 1);
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
    ~ThreadPool();

    // 将任务加入线程池队列并返回关联的future
    template <class F, class... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;

        auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));

        std::future<return_type> res = task->get_future();
        {
            pthread_mutex_lock(&queue_mutex);
            tasks.push([task]() { (*task)(); });
            active_tasks++; // 新任务增加活动任务计数
            pthread_mutex_unlock(&queue_mutex);
            pthread_cond_signal(&condition);
        }
        return res;
    }

    // 检查是否所有任务都已完成
    bool allTasksCompleted();

private:
    std::vector<pthread_t> workers;          // 工作线程列表
    std::queue<std::function<void()>> tasks; // 任务队列

    pthread_mutex_t queue_mutex; // 保护任务队列的互斥锁
    pthread_cond_t condition;    // 用于线程间通信的条件变量
    bool stop;                   // 标志位,指示线程池是否应该停止
    int active_tasks;            // 追踪活动任务的数量

    static void* run(void* arg);
49
50
51

    // 用于设置线程的CPU亲和性
    void setThreadAffinity(pthread_t thread, int core_id);
52
53
54
};

} // namespace sccl