#include #include #include #include #include #include // 为了使用 std::this_thread::sleep_for #include "ipc_socket.h" namespace sccl { namespace hardware { namespace net { namespace ipc_socket { //////////////////////////////////////// scclIpcSocket调用的函数 //////////////////////////////////////// scclIpcSocket::scclIpcSocket(int localRank, int nlocalRanks, uint64_t hash, volatile uint32_t* abortFlag) : localRank(localRank), nlocalRanks(nlocalRanks), ipc_hash(hash) { scclResult_t res; // 初始化handle handle = new struct scclIpcSocketHandle(); handle->fd = -1; handle->socketName[0] = '\0'; // 设置线程池 if(nlocalRanks > 0) { pthread_pool = new ThreadPool(nlocalRanks * 2); // 其中一半用于发送一半,用于接收 } else { goto failure; } SCCLCHECKGOTO(scclIpcSocketInit(abortFlag), res, failure); return; failure: WARN("scclIpcSocket init failed"); return; } scclIpcSocket::~scclIpcSocket() { // 等待所有任务完成 while(!pthread_pool->allTasksCompleted()) { usleep(1000); // 每1毫秒检查一次任务完成状态 } // 释放pthpool if(pthread_pool) { delete(pthread_pool); } // 释放handle if(handle->socketName[0] != '\0') { unlink(handle->socketName); } if(handle->fd >= 0) { close(handle->fd); } delete(handle); } //////////////////////////////////////////////////////////////////////////////////////////////////// scclResult_t scclIpcSocket::scclIpcSocketInit(volatile uint32_t* abortFlag) { // 中间变量 int fd = -1; char temp_addr[SCCL_IPC_SOCKNAME_LEN]; // 创建Unix域套接字 // af是本机IP地址类型,一般有PF_INET或者AF_INET(IPv4互联网协议族),还有PF_INET6(IPv6互联网协议族)等,但是一般用IPv4。 // type有两种SOCK_STREAM 和SOCK_DGRAM分别对应tcp和udp协议,区别是用不用建立连接。 if((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) < 0) { WARN("UDS: Socket creation error : %d", errno); return scclSystemError; } // 将cliaddr结构体清零,确保没有残留数据 bzero(&my_cliaddr, sizeof(my_cliaddr)); my_cliaddr.sun_family = AF_UNIX; // 为套接字创建唯一名称 int len; SCCLCHECK(getScclIpcSocknameStr(localRank, ipc_hash, temp_addr, &len)); INFO(SCCL_LOG_BOOTSTRAP, "UDS: Creating socket %s", temp_addr); // 设置套接字路径 strncpy(my_cliaddr.sun_path, temp_addr, len); my_cliaddr.sun_path[0] = '\0'; // Linux抽象套接字技巧 // 绑定套接字 if(bind(fd, (struct sockaddr*)&my_cliaddr, sizeof(my_cliaddr)) < 0) { WARN("UDS: Binding to socket %s failed : %d", temp_addr, errno); close(fd); return scclSystemError; } // 设置handle的成员变量 handle->fd = fd; strcpy(handle->socketName, temp_addr); // 设置中止标志 handle->abortFlag = abortFlag; // 将套接字标记为非阻塞 if(handle->abortFlag) { int flags; EQCHECK(flags = fcntl(fd, F_GETFL), -1); SYSCHECK(fcntl(fd, F_SETFL, flags | O_NONBLOCK), "fcntl"); } return scclSuccess; } /** * 设置中止标志并更新socket的非阻塞模式 * * @param flag 指向中止标志的指针。如果非空,将socket设为非阻塞模式; * 如果为空,则恢复为阻塞模式。 * @note 该函数仅在handle有效时执行操作 */ scclResult_t scclIpcSocket::setAbortFlag(volatile uint32_t* flag) { if(handle) { handle->abortFlag = flag; if(flag) { int flags; EQCHECK(flags = fcntl(handle->fd, F_GETFL), -1); SYSCHECK(fcntl(handle->fd, F_SETFL, flags | O_NONBLOCK), "fcntl"); } else { int flags; EQCHECK(flags = fcntl(handle->fd, F_GETFL), -1); SYSCHECK(fcntl(handle->fd, F_SETFL, flags & ~O_NONBLOCK), "fcntl"); } } return scclSuccess; } // 获取 abortFlag 的函数 volatile uint32_t* scclIpcSocket::getAbortFlag() const { return handle ? handle->abortFlag : nullptr; } /** * 设置IPC套接字的超时时间 * * @param timeout_ms 超时时间(毫秒) * @return 成功返回scclSuccess */ scclResult_t scclIpcSocket::setTimeout(int timeout_ms) { timeoutMs = timeout_ms; return scclSuccess; } ThreadPool* scclIpcSocket::getPthreadPool() { return pthread_pool; } ////////////////////////////////////////////////////////////////////////////////////////////////////// /** * @brief 通过Unix域套接字发送文件描述符 * * @param sendFd 要发送的文件描述符 * @param dst_rank 目标rank号 * @return scclResult_t 返回操作结果: * - scclSuccess: 发送成功 * - scclInternalError: 内部错误(如地址过长或中止标志被设置) * - scclSystemError: 系统调用错误 * * @note 使用Linux抽象套接字技巧(将sun_path[0]置为'\0') * 通过SCM_RIGHTS机制发送文件描述符 * 函数会循环尝试发送直到成功或遇到错误 */ scclResult_t scclIpcSocket::scclIpcSocketSendFd(const int sendFd, int dst_rank) { // 创建一个临时地址字符串 char temp_addr[SCCL_IPC_SOCKNAME_LEN]; // 格式化地址字符串 int len; SCCLCHECK(getScclIpcSocknameStr(dst_rank, ipc_hash, temp_addr, &len)); // 记录发送文件描述符的信息 INFO(SCCL_LOG_BOOTSTRAP, "UDS: Sending fd %d to UDS socket %s/fd:%d", sendFd, temp_addr, handle->fd); // 初始化消息头结构体和iovec结构体 struct msghdr msg; struct iovec iov[1]; // 联合体用于保证控制数组的对齐要求 union { struct cmsghdr cm; char control[CMSG_SPACE(sizeof(int))]; } control_un; struct cmsghdr* cmptr; struct sockaddr_un cliaddr; // 构造客户端地址以发送共享句柄 bzero(&cliaddr, sizeof(cliaddr)); cliaddr.sun_family = AF_UNIX; strncpy(cliaddr.sun_path, temp_addr, len); cliaddr.sun_path[0] = '\0'; // Linux抽象套接字技巧 // 设置消息头的控制信息部分 msg.msg_control = control_un.control; msg.msg_controllen = sizeof(control_un.control); cmptr = CMSG_FIRSTHDR(&msg); cmptr->cmsg_len = CMSG_LEN(sizeof(int)); cmptr->cmsg_level = SOL_SOCKET; cmptr->cmsg_type = SCM_RIGHTS; // 将要发送的文件描述符复制到控制信息中 memmove(CMSG_DATA(cmptr), &sendFd, sizeof(sendFd)); // 设置消息头的地址信息部分 msg.msg_name = (void*)&cliaddr; msg.msg_namelen = sizeof(struct sockaddr_un); // 设置iovec结构体,用于指定要发送的数据 iov[0].iov_base = (void*)""; iov[0].iov_len = 1; // 将iovec结构体关联到消息头 msg.msg_iov = iov; msg.msg_iovlen = 1; // 初始化消息标志 msg.msg_flags = 0; ssize_t sendResult; // 循环发送消息,直到成功发送数据 while((sendResult = sendmsg(handle->fd, &msg, 0)) <= 0) { // 如果发送失败且错误不是EAGAIN, EWOULDBLOCK或EINTR,则记录警告并返回错误 if(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) { WARN("UDS: Sending data over socket %s failed : %d", temp_addr, errno); return scclSystemError; } // 如果设置了中止标志,则返回内部错误 if(handle->abortFlag && *handle->abortFlag) return scclInternalError; } return scclSuccess; } /** * @brief 通过IPC socket接收文件描述符 * * 该函数使用recvmsg系统调用从socket接收文件描述符。函数会循环尝试接收, * 直到成功或发生错误。接收到的文件描述符会通过参数recvFd返回。 * * @param recvFd 用于存储接收到的文件描述符的指针 * @return scclResult_t 返回操作结果: * - scclSuccess: 成功接收文件描述符 * - scclSystemError: 系统调用失败 * - scclInternalError: 操作被中止 * * @note 函数会处理EAGAIN、EWOULDBLOCK和EINTR错误,其他错误会导致返回失败。 * 接收到的控制消息必须符合SOL_SOCKET级别和SCM_RIGHTS类型。 */ scclResult_t scclIpcSocket::scclIpcSocketRecvFd(int* recvFd) { // 初始化消息头结构体和iovec结构体 struct msghdr msg = {0, 0, 0, 0, 0, 0, 0}; struct iovec iov[1]; // 联合体用于保证控制数组的对齐要求 union { struct cmsghdr cm; char control[CMSG_SPACE(sizeof(int))]; } control_un; struct cmsghdr* cmptr; char dummy_buffer[1]; int ret; // 设置消息头的控制信息部分 msg.msg_control = control_un.control; msg.msg_controllen = sizeof(control_un.control); // 设置iovec结构体,用于指定要接收的数据 iov[0].iov_base = (void*)dummy_buffer; iov[0].iov_len = sizeof(dummy_buffer); // 将iovec结构体关联到消息头 msg.msg_iov = iov; msg.msg_iovlen = 1; // 循环接收消息,直到成功接收到数据 while((ret = recvmsg(handle->fd, &msg, 0)) <= 0) { // 如果接收失败且错误不是EAGAIN, EWOULDBLOCK或EINTR,则记录警告并返回错误 if(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) { WARN("UDS: Receiving data over socket failed : %d", errno); return scclSystemError; } // 如果设置了中止标志,则返回内部错误 if(handle->abortFlag && *handle->abortFlag) return scclInternalError; } // 检查接收到的控制信息 if(((cmptr = CMSG_FIRSTHDR(&msg)) != NULL) && (cmptr->cmsg_len == CMSG_LEN(sizeof(int)))) { // 如果控制信息的级别或类型不正确,则记录警告并返回错误 if((cmptr->cmsg_level != SOL_SOCKET) || (cmptr->cmsg_type != SCM_RIGHTS)) { WARN("UDS: Receiving data over socket failed"); return scclSystemError; } // 将接收到的文件描述符复制到recvFd memmove(recvFd, CMSG_DATA(cmptr), sizeof(*recvFd)); } else { // 如果没有接收到控制信息,则记录警告并返回错误 WARN("UDS: Receiving data over socket %s failed", handle->socketName); return scclSystemError; } // 记录成功接收到文件描述符的信息 INFO(SCCL_LOG_BOOTSTRAP, "UDS: Got recvFd %d from socket %s", *recvFd, handle->socketName); // 返回成功 return scclSuccess; } ////////////////////////////////////////////////////////////////////////////////////////////////////// /** * @brief 通过IPC socket接收文件描述符 * * 该函数使用recvmsg系统调用从socket接收文件描述符。函数会循环尝试接收, * 直到成功或发生错误。接收到的文件描述符会通过参数recvFd返回。 * * @param recvFd 用于存储接收到的文件描述符的指针 * @return scclResult_t 返回操作结果: * - scclSuccess: 成功接收文件描述符 * - scclSystemError: 系统调用失败 * - scclInternalError: 操作被中止 * * @note 函数会处理EAGAIN、EWOULDBLOCK和EINTR错误,其他错误会导致返回失败。 * 接收到的控制消息必须符合SOL_SOCKET级别和SCM_RIGHTS类型。 */ scclResult_t scclIpcSocket::scclIpcSocketSendData(const void* data, size_t dataLen, int dst_rank) { // 构造目标地址字符串 char temp_addr[SCCL_IPC_SOCKNAME_LEN]; int len; SCCLCHECK(getScclIpcSocknameStr(dst_rank, ipc_hash, temp_addr, &len)); // 设置消息结构体 struct msghdr msg; struct iovec iov[1]; struct sockaddr_un cliaddr; bzero(&cliaddr, sizeof(cliaddr)); cliaddr.sun_family = AF_UNIX; strncpy(cliaddr.sun_path, temp_addr, len); cliaddr.sun_path[0] = '\0'; // Linux抽象套接字技巧 msg.msg_name = (void*)&cliaddr; msg.msg_namelen = sizeof(cliaddr); msg.msg_control = NULL; msg.msg_controllen = 0; msg.msg_flags = 0; iov[0].iov_base = (void*)data; iov[0].iov_len = dataLen; msg.msg_iov = iov; msg.msg_iovlen = 1; // 使用 poll 等待 socket 可写 struct pollfd pfd; pfd.fd = handle->fd; pfd.events = POLLOUT; int pollResult = poll(&pfd, 1, timeoutMs); if(pollResult <= 0) { if(pollResult == 0) { WARN("UDS: Timeout occurred while waiting to send data to socket %s", temp_addr); } else { WARN("UDS: Error occurred while polling socket %s for writability : %d", temp_addr, errno); } return scclSystemError; } ssize_t sendResult; while((sendResult = sendmsg(handle->fd, &msg, 0)) <= 0) { if(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) { WARN("UDS: Error occurred while sending data through socket %s : %d", temp_addr, errno); return scclSystemError; } if(handle->abortFlag && *handle->abortFlag) return scclInternalError; pollResult = poll(&pfd, 1, timeoutMs); if(pollResult <= 0) { if(pollResult == 0) { WARN("UDS: Timeout occurred while waiting to send data to socket %s", temp_addr); } else { WARN("UDS: Error occurred while polling socket %s for writability : %d", temp_addr, errno); } return scclSystemError; } } INFO(SCCL_LOG_BOOTSTRAP, "UDS: Successfully sent %zd bytes of data through UDS socket %s", sendResult, temp_addr); return scclSuccess; } /** * @brief 通过IPC套接字发送数据到指定目标rank * * @param data 要发送的数据指针 * @param dataLen 要发送的数据长度 * @param dst_rank 目标rank号 * @return scclResult_t 返回操作结果状态码: * - scclSuccess: 发送成功 * - scclInternalError: 内部错误(如套接字名称过长或中止标志被设置) * - scclSystemError: 系统调用错误(如poll超时或sendmsg失败) * * @note 使用Linux抽象套接字技术,通过poll机制确保套接字可写后再发送数据 * 支持EAGAIN/EWOULDBLOCK/EINTR错误重试机制 */ scclResult_t scclIpcSocket::scclIpcSocketRecvData(void* buffer, size_t bufferLen, size_t* receivedLen) { // 设置消息结构体 struct msghdr msg = {0}; struct iovec iov[1]; iov[0].iov_base = buffer; iov[0].iov_len = bufferLen; msg.msg_iov = iov; msg.msg_iovlen = 1; // 使用 poll 等待 socket 可读 struct pollfd pfd; pfd.fd = handle->fd; pfd.events = POLLIN; int pollResult = poll(&pfd, 1, timeoutMs); if(pollResult <= 0) { if(pollResult == 0) { WARN("UDS: Timeout occurred while waiting to receive data from socket %s", handle->socketName); } else { WARN("UDS: Error occurred while polling socket %s for readability : %d", handle->socketName, errno); } return scclSystemError; } int ret; while(true) { ret = recvmsg(handle->fd, &msg, 0); if(ret > 0) { INFO(SCCL_LOG_BOOTSTRAP, "UDS: Successfully received %d bytes of data from socket %s", ret, handle->socketName); *receivedLen = ret; return scclSuccess; } else if(ret == 0) { INFO(SCCL_LOG_BOOTSTRAP, "UDS: Connection closed by peer on socket %s", handle->socketName); *receivedLen = 0; return scclSuccess; } else { if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { pollResult = poll(&pfd, 1, timeoutMs); if(pollResult <= 0) { if(pollResult == 0) { WARN("UDS: Timeout occurred while waiting to receive data from socket %s", handle->socketName); } else { WARN("UDS: Error occurred while polling socket %s for readability : %d", handle->socketName, errno); } return scclSystemError; } } else { WARN("UDS: Error occurred while receiving data through socket %s : %d", handle->socketName, errno); return scclSystemError; } } } } /** * 通过IPC Socket发送数据并等待确认 * * @param data 要发送的数据指针 * @param dataLen 要发送的数据长度 * @param dst_rank 目标rank号 * @return scclSuccess 发送成功,其他错误码表示失败 * * 该函数会将数据分块发送(CHUNK_SIZE大小),每发送一个数据块后 * 会等待接收方返回ACK确认。如果收到非预期的ACK或发送/接收失败, * 会立即返回错误。所有数据成功发送并收到正确ACK后返回成功。 */ scclResult_t scclIpcSocket::scclIpcSocketSendDataWithAck(const void* data, size_t dataLen, int dst_rank) { const char* dataPtr = static_cast(data); size_t bytesSent = 0; while(bytesSent < dataLen) { size_t bytesToSend = std::min(CHUNK_SIZE, dataLen - bytesSent); // 发送数据块 scclResult_t sendResult = scclIpcSocketSendData(dataPtr + bytesSent, bytesToSend, dst_rank); if(sendResult != scclSuccess) { return sendResult; } // 等待接收方的ACK char ack[ACK_SIZE]; size_t receivedLen; scclResult_t recvResult = scclIpcSocketRecvData(ack, sizeof(ack), &receivedLen); if(recvResult != scclSuccess) { return recvResult; } // 检查是否是预期的ack char target_ack[ACK_SIZE]; sprintf(target_ack, "ACK-%d", localRank); if(strcmp(ack, target_ack) != 0) { WARN("UDS: Received unexpected ACK: %s", ack); return scclSystemError; } bytesSent += bytesToSend; } INFO(SCCL_LOG_BOOTSTRAP, "UDS: Successfully sent %zu bytes of data with ACK through UDS socket", dataLen); return scclSuccess; } /** * 通过IPC Socket接收数据并发送ACK确认 * * @param buffer 接收数据缓冲区指针 * @param bufferLen 缓冲区总长度 * @param receivedLen 实际接收到的数据长度(输出参数) * @param src_rank 发送方rank号 * @return scclSuccess表示成功,其他错误码表示失败 * * @note 采用分块接收机制,每接收一个数据块都会发送ACK确认 * 接收完成后会记录日志信息 */ scclResult_t scclIpcSocket::scclIpcSocketRecvDataAndSendAck(void* buffer, size_t bufferLen, size_t* receivedLen, int src_rank) { char* bufferPtr = static_cast(buffer); size_t bytesReceived = 0; while(bytesReceived < bufferLen) { size_t bytesToReceive = std::min(CHUNK_SIZE, bufferLen - bytesReceived); // 接收数据块 scclResult_t recvResult = scclIpcSocketRecvData(bufferPtr + bytesReceived, bytesToReceive, receivedLen); if(recvResult != scclSuccess) { return recvResult; } // 发送ACK给发送方 char ack[ACK_SIZE]; sprintf(ack, "ACK-%d", src_rank); scclResult_t sendResult = scclIpcSocketSendData(ack, strlen(ack), src_rank); if(sendResult != scclSuccess) { return sendResult; } bytesReceived += *receivedLen; } INFO(SCCL_LOG_BOOTSTRAP, "UDS: Successfully received %zu bytes of data and sent ACK through UDS socket", bufferLen); return scclSuccess; } ///////////////////////////////////////////////////////////////////////////////////////////////////// /** * @brief 使用IPC套接字实现Allgather操作 * * 该函数通过线程池并行发送和接收数据,实现多节点间的Allgather集合通信。 * * @param sendData 发送数据缓冲区指针 * @param recvData 接收数据缓冲区指针 * @param dataLen 每个节点的数据长度(字节) * @return scclResult_t 返回操作结果(scclSuccess表示成功) * * @note 1. 会跳过本地rank的数据传输 * 2. 数据包格式: [发送rank(int)][数据] * 3. 接收缓冲区需要预先分配足够空间(大小=nlocalRanks*dataLen) */ scclResult_t scclIpcSocket::scclIpcSocketAllgather(const void* sendData, void* recvData, size_t dataLen) { if(pthread_pool == nullptr || nlocalRanks <= 0) { WARN("scclIpcSocket init error!"); return scclInternalError; } // 采用线程池发送和接收数据 for(int i = 0; i < nlocalRanks; ++i) { if(i != localRank) { auto sendTask = [this, sendData, dataLen, i]() { // 计算 DataPackage 的总大小 size_t packageSize = sizeof(int) + dataLen; char* buffer = new char[packageSize]; // 将 rank 信息和数据一起拷贝到 buffer 中 int* rankPtr = reinterpret_cast(buffer); *rankPtr = localRank; char* dataPtr = buffer + sizeof(int); memcpy(dataPtr, sendData, dataLen); // 一次性发送 rank 信息和数据 scclIpcSocketSendData(buffer, packageSize, i); delete[] buffer; }; pthread_pool->enqueue(sendTask); auto recvTask = [this, recvData, dataLen, i]() { // 准备接收缓冲区 size_t packageSize = sizeof(int) + dataLen; char* buffer = new char[packageSize]; size_t receivedLen; // 一次性接收 rank 信息和数据 scclIpcSocketRecvData(buffer, packageSize, &receivedLen); // 从 buffer 中提取 rank 信息和数据 int* rankPtr = reinterpret_cast(buffer); int senderRank = *rankPtr; char* dataPtr = buffer + sizeof(int); memcpy(static_cast(recvData) + senderRank * dataLen, dataPtr, dataLen); delete[] buffer; }; pthread_pool->enqueue(recvTask); } else { // 自己的数据直接放置到正确位置 memcpy(static_cast(recvData) + localRank * dataLen, sendData, dataLen); } } // 等待所有任务完成 while(!pthread_pool->allTasksCompleted()) { usleep(1000); // 每1毫秒检查一次任务完成状态 } return scclSuccess; } /** * @brief 使用IPC套接字进行Allgather同步操作 * * 该函数实现了基于IPC套接字的Allgather同步操作,将各进程的数据收集到所有进程的接收缓冲区中。 * * @param sendData 发送数据缓冲区指针 * @param recvData 接收数据缓冲区指针 * @param dataLen 每个进程发送/接收的数据长度 * @return scclResult_t 返回操作结果,成功返回scclSuccess,失败返回错误码 * * @note 1. 函数会先将本地数据复制到接收缓冲区对应位置 * 2. 使用线程池并行处理与其他进程的通信任务 * 3. 当wait为true时会阻塞等待所有通信完成 */ scclResult_t scclIpcSocket::scclIpcSocketAllgatherSync(const void* sendData, void* recvData, size_t dataLen) { if(pthread_pool == nullptr || nlocalRanks <= 0) { WARN("scclIpcSocket init error!"); return scclInternalError; } // 将当前进程的数据复制到接收缓冲区的对应位置 memcpy(static_cast(recvData) + localRank * dataLen, sendData, dataLen); // 采用线程池发送和接收数据 for(int i = 0; i < nlocalRanks; ++i) { if(i != localRank) { auto sendTask = [this, sendData, dataLen, i]() { scclIpcSocketSendData(sendData, dataLen, i); }; pthread_pool->enqueue(sendTask); auto recvTask = [this, recvData, dataLen, i]() { size_t receivedLen; scclIpcSocketRecvData(reinterpret_cast(recvData) + i * dataLen, dataLen, &receivedLen); }; pthread_pool->enqueue(recvTask); } } // 等待所有任务完成 while(!pthread_pool->allTasksCompleted()) { usleep(1000); // 每1毫秒检查一次任务完成状态 } return scclSuccess; } /** * @brief 通过IPC Socket进行广播操作 * * 该函数实现了基于IPC Socket的广播通信机制。根进程(root)将数据发送给所有其他进程, * 非根进程从根进程接收数据。可以选择是否等待所有通信操作完成。 * * @param sendData 发送数据缓冲区指针(根进程使用) * @param recvData 接收数据缓冲区指针(非根进程使用) * @param dataLen 数据长度(字节) * @param root 根进程的rank值 * * @return scclResult_t 返回操作结果状态码 * - scclSuccess: 操作成功 * - scclInternalError: IPC Socket未初始化或本地rank数无效 * - scclInvalidArgument: 根进程rank值无效 */ scclResult_t scclIpcSocket::scclIpcSocketBroadcast(void* data, size_t dataLen, int root) { if(pthread_pool == nullptr || nlocalRanks <= 0) { WARN("scclIpcSocket init error!"); return scclInternalError; } if(root < 0 || root >= nlocalRanks) { WARN("scclIpcSocketBroadcast: Invalid root rank %d", root); return scclInvalidArgument; } if(localRank == root) { // 根进程:发送数据给所有其他进程 for(int i = 0; i < nlocalRanks; ++i) { if(i != root) { // 使用 std::bind 绑定 scclIpcSocketSendDataWithAck 方法和参数 auto sendTask = std::bind(&scclIpcSocket::scclIpcSocketSendDataWithAck, this, data, dataLen, i); // 将绑定后的函数对象添加到线程池的任务队列中 pthread_pool->enqueue(sendTask); } } } else { size_t receivedLen; scclResult_t result = scclIpcSocketRecvDataAndSendAck(data, dataLen, &receivedLen, root); if(result != scclSuccess) { return result; } } // 等待所有任务完成 while(!pthread_pool->allTasksCompleted()) { usleep(1000); // 每1毫秒检查一次任务完成状态 } return scclSuccess; } ///////////////////////////////////////////////////////////////////////////////////// scclResult_t scclIpcSocket::getScclIpcSocknameStr(int rank, uint64_t hash, char* out_str, int* out_len) { int len = snprintf(out_str, SCCL_IPC_SOCKNAME_LEN, "/tmp/sccl-socket-%d-%lx", rank, hash); if(len > (sizeof(my_cliaddr.sun_path) - 1)) { WARN("UDS: Cannot bind provided name to socket. Name too large"); return scclInternalError; } *out_len = len; return scclSuccess; } } // namespace ipc_socket } // namespace net } // namespace hardware } // namespace sccl