Commit 379c4128 authored by lishen's avatar lishen
Browse files

完成各个rank中的节点全部信息的allgather

parent ecf8df33
......@@ -230,6 +230,7 @@ public:
/**
* @brief 管理接受连接的套接字的类,继承自scclSocketManager。
*
* 服务器端接受客户端的连接请求,创建一个新的套接字用于通信
* 该类在构造时初始化套接字,并接受来自监听套接字的连接。
*/
class scclSocketAcceptManager : public scclSocketManager {
......
......@@ -98,13 +98,13 @@ bool matchIfList(const char* string, int port, struct netIf* ifList, int listSiz
}
scclResult_t printNetProps(const scclNetProperties_t* props, int rank, int localRank) {
printf("rank=%d, localRank=%d, device name=%s, pciPath=%s, guid=%lu, ptrSupport=%d, speed=%d, port=%d, latency=%f, maxComms=%d, maxRecvs=%d\n",
printf("rank=%d, localRank=%d, device name=%s, pciPath=%s, guid=%lu, ptrSupport=%u, speed=%d, port=%d, latency=%f, maxComms=%d, maxRecvs=%d\n",
rank,
localRank,
props->name,
props->pciPath,
props->guid,
props->ptrSupport,
static_cast<unsigned int>(props->ptrSupport),
props->speed,
props->port,
props->latency,
......
......@@ -7,7 +7,7 @@ namespace sccl {
namespace hardware {
namespace net {
typedef enum {
typedef enum : uint8_t {
SCCL_PTR_HOST = 0x1,
SCCL_PTR_CUDA = 0x2,
SCCL_PTR_DMABUF = 0x4
......@@ -19,15 +19,15 @@ constexpr int SCCL_NET_HANDLE_MAXSIZE = 128;
////////////////////////////////// 用于定义网络设备 //////////////////////////////////
typedef struct {
char* name; // 主要用于日志记录。
char* pciPath; // PCI设备在/sys中的路径。
uint64_t guid; // NIC芯片的唯一标识符。对于具有多个PCI功能(物理或虚拟)的卡非常重要。
int ptrSupport; // [SCCL_PTR_HOST|SCCL_PTR_CUDA|SCCL_PTR_DMABUF]
int speed; // 端口速度,单位为Mbps。
int port; // 端口号。
float latency; // 网络延迟
int maxComms; // 我们可以创建的最大通信数量
int maxRecvs; // 最大分组接收数量。
char* name; // 主要用于日志记录。
char* pciPath; // PCI设备在/sys中的路径。
uint64_t guid; // NIC芯片的唯一标识符。对于具有多个PCI功能(物理或虚拟)的卡非常重要。
uint8_t ptrSupport; // [SCCL_PTR_HOST|SCCL_PTR_CUDA|SCCL_PTR_DMABUF]
int speed; // 端口速度,单位为Mbps。
int port; // 端口号。
float latency; // 网络延迟
int maxComms; // 我们可以创建的最大通信数量
int maxRecvs; // 最大分组接收数量。
} scclNetProperties_t;
/**
......
......@@ -15,6 +15,20 @@
namespace sccl {
namespace hardware {
namespace topology {
/**
* @brief 执行根节点的数据收集和广播操作
*
* 该函数负责以下操作:
* 1. 设置本地监听服务
* 2. 向根节点发送本节点的基本数据
* 3. 从根节点接收本地rank数量信息
* 4. 当本地rank为0时,从根节点接收所有rank的IP数据
* 5. 将收集到的所有rank数据广播给节点内其他rank
*
* @param send_data_basic 发送给根节点的节点基础数据
* @param recv_data_basic 接收广播数据的缓冲区向量
* @return scclResult_t 返回操作结果,成功返回scclSuccess
*/
namespace bootstrap {
////////////////////////////////////////////////////////////////////////////////////////////////////////
......@@ -36,7 +50,7 @@ static scclResult_t basicInit() {
SCCLCHECK(bootstrapNet::bootstrapNetInit());
// initGdrCopy(); // 初始化GDR复制
// SCCLCHECK(scclNetPluginInit());
#if 0
char strValue[1024];
// 检查NUMA自动平衡是否启用
SCCLCHECK(scclTopoGetStrFromSys("/proc/sys/kernel", "numa_balancing", strValue));
......@@ -77,6 +91,7 @@ static scclResult_t basicInit() {
if(err != hipSuccess)
hsaFineGrainFlag = false;
}
#endif
// 设置初始化标志
asm_ops::st_release_sys_global(&initialized, true);
......@@ -143,16 +158,16 @@ static void* bootstrapRoot(void* rargs) {
scclResult_t res = scclSuccess; // 函数结果
class ThreadPool* pthread_pool = nullptr; // 用于根节点分发消息的线程池
int nRanks = 0; // nRanks: 进程总数;
int nLocalRanks = 1;
int c = 0; // c: 已连接的进程计数
uint64_t rootHostHash = 0;
struct BootstrapNodeBasic node_basic; // 用于存储扩展信息的结构体
int nRanks = 0; // nRanks: 进程总数;
int nLocalRanks = 1;
int c = 0; // c: 已连接的进程计数
uint64_t rootHostHash = 0;
struct BootstrapNodeBasic node_basic = {}; // 用于存储扩展信息的结构体
struct BootstrapNodeBasic* all_rank_node_basic = nullptr; // 所有进程的地址
// 定义一个函数或者一个函数对象,用于执行实际的发送数据操作
// 定义一个函数或者一个函数对象,用于执行实际的发送数据操作。在后面执行
auto send_task = [](BootstrapNodeBasic& node_basic, uint64_t magic, int rank, void* data, size_t size) {
net::net_socket::scclSocketClientManager client_manager(&node_basic.addr, magic, net::net_socket::scclSocketTypeBootstrap);
net::net_socket::scclSocketClientManager client_manager(&node_basic.sock.addr, magic, net::net_socket::scclSocketTypeBootstrap);
bootstrapNet::bootstrapNetSend(client_manager.getSocket(), data, size);
};
......@@ -175,9 +190,9 @@ static void* bootstrapRoot(void* rargs) {
WARN("Bootstrap Root : mismatch in rank count from procs %d : %d", nRanks, node_basic.nRanks); // 警告
goto out; // 跳转到out标签
}
if(memcmp(zero, &all_rank_node_basic[node_basic.rank].addr, sizeof(scclSocketAddress_t)) != 0) { // 如果rank已经签到
WARN("Bootstrap Root : rank %d of %d ranks has already checked in", node_basic.rank, nRanks); // 警告
goto out; // 跳转到out标签
if(memcmp(zero, &all_rank_node_basic[node_basic.rank].sock.addr, sizeof(scclSocketAddress_t)) != 0) { // 如果rank已经签到
WARN("Bootstrap Root : rank %d of %d ranks has already checked in", node_basic.rank, nRanks); // 警告
goto out; // 跳转到out标签
}
// 保存该rank的连接句柄
......@@ -188,16 +203,6 @@ static void* bootstrapRoot(void* rargs) {
INFO(SCCL_LOG_BOOTSTRAP, "COLLECTED ALL %d HANDLES", nRanks); // 日志:收集到所有句柄
// --------------------- 2.计算nLocalRanks,并广播给其他所有rank --------------------- //
#if 1
for(int r = 0; r < nRanks; ++r) {
auto temp_node_basic = all_rank_node_basic[r];
char line[100];
sprintf(line, "bootstrapRoot r=%d, rank=%d/%d, hostHash=%lu,\n", r, temp_node_basic.rank, temp_node_basic.nRanks, temp_node_basic.hostHash);
scclSocketAddress_t temp_addr = temp_node_basic.addr;
hardware::net::printSocketAddr(&temp_addr, line);
}
#endif
// 首先计算nLocalRanks大小,即具有相同hostHash的节点数量
rootHostHash = all_rank_node_basic[0].hostHash;
for(int i = 1; i < nRanks; ++i) {
......@@ -207,7 +212,7 @@ static void* bootstrapRoot(void* rargs) {
break; // 一旦发现不同的hostHash,停止计数
}
}
// 给每个节点localRank=0的进程发送信息,并由其进行广播,从而加快速度
// 给每个节点发送localRank的值
for(int r = 0; r < nRanks; ++r) {
auto dst_node_basic = all_rank_node_basic[r];
// 使用std::bind将参数绑定到send_task函数
......@@ -225,7 +230,7 @@ static void* bootstrapRoot(void* rargs) {
for(int r = 0; r < nRanks / nLocalRanks; ++r) {
int dst_rank = r * nLocalRanks; // 计算目标rank
auto dst_node_basic = all_rank_node_basic[dst_rank];
net::net_socket::scclSocketClientManager client_manager(&dst_node_basic.addr, magic, net::net_socket::scclSocketTypeBootstrap);
net::net_socket::scclSocketClientManager client_manager(&dst_node_basic.sock.addr, magic, net::net_socket::scclSocketTypeBootstrap);
bootstrapNet::bootstrapNetSend(client_manager.getSocket(), all_rank_node_basic, sizeof(struct BootstrapNodeBasic) * nRanks);
printf("root send nLocalRanks value to rank=%d\n", r);
}
......@@ -251,7 +256,7 @@ out:
free(rargs); // 释放rargs内存
INFO(SCCL_LOG_BOOTSTRAP, "DONE"); // 日志:完成
return NULL; // 返回NULL
return NULL;
}
/**
......@@ -291,16 +296,11 @@ scclResult_t bootstrapCreateRoot(struct BootstrapHandle* handle) {
// 构造函数
Bootstrap::Bootstrap(const struct BootstrapHandle* handle, int rank, int nRanks)
: root_handle(handle), rank(rank), nRanks(nRanks), localRank(-1), nLocalRanks(0), socketInitDone(false) {
printf("construct init Bootstrap\n");
// 初始化线程池
pthread_pool = new ThreadPool(nRanks);
printf("Bootstrap 构造函数\n");
}
Bootstrap::~Bootstrap() {
if(pthread_pool) {
delete pthread_pool;
}
printf("Bootstrap 析构函数\n");
if(ipcsocket) {
delete ipcsocket;
}
......@@ -316,16 +316,14 @@ scclResult_t Bootstrap::init(struct BootstrapComm* bootstrap_comm) {
// -------------------------- 1.获取自身基础信息 ----------------------------------- //
SCCLCHECK(basicInit());
uint64_t hostHash = getHostHash();
scclSocketAddress_t localSocketAddr = bootstrapNet::getLocalSocketAddr();
// 设置基础信息
struct BootstrapNodeBasic node_basic{rank, nRanks, hostHash, localSocketAddr};
struct BootstrapNodeBasic node_basic = {};
// -------------------------- 2.设置0号rank搜集的CPU信息和localRank信息 ----------------------------------- //
// 创建根节点的数据收集
std::vector<struct BootstrapNodeBasic> all_node_basic;
all_node_basic.reserve(nRanks);
SCCLCHECK(bootstrapRootGatherAndBroadcast(&node_basic, all_node_basic.data()));
SCCLCHECK(bootstrapRootGatherAndBroadcast(&node_basic, all_node_basic));
// -------------------------- 3.设置本地localRank的BootstrapComm信息 ----------------------------------- //
// 初始化BootstrapComm类
......@@ -344,8 +342,9 @@ scclResult_t Bootstrap::init(struct BootstrapComm* bootstrap_comm) {
LTCHECK(devices_num, 0); // 检查设备数量是否 devices_num>0
LTCHECK(devices_num, nLocalRanks); // 检查设备数量是否 devices_num>nLocalRanks
bootstrap_comm->deviceCnt = static_cast<int>(devices_num); // 将设备数量转换为int并赋值给的deviceCnt
#if 0
printf("devices_num=%d\n", bootstrap_comm->deviceCnt);
#endif
LECHECK(devices_num, bootstrap_comm->hipDev); // 检查hipDev是否小于deviceCnt
HIPCHECK(hipSetDevice(bootstrap_comm->hipDev)); // 设置当前设备为hipDev
......@@ -353,7 +352,9 @@ scclResult_t Bootstrap::init(struct BootstrapComm* bootstrap_comm) {
// 获取环境变量SCCL_NET_NAME的值,如果不存在则默认使用"IB"
const char* envNetName = getenv("SCCL_NET_NAME");
char* netName = (envNetName != NULL) ? strdup(envNetName) : strdup("IB");
#if 0
printf("netName=%s\n", netName);
#endif
// 初始化网络和引导网络
SCCLCHECK(net::scclNetInit(netName, bootstrap_comm->scclNet));
// 释放分配的网络名称字符串
......@@ -361,14 +362,33 @@ scclResult_t Bootstrap::init(struct BootstrapComm* bootstrap_comm) {
//////// 初始化唯一信息结构体 ////////
struct scclNodeInfo local_node_info;
local_node_info.hostHash = hostHash;
// 补充定义
local_node_info.hostHash = node_basic.hostHash;
SCCLCHECK(bootstrapCommInitNodeInfo(bootstrap_comm->scclNet, &local_node_info));
printNodeInfo(&local_node_info);
// 设置CPU信息
memcpy(&(local_node_info.localNode.cpu.listen_sock), &(node_basic.sock), sizeof(scclSocket_t));
#if 0
{
char line[20];
sprintf(line, "11111 print rank=%d", rank);
std::string prefix(line); // 创建prefix字符串
printNodeInfo(prefix, &local_node_info); // 正确的调用方式
}
#endif
// -------------------------- 4.BootstrapComm信息的allgather ----------------------------------- //
// bootstrapAllGather(bootstrap_comm->node_info);
bootstrapCommAllGather(all_node_basic, &local_node_info, bootstrap_comm->node_info_set);
if(1) {
char line[20];
sprintf(line, "print rank=%d", rank);
std::string prefix(line); // 创建prefix字符串
for(int r = 0; r < nRanks; r++) {
struct scclNodeInfo node_basic = bootstrap_comm->node_info_set->node_info_vec[r];
printNodeInfo(prefix, &node_basic); // 正确的调用方式
}
}
// 设置初始化标志
asm_ops::st_release_sys_global(&socketInitDone, true);
......@@ -393,39 +413,52 @@ scclResult_t Bootstrap::init(struct BootstrapComm* bootstrap_comm) {
* @param recv_data 接收广播数据的缓冲区指针
* @return scclResult_t 返回操作结果,成功返回scclSuccess
*/
scclResult_t Bootstrap::bootstrapRootGatherAndBroadcast(void* send_data, void* recv_data) {
// 数据类型转换
auto send_data_basic = reinterpret_cast<struct BootstrapNodeBasic*>(send_data);
auto recv_data_basic = reinterpret_cast<struct BootstrapNodeBasic*>(recv_data);
int recv_data_basic_size = nRanks * sizeof(struct BootstrapNodeBasic);
scclSocketAddress_t root_addr = root_handle->addr;
scclResult_t Bootstrap::bootstrapRootGatherAndBroadcast(struct BootstrapNodeBasic* send_data_basic, std::vector<struct BootstrapNodeBasic>& recv_data_basic) {
// 总的需要广播的数据
int recv_data_basic_size = nRanks * sizeof(struct BootstrapNodeBasic);
scclSocketAddress_t root_addr = root_handle->addr;
scclSocketAddress_t localSocketAddr = bootstrapNet::getLocalSocketAddr();
// ------------- 1.各个rank在发送给根节点数据之前,首先设置监听listen ------------- //
net::net_socket::scclSocketServerManager local_server_manager(&send_data_basic->addr, root_handle->magic, net::net_socket::scclSocketTypeBootstrap);
// 保存监听的sock信息到本类,用于后续proxy使用
my_listen_sock = local_server_manager.releaseSocket();
net::net_socket::scclSocketServerManager local_server_manager(&localSocketAddr, root_handle->magic, net::net_socket::scclSocketTypeBootstrap);
// ------------- 2.各个节点向根节点发送数据 ------------- //
net::net_socket::scclSocketClientManager client_manager(&root_addr, root_handle->magic, net::net_socket::scclSocketTypeBootstrap);
SCCLCHECK(bootstrapNet::bootstrapNetSend(client_manager.getSocket(), send_data_basic, sizeof(struct BootstrapNodeBasic)));
// 设置基础数据
send_data_basic->rank = rank;
send_data_basic->nRanks = nRanks;
send_data_basic->hostHash = getHostHash();
scclSocket_t* local_server_sock = local_server_manager.releaseSocket();
send_data_basic->sock = *local_server_sock;
// ------------- 2.各个节点向根节点发送数据 ------------- //
{
net::net_socket::scclSocketClientManager client_manager(&root_addr, root_handle->magic, net::net_socket::scclSocketTypeBootstrap);
SCCLCHECK(bootstrapNet::bootstrapNetSend(client_manager.getSocket(), send_data_basic, sizeof(struct BootstrapNodeBasic)));
}
// ------------- 3.从根节点接收nLocalRanks值 ------------- //
// 接收nLocalRanks信息
{
net::net_socket::scclSocketAcceptManager accept_manager(my_listen_sock);
net::net_socket::scclSocketAcceptManager accept_manager(local_server_sock);
SCCLCHECK(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), &nLocalRanks, sizeof(int)));
}
// 要求必须 nRanks%nLocalRanks == 0
NEQCHECK(nRanks % nLocalRanks, 0);
// ------------- 4.nLocalRanks==0时,从根节点接收所有rank的ip数据 ------------- //
this->localRank = rank % nLocalRanks;
this->localRank = rank % nLocalRanks;
this->interRank = rank / nLocalRanks;
this->nInterRanks = nRanks / nLocalRanks;
// 从根节点接收数据,对应到函数 bootstrapRoot
if(localRank == 0) {
net::net_socket::scclSocketAcceptManager accept_manager(my_listen_sock);
SCCLCHECK(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), recv_data_basic, recv_data_basic_size));
net::net_socket::scclSocketAcceptManager accept_manager(local_server_sock);
SCCLCHECK(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), recv_data_basic.data(), recv_data_basic_size));
}
// ------------- 5.nLocalRanks==0时,将所有rank的ip数据广播给节点内其他rank ------------- //
ipcsocket = new scclIpcSocket_t(localRank, nLocalRanks, /*hash*/ root_handle->magic);
ipcsocket->scclIpcSocketBroadcast(recv_data_basic, recv_data_basic_size, /*localRank root*/ 0);
ipcsocket->scclIpcSocketBroadcast(recv_data_basic.data(), recv_data_basic_size, /*localRank root*/ 0);
return scclSuccess;
}
......@@ -456,9 +489,6 @@ scclResult_t Bootstrap::bootstrapCommInitNodeInfo(scclNet_t* scclNet, struct scc
////////////////// 设置硬件信息 //////////////////
struct topoLocalNode* p_localNode = &node_info->localNode;
// 设置CPU信息
p_localNode->cpu.listen_sock = *my_listen_sock;
// 设置PCI信息
SCCLCHECK(getBusId(hipDev, &p_localNode->pci.busId));
......@@ -471,37 +501,95 @@ scclResult_t Bootstrap::bootstrapCommInitNodeInfo(scclNet_t* scclNet, struct scc
p_localNode->gpu.compCap = deviceProp.major * 10 + deviceProp.minor;
// 设置RDMA信息
SCCLCHECK(scclNet->getProperties(hipDev, &p_localNode->net.props));
SCCLCHECK(scclNet->devices(&p_localNode->net.count));
net::scclNetProperties_t props;
SCCLCHECK(scclNet->getProperties(hipDev, &props));
SCCLCHECK(scclNet->devices(&p_localNode->net.count)); // 节点内网卡数量
snprintf(p_localNode->net.name, sizeof(p_localNode->net.name), "%s", props.name); // 主要用于日志记录。
snprintf(p_localNode->net.pciPath, sizeof(p_localNode->net.pciPath), "%s", props.pciPath); // PCI设备在/sys中的路径。
#if 0
printf("p_localNode->net.pciPath len=%zu\n", strlen(p_localNode->net.pciPath));
#endif
p_localNode->net.guid = props.guid; // NIC芯片的唯一标识符。对于具有多个PCI功能(物理或虚拟)的卡非常重要。
p_localNode->net.ptrSupport = props.ptrSupport; // [SCCL_PTR_HOST|SCCL_PTR_CUDA|SCCL_PTR_DMABUF]
p_localNode->net.speed = props.speed; // 端口速度,单位为Mbps。
p_localNode->net.port = props.port; // 端口号。
p_localNode->net.latency = props.latency; // 网络延迟
p_localNode->net.maxComms = props.maxComms; // 可以创建的最大通信数量
p_localNode->net.maxRecvs = props.maxRecvs; // 最大分组接收数量。
return scclSuccess;
}
scclResult_t Bootstrap::bootstrapAllGather(struct scclNodeInfo*) {
scclResult_t Bootstrap::bootstrapCommAllGather(std::vector<struct BootstrapNodeBasic>& all_node_basic,
struct scclNodeInfo* node_info,
struct scclNodeInfoSet* node_info_set) {
// 数据准备
size_t inter_data_len = nLocalRanks * sizeof(struct scclNodeInfo); // 节点间传输时每个子块的大小
auto all_recv_data = reinterpret_cast<char*>(node_info_set->node_info_vec.data());
// 1.节点内通信 allgather
auto local_recv_data = all_recv_data + this->interRank * inter_data_len;
ipcsocket->scclIpcSocketAllgatherSync(node_info, (void*)local_recv_data, sizeof(struct scclNodeInfo));
// 2.节点间通信,ring allgather
// TODO: 后续nRanks特别大的时候,可以进一步优化算法,减少通信次数
if(localRank == 0) {
int prev_interRank = (this->interRank - 1 + this->nInterRanks) % this->nInterRanks * this->nLocalRanks;
int next_interRank = (this->interRank + 1 + this->nInterRanks) % this->nInterRanks * this->nLocalRanks;
// scclSocket_t prev_rank_sock = all_node_basic[prev_interRank].sock;
scclSocket_t next_rank_sock = all_node_basic[next_interRank].sock;
scclSocket_t self_rank_sock = all_node_basic[rank].sock;
printf("bootstrap allgather 11: rank %d, prev_interRank=%d, next_interRank=%d\n", rank, prev_interRank, next_interRank);
// 对于prev,当前rank是客户端;对于next,当前rank是服务器端
// 客户端:用于发送数据
net::net_socket::scclSocketClientManager client_manager(&next_rank_sock.addr, root_handle->magic, net::net_socket::scclSocketTypeBootstrap);
// 服务器端:用于接收数据
net::net_socket::scclSocketAcceptManager accept_manager(&self_rank_sock);
/////////////////// 实现数据传输 ///////////////////
for(int r = 0; r < nInterRanks - 1; r++) {
int prev_rank = (this->interRank - r - 1 + nInterRanks) % nInterRanks;
int next_rank = (this->interRank - r + nInterRanks) % nInterRanks;
// 准备发送/接收的数据
auto send_data = all_recv_data + next_rank * inter_data_len;
auto recv_data = all_recv_data + prev_rank * inter_data_len;
#if 0
printf("bootstrapCommAllGather rank=%d, interRank=%d, r=%d, prev_rank=%d, next_rank=%d, inter_data_len=%zu\n",
this->rank,
this->interRank,
r,
prev_rank,
next_rank,
inter_data_len);
#endif
// 3.节点内通信 allgather
// 发送/接收数据
SCCLCHECK(bootstrapNet::bootstrapNetSend(client_manager.getSocket(), send_data, inter_data_len));
SCCLCHECK(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), recv_data, inter_data_len));
}
}
#if 0
printf("222222\n");
if(rank == 0) {
char line[20];
sprintf(line, "print rank=%d", rank);
std::string prefix(line); // 创建prefix字符串
for(int r = 0; r < nRanks; r++) {
struct scclNodeInfo node_basic = node_info_set->node_info_vec[r];
printNodeInfo(prefix, &node_basic); // 正确的调用方式
}
}
#endif
// 3.节点内通信 broadcast
ipcsocket->scclIpcSocketBroadcast(all_recv_data, nRanks * sizeof(struct scclNodeInfo), 0);
return scclSuccess;
}
/////////////////////////////////////////////////////////////////////////////////////////////
// // 将本地socket地址写入到/tmp/文件夹的文件中,通过nfs共享存储,其他rank可见
// scclResult_t bootstrapGetAllNodes(const struct scclNodeInfo* , struct BootstrapComm* comm) {
// // // 分配并初始化IPC套接字
// // struct scclIpcSocket ipcSock = {0};
// // // Create a UDS socket to receive the converted fd
// // SCCLCHECK(scclIpcSocketInit(&ipcSock, ->rank, /*hash*/ handle->magic, /*abortFlag*/ NULL));
// // printf("fd=%d, socketName=%s\n", ipcSock.fd, ipcSock.socketName);
// return scclInProgress;
// }
// auto node_info = bootstrap_comm->node_info;
// // 设置节点内socket通信工具
// ipcsocket = new scclIpcSocket_t(node_info->localRank, node_info->nRanks, node_info->hostHash, bootstrap_comm->abortFlag);
} // namespace bootstrap
} // namespace topology
} // namespace hardware
......
......@@ -28,21 +28,23 @@ public:
// 初始化bootstrap通信环境
scclResult_t init(struct BootstrapComm* bootstrap_comm);
// 广播节点信息
scclResult_t bootstrapAllGather(struct scclNodeInfo*);
private:
// 创建根节点的数据广播
scclResult_t bootstrapRootGatherAndBroadcast(void* send_data, void* recv_data);
scclResult_t bootstrapRootGatherAndBroadcast(struct BootstrapNodeBasic* send_data_basic, std::vector<struct BootstrapNodeBasic>& recv_data_basic);
// 初始化唯一ID信息结构体
scclResult_t bootstrapCommInitNodeInfo(scclNet_t* scclNet, struct scclNodeInfo* node_info);
// scclResult_t bootstrapGetAllNodes(const struct scclNodeInfo* , struct BootstrapComm* comm);
// 广播节点信息
scclResult_t
bootstrapCommAllGather(std::vector<struct BootstrapNodeBasic>& all_node_basic, struct scclNodeInfo* node_info, struct scclNodeInfoSet* node_info_set);
private:
int rank, nRanks; // 初始化阶段获取MPI的值
int localRank, nLocalRanks; // 通过bootstrapRootGatherAndBroadcast函数确定值
int rank, nRanks; // 初始化阶段获取MPI的值
int localRank, nLocalRanks; // 通过bootstrapRootGatherAndBroadcast函数确定值
int interRank, nInterRanks; // 整个节点在全部节点中的位置
// TODO: 用于控制套接字终端的变量,目前不知道在哪里使用
volatile uint32_t* abortFlag; // 中止标志,非阻塞套接字设置
// 外部传入的0号节点的基础信息
......@@ -54,12 +56,8 @@ private:
pthread_mutex_t bootstrapMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t bootstrapCond = PTHREAD_COND_INITIALIZER;
// 标志是否已经初始化
// 线程池变量
int max_pthreads = 0; // 用于存储最大并行线程数的整型变量
class ThreadPool* pthread_pool = nullptr; // 指向ThreadPool类实例的指针,初始值为nullptr
scclIpcSocket_t* ipcsocket = nullptr; // 指向scclIpcSocket类实例的指针,初始值为nullptr
scclSocket_t* my_listen_sock = nullptr; // 指向scclSocket类实例的指针,初始值为nullptr
// 节点内通信的类
scclIpcSocket_t* ipcsocket = nullptr; // 指向scclIpcSocket类实例的指针,初始值为nullptr
};
} // namespace bootstrap
......
......@@ -9,7 +9,7 @@ namespace bootstrap {
////////////////////////////// 结构体定义 //////////////////////////////
// 构造函数定义
scclNodeInfoSet::scclNodeInfoSet(int nRanks) : nUniqueInfos(nRanks) {
scclNodeInfoSet::scclNodeInfoSet(int nRanks) {
printf("scclNodeInfoSet 构造函数\n");
node_info_vec.reserve(nRanks); // 预留空间
printf("scclNodeInfoSet 预留空间\n");
......@@ -21,6 +21,8 @@ void BootstrapComm::init(int rank, int nRanks, int localRank, int nLocalRanks) {
this->nRanks = nRanks;
this->localRank = localRank;
this->nLocalRanks = nLocalRanks;
this->interRank = rank / nLocalRanks;
this->nInterRanks = nRanks / nLocalRanks;
node_info_set = new scclNodeInfoSet(nRanks); // 假设需要动态分配
};
......@@ -192,16 +194,18 @@ scclResult_t getBusId(int hipDev, int64_t* busId) {
}
// 函数:打印 scclNodeInfo 结构体的信息
scclResult_t printNodeInfo(struct scclNodeInfo* info) {
scclResult_t printNodeInfo(const std::string& prefix, struct scclNodeInfo* info) {
char addrline[net::SOCKET_NAME_MAXLEN + 1];
if(info->localRank == 0) {
// if(info->localRank == 0) {
if(1) {
printf("==========================================\n"
"Total Rank: %d, Local Rank: %d, Host Hash: %lu, PID Hash: %lu\n"
"%s, Total Rank: %d, Local Rank: %d, Host Hash: %lu, PID Hash: %lu\n"
"gpu: dev=%d, gpu.name=%s, gcn=%s, compCap=%d\n"
"net: count=%d, device name=%s, pciPath=%s, guid=%lu, ptrSupport=%d, speed=%d, port=%d, latency=%f, maxComms=%d, maxRecvs=%d\n"
"cpu: socketAddr=%s\n pci: busId=%ld\n"
"net: count=%d, device name=%s, pciPath=%s, guid=%lu, ptrSupport=%u, speed=%d, port=%d, latency=%f, maxComms=%d, maxRecvs=%d\n"
"cpu: socketAddr=%s\npci: busId=%ld"
"\n==========================================\n",
prefix.c_str(),
info->rank,
info->localRank,
info->hostHash,
......@@ -211,15 +215,15 @@ scclResult_t printNodeInfo(struct scclNodeInfo* info) {
info->localNode.gpu.gcn,
info->localNode.gpu.compCap,
info->localNode.net.count,
info->localNode.net.props.name,
info->localNode.net.props.pciPath,
info->localNode.net.props.guid,
info->localNode.net.props.ptrSupport,
info->localNode.net.props.speed,
info->localNode.net.props.port,
info->localNode.net.props.latency,
info->localNode.net.props.maxComms,
info->localNode.net.props.maxRecvs,
info->localNode.net.name,
info->localNode.net.pciPath,
info->localNode.net.guid,
static_cast<unsigned int>(info->localNode.net.ptrSupport),
info->localNode.net.speed,
info->localNode.net.port,
info->localNode.net.latency,
info->localNode.net.maxComms,
info->localNode.net.maxRecvs,
net::net_socket::scclSocketToString(&info->localNode.cpu.listen_sock.addr, addrline),
info->localNode.pci.busId);
}
......
......@@ -37,9 +37,9 @@ struct bootstrapRootArgs {
// 用于初始建立连接阶段,0号rank之外的进程向其传递的信息
struct BootstrapNodeBasic {
int rank;
int nRanks; // 进程的总数量
uint64_t hostHash; // 用于区分host的CPU编号
scclSocketAddress_t addr; // 各个进程的监听套接字地址,用于网络通信
int nRanks; // 进程的总数量
uint64_t hostHash; // 用于区分host的CPU编号
scclSocket_t sock; // 各个进程的监听套接字地址,用于网络通信
};
// 定义每个rank所持有的所有拓扑节点
......@@ -53,13 +53,21 @@ struct topoLocalNode {
struct {
int dev; // NVML设备编号
char name[8]; // 设备名称
char gcn[7]; // GCN架构名称
char gcn[8]; // GCN架构名称
int compCap; // CUDA计算能力
} gpu; // GPU节点
struct {
int count; // 网卡数量
net::scclNetProperties_t props;
} net; // 网络节点
int count; // 网卡数量
char name[8]; // 主要用于日志记录。
char pciPath[128]; // PCI设备在/sys中的路径。
uint64_t guid; // NIC芯片的唯一标识符。对于具有多个PCI功能(物理或虚拟)的卡非常重要。
uint8_t ptrSupport; // [SCCL_PTR_HOST|SCCL_PTR_CUDA|SCCL_PTR_DMABUF]
int speed; // 端口速度,单位为Mbps。
int port; // 端口号。
float latency; // 网络延迟
int maxComms; // 可以创建的最大通信数量
int maxRecvs; // 最大分组接收数量。
} net; // 网络节点
};
// 定义结构体 scclNodeInfo,用于存储每个rank的通信节点的信息
......@@ -74,7 +82,6 @@ struct scclNodeInfo {
// 每个节点的信息
struct scclNodeInfoSet {
int nUniqueInfos; // 通信节点的数量
std::vector<struct scclNodeInfo> node_info_vec;
// 构造函数声明
......@@ -95,8 +102,11 @@ public:
int nRanks = 0; // 总的节点数量
int localRank = -1; // 当前节点在本地计算节点中的排名
int nLocalRanks = 0; // 本地计算节点中的节点总数
int hipDev = -1; // CUDA 设备 ID
int deviceCnt = 0; // 设备数量
int interRank = -1; // 整个节点在全部节点中的位置
int nInterRanks = 0; // 全局拥有节点的个数
int hipDev = -1; // CUDA 设备 ID
int deviceCnt = 0; // 设备数量
// proxy通信
uint64_t magic; // 魔术数,用于验证结构体
......@@ -125,7 +135,7 @@ scclResult_t getBusId(int hipDev, int64_t* busId);
int scclCudaCompCap(void);
// 打印唯一的拓扑信息
scclResult_t printNodeInfo(struct scclNodeInfo* info);
scclResult_t printNodeInfo(const std::string& prefix, struct scclNodeInfo* info);
// 实现类似于std::span的功能,将字节数组转换为类型数组
template <typename T>
......
......@@ -108,6 +108,12 @@ bool ThreadPool::allTasksCompleted() {
return completed;
}
/**
* @brief 获取线程池的容量大小
* @return 返回线程池当前的最大工作线程数
*/
int ThreadPool::getThreadPoolSize() { return workers.capacity(); }
/**
* 设置指定线程的CPU亲和性,将其绑定到指定的核心上
* @param thread 需要设置亲和性的线程
......
......@@ -35,6 +35,8 @@ public:
// 检查是否所有任务都已完成
bool allTasksCompleted();
// 获取线程池中工作线程的数量
int getThreadPoolSize();
private:
std::vector<pthread_t> workers; // 工作线程列表
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment