bootstrap.cpp 27.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
#include <unistd.h>
#include <sys/types.h>
#include <string.h>
#include <sys/resource.h>
#include <iostream>
#include <iomanip>
#include <sstream>
#include <chrono>
#include <ctime>
#include <cstdint>
11
#include <memory> // for std::unique_ptr
12
13
14
15
16
17

#include "bootstrap.h"

namespace sccl {
namespace hardware {
namespace topology {
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * @brief 执行根节点的数据收集和广播操作
 *
 * 该函数负责以下操作:
 * 1. 设置本地监听服务
 * 2. 向根节点发送本节点的基本数据
 * 3. 从根节点接收本地rank数量信息
 * 4. 当本地rank为0时,从根节点接收所有rank的IP数据
 * 5. 将收集到的所有rank数据广播给节点内其他rank
 *
 * @param send_data_basic 发送给根节点的节点基础数据
 * @param recv_data_basic 接收广播数据的缓冲区向量
 * @return scclResult_t 返回操作结果,成功返回scclSuccess
 */
32
33
namespace bootstrap {

34
35
36
37
////////////////////////////////////////////////////////////////////////////////////////////////////////
pthread_mutex_t initLock = PTHREAD_MUTEX_INITIALIZER; // 线程锁
static bool initialized  = false;                     // 标志是否已经初始化
bool hsaFineGrainFlag    = true;                      // 标志变量,用于指示是否启用HSAP细粒度标志
38

39
static scclResult_t basicInit() {
40
41
42
43
44
45
46
47
    // 如果已经初始化,直接返回成功
    if(asm_ops::ld_acquire_sys_global(&initialized))
        return scclSuccess;

    // 加锁以确保初始化过程的线程安全
    pthread_mutex_lock(&initLock);
    // 如果尚未初始化,进行初始化操作
    if(!initialized) {
48
49
50
51
52
        initEnv(); // 初始化环境
        // 始终初始化引导网络
        SCCLCHECK(bootstrapNet::bootstrapNetInit());
        // initGdrCopy(); // 初始化GDR复制
        // SCCLCHECK(scclNetPluginInit());
53
#if 0
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
        char strValue[1024];
        // 检查NUMA自动平衡是否启用
        SCCLCHECK(scclTopoGetStrFromSys("/proc/sys/kernel", "numa_balancing", strValue));
        if(strcmp(strValue, "1") == 0)
            WARN("NUMA自动平衡已启用,这可能导致RCCL性能的不稳定性!通过\"sudo sysctl kernel.numa_balancing=0\"禁用");
        // 获取内核版本信息
        SCCLCHECK(scclTopoGetStrFromSys("/proc", "version", strValue));
        char *verStr, *state;
        verStr = strtok_r(strValue, " ", &state);
        for(int i = 0; i < 2; i++) {
            verStr = strtok_r(NULL, " ", &state);
            if(verStr == NULL)
                break;
        }
        INFO(SCCL_LOG_BOOTSTRAP, "内核版本: %s", verStr);
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
        // 检查是否为Cray系统
        if(strstr(verStr, "cray") == NULL) {
            // 获取BIOS版本信息
            SCCLCHECK(scclTopoGetStrFromSys("/sys/devices/virtual/dmi/id", "bios_version", strValue));
            if(strncmp("Hyper-V UEFI Release", strValue, 20) != 0) {
                FILE* file;
                // 读取内核命令行参数
                if((file = fopen("/proc/cmdline", "r")) != NULL) {
                    if(feof(file) == 0 && ferror(file) == 0) {
                        int len       = fread(strValue, 1, 1024, file);
                        strValue[len] = '\0';
                    }
                    fclose(file);
                }
                // 检查是否缺少"iommu=pt"参数
                if(strstr(strValue, "iommu=pt") == NULL)
                    WARN("内核命令行中缺少\"iommu=pt\"参数,这可能导致系统不稳定或挂起!");
            }
87

88
89
90
91
92
93
            float* ptr;
            // 尝试分配细粒度PCIe内存
            hipError_t err = hipExtMallocWithFlags((void**)&ptr, 128, hipDeviceMallocFinegrained);
            if(err != hipSuccess)
                hsaFineGrainFlag = false;
        }
94
#endif
95

96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
        // 设置初始化标志
        asm_ops::st_release_sys_global(&initialized, true);
    }
    // 解锁
    pthread_mutex_unlock(&initLock);
    return scclSuccess;
}

scclResult_t bootstrapGetUniqueId(struct BootstrapHandle* handle) {
    SCCLCHECK(basicInit());
    // 在每个进程中设置 handle 的值
    getRandomData(&handle->magic, sizeof(handle->magic));

    const char* env = getenv("SCCL_COMM_ID");
    if(env) {
        memset(&handle->magic, 0, sizeof(handle->magic));
        INFO(SCCL_LOG_BOOTSTRAP, "SCCL_COMM_ID set by environment to %s", env);
        if(scclSocketGetAddrFromString(&handle->addr, env) != scclSuccess) {
            WARN("Invalid SCCL_COMM_ID, please use format: <ipv4>:<port> or [<ipv6>]:<port> or <hostname>:<port>");
            return scclInvalidArgument;
        }
    } else {
        // 初始化socket
        scclSocketAddress_t localSocketAddr = bootstrapNet::getLocalSocketAddr();
        memcpy(&handle->addr, &localSocketAddr, sizeof(scclSocketAddress_t));
        // 启动根节点listen监听
        SCCLCHECK(bootstrapCreateRoot(handle));
    }
    return scclSuccess;
}

static scclResult_t setFilesLimit() {
    struct rlimit filesLimit;
    SYSCHECK(getrlimit(RLIMIT_NOFILE, &filesLimit), "getrlimit");
    filesLimit.rlim_cur = filesLimit.rlim_max;
    SYSCHECK(setrlimit(RLIMIT_NOFILE, &filesLimit), "setrlimit");
132
133
134
135
    return scclSuccess;
}

/**
136
137
138
139
140
141
 * @brief 根节点引导程序,负责收集所有rank的地址信息并广播给其他rank
 * 由于同一个socket数据传输比较慢,所以在进行数据广播时,仅传送给localRank==0的rank,再由其进行节点内广播
 * 该函数所有数据传输与 Bootstrap::bootstrapRootGatherAndBroadcast 函数相配合
 *
 * @param rargs 包含监听套接字和验证魔数的参数结构体
 * @return void* 总是返回NULL
142
 *
143
144
145
146
147
148
149
150
 * 该函数执行以下主要操作:
 * 1. 初始化资源并设置文件描述符限制
 * 2. 循环接收所有rank的连接请求,收集地址信息
 * 3. 验证接收到的rank信息一致性
 * 4. 计算本地rank数量(nLocalRanks)
 * 5. 使用线程池并行发送nLocalRanks值给所有rank
 * 6. 将收集到的所有rank地址信息广播给每个节点的localRank=0的进程
 * 7. 清理资源并返回
151
 *
152
 * @note 函数使用线程池加速消息分发,并通过日志记录关键操作步骤
153
 */
154
155
156
157
158
159
160
static void* bootstrapRoot(void* rargs) {
    struct bootstrapRootArgs* args = (struct bootstrapRootArgs*)rargs;
    scclSocket_t* listenSock       = args->listenSock; // 用于监听的套接字
    uint64_t magic                 = args->magic;      // 用于验证的魔数
    scclResult_t res               = scclSuccess;      // 函数结果
    class ThreadPool* pthread_pool = nullptr;          // 用于根节点分发消息的线程池

161
162
163
164
165
    int nRanks                                     = 0; // nRanks: 进程总数;
    int nLocalRanks                                = 1;
    int c                                          = 0; // c: 已连接的进程计数
    uint64_t rootHostHash                          = 0;
    struct BootstrapNodeBasic node_basic           = {};      // 用于存储扩展信息的结构体
166
167
    struct BootstrapNodeBasic* all_rank_node_basic = nullptr; // 所有进程的地址

168
    // 定义一个函数或者一个函数对象,用于执行实际的发送数据操作。在后面执行
169
    auto send_task = [](BootstrapNodeBasic& node_basic, uint64_t magic, int rank, void* data, size_t size) {
170
        net::net_socket::scclSocketClientManager client_manager(&node_basic.sock.addr, magic, net::net_socket::scclSocketTypeBootstrap);
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
        bootstrapNet::bootstrapNetSend(client_manager.getSocket(), data, size);
    };

    // 用于验证的工具进行初始化
    scclSocketAddress_t* zero = nullptr;           // 用于初始化或比较的零地址
    setFilesLimit();                               // 设置文件描述符限制
    SCCLCHECKGOTO(scclCalloc(&zero, 1), res, out); // 为zero分配内存

    INFO(SCCL_LOG_BOOTSTRAP, "BEGIN"); // 日志:开始
    // --------------------- 1.从所有rank接收其socket地址(BootstrapNodeBasic) --------------------- //
    do {
        net::net_socket::scclSocketAcceptManager accept_manager(listenSock);
        SCCLCHECKGOTO(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), &node_basic, sizeof(node_basic)), res, out); // 接收数据

        if(c == 0) {
            nRanks = node_basic.nRanks;
            SCCLCHECKGOTO(scclCalloc(&all_rank_node_basic, nRanks), res, out); // 为rankAddresses分配内存
            pthread_pool = new ThreadPool(nRanks);
        } else if(nRanks != node_basic.nRanks) {                                                           // 如果接收到的进程总数不匹配
            WARN("Bootstrap Root : mismatch in rank count from procs %d : %d", nRanks, node_basic.nRanks); // 警告
            goto out;                                                                                      // 跳转到out标签
        }
193
194
195
        if(memcmp(zero, &all_rank_node_basic[node_basic.rank].sock.addr, sizeof(scclSocketAddress_t)) != 0) { // 如果rank已经签到
            WARN("Bootstrap Root : rank %d of %d ranks has already checked in", node_basic.rank, nRanks);     // 警告
            goto out;                                                                                         // 跳转到out标签
196
        }
197

198
199
200
201
202
203
        // 保存该rank的连接句柄
        memcpy(all_rank_node_basic + node_basic.rank, &node_basic, sizeof(struct BootstrapNodeBasic));
        ++c;                                                                                               // 增加已连接的进程计数
        INFO(SCCL_LOG_BOOTSTRAP, "Received connect from rank %d total %d/%d", node_basic.rank, c, nRanks); // 日志
    } while(c < nRanks); // 当已连接的进程数小于总数时循环
    INFO(SCCL_LOG_BOOTSTRAP, "COLLECTED ALL %d HANDLES", nRanks); // 日志:收集到所有句柄
204

205
206
207
208
209
210
211
212
213
214
    // --------------------- 2.计算nLocalRanks,并广播给其他所有rank --------------------- //
    // 首先计算nLocalRanks大小,即具有相同hostHash的节点数量
    rootHostHash = all_rank_node_basic[0].hostHash;
    for(int i = 1; i < nRanks; ++i) {
        if(rootHostHash == all_rank_node_basic[i].hostHash) {
            nLocalRanks++; // 如果hostHash相同,则增加本地节点计数
        } else {
            break; // 一旦发现不同的hostHash,停止计数
        }
    }
215
    // 给每个节点发送localRank的值
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
    for(int r = 0; r < nRanks; ++r) {
        auto dst_node_basic = all_rank_node_basic[r];
        // 使用std::bind将参数绑定到send_task函数
        auto bound_task = std::bind(send_task, dst_node_basic, magic, r, &nLocalRanks, sizeof(int));
        // 将绑定后的任务添加到线程池
        pthread_pool->enqueue(bound_task);
    }
    // 等待所有任务完成
    while(!pthread_pool->allTasksCompleted()) {
        usleep(1000); // 每1毫秒检查一次任务完成状态
    }

    // --------------------- 3.给所有localRank==0的rank发送all_rank_node_basic数据 --------------------- //
    // 给每个节点的localRank=0的进程发送信息,并由其进行广播,从而加快速度
    for(int r = 0; r < nRanks / nLocalRanks; ++r) {
        int dst_rank        = r * nLocalRanks; // 计算目标rank
        auto dst_node_basic = all_rank_node_basic[dst_rank];
233
        net::net_socket::scclSocketClientManager client_manager(&dst_node_basic.sock.addr, magic, net::net_socket::scclSocketTypeBootstrap);
234
235
236
237
238
239
240
        bootstrapNet::bootstrapNetSend(client_manager.getSocket(), all_rank_node_basic, sizeof(struct BootstrapNodeBasic) * nRanks);
        printf("root send nLocalRanks value to rank=%d\n", r);
    }
    // 等待所有任务完成
    while(!pthread_pool->allTasksCompleted()) {
        usleep(1000); // 每1毫秒检查一次任务完成状态
    }
241

242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
    INFO(SCCL_LOG_BOOTSTRAP, "bootstrap send out all %d handles", nRanks); // 日志:发送出所有句柄
out:
    // 关闭套接字,并释放内存
    if(listenSock) {
        scclSocketClose(listenSock);
        delete listenSock;
    }
    // 释放内存
    if(all_rank_node_basic)
        free(all_rank_node_basic);
    if(zero)
        free(zero);
    if(pthread_pool)
        delete pthread_pool;
    free(rargs);                      // 释放rargs内存
    INFO(SCCL_LOG_BOOTSTRAP, "DONE"); // 日志:完成
258

259
    return NULL;
260
}
261

262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
/**
 * 创建并启动bootstrap根节点
 *
 * 该函数负责初始化监听socket,创建并启动一个独立的线程来处理bootstrap根节点逻辑。
 * 线程会被设置为detach状态,无需等待其结束。
 *
 * @param handle 包含bootstrap配置信息的句柄
 * @return 成功返回scclSuccess,失败返回相应的错误码
 */
scclResult_t bootstrapCreateRoot(struct BootstrapHandle* handle) {
    struct bootstrapRootArgs* args;
    pthread_t thread;

    // 设置根节点socket监听
    net::net_socket::scclSocketServerManager root_manager(&handle->addr, handle->magic, net::net_socket::scclSocketTypeBootstrap);

    // 为args分配内存
    SCCLCHECK(scclCalloc(&args, 1));
    // 设置线程参数
    args->listenSock = root_manager.releaseSocket();
    args->magic      = handle->magic;

    // 创建线程以执行bootstrapRoot函数, 直到线程结束才释放listenSock
    NEQCHECK(pthread_create(&thread, NULL, bootstrapRoot, (void*)args), 0);
    // 设置线程名称
    scclSetThreadName(thread, "SCCL BootstrapR");
    // 分离线程,使其在完成后自动回收资源
    NEQCHECK(pthread_detach(thread), 0); // will not be pthread_join()'d
290

291
292
    return scclSuccess;
}
293

294
295
296
297
298
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
// 构造函数
Bootstrap::Bootstrap(const struct BootstrapHandle* handle, int rank, int nRanks)
    : root_handle(handle), rank(rank), nRanks(nRanks), localRank(-1), nLocalRanks(0), socketInitDone(false) {
299
    printf("Bootstrap 构造函数\n");
300
}
301

302
Bootstrap::~Bootstrap() {
303
    printf("Bootstrap 析构函数\n");
304
305
306
307
    if(ipcsocket) {
        delete ipcsocket;
    }
}
308

309
310
311
312
313
314
315
316
317
318
319
scclResult_t Bootstrap::init(struct BootstrapComm* bootstrap_comm) {
    // 如果已经初始化,直接返回成功
    if(asm_ops::ld_acquire_sys_global(&socketInitDone))
        return scclSuccess;

    // 加锁以确保初始化过程的线程安全
    pthread_mutex_lock(&bootstrapMutex);

    // -------------------------- 1.获取自身基础信息 ----------------------------------- //
    SCCLCHECK(basicInit());
    // 设置基础信息
320
    struct BootstrapNodeBasic node_basic = {};
321
322
323

    // -------------------------- 2.设置0号rank搜集的CPU信息和localRank信息 ----------------------------------- //
    // 创建根节点的数据收集
324
325
    std::vector<struct BootstrapNodeBasic> all_node_basic;
    all_node_basic.reserve(nRanks);
326
    SCCLCHECK(bootstrapRootGatherAndBroadcast(&node_basic, all_node_basic));
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344

    // -------------------------- 3.设置本地localRank的BootstrapComm信息 ----------------------------------- //
    // 初始化BootstrapComm类
    bootstrap_comm->init(rank, nRanks, localRank, nLocalRanks);

    if(CPU_COUNT(&bootstrap_comm->cpuAffinity)) {
        sched_setaffinity(0, sizeof(cpu_set_t), &bootstrap_comm->cpuAffinity);
    }
    bootstrap_comm->magic = root_handle->magic;

    //////// 设置显卡状态 ////////
    bootstrap_comm->hipDev = localRank; // CUDA 设备 ID
    uint32_t devices_num;
    SCCLCHECK(rocm_smi_init());                                // 初始化ROCM SMI库
    SCCLCHECK(rocm_smi_getNumDevice(&devices_num));            // 获取设备数量
    LTCHECK(devices_num, 0);                                   // 检查设备数量是否 devices_num>0
    LTCHECK(devices_num, nLocalRanks);                         // 检查设备数量是否 devices_num>nLocalRanks
    bootstrap_comm->deviceCnt = static_cast<int>(devices_num); // 将设备数量转换为int并赋值给的deviceCnt
345
#if 0 
346
    printf("devices_num=%d\n", bootstrap_comm->deviceCnt);
347
#endif
348
349
350
351
352
353
354
    LECHECK(devices_num, bootstrap_comm->hipDev);   // 检查hipDev是否小于deviceCnt
    HIPCHECK(hipSetDevice(bootstrap_comm->hipDev)); // 设置当前设备为hipDev

    //////// 设置启动通信的scclNet ////////
    // 获取环境变量SCCL_NET_NAME的值,如果不存在则默认使用"IB"
    const char* envNetName = getenv("SCCL_NET_NAME");
    char* netName          = (envNetName != NULL) ? strdup(envNetName) : strdup("IB");
355
#if 0
356
    printf("netName=%s\n", netName);
357
#endif
358
359
360
361
362
363
364
    // 初始化网络和引导网络
    SCCLCHECK(net::scclNetInit(netName, bootstrap_comm->scclNet));
    // 释放分配的网络名称字符串
    free(netName);

    //////// 初始化唯一信息结构体 ////////
    struct scclNodeInfo local_node_info;
365
366
    // 补充定义
    local_node_info.hostHash = node_basic.hostHash;
367
    SCCLCHECK(bootstrapCommInitNodeInfo(bootstrap_comm->scclNet, &local_node_info));
368
369
370
371
372
373
374
375
376
377
378
    // 设置CPU信息
    memcpy(&(local_node_info.localNode.cpu.listen_sock), &(node_basic.sock), sizeof(scclSocket_t));

#if 0
        {
            char line[20];
            sprintf(line, "11111 print rank=%d", rank);
            std::string prefix(line);                // 创建prefix字符串
            printNodeInfo(prefix, &local_node_info); // 正确的调用方式
        }
#endif
379
380

    // -------------------------- 4.BootstrapComm信息的allgather ----------------------------------- //
381
382
383
384
385
386
387
388
389
390
391
    bootstrapCommAllGather(all_node_basic, &local_node_info, bootstrap_comm->node_info_set);

    if(1) {
        char line[20];
        sprintf(line, "print rank=%d", rank);
        std::string prefix(line); // 创建prefix字符串
        for(int r = 0; r < nRanks; r++) {
            struct scclNodeInfo node_basic = bootstrap_comm->node_info_set->node_info_vec[r];
            printNodeInfo(prefix, &node_basic); // 正确的调用方式
        }
    }
392
393
394
395
396

    // 设置初始化标志
    asm_ops::st_release_sys_global(&socketInitDone, true);
    // 解锁
    pthread_mutex_unlock(&bootstrapMutex);
397
398
399
400

    return scclSuccess;
}

401
///////////////////////////////////////////////////////////////////////////
402
/**
403
 * @brief 执行根节点的数据收集和广播操作
404
 *
405
406
407
408
409
410
 * 该函数负责以下操作:
 * 1. 设置本地监听服务
 * 2. 向根节点发送本节点的基本数据
 * 3. 从根节点接收本地rank数量信息
 * 4. 当本地rank为0时,从根节点接收所有rank的IP数据
 * 5. 将收集到的所有rank数据广播给节点内其他rank
411
 *
412
413
414
 * @param send_data 发送给根节点的数据指针
 * @param recv_data 接收广播数据的缓冲区指针
 * @return scclResult_t 返回操作结果,成功返回scclSuccess
415
 */
416
417
418
419
420
scclResult_t Bootstrap::bootstrapRootGatherAndBroadcast(struct BootstrapNodeBasic* send_data_basic, std::vector<struct BootstrapNodeBasic>& recv_data_basic) {
    // 总的需要广播的数据
    int recv_data_basic_size            = nRanks * sizeof(struct BootstrapNodeBasic);
    scclSocketAddress_t root_addr       = root_handle->addr;
    scclSocketAddress_t localSocketAddr = bootstrapNet::getLocalSocketAddr();
421
422

    // ------------- 1.各个rank在发送给根节点数据之前,首先设置监听listen ------------- //
423
    net::net_socket::scclSocketServerManager local_server_manager(&localSocketAddr, root_handle->magic, net::net_socket::scclSocketTypeBootstrap);
424

425
426
427
428
    // 设置基础数据
    send_data_basic->rank     = rank;
    send_data_basic->nRanks   = nRanks;
    send_data_basic->hostHash = getHostHash();
429

430
431
432
433
434
435
436
437
    scclSocket_t* local_server_sock = local_server_manager.releaseSocket();
    send_data_basic->sock           = *local_server_sock;

    // ------------- 2.各个节点向根节点发送数据 ------------- //
    {
        net::net_socket::scclSocketClientManager client_manager(&root_addr, root_handle->magic, net::net_socket::scclSocketTypeBootstrap);
        SCCLCHECK(bootstrapNet::bootstrapNetSend(client_manager.getSocket(), send_data_basic, sizeof(struct BootstrapNodeBasic)));
    }
438
439
440
    // ------------- 3.从根节点接收nLocalRanks值 ------------- //
    // 接收nLocalRanks信息
    {
441
        net::net_socket::scclSocketAcceptManager accept_manager(local_server_sock);
442
443
        SCCLCHECK(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), &nLocalRanks, sizeof(int)));
    }
444

445
446
447
    // 要求必须 nRanks%nLocalRanks == 0
    NEQCHECK(nRanks % nLocalRanks, 0);

448
    // ------------- 4.nLocalRanks==0时,从根节点接收所有rank的ip数据 ------------- //
449
450
451
452
453
    this->localRank   = rank % nLocalRanks;
    this->interRank   = rank / nLocalRanks;
    this->nInterRanks = nRanks / nLocalRanks;

    // 从根节点接收数据,对应到函数 bootstrapRoot
454
    if(localRank == 0) {
455
456
        net::net_socket::scclSocketAcceptManager accept_manager(local_server_sock);
        SCCLCHECK(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), recv_data_basic.data(), recv_data_basic_size));
457
    }
458
459
460

    // ------------- 5.nLocalRanks==0时,将所有rank的ip数据广播给节点内其他rank ------------- //
    ipcsocket = new scclIpcSocket_t(localRank, nLocalRanks, /*hash*/ root_handle->magic);
461
    ipcsocket->scclIpcSocketBroadcast(recv_data_basic.data(), recv_data_basic_size, /*localRank root*/ 0);
462

463
    return scclSuccess;
464
465
}

466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
/**
 * @brief 初始化节点通信信息
 *
 * 该函数用于初始化节点的通信信息,包括:
 * - 设置节点的全局排名和本地排名
 * - 获取并设置进程ID哈希值
 * - 设置GPU设备属性(名称、GCN架构、计算能力)
 * - 设置RDMA网络属性
 * - 设置PCI总线ID
 * - 设置CPU套接字地址
 *
 * @param scclNet 网络句柄
 * @param socket_addr 套接字地址
 * @param node_info 节点信息结构体指针
 * @return scclResult_t 返回操作结果,成功返回scclSuccess
 */
scclResult_t Bootstrap::bootstrapCommInitNodeInfo(scclNet_t* scclNet, struct scclNodeInfo* node_info) {
    ////////////////// 设置基础信息 //////////////////
    node_info->rank      = rank;         // 当前节点的全局排名
    node_info->localRank = localRank;    // 当前节点在本地计算节点中的排名
    node_info->pidHash   = getPidHash(); // 获取进程ID哈希值并赋值给的pidHash
    int hipDev           = localRank;

    ////////////////// 设置硬件信息 //////////////////
    struct topoLocalNode* p_localNode = &node_info->localNode;

    // 设置PCI信息
    SCCLCHECK(getBusId(hipDev, &p_localNode->pci.busId));

    // 设置GPU信息
    p_localNode->gpu.dev = hipDev;
    hipDeviceProp_t deviceProp;
    HIPCHECK(hipGetDeviceProperties(&deviceProp, hipDev));
    snprintf(p_localNode->gpu.name, sizeof(p_localNode->gpu.name), "%s", deviceProp.name);
    snprintf(p_localNode->gpu.gcn, sizeof(p_localNode->gpu.gcn), "%s", deviceProp.gcnArchName);
    p_localNode->gpu.compCap = deviceProp.major * 10 + deviceProp.minor;

    // 设置RDMA信息
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
    net::scclNetProperties_t props;
    SCCLCHECK(scclNet->getProperties(hipDev, &props));
    SCCLCHECK(scclNet->devices(&p_localNode->net.count));                                      // 节点内网卡数量
    snprintf(p_localNode->net.name, sizeof(p_localNode->net.name), "%s", props.name);          // 主要用于日志记录。
    snprintf(p_localNode->net.pciPath, sizeof(p_localNode->net.pciPath), "%s", props.pciPath); // PCI设备在/sys中的路径。
#if 0
    printf("p_localNode->net.pciPath len=%zu\n", strlen(p_localNode->net.pciPath));
#endif
    p_localNode->net.guid       = props.guid;       // NIC芯片的唯一标识符。对于具有多个PCI功能(物理或虚拟)的卡非常重要。
    p_localNode->net.ptrSupport = props.ptrSupport; // [SCCL_PTR_HOST|SCCL_PTR_CUDA|SCCL_PTR_DMABUF]
    p_localNode->net.speed      = props.speed;      // 端口速度,单位为Mbps。
    p_localNode->net.port       = props.port;       // 端口号。
    p_localNode->net.latency    = props.latency;    // 网络延迟
    p_localNode->net.maxComms   = props.maxComms;   // 可以创建的最大通信数量
    p_localNode->net.maxRecvs   = props.maxRecvs;   // 最大分组接收数量。
519
520
521
522

    return scclSuccess;
}

523
524
525
526
527
528
529
scclResult_t Bootstrap::bootstrapCommAllGather(std::vector<struct BootstrapNodeBasic>& all_node_basic,
                                               struct scclNodeInfo* node_info,
                                               struct scclNodeInfoSet* node_info_set) {
    // 数据准备
    size_t inter_data_len = nLocalRanks * sizeof(struct scclNodeInfo); // 节点间传输时每个子块的大小
    auto all_recv_data    = reinterpret_cast<char*>(node_info_set->node_info_vec.data());

530
    // 1.节点内通信 allgather
531
532
    auto local_recv_data = all_recv_data + this->interRank * inter_data_len;
    ipcsocket->scclIpcSocketAllgatherSync(node_info, (void*)local_recv_data, sizeof(struct scclNodeInfo));
533
534

    // 2.节点间通信,ring allgather
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
    // TODO: 后续nRanks特别大的时候,可以进一步优化算法,减少通信次数
    if(localRank == 0) {
        int prev_interRank = (this->interRank - 1 + this->nInterRanks) % this->nInterRanks * this->nLocalRanks;
        int next_interRank = (this->interRank + 1 + this->nInterRanks) % this->nInterRanks * this->nLocalRanks;
        // scclSocket_t prev_rank_sock = all_node_basic[prev_interRank].sock;
        scclSocket_t next_rank_sock = all_node_basic[next_interRank].sock;
        scclSocket_t self_rank_sock = all_node_basic[rank].sock;

        printf("bootstrap allgather 11: rank %d, prev_interRank=%d, next_interRank=%d\n", rank, prev_interRank, next_interRank);

        // 对于prev,当前rank是客户端;对于next,当前rank是服务器端
        // 客户端:用于发送数据
        net::net_socket::scclSocketClientManager client_manager(&next_rank_sock.addr, root_handle->magic, net::net_socket::scclSocketTypeBootstrap);
        // 服务器端:用于接收数据
        net::net_socket::scclSocketAcceptManager accept_manager(&self_rank_sock);

        /////////////////// 实现数据传输 ///////////////////
        for(int r = 0; r < nInterRanks - 1; r++) {
            int prev_rank = (this->interRank - r - 1 + nInterRanks) % nInterRanks;
            int next_rank = (this->interRank - r + nInterRanks) % nInterRanks;
            // 准备发送/接收的数据
            auto send_data = all_recv_data + next_rank * inter_data_len;
            auto recv_data = all_recv_data + prev_rank * inter_data_len;
#if 0
            printf("bootstrapCommAllGather rank=%d, interRank=%d, r=%d, prev_rank=%d, next_rank=%d, inter_data_len=%zu\n",
                   this->rank,
                   this->interRank,
                   r,
                   prev_rank,
                   next_rank,
                   inter_data_len);
#endif
567

568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
            // 发送/接收数据
            SCCLCHECK(bootstrapNet::bootstrapNetSend(client_manager.getSocket(), send_data, inter_data_len));
            SCCLCHECK(bootstrapNet::bootstrapNetRecv(accept_manager.getSocket(), recv_data, inter_data_len));
        }
    }

#if 0
    printf("222222\n");
    if(rank == 0) {
        char line[20];
        sprintf(line, "print rank=%d", rank);
        std::string prefix(line); // 创建prefix字符串
        for(int r = 0; r < nRanks; r++) {
            struct scclNodeInfo node_basic = node_info_set->node_info_vec[r];
            printNodeInfo(prefix, &node_basic); // 正确的调用方式
        }
    }
#endif

    // 3.节点内通信 broadcast
    ipcsocket->scclIpcSocketBroadcast(all_recv_data, nRanks * sizeof(struct scclNodeInfo), 0);
589
590
591
592
593
594
595
596

    return scclSuccess;
}

} // namespace bootstrap
} // namespace topology
} // namespace hardware
} // namespace sccl