Commit a4ac3320 authored by lishen's avatar lishen
Browse files

通过线程池实现ipcsocket,满足节点内通信

parent d9d23f34
hipcc main.cpp \
-o main \
-std=c++17 -g -O3 -fopenmp -D__HIP_PLATFORM_HCC__ -Wno-return-type \
-I ./ -I /usr/include -I /opt/dtk/include \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/include \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/topo \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/utils/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/utils/thread_pool \
-L /usr/lib/x86_64-linux-gnu \
-L /usr/lib/ \
-lamdhip64 -lrocm_smi64 -pthread
\ No newline at end of file
#include <hip/hip_runtime.h>
#include <iostream>
int main() {
int deviceCount = 0;
hipError_t error_id = hipGetDeviceCount(&deviceCount);
if(error_id != hipSuccess) {
std::cerr << "Error: " << hipGetErrorString(error_id) << std::endl;
return EXIT_FAILURE;
}
if(deviceCount == 0) {
std::cerr << "There are no available GPU devices." << std::endl;
return EXIT_FAILURE;
}
for(int dev = 0; dev < deviceCount; ++dev) {
hipDeviceProp_t deviceProp;
hipError_t err = hipGetDeviceProperties(&deviceProp, dev);
if(err != hipSuccess) {
std::cerr << "Error getting device properties for device " << dev << ": " << hipGetErrorString(err) << std::endl;
continue; // 或者根据需要处理错误,例如跳过这个设备或终止程序
}
std::cout << "Device " << dev << ": " << deviceProp.name << std::endl;
std::cout << " GCN Arch: " << deviceProp.gcnArchName << std::endl;
std::cout << " Compute Capability: " << deviceProp.major << "." << deviceProp.minor << std::endl;
std::cout << " Total Global Mem: " << deviceProp.totalGlobalMem << " bytes" << std::endl;
std::cout << " Total Const Mem: " << deviceProp.totalConstMem << " bytes" << std::endl;
std::cout << " Max Threads Per Block: " << deviceProp.maxThreadsPerBlock << std::endl;
// 你可以根据需要打印更多的设备属性
}
return EXIT_SUCCESS;
}
......@@ -2,3 +2,4 @@
hipcc ./thread.cpp \
-o thread \
-std=c++17 -g -O3 -fopenmp -DROC_SHMEM -D__HIP_PLATFORM_HCC__
g++ ./thread2.cpp \
-o thread2 \
-std=c++17 -g -O3 -fopenmp -DROC_SHMEM -D__HIP_PLATFORM_HCC__
g++ ./thread3.cpp \
-o thread3 \
-std=c++17 -g -O3 -fopenmp -pthread \
-DROC_SHMEM -D__HIP_PLATFORM_HCC__
......@@ -2,6 +2,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <thread>
void* thread_function(void* arg) {
// 线程开始执行的函数
......
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
// 线程将要执行的函数
void* print_hello(void* thread_id) {
long tid;
tid = (long)thread_id;
printf("Hello World! It's me, thread #%ld!\n", tid);
sleep(1); // 模拟长时间运行的任务
printf("Thread #%ld is finished.\n", tid);
pthread_exit(NULL);
}
int main() {
pthread_t threads[2];
int rc;
long t;
for(t = 0; t < 2; t++) {
printf("In main: creating thread %ld\n", t);
rc = pthread_create(&threads[t], NULL, print_hello, (void*)t);
if(rc) {
printf("Error:unable to create thread,%d\n", rc);
exit(-1);
}
}
// 等待两个线程完成
for(t = 0; t < 2; t++) {
pthread_join(threads[t], NULL);
}
printf("Main completed. Exiting.\n");
pthread_exit(NULL);
}
\ No newline at end of file
#include <iostream>
#include <thread>
#include <vector>
#include <unistd.h> // 用于sleep函数
// 线程将要执行的函数
void print_hello(int thread_id) {
std::cout << "Hello World! It's me, thread #" << thread_id << "!\n";
sleep(1); // 模拟长时间运行的任务
std::cout << "Thread #" << thread_id << " is finished.\n";
}
int main() {
std::vector<std::thread> threads;
for(int i = 0; i < 2; ++i) {
std::cout << "In main: creating thread #" << i << "\n";
threads.push_back(std::thread(print_hello, i));
}
// 等待所有线程完成
for(auto& th : threads) {
th.join();
}
std::cout << "Main completed. Exiting.\n";
return 0;
}
# pthread_pool
[线程池](https://github.com/Cascol-Chen/pthread_pool/tree/main)
A simple implementation of threads pool using pthread.h with C++
In my implementation:
1. All the task will be finished before the pool is destroyed
2. Each thread_pool has its own tasks_queue if you create multiple pools.
hipcc main.cpp task_queue.cpp thread_pool.cpp \
-o main \
-std=c++17 -g -O3 -Wno-return-type \
-fopenmp -DROC_SHMEM -D__HIP_PLATFORM_HCC__ -pthread
#include <thread_pool.h>
using namespace std;
void* Func(void* arg) {
// cout<<*(int *)arg<<"\n";
// for(int i = 0; i < 10000000; ++i);
for(int i = 0; i < 1000; ++i) {
printf("arg=%d\n", *(int*)arg);
}
return (void*)0;
}
int arg2[100];
int main() {
clock_t st, ed;
st = clock();
{
Pthpool thread_pool(30);
for(int i = 1; i <= 30; ++i) {
arg2[i] = i;
thread_pool.add_task(Func, &arg2[i]);
}
}
ed = clock();
printf("time: %ldms\n", ed - st);
}
\ No newline at end of file
#include "task_queue.h"
/**
* @brief 任务队列构造函数
*
* 初始化任务队列的互斥锁和条件变量,并将shutdown标志设为false。
*/
Task_Queue::Task_Queue() {
pthread_mutex_init(&this->task_lock, NULL);
pthread_cond_init(&this->task_cond, NULL);
this->shutdown = false;
}
/**
* @brief 销毁任务队列,释放互斥锁资源
*
* 析构函数负责销毁任务队列中使用的 pthread_mutex_t 互斥锁。
* 应在对象生命周期结束时自动调用,确保资源正确释放。
*/
Task_Queue::~Task_Queue() { pthread_mutex_destroy(&this->task_lock); }
/**
* @brief 向任务队列中添加一个新任务
*
* @param f 要执行的任务函数指针
* @param arg 传递给任务函数的参数
* @return true 任务添加成功
*
* 该函数是线程安全的,会使用互斥锁保护任务队列。
* 当队列从空变为非空时,会发送条件变量信号通知等待的线程。
*/
bool Task_Queue::add_task(void* f(void*), void* arg) {
Task current_task = {f, arg};
pthread_mutex_lock(&this->task_lock);
this->tasks.push(current_task);
if(tasks.size() == 1)
pthread_cond_signal(&this->task_cond);
pthread_mutex_unlock(&this->task_lock);
return true;
}
/**
* 通知所有等待线程队列关闭,并广播条件变量以唤醒它们
* 调用此方法后,任务队列将不再接受新任务
*/
void Task_Queue::destroy() {
this->shutdown = true;
pthread_cond_broadcast(&this->task_cond);
}
/**
* @brief 从任务队列中提取一个任务
*
* 这是一个线程安全的操作,会阻塞直到队列中有任务或收到关闭信号。
* 如果队列关闭且为空,返回一个无效任务和false。
* 否则返回队列头部任务和true。
*
* @return std::pair<Task, bool> 返回的任务和是否成功的标志
*/
std::pair<Task, bool> Task_Queue::extract_task() {
pthread_mutex_lock(&this->task_lock);
while(this->tasks.size() == 0 && !shutdown)
pthread_cond_wait(&this->task_cond, &this->task_lock);
if(shutdown) {
pthread_mutex_unlock(&this->task_lock);
return std::make_pair(Task(), false);
}
// task must be greater than one
auto temp = this->tasks.front();
this->tasks.pop();
pthread_mutex_unlock(&this->task_lock);
return {temp, true};
}
/**
* @brief 尝试从任务队列中提取一个任务
*
* 该函数线程安全地从任务队列中取出一个任务。如果队列为空,则返回空任务和false;
* 否则返回取出的任务和true。
*
* @return std::pair<Task, bool> 返回一个pair,第一个元素是任务对象,第二个元素表示是否成功取出任务
*/
std::pair<Task, bool> Task_Queue::try_extract_task() {
pthread_mutex_lock(&this->task_lock);
if(this->tasks.size() == 0) {
pthread_mutex_unlock(&this->task_lock);
return std::make_pair(Task(), false);
} else {
auto tmp = this->tasks.front();
this->tasks.pop();
pthread_mutex_unlock(&this->task_lock);
return std::make_pair(tmp, true);
}
}
/**
* 获取任务队列中当前的任务数量
* @return 返回任务队列的大小(无符号整型)
*/
unsigned int Task_Queue::size() { return this->tasks.size(); }
\ No newline at end of file
#pragma once
#include <pthread.h>
#include <queue>
#include <utility>
// 定义一个任务结构体,用于线程池中的任务队列
struct Task {
// 成员函数指针,指向一个接受void*参数并返回void*的函数
void* (*f)(void*);
// 成员变量,用于存储传递给函数的参数
void* arg;
// 构造函数,用于初始化任务,接受一个函数指针和一个参数
Task(void* (*f)(void*), void* arg) {
this->f = f; // 初始化成员函数指针
this->arg = arg; // 初始化成员变量
}
// 默认构造函数,不进行初始化
Task() {}
};
class Task_Queue {
public:
Task_Queue(); // 构造函数
~Task_Queue(); // 析构函数
// 添加任务到队列中,参数为任务函数和其参数,返回值表示添加是否成功
bool add_task(void* f(void*), void* arg);
// 从队列中提取任务,返回值为任务和一个布尔值,表示提取是否成功
std::pair<Task, bool> extract_task();
// 尝试从队列中提取任务,不阻塞,返回值为任务和一个布尔值,表示提取是否成功
std::pair<Task, bool> try_extract_task();
// 获取当前队列中的任务数量
unsigned int size();
// 销毁任务队列,清理资源
void destroy();
private:
std::queue<Task> tasks; // 任务队列,存储任务对象
pthread_mutex_t task_lock; // 用于线程同步的互斥锁
pthread_cond_t task_cond; // 用于线程同步的条件变量
bool shutdown; // 标志位,用于指示队列是否关闭
};
#include "thread_pool.h"
#include <iostream>
/**
* @brief 线程池构造函数
*
* 初始化线程池,创建指定数量的工作线程并启动。
*
* @param thread_cnt 要创建的线程数量
*
* @note 每个线程将执行run_thread函数,并共享任务队列和关闭标志
*/
Pthpool::Pthpool(int thread_cnt) : para(this->task_queue, this->shutdown) {
this->thread_cnt = thread_cnt;
this->thread_pool = new pthread_t[thread_cnt];
this->task_queue = Task_Queue();
this->shutdown = false;
for(int i = 0; i < thread_cnt; ++i) {
pthread_create(&this->thread_pool[i], NULL, this->run_thread, &this->para);
}
}
/**
* @brief 线程池析构函数
*
* 负责清理线程池资源,包括:
* 1. 销毁任务队列
* 2. 设置关闭标志位
* 3. 等待所有线程执行完成
* 4. 释放线程池数组内存
*
* @note 会阻塞等待所有线程退出
*/
Pthpool::~Pthpool() {
// 等待所有线程执行完
this->task_queue.destroy();
this->shutdown = true;
for(int i = 0; i < this->thread_cnt; ++i) {
pthread_join(this->thread_pool[i], NULL);
}
// 执行某种操作
// std::cout<<"remain task queue size: "<<this->task_queue.size()<<"\n";
// std::cout<<"end"<<std::endl;
delete[] thread_pool;
}
/**
* @brief 向线程池的任务队列中添加一个新任务
*
* @param f 要执行的任务函数指针,函数形式为 void* func(void*)
* @param arg 传递给任务函数的参数指针
*/
void Pthpool::add_task(void* f(void*), void* arg) { this->task_queue.add_task(f, arg); }
/**
* @brief 线程池工作线程的主循环函数
*
* 该函数作为线程池中每个工作线程的入口点,不断从任务队列中提取并执行任务。
* 当收到关闭信号时,会先完成队列中剩余任务再退出。
*
* @param arg 指向包含任务队列和关闭标志的节点指针
* @return void* 线程退出状态(总是返回0)
*/
void* Pthpool::run_thread(void* arg) {
node* node_arg = (node*)arg;
Task_Queue& task_queue = node_arg->task_queue;
bool& shutdown = node_arg->shutdown;
while(true) {
if(shutdown) {
auto temp = task_queue.try_extract_task(); // 把剩下的任务跑玩
while(temp.second) {
temp.first.f(temp.first.arg);
temp = task_queue.try_extract_task();
}
return (void*)0;
}
auto temp = task_queue.extract_task();
if(!temp.second)
continue;
temp.first.f(temp.first.arg);
}
return (void*)0;
}
#pragma once
#include <pthread.h>
#include "task_queue.h"
// 定义一个节点结构体,用于线程池中的每个线程
struct node {
Task_Queue& task_queue; // 引用任务队列
bool& shutdown; // 引用关闭标志
// 构造函数,初始化任务队列和关闭标志
node(Task_Queue& task_queue, bool& shutdown) : task_queue(task_queue), shutdown(shutdown) {}
};
// 定义线程池类
class Pthpool {
public:
// 构造函数,初始化线程池,参数为线程数量
Pthpool(int thread_cnt);
// 析构函数,清理线程池
~Pthpool();
// 添加任务到任务队列,参数为任务函数和任务参数
void add_task(void* f(void*), void* arg);
private:
int thread_cnt; // 线程数量
Task_Queue task_queue; // 任务队列
pthread_t* thread_pool; // 线程池,存储线程ID
bool shutdown; // 关闭标志,用于控制线程池的关闭
node para; // 节点,用于线程运行时传递参数
// 静态方法,线程运行的主函数
static void* run_thread(void* arg);
};
hipcc main.cpp \
-o main \
-std=c++17 -g -O3 -Wno-return-type \
-fopenmp -DROC_SHMEM -D__HIP_PLATFORM_HCC__ -pthread
#include <iostream>
#include <random>
#include "thread_pool.h"
std::random_device rd;
std::mt19937 mt(rd());
std::uniform_int_distribution<int> dist(-1000, 1000);
auto rnd = std::bind(dist, mt);
void simulate_hard_computation() {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
// std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
}
// Simple function that adds multiplies two numbers and prints the result
void multiply(const int a, const int b) {
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
}
// Same as before but now we have an output parameter
void multiply_output(int& out, const int a, const int b) {
simulate_hard_computation();
out = a * b;
std::cout << a << " * " << b << " = " << out << std::endl;
}
// Same as before but now we have an output parameter
int multiply_return(const int a, const int b) {
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
return res;
}
int main(int argc, char* argv[]) {
// Create pool with 3 threads
ThreadPool pool(3);
// Initialize pool
pool.init();
// Submit (partial) multiplication table
for(int i = 1; i < 3; ++i) {
for(int j = 1; j < 10; ++j) {
pool.submit(multiply, i, j);
}
}
// // Submit function with output parameter passed by ref
// int output_ref;
// auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);
// // Wait for multiplication output to finish
// future1.get();
// std::cout << "Last operation result is equals to " << output_ref << std::endl;
// // Submit function with return parameter
// auto future2 = pool.submit(multiply_return, 5, 3);
// // Wait for multiplication output to finish
// int res = future2.get();
// std::cout << "Last operation result is equals to " << res << std::endl;
pool.shutdown();
return 0;
}
#pragma once
#include <mutex>
#include <queue>
// Thread safe implementation of a Queue using an std::queue
template <typename T>
class SafeQueue {
private:
std::queue<T> m_queue;
std::mutex m_mutex;
public:
SafeQueue() {}
SafeQueue(SafeQueue& other) {
// TODO:
}
~SafeQueue() {}
bool empty() {
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.empty();
}
int size() {
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.size();
}
void enqueue(T& t) {
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.push(t);
}
bool dequeue(T& t) {
std::unique_lock<std::mutex> lock(m_mutex);
if(m_queue.empty()) {
return false;
}
t = std::move(m_queue.front());
m_queue.pop();
return true;
}
};
\ No newline at end of file
#pragma once
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
#include "safe_queue.h"
class ThreadPool {
private:
class ThreadWorker {
private:
int m_id;
ThreadPool* m_pool;
public:
ThreadWorker(ThreadPool* pool, const int id) : m_pool(pool), m_id(id) {}
void operator()() {
std::function<void()> func;
bool dequeued;
while(!m_pool->m_shutdown) {
{
std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
if(m_pool->m_queue.empty()) {
m_pool->m_conditional_lock.wait(lock);
}
dequeued = m_pool->m_queue.dequeue(func);
}
if(dequeued) {
func();
}
}
}
};
bool m_shutdown;
SafeQueue<std::function<void()>> m_queue;
std::vector<std::thread> m_threads;
std::mutex m_conditional_mutex;
std::condition_variable m_conditional_lock;
public:
ThreadPool(const int n_threads) : m_threads(std::vector<std::thread>(n_threads)), m_shutdown(false) {}
ThreadPool(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;
// Inits thread pool
void init() {
for(int i = 0; i < m_threads.size(); ++i) {
m_threads[i] = std::thread(ThreadWorker(this, i));
}
}
// Waits until threads finish their current task and shutdowns the pool
void shutdown() {
m_shutdown = true;
m_conditional_lock.notify_all();
for(int i = 0; i < m_threads.size(); ++i) {
if(m_threads[i].joinable()) {
m_threads[i].join();
}
}
}
// Submit a function to be executed asynchronously by the pool
template <typename F, typename... Args>
auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
// Create a function with bounded parameters ready to execute
std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
// Encapsulate it into a shared ptr in order to be able to copy construct / assign
auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
// Wrap packaged task into void function
std::function<void()> wrapper_func = [task_ptr]() { (*task_ptr)(); };
// Enqueue generic wrapper function
m_queue.enqueue(wrapper_func);
// Wake up one thread if its waiting
m_conditional_lock.notify_one();
// Return future from promise
return task_ptr->get_future();
}
};
hipcc main.cpp thread_pool.cpp \
-o main \
-std=c++17 -g -O3 -Wno-return-type \
-fopenmp -DROC_SHMEM -D__HIP_PLATFORM_HCC__ -pthread
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment