Commit 7e1270f7 authored by yuguo

[DCU] rccl examples

parent 07b750a2
// hipcc allgather.cu -o allgather -I /opt/mpi/include -L /opt/mpi/lib/ -lmpi -L /opt/dtk/lib/ -lrccl
// mpirun -np 8 --allow-run-as-root --oversubscribe --quiet ./allgather
#include <cstdio>
#include <cstdlib>
#include <mpi.h>
#include <rccl.h>
#include <hip/hip_runtime.h>
#define DATA_SIZE 4
#define CUDACHECK(cmd) do { \
    hipError_t e = cmd; \
    if (e != hipSuccess) { \
        printf("HIP error %s:%d: '%s'\n", __FILE__, __LINE__, \
               hipGetErrorString(e)); \
        exit(EXIT_FAILURE); \
    } \
} while(0)
#define NCCLCHECK(cmd) do { \
    ncclResult_t r = cmd; \
    if (r != ncclSuccess) { \
        printf("NCCL error %s:%d: '%s'\n", __FILE__, __LINE__, \
               ncclGetErrorString(r)); \
        exit(EXIT_FAILURE); \
    } \
} while(0)
int main(int argc, char* argv[]) {
    int world_size, rank, local_rank;
    ncclUniqueId id;
    ncclComm_t comm;
    hipStream_t stream;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    local_rank = rank; // assumes a single-node run with one GPU per rank
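    // Note: on multi-node jobs, rank is not the device index. A common pattern
    // (a sketch using standard MPI-3 calls, not part of the original example)
    // derives the node-local rank from a shared-memory sub-communicator:
    //   MPI_Comm local_comm;
    //   MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, rank,
    //                       MPI_INFO_NULL, &local_comm);
    //   MPI_Comm_rank(local_comm, &local_rank);
    //   MPI_Comm_free(&local_comm);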
    CUDACHECK(hipSetDevice(local_rank));
    // Get the unique ID on rank 0 and broadcast it to all ranks
    if (rank == 0)
        NCCLCHECK(ncclGetUniqueId(&id));
    MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
    // Initialize the RCCL communicator
    NCCLCHECK(ncclCommInitRank(&comm, world_size, id, rank));
    CUDACHECK(hipStreamCreate(&stream));
    // Prepare data: each rank contributes DATA_SIZE ints
    int* sendbuff;
    int* recvbuff;
    CUDACHECK(hipMalloc(&sendbuff, DATA_SIZE * sizeof(int)));
    CUDACHECK(hipMalloc(&recvbuff, DATA_SIZE * world_size * sizeof(int)));
    int h_send[DATA_SIZE];
    printf("Rank %d send: ", rank);
    for (int i = 0; i < DATA_SIZE; ++i) {
        h_send[i] = rank * 100 + i;
        printf("%d ", h_send[i]);
    }
    printf("\n");
    CUDACHECK(hipMemcpy(sendbuff, h_send, DATA_SIZE * sizeof(int), hipMemcpyHostToDevice));
    // Perform the AllGather
    NCCLCHECK(ncclAllGather(
        sendbuff, recvbuff, DATA_SIZE,
        ncclInt, comm, stream));
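    // AllGather result layout: recvbuff holds world_size contiguous chunks,
    // with rank k's DATA_SIZE elements at offset k * DATA_SIZE. With the
    // -np 8 run above, every rank should print:
    // 0 1 2 3 100 101 102 103 ... 700 701 702 703.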
    // Wait for completion
    CUDACHECK(hipStreamSynchronize(stream));
    // Print results
    int* h_recv = new int[DATA_SIZE * world_size];
    CUDACHECK(hipMemcpy(h_recv, recvbuff, DATA_SIZE * world_size * sizeof(int), hipMemcpyDeviceToHost));
    printf("Rank %d received: ", rank);
    for (int i = 0; i < DATA_SIZE * world_size; ++i)
        printf("%d ", h_recv[i]);
    printf("\n");
    // Clean up
    delete[] h_recv;
    CUDACHECK(hipFree(sendbuff));
    CUDACHECK(hipFree(recvbuff));
    CUDACHECK(hipStreamDestroy(stream));
    NCCLCHECK(ncclCommDestroy(comm));
    MPI_Finalize();
    return 0;
}
// hipcc reducescatter.cu -o reducescatter -I /opt/mpi/include -L /opt/mpi/lib/ -lmpi -L /opt/dtk/lib/ -lrccl
// mpirun -np 8 --allow-run-as-root --oversubscribe --quiet ./reducescatter
#include <cstdio>
#include <cstdlib>
#include <mpi.h>
#include <rccl.h>
#include <hip/hip_runtime.h>
#define CHUNK_SIZE 4 // each rank receives CHUNK_SIZE elements
#define TOTAL_SIZE (CHUNK_SIZE * world_size) // expands against the world_size local in main
#define CUDACHECK(cmd) do { \
    hipError_t e = cmd; \
    if (e != hipSuccess) { \
        printf("HIP error %s:%d: '%s'\n", __FILE__, __LINE__, \
               hipGetErrorString(e)); \
        exit(EXIT_FAILURE); \
    } \
} while(0)
#define NCCLCHECK(cmd) do { \
    ncclResult_t r = cmd; \
    if (r != ncclSuccess) { \
        printf("NCCL error %s:%d: '%s'\n", __FILE__, __LINE__, \
               ncclGetErrorString(r)); \
        exit(EXIT_FAILURE); \
    } \
} while(0)
int main(int argc, char* argv[]) {
    int rank, world_size;
    ncclComm_t comm;
    ncclUniqueId id;
    hipStream_t stream;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    CUDACHECK(hipSetDevice(rank));
    if (rank == 0)
        NCCLCHECK(ncclGetUniqueId(&id));
    MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
    NCCLCHECK(ncclCommInitRank(&comm, world_size, id, rank));
    CUDACHECK(hipStreamCreate(&stream));
    // Each rank allocates a TOTAL_SIZE send buffer (every rank contributes the full vector)
    int* sendbuff;
    int* recvbuff;
    CUDACHECK(hipMalloc(&sendbuff, TOTAL_SIZE * sizeof(int)));
    CUDACHECK(hipMalloc(&recvbuff, CHUNK_SIZE * sizeof(int)));
    // Initialize send data: this rank's element i is rank * 100 + i
    int* h_send = new int[TOTAL_SIZE];
    for (int i = 0; i < TOTAL_SIZE; ++i)
        h_send[i] = rank * 100 + i;
    CUDACHECK(hipMemcpy(sendbuff, h_send, TOTAL_SIZE * sizeof(int), hipMemcpyHostToDevice));
    // Print the send data
    printf("Rank %d original data: ", rank);
    for (int i = 0; i < TOTAL_SIZE; ++i)
        printf("%d ", h_send[i]);
    printf("\n");
    delete[] h_send;
    // Perform reduce-scatter (sum): element-wise sum across ranks,
    // after which rank r keeps chunk r of the result
    NCCLCHECK(ncclReduceScatter(
        sendbuff, recvbuff, CHUNK_SIZE,
        ncclInt, ncclSum, comm, stream));
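    // Expected result: element j of the global vector reduces to
    // sum over ranks k of (k*100 + j); with the -np 8 run above that is
    // 2800 + 8*j, so rank r prints 2800 + 8*(r*CHUNK_SIZE + i) for i = 0..3
    // (rank 0: 2800 2808 2816 2824).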
    CUDACHECK(hipStreamSynchronize(stream));
    // Print the reduced chunk this rank received
    int* h_recv = new int[CHUNK_SIZE];
    CUDACHECK(hipMemcpy(h_recv, recvbuff, CHUNK_SIZE * sizeof(int), hipMemcpyDeviceToHost));
    printf("Rank %d received reduced chunk: ", rank);
    for (int i = 0; i < CHUNK_SIZE; ++i)
        printf("%d ", h_recv[i]);
    printf("\n");
    delete[] h_recv;
    // Clean up
    CUDACHECK(hipFree(sendbuff));
    CUDACHECK(hipFree(recvbuff));
    CUDACHECK(hipStreamDestroy(stream));
    NCCLCHECK(ncclCommDestroy(comm));
    MPI_Finalize();
    return 0;
}
// hipcc ring_sendrecv.cu -o ring_sendrecv -I /opt/mpi/include -L /opt/mpi/lib/ -lmpi -L /opt/dtk/lib/ -lrccl
// mpirun -np 8 --allow-run-as-root --oversubscribe --quiet ./ring_sendrecv
#include <cstdio>
#include <cstdlib>
#include <mpi.h>
#include <rccl.h>
#include <hip/hip_runtime.h>
#define MSG_SIZE 4
#define CUDACHECK(cmd) do { \
    hipError_t e = cmd; \
    if (e != hipSuccess) { \
        printf("HIP error %s:%d: '%s'\n", __FILE__, __LINE__, hipGetErrorString(e)); \
        exit(EXIT_FAILURE); \
    } \
} while(0)
#define NCCLCHECK(cmd) do { \
    ncclResult_t r = cmd; \
    if (r != ncclSuccess) { \
        printf("NCCL error %s:%d: '%s'\n", __FILE__, __LINE__, ncclGetErrorString(r)); \
        exit(EXIT_FAILURE); \
    } \
} while(0)
int main(int argc, char* argv[]) {
    int rank, world_size;
    ncclUniqueId id;
    ncclComm_t comm;
    hipStream_t stream;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    int device_count = 0;
    CUDACHECK(hipGetDeviceCount(&device_count));
    if (world_size > device_count) {
        if (rank == 0)
            printf("Error: More ranks (%d) than available GPUs (%d)\n", world_size, device_count);
        MPI_Finalize();
        return -1;
    }
    CUDACHECK(hipSetDevice(rank));
    printf("Rank %d using device %d\n", rank, rank);
    if (rank == 0)
        NCCLCHECK(ncclGetUniqueId(&id));
    MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
    NCCLCHECK(ncclCommInitRank(&comm, world_size, id, rank));
    CUDACHECK(hipStreamCreate(&stream));
    int* d_send = nullptr;
    int* d_recv = nullptr;
    CUDACHECK(hipMalloc(&d_send, MSG_SIZE * sizeof(int)));
    CUDACHECK(hipMalloc(&d_recv, MSG_SIZE * sizeof(int)));
    CUDACHECK(hipMemset(d_recv, 0, MSG_SIZE * sizeof(int)));
    int h_send[MSG_SIZE];
    for (int i = 0; i < MSG_SIZE; ++i)
        h_send[i] = rank * 100 + i;
    CUDACHECK(hipMemcpy(d_send, h_send, MSG_SIZE * sizeof(int), hipMemcpyHostToDevice));
    // Ring pattern: each rank sends to its right neighbor, receives from its left
    int next = (rank + 1) % world_size;
    int prev = (rank - 1 + world_size) % world_size;
    printf("Rank %d sending to Rank %d: ", rank, next);
    for (int i = 0; i < MSG_SIZE; ++i) printf("%d ", h_send[i]);
    printf("\n");
    NCCLCHECK(ncclGroupStart());
    NCCLCHECK(ncclSend(d_send, MSG_SIZE, ncclInt, next, comm, stream));
    NCCLCHECK(ncclRecv(d_recv, MSG_SIZE, ncclInt, prev, comm, stream));
    NCCLCHECK(ncclGroupEnd());
    CUDACHECK(hipStreamSynchronize(stream));
    CUDACHECK(hipGetLastError());
    MPI_Barrier(MPI_COMM_WORLD); // barrier before printing
    int h_recv[MSG_SIZE];
    CUDACHECK(hipMemcpy(h_recv, d_recv, MSG_SIZE * sizeof(int), hipMemcpyDeviceToHost));
    printf("Rank %d received from Rank %d: ", rank, prev);
    for (int i = 0; i < MSG_SIZE; ++i) printf("%d ", h_recv[i]);
    printf("\n");
    CUDACHECK(hipFree(d_send));
    CUDACHECK(hipFree(d_recv));
    CUDACHECK(hipStreamDestroy(stream));
    NCCLCHECK(ncclCommDestroy(comm));
    MPI_Finalize();
    return 0;
}
// hipcc sendrecv_compute_overlap.cu -o sendrecv_compute_overlap -I /opt/mpi/include -L /opt/mpi/lib/ -lmpi -L /opt/dtk/lib/ -lrccl
// mpirun -np 8 --allow-run-as-root --oversubscribe --quiet ./sendrecv_compute_overlap
#include "hip/hip_runtime.h"
#include <cstdio>
#include <cstdlib>
#include <mpi.h>
#include <rccl.h>
#include <hip/hip_runtime.h>
#define MSG_SIZE 1024 * 1024 // 1M int = 4MB
#define CUDACHECK(cmd) do { \
    hipError_t e = cmd; \
    if (e != hipSuccess) { \
        printf("HIP error %s:%d: '%s'\n", __FILE__, __LINE__, hipGetErrorString(e)); \
        exit(EXIT_FAILURE); \
    } \
} while(0)
#define NCCLCHECK(cmd) do { \
    ncclResult_t r = cmd; \
    if (r != ncclSuccess) { \
        printf("NCCL error %s:%d: '%s'\n", __FILE__, __LINE__, ncclGetErrorString(r)); \
        exit(EXIT_FAILURE); \
    } \
} while(0)
__global__ void compute_kernel(int* data, int size) {
    int idx = threadIdx.x + blockIdx.x * blockDim.x;
    if (idx < size) {
        data[idx] = data[idx] * 2 + 1;
    }
}
int main(int argc, char* argv[]) {
    int rank, size;
    ncclUniqueId id;
    ncclComm_t comm;
    hipStream_t stream_comm, stream_comp;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    if (size < 2) {
        if (rank == 0) printf("This demo needs at least 2 processes.\n");
        MPI_Finalize();
        return 0;
    }
    CUDACHECK(hipSetDevice(rank));
    if (rank == 0)
        NCCLCHECK(ncclGetUniqueId(&id));
    MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
    NCCLCHECK(ncclCommInitRank(&comm, size, id, rank));
    CUDACHECK(hipStreamCreate(&stream_comm));
    CUDACHECK(hipStreamCreate(&stream_comp));
    int* d_buf;
    CUDACHECK(hipMalloc(&d_buf, MSG_SIZE * sizeof(int)));
    // Initialize data (only rank 0 holds meaningful data)
    if (rank == 0) {
        int* h_data = new int[MSG_SIZE];
        for (int i = 0; i < MSG_SIZE; ++i)
            h_data[i] = i;
        CUDACHECK(hipMemcpy(d_buf, h_data, MSG_SIZE * sizeof(int), hipMemcpyHostToDevice));
        delete[] h_data;
    }
    // Create HIP events for timing and cross-stream dependencies
    hipEvent_t start, stop, comp_done, comm_done;
    CUDACHECK(hipEventCreate(&start));
    CUDACHECK(hipEventCreate(&stop));
    CUDACHECK(hipEventCreate(&comp_done));
    CUDACHECK(hipEventCreate(&comm_done));
    CUDACHECK(hipEventRecord(start));
    if (rank == 0) {
        // Launch compute (a simulated workload) on stream_comp, then send on stream_comm
        compute_kernel<<<(MSG_SIZE + 255) / 256, 256, 0, stream_comp>>>(d_buf, MSG_SIZE);
        CUDACHECK(hipEventRecord(comp_done, stream_comp)); // record when compute completes
        CUDACHECK(hipStreamWaitEvent(stream_comm, comp_done, 0)); // send waits on compute
        NCCLCHECK(ncclSend(d_buf, MSG_SIZE, ncclInt, 1, comm, stream_comm));
    } else if (rank == 1) {
        // Receive on stream_comm, then run local compute on the received data
        NCCLCHECK(ncclRecv(d_buf, MSG_SIZE, ncclInt, 0, comm, stream_comm));
        CUDACHECK(hipEventRecord(comm_done, stream_comm)); // record when the receive completes
        CUDACHECK(hipStreamWaitEvent(stream_comp, comm_done, 0)); // compute waits on receive
        compute_kernel<<<(MSG_SIZE + 255) / 256, 256, 0, stream_comp>>>(d_buf, MSG_SIZE);
    } // ranks >= 2 enqueue no work
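    // The cross-stream dependency is enqueued on the device by hipStreamWaitEvent,
    // so neither rank blocks on the host. Data check: rank 0 computes 2*i + 1 and
    // then sends; rank 1 receives and computes again, giving 2*(2*i + 1) + 1 = 4*i + 3,
    // so the first five values printed below should be 3 7 11 15 19.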
    // Synchronize both streams, then stop the timer
    CUDACHECK(hipStreamSynchronize(stream_comm));
    CUDACHECK(hipStreamSynchronize(stream_comp));
    CUDACHECK(hipEventRecord(stop));
    CUDACHECK(hipEventSynchronize(stop));
    float ms;
    CUDACHECK(hipEventElapsedTime(&ms, start, stop));
    printf("Rank %d: Total time = %.2f ms\n", rank, ms);
    // Inspect a few of the received, post-compute values
    if (rank == 1) {
        int* h_out = new int[5];
        CUDACHECK(hipMemcpy(h_out, d_buf, 5 * sizeof(int), hipMemcpyDeviceToHost));
        printf("Rank 1: first 5 received and computed values: ");
        for (int i = 0; i < 5; ++i) printf("%d ", h_out[i]);
        printf("\n");
        delete[] h_out;
    }
    // Clean up
    CUDACHECK(hipEventDestroy(start));
    CUDACHECK(hipEventDestroy(stop));
    CUDACHECK(hipEventDestroy(comp_done));
    CUDACHECK(hipEventDestroy(comm_done));
    CUDACHECK(hipFree(d_buf));
    CUDACHECK(hipStreamDestroy(stream_comm));
    CUDACHECK(hipStreamDestroy(stream_comp));
    NCCLCHECK(ncclCommDestroy(comm));
    MPI_Finalize();
    return 0;
}