#include #include #include #include #include #include #include #include #include #include #include // for std::unique_ptr #include "bootstrap.h" namespace sccl { namespace hardware { namespace topology { namespace bootstrap { //////////////////////////////////////////////////////////////////////////////////////////////////////// pthread_mutex_t initLock = PTHREAD_MUTEX_INITIALIZER; // 线程锁 static bool initialized = false; // 标志是否已经初始化 bool hsaFineGrainFlag = true; // 标志变量,用于指示是否启用HSAP细粒度标志 static scclResult_t basicInit() { // 如果已经初始化,直接返回成功 if(asm_ops::ld_acquire_sys_global(&initialized)) return scclSuccess; // 加锁以确保初始化过程的线程安全 pthread_mutex_lock(&initLock); // 如果尚未初始化,进行初始化操作 if(!initialized) { initEnv(); // 初始化环境 // 始终初始化引导网络 SCCLCHECK(bootstrapNet::bootstrapNetInit()); // initGdrCopy(); // 初始化GDR复制 // SCCLCHECK(scclNetPluginInit()); #if 0 char strValue[1024]; // 检查NUMA自动平衡是否启用 SCCLCHECK(scclTopoGetStrFromSys("/proc/sys/kernel", "numa_balancing", strValue)); if(strcmp(strValue, "1") == 0) WARN("NUMA自动平衡已启用,这可能导致RCCL性能的不稳定性!通过\"sudo sysctl kernel.numa_balancing=0\"禁用"); // 获取内核版本信息 SCCLCHECK(scclTopoGetStrFromSys("/proc", "version", strValue)); char *verStr, *state; verStr = strtok_r(strValue, " ", &state); for(int i = 0; i < 2; i++) { verStr = strtok_r(NULL, " ", &state); if(verStr == NULL) break; } INFO(SCCL_LOG_BOOTSTRAP, "内核版本: %s", verStr); // 检查是否为Cray系统 if(strstr(verStr, "cray") == NULL) { // 获取BIOS版本信息 SCCLCHECK(scclTopoGetStrFromSys("/sys/devices/virtual/dmi/id", "bios_version", strValue)); if(strncmp("Hyper-V UEFI Release", strValue, 20) != 0) { FILE* file; // 读取内核命令行参数 if((file = fopen("/proc/cmdline", "r")) != NULL) { if(feof(file) == 0 && ferror(file) == 0) { int len = fread(strValue, 1, 1024, file); strValue[len] = '\0'; } fclose(file); } // 检查是否缺少"iommu=pt"参数 if(strstr(strValue, "iommu=pt") == NULL) WARN("内核命令行中缺少\"iommu=pt\"参数,这可能导致系统不稳定或挂起!"); } float* ptr; // 尝试分配细粒度PCIe内存 hipError_t err = hipExtMallocWithFlags((void**)&ptr, 128, hipDeviceMallocFinegrained); if(err != hipSuccess) hsaFineGrainFlag = false; } #endif // 设置初始化标志 asm_ops::st_release_sys_global(&initialized, true); } // 解锁 pthread_mutex_unlock(&initLock); return scclSuccess; } scclResult_t bootstrapGetUniqueId(BootstrapHandle_t* handle) { SCCLCHECK(basicInit()); // 在每个进程中设置 handle 的值 getRandomData(&handle->magic, sizeof(handle->magic)); const char* env = getenv("SCCL_COMM_ID"); if(env) { memset(&handle->magic, 0, sizeof(handle->magic)); INFO(SCCL_LOG_BOOTSTRAP, "SCCL_COMM_ID set by environment to %s", env); if(scclSocketGetAddrFromString(&handle->addr, env) != scclSuccess) { WARN("Invalid SCCL_COMM_ID, please use format: : or []: or :"); return scclInvalidArgument; } } else { // 初始化socket scclSocketAddress_t localSocketAddr = bootstrapNet::getLocalSocketAddr(); memcpy(&handle->addr, &localSocketAddr, sizeof(scclSocketAddress_t)); // 启动根节点listen监听 SCCLCHECK(bootstrapCreateRoot(handle)); } return scclSuccess; } static scclResult_t setFilesLimit() { struct rlimit filesLimit; SYSCHECK(getrlimit(RLIMIT_NOFILE, &filesLimit), "getrlimit"); filesLimit.rlim_cur = filesLimit.rlim_max; SYSCHECK(setrlimit(RLIMIT_NOFILE, &filesLimit), "setrlimit"); return scclSuccess; } /** * @brief 根节点引导程序,负责收集所有rank的地址信息并广播给其他rank * 由于同一个socket数据传输比较慢,所以在进行数据广播时,仅传送给localRank==0的rank,再由其进行节点内广播 * 该函数所有数据传输与 Bootstrap::bootstrapRootGatherAndBroadcast 函数相配合 * * @param rargs 包含监听套接字和验证魔数的参数结构体 * @return void* 总是返回NULL * * 该函数执行以下主要操作: * 1. 初始化资源并设置文件描述符限制 * 2. 循环接收所有rank的连接请求,收集地址信息 * 3. 验证接收到的rank信息一致性 * 4. 计算本地rank数量(nLocalRanks) * 5. 使用线程池并行发送nLocalRanks值给所有rank * 6. 将收集到的所有rank地址信息广播给每个节点的localRank=0的进程 * 7. 清理资源并返回 * * @note 函数使用线程池加速消息分发,并通过日志记录关键操作步骤 */ static void* bootstrapRoot(void* rargs) { bootstrapRootArgs_t* args = (bootstrapRootArgs_t*)rargs; scclSocket_t* listenSock = args->listenSock; // 用于监听的套接字 uint64_t magic = args->magic; // 用于验证的魔数 scclResult_t res = scclSuccess; // 函数结果 class ThreadPool* pthread_pool = nullptr; // 用于根节点分发消息的线程池 int nRanks = 0; // nRanks: 进程总数; int nLocalRanks = 1; int c = 0; // c: 已连接的进程计数 uint64_t rootHostHash = 0; BootstrapNodeBasic_t node_basic = {}; // 用于存储扩展信息的结构体 BootstrapNodeBasic_t* all_rank_node_basic = nullptr; // 所有进程的地址 // 定义一个函数或者一个函数对象,用于执行实际的发送数据操作。在后面执行 auto send_task = [](BootstrapNodeBasic& node_basic, uint64_t magic, int rank, void* data, size_t size) { net::net_socket::scclSocketClientManager client_manager(&node_basic.sock.addr, magic, net::net_socket::scclSocketTypeBootstrap); bootstrapNet::bootstrapNetSend(client_manager.getSocket(), data, size); }; // 用于验证的工具进行初始化 scclSocketAddress_t* zero = nullptr; // 用于初始化或比较的零地址 setFilesLimit(); // 设置文件描述符限制 SCCLCHECKGOTO(scclCalloc(&zero, 1), res, out); // 为zero分配内存 INFO(SCCL_LOG_BOOTSTRAP, "BEGIN"); // 日志:开始 // --------------------- 1.从所有rank接收其socket地址(BootstrapNodeBasic) --------------------- // do { net::net_socket::scclSocketAcceptManager accept_manager(listenSock); SCCLCHECKGOTO(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), &node_basic, sizeof(node_basic)), res, out); // 接收数据 if(c == 0) { nRanks = node_basic.nRanks; SCCLCHECKGOTO(scclCalloc(&all_rank_node_basic, nRanks), res, out); // 为rankAddresses分配内存 pthread_pool = new ThreadPool(nRanks); } else if(nRanks != node_basic.nRanks) { // 如果接收到的进程总数不匹配 WARN("Bootstrap Root : mismatch in rank count from procs %d : %d", nRanks, node_basic.nRanks); // 警告 goto out; // 跳转到out标签 } if(memcmp(zero, &all_rank_node_basic[node_basic.rank].sock.addr, sizeof(scclSocketAddress_t)) != 0) { // 如果rank已经签到 WARN("Bootstrap Root : rank %d of %d ranks has already checked in", node_basic.rank, nRanks); // 警告 goto out; // 跳转到out标签 } // 保存该rank的连接句柄 memcpy(all_rank_node_basic + node_basic.rank, &node_basic, sizeof(BootstrapNodeBasic_t)); ++c; // 增加已连接的进程计数 INFO(SCCL_LOG_BOOTSTRAP, "Received connect from rank %d total %d/%d", node_basic.rank, c, nRanks); // 日志 } while(c < nRanks); // 当已连接的进程数小于总数时循环 INFO(SCCL_LOG_BOOTSTRAP, "COLLECTED ALL %d HANDLES", nRanks); // 日志:收集到所有句柄 // --------------------- 2.计算nLocalRanks,并广播给其他所有rank --------------------- // // 首先计算nLocalRanks大小,即具有相同hostHash的节点数量 rootHostHash = all_rank_node_basic[0].hostHash; for(int i = 1; i < nRanks; ++i) { if(rootHostHash == all_rank_node_basic[i].hostHash) { nLocalRanks++; // 如果hostHash相同,则增加本地节点计数 } else { break; // 一旦发现不同的hostHash,停止计数 } } // 给每个节点发送localRank的值 for(int r = 0; r < nRanks; ++r) { auto dst_node_basic = all_rank_node_basic[r]; // 使用std::bind将参数绑定到send_task函数 auto bound_task = std::bind(send_task, dst_node_basic, magic, r, &nLocalRanks, sizeof(int)); // 将绑定后的任务添加到线程池 pthread_pool->enqueue(bound_task); } // 等待所有任务完成 while(!pthread_pool->allTasksCompleted()) { usleep(100); // 每1毫秒检查一次任务完成状态 } // --------------------- 3.给所有localRank==0的rank发送all_rank_node_basic数据 --------------------- // // 给每个节点的localRank=0的进程发送信息,并由其进行广播,从而加快速度 for(int r = 0; r < nRanks / nLocalRanks; ++r) { int dst_rank = r * nLocalRanks; // 计算目标rank auto dst_node_basic = all_rank_node_basic[dst_rank]; net::net_socket::scclSocketClientManager client_manager(&dst_node_basic.sock.addr, magic, net::net_socket::scclSocketTypeBootstrap); bootstrapNet::bootstrapNetSend(client_manager.getSocket(), all_rank_node_basic, sizeof(BootstrapNodeBasic_t) * nRanks); } // 等待所有任务完成 while(!pthread_pool->allTasksCompleted()) { usleep(100); // 每1毫秒检查一次任务完成状态 } INFO(SCCL_LOG_BOOTSTRAP, "bootstrap send out all %d handles", nRanks); // 日志:发送出所有句柄 out: // 关闭套接字,并释放内存 if(listenSock) { scclSocketClose(listenSock); delete listenSock; } // 释放内存 if(all_rank_node_basic) free(all_rank_node_basic); if(zero) free(zero); if(pthread_pool) delete pthread_pool; free(rargs); // 释放rargs内存 INFO(SCCL_LOG_BOOTSTRAP, "DONE"); // 日志:完成 return NULL; } /** * 创建并启动bootstrap根节点 * * 该函数负责初始化监听socket,创建并启动一个独立的线程来处理bootstrap根节点逻辑。 * 线程会被设置为detach状态,无需等待其结束。 * * @param handle 包含bootstrap配置信息的句柄 * @return 成功返回scclSuccess,失败返回相应的错误码 */ scclResult_t bootstrapCreateRoot(BootstrapHandle_t* handle) { bootstrapRootArgs_t* args; pthread_t thread; // 设置根节点socket监听 net::net_socket::scclSocketServerManager root_manager(&handle->addr, handle->magic, net::net_socket::scclSocketTypeBootstrap); // 为args分配内存 SCCLCHECK(scclCalloc(&args, 1)); // 设置线程参数 args->listenSock = root_manager.releaseSocket(); args->magic = handle->magic; // 创建线程以执行bootstrapRoot函数, 直到线程结束才释放listenSock NEQCHECK(pthread_create(&thread, NULL, bootstrapRoot, (void*)args), 0); // 设置线程名称 scclSetThreadName(thread, "SCCL BootstrapR"); // 分离线程,使其在完成后自动回收资源 NEQCHECK(pthread_detach(thread), 0); // will not be pthread_join()'d return scclSuccess; } ////////////////////////////// 结构体定义 ////////////////////////////// // scclRankPhysSet构造函数定义 scclRankPhysSet::scclRankPhysSet(int nRanks) { rank_info_vec.reserve(nRanks); // 预留空间 rank_info_vec.clear(); } void BootstrapComm::init(int rank, int nRanks, int localRank, int nLocalRanks) { printf("BootstrapComm 构造函数, rank=%d\n", rank); this->rank = rank; this->nRanks = nRanks; this->localRank = localRank; this->nLocalRanks = nLocalRanks; this->interRank = rank / nLocalRanks; this->nInterRanks = nRanks / nLocalRanks; rank_phys_set = new scclRankPhysSet(nRanks); // 假设需要动态分配 }; void BootstrapComm::destroy() { printf("BootstrapComm 析构函数, rank=%d\n", rank); if(rank_phys_set) { delete rank_phys_set; // 释放动态分配的内存 } } //////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////// // 构造函数 Bootstrap::Bootstrap(const BootstrapHandle_t* handle, int rank, int nRanks) : root_handle(handle), rank(rank), nRanks(nRanks), localRank(-1), nLocalRanks(0), socketInitDone(false) { printf("Bootstrap 构造函数\n"); scclCalloc(&all_node_basic, nRanks); } Bootstrap::~Bootstrap() { printf("Bootstrap 析构函数\n"); if(ipcsocket) { delete ipcsocket; } if(all_node_basic) free(all_node_basic); } scclResult_t Bootstrap::init(BootstrapComm_t* bootstrap_comm) { // 如果已经初始化,直接返回成功 if(asm_ops::ld_acquire_sys_global(&socketInitDone)) return scclSuccess; // 加锁以确保初始化过程的线程安全 pthread_mutex_lock(&bootstrapMutex); // -------------------------- 1.获取自身基础信息 ----------------------------------- // SCCLCHECK(basicInit()); // 设置基础信息 BootstrapNodeBasic_t node_basic = {}; // -------------------------- 2.设置0号rank搜集的CPU信息和localRank信息 ----------------------------------- // // 创建根节点的数据收集 SCCLCHECK(bootstrapRootGatherAndBroadcast(&node_basic)); // -------------------------- 3.设置本地localRank的BootstrapComm信息 ----------------------------------- // // 初始化BootstrapComm类 bootstrap_comm->init(rank, nRanks, localRank, nLocalRanks); if(CPU_COUNT(&bootstrap_comm->cpuAffinity)) { sched_setaffinity(0, sizeof(cpu_set_t), &bootstrap_comm->cpuAffinity); } bootstrap_comm->magic = root_handle->magic; //////// 设置显卡状态 //////// bootstrap_comm->hipDev = localRank; // CUDA 设备 ID uint32_t devices_num; SCCLCHECK(rocm_smi_init()); // 初始化ROCM SMI库 SCCLCHECK(rocm_smi_getNumDevice(&devices_num)); // 获取设备数量 LTCHECK(devices_num, 0); // 检查设备数量是否 devices_num>0 LTCHECK(devices_num, nLocalRanks); // 检查设备数量是否 devices_num>nLocalRanks bootstrap_comm->deviceCnt = static_cast(devices_num); // 将设备数量转换为int并赋值给的deviceCnt LECHECK(devices_num, bootstrap_comm->hipDev); // 检查hipDev是否小于deviceCnt HIPCHECK(hipSetDevice(bootstrap_comm->hipDev)); // 设置当前设备为hipDev //////// 设置启动通信的scclNet //////// // 获取环境变量SCCL_NET_NAME的值,如果不存在则默认使用"IB" const char* envNetName = getenv("SCCL_NET_NAME"); // char* netName = (envNetName != NULL) ? strdup(envNetName) : strdup("IB"); char* netName = strdup("IB"); // 初始化网络和引导网络 SCCLCHECK(net::scclNetInit(netName, bootstrap_comm->scclNet)); // 释放分配的网络名称字符串 free(netName); //////// 初始化唯一信息结构体 //////// scclRankInfo_t local_rank_info; local_rank_info.hostHash = node_basic.hostHash; SCCLCHECK(bootstrapCommInitNodeInfo(bootstrap_comm->scclNet, &local_rank_info)); memcpy(&(local_rank_info.cpu.listen_sock), &(node_basic.sock), sizeof(scclSocket_t)); #if 1 printf("devices_num=%d, local_rank_info.net.count=%d\n", bootstrap_comm->deviceCnt, local_rank_info.net.count); #endif // 将每个节点的`rank_info`信息收集到`rank_phys_set`中,以便后续使用 SCCLCHECK(bootstrapAllGather(&local_rank_info, bootstrap_comm->rank_phys_set->rank_info_vec.data(), sizeof(scclRankInfo_t))); // 设置初始化标志 asm_ops::st_release_sys_global(&socketInitDone, true); // 解锁 pthread_mutex_unlock(&bootstrapMutex); return scclSuccess; } /////////////////////////////////////////////////////////////////////////// /** * @brief 执行根节点的聚集和广播操作 * * 该函数负责在bootstrap过程中完成以下操作: * 1. 各rank首先设置监听 * 2. 向根节点发送基础数据 * 3. 从根节点接收nLocalRanks值 * 4. 当localRank为0时,从根节点接收所有rank的IP数据 * 5. 将IP数据广播给节点内其他rank * * @param send_data_basic 指向要发送的基础数据的指针 * @return scclResult_t 返回操作结果,成功返回scclSuccess */ scclResult_t Bootstrap::bootstrapRootGatherAndBroadcast(BootstrapNodeBasic_t* send_data_basic) { // 总的需要广播的数据 scclSocketAddress_t root_addr = root_handle->addr; scclSocketAddress_t localSocketAddr = bootstrapNet::getLocalSocketAddr(); // ------------- 1.各个rank在发送给根节点数据之前,首先设置监听listen ------------- // net::net_socket::scclSocketServerManager local_server_manager(&localSocketAddr, root_handle->magic, net::net_socket::scclSocketTypeBootstrap); // 设置基础数据 send_data_basic->rank = rank; send_data_basic->nRanks = nRanks; send_data_basic->hostHash = getHostHash(); scclSocket_t* local_server_sock = local_server_manager.releaseSocket(); send_data_basic->sock = *local_server_sock; // ------------- 2.各个节点向根节点发送数据 ------------- // { net::net_socket::scclSocketClientManager client_manager(&root_addr, root_handle->magic, net::net_socket::scclSocketTypeBootstrap); SCCLCHECK(bootstrapNet::bootstrapNetSend(client_manager.getSocket(), send_data_basic, sizeof(BootstrapNodeBasic_t))); } // ------------- 3.从根节点接收nLocalRanks值 ------------- // // 接收nLocalRanks信息 { net::net_socket::scclSocketAcceptManager accept_manager(local_server_sock); SCCLCHECK(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), &nLocalRanks, sizeof(int))); } // 要求必须 nRanks%nLocalRanks == 0 NEQCHECK(nRanks % nLocalRanks, 0); // ------------- 4.nLocalRanks==0时,从根节点接收所有rank的ip数据 ------------- // this->localRank = rank % nLocalRanks; this->interRank = rank / nLocalRanks; this->nInterRanks = nRanks / nLocalRanks; int all_node_basic_size = nRanks * sizeof(BootstrapNodeBasic_t); // 从根节点接收数据,对应到函数 bootstrapRoot if(localRank == 0) { net::net_socket::scclSocketAcceptManager accept_manager(local_server_sock); SCCLCHECK(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), all_node_basic, all_node_basic_size)); } printf("all_node_basic_size=%d\n", all_node_basic_size); // ------------- 5.nLocalRanks==0时,将所有rank的ip数据广播给节点内其他rank ------------- // ipcsocket = new scclIpcSocket_t(localRank, nLocalRanks, /*hash*/ root_handle->magic); ipcsocket->scclIpcSocketBroadcast(all_node_basic, all_node_basic_size, /*localRank root*/ 0); return scclSuccess; } /** * @brief 初始化节点通信信息 * * 该函数用于初始化节点的通信信息,包括基础信息和硬件信息。 * * @param scclNet 网络设备句柄 * @param rank_info 节点信息结构体指针 * @return scclResult_t 返回操作结果,成功返回scclSuccess * * @note 基础信息包括: * - rank: 当前节点的全局排名 * - localRank: 本地计算节点中的排名 * - pidHash: 进程ID哈希值 * * @note 硬件信息包括: * - GPU信息: 设备号、名称、GCN架构、计算能力、PCI总线ID * - RDMA信息: 网卡数量、名称、PCI路径、GUID、指针支持类型、端口速度、端口号、延迟、最大通信数和接收数 * * @todo 更多硬件信息可参考ncclTopoGetXmlFromSys函数实现 */ scclResult_t Bootstrap::bootstrapCommInitNodeInfo(scclNet_t* scclNet, scclRankInfo_t* rank_info) { ////////////////// 设置基础信息 ////////////////// rank_info->rank = rank; // 当前节点的全局排名 rank_info->localRank = localRank; // 当前节点在本地计算节点中的排名 rank_info->pidHash = getPidHash(); // 获取进程ID哈希值并赋值给的pidHash int hipDev = localRank; ////////////////// 设置硬件信息 ////////////////// //// 1.设置GPU信息 rank_info->gpu.dev = hipDev; hipDeviceProp_t deviceProp; HIPCHECK(hipGetDeviceProperties(&deviceProp, hipDev)); snprintf(rank_info->gpu.name, sizeof(rank_info->gpu.name), "%s", deviceProp.name); snprintf(rank_info->gpu.gcn, sizeof(rank_info->gpu.gcn), "%s", deviceProp.gcnArchName); rank_info->gpu.compCap = deviceProp.major * 10 + deviceProp.minor; // 设置GPU的busId SCCLCHECK(getBusId(hipDev, &rank_info->gpu.pciBusId)); // 根据GPU的busId设置pci路径 char busIdStr[] = "00000000:00:00.0"; char* gpuPath = NULL; SCCLCHECK(int64ToBusId(rank_info->gpu.pciBusId, busIdStr)); SCCLCHECK(getPciPath(busIdStr, &gpuPath)); snprintf(rank_info->gpu.pciPath, sizeof(rank_info->gpu.pciPath), "%s", gpuPath); // 设备在/sys中的路径。 //// 2.设置RDMA信息 net::scclNetProperties_t props; SCCLCHECK(scclNet->getProperties(hipDev, &props)); SCCLCHECK(scclNet->devices(&rank_info->net.count)); // 节点内网卡数量 snprintf(rank_info->net.name, sizeof(rank_info->net.name), "%s", props.name); // 主要用于日志记录。 snprintf(rank_info->net.pciPath, sizeof(rank_info->net.pciPath), "%s", props.pciPath); // PCI设备在/sys中的路径。 #if 0 printf("rank_info->net.pciPath len=%zu\n", strlen(rank_info->net.pciPath)); #endif // TODO: 更多硬件信息参考ncclTopoGetXmlFromSys函数写法,可以通过 "/sys/class/drm/card1/device" 等路径读取 rank_info->net.guid = props.guid; // NIC芯片的唯一标识符。对于具有多个PCI功能(物理或虚拟)的卡非常重要。 rank_info->net.ptrSupport = props.ptrSupport; // [SCCL_PTR_HOST|SCCL_PTR_CUDA|SCCL_PTR_DMABUF] rank_info->net.speed = props.speed; // 端口速度,单位为Mbps。 rank_info->net.port = props.port; // 端口号。 rank_info->net.latency = props.latency; // 网络延迟 rank_info->net.maxComms = props.maxComms; // 可以创建的最大通信数量 rank_info->net.maxRecvs = props.maxRecvs; // 最大分组接收数量。 return scclSuccess; } // TODO: 后续可以采用优化,先节点内allgather,再节点间的allgather,最后节点内的Broadcast。优化的算法并保证正确性 /** * @brief 实现跨节点的AllGather通信操作 * * 该函数实现了一个跨节点的AllGather通信操作,包括节点内通信和节点间通信。 * 在节点内通信中,使用IPC套接字进行AllGather操作;在节点间通信中,使用Ring AllGather算法进行数据传输。 * 最后,通过节点内通信的Broadcast操作,将收集到的数据分发给所有节点内的进程。 * * @param src_data 源数据指针,表示每个节点要发送的数据 * @param dst_data 目标数据指针,表示所有节点收集到的数据将存储在此处 * @param data_size 每个节点要发送的数据大小(以字节为单位) * @return scclResult_t 返回操作结果状态码: * - scclSuccess: 操作成功 * - 其他错误码: 表示操作失败 * * @note 该函数假设所有节点的本地秩(localRank)和节点间秩(interRank)已经正确设置。 * 此外,该函数还假设所有节点的基本信息(如套接字地址)已经通过其他途径正确获取并存储在all_node_basic向量中。 * 在节点间通信中,使用了Ring AllGather算法,该算法在nRanks特别大的时候可能不是最优的选择,可以考虑进一步优化算法以减少通信次数。 */ scclResult_t Bootstrap::bootstrapAllGather(const void* src_data, void* dst_data, int data_size) const { // 数据准备 size_t inter_data_len = nLocalRanks * data_size; // 节点间传输时每个子块的大小 auto all_recv_data = reinterpret_cast(dst_data); //// 1.节点内通信 allgather auto local_recv_data = all_recv_data + this->interRank * inter_data_len; ipcsocket->scclIpcSocketAllgather(src_data, (void*)local_recv_data, data_size); if(nInterRanks <= 1) { return scclSuccess; } //// 2.节点间通信,ring allgather // TODO: 后续nRanks特别大的时候,可以进一步优化算法,减少通信次数。 // 因为节点内信息是已经 allgather了的,所以节点间的传输可以不仅仅依赖localRank == 0,每个localRank都可以用上,即分组 if(localRank == 0) { int next_interRank = (this->interRank + 1 + this->nInterRanks) % this->nInterRanks * this->nLocalRanks; //// 对于prev,当前rank是客户端;对于next,当前rank是服务器端 // 客户端:用于发送数据 scclSocket_t next_rank_sock = all_node_basic[next_interRank].sock; net::net_socket::scclSocketClientManager client_manager(&next_rank_sock.addr, root_handle->magic, net::net_socket::scclSocketTypeBootstrap); // 服务器端:用于接收数据 scclSocket_t self_rank_sock = all_node_basic[rank].sock; net::net_socket::scclSocketAcceptManager accept_manager(&self_rank_sock); /////////////////// 实现数据传输 /////////////////// for(int r = 0; r < nInterRanks - 1; r++) { int prev_rank = (this->interRank - r - 1 + nInterRanks) % nInterRanks; int next_rank = (this->interRank - r + nInterRanks) % nInterRanks; // 准备发送/接收的数据 auto send_data = all_recv_data + next_rank * inter_data_len; auto recv_data = all_recv_data + prev_rank * inter_data_len; // 发送/接收数据 SCCLCHECK(bootstrapNet::bootstrapNetSend(client_manager.getSocket(), send_data, inter_data_len)); SCCLCHECK(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), recv_data, inter_data_len)); } } //// 3.节点内通信 broadcast ipcsocket->scclIpcSocketBroadcast(all_recv_data, nRanks * data_size, 0); return scclSuccess; } /////////////////////////////////////////////////////////////////////////////////////////////////////// // 函数:打印 scclRankInfo 结构体的信息 scclResult_t printRankInfo(const std::string& prefix, scclRankInfo_t* info) { char addrline[net::SOCKET_NAME_MAXLEN + 1]; // if(info->localRank == 0) { if(1) { // 将GPU的pciBusId转换为字符串格式 char busIdhip[16]; SCCLCHECK(int64ToBusId(info->gpu.pciBusId, busIdhip)); printf("==========================================\n" "%s, Total Rank: %d, Local Rank: %d, Host Hash: %lu, PID Hash: %lu\n" "gpu: dev=%d, gpu.name=%s, gcn=%s, compCap=%d\n" "gpu: busId=%s, gpuPath=%s\n" "net: count=%d, device name=%s, pciPath=%s, guid=%lu, ptrSupport=%u, speed=%d, port=%d, latency=%f, maxComms=%d, maxRecvs=%d\n" "cpu: socketAddr=%s\npci: busId=%ld" "\n==========================================\n", prefix.c_str(), info->rank, info->localRank, info->hostHash, info->pidHash, info->gpu.dev, info->gpu.name, info->gpu.gcn, info->gpu.compCap, busIdhip, info->gpu.pciPath, info->net.count, info->net.name, info->net.pciPath, info->net.guid, static_cast(info->net.ptrSupport), info->net.speed, info->net.port, info->net.latency, info->net.maxComms, info->net.maxRecvs, net::net_socket::scclSocketToString(&info->cpu.listen_sock.addr, addrline), info->gpu.pciBusId); } return scclSuccess; } } // namespace bootstrap } // namespace topology } // namespace hardware } // namespace sccl