#pragma once #include #include #include #include #include #include #include #include #include #include #include "base.h" #include "net_utils.h" #include "socket.h" #include "thread_pool.h" namespace sccl { namespace hardware { namespace net { namespace ipc_socket { constexpr int SCCL_IPC_SOCKNAME_LEN = 64; // 定义IPC套接字结构体 struct scclIpcSocketHandle { int fd; // 文件描述符 char socketName[SCCL_IPC_SOCKNAME_LEN]; // 套接字名称 volatile uint32_t* abortFlag; // 用于中止操作的标志 }; // 封装发送数据,包括rank信息和实际数据的引用 struct DataPackage { int rank; char data[]; // 灵活数组成员,用于存储实际数据 }; ////////////////////////////////////////////////////////////////////////////////////////////////////// typedef class scclIpcSocket { public: // 构造函数和析构函数 scclIpcSocket(int localRank, int nlocalRanks, uint64_t hash, volatile uint32_t* abortFlag = nullptr); virtual ~scclIpcSocket(); // 设置 abortFlag 的函数 scclResult_t setAbortFlag(volatile uint32_t* flag); // 获取 abortFlag 的函数 volatile uint32_t* getAbortFlag() const; // 设置IPC套接字的超时时间 scclResult_t setTimeout(int timeout_ms); // 获取线程池指针 ThreadPool* getPthreadPool(); ////////////////////////////////////////////////////////////////////////////////////////////////////// /* 并行计算时,不同的进程可能需要访问相同的文件或网络资源。通过发送文件描述符,可以避免多个进程重复打开相同的文件或建立相同的网络连接,从而节省资源和时间。 */ // 发送/接收文件描述符 scclResult_t scclIpcSocketSendFd(const int sendFd, int dst_rank); scclResult_t scclIpcSocketRecvFd(int* fd); // 通过Unix域套接字发送/接收数据到指定目标 scclResult_t scclIpcSocketSendData(const void* data, size_t dataLen, int dst_rank); scclResult_t scclIpcSocketRecvData(void* buffer, size_t bufferLen, size_t* receivedLen, int* src_rank); // 通过Unix域套接字发送/接收数据到指定目标,有ACK信息 scclResult_t scclIpcSocketSendDataWithAck(const void* data, size_t dataLen, int dst_rank); scclResult_t scclIpcSocketRecvDataWithAck(void* buffer, size_t bufferLen, size_t* receivedLen, int* src_rank); ////////////////////////////////////////////////////////////////////////////////////////////////////// // local rank内的allgather操作,保证接收顺序 scclResult_t scclIpcSocketAllgather(const void* sendData, void* recvData, size_t dataLen); // local rank内的broadcast操作 scclResult_t scclIpcSocketBroadcast(void* data, size_t dataLen, int root); private: // 初始化IPC套接字 scclResult_t scclIpcSocketInit(volatile uint32_t* abortFlag); scclResult_t getScclIpcSocknameStr(int rank, uint64_t hash, char* out_str, int* out_len); // TODO: 当前数据发送和接收采用的时unix域的UDP协议,非连接方式,将来根据需要改为TCP连接方式 // 通过Unix域套接字发送/接收数据到指定目标,不加锁执行 scclResult_t scclIpcSocketSendDataBasic(const void* data, size_t dataLen, int dst_rank); scclResult_t scclIpcSocketRecvDataBasic(void* buffer, size_t bufferLen, size_t* receivedLen); // 通过Unix域套接字发送/接收数据到指定目标,不加锁执行 scclResult_t scclIpcSocketSendDataAndRank(const void* data, size_t dataLen, int dst_rank); scclResult_t scclIpcSocketRecvDataAndRank(void* buffer, size_t bufferLen, size_t* receivedLen, int* src_rank); private: // 定义并初始化一个 scclIpcSocket 结构体,用于处理 IPC 套接字连接 struct scclIpcSocketHandle* handle = nullptr; // 定义一个 sockaddr_un 结构体,用于存储客户端地址信息 struct sockaddr_un my_cliaddr; // 用于生成唯一套接字名称的hash值 const uint64_t ipc_hash; // 非阻塞套接字设置 const volatile uint32_t* my_abortFlag; // 进程id信息 int localRank = -1; int nlocalRanks = 0; // 线程池指针 ThreadPool* pthread_pool = nullptr; // 设置超时时间为无限长 int timeoutMs = -1; // 各种数据大小的固定值 static constexpr int ACK_SIZE = 8; // 假设 CHUNK_SIZE 是一个合适的块大小,例如 64KB static constexpr size_t CHUNK_SIZE = 64 * 1024; } scclIpcSocket_t; } // namespace ipc_socket } // namespace net } // namespace hardware } // namespace sccl