"git@developer.sourcefind.cn:OpenDAS/TransformerEngine.git" did not exist on "5083a713eee85ae601075f3722a2f32060f01ab8"
Unverified commit 08e5e4b1, authored by Pavel Shamis (Pasha) and committed via GitHub
Browse files

[UB] Adding configurable timeout for userbuffer and improving error reporting...


[UB] Adding configurable timeout for userbuffer and improving error reporting for potential hangs (#757)

* Improving error reporting and hang detection logic

* Adding verbose error reporting in case of UB hang
* Adding CE hang detector
* Replacing hard-coded timeout with configurable one
Signed-off-by: Pasha (Pavel) Shamis <pasharesearch@gmail.com>

* Cleaning up warnings in the code
Signed-off-by: default avatarPasha (Pavel) Shamis <pasharesearch@gmail.com>

* Removing unused codes
Signed-off-by: default avatarPasha (Pavel) Shamis <pasharesearch@gmail.com>

* Fixing styling issues reported on github
Signed-off-by: default avatarPasha (Pavel) Shamis <pasharesearch@gmail.com>

* Addressing lint new line and casting warnings
Signed-off-by: default avatarPasha (Pavel) Shamis <pasharesearch@gmail.com>

* Addressing lint warning about the usage of `unsigned long long`
Signed-off-by: default avatarPasha (Pavel) Shamis <pasharesearch@gmail.com>

* Removing unused case causing build issues on multi-arch setup
Signed-off-by: default avatarPasha (Pavel) Shamis <pasharesearch@gmail.com>

* Post GRDCOPY removal cleanup

* Remove cmake check
* Remove unused includes
Signed-off-by: default avatarPasha (Pavel) Shamis <pasharesearch@gmail.com>

---------
Signed-off-by: Pasha (Pavel) Shamis <pasharesearch@gmail.com>
Co-authored-by: Kirthi Shankar Sivamani <ksivamani@nvidia.com>
parent cd54a8cd
......@@ -11,17 +11,11 @@ target_include_directories(transformer_engine_userbuffers PUBLIC
# Configure dependencies
find_package(MPI REQUIRED)
find_library(GDRCOPY_LIBRARY gdrapi
HINTS "${GDRCOPY_LIBRARY_DIR}" "$ENV{GDRCOPY_LIBRARY_DIR}")
if(NOT GDRCOPY_LIBRARY)
message(FATAL_ERROR "Could not find GDRCopy, please set GDRCOPY_LIBRARY_DIR")
endif()
message(STATUS "Found GDRCopy: ${GDRCOPY_LIBRARY}")
target_link_libraries(transformer_engine_userbuffers PUBLIC
CUDA::cudart
CUDA::cuda_driver
MPI::MPI_CXX
${GDRCOPY_LIBRARY})
)
target_include_directories(transformer_engine_userbuffers PRIVATE
${CMAKE_CUDA_TOOLKIT_INCLUDE_DIRECTORIES})
......
......@@ -11,7 +11,6 @@
#include <chrono>
#include <cuda_runtime.h>
#include <cuda_runtime_api.h>
#include <immintrin.h>
#include <iostream>
#include <math.h>
#include <mpi.h>
......@@ -19,7 +18,6 @@
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <x86intrin.h>
#define MULTICAST_GB_TOTAL 512
static int oob_bcast(void *comm_context, void *buf, int size, int root) {
......@@ -123,11 +121,20 @@ int create_communicator_grouped2(communicator **comm, int pipegpus, int pipenode
(*comm)->basecounter[i] = 0;
(*comm)->head = 0;
(*comm)->tail = 0;
(*comm)->activeproxy = 1;
(*comm)->active_nreqs = 0;
for (int i = 0; i < userbuffers_op_types; i++)
(*comm)->active_req[i].active = -1;
int device_clock = 0;
// 110 sec wait time by default
int sec_timeout = getenv("UB_TIMEOUT") ? atoi(getenv("UB_TIMEOUT")) : 110;
CUDACHECK(cudaDeviceGetAttribute(&device_clock, cudaDevAttrClockRate, cur_dev));
(*comm)->ub_timeout = 1000ull * device_clock * sec_timeout;
if ((*comm)->myrank == 0) {
printf("UB_TIMEOUT is set to %d sec, %" PRIu64 " cycles, freq: %dkhz\n",
sec_timeout, (*comm)->ub_timeout, device_clock);
}
int ret = 0;
// split communicator
char host_name[MPI_MAX_PROCESSOR_NAME];
......@@ -232,59 +239,12 @@ int create_communicator_grouped2(communicator **comm, int pipegpus, int pipenode
(*comm)->num2_nodes = tensornodes;
(*comm)->my2_node = (mynode / datanodes) % tensornodes;
(*comm)->first2_node = mynode - (*comm)->my2_node * datanodes;
char *ib_dev_list;
int ZIONROCE = getenv("NVTE_ZIONROCE") ? atoi(getenv("NVTE_ZIONROCE")) : 0;
int ROCE = getenv("NVTE_ROCE") ? atoi(getenv("NVTE_ROCE")) : 0;
if (ZIONROCE)
ROCE = 1;
int DGX_H100 = device_prop.major == 9;
switch (mylocal) {
case 0:
ib_dev_list = "mlx5_0:1";
break; // NOLINT(*)
case 1:
ib_dev_list = (char *)(DGX_H100 ? "mlx5_3:1" : "mlx5_1:1"); // NOLINT(*)
break; // NOLINT(*)
case 2:
ib_dev_list = (char *)(ZIONROCE ? "mlx5_4:1" : DGX_H100 ? "mlx5_4:1" : "mlx5_2:1"); // NOLINT(*)
break; // NOLINT(*)
case 3:
ib_dev_list = (char *)(DGX_H100 ? "mlx5_5:1" : "mlx5_3:1"); // NOLINT(*)
break; // NOLINT(*)
case 4:
ib_dev_list = (char *)(DGX_H100 ? "mlx5_6:1" : "mlx5_6:1"); // NOLINT(*)
break; // NOLINT(*)
case 5:
ib_dev_list = (char *)(DGX_H100 ? "mlx5_9:1" : "mlx5_7:1"); // NOLINT(*)
break; // NOLINT(*)
case 6:
ib_dev_list = (char *)(ZIONROCE ? "mlx5_10:1" : DGX_H100 ? "mlx5_10:1" : "mlx5_8:1"); // NOLINT(*)
break; // NOLINT(*)
case 7:
ib_dev_list = (char *)(DGX_H100 ? "mlx5_11:1" : "mlx5_9:1"); // NOLINT(*)
break; // NOLINT(*)
default:
break;
}
(*comm)->fifo = reinterpret_cast<ub_request *>(malloc(sizeof(ub_request) * NVTE_MAX_REQUESTS));
(*comm)->nblocks = 8;
(*comm)->alignblock = 1024 * 512;
(*comm)->minblock = 1024 * 2 * 1024;
(*comm)->asyncblocks = 16;
CUDACHECK(cudaMallocHost((void **)&(*comm)->hostflags, // NOLINT(*)
(NVTE_MAX_SMS + 100) * sizeof(int)));
for (int i = 0; i < 100 + NVTE_MAX_SMS; i++)
(*comm)->hostflags[i] = 0;
_mm_mfence();
sleep(1);
// init_p2p_transport();
(*comm)->ibnvsize = (*comm)->nvsize;
#define NBUF 2
if ((*comm)->sm_arch >= 9 && (*comm)->ar2_nvsize > 1 &&
!getenv("UB_SKIPMC")) { // multicast init only for TP ops (____2 operations)
......@@ -374,6 +334,7 @@ int create_communicator_grouped2(communicator **comm, int pipegpus, int pipenode
#define GPU_PAGE_SIZE (1UL << GPU_PAGE_SHIFT)
#define GPU_PAGE_OFFSET (GPU_PAGE_SIZE - 1)
#define GPU_PAGE_MASK (~GPU_PAGE_OFFSET)
CUDACHECK(cudaMalloc(&(*comm)->flags, 2 * GPU_PAGE_SIZE));
unsigned int flag = 1;
CUDACHECK(cudaMemset((*comm)->flags, 0, 2 * GPU_PAGE_SIZE));
......@@ -381,23 +342,6 @@ int create_communicator_grouped2(communicator **comm, int pipegpus, int pipenode
reinterpret_cast<int *>(((CUdeviceptr)(*comm)->flags + GPU_PAGE_SIZE - 1) & GPU_PAGE_MASK);
using namespace std;
(*comm)->g = gdr_open();
if ((*comm)->g == NULL) {
fprintf(stderr, "gdrcopy open failed\n");
return -1;
}
gdr_mh_t mh;
ret = gdr_pin_buffer((*comm)->g, (CUdeviceptr)(*comm)->flags, GPU_PAGE_SIZE, 0, 0, &mh);
if (ret) {
fprintf(stderr, "gdr_pin_buffer failed\n");
return -1;
}
ret = gdr_map((*comm)->g, mh, (void **)&((*comm)->map_flags), GPU_PAGE_SIZE); // NOLINT(*)
if (ret) {
fprintf(stderr, "gdr_map failed\n");
return -1;
}
sched_param param;
pthread_attr_t attr;
pthread_attr_init(&attr);
......@@ -426,10 +370,6 @@ int create_communicator(communicator **comm) {
}
// Tear down a communicator created by create_communicator*().
// NOTE(review): this span comes from a diff view with the +/- markers stripped;
// the activeproxy/gdr_close lines appear to be the ones this commit removes —
// confirm against the post-commit file before relying on this exact body.
void destroy_communicator(communicator *comm) {
// Clearing the (volatile) activeproxy flag asks the CPU proxy thread's
// polling loop to exit.
comm->activeproxy = 0;
// Rank 0 optionally announces the shutdown wait when NVTE_UBDEBUG is set.
if (!comm->myrank && getenv("NVTE_UBDEBUG"))
printf("waiting for userbuffers proxy thread to exit()\n");
// Release the GDRCopy handle opened during communicator creation.
gdr_close(comm->g);
}
int register_user_buffer_collective(void **gpubuff, size_t bytes, communicator *comm, bool alloc) {
......@@ -533,7 +473,7 @@ int register_user_buffer_collective(void **gpubuff, size_t bytes, communicator *
CUCHECK(cuMulticastBindMem(comm->mc_handle, comm->mc_offset, comm->uchandles[hndl][myrank],
0 /*memOffset*/, aligned_size, 0));
comm->memflags[hndl] |= UB_MEM_MC_CREATED;
comm->mc_ptr[hndl] = comm->mc_baseptr + comm->mc_offset;
comm->mc_ptr[hndl] = reinterpret_cast<char *>(comm->mc_baseptr) + comm->mc_offset;
comm->mc_offset += aligned_size;
} else if (!comm->myrank) {
printf("UB: warning region %d size %ld MB registered without MC access\n", hndl,
......@@ -570,146 +510,3 @@ int register_user_buffer_collective(void **gpubuff, size_t bytes, communicator *
return comm->free_region++;
}
int allreduce_userbuff_inplace_gpu(const int handler, const int offset, const int elements,
const int blocksize, communicator *comm, cudaStream_t stream);
int allreduce2_userbuff_inplace_gpu(const int maxcredit, const int handler, const int offset,
const int elements, const int blocksize, communicator *comm,
cudaStream_t stream, int op);
int reducescatter2_userbuff_inplace_gpu(const int maxcredit, const int handler, const int offset,
const int elements, const int blocksize, communicator *comm,
cudaStream_t stream, int op);
int allgather2_userbuff_inplace_gpu(const int maxcredit, const int handler, const int offset,
const int elements, const int blocksize, communicator *comm,
cudaStream_t stream, int op);
// In-place non-SHARP allreduce over a registered userbuffer region.
//
// handler/offset/elements identify the registered region and the slice to
// reduce; op selects which process-group geometry to use
// (userbuffers_allreduceop_nonsharp -> ar_nvsize/num_nodes,
// otherwise -> ar2_nvsize/num2_nodes).
// Raises via NVTE_UB_ERROR for payloads below 64 elements (unsupported).
//
// The GPU kernel is launched first; when the job spans multiple nodes and
// CPU-launch mode is enabled, a request describing the same operation is
// enqueued into the single-producer FIFO consumed by the CPU proxy thread.
void allreduce_nonsharp_inplace(const int handler, const int offset, const int elements,
communicator *comm, cudaStream_t stream, int op) {
if (elements < 64)
NVTE_UB_ERROR("Userbuffer comm for given config not implemented.");
// if(comm->myrank==0) fprintf(stderr,"AR2(%d) user call
// launch_mode=%d\n",op,comm->launch_mode);
// Pick the NVLink-domain size for the group this op runs on.
const int ar_nvsize = op == userbuffers_allreduceop_nonsharp ? comm->ar_nvsize : comm->ar2_nvsize;
int blocksize = elements * 2;
int maxcredit = 0;
const int num_nodes = op == userbuffers_allreduceop_nonsharp ? comm->num_nodes : comm->num2_nodes;
// Split the payload (elements * 2 bytes — presumably fp16/bf16; TODO confirm)
// into ~nblocks chunks rounded up to alignblock granularity.
blocksize = (comm->nblocks - 1 + (comm->alignblock - 1 + elements * 2) / comm->alignblock) /
comm->nblocks; // FIXME TUNING
blocksize *= comm->alignblock;
if (blocksize < comm->minblock)
blocksize = comm->minblock;
// Number of in-flight blocks ("credits") needed to cover the payload.
maxcredit = (elements * 2 + blocksize - 1) / blocksize;
size_t peerblock = sizeof(int) * NVTE_REG0_COMMBUFFER / maxcredit; // max size we can fit
// Clamp blocksize so maxcredit blocks fit the per-peer staging buffer.
if (blocksize > peerblock * ar_nvsize)
blocksize = peerblock * ar_nvsize;
// Launch the GPU side; returns the SM count used (0 => nothing launched).
int sms = allreduce2_userbuff_inplace_gpu(maxcredit, handler, offset, elements, blocksize, comm,
stream, op);
if (num_nodes > 1 && comm->launch_mode & NVTE_LAUNCH_CPU) {
if (!sms)
return;
// Fill the request slot at the FIFO head for the CPU proxy thread.
comm->fifo[comm->head].optype = op;
comm->fifo[comm->head].basecounter = comm->basecounter[op];
comm->fifo[comm->head].blocksize = blocksize;
comm->fifo[comm->head].maxcredit = maxcredit;
comm->fifo[comm->head].handler = handler;
comm->fifo[comm->head].offset = offset;
comm->fifo[comm->head].elements = elements;
// Ring-buffer advance; NVTE_MAX_REQUESTS must be a power of two for the
// mask to work. Busy-wait while the ring is full (head would hit tail).
int newhead = (comm->head + 1) & (NVTE_MAX_REQUESTS - 1);
while (newhead == comm->tail) {
}
// Publish the request only after the slot is fully written.
comm->head = newhead;
comm->basecounter[op] += (elements * 2 + blocksize - 1) / blocksize;
}
}
// In-place allreduce on the secondary (TP, "____2") group geometry.
// Thin forwarding wrapper around allreduce_nonsharp_inplace.
void allreduce2_userbuff_inplace(const int handler, const int offset, const int elements,
                                 communicator *comm, cudaStream_t stream) {
  const int op = userbuffers_allreduceop_nonsharp2;
  allreduce_nonsharp_inplace(handler, offset, elements, comm, stream, op);
}
// In-place allreduce on the primary (non-SHARP) group geometry.
// Rejects tiny payloads up front, then forwards to the shared worker
// (which repeats the same size check defensively).
void allreduce_userbuff_inplace(const int handler, const int offset, const int elements,
                                communicator *comm, cudaStream_t stream) {
  if (elements < 64) {
    NVTE_UB_ERROR("Userbuffer comm for given config not implemented.");
  }
  allreduce_nonsharp_inplace(handler, offset, elements, comm, stream,
                             userbuffers_allreduceop_nonsharp);
}
// In-place reduce-scatter over a registered userbuffer region on the primary
// (non-SHARP) group geometry. Mirrors allreduce_nonsharp_inplace: same
// block-size heuristic, same GPU launch + CPU-proxy FIFO handoff, but calls
// the reducescatter GPU entry point.
// Raises via NVTE_UB_ERROR for payloads below 64 elements (unsupported).
void reducescatter_userbuff_inplace(const int handler, const int offset, const int elements,
communicator *comm, cudaStream_t stream) {
if (elements < 64)
NVTE_UB_ERROR("Userbuffer comm for given config not implemented.");
// op is fixed here, so the ternaries below always take the first branch.
int op = userbuffers_allreduceop_nonsharp;
const int ar_nvsize = op == userbuffers_allreduceop_nonsharp ? comm->ar_nvsize : comm->ar2_nvsize;
int blocksize = elements * 2;
int maxcredit = 0;
const int num_nodes = op == userbuffers_allreduceop_nonsharp ? comm->num_nodes : comm->num2_nodes;
// Split the payload (elements * 2 bytes) into ~nblocks chunks rounded up
// to alignblock granularity.
blocksize = (comm->nblocks - 1 + (comm->alignblock - 1 + elements * 2) / comm->alignblock) /
comm->nblocks; // FIXME TUNING
blocksize *= comm->alignblock;
if (blocksize < comm->minblock)
blocksize = comm->minblock;
// Credits = number of blocks covering the payload.
maxcredit = (elements * 2 + blocksize - 1) / blocksize;
size_t peerblock = sizeof(int) * NVTE_REG0_COMMBUFFER / maxcredit; // max size we can fit
if (blocksize > peerblock * ar_nvsize)
blocksize = peerblock * ar_nvsize;
// Launch GPU side; sms == 0 means nothing was launched.
int sms = reducescatter2_userbuff_inplace_gpu(maxcredit, handler, offset, elements, blocksize,
comm, stream, op);
if (num_nodes > 1 && comm->launch_mode & NVTE_LAUNCH_CPU) {
if (!sms)
return;
// Enqueue a matching request for the CPU proxy thread (see
// allreduce_nonsharp_inplace for the ring-buffer protocol).
comm->fifo[comm->head].optype = op;
comm->fifo[comm->head].basecounter = comm->basecounter[op];
comm->fifo[comm->head].blocksize = blocksize;
comm->fifo[comm->head].maxcredit = maxcredit;
comm->fifo[comm->head].handler = handler;
comm->fifo[comm->head].offset = offset;
comm->fifo[comm->head].elements = elements;
// Busy-wait while the ring is full, then publish the new head.
int newhead = (comm->head + 1) & (NVTE_MAX_REQUESTS - 1);
while (newhead == comm->tail) {
}
comm->head = newhead;
comm->basecounter[op] += (elements * 2 + blocksize - 1) / blocksize;
}
}
// In-place all-gather over a registered userbuffer region on the primary
// (non-SHARP) group geometry. Uses the same block-size heuristic as
// allreduce_nonsharp_inplace / reducescatter_userbuff_inplace, then launches
// the allgather GPU entry point. Unlike those two paths, no CPU-proxy FIFO
// request is enqueued here.
// Raises via NVTE_UB_ERROR for payloads below 64 elements (unsupported).
//
// Fix: the original stored the kernel's SM count in an unused local (`sms`)
// and also computed an unused `num_nodes`, both producing unused-variable
// warnings; they are removed since the values are never consumed.
void allgather_userbuff_inplace(const int handler, const int offset, const int elements,
                                communicator *comm, cudaStream_t stream) {
  if (elements < 64)
    NVTE_UB_ERROR("Userbuffer comm for given config not implemented.");
  const int op = userbuffers_allreduceop_nonsharp;
  // op is fixed above, so this always selects the primary group's NVLink size.
  const int ar_nvsize = comm->ar_nvsize;
  // Split the payload (elements * 2 bytes) into ~nblocks chunks rounded up to
  // alignblock granularity, clamped below by minblock and above by what fits
  // in the per-peer staging buffer.
  int blocksize = (comm->nblocks - 1 + (comm->alignblock - 1 + elements * 2) / comm->alignblock) /
                  comm->nblocks;  // FIXME TUNING
  blocksize *= comm->alignblock;
  if (blocksize < comm->minblock)
    blocksize = comm->minblock;
  const int maxcredit = (elements * 2 + blocksize - 1) / blocksize;
  size_t peerblock = sizeof(int) * NVTE_REG0_COMMBUFFER / maxcredit;  // max size we can fit
  if (blocksize > peerblock * ar_nvsize)
    blocksize = peerblock * ar_nvsize;
  // Return value (SM count) intentionally discarded — no proxy request follows.
  (void)allgather2_userbuff_inplace_gpu(maxcredit, handler, offset, elements, blocksize, comm,
                                        stream, op);
}
......@@ -12,7 +12,6 @@
#include "cuda_runtime.h"
#include <pthread.h>
#include <chrono>
#include "gdrapi.h"
#include <stdexcept>
#define NVTE_MAX_REGIONS 16
......@@ -32,10 +31,6 @@
#define NVTE_UB_MEM_MC_CREATED 2
#define NVTE_UB_MEM_ALLOCATED 4
#ifdef UCP
#include <ucp/api/ucp.h>
#endif
// region 0 flag offsets
#define NVTE_REG0_OPFLAGS 1024
#define NVTE_REG0_RECV (NVTE_REG0_OPFLAGS * userbuffers_op_types)
......@@ -43,7 +38,8 @@
#define NVTE_REG0_OFFSET(comm) ((2 * NVTE_MAX_REGIONS) * NVTE_MAX_NVLINK \
+ NVTE_REG0_SINGLENODE * 2 + NVTE_MAX_PEERS)
#define NVTE_REG0_COMMBUFFER 0
#define NVTE_REG0_FLAGS (NVTE_REG0_RECV + NVTE_MAX_PEERS * NVTE_MAX_REGIONS)
// x3 for [flagptr, ce_start_ptr, ce_end_ptr]
#define NVTE_REG0_FLAGS (NVTE_REG0_RECV + NVTE_MAX_PEERS * NVTE_MAX_REGIONS * 3)
#define NVTE_REG0_IBRS 32
#define NVTE_REG0_IBAG 512
......@@ -122,16 +118,11 @@ struct communicator {
// max value for running block counters in hostflags
int basecounter[userbuffers_op_types]; // NOLINT(*)
int *hostflags;
int *flags, *map_flags;
gdr_t g;
struct sharp_coll_context *sharp_coll_context;
struct sharp_coll_comm *sharp_coll_comm;
void *mem_mr[NVTE_MAX_REGIONS];
ub_request *fifo;
volatile int activeproxy;
int nblocks, alignblock, minblock, asyncblocks, active_nreqs;
ub_request active_req[userbuffers_op_types]; // NOLINT(*)
int padding[7];
......@@ -142,10 +133,9 @@ struct communicator {
MPI_Request mpihndl[NVTE_MAX_SHARP];
MPI_Comm comm_inter, // reduction group communicator (subset of the nodes) along GPU rail
comm_intra; // full intranode (all ndev GPUS)
int ibnvsize; // can be used to fake smaller or larger nvlink domain to use ib instead of nvlink
// or force MNNVL
int *send_id, *recv_id;
int mydev;
uint64_t ub_timeout;
};
typedef struct communicator communicator;
......@@ -185,23 +175,9 @@ int register_user_buffer_collective(void **gpubuff, size_t bytes, communicator *
SHARP and NSO/MNNVL)
*/
void allreduce_userbuff_inplace(const int handler, const int offset, const int elements,
communicator *comm, cudaStream_t stream = 0);
// for DP distributed optimizer, only nonSHARP multinode is implemented & calls must come in pairs
// ordered
void allgather_userbuff_inplace(const int handler, const int offset, const int elements,
communicator *comm, cudaStream_t stream = 0);
void reducescatter_userbuff_inplace(const int handler, const int offset, const int elements,
communicator *comm, cudaStream_t stream = 0);
void allreduce2_userbuff_inplace(const int handler, const int offset, const int elements,
communicator *comm, cudaStream_t stream = 0);
// for TP-parallelism, only single node is implemented
void allgather2_userbuff_inplace(const int handler, const int offset, const int elements,
communicator *comm, cudaStream_t stream = 0);
void allgather2_userbuff_inplace_sliced(const int handler, const int offset, const int elements,
communicator *comm, const int slice_id, const int nslices,
cudaStream_t stream = 0);
/*
each Rank input is
allgather2_userbuff_inplace: offset+myrank*elements
......@@ -231,14 +207,6 @@ void reducescatter2_userbuff_stridedoutput_fp8(void* output, float* scale, const
template<typename fp8type>
void reducescatter2_userbuff_fp8(void* output, float* scale, const int handler, const int offset,
const int elements, communicator* comm, cudaStream_t stream = 0);
#if 0
template<typename fp8type>
void reducescatter2_userbuff_strided_atomic_fp8(void* output, float *scale, const int handler,
const int offset, const int rowelements,
const int colelements, const int strideelements,
const int numchunks, void *counters,
communicator* comm, cudaStream_t stream = 0);
#endif
template<typename fp8type>
void reducescatter2_userbuff_strided_atomic_fp8(void* output, float *scale, const int handler,
const int offset, const int rowelements,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment