proxy.h 12.2 KB
Newer Older
lishen's avatar
lishen committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
#pragma once

#include <pthread.h>
#include "socket.h"
#include "ipcsocket.h"

namespace sccl {
namespace hardware {
namespace topology {
namespace bootstrap {

// Short aliases for the host socket address / socket types from net::host.
// NOTE(review): presumably declared in "socket.h" — confirm.
typedef net::host::scclSocketAddress scclSocketAddress_t;
typedef net::host::scclSocket scclSocket_t;

// Capacity of scclProxyArgs::subs; defined as MAXCHANNELS.
#define SCCL_PROXY_MAX_SUBS MAXCHANNELS
// Number of scclProxyArgs elements stored in each scclProxyPool; defined as SCCL_MAX_OPS.
#define PROXYARGS_ALLOCATE_SIZE SCCL_MAX_OPS

// Connection states of a proxy connection, stored in one byte (underlying type uint8_t).
// NOTE(review): the names suggest a progression from uninitialized to connected,
// but nothing in this header enforces an ordering — confirm against the users of this enum.
enum proxyConnectState : uint8_t {
    connUninitialized     = 0,
    connInitialized       = 1,
    connSharedInitialized = 2,
    connSetupDone         = 3,
    connConnected         = 4,
    numConnStates         = 5 // equals the number of enumerators above; presumably a count sentinel, not a real state
};

// One entry in the FIFO of responses expected from the proxy.
// Entries are chained through `next` into a singly linked list.
struct scclExpectedProxyResponse {
    void* opId;                             // Operation ID identifying the specific operation
    int respSize;                           // Response size: number of bytes of response data
    bool done;                              // Completion flag: whether this response has been fully processed
    void* respBuff;                         // Response buffer holding the received response data
    struct scclExpectedProxyResponse* next; // Next expected proxy response, forming the linked list
};

// Per-sub-operation proxy arguments; scclProxyArgs holds an array of these (`subs`).
struct scclProxySubArgs {
    int channelId;  // Channel ID
    int nsteps;     // Number of steps in the operation
    ssize_t nbytes; // Number of data bytes
    int peer;       // Peer ID
    int groupSize;  // NOTE(review): undocumented in the original — presumably the size of the peer group; confirm

    uint64_t base;        // Base counter
    uint64_t posted;      // Posted counter
    uint64_t received;    // Received counter
    uint64_t flushed;     // Flushed counter
    uint64_t transmitted; // Transmitted counter
    uint64_t done;        // Completed counter
    uint64_t end;         // End counter

    void* requests[SCCL_STEPS]; // Request pointers, one slot per step
};

// Proxy arguments: describes one proxy operation and its sub-operations.
// Instances are linked together through `next` / `nextPeer`.
struct scclProxyArgs {
    struct scclProxySubArgs subs[SCCL_PROXY_MAX_SUBS]; // Sub-operation argument array
    int nsubs;                                         // Number of sub-operations
    int done;                                          // Flag: whether the operation is complete
    uint64_t opCount;                                  // Operation counter
    int sliceSteps;                                    // Number of steps per slice
    int chunkSteps;                                    // Number of steps per chunk
    int chunkSize;                                     // Chunk size
    scclDataType_t dtype;                              // Data type
    scclProtocolType_t protocol;                       // Protocol type
    int state;                                         // Current state
    char* sharedBuff[SCCL_STEPS];                      // Shared buffer pointers, one slot per step
    int sharedSize[SCCL_STEPS];                        // Shared buffer sizes, one slot per step
    int idle;                                          // Flag: whether the operation is idle

    // Element linkage
    struct scclProxyArgs* next;            // Next proxy args element
    struct scclProxyArgs* nextPeer;        // Next peer's proxy args element
    struct scclProxyArgs** proxyAppendPtr; // Pointer to the proxy-append pointer
};

// A fixed-size batch of PROXYARGS_ALLOCATE_SIZE scclProxyArgs elements;
// pools are chained through `next`.
struct scclProxyPool {
    struct scclProxyPool* next;                          // Next proxy pool in the chain
    struct scclProxyArgs elems[PROXYARGS_ALLOCATE_SIZE]; // Proxy args elements stored in this pool
};

// State associated with the proxy progress thread.
// Several members are currently commented out and therefore not part of the layout.
struct scclProxyProgressState {
    // Used by the main thread to send work to the progress thread
    // struct scclProxyOpsPool* opsPool;
    // scclShmHandle_t handle;
    char opsPoolShmSuffix[6]; // Suffix of the ops pool shared memory

    pthread_t thread; // Thread ID of the progress thread
    bool stop;        // Stop flag used to tell the thread to stop
    // struct scclProxyPeer** localPeers;
    // struct scclSharedNetComms* netComms[SCCL_MAX_NETDEVS];
    struct scclProxyArgs* active; // Currently active proxy args
    struct scclProxyArgs* pool;   // Proxy args pool
    struct scclProxyPool* pools;  // Proxy pools
    int nextOps;                  // Index of the next operation
};

// struct scclProxyOp {
//     struct scclProxyConnection* connection;
//     int channelId;
//     int nsteps;
//     ssize_t nbytes;
//     struct {
//         int root : 30;
//         uint32_t connIndex : 2;
//     };
//     int next;

//     uint64_t opCount;
//     int sliceSteps;
//     int chunkSteps;
//     int chunkSize;
//     uint8_t /*scclDataType_t*/ dtype;
//     uint8_t /*scclDevRedOp_t*/ redOp;
//     uint8_t /*scclPattern_t*/ pattern;
//     uint8_t protocol;

//     union {
//         uint64_t unused;
//         // For use by enqueue.cc
//         struct scclProxyOp* enqNext;
//     };
// };

// struct scclProxyOpsPool {
//     struct scclProxyOp ops[MAX_OPS_PER_PEER * SCCL_MAX_LOCAL_RANKS];
//     volatile int nextOps;
//     volatile int nextOpsEnd;
//     volatile int freeOps[SCCL_MAX_LOCAL_RANKS];
//     pthread_mutex_t mutex;
//     pthread_cond_t cond;
// };

////////////////////////////////////////////////////////////////////////////////////////////////
// scclResult_t scclProxyInit(struct scclComm* comm, scclSocket_t* sock, union scclSocketAddress* peerAddresses);

} // namespace bootstrap
} // namespace topology
} // namespace hardware
} // namespace sccl

// enum scclProxyOpState {
//     scclProxyOpNone,
//     scclProxyOpReady,
//     scclProxyOpProgress
// };
// enum {
//     proxyRecv = 0,
//     proxySend = 1
// };

// struct scclProxyArgs;
// typedef scclResult_t (*proxyProgressFunc_t)(struct scclProxyState*, struct scclProxyArgs*);

// static_assert(SCCL_MAX_WORK_ELEMENTS <= MAXCHANNELS, "Not enough sub space for max work elements");

// struct scclProxyOp {
//     struct scclProxyConnection* connection;
//     int channelId;
//     int nsteps;
//     ssize_t nbytes;
//     struct {
//         int root : 30;
//         uint32_t connIndex : 2;
//     };
//     int next;

//     uint64_t opCount;
//     int sliceSteps;
//     int chunkSteps;
//     int chunkSize;
//     uint8_t /*scclDataType_t*/
// dtype;
// uint8_t /*scclDevRedOp_t*/ redOp;
// uint8_t /*scclPattern_t*/ pattern;
// uint8_t protocol;

// union {
//     uint64_t unused;
//     // For use by enqueue.cc
//     struct scclProxyOp* enqNext;
// };
// }
// ;
// static_assert(sizeof(struct scclProxyOp) == 64, "Keep ProxyOp aligned with cache lines for effective prefetch");

// #define SCCL_MAX_NETDEVS 128

// // ProxyOps are used to communicate between main thread and service thread
// // Make sure we have enough to store two full rounds of operations on all channels.
// // Otherwise we'd be unable to post half of them to free new elements.
// #define MAX_OPS_PER_PEER (2 * MAXCHANNELS * SCCL_MAX_WORK_ELEMENTS_P2P)
// #define SCCL_MAX_LOCAL_RANKS 64
// struct scclProxyOpsPool {
//     struct scclProxyOp ops[MAX_OPS_PER_PEER * SCCL_MAX_LOCAL_RANKS];
//     volatile int nextOps;
//     volatile int nextOpsEnd;
//     volatile int freeOps[SCCL_MAX_LOCAL_RANKS];
//     pthread_mutex_t mutex;
//     pthread_cond_t cond;
// };

// struct scclProxyOps {
//     scclProxyOpsPool* pool;
//     scclShmHandle_t handle;
//     int count;
//     int freeOp;
//     int nextOps;
//     int nextOpsEnd;
// };

// struct scclProxySharedP2p {
//     int refcount;
//     size_t size;
//     char* cudaBuff;
//     char* hostBuff;
//     // CUDA IPC
//     scclIpcDesc ipcDesc;
//     struct scclProxyArgs* proxyAppend[MAXCHANNELS]; // Separate send and recv
// };

// struct scclProxyPeer {
//     struct scclProxySharedP2p send;
//     struct scclProxySharedP2p recv;
// };

// struct scclSharedNetComms {
//     void* sendComm[MAXCHANNELS];
//     void* recvComm[MAXCHANNELS];
//     int sendRefCount[MAXCHANNELS];
//     int recvRefCount[MAXCHANNELS];
// };

// struct scclProxyPool;
// struct scclProxyProgressState {
//     // Used by main threads to send work to progress thread
//     struct scclProxyOpsPool* opsPool;
//     scclShmHandle_t handle;
//     char opsPoolShmSuffix[6];

//     pthread_t thread;
//     bool stop;
//     struct scclProxyPeer** localPeers;
//     struct scclSharedNetComms* netComms[SCCL_MAX_NETDEVS];
//     struct scclProxyArgs* active;
//     struct scclProxyArgs* pool;
//     struct scclProxyPool* pools;
//     int nextOps;
// };

// struct scclProxyAsyncOp {
//     int type;
//     struct scclProxyConnection* connection;
//     int reqSize, respSize;
//     char *reqBuff, *respBuff;
//     void* opId;
//     scclProxyAsyncOp* next;
// };

// struct scclProxyLocalPeer {
//     struct scclSocket sock;
//     int tpRank;
//     int tpLocalRank;
//     scclProxyAsyncOp* asyncOps;
//     int asyncOpCounter;
// };

// struct scclProxyState {
//     int refCount;
//     int tpRank;
//     int tpnRanks;
//     int tpLocalnRanks;
//     int cudaDev;
//     int p2pnChannels;
//     int p2pChunkSize;
//     int nChannels;
//     int buffSizes[SCCL_NUM_PROTOCOLS];
//     bool allocP2pNetLLBuffers;
//     bool dmaBufSupport;
//     scclNet_t* scclNet;
//     scclCollNet_t* scclCollNet;
//     volatile uint32_t* abortFlag;
//     // Service thread
//     pthread_t thread;
//     struct scclSocket* listenSock;
//     int stop;
//     CUcontext cudaCtx;

//     // Used by main thread
//     union scclSocketAddress* peerAddresses;
//     struct scclSocket* peerSocks;
//     struct scclProxyOps* proxyOps;
//     void** sharedDevMems;
//     struct scclIpcSocket peerIpcSock; // cuMEM API support (UDS)

//     // Progress thread
//     struct scclProxyProgressState progressState;

//     // Queue of expected responses from the proxy
//     struct scclExpectedProxyResponse* expectedResponses;
// };

// enum proxyConnectState {
//     connUninitialized     = 0,
//     connInitialized       = 1,
//     connSharedInitialized = 2,
//     connSetupDone         = 3,
//     connConnected         = 4,
//     numConnStates         = 5
// };

// struct scclProxyConnection {
//     int send, transport, shared;
//     int tpLocalRank, sameProcess;
//     struct scclSocket* sock;
//     struct scclTransportComm* tcomm;
//     struct scclProxyArgs* proxyAppend;
//     struct scclProxyArgs** proxyAppendPtr;
//     void* transportResources;
//     proxyConnectState state;
//     struct scclCollNetSharedRes* collNet;
// };

// typedef scclResult_t (*threadFunc_t)(struct scclProxyArgs*);

// enum proxyMode {
//     proxyRing = 0,
//     proxyFrom = 1,
//     proxyTo   = 2
// };

// scclResult_t scclProxySaveOp(struct scclComm* comm, struct scclProxyOp* proxyOp, bool* justInquire);
// scclResult_t scclProxyComputeP2p(struct scclInfo* info, struct scclProxyOp* proxyOp);
// scclResult_t scclProxyStart(struct scclComm* comm);
// scclResult_t scclProxyCreate(struct scclComm* comm);
// scclResult_t scclProxyConnect(struct scclComm* comm, int transport, int send, int proxyRank, struct scclProxyConnector* proxyConn);
// enum scclProxyMsgType {
//     scclProxyMsgInit       = 1,
//     scclProxyMsgSharedInit = 2,
//     scclProxyMsgSetup      = 3,
//     scclProxyMsgConnect    = 4,
//     scclProxyMsgStart      = 5,
//     scclProxyMsgClose      = 6,
//     scclProxyMsgAbort      = 7,
//     scclProxyMsgStop       = 8,
//     scclProxyMsgConvertFd  = 9, // cuMem API support (UDS)
// };

// // This function is called by a client of the proxy that needs to invoke any of the non-progress proxyOp types
// // Call this function on the client, supplying a locally unique opId. Then, poll on the return value of
// // scclPollProxyResponse(), supplying the same opId to confirm the operation has completed
// scclResult_t scclProxyCallAsync(struct scclComm* comm, struct scclProxyConnector* proxyConn, int type, void* reqBuff, int reqSize, int respSize, void* opId);

// // This function will internally call scclProxyCallAsync() and spin until scclPollProxyResponse() confirms the result is received
// scclResult_t
// scclProxyCallBlocking(struct scclComm* comm, struct scclProxyConnector* proxyConn, int type, void* reqBuff, int reqSize, void* respBuff, int respSize);
// scclResult_t scclPollProxyResponse(struct scclComm* comm, struct scclProxyConnector* proxyConn, void* respBuff, void* opId);

// scclResult_t scclProxyClientConvertFdBlocking(struct scclComm* comm, struct scclProxyConnector* proxyConn, int fd, int* convertedFd);

// scclResult_t scclProxyStop(struct scclComm* comm);
// scclResult_t scclProxyShmUnlink(struct scclComm* comm);
// scclResult_t scclProxyDestroy(struct scclComm* comm);

// scclResult_t mscclSaveProxy(struct scclComm* comm, struct scclChannel* channel, int type, int peer, struct scclProxyOp* op, int connIndex);