Commit c0dad530 authored by wangkaixiong's avatar wangkaixiong 🚴🏼
Browse files

init

parents
#!/bin/bash
mpirun -H master:8,node1:8 --prefix /opt/mpi -np 16 --allow-run-as-root \
--output-filename t1 \
-x NCCL_DEBUG=TRACE \
-x UCX_NET_DEVICES=mlx5_2:1 \
-x NCCL_TOPO_FILE=/data1/sunzhq/rccl-tests-develop/topo-0507-115-real.xml \
-x NCCL_NET_GDR_READ=1 \
./build2/all_reduce_perf -b 7618 -e 1G -f 2 -g 1
#!/bin/bash
export NCCL_TOPO_DUMP_FILE=./topo-dump.xml
#-x NCCL_TOPO_FILE=./topo.xml \
mpirun -H master:8,node1:8 --prefix /opt/mpi -np 16 --allow-run-as-root \
--mca plm_rsh_args "-p 2222" \
--mca btl_tcp_if_include p14p2 \
-x ROCM_PATH -x LD_LIBRARY_PATH \
-x NCCL_TOPO_FILE=./topo-0507-115-update.xml \
-x NCCL_DEBUG=WARN \
-x NCCL_SOCKET_IFNAME=p14p2 \
-x HIP_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 \
-x NCCL_IB_HCA=mlx5_2:1,mlx5_3:1,mlx5_4:1,mlx5_1:1,mlx5_10:1,mlx5_7:1,mlx5_8:1,mlx5_9:1 \
./build/alltoall_perf -b 7618 -e 1G -f 2 -g 1 -d half
#-x NCCL_TOPO_FILE=/data1/sunzhq/rccl-tests-develop/topo-0507-115-real.xml \
#-x NCCL_IB_QP_PER_CONNECTION=4 \
#-x NCCL_IB_HCA=mlx5_2:1,mlx5_3:1,mlx5_4:1,mlx5_1:1,mlx5_10:1,mlx5_7:1,mlx5_8:1,mlx5_9:1 \
# -x NCCL_IB_HCA=mlx5_1,mlx5_2,mlx5_3,mlx5_4,mlx5_7,mlx5_8,mlx5_9,mlx5_10 \
# --mca plm rsh \
# --mca plm_rsh_agent ssh \
# -x NCCL_GRAPH_FILE=./topo.xml \
# -x NCCL_ALGO=tree,ring \
# --mca plm_rsh_args "-2 -o StrictHostKeyChecking=no" \ # 使用 SSH 通信
# -x NCCL_DEBUG=INFO # 输出调试日志,定位问题
# -x NCCL_IB_GID_INDEX=3 # 若 IB 多子网,指定 GID 索引
mpirun --prefix /opt/mpi -np 16 -H master:8,node1:8 --allow-run-as-root --mca plm_rsh_args "-p 2222" --mca btl_tcp_if_include p14p2 -x ROCM_PATH -x LD_LIBRARY_PATH ./runrccl.sh
#!/bin/bash
#export NCCL_GRAPH_DUMP_FILE=./graph-dump.xml
#export NCCL_TOPO_DUMP_FILE=./topo-0515-dump.xml
#export NCCL_TOPO_DUMP_FILE=./topo-wkx-exp.xml
#export NCCL_MAX_NCHANNELS=24
export NCCL_MIN_NCHANNELS=32
export NCCL_MIN_P2P_NCHANNELS=32
#export NCCL_P2P_LEVEL=7
export NCCL_ALGO=Ring
#export NCCL_MIN_P2P_NCHANNELS=24
#export NCCL_NCHANNELS_PER_PEER=24
export HSA_FORCE_FINE_GRAIN_PCIE=1
export NCCL_SOCKET_IFNAME=p14p2
export HIP_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
#export NCCL_TOPO_FILE=./topo-0507-115-update.xml
export NCCL_P2P_LEVEL=SYS
export NCCL_TOPO_FILE=./topo-BW-0520.xml
export RCCL_SDMA_COUNT_ENABLE=1
export RCCL_SDMA_COPY_ENABLE=0
export RCCL_COLL_XHCL_CHANNEL_NUM=28 \
#export NCCL_IB_HCA=mlx5_1:1,mlx5_2:1,mlx5_3:1,mlx5_4:1,mlx5_7:1,mlx5_8:1,mlx5_9:1,mlx5_10:1
#export NCCL_TOPO_FILE=null
#export NCCL_GRAPH_FILE=./graph_debug.xml
#export NCCL_DEBUG=TRACE
#export NCCL_SHM_DISABLE=1
#export NCCL_P2P_DISABLE=1
#export NCCL_GRAPH_FILE=./graph_debug.xml
#export NCCL_IB_HCA=mlx5_7:1,mlx5_9:1,mlx5_10:1 # MLX5_10 没流量
#export NCCL_IB_HCA=mlx5_1:1,mlx5_2:1,mlx5_3:1,mlx5_4:1,mlx5_7:1,mlx5_8:1,mlx5_9:1,mlx5_10:1 # MLX5_10 没流量
#./build/alltoall_perf -b 2 -e 2G -f 2 -g 1
./build/all_reduce_perf -b 256M -e 256M -f 2 -g 8
#./build/all_reduce_perf -b 2 -e 2G -f 2 -g 1
#mpirun -np 16 -H master:1,node1:8 --allow-run-as-root -x NCCL_TOPO_FILE=./topo-0507-115-real.xml --mca plm_rsh_args "-p 2222" -x NCCL_MAX_NCHANNELS=20 -x NCCL_MIN_NCHANNELS=20 -x NCCL_P2P_LEVEL=SYS -x NCCL_ALGO=Ring -x NCCL_MIN_P2P_NCHANNELS=20 -x NCCL_NCHANNELS_PER_PEER=20 -x HSA_FORCE_FINE_GRAIN_PCIE=1 -x HIP_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 ./build/all_reduce_perf -b 2 -e 2G -f 2 -g 1
#mpirun -np 8 --allow-run-as-root -x NCCL_TOPO_FILE=./topo-0507-115-real.xml -x NCCL_MAX_NCHANNELS=20 -x NCCL_MIN_NCHANNELS=20 -x NCCL_P2P_LEVEL=SYS -x NCCL_ALGO=Ring -x HSA_FORCE_FINE_GRAIN_PCIE=1 -x HIP_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 ./build/all_reduce_perf -b 2 -e 2G -f 2 -g 1
#-x NCCL_IB_HCA=mlx5_1:1,mlx5_2:1,mlx5_3:1,mlx5_4:1,mlx5_7:1,mlx5_8:1,mlx5_9:1,mlx5_10:1 \
#-x NCCL_TOPO_FILE=/data1/sunzhq/rccl-tests-develop/topo-0507-115-real.xml \
#-x NCCL_IB_QP_PER_CONNECTION=4 \
#-x NCCL_IB_HCA=mlx5_2:1,mlx5_3:1,mlx5_4:1,mlx5_1:1,mlx5_10:1,mlx5_7:1,mlx5_8:1,mlx5_9:1 \
# -x NCCL_IB_HCA=mlx5_1,mlx5_2,mlx5_3,mlx5_4,mlx5_7,mlx5_8,mlx5_9,mlx5_10 \
# --mca plm rsh \
# --mca plm_rsh_agent ssh \
# -x NCCL_GRAPH_FILE=./topo.xml \
# -x NCCL_ALGO=tree,ring \
# --mca plm_rsh_args "-2 -o StrictHostKeyChecking=no" \ # 使用 SSH 通信
# -x NCCL_DEBUG=INFO # 输出调试日志,定位问题
# -x NCCL_IB_GID_INDEX=3 # 若 IB 多子网,指定 GID 索引
#!/bin/bash
#export NCCL_GRAPH_DUMP_FILE=./graph-dump.xml
#export NCCL_TOPO_DUMP_FILE=./topo-0515-dump.xml
#export NCCL_TOPO_DUMP_FILE=topo-tujie-test.xml #./topo-wkx-exp.xml
#export NCCL_MAX_NCHANNELS=24
export NCCL_MIN_NCHANNELS=24
export NCCL_MIN_P2P_NCHANNELS=24
#export NCCL_P2P_LEVEL=7
#export NCCL_ALGO=Ring
#export NCCL_MIN_P2P_NCHANNELS=24
#export NCCL_NCHANNELS_PER_PEER=24
export HSA_FORCE_FINE_GRAIN_PCIE=1
export NCCL_SOCKET_IFNAME=p14p2
export HIP_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
#export NCCL_TOPO_FILE=./topo-0507-115-update.xml
export NCCL_P2P_LEVEL=SYS
export NCCL_TOPO_FILE=./topo-BW-0520.xml
#export NCCL_TOPO_FILE=/data1/sunzhq/tujie/rccl-test/rccl/topo-0507_0520.xml
#export RCCL_SDMA_COUNT_ENABLE=1
#export RCCL_SDMA_COPY_ENABLE=0
#export RCCL_COLL_XHCL_CHANNEL_NUM=28 \
#export NCCL_IB_HCA=mlx5_1:1,mlx5_2:1,mlx5_3:1,mlx5_4:1,mlx5_7:1,mlx5_8:1,mlx5_9:1,mlx5_10:1
#export NCCL_TOPO_FILE=null
#export NCCL_GRAPH_FILE=./graph_debug.xml
#export NCCL_DEBUG=TRACE
#export NCCL_SHM_DISABLE=1
#export NCCL_P2P_DISABLE=1
#export NCCL_GRAPH_FILE=./graph_debug.xml
#export NCCL_IB_HCA=mlx5_7:1,mlx5_9:1,mlx5_10:1 # MLX5_10 没流量
#export NCCL_IB_HCA=mlx5_1:1,mlx5_2:1,mlx5_3:1,mlx5_4:1,mlx5_7:1,mlx5_8:1,mlx5_9:1,mlx5_10:1 # MLX5_10 没流量
#./build/alltoall_perf -b 2 -e 2G -f 2 -g 1
./build/all_reduce_perf -b 256M -e 256M -f 2 -g 1
#./build/sendrecv_perf -b 2 -e 1g -f 2 -g 1
#./build/all_reduce_perf -b 2 -e 2G -f 2 -g 1
#mpirun -np 16 -H master:1,node1:8 --allow-run-as-root -x NCCL_TOPO_FILE=./topo-0507-115-real.xml --mca plm_rsh_args "-p 2222" -x NCCL_MAX_NCHANNELS=20 -x NCCL_MIN_NCHANNELS=20 -x NCCL_P2P_LEVEL=SYS -x NCCL_ALGO=Ring -x NCCL_MIN_P2P_NCHANNELS=20 -x NCCL_NCHANNELS_PER_PEER=20 -x HSA_FORCE_FINE_GRAIN_PCIE=1 -x HIP_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 ./build/all_reduce_perf -b 2 -e 2G -f 2 -g 1
#mpirun -np 8 --allow-run-as-root -x NCCL_TOPO_FILE=./topo-0507-115-real.xml -x NCCL_MAX_NCHANNELS=20 -x NCCL_MIN_NCHANNELS=20 -x NCCL_P2P_LEVEL=SYS -x NCCL_ALGO=Ring -x HSA_FORCE_FINE_GRAIN_PCIE=1 -x HIP_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 ./build/all_reduce_perf -b 2 -e 2G -f 2 -g 1
#-x NCCL_IB_HCA=mlx5_1:1,mlx5_2:1,mlx5_3:1,mlx5_4:1,mlx5_7:1,mlx5_8:1,mlx5_9:1,mlx5_10:1 \
#-x NCCL_TOPO_FILE=/data1/sunzhq/rccl-tests-develop/topo-0507-115-real.xml \
#-x NCCL_IB_QP_PER_CONNECTION=4 \
#-x NCCL_IB_HCA=mlx5_2:1,mlx5_3:1,mlx5_4:1,mlx5_1:1,mlx5_10:1,mlx5_7:1,mlx5_8:1,mlx5_9:1 \
# -x NCCL_IB_HCA=mlx5_1,mlx5_2,mlx5_3,mlx5_4,mlx5_7,mlx5_8,mlx5_9,mlx5_10 \
# --mca plm rsh \
# --mca plm_rsh_agent ssh \
# -x NCCL_GRAPH_FILE=./topo.xml \
# -x NCCL_ALGO=tree,ring \
# --mca plm_rsh_args "-2 -o StrictHostKeyChecking=no" \ # 使用 SSH 通信
# -x NCCL_DEBUG=INFO # 输出调试日志,定位问题
# -x NCCL_IB_GID_INDEX=3 # 若 IB 多子网,指定 GID 索引
# ########################################################################
# Copyright 2022 Advanced Micro Devices, Inc.
# ########################################################################
# Compile common object library
set_property(SOURCE common.cu timer.cc ../verifiable/verifiable.cu PROPERTY LANGUAGE CXX)
add_library(rccl_common OBJECT common.cu timer.cc ../verifiable/verifiable.cu)
target_link_libraries(rccl_common roc::rccl hip::device)
if(USE_MPI)
target_link_libraries(rccl_common MPI::MPI_CXX)
endif()
function(add_relative_test test_name test_target)
get_target_property(EXE_PATH ${test_target} RUNTIME_OUTPUT_DIRECTORY)
if(EXE_PATH STREQUAL "EXE_PATH-NOTFOUND")
set(EXE_PATH ".")
endif()
get_filename_component(EXE_PATH "${EXE_PATH}" ABSOLUTE BASE_DIR "${CMAKE_CURRENT_BINARY_DIR}")
get_target_property(EXE_NAME ${test_target} RUNTIME_OUTPUT_NAME)
if(EXE_NAME STREQUAL "EXE_NAME-NOTFOUND")
get_target_property(EXE_NAME ${test_target} OUTPUT_NAME)
if(EXE_NAME STREQUAL "EXE_NAME-NOTFOUND")
set(EXE_NAME "${test_target}")
endif()
endif()
file(RELATIVE_PATH rel_path "${CMAKE_CURRENT_BINARY_DIR}" "${EXE_PATH}/${EXE_NAME}")
add_test(NAME "${test_name}" COMMAND "./${rel_path}")
endfunction()
function(add_rccl_test TEST)
set(TEST_SOURCE "${TEST}.cu")
set_property(SOURCE ${TEST_SOURCE} PROPERTY LANGUAGE CXX)
set(TEST_TARGET "${TEST}_perf")
add_executable(${TEST_TARGET} ${TEST_SOURCE})
target_link_libraries(
${TEST_TARGET}
PRIVATE
rccl_common
)
set_target_properties(
${TEST_TARGET}
PROPERTIES
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}"
# LINKER_LANGUAGE CXX
)
add_relative_test(${TEST} ${TEST_TARGET})
rocm_install(TARGETS ${TEST_TARGET})
# TODO: copy/install DLLs on Windows
set_target_properties(
${TEST_TARGET} PROPERTIES
INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/lib;${ROCM_PATH}/lib"
)
endfunction()
add_rccl_test(all_gather)
add_rccl_test(all_reduce)
add_rccl_test(alltoall)
add_rccl_test(alltoallv)
add_rccl_test(broadcast)
add_rccl_test(gather)
add_rccl_test(hypercube)
add_rccl_test(reduce_scatter)
add_rccl_test(reduce)
add_rccl_test(scatter)
add_rccl_test(sendrecv)
#
# Copyright (c) 2015-2022, NVIDIA CORPORATION. All rights reserved.
# Modifications are Copyright (c) 2019-2022 Advanced Micro Devices, Inc. All rights reserved.
#
# See LICENSE.txt for license information
#
ROCM_PATH ?= /opt/rocm
MPI_HOME ?= /usr/lib/openmpi
PREFIX ?= /usr/local
VERBOSE ?= 0
DEBUG ?= 0
NCCL_HOME ?= ""
HIPCC = $(ROCM_PATH)/bin/hipcc
CXX = $(HIPCC)
HIPCUFLAGS := -std=c++14
LDFLAGS :=
HIPLDFLAGS :=
ifneq ($(NCCL_HOME), "")
HIPCUFLAGS += -I$(NCCL_HOME)/ -I$(NCCL_HOME)/include
HIPLDFLAGS += -Wl,-rpath,$(NCCL_HOME) -L$(NCCL_HOME) -L$(NCCL_HOME)/lib
endif
HIPCUFLAGS += -I$(ROCM_PATH)/include
HIPCUFLAGS += -I$(ROCM_PATH)/include/hip
LDFLAGS += -L$(ROCM_PATH)/lib -lhsa-runtime64 -lrt
HIPLDFLAGS += $(CUSTOM_RCCL_LIB) -L$(ROCM_PATH)/lib -lhsa-runtime64 -lrt -pthread
ifeq ($(DEBUG), 0)
HIPCUFLAGS += -O3
else
HIPCUFLAGS += -O0 -g -ggdb3
endif
ifeq ($(VERBOSE), 0)
.SILENT:
endif
.PHONY: build clean
BUILDDIR ?= ../build
ifeq ($(MPI), 1)
HIPCUFLAGS += -DMPI_SUPPORT -I${MPI_HOME}/include -I${MPI_HOME}/include/mpi
HIPLDFLAGS += -L${MPI_HOME}/lib -lmpi
else ifeq ($(MPICH), 1)
HIPCUFLAGS += -DMPI_SUPPORT -I/usr/include/mpich -I/usr/include/x86_64-linux-gnu/mpich
HIPLDFLAGS += -L/usr/lib -lmpich
endif
LIBRARIES += rccl
HIPLDFLAGS += $(LIBRARIES:%=-l%)
DST_DIR := $(BUILDDIR)
SRC_FILES := $(wildcard *.cu)
OBJ_FILES := $(SRC_FILES:%.cu=${DST_DIR}/%.o)
BIN_FILES_LIST := all_reduce all_gather broadcast reduce_scatter reduce alltoall scatter gather sendrecv alltoallv
BIN_FILES := $(BIN_FILES_LIST:%=${DST_DIR}/%_perf)
build: ${BIN_FILES}
clean:
rm -rf ${DST_DIR}
TEST_VERIFIABLE_SRCDIR := ../verifiable
TEST_VERIFIABLE_BUILDDIR := $(BUILDDIR)/verifiable
include ../verifiable/verifiable.mk
${DST_DIR}/%.o: %.cu common.h $(TEST_VERIFIABLE_HDRS)
@printf "Compiling %-35s > %s\n" $< $@
@mkdir -p ${DST_DIR}
echo "$(HIPCC) -o $@ $(HIPCUFLAGS) -c $<"
$(HIPCC) -o $@ $(HIPCUFLAGS) -c $<
${DST_DIR}/timer.o: timer.cc timer.h
@printf "Compiling %-35s > %s\n" $< $@
@mkdir -p ${DST_DIR}
$(CXX) $(CXXFLAGS) -o $@ -c timer.cc
${DST_DIR}/%_perf:${DST_DIR}/%.o ${DST_DIR}/common.o ${DST_DIR}/timer.o $(TEST_VERIFIABLE_OBJS)
@printf "Linking %-35s > %s\n" $< $@
@mkdir -p ${DST_DIR}
echo "$(HIPCC) -o $@ $(HIPCUFLAGS) $^ ${HIPLDFLAGS}"
$(HIPCC) -o $@ $(HIPCUFLAGS) $^ ${HIPLDFLAGS}
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019-2022 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include <hip/hip_runtime.h>
#include "common.h"
#define ALIGN 4
void AllGatherGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
size_t base = (count/(ALIGN*nranks))*ALIGN;
*sendcount = base;
*recvcount = base*nranks;
*sendInplaceOffset = base;
*recvInplaceOffset = 0;
*paramcount = base;
}
testResult_t AllGatherInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int k=0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? ((char*)args->recvbuffs[k])+rank*args->sendBytes : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep + rank, 1, 0));
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData(((char*)args->expected[k])+args->sendBytes*j, sendcount, 0, type, ncclSum, 33*rep + j, 1, 0));
}
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
return testSuccess;
}
void AllGatherGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) {
double baseBw = (double)(count * typesize * nranks) / 1.0E9 / sec;
*algBw = baseBw;
double factor = ((double)(nranks - 1))/((double)nranks);
*busBw = baseBw * factor;
}
testResult_t AllGatherRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, hipStream_t stream) {
NCCLCHECK(ncclAllGather(sendbuff, recvbuff, count, type, comm, stream));
return testSuccess;
}
struct testColl allGatherTest = {
"AllGather",
AllGatherGetCollByteCount,
AllGatherInitData,
AllGatherGetBw,
AllGatherRunColl
};
void AllGatherGetBuffSize(size_t *sendcount, size_t *recvcount, size_t count, int nranks) {
size_t paramcount, sendInplaceOffset, recvInplaceOffset;
AllGatherGetCollByteCount(sendcount, recvcount, &paramcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks);
}
testResult_t AllGatherRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) {
args->collTest = &allGatherTest;
ncclDataType_t *run_types;
const char **run_typenames;
int type_count;
if ((int)type != -1) {
type_count = 1;
run_types = &type;
run_typenames = &typeName;
} else {
type_count = test_typenum;
run_types = test_types;
run_typenames = test_typenames;
}
for (int i=0; i<type_count; i++) {
TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], (ncclRedOp_t)0, "none", -1));
}
return testSuccess;
}
struct testEngine ncclTestEngine = {
AllGatherGetBuffSize,
AllGatherRunTest
};
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019-2022 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include <hip/hip_runtime.h>
#include "common.h"
void AllReduceGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
*sendcount = count;
*recvcount = count;
*sendInplaceOffset = 0;
*recvInplaceOffset = 0;
*paramcount = *sendcount;
}
testResult_t AllReduceInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int k = 0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, 0, type, op, rep, nranks, rank));
TESTCHECK(InitDataReduce(args->expected[k], recvcount, 0, type, op, rep, nranks));
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
return testSuccess;
}
void AllReduceGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) {
double baseBw = (double)(count * typesize) / 1.0E9 / sec;
*algBw = baseBw;
double factor = ((double)(2*(nranks - 1)))/((double)nranks);
*busBw = baseBw * factor;
}
testResult_t AllReduceRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, hipStream_t stream) {
NCCLCHECK(ncclAllReduce(sendbuff, recvbuff, count, type, op, comm, stream));
return testSuccess;
}
struct testColl allReduceTest = {
"AllReduce",
AllReduceGetCollByteCount,
AllReduceInitData,
AllReduceGetBw,
AllReduceRunColl
};
void AllReduceGetBuffSize(size_t *sendcount, size_t *recvcount, size_t count, int nranks) {
size_t paramcount, sendInplaceOffset, recvInplaceOffset;
AllReduceGetCollByteCount(sendcount, recvcount, &paramcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks);
}
testResult_t AllReduceRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) {
args->collTest = &allReduceTest;
ncclDataType_t *run_types;
ncclRedOp_t *run_ops;
const char **run_typenames, **run_opnames;
int type_count, op_count;
if ((int)type != -1) {
type_count = 1;
run_types = &type;
run_typenames = &typeName;
} else {
type_count = test_typenum;
run_types = test_types;
run_typenames = test_typenames;
}
if ((int)op != -1) {
op_count = 1;
run_ops = &op;
run_opnames = &opName;
} else {
op_count = test_opnum;
run_ops = test_ops;
run_opnames = test_opnames;
}
for (int i=0; i<type_count; i++) {
for (int j=0; j<op_count; j++) {
TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], run_ops[j], run_opnames[j], -1));
}
}
return testSuccess;
}
struct testEngine ncclTestEngine = {
AllReduceGetBuffSize,
AllReduceRunTest
};
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019-2022 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include <hip/hip_runtime.h>
#include "common.h"
void AlltoAllGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
*sendcount = (count/nranks)*nranks;
*recvcount = (count/nranks)*nranks;
*sendInplaceOffset = 0;
*recvInplaceOffset = 0;
*paramcount = count/nranks;
}
testResult_t AlltoAllInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int k=0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep + rank, 1, 0));
for (int j=0; j<nranks; j++) {
size_t partcount = sendcount/nranks;
TESTCHECK(InitData(((char*)args->expected[k])+ j*partcount*wordSize(type), partcount, rank*partcount, type, ncclSum, 33*rep + j, 1, 0));
}
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
// We don't support in-place alltoall
args->reportErrors = in_place ? 0 : 1;
return testSuccess;
}
void AlltoAllGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) {
double baseBw = (double)(count * nranks * typesize) / 1.0E9 / sec;
*algBw = baseBw;
double factor = ((double)(nranks-1))/((double)(nranks));
*busBw = baseBw * factor;
}
testResult_t AlltoAllRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, hipStream_t stream) {
NCCLCHECK(ncclAllToAll(sendbuff, recvbuff, count, type, comm, stream));
return testSuccess;
}
struct testColl alltoAllTest = {
"AlltoAll",
AlltoAllGetCollByteCount,
AlltoAllInitData,
AlltoAllGetBw,
AlltoAllRunColl
};
void AlltoAllGetBuffSize(size_t *sendcount, size_t *recvcount, size_t count, int nranks) {
size_t paramcount, sendInplaceOffset, recvInplaceOffset;
AlltoAllGetCollByteCount(sendcount, recvcount, &paramcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks);
}
testResult_t AlltoAllRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) {
args->collTest = &alltoAllTest;
ncclDataType_t *run_types;
const char **run_typenames;
int type_count;
if ((int)type != -1) {
type_count = 1;
run_types = &type;
run_typenames = &typeName;
} else {
type_count = test_typenum;
run_types = test_types;
run_typenames = test_typenames;
}
for (int i=0; i<type_count; i++) {
TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], (ncclRedOp_t)0, "none", -1));
}
return testSuccess;
}
struct testEngine ncclTestEngine = {
AlltoAllGetBuffSize,
AlltoAllRunTest
};
/*************************************************************************
* Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include <hip/hip_runtime.h>
#include "common.h"
#define USE_RCCL_GATHER_SCATTER
void AlltoAllvGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
if (count < nranks*nranks/2) {
*sendcount = 0;
*recvcount = 0;
*sendInplaceOffset = 0;
*recvInplaceOffset = 0;
*paramcount = 0;
} else {
*sendcount = (count/nranks)*nranks;
*recvcount = (count/nranks)*nranks;
*sendInplaceOffset = 0;
*recvInplaceOffset = 0;
*paramcount = count/nranks;
}
}
testResult_t AlltoAllvInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int k=0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep+rank, 1, 0));
#if 0
int *dataHost = (int *)malloc(args->sendBytes);
hipMemcpy(dataHost, data, args->sendBytes, hipMemcpyDeviceToHost);
printf(" Rank [%d] Original: ", rank);
for(int j=0; j<sendcount; j++) {
printf("%d:%d ", j, dataHost[j]);
}
printf("\n");
free(dataHost);
#endif
size_t rdisp = 0;
size_t data_count = sendcount*2/nranks;
size_t chunksize = data_count/nranks;
for (int j=0; j<nranks; j++) {
size_t scount = 0, rcount = ((j+rank)%nranks)*chunksize;
if ((j+rank)%nranks == 0)
rcount += (sendcount-chunksize*(nranks-1)*nranks/2);
size_t sdisp = 0;
for (int kk=0; kk<nranks; kk++) {
scount = ((kk+j)%nranks)*chunksize;
if ((kk+j)%nranks == 0)
scount += (sendcount-chunksize*(nranks-1)*nranks/2);
if (kk == rank)
break;
sdisp += scount;
}
TESTCHECK(InitData(((char*)args->expected[k])+rdisp*wordSize(type), rcount, sdisp, type, ncclSum, 33*rep+j, 1, 0));
rdisp += rcount;
}
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
// We don't support in-place alltoall
args->reportErrors = in_place ? 0 : 1;
return testSuccess;
}
void AlltoAllvGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) {
double baseBw = (double)(count * nranks * typesize) / 1.0E9 / sec;
*algBw = baseBw;
double factor = ((double)(nranks-1))/((double)(nranks));
*busBw = baseBw * factor;
}
testResult_t AlltoAllvRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, hipStream_t stream) {
int nranks;
NCCLCHECK(ncclCommCount(comm, &nranks));
int rank;
NCCLCHECK(ncclCommUserRank(comm, &rank));
if (count == 0) return testSuccess;
size_t *sendcounts, *recvcounts, *sdispls, *rdispls;
sendcounts = (size_t *)malloc(nranks*nranks*sizeof(size_t));
recvcounts = (size_t *)malloc(nranks*nranks*sizeof(size_t));
sdispls = (size_t *)malloc(nranks*nranks*sizeof(size_t));
rdispls = (size_t *)malloc(nranks*nranks*sizeof(size_t));
if (sendcounts == nullptr || recvcounts == nullptr || sdispls == nullptr || rdispls == nullptr) {
printf("failed to allocate buffers for alltoallv\n");
return testNcclError;
}
size_t disp = 0;
size_t chunksize = count*2/nranks;
for (int i = 0; i < nranks; i++) {
size_t scount = ((i+rank)%nranks)*chunksize;
if ((i+rank)%nranks == 0)
scount += (count*nranks-chunksize*(nranks-1)*nranks/2);
sendcounts[i+rank*nranks] = recvcounts[i+rank*nranks] = scount;
sdispls[i+rank*nranks] = rdispls[i+rank*nranks] = disp;
disp += scount;
//printf("%d->%d: sendcounts/recvcounts %lx sdispls/rdispls %lx\n", rank, i, sendcounts[i+rank*nranks]*wordSize(type), sdispls[i+rank*nranks]*wordSize(type));
}
#if NCCL_MAJOR < 2 || NCCL_MINOR < 7
printf("NCCL 2.7 or later is needed for alltoallv. This test was compiled with %d.%d.\n", NCCL_MAJOR, NCCL_MINOR);
return testNcclError;
#else
#if defined(RCCL_ALLTOALLV) && defined(USE_RCCL_GATHER_SCATTER)
NCCLCHECK(ncclAllToAllv(sendbuff, sendcounts+rank*nranks, sdispls+rank*nranks, recvbuff, recvcounts+rank*nranks, rdispls+rank*nranks, type, comm, stream));
#else
NCCLCHECK(ncclGroupStart());
for (int r=0; r<nranks; r++) {
if (sendcounts[r+rank*nranks] != 0) {
NCCLCHECK(ncclSend(
((char*)sendbuff) + sdispls[r+rank*nranks] * wordSize(type),
sendcounts[r+rank*nranks],
type,
r,
comm,
stream));
}
if (recvcounts[r+rank*nranks] != 0) {
NCCLCHECK(ncclRecv(
((char*)recvbuff) + rdispls[r+rank*nranks] * wordSize(type),
recvcounts[r+rank*nranks],
type,
r,
comm,
stream));
}
}
NCCLCHECK(ncclGroupEnd());
#endif
#endif
free(sendcounts);
free(recvcounts);
free(sdispls);
free(rdispls);
return testSuccess;
}
struct testColl alltoAllTest = {
"AlltoAllv",
AlltoAllvGetCollByteCount,
AlltoAllvInitData,
AlltoAllvGetBw,
AlltoAllvRunColl
};
void AlltoAllvGetBuffSize(size_t *sendcount, size_t *recvcount, size_t count, int nranks) {
size_t paramcount, sendInplaceOffset, recvInplaceOffset;
AlltoAllvGetCollByteCount(sendcount, recvcount, &paramcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks);
}
testResult_t AlltoAllvRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) {
args->collTest = &alltoAllTest;
ncclDataType_t *run_types;
const char **run_typenames;
int type_count;
if ((int)type != -1) {
type_count = 1;
run_types = &type;
run_typenames = &typeName;
} else {
type_count = ncclNumTypes;
run_types = test_types;
run_typenames = test_typenames;
}
for (int i=0; i<type_count; i++) {
TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], (ncclRedOp_t)0, "", -1));
}
return testSuccess;
}
struct testEngine ncclTestEngine = {
AlltoAllvGetBuffSize,
AlltoAllvRunTest
};
/*************************************************************************
* Copyright (c) 2015-2022, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019-2022 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include <hip/hip_runtime.h>
#include "common.h"
void BroadcastGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
*sendcount = count;
*recvcount = count;
*sendInplaceOffset = 0;
*recvInplaceOffset = 0;
*paramcount = *sendcount;
}
testResult_t BroadcastInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int k=0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
if (rank == root) TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, rep, 1, 0));
TESTCHECK(InitData(args->expected[k], recvcount, 0, type, ncclSum, rep, 1, 0));
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
return testSuccess;
}
void BroadcastGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) {
double baseBw = (double)(count * typesize) / 1.0E9 / sec;
*algBw = baseBw;
double factor = 1;
*busBw = baseBw * factor;
}
testResult_t BroadcastRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, hipStream_t stream) {
int rank;
NCCLCHECK(ncclCommUserRank(comm, &rank));
#if NCCL_MAJOR >= 2 && NCCL_MINOR >= 2
NCCLCHECK(ncclBroadcast(sendbuff, recvbuff, count, type, root, comm, stream));
#else
if (rank == root) {
NCCLCHECK(ncclBcast(sendbuff, count, type, root, comm, stream));
} else {
NCCLCHECK(ncclBcast(recvbuff, count, type, root, comm, stream));
}
#endif
return testSuccess;
}
struct testColl broadcastTest = {
"Broadcast",
BroadcastGetCollByteCount,
BroadcastInitData,
BroadcastGetBw,
BroadcastRunColl
};
void BroadcastGetBuffSize(size_t *sendcount, size_t *recvcount, size_t count, int nranks) {
size_t paramcount, sendInplaceOffset, recvInplaceOffset;
BroadcastGetCollByteCount(sendcount, recvcount, &paramcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks);
}
testResult_t BroadcastRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) {
args->collTest = &broadcastTest;
ncclDataType_t *run_types;
const char **run_typenames;
int type_count;
int begin_root, end_root;
if ((int)type != -1) {
type_count = 1;
run_types = &type;
run_typenames = &typeName;
} else {
type_count = test_typenum;
run_types = test_types;
run_typenames = test_typenames;
}
if (root != -1) {
begin_root = end_root = root;
} else {
begin_root = 0;
end_root = args->nProcs*args->nThreads*args->nGpus-1;
}
for (int i=0; i<type_count; i++) {
for (int j=begin_root; j<=end_root; j++) {
TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], (ncclRedOp_t)0, "none", j));
}
}
return testSuccess;
}
struct testEngine ncclTestEngine = {
BroadcastGetBuffSize,
BroadcastRunTest
};
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019-2022 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "hip/hip_runtime.h"
#include "rccl_bfloat16.h"
#include "common.h"
#include <pthread.h>
#include <cstdio>
#include <type_traits>
#include <getopt.h>
#include <libgen.h>
//#define DEBUG_PRINT
#include "../verifiable/verifiable.h"
int test_ncclVersion = 0; // init'd with ncclGetVersion()
#if NCCL_MAJOR >= 2
ncclDataType_t test_types[ncclNumTypes] = {
ncclInt8, ncclUint8, ncclInt32, ncclUint32, ncclInt64, ncclUint64, ncclHalf, ncclFloat, ncclDouble
#if RCCL_BFLOAT16 == 1
, ncclBfloat16
#endif
};
const char *test_typenames[ncclNumTypes] = {
"int8", "uint8", "int32", "uint32", "int64", "uint64", "half", "float", "double"
#if RCCL_BFLOAT16 == 1
, "bfloat16"
#endif
};
int test_typenum = -1;
const char *test_opnames[] = {"sum", "prod", "max", "min", "avg", "mulsum"};
ncclRedOp_t test_ops[] = {ncclSum, ncclProd, ncclMax, ncclMin
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,10,0)
, ncclAvg
#endif
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,11,0)
, ncclNumOps // stand in for ncclRedOpCreatePreMulSum() created on-demand
#endif
};
int test_opnum = -1;
#else
ncclDataType_t test_types[ncclNumTypes] = {ncclChar, ncclInt, ncclHalf, ncclFloat, ncclDouble, ncclInt64, ncclUint64};
const char *test_typenames[ncclNumTypes] = {"char", "int", "half", "float", "double", "int64", "uint64"};
int test_typenum = 7;
const char *test_opnames[] = {"sum", "prod", "max", "min"};
ncclRedOp_t test_ops[] = {ncclSum, ncclProd, ncclMax, ncclMin};
int test_opnum = 4;
#endif
const char *test_memorytypes[nccl_NUM_MTYPES] = {"coarse", "fine", "host", "managed"};
// For libnccl's < 2.13
extern "C" __attribute__((weak)) char const* ncclGetLastError(ncclComm_t comm) {
return "";
}
int is_main_proc = 0;
thread_local int is_main_thread = 0;
// Command line parameter defaults
static int nThreads = 1;
static int nGpus = 1;
static size_t minBytes = 32*1024*1024;
static size_t maxBytes = 32*1024*1024;
static size_t stepBytes = 1*1024*1024;
static size_t stepFactor = 1;
static int datacheck = 1;
static int warmup_iters = 5;
static int iters = 20;
static int agg_iters = 1;
static int ncclop = ncclSum;
static int nccltype = ncclFloat;
static int ncclroot = 0;
static int parallel_init = 0;
static int blocking_coll = 0;
static int memorytype = 0;
static int stress_cycles = 1;
static uint32_t cumask[4];
static int streamnull = 0;
static int timeout = 0;
static int cudaGraphLaunches = 0;
static int report_cputime = 0;
// Report average iteration time: (0=RANK0,1=AVG,2=MIN,3=MAX)
static int average = 1;
static int numDevices = 1;
static int ranksPerGpu = 1;
static int enable_multiranks = 0;
static int delay_inout_place = 0;
static int enable_out_of_place = 1;
#define NUM_BLOCKS 32
static double parsesize(const char *value) {
long long int units;
double size;
char size_lit;
int count = sscanf(value, "%lf %1s", &size, &size_lit);
switch (count) {
case 2:
switch (size_lit) {
case 'G':
case 'g':
units = 1024*1024*1024;
break;
case 'M':
case 'm':
units = 1024*1024;
break;
case 'K':
case 'k':
units = 1024;
break;
default:
return -1.0;
};
break;
case 1:
units = 1;
break;
default:
return -1.0;
}
return size * units;
}
static bool minReqVersion(int rmajor, int rminor, int rpatch)
{
int version;
int major, minor, patch, rem;
ncclGetVersion(&version);
if (version < 10000) {
major = version/1000;
rem = version%1000;
minor = rem/100;
patch = rem%100;
}
else {
major = version/10000;
rem = version%10000;
minor = rem/100;
patch = rem%100;
}
if (major < rmajor) return false;
else if (major > rmajor) return true;
// major == rmajor
if (minor < rminor) return false;
else if (minor > rminor) return true;
// major == rmajor && minor == rminor
if (patch < rpatch) return false;
return true;
}
testResult_t CheckDelta(void* results, void* expected, size_t count, size_t offset, ncclDataType_t type, ncclRedOp_t op, uint64_t seed, int nranks, int64_t *wrongEltN) {
ncclVerifiableVerify(results, expected, count, (int)type, (int)op, nranks, seed, offset, wrongEltN, hipStreamDefault);
HIPCHECK(hipDeviceSynchronize());
return testSuccess;
}
testResult_t InitDataReduce(void* data, const size_t count, const size_t offset, ncclDataType_t type, ncclRedOp_t op, uint64_t seed, int nranks) {
ncclVerifiablePrepareExpected(data, count, (int)type, (int)op, nranks, seed, offset, hipStreamDefault);
return testSuccess;
}
testResult_t InitData(void* data, const size_t count, size_t offset, ncclDataType_t type, ncclRedOp_t op, uint64_t seed, int nranks, int rank) {
ncclVerifiablePrepareInput(data, count, (int)type, (int)op, nranks, rank, seed, offset, hipStreamDefault);
return testSuccess;
}
void Barrier(struct threadArgs *args) {
thread_local int epoch = 0;
static pthread_mutex_t lock[2] = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER};
static pthread_cond_t cond[2] = {PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER};
static int counter[2] = {0, 0};
pthread_mutex_lock(&lock[epoch]);
if(++counter[epoch] == args->nThreads)
pthread_cond_broadcast(&cond[epoch]);
if(args->thread+1 == args->nThreads) {
while(counter[epoch] != args->nThreads)
pthread_cond_wait(&cond[epoch], &lock[epoch]);
#ifdef MPI_SUPPORT
MPI_Barrier(MPI_COMM_WORLD);
#endif
counter[epoch] = 0;
pthread_cond_broadcast(&cond[epoch]);
}
else {
while(counter[epoch] != 0)
pthread_cond_wait(&cond[epoch], &lock[epoch]);
}
pthread_mutex_unlock(&lock[epoch]);
epoch ^= 1;
}
// Inter-thread/process barrier+allreduce. The quality of the return value
// for average=0 (which means broadcast from rank=0) is dubious. The returned
// value will actually be the result of process-local broadcast from the local thread=0.
template<typename T>
void Allreduce(struct threadArgs* args, T* value, int average) {
thread_local int epoch = 0;
static pthread_mutex_t lock[2] = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER};
static pthread_cond_t cond[2] = {PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER};
static T accumulator[2];
static int counter[2] = {0, 0};
pthread_mutex_lock(&lock[epoch]);
if(counter[epoch] == 0) {
if(average != 0 || args->thread == 0) accumulator[epoch] = *value;
} else {
switch(average) {
case /*r0*/ 0: if(args->thread == 0) accumulator[epoch] = *value; break;
case /*avg*/1: accumulator[epoch] += *value; break;
case /*min*/2: accumulator[epoch] = std::min<T>(accumulator[epoch], *value); break;
case /*max*/3: accumulator[epoch] = std::max<T>(accumulator[epoch], *value); break;
case /*sum*/4: accumulator[epoch] += *value; break;
}
}
if(++counter[epoch] == args->nThreads)
pthread_cond_broadcast(&cond[epoch]);
if(args->thread+1 == args->nThreads) {
while(counter[epoch] != args->nThreads)
pthread_cond_wait(&cond[epoch], &lock[epoch]);
#ifdef MPI_SUPPORT
if(average != 0) {
static_assert(std::is_same<T, long long>::value || std::is_same<T, double>::value, "Allreduce<T> only for T in {long long, double}");
MPI_Datatype ty = std::is_same<T, long long>::value ? MPI_LONG_LONG :
std::is_same<T, double>::value ? MPI_DOUBLE :
MPI_Datatype();
MPI_Op op = average == 1 ? MPI_SUM :
average == 2 ? MPI_MIN :
average == 3 ? MPI_MAX :
average == 4 ? MPI_SUM : MPI_Op();
MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator[epoch], 1, ty, op, MPI_COMM_WORLD);
}
#endif
if(average == 1) accumulator[epoch] /= args->totalProcs*args->nThreads;
counter[epoch] = 0;
pthread_cond_broadcast(&cond[epoch]);
}
else {
while(counter[epoch] != 0)
pthread_cond_wait(&cond[epoch], &lock[epoch]);
}
pthread_mutex_unlock(&lock[epoch]);
*value = accumulator[epoch];
epoch ^= 1;
}
testResult_t CheckData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, int64_t *wrongElts) {
int nranks = args->nProcs*args->nGpus*args->nThreads;
size_t count = args->expectedBytes/wordSize(type);
int64_t *wrongPerGpu = nullptr;
HIPCHECK(hipHostMalloc((void**)&wrongPerGpu, args->nGpus*sizeof(int64_t), hipHostMallocMapped));
for (int i=0; i<args->nGpus*args->nRanks; i++) {
int device;
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i);
NCCLCHECK(ncclCommCuDevice(args->comms[i], &device));
HIPCHECK(hipSetDevice(device));
void *data = in_place ? ((void *)((uintptr_t)args->recvbuffs[i] + args->recvInplaceOffset*rank)) : args->recvbuffs[i];
TESTCHECK(CheckDelta(data, args->expected[i], count, 0, type, op, 0, nranks, wrongPerGpu+i));
#if 1 && DEBUG_PRINT
if (args->reportErrors && wrongPerGpu[i] != 0) {
printf("rank=%d #wrong=%d\n", rank, (int)wrongPerGpu[i]);
char *expectedHost = (char*)malloc(args->expectedBytes);
char *dataHost = (char*)malloc(args->expectedBytes);
int eltsz = wordSize(type);
hipMemcpy(expectedHost, args->expected[i], args->expectedBytes, hipMemcpyDeviceToHost);
hipMemcpy(dataHost, data, args->expectedBytes, hipMemcpyDeviceToHost);
for(int j=0; j<args->expectedBytes/eltsz; j++) {
unsigned long long want, got;
want = 0;
memcpy(&want, expectedHost + j*eltsz, eltsz);
got = 0;
memcpy(&got, dataHost + j*eltsz, eltsz);
if(want != got) {
printf(" rank=%d elt[%d]: want=0x%llx got=0x%llx\n", rank, j, want, got);
}
}
free(expectedHost);
free(dataHost);
}
#endif
}
*wrongElts = 0;
for (int i=0; i < args->nGpus; i++) *wrongElts += wrongPerGpu[i];
hipHostFree(wrongPerGpu);
if (args->reportErrors && *wrongElts) args->errors[0]++;
return testSuccess;
}
testResult_t testStreamSynchronize(int nStreams, hipStream_t* streams, ncclComm_t* comms) {
hipError_t hipErr;
int remaining = nStreams;
int* done = (int*)malloc(sizeof(int)*nStreams);
memset(done, 0, sizeof(int)*nStreams);
timer tim;
while (remaining) {
int idle = 1;
for (int i=0; i<nStreams; i++) {
if (done[i]) continue;
hipErr = hipStreamQuery(streams[i]);
if (hipErr == hipSuccess) {
done[i] = 1;
remaining--;
idle = 0;
continue;
}
if (hipErr != hipErrorNotReady) HIPCHECK(hipErr);
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,4,0)
if (test_ncclVersion >= NCCL_VERSION(2,4,0) && comms) {
ncclResult_t ncclAsyncErr;
NCCLCHECK(ncclCommGetAsyncError(comms[i], &ncclAsyncErr));
if (ncclAsyncErr != ncclSuccess) {
// An asynchronous error happened. Stop the operation and destroy
// the communicator
for (int i=0; i<nStreams; i++)
NCCLCHECK(ncclCommAbort(comms[i]));
// Abort the perf test
NCCLCHECK(ncclAsyncErr);
}
}
double delta = tim.elapsed();
if (delta > timeout && timeout > 0) {
for (int i=0; i<nStreams; i++)
NCCLCHECK(ncclCommAbort(comms[i]));
char hostname[1024];
getHostName(hostname, 1024);
printf("%s: Test timeout (%ds) %s:%d\n",
hostname,
timeout,
__FILE__,__LINE__);
free(done);
return testTimeout;
}
#endif
}
// We might want to let other threads (including NCCL threads) use the CPU.
if (idle) sched_yield();
}
free(done);
return testSuccess;
}
testResult_t startColl(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t opIndex, int root, int in_place, int iter) {
size_t count = args->nbytes / wordSize(type);
// Try to change offset for each iteration so that we avoid cache effects and catch race conditions in ptrExchange
size_t totalnbytes = std::max(args->sendBytes, args->expectedBytes);
size_t steps = totalnbytes ? args->maxbytes / totalnbytes : 1;
size_t shift = totalnbytes * (iter % steps);
if (args->nGpus> 1 || args->nRanks > 1) NCCLCHECK(ncclGroupStart());
for (int i = 0; i < args->nGpus*args->nRanks; i++) {
#ifndef NCCL_MAJOR
int hipDev;
NCCLCHECK(ncclCommCuDevice(args->comms[i], &hipDev));
HIPCHECK(hipSetDevice(hipDev));
#endif
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i);
char* recvBuff = ((char*)args->recvbuffs[i]) + shift;
char* sendBuff = ((char*)args->sendbuffs[i]) + shift;
ncclRedOp_t op;
if(opIndex < ncclNumOps) {
op = opIndex;
}
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,11,0)
else {
union {
int8_t i8; uint8_t u8; int32_t i32; uint32_t u32; int64_t i64; uint64_t u64;
half f16; float f32; double f64;
#if defined(RCCL_BFLOAT16)
rccl_bfloat16 bf16;
#endif
};
switch(type) {
case ncclInt8: i8 = ncclVerifiablePremulScalar<int8_t>(rank); break;
case ncclUint8: u8 = ncclVerifiablePremulScalar<uint8_t>(rank); break;
case ncclInt32: i32 = ncclVerifiablePremulScalar<int32_t>(rank); break;
case ncclUint32: u32 = ncclVerifiablePremulScalar<uint32_t>(rank); break;
case ncclInt64: i64 = ncclVerifiablePremulScalar<int64_t>(rank); break;
case ncclUint64: u64 = ncclVerifiablePremulScalar<uint64_t>(rank); break;
case ncclFloat16: f16 = ncclVerifiablePremulScalar<half>(rank); break;
case ncclFloat32: f32 = ncclVerifiablePremulScalar<float>(rank); break;
case ncclFloat64: f64 = ncclVerifiablePremulScalar<double>(rank); break;
#if defined(RCCL_BFLOAT16)
case ncclBfloat16: bf16 = ncclVerifiablePremulScalar<rccl_bfloat16>(rank); break;
#endif
}
NCCLCHECK(ncclRedOpCreatePreMulSum(&op, &u64, type, ncclScalarHostImmediate, args->comms[i]));
}
#endif
TESTCHECK(args->collTest->runColl(
(void*)(in_place ? recvBuff + args->sendInplaceOffset*rank : sendBuff),
(void*)(in_place ? recvBuff + args->recvInplaceOffset*rank : recvBuff),
count, type, op, root, args->comms[i], args->streams[i]));
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,11,0)
if(opIndex >= ncclNumOps) {
NCCLCHECK(ncclRedOpDestroy(op, args->comms[i]));
}
#endif
}
if (args->nGpus > 1 || args->nRanks > 1) NCCLCHECK(ncclGroupEnd());
if (blocking_coll) {
// Complete op before returning
TESTCHECK(testStreamSynchronize(args->nGpus*args->nRanks, args->streams, args->comms));
}
if (blocking_coll) Barrier(args);
return testSuccess;
}
testResult_t completeColl(struct threadArgs* args) {
if (blocking_coll) return testSuccess;
TESTCHECK(testStreamSynchronize(args->nGpus*args->nRanks, args->streams, args->comms));
return testSuccess;
}
//RCCL: Revisit because of cudaGraphLaunches
testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place) {
size_t count = args->nbytes / wordSize(type);
if (datacheck) {
// Initialize sendbuffs, recvbuffs and expected
TESTCHECK(args->collTest->initData(args, type, op, root, 99, in_place));
}
if (warmup_iters) {
// Sync
TESTCHECK(startColl(args, type, op, root, in_place, 0));
TESTCHECK(completeColl(args));
}
Barrier(args);
#if HIP_VERSION >= 50221310
hipGraph_t graphs[args->nGpus*args->nRanks];
hipGraphExec_t graphExec[args->nGpus*args->nRanks];
if (cudaGraphLaunches >= 1) {
// Begin cuda graph capture
for (int i=0; i<args->nGpus*args->nRanks; i++) {
// Thread local mdoe is needed for:
// - Multi-thread mode: where graph capture and instantiation can happen concurrently across threads
// - P2P pre-connect: when there is no warm-up, P2P pre-connect is done during graph capture.
// Since pre-connect calls cudaMalloc, we cannot use global capture mode
HIPCHECK(hipStreamBeginCapture(args->streams[i], hipStreamCaptureModeThreadLocal));
}
}
#endif
// Performance Benchmark
timer tim;
for (int iter = 0; iter < iters; iter++) {
if (agg_iters>1) NCCLCHECK(ncclGroupStart());
for (int aiter = 0; aiter < agg_iters; aiter++) {
TESTCHECK(startColl(args, type, op, root, in_place, iter*agg_iters+aiter));
}
if (agg_iters>1) NCCLCHECK(ncclGroupEnd());
}
#if HIP_VERSION >= 50221310
if (cudaGraphLaunches >= 1) {
// End cuda graph capture
for (int i=0; i<args->nGpus*args->nRanks; i++) {
HIPCHECK(hipStreamEndCapture(args->streams[i], graphs+i));
}
// Instantiate cuda graph
for (int i=0; i<args->nGpus*args->nRanks; i++) {
HIPCHECK(hipGraphInstantiate(graphExec+i, graphs[i], NULL, NULL, 0));
}
// Resync CPU, restart timing, launch cuda graph
Barrier(args);
tim.reset();
for (int l=0; l<cudaGraphLaunches; l++) {
for (int i=0; i<args->nGpus*args->nRanks; i++) {
HIPCHECK(hipGraphLaunch(graphExec[i], args->streams[i]));
}
}
}
#endif
double cputimeSec = tim.elapsed()/(iters*agg_iters);
TESTCHECK(completeColl(args));
double deltaSec = tim.elapsed();
deltaSec = deltaSec/(iters*agg_iters);
if (cudaGraphLaunches >= 1) deltaSec = deltaSec/cudaGraphLaunches;
Allreduce(args, &deltaSec, average);
#if HIP_VERSION >= 50221310
if (cudaGraphLaunches >= 1) {
//destroy cuda graph
for (int i=0; i<args->nGpus*args->nRanks; i++) {
HIPCHECK(hipGraphExecDestroy(graphExec[i]));
HIPCHECK(hipGraphDestroy(graphs[i]));
}
}
#endif
double algBw, busBw;
args->collTest->getBw(count, wordSize(type), deltaSec, &algBw, &busBw, args->nProcs*args->nThreads*args->nGpus*args->nRanks);
Barrier(args);
int64_t wrongElts = 0;
static __thread int rep = 0;
rep++;
if (datacheck) {
// Initialize sendbuffs, recvbuffs and expected
TESTCHECK(args->collTest->initData(args, type, op, root, rep, in_place));
#if HIP_VERSION >= 50221310
if (cudaGraphLaunches >= 1) {
// Begin cuda graph capture for data check
for (int i=0; i<args->nGpus*args->nRanks; i++) {
HIPCHECK(hipStreamBeginCapture(args->streams[i], args->nThreads > 1 ? hipStreamCaptureModeThreadLocal : hipStreamCaptureModeGlobal));
}
}
#endif
//test validation in single itertion, should ideally be included into the multi-iteration run
TESTCHECK(startColl(args, type, op, root, in_place, 0));
#if HIP_VERSION >= 50221310
if (cudaGraphLaunches >= 1) {
// End cuda graph capture
for (int i=0; i<args->nGpus*args->nRanks; i++) {
HIPCHECK(hipStreamEndCapture(args->streams[i], graphs+i));
}
// Instantiate cuda graph
for (int i=0; i<args->nGpus*args->nRanks; i++) {
HIPCHECK(hipGraphInstantiate(graphExec+i, graphs[i], NULL, NULL, 0));
}
// Launch cuda graph
for (int i=0; i<args->nGpus*args->nRanks; i++) {
HIPCHECK(hipGraphLaunch(graphExec[i], args->streams[i]));
}
}
#endif
TESTCHECK(completeColl(args));
#if HIP_VERSION >= 50221310
if (cudaGraphLaunches >= 1) {
//destroy cuda graph
for (int i=0; i<args->nGpus*args->nRanks; i++) {
HIPCHECK(hipGraphExecDestroy(graphExec[i]));
HIPCHECK(hipGraphDestroy(graphs[i]));
}
}
#endif
TESTCHECK(CheckData(args, type, op, root, in_place, &wrongElts));
//aggregate delta from all threads and procs
long long wrongElts1 = wrongElts;
Allreduce(args, &wrongElts1, /*sum*/4);
wrongElts = wrongElts1;
}
double timeUsec = (report_cputime ? cputimeSec : deltaSec)*1.0E6;
char timeStr[100];
if (timeUsec >= 10000.0) {
sprintf(timeStr, "%7.0f", timeUsec);
} else if (timeUsec >= 100.0) {
sprintf(timeStr, "%7.1f", timeUsec);
} else {
sprintf(timeStr, "%7.2f", timeUsec);
}
if (args->reportErrors) {
PRINT(" %7s %6.2f %6.2f %5g", timeStr, algBw, busBw, (double)wrongElts);
} else {
PRINT(" %7s %6.2f %6.2f %5s", timeStr, algBw, busBw, "N/A");
}
args->bw[0] += busBw;
args->bw_count[0]++;
return testSuccess;
}
void setupArgs(size_t size, ncclDataType_t type, struct threadArgs* args) {
int nranks = args->nProcs*args->nGpus*args->nThreads*args->nRanks;
size_t count, sendCount, recvCount, paramCount, sendInplaceOffset, recvInplaceOffset;
count = size / wordSize(type);
args->collTest->getCollByteCount(&sendCount, &recvCount, &paramCount, &sendInplaceOffset, &recvInplaceOffset, (size_t)count, (size_t)nranks);
args->nbytes = paramCount * wordSize(type);
args->sendBytes = sendCount * wordSize(type);
args->expectedBytes = recvCount * wordSize(type);
args->sendInplaceOffset = sendInplaceOffset * wordSize(type);
args->recvInplaceOffset = recvInplaceOffset * wordSize(type);
}
testResult_t TimeTest(struct threadArgs* args, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName, int root) {
// Sync to avoid first-call timeout
Barrier(args);
// Warm-up for large size
setupArgs(args->maxbytes, type, args);
for (int iter = 0; iter < warmup_iters; iter++) {
TESTCHECK(startColl(args, type, op, root, 0, iter));
}
TESTCHECK(completeColl(args));
// Warm-up for small size
setupArgs(args->minbytes, type, args);
for (int iter = 0; iter < warmup_iters; iter++) {
TESTCHECK(startColl(args, type, op, root, iter < warmup_iters/2 ? 0 : 1, iter));
}
TESTCHECK(completeColl(args));
for (size_t iter = 0; iter < stress_cycles; iter++) {
if (iter > 0) PRINT("# Testing %lu cycle.\n", iter+1);
// Benchmark
for (size_t size = args->minbytes; size<=args->maxbytes; size = ((args->stepfactor > 1) ? size*args->stepfactor : size+args->stepbytes)) {
setupArgs(size, type, args);
char rootName[100];
sprintf(rootName, "%6i", root);
PRINT("%12li %12li %8s %6s %6s", std::max(args->sendBytes, args->expectedBytes), args->nbytes / wordSize(type), typeName, opName, rootName);
if (enable_out_of_place) {
TESTCHECK(BenchTime(args, type, op, root, 0));
usleep(delay_inout_place);
}
TESTCHECK(BenchTime(args, type, op, root, 1));
PRINT("\n");
}
}
return testSuccess;
}
testResult_t threadRunTests(struct threadArgs* args) {
// Set device to the first of our GPUs. If we don't do that, some operations
// will be done on the current GPU (by default : 0) and if the GPUs are in
// exclusive mode those operations will fail.
HIPCHECK(hipSetDevice(args->gpus[0]));
TESTCHECK(ncclTestEngine.runTest(args, ncclroot, (ncclDataType_t)nccltype, test_typenames[nccltype], (ncclRedOp_t)ncclop, test_opnames[ncclop]));
return testSuccess;
}
testResult_t threadInit(struct threadArgs* args) {
char hostname[1024];
getHostName(hostname, 1024);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
//set main thread again
is_main_thread = (is_main_proc && args->thread == 0) ? 1 : 0;
NCCLCHECK(ncclGroupStart());
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int j=0; j<args->nRanks; j++) {
int rank = (args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + j;
if (args->enable_multiranks)
NCCLCHECK(ncclCommInitRank(args->comms+i, nranks, args->ncclId, rank));
#ifdef RCCL_MULTIRANKPERGPU
else
NCCLCHECK(ncclCommInitRankMulti(args->comms+i*args->nRanks+j, nranks, args->ncclId, rank, rank));
#endif
}
}
NCCLCHECK(ncclGroupEnd());
TESTCHECK(threadRunTests(args));
for (int i=0; i<args->nGpus*args->nRanks; i++) {
NCCLCHECK(ncclCommDestroy(args->comms[i]));
}
return testSuccess;
}
void* threadLauncher(void* thread_) {
struct testThread* thread = (struct testThread*)thread_;
thread->ret = thread->func(&thread->args);
return NULL;
}
testResult_t threadLaunch(struct testThread* thread) {
pthread_create(&thread->thread, NULL, threadLauncher, thread);
return testSuccess;
}
testResult_t AllocateBuffs(void **sendbuff, size_t sendBytes, void **recvbuff, size_t recvBytes, void **expected, size_t nbytes) {
if (memorytype == ncclFine) {
HIPCHECK(hipExtMallocWithFlags(sendbuff, nbytes, hipDeviceMallocFinegrained));
HIPCHECK(hipExtMallocWithFlags(recvbuff, nbytes, hipDeviceMallocFinegrained));
if (datacheck) HIPCHECK(hipExtMallocWithFlags(expected, recvBytes, hipDeviceMallocFinegrained));
}
else if (memorytype == ncclHost) {
HIPCHECK(hipHostMalloc(sendbuff, nbytes));
HIPCHECK(hipHostMalloc(recvbuff, nbytes));
if (datacheck) HIPCHECK(hipHostMalloc(expected, recvBytes));
}
else if (memorytype == ncclManaged) {
HIPCHECK(hipMallocManaged(sendbuff, nbytes));
HIPCHECK(hipMallocManaged(recvbuff, nbytes));
if (datacheck) HIPCHECK(hipMallocManaged(expected, recvBytes));
#if 0
HIPCHECK(hipMemset(*sendbuff, 0, nbytes));
HIPCHECK(hipMemset(*recvbuff, 0, nbytes));
if (datacheck) HIPCHECK(hipMemset(*expected, 0, recvBytes));
#endif
}
else {
HIPCHECK(hipMalloc(sendbuff, nbytes));
HIPCHECK(hipMalloc(recvbuff, nbytes));
if (datacheck) HIPCHECK(hipMalloc(expected, recvBytes));
}
return testSuccess;
}
testResult_t run(); // Main function
int main(int argc, char* argv[]) {
// Make sure everyline is flushed so that we see the progress of the test
setlinebuf(stdout);
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,4,0)
ncclGetVersion(&test_ncclVersion);
#else
test_ncclVersion = NCCL_VERSION_CODE;
#endif
//printf("# NCCL_VERSION_CODE=%d ncclGetVersion=%d\n", NCCL_VERSION_CODE, test_ncclVersion);
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,0,0)
test_opnum = 4;
test_typenum = 9;
if (NCCL_VERSION_CODE >= NCCL_VERSION(2,10,0) && test_ncclVersion >= NCCL_VERSION(2,10,0)) {
test_opnum++; // ncclAvg
#if defined(RCCL_BFLOAT16)
test_typenum++; // bfloat16
#endif
}
if (NCCL_VERSION_CODE >= NCCL_VERSION(2,11,0) && test_ncclVersion >= NCCL_VERSION(2,11,0)) {
test_opnum++; // PreMulSum
}
#endif
// Parse args
double parsed;
int longindex;
static struct option longopts[] = {
{"nthreads", required_argument, 0, 't'},
{"ngpus", required_argument, 0, 'g'},
{"minbytes", required_argument, 0, 'b'},
{"maxbytes", required_argument, 0, 'e'},
{"stepbytes", required_argument, 0, 'i'},
{"stepfactor", required_argument, 0, 'f'},
{"iters", required_argument, 0, 'n'},
{"agg_iters", required_argument, 0, 'm'},
{"warmup_iters", required_argument, 0, 'w'},
{"parallel_init", required_argument, 0, 'p'},
{"check", required_argument, 0, 'c'},
{"op", required_argument, 0, 'o'},
{"datatype", required_argument, 0, 'd'},
{"root", required_argument, 0, 'r'},
{"blocking", required_argument, 0, 'z'},
{"memory_type", required_argument, 0, 'y'}, //RCCL
{"stress_cycles", required_argument, 0, 's'}, //RCCL
{"cumask", required_argument, 0, 'u'}, //RCCL
{"stream_null", required_argument, 0, 'y'}, //NCCL
{"timeout", required_argument, 0, 'T'}, //NCCL
{"cudagraph", required_argument, 0, 'G'},
{"report_cputime", required_argument, 0, 'C'},
{"average", required_argument, 0, 'a'},
{"out_of_place", required_argument, 0, 'O'},
#ifdef RCCL_MULTIRANKPERGPU
{"enable_multiranks", required_argument, 0, 'x'},
{"ranks_per_gpu", required_argument, 0, 'R'},
#endif
{"help", no_argument, 0, 'h'},
{}
};
while(1) {
int c;
#ifdef RCCL_MULTIRANKPERGPU
c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z:Y:T:G:C:O:a:y:s:u:h:R:x:q:", longopts, &longindex);
#else
c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z:Y:T:G:C:O:a:y:s:u:h:q:", longopts, &longindex);
#endif
if (c == -1)
break;
switch(c) {
case 't':
nThreads = strtol(optarg, NULL, 0);
break;
case 'g':
nGpus = strtol(optarg, NULL, 0);
break;
case 'b':
parsed = parsesize(optarg);
if (parsed < 0) {
fprintf(stderr, "invalid size specified for 'minbytes'\n");
return -1;
}
minBytes = (size_t)parsed;
break;
case 'e':
parsed = parsesize(optarg);
if (parsed < 0) {
fprintf(stderr, "invalid size specified for 'maxbytes'\n");
return -1;
}
maxBytes = (size_t)parsed;
break;
case 'i':
stepBytes = strtol(optarg, NULL, 0);
break;
case 'f':
stepFactor = strtol(optarg, NULL, 0);
break;
case 'n':
iters = (int)strtol(optarg, NULL, 0);
break;
case 'm':
#if NCCL_MAJOR > 2 || (NCCL_MAJOR >= 2 && NCCL_MINOR >= 2)
agg_iters = (int)strtol(optarg, NULL, 0);
#else
fprintf(stderr, "Option -m not supported before NCCL 2.2. Ignoring\n");
#endif
break;
case 'w':
warmup_iters = (int)strtol(optarg, NULL, 0);
break;
case 'c':
datacheck = (int)strtol(optarg, NULL, 0);
break;
case 'p':
parallel_init = (int)strtol(optarg, NULL, 0);
break;
case 'o':
ncclop = ncclstringtoop(optarg);
break;
case 'd':
nccltype = ncclstringtotype(optarg);
break;
case 'r':
ncclroot = strtol(optarg, NULL, 0);
break;
case 'z':
blocking_coll = strtol(optarg, NULL, 0);
break;
case 'Y':
memorytype = ncclstringtomtype(optarg);
break;
case 's':
stress_cycles = strtol(optarg, NULL, 0);
break;
case 'u':
{
int nmasks = 0;
char *mask = strtok(optarg, ",");
while (mask != NULL && nmasks < 4) {
cumask[nmasks++] = strtol(mask, NULL, 16);
mask = strtok(NULL, ",");
};
}
break;
case 'y':
streamnull = strtol(optarg, NULL, 0);
break;
case 'T':
timeout = strtol(optarg, NULL, 0);
break;
case 'G':
#if (NCCL_MAJOR > 2 || (NCCL_MAJOR >= 2 && NCCL_MINOR >= 9)) && HIP_VERSION >= 50221310
cudaGraphLaunches = strtol(optarg, NULL, 0);
#else
printf("Option -G (HIP graph) not supported before NCCL 2.9 + ROCm 5.2 Ignoring\n");
#endif
break;
case 'C':
report_cputime = strtol(optarg, NULL, 0);
break;
case 'O':
enable_out_of_place = strtol(optarg, NULL, 0);
break;
case 'a':
average = (int)strtol(optarg, NULL, 0);
break;
#ifdef RCCL_MULTIRANKPERGPU
case 'x':
enable_multiranks = (int)strtol(optarg, NULL, 0);
break;
case 'R':
ranksPerGpu = (int)strtol(optarg, NULL, 0);
break;
#endif
case 'q':
delay_inout_place = (int)strtol(optarg, NULL, 10);
break;
case 'h':
default:
if (c != 'h') printf("invalid option '%c'\n", c);
printf("USAGE: %s \n\t"
"[-t,--nthreads <num threads>] \n\t"
"[-g,--ngpus <gpus per thread>] \n\t"
"[-b,--minbytes <min size in bytes>] \n\t"
"[-e,--maxbytes <max size in bytes>] \n\t"
"[-i,--stepbytes <increment size>] \n\t"
"[-f,--stepfactor <increment factor>] \n\t"
"[-n,--iters <iteration count>] \n\t"
"[-m,--agg_iters <aggregated iteration count>] \n\t"
"[-w,--warmup_iters <warmup iteration count>] \n\t"
"[-p,--parallel_init <0/1>] \n\t"
"[-c,--check <0/1>] \n\t"
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,11,0)
"[-o,--op <sum/prod/min/max/avg/mulsum/all>] \n\t"
#elif NCCL_VERSION_CODE >= NCCL_VERSION(2,10,0)
"[-o,--op <sum/prod/min/max/avg/all>] \n\t"
#else
"[-o,--op <sum/prod/min/max/all>] \n\t"
#endif
"[-d,--datatype <nccltype/all>] \n\t"
"[-r,--root <root>] \n\t"
"[-z,--blocking <0/1>] \n\t"
"[-Y,--memory_type <coarse/fine/host/managed>] \n\t"
"[-s,--stress_cycles <number of cycles>] \n\t"
"[-u,--cumask <d0,d1,d2,d3>] \n\t"
"[-y,--stream_null <0/1>] \n\t"
"[-T,--timeout <time in seconds>] \n\t"
"[-G,--cudagraph <num graph launches>] \n\t"
"[-C,--report_cputime <0/1>] \n\t"
"[-O,--out_of_place <0/1>] \n\t"
"[-a,--average <0/1/2/3> report average iteration time <0=RANK0/1=AVG/2=MIN/3=MAX>] \n\t"
#ifdef RCCL_MULTIRANKPERGPU
"[-x,--enable_multiranks <0/1> enable using multiple ranks per GPU] \n\t"
"[-R,--ranks_per_gpu] \n\t"
#endif
"[-q,--delay <delay between out-of-place and in-place in microseconds>] \n\t"
"[-h,--help]\n",
basename(argv[0]));
return 0;
}
}
HIPCHECK(hipGetDeviceCount(&numDevices));
#ifndef MPI_SUPPORT
if (nGpus > numDevices)
{
fprintf(stderr, "[ERROR] The number of requested GPUs (%d) is greater than the number of GPUs available (%d)\n", nGpus, numDevices);
return testNcclError;
}
#endif
if (minBytes > maxBytes) {
fprintf(stderr, "invalid sizes for 'minbytes' and 'maxbytes': %llu > %llu\n",
(unsigned long long)minBytes,
(unsigned long long)maxBytes);
return -1;
}
if (!minReqVersion(2, 12, 12) && enable_multiranks) {
fprintf(stderr, "Multiple Ranks per GPU requested, but rccl library found does not support this feature.\n");
fprintf(stderr, "Please check LD_LIBRARY_PATH. Resetting enable_multiranks and ranksPerGpu to default values.\n");
enable_multiranks = 0;
ranksPerGpu = 1;
}
if (enable_multiranks && parallel_init) {
fprintf(stderr, "Cannot use parallel_init when using multiple ranks per GPU.\n");
return -1;
}
if (ranksPerGpu > 1 && !enable_multiranks) {
fprintf(stderr, "Need to enable multiranks option to use multiple ranks per GPU\n");
return -1;
}
#ifdef MPI_SUPPORT
MPI_Init(&argc, &argv);
#endif
TESTCHECK(run());
return 0;
}
testResult_t run() {
int totalProcs = 1, proc = 0, ncclProcs = 1, ncclProc = 0, color = 0;
int localRank = 0;
int localSize = 0;
char hostname[1024];
getHostName(hostname, 1024);
#ifdef MPI_SUPPORT
MPI_Comm_size(MPI_COMM_WORLD, &totalProcs);
MPI_Comm_rank(MPI_COMM_WORLD, &proc);
uint64_t hostHashs[totalProcs];
hostHashs[proc] = getHostHash(hostname);
MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD);
for (int p=0; p<totalProcs; p++) {
if (p == proc) break;
if (hostHashs[p] == hostHashs[proc]) localRank++;
}
char* str = getenv("NCCL_TESTS_SPLIT_MASK");
uint64_t mask = str ? strtoul(str, NULL, 16) : 0;
MPI_Comm mpi_comm;
color = proc & mask;
MPI_Comm_split(MPI_COMM_WORLD, color, proc, &mpi_comm);
MPI_Comm_size(mpi_comm, &ncclProcs);
MPI_Comm_rank(mpi_comm, &ncclProc);
for (int p=0; p<totalProcs; p++) {
if (hostHashs[p] == hostHashs[proc]) localSize++;
}
if (nGpus * localSize > numDevices)
{
fprintf(stderr, "[ERROR] The number of requested GPUs (%d) is greater than the number of GPUs available (%d) on node (%s)\n", nGpus*localSize, numDevices, hostname);
return testNcclError;
}
#endif
is_main_thread = is_main_proc = (proc == 0) ? 1 : 0;
PRINT("# nThreads: %d nGpus: %d nRanks: %d minBytes: %ld maxBytes: %ld step: %ld(%s) warmupIters: %d iters: %d agg iters: %d validation: %d graph: %d\n",
nThreads, nGpus, ranksPerGpu, minBytes, maxBytes,
(stepFactor > 1)?stepFactor:stepBytes, (stepFactor > 1)?"factor":"bytes",
warmup_iters, iters, agg_iters, datacheck, cudaGraphLaunches);
if (blocking_coll) PRINT("# Blocking Enabled: wait for completion and barrier after each collective \n");
if (parallel_init) PRINT("# Parallel Init Enabled: threads call into NcclInitRank concurrently \n");
PRINT("#\n");
PRINT("# Using devices\n");
#define MAX_LINE 2048
char line[MAX_LINE];
int len = 0;
size_t maxMem = ~0;
char* envstr = getenv("NCCL_TESTS_DEVICE");
int gpu0 = envstr ? atoi(envstr) : -1;
for (int i=0; i<nThreads*nGpus; i++) {
int hipDev = localRank*nThreads*nGpus+i;
if (enable_multiranks)
hipDev = hipDev % numDevices;
hipDeviceProp_t prop;
HIPCHECK(hipGetDeviceProperties(&prop, hipDev));
for (int j=0; j<ranksPerGpu; j++) {
int rank = proc*nThreads*nGpus*ranksPerGpu+i*ranksPerGpu + j;
char busIdStr[] = "00000000:00:00.0";
HIPCHECK(hipDeviceGetPCIBusId(busIdStr, sizeof(busIdStr), hipDev));
len += snprintf(line+len, MAX_LINE>len ? MAX_LINE-len : 0, "# Rank %2d Pid %6d on %10s device %2d [%s] %s\n",
rank, getpid(), hostname, hipDev, busIdStr, prop.name);
maxMem = std::min(maxMem, prop.totalGlobalMem);
}
}
#if MPI_SUPPORT
char *lines = (proc == 0) ? (char *)malloc(totalProcs*MAX_LINE) : NULL;
// Gather all output in rank order to root (0)
MPI_Gather(line, MAX_LINE, MPI_BYTE, lines, MAX_LINE, MPI_BYTE, 0, MPI_COMM_WORLD);
if (proc == 0) {
for (int p = 0; p < totalProcs; p++)
PRINT("%s", lines+MAX_LINE*p);
free(lines);
}
MPI_Allreduce(MPI_IN_PLACE, &maxMem, 1, MPI_LONG, MPI_MIN, MPI_COMM_WORLD);
#else
PRINT("%s", line);
#endif
// We need sendbuff, recvbuff, expected (when datacheck enabled), plus 1G for the rest.
size_t memMaxBytes = (maxMem - (1<<30)) / (datacheck ? 3 : 2);
if (maxBytes > memMaxBytes) {
maxBytes = memMaxBytes;
if (proc == 0) printf("#\n# Reducing maxBytes to %ld due to memory limitation\n", maxBytes);
}
ncclUniqueId ncclId;
if (ncclProc == 0) {
NCCLCHECK(ncclGetUniqueId(&ncclId));
}
#ifdef MPI_SUPPORT
MPI_Bcast(&ncclId, sizeof(ncclId), MPI_BYTE, 0, mpi_comm);
#endif
int gpus[nGpus*nThreads];
hipStream_t streams[nGpus*nThreads*ranksPerGpu];
void* sendbuffs[nGpus*nThreads*ranksPerGpu];
void* recvbuffs[nGpus*nThreads*ranksPerGpu];
void* expected[nGpus*nThreads*ranksPerGpu];
size_t sendBytes, recvBytes;
ncclTestEngine.getBuffSize(&sendBytes, &recvBytes, (size_t)maxBytes, (size_t)ncclProcs*nGpus*nThreads*ranksPerGpu);
envstr = getenv("NCCL_TESTS_DEVICE");
gpu0 = envstr ? atoi(envstr) : -1;
for (int ii=0; ii<nGpus*nThreads; ii++) {
int gpuid = localRank*nThreads*nGpus+ii;
if (enable_multiranks)
gpuid = gpuid % numDevices;
gpus[ii] = gpu0 != -1 ? gpu0+ii : gpuid;
HIPCHECK(hipSetDevice(gpus[ii]));
for (int j=0; j<ranksPerGpu; j++) {
int i = ii*ranksPerGpu+j;
TESTCHECK(AllocateBuffs(sendbuffs+i, sendBytes, recvbuffs+i, recvBytes, expected+i, (size_t)maxBytes));
if (streamnull)
streams[i] = NULL;
else {
if (cumask[0] || cumask[1] || cumask[2] || cumask[3]) {
PRINT("cumask: ");
for (int i = 0; i < 4 ; i++) PRINT("%x,", cumask[i]);
PRINT("\n");
HIPCHECK(hipExtStreamCreateWithCUMask(streams+i, 4, cumask));
} else
HIPCHECK(hipStreamCreateWithFlags(streams+i, hipStreamNonBlocking));
}
}
}
//if parallel init is not selected, use main thread to initialize NCCL
ncclComm_t* comms = (ncclComm_t*)malloc(sizeof(ncclComm_t)*nThreads*nGpus*ranksPerGpu);
if (!parallel_init) {
if (ncclProcs == 1 && !enable_multiranks) {
NCCLCHECK(ncclCommInitAll(comms, nGpus*nThreads, gpus));
} else {
NCCLCHECK(ncclGroupStart());
for (int ii=0; ii<nGpus*nThreads; ii++) {
HIPCHECK(hipSetDevice(gpus[ii]));
if (!enable_multiranks) {
NCCLCHECK(ncclCommInitRank(comms+ii, ncclProcs*nThreads*nGpus, ncclId, proc*nThreads*nGpus+ii));
}
#ifdef RCCL_MULTIRANKPERGPU
else
for (int j=0; j<ranksPerGpu; j++) {
int i = ii*ranksPerGpu+j;
NCCLCHECK(ncclCommInitRankMulti(comms+i, ncclProcs*nThreads*nGpus*ranksPerGpu, ncclId,
proc*nThreads*nGpus*ranksPerGpu+i, proc*nThreads*nGpus*ranksPerGpu+i));
}
#endif
}
NCCLCHECK(ncclGroupEnd());
}
}
int errors[nThreads];
double bw[nThreads];
double* delta;
HIPCHECK(hipHostMalloc(&delta, sizeof(double)*nThreads*NUM_BLOCKS, hipHostMallocPortable | hipHostMallocMapped));
int bw_count[nThreads];
for (int t=0; t<nThreads; t++) {
bw[t] = 0.0;
errors[t] = bw_count[t] = 0;
}
fflush(stdout);
const char* timeStr = report_cputime ? "cputime" : "time";
PRINT("#\n");
if (enable_out_of_place) {
PRINT("# %10s %12s %8s %6s %6s out-of-place in-place \n", "", "", "", "", "");
PRINT("# %10s %12s %8s %6s %6s %7s %6s %6s %6s %7s %6s %6s %6s\n", "size", "count", "type", "redop", "root",
timeStr, "algbw", "busbw", "#wrong", timeStr, "algbw", "busbw", "#wrong");
PRINT("# %10s %12s %8s %6s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "", "",
"(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "");
} else {
PRINT("# %10s %12s %8s %6s %6s in-place \n", "", "", "", "", "");
PRINT("# %10s %12s %8s %6s %6s %7s %6s %6s %6s\n", "size", "count", "type", "redop", "root",
timeStr, "algbw", "busbw", "#wrong");
PRINT("# %10s %12s %8s %6s %6s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "", "",
"(us)", "(GB/s)", "(GB/s)", "");
}
struct testThread threads[nThreads];
memset(threads, 0, sizeof(struct testThread)*nThreads);
for (int t=nThreads-1; t>=0; t--) {
threads[t].args.minbytes=minBytes;
threads[t].args.maxbytes=maxBytes;
threads[t].args.stepbytes=stepBytes;
threads[t].args.stepfactor=stepFactor;
threads[t].args.localRank = localRank;
threads[t].args.totalProcs = totalProcs;
threads[t].args.localNumDevices = numDevices;
threads[t].args.enable_multiranks = enable_multiranks;
threads[t].args.nRanks = ranksPerGpu;
threads[t].args.nProcs=ncclProcs;
threads[t].args.proc=ncclProc;
threads[t].args.nThreads=nThreads;
threads[t].args.thread=t;
threads[t].args.nGpus=nGpus;
threads[t].args.gpus=gpus+t*nGpus;
threads[t].args.sendbuffs = sendbuffs+t*nGpus*ranksPerGpu;
threads[t].args.recvbuffs = recvbuffs+t*nGpus*ranksPerGpu;
threads[t].args.expected = expected+t*nGpus*ranksPerGpu;
threads[t].args.ncclId = ncclId;
threads[t].args.comms=comms+t*nGpus*ranksPerGpu;
threads[t].args.streams=streams+t*nGpus*ranksPerGpu;
threads[t].args.enable_out_of_place=enable_out_of_place;
threads[t].args.errors=errors+t;
threads[t].args.bw=bw+t;
threads[t].args.bw_count=bw_count+t;
threads[t].args.reportErrors = datacheck;
threads[t].func = parallel_init ? threadInit : threadRunTests;
if (t)
TESTCHECK(threadLaunch(threads+t));
else
TESTCHECK(threads[t].func(&threads[t].args));
}
// Wait for other threads and accumulate stats and errors
for (int t=nThreads-1; t>=0; t--) {
if (t) pthread_join(threads[t].thread, NULL);
TESTCHECK(threads[t].ret);
if (t) {
errors[0] += errors[t];
bw[0] += bw[t];
bw_count[0] += bw_count[t];
}
}
#ifdef MPI_SUPPORT
MPI_Allreduce(MPI_IN_PLACE, &errors[0], 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
#endif
if (!parallel_init) {
for(int i=0; i<nGpus*nThreads*ranksPerGpu; ++i)
NCCLCHECK(ncclCommDestroy(comms[i]));
free(comms);
}
for (int i=0; i<nGpus*nThreads*ranksPerGpu; i++) {
HIPCHECK(hipStreamDestroy(streams[i]));
}
// Free off HIP allocated memory
for (int i=0; i<nGpus*nThreads*ranksPerGpu; i++) {
if (memorytype == ncclHost) {
HIPCHECK(hipHostFree(sendbuffs[i]));
HIPCHECK(hipHostFree(recvbuffs[i]));
if (datacheck) HIPCHECK(hipHostFree(expected[i]));
}
else {
HIPCHECK(hipFree(sendbuffs[i]));
HIPCHECK(hipFree(recvbuffs[i]));
if (datacheck) HIPCHECK(hipFree(expected[i]));
}
}
HIPCHECK(hipHostFree(delta));
envstr = getenv("NCCL_TESTS_MIN_BW");
double check_avg_bw = envstr ? atof(envstr) : -1;
bw[0] /= bw_count[0];
if (datacheck) PRINT("# Errors with asterisks indicate errors that have exceeded the maximum threshold.\n");
PRINT("# Out of bounds values : %d %s\n", errors[0], errors[0] ? "FAILED" : "OK");
PRINT("# Avg bus bandwidth : %g %s\n", bw[0], check_avg_bw == -1 ? "" : (bw[0] < check_avg_bw*(0.9) ? "FAILED" : "OK"));
PRINT("#\n");
#ifdef MPI_SUPPORT
MPI_Finalize();
#endif
// 'hip-memcheck --leak-check full' requires this
PRINT("%s\n", ncclGetLastError(NULL));
hipDeviceReset();
if (errors[0] || bw[0] < check_avg_bw*(0.9))
exit(EXIT_FAILURE);
else
exit(EXIT_SUCCESS);
}
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019-2022 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef __COMMON_H__
#define __COMMON_H__
#include "rccl/rccl.h"
#include <stdio.h>
#include <cstdint>
#include <algorithm>
#ifdef MPI_SUPPORT
#include "mpi.h"
#endif
#include <pthread.h>
#include "nccl1_compat.h"
#include "timer.h"
// For nccl.h < 2.13 since we define a weak fallback
extern "C" char const* ncclGetLastError(ncclComm_t comm);
#define HIPCHECK(cmd) do { \
hipError_t e = cmd; \
if( e != hipSuccess ) { \
char hostname[1024]; \
getHostName(hostname, 1024); \
printf("%s: Test HIP failure %s:%d '%s'\n", \
hostname, \
__FILE__,__LINE__,hipGetErrorString(e)); \
return testCudaError; \
} \
} while(0)
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,13,0)
#define NCCLCHECK(cmd) do { \
ncclResult_t res = cmd; \
if (res != ncclSuccess) { \
char hostname[1024]; \
getHostName(hostname, 1024); \
printf("%s: Test NCCL failure %s:%d " \
"'%s / %s'\n", \
hostname,__FILE__,__LINE__, \
ncclGetErrorString(res), \
ncclGetLastError(NULL)); \
return testNcclError; \
} \
} while(0)
#else
#define NCCLCHECK(cmd) do { \
ncclResult_t res = cmd; \
if (res != ncclSuccess) { \
char hostname[1024]; \
getHostName(hostname, 1024); \
printf("%s: Test NCCL failure %s:%d '%s'\n", \
hostname, \
__FILE__,__LINE__,ncclGetErrorString(res)); \
return testNcclError; \
} \
} while(0)
#endif
typedef enum {
testSuccess = 0,
testInternalError = 1,
testCudaError = 2,
testNcclError = 3,
testTimeout = 4,
testNumResults = 5
} testResult_t;
// Relay errors up and trace
#define TESTCHECK(cmd) do { \
testResult_t r = cmd; \
if (r!= testSuccess) { \
char hostname[1024]; \
getHostName(hostname, 1024); \
printf(" .. %s pid %d: Test failure %s:%d\n", \
hostname, getpid(), \
__FILE__,__LINE__); \
return r; \
} \
} while(0)
struct testColl {
const char name[20];
void (*getCollByteCount)(
size_t *sendcount, size_t *recvcount, size_t *paramcount,
size_t *sendInplaceOffset, size_t *recvInplaceOffset,
size_t count, int nranks);
testResult_t (*initData)(struct threadArgs* args, ncclDataType_t type,
ncclRedOp_t op, int root, int rep, int in_place);
void (*getBw)(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks);
testResult_t (*runColl)(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type,
ncclRedOp_t op, int root, ncclComm_t comm, hipStream_t stream);
};
extern struct testColl allReduceTest;
extern struct testColl allGatherTest;
extern struct testColl reduceScatterTest;
extern struct testColl broadcastTest;
extern struct testColl reduceTest;
extern struct testColl alltoAllTest;
struct testEngine {
void (*getBuffSize)(size_t *sendcount, size_t *recvcount, size_t count, int nranks);
testResult_t (*runTest)(struct threadArgs* args, int root, ncclDataType_t type,
const char* typeName, ncclRedOp_t op, const char* opName);
};
extern struct testEngine ncclTestEngine;
struct threadArgs {
size_t nbytes;
size_t minbytes;
size_t maxbytes;
size_t stepbytes;
size_t stepfactor;
int totalProcs;
int nProcs;
int proc;
int nThreads;
int thread;
int nGpus;
int* gpus;
int localRank;
int localNumDevices;
int enable_multiranks;
int enable_out_of_place;
int nRanks;
void** sendbuffs;
size_t sendBytes;
size_t sendInplaceOffset;
void** recvbuffs;
size_t recvInplaceOffset;
ncclUniqueId ncclId;
ncclComm_t* comms;
hipStream_t* streams;
void** expected;
size_t expectedBytes;
int* errors;
double* bw;
int* bw_count;
int reportErrors;
struct testColl* collTest;
};
typedef testResult_t (*threadFunc_t)(struct threadArgs* args);
struct testThread {
pthread_t thread;
threadFunc_t func;
struct threadArgs args;
testResult_t ret;
};
// Provided by common.cu
extern void Barrier(struct threadArgs* args);
extern testResult_t TimeTest(struct threadArgs* args, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName, int root);
extern testResult_t InitDataReduce(void* data, const size_t count, const size_t offset, ncclDataType_t type, ncclRedOp_t op, const uint64_t seed, const int nranks);
extern testResult_t InitData(void* data, const size_t count, size_t offset, ncclDataType_t type, ncclRedOp_t op, const uint64_t seed, const int nranks, const int rank);
extern void AllocateBuffs(void **sendbuff, void **recvbuff, void **expected, void **expectedHost, size_t nbytes, int nranks);
#include <unistd.h>
static void getHostName(char* hostname, int maxlen) {
gethostname(hostname, maxlen);
for (int i=0; i< maxlen; i++) {
if (hostname[i] == '.') {
hostname[i] = '\0';
return;
}
}
}
#include <stdint.h>
static uint64_t getHash(const char* string, size_t n) {
// Based on DJB2a, result = result * 33 ^ char
uint64_t result = 5381;
for (size_t c = 0; c < n; c++) {
result = ((result << 5) + result) ^ string[c];
}
return result;
}
/* Generate a hash of the unique identifying string for this host
* that will be unique for both bare-metal and container instances
* Equivalent of a hash of;
*
* $(hostname)$(cat /proc/sys/kernel/random/boot_id)
*
*/
#define HOSTID_FILE "/proc/sys/kernel/random/boot_id"
static uint64_t getHostHash(const char* hostname) {
char hostHash[1024];
// Fall back is the hostname if something fails
(void) strncpy(hostHash, hostname, sizeof(hostHash));
int offset = strlen(hostHash);
FILE *file = fopen(HOSTID_FILE, "r");
if (file != NULL) {
char *p;
if (fscanf(file, "%ms", &p) == 1) {
strncpy(hostHash+offset, p, sizeof(hostHash)-offset-1);
free(p);
}
}
fclose(file);
// Make sure the string is terminated
hostHash[sizeof(hostHash)-1]='\0';
return getHash(hostHash, strlen(hostHash));
}
static size_t wordSize(ncclDataType_t type) {
switch(type) {
case ncclChar:
#if NCCL_MAJOR >= 2
//case ncclInt8:
case ncclUint8:
#endif
return 1;
case ncclHalf:
#if NCCL_MAJOR >= 2 && RCCL_BFLOAT16 == 1
case ncclBfloat16:
#endif
//case ncclFloat16:
return 2;
case ncclInt:
case ncclFloat:
#if NCCL_MAJOR >= 2
//case ncclInt32:
case ncclUint32:
//case ncclFloat32:
#endif
return 4;
case ncclInt64:
case ncclUint64:
case ncclDouble:
//case ncclFloat64:
return 8;
default: return 0;
}
}
extern int test_ncclVersion; // init'd with ncclGetVersion()
typedef enum { ncclCoarse = 0,
ncclFine = 1,
ncclHost = 2,
ncclManaged = 3,
nccl_NUM_MTYPES = 4 } ncclMemoryType_t;
extern const char *test_memorytypes[nccl_NUM_MTYPES];
constexpr int test_opNumMax = (int)ncclNumOps + (NCCL_VERSION_CODE >= NCCL_VERSION(2,11,0) ? 1 : 0);
extern int test_opnum;
extern int test_typenum;
extern ncclDataType_t test_types[ncclNumTypes];
extern const char *test_typenames[ncclNumTypes];
extern ncclRedOp_t test_ops[];
extern const char *test_opnames[];
static int ncclstringtotype(char *str) {
for (int t=0; t<ncclNumTypes; t++) {
if (strcmp(str, test_typenames[t]) == 0) {
return t;
}
}
if (strcmp(str, "all") == 0) {
return -1;
}
printf("invalid type %s, defaulting to %s .. \n", str, test_typenames[ncclFloat]);
return ncclFloat;
}
static int ncclstringtoop (char *str) {
for (int o=0; o<test_opnum; o++) {
if (strcmp(str, test_opnames[o]) == 0) {
return o;
}
}
if (strcmp(str, "all") == 0) {
return -1;
}
printf("invalid op %s, defaulting to %s .. \n", str, test_opnames[ncclSum]);
return ncclSum;
}
static int ncclstringtomtype (char *str) {
for (int o=0; o<nccl_NUM_MTYPES; o++) {
if (strcmp(str, test_memorytypes[o]) == 0) {
return o;
}
}
printf("invalid memorytype %s, defaulting to %s .. \n", str, test_memorytypes[ncclCoarse]);
return ncclCoarse;
}
extern int is_main_proc;
extern thread_local int is_main_thread;
#define PRINT if (is_main_thread) printf
#endif
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019-2022 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "hip/hip_runtime.h"
#include "common.h"
void GatherGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
*sendcount = count/nranks;
*recvcount = (count/nranks)*nranks;
*sendInplaceOffset = count/nranks;
*recvInplaceOffset = 0;
*paramcount = count/nranks;
}
testResult_t GatherInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int k=0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? ((char*)args->recvbuffs[k])+rank*args->sendBytes : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, rank*sendcount, type, ncclSum, rep, 1, 0));
HIPCHECK(hipMemcpy(args->expected[k], args->recvbuffs[k], args->expectedBytes, hipMemcpyDefault));
if (rank == root) {
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData(((char*)args->expected[k]), nranks*sendcount, 0, type, ncclSum, rep, 1, 0));
}
}
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
return testSuccess;
}
void GatherGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) {
double baseBw = (double)(count * nranks * typesize) / 1.0E9 / sec;
*algBw = baseBw;
double factor = ((double)(nranks-1))/((double)(nranks));
*busBw = baseBw * factor;
}
testResult_t GatherRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, hipStream_t stream) {
int nRanks;
NCCLCHECK(ncclCommCount(comm, &nRanks));
int rank;
NCCLCHECK(ncclCommUserRank(comm, &rank));
size_t rankOffset = count * wordSize(type);
if (count == 0) return testSuccess;
NCCLCHECK(ncclGroupStart());
NCCLCHECK(ncclSend(sendbuff, count, type, root, comm, stream));
if (rank == root) {
for (int r=0; r<nRanks; r++) {
NCCLCHECK(ncclRecv(((char*)recvbuff)+r*rankOffset, count, type, r, comm, stream));
}
}
NCCLCHECK(ncclGroupEnd());
return testSuccess;
}
struct testColl gatherTest = {
"Gather",
GatherGetCollByteCount,
GatherInitData,
GatherGetBw,
GatherRunColl
};
void GatherGetBuffSize(size_t *sendcount, size_t *recvcount, size_t count, int nranks) {
size_t paramcount, sendInplaceOffset, recvInplaceOffset;
GatherGetCollByteCount(sendcount, recvcount, &paramcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks);
}
testResult_t GatherRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) {
args->collTest = &gatherTest;
ncclDataType_t *run_types;
const char **run_typenames;
int type_count;
int begin_root, end_root;
if ((int)type != -1) {
type_count = 1;
run_types = &type;
run_typenames = &typeName;
} else {
type_count = test_typenum;
run_types = test_types;
run_typenames = test_typenames;
}
if (root != -1) {
begin_root = end_root = root;
} else {
begin_root = 0;
end_root = args->nProcs*args->nThreads*args->nGpus-1;
}
for (int i=0; i<type_count; i++) {
for (int j=begin_root; j<=end_root; j++) {
TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], (ncclRedOp_t)0, "none", j));
}
}
return testSuccess;
}
struct testEngine ncclTestEngine = {
GatherGetBuffSize,
GatherRunTest
};
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "hip/hip_runtime.h"
#include "common.h"
#define ALIGN 4
void HyperCubeGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
size_t base = (count/(ALIGN*nranks))*ALIGN;
*sendcount = base;
*recvcount = base*nranks;
*sendInplaceOffset = base;
*recvInplaceOffset = 0;
*paramcount = base;
}
testResult_t HyperCubeInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int k=0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? ((char*)args->recvbuffs[k])+rank*args->sendBytes : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep + rank, 1, 0));
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData(((char*)args->expected[k])+args->sendBytes*j, sendcount, 0, type, ncclSum, 33*rep + j, 1, 0));
}
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
return testSuccess;
}
void HyperCubeGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) {
double baseBw = (double)(count * typesize * (nranks - 1)) / 1.0E9 / sec;
*algBw = baseBw;
double factor = 1;
*busBw = baseBw * factor;
}
testResult_t HyperCubeRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, hipStream_t stream) {
char* sbuff = (char*)sendbuff;
char* rbuff = (char*)recvbuff;
int nRanks;
NCCLCHECK(ncclCommCount(comm, &nRanks));
int rank;
NCCLCHECK(ncclCommUserRank(comm, &rank));
size_t rankSize = count * wordSize(type);
if (rbuff+rank*rankSize != sbuff) HIPCHECK(hipMemcpyAsync(rbuff+rank*rankSize, sbuff, rankSize, hipMemcpyDeviceToDevice, stream));
// Hypercube AllGather
for (int mask=1; mask<nRanks; mask<<=1) {
NCCLCHECK(ncclGroupStart());
int s = rank & ~(mask-1);
int r = s ^ mask;
NCCLCHECK(ncclSend(rbuff+s*rankSize, count*mask, type, rank^mask, comm, stream));
NCCLCHECK(ncclRecv(rbuff+r*rankSize, count*mask, type, rank^mask, comm, stream));
NCCLCHECK(ncclGroupEnd());
}
return testSuccess;
}
struct testColl hyperCubeTest = {
"HyperCube",
HyperCubeGetCollByteCount,
HyperCubeInitData,
HyperCubeGetBw,
HyperCubeRunColl
};
void HyperCubeGetBuffSize(size_t *sendcount, size_t *recvcount, size_t count, int nranks) {
size_t paramcount, sendInplaceOffset, recvInplaceOffset;
HyperCubeGetCollByteCount(sendcount, recvcount, &paramcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks);
}
testResult_t HyperCubeRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) {
args->collTest = &hyperCubeTest;
ncclDataType_t *run_types;
const char **run_typenames;
int type_count;
if ((int)type != -1) {
type_count = 1;
run_types = &type;
run_typenames = &typeName;
} else {
type_count = test_typenum;
run_types = test_types;
run_typenames = test_typenames;
}
// Check if this is a power of 2
int nRanks = args->nProcs*args->nThreads*args->nGpus;
if (nRanks && !(nRanks & (nRanks - 1))) {
for (int i=0; i<type_count; i++) {
TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], (ncclRedOp_t)0, "", -1));
}
} else {
printf("nRanks %d is not a power of 2, skipping\n", nRanks);
}
return testSuccess;
}
struct testEngine ncclTestEngine = {
HyperCubeGetBuffSize,
HyperCubeRunTest
};
/*************************************************************************
* Copyright (c) 2017-2019, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef NCCL1_COMPAT_H
#define NCCL1_COMPAT_H
#ifndef NCCL_MAJOR // NCCL 1.x
#define NCCL_MAJOR 1
#define NCCL_MINOR 0
#define ncclNumOps nccl_NUM_OPS
#define ncclNumTypes nccl_NUM_TYPES
static ncclResult_t ncclGroupStart() { return ncclSuccess; }
static ncclResult_t ncclGroupEnd() { return ncclSuccess; }
#define CHECKCOUNT(count) if (count > INT_MAX) return ncclInvalidArgument;
static ncclResult_t ncclReduce(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype,
ncclRedOp_t op, int root, ncclComm_t comm, hipStream_t stream) {
CHECKCOUNT(count);
return ncclReduce(sendbuff, recvbuff, (int)count, datatype, op, root, comm, stream);
}
static ncclResult_t ncclAllReduce(const void* sendbuff, void* recvbuff, size_t count,
ncclDataType_t datatype, ncclRedOp_t op, ncclComm_t comm, hipStream_t stream) {
CHECKCOUNT(count);
return ncclAllReduce(sendbuff, recvbuff, (int)count, datatype, op, comm, stream);
}
static ncclResult_t ncclBcast(void* buff, size_t count, ncclDataType_t datatype, int root,
ncclComm_t comm, hipStream_t stream) {
CHECKCOUNT(count);
return ncclBcast(buff, (int)count, datatype, root, comm, stream);
}
static ncclResult_t ncclReduceScatter(const void* sendbuff, void* recvbuff,
size_t recvcount, ncclDataType_t datatype, ncclRedOp_t op, ncclComm_t comm,
hipStream_t stream) {
CHECKCOUNT(recvcount);
return ncclReduceScatter(sendbuff, recvbuff, (int)recvcount, datatype, op, comm, stream);
}
static ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t sendcount,
ncclDataType_t datatype, ncclComm_t comm, hipStream_t stream) {
CHECKCOUNT(sendcount);
return ncclAllGather(sendbuff, (int)sendcount, datatype, recvbuff, comm, stream);
}
#endif
#endif
/**
* MIT License
*
* Copyright 2019-2020 Advanced Micro Devices, Inc. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
/*!\file
* \brief rccl_bfloat16.h provides struct for rccl_bfloat16 typedef
*/
#ifndef _RCCL_BFLOAT16_H_
#define _RCCL_BFLOAT16_H_
#if __cplusplus < 201103L || (!defined(__HCC__) && !defined(__HIPCC__) && !defined(__HIP_PLATFORM_HCC__))
// If this is a C compiler, C++ compiler below C++11, or a host-only compiler, we only
// include a minimal definition of rccl_bfloat16
#include <stdint.h>
/*! \brief Struct to represent a 16 bit brain floating point number. */
typedef struct
{
uint16_t data;
} rccl_bfloat16;
#else // __cplusplus < 201103L || (!defined(__HCC__) && !defined(__HIPCC__) && !defined(__HIP_PLATFORM_HCC__))
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <hip/hip_runtime.h>
#include <ostream>
#include <type_traits>
struct rccl_bfloat16
{
uint16_t data;
enum truncate_t
{
truncate
};
__host__ __device__ rccl_bfloat16() = default;
// round upper 16 bits of IEEE float to convert to bfloat16
explicit __host__ __device__ rccl_bfloat16(float f)
: data(float_to_bfloat16(f))
{
}
explicit __host__ __device__ rccl_bfloat16(float f, truncate_t)
: data(truncate_float_to_bfloat16(f))
{
}
// zero extend lower 16 bits of bfloat16 to convert to IEEE float
__host__ __device__ operator float() const
{
union
{
uint32_t int32;
float fp32;
} u = {uint32_t(data) << 16};
return u.fp32;
}
private:
static __host__ __device__ uint16_t float_to_bfloat16(float f)
{
union
{
float fp32;
uint32_t int32;
} u = {f};
if(~u.int32 & 0x7f800000)
{
// When the exponent bits are not all 1s, then the value is zero, normal,
// or subnormal. We round the bfloat16 mantissa up by adding 0x7FFF, plus
// 1 if the least significant bit of the bfloat16 mantissa is 1 (odd).
// This causes the bfloat16's mantissa to be incremented by 1 if the 16
// least significant bits of the float mantissa are greater than 0x8000,
// or if they are equal to 0x8000 and the least significant bit of the
// bfloat16 mantissa is 1 (odd). This causes it to be rounded to even when
// the lower 16 bits are exactly 0x8000. If the bfloat16 mantissa already
// has the value 0x7f, then incrementing it causes it to become 0x00 and
// the exponent is incremented by one, which is the next higher FP value
// to the unrounded bfloat16 value. When the bfloat16 value is subnormal
// with an exponent of 0x00 and a mantissa of 0x7F, it may be rounded up
// to a normal value with an exponent of 0x01 and a mantissa of 0x00.
// When the bfloat16 value has an exponent of 0xFE and a mantissa of 0x7F,
// incrementing it causes it to become an exponent of 0xFF and a mantissa
// of 0x00, which is Inf, the next higher value to the unrounded value.
u.int32 += 0x7fff + ((u.int32 >> 16) & 1); // Round to nearest, round to even
}
else if(u.int32 & 0xffff)
{
// When all of the exponent bits are 1, the value is Inf or NaN.
// Inf is indicated by a zero mantissa. NaN is indicated by any nonzero
// mantissa bit. Quiet NaN is indicated by the most significant mantissa
// bit being 1. Signaling NaN is indicated by the most significant
// mantissa bit being 0 but some other bit(s) being 1. If any of the
// lower 16 bits of the mantissa are 1, we set the least significant bit
// of the bfloat16 mantissa, in order to preserve signaling NaN in case
// the bloat16's mantissa bits are all 0.
u.int32 |= 0x10000; // Preserve signaling NaN
}
return uint16_t(u.int32 >> 16);
}
// Truncate instead of rounding, preserving SNaN
static __host__ __device__ uint16_t truncate_float_to_bfloat16(float f)
{
union
{
float fp32;
uint32_t int32;
} u = {f};
return uint16_t(u.int32 >> 16) | (!(~u.int32 & 0x7f800000) && (u.int32 & 0xffff));
}
};
typedef struct
{
uint16_t data;
} rccl_bfloat16_public;
static_assert(std::is_standard_layout<rccl_bfloat16>{},
"rccl_bfloat16 is not a standard layout type, and thus is "
"incompatible with C.");
static_assert(std::is_trivial<rccl_bfloat16>{},
"rccl_bfloat16 is not a trivial type, and thus is "
"incompatible with C.");
static_assert(sizeof(rccl_bfloat16) == sizeof(rccl_bfloat16_public)
&& offsetof(rccl_bfloat16, data) == offsetof(rccl_bfloat16_public, data),
"internal rccl_bfloat16 does not match public rccl_bfloat16");
inline std::ostream& operator<<(std::ostream& os, const rccl_bfloat16& bf16)
{
return os << float(bf16);
}
inline __host__ __device__ rccl_bfloat16 operator+(rccl_bfloat16 a)
{
return a;
}
inline __host__ __device__ rccl_bfloat16 operator-(rccl_bfloat16 a)
{
a.data ^= 0x8000;
return a;
}
inline __host__ __device__ rccl_bfloat16 operator+(rccl_bfloat16 a, rccl_bfloat16 b)
{
return rccl_bfloat16(float(a) + float(b));
}
inline __host__ __device__ rccl_bfloat16 operator-(rccl_bfloat16 a, rccl_bfloat16 b)
{
return rccl_bfloat16(float(a) - float(b));
}
inline __host__ __device__ rccl_bfloat16 operator*(rccl_bfloat16 a, rccl_bfloat16 b)
{
return rccl_bfloat16(float(a) * float(b));
}
inline __host__ __device__ rccl_bfloat16 operator/(rccl_bfloat16 a, rccl_bfloat16 b)
{
return rccl_bfloat16(float(a) / float(b));
}
inline __host__ __device__ bool operator<(rccl_bfloat16 a, rccl_bfloat16 b)
{
return float(a) < float(b);
}
inline __host__ __device__ bool operator==(rccl_bfloat16 a, rccl_bfloat16 b)
{
return float(a) == float(b);
}
inline __host__ __device__ bool operator>(rccl_bfloat16 a, rccl_bfloat16 b)
{
return b < a;
}
inline __host__ __device__ bool operator<=(rccl_bfloat16 a, rccl_bfloat16 b)
{
return !(a > b);
}
inline __host__ __device__ bool operator!=(rccl_bfloat16 a, rccl_bfloat16 b)
{
return !(a == b);
}
inline __host__ __device__ bool operator>=(rccl_bfloat16 a, rccl_bfloat16 b)
{
return !(a < b);
}
inline __host__ __device__ rccl_bfloat16& operator+=(rccl_bfloat16& a, rccl_bfloat16 b)
{
return a = a + b;
}
inline __host__ __device__ rccl_bfloat16& operator-=(rccl_bfloat16& a, rccl_bfloat16 b)
{
return a = a - b;
}
inline __host__ __device__ rccl_bfloat16& operator*=(rccl_bfloat16& a, rccl_bfloat16 b)
{
return a = a * b;
}
inline __host__ __device__ rccl_bfloat16& operator/=(rccl_bfloat16& a, rccl_bfloat16 b)
{
return a = a / b;
}
inline __host__ __device__ rccl_bfloat16& operator++(rccl_bfloat16& a)
{
return a += rccl_bfloat16(1.0f);
}
inline __host__ __device__ rccl_bfloat16& operator--(rccl_bfloat16& a)
{
return a -= rccl_bfloat16(1.0f);
}
inline __host__ __device__ rccl_bfloat16 operator++(rccl_bfloat16& a, int)
{
rccl_bfloat16 orig = a;
++a;
return orig;
}
inline __host__ __device__ rccl_bfloat16 operator--(rccl_bfloat16& a, int)
{
rccl_bfloat16 orig = a;
--a;
return orig;
}
namespace std
{
constexpr __host__ __device__ bool isinf(rccl_bfloat16 a)
{
return !(~a.data & 0x7f80) && !(a.data & 0x7f);
}
constexpr __host__ __device__ bool isnan(rccl_bfloat16 a)
{
return !(~a.data & 0x7f80) && +(a.data & 0x7f);
}
constexpr __host__ __device__ bool iszero(rccl_bfloat16 a)
{
return !(a.data & 0x7fff);
}
inline rccl_bfloat16 sin(rccl_bfloat16 a)
{
return rccl_bfloat16(sinf(float(a)));
}
inline rccl_bfloat16 cos(rccl_bfloat16 a)
{
return rccl_bfloat16(cosf(float(a)));
}
}
#endif // __cplusplus < 201103L || (!defined(__HCC__) && !defined(__HIPCC__))
#endif // _RCCL_BFLOAT16_H_
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019-2022 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include <hip/hip_runtime.h>
#include "common.h"
void ReduceGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
*sendcount = count;
*recvcount = count;
*sendInplaceOffset = 0;
*recvInplaceOffset = 0;
*paramcount = *sendcount;
}
testResult_t ReduceInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int k=0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, 0, type, op, rep, nranks, rank));
HIPCHECK(hipMemcpy(args->expected[k], args->recvbuffs[k], args->expectedBytes, hipMemcpyDefault));
if (rank == root) TESTCHECK(InitDataReduce(args->expected[k], recvcount, 0, type, op, rep, nranks));
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
return testSuccess;
}
void ReduceGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) {
double baseBw = (double)(count * typesize) / 1.0E9 / sec;
*algBw = baseBw;
*busBw = baseBw;
}
testResult_t ReduceRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, hipStream_t stream) {
NCCLCHECK(ncclReduce(sendbuff, recvbuff, count, type, op, root, comm, stream));
return testSuccess;
}
struct testColl reduceTest = {
"Reduce",
ReduceGetCollByteCount,
ReduceInitData,
ReduceGetBw,
ReduceRunColl
};
void ReduceGetBuffSize(size_t *sendcount, size_t *recvcount, size_t count, int nranks) {
size_t paramcount, sendInplaceOffset, recvInplaceOffset;
ReduceGetCollByteCount(sendcount, recvcount, &paramcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks);
}
testResult_t ReduceRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) {
args->collTest = &reduceTest;
ncclDataType_t *run_types;
ncclRedOp_t *run_ops;
const char **run_typenames, **run_opnames;
int type_count, op_count;
int begin_root, end_root;
if ((int)type != -1) {
type_count = 1;
run_types = &type;
run_typenames = &typeName;
} else {
type_count = test_typenum;
run_types = test_types;
run_typenames = test_typenames;
}
if ((int)op != -1) {
op_count = 1;
run_ops = &op;
run_opnames = &opName;
} else {
op_count = test_opnum;
run_ops = test_ops;
run_opnames = test_opnames;
}
if (root != -1) {
begin_root = end_root = root;
} else {
begin_root = 0;
end_root = args->nProcs*args->nThreads*args->nGpus-1;
}
for (int i=0; i<type_count; i++) {
for (int j=0; j<op_count; j++) {
for (int k=begin_root; k<=end_root; k++) {
TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], run_ops[j], run_opnames[j], k));
}
}
}
return testSuccess;
}
struct testEngine ncclTestEngine = {
ReduceGetBuffSize,
ReduceRunTest
};
/*************************************************************************
* Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved.
* Modifications Copyright (c) 2019-2022 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include <hip/hip_runtime.h>
#include "common.h"
#define ALIGN 4
void ReduceScatterGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
size_t base = (count/(ALIGN*nranks))*ALIGN;
*sendcount = base*nranks;
*recvcount = base;
*sendInplaceOffset = 0;
*recvInplaceOffset = base;
*paramcount = base;
}
testResult_t ReduceScatterInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {
size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus*args->nRanks;
int k=0;
for (int i=0; i<args->nGpus; i++) {
HIPCHECK(hipSetDevice(args->gpus[i]));
for (int l=0; l<args->nRanks; l++) {
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus*args->nRanks + i*args->nRanks + l);
HIPCHECK(hipMemset(args->recvbuffs[k], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[k] : args->sendbuffs[k];
TESTCHECK(InitData(data, sendcount, 0, type, op, rep, nranks, rank));
HIPCHECK(hipMemcpy(args->expected[k], args->recvbuffs[k], args->expectedBytes, hipMemcpyDefault));
TESTCHECK(InitDataReduce(args->expected[k], recvcount, rank*recvcount, type, op, rep, nranks));
k++;
}
HIPCHECK(hipDeviceSynchronize());
}
return testSuccess;
}
void ReduceScatterGetBw(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks) {
double baseBw = (double)(count * typesize * nranks) / 1.0E9 / sec;
*algBw = baseBw;
double factor = ((double)(nranks - 1))/((double)nranks);
*busBw = baseBw * factor;
}
testResult_t ReduceScatterRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, hipStream_t stream) {
NCCLCHECK(ncclReduceScatter(sendbuff, recvbuff, count, type, op, comm, stream));
return testSuccess;
}
struct testColl reduceScatterTest = {
"ReduceScatter",
ReduceScatterGetCollByteCount,
ReduceScatterInitData,
ReduceScatterGetBw,
ReduceScatterRunColl
};
void ReduceScatterGetBuffSize(size_t *sendcount, size_t *recvcount, size_t count, int nranks) {
size_t paramcount, sendInplaceOffset, recvInplaceOffset;
ReduceScatterGetCollByteCount(sendcount, recvcount, &paramcount, &sendInplaceOffset, &recvInplaceOffset, count, nranks);
}
testResult_t ReduceScatterRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) {
args->collTest = &reduceScatterTest;
ncclDataType_t *run_types;
ncclRedOp_t *run_ops;
const char **run_typenames, **run_opnames;
int type_count, op_count;
if ((int)type != -1) {
type_count = 1;
run_types = &type;
run_typenames = &typeName;
} else {
type_count = test_typenum;
run_types = test_types;
run_typenames = test_typenames;
}
if ((int)op != -1) {
run_ops = &op;
run_opnames = &opName;
op_count = 1;
} else {
op_count = test_opnum;
run_ops = test_ops;
run_opnames = test_opnames;
}
for (int i=0; i<type_count; i++) {
for (int j=0; j<op_count; j++) {
TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], run_ops[j], run_opnames[j], -1));
}
}
return testSuccess;
}
struct testEngine ncclTestEngine = {
ReduceScatterGetBuffSize,
ReduceScatterRunTest
};
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment