Commit 571a75b5 authored by lishen

Finish building the nodes for the whole network, and add GPU-to-GPU physical path search

parent 379c4128
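The GPU-to-GPU path search added here is driven from sccl_init (see the hardware.cpp hunk below, where a topology::graph::Graph is constructed and calculateCommunicationPaths(&bootstrap_comm) is invoked). A minimal MPI driver, condensed from the 3_mpi_init_mpi_init_step2_graph example in this commit, looks roughly like this:

#include <stdio.h>
#include <mpi.h>
#include "bootstrap.h"
#include "hardware.h"
using namespace sccl;
int main(int argc, char* argv[]) {
    MPI_Init(&argc, &argv);
    int rank, nRanks;
    MPI_Comm_size(MPI_COMM_WORLD, &nRanks);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    // Rank 0 creates the unique id (the root's socket address) and broadcasts it to everyone.
    typedef sccl::hardware::topology::bootstrap::scclUniqueId scclUniqueId;
    scclUniqueId unique_id;
    if(rank == 0) {
        SCCLCHECK(sccl::hardware::scclGetUniqueId(&unique_id));
    }
    MPI_Bcast(&unique_id, sizeof(scclUniqueId), MPI_BYTE, 0, MPI_COMM_WORLD);
    // sccl_init bootstraps all ranks and then runs Graph::calculateCommunicationPaths.
    SCCLCHECK(sccl::hardware::sccl_init(&unique_id, rank, nRanks));
    SCCLCHECK(sccl::hardware::sccl_finalize());
    MPI_Finalize();
}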
......@@ -27,40 +27,6 @@ using namespace sccl;
typedef class sccl::hardware::net::ipc_socket::scclIpcSocket scclIpcSocket_t;
int ipcSendRecvFd_nrank2(int argc, char* argv[]) {
MPI_Init(&argc, &argv);
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
int dst_hash = 12345;
scclIpcSocket_t ipcsocket(rank, dst_hash);
if(rank == 0) {
// Process 0: open the file and send its file descriptor
int fd = open("testfile.txt", O_RDONLY);
if(fd < 0) {
perror("Failed to open file");
MPI_Abort(MPI_COMM_WORLD, 1);
}
ipcsocket.scclIpcSocketSendFd(fd, 1, 12345); // dst_hash is assumed to be 12345
close(fd);
} else if(rank == 1) {
// Process 1: receive the file descriptor and read the file contents
int fd;
ipcsocket.scclIpcSocketRecvFd(&fd);
char buffer[256];
ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
if(n > 0) {
buffer[n] = '\0';
printf("Process %d received: %s\n", rank, buffer);
}
close(fd);
}
MPI_Finalize();
return 0;
}
int main(int argc, char* argv[]) {
MPI_Init(&argc, &argv);
int rank, size;
......@@ -68,39 +34,58 @@ int main(int argc, char* argv[]) {
MPI_Comm_size(MPI_COMM_WORLD, &size);
int dst_hash = 12345;
scclIpcSocket_t ipcsocket(rank, dst_hash);
scclIpcSocket_t ipcsocket(rank, size, dst_hash);
int fd;
if(rank == 0) {
// Process 0: open the file and send the file descriptor to every other process
int fd = open("testfile.txt", O_RDONLY);
fd = open("testfile.txt", O_RDONLY);
if(fd < 0) {
perror("Failed to open file");
MPI_Abort(MPI_COMM_WORLD, 1);
}
for(int i = 1; i < size; ++i) {
if(ipcsocket.scclIpcSocketSendFd(fd, i, dst_hash) != scclSuccess) {
if(ipcsocket.scclIpcSocketSendFd(fd, i) != scclSuccess) {
perror("Failed to send file descriptor");
close(fd);
MPI_Abort(MPI_COMM_WORLD, 1);
}
lseek(fd, 0, SEEK_SET);
}
close(fd);
} else {
// Other processes: receive the file descriptor and read the file contents
int fd;
if(ipcsocket.scclIpcSocketRecvFd(&fd) < 0) {
perror("Failed to receive file descriptor");
MPI_Abort(MPI_COMM_WORLD, 1);
}
// lseek(fd, 0, SEEK_SET); // reset the file offset to the beginning of the file
printf("11 rank %d received fd %d\n", rank, fd);
char buffer[256];
struct pollfd pfd;
pfd.fd = fd;
pfd.events = POLLIN;
int pollResult = poll(&pfd, 1, -1); // wait indefinitely
printf("pollResult=%d, rank=%d\n", pollResult, rank);
ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
if(n > 0) {
buffer[n] = '\0';
printf("Process %d received: %s\n", rank, buffer);
lseek(fd, 0, SEEK_SET); // reset the file offset to the beginning of the file
}
close(fd);
printf("n=%zd, rank=%d\n", n, rank);
/////////////////////
// Note: the fd is contended; only one process can read from it at a time
/////////////////////
}
// if(fd >= 0) {
// close(fd);
// }
MPI_Finalize();
return 0;
}
......
......@@ -13,63 +13,35 @@ using namespace sccl;
typedef class sccl::hardware::net::ipc_socket::scclIpcSocket scclIpcSocket_t;
template <typename T>
void send_data(T* ipcsocket, const void* data, size_t dataLen, int dst_rank) {
if(ipcsocket->scclIpcSocketSendData(data, dataLen, dst_rank) != scclSuccess) {
perror("Failed to send data");
MPI_Abort(MPI_COMM_WORLD, 1);
}
}
template <typename T>
void recv_data(T* ipcsocket, void* buffer, size_t bufferLen, size_t* receivedLen) {
if(ipcsocket->scclIpcSocketRecvData(buffer, bufferLen, receivedLen) != scclSuccess) {
perror("Failed to receive data");
MPI_Abort(MPI_COMM_WORLD, 1);
}
}
template <typename T>
int test_allgather_ver1(T* ipcsocket, int rank, int size) {
int sendDataLen = 256;
std::vector<char> sendData(sendDataLen);
std::vector<char> recvData(size * sendDataLen);
size_t receivedLen;
int test_allgather(T* ipcsocket, int rank, int size, int dataLen = 64 * 1024, int num_iterations = 1) {
std::vector<char> sendData(dataLen);
std::vector<char> recvData(size * dataLen);
// Fill the send buffer
snprintf(sendData.data(), sendData.size(), "Data from process %d", rank);
printf("test_allgather dataLen=%d, sendData.size()=%zu\n", dataLen, sendData.size());
auto pthpool = ThreadPool(size * 2);
std::vector<double> elapsed_times; // stores the elapsed time of each run
// Send data to every other process
for(int i = 0; i < size; ++i) {
if(i != rank) {
auto task_send = std::bind(send_data<scclIpcSocket_t>, ipcsocket, sendData.data(), sendData.size(), i);
pthpool.enqueue(task_send);
// Start timing
auto start = std::chrono::high_resolution_clock::now();
auto task_recv = std::bind(recv_data<scclIpcSocket_t>, ipcsocket, recvData.data() + i * sendDataLen, sendDataLen, &receivedLen);
pthpool.enqueue(task_recv);
}
// Invoke the Allgather routine
for(int i = 0; i < num_iterations; ++i) {
SCCLCHECK(ipcsocket->scclIpcSocketAllgather(sendData.data(), recvData.data(), dataLen));
}
printf("sendData.size()=%d, receivedLen=%d\n", sendDataLen, int(receivedLen));
// Print the received data
for(int i = 0; i < size; ++i) {
printf("Process %d received from process %d: %s\n", rank, i, recvData.data() + i * 256);
}
// Stop timing
auto end = std::chrono::high_resolution_clock::now();
return 0;
}
// All processes wait here until every process has reached this point
MPI_Barrier(MPI_COMM_WORLD);
template <typename T>
int test_allgather_ver2(T* ipcsocket, int rank, int size) {
int sendDataLen = 256;
std::vector<char> sendData(sendDataLen);
std::vector<char> recvData(size * sendDataLen);
// Compute and store the timing result for each process
std::chrono::duration<double> elapsed = end - start;
// Fill the send buffer
snprintf(sendData.data(), sendData.size(), "Data from process %d", rank);
SCCLCHECK(ipcsocket->scclIpcSocketAllgatherSync(sendData.data(), recvData.data(), sendData.size(), /*wait*/ true));
auto average_time = elapsed.count() * 1e6 / num_iterations; // convert to microseconds
printf("rank %d: Average time for Allgather over %d iterations: %f us.\n", rank, num_iterations, average_time);
// Print the received data
for(int i = 0; i < size; ++i) {
......@@ -80,67 +52,96 @@ int test_allgather_ver2(T* ipcsocket, int rank, int size) {
}
template <typename T>
int test_allgather_ver3(T* ipcsocket, int rank, int size) {
int sendDataLen = 256;
std::vector<char> sendData(sendDataLen);
std::vector<char> recvData(size * sendDataLen);
// Fill the send buffer
snprintf(sendData.data(), sendData.size(), "Data from process %d", rank);
SCCLCHECK(ipcsocket->scclIpcSocketAllgather(sendData.data(), recvData.data(), sendData.size()));
// Print the received data
for(int i = 0; i < size; ++i) {
printf("rank %d received from process %d: %s\n", rank, i, recvData.data() + i * sendData.size());
int test_broadcast(T* ipcsocket, int rank, int size, int dataLen = 64 * 1024, int num_iterations = 1) {
std::vector<char> data(dataLen);
int root = 0; // assume rank 0 is the root process
if(rank == root) {
// Only the root process fills the send buffer
snprintf(data.data(), data.size(), "Data from root process %d", rank);
}
printf("rank=%d, data.size()=%zu\n", rank, data.size());
return 0;
}
std::vector<double> elapsed_times; // stores the elapsed time of each run
template <typename T>
int test_broadcast_ver1(T* ipcsocket, int rank, int size) {
int sendDataLen = 256;
std::vector<char> sendData(sendDataLen);
std::vector<char> recvData(sendDataLen);
int root = 0; // assume rank 0 is the root process
// Start timing
auto start = std::chrono::high_resolution_clock::now();
if(rank == root) {
// Only the root process fills the send buffer
snprintf(sendData.data(), sendData.size(), "Data from root process %d", rank);
for(int i = 0; i < num_iterations; ++i) {
SCCLCHECK(ipcsocket->scclIpcSocketBroadcast(data.data(), data.size(), root));
}
SCCLCHECK(ipcsocket->scclIpcSocketBroadcast(sendData.data(), recvData.data(), sendData.size(), root, /*wait*/ true));
// Stop timing
auto end = std::chrono::high_resolution_clock::now();
// Print the received data
printf("rank %d received: %s\n", rank, recvData.data());
// All processes wait here until every process has reached this point
MPI_Barrier(MPI_COMM_WORLD);
// Compute and store the timing result for each process
std::chrono::duration<double> elapsed = end - start;
auto average_time = elapsed.count() * 1e6 / num_iterations; // convert to microseconds
printf("rank %d: data=%s, Average time for scclIpcSocketBroadcast over %d iterations: %f us.\n", rank, (char*)(data.data()), num_iterations, average_time);
return 0;
}
int main(int argc, char* argv[]) {
MPI_Init(&argc, &argv);
int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
int dst_hash = 12345;
int dst_hash = 654321;
scclIpcSocket_t* ipcsocket = new scclIpcSocket_t(rank, size, dst_hash);
// test_allgather_ver1(ipcsocket, rank, size);
// test_allgather_ver2(ipcsocket, rank, size);
// test_allgather_ver3(ipcsocket, rank, size);
test_broadcast_ver1(ipcsocket, rank, size);
// Default parameters
std::string test_type = "allgather";
int dataLen = 64 * 1024;
int num_iterations = 1;
// Parse command-line arguments
for(int i = 1; i < argc; ++i) {
std::istringstream iss(argv[i]);
std::string arg;
iss >> arg;
if(arg == "--test-type") {
if(++i < argc) {
test_type = argv[i];
}
} else if(arg == "--data-len") {
if(++i < argc) {
iss.clear();
iss.str(argv[i]);
iss >> dataLen;
}
} else if(arg == "--num-iterations") {
if(++i < argc) {
iss.clear();
iss.str(argv[i]);
iss >> num_iterations;
}
}
}
std::this_thread::sleep_for(std::chrono::seconds(10));
// while(!ipcsocket->getPthreadPool()->allTasksCompleted()) {}
// printf("delete ipcsocket... rank=%d\n", rank);
if(test_type == "allgather") {
test_allgather(ipcsocket, rank, size, dataLen, num_iterations);
} else if(test_type == "broadcast") {
test_broadcast(ipcsocket, rank, size, dataLen, num_iterations);
} else {
if(rank == 0) {
std::cerr << "Unknown test type: " << test_type << std::endl;
}
}
delete(ipcsocket);
delete ipcsocket;
MPI_Finalize();
return 0;
}
/*
Single-node run
SCCL_DEBUG_LEVEL=ABORT SCCL_DEBUG_SUBSYS=BOOTSTRAP mpirun --allow-run-as-root -np 8 3_socket_mpi_data
SCCL_DEBUG_LEVEL=WARN SCCL_DEBUG_SUBSYS=GRAPH mpirun --allow-run-as-root -np 8 3_socket_mpi_data
SCCL_DEBUG_LEVEL=WARN SCCL_DEBUG_SUBSYS=GRAPH mpirun --allow-run-as-root -np 4 3_socket_mpi_data
*/
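/*
Illustrative invocations of the same binary using the command-line options parsed in main above
(--test-type, --data-len, --num-iterations); the process counts and sizes below are arbitrary examples:
SCCL_DEBUG_LEVEL=WARN SCCL_DEBUG_SUBSYS=GRAPH mpirun --allow-run-as-root -np 4 3_socket_mpi_data --test-type broadcast --data-len 1048576 --num-iterations 100
SCCL_DEBUG_LEVEL=WARN SCCL_DEBUG_SUBSYS=GRAPH mpirun --allow-run-as-root -np 8 3_socket_mpi_data --test-type allgather --data-len 65536 --num-iterations 10
*/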
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <vector> // for std::vector
#include <thread> // for std::this_thread::sleep_for
#include "mpi.h"
#include "net.h"
#include "ipc_socket.h"
#include "thread_pool.h"
using namespace sccl;
typedef class sccl::hardware::net::ipc_socket::scclIpcSocket scclIpcSocket_t;
template <typename T>
void send_data(T* ipcsocket, const void* data, size_t dataLen, int dst_rank, uint64_t dst_hash) {
if(ipcsocket->scclIpcSocketSendData(data, dataLen, dst_rank, dst_hash) != scclSuccess) {
perror("Failed to send data");
MPI_Abort(MPI_COMM_WORLD, 1);
}
}
template <typename T>
void recv_data(T* ipcsocket, void* buffer, size_t bufferLen, size_t* receivedLen) {
if(ipcsocket->scclIpcSocketRecvData(buffer, bufferLen, receivedLen) != scclSuccess) {
perror("Failed to receive data");
MPI_Abort(MPI_COMM_WORLD, 1);
}
}
int main(int argc, char* argv[]) {
MPI_Init(&argc, &argv);
int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
int dst_hash = 12345;
scclIpcSocket_t ipcsocket(rank, dst_hash);
int sendDataLen = 256;
std::vector<char> sendData(sendDataLen);
std::vector<char> recvData(size * sendDataLen);
size_t receivedLen;
// Fill the send buffer
snprintf(sendData.data(), sendData.size(), "Data from process %d", rank);
auto pthpool = ThreadPool(size * 2);
// Send data to every other process
for(int i = 0; i < size; ++i) {
if(i != rank) {
auto task_send = std::bind(send_data<scclIpcSocket_t>, &ipcsocket, sendData.data(), sendData.size(), i, dst_hash);
pthpool.enqueue(task_send);
auto task_recv = std::bind(recv_data<scclIpcSocket_t>, &ipcsocket, recvData.data() + i * sendDataLen, sendDataLen, &receivedLen);
pthpool.enqueue(task_recv);
}
}
printf("sendData.size()=%d, receivedLen=%d\n", sendDataLen, int(receivedLen));
std::this_thread::sleep_for(std::chrono::seconds(2));
// Print the received data
for(int i = 0; i < size; ++i) {
printf("Process %d received from process %d: %s\n", rank, i, recvData.data() + i * 256);
}
MPI_Finalize();
return 0;
}
/*
Single-node run
SCCL_DEBUG_LEVEL=ABORT SCCL_DEBUG_SUBSYS=BOOTSTRAP mpirun --allow-run-as-root -np 8 3_socket_mpi_data
*/
#include <sys/sysinfo.h>
#include <iostream>
int main() {
struct sysinfo info;
if(sysinfo(&info) == 0) {
std::cout << "Uptime: " << info.uptime << std::endl;
std::cout << "Total RAM: " << info.totalram << std::endl;
std::cout << "Free RAM: " << info.freeram << std::endl;
// print more fields as needed...
} else {
std::cerr << "Failed to get system information." << std::endl;
}
return 0;
}
hipcc /public/home/lishen/Code/rocSHMEM/SCCL_v1/examples/2_topo/1_demo_rocm/test_rocm_smi.cpp \
hipcc 1_test_rocm_smi.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/rocm_smi_wrap.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/topo_utils.cpp \
-o test_topo \
-o 1_test_rocm_smi \
-std=c++17 -g -O3 -fopenmp -D__HIP_PLATFORM_HCC__ \
-I ./ -I /usr/include -I /opt/dtk/include \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/include \
......
hipcc 2_test_pci_info.cpp \
-o 2_test_pci_info \
-std=c++17 -g -O3 -fopenmp -D__HIP_PLATFORM_HCC__ \
-I ./ -I /usr/include -I /opt/dtk/include \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/include \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/topo \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/utils/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/ \
-L /usr/lib/x86_64-linux-gnu \
-L /usr/lib/ \
-lamdhip64 -lrocm_smi64
\ No newline at end of file
......@@ -11,9 +11,6 @@ using namespace sccl;
int main(int argc, char* argv[]) {
int rank, nranks;
int tag1, src, dst, cnt;
MPI_Status status;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &nranks);
......@@ -21,28 +18,28 @@ int main(int argc, char* argv[]) {
printf("rank=%d, nranks=%d\n", rank, nranks);
// ----------------------------------------------------------------------- //
// // ----------------------------------------------------------------------- //
INFO(SCCL_LOG_TOPO, "Bootstrap ...\n");
struct scclRankInfo* rank_info;
struct sccl::hardware::topology::bootstrap::scclBootstrapComm* comm;
// INFO(SCCL_LOG_TOPO, "Bootstrap ...\n");
// scclRankInfo_t* rank_info;
// struct sccl::hardware::topology::bootstrap::BootstrapComm* comm;
SCCLCHECK(scclCalloc(&rank_info, 1));
SCCLCHECK(scclCalloc(&comm, 1));
// SCCLCHECK(scclCalloc(&rank_info, 1));
// SCCLCHECK(scclCalloc(&comm, 1));
rank_info->rank = rank;
rank_info->nRanks = nranks;
rank_info->localRanks = 2;
rank_info->hipDev = rank % rank_info->localRanks;
// rank_info->rank = rank;
// rank_info->nRanks = nranks;
// rank_info->localRanks = 2;
// rank_info->hipDev = rank % rank_info->localRanks;
auto sccl_bootstrap = new sccl::hardware::topology::bootstrap::scclBootstrap(rank_info, comm);
SCCLCHECK(sccl_bootstrap->bootstrapInitCheck());
// auto sccl_bootstrap = new sccl::hardware::topology::bootstrap::Bootstrap(rank_info, comm);
// SCCLCHECK(sccl_bootstrap->bootstrapInitCheck());
sccl::hardware::topology::bootstrap::printUniqueInfo(comm->unique_info);
// sccl::hardware::topology::bootstrap::printUniqueInfo(comm->unique_info);
int cuda_id;
HIPCHECK(hipGetDevice(&cuda_id));
printf("rank=%d, cuda_id=%d\n", rank, cuda_id);
// int cuda_id;
// HIPCHECK(hipGetDevice(&cuda_id));
// printf("rank=%d, cuda_id=%d\n", rank, cuda_id);
MPI_Finalize();
}
......
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <mpi.h>
#include "bootstrap.h"
#include "hardware.h"
using namespace sccl;
typedef sccl::hardware::topology::bootstrap::scclUniqueId scclUniqueId;
typedef sccl::hardware::topology::bootstrap::BootstrapHandle_t BootstrapHandle_t;
typedef sccl::hardware::topology::bootstrap::Bootstrap Bootstrap;
// Global variables
struct sccl::hardware::topology::bootstrap::BootstrapComm bootstrap_comm;
scclResult_t sccl_init_step1(const scclUniqueId* unique_id, int rank, int nRanks) {
// -------------------------- 1. Obtain the address info of rank 0 ----------------------------------- //
auto root_handle = reinterpret_cast<const BootstrapHandle_t*>(unique_id);
EQCHECK(root_handle->magic, 0); // check that the handle has been updated
// -------------------------- 2. Initialize and gather node info from all nodes ----------------------------------- //
auto sccl_bootstrap = std::make_unique<Bootstrap>(root_handle, rank, nRanks);
SCCLCHECK(sccl_bootstrap->init(&bootstrap_comm));
return scclSuccess;
}
constexpr int topoNodeMaxNeighbors = 16;
typedef struct topoNode {
uint64_t id; // graph-node id
int type; // graph-node type
int numaId; // NUMA node id
char busIdStr[17] = ""; // bus-id string, e.g. "00000000:00:00.0"
int speed; // link speed
int width; // link width (bandwidth)
char cpuAffinity[36] = ""; // CPU affinity mask
std::array<uint64_t, topoNodeMaxNeighbors> neighbors; // neighboring graph nodes
size_t neighborCount; // number of neighboring graph nodes
} topoNode_t;
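// Illustrative population of a topoNode_t (all values below are made up; the type code is an assumption):
// topoNode_t gpu{};
// gpu.id = 0x1; gpu.type = 1 /* assumed GPU type code */; gpu.numaId = 0;
// strncpy(gpu.busIdStr, "00000000:03:00.0", sizeof(gpu.busIdStr) - 1);
// gpu.neighbors[0] = 0x2; gpu.neighborCount = 1;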
int main(int argc, char* argv[]) {
// -------------------------- 1. Start MPI ----------------------------------- //
MPI_Init(&argc, &argv);
int rank, nRanks;
MPI_Comm_size(MPI_COMM_WORLD, &nRanks);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
printf("rank=%d, nRanks=%d\n", rank, nRanks);
int nLocalRanks = 2;
BootstrapHandle_t uqid;
printf("uqid size=%lu\n", sizeof(uqid));
sccl::hardware::topology::bootstrap::scclRankInfo_t rankinfo;
sccl::hardware::topology::bootstrap::scclNodeInfo_t nodeinfo(nLocalRanks);
topoNode_t topo_node;
printf("rankinfo size=%lu\n", sizeof(rankinfo));
printf("rankinfo cpu size=%lu\n", sizeof(rankinfo.cpu));
printf("rankinfo gpu size=%lu\n", sizeof(rankinfo.gpu));
printf("rankinfo net size=%lu\n", sizeof(rankinfo.net));
printf("nodeinfo size=%lu, stu size=%d\n", sizeof(nodeinfo), nodeinfo.size);
printf("topo_node size=%lu\n", sizeof(topo_node));
// -------------------------- 2. Obtain the node unique_id (mainly the socket address) ----------------------------------- //
scclUniqueId unique_id;
if(rank == 0) {
SCCLCHECK(sccl::hardware::scclGetUniqueId(&unique_id));
}
MPI_Bcast(&unique_id, sizeof(scclUniqueId), MPI_BYTE, 0, MPI_COMM_WORLD);
// -------------------------- 3. Initialize from the gathered unique_id ----------------------------------- //
sccl_init_step1(&unique_id, rank, nRanks);
int cuda_id;
HIPCHECK(hipGetDevice(&cuda_id));
printf("rank=%d, cuda_id=%d\n", rank, cuda_id);
MPI_Barrier(MPI_COMM_WORLD);
SCCLCHECK(sccl::hardware::sccl_finalize());
MPI_Finalize();
}
/*
Single-node run
SCCL_DEBUG_LEVEL=ABORT mpirun --allow-run-as-root -np 4 2_mpi_init_mpi_init_step1_bootstrap
SCCL_DEBUG_LEVEL=INFO SCCL_DEBUG_SUBSYS=ALL mpirun --allow-run-as-root -np 2 2_mpi_init_mpi_init_step1_bootstrap
Multi-node run
SCCL_DEBUG_LEVEL=WARN SCCL_DEBUG_SUBSYS=BOOTSTRAP mpirun --allow-run-as-root --hostfile hostfile2 -np 4 ./2_mpi_init_mpi_init_step1_bootstrap
SCCL_DEBUG_LEVEL=WARN SCCL_DEBUG_SUBSYS=BOOTSTRAP mpirun --allow-run-as-root --hostfile hostfile -np 16 ./2_mpi_init_mpi_init_step1_bootstrap
*/
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <mpi.h>
#include "bootstrap.h"
#include "hardware.h"
using namespace sccl;
int main(int argc, char* argv[]) {
// -------------------------- 1. Start MPI ----------------------------------- //
MPI_Init(&argc, &argv);
int rank, nRanks;
MPI_Comm_size(MPI_COMM_WORLD, &nRanks);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
printf("rank=%d, nRanks=%d\n", rank, nRanks);
sccl::hardware::topology::bootstrap::BootstrapHandle_t uqid;
sccl::hardware::topology::bootstrap::scclRankInfo_t rankinfo;
sccl::hardware::topology::bootstrap::scclNodeInfo_t nodeinfo(/*nLocalRanks*/ 2);
printf("rankinfo size=%lu\n", sizeof(rankinfo));
printf("rankinfo cpu size=%lu\n", sizeof(rankinfo.cpu));
printf("rankinfo gpu size=%lu\n", sizeof(rankinfo.gpu));
printf("rankinfo net size=%lu\n", sizeof(rankinfo.net));
printf("nodeinfo size=%lu, stu size=%d\n", sizeof(nodeinfo), nodeinfo.totalByteSize);
// topoNode_t topo_node;
// printf("topo_node size=%lu\n", sizeof(topo_node));
// -------------------------- 2. Obtain the node unique_id (mainly the socket address) ----------------------------------- //
typedef sccl::hardware::topology::bootstrap::scclUniqueId scclUniqueId;
scclUniqueId unique_id;
if(rank == 0) {
SCCLCHECK(sccl::hardware::scclGetUniqueId(&unique_id));
}
MPI_Bcast(&unique_id, sizeof(scclUniqueId), MPI_BYTE, 0, MPI_COMM_WORLD);
// -------------------------- 3. Initialize from the gathered unique_id ----------------------------------- //
SCCLCHECK(sccl::hardware::sccl_init(&unique_id, rank, nRanks));
// int cuda_id;
// HIPCHECK(hipGetDevice(&cuda_id));
// printf("rank=%d, cuda_id=%d\n", rank, cuda_id);
// MPI_Barrier(MPI_COMM_WORLD);
SCCLCHECK(sccl::hardware::sccl_finalize());
MPI_Finalize();
}
/*
Single-node run
SCCL_DEBUG_LEVEL=WARN mpirun --allow-run-as-root -np 4 3_mpi_init_mpi_init_step2_graph
SCCL_DEBUG_LEVEL=WARN SCCL_DEBUG_SUBSYS=ALL mpirun --allow-run-as-root -np 2 3_mpi_init_mpi_init_step2_graph
Multi-node run
SCCL_DEBUG_LEVEL=WARN SCCL_DEBUG_SUBSYS=GRAPH mpirun --allow-run-as-root --hostfile hostfile2 -np 2 ./3_mpi_init_mpi_init_step2_graph
SCCL_DEBUG_LEVEL=WARN SCCL_DEBUG_SUBSYS=GRAPH mpirun --allow-run-as-root --hostfile hostfile2 -np 4 ./3_mpi_init_mpi_init_step2_graph
SCCL_DEBUG_LEVEL=WARN SCCL_DEBUG_SUBSYS=BOOTSTRAP mpirun --allow-run-as-root --hostfile hostfile -np 16 ./3_mpi_init_mpi_init_step2_graph
*/
......@@ -10,6 +10,8 @@ hipcc ./1_mpi_init.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net_utils.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/rocm_wrap.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/mpi/mpiwrap.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/mpi/mpisymbols.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/bootstrap_net.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/bootstrap_utils.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/rocm_smi_wrap.cpp \
......@@ -31,6 +33,8 @@ hipcc ./1_mpi_init.cpp \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/ipc_socket/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/mpi \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/mpi \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/ \
-L /public/home/lishen/Code/rocSHMEM/SCCL_v1 \
......
hipcc ./2_mpi_init_mpi_init_step1_bootstrap.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/hardware_utils.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net_ib/ibvsymbols.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net_ib/ibvwrap.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net_ib/net_ib.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net_socket/net_socket.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net_socket/socket.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/ipc_socket/ipc_socket.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net_utils.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/rocm_wrap.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/bootstrap_net.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/bootstrap_utils.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/rocm_smi_wrap.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/bootstrap.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/physical_links.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/topo_utils.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/utils/archinfo.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/utils/param.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/utils/utils.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/hardware.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/utils/thread_pool.cpp \
-o 2_mpi_init_mpi_init_step1_bootstrap \
-std=c++17 -g -O3 -fopenmp -DROC_SHMEM -D__HIP_PLATFORM_HCC__ -Wno-return-type \
-I ./ -I /usr/include -I /opt/dtk/include \
-I /public/home/lishen/Code/rocSHMEM/3rd_party/install/ompi/include/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/utils/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/include/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net_ib/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net_socket/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/ipc_socket/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/graph/ \
-L /public/home/lishen/Code/rocSHMEM/SCCL_v1 \
-L /opt/dtk/lib -lamdhip64 -lrocm-core -lrocm_smi64 -pthread \
-L /usr/lib/x86_64-linux-gnu -libverbs -lrdmacm \
-L /public/home/lishen/Code/rocSHMEM/3rd_party/install/ompi/lib -lmpi
# # \
# # -L /public/home/lishen/Code/rocSHMEM/3rd_party/install/ucx/lib -lucs -lucp -luct -lucm
# # export HSA_FORCE_FINE_GRAIN_PCIE="1"
# # export iommu=pt
# hipcc ./2_mpi_init_mpi_init_step1_bootstrap.cpp \
# -o 2_mpi_init_mpi_init_step1_bootstrap \
# -std=c++17 -g -O3 -fopenmp -DROC_SHMEM -D__HIP_PLATFORM_HCC__ -Wno-return-type \
# -I ./ -I /usr/include -I /opt/dtk/include \
# -I /public/home/lishen/Code/rocSHMEM/3rd_party/install/ompi/include/ \
# -L /usr/lib/x86_64-linux-gnu -libverbs -lrdmacm \
# -L /public/home/lishen/Code/rocSHMEM/3rd_party/install/ompi/lib -lmpi \
# -L /opt/dtk/lib -lamdhip64 -lrocm-core -lrocm_smi64 -pthread
hipcc ./3_mpi_init_mpi_init_step2_graph.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/hardware_utils.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net_ib/ibvsymbols.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net_ib/ibvwrap.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net_ib/net_ib.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net_socket/net_socket.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net_socket/socket.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/ipc_socket/ipc_socket.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net_utils.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/rocm_wrap.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/bootstrap_net.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/bootstrap_utils.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/rocm_smi_wrap.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/bootstrap.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/physical_links.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/topo_utils.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/utils/archinfo.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/utils/param.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/utils/utils.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/hardware.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/utils/thread_pool.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/graph/graph.cpp \
/public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/graph/paths.cpp \
-o 3_mpi_init_mpi_init_step2_graph \
-std=c++17 -g -O3 -fopenmp -DROC_SHMEM -D__HIP_PLATFORM_HCC__ -Wno-return-type \
-I ./ -I /usr/include -I /opt/dtk/include \
-I /public/home/lishen/Code/rocSHMEM/3rd_party/install/ompi/include/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/utils/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/include/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net_ib/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/net_socket/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/ipc_socket/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/net/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/bootstrap/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/ \
-I /public/home/lishen/Code/rocSHMEM/SCCL_v1/src/hardware/topology/graph/ \
-L /public/home/lishen/Code/rocSHMEM/SCCL_v1 \
-L /opt/dtk/lib -lamdhip64 -lrocm-core -lrocm_smi64 -pthread \
-L /usr/lib/x86_64-linux-gnu -libverbs -lrdmacm \
-L /public/home/lishen/Code/rocSHMEM/3rd_party/install/ompi/lib -lmpi
# # \
# # -L /public/home/lishen/Code/rocSHMEM/3rd_party/install/ucx/lib -lucs -lucp -luct -lucm
# # export HSA_FORCE_FINE_GRAIN_PCIE="1"
# # export iommu=pt
# hipcc ./3_mpi_init_mpi_init_step2_graph.cpp \
# -o 3_mpi_init_mpi_init_step2_graph \
# -std=c++17 -g -O3 -fopenmp -DROC_SHMEM -D__HIP_PLATFORM_HCC__ -Wno-return-type \
# -I ./ -I /usr/include -I /opt/dtk/include \
# -I /public/home/lishen/Code/rocSHMEM/3rd_party/install/ompi/include/ \
# -L /usr/lib/x86_64-linux-gnu -libverbs -lrdmacm \
# -L /public/home/lishen/Code/rocSHMEM/3rd_party/install/ompi/lib -lmpi \
# -L /opt/dtk/lib -lamdhip64 -lrocm-core -lrocm_smi64 -pthread
node037 slots=2
node038 slots=2
\ No newline at end of file
node038 slots=2
......@@ -5,75 +5,41 @@
#include "base.h"
#include "hardware_utils.h"
#include "bootstrap.h"
#include "graph.h"
#include "hardware.h"
namespace sccl {
namespace hardware {
namespace topology {
namespace bootstrap {
// Global variable holding the info of all nodes
struct BootstrapComm bootstrap_comm;
sccl::hardware::topology::bootstrap::BootstrapComm_t bootstrap_comm;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
scclResult_t scclGetUniqueId(scclUniqueId* unique_id) {
auto handle = reinterpret_cast<struct BootstrapHandle*>(unique_id);
NEQCHECK(sizeof(struct BootstrapHandle), SCCL_UNIQUE_ID_BYTES);
SCCLCHECK(bootstrapGetUniqueId(handle));
auto handle = reinterpret_cast<BootstrapHandle_t*>(unique_id);
NEQCHECK(sizeof(BootstrapHandle_t), SCCL_UNIQUE_ID_BYTES);
SCCLCHECK(topology::bootstrap::bootstrapGetUniqueId(handle));
return scclSuccess;
}
scclResult_t sccl_init(const scclUniqueId* unique_id, int rank, int nRanks) {
// -------------------------- 1. Obtain the address info of rank 0 ----------------------------------- //
auto root_handle = reinterpret_cast<const struct BootstrapHandle*>(unique_id);
auto root_handle = reinterpret_cast<const BootstrapHandle_t*>(unique_id);
EQCHECK(root_handle->magic, 0); // check that the handle has been updated
// -------------------------- 2. Initialize and gather node info from all nodes ----------------------------------- //
auto sccl_bootstrap = std::make_unique<Bootstrap>(root_handle, rank, nRanks);
auto sccl_bootstrap = std::make_unique<topology::bootstrap::Bootstrap>(root_handle, rank, nRanks);
SCCLCHECK(sccl_bootstrap->init(&bootstrap_comm));
// // -------------------------- 3. MPI allgather to consolidate the unique_ids ----------------------------------- //
// auto unique_ids_chr = reinterpret_cast<const char*>(unique_ids);
// -------------------------- 3. Build the topology graph ----------------------------------- //
auto sccl_graph = std::make_unique<topology::graph::Graph>(rank, nRanks);
printf("init pos 2\n");
// Compute the communication paths
sccl_graph->calculateCommunicationPaths(&bootstrap_comm);
printf("init pos 3\n");
// // -------------------------- 3. MPI allgather to consolidate the unique_ids ----------------------------------- //
// std::vector<scclUniqueId> unique_id_vec(nRanks);
// MPI_Allgather(&unique_id, sizeof(scclUniqueId), MPI_BYTE, &unique_id_vec[0], sizeof(scclUniqueId), MPI_BYTE, MPI_COMM_WORLD);
// for(int i = 0; i < nRanks; ++i) {
// auto root_handle = reinterpret_cast<const struct BootstrapHandle*>(unique_ids_chr + i * sizeof(struct BootstrapHandle));
// printf("rank=%d, i=%d, unique_ids hosthash=%lu\n", root_handle->rank, i, root_handle->hostHash);
// }
// ByteSpan<struct BootstrapHandle> unique_ids_span(unique_ids_chr, nRanks * sizeof(struct BootstrapHandle));
// // -------------------------- 2. Set the basic info ----------------------------------- //
// INFO(SCCL_LOG_TOPO, "Bootstrap ...\n");
// struct scclRankInfo rank_info;
// rank_info.rank = rank;
// rank_info.nRanks = nRanks;
// // set the root_handle fields in every process
// root_handle.rank = rank_info->rank;
// root_handle.hostHash = getHostHash();
// scclSocketAddress_t localSocketAddr = sccl_bootstrap->getLocalSocketAddr();
// memcpy(&root_handle.addr, &localSocketAddr, sizeof(scclSocketAddress_t));
// #if 1
// char line[100];
// sprintf(line, "pos 55: rank=%d", rank);
// SCCLCHECK(hardware::net::printSocketAddr(&root_handle.addr, line));
// printf("root_handle.hostHash rank=%d, hash=%lu\n", rank, root_handle.hostHash);
// #endif
// // -------------------------- 3. Gather the root_handle info from all processes ----------------------------------- //
// std::vector<char> recvBuffer(nRanks * sendBuffer.size());
// SCCLCHECK(mpi::wrap_mpi_allgather(sendBuffer.data(), sendBuffer.size(), MPI_BYTE, recvBuffer.data(), sendBuffer.size(), MPI_BYTE, MPI_COMM_WORLD));
// -------------------------- 4. Set the basic info of each node ----------------------------------- //
// SCCLCHECK(sccl_bootstrap->bootstrapInit(rank_info, recvBuffer.data()));
// -------------------------- 5. Compute the topology from each node's basic info ----------------------------------- //
......@@ -84,14 +50,12 @@ scclResult_t sccl_finalize() {
// Reset and destroy global state
// Shut down sockets and other hardware listeners
// void BootstrapComm::destroy() {
if(bootstrap_comm.nRanks > 0) {
bootstrap_comm.destroy();
}
// if(bootstrap_comm.nRanks > 0) {
// bootstrap_comm.destroy();
// }
return scclSuccess;
}
} // namespace bootstrap
} // namespace topology
} // namespace hardware
} // namespace sccl
......@@ -6,15 +6,14 @@
namespace sccl {
namespace hardware {
namespace topology {
namespace bootstrap {
typedef topology::bootstrap::scclUniqueId scclUniqueId;
typedef topology::bootstrap::BootstrapHandle_t BootstrapHandle_t;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
scclResult_t scclGetUniqueId(scclUniqueId* unique_id);
scclResult_t sccl_init(const scclUniqueId* unique_id, int rank, int nRanks);
scclResult_t sccl_finalize();
} // namespace bootstrap
} // namespace topology
} // namespace hardware
} // namespace sccl
......@@ -6,9 +6,8 @@
namespace sccl {
namespace hardware {
namespace ops {
////
} // namespace ops
// Provides std::span-like behaviour: reinterprets a byte array as a typed array
} // namespace hardware
} // namespace sccl
......@@ -37,9 +37,10 @@ failure:
}
scclIpcSocket::~scclIpcSocket() {
printf("scclIpcSocket 析构函数 localRank=%d\n", localRank);
// 等待所有任务完成
while(!pthread_pool->allTasksCompleted()) {
usleep(1000); // 每1毫秒检查一次任务完成状态
usleep(100); // 每1毫秒检查一次任务完成状态
}
// Release the thread pool
......@@ -311,318 +312,121 @@ scclResult_t scclIpcSocket::scclIpcSocketRecvFd(int* recvFd) {
return scclSuccess;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* @brief Send data to the given destination rank over the IPC socket
*
* The function builds the destination abstract-socket address, waits with poll until the
* socket is writable, and sends the payload with sendmsg, retrying on transient errors.
*
* @param data pointer to the data to send
* @param dataLen length of the data to send
* @param dst_rank destination rank
* @return scclResult_t result code:
* - scclSuccess: send succeeded
* - scclSystemError: system-call failure
* - scclInternalError: the operation was aborted
*
* @note Retries on EAGAIN, EWOULDBLOCK and EINTR; any other error causes a failure return.
*/
scclResult_t scclIpcSocket::scclIpcSocketSendData(const void* data, size_t dataLen, int dst_rank) {
// Build the destination address string
char temp_addr[SCCL_IPC_SOCKNAME_LEN];
int len;
SCCLCHECK(getScclIpcSocknameStr(dst_rank, ipc_hash, temp_addr, &len));
// Set up the message header
struct msghdr msg;
struct iovec iov[1];
struct sockaddr_un cliaddr;
bzero(&cliaddr, sizeof(cliaddr));
cliaddr.sun_family = AF_UNIX;
strncpy(cliaddr.sun_path, temp_addr, len);
cliaddr.sun_path[0] = '\0'; // Linux abstract-socket trick
msg.msg_name = (void*)&cliaddr;
msg.msg_namelen = sizeof(cliaddr);
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
iov[0].iov_base = (void*)data;
iov[0].iov_len = dataLen;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
// Use poll to wait until the socket is writable
struct pollfd pfd;
pfd.fd = handle->fd;
pfd.events = POLLOUT;
int pollResult = poll(&pfd, 1, timeoutMs);
if(pollResult <= 0) {
if(pollResult == 0) {
WARN("UDS: Timeout occurred while waiting to send data to socket %s", temp_addr);
} else {
WARN("UDS: Error occurred while polling socket %s for writability : %d", temp_addr, errno);
}
return scclSystemError;
}
ssize_t sendResult;
while((sendResult = sendmsg(handle->fd, &msg, 0)) <= 0) {
if(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
WARN("UDS: Error occurred while sending data through socket %s : %d", temp_addr, errno);
return scclSystemError;
}
if(handle->abortFlag && *handle->abortFlag)
return scclInternalError;
pollResult = poll(&pfd, 1, timeoutMs);
if(pollResult <= 0) {
if(pollResult == 0) {
WARN("UDS: Timeout occurred while waiting to send data to socket %s", temp_addr);
} else {
WARN("UDS: Error occurred while polling socket %s for writability : %d", temp_addr, errno);
}
return scclSystemError;
}
}
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Successfully sent %zd bytes of data through UDS socket %s", sendResult, temp_addr);
return scclSuccess;
}
/**
* @brief Receive data from the IPC socket
*
* @param buffer pointer to the receive buffer
* @param bufferLen length of the buffer
* @param receivedLen number of bytes received (set by the function; 0 if the peer closed the connection)
* @return scclResult_t result code:
* - scclSuccess: receive succeeded
* - scclSystemError: system-call error (e.g. poll timeout or recvmsg failure)
*
* @note Waits with poll until the socket is readable before calling recvmsg.
* Retries on EAGAIN/EWOULDBLOCK/EINTR.
*/
scclResult_t scclIpcSocket::scclIpcSocketRecvData(void* buffer, size_t bufferLen, size_t* receivedLen) {
// Set up the message header
struct msghdr msg = {0};
struct iovec iov[1];
iov[0].iov_base = buffer;
iov[0].iov_len = bufferLen;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
// Use poll to wait until the socket is readable
struct pollfd pfd;
pfd.fd = handle->fd;
pfd.events = POLLIN;
int pollResult = poll(&pfd, 1, timeoutMs);
if(pollResult <= 0) {
if(pollResult == 0) {
WARN("UDS: Timeout occurred while waiting to receive data from socket %s", handle->socketName);
} else {
WARN("UDS: Error occurred while polling socket %s for readability : %d", handle->socketName, errno);
}
return scclSystemError;
}
int ret;
while(true) {
ret = recvmsg(handle->fd, &msg, 0);
if(ret > 0) {
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Successfully received %d bytes of data from socket %s", ret, handle->socketName);
*receivedLen = ret;
return scclSuccess;
} else if(ret == 0) {
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Connection closed by peer on socket %s", handle->socketName);
*receivedLen = 0;
return scclSuccess;
} else {
if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
pollResult = poll(&pfd, 1, timeoutMs);
if(pollResult <= 0) {
if(pollResult == 0) {
WARN("UDS: Timeout occurred while waiting to receive data from socket %s", handle->socketName);
} else {
WARN("UDS: Error occurred while polling socket %s for readability : %d", handle->socketName, errno);
}
return scclSystemError;
}
} else {
WARN("UDS: Error occurred while receiving data through socket %s : %d", handle->socketName, errno);
return scclSystemError;
}
}
}
}
/**
* Send data over the IPC socket and wait for acknowledgements
*
* @param data pointer to the data to send
* @param dataLen length of the data to send
* @param dst_rank destination rank
* @return scclSuccess on success, any other code on failure
*
* The data is sent in chunks of CHUNK_SIZE bytes; after each chunk the sender waits for an
* ACK from the receiver. An unexpected ACK or a failed send/receive returns an error
* immediately; success is returned once all data has been sent and correctly acknowledged.
*/
scclResult_t scclIpcSocket::scclIpcSocketSendDataWithAck(const void* data, size_t dataLen, int dst_rank) {
const char* dataPtr = static_cast<const char*>(data);
const char* dataPtr = reinterpret_cast<const char*>(data);
size_t bytesSent = 0;
while(bytesSent < dataLen) {
size_t bytesToSend = std::min(CHUNK_SIZE, dataLen - bytesSent);
// Send one data chunk
scclResult_t sendResult = scclIpcSocketSendData(dataPtr + bytesSent, bytesToSend, dst_rank);
scclResult_t sendResult = scclIpcSocketSendDataAndRank(dataPtr + bytesSent, bytesToSend, dst_rank);
if(sendResult != scclSuccess) {
return sendResult;
}
// Wait for the receiver's ACK
char ack[ACK_SIZE];
size_t receivedLen;
scclResult_t recvResult = scclIpcSocketRecvData(ack, sizeof(ack), &receivedLen);
if(recvResult != scclSuccess) {
return recvResult;
}
// Check whether this is the expected ACK
char target_ack[ACK_SIZE];
sprintf(target_ack, "ACK-%d", localRank);
if(strcmp(ack, target_ack) != 0) {
WARN("UDS: Received unexpected ACK: %s", ack);
return scclSystemError;
}
bytesSent += bytesToSend;
}
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Successfully sent %zu bytes of data with ACK through UDS socket", dataLen);
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Successfully sent %zu bytes of data in chunks through UDS socket", dataLen);
return scclSuccess;
}
/**
* Receive data over the IPC socket and send ACK confirmations
*
* @param buffer pointer to the receive buffer
* @param bufferLen total length of the buffer
* @param receivedLen number of bytes actually received (output parameter)
* @param src_rank rank of the sender
* @return scclSuccess on success, any other code on failure
*
* @note Data is received in chunks, and an ACK is sent back after each chunk.
* A log entry is written once the whole transfer has completed.
*/
scclResult_t scclIpcSocket::scclIpcSocketRecvDataAndSendAck(void* buffer, size_t bufferLen, size_t* receivedLen, int src_rank) {
char* bufferPtr = static_cast<char*>(buffer);
scclResult_t scclIpcSocket::scclIpcSocketRecvData(void* buffer, size_t bufferLen, size_t* receivedLen, int* src_rank) {
char* bufferPtr = reinterpret_cast<char*>(buffer);
size_t bytesReceived = 0;
*receivedLen = 0;
while(bytesReceived < bufferLen) {
size_t bytesToReceive = std::min(CHUNK_SIZE, bufferLen - bytesReceived);
int recv_rank = -1;
// Receive one data chunk
scclResult_t recvResult = scclIpcSocketRecvData(bufferPtr + bytesReceived, bytesToReceive, receivedLen);
scclResult_t recvResult = scclIpcSocketRecvDataAndRank(bufferPtr + bytesReceived, bytesToReceive, receivedLen, &recv_rank);
*src_rank = recv_rank;
if(recvResult != scclSuccess) {
return recvResult;
}
// Send an ACK back to the sender
char ack[ACK_SIZE];
sprintf(ack, "ACK-%d", src_rank);
scclResult_t sendResult = scclIpcSocketSendData(ack, strlen(ack), src_rank);
if(sendResult != scclSuccess) {
return sendResult;
}
bytesReceived += *receivedLen;
}
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Successfully received %zu bytes of data and sent ACK through UDS socket", bufferLen);
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Successfully received %zu bytes of data in chunks through UDS socket", bufferLen);
return scclSuccess;
}
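// Worked example for the chunked receive above (assuming CHUNK_SIZE is 64 KiB): a 160 KiB
// transfer runs the loop three times with chunks of 64 KiB, 64 KiB and 32 KiB; bytesReceived
// advances by *receivedLen after each chunk, while *receivedLen itself ends up holding only
// the size of the final chunk.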
/////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* @brief Allgather over IPC sockets
* @brief Send data to the given destination over a Unix domain socket and wait for an ACK
*
* The function sends and receives data in parallel through the thread pool to realize an Allgather collective across the local ranks.
*
* @param sendData pointer to the send buffer
* @param recvData pointer to the receive buffer
* @param dataLen per-rank data length in bytes
* @return scclResult_t operation result (scclSuccess on success)
* The function sends data to the given destination rank over a Unix domain socket and waits for an ACK.
* If the received ACK is not the expected one, an error is returned.
*
* @note 1. The transfer for the local rank itself is skipped
* 2. Packet format: [sender rank (int)][data]
* 3. The receive buffer must be pre-allocated (size = nlocalRanks * dataLen)
* @param data pointer to the data to send
* @param dataLen length of the data to send
* @param dst_rank destination rank
* @return scclResult_t result code:
* - scclSuccess: success
* - scclSystemError: system-call error
*/
scclResult_t scclIpcSocket::scclIpcSocketAllgather(const void* sendData, void* recvData, size_t dataLen) {
if(pthread_pool == nullptr || nlocalRanks <= 0) {
WARN("scclIpcSocket init error!");
return scclInternalError;
scclResult_t scclIpcSocket::scclIpcSocketSendDataWithAck(const void* data, size_t dataLen, int dst_rank) {
// Send the data together with the rank info
scclResult_t sendResult = scclIpcSocketSendDataAndRank(data, dataLen, dst_rank);
if(sendResult != scclSuccess) {
return sendResult;
}
// Use the thread pool to send and receive data
for(int i = 0; i < nlocalRanks; ++i) {
if(i != localRank) {
auto sendTask = [this, sendData, dataLen, i]() {
// Compute the total size of the DataPackage
size_t packageSize = sizeof(int) + dataLen;
char* buffer = new char[packageSize];
// Copy the rank info and the data together into the buffer
int* rankPtr = reinterpret_cast<int*>(buffer);
*rankPtr = localRank;
char* dataPtr = buffer + sizeof(int);
memcpy(dataPtr, sendData, dataLen);
// Send the rank info and the data in a single call
scclIpcSocketSendData(buffer, packageSize, i);
delete[] buffer;
};
pthread_pool->enqueue(sendTask);
auto recvTask = [this, recvData, dataLen, i]() {
// Prepare the receive buffer
size_t packageSize = sizeof(int) + dataLen;
char* buffer = new char[packageSize];
size_t receivedLen;
// Receive the rank info and the data in a single call
scclIpcSocketRecvData(buffer, packageSize, &receivedLen);
// Extract the rank info and the data from the buffer
int* rankPtr = reinterpret_cast<int*>(buffer);
int senderRank = *rankPtr;
// Wait for the ACK
char ack[ACK_SIZE];
size_t ackLen;
int ack_rank;
scclResult_t recvAckResult = scclIpcSocketRecvDataAndRank(ack, ACK_SIZE, &ackLen, &ack_rank);
if(recvAckResult != scclSuccess || ack_rank != dst_rank) {
WARN("UDS: Failed to receive ACK from rank ack_rank:%d, dst_rank:%d", ack_rank, dst_rank);
return scclSystemError;
}
#if 0
printf("scclIpcSocketSendDataWithAck localRank=%d, dst_rank=%d, ack_rank=%d, ack=%s\n", localRank, dst_rank, ack_rank, ack);
#endif
// Compare the ACK string
char expectedAck[ACK_SIZE];
snprintf(expectedAck, ACK_SIZE, "ACK-%d", ack_rank);
if(strncmp(ack, expectedAck, ACK_SIZE) != 0) {
WARN("UDS: Received incorrect ACK from rank %d", dst_rank);
return scclSystemError;
}
char* dataPtr = buffer + sizeof(int);
memcpy(static_cast<char*>(recvData) + senderRank * dataLen, dataPtr, dataLen);
return scclSuccess;
}
delete[] buffer;
};
pthread_pool->enqueue(recvTask);
} else {
// Place this rank's own data directly into the correct slot
memcpy(static_cast<char*>(recvData) + localRank * dataLen, sendData, dataLen);
}
/**
* @brief Receive data over a Unix domain socket and send back an ACK
*
* The function receives data over a Unix domain socket and, once the receive has completed,
* sends an ACK back to the sender. If sending the ACK fails, an error is returned.
*
* @param buffer pointer to the buffer that stores the received data
* @param bufferLen length of the buffer
* @param receivedLen number of bytes received (set by the function)
* @param src_rank rank of the sender (set by the function)
* @return scclResult_t result code:
* - scclSuccess: success
* - scclSystemError: system-call error
*/
scclResult_t scclIpcSocket::scclIpcSocketRecvDataWithAck(void* buffer, size_t bufferLen, size_t* receivedLen, int* src_rank) {
// Receive the data together with the rank info
scclResult_t recvResult = scclIpcSocketRecvDataAndRank(buffer, bufferLen, receivedLen, src_rank);
if(recvResult != scclSuccess) {
return recvResult;
}
// Wait for all tasks to finish
while(!pthread_pool->allTasksCompleted()) {
usleep(1000); // poll task-completion status every 1 ms
#if 0
printf("scclIpcSocketRecvDataWithAck localRank=%d, src_rank=%d, bufferLen=%zu, receivedLen=%zu\n", localRank, *src_rank, bufferLen, *receivedLen);
#endif
// Send the ACK
char ack[ACK_SIZE];
snprintf(ack, ACK_SIZE, "ACK-%d", localRank);
scclResult_t sendAckResult = scclIpcSocketSendDataAndRank(ack, ACK_SIZE, *src_rank);
if(sendAckResult != scclSuccess) {
WARN("UDS: Failed to send ACK to rank %d", *src_rank);
return scclSystemError;
}
return scclSuccess;
}
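// Round trip of the WithAck pair above, e.g. between local ranks 2 (sender) and 5 (receiver):
// rank 2 calls scclIpcSocketSendDataAndRank(payload, len, 5); rank 5 receives it via
// scclIpcSocketRecvDataAndRank, learns src_rank == 2, and replies with the string "ACK-5"
// through scclIpcSocketSendDataAndRank(ack, ACK_SIZE, 2); rank 2 then checks both that the
// ACK arrived from rank 5 and that the text matches "ACK-5" before reporting success.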
/////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* @brief Synchronous Allgather over IPC sockets
*
......@@ -637,24 +441,74 @@ scclResult_t scclIpcSocket::scclIpcSocketAllgather(const void* sendData, void* r
* 2. Communication with the other processes is handled in parallel by the thread pool
* 3. When wait is true, the call blocks until all communication has completed
*/
scclResult_t scclIpcSocket::scclIpcSocketAllgatherSync(const void* sendData, void* recvData, size_t dataLen) {
#if 1
// TODO: correctness first - this version is slow and will be optimized later
scclResult_t scclIpcSocket::scclIpcSocketAllgather(const void* sendData, void* recvData, size_t dataLen) {
if(nlocalRanks <= 0) {
WARN("scclIpcSocket init error!");
return scclInternalError;
}
// Copy this process's data into its slot in the receive buffer
auto all_recv_data = reinterpret_cast<char*>(recvData);
memcpy(all_recv_data + localRank * dataLen, sendData, dataLen);
// Destination rank for this rank's sends
int next_rank_for_send = (localRank + 1 + nlocalRanks) % nlocalRanks;
// Ring Allgather
for(int step = 0; step < nlocalRanks - 1; ++step) {
int next_rank_for_data = (localRank - step + nlocalRanks) % nlocalRanks;
int prev_rank_for_data = (localRank - step - 1 + nlocalRanks) % nlocalRanks;
// Data blocks to send/receive in this step
auto send_data = all_recv_data + next_rank_for_data * dataLen;
auto recv_data = all_recv_data + prev_rank_for_data * dataLen;
auto sendTask = [this, send_data, dataLen, next_rank_for_send]() { scclIpcSocketSendDataBasic(send_data, dataLen, next_rank_for_send); };
pthread_pool->enqueue(sendTask);
auto recvTask = [this, recv_data, dataLen]() {
size_t receivedLen;
int recv_rank;
scclIpcSocketRecvDataBasic(recv_data, dataLen, &receivedLen);
};
pthread_pool->enqueue(recvTask);
// Wait for all tasks to finish
while(!pthread_pool->allTasksCompleted()) {
usleep(100); // poll task-completion status every 100 us
}
}
return scclSuccess;
}
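// Worked example of the ring above for nlocalRanks = 4 and localRank = 1 (blocks are indexed
// by originating rank, and every step sends to next_rank_for_send = 2):
// step 0: send block 1, receive block 0; step 1: send block 0, receive block 3;
// step 2: send block 3, receive block 2 - after nlocalRanks - 1 steps the buffer holds blocks 0..3.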
#else
scclResult_t scclIpcSocket::scclIpcSocketAllgather(const void* sendData, void* recvData, size_t dataLen) {
if(pthread_pool == nullptr || nlocalRanks <= 0) {
WARN("scclIpcSocket init error!");
return scclInternalError;
}
// Copy this process's data into its slot in the receive buffer
memcpy(static_cast<char*>(recvData) + localRank * dataLen, sendData, dataLen);
auto all_recv_data = reinterpret_cast<char*>(recvData);
memcpy(all_recv_data + localRank * dataLen, sendData, dataLen);
char* temp_recvData;
SCCLCHECK(scclCalloc(&temp_recvData, dataLen));
// Use the thread pool to send and receive data
for(int i = 0; i < nlocalRanks; ++i) {
if(i != localRank) {
auto sendTask = [this, sendData, dataLen, i]() { scclIpcSocketSendData(sendData, dataLen, i); };
auto sendTask = [this, sendData, dataLen, i]() { scclIpcSocketSendDataAndRank(sendData, dataLen, i); };
pthread_pool->enqueue(sendTask);
auto recvTask = [this, recvData, dataLen, i]() {
auto recvTask = [this, all_recv_data, dataLen, i, &temp_recvData]() {
size_t receivedLen;
scclIpcSocketRecvData(reinterpret_cast<char*>(recvData) + i * dataLen, dataLen, &receivedLen);
int recv_rank;
scclIpcSocketRecvDataAndRank(temp_recvData, dataLen, &receivedLen, &recv_rank);
// printf("localRank=%d, recv_rank=%d, dataLen=%zu\n", localRank, recv_rank, dataLen);
// Copy the data to its destination slot
memcpy(all_recv_data + recv_rank * dataLen, temp_recvData, dataLen);
};
pthread_pool->enqueue(recvTask);
}
......@@ -662,11 +516,14 @@ scclResult_t scclIpcSocket::scclIpcSocketAllgatherSync(const void* sendData, voi
// Wait for all tasks to finish
while(!pthread_pool->allTasksCompleted()) {
usleep(1000); // poll task-completion status every 1 ms
usleep(100); // poll task-completion status every 100 us
}
free(temp_recvData);
return scclSuccess;
}
#endif
/**
* @brief Broadcast over IPC sockets
......@@ -684,6 +541,7 @@ scclResult_t scclIpcSocket::scclIpcSocketAllgatherSync(const void* sendData, voi
* - scclInternalError: the IPC socket is not initialized or the local rank count is invalid
* - scclInvalidArgument: the root rank is invalid
*/
#if 1
scclResult_t scclIpcSocket::scclIpcSocketBroadcast(void* data, size_t dataLen, int root) {
if(pthread_pool == nullptr || nlocalRanks <= 0) {
WARN("scclIpcSocket init error!");
......@@ -697,6 +555,52 @@ scclResult_t scclIpcSocket::scclIpcSocketBroadcast(void* data, size_t dataLen, i
if(localRank == root) {
// Root process: send the data to every other process
for(int i = 0; i < nlocalRanks; ++i) {
if(i != root) {
const char* dataPtr = reinterpret_cast<const char*>(data);
size_t bytesSent = 0;
while(bytesSent < dataLen) {
size_t bytesToSend = std::min(CHUNK_SIZE, dataLen - bytesSent);
scclResult_t sendResult = scclIpcSocketSendDataWithAck(dataPtr + bytesSent, bytesToSend, i);
if(sendResult != scclSuccess) {
return sendResult;
}
bytesSent += bytesToSend;
}
}
}
} else {
char* dataPtr = reinterpret_cast<char*>(data);
size_t bytesReceived = 0;
while(bytesReceived < dataLen) {
size_t bytesToReceive = std::min(CHUNK_SIZE, dataLen - bytesReceived);
size_t receivedLen;
int receivedRank;
scclResult_t recvResult = scclIpcSocketRecvDataWithAck(dataPtr + bytesReceived, bytesToReceive, &receivedLen, &receivedRank);
if(recvResult != scclSuccess) {
return recvResult;
}
bytesReceived += receivedLen;
}
}
return scclSuccess;
}
#else
scclResult_t scclIpcSocket::scclIpcSocketBroadcast(void* data, size_t dataLen, int root) {
if(pthread_pool == nullptr || nlocalRanks <= 0) {
WARN("scclIpcSocket init error!");
return scclInternalError;
}
if(root < 0 || root >= nlocalRanks) {
WARN("scclIpcSocketBroadcast: Invalid root rank %d", root);
return scclInvalidArgument;
}
if(localRank == root) {
// Root process: send the data to every other process
for(int i = 0; i < nlocalRanks; ++i) {
// scclIpcSocketSendDataWithAck(data, dataLen, i);
if(i != root) {
// Bind the scclIpcSocketSendDataWithAck method and its arguments with std::bind
auto sendTask = std::bind(&scclIpcSocket::scclIpcSocketSendDataWithAck, this, data, dataLen, i);
......@@ -706,7 +610,8 @@ scclResult_t scclIpcSocket::scclIpcSocketBroadcast(void* data, size_t dataLen, i
}
} else {
size_t receivedLen;
scclResult_t result = scclIpcSocketRecvDataAndSendAck(data, dataLen, &receivedLen, root);
int receivedRank;
scclResult_t result = scclIpcSocketRecvDataWithAck(data, dataLen, &receivedLen, &receivedRank);
if(result != scclSuccess) {
return result;
}
......@@ -714,11 +619,12 @@ scclResult_t scclIpcSocket::scclIpcSocketBroadcast(void* data, size_t dataLen, i
// Wait for all tasks to finish
while(!pthread_pool->allTasksCompleted()) {
usleep(1000); // poll task-completion status every 1 ms
usleep(100); // poll task-completion status every 100 us
}
return scclSuccess;
}
#endif
/////////////////////////////////////////////////////////////////////////////////////
scclResult_t scclIpcSocket::getScclIpcSocknameStr(int rank, uint64_t hash, char* out_str, int* out_len) {
......@@ -732,6 +638,311 @@ scclResult_t scclIpcSocket::getScclIpcSocknameStr(int rank, uint64_t hash, char*
return scclSuccess;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* @brief Send data to the given destination rank over the IPC socket
*
* The function builds the destination address string, fills in the message header with the
* destination address and the payload, waits with poll until the socket is writable, and then
* sends the data with sendmsg. If an error occurs during the send, the function either retries
* or returns an error code, depending on the error type.
*
* @param data pointer to the data to send
* @param dataLen length of the data to send
* @param dst_rank destination rank
* @return scclResult_t result code:
* - scclSuccess: send succeeded
* - scclInternalError: internal error (e.g. socket name too long or the abort flag is set)
* - scclSystemError: system-call error (e.g. poll timeout or sendmsg failure)
*
* @note Uses Linux abstract sockets and waits with poll until the socket is writable before sending.
* Retries on EAGAIN/EWOULDBLOCK/EINTR.
*/
scclResult_t scclIpcSocket::scclIpcSocketSendDataBasic(const void* data, size_t dataLen, int dst_rank) {
// Build the destination address string
char temp_addr[SCCL_IPC_SOCKNAME_LEN];
int len;
SCCLCHECK(getScclIpcSocknameStr(dst_rank, ipc_hash, temp_addr, &len));
// Set up the message header
struct msghdr msg;
struct iovec iov[1]; // a single iovec: payload only
struct sockaddr_un cliaddr;
bzero(&cliaddr, sizeof(cliaddr));
cliaddr.sun_family = AF_UNIX;
strncpy(cliaddr.sun_path, temp_addr, len);
cliaddr.sun_path[0] = '\0'; // Linux abstract-socket trick
msg.msg_name = (void*)&cliaddr;
msg.msg_namelen = sizeof(cliaddr);
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
// Prepare the payload
iov[0].iov_base = (void*)data;
iov[0].iov_len = dataLen;
msg.msg_iov = iov;
msg.msg_iovlen = 1; // a single iovec
// Use poll to wait until the socket is writable
struct pollfd pfd;
pfd.fd = handle->fd;
pfd.events = POLLOUT;
int pollResult = poll(&pfd, 1, timeoutMs);
if(pollResult <= 0) {
if(pollResult == 0) {
WARN("UDS: Timeout occurred while waiting to send data to socket %s", temp_addr);
} else {
WARN("UDS: Error occurred while polling socket %s for writability : %d", temp_addr, errno);
}
return scclSystemError;
}
ssize_t sendResult;
while((sendResult = sendmsg(handle->fd, &msg, 0)) <= 0) {
if(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
WARN("UDS: Error occurred while sending data through socket %s : %d", temp_addr, errno);
return scclSystemError;
}
if(handle->abortFlag && *handle->abortFlag) {
return scclInternalError;
}
pollResult = poll(&pfd, 1, timeoutMs);
if(pollResult <= 0) {
if(pollResult == 0) {
WARN("UDS: Timeout occurred while waiting to send data to socket %s", temp_addr);
} else {
WARN("UDS: Error occurred while polling socket %s for writability : %d", temp_addr, errno);
}
return scclSystemError;
}
}
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Successfully sent %zd bytes of data through UDS socket %s", sendResult, temp_addr);
return scclSuccess;
}
/**
* @brief Receive data over the IPC socket
*
* The function fills in the message header with the receive buffer, waits with poll until the
* socket is readable, and then receives the data with recvmsg. If an error occurs during the
* receive, the function either retries or returns an error code, depending on the error type.
*
* @param buffer pointer to the buffer that stores the received data
* @param bufferLen length of the buffer
* @param receivedLen number of bytes received (set by the function)
* @return scclResult_t result code:
* - scclSuccess: receive succeeded
* - scclSystemError: system-call error (e.g. poll timeout or recvmsg failure)
*
* @note Uses Linux abstract sockets and waits with poll until the socket is readable before receiving.
* Retries on EAGAIN/EWOULDBLOCK/EINTR.
*/
scclResult_t scclIpcSocket::scclIpcSocketRecvDataBasic(void* buffer, size_t bufferLen, size_t* receivedLen) {
// Set up the message header
struct msghdr msg = {0};
struct iovec iov[1]; // a single iovec: payload only
iov[0].iov_base = buffer;
iov[0].iov_len = bufferLen;
msg.msg_iov = iov;
msg.msg_iovlen = 1; // a single iovec
// Use poll to wait until the socket is readable
struct pollfd pfd;
pfd.fd = handle->fd;
pfd.events = POLLIN;
int pollResult = poll(&pfd, 1, timeoutMs);
if(pollResult <= 0) {
if(pollResult == 0) {
WARN("UDS: Timeout occurred while waiting to receive data from socket %s", handle->socketName);
} else {
WARN("UDS: Error occurred while polling socket %s for readability : %d", handle->socketName, errno);
}
return scclSystemError;
}
int ret;
while(true) {
ret = recvmsg(handle->fd, &msg, 0);
if(ret > 0) {
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Successfully received %d bytes of data from socket %s", ret, handle->socketName);
*receivedLen = ret; // no rank header to subtract in this variant
// *src_rank = rank; // removed
// (the sender's rank is not transmitted by this variant)
return scclSuccess;
} else if(ret == 0) {
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Connection closed by peer on socket %s", handle->socketName);
*receivedLen = 0;
return scclSuccess;
} else {
if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
pollResult = poll(&pfd, 1, timeoutMs);
if(pollResult <= 0) {
if(pollResult == 0) {
WARN("UDS: Timeout occurred while waiting to receive data from socket %s", handle->socketName);
} else {
WARN("UDS: Error occurred while polling socket %s for readability : %d", handle->socketName, errno);
}
return scclSystemError;
}
} else {
WARN("UDS: Error occurred while receiving data through socket %s : %d", handle->socketName, errno);
return scclSystemError;
}
}
}
}
/**
 * @brief Send data, prefixed with the sender's rank, over the IPC socket
 *
 * Sends the local rank followed by the payload through a Unix domain socket using
 * a two-element iovec. poll() is used to wait until the socket is writable before
 * calling sendmsg(); transient errors are retried.
 *
 * @param data     Pointer to the data to send
 * @param dataLen  Length of the data to send
 * @param dst_rank Destination rank
 * @return scclResult_t Result of the operation:
 *         - scclSuccess: data sent successfully
 *         - scclInternalError: operation aborted via the abort flag
 *         - scclSystemError: system call failure (e.g. poll timeout or sendmsg error)
 *
 * @note Uses Linux abstract-namespace socket addressing; retries on
 *       EAGAIN/EWOULDBLOCK/EINTR.
 */
scclResult_t scclIpcSocket::scclIpcSocketSendDataAndRank(const void* data, size_t dataLen, int dst_rank) {
// Build the destination socket address string
char temp_addr[SCCL_IPC_SOCKNAME_LEN];
int len;
SCCLCHECK(getScclIpcSocknameStr(dst_rank, ipc_hash, temp_addr, &len));
// Set up the message header
struct msghdr msg;
struct iovec iov[2]; // two iovecs: sender rank followed by the payload
struct sockaddr_un cliaddr;
bzero(&cliaddr, sizeof(cliaddr));
cliaddr.sun_family = AF_UNIX;
strncpy(cliaddr.sun_path, temp_addr, len);
cliaddr.sun_path[0] = '\0'; // Linux abstract-namespace socket: a leading NUL means no filesystem entry
msg.msg_name = (void*)&cliaddr;
msg.msg_namelen = sizeof(cliaddr);
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
// Prepare the sender's rank
int rank = localRank;
iov[0].iov_base = &rank;
iov[0].iov_len = sizeof(rank);
// Prepare the payload
iov[1].iov_base = (void*)data;
iov[1].iov_len = dataLen;
msg.msg_iov = iov;
msg.msg_iovlen = 2; // rank header + payload
// Wait with poll() until the socket is writable
struct pollfd pfd;
pfd.fd = handle->fd;
pfd.events = POLLOUT;
int pollResult = poll(&pfd, 1, timeoutMs);
if(pollResult <= 0) {
if(pollResult == 0) {
WARN("UDS: Timeout occurred while waiting to send data to socket %s", temp_addr);
} else {
WARN("UDS: Error occurred while polling socket %s for writability : %d", temp_addr, errno);
}
return scclSystemError;
}
ssize_t sendResult;
while((sendResult = sendmsg(handle->fd, &msg, 0)) <= 0) {
if(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
WARN("UDS: Error occurred while sending data through socket %s : %d", temp_addr, errno);
return scclSystemError;
}
if(handle->abortFlag && *handle->abortFlag) {
return scclInternalError;
}
pollResult = poll(&pfd, 1, timeoutMs);
if(pollResult <= 0) {
if(pollResult == 0) {
WARN("UDS: Timeout occurred while waiting to send data to socket %s", temp_addr);
} else {
WARN("UDS: Error occurred while polling socket %s for writability : %d", temp_addr, errno);
}
return scclSystemError;
}
}
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Successfully sent %zd bytes of data through UDS socket %s", sendResult, temp_addr);
return scclSuccess;
}
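// Wire layout of the datagram assembled above (illustrative summary only):
//
//   +---------------------+---------------------------+
//   | iov[0]: int rank    | iov[1]: payload           |
//   | sizeof(int) bytes   | dataLen bytes             |
//   +---------------------+---------------------------+
//
// The matching receiver, scclIpcSocketRecvDataAndRank(), therefore reports
// receivedLen = ret - sizeof(rank) and returns the leading int as *src_rank.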
/**
 * @brief Receive data and the sender's rank over the IPC socket
 *
 * Receives a datagram whose first sizeof(int) bytes carry the sender's local rank,
 * followed by the payload. poll() is used to wait until the socket is readable
 * before calling recvmsg(); transient errors are retried.
 *
 * @param buffer      Buffer used to store the received payload
 * @param bufferLen   Length of the buffer
 * @param receivedLen Number of payload bytes received, excluding the rank header (set by the function)
 * @param src_rank    Rank of the sender (set by the function)
 * @return scclResult_t Result of the operation:
 *         - scclSuccess: data received successfully
 *         - scclSystemError: system call failure (e.g. poll timeout or recvmsg error)
 *
 * @note Uses poll() to make sure the socket is readable before receiving;
 *       retries on EAGAIN/EWOULDBLOCK/EINTR.
 */
scclResult_t scclIpcSocket::scclIpcSocketRecvDataAndRank(void* buffer, size_t bufferLen, size_t* receivedLen, int* src_rank) {
// Set up the message header
struct msghdr msg = {0};
struct iovec iov[2]; // two iovecs: sender rank followed by the payload
int rank;
iov[0].iov_base = &rank;
iov[0].iov_len = sizeof(rank);
iov[1].iov_base = buffer;
iov[1].iov_len = bufferLen;
msg.msg_iov = iov;
msg.msg_iovlen = 2; // rank header + payload
// Wait with poll() until the socket is readable
struct pollfd pfd;
pfd.fd = handle->fd;
pfd.events = POLLIN;
int pollResult = poll(&pfd, 1, timeoutMs);
if(pollResult <= 0) {
if(pollResult == 0) {
WARN("UDS: Timeout occurred while waiting to receive data from socket %s", handle->socketName);
} else {
WARN("UDS: Error occurred while polling socket %s for readability : %d", handle->socketName, errno);
}
return scclSystemError;
}
ssize_t ret;
while(true) {
ret = recvmsg(handle->fd, &msg, 0);
if(ret > 0) {
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Successfully received %zd bytes of data from socket %s", ret, handle->socketName);
*receivedLen = ret - sizeof(rank); // subtract the rank header length
*src_rank = rank;
return scclSuccess;
} else if(ret == 0) {
INFO(SCCL_LOG_BOOTSTRAP, "UDS: Connection closed by peer on socket %s", handle->socketName);
*receivedLen = 0;
return scclSuccess;
} else {
if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
pollResult = poll(&pfd, 1, timeoutMs);
if(pollResult <= 0) {
if(pollResult == 0) {
WARN("UDS: Timeout occurred while waiting to receive data from socket %s", handle->socketName);
} else {
WARN("UDS: Error occurred while polling socket %s for readability : %d", handle->socketName, errno);
}
return scclSystemError;
}
} else {
WARN("UDS: Error occurred while receiving data through socket %s : %d", handle->socketName, errno);
return scclSystemError;
}
}
}
}
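// Pairing sketch (illustrative only; these are private helpers, so in practice they are
// reached through the class's public send/receive wrappers). Assumes the sender runs as
// local rank 0 and the receiver as local rank 1:
//
//   // on local rank 0:
//   int value = 42;
//   scclIpcSocketSendDataAndRank(&value, sizeof(value), /*dst_rank=*/1);
//
//   // on local rank 1:
//   int buf; size_t n; int src;
//   scclIpcSocketRecvDataAndRank(&buf, sizeof(buf), &n, &src);
//   // afterwards: n == sizeof(buf), src == 0, buf == 42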
} // namespace ipc_socket
} // namespace net
} // namespace hardware
......
......@@ -37,7 +37,7 @@ struct DataPackage {
};
//////////////////////////////////////////////////////////////////////////////////////////////////////
class scclIpcSocket {
typedef class scclIpcSocket {
public:
// 构造函数和析构函数
scclIpcSocket(int localRank, int nlocalRanks, uint64_t hash, volatile uint32_t* abortFlag = nullptr);
......@@ -62,19 +62,16 @@ public:
// Send/receive data to a given destination over the Unix domain socket
scclResult_t scclIpcSocketSendData(const void* data, size_t dataLen, int dst_rank);
scclResult_t scclIpcSocketRecvData(void* buffer, size_t bufferLen, size_t* receivedLen);
scclResult_t scclIpcSocketRecvData(void* buffer, size_t bufferLen, size_t* receivedLen, int* src_rank);
// Send/receive data to a given destination over the Unix domain socket, sending an ack to confirm delivery
// Send/receive data to a given destination over the Unix domain socket, with ACK information
scclResult_t scclIpcSocketSendDataWithAck(const void* data, size_t dataLen, int dst_rank);
scclResult_t scclIpcSocketRecvDataAndSendAck(void* buffer, size_t bufferLen, size_t* receivedLen, int src_rank);
scclResult_t scclIpcSocketRecvDataWithAck(void* buffer, size_t bufferLen, size_t* receivedLen, int* src_rank);
//////////////////////////////////////////////////////////////////////////////////////////////////////
// Node-local allgather that guarantees the receive order
scclResult_t scclIpcSocketAllgather(const void* sendData, void* recvData, size_t dataLen);
// Node-local allgather. For performance it does not guarantee receive order, so the sender's process ID is added to each message
scclResult_t scclIpcSocketAllgatherSync(const void* sendData, void* recvData, size_t dataLen);
// Node-local broadcast
scclResult_t scclIpcSocketBroadcast(void* data, size_t dataLen, int root);
......@@ -82,6 +79,12 @@ private:
// Initialize the IPC socket
scclResult_t scclIpcSocketInit(volatile uint32_t* abortFlag);
scclResult_t getScclIpcSocknameStr(int rank, uint64_t hash, char* out_str, int* out_len);
// Send/receive data to a given destination over the Unix domain socket; executed without locking
scclResult_t scclIpcSocketSendDataBasic(const void* data, size_t dataLen, int dst_rank);
scclResult_t scclIpcSocketRecvDataBasic(void* buffer, size_t bufferLen, size_t* receivedLen);
// Send/receive data plus the sender's rank to a given destination over the Unix domain socket; executed without locking
scclResult_t scclIpcSocketSendDataAndRank(const void* data, size_t dataLen, int dst_rank);
scclResult_t scclIpcSocketRecvDataAndRank(void* buffer, size_t bufferLen, size_t* receivedLen, int* src_rank);
private:
// Defines and initializes an scclIpcSocket struct used to manage the IPC socket connection
......@@ -100,6 +103,7 @@ private:
// Thread pool pointer
ThreadPool* pthread_pool = nullptr;
// Timeout (-1 means wait indefinitely in poll())
int timeoutMs = -1;
......@@ -107,7 +111,7 @@ private:
static constexpr int ACK_SIZE = 8;
// CHUNK_SIZE is assumed to be a reasonable chunk size, e.g. 64 KB
static constexpr size_t CHUNK_SIZE = 64 * 1024;
};
} scclIpcSocket_t;
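// Usage sketch of the public API declared above (illustrative only; assumes node-local
// ranks 0..nlocalRanks-1 and an arbitrary hash value shared by all ranks):
//
//   scclIpcSocket sock(localRank, nlocalRanks, /*hash=*/0x1234);
//
//   // point-to-point transfer with acknowledgement
//   if(localRank == 0) {
//       const char msg[] = "hello";
//       sock.scclIpcSocketSendDataWithAck(msg, sizeof(msg), /*dst_rank=*/1);
//   } else if(localRank == 1) {
//       char buf[64]; size_t n; int src;
//       sock.scclIpcSocketRecvDataWithAck(buf, sizeof(buf), &n, &src);
//   }
//
//   // node-local allgather: every rank contributes dataLen bytes and receives
//   // nlocalRanks * dataLen bytes in recvBuf
//   sock.scclIpcSocketAllgather(sendBuf, recvBuf, dataLen);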
} // namespace ipc_socket
} // namespace net
......
......@@ -1154,8 +1154,10 @@ scclResult_t scclNetIb::getProperties(int dev, scclNetProperties_t* props) {
if(scclIbGdrSupport(dev) == scclSuccess) {
props->ptrSupport |= SCCL_PTR_CUDA; // GDR support via nv_peermem
}
if(scclIbDmaBufSupport(dev) == scclSuccess) {
props->ptrSupport |= SCCL_PTR_DMABUF; // GDR support via DMA-BUF
if(getDmaBufEnable() != 0) {
if(scclIbDmaBufSupport(dev) == scclSuccess) {
props->ptrSupport |= SCCL_PTR_DMABUF; // GDR support via DMA-BUF
}
}
props->speed = scclIbDevs[dev].speed;
props->latency = 0; // Not set
......
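The new getDmaBufEnable() guard makes the DMA-BUF capability probe opt-in at runtime instead of being attempted unconditionally. A minimal sketch of what such a switch could look like, assuming it is driven by an environment variable; the variable name SCCL_IB_DMABUF_ENABLE and the disabled-by-default behavior are assumptions for illustration, not taken from this commit:

#include <cstdlib>

// Hypothetical runtime gate for DMA-BUF support (sketch only).
static int getDmaBufEnable() {
    const char* v = std::getenv("SCCL_IB_DMABUF_ENABLE"); // assumed variable name
    return (v != nullptr) ? std::atoi(v) : 0;             // assumed default: disabled
}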