#include #include #include #include #include #include #include #include #include #include "net_ib.h" #include "rocm_wrap.h" #include "base.h" namespace sccl { namespace hardware { namespace net { namespace net_ib { ///////////////////////////////////////// 环境变量读取及设置 ///////////////////////////////////////// // 定义InfiniBand GID索引,默认值为0 SCCL_PARAM(IbGidIndex, "IB_GID_INDEX", 0); // 定义InfiniBand超时时间,默认值为18 SCCL_PARAM(IbTimeout, "IB_TIMEOUT", 18); // 定义InfiniBand重试次数,默认值为7 SCCL_PARAM(IbRetryCnt, "IB_RETRY_CNT", 7); // 定义InfiniBand分区密钥,默认值为0 SCCL_PARAM(IbPkey, "IB_PKEY", 0); // 定义是否使用InfiniBand内联传输,默认值为0(不使用) SCCL_PARAM(IbUseInline, "IB_USE_INLINE", 0); // 定义InfiniBand服务级别,默认值为0 SCCL_PARAM(IbSl, "IB_SL", 0); // 定义InfiniBand流量类别,默认值为0 SCCL_PARAM(IbTc, "IB_TC", 0); // 定义InfiniBand自动路由阈值,默认值为8192 SCCL_PARAM(IbArThreshold, "IB_AR_THRESHOLD", 8192); // 定义InfiniBand PCI宽松排序选项,默认值为2 SCCL_PARAM(IbPciRelaxedOrdering, "IB_PCI_RELAXED_ORDERING", 2); // 定义是否启用InfiniBand自适应路由,默认值为-2(可能表示禁用或默认设置) SCCL_PARAM(IbAdaptiveRouting, "IB_ADAPTIVE_ROUTING", -2); // 定义InfiniBand套接字客户端端口重用选项,默认值为0(不重用) SCCL_PARAM(IbSockClientPortReuse, "IB_SOCK_CLIENT_PORT_REUSE", 0); // 定义InfiniBand套接字服务器端口重用选项,默认值为0(不重用) SCCL_PARAM(IbSockServerPortReuse, "IB_SOCK_SERVER_PORT_REUSE", 0); // 定义是否禁用InfiniBand,默认值为0(不禁用) SCCL_PARAM(IbDisable, "IB_DISABLE", 0); // 定义是否合并InfiniBand虚拟功能,默认值为1(合并) SCCL_PARAM(IbMergeVfs, "IB_MERGE_VFS", 1); // 定义每个连接的InfiniBand队列对(QP)数量,默认值为1 SCCL_PARAM(IbQpsPerConn, "IB_QPS_PER_CONNECTION", 1); // 定义是否禁用GDR刷新,默认值为0(不禁用) SCCL_PARAM(IbGdrFlushDisable, "GDR_FLUSH_DISABLE", 0); // 定义是否在队列对上分割数据,默认值为1(分割) SCCL_PARAM(IbSplitDataOnQps, "IB_SPLIT_DATA_ON_QPS", 1); ///////////////////////////////////////// 参数及结构体设置 ///////////////////////////////////////// constexpr int MAXNAMESIZE = 64; static char scclIbIfName[MAX_IF_NAME_SIZE + 1]; // 用于存储网络接口名称的字符数组 static union net_socket::scclSocketAddress scclIbIfAddr; // 定义一个联合体类型的变量,用于存储网络接口地址 struct scclIbMr { uintptr_t addr; // 内存地址 int pages; // 
页数 int refs; // 引用计数 ibv_mr* mr; // InfiniBand内存注册对象指针 }; // 结构体用于缓存 InfiniBand 内存注册对象 struct scclIbMrCache { struct scclIbMr* slots; // 缓存槽,用于存储内存注册对象 int capacity; int population; // 缓存的容量和当前已填充的数量 }; // 定义一个对齐到 64 字节边界的结构体 scclIbDev,用于表示 InfiniBand 设备 struct alignas(64) scclIbDev { pthread_mutex_t lock; // 互斥锁,用于线程同步 int device; // 设备编号 uint64_t guid; // 全局唯一标识符 uint8_t port; // 端口号 uint8_t link; // 链路层信息 int speed; // 传输速度 ibv_context* context; // InfiniBand 上下文 int pdRefs; // 保护域引用计数 ibv_pd* pd; // 保护域 char devName[MAXNAMESIZE]; // 设备名称 char* pciPath; // PCI 路径 int realPort; // 实际使用的端口 int maxQp; // 最大队列对数量 struct scclIbMrCache mrCache; // 内存注册对象缓存 int ar; // ADAPTIVE_ROUTING,自适应路由标志 }; struct userIbDev { char devName[MAXNAMESIZE]; uint16_t port_en; }; // 定义最大InfiniBand设备数量为16 static constexpr int MAX_IB_DEVS = 16; // 定义一个结构体数组,用于存储InfiniBand设备信息 struct scclIbDev scclIbDevs[MAX_IB_DEVS]; // 定义一个结构体数组,用于存储用户级别的InfiniBand设备信息 struct userIbDev userIbDevs[MAX_IB_DEVS]; // 定义一个互斥锁,用于保护对InfiniBand设备的并发访问 pthread_mutex_t scclIbLock = PTHREAD_MUTEX_INITIALIZER; // 定义一个静态整数,用于指示是否启用了InfiniBand的Relaxed Ordering模式 static int scclIbRelaxedOrderingEnabled = 0; // 定义一个线程局部变量,用于存储重用的地址信息 static thread_local union net_socket::scclSocketAddress reusedAddr; // 定义一个线程局部变量,用于存储重用的套接字文件描述符 static thread_local int reusedSockfd = -1; // 定义一个线程ID,用于异步线程操作 pthread_t scclIbAsyncThread; // 定义一个常量,表示InfiniBand网络接口的最大接收数量 static constexpr int SCCL_NET_IB_MAX_RECVS = 8; // 定义一个常量,表示最大字符串长度 static constexpr int MAX_STR_LEN = 255; // 为每个并发接收支持SCCL_NET_MAX_REQUESTS static constexpr int MAX_REQUESTS = (SCCL_NET_MAX_REQUESTS * SCCL_NET_IB_MAX_RECVS); static_assert(MAX_REQUESTS <= 256, "request id are encoded in wr_id and we need up to 8 requests ids per completion"); // Retain local and remote RoCE addresses for error logging struct scclIbGidInfo { uint8_t link_layer; // 链路层类型,表示网络连接的物理层类型 union ibv_gid localGid; // 本地设备的全局标识符(GID) union ibv_gid remoteGid; // 远程设备的全局标识符(GID) }; /* scclIbRequest 
结构体用于封装 InfiniBand 通信请求的详细信息,包括通信接口、请求类型、数据缓冲区等。 联合体 union 根据请求类型(发送或接收)存储不同的数据结构,以支持灵活的通信操作。 */ struct scclIbRequest { struct scclIbVerbs* verbs; // 指向 scclIbVerbs 结构体的指针,包含 Infiniband 相关的操作 int type; // 请求的类型,例如发送或接收 int events; // 事件标志, 用于记录请求相关的事件状态 struct net_socket::scclSocket* sock; // 指向 scclSocket 结构体的指针,表示网络套接字 struct scclIbGidInfo* gidInfo; // 指向 scclIbGidInfo 结构体的指针,包含全局标识符信息 int nreqs; // 请求的数量 // 联合体,用于存储不同类型请求的特定信息 union { // send: 发送请求的相关信息 struct { int size; // 发送数据的大小 void* data; // 指向发送数据的指针 uint32_t lkey; // 本地密钥,用于数据访问 int offset; // 数据偏移量 } send; // recv: 接收请求的相关信息 struct { int sizes[SCCL_NET_IB_MAX_RECVS]; // 接收数据的大小数组,最多包含 SCCL_NET_IB_MAX_RECVS 个元素 } recv; }; }; /*用于封装 InfiniBand 通信所需的资源,便于管理和复用。*/ struct scclIbVerbs { int dev; // 设备索引,标识使用的 InfiniBand 设备 struct ibv_pd* pd; // 指向 InfiniBand 保护域(Protection Domain)的指针,用于内存注册和队列管理 struct ibv_cq* cq; // 指向 InfiniBand 完成队列(Completion Queue)的指针,用于跟踪异步操作的状态 uint64_t pad[1]; // 填充字段,可能用于内存对齐或未来扩展 struct scclIbRequest reqs[MAX_REQUESTS]; // 存储最大请求数(MAX_REQUESTS)的请求结构体数组 }; /*用于 InfiniBand 通信的发送队列(FIFO),存储待发送数据的元信息,供底层网络驱动或通信库使用*/ struct alignas(64) scclIbSendFifo { uint64_t addr; // 目标内存地址(远程地址) int size; // 发送数据的大小(字节) uint32_t rkey; // 远程密钥(Remote Key),用于 InfiniBand 的远程内存访问(RMA) uint32_t nreqs; // 发送请求的数量(可能用于批量操作) uint32_t tag; // 标签或标识符,用于区分不同的发送操作 uint64_t idx; // 索引值,可能用于跟踪或管理发送队列中的位置 }; static constexpr int SCCL_IB_MAX_QPS = 128; // 最大队列对数量 struct scclIbSendComm { struct scclIbVerbs verbs; // RDMA verbs结构体 struct scclIbSendFifo fifo[MAX_REQUESTS][SCCL_NET_IB_MAX_RECVS]; // 发送FIFO队列 uint64_t fifoHead; // FIFO队列头指针 struct scclIbRequest* fifoReqs[MAX_REQUESTS][SCCL_NET_IB_MAX_RECVS]; // FIFO请求指针数组 struct ibv_send_wr wrs[SCCL_NET_IB_MAX_RECVS + 1]; // 发送工作请求结构体数组 struct ibv_sge sges[SCCL_NET_IB_MAX_RECVS]; // 散布-聚集元素结构体数组 struct net_socket::scclSocket sock; // 套接字结构体 int ready; // 是否准备好 struct ibv_qp* qps[SCCL_IB_MAX_QPS]; // 队列对指针数组 int nqps; // 队列对数量 int qpIndex; // 当前队列对索引 struct ibv_mr* 
fifoMr; // FIFO内存区域指针 int ar; // 自动重发标志 struct scclIbGidInfo gidInfo; // GID信息结构体 }; struct scclIbQpInfo { uint32_t lid; uint8_t ib_port; uint8_t link_layer; uint32_t qpn[SCCL_IB_MAX_QPS]; // For RoCE uint64_t spn; uint64_t iid; enum ibv_mtu mtu; // FIFO RDMA info uint32_t fifoRkey; uint64_t fifoAddr; }; struct scclIbGpuFlush { int enabled; int hostMem; struct ibv_mr* hostMr; struct ibv_sge sge; struct ibv_qp* qp; }; struct scclIbRemFifo { struct scclIbSendFifo elems[MAX_REQUESTS][SCCL_NET_IB_MAX_RECVS]; uint64_t fifoTail; uint64_t addr; uint32_t rkey; uint32_t flags; struct ibv_mr* mr; struct ibv_sge sge; }; struct scclIbRecvComm { struct scclIbVerbs verbs; struct scclIbRemFifo remFifo; struct net_socket::scclSocket sock; int ready; struct ibv_qp* qps[SCCL_IB_MAX_QPS]; int nqps; int qpIndex; struct scclIbGpuFlush gpuFlush; struct scclIbGidInfo gidInfo; }; static_assert((offsetof(struct scclIbRecvComm, remFifo) % 32) == 0, "scclIbSendComm fifo must be 32-byte aligned"); ///////////////////////////////////////// net_ib的函数 ///////////////////////////////////////// /** * @brief IB异步事件处理线程主函数 * * 该函数作为独立线程运行,持续监听并处理IB设备的异步事件。 * 对于每个接收到的异步事件(除IBV_EVENT_COMM_EST外),会输出警告日志。 * 处理完成后必须调用wrap_ibv_ack_async_event进行事件确认。 * * @param args 传入参数,应转换为ibv_context结构体指针 * @return void* 线程返回值,始终返回NULL */ void* scclNetIb::scclIbAsyncThreadMain(void* args) { // 将传入的参数转换为InfiniBand上下文结构体指针 struct ibv_context* context = (struct ibv_context*)args; // 无限循环,持续监听异步事件 while(1) { // 定义一个结构体来存储异步事件 struct ibv_async_event event; // 调用封装的函数获取异步事件,如果获取失败则退出循环 if(scclSuccess != wrap_ibv_get_async_event(context, &event)) { break; } // 定义一个字符指针用于存储事件类型的字符串描述 char* str; // 调用封装的函数将事件类型转换为字符串,如果转换失败则退出循环 if(scclSuccess != wrap_ibv_event_type_str(&str, event.event_type)) { break; } // 如果事件类型不是通信建立事件,则输出警告信息 if(event.event_type != IBV_EVENT_COMM_EST) WARN("NET/IB : Got async event : %s", str); // 调用封装的函数确认(acknowledge)异步事件,如果确认失败则退出循环 if(scclSuccess != wrap_ibv_ack_async_event(&event)) { break; } } // 
线程结束,返回NULL return NULL; } /** * @brief 获取IB设备的PCI路径并处理多端口和虚拟功能合并 * * 该函数通过设备名称获取IB设备的真实PCI路径,并对多端口NIC和虚拟功能(VF)进行合并处理, * 将它们视为同一PCI设备。同时记录实际端口号。 * * @param devName 输入参数,IB设备名称 * @param path 输出参数,存储获取到的PCI路径 * @param realPort 输出参数,记录实际端口号 * @return scclResult_t 返回操作结果,成功返回scclSuccess */ scclResult_t scclNetIb::scclIbGetPciPath(char* devName, char** path, int* realPort) { // 定义一个字符数组用于存储设备路径 char devicePath[PATH_MAX]; // 构造设备路径字符串,格式为 "/sys/class/infiniband//device" snprintf(devicePath, PATH_MAX, "/sys/class/infiniband/%s/device", devName); // 获取设备路径的绝对路径 char* p = realpath(devicePath, NULL); if(p == NULL) { // 如果无法获取绝对路径,记录警告信息 WARN("Could not find real path of %s (%s)", devName, devicePath); } else { // 处理多端口 NIC(网络接口卡),将路径末尾的端口编号替换为 '0' p[strlen(p) - 1] = '0'; // 如果启用了虚拟函数(VF)合并,则将路径中倒数第3和第4字符替换为 '0' if(scclParamIbMergeVfs()) p[strlen(p) - 3] = p[strlen(p) - 4] = '0'; // 初始化 realPort 为 0,用于统计实际端口数量 *realPort = 0; // 遍历已知的 InfiniBand 设备列表 for(int d = 0; d < scclNIbDevs; d++) { // 如果当前路径与已知的设备 PCI 路径匹配,则增加实际端口计数 if(strcmp(p, scclIbDevs[d].pciPath) == 0) (*realPort)++; } } // 将计算得到的绝对路径赋值给输出参数 path *path = p; // 返回成功状态 return scclSuccess; } static int ibvWidths[] = {1, 4, 8, 12, 2}; static int ibvSpeeds[] = {2500, /* SDR */ 5000, /* DDR */ 10000, /* QDR */ 10000, /* QDR */ 14000, /* FDR */ 25000, /* EDR */ 50000, /* HDR */ 100000 /* NDR */}; /** * 查找第一个被设置的bit位 * @param val 要检查的整数值 * @param max 最大检查位数 * @return 第一个被设置的bit位索引,若未找到则返回max */ static int firstBitSet(int val, int max) { int i = 0; while(i < max && ((val & (1 << i)) == 0)) i++; return i; } /** * 根据输入的宽度值,返回对应的IB(InfiniBand)链路宽度索引 * @param width 输入的宽度值 * @return 返回ibvWidths数组中对应的宽度索引值 */ int scclNetIb::scclIbWidth(int width) { return ibvWidths[firstBitSet(width, sizeof(ibvWidths) / sizeof(int) - 1)]; } /** * 根据给定的速度值查找并返回对应的IB传输速率 * @param speed 输入的速度值 * @return 返回ibvSpeeds数组中第一个匹配的IB传输速率 */ int scclNetIb::scclIbSpeed(int speed) { return ibvSpeeds[firstBitSet(speed, sizeof(ibvSpeeds) / sizeof(int) - 1)]; } 
/**
 * Check whether the current setup supports PCI Relaxed Ordering (RO).
 *
 * @return 1 if RO is usable, 0 otherwise
 * @note Probes the IBVERBS_1.8 API entry point ibv_reg_mr_iova2, which is
 *       required for IBV_ACCESS_RELAXED_ORDERING support.
 * @see scclParamIbPciRelaxedOrdering() for the configured RO mode
 */
int scclNetIb::scclIbRelaxedOrderingCapable(void) {
    int roMode = scclParamIbPciRelaxedOrdering();
    scclResult_t r = scclInternalError;
    if(roMode == 1 || roMode == 2) {
        // Query IBVERBS_1.8 API - needed for IBV_ACCESS_RELAXED_ORDERING support
        r = wrap_ibv_reg_mr_iova2(NULL, NULL, NULL, 0, 0, 0);
    }
    return r == scclInternalError ? 0 : 1;
}

/**
 * Read and pre-parse the SCCL_IB_HCA environment variable.
 *
 * Recognized prefixes:
 *  - '^' : invert the match (exclude listed devices)
 *  - '=' : exact-match device names
 *
 * @param shownIbHcaEnv counter limiting the informational log to one print
 * @param searchNot     out: true when the '^' prefix was present
 * @param searchExact   out: true when the '=' prefix was present
 * @return the remainder of the variable after any prefixes (NULL if unset)
 */
char* scclNetIb::scclIbGetIbHca(int& shownIbHcaEnv, bool* searchNot, bool* searchExact) {
    // Check if user defined which IB device:port to use
    char* userIbEnv = getenv("SCCL_IB_HCA");
    if(userIbEnv != NULL && shownIbHcaEnv++ == 0) INFO(SCCL_LOG_NET, "SCCL_IB_HCA set to %s", userIbEnv);
    *searchNot = userIbEnv && userIbEnv[0] == '^';
    if(*searchNot) userIbEnv++;
    *searchExact = userIbEnv && userIbEnv[0] == '=';
    if(*searchExact) userIbEnv++;
    return userIbEnv;
}

/**
 * Read a short string from a sysfs/proc file.
 *
 * Opens path/fileName and reads up to MAX_STR_LEN bytes into strValue.
 * On failure (or empty file) strValue becomes "" and an info message is
 * logged; otherwise the last byte read is replaced by a terminating '\0'
 * (this drops the trailing newline of typical sysfs files).
 *
 * @param path     directory containing the file
 * @param fileName file to read
 * @param strValue out buffer of at least MAX_STR_LEN bytes
 * @return always scclSuccess
 * @note NOTE(review): sprintf into a PATH_MAX buffer could overflow for very
 *       long inputs; callers currently pass short fixed paths — consider snprintf.
 */
scclResult_t scclNetIb::scclGetStrFromSys(const char* path, const char* fileName, char* strValue) {
    char filePath[PATH_MAX];
    sprintf(filePath, "%s/%s", path, fileName);
    int offset = 0;
    FILE* file;
    if((file = fopen(filePath, "r")) != NULL) {
        while(feof(file) == 0 && ferror(file) == 0 && offset < MAX_STR_LEN) {
            int len = fread(strValue + offset, 1, MAX_STR_LEN - offset, file);
            offset += len;
        }
        fclose(file);
    }
    if(offset == 0) {
        strValue[0] = '\0';
        INFO(SCCL_LOG_NET, "System detection : could not read %s, ignoring", filePath);
    } else {
        // Overwrite the last byte read (normally the trailing newline) with NUL.
        strValue[offset - 1] = '\0';
    }
    return scclSuccess;
}

/**
 * Check whether GPU Direct RDMA (GDR) is available.
 *
 * On the HIP platform this checks for the amdkfd memory_peers kernel module,
 * and additionally disables GDR on Hyper-V guests when NUMA balancing is on
 * and relaxed ordering is off. On other platforms GDR is reported unsupported.
 * The result is cached in a function-local static.
 *
 * @param ibDev IB device index (unused in the current probe)
 * @return scclSuccess if supported, scclSystemError otherwise
 */
scclResult_t scclNetIb::scclIbGdrSupport(int ibDev) {
    static int moduleLoaded = -1;
    if(moduleLoaded == -1) {
#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__)
        moduleLoaded = (access("/sys/kernel/mm/memory_peers/amdkfd/version", F_OK) == -1) ? 0 : 1;
        char strValue[MAX_STR_LEN];
        SCCLCHECK(scclGetStrFromSys("/sys/devices/virtual/dmi/id", "bios_version", strValue));
        if(strncmp("Hyper-V UEFI Release", strValue, 20) == 0) {
            int roMode = scclParamIbPciRelaxedOrdering();
            SCCLCHECK(scclGetStrFromSys("/proc/sys/kernel", "numa_balancing", strValue));
            if(strcmp(strValue, "1") == 0 && roMode == 0) moduleLoaded = 0;
        }
#else
        moduleLoaded = 0;
#endif
    }
    if(moduleLoaded == 0) return scclSystemError;
    return scclSuccess;
}

/**
 * Check whether the device supports DMA-BUF memory registration.
 *
 * Probes kernel support by registering a deliberately invalid DMA-BUF fd
 * (-1) and inspecting errno: EOPNOTSUPP/EPROTONOSUPPORT mean unsupported,
 * anything else (typically EBADF) means the call path exists. The result
 * is cached in a function-local static.
 *
 * @param dev device index
 * @return scclSuccess if supported, scclSystemError otherwise
 */
scclResult_t scclNetIb::scclIbDmaBufSupport(int dev) {
    static int dmaBufSupported = -1;
    if(dmaBufSupported == -1) {
        scclResult_t res;
        SCCLCHECKGOTO(rocmLibraryInit(), res, failure);
        struct ibv_pd* pd;
        struct ibv_context* ctx;
        ctx = scclIbDevs[dev].context;
        SCCLCHECKGOTO(wrap_ibv_alloc_pd(&pd, ctx), res, failure);
        // Test kernel DMA-BUF support with a dummy call (fd=-1)
        (void)wrap_direct_ibv_reg_dmabuf_mr(pd, 0ULL /*offset*/, 0ULL /*len*/, 0ULL /*iova*/, -1 /*fd*/, 0 /*flags*/);
        // ibv_reg_dmabuf_mr() will fail with EOPNOTSUPP/EPROTONOSUPPORT if not supported (EBADF otherwise)
        dmaBufSupported = (errno != EOPNOTSUPP && errno != EPROTONOSUPPORT) ? 1 : 0;
        SCCLCHECKGOTO(wrap_ibv_dealloc_pd(pd), res, failure);
    }
    if(dmaBufSupported == 0) return scclSystemError;
    return scclSuccess;
failure:
    dmaBufSupported = 0;
    return scclSystemError;
}

// Opaque handle exchanged out-of-band to bootstrap an IB connection.
struct scclIbHandle {
    union net_socket::scclSocketAddress connectAddr; // Filled by the target
    uint64_t magic;                                  // random number to help debugging
    struct scclIbCommStage stage;                    // Used by the other side when connecting
};

/**
 * Initialize the ibverbs resources (PD + CQ) for one communicator.
 *
 * Increments the device's PD reference count and allocates the PD on the
 * first reference (under the device lock). The CQ is sized
 * 2*MAX_REQUESTS*IB_QPS_PER_CONNECTION because each receive can generate
 * two completions (one for the FIFO post, one for the receive).
 *
 * @param dev   device index
 * @param ctx   device context
 * @param verbs structure to initialize
 * @return scclSuccess on success
 */
scclResult_t scclNetIb::scclIbInitVerbs(int dev, struct ibv_context* ctx, struct scclIbVerbs* verbs) {
    verbs->dev = dev;
    pthread_mutex_lock(&scclIbDevs[dev].lock);
    if(0 == scclIbDevs[dev].pdRefs++) {
        scclResult_t res;
        SCCLCHECKGOTO(wrap_ibv_alloc_pd(&scclIbDevs[dev].pd, ctx), res, failure);
        // The if(0) wrapper keeps the failure label inside the branch while
        // normal control flow skips it.
        if(0) {
        failure:
            pthread_mutex_unlock(&scclIbDevs[dev].lock);
            return res;
        }
    }
    verbs->pd = scclIbDevs[dev].pd;
    pthread_mutex_unlock(&scclIbDevs[dev].lock);
    // Recv requests can generate 2 completions (one for the post FIFO, one for the Recv).
    SCCLCHECK(wrap_ibv_create_cq(&verbs->cq, ctx, 2 * MAX_REQUESTS * scclParamIbQpsPerConn(), NULL, NULL, 0));
    return scclSuccess;
}

/**
 * Create a reliable-connection (RC) QP and move it to the INIT state.
 *
 * Send queue holds 2*MAX_REQUESTS entries (we may post 2 messages per send:
 * RDMA and RDMA_WITH_IMM), receive queue MAX_REQUESTS. Inline data is
 * enabled for FIFO-sized messages when IB_USE_INLINE is set.
 *
 * @param ib_port      IB port number
 * @param verbs        verbs resources (provides PD and CQ)
 * @param access_flags access rights for the QP
 * @param qp           out: the created QP
 * @return scclSuccess on success
 */
scclResult_t scclNetIb::scclIbCreateQp(uint8_t ib_port, struct scclIbVerbs* verbs, int access_flags, struct ibv_qp** qp) {
    struct ibv_qp_init_attr qpInitAttr;
    memset(&qpInitAttr, 0, sizeof(struct ibv_qp_init_attr));
    qpInitAttr.send_cq = verbs->cq;
    qpInitAttr.recv_cq = verbs->cq;
    qpInitAttr.qp_type = IBV_QPT_RC;
    // We might send 2 messages per send (RDMA and RDMA_WITH_IMM)
    qpInitAttr.cap.max_send_wr = 2 * MAX_REQUESTS;
    qpInitAttr.cap.max_recv_wr = MAX_REQUESTS;
    qpInitAttr.cap.max_send_sge = 1;
    qpInitAttr.cap.max_recv_sge = 1;
    qpInitAttr.cap.max_inline_data = scclParamIbUseInline() ? sizeof(struct scclIbSendFifo) : 0;
    SCCLCHECK(wrap_ibv_create_qp(qp, verbs->pd, &qpInitAttr));
    struct ibv_qp_attr qpAttr;
    memset(&qpAttr, 0, sizeof(struct ibv_qp_attr));
    qpAttr.qp_state = IBV_QPS_INIT;
    qpAttr.pkey_index = scclParamIbPkey();
    qpAttr.port_num = ib_port;
    qpAttr.qp_access_flags = access_flags;
    SCCLCHECK(wrap_ibv_modify_qp(*qp, &qpAttr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS));
    return scclSuccess;
}

/**
 * Transition a QP to RTR (Ready To Receive).
 *
 * For RoCE (Ethernet link layer) the address handle uses a Global Routing
 * Header built from the peer's GID; for native IB it addresses the peer by
 * LID.
 *
 * @param qp   the QP to transition
 * @param qpn  destination QP number
 * @param info peer parameters (MTU, link layer, port, LID/GID)
 * @return scclSuccess on success
 */
scclResult_t scclNetIb::scclIbRtrQp(struct ibv_qp* qp, uint32_t qpn, struct scclIbQpInfo* info) {
    struct ibv_qp_attr qpAttr;
    memset(&qpAttr, 0, sizeof(struct ibv_qp_attr));
    qpAttr.qp_state = IBV_QPS_RTR;
    qpAttr.path_mtu = info->mtu;
    qpAttr.dest_qp_num = qpn;
    qpAttr.rq_psn = 0;
    qpAttr.max_dest_rd_atomic = 1;
    qpAttr.min_rnr_timer = 12;
    if(info->link_layer == IBV_LINK_LAYER_ETHERNET) {
        qpAttr.ah_attr.is_global = 1;
        qpAttr.ah_attr.grh.dgid.global.subnet_prefix = info->spn;
        qpAttr.ah_attr.grh.dgid.global.interface_id = info->iid;
        qpAttr.ah_attr.grh.flow_label = 0;
        qpAttr.ah_attr.grh.sgid_index = scclParamIbGidIndex();
        qpAttr.ah_attr.grh.hop_limit = 255;
        qpAttr.ah_attr.grh.traffic_class = scclParamIbTc();
    } else {
        qpAttr.ah_attr.is_global = 0;
        qpAttr.ah_attr.dlid = info->lid;
    }
    qpAttr.ah_attr.sl = scclParamIbSl();
    qpAttr.ah_attr.src_path_bits = 0;
    qpAttr.ah_attr.port_num = info->ib_port;
    SCCLCHECK(wrap_ibv_modify_qp(
        qp, &qpAttr,
        IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER));
    return scclSuccess;
}

/**
 * Transition a QP to RTS (Ready To Send).
 *
 * Applies the configured timeout and retry count, plus RNR retry, SQ PSN
 * and the maximum number of outstanding RDMA-read/atomic operations.
 *
 * @param qp the QP to transition
 * @return scclSuccess on success
 */
scclResult_t scclNetIb::scclIbRtsQp(struct ibv_qp* qp) {
    struct ibv_qp_attr qpAttr;
    memset(&qpAttr, 0, sizeof(struct ibv_qp_attr));
    qpAttr.qp_state = IBV_QPS_RTS;
    qpAttr.timeout = scclParamIbTimeout();
    qpAttr.retry_cnt = scclParamIbRetryCnt();
    qpAttr.rnr_retry = 7;
    qpAttr.sq_psn = 0;
    qpAttr.max_rd_atomic = 1;
    SCCLCHECK(wrap_ibv_modify_qp(qp, &qpAttr,
                                 IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC));
    return scclSuccess;
}

// Request lifecycle states stored in scclIbRequest::type.
typedef enum : int {
    SCCL_NET_IB_REQ_UNUSED = 0,
    SCCL_NET_IB_REQ_SEND = 1,
    SCCL_NET_IB_REQ_RECV = 2,
    SCCL_NET_IB_REQ_FLUSH = 3
} NetIbReq_t;
const char* reqTypeStr[] = {"Unused", "Send", "Recv", "Flush"};

// The SendFifo needs to be 32-byte aligned and each element needs
// to be a 32-byte multiple, so that an entry does not get split and
// written out of order when IB Relaxed Ordering is enabled
static_assert((offsetof(struct scclIbSendComm, fifo) % 32) == 0, "scclIbSendComm fifo must be 32-byte aligned");
static_assert((sizeof(struct scclIbSendFifo) % 32) == 0, "scclIbSendFifo element size must be 32-byte multiples");

/**
 * Destroy the verbs resources of a communicator.
 *
 * Destroys the CQ, then decrements the device PD reference count under the
 * device lock, deallocating the PD when the count reaches zero.
 *
 * @param verbs the resources to release
 * @return scclSuccess on success
 */
scclResult_t scclNetIb::scclIbDestroyVerbs(struct scclIbVerbs* verbs) {
    scclResult_t res;
    SCCLCHECK(wrap_ibv_destroy_cq(verbs->cq));
    pthread_mutex_lock(&scclIbDevs[verbs->dev].lock);
    if(0 == --scclIbDevs[verbs->dev].pdRefs) {
        SCCLCHECKGOTO(wrap_ibv_dealloc_pd(scclIbDevs[verbs->dev].pd), res, returning);
    }
    res = scclSuccess;
returning:
    pthread_mutex_unlock(&scclIbDevs[verbs->dev].lock);
    return res;
}

/**
 * Grab an unused request from the communicator's request pool.
 *
 * Scans for the first SCCL_NET_IB_REQ_UNUSED slot and initializes its
 * common fields. Fails when all MAX_REQUESTS slots are busy.
 *
 * @param verbs pool owner
 * @param req   out: the acquired request (NULL on failure)
 * @return scclSuccess, or scclInternalError when the pool is exhausted
 */
scclResult_t scclNetIb::scclIbGetRequest(struct scclIbVerbs* verbs, struct scclIbRequest** req) {
    for(int i = 0; i < MAX_REQUESTS; i++) {
        struct scclIbRequest* r = verbs->reqs + i;
        if(r->type == SCCL_NET_IB_REQ_UNUSED) {
            r->verbs = verbs;
            r->events = 1;
            r->sock = NULL;
            r->gidInfo = NULL;
            *req = r;
            return scclSuccess;
        }
    }
    WARN("NET/IB : unable to allocate requests");
    *req = NULL;
    return scclInternalError;
}

/**
 * Return a request to the pool by marking it unused (no memory is freed).
 *
 * @param r the request to release
 * @return always scclSuccess
 */
scclResult_t scclNetIb::scclIbFreeRequest(struct scclIbRequest* r) {
    r->type = SCCL_NET_IB_REQ_UNUSED;
    return scclSuccess;
}

/**
 * Post the RDMA writes for one FIFO slot of aggregated sends.
 *
 * Builds one work request per aggregated request, encodes the request ids
 * into wr_id (8 bits each), then finishes with an RDMA_WRITE_WITH_IMM whose
 * immediate data carries either the single send size or a per-request
 * data/no-data bitmap. When data is split over multiple QPs, chunks are
 * rounded up to 128 bytes so LL/LL128 protocols keep working. With
 * adaptive routing (or multi-send) the bulk data goes out as plain
 * RDMA_WRITE followed by a 0-byte RDMA_WRITE_WITH_IMM that triggers the
 * remote completion.
 *
 * @param comm send communicator
 * @param slot FIFO slot index to flush
 * @return scclSuccess on success, scclInternalError on invalid request counts
 */
scclResult_t scclNetIb::scclIbMultiSend(struct scclIbSendComm* comm, int slot) {
    struct scclIbRequest** reqs = comm->fifoReqs[slot];
    volatile struct scclIbSendFifo* slots = comm->fifo[slot];
    int nreqs = slots[0].nreqs;
    if(nreqs > SCCL_NET_IB_MAX_RECVS) return scclInternalError;
    uint64_t wr_id = 0ULL;
    for(int r = 0; r < nreqs; r++) {
        struct ibv_send_wr* wr = comm->wrs + r;
        memset(wr, 0, sizeof(struct ibv_send_wr));
        struct ibv_sge* sge = comm->sges + r;
        sge->addr = (uintptr_t)reqs[r]->send.data;
        sge->lkey = reqs[r]->send.lkey;
        wr->opcode = IBV_WR_RDMA_WRITE;
        wr->send_flags = 0;
        wr->wr.rdma.remote_addr = slots[r].addr;
        wr->wr.rdma.rkey = slots[r].rkey;
        // Chain the WRs; the last one's next is reset below.
        wr->next = wr + 1;
        // Pack this request's pool index into its byte of wr_id.
        wr_id += (reqs[r] - comm->verbs.reqs) << (r * 8);
    }
    // Write size as immediate data. In the case of multi-send, only write
    // 0 or 1 as size to indicate whether there was data sent or received.
    uint32_t immData = 0;
    if(nreqs == 1) {
        immData = reqs[0]->send.size;
    } else {
        if(nreqs > 32) {
            WARN("Cannot store sizes of %d requests in a 32-bits field", nreqs);
            return scclInternalError;
        }
        for(int r = 0; r < nreqs; r++) {
            immData |= (reqs[r]->send.size ? 1 : 0) << r;
        }
    }
    struct ibv_send_wr* lastWr = comm->wrs + nreqs - 1;
    if(nreqs > 1 || (comm->ar && reqs[0]->send.size > scclParamIbArThreshold())) {
        // When using ADAPTIVE_ROUTING, send the bulk of the data first as an
        // RDMA_WRITE, then a 0-byte RDMA_WRITE_WITH_IMM to trigger a remote
        // completion.
        lastWr++;
        memset(lastWr, 0, sizeof(struct ibv_send_wr));
    }
    lastWr->wr_id = wr_id;
    lastWr->opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
    lastWr->imm_data = immData;
    lastWr->next = NULL;
    lastWr->send_flags = IBV_SEND_SIGNALED;
    // Multi-QP: make sure IB writes are multiples of 128B so that LL and LL128 protocols still work
    const int align = 128;
    const int nqps = scclParamIbSplitDataOnQps() ? comm->nqps : 1;
    for(int q = 0; q < nqps; q++) {
        for(int r = 0; r < nreqs; r++) {
            int chunkSize = DIVUP(DIVUP(reqs[r]->send.size, nqps), align) * align;
            int length = std::min(reqs[r]->send.size - reqs[r]->send.offset, chunkSize);
            if(length <= 0) {
                comm->wrs[r].sg_list = NULL;
                comm->wrs[r].num_sge = 0;
            } else {
                comm->sges[r].length = length;
                comm->wrs[r].sg_list = comm->sges + r;
                comm->wrs[r].num_sge = 1;
            }
        }
        struct ibv_send_wr* bad_wr;
        SCCLCHECK(wrap_ibv_post_send(comm->qps[comm->qpIndex], comm->wrs, &bad_wr));
        comm->qpIndex = (comm->qpIndex + 1) % comm->nqps;
        // Advance offsets and addresses for the next QP's chunk.
        for(int r = 0; r < nreqs; r++) {
            int chunkSize = DIVUP(DIVUP(reqs[r]->send.size, nqps), align) * align;
            reqs[r]->send.offset += chunkSize;
            comm->sges[r].addr += chunkSize;
            comm->wrs[r].wr.rdma.remote_addr += chunkSize;
        }
    }
    return scclSuccess;
}

/**
 * Publish receive buffers to the sender by RDMA-writing a FIFO slot.
 *
 * Fills the local FIFO element (address, rkey, size, tag, index) for each
 * of the n buffers and RDMA-writes the slot to the remote FIFO. Every
 * MAX_REQUESTS posts, the write is flagged IBV_SEND_SIGNALED so the send
 * queue can drain (see comment below).
 *
 * @param comm     receive communicator
 * @param n        number of buffers
 * @param data     buffer pointers
 * @param sizes    buffer sizes
 * @param tags     matching tags
 * @param mhandles memory registrations of the buffers
 * @param req      request tracking this operation
 * @return scclSuccess on success
 */
scclResult_t scclNetIb::scclIbPostFifo(struct scclIbRecvComm* comm, int n, void** data, int* sizes, int* tags, void** mhandles, struct scclIbRequest* req) {
    struct ibv_send_wr wr;
    memset(&wr, 0, sizeof(wr));
    int slot = comm->remFifo.fifoTail % MAX_REQUESTS;
    struct scclIbSendFifo* localElem = comm->remFifo.elems[slot];
    for(int i = 0; i < n; i++) {
        localElem[i].addr = (uint64_t)data[i];
        struct ibv_mr* mr = (struct ibv_mr*)mhandles[i];
        localElem[i].rkey = mr->rkey;
        localElem[i].nreqs = n;
        localElem[i].size = sizes[i]; // Sanity/Debugging
        localElem[i].tag = tags[i];
        localElem[i].idx = comm->remFifo.fifoTail + 1;
    }
    wr.wr.rdma.remote_addr = comm->remFifo.addr + slot * SCCL_NET_IB_MAX_RECVS * sizeof(struct scclIbSendFifo);
    wr.wr.rdma.rkey = comm->remFifo.rkey;
    comm->remFifo.sge.addr = (uint64_t)localElem;
    comm->remFifo.sge.length = n * sizeof(struct scclIbSendFifo);
    wr.sg_list = &comm->remFifo.sge;
    wr.num_sge = 1;
    wr.opcode = IBV_WR_RDMA_WRITE;
    wr.send_flags = comm->remFifo.flags; // IBV_SEND_INLINE
    // We need to occasionally post a request with the IBV_SEND_SIGNALED flag, otherwise
    // the send queue will never empty.
    //
    // From https://www.rdmamojo.com/2014/06/30/working-unsignaled-completions/
    // "How to use Unsignaled Completion?" / "Gotchas and Pitfalls"
    // All posted Send Requested, Signaled and Unsignaled, are considered outstanding until
    // a Work Completion that they, or Send Requests that were posted after them, was polled
    // from the Completion Queue associated with the Send Queue. This means if one works with
    // a Queue Pair that was configured to work with Unsignaled Completions, he must make
    // sure that occasionally (before the Send Queue is full with outstanding Send Requests)
    // a Send Request that generate Work Completion will be posted.
    //
    // Not following this rule may lead to a case that the Send Queue is full with Send
    // Requests that won't generate Work Completion:
    //
    //  - The Send Queue is full, so no new Send Requests can be posted to it
    //  - The Send Queue can't be emptied, since no Work Completion can be generated anymore
    //    (the reason is that no Work Completion, that can generate Work Completion that
    //    polling it will empty the Send Queue, can be posted)
    //  - The status of all posted Send Request is considered unknown
    //
    if(slot == 0) {
        wr.send_flags |= IBV_SEND_SIGNALED;
        wr.wr_id = req - comm->verbs.reqs;
        req->events++;
    }
    struct ibv_send_wr* bad_wr;
    SCCLCHECK(wrap_ibv_post_send(comm->qps[0], &wr, &bad_wr));
    comm->remFifo.fifoTail++;
    return scclSuccess;
}

////////////////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////// scclNetIb entry points ////////////////////////////////////

scclNetIb::scclNetIb() : scclNetBase("IB") {}
scclNetIb::~scclNetIb() { if(ibComm != nullptr) { free(ibComm); } } /** * @brief 初始化InfiniBand硬件设备 * * 该函数负责检测和初始化可用的InfiniBand设备,包括: * - 加载IB Verbs符号 * - 检测网络接口 * - 查询设备属性 * - 处理用户指定的HCA设备 * - 创建异步线程处理IB事件 * * @return scclResult_t 返回操作状态,scclSuccess表示成功,scclInternalError表示失败 * * @note 函数内部会处理环境变量SCCL_IB_HCA来过滤特定设备 * @note 使用互斥锁scclIbLock保证线程安全 */ scclResult_t scclNetIb::init() { SCCLCHECK(scclCalloc(&ibComm, 1)); // 如果IB被禁用,返回内部错误 if(scclParamIbDisable()) return scclInternalError; // 尝试初始化包装IB符号,如果失败返回内部错误 if(wrap_ibv_symbols() != scclSuccess) { return scclInternalError; } else { INFO(SCCL_LOG_NET, "SCCL IB init done"); } static int shownIbHcaEnv = 0; // 如果IB设备数量未初始化,开始初始化过程 if(scclNIbDevs == -1) { pthread_mutex_lock(&scclIbLock); wrap_ibv_fork_init(); if(scclNIbDevs == -1) { scclNIbDevs = 0; // 查找网络接口 if(net_socket::scclFindSocketInterfaces(scclIbIfName, &scclIbIfAddr, MAX_IF_NAME_SIZE, 1) != 1) { WARN("NET/IB : No IP interface found."); return scclInternalError; } // 检测IB卡 int nIbDevs; struct ibv_device** devices; struct netIf userIfs[MAX_IB_DEVS]; bool searchNot, searchExact; // 获取用户指定的IB HCA(InfiniBand Host Channel Adapter)环境变量 char* userIbEnv = scclIbGetIbHca(shownIbHcaEnv, &searchNot, &searchExact); // 解析用户指定的IB接口列表,将结果存储在userIfs数组中,最多解析MAX_IB_DEVS个接口 int nUserIfs = parseStringList(userIbEnv, userIfs, MAX_IB_DEVS); // 获取设备列表 if(scclSuccess != wrap_ibv_get_device_list(&devices, &nIbDevs)) return scclInternalError; // 遍历所有设备 for(int d = 0; d < nIbDevs && scclNIbDevs < MAX_IB_DEVS; d++) { struct ibv_context* context; // 尝试打开设备 if(scclSuccess != wrap_ibv_open_device(&context, devices[d]) || context == NULL) { WARN("NET/IB : Unable to open device %s", devices[d]->name); continue; } int nPorts = 0; struct ibv_device_attr devAttr; memset(&devAttr, 0, sizeof(devAttr)); // 查询设备属性 if(scclSuccess != wrap_ibv_query_device(context, &devAttr)) { WARN("NET/IB : Unable to query device %s", devices[d]->name); if(scclSuccess != wrap_ibv_close_device(context)) { return 
scclInternalError; } continue; } // 遍历设备的所有端口 for(int port = 1; port <= devAttr.phys_port_cnt; port++) { struct ibv_port_attr portAttr; // 查询端口属性 if(scclSuccess != wrap_ibv_query_port(context, port, &portAttr)) { WARN("NET/IB : Unable to query port %d", port); continue; } // 检查端口状态和链接层 if(portAttr.state != IBV_PORT_ACTIVE) continue; if(portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND && portAttr.link_layer != IBV_LINK_LAYER_ETHERNET) continue; // 检查用户指定的HCA/端口 if(!(matchIfList(devices[d]->name, port, userIfs, nUserIfs, searchExact) ^ searchNot)) { continue; } INFO(SCCL_LOG_NET, "NET/IB: [%d] %s: port=%d/IB=%s, speed:%d/%d", d, devices[d]->name, port, portAttr.link_layer == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE", scclIbSpeed(portAttr.active_speed), scclIbSpeed(portAttr.active_speed) * scclIbWidth(portAttr.active_width)); pthread_mutex_init(&scclIbDevs[scclNIbDevs].lock, NULL); INFO(SCCL_LOG_NET, "d=%d, node_guid=%llu, sys_image_guid=%llu\n", d, devAttr.node_guid, devAttr.sys_image_guid); // 设置Infiniband设备的属性 { scclIbDevs[scclNIbDevs].device = d; // 设备索引 scclIbDevs[scclNIbDevs].guid = devAttr.sys_image_guid; // 系统图像GUID scclIbDevs[scclNIbDevs].port = port; // 端口编号 scclIbDevs[scclNIbDevs].link = portAttr.link_layer; // 链路层类型 scclIbDevs[scclNIbDevs].speed = scclIbSpeed(portAttr.active_speed) * scclIbWidth(portAttr.active_width); // 计算设备速度 scclIbDevs[scclNIbDevs].context = context; // 设备上下文 scclIbDevs[scclNIbDevs].pdRefs = 0; // 保护域引用计数 scclIbDevs[scclNIbDevs].pd = NULL; // 保护域指针 strncpy(scclIbDevs[scclNIbDevs].devName, devices[d]->name, MAXNAMESIZE); // 复制设备名称 SCCLCHECK(scclIbGetPciPath( scclIbDevs[scclNIbDevs].devName, &scclIbDevs[scclNIbDevs].pciPath, &scclIbDevs[scclNIbDevs].realPort)); // 获取PCI路径和实际端口 scclIbDevs[scclNIbDevs].maxQp = devAttr.max_qp; // 最大队列对数量 scclIbDevs[scclNIbDevs].mrCache.capacity = 0; // MR缓存容量 scclIbDevs[scclNIbDevs].mrCache.population = 0; // MR缓存人口 scclIbDevs[scclNIbDevs].mrCache.slots = NULL; // MR缓存槽 // 
默认在IB网络上启用ADAPTIVE_ROUTING,但允许通过环境参数覆盖 scclIbDevs[scclNIbDevs].ar = (portAttr.link_layer == IBV_LINK_LAYER_INFINIBAND) ? 1 : 0; // 根据链路层类型设置自适应路由 if(scclParamIbAdaptiveRouting() != -2) scclIbDevs[scclNIbDevs].ar = scclParamIbAdaptiveRouting(); // 如果环境参数设置,则覆盖默认值 } // 创建一个新的线程,用于处理SCCL Infiniband的异步操作 pthread_create(&scclIbAsyncThread, NULL, scclIbAsyncThreadMain, context); // 设置新创建线程的名称,以便于调试和识别 scclSetThreadName(scclIbAsyncThread, "SCCL IbAsync %2d", scclNIbDevs); // 分离线程,使其在完成后自动回收资源,不需要调用pthread_join() pthread_detach(scclIbAsyncThread); scclNIbDevs++; // 增加Infiniband设备的计数 nPorts++; // 增加端口计数 // 再次调用pthread_detach,这行代码可能是多余的,需检查是否为误写 pthread_detach(scclIbAsyncThread); } // 如果没有活动端口,关闭设备 if(nPorts == 0 && scclSuccess != wrap_ibv_close_device(context)) { return scclInternalError; } } // 释放设备列表 if(nIbDevs && (scclSuccess != wrap_ibv_free_device_list(devices))) { return scclInternalError; }; } // 如果没有找到设备,打印信息 if(scclNIbDevs == 0) { WARN("NET/IB : No device found."); } else { char line[1024]; line[0] = '\0'; // 确定是否启用了RELAXED_ORDERING scclIbRelaxedOrderingEnabled = scclIbRelaxedOrderingCapable(); for(int d = 0; d < scclNIbDevs; d++) { snprintf(line + strlen(line), 1023 - strlen(line), " -- [%d]%s:%d/%s; ", d, scclIbDevs[d].devName, scclIbDevs[d].port, scclIbDevs[d].link == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE"); } // 确保line字符串以null字符结尾,防止字符串操作时出现未定义行为 line[1023] = '\0'; // 定义一个字符数组addrline,用于存储转换后的地址字符串 char addrline[SOCKET_NAME_MAXLEN + 1]; // 记录日志信息,描述当前网络/IB设备的配置和状态 // line 是设备的相关信息字符串 // scclIbRelaxedOrderingEnabled 是一个布尔值,指示是否启用了Relaxed Ordering // scclIbIfName 是IB接口的名称 // net_socket::scclSocketToString 是一个函数,用于将socket地址转换为字符串 // addrline 是存储转换后地址字符串的数组 INFO(SCCL_LOG_NET, "NET/IB : Using%s %s; OOB %s:%s", line, scclIbRelaxedOrderingEnabled ? 
"[RO]" : "", scclIbIfName, net_socket::scclSocketToString(&scclIbIfAddr, addrline)); } pthread_mutex_unlock(&scclIbLock); } return scclSuccess; } /** * 获取可用的InfiniBand设备数量 * * @param ndev [out] 用于存储设备数量的指针 * @return scclResult_t 返回操作结果,scclSuccess表示成功 */ scclResult_t scclNetIb::devices(int* ndev) { *ndev = scclNIbDevs; return scclSuccess; } /** * @brief 获取指定IB设备的网络属性 * * 该函数用于查询指定InfiniBand设备的各项属性,包括设备名称、PCI路径、GUID、 * 指针支持类型、速度、延迟、端口号、最大通信数和最大接收数等。 * * @param dev 设备索引 * @param props 用于存储设备属性的结构体指针 * @return scclResult_t 返回操作结果,成功返回scclSuccess */ scclResult_t scclNetIb::getProperties(int dev, scclNetProperties_t* props) { props->name = scclIbDevs[dev].devName; props->pciPath = scclIbDevs[dev].pciPath; props->guid = scclIbDevs[dev].guid; props->ptrSupport = SCCL_PTR_HOST; if(scclIbGdrSupport(dev) == scclSuccess) { props->ptrSupport |= SCCL_PTR_CUDA; // GDR support via nv_peermem } if(scclIbDmaBufSupport(dev) == scclSuccess) { props->ptrSupport |= SCCL_PTR_DMABUF; // GDR support via DMA-BUF } props->speed = scclIbDevs[dev].speed; props->latency = 0; // Not set props->port = scclIbDevs[dev].port + scclIbDevs[dev].realPort; props->maxComms = scclIbDevs[dev].maxQp; props->maxRecvs = SCCL_NET_IB_MAX_RECVS; return scclSuccess; } /** * @brief 在指定设备上创建并初始化IB监听通信 * * @param dev 设备号 * @param opaqueHandle 不透明的句柄指针,用于存储连接信息 * @param listenComm 返回的监听通信结构体指针 * @return scclResult_t 返回操作结果状态码 * * 该函数会: * 1. 分配并初始化监听通信结构体 * 2. 设置设备号和魔法数 * 3. 根据配置决定是否复用套接字 * 4. 
启动套接字监听并获取连接地址 */ scclResult_t scclNetIb::listen(int dev, void* opaqueHandle, void** listenComm) { memset(ibComm, 0, sizeof(struct scclIbListenComm)); struct scclIbHandle* handle = (struct scclIbHandle*)opaqueHandle; // 静态断言,确保 scclIbHandle 结构体的大小不超过 SCCL_NET_HANDLE_MAXSIZE static_assert(sizeof(struct scclIbHandle) < SCCL_NET_HANDLE_MAXSIZE, "scclIbHandle size too large"); // 将 handle 指向的内存区域清零,大小为 scclIbHandle 结构体的大小 memset(handle, 0, sizeof(struct scclIbHandle)); // 设置设备和处理句柄 ibComm->dev = dev; handle->magic = SCCL_SOCKET_MAGIC; SCCLCHECK(net_socket::scclSocketInit(&ibComm->sock, &scclIbIfAddr, handle->magic, net_socket::scclSocketTypeNetIb, NULL, 1)); // 如果启用了端口复用,则复用套接字地址和文件描述符 if(scclParamIbSockServerPortReuse()) { if(reusedSockfd == -1) { SCCLCHECK(scclSocketListen(&ibComm->sock)); memcpy(&reusedAddr, &ibComm->sock.addr, sizeof(union net_socket::scclSocketAddress)); reusedSockfd = ibComm->sock.fd; } else { memcpy(&ibComm->sock.addr, &reusedAddr, sizeof(union net_socket::scclSocketAddress)); ibComm->sock.fd = reusedSockfd; } } else { SCCLCHECK(net_socket::scclSocketListen(&ibComm->sock)); } // 获取套接字地址并设置监听通信 SCCLCHECK(net_socket::scclSocketGetAddr(&ibComm->sock, &handle->connectAddr)); *listenComm = ibComm; return scclSuccess; } /** * @brief 建立IB网络连接并初始化通信资源 * * 该函数负责完成以下操作: * 1. 初始化socket连接 * 2. 创建IB QP队列对 * 3. 交换QP信息 * 4. 完成QP状态转换(RTR/RTS) * 5. 
注册内存区域 * * @param dev 设备索引 * @param opaqueHandle 包含连接信息的句柄 * @param sendComm 输出参数,返回建立的发送通信上下文 * @return scclResult_t 返回操作结果状态码 * * @note 该函数使用状态机模式处理异步连接过程 * @warning 不能重复连接已建立的sendComm */ scclResult_t scclNetIb::connect(int dev, void* opaqueHandle, void** sendComm) { struct scclIbHandle* handle = (struct scclIbHandle*)opaqueHandle; struct scclIbCommStage* stage = &handle->stage; struct scclIbSendComm* comm = (struct scclIbSendComm*)stage->comm; int ready; *sendComm = NULL; if(stage->state == scclIbCommStateConnect) goto ib_connect_check; if(stage->state == scclIbCommStateSend) goto ib_send; if(stage->state == scclIbCommStateConnecting) goto ib_connect; if(stage->state == scclIbCommStateConnected) goto ib_send_ready; if(stage->state != scclIbCommStateStart) { WARN("Error: trying to connect already connected sendComm"); return scclInternalError; } SCCLCHECK(scclIbMalloc((void**)&comm, sizeof(struct scclIbSendComm))); SCCLCHECK(net_socket::scclSocketInit(&comm->sock, &handle->connectAddr, handle->magic, net_socket::scclSocketTypeNetIb, NULL, 1)); stage->comm = comm; stage->state = scclIbCommStateConnect; SCCLCHECK(net_socket::scclSocketConnect(&comm->sock, scclParamIbSockClientPortReuse())); ib_connect_check: /* since scclSocketConnect is async, we must check if connection is complete */ SCCLCHECK(net_socket::scclSocketReady(&comm->sock, &ready)); if(!ready) return scclSuccess; // IB Setup struct ibv_context* ctx; ctx = scclIbDevs[dev].context; SCCLCHECK(scclIbInitVerbs(dev, ctx, &comm->verbs)); uint8_t ib_port; ib_port = scclIbDevs[dev].port; comm->nqps = scclParamIbQpsPerConn(); for(int q = 0; q < comm->nqps; q++) { SCCLCHECK(scclIbCreateQp(ib_port, &comm->verbs, IBV_ACCESS_REMOTE_WRITE, comm->qps + q)); } comm->ar = scclIbDevs[dev].ar; // ADAPTIVE_ROUTING // Send my QP Info to receiver through the socket. Hope this won't block. 
struct ibv_port_attr portAttr; SCCLCHECK(wrap_ibv_query_port(ctx, ib_port, &portAttr)); struct scclIbQpInfo qpInfo; qpInfo.ib_port = ib_port; for(int q = 0; q < comm->nqps; q++) qpInfo.qpn[q] = comm->qps[q]->qp_num; qpInfo.mtu = portAttr.active_mtu; // Prepare my fifo SCCLCHECK(wrap_ibv_reg_mr(&comm->fifoMr, comm->verbs.pd, comm->fifo, sizeof(struct scclIbSendFifo) * MAX_REQUESTS * SCCL_NET_IB_MAX_RECVS, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ)); qpInfo.fifoRkey = comm->fifoMr->rkey; qpInfo.fifoAddr = (uint64_t)comm->fifo; // RoCE support qpInfo.lid = portAttr.lid; qpInfo.link_layer = comm->gidInfo.link_layer = portAttr.link_layer; if(qpInfo.link_layer == IBV_LINK_LAYER_INFINIBAND) { // IB for(int q = 0; q < comm->nqps; q++) INFO(SCCL_LOG_NET, "NET/IB: Dev %d Port %d qpn %d mtu %d LID %d", dev, ib_port, qpInfo.qpn[q], qpInfo.mtu, qpInfo.lid); } else { // RoCE SCCLCHECK(wrap_ibv_query_gid(ctx, ib_port, scclParamIbGidIndex(), &comm->gidInfo.localGid)); qpInfo.spn = comm->gidInfo.localGid.global.subnet_prefix; qpInfo.iid = comm->gidInfo.localGid.global.interface_id; for(int q = 0; q < comm->nqps; q++) INFO(SCCL_LOG_NET, "NET/IB: Dev %d Port %d qpn %d mtu %d GID %ld (%lX/%lX)", dev, ib_port, qpInfo.qpn[q], qpInfo.mtu, scclParamIbGidIndex(), qpInfo.spn, qpInfo.iid); } stage->state = scclIbCommStateSend; stage->offset = 0; SCCLCHECK(scclIbMalloc((void**)&stage->buffer, sizeof(qpInfo))); memcpy(stage->buffer, &qpInfo, sizeof(qpInfo)); ib_send: SCCLCHECK(scclSocketProgress(SCCL_SOCKET_SEND, &comm->sock, stage->buffer, sizeof(qpInfo), &stage->offset)); if(stage->offset != sizeof(qpInfo)) return scclSuccess; stage->state = scclIbCommStateConnecting; stage->offset = 0; // Clear the staging buffer for re-use memset(stage->buffer, 0, sizeof(qpInfo)); ib_connect: struct scclIbQpInfo remQpInfo; SCCLCHECK(scclSocketProgress(SCCL_SOCKET_RECV, &comm->sock, stage->buffer, sizeof(scclIbQpInfo), &stage->offset)); if(stage->offset != sizeof(remQpInfo)) 
return scclSuccess; memcpy(&remQpInfo, stage->buffer, sizeof(scclIbQpInfo)); comm->gidInfo.remoteGid.global.subnet_prefix = remQpInfo.spn; comm->gidInfo.remoteGid.global.interface_id = remQpInfo.iid; for(int q = 0; q < comm->nqps; q++) { struct ibv_qp* qp = comm->qps[q]; SCCLCHECK(scclIbRtrQp(qp, remQpInfo.qpn[q], &remQpInfo)); SCCLCHECK(scclIbRtsQp(qp)); } comm->ready = 1; stage->state = scclIbCommStateConnected; stage->offset = 0; ib_send_ready: SCCLCHECK(scclSocketProgress(SCCL_SOCKET_SEND, &comm->sock, &comm->ready, sizeof(int), &stage->offset)); if(stage->offset != sizeof(int)) return scclSuccess; free(stage->buffer); stage->state = scclIbCommStateStart; *sendComm = comm; return scclSuccess; } /** * @brief 接受IB连接请求并建立通信通道 * * 该函数处理IB连接的接受过程,包括以下步骤: * 1. 初始化接收通信结构体 * 2. 接受socket连接 * 3. 交换QP信息 * 4. 创建并配置QP队列 * 5. 设置远程FIFO信息 * 6. 处理GPU直接RDMA刷新缓冲区 * 7. 完成握手过程 * * @param listenComm 监听通信句柄 * @param recvComm 输出参数,接收通信句柄 * @return scclResult_t 返回操作结果,成功返回scclSuccess */ scclResult_t scclNetIb::accept(void* listenComm, void** recvComm) { struct scclIbListenComm* lComm = (struct scclIbListenComm*)listenComm; struct scclIbCommStage* stage = &lComm->stage; struct scclIbRecvComm* rComm = (struct scclIbRecvComm*)stage->comm; int ready; *recvComm = NULL; if(stage->state == scclIbCommStateAccept) goto ib_accept_check; if(stage->state == scclIbCommStateRecv) goto ib_recv; if(stage->state == scclIbCommStateSend) goto ib_send; if(stage->state == scclIbCommStatePendingReady) goto ib_recv_ready; if(stage->state != scclIbCommStateStart) { WARN("Listencomm in unknown state %d", stage->state); return scclInternalError; } SCCLCHECK(scclIbMalloc((void**)&rComm, sizeof(struct scclIbRecvComm))); stage->comm = rComm; stage->state = scclIbCommStateAccept; SCCLCHECK(net_socket::scclSocketInit(&rComm->sock)); SCCLCHECK(net_socket::scclSocketAccept(&rComm->sock, &lComm->sock)); ib_accept_check: SCCLCHECK(net_socket::scclSocketReady(&rComm->sock, &ready)); if(!ready) return scclSuccess; struct 
scclIbQpInfo remQpInfo; stage->state = scclIbCommStateRecv; stage->offset = 0; SCCLCHECK(scclIbMalloc((void**)&stage->buffer, sizeof(remQpInfo))); ib_recv: SCCLCHECK(net_socket::scclSocketProgress(SCCL_SOCKET_RECV, &rComm->sock, stage->buffer, sizeof(remQpInfo), &stage->offset)); if(stage->offset != sizeof(remQpInfo)) return scclSuccess; /* copy back the received info */ memcpy(&remQpInfo, stage->buffer, sizeof(struct scclIbQpInfo)); rComm->gidInfo.remoteGid.global.subnet_prefix = remQpInfo.spn; rComm->gidInfo.remoteGid.global.interface_id = remQpInfo.iid; // IB setup struct ibv_context* ctx; uint8_t ib_port; ctx = scclIbDevs[lComm->dev].context; ib_port = scclIbDevs[lComm->dev].port; struct ibv_port_attr portAttr; SCCLCHECK(wrap_ibv_query_port(ctx, ib_port, &portAttr)); SCCLCHECK(wrap_ibv_query_gid(ctx, ib_port, scclParamIbGidIndex(), &rComm->gidInfo.localGid)); // QP Creation SCCLCHECK(scclIbInitVerbs(lComm->dev, ctx, &rComm->verbs)); rComm->nqps = scclParamIbQpsPerConn(); for(int q = 0; q < rComm->nqps; q++) { SCCLCHECK(scclIbCreateQp(ib_port, &rComm->verbs, IBV_ACCESS_REMOTE_WRITE, rComm->qps + q)); } // Adjust the MTU remQpInfo.mtu = (enum ibv_mtu)std::min(remQpInfo.mtu, portAttr.active_mtu); // Setup QP for(int q = 0; q < rComm->nqps; q++) { struct ibv_qp* qp = rComm->qps[q]; SCCLCHECK(scclIbRtrQp(qp, remQpInfo.qpn[q], &remQpInfo)); SCCLCHECK(scclIbRtsQp(qp)); } // Retain remote fifo info and prepare my RDMA ops rComm->remFifo.rkey = remQpInfo.fifoRkey; rComm->remFifo.addr = remQpInfo.fifoAddr; SCCLCHECK(wrap_ibv_reg_mr(&rComm->remFifo.mr, rComm->verbs.pd, &rComm->remFifo.elems, sizeof(struct scclIbSendFifo) * MAX_REQUESTS * SCCL_NET_IB_MAX_RECVS, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ)); rComm->remFifo.sge.lkey = rComm->remFifo.mr->lkey; if(scclParamIbUseInline()) rComm->remFifo.flags = IBV_SEND_INLINE; // Allocate Flush dummy buffer for GPU Direct RDMA rComm->gpuFlush.enabled = ((scclIbGdrSupport(lComm->dev) == scclSuccess 
|| scclIbDmaBufSupport(lComm->dev) == scclSuccess) && (scclParamIbGdrFlushDisable() == 0)) ? 1 : 0; if(rComm->gpuFlush.enabled) { SCCLCHECK(wrap_ibv_reg_mr(&rComm->gpuFlush.hostMr, rComm->verbs.pd, &rComm->gpuFlush.hostMem, sizeof(int), IBV_ACCESS_LOCAL_WRITE)); rComm->gpuFlush.sge.addr = (uint64_t)&rComm->gpuFlush.hostMem; rComm->gpuFlush.sge.length = 1; rComm->gpuFlush.sge.lkey = rComm->gpuFlush.hostMr->lkey; SCCLCHECK(scclIbCreateQp(ib_port, &rComm->verbs, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ, &rComm->gpuFlush.qp)); struct scclIbQpInfo localQpInfo; localQpInfo.lid = portAttr.lid; localQpInfo.link_layer = portAttr.link_layer; localQpInfo.ib_port = ib_port; localQpInfo.spn = rComm->gidInfo.localGid.global.subnet_prefix; localQpInfo.iid = rComm->gidInfo.localGid.global.interface_id; localQpInfo.mtu = portAttr.active_mtu; SCCLCHECK(scclIbRtrQp(rComm->gpuFlush.qp, rComm->gpuFlush.qp->qp_num, &localQpInfo)); SCCLCHECK(scclIbRtsQp(rComm->gpuFlush.qp)); } // Fill Handle struct scclIbQpInfo qpInfo; qpInfo.lid = portAttr.lid; qpInfo.link_layer = rComm->gidInfo.link_layer = portAttr.link_layer; qpInfo.ib_port = ib_port; for(int q = 0; q < rComm->nqps; q++) qpInfo.qpn[q] = rComm->qps[q]->qp_num; qpInfo.spn = rComm->gidInfo.localGid.global.subnet_prefix; qpInfo.iid = rComm->gidInfo.localGid.global.interface_id; qpInfo.mtu = remQpInfo.mtu; stage->state = scclIbCommStateSend; stage->offset = 0; if(stage->buffer) free(stage->buffer); SCCLCHECK(scclIbMalloc((void**)&stage->buffer, sizeof(struct scclIbQpInfo))); memcpy(stage->buffer, &qpInfo, sizeof(struct scclIbQpInfo)); ib_send: SCCLCHECK(net_socket::scclSocketProgress(SCCL_SOCKET_SEND, &rComm->sock, stage->buffer, sizeof(struct scclIbQpInfo), &stage->offset)); if(stage->offset < sizeof(struct scclIbQpInfo)) return scclSuccess; stage->offset = 0; stage->state = scclIbCommStatePendingReady; ib_recv_ready: SCCLCHECK(net_socket::scclSocketProgress(SCCL_SOCKET_RECV, &rComm->sock, &rComm->ready, sizeof(int), 
&stage->offset)); if(stage->offset != sizeof(int)) return scclSuccess; free(stage->buffer); *recvComm = rComm; /* reset lComm stage */ stage->state = scclIbCommStateStart; stage->offset = 0; stage->comm = NULL; stage->buffer = NULL; return scclSuccess; } /* DMA-BUF support */ scclResult_t scclNetIb::regMrDmaBuf(void* comm, void* data, size_t size, int type, uint64_t offset, int fd, void** mhandle) { static_assert(offsetof(struct scclIbSendComm, verbs) == offsetof(struct scclIbRecvComm, verbs), "Send and recv comms must have verbs at the same offset"); assert(size > 0); static __thread uintptr_t pageSize = 0; if(pageSize == 0) pageSize = sysconf(_SC_PAGESIZE); struct scclIbVerbs* verbs = (struct scclIbVerbs*)comm; struct scclIbMrCache* cache = &scclIbDevs[verbs->dev].mrCache; uintptr_t addr = (uintptr_t)data & -pageSize; size_t pages = ((uintptr_t)data + size - addr + pageSize - 1) / pageSize; scclResult_t res; pthread_mutex_lock(&scclIbDevs[verbs->dev].lock); for(int slot = 0; /*true*/; slot++) { if(slot == cache->population) { // didn't find in cache if(cache->population == cache->capacity) { // must grow cache cache->capacity = cache->capacity < 32 ? 
32 : 2 * cache->capacity; SCCLCHECKGOTO(scclRealloc(&cache->slots, cache->population, cache->capacity), res, returning); } // Deregister / register struct ibv_mr* mr; unsigned int flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ; if(scclIbRelaxedOrderingEnabled) flags |= IBV_ACCESS_RELAXED_ORDERING; if(fd != -1) { /* DMA-BUF support */ SCCLCHECKGOTO(wrap_ibv_reg_dmabuf_mr(&mr, verbs->pd, offset, pages * pageSize, addr, fd, flags), res, returning); } else { if(scclIbRelaxedOrderingEnabled) { // Use IBVERBS_1.8 API - needed for IBV_ACCESS_RELAXED_ORDERING support SCCLCHECKGOTO(wrap_ibv_reg_mr_iova2(&mr, verbs->pd, (void*)addr, pages * pageSize, addr, flags), res, returning); } else { SCCLCHECKGOTO(wrap_ibv_reg_mr(&mr, verbs->pd, (void*)addr, pages * pageSize, flags), res, returning); } } INFO(SCCL_LOG_NET, "regAddr %llx size %lld rkey %x fd %d", (unsigned long long)addr, (long long)pages * pageSize, mr->rkey, fd); cache->population += 1; cache->slots[slot].addr = addr; cache->slots[slot].pages = pages; cache->slots[slot].refs = 1; cache->slots[slot].mr = mr; *mhandle = (void*)mr; res = scclSuccess; goto returning; } else if(cache->slots[slot].addr == addr && cache->slots[slot].pages == pages) { cache->slots[slot].refs += 1; *mhandle = (void*)cache->slots[slot].mr; res = scclSuccess; goto returning; } } returning: pthread_mutex_unlock(&scclIbDevs[verbs->dev].lock); return res; } scclResult_t scclNetIb::regMr(void* comm, void* data, int size, int type, void** mhandle) { return regMrDmaBuf(comm, data, (size_t)size, type, 0ULL, -1, mhandle); } /** * @brief 注销IB内存区域(MR) * * 该函数用于注销指定的IB内存区域(MR),并更新MR缓存。如果MR的引用计数减至0, * 则从缓存中移除并调用ibv_dereg_mr释放资源。 * * @param comm 通信上下文指针 * @param mhandle 要注销的内存区域句柄 * @return scclResult_t 返回操作结果(scclSuccess表示成功) */ scclResult_t scclNetIb::deregMr(void* comm, void* mhandle) { struct scclIbVerbs* verbs = (struct scclIbVerbs*)comm; struct scclIbMrCache* cache = &scclIbDevs[verbs->dev].mrCache; scclResult_t res; 
pthread_mutex_lock(&scclIbDevs[verbs->dev].lock); for(int i = 0; i < cache->population; i++) { if(mhandle == cache->slots[i].mr) { if(0 == --cache->slots[i].refs) { memmove(&cache->slots[i], &cache->slots[--cache->population], sizeof(struct scclIbMr)); if(cache->population == 0) { free(cache->slots); cache->slots = NULL; cache->capacity = 0; } SCCLCHECKGOTO(wrap_ibv_dereg_mr((struct ibv_mr*)mhandle), res, returning); } res = scclSuccess; goto returning; } } WARN("NET/IB: could not find mr %p inside cache of %d entries", mhandle, cache->population); res = scclInternalError; returning: pthread_mutex_unlock(&scclIbDevs[verbs->dev].lock); return res; } scclResult_t scclNetIb::isend(void* sendComm, void* data, int size, int tag, void* mhandle, void** request) { struct scclIbSendComm* comm = (struct scclIbSendComm*)sendComm; if(comm->ready == 0) { WARN("NET/IB: isend() called when comm->ready == 0"); return scclInternalError; } if(comm->ready == 0) { *request = NULL; return scclSuccess; } struct ibv_mr* mr = (struct ibv_mr*)mhandle; // Wait for the receiver to have posted the corresponding receive int nreqs = 0; volatile struct scclIbSendFifo* slots; int slot = (comm->fifoHead) % MAX_REQUESTS; struct scclIbRequest** reqs = comm->fifoReqs[slot]; slots = comm->fifo[slot]; uint64_t idx = comm->fifoHead + 1; if(slots[0].idx != idx) { *request = NULL; return scclSuccess; } nreqs = slots[0].nreqs; // Wait until all data has arrived for(int r = 1; r < nreqs; r++) while(slots[r].idx != idx) ; __sync_synchronize(); // order the nreqsPtr load against tag/rkey/addr loads below for(int r = 0; r < nreqs; r++) { if(reqs[r] != NULL || slots[r].tag != tag) continue; // Sanity checks to catch user collective call count/size mismatches if(size > slots[r].size) { char line[SOCKET_NAME_MAXLEN + 1]; union net_socket::scclSocketAddress addr; net_socket::scclSocketGetAddr(&comm->sock, &addr); WARN("NET/IB : req %d/%d tag %x peer %s collective mismatch error, local size %d remote size %d", r, 
nreqs, tag, net_socket::scclSocketToString(&addr, line), size, slots[r].size); return scclInvalidUsage; } // plus any potential programming errors else if(slots[r].size < 0 || slots[r].addr == 0 || slots[r].rkey == 0) { char line[SOCKET_NAME_MAXLEN + 1]; union net_socket::scclSocketAddress addr; net_socket::scclSocketGetAddr(&comm->sock, &addr); WARN("NET/IB : req %d/%d tag %x peer %s posted incorrect receive info: size %d addr %lx rkey %x", r, nreqs, tag, net_socket::scclSocketToString(&addr, line), slots[r].size, slots[r].addr, slots[r].rkey); return scclInternalError; } struct scclIbRequest* req; SCCLCHECK(scclIbGetRequest(&comm->verbs, &req)); req->type = SCCL_NET_IB_REQ_SEND; req->sock = &comm->sock; req->verbs = &comm->verbs; req->nreqs = nreqs; req->send.size = size; req->send.data = data; req->send.lkey = mr->lkey; req->send.offset = 0; req->events = scclParamIbSplitDataOnQps() ? comm->nqps : 1; if(comm->gidInfo.link_layer == IBV_LINK_LAYER_ETHERNET) req->gidInfo = &comm->gidInfo; *request = reqs[r] = req; // If this is a multi-recv, send only when all requests have matched. 
for(int r = 0; r < nreqs; r++) { if(reqs[r] == NULL) return scclSuccess; } SCCLCHECK(scclIbMultiSend(comm, slot)); // Clear slots[0]->nreqs, as well as other fields to help debugging and sanity checks memset((void*)slots, 0, sizeof(struct scclIbSendFifo)); memset(reqs, 0, SCCL_NET_IB_MAX_RECVS * sizeof(struct scclIbRequest*)); comm->fifoHead++; return scclSuccess; } *request = NULL; return scclSuccess; } scclResult_t scclNetIb::irecv(void* recvComm, int n, void** data, int* sizes, int* tags, void** mhandles, void** request) { struct scclIbRecvComm* comm = (struct scclIbRecvComm*)recvComm; if(comm->ready == 0) { WARN("NET/IB: irecv() called when comm->ready == 0"); return scclInternalError; } if(comm->ready == 0) { *request = NULL; return scclSuccess; } if(n > SCCL_NET_IB_MAX_RECVS) return scclInternalError; struct scclIbRequest* req; SCCLCHECK(scclIbGetRequest(&comm->verbs, &req)); req->type = SCCL_NET_IB_REQ_RECV; req->sock = &comm->sock; req->nreqs = n; if(comm->gidInfo.link_layer == IBV_LINK_LAYER_ETHERNET) req->gidInfo = &comm->gidInfo; for(int i = 0; i < n; i++) req->recv.sizes[i] = 0; struct ibv_recv_wr wr; memset(&wr, 0, sizeof(wr)); wr.wr_id = req - comm->verbs.reqs; wr.sg_list = NULL; wr.num_sge = 0; const int nqps = scclParamIbSplitDataOnQps() ? 
comm->nqps : 1; for(int q = 0; q < nqps; q++) { struct ibv_qp* qp = comm->qps[comm->qpIndex]; struct ibv_recv_wr* bad_wr; SCCLCHECK(wrap_ibv_post_recv(qp, &wr, &bad_wr)); comm->qpIndex = (comm->qpIndex + 1) % comm->nqps; } req->events = nqps; *request = req; // Post to FIFO to notify sender SCCLCHECK(scclIbPostFifo(comm, n, data, sizes, tags, mhandles, req)); return scclSuccess; } scclResult_t scclNetIb::iflush(void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request) { struct scclIbRecvComm* comm = (struct scclIbRecvComm*)recvComm; int last = -1; for(int i = 0; i < n; i++) if(sizes[i]) last = i; if(comm->gpuFlush.enabled == 0 || last == -1) return scclSuccess; // Only flush once using the last non-zero receive struct scclIbRequest* req; SCCLCHECK(scclIbGetRequest(&comm->verbs, &req)); req->type = SCCL_NET_IB_REQ_FLUSH; req->sock = &comm->sock; struct ibv_mr* mr = (struct ibv_mr*)mhandles[last]; struct ibv_send_wr wr; memset(&wr, 0, sizeof(wr)); wr.wr_id = req - comm->verbs.reqs; wr.wr.rdma.remote_addr = (uint64_t)data[last]; wr.wr.rdma.rkey = mr->rkey; wr.sg_list = &comm->gpuFlush.sge; wr.num_sge = 1; wr.opcode = IBV_WR_RDMA_READ; wr.send_flags = IBV_SEND_SIGNALED; struct ibv_send_wr* bad_wr; SCCLCHECK(wrap_ibv_post_send(comm->gpuFlush.qp, &wr, &bad_wr)); *request = req; return scclSuccess; } scclResult_t scclNetIb::test(void* request, int* done, int* sizes) { struct scclIbRequest* r = (struct scclIbRequest*)request; *done = 0; while(1) { if(r->events == 0) { *done = 1; if(sizes && r->type == SCCL_NET_IB_REQ_RECV) { for(int i = 0; i < r->nreqs; i++) sizes[i] = r->recv.sizes[i]; } SCCLCHECK(scclIbFreeRequest(r)); return scclSuccess; } int wrDone = 0; struct ibv_wc wcs[4]; SCCLCHECK(wrap_ibv_poll_cq(r->verbs->cq, 4, wcs, &wrDone)); if(wrDone == 0) return scclSuccess; for(int w = 0; w < wrDone; w++) { struct ibv_wc* wc = wcs + w; if(wc->status != IBV_WC_SUCCESS) { char line[SOCKET_NAME_MAXLEN + 1]; union net_socket::scclSocketAddress addr; 
net_socket::scclSocketGetAddr(r->sock, &addr); char localGidString[INET6_ADDRSTRLEN] = ""; char remoteGidString[INET6_ADDRSTRLEN] = ""; const char *localGidStr = NULL, *remoteGidStr = NULL; if(r->gidInfo) { localGidStr = inet_ntop(AF_INET6, &r->gidInfo->localGid, localGidString, sizeof(localGidString)); remoteGidStr = inet_ntop(AF_INET6, &r->gidInfo->remoteGid, remoteGidString, sizeof(remoteGidString)); } WARN("NET/IB : Got completion from peer %s with error %d, opcode %d, len %d, vendor err %d (%s)%s%s%s%s", net_socket::scclSocketToString(&addr, line), wc->status, wc->opcode, wc->byte_len, wc->vendor_err, reqTypeStr[r->type], localGidStr ? " localGid " : "", localGidString, remoteGidStr ? " remoteGid " : "", remoteGidString); return scclRemoteError; } struct scclIbRequest* req = r->verbs->reqs + (wc->wr_id & 0xff); if(req->type == SCCL_NET_IB_REQ_SEND) { for(int i = 0; i < req->nreqs; i++) { struct scclIbRequest* sendReq = r->verbs->reqs + ((wc->wr_id >> (i * 8)) & 0xff); if((sendReq->events <= 0)) return scclInternalError; sendReq->events--; } } else { if(req && wc->opcode == IBV_WC_RECV_RDMA_WITH_IMM) { if(req->type != SCCL_NET_IB_REQ_RECV) return scclInternalError; if(req->nreqs > 1) { // In the case of a multi recv, we only set sizes to 0 or 1. 
for(int i = 0; i < req->nreqs; i++) { req->recv.sizes[i] = (wc->imm_data >> i) & 0x1; } } else { req->recv.sizes[0] += wc->imm_data; } } req->events--; } } } } scclResult_t scclNetIb::closeSend(void* sendComm) { struct scclIbSendComm* comm = (struct scclIbSendComm*)sendComm; if(comm) { SCCLCHECK(net_socket::scclSocketClose(&comm->sock)); for(int q = 0; q < comm->nqps; q++) if(comm->qps[q] != NULL) SCCLCHECK(wrap_ibv_destroy_qp(comm->qps[q])); if(comm->fifoMr != NULL) SCCLCHECK(wrap_ibv_dereg_mr(comm->fifoMr)); SCCLCHECK(scclIbDestroyVerbs(&comm->verbs)); free(comm); } return scclSuccess; } scclResult_t scclNetIb::closeRecv(void* recvComm) { struct scclIbRecvComm* comm = (struct scclIbRecvComm*)recvComm; if(comm) { if(!scclParamIbSockServerPortReuse() || reusedSockfd != comm->sock.fd) SCCLCHECK(net_socket::scclSocketClose(&comm->sock)); for(int q = 0; q < comm->nqps; q++) if(comm->qps[q] != NULL) SCCLCHECK(wrap_ibv_destroy_qp(comm->qps[q])); if(comm->gpuFlush.enabled) { if(comm->gpuFlush.qp != NULL) SCCLCHECK(wrap_ibv_destroy_qp(comm->gpuFlush.qp)); if(comm->gpuFlush.hostMr != NULL) SCCLCHECK(wrap_ibv_dereg_mr(comm->gpuFlush.hostMr)); } if(comm->remFifo.mr != NULL) SCCLCHECK(wrap_ibv_dereg_mr(comm->remFifo.mr)); SCCLCHECK(scclIbDestroyVerbs(&comm->verbs)); free(comm); } return scclSuccess; } scclResult_t scclNetIb::closeListen(void* listenComm) { struct scclIbListenComm* comm = (struct scclIbListenComm*)listenComm; if(comm) { SCCLCHECK(net_socket::scclSocketClose(&comm->sock)); free(comm); } return scclSuccess; } } // namespace net_ib } // namespace net } // namespace hardware } // namespace sccl