/*************************************************************************
 * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
 * Modifications Copyright (c) 2019-2023 Advanced Micro Devices, Inc. All rights reserved.
 *
 * See LICENSE.txt for license information
 ************************************************************************/

// NOTE(review): this copy of the file appears to have lost every span of text that
// ran from a '<' (followed by a letter) up to the next '>': template argument lists,
// most "x<bound; x++) {" loop headers and some "if (mask & (1UL<<c)) {" lines.
// Visible symptoms: "template static" with no parameter list, loops such as
// "for (int t=startTs; t= 0 ...", calls written "c(comm, ..." where a
// selectTransport<...> call presumably stood, and the body of dumpData() running
// straight into the middle of ncclTransportP2pSetup(). The code below is kept
// token-for-token as found (only comments and line breaks were added); restore the
// lost text from the upstream sources before compiling.

#include "comm.h"
#include "info.h"
#include "bootstrap.h"
#define ENABLE_TIMER 0
#include "timer.h"

// Transport table. selectTransport() walks it starting at index
// comm->channels[channelId].transportType and installs the first transport whose
// canConnect() sets its result flag.
struct ncclTransport* ncclTransports[NTRANSPORTS] = {
  &p2pTransport,
  &shmTransport,
  &netTransport,
  &collNetTransport
};

// selectTransport: choose and set up the transport for one connector.
//
// The connector is comm->channels[channelId].peers[peer]->send[connIndex] when
// type == 1, otherwise ...->recv[connIndex]. Starting at the channel's transportType,
// each ncclTransports entry is asked canConnect(). The first one that accepts becomes
// connector->transportComm, and its setup() fills *connect.
//
//   transportType (may be NULL): receives the index of the chosen transport.
//   needsProxy    (may be NULL): set to whether the chosen send/recv side has a
//                                proxyProgress callback.
// Returns ncclSuccess, an error propagated through NCCLCHECK, or ncclSystemError when
// no transport accepts the pair.
//
// NOTE(review): `type` is used below but never declared in this text. The template
// parameter list after "template" appears to be missing (presumably an int template
// parameter named type; confirm against upstream).
template static ncclResult_t selectTransport(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclConnect* connect, int channelId, int peer, int connIndex, int* transportType, bool* needsProxy) {
  struct ncclPeerInfo* myInfo = comm->peerInfo+comm->rank;
  struct ncclPeerInfo* peerInfo = comm->peerInfo+peer;
  // type == 1 selects this rank's send connector to `peer`; any other value selects the recv connector.
  struct ncclConnector* connector = (type == 1) ? comm->channels[channelId].peers[peer]->send + connIndex :
                                                  comm->channels[channelId].peers[peer]->recv + connIndex;
  // handle intra-node network connections: for the P2P-net connector index, look up
  // the intra-node net device of this rank (n1) and of the peer (n2). The direction
  // argument is flipped for the peer.
  int n1 = -1, n2 = -1;
  if (connIndex == NCCL_CONN_IDX_P2P_NET) {
    NCCLCHECK(ncclTopoGetIntraNetDev(comm->topo, comm->rank, graph, channelId, (type == 1) ? 1 : 0, &n1));
    NCCLCHECK(ncclTopoGetIntraNetDev(comm->topo, peer, graph, channelId, (type == 1) ? 0 : 1, &n2));
  }
  // NOTE(review): xgmi is written here but not read anywhere in the visible text;
  // it may have been used in the part of the loop header that was lost.
  bool xgmi;
  NCCLCHECK(ncclTopoGetLinkType(comm->topo, myInfo->cudaDev, peerInfo->cudaDev, &xgmi));
  // The search starts at the channel's recorded transport type, not at index 0.
  int startTs = comm->channels[channelId].transportType;
  TRACE(NCCL_INIT, "<%s:%d> -----> startTs: %d, channelId :%d, connIndex: %d, type: %d, n1: %d, n2: %d\n", __func__, __LINE__, startTs, channelId, connIndex, type, n1, n2);
  // NOTE(review): the loop bound/increment and the start of the skip condition were
  // lost. The visible remainder means that, whenever the lost part of the condition
  // also holds, only TRANSPORT_NET is considered once n2 (and presumably n1) is >= 0.
  for (int t=startTs; t= 0 && n2 >= 0 && t != TRANSPORT_NET) continue;
    struct ncclTransport *transport = ncclTransports[t];
    struct ncclTransportComm* transportComm = type == 1 ? &transport->send : &transport->recv;
    int ret = 0;
    NCCLCHECK(transport->canConnect(&ret, comm->topo, graph, myInfo, peerInfo));
    if (ret) {
      // First transport that accepts the pair wins: bind it and let it fill *connect.
      connector->transportComm = transportComm;
      NCCLCHECK(transportComm->setup(comm, graph, myInfo, peerInfo, connect, connector, channelId, connIndex));
      if (transportType) *transportType = t;
      if (needsProxy) *needsProxy = (transportComm->proxyProgress != NULL);
      return ncclSuccess;
    }
  }
  WARN("No transport found for rank %d[%lx] -> rank %d[%lx]", myInfo->rank, myInfo->busId, peerInfo->rank, peerInfo->busId);
  return ncclSystemError;
}

// ncclTransportP2pConnect: record, for channel channelId, which peers still need a recv
// connection (from peerRecv, presumably its first nrecv entries) and a send connection
// (from peerSend, presumably its first nsend entries).
//
// No connection is made here. The function only ORs bit channel->id into
// comm->connectRecv / comm->connectSend, which ncclTransportP2pSetup() later reads and
// clears. Both arrays are indexed by peer + nRanks*k, where k is NCCL_CONN_IDX_P2P_NET
// for that connector index and 0 for every other connIndex. Peers that are >= nRanks,
// equal to this rank, or already connected for connIndex are skipped (the start of that
// condition was lost in this copy).
ncclResult_t ncclTransportP2pConnect(struct ncclComm* comm, int channelId, int nrecv, int* peerRecv, int nsend, int* peerSend, int connIndex) {
  TRACE(NCCL_INIT, "nsend %d nrecv %d", nsend, nrecv);
  struct ncclChannel* channel = &comm->channels[channelId];
  // NOTE(review): 1UL is 64 bits only on LP64 targets; 1ULL or (uint64_t)1 would match
  // the uint64_t mask type on every platform.
  uint64_t mask = 1UL << channel->id;
  // NOTE(review): loop bound, peer lookup and start of the skip test lost in this copy.
  for (int i=0; i= comm->nRanks || peer == comm->rank || channel->peers[peer]->recv[connIndex].connected) continue;
    comm->connectRecv[peer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] |= mask;
  }
  for (int i=0; i= comm->nRanks || peer == comm->rank || channel->peers[peer]->send[connIndex].connected) continue;
    comm->connectSend[peer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] |= mask;
  }
  return ncclSuccess;
}

// NOTE(review): text lost here. The rest of dumpData() (its loop and closing brace)
// and the signature and first declarations of ncclTransportP2pSetup() were removed.
// As written, everything below up to the matching closing brace therefore parses as
// the body of dumpData().
//
// From the names used below, ncclTransportP2pSetup() uses:
//   - comm, graph and connIndex;
//   - two optional out-pointers, highestTransportType and needsProxy (both tested
//     against NULL);
//   - locals ret, highestType, needsProxyResult and data (per-peer ncclConnect*
//     buffers; the surviving "...nRanks); // Store intermediate" tail suggests nRanks
//     entries).
// It returns ret.
//
// ncclTransportP2pSetup() overview, as far as the visible text shows:
//   1. For each distance i in [1, nRanks), with recvPeer = rank-i and sendPeer = rank+i
//      (mod nRanks), select a transport for every requested channel, pack the
//      resulting connect info into data[i], and exchange it with the peers over
//      bootstrap.
//   2. Repeatedly call connect() on every unconnected connector until none reports
//      ncclInProgress, copying each finished ncclConnInfo to device memory.
//   3. Clear the request masks, free the buffers, and report the highest transport
//      type used and whether a proxy is needed.
void dumpData(struct ncclConnect* data, int ndata) {
  for (int n=0; nnRanks); // Store intermediate send/recvData structs for connect
  struct ncclConnect** recvData = (ncclConnect**) malloc(sizeof(ncclConnect*) * comm->nRanks); // Points to entries inside data for given recv connection within a channel
  struct ncclConnect** sendData = (ncclConnect**) malloc(sizeof(ncclConnect*) * comm->nRanks); // Points to entries inside data for given send connection within a channel
  // NOTE(review): none of the malloc() results in this function is checked for NULL.
  NCCLCHECKGOTO(ncclStrongStreamAcquireUncaptured(&comm->sharedRes->hostStream), ret, fail);
  // First time initialization (phase 1: select transports and exchange connect info)
  for (int i=1; inRanks; i++) {
    // The tag differs per distance i and per graph (graph->id+1, or 0 without a graph),
    // assuming graph->id+1 stays below 256.
    int bootstrapTag = (i<<8) + (graph ? graph->id+1 : 0);
    int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks;
    int sendPeer = (comm->rank + i) % comm->nRanks;
    // Bit c set: channel c needs a recv connection from recvPeer / a send connection to
    // sendPeer (the bits are set by ncclTransportP2pConnect()).
    uint64_t recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
    uint64_t sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];

    // Data[i] contains all ncclConnect information for all send and receive connections with a given send and recv peer
    // This data is packed in the array based on the number of sendChannels and recvChannels connected with these peers
    // The first N entries contain recvData, connection information for recv connections
    // The next M entries contain sendData, connection information for send connections
    // It's not guaranteed that each entry of data has the same number of total or send/recv specific connections
    data[i] = (ncclConnect*) malloc(sizeof(ncclConnect) * 2*MAXCHANNELS);
    recvData[i] = data[i];

    int sendChannels = 0, recvChannels = 0;
    int type;
    bool proxy;
    TIME_START(0);
    // NOTE(review): in the next two loops, the text between "c" and "(comm, ..." was lost:
    // the loop bound/increment, the per-channel mask test (presumably on recvMask resp.
    // sendMask bit c) and the start of the NCCLCHECKGOTO(selectTransport...) call. The
    // two closing braces after each call close those lost blocks.
    for (int c=0; c(comm, graph, recvData[i]+recvChannels++, c, recvPeer, connIndex, &type, &proxy), ret, fail);
        if (type > highestType) highestType = type;
        // NOTE(review): unlike the send loop below, `proxy` is not folded into
        // needsProxyResult here; confirm that is intentional.
      }
    }
    TIME_STOP(0);
    TIME_START(1);
    // Send entries are packed directly after the recv entries in data[i].
    sendData[i] = recvData[i]+recvChannels;
    for (int c=0; c(comm, graph, sendData[i]+sendChannels++, c, sendPeer, connIndex, &type, &proxy), ret, fail);
        if (type > highestType) highestType = type;
        needsProxyResult |= proxy;
      }
    }
    TIME_STOP(1);

    TIME_START(2);
    if (sendPeer == recvPeer) {
      // Same peer on both sides (2*i is a multiple of nRanks, e.g. i == nRanks/2):
      // one send and one receive of the whole packed buffer.
      if (recvChannels+sendChannels) {
        NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, data[i], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail);
        NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, data[i], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail);
        // data[i] now holds the peer's buffer. Assuming the peer packed it the same way
        // ([its recv entries][its send entries]), the entries matching this rank's send
        // connectors come first, so the two views are re-based.
        sendData[i] = data[i];
        recvData[i] = data[i]+sendChannels;
      }
    } else {
      // Both sends are posted before either receive (presumably so that ranks exchanging
      // with each other do not block on one another). Each receive overwrites the region
      // that was just sent.
      if (recvChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, recvData[i], sizeof(struct ncclConnect)*recvChannels), ret, fail);
      if (sendChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, sendData[i], sizeof(struct ncclConnect)*sendChannels), ret, fail);
      if (sendChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, sendData[i], sizeof(struct ncclConnect)*sendChannels), ret, fail);
      if (recvChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, recvData[i], sizeof(struct ncclConnect)*recvChannels), ret, fail);
    }
    TIME_STOP(2);
  }

  // Loop until all channels with all ranks have been connected
  // (phase 2: call connect() on every connector whose connected flag is still 0)
  bool allChannelsConnected;
  allChannelsConnected = false;
  while (!allChannelsConnected) {
    allChannelsConnected = true;
    for (int i=1; inRanks; i++) {
      int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks;
      int sendPeer = (comm->rank + i) % comm->nRanks;
      uint64_t recvMask = comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];
      uint64_t sendMask = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)];

      // NOTE(review): these offsets advance only for connectors that are still
      // unconnected. If a connector completed in an earlier pass of the while loop, it
      // is skipped without advancing. A connector that is still pending would then be
      // handed a different connect entry than on the first pass. Advancing the offset
      // for every channel whose mask bit is set (connected or not) would keep entry and
      // channel paired; confirm.
      int sendDataOffset = 0;
      int recvDataOffset = 0;
      // NOTE(review): lost text between "c" and "channels": the loop bound/increment,
      // presumably TIME_START(3), the sendMask bit-c test, and the
      // "struct ncclConnector* conn = comm->" prefix of the next statement.
      for (int c=0; cchannels[c].peers[sendPeer]->send + connIndex;
          // This connector hasn't completed connection yet
          if (conn->connected == 0) {
            // The else-if on ncclInProgress below implies NCCLCHECKGOTO lets that value through.
            NCCLCHECKGOTO(conn->transportComm->connect(comm, sendData[i] + sendDataOffset++, 1, comm->rank, conn), ret, fail);
            if (ret == ncclSuccess) {
              struct ncclDevChannelPeer* addr;
              conn->connected = 1;
              /* comm->channels[c].devPeers[sendPeer]->send[connIndex] is a device memory access. */
              // Hence the device-side peer pointer is first copied back into addr, then the
              // connector's ncclConnInfo is copied to addr->send[connIndex], both on hostStream.
              // NOTE(review): the value of addr is used on the host (to form
              // &addr->send[connIndex]) right after an asynchronous device-to-host copy into
              // it. This relies on that copy having completed when cudaMemcpyAsync returns,
              // which CUDA documents for pageable (e.g. stack) destinations; confirm the HIP
              // runtime gives the same guarantee.
              CUDACHECKGOTO(cudaMemcpyAsync(&addr, &comm->channels[c].devPeers[sendPeer], sizeof(struct ncclDevChannelPeer*), cudaMemcpyDeviceToHost, comm->sharedRes->hostStream.cudaStream), ret, fail);
              CUDACHECKGOTO(cudaMemcpyAsync(&addr->send[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail);
            } else if (ret == ncclInProgress) {
              // Not finished yet: another pass of the while loop is needed.
              allChannelsConnected = false;
            }
          }
        }
        TIME_STOP(3);

        // Then the recv connector of the same channel (the send connector above is
        // handled first in each iteration).
        TIME_START(4);
        // NOTE(review): lost text between "(1UL<" and "channels": presumably the rest of
        // the recvMask bit-c test and the "struct ncclConnector* conn = comm->" prefix.
        if (recvMask & (1UL<channels[c].peers[recvPeer]->recv + connIndex;
          // This connector hasn't completed connection yet
          if (conn->connected == 0) {
            NCCLCHECKGOTO(conn->transportComm->connect(comm, recvData[i] + recvDataOffset++, 1, comm->rank, conn), ret, fail);
            if (ret == ncclSuccess) {
              struct ncclDevChannelPeer* addr;
              conn->connected = 1;
              /* comm->channels[c].devPeers[recvPeer]->recv[connIndex] is a device memory access. */
              // Same two-step copy as for the send connector (see the NOTE there).
              CUDACHECKGOTO(cudaMemcpyAsync(&addr, &comm->channels[c].devPeers[recvPeer], sizeof(struct ncclDevChannelPeer*), cudaMemcpyDeviceToHost, comm->sharedRes->hostStream.cudaStream), ret, fail);
              CUDACHECKGOTO(cudaMemcpyAsync(&addr->recv[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail);
            } else if (ret == ncclInProgress) {
              allChannelsConnected = false;
            }
          }
        }
        TIME_STOP(4);
      }
    }
  }

  // Clear all connect masks and free each connectInfo array (phase 3)
  for (int i=1; inRanks; i++) {
    int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks;
    int sendPeer = (comm->rank + i) % comm->nRanks;
    comm->connectRecv[recvPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] = comm->connectSend[sendPeer+comm->nRanks*(connIndex == NCCL_CONN_IDX_P2P_NET ? NCCL_CONN_IDX_P2P_NET : 0)] = 0UL;
    free(data[i]);
  }
  free(data);
  free(sendData);
  free(recvData);

  if (highestTransportType != NULL) *highestTransportType = highestType;
  if (needsProxy != NULL) *needsProxy = needsProxyResult;
  TIME_PRINT("P2P Setup/Connect");
  // NOTE(review): every "goto fail" above reaches exit without running the cleanup
  // above. On error, data[i], data, sendData and recvData leak and the connect masks
  // stay set. A fail-path cleanup must free only the data[i] entries that were
  // actually allocated.
exit:
  // Presumably makes deviceStream wait for the copies queued on hostStream, then
  // releases hostStream.
  // NOTE(review): if the wait fails, NCCLCHECK presumably returns before the release runs.
  NCCLCHECK(ncclStrongStreamWaitStream(ncclCudaGraphNone(), &comm->sharedRes->deviceStream, &comm->sharedRes->hostStream));
  NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNone(), &comm->sharedRes->hostStream));
  return ret;
fail:
  goto exit;
}

extern struct ncclTransport collNetTransport;

// All ranks must participate in collNetSetup call
// We do not NCCLCHECK this call because we would fall back to P2P network in case CollNet setup fails
//
// ncclTransportCollNetSetup: set up the CollNet connector of `channel` for one
// direction (type == collNetRecv or collNetSend).
//
// The CollNet root is the extra peer slot nranks (channel->peers[nranks],
// comm->peerInfo[nranks]). Only the master rank (rank == masterRank) runs setup() and
// connect(). On the recv side every rank joins a bootstrap all-gather. The recv master
// sends its CollNet rank and connect info to masterPeer; the send master receives them
// from masterPeer. Both use bootstrap tag collNetGraph->id.
//
// Returns 0 on success, or 1 if a step guarded by the *GOTO macros fails. Failures in a
// plain NCCLCHECK return early with NCCLCHECK's value (presumably the non-zero
// ncclResult_t) and skip the cleanup label.
int ncclTransportCollNetSetup(struct ncclComm* comm, struct ncclTopoGraph* collNetGraph, struct ncclChannel* channel, int masterRank, int masterPeer, int collNetGraphChannelId, int type) {
  int fail = 1;
  int rank = comm->rank;
  int nranks = comm->nRanks;
  int nMasters = comm->nNodes;
  int rankInCollNet = -1;
  int isMaster = (rank == masterRank) ? 1 : 0;
  struct {
    int collNetRank;
    ncclConnect connect;
  } sendrecvExchange;

  // check if we can connect to collnet, whose root is the nranks-th rank
  // NOTE(review): this writes comm->peerInfo[nranks], so peerInfo is assumed to have
  // room for nranks+1 entries.
  struct ncclPeerInfo *myInfo = comm->peerInfo+rank, *peerInfo = comm->peerInfo+nranks;
  peerInfo->rank = nranks;

  // send master receives connect info from peer recv master
  if (isMaster && type == collNetSend) {
    NCCLCHECK(bootstrapRecv(comm->bootstrap, masterPeer, collNetGraph->id, &sendrecvExchange, sizeof(sendrecvExchange)));
    rankInCollNet = sendrecvExchange.collNetRank;
    TRACE(NCCL_INIT, "CollNet [send] : rank %d collNetRank %d collNetNranks %d received connect from rank %d", rank, rankInCollNet, nMasters, masterPeer);
  }

  // select
  struct ncclChannelPeer* root = channel->peers[nranks];
  // connector index: 0 for recv, 1 for send
  struct ncclConnector* conn = (type == collNetRecv) ? root->recv+type : root->send+type;
  struct ncclTransportComm* transportComm = (type == collNetRecv) ? &(collNetTransport.recv) : &(collNetTransport.send);
  conn->transportComm = transportComm;
  // setup
  // NOTE(review): myConnect is only filled in on the master. On other ranks it stays
  // uninitialized but is still copied into allConnects and all-gathered below
  // (receivers skip it because isMaster is 0).
  struct ncclConnect myConnect;
  if (isMaster) {
    NCCLCHECK(transportComm->setup(comm, collNetGraph, myInfo, peerInfo, &myConnect, conn, collNetGraphChannelId, type));
  }
  // prepare connect handles
  // res is only written by the *GOTO macros; the function reports failure through `fail`.
  ncclResult_t res;
  struct {
    int isMaster;
    ncclConnect connect;
  } *allConnects = NULL;
  ncclConnect *masterConnects = NULL;
  NCCLCHECK(ncclCalloc(&masterConnects, nMasters));
  if (type == collNetRecv) {  // recv side: AllGather
    // all ranks must participate
    // NOTE(review): if this allocation fails, NCCLCHECK returns without freeing masterConnects.
    NCCLCHECK(ncclCalloc(&allConnects, nranks));
    allConnects[rank].isMaster = isMaster;
    memcpy(&(allConnects[rank].connect), &myConnect, sizeof(struct ncclConnect));
    NCCLCHECKGOTO(bootstrapAllGather(comm->bootstrap, allConnects, sizeof(*allConnects)), res, cleanup);
    // consolidate: compact the master entries in rank order; this rank's index among
    // them becomes rankInCollNet.
    int c = 0;
    for (int r = 0; r < nranks; r++) {
      if (allConnects[r].isMaster) {
        memcpy(masterConnects+c, &(allConnects[r].connect), sizeof(struct ncclConnect));
        if (r == rank) rankInCollNet = c;
        c++;
      }
    }
  } else { // send side : copy in connect info received from peer recv master
    if (isMaster) memcpy(masterConnects+rankInCollNet, &(sendrecvExchange.connect), sizeof(struct ncclConnect));
  }

  // connect
  if (isMaster) {
    NCCLCHECKGOTO(transportComm->connect(comm, masterConnects, nMasters, rankInCollNet, conn), res, cleanup);
    // Read the device-side root peer pointer back with a blocking cudaMemcpy, then write
    // this connector's ncclConnInfo into its recv/send slot `type`.
    struct ncclDevChannelPeer* devRoot;
    CUDACHECKGOTO(cudaMemcpy(&devRoot, channel->devPeers + nranks, sizeof(struct ncclDevChannelPeer*), cudaMemcpyDeviceToHost), res, cleanup);
    struct ncclConnInfo* devConnInfo = (type == collNetRecv) ? devRoot->recv + type : devRoot->send + type;
    CUDACHECKGOTO(cudaMemcpy(devConnInfo, &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice), res, cleanup);
  }
  // recv side sends connect info to send side
  if (isMaster && type == collNetRecv) {
    sendrecvExchange.collNetRank = rankInCollNet;
    memcpy(&sendrecvExchange.connect, masterConnects+rankInCollNet, sizeof(struct ncclConnect));
    NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, masterPeer, collNetGraph->id, &sendrecvExchange, sizeof(sendrecvExchange)), res, cleanup);
    TRACE(NCCL_INIT, "CollNet [recv] : rank %d collNetRank %d collNetNranks %d sent connect to rank %d", rank, rankInCollNet, nMasters, masterPeer);
  }
  fail = 0;
cleanup:
  if (allConnects != NULL) free(allConnects);
  if (masterConnects != NULL) free(masterConnects);
  return fail;
}

// ncclTransportCollNetCheck: agree on CollNet within the node.
//
// Each local rank contributes its collNetSetupFail flag through an intra-node
// all-gather. If any local rank failed, local rank 0 prints a warning and every rank
// returns ncclSystemError; otherwise ncclSuccess.
// NOTE(review): assumes comm->localRank and comm->localRanks fit within
// NCCL_MAX_LOCAL_RANKS (the size of allGatherFailures).
ncclResult_t ncclTransportCollNetCheck(struct ncclComm* comm, int collNetSetupFail) {
  // AllGather collNet setup results
  int allGatherFailures[NCCL_MAX_LOCAL_RANKS] = {0};
  allGatherFailures[comm->localRank] = collNetSetupFail;
  NCCLCHECK(bootstrapIntraNodeAllGather(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, allGatherFailures, sizeof(int)));
  // NOTE(review): loop bound partly lost in this copy (presumably comm->localRanks).
  for (int i=0; ilocalRanks; i++) {
    if (allGatherFailures[i] != 0) {
      collNetSetupFail = 1;
      break;
    }
  }
  if (collNetSetupFail) {
    if (comm->localRank == 0) WARN("Cannot initialize CollNet, using point-to-point network instead");
    return ncclSystemError;
  }
  return ncclSuccess;
}

// ncclTransportCollNetFree: release the CollNet root peer (peer slot nRanks) of each
// channel.
//
// When ncclAtomicRefCountDecrement() returns 0 (presumably the new count, i.e. this
// was the last reference), every send and recv connector that has both
// transportResources and a transportComm has its resources freed. transportResources
// is then cleared either way.
// NOTE(review): the channel loop bound (presumably comm->nChannels), the per-connector
// loop bounds and the "struct ncclConnector* send/recv = peer->" declarations were
// partly lost in this copy.
ncclResult_t ncclTransportCollNetFree(struct ncclComm* comm) {
  // Free collNet resources
  for (int r=0; rnChannels; r++) {
    struct ncclChannel* channel = comm->channels+r;
    struct ncclChannelPeer* peer = channel->peers[comm->nRanks];
    if (peer) {
      if (ncclAtomicRefCountDecrement(&peer->refCount) == 0) {
        for (int b=0; bsend + b;
          if (send->transportResources && send->transportComm) NCCLCHECK(send->transportComm->free(send));
          send->transportResources = NULL; // avoid double free
        }
        for (int b=0; brecv + b;
          if (recv->transportResources && recv->transportComm) NCCLCHECK(recv->transportComm->free(recv));
          recv->transportResources = NULL; // avoid double free
        }
      }
    }
  }
  return ncclSuccess;
}