bootstrap.cpp 29.4 KB
Newer Older
1
2
#include <unistd.h>
#include <sys/types.h>
3
#include <cstring>
4
5
6
7
8
9
10
#include <sys/resource.h>
#include <iostream>
#include <iomanip>
#include <sstream>
#include <chrono>
#include <ctime>
#include <cstdint>
11
#include <memory> // for std::unique_ptr
12
13
14
15
16
17
18
19

#include "bootstrap.h"

namespace sccl {
namespace hardware {
namespace topology {
namespace bootstrap {

20
21
22
23
////////////////////////////////////////////////////////////////////////////////////////////////////////
pthread_mutex_t initLock = PTHREAD_MUTEX_INITIALIZER; // 线程锁
static bool initialized  = false;                     // 标志是否已经初始化
bool hsaFineGrainFlag    = true;                      // 标志变量,用于指示是否启用HSAP细粒度标志
24

25
static scclResult_t basicInit() {
26
27
28
29
30
31
32
33
    // 如果已经初始化,直接返回成功
    if(asm_ops::ld_acquire_sys_global(&initialized))
        return scclSuccess;

    // 加锁以确保初始化过程的线程安全
    pthread_mutex_lock(&initLock);
    // 如果尚未初始化,进行初始化操作
    if(!initialized) {
34
35
36
37
38
        initEnv(); // 初始化环境
        // 始终初始化引导网络
        SCCLCHECK(bootstrapNet::bootstrapNetInit());
        // initGdrCopy(); // 初始化GDR复制
        // SCCLCHECK(scclNetPluginInit());
39
#if 0
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
        char strValue[1024];
        // 检查NUMA自动平衡是否启用
        SCCLCHECK(scclTopoGetStrFromSys("/proc/sys/kernel", "numa_balancing", strValue));
        if(strcmp(strValue, "1") == 0)
            WARN("NUMA自动平衡已启用,这可能导致RCCL性能的不稳定性!通过\"sudo sysctl kernel.numa_balancing=0\"禁用");
        // 获取内核版本信息
        SCCLCHECK(scclTopoGetStrFromSys("/proc", "version", strValue));
        char *verStr, *state;
        verStr = strtok_r(strValue, " ", &state);
        for(int i = 0; i < 2; i++) {
            verStr = strtok_r(NULL, " ", &state);
            if(verStr == NULL)
                break;
        }
        INFO(SCCL_LOG_BOOTSTRAP, "内核版本: %s", verStr);
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
        // 检查是否为Cray系统
        if(strstr(verStr, "cray") == NULL) {
            // 获取BIOS版本信息
            SCCLCHECK(scclTopoGetStrFromSys("/sys/devices/virtual/dmi/id", "bios_version", strValue));
            if(strncmp("Hyper-V UEFI Release", strValue, 20) != 0) {
                FILE* file;
                // 读取内核命令行参数
                if((file = fopen("/proc/cmdline", "r")) != NULL) {
                    if(feof(file) == 0 && ferror(file) == 0) {
                        int len       = fread(strValue, 1, 1024, file);
                        strValue[len] = '\0';
                    }
                    fclose(file);
                }
                // 检查是否缺少"iommu=pt"参数
                if(strstr(strValue, "iommu=pt") == NULL)
                    WARN("内核命令行中缺少\"iommu=pt\"参数,这可能导致系统不稳定或挂起!");
            }
73

74
75
76
77
78
79
            float* ptr;
            // 尝试分配细粒度PCIe内存
            hipError_t err = hipExtMallocWithFlags((void**)&ptr, 128, hipDeviceMallocFinegrained);
            if(err != hipSuccess)
                hsaFineGrainFlag = false;
        }
80
#endif
81

82
83
84
85
86
87
88
89
        // 设置初始化标志
        asm_ops::st_release_sys_global(&initialized, true);
    }
    // 解锁
    pthread_mutex_unlock(&initLock);
    return scclSuccess;
}

90
scclResult_t bootstrapGetUniqueId(BootstrapHandle_t* handle) {
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
    SCCLCHECK(basicInit());
    // 在每个进程中设置 handle 的值
    getRandomData(&handle->magic, sizeof(handle->magic));

    const char* env = getenv("SCCL_COMM_ID");
    if(env) {
        memset(&handle->magic, 0, sizeof(handle->magic));
        INFO(SCCL_LOG_BOOTSTRAP, "SCCL_COMM_ID set by environment to %s", env);
        if(scclSocketGetAddrFromString(&handle->addr, env) != scclSuccess) {
            WARN("Invalid SCCL_COMM_ID, please use format: <ipv4>:<port> or [<ipv6>]:<port> or <hostname>:<port>");
            return scclInvalidArgument;
        }
    } else {
        // 初始化socket
        scclSocketAddress_t localSocketAddr = bootstrapNet::getLocalSocketAddr();
        memcpy(&handle->addr, &localSocketAddr, sizeof(scclSocketAddress_t));
        // 启动根节点listen监听
        SCCLCHECK(bootstrapCreateRoot(handle));
    }
    return scclSuccess;
}

static scclResult_t setFilesLimit() {
    struct rlimit filesLimit;
    SYSCHECK(getrlimit(RLIMIT_NOFILE, &filesLimit), "getrlimit");
    filesLimit.rlim_cur = filesLimit.rlim_max;
    SYSCHECK(setrlimit(RLIMIT_NOFILE, &filesLimit), "setrlimit");
118
119
120
121
    return scclSuccess;
}

/**
122
123
124
125
126
127
 * @brief 根节点引导程序,负责收集所有rank的地址信息并广播给其他rank
 * 由于同一个socket数据传输比较慢,所以在进行数据广播时,仅传送给localRank==0的rank,再由其进行节点内广播
 * 该函数所有数据传输与 Bootstrap::bootstrapRootGatherAndBroadcast 函数相配合
 *
 * @param rargs 包含监听套接字和验证魔数的参数结构体
 * @return void* 总是返回NULL
128
 *
129
130
131
132
133
134
135
136
 * 该函数执行以下主要操作:
 * 1. 初始化资源并设置文件描述符限制
 * 2. 循环接收所有rank的连接请求,收集地址信息
 * 3. 验证接收到的rank信息一致性
 * 4. 计算本地rank数量(nLocalRanks)
 * 5. 使用线程池并行发送nLocalRanks值给所有rank
 * 6. 将收集到的所有rank地址信息广播给每个节点的localRank=0的进程
 * 7. 清理资源并返回
137
 *
138
 * @note 函数使用线程池加速消息分发,并通过日志记录关键操作步骤
139
 */
140
static void* bootstrapRoot(void* rargs) {
141
    bootstrapRootArgs_t* args      = (bootstrapRootArgs_t*)rargs;
142
143
144
145
146
    scclSocket_t* listenSock       = args->listenSock; // 用于监听的套接字
    uint64_t magic                 = args->magic;      // 用于验证的魔数
    scclResult_t res               = scclSuccess;      // 函数结果
    class ThreadPool* pthread_pool = nullptr;          // 用于根节点分发消息的线程池

147
148
149
150
151
152
    int nRanks                                = 0; // nRanks: 进程总数;
    int nLocalRanks                           = 1;
    int c                                     = 0; // c: 已连接的进程计数
    uint64_t rootHostHash                     = 0;
    BootstrapNodeBasic_t node_basic           = {};      // 用于存储扩展信息的结构体
    BootstrapNodeBasic_t* all_rank_node_basic = nullptr; // 所有进程的地址
153

154
    // 定义一个函数或者一个函数对象,用于执行实际的发送数据操作。在后面执行
155
    auto send_task = [](BootstrapNodeBasic& node_basic, uint64_t magic, int rank, void* data, size_t size) {
156
        net::net_socket::scclSocketClientManager client_manager(&node_basic.sock.addr, magic, net::net_socket::scclSocketTypeBootstrap);
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
        bootstrapNet::bootstrapNetSend(client_manager.getSocket(), data, size);
    };

    // 用于验证的工具进行初始化
    scclSocketAddress_t* zero = nullptr;           // 用于初始化或比较的零地址
    setFilesLimit();                               // 设置文件描述符限制
    SCCLCHECKGOTO(scclCalloc(&zero, 1), res, out); // 为zero分配内存

    INFO(SCCL_LOG_BOOTSTRAP, "BEGIN"); // 日志:开始
    // --------------------- 1.从所有rank接收其socket地址(BootstrapNodeBasic) --------------------- //
    do {
        net::net_socket::scclSocketAcceptManager accept_manager(listenSock);
        SCCLCHECKGOTO(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), &node_basic, sizeof(node_basic)), res, out); // 接收数据

        if(c == 0) {
            nRanks = node_basic.nRanks;
            SCCLCHECKGOTO(scclCalloc(&all_rank_node_basic, nRanks), res, out); // 为rankAddresses分配内存
            pthread_pool = new ThreadPool(nRanks);
        } else if(nRanks != node_basic.nRanks) {                                                           // 如果接收到的进程总数不匹配
            WARN("Bootstrap Root : mismatch in rank count from procs %d : %d", nRanks, node_basic.nRanks); // 警告
            goto out;                                                                                      // 跳转到out标签
        }
179
180
181
        if(memcmp(zero, &all_rank_node_basic[node_basic.rank].sock.addr, sizeof(scclSocketAddress_t)) != 0) { // 如果rank已经签到
            WARN("Bootstrap Root : rank %d of %d ranks has already checked in", node_basic.rank, nRanks);     // 警告
            goto out;                                                                                         // 跳转到out标签
182
        }
183

184
        // 保存该rank的连接句柄
185
        memcpy(all_rank_node_basic + node_basic.rank, &node_basic, sizeof(BootstrapNodeBasic_t));
186
187
188
189
        ++c;                                                                                               // 增加已连接的进程计数
        INFO(SCCL_LOG_BOOTSTRAP, "Received connect from rank %d total %d/%d", node_basic.rank, c, nRanks); // 日志
    } while(c < nRanks); // 当已连接的进程数小于总数时循环
    INFO(SCCL_LOG_BOOTSTRAP, "COLLECTED ALL %d HANDLES", nRanks); // 日志:收集到所有句柄
190

191
192
193
194
195
196
197
198
199
200
    // --------------------- 2.计算nLocalRanks,并广播给其他所有rank --------------------- //
    // 首先计算nLocalRanks大小,即具有相同hostHash的节点数量
    rootHostHash = all_rank_node_basic[0].hostHash;
    for(int i = 1; i < nRanks; ++i) {
        if(rootHostHash == all_rank_node_basic[i].hostHash) {
            nLocalRanks++; // 如果hostHash相同,则增加本地节点计数
        } else {
            break; // 一旦发现不同的hostHash,停止计数
        }
    }
201
    // 给每个节点发送localRank的值
202
203
204
205
206
207
208
209
210
    for(int r = 0; r < nRanks; ++r) {
        auto dst_node_basic = all_rank_node_basic[r];
        // 使用std::bind将参数绑定到send_task函数
        auto bound_task = std::bind(send_task, dst_node_basic, magic, r, &nLocalRanks, sizeof(int));
        // 将绑定后的任务添加到线程池
        pthread_pool->enqueue(bound_task);
    }
    // 等待所有任务完成
    while(!pthread_pool->allTasksCompleted()) {
211
        usleep(100); // 每1毫秒检查一次任务完成状态
212
213
214
215
216
217
218
    }

    // --------------------- 3.给所有localRank==0的rank发送all_rank_node_basic数据 --------------------- //
    // 给每个节点的localRank=0的进程发送信息,并由其进行广播,从而加快速度
    for(int r = 0; r < nRanks / nLocalRanks; ++r) {
        int dst_rank        = r * nLocalRanks; // 计算目标rank
        auto dst_node_basic = all_rank_node_basic[dst_rank];
219
        net::net_socket::scclSocketClientManager client_manager(&dst_node_basic.sock.addr, magic, net::net_socket::scclSocketTypeBootstrap);
220
        bootstrapNet::bootstrapNetSend(client_manager.getSocket(), all_rank_node_basic, sizeof(BootstrapNodeBasic_t) * nRanks);
221
222
223
    }
    // 等待所有任务完成
    while(!pthread_pool->allTasksCompleted()) {
224
        usleep(100); // 每1毫秒检查一次任务完成状态
225
    }
226

227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
    INFO(SCCL_LOG_BOOTSTRAP, "bootstrap send out all %d handles", nRanks); // 日志:发送出所有句柄
out:
    // 关闭套接字,并释放内存
    if(listenSock) {
        scclSocketClose(listenSock);
        delete listenSock;
    }
    // 释放内存
    if(all_rank_node_basic)
        free(all_rank_node_basic);
    if(zero)
        free(zero);
    if(pthread_pool)
        delete pthread_pool;
    free(rargs);                      // 释放rargs内存
    INFO(SCCL_LOG_BOOTSTRAP, "DONE"); // 日志:完成
243

244
    return NULL;
245
}
246

247
248
249
250
251
252
253
254
255
/**
 * 创建并启动bootstrap根节点
 *
 * 该函数负责初始化监听socket,创建并启动一个独立的线程来处理bootstrap根节点逻辑。
 * 线程会被设置为detach状态,无需等待其结束。
 *
 * @param handle 包含bootstrap配置信息的句柄
 * @return 成功返回scclSuccess,失败返回相应的错误码
 */
256
257
scclResult_t bootstrapCreateRoot(BootstrapHandle_t* handle) {
    bootstrapRootArgs_t* args;
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
    pthread_t thread;

    // 设置根节点socket监听
    net::net_socket::scclSocketServerManager root_manager(&handle->addr, handle->magic, net::net_socket::scclSocketTypeBootstrap);

    // 为args分配内存
    SCCLCHECK(scclCalloc(&args, 1));
    // 设置线程参数
    args->listenSock = root_manager.releaseSocket();
    args->magic      = handle->magic;

    // 创建线程以执行bootstrapRoot函数, 直到线程结束才释放listenSock
    NEQCHECK(pthread_create(&thread, NULL, bootstrapRoot, (void*)args), 0);
    // 设置线程名称
    scclSetThreadName(thread, "SCCL BootstrapR");
    // 分离线程,使其在完成后自动回收资源
    NEQCHECK(pthread_detach(thread), 0); // will not be pthread_join()'d
275

276
277
    return scclSuccess;
}
278

279
280
////////////////////////////// 结构体定义 //////////////////////////////
// scclRankPhysSet构造函数定义
281
scclRankPhysSet::scclRankPhysSet(int nRanks) {
282
283
284
285
286
287
288
289
290
291
292
293
    rank_info_vec.reserve(nRanks); // 预留空间
    rank_info_vec.clear();
}

void BootstrapComm::init(int rank, int nRanks, int localRank, int nLocalRanks) {
    printf("BootstrapComm 构造函数, rank=%d\n", rank);
    this->rank        = rank;
    this->nRanks      = nRanks;
    this->localRank   = localRank;
    this->nLocalRanks = nLocalRanks;
    this->interRank   = rank / nLocalRanks;
    this->nInterRanks = nRanks / nLocalRanks;
294
    rank_phys_set     = new scclRankPhysSet(nRanks); // 假设需要动态分配
295
296
297
298
299
300
301
302
303
};

void BootstrapComm::destroy() {
    printf("BootstrapComm 析构函数, rank=%d\n", rank);
    if(rank_phys_set) {
        delete rank_phys_set; // 释放动态分配的内存
    }
}

304
305
306
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
// 构造函数
307
Bootstrap::Bootstrap(const BootstrapHandle_t* handle, int rank, int nRanks)
308
    : root_handle(handle), rank(rank), nRanks(nRanks), localRank(-1), nLocalRanks(0), socketInitDone(false) {
309
    printf("Bootstrap 构造函数\n");
310
    scclCalloc(&all_node_basic, nRanks);
311
}
312

313
Bootstrap::~Bootstrap() {
314
    printf("Bootstrap 析构函数\n");
315
316
317
    if(ipcsocket) {
        delete ipcsocket;
    }
318
319
    if(all_node_basic)
        free(all_node_basic);
320
}
321

322
scclResult_t Bootstrap::init(BootstrapComm_t* bootstrap_comm) {
323
324
325
326
327
328
329
330
331
332
    // 如果已经初始化,直接返回成功
    if(asm_ops::ld_acquire_sys_global(&socketInitDone))
        return scclSuccess;

    // 加锁以确保初始化过程的线程安全
    pthread_mutex_lock(&bootstrapMutex);

    // -------------------------- 1.获取自身基础信息 ----------------------------------- //
    SCCLCHECK(basicInit());
    // 设置基础信息
333
    BootstrapNodeBasic_t node_basic = {};
334
335
336

    // -------------------------- 2.设置0号rank搜集的CPU信息和localRank信息 ----------------------------------- //
    // 创建根节点的数据收集
337
    SCCLCHECK(bootstrapRootGatherAndBroadcast(&node_basic));
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354

    // -------------------------- 3.设置本地localRank的BootstrapComm信息 ----------------------------------- //
    // 初始化BootstrapComm类
    bootstrap_comm->init(rank, nRanks, localRank, nLocalRanks);
    if(CPU_COUNT(&bootstrap_comm->cpuAffinity)) {
        sched_setaffinity(0, sizeof(cpu_set_t), &bootstrap_comm->cpuAffinity);
    }
    bootstrap_comm->magic = root_handle->magic;

    //////// 设置显卡状态 ////////
    bootstrap_comm->hipDev = localRank; // CUDA 设备 ID
    uint32_t devices_num;
    SCCLCHECK(rocm_smi_init());                                // 初始化ROCM SMI库
    SCCLCHECK(rocm_smi_getNumDevice(&devices_num));            // 获取设备数量
    LTCHECK(devices_num, 0);                                   // 检查设备数量是否 devices_num>0
    LTCHECK(devices_num, nLocalRanks);                         // 检查设备数量是否 devices_num>nLocalRanks
    bootstrap_comm->deviceCnt = static_cast<int>(devices_num); // 将设备数量转换为int并赋值给的deviceCnt
355
356
    LECHECK(devices_num, bootstrap_comm->hipDev);              // 检查hipDev是否小于deviceCnt
    HIPCHECK(hipSetDevice(bootstrap_comm->hipDev));            // 设置当前设备为hipDev
357
358
359
360

    //////// 设置启动通信的scclNet ////////
    // 获取环境变量SCCL_NET_NAME的值,如果不存在则默认使用"IB"
    const char* envNetName = getenv("SCCL_NET_NAME");
361
362
    // char* netName          = (envNetName != NULL) ? strdup(envNetName) : strdup("IB");
    char* netName = strdup("IB");
363
364
365
366
367
368
    // 初始化网络和引导网络
    SCCLCHECK(net::scclNetInit(netName, bootstrap_comm->scclNet));
    // 释放分配的网络名称字符串
    free(netName);

    //////// 初始化唯一信息结构体 ////////
369
370
371
372
373
374
375
    scclRankInfo_t local_rank_info;
    local_rank_info.hostHash = node_basic.hostHash;
    SCCLCHECK(bootstrapCommInitNodeInfo(bootstrap_comm->scclNet, &local_rank_info));
    memcpy(&(local_rank_info.cpu.listen_sock), &(node_basic.sock), sizeof(scclSocket_t));
#if 1
    printf("devices_num=%d, local_rank_info.net.count=%d\n", bootstrap_comm->deviceCnt, local_rank_info.net.count);
#endif
376

377
378
    // 将每个节点的`rank_info`信息收集到`rank_phys_set`中,以便后续使用
    SCCLCHECK(bootstrapAllGather(&local_rank_info, bootstrap_comm->rank_phys_set->rank_info_vec.data(), sizeof(scclRankInfo_t)));
379
380
381
382
383

    // 设置初始化标志
    asm_ops::st_release_sys_global(&socketInitDone, true);
    // 解锁
    pthread_mutex_unlock(&bootstrapMutex);
384
385
386
387

    return scclSuccess;
}

388
///////////////////////////////////////////////////////////////////////////
389
/**
390
 * @brief 执行根节点的聚集和广播操作
391
 *
392
393
394
395
396
397
 * 该函数负责在bootstrap过程中完成以下操作:
 * 1. 各rank首先设置监听
 * 2. 向根节点发送基础数据
 * 3. 从根节点接收nLocalRanks值
 * 4. 当localRank为0时,从根节点接收所有rank的IP数据
 * 5. 将IP数据广播给节点内其他rank
398
 *
399
 * @param send_data_basic 指向要发送的基础数据的指针
400
 * @return scclResult_t 返回操作结果,成功返回scclSuccess
401
 */
402
scclResult_t Bootstrap::bootstrapRootGatherAndBroadcast(BootstrapNodeBasic_t* send_data_basic) {
403
404
405
    // 总的需要广播的数据
    scclSocketAddress_t root_addr       = root_handle->addr;
    scclSocketAddress_t localSocketAddr = bootstrapNet::getLocalSocketAddr();
406
407

    // ------------- 1.各个rank在发送给根节点数据之前,首先设置监听listen ------------- //
408
    net::net_socket::scclSocketServerManager local_server_manager(&localSocketAddr, root_handle->magic, net::net_socket::scclSocketTypeBootstrap);
409

410
411
412
413
    // 设置基础数据
    send_data_basic->rank     = rank;
    send_data_basic->nRanks   = nRanks;
    send_data_basic->hostHash = getHostHash();
414

415
416
417
418
419
420
    scclSocket_t* local_server_sock = local_server_manager.releaseSocket();
    send_data_basic->sock           = *local_server_sock;

    // ------------- 2.各个节点向根节点发送数据 ------------- //
    {
        net::net_socket::scclSocketClientManager client_manager(&root_addr, root_handle->magic, net::net_socket::scclSocketTypeBootstrap);
421
        SCCLCHECK(bootstrapNet::bootstrapNetSend(client_manager.getSocket(), send_data_basic, sizeof(BootstrapNodeBasic_t)));
422
    }
423
424
425
    // ------------- 3.从根节点接收nLocalRanks值 ------------- //
    // 接收nLocalRanks信息
    {
426
        net::net_socket::scclSocketAcceptManager accept_manager(local_server_sock);
427
428
        SCCLCHECK(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), &nLocalRanks, sizeof(int)));
    }
429

430
431
432
    // 要求必须 nRanks%nLocalRanks == 0
    NEQCHECK(nRanks % nLocalRanks, 0);

433
    // ------------- 4.nLocalRanks==0时,从根节点接收所有rank的ip数据 ------------- //
434
435
436
437
    this->localRank   = rank % nLocalRanks;
    this->interRank   = rank / nLocalRanks;
    this->nInterRanks = nRanks / nLocalRanks;

438
439
    int all_node_basic_size = nRanks * sizeof(BootstrapNodeBasic_t);

440
    // 从根节点接收数据,对应到函数 bootstrapRoot
441
    if(localRank == 0) {
442
        net::net_socket::scclSocketAcceptManager accept_manager(local_server_sock);
443
        SCCLCHECK(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), all_node_basic, all_node_basic_size));
444
    }
445

446
    printf("all_node_basic_size=%d\n", all_node_basic_size);
447
448
    // ------------- 5.nLocalRanks==0时,将所有rank的ip数据广播给节点内其他rank ------------- //
    ipcsocket = new scclIpcSocket_t(localRank, nLocalRanks, /*hash*/ root_handle->magic);
449
    ipcsocket->scclIpcSocketBroadcast(all_node_basic, all_node_basic_size, /*localRank root*/ 0);
450

451
    return scclSuccess;
452
453
}

454
455
456
/**
 * @brief 初始化节点通信信息
 *
457
 * 该函数用于初始化节点的通信信息,包括基础信息和硬件信息。
458
 *
459
460
 * @param scclNet 网络设备句柄
 * @param rank_info 节点信息结构体指针
461
 * @return scclResult_t 返回操作结果,成功返回scclSuccess
462
463
464
465
466
467
468
469
470
471
472
 *
 * @note 基础信息包括:
 * - rank: 当前节点的全局排名
 * - localRank: 本地计算节点中的排名
 * - pidHash: 进程ID哈希值
 *
 * @note 硬件信息包括:
 * - GPU信息: 设备号、名称、GCN架构、计算能力、PCI总线ID
 * - RDMA信息: 网卡数量、名称、PCI路径、GUID、指针支持类型、端口速度、端口号、延迟、最大通信数和接收数
 *
 * @todo 更多硬件信息可参考ncclTopoGetXmlFromSys函数实现
473
 */
474
scclResult_t Bootstrap::bootstrapCommInitNodeInfo(scclNet_t* scclNet, scclRankInfo_t* rank_info) {
475
    ////////////////// 设置基础信息 //////////////////
476
477
478
    rank_info->rank      = rank;         // 当前节点的全局排名
    rank_info->localRank = localRank;    // 当前节点在本地计算节点中的排名
    rank_info->pidHash   = getPidHash(); // 获取进程ID哈希值并赋值给的pidHash
479
480
481
    int hipDev           = localRank;

    ////////////////// 设置硬件信息 //////////////////
482
483
    //// 1.设置GPU信息
    rank_info->gpu.dev = hipDev;
484
485
    hipDeviceProp_t deviceProp;
    HIPCHECK(hipGetDeviceProperties(&deviceProp, hipDev));
486
487
488
489
490
491
492
493
494
495
496
497
498
    snprintf(rank_info->gpu.name, sizeof(rank_info->gpu.name), "%s", deviceProp.name);
    snprintf(rank_info->gpu.gcn, sizeof(rank_info->gpu.gcn), "%s", deviceProp.gcnArchName);
    rank_info->gpu.compCap = deviceProp.major * 10 + deviceProp.minor;
    // 设置GPU的busId
    SCCLCHECK(getBusId(hipDev, &rank_info->gpu.pciBusId));
    // 根据GPU的busId设置pci路径
    char busIdStr[] = "00000000:00:00.0";
    char* gpuPath   = NULL;
    SCCLCHECK(int64ToBusId(rank_info->gpu.pciBusId, busIdStr));
    SCCLCHECK(getPciPath(busIdStr, &gpuPath));
    snprintf(rank_info->gpu.pciPath, sizeof(rank_info->gpu.pciPath), "%s", gpuPath); // 设备在/sys中的路径。

    //// 2.设置RDMA信息
499
500
    net::scclNetProperties_t props;
    SCCLCHECK(scclNet->getProperties(hipDev, &props));
501
502
503
    SCCLCHECK(scclNet->devices(&rank_info->net.count));                                    // 节点内网卡数量
    snprintf(rank_info->net.name, sizeof(rank_info->net.name), "%s", props.name);          // 主要用于日志记录。
    snprintf(rank_info->net.pciPath, sizeof(rank_info->net.pciPath), "%s", props.pciPath); // PCI设备在/sys中的路径。
504
#if 0
505
    printf("rank_info->net.pciPath len=%zu\n", strlen(rank_info->net.pciPath));
506
#endif
507

508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
    // TODO: 更多硬件信息参考ncclTopoGetXmlFromSys函数写法,可以通过 "/sys/class/drm/card1/device" 等路径读取
    rank_info->net.guid       = props.guid;       // NIC芯片的唯一标识符。对于具有多个PCI功能(物理或虚拟)的卡非常重要。
    rank_info->net.ptrSupport = props.ptrSupport; // [SCCL_PTR_HOST|SCCL_PTR_CUDA|SCCL_PTR_DMABUF]
    rank_info->net.speed      = props.speed;      // 端口速度,单位为Mbps。
    rank_info->net.port       = props.port;       // 端口号。
    rank_info->net.latency    = props.latency;    // 网络延迟
    rank_info->net.maxComms   = props.maxComms;   // 可以创建的最大通信数量
    rank_info->net.maxRecvs   = props.maxRecvs;   // 最大分组接收数量。

    return scclSuccess;
}

// TODO: 后续可以采用优化,先节点内allgather,再节点间的allgather,最后节点内的Broadcast。优化的算法并保证正确性
/**
 * @brief 实现跨节点的AllGather通信操作
 *
 * 该函数实现了一个跨节点的AllGather通信操作,包括节点内通信和节点间通信。
 * 在节点内通信中,使用IPC套接字进行AllGather操作;在节点间通信中,使用Ring AllGather算法进行数据传输。
 * 最后,通过节点内通信的Broadcast操作,将收集到的数据分发给所有节点内的进程。
 *
 * @param src_data 源数据指针,表示每个节点要发送的数据
 * @param dst_data 目标数据指针,表示所有节点收集到的数据将存储在此处
 * @param data_size 每个节点要发送的数据大小(以字节为单位)
 * @return scclResult_t 返回操作结果状态码:
 *     - scclSuccess: 操作成功
 *     - 其他错误码: 表示操作失败
 *
 * @note 该函数假设所有节点的本地秩(localRank)和节点间秩(interRank)已经正确设置。
 *    此外,该函数还假设所有节点的基本信息(如套接字地址)已经通过其他途径正确获取并存储在all_node_basic向量中。
 *    在节点间通信中,使用了Ring AllGather算法,该算法在nRanks特别大的时候可能不是最优的选择,可以考虑进一步优化算法以减少通信次数。
 */
539
scclResult_t Bootstrap::bootstrapAllGather(const void* src_data, void* dst_data, int data_size) const {
540
    // 数据准备
541
542
    size_t inter_data_len = nLocalRanks * data_size; // 节点间传输时每个子块的大小
    auto all_recv_data    = reinterpret_cast<char*>(dst_data);
543

544
    //// 1.节点内通信 allgather
545
    auto local_recv_data = all_recv_data + this->interRank * inter_data_len;
546
    ipcsocket->scclIpcSocketAllgather(src_data, (void*)local_recv_data, data_size);
547

548
549
550
551
552
553
554
    if(nInterRanks <= 1) {
        return scclSuccess;
    }

    //// 2.节点间通信,ring allgather
    // TODO: 后续nRanks特别大的时候,可以进一步优化算法,减少通信次数。
    // 因为节点内信息是已经 allgather了的,所以节点间的传输可以不仅仅依赖localRank == 0,每个localRank都可以用上,即分组
555
556
    if(localRank == 0) {
        int next_interRank = (this->interRank + 1 + this->nInterRanks) % this->nInterRanks * this->nLocalRanks;
557
        //// 对于prev,当前rank是客户端;对于next,当前rank是服务器端
558
        // 客户端:用于发送数据
559
        scclSocket_t next_rank_sock = all_node_basic[next_interRank].sock;
560
561
        net::net_socket::scclSocketClientManager client_manager(&next_rank_sock.addr, root_handle->magic, net::net_socket::scclSocketTypeBootstrap);
        // 服务器端:用于接收数据
562
        scclSocket_t self_rank_sock = all_node_basic[rank].sock;
563
564
565
566
567
568
569
570
571
572
573
574
575
576
        net::net_socket::scclSocketAcceptManager accept_manager(&self_rank_sock);
        /////////////////// 实现数据传输 ///////////////////
        for(int r = 0; r < nInterRanks - 1; r++) {
            int prev_rank = (this->interRank - r - 1 + nInterRanks) % nInterRanks;
            int next_rank = (this->interRank - r + nInterRanks) % nInterRanks;
            // 准备发送/接收的数据
            auto send_data = all_recv_data + next_rank * inter_data_len;
            auto recv_data = all_recv_data + prev_rank * inter_data_len;
            // 发送/接收数据
            SCCLCHECK(bootstrapNet::bootstrapNetSend(client_manager.getSocket(), send_data, inter_data_len));
            SCCLCHECK(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), recv_data, inter_data_len));
        }
    }

577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
    //// 3.节点内通信 broadcast
    ipcsocket->scclIpcSocketBroadcast(all_recv_data, nRanks * data_size, 0);

    return scclSuccess;
}

///////////////////////////////////////////////////////////////////////////////////////////////////////
// 函数:打印 scclRankInfo 结构体的信息
scclResult_t printRankInfo(const std::string& prefix, scclRankInfo_t* info) {
    char addrline[net::SOCKET_NAME_MAXLEN + 1];

    // if(info->localRank == 0) {
    if(1) {
        // 将GPU的pciBusId转换为字符串格式
        char busIdhip[16];
        SCCLCHECK(int64ToBusId(info->gpu.pciBusId, busIdhip));

        printf("==========================================\n"
               "%s, Total Rank: %d, Local Rank: %d, Host Hash: %lu, PID Hash: %lu\n"
               "gpu: dev=%d, gpu.name=%s, gcn=%s, compCap=%d\n"
               "gpu: busId=%s, gpuPath=%s\n"
               "net: count=%d, device name=%s, pciPath=%s, guid=%lu, ptrSupport=%u, speed=%d, port=%d, latency=%f, maxComms=%d, maxRecvs=%d\n"
               "cpu: socketAddr=%s\npci: busId=%ld"
               "\n==========================================\n",
               prefix.c_str(),
               info->rank,
               info->localRank,
               info->hostHash,
               info->pidHash,
               info->gpu.dev,
               info->gpu.name,
               info->gpu.gcn,
               info->gpu.compCap,
               busIdhip,
               info->gpu.pciPath,
               info->net.count,
               info->net.name,
               info->net.pciPath,
               info->net.guid,
               static_cast<unsigned int>(info->net.ptrSupport),
               info->net.speed,
               info->net.port,
               info->net.latency,
               info->net.maxComms,
               info->net.maxRecvs,
               net::net_socket::scclSocketToString(&info->cpu.listen_sock.addr, addrline),
               info->gpu.pciBusId);
    }
625
626
627
628
629
630
631
632

    return scclSuccess;
}

} // namespace bootstrap
} // namespace topology
} // namespace hardware
} // namespace sccl