Commit 7dc4e964 authored by wanghan's avatar wanghan
Browse files

Initial commit: RCCL auto-tuning project

parents
#!/bin/bash
#
# Copyright (c) 2017-2019, NVIDIA CORPORATION. All rights reserved.
#
# See LICENSE.txt for license information
#
# To run from $BUILDDIR/
#
# Packages the build tree (include/, lib/, top-level *.txt license files)
# into a versioned .txz tarball, renaming the top-level directory inside
# the archive to the full package name.
# NOTE(review): the ${nccl:*}, ${cuda:*} and ${pkg:*} tokens below are
# template placeholders substituted by the build system before this script
# is executed; this file is not runnable until they are expanded.
BUILDDIR=`basename $PWD`
cd ..
NCCL_MAJOR=${nccl:Major}
NCCL_MINOR=${nccl:Minor}
NCCL_PATCH=${nccl:Patch}
NCCL_SUFFIX=${nccl:Suffix}
CUDA_MAJOR=${cuda:Major}
CUDA_MINOR=${cuda:Minor}
PKG_REVISION=${pkg:Revision}
PKG_ARCH=${pkg:Arch}
# Full package name, e.g. nccl_2.18.3-1+cuda12.1_x86_64
NCCLNAME="nccl_${NCCL_MAJOR}.${NCCL_MINOR}.${NCCL_PATCH}${NCCL_SUFFIX}-${PKG_REVISION}+cuda${CUDA_MAJOR}.${CUDA_MINOR}_${PKG_ARCH}"
# --transform renames the build dir inside the archive; owner/group 0 keeps
# the tarball independent of the building user.
tar --transform "s/^$BUILDDIR/$NCCLNAME/" -Jcf $NCCLNAME.txz --owner=0 --group=0 $BUILDDIR/include $BUILDDIR/lib $BUILDDIR/*.txt
#
# Copyright (c) 2015-2022, NVIDIA CORPORATION. All rights reserved.
#
# See LICENSE.txt for license information
#
include ../makefiles/common.mk
include ../makefiles/version.mk
##### src files
# Public headers exported into $(INCDIR).
INCEXPORTS := nccl.h nccl_net.h
# Host-side library sources; device code is built by collectives/device.
LIBSRCFILES := init.cc init_nvtx.cc channel.cc bootstrap.cc transport.cc enqueue.cc group.cc debug.cc proxy.cc net.cc tuner.cc \
misc/cudawrap.cc misc/nvmlwrap.cc misc/ibvsymbols.cc misc/ibvwrap.cc misc/gdrwrap.cc \
misc/utils.cc misc/argcheck.cc misc/socket.cc misc/shmutils.cc misc/profiler.cc misc/param.cc misc/strongstream.cc \
misc/ipcsocket.cc \
transport/p2p.cc transport/shm.cc transport/net.cc transport/net_socket.cc transport/net_ib.cc transport/coll_net.cc transport/nvls.cc \
collectives/sendrecv.cc collectives/all_reduce.cc collectives/all_gather.cc collectives/broadcast.cc collectives/reduce.cc collectives/reduce_scatter.cc \
graph/topo.cc graph/paths.cc graph/search.cc graph/connect.cc graph/rings.cc graph/trees.cc graph/tuning.cc graph/xml.cc
##### lib files
LIBNAME := libnccl.so
STATICLIBNAME := libnccl_static.a
##### pkgconfig files
PKGCONFIGFILE := nccl.pc
##### dirs
# Out-of-tree build root; overridable from the environment/command line.
BUILDDIR ?= $(abspath ../build)
INCDIR := $(BUILDDIR)/include
LIBDIR := $(BUILDDIR)/lib
OBJDIR := $(BUILDDIR)/obj
PKGDIR := $(BUILDDIR)/lib/pkgconfig
##### target files
CUDARTLIB ?= cudart_static
ifeq ($(CUDARTLIB), cudart_static)
# Use compatibility shim only with static cudart; see https://github.com/NVIDIA/nccl/issues/658
LIBSRCFILES += enhcompat.cc
endif
# Final build-tree paths for headers and the versioned shared library names.
INCTARGETS := $(INCEXPORTS:%=$(INCDIR)/%)
LIBSONAME := $(LIBNAME:%=%.$(NCCL_MAJOR))
LIBTARGET := $(LIBNAME:%=%.$(NCCL_MAJOR).$(NCCL_MINOR).$(NCCL_PATCH))
STATICLIBTARGET := $(STATICLIBNAME)
PKGTARGET := $(PKGCONFIGFILE)
# Objects and compiler-generated .d dependency files mirror the source tree.
LIBOBJ := $(LIBSRCFILES:%.cc=$(OBJDIR)/%.o)
DEPFILES := $(LIBOBJ:%.o=%.d)
LDFLAGS += -L${CUDA_LIB} -l$(CUDARTLIB) -lpthread -lrt -ldl
# Device-side archive produced by the collectives/device sub-make.
DEVICELIB := $(BUILDDIR)/obj/collectives/device/colldevice.a
##### rules
# These goals are commands, not files, and ALWAYS_REBUILD is the classic
# empty force target.  Declaring them .PHONY ensures a stray file of the
# same name can never mask them, and that ALWAYS_REBUILD never has to
# exist on disk.
.PHONY: build lib staticlib clean install ALWAYS_REBUILD

build : lib staticlib
lib : $(INCTARGETS) $(LIBDIR)/$(LIBTARGET) $(PKGDIR)/$(PKGTARGET)
staticlib : $(LIBDIR)/$(STATICLIBTARGET)

# Device code is built by a recursive make that does its own dependency
# tracking, hence the unconditional re-invocation via ALWAYS_REBUILD.
$(DEVICELIB): ALWAYS_REBUILD $(INCTARGETS)
	$(MAKE) -C collectives/device

# Empty target to force rebuild
ALWAYS_REBUILD:

# Compiler-generated header dependencies (see the %.o rule below); the
# leading '-' silences "file not found" on the first build.
-include $(DEPFILES)
$(LIBDIR)/$(LIBTARGET) $(LIBDIR)/$(STATICLIBTARGET) : $(LIBOBJ)

# Generate the public nccl.h by substituting version placeholders.
$(INCDIR)/nccl.h : nccl.h.in ../makefiles/version.mk
# NCCL_VERSION(X,Y,Z) ((X) * 10000 + (Y) * 100 + (Z))
	@$(eval NCCL_VERSION := $(shell printf "%d%02d%02d" $(NCCL_MAJOR) $(NCCL_MINOR) $(NCCL_PATCH)))
	mkdir -p $(INCDIR)
	@printf "Generating %-35s > %s\n" $< $@
	sed -e "s/\$${nccl:Major}/$(NCCL_MAJOR)/g" \
	-e "s/\$${nccl:Minor}/$(NCCL_MINOR)/g" \
	-e "s/\$${nccl:Patch}/$(NCCL_PATCH)/g" \
	-e "s/\$${nccl:Suffix}/$(NCCL_SUFFIX)/g" \
	-e "s/\$${nccl:Version}/$(NCCL_VERSION)/g" \
	$< > $@

$(LIBDIR)/$(LIBTARGET): $(LIBOBJ) $(DEVICELIB)
	@printf "Linking   %-35s > %s\n" $(LIBTARGET) $@
	mkdir -p $(LIBDIR)
	$(CXX) $(CXXFLAGS) -shared -Wl,--no-as-needed -Wl,-soname,$(LIBSONAME) -o $@ $(LIBOBJ) $(DEVICELIB) $(LDFLAGS)
	ln -sf $(LIBSONAME) $(LIBDIR)/$(LIBNAME)
	ln -sf $(LIBTARGET) $(LIBDIR)/$(LIBSONAME)

# Helpers to build the comma-separated object list for the `ar -M` script.
null :=
space := $(null) #
comma := ,

$(LIBDIR)/$(STATICLIBTARGET): $(LIBOBJ) $(DEVICELIB)
	@printf "Archiving %-35s > %s\n" $(STATICLIBTARGET) $@
	mkdir -p $(LIBDIR)
	printf "create $@\naddlib $(DEVICELIB)\naddmod $(subst $(space),$(comma),$(strip $(LIBOBJ)))\nsave\nend" | ar -M

$(PKGDIR)/nccl.pc : nccl.pc.in
	mkdir -p $(PKGDIR)
	@printf "Generating %-35s > %s\n" $< $@
	sed -e 's|$${nccl:Prefix}|\$(PREFIX)|g' \
	-e "s/\$${nccl:Major}/$(NCCL_MAJOR)/g" \
	-e "s/\$${nccl:Minor}/$(NCCL_MINOR)/g" \
	-e "s/\$${nccl:Patch}/$(NCCL_PATCH)/g" \
	$< > $@

$(INCDIR)/%.h : %.h
	@printf "Grabbing %-35s > %s\n" $< $@
	mkdir -p $(INCDIR)
	install -m 644 $< $@

$(INCDIR)/nccl_%.h : include/nccl_%.h
	@printf "Grabbing %-35s > %s\n" $< $@
	mkdir -p $(INCDIR)
	install -m 644 $< $@

$(PKGDIR)/%.pc : %.pc
	@printf "Grabbing %-35s > %s\n" $< $@
	mkdir -p $(PKGDIR)
	install -m 644 $< $@

# Compile, then rewrite the compiler-emitted .d file so the object path is
# correct and every header also gets a phony target (deleting a header
# then won't break incremental builds).
$(OBJDIR)/%.o : %.cc $(INCTARGETS)
	@printf "Compiling %-35s > %s\n" $< $@
	mkdir -p `dirname $@`
	$(CXX) -I. -I$(INCDIR) $(CXXFLAGS) -Iinclude -c $< -o $@
	@$(CXX) -I. -I$(INCDIR) $(CXXFLAGS) -Iinclude -M $< > $(@:%.o=%.d.tmp)
	@sed "0,/^.*:/s//$(subst /,\/,$@):/" $(@:%.o=%.d.tmp) > $(@:%.o=%.d)
	@sed -e 's/.*://' -e 's/\\$$//' < $(@:%.o=%.d.tmp) | fmt -1 | \
	sed -e 's/^ *//' -e 's/$$/:/' >> $(@:%.o=%.d)
	@rm -f $(@:%.o=%.d.tmp)

clean :
	$(MAKE) -C collectives/device clean
	rm -rf ${INCDIR} ${LIBDIR} ${PKGDIR} ${OBJDIR}

install : build
	mkdir -p $(PREFIX)/lib
	mkdir -p $(PREFIX)/lib/pkgconfig
	mkdir -p $(PREFIX)/include
	cp -P -v $(BUILDDIR)/lib/lib* $(PREFIX)/lib/
	cp -P -v $(BUILDDIR)/lib/pkgconfig/* $(PREFIX)/lib/pkgconfig/
	cp -v $(BUILDDIR)/include/* $(PREFIX)/include/

FILESTOFORMAT := $(shell find . -name ".\#*" -prune -o \( -name "*.cc" -o -name "*.h" \) -print | grep -v -E 'ibvwrap.h|nvmlwrap.h|gdrwrap.h|nccl.h')
# Note that formatting.mk defines a new target so in order to not overwrite the default target,
# it shouldn't be included at the top. Also, it uses the above definition of FILESTOFORMAT as well
# as the BUILDDIR variable.
include ../makefiles/formatting.mk
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "nccl.h"
#include "core.h"
#include "utils.h"
#include "bootstrap.h"
#include "net.h"
#include <unistd.h>
#include <sys/types.h>
#include "proxy.h"
#include "signals.h" // [RCCL]
// Arguments handed to the bootstrapRoot() thread.  Heap-allocated by
// bootstrapCreateRoot(); the thread frees both the struct and the socket.
struct bootstrapRootArgs {
  // Listening socket on which the root accepts one check-in per rank.
  struct ncclSocket* listenSock;
  // Communicator magic used to validate incoming connections.
  uint64_t magic;
};
/* Init functions */
// Name of the network interface selected for bootstrap traffic.
static char bootstrapNetIfName[MAX_IF_NAME_SIZE+1];
// Address of that interface; copied into handles by bootstrapGetUniqueId().
static union ncclSocketAddress bootstrapNetIfAddr;
// One-time init flag, double-checked under bootstrapNetLock in bootstrapNetInit().
static int bootstrapNetInitDone = 0;
pthread_mutex_t bootstrapNetLock = PTHREAD_MUTEX_INITIALIZER;
// One-time discovery of the bootstrap network interface, guarded by
// double-checked locking on bootstrapNetLock.  When NCCL_COMM_ID is set,
// pick the interface whose subnet can reach that address; otherwise take
// the first usable interface found.
// Fix: the original returned straight out of the locked section on its
// three error paths, leaving bootstrapNetLock held and deadlocking every
// subsequent caller; all errors now unlock the mutex before returning.
ncclResult_t bootstrapNetInit() {
  if (bootstrapNetInitDone == 0) {
    pthread_mutex_lock(&bootstrapNetLock);
    if (bootstrapNetInitDone == 0) {
      ncclResult_t ret = ncclSuccess;
      char* env = getenv("NCCL_COMM_ID");
      if (env) {
        union ncclSocketAddress remoteAddr;
        if (ncclSocketGetAddrFromString(&remoteAddr, env) != ncclSuccess) {
          WARN("Invalid NCCL_COMM_ID, please use format: <ipv4>:<port> or [<ipv6>]:<port> or <hostname>:<port>");
          ret = ncclInvalidArgument;
        } else if (ncclFindInterfaceMatchSubnet(bootstrapNetIfName, &bootstrapNetIfAddr, &remoteAddr, MAX_IF_NAME_SIZE, 1) <= 0) {
          WARN("NET/Socket : No usable listening interface found");
          ret = ncclSystemError;
        }
      } else {
        int nIfs = ncclFindInterfaces(bootstrapNetIfName, &bootstrapNetIfAddr, MAX_IF_NAME_SIZE, 1);
        if (nIfs <= 0) {
          WARN("Bootstrap : no socket interface found");
          ret = ncclInternalError;
        }
      }
      if (ret != ncclSuccess) {
        // Error path: release the lock before propagating the failure.
        pthread_mutex_unlock(&bootstrapNetLock);
        return ret;
      }
      char line[SOCKET_NAME_MAXLEN+MAX_IF_NAME_SIZE+2];
      sprintf(line, " %s:", bootstrapNetIfName);
      ncclSocketToString(&bootstrapNetIfAddr, line+strlen(line));
      INFO(NCCL_INIT, "Bootstrap : Using%s", line);
      // Publish only after the interface globals are fully written.
      bootstrapNetInitDone = 1;
    }
    pthread_mutex_unlock(&bootstrapNetLock);
  }
  return ncclSuccess;
}
/* Socket Interface Selection type */
// findSubnetIf: choose the interface whose subnet matches the remote root
// address; dontCareIf: any usable interface is acceptable.
enum bootstrapInterface_t { findSubnetIf = -1, dontCareIf = -2 };
// Additional sync functions
// Length-prefixed send: first transmit the payload size, then the bytes.
static ncclResult_t bootstrapNetSend(struct ncclSocket* sock, void* data, int size) {
  NCCLCHECK(ncclSocketSend(sock, &size, sizeof(size)));
  NCCLCHECK(ncclSocketSend(sock, data, size));
  return ncclSuccess;
}
// Length-prefixed receive: read the sender's declared size first, then the
// payload.  A sender larger than our buffer is reported as an internal error.
static ncclResult_t bootstrapNetRecv(struct ncclSocket* sock, void* data, int size) {
  int senderSize;
  NCCLCHECK(ncclSocketRecv(sock, &senderSize, sizeof(senderSize)));
  if (senderSize > size) {
    WARN("Message truncated : received %d bytes instead of %d", senderSize, size);
    return ncclInternalError;
  }
  NCCLCHECK(ncclSocketRecv(sock, data, std::min(senderSize, size)));
  return ncclSuccess;
}
// Check-in record each rank sends to the root during bootstrap.
struct extInfo {
  // Sender's rank within the communicator.
  int rank;
  // Total rank count; the root verifies all senders agree on it.
  int nranks;
  // Address this rank listens on for the root's reply.
  union ncclSocketAddress extAddressListenRoot;
  // Address this rank listens on for peer (ring) connections.
  union ncclSocketAddress extAddressListen;
};
#include <sys/resource.h>
// Raise the soft open-file-descriptor limit to the hard limit: the root
// opens a socket per rank, which can exceed the default soft limit.
static ncclResult_t setFilesLimit() {
  struct rlimit limit;
  SYSCHECK(getrlimit(RLIMIT_NOFILE, &limit), "getrlimit");
  limit.rlim_cur = limit.rlim_max;
  SYSCHECK(setrlimit(RLIMIT_NOFILE, &limit), "setrlimit");
  return ncclSuccess;
}
// Root thread: collects one check-in from every rank, then sends each rank
// the listening address of the next rank in the bootstrap ring.  Runs
// detached; owns and frees both its args struct and the listen socket.
static void *bootstrapRoot(void* rargs) {
  struct bootstrapRootArgs* args = (struct bootstrapRootArgs*)rargs;
  struct ncclSocket* listenSock = args->listenSock;
  uint64_t magic = args->magic;
  ncclResult_t res = ncclSuccess;
  int nranks = 0, c = 0; // c counts distinct ranks seen so far
  struct extInfo info;
  union ncclSocketAddress *rankAddresses = NULL;
  union ncclSocketAddress *rankAddressesRoot = NULL; // for initial rank <-> root information exchange
  // All-zero address used to detect a duplicate check-in from the same rank.
  union ncclSocketAddress *zero = NULL;
  NCCLCHECKGOTO(ncclCalloc(&zero, 1), res, out);
  setFilesLimit();
  TRACE(NCCL_INIT, "BEGIN");
  /* Receive addresses from all ranks */
  do {
    struct ncclSocket sock;
    NCCLCHECKGOTO(ncclSocketInit(&sock), res, out);
    NCCLCHECKGOTO(ncclSocketAccept(&sock, listenSock), res, out);
    NCCLCHECKGOTO(bootstrapNetRecv(&sock, &info, sizeof(info)), res, out);
    NCCLCHECKGOTO(ncclSocketClose(&sock), res, out);
    // The first check-in fixes the expected rank count and sizes the tables.
    if (c == 0) {
      nranks = info.nranks;
      NCCLCHECKGOTO(ncclCalloc(&rankAddresses, nranks), res, out);
      NCCLCHECKGOTO(ncclCalloc(&rankAddressesRoot, nranks), res, out);
    }
    if (nranks != info.nranks) {
      WARN("Bootstrap Root : mismatch in rank count from procs %d : %d", nranks, info.nranks);
      goto out;
    }
    // A non-zero slot means this rank already checked in once.
    if (memcmp(zero, &rankAddressesRoot[info.rank], sizeof(union ncclSocketAddress)) != 0) {
      WARN("Bootstrap Root : rank %d of %d ranks has already checked in", info.rank, nranks);
      goto out;
    }
    // Save the connection handle for that rank
    memcpy(rankAddressesRoot+info.rank, &info.extAddressListenRoot, sizeof(union ncclSocketAddress));
    memcpy(rankAddresses+info.rank, &info.extAddressListen, sizeof(union ncclSocketAddress));
    ++c;
    TRACE(NCCL_INIT, "Received connect from rank %d total %d/%d", info.rank, c, nranks);
  } while (c < nranks);
  TRACE(NCCL_INIT, "COLLECTED ALL %d HANDLES", nranks);
  // Send the connect handle for the next rank in the AllGather ring
  for (int r=0; r<nranks; ++r) {
    int next = (r+1) % nranks;
    struct ncclSocket sock;
    NCCLCHECKGOTO(ncclSocketInit(&sock, rankAddressesRoot+r, magic, ncclSocketTypeBootstrap), res, out);
    NCCLCHECKGOTO(ncclSocketConnect(&sock), res, out);
    NCCLCHECKGOTO(bootstrapNetSend(&sock, rankAddresses+next, sizeof(union ncclSocketAddress)), res, out);
    NCCLCHECKGOTO(ncclSocketClose(&sock), res, out);
  }
  TRACE(NCCL_INIT, "SENT OUT ALL %d HANDLES", nranks);
out:
  // Cleanup runs on both success and error paths; the thread has no caller
  // to report `res` to, so failures are only visible via the WARN logs.
  if (listenSock != NULL) {
    ncclSocketClose(listenSock);
    free(listenSock);
  }
  if (rankAddresses) free(rankAddresses);
  if (rankAddressesRoot) free(rankAddressesRoot);
  if (zero) free(zero);
  free(rargs);
  TRACE(NCCL_INIT, "DONE");
  return NULL;
}
// Start the detached root thread serving this handle's address.  The actual
// bound address (port may be auto-assigned) is written back into the handle.
// NOTE(review): idFromEnv is not referenced in this body; on an NCCLCHECK
// failure listenSock/args are leaked -- presumably acceptable on this fatal
// path, but confirm before relying on it.
ncclResult_t bootstrapCreateRoot(struct ncclBootstrapHandle* handle, bool idFromEnv) {
  struct ncclSocket* listenSock;
  struct bootstrapRootArgs* args;
  pthread_t thread;
  NCCLCHECK(ncclCalloc(&listenSock, 1));
  NCCLCHECK(ncclSocketInit(listenSock, &handle->addr, handle->magic, ncclSocketTypeBootstrap, NULL, 0));
  NCCLCHECK(ncclSocketListen(listenSock));
  NCCLCHECK(ncclSocketGetAddr(listenSock, &handle->addr));
  NCCLCHECK(ncclCalloc(&args, 1));
  args->listenSock = listenSock;
  args->magic = handle->magic;
  // Ownership of listenSock and args passes to the bootstrapRoot thread.
  NEQCHECK(pthread_create(&thread, NULL, bootstrapRoot, (void*)args), 0);
  ncclSetThreadName(thread, "NCCL BootstrapR");
  NEQCHECK(pthread_detach(thread), 0); // will not be pthread_join()'d
  return ncclSuccess;
}
// Produce a unique bootstrap handle: a random magic plus either the address
// supplied via NCCL_COMM_ID or the address of a freshly spawned root thread.
ncclResult_t bootstrapGetUniqueId(struct ncclBootstrapHandle* handle) {
  memset(handle, 0, sizeof(ncclBootstrapHandle));
  NCCLCHECK(getRandomData(&handle->magic, sizeof(handle->magic)));
  char* env = getenv("NCCL_COMM_ID");
  if (env == NULL) {
    // No user-provided id: listen on the bootstrap interface and spawn a root.
    memcpy(&handle->addr, &bootstrapNetIfAddr, sizeof(union ncclSocketAddress));
    NCCLCHECK(bootstrapCreateRoot(handle, false));
    return ncclSuccess;
  }
  INFO(NCCL_ENV, "NCCL_COMM_ID set by environment to %s", env);
  if (ncclSocketGetAddrFromString(&handle->addr, env) != ncclSuccess) {
    WARN("Invalid NCCL_COMM_ID, please use format: <ipv4>:<port> or [<ipv6>]:<port> or <hostname>:<port>");
    return ncclInvalidArgument;
  }
  return ncclSuccess;
}
// A connection accepted by bootstrapRecv() whose (peer, tag) did not match
// the receive in progress; parked until a matching bootstrapRecv() arrives.
struct unexConn {
  int peer;              // sender's rank
  int tag;               // sender's tag
  struct ncclSocket sock; // accepted socket, payload not yet read
  struct unexConn* next;  // singly-linked FIFO
};
// Per-communicator bootstrap state, created by bootstrapInit()/bootstrapSplit().
struct bootstrapState {
  struct ncclSocket listenSock;     // accepts peer connections for bootstrapRecv()
  struct ncclSocket ringRecvSocket; // link from the previous rank in the ring
  struct ncclSocket ringSendSocket; // link to the next rank in the ring
  union ncclSocketAddress* peerCommAddresses;  // listen address of every rank
  union ncclSocketAddress* peerProxyAddresses; // proxy address of every rank
  struct unexConn* unexpectedConnections;      // parked out-of-order connections
  int cudaDev;
  int rank;
  int nranks;
  uint64_t magic;                   // validates connections for this communicator
  volatile uint32_t *abortFlag;     // shared with the owning ncclComm
};
// Full bootstrap for a new communicator: check in with the root, learn the
// next rank's address, join the AllGather ring, exchange all peer and proxy
// addresses, and start the proxy service.
ncclResult_t bootstrapInit(struct ncclBootstrapHandle* handle, struct ncclComm* comm) {
  int rank = comm->rank;
  int nranks = comm->nRanks;
  struct bootstrapState* state;
  struct ncclSocket* proxySocket;
  ncclSocketAddress nextAddr;
  struct ncclSocket sock, listenSockRoot;
  struct extInfo info = { 0 };
  NCCLCHECK(ncclCalloc(&state, 1));
  state->rank = rank;
  state->nranks = nranks;
  state->abortFlag = comm->abortFlag;
  comm->bootstrap = state;
  comm->magic = state->magic = handle->magic;
  TRACE(NCCL_INIT, "rank %d nranks %d", rank, nranks);
  // [RCCL] Register custom signal handlers if requested
  RegisterSignalHandlers();
  // [/RCCL]
  info.rank = rank;
  info.nranks = nranks;
  // Create socket for other ranks to contact me
  NCCLCHECK(ncclSocketInit(&state->listenSock, &bootstrapNetIfAddr, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag));
  NCCLCHECK(ncclSocketListen(&state->listenSock));
  NCCLCHECK(ncclSocketGetAddr(&state->listenSock, &info.extAddressListen));
  // Create socket for root to contact me
  NCCLCHECK(ncclSocketInit(&listenSockRoot, &bootstrapNetIfAddr, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag));
  NCCLCHECK(ncclSocketListen(&listenSockRoot));
  NCCLCHECK(ncclSocketGetAddr(&listenSockRoot, &info.extAddressListenRoot));
  // stagger connection times to avoid an overload of the root
  // (the rank index doubles as a per-rank delay in milliseconds)
  if (nranks > 128) {
    long msec = rank;
    struct timespec tv;
    tv.tv_sec = msec / 1000;
    tv.tv_nsec = 1000000 * (msec % 1000);
    TRACE(NCCL_INIT, "rank %d delaying connection to root by %ld msec", rank, msec);
    (void) nanosleep(&tv, NULL);
  }
  // send info on my listening socket to root
  NCCLCHECK(ncclSocketInit(&sock, &handle->addr, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag));
  NCCLCHECK(ncclSocketConnect(&sock));
  NCCLCHECK(bootstrapNetSend(&sock, &info, sizeof(info)));
  NCCLCHECK(ncclSocketClose(&sock));
  // get info on my "next" rank in the bootstrap ring from root
  NCCLCHECK(ncclSocketInit(&sock));
  NCCLCHECK(ncclSocketAccept(&sock, &listenSockRoot));
  NCCLCHECK(bootstrapNetRecv(&sock, &nextAddr, sizeof(union ncclSocketAddress)));
  NCCLCHECK(ncclSocketClose(&sock));
  NCCLCHECK(ncclSocketClose(&listenSockRoot));
  // Connect to the next rank in the ring...
  NCCLCHECK(ncclSocketInit(&state->ringSendSocket, &nextAddr, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag));
  NCCLCHECK(ncclSocketConnect(&state->ringSendSocket));
  // Accept the connect request from the previous rank in the AllGather ring
  NCCLCHECK(ncclSocketInit(&state->ringRecvSocket));
  NCCLCHECK(ncclSocketAccept(&state->ringRecvSocket, &state->listenSock));
  // AllGather all listen handlers
  NCCLCHECK(ncclCalloc(&state->peerCommAddresses, nranks));
  NCCLCHECK(ncclSocketGetAddr(&state->listenSock, state->peerCommAddresses+rank));
  NCCLCHECK(bootstrapAllGather(state, state->peerCommAddresses, sizeof(union ncclSocketAddress)));
  // Create the service proxy
  NCCLCHECK(ncclCalloc(&state->peerProxyAddresses, nranks));
  // proxy is aborted through a message; don't set abortFlag
  NCCLCHECK(ncclCalloc(&proxySocket, 1));
  NCCLCHECK(ncclSocketInit(proxySocket, &bootstrapNetIfAddr, comm->magic, ncclSocketTypeProxy, comm->abortFlag));
  NCCLCHECK(ncclSocketListen(proxySocket));
  NCCLCHECK(ncclSocketGetAddr(proxySocket, state->peerProxyAddresses+rank));
  NCCLCHECK(bootstrapAllGather(state, state->peerProxyAddresses, sizeof(union ncclSocketAddress)));
  NCCLCHECK(ncclProxyInit(comm, proxySocket, state->peerProxyAddresses));
  TRACE(NCCL_INIT, "rank %d nranks %d - DONE", rank, nranks);
  return ncclSuccess;
}
// Bootstrap for a communicator split from `parent`: ring neighbours come
// from parentRanks, and the address exchange rides on the parent's
// bootstrap (bootstrapSend/Recv with tag -2) instead of a root thread.
ncclResult_t bootstrapSplit(struct ncclBootstrapHandle* handle, struct ncclComm* comm, struct ncclComm* parent, int color, int key, int* parentRanks) {
  ncclResult_t ret = ncclSuccess;
  int rank = comm->rank;
  int nranks = comm->nRanks;
  int prev, next;
  ncclSocketAddress listenAddr, tmpAddr;
  struct ncclSocket* proxySocket;
  struct bootstrapState* state;
  NCCLCHECKGOTO(ncclCalloc(&state, 1), ret, fail);
  state->rank = rank;
  state->nranks = nranks;
  state->abortFlag = comm->abortFlag;
  comm->bootstrap = state;
  comm->magic = state->magic = handle->magic;
  // Ring neighbours expressed as parent ranks.
  prev = parentRanks[(rank-1+nranks)%nranks];
  next = parentRanks[(rank+1)%nranks];
  // Setup my sockets for the allgather ring and other p2p connections
  NCCLCHECKGOTO(ncclSocketInit(&state->listenSock, &bootstrapNetIfAddr, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag, 0), ret, fail);
  NCCLCHECKGOTO(ncclSocketInit(&state->ringRecvSocket, NULL, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag, 0), ret, fail);
  // Create socket for other ranks to contact me
  NCCLCHECKGOTO(ncclSocketListen(&state->listenSock), ret, fail);
  // Get addr from next rank
  NCCLCHECKGOTO(ncclSocketGetAddr(&state->listenSock, &listenAddr), ret, fail);
  NCCLCHECKGOTO(bootstrapSend(parent->bootstrap, prev, -2, &listenAddr, sizeof(union ncclSocketAddress)), ret, fail);
  NCCLCHECKGOTO(bootstrapRecv(parent->bootstrap, next, -2, &tmpAddr, sizeof(union ncclSocketAddress)), ret, fail);
  NCCLCHECKGOTO(ncclSocketInit(&state->ringSendSocket, &tmpAddr, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag, 0), ret, fail);
  NCCLCHECKGOTO(ncclSocketConnect(&state->ringSendSocket), ret, fail);
  // Accept the connect request from the previous rank in the AllGather ring
  NCCLCHECKGOTO(ncclSocketAccept(&state->ringRecvSocket, &state->listenSock), ret, fail);
  // AllGather all listen handlers
  NCCLCHECKGOTO(ncclCalloc(&state->peerCommAddresses, nranks), ret, fail);
  memcpy(state->peerCommAddresses+rank, &listenAddr, sizeof(union ncclSocketAddress));
  NCCLCHECKGOTO(bootstrapAllGather(state, state->peerCommAddresses, sizeof(union ncclSocketAddress)), ret, fail);
  if (parent->config.splitShare) {
    /* map local rank to top parent local rank. */
    for (int i = 0; i < nranks; ++i) {
      comm->topParentRanks[i] = parent->topParentRanks[parentRanks[i]];
    }
    // Reuse the parent's proxy instead of starting a new one.
    comm->proxyState = parent->sharedRes->proxyState;
    ncclAtomicRefCountIncrement(&parent->sharedRes->proxyState->refCount);
  } else {
    // Create the service proxy
    NCCLCHECKGOTO(ncclCalloc(&state->peerProxyAddresses, nranks), ret, fail);
    NCCLCHECKGOTO(ncclCalloc(&proxySocket, 1), ret, fail);
    NCCLCHECKGOTO(ncclSocketInit(proxySocket, &bootstrapNetIfAddr, comm->magic, ncclSocketTypeProxy, comm->abortFlag, 0), ret, fail);
    NCCLCHECKGOTO(ncclSocketListen(proxySocket), ret, fail);
    NCCLCHECKGOTO(ncclSocketGetAddr(proxySocket, &tmpAddr), ret, fail);
    memcpy(state->peerProxyAddresses + rank, &tmpAddr, sizeof(union ncclSocketAddress));
    NCCLCHECKGOTO(bootstrapAllGather(state, state->peerProxyAddresses, sizeof(union ncclSocketAddress)), ret, fail);
    NCCLCHECKGOTO(ncclProxyInit(comm, proxySocket, state->peerProxyAddresses), ret, fail);
  }
  INFO(NCCL_INIT, "bootstrapSplit: rank %d nranks %d color %d key %d prev %d next %d - DONE", rank, nranks, color, key, prev, next);
exit:
  return ret;
fail:
  goto exit;
}
// Ring AllGather over the bootstrap sockets: after nranks-1 steps every
// rank holds all slices.  Each step forwards the slice completed in the
// previous step while pulling a new one in from the left neighbour.
ncclResult_t bootstrapAllGather(void* commState, void* allData, int size) {
  struct bootstrapState* state = (struct bootstrapState*)commState;
  char* bytes = (char*)allData;
  int rank = state->rank;
  int nranks = state->nranks;
  TRACE(NCCL_INIT, "rank %d nranks %d size %d", rank, nranks, size);
  /* Simple ring based AllGather
   * At each step i receive data from (rank-i-1) from left
   * and send previous step's data from (rank-i) to right
   */
  for (int step = 0; step < nranks - 1; step++) {
    size_t recvSlice = (rank - step - 1 + nranks) % nranks;
    size_t sendSlice = (rank - step + nranks) % nranks;
    // Push the most recently completed slice to the right neighbour...
    NCCLCHECK(bootstrapNetSend(&state->ringSendSocket, bytes + sendSlice*size, size));
    // ...while pulling the next slice in from the left neighbour.
    NCCLCHECK(bootstrapNetRecv(&state->ringRecvSocket, bytes + recvSlice*size, size));
  }
  TRACE(NCCL_INIT, "rank %d nranks %d size %d - DONE", rank, nranks, size);
  return ncclSuccess;
}
// Connect to `peer`'s bootstrap listen address and send one tagged message:
// (my rank, tag, length-prefixed payload).  The socket is closed on every
// path through `exit`.
ncclResult_t bootstrapSend(void* commState, int peer, int tag, void* data, int size) {
  ncclResult_t ret = ncclSuccess;
  struct bootstrapState* state = (struct bootstrapState*)commState;
  struct ncclSocket sock;
  NCCLCHECKGOTO(ncclSocketInit(&sock, state->peerCommAddresses+peer, state->magic, ncclSocketTypeBootstrap), ret, fail);
  NCCLCHECKGOTO(ncclSocketConnect(&sock), ret, fail);
  // Header first (rank, tag) so the receiver can match or park the message.
  NCCLCHECKGOTO(bootstrapNetSend(&sock, &state->rank, sizeof(int)), ret, fail);
  NCCLCHECKGOTO(bootstrapNetSend(&sock, &tag, sizeof(int)), ret, fail);
  NCCLCHECKGOTO(bootstrapNetSend(&sock, data, size), ret, fail);
exit:
  NCCLCHECK(ncclSocketClose(&sock));
  return ret;
fail:
  goto exit;
}
// Dissemination barrier (Hensgen, Finkel & Manber, "Two Algorithms for
// Barrier Synchronization", Int. J. Parallel Programming 17(1), 1988):
// ceil(log2(nranks)) rounds; in round k each rank signals the peer 2^k
// ahead and waits for the peer 2^k behind.
ncclResult_t bootstrapBarrier(void* commState, int *ranks, int rank, int nranks, int tag) {
  if (nranks == 1) return ncclSuccess;
  TRACE(NCCL_INIT, "rank %d nranks %d tag %x - ENTER", rank, nranks, tag);
  int data[1]; // the payload content is irrelevant; only its arrival matters
  for (int dist = 1; dist < nranks; dist <<= 1) {
    int behind = (rank - dist + nranks) % nranks;
    int ahead = (rank + dist) % nranks;
    NCCLCHECK(bootstrapSend(commState, ranks[ahead], tag, data, sizeof(data)));
    NCCLCHECK(bootstrapRecv(commState, ranks[behind], tag, data, sizeof(data)));
  }
  TRACE(NCCL_INIT, "rank %d nranks %d tag %x - DONE", rank, nranks, tag);
  return ncclSuccess;
}
// In-place all-gather among the listed ranks: in round i each rank pushes
// its own slice to the rank i places ahead and pulls the slice of the rank
// i places behind.  The round number doubles as the tag, keeping the
// rounds' messages distinct.
ncclResult_t bootstrapIntraNodeAllGather(void* commState, int *ranks, int rank, int nranks, void* allData, int size) {
  if (nranks == 1) return ncclSuccess;
  char* buf = (char*)allData;
  TRACE(NCCL_INIT, "rank %d nranks %d size %d - ENTER", rank, nranks, size);
  for (int round = 1; round < nranks; round++) {
    int behind = (rank - round + nranks) % nranks;
    int ahead = (rank + round) % nranks;
    NCCLCHECK(bootstrapSend(commState, ranks[ahead], /*tag=*/round, buf + rank*size, size));
    NCCLCHECK(bootstrapRecv(commState, ranks[behind], /*tag=*/round, buf + behind*size, size));
  }
  TRACE(NCCL_INIT, "rank %d nranks %d size %d - DONE", rank, nranks, size);
  return ncclSuccess;
}
// IntraNode in-place Broadcast
// Root pushes bcastData to every other listed rank; non-roots receive from
// the root.  The tag is the receiver's global rank, so broadcasts to
// different receivers cannot be confused.
ncclResult_t bootstrapIntraNodeBroadcast(void* commState, int *ranks, int rank, int nranks, int root, void* bcastData, int size) {
  if (nranks == 1) return ncclSuccess;
  TRACE(NCCL_INIT, "rank %d nranks %d root %d size %d - ENTER", rank, nranks, root, size);
  if (rank != root) {
    NCCLCHECK(bootstrapRecv(commState, ranks[root], /*tag=*/ranks[rank], bcastData, size));
  } else {
    for (int peer = 0; peer < nranks; peer++) {
      if (peer == root) continue;
      NCCLCHECK(bootstrapSend(commState, ranks[peer], /*tag=*/ranks[peer], bcastData, size));
    }
  }
  TRACE(NCCL_INIT, "rank %d nranks %d root %d size %d - DONE", rank, nranks, root, size);
  return ncclSuccess;
}
// Park a connection whose (peer, tag) did not match the receive in
// progress; appended at the tail so arrival order is preserved.
ncclResult_t unexpectedEnqueue(struct bootstrapState* state, int peer, int tag, struct ncclSocket* sock) {
  struct unexConn* node;
  NCCLCHECK(ncclCalloc(&node, 1));
  node->peer = peer;
  node->tag = tag;
  memcpy(&node->sock, sock, sizeof(struct ncclSocket));
  // Walk to the tail link and hook the new node in.
  struct unexConn** tail = &state->unexpectedConnections;
  while (*tail) tail = &(*tail)->next;
  *tail = node;
  return ncclSuccess;
}
// Look up and remove the first parked connection matching (peer, tag).
// On a match, the saved socket is copied into *sock and *found is set;
// otherwise *found stays 0 and the list is untouched.
ncclResult_t unexpectedDequeue(struct bootstrapState* state, int peer, int tag, struct ncclSocket* sock, int* found) {
  *found = 0;
  struct unexConn** link = &state->unexpectedConnections;
  while (*link) {
    struct unexConn* node = *link;
    if (node->peer == peer && node->tag == tag) {
      *link = node->next; // unlink without a separate prev pointer
      memcpy(sock, &node->sock, sizeof(struct ncclSocket));
      free(node);
      *found = 1;
      return ncclSuccess;
    }
    link = &node->next;
  }
  return ncclSuccess;
}
static void unexpectedFree(struct bootstrapState* state) {
struct unexConn* elem = state->unexpectedConnections;
struct unexConn* prev = NULL;
while (elem) {
prev = elem;
elem = elem->next;
free(prev);
}
return;
}
// We can't know who we'll receive from, so we need to receive everything at once
// Receive one tagged message from `peer`: first check the queue of
// connections accepted earlier for other (peer, tag) pairs; otherwise keep
// accepting new connections, parking mismatches, until the match arrives.
ncclResult_t bootstrapRecv(void* commState, int peer, int tag, void* data, int size) {
  ncclResult_t ret = ncclSuccess;
  struct bootstrapState* state = (struct bootstrapState*)commState;
  struct ncclSocket sock;
  int newPeer, newTag;
  // Search unexpected connections first
  int found;
  NCCLCHECK(unexpectedDequeue(state, peer, tag, &sock, &found));
  if (found) {
    NCCLCHECKGOTO(bootstrapNetRecv(&sock, ((char*)data), size), ret, fail);
    goto exit;
  }
  // Then look for new connections
  while (1) {
    NCCLCHECKGOTO(ncclSocketInit(&sock), ret, fail);
    NCCLCHECKGOTO(ncclSocketAccept(&sock, &state->listenSock), ret, fail);
    // Header written by bootstrapSend(): sender rank, then tag.
    NCCLCHECKGOTO(bootstrapNetRecv(&sock, &newPeer, sizeof(int)), ret, fail);
    NCCLCHECKGOTO(bootstrapNetRecv(&sock, &newTag, sizeof(int)), ret, fail);
    if (newPeer == peer && newTag == tag) {
      NCCLCHECKGOTO(bootstrapNetRecv(&sock, ((char*)data), size), ret, fail);
      goto exit;
    }
    // Unexpected connection. Save for later.
    NCCLCHECKGOTO(unexpectedEnqueue(state, newPeer, newTag, &sock), ret, fail);
  }
exit:
  // Closes the matched socket; parked sockets were copied into the queue.
  NCCLCHECK(ncclSocketClose(&sock));
  return ret;
fail:
  goto exit;
}
// Graceful teardown of bootstrap state once setup traffic has completed.
ncclResult_t bootstrapClose(void* commState) {
  struct bootstrapState* state = (struct bootstrapState*)commState;
  if (state->unexpectedConnections != NULL) {
    unexpectedFree(state);
    // Leftover unexpected connections are only tolerated while aborting.
    if (*state->abortFlag == 0) {
      WARN("Unexpected connections are not empty");
      return ncclInternalError;
    }
  }
  NCCLCHECK(ncclSocketClose(&state->listenSock));
  NCCLCHECK(ncclSocketClose(&state->ringSendSocket));
  NCCLCHECK(ncclSocketClose(&state->ringRecvSocket));
  // NOTE(review): peerProxyAddresses is not freed here, unlike in
  // bootstrapAbort() -- presumably ownership passed to the proxy via
  // ncclProxyInit(); confirm before changing.
  free(state->peerCommAddresses);
  free(state);
  return ncclSuccess;
}
// Unconditional teardown used on the abort path; tolerates a NULL state.
ncclResult_t bootstrapAbort(void* commState) {
  if (commState == NULL) return ncclSuccess;
  struct bootstrapState* state = (struct bootstrapState*)commState;
  NCCLCHECK(ncclSocketClose(&state->listenSock));
  NCCLCHECK(ncclSocketClose(&state->ringSendSocket));
  NCCLCHECK(ncclSocketClose(&state->ringRecvSocket));
  free(state->peerCommAddresses);
  free(state->peerProxyAddresses);
  free(state);
  return ncclSuccess;
}
/*************************************************************************
* Copyright (c) 2015-2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "channel.h"
#include "param.h"
#include "gdrwrap.h"
// Lazily initialize comm->channels[channelId]: host and device peer tables
// (shared across communicators through sharedRes) plus the ring user-rank
// arrays.  Idempotent: returns immediately if the channel already has an id.
ncclResult_t initChannel(struct ncclComm* comm, int channelId) {
  struct ncclChannel* channel = &comm->channels[channelId];
  if (channel->id != -1) return ncclSuccess;
  int nRanks = comm->nRanks;
  // Peer slots: nRanks ranks + 1 collnet root + localRanks NVLS entries.
  int nPeers = nRanks + 1 /* Collnet */ + comm->localRanks /* NVLS */;
  channel->id = channelId;
  channel->workFifoSent = 0;
  struct ncclSharedResources* sharedRes = comm->sharedRes;
  // All device-side work below is ordered on the shared device stream.
  NCCLCHECK(ncclStrongStreamAcquireUncaptured(&sharedRes->deviceStream));
  if (channel->peers == NULL) {
    // The extra on nRanks+1 is for collnet root (i.e. network)
    // Allocate everything related to sharedRes with ncclCalloc as this can be
    // shared between communicators hence should not be tied to comm.
    if (sharedRes->peers[channelId] == NULL) {
      NCCLCHECK(ncclCalloc(sharedRes->peers + channelId, sharedRes->tpNRanks));
    }
    channel->peers = ncclMemoryStackAlloc<struct ncclChannelPeer*>(&comm->memPermanent, nPeers);
    for (int r = 0; r < nRanks; r++) {
      // Each local rank aliases its top-parent entry in the shared table.
      channel->peers[r] = comm->sharedRes->peers[channelId] + comm->topParentRanks[r];
      ncclAtomicRefCountIncrement(&channel->peers[r]->refCount);
    }
  }
  if (channel->devPeers == NULL) {
    if (sharedRes->devPeers[channelId] == NULL) {
      NCCLCHECK(ncclCudaCallocAsync(sharedRes->devPeers + channelId, sharedRes->tpNRanks, sharedRes->deviceStream.cudaStream));
    }
    /* channel->devPeers is not shared, so just free it when calling commFree() */
    NCCLCHECK(ncclCudaCallocAsync(&channel->devPeers, nPeers, sharedRes->deviceStream.cudaStream));
    ncclCommPushCudaFree(comm, channel->devPeers);
    for (int r = 0; r < nRanks; r++) {
      // Publish the device-side pointer of each rank's shared peer entry.
      uintptr_t addr = (uintptr_t)(comm->sharedRes->devPeers[channelId] + comm->topParentRanks[r]);
      NCCLCHECK(ncclCudaMemcpyAsync((uintptr_t*)(channel->devPeers + r), (uintptr_t*)&addr, 1, sharedRes->deviceStream.cudaStream));
    }
  }
  channel->ring.userRanks = ncclMemoryStackAlloc<int>(&comm->memPermanent, nRanks);
  NCCLCHECK(ncclCudaCallocAsync(&channel->devRingUserRanks, nRanks, sharedRes->deviceStream.cudaStream));
  ncclCommPushCudaFree(comm, channel->devRingUserRanks);
  NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNone(), &sharedRes->deviceStream));
  // [RCCL] record/wait pair on the same stream -- presumably orders later
  // stream work after the allocations above; confirm intent.
  CUDACHECK(hipEventRecord(sharedRes->deviceStream.scratchEvent, sharedRes->deviceStream.cudaStream));
  CUDACHECK(hipStreamWaitEvent(sharedRes->deviceStream.cudaStream, sharedRes->deviceStream.scratchEvent, 0));
  return ncclSuccess;
}
// Set up the NVLS peer entries of a channel.  With share=true the parent's
// peer tables are aliased (refcounted); otherwise fresh host and device
// tables are allocated.  NVLS peers occupy slots [nRanks+1, nRanks+localRanks].
ncclResult_t initNvlsChannel(struct ncclComm* comm, int channelId, struct ncclComm* parent, bool share) {
  struct ncclChannel* channel = &comm->channels[channelId];
  struct ncclSharedResources* sharedRes = comm->sharedRes;
  // Already initialized for NVLS.
  if (channel->nvlsPeers != NULL)
    return ncclSuccess;
  // Base channel must exist before NVLS slots can be filled.
  if (channel->id == -1)
    NCCLCHECK(initChannel(comm, channelId));
  NCCLCHECK(ncclStrongStreamAcquireUncaptured(&sharedRes->deviceStream));
  if (share) {
    channel->nvlsPeers = parent->channels[channelId].nvlsPeers;
    channel->nvlsDevPeers = parent->channels[channelId].nvlsDevPeers;
    for (int r = 0; r < comm->localRanks; ++r) {
      // Map local rank r to the parent's corresponding local rank.
      int tr = comm->topParentLocalRanks[r];
      uintptr_t addr = (uintptr_t)(parent->channels[channelId].nvlsDevPeers + tr);
      channel->peers[comm->nRanks + 1 + r] = parent->channels[channelId].nvlsPeers + tr;
      NCCLCHECK(ncclCudaMemcpyAsync((uintptr_t*)(channel->devPeers + comm->nRanks + 1 + r), (uintptr_t*)&addr, 1, sharedRes->deviceStream.cudaStream));
      ncclAtomicRefCountIncrement(&parent->channels[channelId].nvlsPeers[tr].refCount);
    }
  } else {
    NCCLCHECK(ncclCalloc(&channel->nvlsPeers, comm->localRanks));
    NCCLCHECK(ncclCudaCallocAsync(&channel->nvlsDevPeers, comm->localRanks, sharedRes->deviceStream.cudaStream));
    for (int r = 0; r < comm->localRanks; ++r) {
      uintptr_t addr = (uintptr_t)(channel->nvlsDevPeers + r);
      channel->peers[comm->nRanks + 1 + r] = channel->nvlsPeers + r;
      NCCLCHECK(ncclCudaMemcpyAsync((uintptr_t*)(channel->devPeers + comm->nRanks + 1 + r), (uintptr_t*)&addr, 1, sharedRes->deviceStream.cudaStream));
      ncclAtomicRefCountIncrement(&channel->nvlsPeers[r].refCount);
    }
  }
  NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNone(), &sharedRes->deviceStream));
  return ncclSuccess;
}
// Set up the CollNet peer entry of a channel (slot nRanks).  With
// share=true the parent's single collnet peer is aliased (refcounted);
// otherwise one fresh host entry and one device entry are allocated.
ncclResult_t initCollnetChannel(struct ncclComm* comm, int channelId, struct ncclComm* parent, bool share) {
  struct ncclChannel* channel = &comm->channels[channelId];
  struct ncclSharedResources* sharedRes = comm->sharedRes;
  uintptr_t addr;
  // Already initialized for CollNet.
  if (channel->collnetPeers != NULL)
    return ncclSuccess;
  // Base channel must exist before the collnet slot can be filled.
  if (channel->id == -1)
    NCCLCHECK(initChannel(comm, channelId));
  NCCLCHECK(ncclStrongStreamAcquireUncaptured(&sharedRes->deviceStream));
  if (share) {
    channel->collnetPeers = parent->channels[channelId].collnetPeers;
    channel->collnetDevPeers = parent->channels[channelId].collnetDevPeers;
    addr = (uintptr_t)parent->channels[channelId].collnetDevPeers;
    channel->peers[comm->nRanks] = parent->channels[channelId].collnetPeers;
    NCCLCHECK(ncclCudaMemcpyAsync((uintptr_t*)(channel->devPeers + comm->nRanks), (uintptr_t*)&addr, 1, sharedRes->deviceStream.cudaStream));
    ncclAtomicRefCountIncrement(&parent->channels[channelId].collnetPeers->refCount);
  } else {
    NCCLCHECK(ncclCalloc(&channel->collnetPeers, 1));
    NCCLCHECK(ncclCudaCallocAsync(&channel->collnetDevPeers, 1, sharedRes->deviceStream.cudaStream));
    addr = (uintptr_t)channel->collnetDevPeers;
    channel->peers[comm->nRanks] = channel->collnetPeers;
    NCCLCHECK(ncclCudaMemcpyAsync((uintptr_t*)(channel->devPeers + comm->nRanks), (uintptr_t*)&addr, 1, sharedRes->deviceStream.cudaStream));
    ncclAtomicRefCountIncrement(&channel->collnetPeers->refCount);
  }
  NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNone(), &sharedRes->deviceStream));
  return ncclSuccess;
}
// Release a channel's peers: drop one reference per peer slot, freeing the
// transport resources (and the collnet/NVLS tables) when a refcount hits 0.
ncclResult_t freeChannel(struct ncclChannel* channel, int nRanks, int collnetNRanks, int nvlsNRanks) {
  int nPeers = nRanks + collnetNRanks + nvlsNRanks;
  /* channel peers are only valid when async init thread completes commAlloc() and
   * the channel is intialized with initChannel(); if either is not done, this channel
   * should never be free. */
  if (channel->id == -1 || channel->peers == NULL) return ncclSuccess;
  // Free transport proxy resources
  // Note: free all send resources first due to CollNet arrangement
  for (int r = 0; r < nPeers; r++) {
    struct ncclChannelPeer* peer = channel->peers[r];
    if (peer) {
      if (ncclAtomicRefCountDecrement(&peer->refCount) == 0) {
        for (int b=0; b<NCCL_MAX_CONNS; b++) {
          if (peer->send[b].transportComm) NCCLCHECK(peer->send[b].transportComm->free(peer->send+b));
          if (peer->recv[b].transportComm) NCCLCHECK(peer->recv[b].transportComm->free(peer->recv+b));
        }
        // Slot nRanks is the collnet peer; the last slot block is NVLS.
        if (r == nRanks) {
          free(channel->collnetPeers);
          ncclCudaFree(channel->collnetDevPeers);
        } else if (r == nPeers - 1) {
          free(channel->nvlsPeers);
          ncclCudaFree(channel->nvlsDevPeers);
        }
      }
    }
  }
  return ncclSuccess;
}
/*
Copyright (c) 2020-2021 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef ALLREDUCECLIQUEKERNEL_H
#define ALLREDUCECLIQUEKERNEL_H
#include "CliqueCommon.h"
#include "devcomm.h"
#include "reduce_kernel.h"
#include "common_kernel.h"
// Clique-based all-reduce device kernel: each (rank, block) pair reduces a
// disjoint slice of the N-element input across all NUM_RANKS ranks and writes
// the result into every rank's output buffer, then joins a cross-GPU barrier.
// Assumes cliquePtrs->inputs/outputs were exchanged by CliqueManager first.
template <class FUNC, typename T, int NUM_RANKS>
__device__ void AllReduceCliqueSplitKernel(struct ncclWorkElem* args)
{
  // Clique-specific kernel arguments
  cliqueDevicePtrs_t* cliquePtrs = args->clique.ptrs; // Collection of all input/output pointers across ranks in clique
  size_t const N = args->clique.count; // Total number of elements to reduce
  int const nBlocks = args->clique.nChannels; // Total number of blocks assigned to this kernel (may be different than gridDim.x)
  int const blockId = args->clique.bid; // 0-indexed blockIdx for this threadblock (may be different than blockIdx.x)
  int const rank = args->comm->rank; // Current rank
  // Each threadblock works independently of others on a subsection of the input
  // First split evenly across ranks, while maintaining multiples of blocksize
  size_t const perRankN = RoundUp((N + NUM_RANKS - 1) / NUM_RANKS, blockDim.x);
  size_t const perBlockN = RoundUp((perRankN + nBlocks - 1) / nBlocks, blockDim.x);
  // Clamp to N so trailing blocks past the end of the data get an empty range.
  size_t const currBlockStart = min((rank * nBlocks + blockId) * perBlockN, N);
  size_t const currBlockStop = min(currBlockStart + perBlockN, N);
  size_t const blockN = currBlockStop - currBlockStart;
  if (blockN > 0)
  {
    // Prepare input / output subarrays (one src and one dst per rank)
    T const** inputs = (T const**)cliquePtrs->inputs;
    T** outputs = (T **)cliquePtrs->outputs;
    T const* srcs[NUM_RANKS];
    T* dsts[NUM_RANKS];
#pragma unroll
    for (int r = 0; r < NUM_RANKS; r++)
    {
      srcs[r] = inputs[r] + currBlockStart;
      dsts[r] = outputs[r] + currBlockStart;
    }
    // Perform the reduction: read the slice from all ranks, reduce with FUNC,
    // and broadcast the result to all ranks' outputs.
#define ALL_REDUCE_CLIQUE_UNROLL 1
    ReduceOrCopyMulti<ALL_REDUCE_CLIQUE_UNROLL, FUNC, T, NUM_RANKS, NUM_RANKS, NUM_RANKS, NUM_RANKS, 0>(
      threadIdx.x, blockDim.x, nullptr, false, NUM_RANKS, srcs, NUM_RANKS, dsts, blockN);
  }
  // Even if there was nothing for this GPU to do, it must participate in a barrier
  // because other GPUs may be modifying this GPUs output buffer still
  if (blockId == 0) WaitForBarrier<NUM_RANKS>(cliquePtrs->barrier);
}
#endif
/*
Copyright (c) 2020-2021 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef CLIQUE_COMMON_H
#define CLIQUE_COMMON_H
#include "nccl.h"
#include <cstdint>
#define MIN_CLIQUE_SIZE 2
#define MAX_CLIQUE_SIZE 8
// Sense-reversing barrier state used by WaitForBarrier to synchronize all
// GPUs in a clique. globalCount/globalSense live in fine-grained memory
// visible to every rank; localSense is private per GPU.
typedef struct
{
  int* globalCount; // Arrival counter (shared across GPUs)
  int* globalSense; // Release flag flipped by the last arriving GPU (shared across GPUs)
  int* localSense;  // This GPU's expected sense value (local to this GPU)
} gpuBarrier_t;
// Per-operation pointer exchange record, filled by CliqueManager and read by
// the clique kernels on the device.
typedef struct
{
  // Input/output pointers from participating ranks (indexed by rank)
  void const* inputs[MAX_CLIQUE_SIZE];
  void* outputs[MAX_CLIQUE_SIZE];
  // Barrier variable used to synchronize the clique at kernel exit
  gpuBarrier_t barrier;
} cliqueDevicePtrs_t;
// Helper macro to launch an appropriate kernel by converting the runtime rank
// count into a compile-time template argument.
// Cases cover MIN_CLIQUE_SIZE (2) through MAX_CLIQUE_SIZE (8); note there is
// no default case, so a rank count outside that range silently does nothing.
#define LAUNCH_CLIQUE_KERNEL(kernelname, FUNC, T, args) \
  { \
    switch (args->comm->nRanks){ \
    case 2: kernelname<FUNC, T, 2>(args); break; \
    case 3: kernelname<FUNC, T, 3>(args); break; \
    case 4: kernelname<FUNC, T, 4>(args); break; \
    case 5: kernelname<FUNC, T, 5>(args); break; \
    case 6: kernelname<FUNC, T, 6>(args); break; \
    case 7: kernelname<FUNC, T, 7>(args); break; \
    case 8: kernelname<FUNC, T, 8>(args); break; \
    } \
  }
// Multi-GPU (on same node) barrier. One thread per grid per GPU updates barrier / waits.
// Sense-reversing design: each arrival flips its local sense, atomically
// increments the shared counter, and the last arrival (count == NUM_RANKS)
// resets the counter and publishes the new sense, releasing the spinners.
// NOTE(review): atomics are GCC builtins on fine-grained memory — assumes the
// allocation is visible/coherent across all GPUs in the clique.
template <int NUM_RANKS>
__forceinline__ __device__ void WaitForBarrier(gpuBarrier_t const& barrier)
{
  if (threadIdx.x == 0)
  {
    // Sense inversion barrier
    *barrier.localSense = 1 - *barrier.localSense;
    int localSense = *barrier.localSense;
    int val = __atomic_add_fetch(barrier.globalCount, 1, __ATOMIC_SEQ_CST);
    if (val == NUM_RANKS)
    {
      // Last arrival resets barrier for re-use, then releases everyone by
      // publishing the new sense value
      __atomic_store_n(barrier.globalCount, 0, __ATOMIC_SEQ_CST);
      __atomic_store_n(barrier.globalSense, localSense, __ATOMIC_SEQ_CST);
    }
    else
    {
      // Wait for all ranks to reach barrier (spin until sense flips)
      while (__atomic_load_n(barrier.globalSense, __ATOMIC_SEQ_CST) != localSense);
    }
  }
}
// Round 'value' up to the nearest multiple of 'multiple' (must be non-zero).
__forceinline__ __host__ __device__ size_t RoundUp(size_t value, size_t multiple)
{
  size_t const quotient = (value + multiple - 1) / multiple;
  return quotient * multiple;
}
#endif
/*
Copyright (c) 2020-2021 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "CliqueManager.h"
#include "CliqueShmNames.h"
#include "MsgQueue.h"
#include "nccl.h"
#include "core.h"
#include "Hash.h"
#include "AllReduceCliqueKernel.h"
#include <hip/hip_runtime.h>
#include "hsa_ext_amd.h"
#include <stdio.h>
#include <stdlib.h>
#include <chrono>
#include <cstring>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <thread>
#include <unistd.h>
// Static storage shared by all CliqueManager instances in this process;
// used by CLIQUE_SINGLE_PROCESS mode to exchange pointers and barrier state.
cliqueDevicePtrs_t CliqueManager::m_staticCliquePtrs[NCCL_MAX_OPS] = {};
int CliqueManager::m_staticBarrierCount[NCCL_MAX_OPS*2] = {};
int* CliqueManager::m_staticGpuBarrierMem = NULL;
// Define some environment variables that affect clique-based kernels
RCCL_PARAM(EnableClique, "ENABLE_CLIQUE", 0); // Opt-in environment variable for clique-based kernels
RCCL_PARAM(AllReduceCliqueByteLimit, "CLIQUE_ALLREDUCE_BYTE_LIMIT", 0); // Max number of bytes to use clique-based kernels for all reduce (0 for auto-select)
RCCL_PARAM(AllReduceNumChannels, "CLIQUE_ALLREDUCE_NCHANNELS", 0); // Number of channels to use for all-reduce. (0 for auto-select)
CliqueManager::CliqueManager(int const rank,
int const numRanks,
cliqueMode_t const cliqueMode) :
m_rank(rank),
m_numRanks(numRanks),
m_hash(0),
m_cliqueMode(cliqueMode),
m_opIndexHead(0),
m_opIndexTail(0),
m_init(false),
m_gcnArchName(char[256]),
m_allReduceByteLimit(0),
m_pinnedCliquePtrs(NULL),
m_gpuBarrierGlobalCount(NULL),
m_gpuBarrierGlobalSense(NULL),
m_gpuBarrierLocalSense(NULL),
m_cpuBarrierCount(NULL),
m_shmHandles(),
m_ipcHandleSendCache(),
m_ipcHandleRecvCache(),
m_sharedCpuMemory(),
m_sharedIpcHandle(),
m_fineGrainBarrierMem(NULL),
m_sharedBarrierCount(NULL)
{}
// Destructor: releases all clique resources if Init() previously succeeded.
CliqueManager::~CliqueManager()
{
  if (m_init) CleanUp();
}
// Releases all resources owned by this CliqueManager (pinned host memory,
// device barrier memory, IPC caches, shared-memory segments). No-op when
// clique mode is disabled. Resets m_init so the object could be re-Init'ed.
void CliqueManager::CleanUp()
{
  if (m_cliqueMode == CLIQUE_DISABLED) return;
  // Free variables that are shared between SINGLE_PROCESS / SINGLE_NODE
  if (m_pinnedCliquePtrs) hipHostFree(m_pinnedCliquePtrs);
  if (m_gpuBarrierLocalSense) hipFree(m_gpuBarrierLocalSense);
  if (m_cliqueMode == CLIQUE_SINGLE_NODE)
  {
    // Release caches
    INFO(NCCL_COLL, "Rank %d deleting IPC caches", m_rank);
    if (m_ipcHandleSendCache) delete m_ipcHandleSendCache;
    if (m_ipcHandleRecvCache) delete m_ipcHandleRecvCache;
    // Close shared memory
    m_shmHandles.Close();
    m_sharedCpuMemory.Close();
    m_sharedIpcHandle.Close();
    // Barrier memory was allocated on rank 0 and IPC-mapped on other ranks,
    // so rank 0 frees it while the rest only close the IPC mapping.
    if (m_fineGrainBarrierMem)
    {
      if (m_rank == 0)
        hipFree(m_fineGrainBarrierMem);
      else
        hipIpcCloseMemHandle(m_fineGrainBarrierMem);
    }
  }
  else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
  {
    // Static barrier memory is shared by all ranks in-process; only rank 0 frees it.
    if (m_rank == 0 && m_staticGpuBarrierMem)
      hipFree(m_staticGpuBarrierMem);
  }
  m_init = false;
}
// Initializes clique-based kernel support for this communicator. Sets up the
// pinned pointer-exchange area, GPU/CPU barrier memory and (for
// CLIQUE_SINGLE_NODE) shared-memory / IPC plumbing. On any recoverable
// failure, clique mode is disabled and ncclSuccess is returned so the
// communicator falls back to regular collectives.
ncclResult_t CliqueManager::Init(ncclUniqueId const* commId, int suffix)
{
  ncclResult_t res;
  if (m_init) return ncclSuccess;
  m_init = true;
  // Hash of the communicator id; names all per-communicator shared resources
  m_hash = djb2Hash(commId->internal);
  if (m_cliqueMode == CLIQUE_DISABLED)
  {
    INFO(NCCL_INIT, "Clique kernels disabled");
    return ncclSuccess;
  }
  // Check parameters
  if (m_rank < 0 || m_rank >= m_numRanks)
  {
    WARN("Invalid rank specified. Expected 0 <= %d < %d for CliqueManager", m_rank, m_numRanks);
    return ncclInvalidUsage;
  }
  // For now, opt-into clique based kernels via RCCL_ENABLE_CLIQUE env var
  if (!rcclParamEnableClique())
  {
    INFO(NCCL_INIT, "Disabling clique-based kernels (did not find env var RCCL_ENABLE_CLIQUE)");
    m_cliqueMode = CLIQUE_DISABLED;
    return ncclSuccess;
  }
  // Allocate pinned CPU memory for holding clique pointers, which kernels will have access to
  if (hipHostMalloc(&m_pinnedCliquePtrs, sizeof(cliqueDevicePtrs_t) * NCCL_MAX_OPS) != hipSuccess)
  {
    WARN("Unable to allocate pinned host memory for clique pointers. Disabling clique-based kernels");
    m_cliqueMode = CLIQUE_DISABLED;
    m_init = true;
    return ncclSuccess;
  }
  std::string shmSuffix = std::to_string(m_hash) + "_" + std::to_string(suffix);
  // Allocate sense barrier variable on local GPU
  // NOTE(review): if ncclCudaCalloc takes an element count (as in upstream
  // NCCL), passing NCCL_MAX_OPS * sizeof(int) over-allocates — confirm signature.
  NCCLCHECKGOTO(ncclCudaCalloc(&m_gpuBarrierLocalSense, NCCL_MAX_OPS * sizeof(int)), res, dropback);
  if (m_cliqueMode == CLIQUE_SINGLE_NODE)
  {
    // Initialize shared memory file for IPC handles (based on commId hash)
    m_shmHandles = NcclIpcHandleShm(m_rank, m_numRanks, m_hash, NUM_HANDLES_PER_RANK, NCCL_MAX_OPS, shmSuffix);
    NCCLCHECKGOTO(m_shmHandles.Open(), res, dropback);
    // Initialize IPC caches
    m_ipcHandleSendCache = new NcclIpcHandleSendCache(m_numRanks * NUM_HANDLES_PER_RANK * NCCL_MAX_OPS);
    m_ipcHandleRecvCache = new NcclIpcHandleRecvCache(m_numRanks * NUM_HANDLES_PER_RANK * NCCL_MAX_OPS,
                                                      100,
                                                      hipIpcMemHandleHash,
                                                      hipIpcMemHandleEqual);
    // Initialize shared object for GPU barrier IPC handle
    m_sharedIpcHandle = ShmObject<hipIpcMemHandle_t>(std::max(4096LU, sizeof(hipIpcMemHandle_t)),
                                                     CliqueShmNames["Barriers"] + shmSuffix,
                                                     m_rank,
                                                     m_numRanks,
                                                     m_hash);
    NCCLCHECKGOTO(m_sharedIpcHandle.Open(), res, dropback);
    if (m_rank == 0)
    {
      hipIpcMemHandle_t handle;
      // Allocate fine-grained device memory on rank 0 and get IPC handle for it
      // Re-usable barrier consists of (globalCount / globalSense) pair of integers
      NCCLCHECKGOTO(ncclCudaCalloc(&m_fineGrainBarrierMem, NCCL_MAX_OPS * 2 * sizeof(int), nullptr, true), res, dropback);
      if (hipIpcGetMemHandle(&handle, m_fineGrainBarrierMem) != hipSuccess)
      {
        WARN("Unable to get IPC handle for barrier memory");
        goto dropback;
      }
      // Write IPC handle to shared memory for other ranks to receive
      *m_sharedIpcHandle.Get() = handle;
      // Set up global count/sense for first rank
      m_gpuBarrierGlobalCount = &m_fineGrainBarrierMem[0];
      m_gpuBarrierGlobalSense = &m_fineGrainBarrierMem[NCCL_MAX_OPS];
    }
    // Initialize shared CPU memory to be used for barrier variables
    m_sharedCpuMemory = ShmObject<int32_t>(NCCL_MAX_OPS * 2 * sizeof(int32_t),
                                           CliqueShmNames["SharedCounters"] + shmSuffix,
                                           m_rank,
                                           m_numRanks,
                                           m_hash);
    NCCLCHECKGOTO(m_sharedCpuMemory.Open(), res, dropback);
    // Split up the shared CPU memory for barrier counters / global sense
    m_cpuBarrierCount = m_sharedCpuMemory.Get();
    // Initialize CPU barriers
    if (m_rank == 0)
    {
      memset(m_cpuBarrierCount, 0, NCCL_MAX_OPS * 2 * sizeof(int32_t));
    }
  }
  else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
  {
    m_cpuBarrierCount = &m_staticBarrierCount[0];
    // First rank prepares fine-grained memory shared across ranks used for the two barrier variables
    if (m_rank == 0)
    {
      NCCLCHECKGOTO(ncclCudaCalloc(&m_staticGpuBarrierMem, NCCL_MAX_OPS * 2 * sizeof(int), nullptr, true), res, dropback);
      // Prepare all barriers
      for (int opIndex = 0; opIndex < NCCL_MAX_OPS; opIndex++)
      {
        m_staticCliquePtrs[opIndex].barrier.globalCount = &m_staticGpuBarrierMem[opIndex];
        m_staticCliquePtrs[opIndex].barrier.globalSense = &m_staticGpuBarrierMem[opIndex + NCCL_MAX_OPS];
      }
    }
  }
  // Figure out device arch for tuning
  int deviceId;
  CUDACHECK(hipGetDevice(&deviceId));
  hipDeviceProp_t devProp;
  CUDACHECK(hipGetDeviceProperties(&devProp, deviceId));
  // Fix: gcnArchName is a char array; array assignment is not valid C++, so
  // copy the string (always NUL-terminated).
  strncpy(m_gcnArchName, devProp.gcnArchName, sizeof(m_gcnArchName) - 1);
  m_gcnArchName[sizeof(m_gcnArchName) - 1] = '\0';
  // Establish when to use clique-based kernels based on input size
  SetByteLimits();
  m_init = true;
  // Fix: m_gcnArchName is a string, so use %s (was %d)
  INFO(NCCL_INIT, "Clique-based kernels enabled (mode %d) [GCN %s]", m_cliqueMode, m_gcnArchName);
  return ncclSuccess;
dropback:
  // NOTE: This currently assumes that all ranks will fail the same way
  // Additional support is required to handle cases when some processes succeed while others fail
  WARN("Unable to initialize shared memory. Disabling clique-based kernels");
  CleanUp();
  m_cliqueMode = CLIQUE_DISABLED;
  return ncclSuccess;
}
// Sets the input-size threshold below which clique kernels are used for
// all-reduce. A non-zero RCCL_CLIQUE_ALLREDUCE_BYTE_LIMIT overrides the
// per-architecture defaults.
void CliqueManager::SetByteLimits()
{
  m_allReduceByteLimit = rcclParamAllReduceCliqueByteLimit();
  if (m_allReduceByteLimit != 0) return;  // explicit user override

  // Auto-select based on detected GCN architecture
  if (IsArchMatch(m_gcnArchName, "gfx906"))
    m_allReduceByteLimit = 16777216;  // 16 MiB
  else if (IsArchMatch(m_gcnArchName, "gfx908"))
    m_allReduceByteLimit = 8388608;   // 8 MiB
  else
    m_allReduceByteLimit = 16777216;  // 16 MiB default
}
// Returns true when this collective call is eligible for a clique-based
// kernel: clique mode enabled, AllReduce only, total size within the byte
// limit, and a pre-ncclAvg reduction op (sum/prod/min/max).
bool CliqueManager::IsSupported(ncclFunc_t const coll,
                                size_t const count,
                                ncclDataType_t const datatype,
                                ncclRedOp_t const op) const
{
  if (m_cliqueMode == CLIQUE_DISABLED) return false;
  if (coll != ncclFuncAllReduce) return false;
  // Filter based on total input size and on ops sum/prod/min/max
  size_t const numBytes = count * ncclTypeSize(datatype);
  return (numBytes <= m_allReduceByteLimit) && (op < ncclAvg);
}
// Publishes this rank's input/output pointers for the next collective slot
// and passes the CPU "entry" barrier so all ranks agree the slot is ready.
// Must be paired (in order) with a later WaitForPointers() call.
ncclResult_t CliqueManager::DeclarePointers(void const* inputPtr, void* outputPtr)
{
  // Do nothing if disabled
  if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess;
  if (!m_init)
  {
    WARN("CliqueManager must be initialized before use");
    return ncclInvalidUsage;
  }
  // Add to queue of in-progress collectives (ring buffer of NCCL_MAX_OPS slots)
  int32_t const opIndex = m_opIndexTail;
  m_opIndexTail = (m_opIndexTail + 1) % NCCL_MAX_OPS;
  INFO(NCCL_COLL, "Rank %d declaring pointers for opIndex %d", m_rank, opIndex);
  if (m_cliqueMode == CLIQUE_SINGLE_NODE)
  {
    // Get fine-grained device memory if not already done (lazily map rank 0's
    // barrier allocation via the IPC handle published during Init)
    if (m_fineGrainBarrierMem == NULL)
    {
      hipIpcMemHandle_t handle = *m_sharedIpcHandle.Get();
      CUDACHECK(hipIpcOpenMemHandle((void**)&m_fineGrainBarrierMem, handle, hipIpcMemLazyEnablePeerAccess));
      // Prepare global count/sense barrier variables in the ipc-shared gpu device memory
      m_gpuBarrierGlobalCount = &m_fineGrainBarrierMem[0];
      m_gpuBarrierGlobalSense = &m_fineGrainBarrierMem[NCCL_MAX_OPS];
    }
    std::vector<std::pair<hipIpcMemHandle_t,size_t>> handles(NUM_HANDLES_PER_RANK);
    // Get IPC handles for input/output pointers from cache
    NCCLCHECK(CheckCacheForPtr(const_cast<void*>(inputPtr), m_ipcHandleSendCache, m_rank, &handles[0]));
    NCCLCHECK(CheckCacheForPtr(outputPtr , m_ipcHandleSendCache, m_rank, &handles[1]));
    // Prepare barrier pointers (done after the IpcOpenMemory)
    m_pinnedCliquePtrs[opIndex].barrier.globalCount = &m_gpuBarrierGlobalCount[opIndex];
    m_pinnedCliquePtrs[opIndex].barrier.globalSense = &m_gpuBarrierGlobalSense[opIndex];
    m_pinnedCliquePtrs[opIndex].barrier.localSense = &m_gpuBarrierLocalSense[opIndex];
    // Write IPC handles to shared memory for given rank / opCount
    NCCLCHECK(m_shmHandles.WriteHandles(opIndex, handles));
  }
  else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
  {
    // Store this rank's input/output pointers into static member
    m_staticCliquePtrs[opIndex].inputs[m_rank] = inputPtr;
    m_staticCliquePtrs[opIndex].outputs[m_rank] = outputPtr;
  }
  // Increment entry barrier counter - must not block.
  // m_cpuBarrierCount holds (entry, exit) counter pairs per opIndex.
  volatile int* entryCounter = &m_cpuBarrierCount[2 * opIndex];
  int entryVal = LOAD(entryCounter);
  // Loop until successful atomic update to counter
  bool done = false;
  while (done == false) {
    // Last rank resets exit barrier counter prior to incrementing entry count to numRanks
    if (entryVal+1 == m_numRanks)
      m_cpuBarrierCount[2 * opIndex + 1] = 0;
    done = __sync_bool_compare_and_swap(entryCounter, entryVal, entryVal+1);
    entryVal++;
  }
  return ncclSuccess;
}
// Picks how many channels (CUs) a clique-based all-reduce should use, clamped
// to totalNumChannels. RCCL_CLIQUE_ALLREDUCE_NCHANNELS overrides the tuned
// per-architecture tables. Non-AllReduce collectives always get 1.
ncclResult_t CliqueManager::GetNumChannelsToUse(ncclFunc_t const coll,
                                                size_t const count,
                                                ncclDataType_t const datatype,
                                                ncclRedOp_t const op,
                                                int const totalNumChannels,
                                                uint8_t* numChannelstoUse) const
{
  *numChannelstoUse = 1;
  if (coll != ncclFuncAllReduce) return ncclSuccess;

  size_t const totalBytes = count * ncclTypeSize(datatype);
  int channels = (int)rcclParamAllReduceNumChannels();
  if (channels == 0)
  {
    // NOTE: These are currently based on collected data and not necessarily ideal for all hardware
    if (IsArchMatch(m_gcnArchName, "gfx906")) {
      channels = (totalBytes <= 16384) ? 1 : 2;
    } else if (IsArchMatch(m_gcnArchName, "gfx908")) {
      if      (totalBytes <= 131072)  channels = 2;
      else if (totalBytes <= 524288)  channels = 6;
      else if (totalBytes <= 1048576) channels = 13;
      else                            channels = 16;
    } else if (IsArchMatch(m_gcnArchName, "gfx90a")) {
      channels = (totalBytes <= 262144) ? 4 : 8;
    } else {
      if      (totalBytes <= 65536)   channels = 1;
      else if (totalBytes <= 262144)  channels = 2;
      else if (totalBytes <= 524288)  channels = 4;
      else if (totalBytes <= 2097152) channels = 8;
      else                            channels = 11;
    }
  }
  *numChannelstoUse = (uint8_t)std::min(channels, totalNumChannels);
  return ncclSuccess;
}
// Blocks until every rank has declared its pointers for the oldest queued
// collective, gathers all ranks' input/output device pointers into the pinned
// clique-pointer slot, and attaches that slot to the kernel arguments.
// Must be called in the same order as DeclarePointers().
ncclResult_t CliqueManager::WaitForPointers(ncclWorkElem* args)
{
  // Do nothing if disabled
  if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess;
  if (!m_init)
  {
    WARN("CliqueManager must be initialized before use");
    return ncclInvalidUsage;
  }
  // Check that collective queue is not empty
  if (m_opIndexHead == m_opIndexTail)
  {
    WARN("WaitForPointers must be called after DeclarePointers");
    return ncclInvalidUsage;
  }
  // Pop first collective off queue
  int32_t const opIndex = m_opIndexHead;
  INFO(NCCL_COLL, "Rank %d waiting for pointers for opIndex %d", m_rank, opIndex);
  m_opIndexHead = (m_opIndexHead + 1) % NCCL_MAX_OPS;
  args->clique.ptrs = &m_pinnedCliquePtrs[opIndex];
  // Wait for all ranks to declare pointers for this opIndex (spin on the
  // entry counter incremented by each rank's DeclarePointers)
  volatile int* entryCounter = (volatile int*)(&m_cpuBarrierCount[2 * opIndex]);
  int entryVal = LOAD(entryCounter);
  while (entryVal != m_numRanks) entryVal = LOAD(entryCounter);
  // Last rank to past barrier resets entry barrier
  // NOTE: There is another GPU-barrier performed during the kernels therefore it should
  // not be possible for any rank to modify entry count prior to being reset
  volatile int* exitCounter = &m_cpuBarrierCount[2 * opIndex + 1];
  int exitVal = LOAD(exitCounter);
  // Loop until successful atomic update to counter
  bool done = false;
  while (done == false) {
    // Last rank resets entry counter
    if (exitVal+1 == m_numRanks)
      m_cpuBarrierCount[2 * opIndex] = 0;
    done = __sync_bool_compare_and_swap(exitCounter, exitVal, exitVal+1);
    exitVal++;
  }
  INFO(NCCL_COLL, "Rank %d past opIndex barrier %d", m_rank, opIndex);
  // Collect pointers
  if (m_cliqueMode == CLIQUE_SINGLE_NODE)
  {
    int numHandles = m_numRanks * NUM_HANDLES_PER_RANK;
    std::vector<std::pair<hipIpcMemHandle_t,size_t>> handles(numHandles);
    // Collect the ready handles from shared memory and convert them to device pointers
    NCCLCHECK(m_shmHandles.ReadHandles(opIndex, handles));
    for (int i = 0; i < m_numRanks; i++)
    {
      // Handle layout per rank: [input, output]
      void *input;
      NCCLCHECK(CheckCacheForHandle(handles[i * NUM_HANDLES_PER_RANK],
                                    m_ipcHandleRecvCache, &input));
      m_pinnedCliquePtrs[opIndex].inputs[i] = const_cast<const void *>(input);
      NCCLCHECK(CheckCacheForHandle(handles[(i * NUM_HANDLES_PER_RANK) + 1],
                                    m_ipcHandleRecvCache, &m_pinnedCliquePtrs[opIndex].outputs[i]));
    }
  }
  else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
  {
    // Copy from static memory to pinned host memory and set local sense
    memcpy(&m_pinnedCliquePtrs[opIndex], &m_staticCliquePtrs[opIndex], sizeof(cliqueDevicePtrs_t));
    m_pinnedCliquePtrs[opIndex].barrier.localSense = &m_gpuBarrierLocalSense[opIndex];
  }
  return ncclSuccess;
}
// Produces the (IPC handle, offset) pair for a device pointer so that other
// ranks in the clique can map it via CheckCacheForHandle.
//
// NOTE(review): the active implementation below only validates the pointer
// via hipPointerGetAttributes and never populates *handlePair — the hsa-based
// code that computed the allocation base address, the offset and the IPC
// handle is compiled out under "#if 0". Confirm whether this path is expected
// to work in this commit or is intentionally incomplete.
ncclResult_t CliqueManager::CheckCacheForPtr(void* devPtr,
                                             NcclIpcHandleSendCache* cache,
                                             int rank,
                                             std::pair<hipIpcMemHandle_t, size_t>* handlePair)
{
  // Get the base address for this device allocation
#if 0
  hsa_status_t status;
  hsa_amd_pointer_info_t info;
  info.size = sizeof(hsa_amd_pointer_info_t);
  status = hsa_amd_pointer_info(devPtr, &info, NULL, NULL, NULL);
  if (status != HSA_STATUS_SUCCESS) {
    WARN("Uanble to get pointer information for %p", devPtr);
    return ncclInvalidArgument;
  }
  // Compute the offset between the device addres and the base address
  uint64_t baseAddr = (uint64_t)info.agentBaseAddress;
  uint64_t realAddr = (uint64_t)devPtr;
  handlePair->second = realAddr - baseAddr;
  CUDACHECK(hipIpcGetMemHandle(&handlePair->first, (void*)baseAddr));
  /* Disabling cache until proper deallocation methods are available
  // IPC handles are only supported for base address pointers
  NcclIpcHandleSendCache::iterator it = cache->find(baseAddr);
  if (it == cache->end())
  {
    INFO(NCCL_COLL, "Rank %d searching IPC handle cache for %p (not found)", rank, devPtr);
    CUDACHECK(hipIpcGetMemHandle(&handlePair->first, (void*)baseAddr));
    cache->insert(baseAddr, handlePair->first);
  }
  else
  {
    INFO(NCCL_COLL, "Rank %d searching IPC handle cache for %p (found!)", rank, devPtr);
    handlePair->first = (it->second).first;
  }
  */
#endif
  hipError_t status;
  hipPointerAttribute_t info;
  status = hipPointerGetAttributes(&info, devPtr);
  if (status != hipSuccess) {
    // Fix: corrected "Uanble" typo in the warning message
    WARN("Unable to get pointer information for %p", devPtr);
    return ncclInvalidArgument;
  }
  return ncclSuccess;
}
// Maps a remote rank's (IPC handle, offset) pair into this process's address
// space and returns the resulting device pointer in *ptr.
// Until proper deallocation hooks are implemented the receive cache cannot be
// used, so the handle is opened every time.
ncclResult_t CliqueManager::CheckCacheForHandle(std::pair<hipIpcMemHandle_t, size_t> const& handlePair,
                                                NcclIpcHandleRecvCache* cache,
                                                void** ptr)
{
  void* baseAddr;
  CUDACHECK(hipIpcOpenMemHandle(&baseAddr, handlePair.first, hipIpcMemLazyEnablePeerAccess));
  /*
  NcclIpcHandleRecvCache::iterator it = cache->find(handlePair.first);
  // Get base address pointer from cache if it exists
  if (it == cache->end())
  {
    CUDACHECK(hipIpcOpenMemHandle(&baseAddr, handlePair.first, hipIpcMemLazyEnablePeerAccess));
    cache->insert(handlePair.first, baseAddr);
  }
  else
  {
    baseAddr = (it->second).first;
  }
  */
  // The IPC handle maps the allocation base; apply the sub-allocation offset.
  *ptr = (void*)((uint64_t)baseAddr + handlePair.second);
  return ncclSuccess;
}
// Called by the bootstrap root: pre-creates the per-communicator message
// queues and unlinks any stale shared-memory files left by previous runs so
// ranks can open them cleanly. No-op when clique kernels are not enabled.
ncclResult_t CliqueManager::BootstrapRootInit(int pid, unsigned long hash)
{
  if (!rcclParamEnableClique())
  {
    INFO(NCCL_INIT, "Not performing bootstrap root for clique kernels as clique mode not enabled.");
    return ncclSuccess;
  }

  std::string const suffix = std::to_string(hash) + "_" + std::to_string(pid);

  // Create then close each message queue so it exists before ranks attach.
  for (auto const& entry : CliqueShmNames)
  {
    mqd_t mq_desc;
    std::string const msgQueueName = entry.second + suffix;
    NCCLCHECK(MsgQueueGetId(msgQueueName, true, mq_desc));
    NCCLCHECK(MsgQueueClose(msgQueueName, mq_desc, true));
  }

  // Unlink any shm files from a previous (possibly crashed) run.
  std::string const shmDir = "/dev/shm/";
  for (auto const& entry : CliqueShmNames)
  {
    struct stat fileStatus;
    std::string const shmFileName = entry.second + suffix;
    std::string const shmFullPath = shmDir + shmFileName;
    if (stat(shmFullPath.c_str(), &fileStatus) == 0)
      NCCLCHECK(shmUnlink(shmFileName.c_str()));
  }
  return ncclSuccess;
}
/*
Copyright (c) 2020-2021 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef RCCL_CLIQUE_MANAGER_HPP_
#define RCCL_CLIQUE_MANAGER_HPP_
#include <semaphore.h>
#include <mutex>
#include "nccl.h"
#include "devcomm.h"
#include "CliqueCommon.h"
#include "HandleCache.h"
#include "HandleShm.h"
#define NUM_HANDLES_PER_RANK 2
// Coordinates "clique" collectives: all GPUs on a node directly read/write
// each other's buffers inside a single kernel instead of using the normal
// ring/tree transports. Handles pointer exchange (via static memory or shared
// memory + IPC handles) and the CPU/GPU barriers the kernels rely on.
class CliqueManager
{
public:
  typedef enum
  {
    CLIQUE_DISABLED       = 0,  // Clique kernels unavailable / opted out
    CLIQUE_SINGLE_PROCESS = 1,  // All ranks in one process (static-memory exchange)
    CLIQUE_SINGLE_NODE    = 2   // Ranks in separate processes on one node (shm + IPC)
  } cliqueMode_t;

  CliqueManager(int const rank, int const numRanks, cliqueMode_t const cliqueMode);
  ~CliqueManager();

  // Releases all owned resources; safe to call multiple times.
  void CleanUp();

  // Sets up barriers / shared memory; on failure disables clique mode and
  // still returns ncclSuccess so normal collectives continue to work.
  ncclResult_t Init(ncclUniqueId const* commId, int suffix);

  // Establishes m_allReduceByteLimit from env var or per-arch defaults.
  void SetByteLimits();

  // Returns true if the collective is supported via a clique-based kernel
  bool IsSupported(ncclFunc_t const coll,
                   size_t const count,
                   ncclDataType_t const datatype,
                   ncclRedOp_t const op) const;

  // Provide the pointers to be exchanged across the clique for the given rank / opCount
  ncclResult_t DeclarePointers(void const* inputPtr, void* outputPtr);

  // Determine the number of channels / CUs to use for this call
  ncclResult_t GetNumChannelsToUse(ncclFunc_t const coll,
                                   size_t const count,
                                   ncclDataType_t const datatype,
                                   ncclRedOp_t const op,
                                   int const totalNumChannels,
                                   uint8_t* numChannelstoUse) const;

  // Blocking call that only returns once the in-progress clique pointers are
  // ready. This needs to be called in the same order as DeclarePointers.
  ncclResult_t WaitForPointers(ncclWorkElem* args);

  // Prepares shared memory files upon initialization
  static ncclResult_t BootstrapRootInit(int pid, unsigned long hash);

protected:
  // Produces the (IPC handle, offset) pair for a local device pointer.
  ncclResult_t CheckCacheForPtr(void* devPtr,
                                NcclIpcHandleSendCache* cache,
                                int rank,
                                std::pair<hipIpcMemHandle_t, size_t>* handlePair);

  // Maps a remote (IPC handle, offset) pair to a usable device pointer.
  ncclResult_t CheckCacheForHandle(std::pair<hipIpcMemHandle_t, size_t> const& handlePair,
                                   NcclIpcHandleRecvCache* cache,
                                   void** ptr);

  int           m_rank;                 // Associated rank
  int           m_numRanks;             // Total number of ranks
  unsigned long m_hash;                 // Hash used for identifying message queues & shared memory
  cliqueMode_t  m_cliqueMode;           // Clique mode (off/single process/single node)
  int32_t       m_opIndexHead;          // Track start of outstanding requests
  int32_t       m_opIndexTail;          // Track end of outstanding requests
  bool          m_init;                 // Whether CliqueManager has been initialized
  // Fix: original declaration `char[256] m_gcnArchName;` is invalid C++ syntax
  char          m_gcnArchName[256];     // Device GCN arch name (e.g. "gfx908")
  size_t        m_allReduceByteLimit;   // Byte limit for AllReduce

  cliqueDevicePtrs_t* m_pinnedCliquePtrs;      // Pinned-host-memory (device accessible) containing device pointers
  int*                m_gpuBarrierGlobalCount; // Part of GPU barrier (count variable shared across ranks)
  int*                m_gpuBarrierGlobalSense; // Part of GPU barrier (reset variable shared across ranks)
  int*                m_gpuBarrierLocalSense;  // Part of GPU barrier (reset variable local to this rank)
  int*                m_cpuBarrierCount;       // Points to shared CPU memory (single node) or m_staticBarrierCount (single process)

  // IPC-related (CLIQUE_SINGLE_NODE)
  NcclIpcHandleShm             m_shmHandles;         // Used to exchange IPC handles between ranks
  NcclIpcHandleSendCache*      m_ipcHandleSendCache; // Caches pointers to IPC handles (to send to other processes)
  NcclIpcHandleRecvCache*      m_ipcHandleRecvCache; // Caches IPC handles to pointers (received from other processes)
  ShmObject<int32_t>           m_sharedCpuMemory;    // Used to pass shared memory used for CPU barrier
  ShmObject<hipIpcMemHandle_t> m_sharedIpcHandle;    // Used to pass fine-grained device memory buffer IPC handle
  int*                         m_fineGrainBarrierMem;// Fine-grained GPU memory barrier (allocated only on 1st rank, shared on others)
  int*                         m_sharedBarrierCount; // Part of CPU barrier (count variable shared across ranks)

  // Single-process (CLIQUE_SINGLE_PROCESS)
  static cliqueDevicePtrs_t m_staticCliquePtrs[NCCL_MAX_OPS];   // Use shared static memory to exchange pointer info
  static int                m_staticBarrierCount[2*NCCL_MAX_OPS]; // Part of CPU barrier (count variable shared across ranks)
  static int*               m_staticGpuBarrierMem;              // Static storage backing for fine-grained gpu barrier
};
// For use in bootstrapping code: pairs the root's listening socket with the
// hash that names this communicator's clique shm/message-queue resources.
struct bootstrapRootStruct {
  int listenFd;        // Listening socket file descriptor for the bootstrap root
  unsigned long hash;  // Hash identifying per-communicator shared resources
};
#endif
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_CLIQUE_SHM_NAMES_H_
#define NCCL_CLIQUE_SHM_NAMES_H_
#include <string>
#include <map>
// Maps a logical clique resource category to the base name used for the
// corresponding POSIX shared-memory / IPC object. Callers append a suffix to
// disambiguate instances (see NcclIpcHandleShm). Kept non-const because it is
// accessed via operator[].
static std::map<std::string, std::string> CliqueShmNames =
{
  {"SharedCounters", "RcclCounters" },
  {"Mutexes" , "RcclMutexes" },
  {"IpcHandles" , "RcclIpcHandles"},
  {"Barriers" , "RcclBarriers" }
};
#endif
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "HandleCache.h"
#include "Hash.h"
// djb2 hash function for hashing char array in hipIpcMemHandle_t
// Hashes a hipIpcMemHandle_t by running djb2 over the raw bytes of its
// reserved field (djb2 stops at the first NUL byte in that buffer).
unsigned long hipIpcMemHandleHash(const hipIpcMemHandle_t& handle)
{
  const char* rawBytes = handle.reserved;
  return djb2Hash(rawBytes);
}
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_HANDLE_CACHE_H_
#define NCCL_HANDLE_CACHE_H_
#include <list>
#include <unordered_map>
#include <functional>
#include "core.h"
//#include "llvm/ADT/DenseMap.h"
// Fixed-capacity LRU cache mapping IPC-related keys to values.
// Backed by an unordered_map for O(1) lookup plus a list recording recency
// order: front = least recently used, back = most recently used. Each map
// entry stores an iterator into that list so its position can be moved in O(1).
template <
  class Key,
  class Value,
  class Hash,
  class KeyEqual,
  class Allocator
>
class NcclIpcHandleCache
{
public:
  typedef std::pair<Value, typename std::list<Key>::iterator> NcclIpcHandleCacheValueType;
  typedef std::unordered_map<Key, NcclIpcHandleCacheValueType, Hash, KeyEqual, Allocator> LRUCache;
  using iterator = typename LRUCache::iterator;

  // size: maximum number of entries retained before LRU eviction kicks in.
  NcclIpcHandleCache(size_t size,
                     size_t bucket_count = 100,
                     const Hash& hash = Hash(),
                     const KeyEqual& eql = KeyEqual(),
                     const Allocator& alloc = Allocator() ) : m_cache(bucket_count, hash, eql, alloc)
  {
    m_capacity = size;
  }

  ~NcclIpcHandleCache()
  {
    m_lruHistory.clear();
    m_cache.clear();
  }

  iterator begin()
  {
    return m_cache.begin();
  }

  iterator end()
  {
    return m_cache.end();
  }

  // Looks up key; on a hit the entry is marked most-recently-used.
  iterator find(const Key& key)
  {
    iterator it = m_cache.find(key);
    if (it != m_cache.end())
    {
      updateHistory(it);
    }
    return it;
  }

  // Inserts (key, value). If the key is already present the cache is left
  // unchanged and {iterator to existing entry, false} is returned. Otherwise
  // the least-recently-used entry is evicted when the cache is full, and
  // {iterator to the new entry, true} is returned.
  std::pair<iterator, bool> insert(const Key& key, const Value& value)
  {
    iterator existing = m_cache.find(key);
    if (existing != m_cache.end())
    {
      // Fix: the original popped an entry even on duplicate inserts, shrinking
      // the cache without adding anything.
      return std::pair<iterator, bool>(existing, false);
    }
    // Evict only when we are actually about to add a new entry. The capacity
    // guard also avoids popping from an empty history when m_capacity == 0.
    if (m_capacity > 0 && m_cache.size() >= m_capacity)
    {
      pop();
    }
    typename std::list<Key>::iterator histIt = m_lruHistory.insert(m_lruHistory.end(), key);
    // Fix: the original returned the pre-insert find() result -- i.e. end() --
    // on successful insertion; return the iterator to the newly inserted entry.
    return m_cache.insert(std::make_pair(key, std::make_pair(value, histIt)));
  }

private:
  // Removes the least-recently-used entry (front of the history list).
  void pop()
  {
    typename LRUCache::iterator it = m_cache.find(m_lruHistory.front());
    m_cache.erase(it);
    m_lruHistory.pop_front();
  }

  // Moves the entry's history node to the back (most recently used).
  void updateHistory(const iterator& it)
  {
    if (!m_lruHistory.empty())
    {
      m_lruHistory.splice(m_lruHistory.end(), m_lruHistory, (it->second).second);
    }
  }

  size_t m_capacity;        // Maximum number of cached entries
  std::list<Key> m_lruHistory; // Recency order: front = LRU, back = MRU
  LRUCache m_cache;         // Key -> {value, history iterator}
};
// djb2 hash function for hashing char array in hipIpcMemHandle_t
unsigned long hipIpcMemHandleHash(const hipIpcMemHandle_t& handle);
// Equality functor required for unordered_map keyed on hipIpcMemHandle_t:
// two handles are equal iff their reserved byte arrays match exactly.
auto hipIpcMemHandleEqual = [](const hipIpcMemHandle_t& l, const hipIpcMemHandle_t& r)
{
  return memcmp(l.reserved, r.reserved, sizeof(l.reserved)) == 0;
};
//typedef llvm::DenseMap<uint64_t, hipIpcMemHandle_t> SendCache;
//typedef llvm::DenseMap<hipIpcMemHandle_t, void*, decltype(&HandleHash), decltype(HandleEqual)> RecvCache;
// Send side: caches the IPC handle exported for a local device pointer
// (pointer stored as uint64_t) so repeated exports are avoided.
typedef NcclIpcHandleCache<uint64_t, hipIpcMemHandle_t, std::hash<uint64_t>, std::equal_to<uint64_t>, std::allocator< std::pair<const uint64_t, std::pair<hipIpcMemHandle_t, std::list<uint64_t>::iterator>>>> NcclIpcHandleSendCache;
// Receive side: caches the locally mapped pointer for a received IPC handle
// so repeated opens of the same handle are avoided.
typedef NcclIpcHandleCache<hipIpcMemHandle_t, void*, decltype(&hipIpcMemHandleHash), decltype(hipIpcMemHandleEqual), std::allocator< std::pair<const hipIpcMemHandle_t, std::pair<void*, std::list<hipIpcMemHandle_t>::iterator>>>> NcclIpcHandleRecvCache;
#endif
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include <hip/hip_runtime.h>
#include "HandleShm.h"
#include "CliqueShmNames.h"
#include "core.h"
#include "Hash.h"
#include "shm.h"
// Sizes the shared region to hold (numRanks * numHandlesPerRank) handle/size
// pairs for each of `capacity` in-flight operations. `suffix` disambiguates
// multiple cliques sharing the base name from CliqueShmNames.
NcclIpcHandleShm::NcclIpcHandleShm(int rank, int numRanks, int projid, int numHandlesPerRank, int capacity, std::string const& suffix) :
  ShmObject<std::pair<hipIpcMemHandle_t,size_t>>(numRanks * numHandlesPerRank * capacity * sizeof(std::pair<hipIpcMemHandle_t,size_t>),
                                                 CliqueShmNames["IpcHandles"] + suffix,
                                                 rank,
                                                 numRanks,
                                                 projid),
  m_numHandlesPerRank(numHandlesPerRank),
  m_numHandlesPerOpCount(numRanks * numHandlesPerRank)
{
}
// Default-constructed instance refers to no shared memory; it relies on the
// ShmObject default constructor and is not usable until properly constructed.
NcclIpcHandleShm::NcclIpcHandleShm() :
  m_numHandlesPerRank(0),
  m_numHandlesPerOpCount(0)
{
}

NcclIpcHandleShm::~NcclIpcHandleShm()
{
  // Nothing to release here; cleanup is handled via ShmObject::Close().
}

// Creates/attaches the underlying shared-memory region via the base class.
ncclResult_t NcclIpcHandleShm::Open()
{
  return ShmObject::Open();
}
// Copies this rank's IPC handles for the given operation into its slot of the
// shared table. Layout is row-major: [opCount][rank][handleIndex].
ncclResult_t NcclIpcHandleShm::WriteHandles(uint64_t opCount, std::vector<std::pair<hipIpcMemHandle_t,size_t>> const& sendHandles)
{
  // Offset of this rank's block within the opCount row.
  size_t idx = (opCount * m_numHandlesPerOpCount) + (m_rank * m_numHandlesPerRank);
  // NOTE(review): assumes sendHandles holds at least m_numHandlesPerRank entries
  // and that opCount is already wrapped to the shm capacity by the caller -- confirm.
  memcpy(m_shmPtr + idx, sendHandles.data(), sizeof(std::pair<hipIpcMemHandle_t,size_t>) * m_numHandlesPerRank);
  return ncclSuccess;
}
// Copies all ranks' IPC handles for the given operation out of the shared table.
ncclResult_t NcclIpcHandleShm::ReadHandles(uint64_t opCount, std::vector<std::pair<hipIpcMemHandle_t,size_t>>& recvHandles)
{
  // Start of the opCount row (it spans all ranks' handles).
  size_t idx = opCount * m_numHandlesPerOpCount;
  // NOTE(review): assumes recvHandles has been resized to at least
  // m_numHandlesPerOpCount entries by the caller -- confirm.
  // Fix: sizeof used pair<...,ssize_t> while the stored element type is
  // pair<...,size_t>; same size on common ABIs, but keep the types consistent.
  memcpy(recvHandles.data(), m_shmPtr + idx, m_numHandlesPerOpCount * sizeof(std::pair<hipIpcMemHandle_t,size_t>));
  return ncclSuccess;
}
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_IPC_HANDLE_SHM_H_
#define NCCL_IPC_HANDLE_SHM_H_
#include <hip/hip_runtime.h>
#include <vector>
#include <string>
#include "nccl.h"
#include "ShmObject.h"
// Shared-memory table of {IPC handle, allocation size} pairs exchanged between
// the ranks of a clique. Each operation slot holds numRanks * numHandlesPerRank
// entries; ranks write their own slice and read everyone's.
class NcclIpcHandleShm : public ShmObject<std::pair<hipIpcMemHandle_t,size_t>>
{
public:
  NcclIpcHandleShm(int rank, int numRanks, int projid, int numHandlesPerRank, int capacity, std::string const& suffix);
  NcclIpcHandleShm();
  ~NcclIpcHandleShm();

  // Creates/attaches the shared-memory region (see ShmObject::Open).
  ncclResult_t Open();

  // Writes this rank's handles for opCount into shared memory.
  ncclResult_t WriteHandles(uint64_t opCount, std::vector<std::pair<hipIpcMemHandle_t,size_t>> const& sendHandles);
  // Reads all ranks' handles for opCount from shared memory.
  ncclResult_t ReadHandles(uint64_t opCount, std::vector<std::pair<hipIpcMemHandle_t,size_t>>& recvHandles);

private:
  int m_numHandlesPerRank;    // Handles written per rank per operation
  int m_numHandlesPerOpCount; // Total handles per operation (numRanks * numHandlesPerRank)
};
#endif
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "Hash.h"
unsigned long djb2Hash(const char* data)
{
unsigned long hash = 5381;
int c;
while ((c = *(data)++))
hash = ((hash << 5) + hash) + c; /* hash * 33 + c */
return hash;
}
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_HASH_H_
#define NCCL_HASH_H_
unsigned long djb2Hash(const char* data);
#endif
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "MsgQueue.h"
#include <chrono>
#define MSG_QUEUE_PERM S_IRUSR | S_IWUSR
#define MSG_QUEUE_MODE O_RDWR
#define MSG_SIZE 1
#define MSG_QUEUE_TIMEOUT 60
// Opens (optionally exclusively creating) the POSIX message queue "/<name>",
// returning its descriptor through mq_desc.
// When `exclusive` is set and a stale queue with the same name already exists,
// the old queue is unlinked and creation is retried once.
ncclResult_t MsgQueueGetId(std::string const& name, bool exclusive, mqd_t& mq_desc)
{
  int flag = (exclusive == true ? O_CREAT | O_EXCL : O_CREAT);
  struct mq_attr attr;
  attr.mq_maxmsg = 10;        // Small queue: used only for rank handshaking
  attr.mq_msgsize = MSG_SIZE; // One status byte per message
  attr.mq_flags = 0;
  std::string mq_name = "/" + name; // POSIX mq names must start with '/'
  mq_desc = mq_open(mq_name.c_str(), flag | MSG_QUEUE_MODE, MSG_QUEUE_PERM, &attr);
  // Check if we're trying to create the message queue exclusively and it
  // already exists; if so, delete the stale queue and retry.
  // Fix: mq_open(O_CREAT|O_EXCL) reports an existing queue with EEXIST (not
  // EBUSY), and the attribute struct must be passed by address (&attr) -- the
  // original passed `attr` by value, which does not match mq_open's signature.
  if (mq_desc == -1 && exclusive == true && errno == EEXIST)
  {
    // Unlink directly: mq_desc is invalid here, so MsgQueueClose (which calls
    // mq_close on it) would fail.
    NCCLCHECK(MsgQueueUnlink(name));
    SYSCHECKVAL(mq_open(mq_name.c_str(), flag | MSG_QUEUE_MODE, MSG_QUEUE_PERM, &attr), "mq_open", mq_desc);
  }
  else if (mq_desc == -1)
  {
    WARN("Call to MsgQueueGetId failed : %s", strerror(errno));
    return ncclSystemError;
  }
  return ncclSuccess;
}
// Sends msgsz bytes from msgp on the queue (blocks if the queue is full).
ncclResult_t MsgQueueSend(mqd_t const& mq_desc, const char* msgp, size_t msgsz)
{
  SYSCHECK(mq_send(mq_desc, msgp, msgsz, 0), "mq_send");
  return ncclSuccess;
}

// Receives one message into msgp (blocks until one is available).
// NOTE(review): mq_receive requires msgsz >= the queue's mq_msgsize -- confirm callers.
ncclResult_t MsgQueueRecv(mqd_t const& mq_desc, char* msgp, size_t msgsz)
{
  SYSCHECK(mq_receive(mq_desc, msgp, msgsz, NULL), "mq_receive");
  return ncclSuccess;
}
// Busy-polls the queue until every queued message has been consumed by the
// receivers, or until MSG_QUEUE_TIMEOUT seconds have elapsed.
ncclResult_t MsgQueueWaitUntilEmpty(mqd_t const& mq_desc)
{
  mq_attr attr;
  auto start = std::chrono::steady_clock::now();
  do
  {
    // Fix: the original left the very first mq_getattr call unchecked, so a
    // bad descriptor could read uninitialized attr fields.
    SYSCHECK(mq_getattr(mq_desc, &attr), "mq_getattr");
    if (attr.mq_curmsgs > 0 &&
        std::chrono::steady_clock::now() - start > std::chrono::seconds(MSG_QUEUE_TIMEOUT))
    {
      WARN("Message Queue timed out waiting for all ranks to receive messages.");
      return ncclSystemError;
    }
  } while (attr.mq_curmsgs > 0);
  return ncclSuccess;
}
// Closes the queue descriptor; when `unlink` is set the queue name is removed
// first, so the kernel object is destroyed once all descriptors are closed.
ncclResult_t MsgQueueClose(std::string const& name, mqd_t& mq_desc, bool unlink)
{
  if (unlink)
  {
    NCCLCHECK(MsgQueueUnlink(name));
  }
  SYSCHECK(mq_close(mq_desc), "mq_close");
  return ncclSuccess;
}

// Removes the named queue ("/<name>") from the system namespace.
// NOTE(review): fails (via SYSCHECK) if the queue does not exist -- confirm
// that callers never race on unlink.
ncclResult_t MsgQueueUnlink(std::string const& name)
{
  std::string mq_name = "/" + name;
  SYSCHECK(mq_unlink(mq_name.c_str()), "mq_unlink");
  return ncclSuccess;
}
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef RCCL_MSG_QUEUE_HPP_
#define RCCL_MSG_QUEUE_HPP_
#include <string>
#include <mqueue.h>
#include "nccl.h"
#include "core.h"
ncclResult_t MsgQueueGetId(std::string const& name, bool exclusive, mqd_t& mq_desc);
ncclResult_t MsgQueueSend(mqd_t const& mq_desc, const char* msgp, size_t msgsz);
ncclResult_t MsgQueueRecv(mqd_t const& mq_desc, char* msgp, size_t msgsz);
ncclResult_t MsgQueueWaitUntilEmpty(mqd_t const& mq_desc);
ncclResult_t MsgQueueClose(std::string const& name, mqd_t& mq_desc, bool unlink);
ncclResult_t MsgQueueUnlink(std::string const& name);
#endif
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef SHAREDMEMHELPER_H
#define SHAREDMEMHELPER_H
class SharedMemHelper
{
public:
SharedMemHelper(int rank, int numRanks, int numEntries);
ncclStatus_t Init(std::string const& baseFilename);
ncclStatus_t
protected:
bool m_initialized;
int m_rank;
int m_numRanks;
};
#endif
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "ShmObject.h"
#include <string>
// Template specializations for sem_t objects which require additional initialization
// Template specialization for sem_t objects which require additional teardown:
// every semaphore living in the shared region must be destroyed before the
// shared-memory name is unlinked.
// NOTE(review): unlike the generic Close(), this does not munmap the region --
// confirm that is intentional.
template<>
ncclResult_t ShmObject<sem_t>::Close()
{
  size_t numMutexes = m_shmSize / sizeof(sem_t);
  for (size_t i = 0; i < numMutexes; i++)
  {
    sem_destroy(static_cast<sem_t*>(&m_shmPtr[i]));
  }
  // ENOENT is tolerated: another rank may already have unlinked the name.
  int retVal = shm_unlink(m_shmName.c_str());
  if (retVal == -1 && errno != ENOENT)
  {
    WARN("Call to shm_unlink in ShmObject failed : %s", strerror(errno));
    return ncclSystemError;
  }
  return ncclSuccess;
}
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_SHM_OBJECT_H_
#define NCCL_SHM_OBJECT_H_
#include <string>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <type_traits>
#include <semaphore.h>
#include "MsgQueue.h"
#include "nccl.h"
#include "core.h"
#include "shm.h"
// ShmObject abstracts away the nitty-gritty when multiple processes need to handle opening a shared
// memory object at the same time.
// Creates and maps a POSIX shared-memory object with O_EXCL semantics: returns
// ncclSystemError if an object with this name already exists, letting exactly
// one rank win the creation race.
static ncclResult_t shmSetupExclusive(const char* shmname, const int shmsize, int* fd, void** ptr, int create) {
  *fd = shm_open(shmname, O_CREAT | O_RDWR | O_EXCL, S_IRUSR | S_IWUSR);
  if (*fd == -1) return ncclSystemError; // errno == EEXIST when another rank created it first
  if (create) SYSCHECK(shm_allocate(*fd, shmsize), "posix_fallocate");
  SYSCHECK(shm_map(*fd, shmsize, ptr), "mmap");
  // The mapping remains valid after the descriptor is closed.
  close(*fd);
  *fd = -1;
  // Zero-fill only on creation so attachers don't clobber existing contents.
  if (create) memset(*ptr, 0, shmsize);
  return ncclSuccess;
}
// Wraps a POSIX shared-memory region of T elements shared by the ranks of a
// clique, and coordinates its creation through a POSIX message queue so that
// exactly one rank (rank 0) creates/initializes the region while the others
// attach to it (see Open() below).
template <typename T>
class ShmObject
{
public:
  // size is in bytes; fileName doubles as both the shm and message-queue name.
  ShmObject(size_t size, std::string const& fileName, int rank, int numRanks, int projid) :
    m_shmSize(size),
    m_shmName(fileName),
    m_rank(rank),
    m_numRanks(numRanks),
    m_projid(projid),
    m_alloc(false),
    m_shmPtr(nullptr) {}

  // Default-constructed object is empty (zero-sized, unnamed).
  ShmObject() :
    m_shmSize(0),
    m_shmName(""),
    m_rank(0),
    m_numRanks(0),
    m_projid(0),
    m_alloc(false),
    m_shmPtr(nullptr) {}

  // Destructor does not unmap; callers are expected to invoke Close().
  ~ShmObject() {}

  // Creates (rank 0) or attaches (other ranks) the shared region.
  ncclResult_t Open();

  // Unmaps the region if Open() mapped it.
  ncclResult_t Close()
  {
    if (m_alloc)
    {
      SYSCHECK(munmap(m_shmPtr, m_shmSize), "munmap");
    }
    return ncclSuccess;
  }

  // Returns a reference to the mapped base pointer (valid after Open()).
  T*& Get()
  {
    return m_shmPtr;
  }

protected:
  // Sends one status byte ('P' = pass, 'F' = fail) per non-root rank so each
  // peer can consume exactly one message from the queue.
  ncclResult_t BroadcastMessage(mqd_t& mq_desc, bool pass) const
  {
    char msg_text[1];
    msg_text[0] = (pass == 0 ? 'F': 'P');
    for (int rank = 0; rank < m_numRanks; rank++)
    {
      if (rank == m_rank) continue;
      NCCLCHECK(MsgQueueSend(mq_desc, &msg_text[0], sizeof(msg_text)));
    }
    return ncclSuccess;
  }

  // Root-rank helper: broadcasts a status to all peers, waits until they have
  // drained the queue, then closes and unlinks the queue.
  ncclResult_t BroadcastAndCloseMessageQueue(mqd_t& mq_desc, bool pass)
  {
    ncclResult_t res;
    NCCLCHECKGOTO(BroadcastMessage(mq_desc, pass), res, dropback);
    NCCLCHECKGOTO(MsgQueueWaitUntilEmpty(mq_desc), res, dropback);
    NCCLCHECK(MsgQueueClose(m_shmName, mq_desc, true));
    return ncclSuccess;
dropback:
    WARN("Root rank unable to broadcast across message queue. Closing message queue.");
    NCCLCHECK(MsgQueueClose(m_shmName, mq_desc, true));
    return ncclSystemError;
  }

  // tag for dispatch: selects the per-element-type initialization overload.
  template<class U>
  struct OpenTag{};
  // NOTE(review): this overload set mixes static and non-static declarations,
  // and the uint32_t declaration below is defined out-of-class as
  // OpenTag<unsigned int> -- these only line up on ABIs where uint32_t is
  // unsigned int; confirm.
  static ncclResult_t InitIfSemaphore(OpenTag<int> tag);
  ncclResult_t InitIfSemaphore(OpenTag<uint32_t> tag);
  static ncclResult_t InitIfSemaphore(OpenTag<hipIpcMemHandle_t> tag);
  ncclResult_t InitIfSemaphore(OpenTag<sem_t> tag);
  static ncclResult_t InitIfSemaphore(OpenTag<std::pair<hipIpcMemHandle_t,size_t>> tag);

  size_t m_shmSize;      // Region size in bytes
  std::string m_shmName; // Shm / message-queue base name
  int m_rank;            // This rank's index within the clique
  int m_numRanks;        // Number of ranks sharing the region
  int m_projid;          // NOTE(review): not used in the visible code -- confirm purpose
  bool m_alloc;          // True once Open() has mapped the region
  T* m_shmPtr;           // Base address of the mapped region
};
// Open protocol: rank 0 creates the shm object exclusively, initializes it,
// and signals peers via the message queue ('P' = pass, 'F' = fail). Peers wait
// for that message, attach to the shm, then consume a second message so rank 0
// knows everyone attached and can safely unlink the shm name.
template <typename T>
ncclResult_t ShmObject<T>::Open()
{
  mqd_t mq_desc;
  if (m_alloc == false)
  {
    int shmFd;
    INFO(NCCL_INIT, "Rank %d Initializing message queue for %s\n", m_rank, m_shmName.c_str());
    NCCLCHECK(MsgQueueGetId(m_shmName, false, mq_desc));
    if (m_rank == 0)
    {
      ncclResult_t resultSetup = shmSetupExclusive(m_shmName.c_str(), m_shmSize, &shmFd, (void**)&m_shmPtr, 1);
      // NOTE(review): InitIfSemaphore runs even when setup failed, and a
      // setup failure with errno == EEXIST is tolerated below although
      // m_shmPtr is then left unmapped -- confirm both are intentional.
      ncclResult_t resultSemInit = InitIfSemaphore(OpenTag<T>{});
      if ((resultSetup != ncclSuccess && errno != EEXIST) || (resultSemInit != ncclSuccess))
      {
        // Tell peers setup failed, then clean up whatever was mapped.
        NCCLCHECK(BroadcastAndCloseMessageQueue(mq_desc, false));
        WARN("Call to ShmObject::Open in root rank failed : %s", strerror(errno));
        if (resultSetup == ncclSuccess)
        {
          Close();
        }
        return ncclSystemError;
      }
      ncclResult_t result; // NOTE(review): unused local
      // Broadcast two sets of messages: one set is consumed by the other ranks to acknowledge root rank
      // has successfully opened shared memory; second set is consumed by the other ranks to indicate
      // that they have successfully opened shared memory and root rank can now unlink shared memory
      NCCLCHECK(BroadcastMessage(mq_desc, true));
      NCCLCHECK(BroadcastAndCloseMessageQueue(mq_desc, true));
      // ENOENT is tolerated: the name may already be gone.
      int retVal = shm_unlink(m_shmName.c_str());
      if (retVal == -1 && errno != ENOENT)
      {
        WARN("Call to shm_unlink in ShmObject failed : %s", strerror(errno));
        return ncclSystemError;
      }
    }
    else
    {
      // Non-root ranks: wait for the root's status byte before attaching.
      char msg_text[1];
      ncclResult_t res;
      NCCLCHECKGOTO(MsgQueueRecv(mq_desc, &msg_text[0], sizeof(msg_text)), res, dropback);
      if (msg_text[0] == 'P')
      {
        NCCLCHECK(shmSetup(m_shmName.c_str(), m_shmSize, &shmFd, (void**)&m_shmPtr, 0));
        // Second receive acknowledges attachment so root can unlink the name.
        NCCLCHECKGOTO(MsgQueueRecv(mq_desc, &msg_text[0], sizeof(msg_text)), res, dropback);
        NCCLCHECK(MsgQueueClose(m_shmName, mq_desc, false));
      }
      else
      {
        NCCLCHECK(MsgQueueClose(m_shmName, mq_desc, false));
        WARN("Call to shm_open from non-root rank in ShmObject failed : %s", strerror(errno));
        return ncclSystemError;
      }
    }
    m_alloc = true;
  }
  else
  {
    WARN("Cannot allocate ShmObject twice.\n");
    return ncclInvalidUsage;
  }
  return ncclSuccess;
dropback:
  WARN("Rank %d failed ShmObject::Open(). Closing message queue.", m_rank);
  NCCLCHECK(MsgQueueClose(m_shmName, mq_desc, false));
  SYSCHECK(shm_unlink(m_shmName.c_str()), "shm_unlink");
  NCCLCHECK(Close());
  return ncclSystemError;
}
// No-op: plain int shared memory needs no extra initialization.
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<int> tag)
{
  return ncclSuccess;
}
// No-op: plain uint32_t shared memory needs no extra initialization.
// Fix: the in-class declaration uses OpenTag<uint32_t> but this definition
// said OpenTag<unsigned int>, which only matches on ABIs where uint32_t is
// unsigned int. Spell it uint32_t so declaration and definition always agree.
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<uint32_t> tag)
{
  return ncclSuccess;
}
// No-op: raw IPC handles need no extra initialization.
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<hipIpcMemHandle_t> tag)
{
  return ncclSuccess;
}

// No-op: {IPC handle, size} pairs need no extra initialization.
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<std::pair<hipIpcMemHandle_t,size_t>> tag)
{
  return ncclSuccess;
}
// sem_t shared memory: initialize every semaphore in the region as a
// process-shared (pshared = 1) semaphore with initial value 1, i.e. an
// unlocked inter-process mutex.
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<sem_t> tag)
{
  size_t numMutexes = m_shmSize / sizeof(sem_t);
  for (size_t i = 0; i < numMutexes; i++)
  {
    SYSCHECK(sem_init(static_cast<sem_t*>(&m_shmPtr[i]), 1, 1), "sem_init");
  }
  return ncclSuccess;
}
#endif
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment