Commit 708aae12 authored by lishen's avatar lishen
Browse files

修改topo node的生成方式,优化path计算,生成transport_map

parent 571a75b5
......@@ -12,7 +12,11 @@ namespace sccl {
namespace hardware {
// 全局变量,全部节点的信息
sccl::hardware::topology::bootstrap::BootstrapComm_t bootstrap_comm;
typedef sccl::hardware::topology::bootstrap::BootstrapComm_t BootstrapComm_t;
typedef sccl::hardware::topology::graph::scclTopoGraph_t scclTopoGraph_t;
BootstrapComm_t* bootstrap_comm;
scclTopoGraph_t* topo_graph;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
......@@ -30,19 +34,28 @@ scclResult_t sccl_init(const scclUniqueId* unique_id, int rank, int nRanks) {
// -------------------------- 2.初始化获取所有节点的node信息 ----------------------------------- //
auto sccl_bootstrap = std::make_unique<topology::bootstrap::Bootstrap>(root_handle, rank, nRanks);
SCCLCHECK(sccl_bootstrap->init(&bootstrap_comm));
bootstrap_comm = new BootstrapComm_t();
SCCLCHECK(sccl_bootstrap->init(bootstrap_comm));
printf("init pos 1\n");
// -------------------------- 3.MPI 建图 ----------------------------------- //
topo_graph = new scclTopoGraph_t(nRanks);
auto sccl_graph = std::make_unique<topology::graph::Graph>(rank, nRanks);
printf("init pos 2\n");
// 计算通信路径
sccl_graph->calculateCommunicationPaths(&bootstrap_comm);
SCCLCHECK(sccl_graph->calculateCommunicationPaths(bootstrap_comm, topo_graph, sccl_bootstrap.get()));
printf("init pos 3\n");
// // -------------------------- 3.MPI allgather设置unique_id的整合 ----------------------------------- //
// -------------------------- 3.MPI allgather设置unique_id的整合 ----------------------------------- //
// -------------------------- 5.根据各个节点的基础信息计算topo结果 ----------------------------------- //
// // 后续放入到sccl_finalize中
// delete bootstrap_comm;
// delete topo_graph;
return scclSuccess;
}
......
......@@ -69,7 +69,7 @@ scclResult_t scclIpcSocket::scclIpcSocketInit(volatile uint32_t* abortFlag) {
// af是本机IP地址类型,一般有PF_INET或者AF_INET(IPv4互联网协议族),还有PF_INET6(IPv6互联网协议族)等,但是一般用IPv4。
// type有两种SOCK_STREAM 和SOCK_DGRAM分别对应tcp和udp协议,区别是用不用建立连接。
if((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) < 0) {
WARN("UDS: Socket creation error : %d", errno);
WARN("UDS: Socket creation error : %d (%s)", errno, strerror(errno));
return scclSystemError;
}
......@@ -88,7 +88,7 @@ scclResult_t scclIpcSocket::scclIpcSocketInit(volatile uint32_t* abortFlag) {
// 绑定套接字
if(bind(fd, (struct sockaddr*)&my_cliaddr, sizeof(my_cliaddr)) < 0) {
WARN("UDS: Binding to socket %s failed : %d", temp_addr, errno);
WARN("UDS: Binding to socket %s failed : %d (%s)", temp_addr, errno, strerror(errno));
close(fd);
return scclSystemError;
}
......@@ -224,7 +224,7 @@ scclResult_t scclIpcSocket::scclIpcSocketSendFd(const int sendFd, int dst_rank)
while((sendResult = sendmsg(handle->fd, &msg, 0)) <= 0) {
// 如果发送失败且错误不是EAGAIN, EWOULDBLOCK或EINTR,则记录警告并返回错误
if(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
WARN("UDS: Sending data over socket %s failed : %d", temp_addr, errno);
WARN("UDS: Sending data over socket %s failed : %d (%s)", temp_addr, errno, strerror(errno));
return scclSystemError;
}
// 如果设置了中止标志,则返回内部错误
......@@ -281,7 +281,7 @@ scclResult_t scclIpcSocket::scclIpcSocketRecvFd(int* recvFd) {
while((ret = recvmsg(handle->fd, &msg, 0)) <= 0) {
// 如果接收失败且错误不是EAGAIN, EWOULDBLOCK或EINTR,则记录警告并返回错误
if(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
WARN("UDS: Receiving data over socket failed : %d", errno);
WARN("UDS: Receiving data over socket failed : %d (%s)", errno, strerror(errno));
return scclSystemError;
}
// 如果设置了中止标志,则返回内部错误
......@@ -690,14 +690,14 @@ scclResult_t scclIpcSocket::scclIpcSocketSendDataBasic(const void* data, size_t
if(pollResult == 0) {
WARN("UDS: Timeout occurred while waiting to send data to socket %s", temp_addr);
} else {
WARN("UDS: Error occurred while polling socket %s for writability : %d", temp_addr, errno);
WARN("UDS: Error occurred while polling socket %s for writability : %d (%s)", temp_addr, errno, strerror(errno));
}
return scclSystemError;
}
ssize_t sendResult;
while((sendResult = sendmsg(handle->fd, &msg, 0)) <= 0) {
if(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
WARN("UDS: Error occurred while sending data through socket %s : %d", temp_addr, errno);
WARN("UDS: Error occurred while sending data through socket %s : %d (%s)", temp_addr, errno, strerror(errno));
return scclSystemError;
}
if(handle->abortFlag && *handle->abortFlag) {
......@@ -708,7 +708,7 @@ scclResult_t scclIpcSocket::scclIpcSocketSendDataBasic(const void* data, size_t
if(pollResult == 0) {
WARN("UDS: Timeout occurred while waiting to send data to socket %s", temp_addr);
} else {
WARN("UDS: Error occurred while polling socket %s for writability : %d", temp_addr, errno);
WARN("UDS: Error occurred while polling socket %s for writability : %d (%s)", temp_addr, errno, strerror(errno));
}
return scclSystemError;
}
......@@ -840,14 +840,14 @@ scclResult_t scclIpcSocket::scclIpcSocketSendDataAndRank(const void* data, size_
WARN("UDS: Timeout occurred while waiting to send data to socket %s", temp_addr);
} else {
WARN("UDS: Error occurred while polling socket %s for writability : %d", temp_addr, errno);
WARN("UDS: Error occurred while polling socket %s for writability : %d (%s)", temp_addr, errno, strerror(errno));
}
return scclSystemError;
}
ssize_t sendResult;
while((sendResult = sendmsg(handle->fd, &msg, 0)) <= 0) {
if(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
WARN("UDS: Error occurred while sending data through socket %s : %d", temp_addr, errno);
WARN("UDS: Error occurred while sending data through socket %s : %d (%s)", temp_addr, errno, strerror(errno));
return scclSystemError;
}
if(handle->abortFlag && *handle->abortFlag) {
......@@ -859,7 +859,7 @@ scclResult_t scclIpcSocket::scclIpcSocketSendDataAndRank(const void* data, size_
WARN("UDS: Timeout occurred while waiting to send data to socket %s", temp_addr);
} else {
WARN("UDS: Error occurred while polling socket %s for writability : %d", temp_addr, errno);
WARN("UDS: Error occurred while polling socket %s for writability : %d (%s)", temp_addr, errno, strerror(errno));
}
return scclSystemError;
}
......
......@@ -79,6 +79,9 @@ private:
// 初始化IPC套接字
scclResult_t scclIpcSocketInit(volatile uint32_t* abortFlag);
scclResult_t getScclIpcSocknameStr(int rank, uint64_t hash, char* out_str, int* out_len);
// TODO: 当前数据发送和接收采用的时unix域的UDP协议,非连接方式,将来根据需要改为TCP连接方式
// 通过Unix域套接字发送/接收数据到指定目标,不加锁执行
scclResult_t scclIpcSocketSendDataBasic(const void* data, size_t dataLen, int dst_rank);
scclResult_t scclIpcSocketRecvDataBasic(void* buffer, size_t bufferLen, size_t* receivedLen);
......
......@@ -120,7 +120,7 @@ public:
class Bootstrap {
public:
Bootstrap(const BootstrapHandle_t*, int rank, int nRanks);
~Bootstrap();
virtual ~Bootstrap();
// 初始化bootstrap通信环境
scclResult_t init(BootstrapComm_t* bootstrap_comm);
......
......@@ -444,13 +444,22 @@ void scclTopoNode::printNodeInfo(const char* prefix, bool printNeighbor) {
//////////////////////////////////////////////////////////////////////////////////////////////////////
// TODO: 当前根据千卡节点的rccl的xml进行特殊处理,不确定是否具有普适性
/**
* @brief 根据给定的PCI路径生成拓扑节点
* @brief 根据给定的PCI路径生成拓扑节点,核心代码
*
* 该函数通过解析给定的PCI路径,生成相应的拓扑节点,并将它们添加到节点向量中。
* 1.检查路径下存在NUMA节点,首先创建NUMA节点。
* 2.根据rccl的xml进行特殊处理:
* - 对于GPU设备,整条path修改其numaId为映射关系的值,不修改hipDev值
* 3.遍历PCI路径的各个部分,为每个部分创建相应的拓扑节点,并根据需要添加节点间的连接关系。
* 创建拓扑节点时,需要注意以下几点:
* 1.对于每个拓扑节点,需要其id包括:
* - interRank(节点间的编号)
* - hipDev(设备编号)
* - devValue(设备编码)
* - terminalType(连接终端的类型)
* - numaId(NUMA节点编号)
* 2.对于PCI路径中的每个部分,需要注意:
* - 对于GPU设备,整条path修改其numaId为映射关系的值,不修改hipDev值。GPU设备本身的numaId值不修改。
* - 对于NIC设备,整条path的pci type的node,其hipDev为-1(8bit表示为255),不修改numaId值。
* 3.建立初步连接过程中:
* - 检查路径下存在NUMA节点,首先创建NUMA节点。
* - 检查id相同且type相同,则合并节点,否则创建新节点。
*
* @param pciPath PCI设备路径
* @param interRank 跨节点的排名
......@@ -464,19 +473,26 @@ scclResult_t generate_topo_nodes(const char* pciPath, int interRank, int hipDev,
return scclInternalError;
}
// TODO: 根据rccl的xml文件修改的代码,后续查看有没有更好的方法
int terminalNumaId = -1;
{
char numaIdStr[numaIdStrLen];
scclTopoGetStrFromSys(pciPath, "numa_node", numaIdStr);
terminalNumaId = static_cast<int>(strtoul(numaIdStr, nullptr, 10));
}
// TODO: 根据rccl的xml文件修改的代码,后续查看有没有更好的方法
int adjustedNumaId = -1;
int terminalType = getDeviceTypeFromPciPath(pciPath);
{ // 如果node的type类型为GPU,则整条path修改numaId
char numaIdStr[numaIdStrLen];
getNumaIdStr(pciPath, hipDev, terminalType, numaIdStr);
terminalNumaId = static_cast<int>(strtoul(numaIdStr, nullptr, 10));
adjustedNumaId = static_cast<int>(strtoul(numaIdStr, nullptr, 10));
}
// 创建numa图点
scclTopoNode_t numaNode(terminalNumaId, interRank);
scclTopoNode_t numaNode(adjustedNumaId, interRank);
if(numaNode.type <= 0) {
WARN("Cannot find correct numa node:%d from pciPath:%s", pciPath, terminalNumaId);
WARN("Cannot find correct numa node:%d from pciPath:%s", pciPath, adjustedNumaId);
return scclInternalError;
}
......@@ -490,7 +506,7 @@ scclResult_t generate_topo_nodes(const char* pciPath, int interRank, int hipDev,
numaIt = nodes[nodes.size() - 1]; // 更新numaIt指针
} else {
// 处理容量不足的情况
WARN("Cannot add numa node:%d from pciPath:%s, nodes full!", pciPath, terminalNumaId);
WARN("Cannot add numa node:%d from pciPath:%s, nodes full!", pciPath, adjustedNumaId);
return scclInternalError;
}
}
......@@ -503,9 +519,23 @@ scclResult_t generate_topo_nodes(const char* pciPath, int interRank, int hipDev,
while(std::getline(gpuPathStream, segment, '/')) {
if(!segment.empty()) {
std::string currentPath = parentPath + segment + "/";
#if 0
printf("currentPath:%s, interRank:%d, terminalType:%d, hipDev:%d, adjustedNumaId:%d, terminalNumaId:%d\n",
currentPath.c_str(),
interRank,
terminalType,
hipDev,
adjustedNumaId,
terminalNumaId);
#endif
//// 1.创建拓扑图点
scclTopoNode_t node(currentPath.c_str(), interRank, terminalType, hipDev, terminalNumaId);
// 仅当到达终端node时,终端设备的numaId为其pci路径下读取的值
int nodeNumaId = adjustedNumaId; // 新创建一个变量,不影响后面的给parent和当前node添加neighbor部分
if(terminalType == GPU && currentPath.length() > strlen(pciPath) && strncmp(currentPath.c_str(), pciPath, strlen(pciPath)) == 0) {
nodeNumaId = terminalNumaId;
}
scclTopoNode_t node(currentPath.c_str(), interRank, terminalType, hipDev, nodeNumaId);
// 检查node是否是有效的,即不是空的
if(node.id == 0) {
parentPath = currentPath;
continue;
......@@ -530,7 +560,7 @@ scclResult_t generate_topo_nodes(const char* pciPath, int interRank, int hipDev,
//// 3.给parent和当前node添加neighbor
// 如果存在device文件,则尝试添加连接关系
if(isPciDevice(std::string(parentPath))) {
uint64_t parentId = getIdFromPciPath(parentPath.c_str(), interRank, terminalType, hipDev, terminalNumaId);
uint64_t parentId = getIdFromPciPath(parentPath.c_str(), interRank, terminalType, hipDev, adjustedNumaId);
// 如果parentId和node.id不相等,则添加双向边。否则不添加边
if(parentId != node.id) {
auto parentIt = findNodeById(parentId, nodes);
......@@ -561,6 +591,8 @@ scclResult_t generate_topo_nodes(const char* pciPath, int interRank, int hipDev,
* 它访问 /sys/devices/system/node/nodeX 目录下的cpumap文件,其中X是NUMA节点ID。
* affinity,例如 "00000000,00000000,ffff0000,00000000"
*
* 当函数返回时,affinityStr的生命周期结束,但它所包含的数据被复制到了一个新的std::string对象中。栈上分配的,会自动释放
*
* @param numaId NUMA节点的ID,用于构建系统文件路径
* @return std::string 返回表示CPU映射的字符串,格式为系统提供的cpumap内容
*
......@@ -576,7 +608,7 @@ std::string generate_topo_node_numa_info(int numaId) {
return std::string(affinityStr);
}
// 函数输出id分解后的所有数据。与函数getIdFromPciPathWithNuma相对应
// 函数输出id分解后的所有数据。与函数 getIdFromPciPathWithNuma 相对应
void getIdComponents(uint64_t idToDecompose, int* interRank, int* deviceValue, int* terminalType, int* hipDev, int* numaId) {
if(interRank) {
*interRank = static_cast<int>((idToDecompose >> 40) & 0xFFFFFF); // 提取interRank
......
#include <iostream>
#include "base.h"
#include "graph.h"
#include "paths.h"
......@@ -7,7 +8,7 @@ namespace hardware {
namespace topology {
namespace graph {
Graph::Graph(int rank, int nRanks) {
Graph::Graph(int rank, int nRanks) : rank(rank), nRanks(nRanks) {
// 构造函数的实现
}
......@@ -15,12 +16,26 @@ Graph::~Graph() {
// 析构函数的实现
}
scclResult_t Graph::calculateCommunicationPaths(const BootstrapComm_t* bootstrap_comm) {
scclResult_t Graph::calculateCommunicationPaths(const BootstrapComm_t* bootstrap_comm, scclTopoGraph_t* topo_graph, Bootstrap* sccl_bootstrap) {
// 通信路径计算的实现
std::cout << "Calculating communication paths..." << std::endl;
// 具体的实现细节
// 调用pathFinder类,实现硬件路径搜索
auto path_finder = PathFinder(bootstrap_comm);
path_finder.findGpuPaths();
printf("calculateCommunicationPaths pos 1\n");
// 将搜索结果写入topo_graph中,并记录有效node
SCCLCHECK(path_finder.computeTopoGpuP2pMap(topo_graph));
printf("calculateCommunicationPaths pos 2\n");
// 调用bootstrap类,将transport_map进行allgather统计
uint8_t* local_transport_map = topo_graph->getTransportMapRowStart(rank);
SCCLCHECK(sccl_bootstrap->bootstrapAllGather(local_transport_map, topo_graph->transport_map.data(), nRanks * sizeof(uint8_t)));
printf("calculateCommunicationPaths pos 3\n");
// 打印transport_map
if(bootstrap_comm->rank == 0) {
SCCLCHECK(topo_graph->printTransportMap());
}
return scclSuccess;
}
......
......@@ -15,7 +15,7 @@ public:
virtual ~Graph();
// 通信路径计算
scclResult_t calculateCommunicationPaths(const BootstrapComm_t* bootstrap_comm);
scclResult_t calculateCommunicationPaths(const BootstrapComm_t* bootstrap_comm, scclTopoGraph_t* topo_graph, Bootstrap* sccl_bootstrap);
// 逻辑拓扑构建
scclResult_t buildLogicalTopology();
......@@ -26,6 +26,8 @@ public:
private:
std::vector<std::vector<int>> adjacencyMatrix; // 使用邻接矩阵表示图
// 你可以根据需要添加更多的私有成员变量和函数
int rank, nRanks;
};
} // namespace graph
......
#include <string.h>
#include "graph_utils.h"
namespace sccl {
namespace hardware {
namespace topology {
namespace graph {
///
} // namespace graph
} // namespace topology
} // namespace hardware
} // namespace sccl
#pragma once
#include <string.h>
#include "base.h"
#include "bootstrap.h"
#include "graph_utils.h"
namespace sccl {
namespace hardware {
......@@ -11,6 +11,78 @@ namespace graph {
typedef bootstrap::physical_links::scclTopoNode_t scclTopoNode_t;
typedef bootstrap::scclNodeInfo_t scclNodeInfo_t;
typedef bootstrap::BootstrapComm_t BootstrapComm_t;
typedef topology::bootstrap::Bootstrap Bootstrap;
// 定义 topoPathType_t 枚举类型,用于表示不同的路径类型。
typedef enum topoPathType {
PATH_LOC = 0, // 本地路径
PATH_NVL = 1, // 通过 NVLink 连接
PATH_NVB = 2, // 通过中间 GPU 使用 NVLink 连接
PATH_PIX = 3, // 通过最多一个 PCIe 桥连接
PATH_PXB = 4, // 通过多个 PCIe 桥连接(不经过 PCIe 主桥)
PATH_PXN = 5, // GPU 和 NIC 之间通过中间 GPU 连接
PATH_PHB = 6, // 通过 PCIe 以及 PCIe 主桥连接
PATH_SYS = 7, // 通过 PCIe 以及 NUMA 节点之间的 SMP 互连连接
PATH_NET = 8, // 通过网络连接
PATH_DIS = 9 // 断开连接
} topoPathType_t;
// GPU 连接其他GPU硬件的直连类型
typedef enum LinkType : uint8_t {
LINK_NONE = 0, // 本地路径
LINK_LOC = 1, // 本地路径
LINK_NVL = 2, // 通过 NVLink 连接
LINK_PIX = 3, // 通过 PCIe 桥连接
LINK_PXN = 4, // GPU 和 GPU 之间通过中间 NIC 连接,包括 PCIe 主桥
LINK_NET = 5 // 通过网络连接
} LinkType_t;
typedef struct scclTopoGraph {
scclTopoGraph() = delete; // 删除默认构造函数
scclTopoGraph(int nRanks) : nRanks(nRanks), transport_map(nullptr, 0) {
// 分配transport_map的内存
uint8_t* raw_transport_map = static_cast<uint8_t*>(calloc(nRanks * nRanks, sizeof(uint8_t)));
if(raw_transport_map == nullptr) {
// 处理内存分配失败的情况
throw std::bad_alloc();
}
// 使用ByteSpanArray初始化transport_map
transport_map = ByteSpanArray<uint8_t>(raw_transport_map, nRanks * nRanks);
}
virtual ~scclTopoGraph() {
// 释放transport_map的内存
free(transport_map.data());
}
uint8_t* getTransportMapRowStart(int row) { return transport_map[row * nRanks]; }
uint8_t* getTransportMapData(int row, int col) { return transport_map[row * nRanks + col]; }
// 打印transport_map
scclResult_t printTransportMap() {
for(int i = 0; i < this->nRanks; ++i) {
for(int j = 0; j < this->nRanks; ++j) {
uint8_t* value = this->getTransportMapData(i, j);
if(value != nullptr) {
printf("%d ", *value);
} else {
printf("nullptr ");
}
}
printf("\n");
}
return scclSuccess;
}
public:
// 使用无序映射存储图的有效节点
std::unordered_map<uint64_t, scclTopoNode_t> graph_nodes;
// 使用无序映射存储从每个GPU节点到其他GPU节点的所有路径,[start_node_id][end_node_id] = {path1, path2}
std::unordered_map<uint64_t, std::unordered_map<uint64_t, std::vector<std::vector<uint64_t>>>> gpu_paths;
// 传输位图
ByteSpanArray<uint8_t> transport_map; // 使用ByteSpanArray存储transport_map
int nRanks; // 记录GPU节点的数量
} scclTopoGraph_t;
} // namespace graph
} // namespace topology
......
......@@ -21,7 +21,7 @@ PathFinder::PathFinder(const BootstrapComm_t* bootstrap_comm)
// 检查node是否有效
if(node->type > CPU) {
// 将有效的node添加到graph_nodes中,并保存其neighbor的id
graph_nodes_[node->id] = std::vector<uint64_t>(node->neighbors.begin(), node->neighbors.begin() + node->neighborCount);
graph_node_neighbors_[node->id] = std::vector<uint64_t>(node->neighbors.begin(), node->neighbors.begin() + node->neighborCount);
// 构建id到index的映射
id_to_index_[node->id] = i;
}
......@@ -74,32 +74,134 @@ PathFinder::PathFinder(const BootstrapComm_t* bootstrap_comm)
}
}
#endif
// 查找当前rank对应的其他GPU节点的所有路径
printf("PathFinder pos 1\n");
findGpuPaths();
printf("PathFinder pos 2\n");
}
/**
* @brief 计算拓扑图中GPU节点之间的点对点映射
*
* 该函数用于计算拓扑图中GPU节点之间的点对点映射。它遍历`gpu_paths_`中的所有路径,
* 对于每一条路径,它将路径中的每个节点添加到`topo_graph`的`graph_nodes`中。然后,它根据路径中途径的节点点确定连接方式的类型,
* 并将连接方式的类型存储在`topo_graph`的`transport_map`中。最后,它将路径添加到`topo_graph`的`gpu_paths`中。
*
* @param topo_graph 指向拓扑图的指针
* @return scclResult_t 计算结果
*/
scclResult_t PathFinder::computeTopoGpuP2pMap(scclTopoGraph_t* topo_graph) {
// 遍历gpu_paths_中的所有路径
for(const auto& start_node_pair : gpu_paths_) {
uint64_t start_node_id = start_node_pair.first;
const auto& paths = start_node_pair.second;
// 遍历从start_node_id到其他GPU节点的所有路径
for(const auto& path : paths) {
#if 0
printf("paths len=%zu, path=%zu\n", paths.size(), path.size());
#endif
if(path.size() == 0)
continue;
// 遍历路径中的每个节点,将其添加到graph_nodes中
for(uint64_t node_id : path) {
// 查找node_container_中对应的节点
const scclTopoNode_t* node = findNodeById(node_id);
if(node != nullptr) {
// 检查节点是否已经存在于graph_nodes中
auto it = topo_graph->graph_nodes.find(node_id);
if(it == topo_graph->graph_nodes.end()) {
// 如果节点不存在于graph_nodes中,则将其拷贝到graph_nodes
topo_graph->graph_nodes[node_id] = *node;
}
}
}
// 将路径添加到topo_graph中的gpu_paths
uint64_t end_node_id = path.back(); // 获取路径的最后一个节点的ID
// 记录bitmap
LinkType_t link_type;
int start_gpu_rank, end_gpu_rank;
{
// 根据路径中途径的节点点确定连接方式的类型
SCCLCHECK(determineLinkType(path, &link_type));
int start_interRank, start_hipDev;
int end_interRank, end_hipDev;
bootstrap::physical_links::getIdComponents(start_node_id, &start_interRank, nullptr, nullptr, &start_hipDev, nullptr);
bootstrap::physical_links::getIdComponents(end_node_id, &end_interRank, nullptr, nullptr, &end_hipDev, nullptr);
start_gpu_rank = start_interRank * nLocalRanks + start_hipDev;
end_gpu_rank = end_interRank * nLocalRanks + end_hipDev;
#if 0
printf("rank=%d, interRank=%d, localRank=%d: start_interRank=%d, start_hipDev=%d, end_interRank=%d, end_hipDev=%d, link_type=%d\n",
rank,
interRank,
localRank,
start_interRank,
start_hipDev,
end_interRank,
end_hipDev,
static_cast<int>(link_type));
#endif
}
// 将连接方式的类型存储在transport_map中
if(*(topo_graph->getTransportMapData(start_gpu_rank, end_gpu_rank)) > 0 && link_type > 0) {
if(link_type < static_cast<LinkType_t>(*(topo_graph->getTransportMapData(start_gpu_rank, end_gpu_rank)))) {
*(topo_graph->getTransportMapData(start_gpu_rank, end_gpu_rank)) = link_type;
// 清空之前的路径
topo_graph->gpu_paths[start_node_id][end_node_id].clear();
// 添加新的路径
topo_graph->gpu_paths[start_node_id][end_node_id].push_back(path);
} else if(link_type == static_cast<LinkType_t>(*(topo_graph->getTransportMapData(start_gpu_rank, end_gpu_rank)))) {
// 添加新的路径
topo_graph->gpu_paths[start_node_id][end_node_id].push_back(path);
}
} else {
*(topo_graph->getTransportMapData(start_gpu_rank, end_gpu_rank)) = static_cast<uint8_t>(link_type);
// 添加新的路径
topo_graph->gpu_paths[start_node_id][end_node_id].push_back(path);
}
}
}
return scclSuccess;
}
scclResult_t PathFinder::findGpuPaths() {
// 查找所有type为GPU的节点,并执行BFS搜索
/////////////////////////////////////////////////////////////////////////////////////////////
/**
* @brief 查找当前rank对应的其他GPU节点的所有路径
*
* 该函数用于查找当前rank对应的GPU节点的所有路径。它遍历`id_to_index_`中的所有节点ID和索引对,
* 对于每一个节点,如果该节点是GPU类型,并且属于当前rank的进程,则调用`bfsFindGpuPaths`函数执行广度优先搜索(BFS),
* 查找到其他所有GPU节点的路径。最后,如果当前rank为1,则调用`printGpuPaths`函数打印所有GPU路径。
*/
void PathFinder::findGpuPaths() {
// 查找当前rank对应的GPU的node,并执行BFS搜索,查找到其他所有GPU node的路径
for(const auto& pair : id_to_index_) {
uint64_t id = pair.first;
size_t index = pair.second;
// 定位到node
scclTopoNode_t* node = node_container_[index];
int hipDev;
bootstrap::physical_links::getIdComponents(node->id, nullptr, nullptr, nullptr, &hipDev, nullptr);
if(node->type == GPU && hipDev < nLocalRanks) {
printf("bfsFindGpuPaths start_node_id=%lu, running\n", node->id);
int nodeInterRank, nodeHipDev;
bootstrap::physical_links::getIdComponents(node->id, &nodeInterRank, nullptr, nullptr, &nodeHipDev, nullptr);
if(node->type == GPU && nodeInterRank == this->interRank && nodeHipDev == this->localRank) {
// printf("bfsFindGpuPaths start_node_id=%lu, running\n", node->id);
bfsFindGpuPaths(node->id);
}
}
#if 1
if(rank == 1) {
printGpuPaths();
}
return scclSuccess;
#endif
}
/////////////////////////////////////////////////////////////////////////////////////////////
/**
* @brief 根据节点ID查找节点
*
......@@ -117,32 +219,7 @@ const scclTopoNode_t* PathFinder::findNodeById(uint64_t id) const {
return nullptr; // 如果未找到具有指定id的节点,则返回nullptr
}
// 为std::vector<uint64_t>类型定义一个相等比较函数
struct VectorEqual {
bool operator()(const std::vector<uint64_t>& lhs, const std::vector<uint64_t>& rhs) const {
if(lhs.size() != rhs.size()) {
return false;
}
for(size_t i = 0; i < lhs.size(); ++i) {
if(lhs[i] != rhs[i]) {
return false;
}
}
return true;
}
};
// 为std::vector<uint64_t>类型定义一个哈希函数
struct VectorHash {
size_t operator()(const std::vector<uint64_t>& vec) const {
size_t seed = vec.size();
for(const auto& i : vec) {
seed ^= i + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}
return seed;
}
};
// TODO: 当nRanks特别大时,可以考虑采用kernel实现
/**
* @brief 使用广度优先搜索(BFS)查找从起始GPU节点到其他GPU节点的所有路径
*
......@@ -155,6 +232,78 @@ struct VectorHash {
* @param start_node_id 起始GPU节点的ID
*/
#if 1
void PathFinder::bfsFindGpuPaths(uint64_t start_node_id) {
// 使用一个队列来存储当前路径
std::queue<std::vector<uint64_t>> queue;
// 使用一个unordered_map来存储每个node的所有最短路径
std::unordered_map<uint64_t, std::vector<std::vector<uint64_t>>> shortest_paths;
// 将起始节点加入队列
queue.push({start_node_id});
shortest_paths[start_node_id] = {{start_node_id}};
// 当队列不为空时,继续搜索
while(!queue.empty()) {
// 从队列中取出一个路径
auto path = queue.front();
queue.pop();
// 获取当前路径的最后一个节点的ID
uint64_t nodeId = path.back();
// 根据节点ID查找对应的节点
const scclTopoNode_t* current_node = findNodeById(nodeId);
if(current_node == nullptr) {
continue;
}
// 如果当前节点是GPU节点且不是起始节点,则将当前路径加入结果
if(current_node->type == GPU && nodeId != start_node_id) {
int hipDev;
bootstrap::physical_links::getIdComponents(current_node->id, nullptr, nullptr, nullptr, &hipDev, nullptr);
// 仅当节点内的device id小于等于nLocalRanks时,才是有效GPU,才将路径加入结果
if(hipDev < nLocalRanks) {
gpu_paths_[start_node_id].push_back(path);
}
} else {
int nodeInterRank;
bootstrap::physical_links::getIdComponents(nodeId, &nodeInterRank);
// 遍历当前节点的所有邻居节点
for(uint64_t neighbor_id : graph_node_neighbors_.at(nodeId)) {
if(findNodeById(neighbor_id) == nullptr) {
continue;
}
// 获取邻居节点的interRank
int neighbor_inter_rank;
bootstrap::physical_links::getIdComponents(neighbor_id, &neighbor_inter_rank);
// 检查邻居节点是否已在当前路径中访问过
bool visited = std::find(path.begin(), path.end(), neighbor_id) != path.end();
// 检查interRank是否已经存在(仅当interRank改变时)
bool inter_rank_exists = neighbor_inter_rank != nodeInterRank && std::find(path.begin(), path.end(), neighbor_id) != path.end();
// 如果邻居节点未访问过且interRank未存在,则扩展路径
if(!visited && !inter_rank_exists) {
std::vector<uint64_t> new_path = path;
new_path.push_back(neighbor_id);
// 如果新路径比已有的最短路径更短,或者长度相同但尚未记录,则更新最短路径
auto& paths = shortest_paths[neighbor_id];
if(paths.empty() || paths.front().size() > new_path.size() ||
(paths.front().size() == new_path.size() && std::find(paths.begin(), paths.end(), new_path) == paths.end())) {
if(paths.empty() || paths.front().size() > new_path.size()) {
paths = {new_path};
} else {
paths.push_back(new_path);
}
queue.push(new_path);
}
}
}
}
}
}
#else
void PathFinder::bfsFindGpuPaths(uint64_t start_node_id) {
// 使用一个队列来存储当前路径
std::queue<std::vector<uint64_t>> queue;
......@@ -187,7 +336,7 @@ void PathFinder::bfsFindGpuPaths(uint64_t start_node_id) {
int nodeInterRank;
bootstrap::physical_links::getIdComponents(nodeId, &nodeInterRank);
// 遍历当前节点的所有邻居节点
for(uint64_t neighbor_id : graph_nodes_.at(nodeId)) {
for(uint64_t neighbor_id : graph_node_neighbors_.at(nodeId)) {
if(findNodeById(neighbor_id) == nullptr) {
continue;
}
......@@ -220,7 +369,7 @@ void PathFinder::bfsFindGpuPaths(uint64_t start_node_id) {
}
}
}
#else
void PathFinder::bfsFindGpuPaths(uint64_t start_node_id) {
// 使用一个队列来存储当前路径
std::queue<std::vector<uint64_t>> queue;
......@@ -253,7 +402,7 @@ void PathFinder::bfsFindGpuPaths(uint64_t start_node_id) {
bootstrap::physical_links::getIdComponents(nodeId, &nodeInterRank);
// 遍历当前节点的所有邻居节点
for(uint64_t neighbor_id : graph_nodes_.at(nodeId)) {
for(uint64_t neighbor_id : graph_node_neighbors_.at(nodeId)) {
if(findNodeById(nodeId) == nullptr) {
continue;
}
......@@ -362,6 +511,49 @@ void PathFinder::printGpuPaths() {
}
}
scclResult_t PathFinder::determineLinkType(const std::vector<uint64_t>& path, LinkType_t* link_type) {
if(path.size() == 1) {
*link_type = LINK_LOC;
}
bool has_gpu = false, has_pix = false, has_nic = false, has_cpu = false;
// 遍历路径中的每个节点,从第2个点开始
for(int i = 1; i < path.size(); i++) {
uint64_t node_id = path[i];
// 查找node_container_中对应的节点
const scclTopoNode_t* node = findNodeById(node_id);
if(node == nullptr) {
WARN("cannot find node from id: %lu", node_id);
return scclInternalError;
}
// 根据节点的类型确定连接方式的类型
switch(node->type) {
case GPU: has_gpu = true; break;
case PCI: has_pix = true; break;
case NIC: has_nic = true; break;
case CPU: has_cpu = true; break;
default: break;
}
}
// 根据路径中节点的类型确定连接方式的类型
if(has_cpu) {
*link_type = LINK_NET;
} else if(has_nic) {
*link_type = LINK_PXN;
} else if(has_pix) {
*link_type = LINK_PIX;
} else if(has_gpu) {
*link_type = LINK_NVL;
} else {
*link_type = LINK_NONE; // 默认返回0
}
return scclSuccess;
}
} // namespace graph
} // namespace topology
} // namespace hardware
......
......@@ -18,26 +18,33 @@ public:
// 构造函数
PathFinder(const BootstrapComm_t* bootstrap_comm);
// 打印函数
scclResult_t findGpuPaths();
// 计算拓扑图中GPU节点之间的点对点映射
scclResult_t computeTopoGpuP2pMap(scclTopoGraph_t* graph);
// 打印函数
void printGpuPaths();
private:
ByteSpanArray<scclTopoNode_t> node_container_; // 使用NodeContainer来存储nodes数据
// 获取所有GPU到GPU的路径函数
void findGpuPaths();
std::unordered_map<uint64_t, std::vector<uint64_t>> graph_nodes_; // 使用无序映射存储图的节点和它们的邻居
std::unordered_map<uint64_t, std::vector<std::vector<uint64_t>>> gpu_paths_; // 使用无序映射存储从每个GPU节点到其他GPU节点的所有路径
// 存储node.id到nodes_span索引的映射
std::unordered_map<uint64_t, size_t> id_to_index_;
// 使用广度优先搜索(BFS)查找从起始GPU节点到其他GPU节点的所有路径
// 使用广度优先搜索(BFS)查找从起始GPU节点到其他GPU节点的最短路径
void bfsFindGpuPaths(uint64_t start_node_id);
// 根据node.id查找节点的函数
const scclTopoNode_t* findNodeById(uint64_t id) const;
// 根据path中node确定link的类型
scclResult_t determineLinkType(const std::vector<uint64_t>& path, LinkType_t* link_type);
private:
ByteSpanArray<scclTopoNode_t> node_container_; // 使用NodeContainer来存储nodes数据
std::unordered_map<uint64_t, std::vector<uint64_t>> graph_node_neighbors_; // 使用无序映射存储图的节点和它们的邻居
std::unordered_map<uint64_t, std::vector<std::vector<uint64_t>>> gpu_paths_; // 使用无序映射存储从每个GPU节点到其他GPU节点的所有路径
// 存储node.id到nodes_span索引的映射
std::unordered_map<uint64_t, size_t> id_to_index_;
int rank = -1; // 当前节点的全局排名
int nRanks = 0; // 总的节点数量
int localRank = -1; // 当前节点在本地计算节点中的排名
......
......@@ -24,20 +24,6 @@ typedef enum : int {
NET = 6 // 主要是RDMA网卡
} nodeType_t;
// 定义 topoPathType_t 枚举类型,用于表示不同的路径类型。
enum topoPathType {
PATH_LOC = 0, // 本地路径
PATH_NVL = 1, // 通过 NVLink 连接
PATH_NVB = 2, // 通过中间 GPU 使用 NVLink 连接
PATH_PIX = 3, // 通过最多一个 PCIe 桥连接
PATH_PXB = 4, // 通过多个 PCIe 桥连接(不经过 PCIe 主桥)
PATH_PXN = 5, // GPU 和 NIC 之间通过中间 GPU 连接
PATH_PHB = 6, // 通过 PCIe 以及 PCIe 主桥连接
PATH_SYS = 7, // 通过 PCIe 以及 NUMA 节点之间的 SMP 互连连接
PATH_NET = 8, // 通过网络连接
PATH_DIS = 9 // 断开连接
};
////////////////////////////////////////////////////////////////////////////////////////////////
// // 定义拓扑节点的结构体
......
......@@ -70,6 +70,9 @@ public:
// 提供一个size()函数,返回当前已经写入的数据的数量
size_t size() const { return size_; }
// 提供一个函数来返回数据块的起始地址
T* data() const { return data_; }
// 提供一个访问指定索引处元素的函数,返回T*类型的数据,或者在索引超出范围时返回空指针nullptr
T* operator[](size_t index) {
if(index < size_) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment