bootstrap_net.cpp 4.35 KB
Newer Older
lishen's avatar
lishen committed
1
2
3
#include <unistd.h>
#include <sys/types.h>
#include <string.h>
4
5
6
7
8
9
10
11
#include <sys/resource.h>
#include <iostream>
#include <iomanip>
#include <sstream>
#include <chrono>
#include <ctime>
#include <cstdint>

lishen's avatar
lishen committed
12
13
14
15
16
17
18
#include "bootstrap_net.h"

namespace sccl {
namespace hardware {
namespace topology {
namespace bootstrap {

19
20
21
22
23
24
25
26
27
28
29
bootstrapNet::bootstrapNet(struct scclBootstrapComm* bootstrap_comm) {
    // Create the intra-node socket communication helper from the rank/host
    // identity carried by the bootstrap communicator and its abort flag.
    // The object is owned by this instance and released in the destructor.
    const auto& info = bootstrap_comm->unique_info;
    ipcsocket        = new scclIpcSocket_t(info->localRank, info->nRanks, info->hostHash, bootstrap_comm->abortFlag);
}

bootstrapNet::~bootstrapNet() {
    // Release the intra-node socket helper allocated in the constructor.
    // `delete` on a null pointer is a no-op, so no explicit guard is required.
    delete ipcsocket;
}
lishen's avatar
lishen committed
30
31
32
33

/**
 * @brief 初始化引导网络
 *
34
35
36
37
38
 * 该函数用于初始化SCCL的引导网络。
 * 如果设置了 NCCL_COMM_ID 环境变量,则查找一个和该环境变量中指定的 IP 地址处于同一子网的网卡作为 booststrap 网络通信所使用的网卡 bootstrapNetIfAddr
 * 否则,使用 ncclFindInterfaces 函数选择一个合适的网卡
 *
 * 函数使用互斥锁确保线程安全。
lishen's avatar
lishen committed
39
40
41
42
43
44
45
 *
 * @return scclResult_t 返回操作结果:
 *      - scclSuccess: 初始化成功
 *      - scclInvalidArgument: 无效的SCCL_COMM_ID格式
 *      - scclSystemError: 找不到匹配的网络接口
 *      - scclInternalError: 找不到可用的网络接口
 */
46
scclResult_t bootstrapNet::bootstrapNetInit() {
lishen's avatar
lishen committed
47
48
49
50
51
52
    if(bootstrapNetInitDone == 0) {
        pthread_mutex_lock(&bootstrapNetLock);
        if(bootstrapNetInitDone == 0) {
            char* env = getenv("SCCL_COMM_ID");
            if(env) {
                scclSocketAddress_t remoteAddr;
53
                if(net::net_socket::scclSocketGetAddrFromString(&remoteAddr, env) != scclSuccess) {
lishen's avatar
lishen committed
54
55
56
                    WARN("Invalid SCCL_COMM_ID, please use format: <ipv4>:<port> or [<ipv6>]:<port> or <hostname>:<port>");
                    return scclInvalidArgument;
                }
57
                if(net::net_socket::scclFindInterfaceMatchSubnet(bootstrapNetIfName, &bootstrapNetIfAddr, &remoteAddr, MAX_IF_NAME_SIZE, 1) <= 0) {
lishen's avatar
lishen committed
58
59
60
61
                    WARN("NET/Socket : No usable listening interface found");
                    return scclSystemError;
                }
            } else {
62
                int nIfs = net::net_socket::scclFindSocketInterfaces(bootstrapNetIfName, &bootstrapNetIfAddr, MAX_IF_NAME_SIZE, 1);
lishen's avatar
lishen committed
63
64
65
66
67
68
                if(nIfs <= 0) {
                    WARN("Bootstrap : no socket interface found");
                    return scclInternalError;
                }
            }
            char line[SOCKET_NAME_MAXLEN + MAX_IF_NAME_SIZE + 2];
69
70
71
            sprintf(line, "%s:", bootstrapNetIfName);
            net::net_socket::scclSocketToString(&bootstrapNetIfAddr, line + strlen(line));
            INFO(SCCL_LOG_BOOTSTRAP, "Bootstrap : Using %s", line);
lishen's avatar
lishen committed
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
            bootstrapNetInitDone = 1;
        }
        pthread_mutex_unlock(&bootstrapNetLock);
    }
    return scclSuccess;
}

// Additional sync functions
/**
 * 通过网络发送数据
 *
 * @param sock 已连接的socket指针
 * @param data 要发送的数据指针
 * @param size 要发送的数据大小(字节)
 * @return scclResult_t 返回操作结果(scclSuccess表示成功)
 *
 * @note 先发送数据大小(sizeof(int)),再发送实际数据
 */
90
91
92
scclResult_t bootstrapNet::bootstrapNetSend(scclSocket_t* sock, void* data, int size) {
    SCCLCHECK(net::net_socket::scclSocketSend(sock, &size, sizeof(int)));
    SCCLCHECK(net::net_socket::scclSocketSend(sock, data, size));
lishen's avatar
lishen committed
93
94
95
96
97
98
99
100
101
102
103
104
105
    return scclSuccess;
}

/**
 * 从socket接收数据
 *
 * @param sock 要接收数据的socket
 * @param data 接收数据的缓冲区
 * @param size 缓冲区大小
 * @return scclResult_t 返回操作结果,成功返回scclSuccess,否则返回错误码
 *
 * @note 如果接收到的数据大小超过缓冲区大小,会截断数据并返回scclInternalError
 */
106
scclResult_t bootstrapNet::bootstrapNetRecv(scclSocket_t* sock, void* data, int size) {
lishen's avatar
lishen committed
107
    int recvSize;
108
    SCCLCHECK(net::net_socket::scclSocketRecv(sock, &recvSize, sizeof(int)));
lishen's avatar
lishen committed
109
110
111
112
    if(recvSize > size) {
        WARN("Message truncated : received %d bytes instead of %d", recvSize, size);
        return scclInternalError;
    }
113
    SCCLCHECK(net::net_socket::scclSocketRecv(sock, data, std::min(recvSize, size)));
lishen's avatar
lishen committed
114
115
116
117
118
119
120
    return scclSuccess;
}

} // namespace bootstrap
} // namespace topology
} // namespace hardware
} // namespace sccl