Commit 85db7de4 authored by lishen's avatar lishen
Browse files

基本实现bootstrap功能,所有rank硬件信息共享

parent a4ac3320
#include <stdint.h>
#include <hip/hip_runtime.h>
#include <hip/hip_runtime_api.h>
#include "base.h"
#include "hardware_utils.h"
#include "bootstrap.h"
namespace sccl {
namespace hardware {
namespace topology {
namespace bootstrap {
// 全局变量,全部节点的信息
struct BootstrapComm bootstrap_comm;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
scclResult_t scclGetUniqueId(scclUniqueId* unique_id) {
auto handle = reinterpret_cast<struct BootstrapHandle*>(unique_id);
NEQCHECK(sizeof(struct BootstrapHandle), SCCL_UNIQUE_ID_BYTES);
SCCLCHECK(bootstrapGetUniqueId(handle));
return scclSuccess;
}
scclResult_t sccl_init(const scclUniqueId* unique_id, int rank, int nRanks) {
// -------------------------- 1.获取0号rank的地址信息 ----------------------------------- //
auto root_handle = reinterpret_cast<const struct BootstrapHandle*>(unique_id);
EQCHECK(root_handle->magic, 0); // 检查handle是否已经更新
// -------------------------- 2.初始化获取所有节点的node信息 ----------------------------------- //
auto sccl_bootstrap = std::make_unique<Bootstrap>(root_handle, rank, nRanks);
SCCLCHECK(sccl_bootstrap->init(&bootstrap_comm));
// // -------------------------- 3.MPI allgather设置unique_id的整合 ----------------------------------- //
// auto unique_ids_chr = reinterpret_cast<const char*>(unique_ids);
// // -------------------------- 3.MPI allgather设置unique_id的整合 ----------------------------------- //
// std::vector<scclUniqueId> unique_id_vec(nRanks);
// MPI_Allgather(&unique_id, sizeof(scclUniqueId), MPI_BYTE, &unique_id_vec[0], sizeof(scclUniqueId), MPI_BYTE, MPI_COMM_WORLD);
// for(int i = 0; i < nRanks; ++i) {
// auto root_handle = reinterpret_cast<const struct BootstrapHandle*>(unique_ids_chr + i * sizeof(struct BootstrapHandle));
// printf("rank=%d, i=%d, unique_ids hosthash=%lu\n", root_handle->rank, i, root_handle->hostHash);
// }
// ByteSpan<struct BootstrapHandle> unique_ids_span(unique_ids_chr, nRanks * sizeof(struct BootstrapHandle));
// // -------------------------- 2.设置基础信息 ----------------------------------- //
// INFO(SCCL_LOG_TOPO, "Bootstrap ...\n");
// struct scclRankInfo rank_info;
// rank_info.rank = rank;
// rank_info.nRanks = nRanks;
// // 在每个进程中设置 root_handle 的值
// root_handle.rank = rank_info->rank;
// root_handle.hostHash = getHostHash();
// scclSocketAddress_t localSocketAddr = sccl_bootstrap->getLocalSocketAddr();
// memcpy(&root_handle.addr, &localSocketAddr, sizeof(scclSocketAddress_t));
// #if 1
// char line[100];
// sprintf(line, "pos 55: rank=%d", rank);
// SCCLCHECK(hardware::net::printSocketAddr(&root_handle.addr, line));
// printf("root_handle.hostHash rank=%d, hash=%lu\n", rank, root_handle.hostHash);
// #endif
// // -------------------------- 3.收集所有进程的 root_handle 信息 ----------------------------------- //
// std::vector<char> recvBuffer(nRanks * sendBuffer.size());
// SCCLCHECK(mpi::wrap_mpi_allgather(sendBuffer.data(), sendBuffer.size(), MPI_BYTE, recvBuffer.data(), sendBuffer.size(), MPI_BYTE, MPI_COMM_WORLD));
// -------------------------- 4.设置各个节点的基础信息 ----------------------------------- //
// SCCLCHECK(sccl_bootstrap->bootstrapInit(rank_info, recvBuffer.data()));
// -------------------------- 5.根据各个节点的基础信息计算topo结果 ----------------------------------- //
return scclSuccess;
}
scclResult_t sccl_finalize() {
// 设置一些全局变量的重置和销毁
// 设置socket等硬件监听的关闭
// void BootstrapComm::destroy() {
if(bootstrap_comm.nRanks > 0) {
bootstrap_comm.destroy();
}
return scclSuccess;
}
} // namespace bootstrap
} // namespace topology
} // namespace hardware
} // namespace sccl
#pragma once
#include <stdint.h>
#include "base.h"
#include "comm.h"
namespace sccl {
namespace hardware {
namespace topology {
namespace bootstrap {
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
scclResult_t scclGetUniqueId(scclUniqueId* unique_id);
scclResult_t sccl_init(const scclUniqueId* unique_id, int rank, int nRanks);
scclResult_t sccl_finalize();
} // namespace bootstrap
} // namespace topology
} // namespace hardware
} // namespace sccl
......@@ -13,13 +13,19 @@ namespace net {
namespace ipc_socket {
//////////////////////////////////////// scclIpcSocket调用的函数 ////////////////////////////////////////
scclIpcSocket::scclIpcSocket(int localRank, int localRanks, uint64_t hash, volatile uint32_t* abortFlag)
: localRank(localRank), localRanks(localRanks), ipc_hash(hash) {
scclIpcSocket::scclIpcSocket(int localRank, int nlocalRanks, uint64_t hash, volatile uint32_t* abortFlag)
: localRank(localRank), nlocalRanks(nlocalRanks), ipc_hash(hash) {
scclResult_t res;
handle = new struct scclIpcSocketHandle();
// 初始化handle
handle = new struct scclIpcSocketHandle();
handle->fd = -1;
handle->socketName[0] = '\0';
if(localRanks > 0) {
pthread_pool = new ThreadPool(localRanks * 2); // 其中一半用于发送一半,用于接收
// 设置线程池
if(nlocalRanks > 0) {
pthread_pool = new ThreadPool(nlocalRanks * 2); // 其中一半用于发送一半,用于接收
} else {
goto failure;
}
SCCLCHECKGOTO(scclIpcSocketInit(abortFlag), res, failure);
......@@ -31,6 +37,10 @@ failure:
}
scclIpcSocket::~scclIpcSocket() {
// 等待所有任务完成
while(!pthread_pool->allTasksCompleted()) {
usleep(1000); // 每1毫秒检查一次任务完成状态
}
// 释放pthpool
if(pthread_pool) {
delete(pthread_pool);
......@@ -52,9 +62,6 @@ scclResult_t scclIpcSocket::scclIpcSocketInit(volatile uint32_t* abortFlag) {
// 中间变量
int fd = -1;
char temp_addr[SCCL_IPC_SOCKNAME_LEN];
// 初始化handle的成员变量
handle->fd = -1;
handle->socketName[0] = '\0';
// 创建Unix域套接字
// af是本机IP地址类型,一般有PF_INET或者AF_INET(IPv4互联网协议族),还有PF_INET6(IPv6互联网协议族)等,但是一般用IPv4。
......@@ -69,12 +76,8 @@ scclResult_t scclIpcSocket::scclIpcSocketInit(volatile uint32_t* abortFlag) {
my_cliaddr.sun_family = AF_UNIX;
// 为套接字创建唯一名称
int len = snprintf(temp_addr, SCCL_IPC_SOCKNAME_LEN, SCCL_IPC_SOCKNAME_STR, localRank, ipc_hash);
if(len > (sizeof(my_cliaddr.sun_path) - 1)) {
WARN("UDS: Cannot bind provided name to socket. Name too large");
return scclInternalError;
}
int len;
SCCLCHECK(getScclIpcSocknameStr(localRank, ipc_hash, temp_addr, &len));
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Creating socket %s", temp_addr);
// 设置套接字路径
......@@ -162,12 +165,9 @@ scclResult_t scclIpcSocket::scclIpcSocketSendFd(const int sendFd, int dst_rank)
// 创建一个临时地址字符串
char temp_addr[SCCL_IPC_SOCKNAME_LEN];
// 格式化地址字符串
int len = snprintf(temp_addr, SCCL_IPC_SOCKNAME_LEN, SCCL_IPC_SOCKNAME_STR, dst_rank, ipc_hash);
// 检查地址字符串长度是否超过限制
if(len > (sizeof(my_cliaddr.sun_path) - 1)) {
WARN("UDS: Cannot connect to provided name for socket. Name too large");
return scclInternalError;
}
int len;
SCCLCHECK(getScclIpcSocknameStr(dst_rank, ipc_hash, temp_addr, &len));
// 记录发送文件描述符的信息
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Sending fd %d to UDS socket %s/fd:%d", sendFd, temp_addr, handle->fd);
......@@ -230,7 +230,6 @@ scclResult_t scclIpcSocket::scclIpcSocketSendFd(const int sendFd, int dst_rank)
return scclInternalError;
}
// 返回成功
return scclSuccess;
}
......@@ -249,90 +248,11 @@ scclResult_t scclIpcSocket::scclIpcSocketSendFd(const int sendFd, int dst_rank)
* @note 函数会处理EAGAIN、EWOULDBLOCK和EINTR错误,其他错误会导致返回失败。
* 接收到的控制消息必须符合SOL_SOCKET级别和SCM_RIGHTS类型。
*/
scclResult_t scclIpcSocket::scclIpcSocketRecvFd(int* recvFd) {
// 初始化消息头结构体和iovec结构体
struct msghdr msg = {0, 0, 0, 0, 0, 0, 0};
struct iovec iov[1];
// 联合体用于保证控制数组的对齐要求
union {
struct cmsghdr cm;
char control[CMSG_SPACE(sizeof(int))];
} control_un;
struct cmsghdr* cmptr;
char dummy_buffer[1];
int ret;
// 设置消息头的控制信息部分
msg.msg_control = control_un.control;
msg.msg_controllen = sizeof(control_un.control);
// 设置iovec结构体,用于指定要接收的数据
iov[0].iov_base = (void*)dummy_buffer;
iov[0].iov_len = sizeof(dummy_buffer);
// 将iovec结构体关联到消息头
msg.msg_iov = iov;
msg.msg_iovlen = 1;
// 循环接收消息,直到成功接收到数据
while((ret = recvmsg(handle->fd, &msg, 0)) <= 0) {
// 如果接收失败且错误不是EAGAIN, EWOULDBLOCK或EINTR,则记录警告并返回错误
if(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
WARN("UDS: Receiving data over socket failed : %d", errno);
return scclSystemError;
}
// 如果设置了中止标志,则返回内部错误
if(handle->abortFlag && *handle->abortFlag)
return scclInternalError;
}
// 检查接收到的控制信息
if(((cmptr = CMSG_FIRSTHDR(&msg)) != NULL) && (cmptr->cmsg_len == CMSG_LEN(sizeof(int)))) {
// 如果控制信息的级别或类型不正确,则记录警告并返回错误
if((cmptr->cmsg_level != SOL_SOCKET) || (cmptr->cmsg_type != SCM_RIGHTS)) {
WARN("UDS: Receiving data over socket failed");
return scclSystemError;
}
// 将接收到的文件描述符复制到recvFd
memmove(recvFd, CMSG_DATA(cmptr), sizeof(*recvFd));
} else {
// 如果没有接收到控制信息,则记录警告并返回错误
WARN("UDS: Receiving data over socket %s failed", handle->socketName);
return scclSystemError;
}
// 记录成功接收到文件描述符的信息
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Got recvFd %d from socket %s", *recvFd, handle->socketName);
// 返回成功
return scclSuccess;
}
/**
* @brief 通过IPC套接字发送数据到指定目标rank
*
* @param data 要发送的数据指针
* @param dataLen 要发送的数据长度
* @param dst_rank 目标rank号
* @return scclResult_t 返回操作结果状态码:
* - scclSuccess: 发送成功
* - scclInternalError: 内部错误(如套接字名称过长或中止标志被设置)
* - scclSystemError: 系统调用错误(如poll超时或sendmsg失败)
*
* @note 使用Linux抽象套接字技术,通过poll机制确保套接字可写后再发送数据
* 支持EAGAIN/EWOULDBLOCK/EINTR错误重试机制
*/
scclResult_t scclIpcSocket::scclIpcSocketSendData(const void* data, size_t dataLen, int dst_rank) {
// 构造目标地址字符串
char temp_addr[SCCL_IPC_SOCKNAME_LEN];
int len = snprintf(temp_addr, SCCL_IPC_SOCKNAME_LEN, SCCL_IPC_SOCKNAME_STR, dst_rank, ipc_hash);
if(len > (sizeof(my_cliaddr.sun_path) - 1)) {
WARN("UDS: Unable to connect to the provided socket name. Name too long");
return scclInternalError;
}
int len;
SCCLCHECK(getScclIpcSocknameStr(dst_rank, ipc_hash, temp_addr, &len));
// 设置消息结构体
struct msghdr msg;
......@@ -342,16 +262,16 @@ scclResult_t scclIpcSocket::scclIpcSocketSendData(const void* data, size_t dataL
cliaddr.sun_family = AF_UNIX;
strncpy(cliaddr.sun_path, temp_addr, len);
cliaddr.sun_path[0] = '\0'; // Linux抽象套接字技巧
msg.msg_name = (void*)&cliaddr;
msg.msg_namelen = sizeof(cliaddr);
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
iov[0].iov_base = (void*)data;
iov[0].iov_len = dataLen;
msg.msg_name = (void*)&cliaddr;
msg.msg_namelen = sizeof(cliaddr);
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
iov[0].iov_base = (void*)data;
iov[0].iov_len = dataLen;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
// 使用 poll 等待 socket 可写
struct pollfd pfd;
......@@ -377,7 +297,6 @@ scclResult_t scclIpcSocket::scclIpcSocketSendData(const void* data, size_t dataL
if(handle->abortFlag && *handle->abortFlag)
return scclInternalError;
// 如果 sendmsg 因为 EAGAIN 或 EWOULDBLOCK 失败,重新 poll
pollResult = poll(&pfd, 1, timeoutMs);
if(pollResult <= 0) {
if(pollResult == 0) {
......@@ -389,23 +308,23 @@ scclResult_t scclIpcSocket::scclIpcSocketSendData(const void* data, size_t dataL
}
}
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Successfully sent %zu bytes of data through UDS socket %s", dataLen, temp_addr);
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Successfully sent %zd bytes of data through UDS socket %s", sendResult, temp_addr);
return scclSuccess;
}
/**
* @brief 通过IPC socket接收数据
* @brief 通过IPC套接字发送数据到指定目标rank
*
* 该函数使用poll机制等待socket可读,然后通过recvmsg接收数据。
* 支持超时设置和中断处理,当发生错误或超时时返回相应错误码。
* @param data 要发送的数据指针
* @param dataLen 要发送的数据长度
* @param dst_rank 目标rank号
* @return scclResult_t 返回操作结果状态码:
* - scclSuccess: 发送成功
* - scclInternalError: 内部错误(如套接字名称过长或中止标志被设置)
* - scclSystemError: 系统调用错误(如poll超时或sendmsg失败)
*
* @param buffer 接收数据的缓冲区指针
* @param bufferLen 缓冲区长度
* @param receivedLen 实际接收到的数据长度(输出参数)
* @return scclResult_t 操作结果状态码:
* - scclSuccess: 成功接收数据
* - scclSystemError: 系统调用错误
* - scclInternalError: 被中断标志终止
* @note 使用Linux抽象套接字技术,通过poll机制确保套接字可写后再发送数据
* 支持EAGAIN/EWOULDBLOCK/EINTR错误重试机制
*/
scclResult_t scclIpcSocket::scclIpcSocketRecvData(void* buffer, size_t bufferLen, size_t* receivedLen) {
// 设置消息结构体
......@@ -432,159 +351,85 @@ scclResult_t scclIpcSocket::scclIpcSocketRecvData(void* buffer, size_t bufferLen
}
int ret;
while((ret = recvmsg(handle->fd, &msg, 0)) <= 0) {
if(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
WARN("UDS: Error occurred while receiving data through socket %s : %d", handle->socketName, errno);
return scclSystemError;
}
if(handle->abortFlag && *handle->abortFlag)
return scclInternalError;
// 如果 recvmsg 因为 EAGAIN 或 EWOULDBLOCK 失败,重新 poll
pollResult = poll(&pfd, 1, timeoutMs);
if(pollResult <= 0) {
if(pollResult == 0) {
WARN("UDS: Timeout occurred while waiting to receive data from socket %s", handle->socketName);
while(true) {
ret = recvmsg(handle->fd, &msg, 0);
if(ret > 0) {
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Successfully received %d bytes of data from socket %s", ret, handle->socketName);
*receivedLen = ret;
return scclSuccess;
} else if(ret == 0) {
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Connection closed by peer on socket %s", handle->socketName);
*receivedLen = 0;
return scclSuccess;
} else {
if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
pollResult = poll(&pfd, 1, timeoutMs);
if(pollResult <= 0) {
if(pollResult == 0) {
WARN("UDS: Timeout occurred while waiting to receive data from socket %s", handle->socketName);
} else {
WARN("UDS: Error occurred while polling socket %s for readability : %d", handle->socketName, errno);
}
return scclSystemError;
}
} else {
WARN("UDS: Error occurred while polling socket %s for readability : %d", handle->socketName, errno);
WARN("UDS: Error occurred while receiving data through socket %s : %d", handle->socketName, errno);
return scclSystemError;
}
return scclSystemError;
}
}
if(ret > 0) {
*receivedLen = ret;
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Successfully received %zu bytes of data from socket %s", ret, handle->socketName);
return scclSuccess;
} else {
WARN("UDS: Error occurred while receiving data through socket %s", handle->socketName);
return scclSystemError;
}
}
/**
* @brief 通过Unix域套接字非阻塞发送数据到指定rank节点
*
* @param data 要发送的数据指针
* @param dataLen 要发送的数据长度(字节)
* @param dst_rank 目标rank号
* @return scclResult_t 返回操作结果:
* - scclSuccess: 发送成功
* - scclInternalError: 内部错误(地址过长或中止标志被设置)
* - scclSystemError: 系统调用错误
*
* @note 使用Linux抽象套接字命名空间技术
* 函数会持续重试直到发送成功或发生错误
* 使用poll系统调用等待套接字变为可写状态
*/
scclResult_t scclIpcSocket::scclIpcSocketSendDataNonBlocking(const void* data, size_t dataLen, int dst_rank) {
// 创建一个临时地址字符串,用于存储目标套接字的地址
char temp_addr[SCCL_IPC_SOCKNAME_LEN];
// 格式化目标地址字符串
int len = snprintf(temp_addr, SCCL_IPC_SOCKNAME_LEN, SCCL_IPC_SOCKNAME_STR, dst_rank, ipc_hash);
// 如果地址字符串太长,则返回错误
if(len > (sizeof(my_cliaddr.sun_path) - 1)) {
WARN("UDS: Cannot connect to provided name for socket. Name too large");
return scclInternalError;
// 发送数据的方法
scclResult_t scclIpcSocket::scclIpcSocketSendDataWithAck(const void* data, size_t dataLen, int dst_rank) {
scclResult_t result = scclIpcSocketSendData(data, dataLen, dst_rank);
if(result != scclSuccess) {
return result;
}
// 记录日志,表示正在发送数据
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Sending %zu bytes of data to UDS socket %s", dataLen, temp_addr);
// 设置消息头结构体
struct msghdr msg;
struct iovec iov[1];
struct sockaddr_un cliaddr;
bzero(&cliaddr, sizeof(cliaddr));
cliaddr.sun_family = AF_UNIX;
strncpy(cliaddr.sun_path, temp_addr, len);
cliaddr.sun_path[0] = '\0'; // Linux抽象套接字技巧
iov[0].iov_base = (void*)data;
iov[0].iov_len = dataLen;
msg.msg_name = (void*)&cliaddr;
msg.msg_namelen = sizeof(cliaddr);
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
ssize_t sendResult;
// 尝试发送数据,如果失败则等待套接字变得可写后重试
while((sendResult = sendmsg(handle->fd, &msg, 0)) <= 0) {
// 如果错误不是 EAGAIN, EWOULDBLOCK 或 EINTR,则记录警告并返回错误
if(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
WARN("UDS: Sending data over socket %s failed : %d", temp_addr, errno);
return scclSystemError;
}
// 如果设置了中止标志,则返回内部错误
if(handle->abortFlag && *handle->abortFlag)
return scclInternalError;
printf("scclIpcSocketSendDataWithAck localRank=%d, dst_rank=%d\n", localRank, dst_rank);
// 使用 poll 系统调用等待套接字变得可写
struct pollfd pfd;
pfd.fd = handle->fd;
pfd.events = POLLOUT;
int pollResult = poll(&pfd, 1, -1); // 无限等待
if(pollResult <= 0) {
WARN("UDS: Polling for socket %s to become writable failed : %d", temp_addr, errno);
return scclSystemError;
}
// 等待接收方的ACK
char ack[ACK_SIZE];
size_t receivedLen;
result = scclIpcSocketRecvData(ack, sizeof(ack), &receivedLen);
printf("scclIpcSocketSendDataWithAck recv ack=%s, localRank=%d, dst_rank=%d\n", ack, localRank, dst_rank);
// 检查是否是预期的ack
char target_ack[ACK_SIZE];
sprintf(target_ack, "ACK-%d", localRank);
printf("scclIpcSocketSendDataWithAck 11 check recv ack=%s, target_ack=%s, localRank=%d, dst_rank=%d\n", ack, target_ack, localRank, dst_rank);
if(result != scclSuccess || strcmp(ack, target_ack) != 0) {
printf("errrrrrr, result=%d, ack=%s, %s\n", result, ack, target_ack);
return scclSystemError;
}
printf("scclIpcSocketSendDataWithAck 22 check recv ack=%s, target_ack=%s, localRank=%d, dst_rank=%d\n", ack, target_ack, localRank, dst_rank);
return scclSuccess;
}
/**
* @brief 非阻塞接收IPC socket数据
*
* 通过UDS套接字非阻塞接收数据,当数据不可读时会等待直到可读或发生错误。
*
* @param buffer 接收数据的缓冲区指针
* @param bufferLen 缓冲区长度
* @param receivedLen 实际接收到的数据长度(输出参数)
* @return scclResult_t 操作结果:
* - scclSuccess: 成功接收数据
* - scclSystemError: 系统调用错误
* - scclInternalError: 被中止标志中断
*
* @note 内部使用recvmsg和poll系统调用实现
*/
scclResult_t scclIpcSocket::scclIpcSocketRecvDataNonBlocking(void* buffer, size_t bufferLen, size_t* receivedLen) {
// 初始化消息头结构体和iovec结构体
struct msghdr msg = {0};
struct iovec iov[1];
iov[0].iov_base = buffer;
iov[0].iov_len = bufferLen;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
int ret;
// 尝试接收消息,如果失败则等待套接字变得可读后重试
while((ret = recvmsg(handle->fd, &msg, 0)) <= 0) {
// 如果接收失败且错误不是EAGAIN, EWOULDBLOCK或EINTR,则记录警告并返回错误
if(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
WARN("UDS: Receiving data over socket failed : %d", errno);
return scclSystemError;
}
// 如果设置了中止标志,则返回内部错误
if(handle->abortFlag && *handle->abortFlag)
return scclInternalError;
// 使用 poll 系统调用等待套接字变得可读
struct pollfd pfd;
pfd.fd = handle->fd;
pfd.events = POLLIN;
int pollResult = poll(&pfd, 1, -1); // 无限等待
if(pollResult <= 0) {
WARN("UDS: Polling for socket %s to become readable failed : %d", handle->socketName, errno);
return scclSystemError;
}
// 接收数据的方法
scclResult_t scclIpcSocket::scclIpcSocketRecvDataAndSendAck(void* buffer, size_t bufferLen, size_t* receivedLen, int src_rank) {
scclResult_t result = scclIpcSocketRecvData(buffer, bufferLen, receivedLen);
if(result != scclSuccess) {
return result;
}
// 如果成功接收到数据,则记录接收到的数据长度并返回成功
if(ret > 0) {
*receivedLen = ret;
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Received %zu bytes of data from socket %s", *receivedLen, handle->socketName);
return scclSuccess;
} else {
WARN("UDS: Receiving data over socket %s failed", handle->socketName);
return scclSystemError;
printf("scclIpcSocketRecvDataAndSendAck localRank=%d, src_rank=%d\n", localRank, src_rank);
// 发送ACK给发送方
char ack[ACK_SIZE];
sprintf(ack, "ACK-%d", src_rank);
printf("scclIpcSocketRecvDataAndSendAck localRank=%d, src_rank=%d, ack=%s\n", localRank, src_rank, ack);
result = scclIpcSocketSendData(ack, strlen(ack), /* 发送方的rank号 */ src_rank);
if(result != scclSuccess) {
return result;
}
printf("scclIpcSocketRecvDataAndSendAck send localRank=%d, src_rank=%d, ack=%s\n", localRank, src_rank, ack);
return scclSuccess;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* @brief 使用IPC套接字实现Allgather操作
*
......@@ -598,18 +443,16 @@ scclResult_t scclIpcSocket::scclIpcSocketRecvDataNonBlocking(void* buffer, size_
*
* @note 1. 会跳过本地rank的数据传输
* 2. 数据包格式: [发送rank(int)][数据]
* 3. 接收缓冲区需要预先分配足够空间(大小=localRanks*dataLen)
* 3. 接收缓冲区需要预先分配足够空间(大小=nlocalRanks*dataLen)
*/
scclResult_t scclIpcSocket::scclIpcSocketAllgather(const void* sendData, void* recvData, size_t dataLen, bool wait) {
if(pthread_pool == nullptr || localRanks <= 0) {
if(pthread_pool == nullptr || nlocalRanks <= 0) {
WARN("scclIpcSocket init error!");
return scclInternalError;
}
std::vector<std::future<void>> futures;
// 采用线程池发送和接收数据
for(int i = 0; i < localRanks; ++i) {
for(int i = 0; i < nlocalRanks; ++i) {
if(i != localRank) {
auto sendTask = [this, sendData, dataLen, i]() {
// 计算 DataPackage 的总大小
......@@ -628,7 +471,7 @@ scclResult_t scclIpcSocket::scclIpcSocketAllgather(const void* sendData, void* r
delete[] buffer;
};
futures.push_back(pthread_pool->enqueue(sendTask));
pthread_pool->enqueue(sendTask);
auto recvTask = [this, recvData, dataLen, i]() {
// 准备接收缓冲区
......@@ -648,7 +491,7 @@ scclResult_t scclIpcSocket::scclIpcSocketAllgather(const void* sendData, void* r
delete[] buffer;
};
futures.push_back(pthread_pool->enqueue(recvTask));
pthread_pool->enqueue(recvTask);
} else {
// 自己的数据直接放置到正确位置
memcpy(static_cast<char*>(recvData) + localRank * dataLen, sendData, dataLen);
......@@ -657,8 +500,8 @@ scclResult_t scclIpcSocket::scclIpcSocketAllgather(const void* sendData, void* r
if(wait) {
// 等待所有任务完成
for(auto& fut : futures) {
fut.get();
while(!pthread_pool->allTasksCompleted()) {
usleep(1000); // 每1毫秒检查一次任务完成状态
}
}
......@@ -681,7 +524,7 @@ scclResult_t scclIpcSocket::scclIpcSocketAllgather(const void* sendData, void* r
* 3. 当wait为true时会阻塞等待所有通信完成
*/
scclResult_t scclIpcSocket::scclIpcSocketAllgatherSync(const void* sendData, void* recvData, size_t dataLen, bool wait) {
if(pthread_pool == nullptr || localRanks <= 0) {
if(pthread_pool == nullptr || nlocalRanks <= 0) {
WARN("scclIpcSocket init error!");
return scclInternalError;
}
......@@ -689,25 +532,24 @@ scclResult_t scclIpcSocket::scclIpcSocketAllgatherSync(const void* sendData, voi
// 将当前进程的数据复制到接收缓冲区的对应位置
memcpy(static_cast<char*>(recvData) + localRank * dataLen, sendData, dataLen);
std::vector<std::future<void>> futures;
// 采用线程池发送和接收数据
for(int i = 0; i < localRanks; ++i) {
for(int i = 0; i < nlocalRanks; ++i) {
if(i != localRank) {
auto sendTask = [this, sendData, dataLen, i]() { scclIpcSocketSendData(sendData, dataLen, i); };
futures.push_back(pthread_pool->enqueue(sendTask));
pthread_pool->enqueue(sendTask);
auto recvTask = [this, recvData, dataLen, i]() {
size_t receivedLen;
scclIpcSocketRecvData(reinterpret_cast<char*>(recvData) + i * dataLen, dataLen, &receivedLen);
};
futures.push_back(pthread_pool->enqueue(recvTask));
pthread_pool->enqueue(recvTask);
}
}
if(wait) {
// 等待所有任务完成
for(auto& fut : futures) {
fut.get();
while(!pthread_pool->allTasksCompleted()) {
usleep(1000); // 每1毫秒检查一次任务完成状态
}
}
......@@ -731,47 +573,59 @@ scclResult_t scclIpcSocket::scclIpcSocketAllgatherSync(const void* sendData, voi
* - scclInternalError: IPC Socket未初始化或本地rank数无效
* - scclInvalidArgument: 根进程rank值无效
*/
scclResult_t scclIpcSocket::scclIpcSocketBroadcast(const void* sendData, void* recvData, size_t dataLen, int root, bool wait) {
if(pthread_pool == nullptr || localRanks <= 0) {
scclResult_t scclIpcSocket::scclIpcSocketBroadcast(void* data, size_t dataLen, int root, bool wait) {
pthread_pool->allTasksCompleted();
if(pthread_pool == nullptr || nlocalRanks <= 0) {
WARN("scclIpcSocket init error!");
return scclInternalError;
}
if(root < 0 || root >= localRanks) {
if(root < 0 || root >= nlocalRanks) {
WARN("scclIpcSocketBroadcast: Invalid root rank %d", root);
return scclInvalidArgument;
}
std::vector<std::future<scclResult_t>> futures; // 使用 future 来收集每个任务的返回结果
// if(localRank == root) {
// // 根进程:发送数据给所有其他进程
// for(int i = 0; i < nlocalRanks; ++i) {
// if(i != root) {
// // 使用 std::bind 绑定 scclIpcSocketSendDataWithAck 方法和参数
// auto sendTask = std::bind(&scclIpcSocket::scclIpcSocketSendDataWithAck, this, data, dataLen, i);
// // 将绑定后的函数对象添加到线程池的任务队列中
// pthread_pool->enqueue(sendTask);
// printf("send root: %d, i=%d\n", root, i);
// }
// }
// } else {
// size_t receivedLen;
// scclResult_t result = scclIpcSocketRecvDataAndSendAck(data, dataLen, &receivedLen, root);
// if(result != scclSuccess) {
// return result;
// }
// printf("recv from root: localRank=%d\n", localRank);
// }
if(localRank == root) {
// 根进程:发送数据给所有其他进程
for(int i = 0; i < localRanks; ++i) {
if(i != root) {
auto sendTask = [this, sendData, dataLen, i]() -> scclResult_t { return scclIpcSocketSendData(sendData, dataLen, i); };
futures.push_back(pthread_pool->enqueue(sendTask));
}
if(wait) {
// 等待所有任务完成
while(!pthread_pool->allTasksCompleted()) {
usleep(1000); // 每1毫秒检查一次任务完成状态
}
} else {
// 非根进程:从根进程接收数据
auto recvTask = [this, recvData, dataLen, root]() -> scclResult_t {
size_t receivedLen;
return scclIpcSocketRecvData(recvData, dataLen, &receivedLen);
};
futures.push_back(pthread_pool->enqueue(recvTask));
}
if(wait) {
// 等待所有任务完成并检查结果
for(auto& fut : futures) {
scclResult_t result = fut.get();
if(result != scclSuccess) {
WARN("scclIpcSocketBroadcast: Task failed with error %d", result);
return scclInternalError;
}
}
return scclSuccess;
}
/////////////////////////////////////////////////////////////////////////////////////
scclResult_t scclIpcSocket::getScclIpcSocknameStr(int rank, uint64_t hash, char* out_str, int* out_len) {
int len = snprintf(out_str, SCCL_IPC_SOCKNAME_LEN, "/tmp/sccl-socket-%d-%lx", rank, hash);
if(len > (sizeof(my_cliaddr.sun_path) - 1)) {
WARN("UDS: Cannot bind provided name to socket. Name too large");
return scclInternalError;
}
*out_len = len;
return scclSuccess;
}
......
#pragma once
#include <type_traits>
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
......@@ -20,8 +21,7 @@ namespace hardware {
namespace net {
namespace ipc_socket {
#define SCCL_IPC_SOCKNAME_LEN 64
#define SCCL_IPC_SOCKNAME_STR "/tmp/sccl-socket-%d-%lx"
constexpr int SCCL_IPC_SOCKNAME_LEN = 64;
// 定义IPC套接字结构体
struct scclIpcSocketHandle {
......@@ -40,11 +40,9 @@ struct DataPackage {
class scclIpcSocket {
public:
// 构造函数和析构函数
scclIpcSocket(int localRank, int localRanks, uint64_t hash, volatile uint32_t* abortFlag = nullptr);
scclIpcSocket(int localRank, int nlocalRanks, uint64_t hash, volatile uint32_t* abortFlag = nullptr);
virtual ~scclIpcSocket();
// 初始化IPC套接字
scclResult_t scclIpcSocketInit(volatile uint32_t* abortFlag);
// 设置 abortFlag 的函数
scclResult_t setAbortFlag(volatile uint32_t* flag);
// 获取 abortFlag 的函数
......@@ -58,27 +56,32 @@ public:
/*
并行计算时,不同的进程可能需要访问相同的文件或网络资源。通过发送文件描述符,可以避免多个进程重复打开相同的文件或建立相同的网络连接,从而节省资源和时间。
*/
// 发送文件描述符
// 发送/接收文件描述符
scclResult_t scclIpcSocketSendFd(const int sendFd, int dst_rank);
// 接收文件描述符
scclResult_t scclIpcSocketRecvFd(int* fd);
// 通过Unix域套接字发送数据到指定目标,阻塞方式
// 通过Unix域套接字发送/接收数据到指定目标
scclResult_t scclIpcSocketSendData(const void* data, size_t dataLen, int dst_rank);
// 通过Unix域套接字接收数据,阻塞方式
scclResult_t scclIpcSocketRecvData(void* buffer, size_t bufferLen, size_t* receivedLen);
// 通过Unix域套接字发送数据到指定目标,非阻塞方式
scclResult_t scclIpcSocketSendDataNonBlocking(const void* data, size_t dataLen, int dst_rank);
// 通过Unix域套接字接收数据,非阻塞方式
scclResult_t scclIpcSocketRecvDataNonBlocking(void* buffer, size_t bufferLen, size_t* receivedLen);
// 通过Unix域套接字发送/接收数据到指定目标,并发送ack确保发送成功
scclResult_t scclIpcSocketSendDataWithAck(const void* data, size_t dataLen, int dst_rank);
scclResult_t scclIpcSocketRecvDataAndSendAck(void* buffer, size_t bufferLen, size_t* receivedLen, int src_rank);
//////////////////////////////////////////////////////////////////////////////////////////////////////
// local rank内的allgather操作。保证接收顺序
scclResult_t scclIpcSocketAllgather(const void* sendData, void* recvData, size_t dataLen, bool wait = true);
// local rank内的allgather操作。为了性能,不保证接收顺序,所以发送的信息中需要添加进程ID
scclResult_t scclIpcSocketAllgatherSync(const void* sendData, void* recvData, size_t dataLen, bool wait = true);
// local rank内的broadcast操作
scclResult_t scclIpcSocketBroadcast(const void* sendData, void* recvData, size_t dataLen, int root, bool wait = true);
scclResult_t scclIpcSocketBroadcast(void* data, size_t dataLen, int root, bool wait = true);
private:
// 初始化IPC套接字
scclResult_t scclIpcSocketInit(volatile uint32_t* abortFlag);
scclResult_t getScclIpcSocknameStr(int rank, uint64_t hash, char* out_str, int* out_len);
private:
// 定义并初始化一个 scclIpcSocket 结构体,用于处理 IPC 套接字连接
......@@ -92,13 +95,14 @@ private:
const volatile uint32_t* my_abortFlag;
// 进程id信息
int localRank = -1;
int localRanks = 0;
int localRank = -1;
int nlocalRanks = 0;
// 线程池指针
ThreadPool* pthread_pool = nullptr;
// 设置超时时间为 10000 毫秒
int timeoutMs = 10000;
// 设置超时时间为无限长
int timeoutMs = -1;
static constexpr int ACK_SIZE = 8;
};
} // namespace ipc_socket
......
......@@ -17,8 +17,7 @@ namespace net {
scclResult_t printSocketAddr(union net_socket::scclSocketAddress* sock_addr, const char* prefix) {
char line[SOCKET_NAME_MAXLEN + MAX_IF_NAME_SIZE + 2];
net::net_socket::scclSocketToString(sock_addr, line);
printf("\n==========================================\n%s addr: %s"
printf("==========================================\n%s addr: %s"
"\n==========================================\n",
prefix,
line);
......
......@@ -54,8 +54,7 @@ SCCL_PARAM(IbGdrFlushDisable, "GDR_FLUSH_DISABLE", 0);
SCCL_PARAM(IbSplitDataOnQps, "IB_SPLIT_DATA_ON_QPS", 1);
///////////////////////////////////////// 参数及结构体设置 /////////////////////////////////////////
#define MAXNAMESIZE 64
#define MAX_IF_NAME_SIZE 16
constexpr int MAXNAMESIZE = 64;
static char scclIbIfName[MAX_IF_NAME_SIZE + 1]; // 用于存储网络接口名称的字符数组
static union net_socket::scclSocketAddress scclIbIfAddr; // 定义一个联合体类型的变量,用于存储网络接口地址
......@@ -660,10 +659,12 @@ scclResult_t scclNetIb::scclIbRtsQp(struct ibv_qp* qp) {
return scclSuccess;
}
#define SCCL_NET_IB_REQ_UNUSED 0
#define SCCL_NET_IB_REQ_SEND 1
#define SCCL_NET_IB_REQ_RECV 2
#define SCCL_NET_IB_REQ_FLUSH 3
typedef enum : int {
SCCL_NET_IB_REQ_UNUSED = 0,
SCCL_NET_IB_REQ_SEND = 1,
SCCL_NET_IB_REQ_RECV = 2,
SCCL_NET_IB_REQ_FLUSH = 3
} NetIbReq_t;
const char* reqTypeStr[] = {"Unused", "Send", "Recv", "Flush"};
// The SendFifo needs to be 32-byte aligned and each element needs
......
......@@ -11,7 +11,7 @@ namespace hardware {
namespace net {
namespace net_socket {
#define MAX_LINE_LEN (2047)
constexpr int MAX_LINE_LEN = (2047);
/* Init functions */
static int scclNetIfs = -1;
......
......@@ -27,7 +27,7 @@ namespace socket_base {
*/
static int scclGetSocketFamily(void) {
int family = -1; // Family selection is not forced, will use first one found
const char* env = scclGetEnv("SCCL_SOCKET_FAMILY");
const char* env = getenv("SCCL_SOCKET_FAMILY");
if(env == NULL)
return family;
......@@ -240,6 +240,17 @@ static scclResult_t socketWait(int op, struct scclSocket* sock, void* ptr, int s
return scclSuccess;
}
/**
* 尝试接受一个socket连接
*
* @param sock 指向scclSocket结构体的指针,包含待接受的socket信息
* @return scclResult_t 返回操作结果:
* - scclSuccess 表示成功接受连接
* - scclSystemError 表示接受失败且错误不是EAGAIN或EWOULDBLOCK
*
* @note 如果accept失败且errno为EAGAIN或EWOULDBLOCK,函数会返回scclSuccess,
* 表示这是非阻塞模式下正常情况,可以稍后重试
*/
static scclResult_t socketTryAccept(struct scclSocket* sock) {
socklen_t socklen = sizeof(union scclSocketAddress);
sock->fd = accept(sock->acceptFd, &sock->addr.sa, &socklen);
......@@ -252,6 +263,18 @@ static scclResult_t socketTryAccept(struct scclSocket* sock) {
return scclSuccess;
}
/**
* 完成socket连接的最终接受过程
*
* 1. 设置TCP_NODELAY选项
* 2. 接收并验证magic值
* 3. 接收并验证socket类型
* 4. 根据验证结果设置socket状态
*
* @param sock 要完成接受的socket指针
* @return scclSuccess 成功完成接受或忽略无效连接
* @return scclInternalError 类型验证失败
*/
static scclResult_t socketFinalizeAccept(struct scclSocket* sock) {
uint64_t magic;
enum scclSocketType type;
......@@ -286,6 +309,19 @@ static scclResult_t socketFinalizeAccept(struct scclSocket* sock) {
return scclSuccess;
}
/**
* 启动socket连接过程
*
* 根据asyncFlag决定使用阻塞或非阻塞connect()操作。处理连接过程中的各种状态:
* - 连接成功:返回scclSuccess,状态设为scclSocketStateConnected
* - EINPROGRESS:返回scclSuccess,状态设为scclSocketStateConnectPolling
* - ECONNREFUSED:进行重试,超过RETRY_REFUSED_TIMES次后返回scclRemoteError
* - ETIMEDOUT:进行重试,超过RETRY_TIMEDOUT_TIMES次后返回scclRemoteError
* - 其他错误:返回scclSystemError
*
* @param sock 要连接的socket结构体指针
* @return scclResult_t 连接结果状态码
*/
static scclResult_t socketStartConnect(struct scclSocket* sock) {
/* blocking/non-blocking connect() is determined by asyncFlag. */
int ret = connect(sock->fd, &sock->addr.sa, sock->salen);
......@@ -1024,16 +1060,19 @@ scclResult_t scclSocketReady(struct scclSocket* sock, int* running) {
/**
* @brief 接受一个socket连接
*
* @param sock 用于接收连接的socket对象
* @param listenSock 监听状态的socket对象
* @return scclResult_t 返回操作结果状态码:
* 该函数用于从监听socket接受一个新的连接,并将新连接的socket信息填充到指定的socket结构中。
*
* @param sock 用于存储新连接socket信息的结构体指针
* @param listenSock 监听socket的结构体指针
*
* @return scclResult_t 返回操作结果:
* - scclSuccess: 操作成功
* - scclInvalidArgument: 参数无效
* - scclSystemError: 系统错误
* - scclInternalError: 内部错误
*
* @note 该函数会阻塞直到连接被接受或发生错误。如果设置了异步标志(asyncFlag),
* 则会在后台处理连接请求。可以通过abortFlag来中操作。
* @note 该函数会阻塞直到连接被接受或发生错误,除非设置了异步标志
* 如果设置了abortFlag,可以通过设置该标志来中操作。
*/
scclResult_t scclSocketAccept(struct scclSocket* sock, struct scclSocket* listenSock) {
scclResult_t ret = scclSuccess;
......
......@@ -11,15 +11,16 @@
namespace sccl {
namespace hardware {
namespace net {
namespace net_socket {
#define MAX_IFS 16 // 最大接口数量
#define MAX_IF_NAME_SIZE 16 // 每个接口名称的最大长度
#define SLEEP_INT 1000 // 连接重试的休眠间隔,单位为微秒
#define RETRY_REFUSED_TIMES 2e4 // 在报告超时之前,连接被拒绝的重试次数(总计20秒)
#define RETRY_TIMEDOUT_TIMES 3 // 连接超时的重试次数(每次重试可能需要20秒)
#define SOCKET_NAME_MAXLEN (NI_MAXHOST + NI_MAXSERV) // 套接字名称的最大长度,包括主机名和服务名
#define SCCL_SOCKET_MAGIC 0x564ab9f2fc4b9d6cULL // 用于标识套接字的魔数
#define SLEEP_INT 1000 // 连接重试的休眠间隔,单位为微秒
constexpr int MAX_IFS = 16; // 最大接口数量
constexpr int MAX_IF_NAME_SIZE = 16; // 每个接口名称的最大长度
constexpr int RETRY_REFUSED_TIMES = 2e4; // 在报告超时之前,连接被拒绝的重试次数(总计20秒)
constexpr int RETRY_TIMEDOUT_TIMES = 3; // 连接超时的重试次数(每次重试可能需要20秒)
constexpr int SOCKET_NAME_MAXLEN = (NI_MAXHOST + NI_MAXSERV); // 套接字名称的最大长度,包括主机名和服务名
constexpr uint64_t SCCL_SOCKET_MAGIC = 0x564ab9f2fc4b9d6cULL; // 用于标识套接字的魔数
namespace net_socket {
/* 用于存储IPv4/IPv6通用套接字地址的联合体 */
union scclSocketAddress { // 联合体用于存储不同类型的套接字地址
......@@ -64,9 +65,6 @@ struct scclSocket {
enum scclSocketType type; // 套接字类型
};
#define SCCL_SOCKET_SEND 0
#define SCCL_SOCKET_RECV 1
//////////////////////////////////// socket工具 ////////////////////////////////////
// 将地址转换为字符串
const char* scclSocketToString(const union scclSocketAddress* addr, char* buf, const int numericHostForm = 1);
......@@ -99,21 +97,148 @@ scclResult_t scclSocketInit(struct scclSocket* sock,
enum scclSocketType type = scclSocketTypeUnknown,
volatile uint32_t* abortFlag = NULL,
int asyncFlag = 0);
// 创建一个监听socket。sock->addr可以预先填充IP和端口信息。成功调用后设置sock->fd
// 创建一个监听socket,用于服务器端等待客户端的连接请求。
// 是否阻塞: 阻塞,直到有客户端连接。
// 使用端: 服务器端。
// 使用时机: 当服务器准备好接受客户端连接时使用。
scclResult_t scclSocketListen(struct scclSocket* sock);
// 获取socket地址
// 获取socket的地址信息。
// 是否阻塞: 不阻塞。
// 使用端: 通用(客户端和服务器端均可使用)。
// 使用时机: 需要获取当前socket绑定的地址和端口信息时使用。
scclResult_t scclSocketGetAddr(struct scclSocket* sock, union scclSocketAddress* addr);
// 连接到sock->addr。成功调用后设置sock->fd。
// 客户端主动连接到服务器端的套接字。
// 是否阻塞: 可以选择阻塞或非阻塞,取决于实现和调用方式。
// 使用端: 客户端。
// 使用时机: 客户端需要与服务器建立连接时使用。
scclResult_t scclSocketConnect(struct scclSocket* sock, int portReuse = 0);
// 返回socket连接状态。
// 检查socket连接状态,判断是否已准备好进行通信。
// 是否阻塞: 不阻塞。
// 使用端: 通用(客户端和服务器端均可使用)。
// 使用时机: 在进行数据传输之前,检查连接是否已建立。
scclResult_t scclSocketReady(struct scclSocket* sock, int* running);
// 接受来自listenSock->fd的传入连接,并在sock->fd中保持文件描述符,远程端IP/端口在sock->addr中。
scclResult_t scclSocketAccept(struct scclSocket* sock, struct scclSocket* ulistenSock);
// 获取socket文件描述符
// 服务器端接受客户端的连接请求,创建一个新的套接字用于通信。
// 是否阻塞: 可以选择阻塞或非阻塞,取决于实现和调用方式。
// 使用端: 服务器端。
// 使用时机: 服务器端在监听socket上收到客户端连接请求后使用。
scclResult_t scclSocketAccept(struct scclSocket* sock, struct scclSocket* listenSock);
// 获取socket的文件描述符。
// 是否阻塞: 不阻塞。
// 使用端: 通用(客户端和服务器端均可使用)。
// 使用时机: 需要直接操作socket的文件描述符时使用,例如在多路复用中。
scclResult_t scclSocketGetFd(struct scclSocket* sock, int* fd);
// 设置socket文件描述符
// 设置socket的文件描述符。
// 是否阻塞: 不阻塞。
// 使用端: 通用(客户端和服务器端均可使用)。
// 使用时机: 在已有文件描述符的基础上创建socket对象时使用。
scclResult_t scclSocketSetFd(int fd, struct scclSocket* sock);
//////////////////////////////////////////////////////////////////////////////////////////////////
/**
* @brief 管理套接字的基类,提供套接字的获取、释放和初始化功能。
*
* 该类包含一个指向scclSocket结构体的指针sock_和一个布尔值owned_,
* 用于跟踪套接字的所有权。提供了获取和释放套接字的方法,
* 以及一个初始化方法来设置套接字的各种参数。
*/
class scclSocketManager {
public:
struct scclSocket* getSocket() { return sock_; }
struct scclSocket* releaseSocket() {
owned_ = false;
return sock_;
}
protected:
struct scclSocket* sock_;
bool owned_;
scclSocketManager() : sock_(nullptr), owned_(true) {}
virtual ~scclSocketManager() {
if(owned_ && sock_) {
scclSocketClose(sock_);
delete sock_;
}
}
// 默认的初始化过程
scclResult_t init(union scclSocketAddress* addr = NULL,
uint64_t magic = SCCL_SOCKET_MAGIC,
enum scclSocketType type = scclSocketTypeUnknown,
volatile uint32_t* abortFlag = NULL,
int asyncFlag = 0) {
sock_ = new struct scclSocket;
return scclSocketInit(sock_, addr, magic, type, abortFlag, asyncFlag);
}
private:
// 禁止拷贝构造和赋值操作
scclSocketManager(const scclSocketManager&);
scclSocketManager& operator=(const scclSocketManager&);
};
/**
* @brief 管理服务器端套接字的类,继承自scclSocketManager。
*
* 该类在构造时初始化套接字,并将其设置为监听模式。
*/
class scclSocketServerManager : public scclSocketManager {
public:
scclSocketServerManager(union scclSocketAddress* addr,
uint64_t magic = SCCL_SOCKET_MAGIC,
enum scclSocketType type = scclSocketTypeUnknown,
volatile uint32_t* abortFlag = NULL,
int asyncFlag = 0) {
SCCLCHECK(init(addr, magic, type, abortFlag, asyncFlag));
SCCLCHECK(scclSocketListen(sock_));
SCCLCHECK(scclSocketGetAddr(sock_, addr));
#if 0
char line[SOCKET_NAME_MAXLEN + MAX_IF_NAME_SIZE + 2];
net::net_socket::scclSocketToString(addr, line);
printf("server listening addr: %s\n", line);
#endif
}
};
/**
* @brief 管理客户端套接字的类,继承自scclSocketManager。
*
* 该类在构造时初始化套接字,并尝试连接到服务器。
*/
class scclSocketClientManager : public scclSocketManager {
public:
scclSocketClientManager(union scclSocketAddress* addr,
uint64_t magic = SCCL_SOCKET_MAGIC,
enum scclSocketType type = scclSocketTypeUnknown,
volatile uint32_t* abortFlag = NULL,
int asyncFlag = 0) {
SCCLCHECK(init(addr, magic, type, abortFlag, asyncFlag));
SCCLCHECK(scclSocketConnect(sock_));
#if 0
char line[SOCKET_NAME_MAXLEN + MAX_IF_NAME_SIZE + 2];
net::net_socket::scclSocketToString(addr, line);
printf("client connect addr: %s\n", line);
#endif
}
};
/**
* @brief 管理接受连接的套接字的类,继承自scclSocketManager。
*
* 该类在构造时初始化套接字,并接受来自监听套接字的连接。
*/
class scclSocketAcceptManager : public scclSocketManager {
public:
scclSocketAcceptManager(struct scclSocket* listenSock, int asyncFlag = 0) : scclSocketManager() {
init(NULL, SCCL_SOCKET_MAGIC, scclSocketTypeUnknown, NULL, /*asyncFlag=*/asyncFlag);
scclSocketAccept(sock_, listenSock);
}
};
} // namespace net_socket
} // namespace net
} // namespace hardware
......
......@@ -13,6 +13,10 @@ typedef enum {
SCCL_PTR_DMABUF = 0x4
} sccl_ptr_t;
constexpr int SCCL_SOCKET_SEND = 0;
constexpr int SCCL_SOCKET_RECV = 1;
constexpr int SCCL_NET_HANDLE_MAXSIZE = 128;
////////////////////////////////// 用于定义网络设备 //////////////////////////////////
typedef struct {
char* name; // 主要用于日志记录。
......@@ -92,8 +96,6 @@ public:
////////////////////////////////// 功能函数 //////////////////////////////////
// 初始化 ROCm 库
scclResult_t rocmLibraryInit(void);
#define SCCL_NET_HANDLE_MAXSIZE 128
struct netIf { // 网络接口结构体
char prefix[64]; // 网络前缀
int port; // 端口号
......
......@@ -8,6 +8,7 @@
#include <chrono>
#include <ctime>
#include <cstdint>
#include <memory> // for std::unique_ptr
#include "bootstrap.h"
......@@ -16,59 +17,12 @@ namespace hardware {
namespace topology {
namespace bootstrap {
#define MAX_THREADS (128)
////////////////////////////////////////////////////////////////////////////////////////////////////////
pthread_mutex_t initLock = PTHREAD_MUTEX_INITIALIZER; // 线程锁
static bool initialized = false; // 标志是否已经初始化
bool hsaFineGrainFlag = true; // 标志变量,用于指示是否启用HSAP细粒度标志
struct bootstrapRootArgs {
scclSocket_t* listenSock;
uint64_t magic;
};
// 构造函数
scclBootstrap::scclBootstrap(struct scclRankInfo* rank_info, struct scclBootstrapComm* comm) {
// 初始化线程池
int thread_cnt = ::std::min(MAX_THREADS, rank_info->nRanks);
pthread_pool = new ThreadPool(thread_cnt);
scclResult_t res;
// 将 handle 结构体清零
SCCLCHECKGOTO(scclCalloc(&handle, 1), res, failure);
// 初始化bootstrap网络环境
SCCLCHECKGOTO(bootstrapInit(rank_info, comm), res, failure);
return;
failure:
WARN("bootstrap not implemented yet");
return;
}
scclBootstrap::~scclBootstrap() {
if(handle) {
free(handle);
}
if(bootstrap_net) {
delete bootstrap_net;
}
if(pthread_pool) {
delete pthread_pool;
}
}
/**
* 初始化bootstrap通信环境
*
* @param rank_info 包含rank和nRanks信息的唯一标识符
* @param comm 需要初始化的bootstrap通信结构体
* @return scclResult_t 返回操作结果,成功返回scclSuccess
*
* 该函数负责:
* 1. 设置WarpSize
* 2. 初始化unique_info信息
* 3. 创建并初始化bootstrap socket句柄
*/
scclResult_t scclBootstrap::bootstrapInit(const struct scclRankInfo* rank_info, struct scclBootstrapComm* comm) {
static scclResult_t basicInit() {
// 如果已经初始化,直接返回成功
if(asm_ops::ld_acquire_sys_global(&initialized))
return scclSuccess;
......@@ -77,97 +31,31 @@ scclResult_t scclBootstrap::bootstrapInit(const struct scclRankInfo* rank_info,
pthread_mutex_lock(&initLock);
// 如果尚未初始化,进行初始化操作
if(!initialized) {
// 初始化通信结构的WarpSize
comm->WarpSize = warpSize;
// 获取CPU亲和性
sched_getaffinity(0, sizeof(cpu_set_t), &comm->cpuAffinity);
// 分配并初始化scclUniqueInfo信息
SCCLCHECK(scclCalloc(&(comm->unique_info), 1));
// 获取环境变量SCCL_NET_NAME的值,如果不存在则默认使用"IB"
const char* envNetName = getenv("SCCL_NET_NAME");
char* netName = (envNetName != NULL) ? strdup(envNetName) : strdup("IB");
// 打印网络名称
printf("netName=%s\n", netName);
// 初始化网络和引导网络
SCCLCHECK(net::scclNetInit(netName, comm->scclNet));
// 释放分配的网络名称字符串
free(netName);
// 调用bootstrapNetInit初始化CPU硬件,环境检查
SCCLCHECK(bootstrapBasicInit());
// 初始化唯一信息结构体
SCCLCHECK(bootstrapUniqueInfoInit(rank_info, comm->scclNet, comm->unique_info));
// 初始化网络结构体,用于bootstrap阶段的socket通信
bootstrap_net = new bootstrapNet(comm);
// 设置当前rank的socket信息给handle
SCCLCHECK(getRandomData(&handle->magic, sizeof(handle->magic)));
memcpy(&handle->addr, &bootstrap_net->bootstrapNetIfAddr, sizeof(scclSocketAddress_t));
// SCCLCHECK(getIpcSocketAddr(&handle->peerIpcAddr));
#if 0
// char line[100];
// sprintf(line, "pos 55: rank=%d", rank_info->rank);
// SCCLCHECK(net::printSocketAddr(&handle->addr, line));
#endif
bootstrapAllGather(comm->unique_info);
// 设置初始化完成标志
asm_ops::st_release_sys_global(&initialized, true);
}
// 解锁
pthread_mutex_unlock(&initLock);
return scclSuccess;
}
/**
* @brief 执行基本的引导程序初始化
*
* 该函数负责初始化引导程序的基本组件,包括网络引导和系统环境检查。
* 使用互斥锁确保线程安全,避免重复初始化。
*
* @note 如果NUMA自动平衡已启用,会发出警告提示可能影响性能。
* @note 会检查内核版本信息并记录。
*
* @return scclResult_t 返回初始化结果,成功返回scclSuccess
*/
scclResult_t scclBootstrap::bootstrapBasicInit() {
// 始终初始化引导网络
SCCLCHECK(bootstrap_net->bootstrapNetInit());
// SCCLCHECK(scclNetPluginInit()); // collnet使用
char strValue[1024];
// 检查NUMA自动平衡是否启用
SCCLCHECK(scclTopoGetStrFromSys("/proc/sys/kernel", "numa_balancing", strValue));
if(strcmp(strValue, "1") == 0)
WARN("NUMA自动平衡已启用,这可能导致RCCL性能的不稳定性!通过\"sudo sysctl kernel.numa_balancing=0\"禁用");
// 获取内核版本信息
SCCLCHECK(scclTopoGetStrFromSys("/proc", "version", strValue));
char *verStr, *state;
verStr = strtok_r(strValue, " ", &state);
INFO(SCCL_LOG_BOOTSTRAP, "内核版本: %s", verStr);
for(int i = 0; i < 2; i++) {
verStr = strtok_r(NULL, " ", &state);
if(verStr == NULL)
break;
}
// TODO: 最终确定是否需要检查版本信息
#if 0
initEnv(); // 初始化环境
// 始终初始化引导网络
SCCLCHECK(bootstrapNet::bootstrapNetInit());
// initGdrCopy(); // 初始化GDR复制
// SCCLCHECK(scclNetPluginInit());
char strValue[1024];
// 检查NUMA自动平衡是否启用
SCCLCHECK(scclTopoGetStrFromSys("/proc/sys/kernel", "numa_balancing", strValue));
if(strcmp(strValue, "1") == 0)
WARN("NUMA自动平衡已启用,这可能导致RCCL性能的不稳定性!通过\"sudo sysctl kernel.numa_balancing=0\"禁用");
// 获取内核版本信息
SCCLCHECK(scclTopoGetStrFromSys("/proc", "version", strValue));
char *verStr, *state;
verStr = strtok_r(strValue, " ", &state);
for(int i = 0; i < 2; i++) {
verStr = strtok_r(NULL, " ", &state);
if(verStr == NULL)
break;
}
INFO(SCCL_LOG_BOOTSTRAP, "内核版本: %s", verStr);
// 检查是否为Cray系统
if(strstr(verStr, "cray") == NULL) {
// 获取BIOS版本信息
SCCLCHECK(scclTopoGetStrFromSys("/sys/devices/virtual/dmi/id", "bios_version", strValue));
if(strncmp("Hyper-V UEFI Release", strValue, 20) != 0) {
FILE* file;
// 读取内核命令行参数
......@@ -182,109 +70,468 @@ scclResult_t scclBootstrap::bootstrapBasicInit() {
if(strstr(strValue, "iommu=pt") == NULL)
WARN("内核命令行中缺少\"iommu=pt\"参数,这可能导致系统不稳定或挂起!");
}
#ifndef HIP_UNCACHED_MEMORY
// 检查环境变量"HSA_FORCE_FINE_GRAIN_PCIE"
char* env = getenv("HSA_FORCE_FINE_GRAIN_PCIE");
printf("HSA env=%s\n", env);
if(env == NULL || strcmp(env, "1") != 0)
WARN("环境变量中缺少\"HSA_FORCE_FINE_GRAIN_PCIE=1\",这可能导致RCCL性能低下,系统不稳定或挂起!");
#endif
float* ptr;
// 尝试分配细粒度PCIe内存
hipError_t err = hipExtMallocWithFlags((void**)&ptr, 128, hipDeviceMallocFinegrained);
if(err != hipSuccess)
hsaFineGrainFlag = false;
}
#endif
// 设置初始化标志
asm_ops::st_release_sys_global(&initialized, true);
}
// 解锁
pthread_mutex_unlock(&initLock);
return scclSuccess;
}
scclResult_t bootstrapGetUniqueId(struct BootstrapHandle* handle) {
SCCLCHECK(basicInit());
// 在每个进程中设置 handle 的值
getRandomData(&handle->magic, sizeof(handle->magic));
const char* env = getenv("SCCL_COMM_ID");
if(env) {
memset(&handle->magic, 0, sizeof(handle->magic));
INFO(SCCL_LOG_BOOTSTRAP, "SCCL_COMM_ID set by environment to %s", env);
if(scclSocketGetAddrFromString(&handle->addr, env) != scclSuccess) {
WARN("Invalid SCCL_COMM_ID, please use format: <ipv4>:<port> or [<ipv6>]:<port> or <hostname>:<port>");
return scclInvalidArgument;
}
} else {
// 初始化socket
scclSocketAddress_t localSocketAddr = bootstrapNet::getLocalSocketAddr();
memcpy(&handle->addr, &localSocketAddr, sizeof(scclSocketAddress_t));
// 启动根节点listen监听
SCCLCHECK(bootstrapCreateRoot(handle));
}
return scclSuccess;
}
static scclResult_t setFilesLimit() {
struct rlimit filesLimit;
SYSCHECK(getrlimit(RLIMIT_NOFILE, &filesLimit), "getrlimit");
filesLimit.rlim_cur = filesLimit.rlim_max;
SYSCHECK(setrlimit(RLIMIT_NOFILE, &filesLimit), "setrlimit");
return scclSuccess;
}
/**
* @brief 初始化唯一信息结构体
* @brief 根节点引导程序,负责收集所有rank的地址信息并广播给其他rank
* 由于同一个socket数据传输比较慢,所以在进行数据广播时,仅传送给localRank==0的rank,再由其进行节点内广播
* 该函数所有数据传输与 Bootstrap::bootstrapRootGatherAndBroadcast 函数相配合
*
* @param rargs 包含监听套接字和验证魔数的参数结构体
* @return void* 总是返回NULL
*
* 该函数用于初始化scclUniqueInfo结构体,包括设置rank信息、设备信息、主机和进程哈希值,
* 以及硬件相关信息(GPU、CPU、RDMA、PCI等)。
* 该函数执行以下主要操作:
* 1. 初始化资源并设置文件描述符限制
* 2. 循环接收所有rank的连接请求,收集地址信息
* 3. 验证接收到的rank信息一致性
* 4. 计算本地rank数量(nLocalRanks)
* 5. 使用线程池并行发送nLocalRanks值给所有rank
* 6. 将收集到的所有rank地址信息广播给每个节点的localRank=0的进程
* 7. 清理资源并返回
*
* @param rank_info 输入参数,包含rank相关信息
* @param unique_info 输出参数,待初始化的唯一信息结构体
* @return scclResult_t 返回操作结果,scclSuccess表示成功
* @note 函数使用线程池加速消息分发,并通过日志记录关键操作步骤
*/
scclResult_t scclBootstrap::bootstrapUniqueInfoInit(const struct scclRankInfo* rank_info, scclNet_t* scclNet, struct scclUniqueInfo* unique_info) {
////////////////// 设置id信息 //////////////////
unique_info->rank = rank_info->rank; // 将unique_id的rank赋值给unique_info的rank
unique_info->nRanks = rank_info->nRanks; // 将unique_id的nRanks赋值给unique_info的nRanks
unique_info->localRanks = rank_info->localRanks; // 将unique_id的localRanks赋值给unique_info的localRanks
unique_info->localRank = rank_info->localRank; // 计算unique_info的localRank
static void* bootstrapRoot(void* rargs) {
struct bootstrapRootArgs* args = (struct bootstrapRootArgs*)rargs;
scclSocket_t* listenSock = args->listenSock; // 用于监听的套接字
uint64_t magic = args->magic; // 用于验证的魔数
scclResult_t res = scclSuccess; // 函数结果
class ThreadPool* pthread_pool = nullptr; // 用于根节点分发消息的线程池
int nRanks = 0; // nRanks: 进程总数;
int nLocalRanks = 1;
int c = 0; // c: 已连接的进程计数
uint64_t rootHostHash = 0;
struct BootstrapNodeBasic node_basic; // 用于存储扩展信息的结构体
struct BootstrapNodeBasic* all_rank_node_basic = nullptr; // 所有进程的地址
// 定义一个函数或者一个函数对象,用于执行实际的发送数据操作
auto send_task = [](BootstrapNodeBasic& node_basic, uint64_t magic, int rank, void* data, size_t size) {
net::net_socket::scclSocketClientManager client_manager(&node_basic.addr, magic, net::net_socket::scclSocketTypeBootstrap);
bootstrapNet::bootstrapNetSend(client_manager.getSocket(), data, size);
};
// 用于验证的工具进行初始化
scclSocketAddress_t* zero = nullptr; // 用于初始化或比较的零地址
setFilesLimit(); // 设置文件描述符限制
SCCLCHECKGOTO(scclCalloc(&zero, 1), res, out); // 为zero分配内存
INFO(SCCL_LOG_BOOTSTRAP, "BEGIN"); // 日志:开始
// --------------------- 1.从所有rank接收其socket地址(BootstrapNodeBasic) --------------------- //
do {
net::net_socket::scclSocketAcceptManager accept_manager(listenSock);
SCCLCHECKGOTO(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), &node_basic, sizeof(node_basic)), res, out); // 接收数据
if(c == 0) {
nRanks = node_basic.nRanks;
SCCLCHECKGOTO(scclCalloc(&all_rank_node_basic, nRanks), res, out); // 为rankAddresses分配内存
pthread_pool = new ThreadPool(nRanks);
} else if(nRanks != node_basic.nRanks) { // 如果接收到的进程总数不匹配
WARN("Bootstrap Root : mismatch in rank count from procs %d : %d", nRanks, node_basic.nRanks); // 警告
goto out; // 跳转到out标签
}
if(memcmp(zero, &all_rank_node_basic[node_basic.rank].addr, sizeof(scclSocketAddress_t)) != 0) { // 如果rank已经签到
WARN("Bootstrap Root : rank %d of %d ranks has already checked in", node_basic.rank, nRanks); // 警告
goto out; // 跳转到out标签
}
LECHECK(unique_info->localRanks, unique_info->localRank); // 检查localRank是否小于localRanks
// 保存该rank的连接句柄
memcpy(all_rank_node_basic + node_basic.rank, &node_basic, sizeof(struct BootstrapNodeBasic));
++c; // 增加已连接的进程计数
INFO(SCCL_LOG_BOOTSTRAP, "Received connect from rank %d total %d/%d", node_basic.rank, c, nRanks); // 日志
} while(c < nRanks); // 当已连接的进程数小于总数时循环
INFO(SCCL_LOG_BOOTSTRAP, "COLLECTED ALL %d HANDLES", nRanks); // 日志:收集到所有句柄
uint32_t devices_num;
SCCLCHECK(rocm_smi_init()); // 初始化ROCM SMI库
SCCLCHECK(rocm_smi_getNumDevice(&devices_num)); // 获取设备数量
LTCHECK(devices_num, 0); // 检查设备数量是否大于0
// --------------------- 2.计算nLocalRanks,并广播给其他所有rank --------------------- //
#if 1
for(int r = 0; r < nRanks; ++r) {
auto temp_node_basic = all_rank_node_basic[r];
char line[100];
sprintf(line, "bootstrapRoot r=%d, rank=%d/%d, hostHash=%lu,\n", r, temp_node_basic.rank, temp_node_basic.nRanks, temp_node_basic.hostHash);
scclSocketAddress_t temp_addr = temp_node_basic.addr;
hardware::net::printSocketAddr(&temp_addr, line);
}
#endif
// 首先计算nLocalRanks大小,即具有相同hostHash的节点数量
rootHostHash = all_rank_node_basic[0].hostHash;
for(int i = 1; i < nRanks; ++i) {
if(rootHostHash == all_rank_node_basic[i].hostHash) {
nLocalRanks++; // 如果hostHash相同,则增加本地节点计数
} else {
break; // 一旦发现不同的hostHash,停止计数
}
}
// 给每个节点的localRank=0的进程发送信息,并由其进行广播,从而加快速度
for(int r = 0; r < nRanks; ++r) {
auto dst_node_basic = all_rank_node_basic[r];
// 使用std::bind将参数绑定到send_task函数
auto bound_task = std::bind(send_task, dst_node_basic, magic, r, &nLocalRanks, sizeof(int));
// 将绑定后的任务添加到线程池
pthread_pool->enqueue(bound_task);
}
// 等待所有任务完成
while(!pthread_pool->allTasksCompleted()) {
usleep(1000); // 每1毫秒检查一次任务完成状态
}
// --------------------- 3.给所有localRank==0的rank发送all_rank_node_basic数据 --------------------- //
// 给每个节点的localRank=0的进程发送信息,并由其进行广播,从而加快速度
for(int r = 0; r < nRanks / nLocalRanks; ++r) {
int dst_rank = r * nLocalRanks; // 计算目标rank
auto dst_node_basic = all_rank_node_basic[dst_rank];
net::net_socket::scclSocketClientManager client_manager(&dst_node_basic.addr, magic, net::net_socket::scclSocketTypeBootstrap);
bootstrapNet::bootstrapNetSend(client_manager.getSocket(), all_rank_node_basic, sizeof(struct BootstrapNodeBasic) * nRanks);
printf("root send nLocalRanks value to rank=%d\n", r);
}
// 等待所有任务完成
while(!pthread_pool->allTasksCompleted()) {
usleep(1000); // 每1毫秒检查一次任务完成状态
}
unique_info->deviceCnt = static_cast<int>(devices_num); // 将设备数量转换为int并赋值给unique_info的deviceCnt
LECHECK(unique_info->deviceCnt, unique_info->hipDev); // 检查hipDev是否小于deviceCnt
HIPCHECK(hipSetDevice(unique_info->hipDev)); // 设置当前设备为hipDev
unique_info->hipDev = rank_info->hipDev;
INFO(SCCL_LOG_BOOTSTRAP, "bootstrap send out all %d handles", nRanks); // 日志:发送出所有句柄
out:
// 关闭套接字,并释放内存
if(listenSock) {
scclSocketClose(listenSock);
delete listenSock;
}
// 释放内存
if(all_rank_node_basic)
free(all_rank_node_basic);
if(zero)
free(zero);
if(pthread_pool)
delete pthread_pool;
free(rargs); // 释放rargs内存
INFO(SCCL_LOG_BOOTSTRAP, "DONE"); // 日志:完成
// 获取其他基础信息
unique_info->hostHash = getHostHash(); // 获取主机哈希值并赋值给unique_info的hostHash
unique_info->pidHash = getPidHash(); // 获取进程ID哈希值并赋值给unique_info的pidHash
return NULL; // 返回NULL
}
////////////////// 设置硬件信息 //////////////////
struct topoLocalNode* p_localNode = &unique_info->localNode;
/**
* 创建并启动bootstrap根节点
*
* 该函数负责初始化监听socket,创建并启动一个独立的线程来处理bootstrap根节点逻辑。
* 线程会被设置为detach状态,无需等待其结束。
*
* @param handle 包含bootstrap配置信息的句柄
* @return 成功返回scclSuccess,失败返回相应的错误码
*/
scclResult_t bootstrapCreateRoot(struct BootstrapHandle* handle) {
struct bootstrapRootArgs* args;
pthread_t thread;
// 设置根节点socket监听
net::net_socket::scclSocketServerManager root_manager(&handle->addr, handle->magic, net::net_socket::scclSocketTypeBootstrap);
// 为args分配内存
SCCLCHECK(scclCalloc(&args, 1));
// 设置线程参数
args->listenSock = root_manager.releaseSocket();
args->magic = handle->magic;
// 创建线程以执行bootstrapRoot函数, 直到线程结束才释放listenSock
NEQCHECK(pthread_create(&thread, NULL, bootstrapRoot, (void*)args), 0);
// 设置线程名称
scclSetThreadName(thread, "SCCL BootstrapR");
// 分离线程,使其在完成后自动回收资源
NEQCHECK(pthread_detach(thread), 0); // will not be pthread_join()'d
// 设置GPU信息
p_localNode->gpu.dev = rank_info->hipDev;
hipDeviceProp_t deviceProp;
HIPCHECK(hipGetDeviceProperties(&deviceProp, rank_info->hipDev));
snprintf(p_localNode->gpu.name, sizeof(p_localNode->gpu.name), "%s", deviceProp.name);
snprintf(p_localNode->gpu.gcn, sizeof(p_localNode->gpu.gcn), "%s", deviceProp.gcnArchName);
p_localNode->gpu.compCap = deviceProp.major * 10 + deviceProp.minor;
return scclSuccess;
}
// 设置CPU信息
memcpy(&p_localNode->cpu.socketAddr, &bootstrap_net->bootstrapNetIfAddr, sizeof(scclSocketAddress_t));
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
// 构造函数
Bootstrap::Bootstrap(const struct BootstrapHandle* handle, int rank, int nRanks)
: root_handle(handle), rank(rank), nRanks(nRanks), localRank(-1), nLocalRanks(0), socketInitDone(false) {
printf("construct init Bootstrap\n");
// 设置RDMA信息
SCCLCHECK(scclNet->getProperties(rank_info->hipDev, &p_localNode->net.props));
SCCLCHECK(scclNet->devices(&p_localNode->net.count));
// 初始化线程池
pthread_pool = new ThreadPool(nRanks);
}
// 设置PCI信息
SCCLCHECK(getBusId(rank_info->hipDev, &p_localNode->pci.busId));
Bootstrap::~Bootstrap() {
if(pthread_pool) {
delete pthread_pool;
}
if(ipcsocket) {
delete ipcsocket;
}
}
#if 1
printf("topoLocalNode size=%ld\n", sizeof(struct topoLocalNode));
SCCLCHECK(net::printNetProps(&p_localNode->net.props, rank_info->rank, rank_info->localRank));
#endif
scclResult_t Bootstrap::init(struct BootstrapComm* bootstrap_comm) {
// 如果已经初始化,直接返回成功
if(asm_ops::ld_acquire_sys_global(&socketInitDone))
return scclSuccess;
// 加锁以确保初始化过程的线程安全
pthread_mutex_lock(&bootstrapMutex);
// -------------------------- 1.获取自身基础信息 ----------------------------------- //
SCCLCHECK(basicInit());
uint64_t hostHash = getHostHash();
scclSocketAddress_t localSocketAddr = bootstrapNet::getLocalSocketAddr();
// 设置基础信息
struct BootstrapNodeBasic node_basic{rank, nRanks, hostHash, localSocketAddr};
printf("Bootstrap::init 111 rank=%d/%d, localRank=%d/%d\n", rank, nRanks, localRank, nLocalRanks);
// -------------------------- 2.设置0号rank搜集的CPU信息和localRank信息 ----------------------------------- //
// 创建根节点的数据收集
struct BootstrapNodeBasic* all_node_basic;
SCCLCHECK(scclCalloc(&all_node_basic, nRanks)); // 节点间用于传输数据的基础信息
SCCLCHECK(bootstrapRootGatherAndBroadcast(&node_basic, all_node_basic));
printf("Bootstrap::init 222 rank=%d/%d, localRank=%d/%d\n", rank, nRanks, localRank, nLocalRanks);
// -------------------------- 3.设置本地localRank的BootstrapComm信息 ----------------------------------- //
// 初始化BootstrapComm类
bootstrap_comm->init(rank, nRanks, localRank, nLocalRanks);
printf("Bootstrap::init 333 rank=%d/%d, localRank=%d/%d\n", rank, nRanks, localRank, nLocalRanks);
if(CPU_COUNT(&bootstrap_comm->cpuAffinity)) {
sched_setaffinity(0, sizeof(cpu_set_t), &bootstrap_comm->cpuAffinity);
}
bootstrap_comm->magic = root_handle->magic;
//////// 设置显卡状态 ////////
bootstrap_comm->hipDev = localRank; // CUDA 设备 ID
uint32_t devices_num;
SCCLCHECK(rocm_smi_init()); // 初始化ROCM SMI库
SCCLCHECK(rocm_smi_getNumDevice(&devices_num)); // 获取设备数量
LTCHECK(devices_num, 0); // 检查设备数量是否 devices_num>0
LTCHECK(devices_num, nLocalRanks); // 检查设备数量是否 devices_num>nLocalRanks
bootstrap_comm->deviceCnt = static_cast<int>(devices_num); // 将设备数量转换为int并赋值给的deviceCnt
printf("devices_num=%d\n", bootstrap_comm->deviceCnt);
LECHECK(devices_num, bootstrap_comm->hipDev); // 检查hipDev是否小于deviceCnt
HIPCHECK(hipSetDevice(bootstrap_comm->hipDev)); // 设置当前设备为hipDev
//////// 设置启动通信的scclNet ////////
// 获取环境变量SCCL_NET_NAME的值,如果不存在则默认使用"IB"
const char* envNetName = getenv("SCCL_NET_NAME");
char* netName = (envNetName != NULL) ? strdup(envNetName) : strdup("IB");
printf("netName=%s\n", netName);
// 初始化网络和引导网络
SCCLCHECK(net::scclNetInit(netName, bootstrap_comm->scclNet));
// 释放分配的网络名称字符串
free(netName);
//////// 初始化唯一信息结构体 ////////
struct scclNodeInfo local_node_info;
local_node_info.hostHash = hostHash;
SCCLCHECK(bootstrapCommInitNodeInfo(bootstrap_comm->scclNet, &local_node_info));
printNodeInfo(&local_node_info);
// -------------------------- 4.BootstrapComm信息的allgather ----------------------------------- //
// bootstrap_comm = new BootstrapComm(rank_info->nRanks);
// auto node_info_vec = bootstrap_comm->node_info_set->node_info_vec;
// struct scclNodeInfo* local_node_info = node_info_vec[0];
// constexpr int handle_size = sizeof(struct BootstrapHandle);
// auto handleBufferChr = reinterpret_cast<const char*>(handleBuffer);
// for(int i = 0; i < rank_info->nRanks; ++i) {
// auto temp_bootstrap_handle = deserializeBootstrapData<BootstrapHandle>(&handleBufferChr[i * handle_size]);
// int rank = temp_bootstrap_handle.rank;
// uint64_t hostHash = temp_bootstrap_handle.hostHash;
// // scclSocketAddress_t addr; // 地址,用于网络通信
// printf("bootstrapInit rank=%d, i=%d, hostHash=%lu\n", rank, i, hostHash);
// }
// printUniqueInfo(bootstrap_comm->unique_info);
// // 如果已经初始化,直接返回成功
// if(asm_ops::ld_acquire_sys_global(&initialized))
// return scclSuccess;
// // 加锁以确保初始化过程的线程安全
// pthread_mutex_lock(&bootstrapNetLock);
// // 如果尚未初始化,进行初始化操作
// if(!initialized) {
// // -------------------------- 1.设置各种属性 ----------------------------------- //
// // 获取CPU亲和性
// sched_getaffinity(0, sizeof(cpu_set_t), &bootstrap_comm->cpuAffinity);
// bootstrap_comm->nRanks = rank_info->nRanks;
// uint32_t devices_num;
// SCCLCHECK(rocm_smi_init()); // 初始化ROCM SMI库
// SCCLCHECK(rocm_smi_getNumDevice(&devices_num)); // 获取设备数量
// LTCHECK(devices_num, 0); // 检查设备数量是否大于0
// bootstrap_comm->deviceCnt = static_cast<int>(devices_num); // 将设备数量转换为int并赋值给的deviceCnt
// printf("devices_num=%s\n", bootstrap_comm->deviceCnt);
// // SCCLCHECK(getIpcSocketAddr(&handle->peerIpcAddr));
// #if 0
// // char line[100];
// // sprintf(line, "pos 55: rank=%d", rank_info->rank);
// // SCCLCHECK(net::printSocketAddr(&handle->addr, line));
// #endif
// // bootstrapAllGather(bootstrap_comm->node_info);
// // 设置初始化完成标志
// asm_ops::st_release_sys_global(&initialized, true);
// }
// // 解锁
// pthread_mutex_unlock(&bootstrapNetLock);
// 设置初始化标志
asm_ops::st_release_sys_global(&socketInitDone, true);
// 解锁
pthread_mutex_unlock(&bootstrapMutex);
return scclSuccess;
}
///////////////////////////////////////////////////////////////////////////
/**
* 检查bootstrap是否已成功初始化
* @brief 执行根节点的数据收集和广播操作
*
* 该函数通过检查handle指针和initialized标志来验证初始化状态
* 使用互斥锁确保线程安全
* 该函数负责以下操作:
* 1. 设置本地监听服务
* 2. 向根节点发送本节点的基本数据
* 3. 从根节点接收本地rank数量信息
* 4. 当本地rank为0时,从根节点接收所有rank的IP数据
* 5. 将收集到的所有rank数据广播给节点内其他rank
*
* @return scclSuccess 如果已初始化成功
* @return scclSystemError 如果未初始化或初始化失败
* @param send_data 发送给根节点的数据指针
* @param recv_data 接收广播数据的缓冲区指针
* @return scclResult_t 返回操作结果,成功返回scclSuccess
*/
scclResult_t scclBootstrap::bootstrapInitCheck() {
scclResult_t res = scclSuccess;
scclResult_t Bootstrap::bootstrapRootGatherAndBroadcast(void* send_data, void* recv_data) {
// 数据类型转换
auto send_data_basic = reinterpret_cast<struct BootstrapNodeBasic*>(send_data);
auto recv_data_basic = reinterpret_cast<struct BootstrapNodeBasic*>(recv_data);
int recv_data_basic_size = nRanks * sizeof(struct BootstrapNodeBasic);
scclSocketAddress_t root_addr = root_handle->addr;
// ------------- 1.各个rank在发送给根节点数据之前,首先设置监听listen ------------- //
net::net_socket::scclSocketServerManager local_server_manager(&send_data_basic->addr, root_handle->magic, net::net_socket::scclSocketTypeBootstrap);
// 保存监听的sock信息到本类,用于后续proxy使用
my_listen_sock = local_server_manager.releaseSocket();
// ------------- 2.各个节点向根节点发送数据 ------------- //
net::net_socket::scclSocketClientManager client_manager(&root_addr, root_handle->magic, net::net_socket::scclSocketTypeBootstrap);
SCCLCHECK(bootstrapNet::bootstrapNetSend(client_manager.getSocket(), send_data_basic, sizeof(struct BootstrapNodeBasic)));
// ------------- 3.从根节点接收nLocalRanks值 ------------- //
// 接收nLocalRanks信息
{
net::net_socket::scclSocketAcceptManager accept_manager(my_listen_sock);
SCCLCHECK(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), &nLocalRanks, sizeof(int)));
}
// 加锁以确保初始化过程的线程安全
pthread_mutex_lock(&initLock);
if(handle == nullptr || initialized == false) {
res = scclSystemError;
// ------------- 4.nLocalRanks==0时,从根节点接收所有rank的ip数据 ------------- //
this->localRank = rank % nLocalRanks;
if(localRank == 0) {
net::net_socket::scclSocketAcceptManager accept_manager(my_listen_sock);
SCCLCHECK(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), recv_data_basic, recv_data_basic_size));
}
pthread_mutex_unlock(&initLock); // 解锁
printf("Bootstrap::bootstrapRootGatherAndBroadcast 444 rank=%d/%d, localRank=%d/%d\n", rank, nRanks, localRank, nLocalRanks);
// ------------- 5.nLocalRanks==0时,将所有rank的ip数据广播给节点内其他rank ------------- //
ipcsocket = new scclIpcSocket_t(localRank, nLocalRanks, /*hash*/ root_handle->magic);
ipcsocket->scclIpcSocketBroadcast(recv_data_basic, recv_data_basic_size, 0, /*wait*/ true);
return res;
return scclSuccess;
}
scclResult_t scclBootstrap::bootstrapAllGather(struct scclUniqueInfo* unique_info) {
/**
* @brief 初始化节点通信信息
*
* 该函数用于初始化节点的通信信息,包括:
* - 设置节点的全局排名和本地排名
* - 获取并设置进程ID哈希值
* - 设置GPU设备属性(名称、GCN架构、计算能力)
* - 设置RDMA网络属性
* - 设置PCI总线ID
* - 设置CPU套接字地址
*
* @param scclNet 网络句柄
* @param socket_addr 套接字地址
* @param node_info 节点信息结构体指针
* @return scclResult_t 返回操作结果,成功返回scclSuccess
*/
scclResult_t Bootstrap::bootstrapCommInitNodeInfo(scclNet_t* scclNet, struct scclNodeInfo* node_info) {
////////////////// 设置基础信息 //////////////////
node_info->rank = rank; // 当前节点的全局排名
node_info->localRank = localRank; // 当前节点在本地计算节点中的排名
node_info->pidHash = getPidHash(); // 获取进程ID哈希值并赋值给的pidHash
int hipDev = localRank;
////////////////// 设置硬件信息 //////////////////
struct topoLocalNode* p_localNode = &node_info->localNode;
// 设置CPU信息
p_localNode->cpu.listen_sock = *my_listen_sock;
// 设置PCI信息
SCCLCHECK(getBusId(hipDev, &p_localNode->pci.busId));
// 设置GPU信息
p_localNode->gpu.dev = hipDev;
hipDeviceProp_t deviceProp;
HIPCHECK(hipGetDeviceProperties(&deviceProp, hipDev));
snprintf(p_localNode->gpu.name, sizeof(p_localNode->gpu.name), "%s", deviceProp.name);
snprintf(p_localNode->gpu.gcn, sizeof(p_localNode->gpu.gcn), "%s", deviceProp.gcnArchName);
p_localNode->gpu.compCap = deviceProp.major * 10 + deviceProp.minor;
// 设置RDMA信息
SCCLCHECK(scclNet->getProperties(hipDev, &p_localNode->net.props));
SCCLCHECK(scclNet->devices(&p_localNode->net.count));
return scclSuccess;
}
scclResult_t Bootstrap::bootstrapAllGather(struct scclNodeInfo*) {
// 1.节点内通信 allgather
// 2.节点间通信,ring allgather
......@@ -296,15 +543,19 @@ scclResult_t scclBootstrap::bootstrapAllGather(struct scclUniqueInfo* unique_inf
/////////////////////////////////////////////////////////////////////////////////////////////
// // 将本地socket地址写入到/tmp/文件夹的文件中,通过nfs共享存储,其他rank可见
// scclResult_t bootstrapGetAllNodes(const struct scclUniqueInfo* unique_info, struct scclBootstrapComm* comm) {
// scclResult_t bootstrapGetAllNodes(const struct scclNodeInfo* , struct BootstrapComm* comm) {
// // // 分配并初始化IPC套接字
// // struct scclIpcSocket ipcSock = {0};
// // // Create a UDS socket to receive the converted fd
// // SCCLCHECK(scclIpcSocketInit(&ipcSock, unique_info->rank, /*hash*/ handle->magic, /*abortFlag*/ NULL));
// // SCCLCHECK(scclIpcSocketInit(&ipcSock, ->rank, /*hash*/ handle->magic, /*abortFlag*/ NULL));
// // printf("fd=%d, socketName=%s\n", ipcSock.fd, ipcSock.socketName);
// return scclInProgress;
// }
// auto node_info = bootstrap_comm->node_info;
// // 设置节点内socket通信工具
// ipcsocket = new scclIpcSocket_t(node_info->localRank, node_info->nRanks, node_info->hostHash, bootstrap_comm->abortFlag);
} // namespace bootstrap
} // namespace topology
} // namespace hardware
......
......@@ -6,45 +6,60 @@
#include "bootstrap_utils.h"
#include "bootstrap_net.h"
#include "thread_pool.h"
#include "ipc_socket.h"
namespace sccl {
namespace hardware {
namespace topology {
namespace bootstrap {
class scclBootstrap {
typedef class sccl::hardware::net::ipc_socket::scclIpcSocket scclIpcSocket_t;
///////////////////////////////////// 用于初始化时的功能函数 //////////////////////////////////////////
scclResult_t bootstrapGetUniqueId(struct BootstrapHandle* handle);
scclResult_t bootstrapCreateRoot(struct BootstrapHandle* handle);
///////////////////////////////////// 用于初始化时的类 //////////////////////////////////////////
class Bootstrap {
public:
scclBootstrap(struct scclRankInfo* rank_info, struct scclBootstrapComm* comm);
~scclBootstrap();
Bootstrap(const struct BootstrapHandle*, int rank, int nRanks);
~Bootstrap();
// 初始化bootstrap通信环境
scclResult_t bootstrapInit(const struct scclRankInfo* rank_info, struct scclBootstrapComm* comm);
// 检查bootstrap是否已成功初始化
scclResult_t bootstrapInitCheck();
scclResult_t init(struct BootstrapComm* bootstrap_comm);
// 广播节点信息
scclResult_t bootstrapAllGather(struct scclUniqueInfo* unique_info);
scclResult_t bootstrapAllGather(struct scclNodeInfo*);
private:
// 执行基本的引导程序初始化
scclResult_t bootstrapBasicInit();
// 创建根节点的数据广播
scclResult_t bootstrapRootGatherAndBroadcast(void* send_data, void* recv_data);
// 初始化唯一ID信息结构体
scclResult_t bootstrapUniqueInfoInit(const struct scclRankInfo* rank_info, scclNet_t* scclNet, struct scclUniqueInfo* unique_info);
scclResult_t bootstrapCommInitNodeInfo(scclNet_t* scclNet, struct scclNodeInfo* node_info);
// scclResult_t bootstrapGetAllNodes(const struct scclUniqueInfo* unique_info, struct scclBootstrapComm* comm);
// scclResult_t bootstrapGetAllNodes(const struct scclNodeInfo* , struct BootstrapComm* comm);
private:
pthread_mutex_t initLock = PTHREAD_MUTEX_INITIALIZER;
bool initialized = false;
bool hsaFineGrainFlag = true;
int rank, nRanks; // 初始化阶段获取MPI的值
int localRank, nLocalRanks; // 通过bootstrapRootGatherAndBroadcast函数确定值
volatile uint32_t* abortFlag; // 中止标志,非阻塞套接字设置
// 外部传入的0号节点的基础信息
const struct BootstrapHandle* root_handle;
// 分配并初始化引导句柄
struct scclBootstrapHandle* handle = nullptr;
// 分配并初始化网络结构体
class bootstrapNet* bootstrap_net = nullptr;
// 初始化标志
bool socketInitDone;
// 互斥锁,用于保护初始化过程的线程安全
pthread_mutex_t bootstrapMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t bootstrapCond = PTHREAD_COND_INITIALIZER;
int max_pthreads = 0;
class ThreadPool* pthread_pool = nullptr;
// 标志是否已经初始化
// 线程池变量
int max_pthreads = 0; // 用于存储最大并行线程数的整型变量
class ThreadPool* pthread_pool = nullptr; // 指向ThreadPool类实例的指针,初始值为nullptr
scclIpcSocket_t* ipcsocket = nullptr; // 指向scclIpcSocket类实例的指针,初始值为nullptr
scclSocket_t* my_listen_sock = nullptr; // 指向scclSocket类实例的指针,初始值为nullptr
};
} // namespace bootstrap
......
......@@ -15,18 +15,19 @@ namespace sccl {
namespace hardware {
namespace topology {
namespace bootstrap {
namespace bootstrapNet {
bootstrapNet::bootstrapNet(struct scclBootstrapComm* bootstrap_comm) {
auto unique_info = bootstrap_comm->unique_info;
// 设置节点内socket通信工具
ipcsocket = new scclIpcSocket_t(unique_info->localRank, unique_info->nRanks, unique_info->hostHash, bootstrap_comm->abortFlag);
}
/* Init functions */
// 用于存储网络接口名称的静态字符数组
static char bootstrapNetIfName[net::MAX_IF_NAME_SIZE + 1];
// 用于存储网络接口地址的静态结构体
static scclSocketAddress_t bootstrapNetIfAddr;
bootstrapNet::~bootstrapNet() {
if(ipcsocket) {
delete ipcsocket;
}
}
// 静态整型变量,用于指示网络初始化是否已完成(0表示未完成,非0表示已完成)
static int bootstrapNetInitDone = 0;
// 互斥锁,用于保护对上述静态变量的访问,确保线程安全
static pthread_mutex_t bootstrapNetLock = PTHREAD_MUTEX_INITIALIZER;
/**
* @brief 初始化引导网络
......@@ -43,7 +44,7 @@ bootstrapNet::~bootstrapNet() {
* - scclSystemError: 找不到匹配的网络接口
* - scclInternalError: 找不到可用的网络接口
*/
scclResult_t bootstrapNet::bootstrapNetInit() {
scclResult_t bootstrapNetInit() {
if(bootstrapNetInitDone == 0) {
pthread_mutex_lock(&bootstrapNetLock);
if(bootstrapNetInitDone == 0) {
......@@ -54,18 +55,18 @@ scclResult_t bootstrapNet::bootstrapNetInit() {
WARN("Invalid SCCL_COMM_ID, please use format: <ipv4>:<port> or [<ipv6>]:<port> or <hostname>:<port>");
return scclInvalidArgument;
}
if(net::net_socket::scclFindInterfaceMatchSubnet(bootstrapNetIfName, &bootstrapNetIfAddr, &remoteAddr, MAX_IF_NAME_SIZE, 1) <= 0) {
if(net::net_socket::scclFindInterfaceMatchSubnet(bootstrapNetIfName, &bootstrapNetIfAddr, &remoteAddr, net::MAX_IF_NAME_SIZE, 1) <= 0) {
WARN("NET/Socket : No usable listening interface found");
return scclSystemError;
}
} else {
int nIfs = net::net_socket::scclFindSocketInterfaces(bootstrapNetIfName, &bootstrapNetIfAddr, MAX_IF_NAME_SIZE, 1);
int nIfs = net::net_socket::scclFindSocketInterfaces(bootstrapNetIfName, &bootstrapNetIfAddr, net::MAX_IF_NAME_SIZE, 1);
if(nIfs <= 0) {
WARN("Bootstrap : no socket interface found");
return scclInternalError;
}
}
char line[SOCKET_NAME_MAXLEN + MAX_IF_NAME_SIZE + 2];
char line[net::SOCKET_NAME_MAXLEN + net::MAX_IF_NAME_SIZE + 2];
sprintf(line, "%s:", bootstrapNetIfName);
net::net_socket::scclSocketToString(&bootstrapNetIfAddr, line + strlen(line));
INFO(SCCL_LOG_BOOTSTRAP, "Bootstrap : Using %s", line);
......@@ -76,6 +77,8 @@ scclResult_t bootstrapNet::bootstrapNetInit() {
return scclSuccess;
}
scclSocketAddress_t getLocalSocketAddr() { return bootstrapNetIfAddr; }
// Additional sync functions
/**
* 通过网络发送数据
......@@ -87,7 +90,7 @@ scclResult_t bootstrapNet::bootstrapNetInit() {
*
* @note 先发送数据大小(sizeof(int)),再发送实际数据
*/
scclResult_t bootstrapNet::bootstrapNetSend(scclSocket_t* sock, void* data, int size) {
scclResult_t bootstrapNetSend(scclSocket_t* sock, void* data, int size) {
SCCLCHECK(net::net_socket::scclSocketSend(sock, &size, sizeof(int)));
SCCLCHECK(net::net_socket::scclSocketSend(sock, data, size));
return scclSuccess;
......@@ -103,7 +106,7 @@ scclResult_t bootstrapNet::bootstrapNetSend(scclSocket_t* sock, void* data, int
*
* @note 如果接收到的数据大小超过缓冲区大小,会截断数据并返回scclInternalError
*/
scclResult_t bootstrapNet::bootstrapNetRecv(scclSocket_t* sock, void* data, int size) {
scclResult_t bootstrapNetRecv(scclSocket_t* sock, void* data, int size) {
int recvSize;
SCCLCHECK(net::net_socket::scclSocketRecv(sock, &recvSize, sizeof(int)));
if(recvSize > size) {
......@@ -114,6 +117,7 @@ scclResult_t bootstrapNet::bootstrapNetRecv(scclSocket_t* sock, void* data, int
return scclSuccess;
}
} // namespace bootstrapNet
} // namespace bootstrap
} // namespace topology
} // namespace hardware
......
......@@ -4,41 +4,24 @@
#include "base.h"
#include "socket.h"
#include "bootstrap_utils.h"
#include "ipc_socket.h"
namespace sccl {
namespace hardware {
namespace topology {
namespace bootstrap {
namespace bootstrapNet {
typedef class sccl::hardware::net::ipc_socket::scclIpcSocket scclIpcSocket_t;
// 初始化
scclResult_t bootstrapNetInit();
class bootstrapNet {
public:
// 构造函数
bootstrapNet(struct scclBootstrapComm* bootstrap_comm);
virtual ~bootstrapNet();
scclSocketAddress_t getLocalSocketAddr();
// 初始化
scclResult_t bootstrapNetInit();
// 通过socket发送数据
scclResult_t bootstrapNetSend(scclSocket_t* sock, void* data, int size);
// 通过socket接收数据
scclResult_t bootstrapNetRecv(scclSocket_t* sock, void* data, int size);
public:
/* Init functions */
char bootstrapNetIfName[MAX_IF_NAME_SIZE + 1];
scclSocketAddress_t bootstrapNetIfAddr;
private:
int bootstrapNetInitDone = 0;
pthread_mutex_t bootstrapNetLock = PTHREAD_MUTEX_INITIALIZER;
// 用于节点内socket通信
scclIpcSocket_t* ipcsocket = nullptr;
};
// 通过socket发送数据
scclResult_t bootstrapNetSend(scclSocket_t* sock, void* data, int size);
// 通过socket接收数据
scclResult_t bootstrapNetRecv(scclSocket_t* sock, void* data, int size);
} // namespace bootstrapNet
} // namespace bootstrap
} // namespace topology
} // namespace hardware
......
......@@ -7,6 +7,32 @@ namespace hardware {
namespace topology {
namespace bootstrap {
////////////////////////////// 结构体定义 //////////////////////////////
// 构造函数定义
scclNodeInfoSet::scclNodeInfoSet(int nRanks) : nUniqueInfos(nRanks) {
printf("scclNodeInfoSet 构造函数\n");
node_info_vec.reserve(nRanks); // 预留空间
printf("scclNodeInfoSet 预留空间\n");
}
void BootstrapComm::init(int rank, int nRanks, int localRank, int nLocalRanks) {
printf("BootstrapComm 构造函数, rank=%d\n", rank);
this->rank = rank;
this->nRanks = nRanks;
this->localRank = localRank;
this->nLocalRanks = nLocalRanks;
node_info_set = new scclNodeInfoSet(nRanks); // 假设需要动态分配
};
void BootstrapComm::destroy() {
printf("BootstrapComm 析构函数, rank=%d\n", rank);
if(node_info_set) {
delete node_info_set; // 释放动态分配的内存
}
}
//////////////////////////////////////////////////////////////////////////////////////////
/**
* 计算字符串的哈希值(基于DJB2a算法)
*
......@@ -165,25 +191,40 @@ scclResult_t getBusId(int hipDev, int64_t* busId) {
return scclSuccess;
}
// 函数:打印 scclUniqueInfo 结构体的信息
void printUniqueInfo(struct scclUniqueInfo* info) {
// 函数:打印 scclNodeInfo 结构体的信息
scclResult_t printNodeInfo(struct scclNodeInfo* info) {
char addrline[net::SOCKET_NAME_MAXLEN + 1];
if(info->localRank == 0) {
printf("\n==========================================\nTotal Rank: %d/%d, Local Rank: %d/%d, CUDA Device ID/Cnt: %d/%d, \n"
"Host Hash: %lu, PID Hash: %lu, gpu.name=%s, gcn=%s\n"
printf("==========================================\n"
"Total Rank: %d, Local Rank: %d, Host Hash: %lu, PID Hash: %lu\n"
"gpu: dev=%d, gpu.name=%s, gcn=%s, compCap=%d\n"
"net: count=%d, device name=%s, pciPath=%s, guid=%lu, ptrSupport=%d, speed=%d, port=%d, latency=%f, maxComms=%d, maxRecvs=%d\n"
"cpu: socketAddr=%s\n pci: busId=%ld\n"
"\n==========================================\n",
info->rank,
info->nRanks,
info->localRank,
info->localRanks,
info->hipDev,
info->deviceCnt,
info->hostHash,
info->pidHash,
info->localNode.gpu.dev,
info->localNode.gpu.name,
info->localNode.gpu.gcn);
SCCLCHECK(net::printNetProps(&info->localNode.net.props, info->rank, info->localRank));
info->localNode.gpu.gcn,
info->localNode.gpu.compCap,
info->localNode.net.count,
info->localNode.net.props.name,
info->localNode.net.props.pciPath,
info->localNode.net.props.guid,
info->localNode.net.props.ptrSupport,
info->localNode.net.props.speed,
info->localNode.net.props.port,
info->localNode.net.props.latency,
info->localNode.net.props.maxComms,
info->localNode.net.props.maxRecvs,
net::net_socket::scclSocketToString(&info->localNode.cpu.listen_sock.addr, addrline),
info->localNode.pci.busId);
}
return;
return scclSuccess;
}
} // namespace bootstrap
......
#pragma once
#include <string.h>
#include <cstddef>
#include <vector>
#include "base.h"
#include "topo_utils.h"
#include "comm.h"
......@@ -11,28 +13,43 @@ namespace hardware {
namespace topology {
namespace bootstrap {
typedef net::net_socket::scclSocketAddress scclSocketAddress_t;
typedef net::net_socket::scclSocket scclSocket_t;
typedef union net::net_socket::scclSocketAddress scclSocketAddress_t;
typedef struct net::net_socket::scclSocket scclSocket_t;
typedef net::scclNet_t scclNet_t;
// scclBootstrapHandle 结构体定义,用于存储引导句柄
struct scclBootstrapHandle {
uint64_t magic; // 魔术数,用于标识结构体的有效性
// 用于初始化时广播0号rank的地址信息
struct BootstrapHandle {
uint64_t magic = 0; // 随机码,用于socket通信
scclSocketAddress_t addr; // 地址,用于网络通信
};
// 定义硬件拓扑类型枚举
typedef enum {
GPU = 0, // 图形处理单元
PCI = 1, // 外围组件互连
XGMI = 2, // 非易失性存储器,NV卡中为nvlink
CPU = 3, // 中央处理器,实际上是NUMA域
NIC = 4, // 网络接口控制器
NET = 5 // 网络
} topoNodeType_t;
#define SCCL_UNIQUE_ID_BYTES (40) // sizeof(BootstrapHandle)
typedef struct {
char internal[SCCL_UNIQUE_ID_BYTES];
} scclUniqueId;
// 仅用于初始化的函数bootstrapCreateRoot,用于传递detach线程的参数
struct bootstrapRootArgs {
uint64_t magic;
scclSocket_t* listenSock = nullptr; // 根节点的监听
};
// 用于初始建立连接阶段,0号rank之外的进程向其传递的信息
struct BootstrapNodeBasic {
int rank;
int nRanks; // 进程的总数量
uint64_t hostHash; // 用于区分host的CPU编号
scclSocketAddress_t addr; // 各个进程的监听套接字地址,用于网络通信
};
// 定义每个rank所持有的所有拓扑节点
struct topoLocalNode {
struct {
scclSocket_t listen_sock; // 监听套接字
} cpu; // CPU节点
struct {
int64_t busId; // PCI总线ID以int64_t格式表示
} pci; // pci节点
struct {
int dev; // NVML设备编号
char name[8]; // 设备名称
......@@ -40,47 +57,48 @@ struct topoLocalNode {
int compCap; // CUDA计算能力
} gpu; // GPU节点
struct {
scclSocketAddress_t socketAddr; // 网络地址
} cpu; // CPU节点
struct {
int count; // 网卡数量
net::scclNetProperties_t props;
int count;
} net; // 网络节点
struct {
int64_t busId; // PCI总线ID以int64_t格式表示
} pci; // pci节点
};
// 定义结构体 scclUniqueInfo,用于存储每个rank的通信节点的信息
struct scclUniqueInfo {
// 定义结构体 scclNodeInfo,用于存储每个rank的通信节点的信息
struct scclNodeInfo {
struct topoLocalNode localNode;
int rank = -1; // 当前节点的全局排名
int localRank = -1; // 当前节点在本地计算节点中的排名
int rank; // 当前节点的全局排名
int nRanks; // 总的节点数量
int localRank; // 当前节点在本地计算节点中的排名
int localRanks; // 本地计算节点中的节点总数
int deviceCnt; // 设备数量
int hipDev; // CUDA 设备 ID
uint64_t hostHash; // 主机哈希值
uint64_t pidHash; // 进程 ID 哈希值
uint64_t hostHash = 0; // 主机哈希值
uint64_t pidHash = 0; // 进程 ID 哈希值
};
struct scclUniqueInfoSet {
int nUniqueInfo; // 通信节点的数量
std::vector<struct scclUniqueInfo*> unique_info_vec;
// 每个节点的信息
struct scclNodeInfoSet {
int nUniqueInfos; // 通信节点的数量
std::vector<struct scclNodeInfo> node_info_vec;
// 构造函数声明
scclNodeInfoSet(int nRanks);
};
// scclBootstrapComm 结构体定义,用于存储引导通信信息
struct scclBootstrapComm {
scclNet_t* scclNet;
// BootstrapComm 结构体定义,用于存储引导通信信息
struct BootstrapComm {
void init(int rank, int nRanks, int localRank, int nLocalRanks);
void destroy();
struct scclUniqueInfo* unique_info; // 每个通信节点的基础信息
public:
scclNet_t* scclNet;
struct scclNodeInfoSet* node_info_set;
cpu_set_t cpuAffinity; // CPU亲和性
int WarpSize;
void* bootstrap; // 引导信息
int rank = -1; // 当前节点的全局排名
int nRanks = 0; // 总的节点数量
int localRank = -1; // 当前节点在本地计算节点中的排名
int nLocalRanks = 0; // 本地计算节点中的节点总数
int hipDev = -1; // CUDA 设备 ID
int deviceCnt = 0; // 设备数量
// proxy通信
uint64_t magic; // 魔术数,用于验证结构体
volatile uint32_t* abortFlag; // 中止标志,非阻塞套接字设置
......@@ -107,7 +125,23 @@ scclResult_t getBusId(int hipDev, int64_t* busId);
int scclCudaCompCap(void);
// 打印唯一的拓扑信息
void printUniqueInfo(struct scclUniqueInfo* info);
scclResult_t printNodeInfo(struct scclNodeInfo* info);
// 实现类似于std::span的功能,将字节数组转换为类型数组
template <typename T>
class ByteSpan {
public:
ByteSpan(const char* data, std::size_t size) : data_(reinterpret_cast<const T*>(data)), size_(size / sizeof(T)) {}
const T* data() const { return data_; }
std::size_t size() const { return size_; }
const T& operator[](std::size_t index) const { return data_[index]; }
private:
const T* data_;
std::size_t size_;
};
} // namespace bootstrap
} // namespace topology
......
......@@ -112,8 +112,7 @@ scclResult_t rocm_smi_getLinkInfo(int srcIndex, int dstIndex, RSMI_IO_LINK_TYPE*
*count = 1;
if(*rsmi_type == RSMI_IOLINK_TYPE_XGMI && rsmi_weight == 15) {
*hops = 1;
// #if defined USE_ROCM_SMI64CONFIG && rocm_smi_VERSION_MAJOR >= 2
#if 1
// #if defined USE_ROCM_SMI64CONFIG && rocm_smi_VERSION_MAJOR >= 2
uint64_t min_bw = 0, max_bw = 0;
rsmi_version_t version;
ROCMSMICHECK(rsmi_version_get(&version));
......@@ -123,7 +122,6 @@ scclResult_t rocm_smi_getLinkInfo(int srcIndex, int dstIndex, RSMI_IO_LINK_TYPE*
*count = max_bw / min_bw;
INFO(SCCL_LOG_GRAPH, "rocm smi srcIndex:%d dstIndex:%d min_bw:%ld max_bw:%ld count:%d", srcIndex, dstIndex, min_bw, max_bw, *count);
#endif
}
return scclSuccess;
}
......
......@@ -7,6 +7,7 @@
#include "param.h"
#include "alloc.h"
#include "utils.h"
#include "asm_ops.h"
/*
外部环境变量设置:
......@@ -19,6 +20,7 @@ namespace sccl {
#define SCCL_MAX_NTHREADS 256
#define SCCL_MAX_OPS 2048
#define SCCL_STEPS 8
#define SCCL_LOCAL_MAX_NODES 8 // 每个节点内最多的卡数
typedef enum : uint8_t {
scclInt8 = 0,
......@@ -45,13 +47,4 @@ typedef enum : uint8_t {
SCCL_PROTO_SIMPLE = 2
} scclProtocolType_t;
// 每个进程的唯一ID
struct scclRankInfo {
int rank; // 当前节点的全局排名
int nRanks; // 总的节点数量
int localRank; // 当前节点的本地rank
int localRanks; // 当前节点的本地rank数量
int hipDev; // CUDA 设备 ID
};
} // namespace sccl
......@@ -74,24 +74,24 @@ static const char* scclGetErrorString(scclResult_t code) {
} \
} while(0);
#define SCCLCHECKGOTO(call, RES, label) \
do { \
RES = call; \
if(RES != scclSuccess && RES != scclInProgress) { \
INFO(SCCL_LOG_CODEALL, "%s:%d -> %d", __FILE__, __LINE__, RES); \
goto label; \
} \
#define SCCLCHECKGOTO(call, RES, label) \
do { \
RES = call; \
if(RES != scclSuccess && RES != scclInProgress) { \
INFO(SCCL_LOG_CODEALL, "%s:%d -> %d", __func__, __FILE__, __LINE__, RES); \
goto label; \
} \
} while(0);
#define HIPCHECK(cmd) \
do { \
hipError_t err = cmd; \
if(err != hipSuccess) { \
char hostname[1024]; \
gethostname(hostname, 1024); \
printf("%s: Test HIP failure %s:%d '%s'\n", hostname, __FILE__, __LINE__, hipGetErrorString(err)); \
return scclUnhandledHipError; \
} \
#define HIPCHECK(cmd) \
do { \
hipError_t err = cmd; \
if(err != hipSuccess) { \
char hostname[1024]; \
gethostname(hostname, 1024); \
INFO(SCCL_LOG_CODEALL, "%s: Test HIP failure %s:%d '%s'\n", hostname, hipGetErrorString(err)); \
return scclUnhandledHipError; \
} \
} while(0)
#define HIPCHECKGOTO(cmd, RES, label) \
......@@ -106,60 +106,60 @@ static const char* scclGetErrorString(scclResult_t code) {
////////////////////////////// Value检查 //////////////////////////////
#define EQCHECK(statement, value) \
do { \
if((statement) == value) { \
/* Print the back trace*/ \
INFO(SCCL_LOG_CODEALL, "%s:%d -> %d (%s)", __FILE__, __LINE__, scclSystemError, strerror(errno)); \
return scclSystemError; \
} \
#define EQCHECK(statement, value) \
do { \
if((statement) == value) { \
/* Print the back trace*/ \
INFO(SCCL_LOG_CODEALL, "%s:%d -> %d (%s)", __func__, __FILE__, __LINE__, scclSystemError, strerror(errno)); \
return scclSystemError; \
} \
} while(0);
#define EQCHECKGOTO(statement, value, RES, label) \
do { \
if((statement) == value) { \
/* Print the back trace*/ \
RES = scclSystemError; \
INFO(SCCL_LOG_CODEALL, "%s:%d -> %d (%s)", __FILE__, __LINE__, RES, strerror(errno)); \
goto label; \
} \
#define EQCHECKGOTO(statement, value, RES, label) \
do { \
if((statement) == value) { \
/* Print the back trace*/ \
RES = scclSystemError; \
INFO(SCCL_LOG_CODEALL, "%s:%d -> %d (%s)", __func__, __FILE__, __LINE__, RES, strerror(errno)); \
goto label; \
} \
} while(0);
#define NEQCHECK(statement, value) \
do { \
if((statement) != value) { \
/* Print the back trace*/ \
INFO(SCCL_LOG_CODEALL, "%s:%d -> %d (%s)", __FILE__, __LINE__, scclSystemError, strerror(errno)); \
return scclSystemError; \
} \
#define NEQCHECK(statement, value) \
do { \
if((statement) != value) { \
/* Print the back trace*/ \
INFO(SCCL_LOG_CODEALL, "%s:%d -> %d (%s)", __func__, __FILE__, __LINE__, scclSystemError, strerror(errno)); \
return scclSystemError; \
} \
} while(0);
#define NEQCHECKGOTO(statement, value, RES, label) \
do { \
if((statement) != value) { \
/* Print the back trace*/ \
RES = scclSystemError; \
INFO(SCCL_LOG_CODEALL, "%s:%d -> %d (%s)", __FILE__, __LINE__, RES, strerror(errno)); \
goto label; \
} \
#define NEQCHECKGOTO(statement, value, RES, label) \
do { \
if((statement) != value) { \
/* Print the back trace*/ \
RES = scclSystemError; \
INFO(SCCL_LOG_CODEALL, "%s:%d -> %d (%s)", __func__, __FILE__, __LINE__, RES, strerror(errno)); \
goto label; \
} \
} while(0);
#define LECHECK(statement, value) \
do { \
if((statement) <= value) { \
/* Print the back trace*/ \
INFO(SCCL_LOG_CODEALL, "%s:%d -> %d (%s)", __FILE__, __LINE__, scclSystemError, strerror(errno)); \
return scclSystemError; \
} \
#define LECHECK(statement, value) \
do { \
if((statement) <= value) { \
/* Print the back trace*/ \
INFO(SCCL_LOG_CODEALL, "%s:%d -> %d (%s)", __func__, __FILE__, __LINE__, scclSystemError, strerror(errno)); \
return scclSystemError; \
} \
} while(0);
#define LTCHECK(statement, value) \
do { \
if((statement) < value) { \
/* Print the back trace*/ \
INFO(SCCL_LOG_CODEALL, "%s:%d -> %d (%s)", __FILE__, __LINE__, scclSystemError, strerror(errno)); \
return scclSystemError; \
} \
#define LTCHECK(statement, value) \
do { \
if((statement) < value) { \
/* Print the back trace*/ \
INFO(SCCL_LOG_CODEALL, "%s:%d -> %d (%s)", __func__, __FILE__, __LINE__, scclSystemError, strerror(errno)); \
return scclSystemError; \
} \
} while(0);
////////////////////////////// SYS //////////////////////////////
......@@ -190,14 +190,14 @@ static const char* scclGetErrorString(scclResult_t code) {
} \
} while(true)
#define SYSCHECKGOTO(statement, RES, label) \
do { \
if((statement) == -1) { \
/* Print the back trace*/ \
RES = scclSystemError; \
INFO(SCCL_LOG_CODEALL, "%s:%d -> %d (%s)", __FILE__, __LINE__, RES, strerror(errno)); \
goto label; \
} \
#define SYSCHECKGOTO(statement, RES, label) \
do { \
if((statement) == -1) { \
/* Print the back trace*/ \
RES = scclSystemError; \
INFO(SCCL_LOG_CODEALL, "%s:%d -> %d (%s)", __func__, __FILE__, __LINE__, RES, strerror(errno)); \
goto label; \
} \
} while(0);
} // namespace sccl
#pragma once
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>
#include <algorithm> // For std::min/std::max
#include "sccl.h"
#ifdef PROFAPI
#define SCCL_API(ret, func, args...) \
__attribute__((visibility("default"))) __attribute__((alias(#func))) ret p##func(args); \
extern "C" __attribute__((visibility("default"))) __attribute__((weak)) ret func(args)
#else
#define SCCL_API(ret, func, args...) extern "C" __attribute__((visibility("default"))) ret func(args)
#endif // end PROFAPI
static __inline__ int scclTypeSize(scclDataType_t type) {
switch(type) {
case scclInt8:
case scclUint8: return 1;
case scclFloat16:
#if defined(RCCL_BFLOAT16)
case scclBfloat16:
#endif
return 2;
case scclInt32:
case scclUint32:
case scclFloat32: return 4;
case scclInt64:
case scclUint64:
case scclFloat64: return 8;
default: return -1;
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment