#pragma once #include #include "socket.h" #include "ipcsocket.h" namespace sccl { namespace hardware { namespace topology { namespace bootstrap { typedef net::host::scclSocketAddress scclSocketAddress_t; typedef net::host::scclSocket scclSocket_t; #define SCCL_PROXY_MAX_SUBS MAXCHANNELS #define PROXYARGS_ALLOCATE_SIZE SCCL_MAX_OPS enum proxyConnectState : uint8_t { connUninitialized = 0, connInitialized = 1, connSharedInitialized = 2, connSetupDone = 3, connConnected = 4, numConnStates = 5 }; // 期望代理响应FIFO struct scclExpectedProxyResponse { void* opId; // 操作ID,用于标识特定的操作 int respSize; // 响应大小,表示响应数据的字节数 bool done; // 完成标志,表示该响应是否已完成处理 void* respBuff; // 响应缓冲区,用于存储接收到的响应数据 struct scclExpectedProxyResponse* next; // 指向下一个预期代理响应的指针,形成链表结构 }; // 子代理参数数组 struct scclProxySubArgs { int channelId; // 通道ID int nsteps; // 操作步骤数 ssize_t nbytes; // 数据字节数 int peer; // 对等体ID int groupSize; // uint64_t base; // 基础计数 uint64_t posted; // 已发布的计数 uint64_t received; // 已接收的计数 uint64_t flushed; // 已刷新的计数 uint64_t transmitted; // 已传输的计数 uint64_t done; // 已完成的计数 uint64_t end; // 结束计数 void* requests[SCCL_STEPS]; // 每个步骤的请求指针数组 }; // 定义代理参数结构体 struct scclProxyArgs { struct scclProxySubArgs subs[SCCL_PROXY_MAX_SUBS]; // 子代理参数数组 int nsubs; // 子代理数量 int done; // 是否完成的标志 uint64_t opCount; // 操作计数 int sliceSteps; // 切片步骤数 int chunkSteps; // 数据块步骤数 int chunkSize; // 数据块大小 scclDataType_t dtype; // 数据类型 scclProtocolType_t protocol; // 协议类型 int state; // 当前状态 char* sharedBuff[SCCL_STEPS]; // 共享缓冲区指针数组 int sharedSize[SCCL_STEPS]; // 共享缓冲区大小数组 int idle; // 是否空闲的标志 // 元素链接 struct scclProxyArgs* next; // 指向下一个代理参数的指针 struct scclProxyArgs* nextPeer; // 指向下一个对等代理参数的指针 struct scclProxyArgs** proxyAppendPtr; // 指向代理追加指针的指针 }; struct scclProxyPool { struct scclProxyPool* next; // 指向下一个代理池的指针 struct scclProxyArgs elems[PROXYARGS_ALLOCATE_SIZE]; // 代理参数元素数组 }; struct scclProxyProgressState { // 用于主线程向进度线程发送工作 // struct scclProxyOpsPool* opsPool; // scclShmHandle_t handle; char opsPoolShmSuffix[6]; // 操作池共享内存后缀 
pthread_t thread; // 进度线程的线程ID bool stop; // 停止标志,用于控制线程停止 // struct scclProxyPeer** localPeers; // struct scclSharedNetComms* netComms[SCCL_MAX_NETDEVS]; struct scclProxyArgs* active; // 当前活动的代理参数 struct scclProxyArgs* pool; // 代理参数池 struct scclProxyPool* pools; // 代理池 int nextOps; // 下一个操作的索引 }; // struct scclProxyOp { // struct scclProxyConnection* connection; // int channelId; // int nsteps; // ssize_t nbytes; // struct { // int root : 30; // uint32_t connIndex : 2; // }; // int next; // uint64_t opCount; // int sliceSteps; // int chunkSteps; // int chunkSize; // uint8_t /*scclDataType_t*/ dtype; // uint8_t /*scclDevRedOp_t*/ redOp; // uint8_t /*scclPattern_t*/ pattern; // uint8_t protocol; // union { // uint64_t unused; // // For use by enqueue.cc // struct scclProxyOp* enqNext; // }; // }; // struct scclProxyOpsPool { // struct scclProxyOp ops[MAX_OPS_PER_PEER * SCCL_MAX_LOCAL_RANKS]; // volatile int nextOps; // volatile int nextOpsEnd; // volatile int freeOps[SCCL_MAX_LOCAL_RANKS]; // pthread_mutex_t mutex; // pthread_cond_t cond; // }; //////////////////////////////////////////////////////////////////////////////////////////////// // scclResult_t scclProxyInit(struct scclComm* comm, scclSocket_t* sock, union scclSocketAddress* peerAddresses); } // namespace bootstrap } // namespace topology } // namespace hardware } // namespace sccl // enum scclProxyOpState { // scclProxyOpNone, // scclProxyOpReady, // scclProxyOpProgress // }; // enum { // proxyRecv = 0, // proxySend = 1 // }; // struct scclProxyArgs; // typedef scclResult_t (*proxyProgressFunc_t)(struct scclProxyState*, struct scclProxyArgs*); // static_assert(SCCL_MAX_WORK_ELEMENTS <= MAXCHANNELS, "Not enough sub space for max work elements"); // struct scclProxyOp { // struct scclProxyConnection* connection; // int channelId; // int nsteps; // ssize_t nbytes; // struct { // int root : 30; // uint32_t connIndex : 2; // }; // int next; // uint64_t opCount; // int sliceSteps; // int chunkSteps; // int 
//         chunkSize;
//     uint8_t /*scclDataType_t*/
//         dtype;
//     uint8_t /*scclDevRedOp_t*/ redOp;
//     uint8_t /*scclPattern_t*/ pattern;
//     uint8_t protocol;
//     union {
//         uint64_t unused;
//         // For use by enqueue.cc
//         struct scclProxyOp* enqNext;
//     };
// }
// ;
// static_assert(sizeof(struct scclProxyOp) == 64, "Keep ProxyOp aligned with cache lines for effective prefetch");

// #define SCCL_MAX_NETDEVS 128

// // ProxyOps are used to communicate between main thread and service thread
// // Make sure we have enough to store two full rounds of operations on all channels.
// // Otherwise we'd be unable to post half of them to free new elements.
// #define MAX_OPS_PER_PEER (2 * MAXCHANNELS * SCCL_MAX_WORK_ELEMENTS_P2P)
// #define SCCL_MAX_LOCAL_RANKS 64
// struct scclProxyOpsPool {
//     struct scclProxyOp ops[MAX_OPS_PER_PEER * SCCL_MAX_LOCAL_RANKS];
//     volatile int nextOps;
//     volatile int nextOpsEnd;
//     volatile int freeOps[SCCL_MAX_LOCAL_RANKS];
//     pthread_mutex_t mutex;
//     pthread_cond_t cond;
// };

// struct scclProxyOps {
//     scclProxyOpsPool* pool;
//     scclShmHandle_t handle;
//     int count;
//     int freeOp;
//     int nextOps;
//     int nextOpsEnd;
// };

// struct scclProxySharedP2p {
//     int refcount;
//     size_t size;
//     char* cudaBuff;
//     char* hostBuff;
//     // CUDA IPC
//     scclIpcDesc ipcDesc;
//     struct scclProxyArgs* proxyAppend[MAXCHANNELS]; // Separate send and recv
// };

// struct scclProxyPeer {
//     struct scclProxySharedP2p send;
//     struct scclProxySharedP2p recv;
// };

// struct scclSharedNetComms {
//     void* sendComm[MAXCHANNELS];
//     void* recvComm[MAXCHANNELS];
//     int sendRefCount[MAXCHANNELS];
//     int recvRefCount[MAXCHANNELS];
// };

// struct scclProxyPool;
// struct scclProxyProgressState {
//     // Used by main threads to send work to progress thread
//     struct scclProxyOpsPool* opsPool;
//     scclShmHandle_t handle;
//     char opsPoolShmSuffix[6];

//     pthread_t thread;
//     bool stop;
//     struct scclProxyPeer** localPeers;
//     struct scclSharedNetComms* netComms[SCCL_MAX_NETDEVS];
//     struct scclProxyArgs* active;
//     struct scclProxyArgs* pool;
//     struct scclProxyPool* pools;
//     int nextOps;
// };

// struct scclProxyAsyncOp {
//     int type;
//     struct scclProxyConnection* connection;
//     int reqSize, respSize;
//     char *reqBuff, *respBuff;
//     void* opId;
//     scclProxyAsyncOp* next;
// };

// struct scclProxyLocalPeer {
//     struct scclSocket sock;
//     int tpRank;
//     int tpLocalRank;
//     scclProxyAsyncOp* asyncOps;
//     int asyncOpCounter;
// };

// struct scclProxyState {
//     int refCount;
//     int tpRank;
//     int tpnRanks;
//     int tpLocalnRanks;
//     int cudaDev;
//     int p2pnChannels;
//     int p2pChunkSize;
//     int nChannels;
//     int buffSizes[SCCL_NUM_PROTOCOLS];
//     bool allocP2pNetLLBuffers;
//     bool dmaBufSupport;
//     scclNet_t* scclNet;
//     scclCollNet_t* scclCollNet;
//     volatile uint32_t* abortFlag;
//     // Service thread
//     pthread_t thread;
//     struct scclSocket* listenSock;
//     int stop;
//     CUcontext cudaCtx;

//     // Used by main thread
//     union scclSocketAddress* peerAddresses;
//     struct scclSocket* peerSocks;
//     struct scclProxyOps* proxyOps;
//     void** sharedDevMems;
//     struct scclIpcSocket peerIpcSock; // cuMEM API support (UDS)

//     // Progress thread
//     struct scclProxyProgressState progressState;

//     // Queue of expected responses from the proxy
//     struct scclExpectedProxyResponse* expectedResponses;
// };

// enum proxyConnectState {
//     connUninitialized     = 0,
//     connInitialized       = 1,
//     connSharedInitialized = 2,
//     connSetupDone         = 3,
//     connConnected         = 4,
//     numConnStates         = 5
// };

// struct scclProxyConnection {
//     int send, transport, shared;
//     int tpLocalRank, sameProcess;
//     struct scclSocket* sock;
//     struct scclTransportComm* tcomm;
//     struct scclProxyArgs* proxyAppend;
//     struct scclProxyArgs** proxyAppendPtr;
//     void* transportResources;
//     proxyConnectState state;
//     struct scclCollNetSharedRes* collNet;
// };

// typedef scclResult_t (*threadFunc_t)(struct scclProxyArgs*);

// enum proxyMode {
//     proxyRing = 0,
//     proxyFrom = 1,
//     proxyTo   = 2
// };

// scclResult_t scclProxySaveOp(struct scclComm* comm, struct scclProxyOp* proxyOp, bool* justInquire);
// scclResult_t scclProxyComputeP2p(struct scclInfo* info, struct scclProxyOp* proxyOp);
// scclResult_t scclProxyStart(struct scclComm* comm);
// scclResult_t scclProxyCreate(struct scclComm* comm);
// scclResult_t scclProxyConnect(struct scclComm* comm, int transport, int send, int proxyRank, struct scclProxyConnector* proxyConn);
// enum scclProxyMsgType {
//     scclProxyMsgInit       = 1,
//     scclProxyMsgSharedInit = 2,
//     scclProxyMsgSetup      = 3,
//     scclProxyMsgConnect    = 4,
//     scclProxyMsgStart      = 5,
//     scclProxyMsgClose      = 6,
//     scclProxyMsgAbort      = 7,
//     scclProxyMsgStop       = 8,
//     scclProxyMsgConvertFd  = 9, // cuMem API support (UDS)
// };

// // This function is called by a client of the proxy that needs to invoke any of the non-progress proxyOp types
// // Call this function on the client, supplying a locally unique opId. Then, poll on the return value of
// // scclPollProxyResponse(), supplying the same opId to confirm the operation has completed
// scclResult_t scclProxyCallAsync(struct scclComm* comm, struct scclProxyConnector* proxyConn, int type, void* reqBuff, int reqSize, int respSize, void* opId);

// // This function will internally call scclProxyCallAsync() and spin until scclPollProxyResponse() confirms the result is received
// scclResult_t
// scclProxyCallBlocking(struct scclComm* comm, struct scclProxyConnector* proxyConn, int type, void* reqBuff, int reqSize, void* respBuff, int respSize);
// scclResult_t scclPollProxyResponse(struct scclComm* comm, struct scclProxyConnector* proxyConn, void* respBuff, void* opId);

// scclResult_t scclProxyClientConvertFdBlocking(struct scclComm* comm, struct scclProxyConnector* proxyConn, int fd, int* convertedFd);

// scclResult_t scclProxyStop(struct scclComm* comm);
// scclResult_t scclProxyShmUnlink(struct scclComm* comm);
// scclResult_t scclProxyDestroy(struct scclComm* comm);

// scclResult_t mscclSaveProxy(struct scclComm* comm, struct scclChannel* channel, int type, int peer, struct scclProxyOp* op, int connIndex);