// hipcc allgather.cu -o allgather -I /opt/mpi/include -L /opt/mpi/lib/ -lmpi -L /opt/dtk/lib/ -lrccl // mpirun -np 8 --allow-run-as-root --oversubscribe --quiet ./allgather #include #include #include #include #include #define DATA_SIZE 4 #define CUDACHECK(cmd) do { \ hipError_t e = cmd; \ if (e != hipSuccess) { \ printf("CUDA error %s:%d: '%s'\n", __FILE__, __LINE__, \ hipGetErrorString(e)); \ exit(EXIT_FAILURE); \ } \ } while(0) #define NCCLCHECK(cmd) do { \ ncclResult_t r = cmd; \ if (r != ncclSuccess) { \ printf("NCCL error %s:%d: '%s'\n", __FILE__, __LINE__, \ ncclGetErrorString(r)); \ exit(EXIT_FAILURE); \ } \ } while(0) int main(int argc, char* argv[]) { int world_size, rank, local_rank; ncclUniqueId id; ncclComm_t comm; hipStream_t stream; MPI_Init(&argc, &argv); MPI_Comm_size(MPI_COMM_WORLD, &world_size); MPI_Comm_rank(MPI_COMM_WORLD, &rank); local_rank = rank; // 假设每个 rank 对应一张 GPU CUDACHECK(hipSetDevice(local_rank)); // 获取 unique ID,并广播 if (rank == 0) NCCLCHECK(ncclGetUniqueId(&id)); MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); // 初始化 NCCL 通信器 NCCLCHECK(ncclCommInitRank(&comm, world_size, id, rank)); CUDACHECK(hipStreamCreate(&stream)); // 准备数据 int* sendbuff; int* recvbuff; CUDACHECK(hipMalloc(&sendbuff, DATA_SIZE * sizeof(int))); CUDACHECK(hipMalloc(&recvbuff, DATA_SIZE * world_size * sizeof(int))); int h_send[DATA_SIZE]; printf("Rank %d send: ", rank); for (int i = 0; i < DATA_SIZE; ++i) { h_send[i] = rank * 100 + i; printf("%d ", h_send[i]); } printf("\n"); CUDACHECK(hipMemcpy(sendbuff, h_send, DATA_SIZE * sizeof(int), hipMemcpyHostToDevice)); // 执行 AllGather NCCLCHECK(ncclAllGather( sendbuff, recvbuff, DATA_SIZE, ncclInt, comm, stream)); // 等待完成 CUDACHECK(hipStreamSynchronize(stream)); // 打印结果 int* h_recv = new int[DATA_SIZE * world_size]; CUDACHECK(hipMemcpy(h_recv, recvbuff, DATA_SIZE * world_size * sizeof(int), hipMemcpyDeviceToHost)); printf("Rank %d received: ", rank); for (int i = 0; i < DATA_SIZE * world_size; ++i) printf("%d ", h_recv[i]); printf("\n"); // 清理资源 delete[] h_recv; delete[] h_send; CUDACHECK(hipFree(sendbuff)); CUDACHECK(hipFree(recvbuff)); CUDACHECK(hipStreamDestroy(stream)); NCCLCHECK(ncclCommDestroy(comm)); MPI_Finalize(); return 0; }