nvshmem.patch 21.4 KB
Newer Older
Chenggang Zhao's avatar
Chenggang Zhao committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
From 9d784943e1032f15dd7cdd2599192937ba9d9343 Mon Sep 17 00:00:00 2001
From: Shangyan Zhou <sy.zhou@deepseek.com>
Date: Fri, 20 Dec 2024 10:57:12 +0800
Subject: [PATCH 1/5] Change QP creating order.

---
 src/modules/transport/ibgda/ibgda.cpp | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/src/modules/transport/ibgda/ibgda.cpp b/src/modules/transport/ibgda/ibgda.cpp
index 31bc56a..ff02f50 100644
--- a/src/modules/transport/ibgda/ibgda.cpp
+++ b/src/modules/transport/ibgda/ibgda.cpp
@@ -2921,17 +2921,20 @@ int nvshmemt_ibgda_connect_endpoints(nvshmem_transport_t t, int *selected_dev_id
         INFO(ibgda_state->log_level, "Creating %d RC QPs", device->rc.num_eps_per_pe);
         for (int i = 0; i < num_rc_eps; ++i) {
             // Do not create loopback to self
-            if (i / device->rc.num_eps_per_pe == mype) {
+            int dst_pe = (i + 1 + mype) % n_pes;
+            int offset = i / n_pes;
+            int mapped_i = dst_pe * device->rc.num_eps_per_pe + offset;
+            if (dst_pe == mype) {
                 continue;
             }
-            status = ibgda_create_qp(&device->rc.eps[i], device, portid, i,
+            status = ibgda_create_qp(&device->rc.eps[mapped_i], device, portid, mapped_i,
                                      NVSHMEMI_IBGDA_DEVICE_QP_TYPE_RC);
             NVSHMEMI_NZ_ERROR_JMP(status, NVSHMEMX_ERROR_INTERNAL, out,
-                                  "ibgda_create_dci failed on RC #%d.", i);
+                                  "ibgda_create_dci failed on RC #%d.", mapped_i);

-            status = ibgda_get_rc_handle(&local_rc_handles[i], device->rc.eps[i], device);
+            status = ibgda_get_rc_handle(&local_rc_handles[mapped_i], device->rc.eps[mapped_i], device);
             NVSHMEMI_NZ_ERROR_JMP(status, NVSHMEMX_ERROR_INTERNAL, out,
-                                  "ibgda_get_rc_handle failed on RC #%d.", i);
+                                  "ibgda_get_rc_handle failed on RC #%d.", mapped_i);
         }

         if (num_rc_eps) {
--
2.25.1


From 3cd3938bcbbabed7fb7675032afb02647ea9c2fe Mon Sep 17 00:00:00 2001
From: Shangyan Zhou <sy.zhou@deepseek.com>
Date: Mon, 23 Dec 2024 09:55:27 +0800
Subject: [PATCH 2/5] Disable timeout check

---
 CMakeLists.txt | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 771ff98..9246d29 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -140,7 +140,7 @@ option(NVSHMEM_NVTX "Enable NVSHMEM NVTX support" ${NVSHMEM_NVTX_DEFAULT})
 option(NVSHMEM_PMIX_SUPPORT "Enable Compilation of the PMIX bootstrap and PMIX specific code" $ENV{NVSHMEM_PMIX_SUPPORT})
 option(NVSHMEM_SHMEM_SUPPORT "Enable Compilation of the SHMEM bootstrap and SHMEM specific code" $ENV{NVSHMEM_SHMEM_SUPPORT})
 option(NVSHMEM_TEST_STATIC_LIB "Force tests to link only against the combined nvshmem.a binary" $ENV{NVSHMEM_TEST_STATIC_LIB})
-option(NVSHMEM_TIMEOUT_DEVICE_POLLING "Enable timeouts for NVSHMEM device-side polling functions (e.g. wait_until)" $ENV{NVSHMEM_TIMEOUT_DEVICE_POLLING})
+option(NVSHMEM_TIMEOUT_DEVICE_POLLING "Enable timeouts for NVSHMEM device-side polling functions (e.g. wait_until)" OFF)
 option(NVSHMEM_TRACE "Enable NVSHMEM trace print events" $ENV{NVSHMEM_TRACE})
 option(NVSHMEM_UCX_SUPPORT "Enable compilation of the UCX remote transport" $ENV{NVSHMEM_UCX_SUPPORT})
 option(NVSHMEM_USE_DLMALLOC "Set dlmalloc as the NVSHMEM heap allocation method" $ENV{NVSHMEM_USE_DLMALLOC})
@@ -165,6 +165,7 @@ set(NVSHMEM_PREFIX ${NVSHMEM_PREFIX_DEFAULT} CACHE PATH "path to NVSHMEM install
 set(PMIX_HOME ${PMIX_HOME_DEFAULT} CACHE PATH "path to PMIX installation")
 set(SHMEM_HOME ${MPI_HOME} CACHE PATH "path to SHMEM installation")
 set(UCX_HOME ${UCX_HOME_DEFAULT} CACHE PATH "path to UCX installation")
+set(NVSHMEM_TIMEOUT_DEVICE_POLLING OFF)

 message(STATUS "NVSHMEM_PREFIX: ${NVSHMEM_PREFIX}")
 message(STATUS "NVSHMEM_DEVEL: ${NVSHMEM_DEVEL}")
--
2.25.1


From 4e0eaff589d38f448715e43a935479451a41c0fe Mon Sep 17 00:00:00 2001
From: Shangyan Zhou <sy.zhou@deepseek.com>
Date: Fri, 10 Jan 2025 11:53:38 +0800
Subject: [PATCH 3/5] Add recv queue and recv cq for rc qps.

Let the ibgda rc qps use regular recv queue.

Add recv queue to ibgda dev qp.

IBGDA create recv cq

Setup recv cq.

fix recv queue.

Remove some useless idx.

Longer recv queue.
---
 .../nvshmem_common_ibgda.h                    | 19 +++++-
 src/modules/transport/ibgda/ibgda.cpp         | 65 ++++++++++++++++---
 2 files changed, 71 insertions(+), 13 deletions(-)

diff --git a/src/include/device_host_transport/nvshmem_common_ibgda.h b/src/include/device_host_transport/nvshmem_common_ibgda.h
index 32f6d02..7d4e250 100644
--- a/src/include/device_host_transport/nvshmem_common_ibgda.h
+++ b/src/include/device_host_transport/nvshmem_common_ibgda.h
@@ -168,14 +168,17 @@ typedef struct {
         uint64_t get_head;    // last wqe idx + 1 with a "fetch" operation (g, get, amo_fetch)
         uint64_t get_tail;    // last wqe idx + 1 polled with cst; get_tail > get_head is possible
     } tx_wq;
+    struct {
+        uint64_t resv_head;   // last reserved wqe idx + 1
+    } rx_wq;
     struct {
         uint64_t head;
         uint64_t tail;
     } ibuf;
     char padding[NVSHMEMI_IBGDA_QP_MANAGEMENT_PADDING];
 } __attribute__((__aligned__(8))) nvshmemi_ibgda_device_qp_management_v1;
-static_assert(sizeof(nvshmemi_ibgda_device_qp_management_v1) == 96,
-              "ibgda_device_qp_management_v1 must be 96 bytes.");
+static_assert(sizeof(nvshmemi_ibgda_device_qp_management_v1) == 104,
+              "ibgda_device_qp_management_v1 must be 104 bytes.");

 typedef nvshmemi_ibgda_device_qp_management_v1 nvshmemi_ibgda_device_qp_management_t;

@@ -199,9 +202,19 @@ typedef struct nvshmemi_ibgda_device_qp {
         // May point to mvars.prod_idx or internal prod_idx
         uint64_t *prod_idx;
     } tx_wq;
+    struct {
+        uint16_t nwqes;
+        uint64_t tail;
+        void *wqe;
+        __be32 *dbrec;
+        void *bf;
+        nvshmemi_ibgda_device_cq_t *cq;
+        // May point to mvars.prod_idx or internal prod_idx
+        uint64_t *prod_idx;
+    } rx_wq;
     nvshmemi_ibgda_device_qp_management_v1 mvars;  // management variables
 } nvshmemi_ibgda_device_qp_v1;
-static_assert(sizeof(nvshmemi_ibgda_device_qp_v1) == 184, "ibgda_device_qp_v1 must be 184 bytes.");
+static_assert(sizeof(nvshmemi_ibgda_device_qp_v1) == 248, "ibgda_device_qp_v1 must be 248 bytes.");

 typedef nvshmemi_ibgda_device_qp_v1 nvshmemi_ibgda_device_qp_t;

diff --git a/src/modules/transport/ibgda/ibgda.cpp b/src/modules/transport/ibgda/ibgda.cpp
index ff02f50..b8d6bc7 100644
--- a/src/modules/transport/ibgda/ibgda.cpp
+++ b/src/modules/transport/ibgda/ibgda.cpp
@@ -194,6 +194,7 @@ struct ibgda_ep {
     off_t dbr_offset;

     struct ibgda_cq *send_cq;
+    struct ibgda_cq *recv_cq;
     struct ibv_ah *ah;

     uint32_t user_index;
@@ -1520,7 +1521,8 @@ static int ibgda_create_cq_shared_objects(nvshmemt_ibgda_state_t *ibgda_state,

     struct ibv_context *context = device->context;

-    unsigned int num_cqs = device->dci.num_eps + device->rc.num_eps_per_pe * n_pes;
+    // Each RC qp has one send CQ and one recv CQ.
+    unsigned int num_cqs = device->dci.num_eps + device->rc.num_eps_per_pe * n_pes * 2;

     assert(ibgda_qp_depth > 0);
     size_t num_cqe = IBGDA_ROUND_UP_POW2_OR_0(ibgda_qp_depth);
@@ -1683,7 +1685,8 @@ static int ibgda_create_qp_shared_objects(nvshmemt_ibgda_state_t *ibgda_state,
     }

     // Allocate and map WQ buffer for all QPs.
-    wq_buf_size_per_qp = num_wqebb * MLX5_SEND_WQE_BB;  // num_wqebb is always a power of 2
+    // Todo: reduce the size of wq buffer.
+    wq_buf_size_per_qp = num_wqebb * MLX5_SEND_WQE_BB * 2;  // num_wqebb is always a power of 2
     wq_buf_size = wq_buf_size_per_qp * num_eps;
     status = ibgda_nic_control_alloc(&wq_mobject, wq_buf_size, IBGDA_GPAGE_SIZE);
     NVSHMEMI_NZ_ERROR_JMP(status, NVSHMEMX_ERROR_INTERNAL, out, "cannot allocate wq buf.\n");
@@ -1864,8 +1867,11 @@ static int ibgda_create_qp(struct ibgda_ep **ep_ptr, struct ibgda_device *device
     int cqe_version = 0;

     struct ibgda_cq *send_cq = NULL;
+    struct ibgda_cq *recv_cq = NULL;

     size_t num_wqebb = IBGDA_ROUND_UP_POW2_OR_0(ibgda_qp_depth);
+    size_t num_recv_wqe = ibgda_qp_depth;
+    size_t recv_wqe_size = 16;

     int status = 0;

@@ -1893,6 +1899,11 @@ static int ibgda_create_qp(struct ibgda_ep **ep_ptr, struct ibgda_device *device
     status = ibgda_create_cq(&send_cq, device);
     NVSHMEMI_NZ_ERROR_JMP(status, NVSHMEMX_ERROR_INTERNAL, out, "ibgda_create_cq failed.\n");

+    if (qp_type == NVSHMEMI_IBGDA_DEVICE_QP_TYPE_RC) {
+        status = ibgda_create_cq(&recv_cq, device);
+        NVSHMEMI_NZ_ERROR_JMP(status, NVSHMEMX_ERROR_INTERNAL, out, "ibgda_create_cq failed.\n");
+    }
+
     ep = (struct ibgda_ep *)calloc(1, sizeof(struct ibgda_ep));
     NVSHMEMI_NULL_ERROR_JMP(ep, status, NVSHMEMX_ERROR_OUT_OF_MEMORY, out,
                             "Unable to allocate mem for ep.\n");
@@ -1921,12 +1932,9 @@ static int ibgda_create_qp(struct ibgda_ep **ep_ptr, struct ibgda_device *device
     DEVX_SET(qpc, qp_context, pm_state, MLX5_QPC_PM_STATE_MIGRATED);
     DEVX_SET(qpc, qp_context, pd, device->qp_shared_object.pdn);
     DEVX_SET(qpc, qp_context, uar_page, uar_mobject->uar->page_id);  // BF register
-    DEVX_SET(qpc, qp_context, rq_type, IBGDA_SRQ_TYPE_VALUE);        // Shared Receive Queue
-    DEVX_SET(qpc, qp_context, srqn_rmpn_xrqn, device->qp_shared_object.srqn);
     DEVX_SET(qpc, qp_context, cqn_snd, send_cq->cqn);
-    DEVX_SET(qpc, qp_context, cqn_rcv, device->qp_shared_object.rcqn);
+    DEVX_SET(qpc, qp_context, cqn_rcv, qp_type == NVSHMEMI_IBGDA_DEVICE_QP_TYPE_RC ? recv_cq->cqn : device->qp_shared_object.rcqn);
     DEVX_SET(qpc, qp_context, log_sq_size, IBGDA_ILOG2_OR0(num_wqebb));
-    DEVX_SET(qpc, qp_context, log_rq_size, 0);
     DEVX_SET(qpc, qp_context, cs_req, 0);                                     // Disable CS Request
     DEVX_SET(qpc, qp_context, cs_res, 0);                                     // Disable CS Response
     DEVX_SET(qpc, qp_context, dbr_umem_valid, IBGDA_MLX5_UMEM_VALID_ENABLE);  // Enable dbr_umem_id
@@ -1935,6 +1943,15 @@ static int ibgda_create_qp(struct ibgda_ep **ep_ptr, struct ibgda_device *device
     DEVX_SET(qpc, qp_context, dbr_umem_id, dbr_umem->umem_id);  // DBR buffer
     DEVX_SET(qpc, qp_context, user_index, qp_idx);
     DEVX_SET(qpc, qp_context, page_offset, 0);
+    if (qp_type == NVSHMEMI_IBGDA_DEVICE_QP_TYPE_RC){
+        DEVX_SET(qpc, qp_context, rq_type, 0);        // Regular recv queue
+        DEVX_SET(qpc, qp_context, log_rq_size, IBGDA_ILOG2(num_recv_wqe)); // 4 wqe
+        DEVX_SET(qpc, qp_context, log_rq_stride, IBGDA_ILOG2(recv_wqe_size) - 4); // max recv wqe size = 16B
+    } else {
+        DEVX_SET(qpc, qp_context, rq_type, IBGDA_SRQ_TYPE_VALUE);        // Shared Receive Queue, DC must use this.
+        DEVX_SET(qpc, qp_context, srqn_rmpn_xrqn, device->qp_shared_object.srqn);
+        DEVX_SET(qpc, qp_context, log_rq_size, 0);
+    }

     ep->devx_qp = mlx5dv_devx_obj_create(context, cmd_in, sizeof(cmd_in), cmd_out, sizeof(cmd_out));
     NVSHMEMI_NULL_ERROR_JMP(ep->devx_qp, status, NVSHMEMX_ERROR_INTERNAL, out,
@@ -1944,9 +1961,9 @@ static int ibgda_create_qp(struct ibgda_ep **ep_ptr, struct ibgda_device *device
     ep->portid = portid;

     ep->sq_cnt = num_wqebb;
-    ep->sq_buf_offset = 0;
+    ep->sq_buf_offset = num_recv_wqe * recv_wqe_size;

-    ep->rq_cnt = 0;
+    ep->rq_cnt = num_recv_wqe;
     ep->rq_buf_offset = 0;

     ep->wq_mobject = device->qp_shared_object.wq_mobject;
@@ -1960,6 +1977,7 @@ static int ibgda_create_qp(struct ibgda_ep **ep_ptr, struct ibgda_device *device
     ep->uar_mobject = uar_mobject;

     ep->send_cq = send_cq;
+    ep->recv_cq = recv_cq;

     ep->qp_type = qp_type;

@@ -1971,6 +1989,7 @@ out:
     if (status) {
         if (uar_mobject) ibgda_unmap_and_free_qp_uar(uar_mobject);
         if (send_cq) ibgda_destroy_cq(send_cq);
+        if (recv_cq) ibgda_destroy_cq(recv_cq);
         if (ep) free(ep);
     }

@@ -2269,6 +2288,10 @@ static int ibgda_destroy_ep(struct ibgda_ep *ep) {
         ibgda_destroy_cq(ep->send_cq);
     }

+    if (ep->recv_cq) {
+        ibgda_destroy_cq(ep->recv_cq);
+    }
+
     if (ep->ah) {
         ftable.destroy_ah(ep->ah);
     }
@@ -2300,7 +2323,7 @@ static void ibgda_get_device_qp(nvshmemi_ibgda_device_qp_t *dev_qp, struct ibgda
     dev_qp->qpn = ep->qpn;

     assert(ep->wq_mobject->has_gpu_mapping);
-    dev_qp->tx_wq.wqe = (void *)((uintptr_t)ep->wq_mobject->aligned.gpu_ptr + ep->wq_offset);
+    dev_qp->tx_wq.wqe = (void *)((uintptr_t)ep->wq_mobject->aligned.gpu_ptr + ep->wq_offset + ep->sq_buf_offset);

     if (ibgda_nic_handler == IBGDA_NIC_HANDLER_GPU) {
         assert(ep->dbr_mobject->has_gpu_mapping);
@@ -2312,6 +2335,12 @@ static void ibgda_get_device_qp(nvshmemi_ibgda_device_qp_t *dev_qp, struct ibgda
     }

     dev_qp->tx_wq.nwqes = ep->sq_cnt;
+    if (ep->qp_type == NVSHMEMI_IBGDA_DEVICE_QP_TYPE_RC) {
+        dev_qp->rx_wq.nwqes = ep->rq_cnt;
+        dev_qp->rx_wq.wqe = (void *)((uintptr_t)ep->wq_mobject->aligned.gpu_ptr + ep->wq_offset + ep->rq_buf_offset);
+        dev_qp->rx_wq.dbrec = (__be32 *)((uintptr_t)ep->dbr_mobject->aligned.gpu_ptr + ep->dbr_offset);
+        dev_qp->rx_wq.bf = (void *)ep->uar_mobject->aligned.gpu_ptr;
+    }

     ibuf_dci_start = (uintptr_t)device->qp_shared_object.internal_buf.mem_object->aligned.gpu_ptr;
     ibuf_rc_start = ibuf_dci_start + (size_per_dci * device->dci.num_eps);
@@ -2361,6 +2390,9 @@ static int ibgda_setup_gpu_state(nvshmem_transport_t t) {
     nvshmemi_ibgda_device_cq_t *cq_d = NULL;
     nvshmemi_ibgda_device_cq_t *cq_h = NULL;

+    nvshmemi_ibgda_device_cq_t *recv_cq_d = NULL;
+    nvshmemi_ibgda_device_cq_t *recv_cq_h = NULL;
+
     uint8_t *qp_group_switches_d = NULL;

     const size_t mvars_offset = offsetof(nvshmemi_ibgda_device_qp_t, mvars);
@@ -2368,6 +2400,7 @@ static int ibgda_setup_gpu_state(nvshmem_transport_t t) {
     const size_t cons_t_offset = offsetof(nvshmemi_ibgda_device_qp_management_t, tx_wq.cons_idx);
     const size_t wqe_h_offset = offsetof(nvshmemi_ibgda_device_qp_management_t, tx_wq.resv_head);
     const size_t wqe_t_offset = offsetof(nvshmemi_ibgda_device_qp_management_t, tx_wq.ready_head);
+    const size_t rx_resv_head_offset = offsetof(nvshmemi_ibgda_device_qp_management_t, rx_wq.resv_head);

     nvshmemi_ibgda_device_qp_map_type_t rc_map_type = NVSHMEMI_IBGDA_DEVICE_QP_MAP_TYPE_INVALID;
     nvshmemi_ibgda_device_qp_map_type_t dc_map_type = NVSHMEMI_IBGDA_DEVICE_QP_MAP_TYPE_INVALID;
@@ -2405,7 +2438,7 @@ static int ibgda_setup_gpu_state(nvshmem_transport_t t) {
         num_dct_handles += device->dct.num_eps * n_pes;
         num_dci_handles += device->dci.num_eps;
         num_rc_handles += device->rc.num_eps_per_pe * n_pes;
-        num_cq_handles += device->dci.num_eps + (device->rc.num_eps_per_pe * (n_pes - 1));
+        num_cq_handles += device->dci.num_eps + (device->rc.num_eps_per_pe * (n_pes - 1) * 2);
         num_shared_dci_handles += device->dci.num_shared_eps;
     }
     num_elements = num_dct_handles - NVSHMEMI_IBGDA_MAX_CONST_DCTS;
@@ -2441,6 +2474,10 @@ static int ibgda_setup_gpu_state(nvshmem_transport_t t) {
     for (int i = 0; i < num_cq_handles; i++) {
         nvshmemi_init_ibgda_device_cq(cq_h[i]);
     }
+
+    recv_cq_h = (nvshmemi_ibgda_device_cq_t *)calloc(1, sizeof(*recv_cq_h));
+    NVSHMEMI_NULL_ERROR_JMP(recv_cq_h, status, NVSHMEMX_ERROR_OUT_OF_MEMORY, out, "recv_cq calloc err.");
+    nvshmemi_init_ibgda_device_cq(recv_cq_h[0]);
     /* allocate host memory for dct, rc, cq, dci end */

     /* allocate device memory for dct, rc, cq, dci start */
@@ -2544,6 +2581,14 @@ static int ibgda_setup_gpu_state(nvshmem_transport_t t) {
                 }

                 ++cq_idx;
+
+                rc_h[arr_idx].rx_wq.cq = &cq_d[cq_idx];
+
+                ibgda_get_device_cq(&cq_h[cq_idx], device->rc.eps[i]->recv_cq);
+                cq_h[cq_idx].resv_head = (uint64_t *)(base_mvars_d_addr + rx_resv_head_offset);
+                cq_h[cq_idx].qpn = rc_h[arr_idx].qpn;
+                cq_h[cq_idx].qp_type = rc_h[arr_idx].qp_type;
+                ++cq_idx;
             }
         }
     }
--
2.25.1


From 0cc285269f154049f1c9775e07e306e03228eedc Mon Sep 17 00:00:00 2001
From: Shangyan Zhou <sy.zhou@deepseek.com>
Date: Sat, 8 Feb 2025 18:02:39 +0800
Subject: [PATCH 4/5] Maintain recv queue's cons_idx.

---
 src/include/device_host_transport/nvshmem_common_ibgda.h | 5 +++--
 src/modules/transport/ibgda/ibgda.cpp                    | 6 ++++--
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/src/include/device_host_transport/nvshmem_common_ibgda.h b/src/include/device_host_transport/nvshmem_common_ibgda.h
index 7d4e250..502645d 100644
--- a/src/include/device_host_transport/nvshmem_common_ibgda.h
+++ b/src/include/device_host_transport/nvshmem_common_ibgda.h
@@ -170,6 +170,7 @@ typedef struct {
     } tx_wq;
     struct {
         uint64_t resv_head;   // last reserved wqe idx + 1
+        uint64_t cons_idx;    // polled wqe idx + 1 (consumer index + 1)
     } rx_wq;
     struct {
         uint64_t head;
@@ -177,7 +178,7 @@ typedef struct {
     } ibuf;
     char padding[NVSHMEMI_IBGDA_QP_MANAGEMENT_PADDING];
 } __attribute__((__aligned__(8))) nvshmemi_ibgda_device_qp_management_v1;
-static_assert(sizeof(nvshmemi_ibgda_device_qp_management_v1) == 104,
+static_assert(sizeof(nvshmemi_ibgda_device_qp_management_v1) == 112,
               "ibgda_device_qp_management_v1 must be 104 bytes.");

 typedef nvshmemi_ibgda_device_qp_management_v1 nvshmemi_ibgda_device_qp_management_t;
@@ -214,7 +215,7 @@ typedef struct nvshmemi_ibgda_device_qp {
     } rx_wq;
     nvshmemi_ibgda_device_qp_management_v1 mvars;  // management variables
 } nvshmemi_ibgda_device_qp_v1;
-static_assert(sizeof(nvshmemi_ibgda_device_qp_v1) == 248, "ibgda_device_qp_v1 must be 248 bytes.");
+static_assert(sizeof(nvshmemi_ibgda_device_qp_v1) == 256, "ibgda_device_qp_v1 must be 248 bytes.");

 typedef nvshmemi_ibgda_device_qp_v1 nvshmemi_ibgda_device_qp_t;

diff --git a/src/modules/transport/ibgda/ibgda.cpp b/src/modules/transport/ibgda/ibgda.cpp
index b8d6bc7..a1cfe2e 100644
--- a/src/modules/transport/ibgda/ibgda.cpp
+++ b/src/modules/transport/ibgda/ibgda.cpp
@@ -1063,7 +1063,7 @@ static inline void ibgda_nic_control_free(struct ibgda_mem_object *mobject) {
         ibgda_host_mem_free(mobject);
 }

-static int ibgda_create_cq(struct ibgda_cq **pgcq, struct ibgda_device *device) {
+static int ibgda_create_cq(struct ibgda_cq **pgcq, struct ibgda_device *device, int cc = 1) {
     int status = 0;

     struct ibgda_cq *gcq = NULL;
@@ -1114,7 +1114,7 @@ static int ibgda_create_cq(struct ibgda_cq **pgcq, struct ibgda_device *device)
     cq_context = DEVX_ADDR_OF(create_cq_in, cmd_in, cq_context);
     DEVX_SET(cqc, cq_context, dbr_umem_valid, IBGDA_MLX5_UMEM_VALID_ENABLE);
     DEVX_SET(cqc, cq_context, cqe_sz, MLX5_CQE_SIZE_64B);
-    DEVX_SET(cqc, cq_context, cc, 0x1);  // Use collapsed CQ
+    DEVX_SET(cqc, cq_context, cc, cc);  // Use collapsed CQ
     DEVX_SET(cqc, cq_context, oi, 0x1);  // Allow overrun
     DEVX_SET(cqc, cq_context, dbr_umem_id, dbr_umem->umem_id);
     DEVX_SET(cqc, cq_context, log_cq_size, IBGDA_ILOG2_OR0(num_cqe));
@@ -2401,6 +2401,7 @@ static int ibgda_setup_gpu_state(nvshmem_transport_t t) {
     const size_t wqe_h_offset = offsetof(nvshmemi_ibgda_device_qp_management_t, tx_wq.resv_head);
     const size_t wqe_t_offset = offsetof(nvshmemi_ibgda_device_qp_management_t, tx_wq.ready_head);
     const size_t rx_resv_head_offset = offsetof(nvshmemi_ibgda_device_qp_management_t, rx_wq.resv_head);
+    const size_t rx_cons_offset = offsetof(nvshmemi_ibgda_device_qp_management_t, rx_wq.cons_idx);

     nvshmemi_ibgda_device_qp_map_type_t rc_map_type = NVSHMEMI_IBGDA_DEVICE_QP_MAP_TYPE_INVALID;
     nvshmemi_ibgda_device_qp_map_type_t dc_map_type = NVSHMEMI_IBGDA_DEVICE_QP_MAP_TYPE_INVALID;
@@ -2586,6 +2587,7 @@ static int ibgda_setup_gpu_state(nvshmem_transport_t t) {

                 ibgda_get_device_cq(&cq_h[cq_idx], device->rc.eps[i]->recv_cq);
                 cq_h[cq_idx].resv_head = (uint64_t *)(base_mvars_d_addr + rx_resv_head_offset);
+                cq_h[cq_idx].cons_idx = (uint64_t *)(base_mvars_d_addr + rx_cons_offset);
                 cq_h[cq_idx].qpn = rc_h[arr_idx].qpn;
                 cq_h[cq_idx].qp_type = rc_h[arr_idx].qp_type;
                 ++cq_idx;
--
2.25.1


From f91eb8510f8c9aa4f5769bd88434db5ab000e65a Mon Sep 17 00:00:00 2001
From: Shangyan Zhou <sy.zhou@deepseek.com>
Date: Tue, 11 Feb 2025 11:00:57 +0800
Subject: [PATCH 5/5] Init rx_wq counters.

---
 src/include/device_host_transport/nvshmem_common_ibgda.h | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/include/device_host_transport/nvshmem_common_ibgda.h b/src/include/device_host_transport/nvshmem_common_ibgda.h
index 502645d..f0bc328 100644
--- a/src/include/device_host_transport/nvshmem_common_ibgda.h
+++ b/src/include/device_host_transport/nvshmem_common_ibgda.h
@@ -46,6 +46,8 @@
         qp_man.tx_wq.cons_idx = NVSHMEMI_IBGDA_ULSCALAR_INVALID;                    \
         qp_man.tx_wq.get_head = NVSHMEMI_IBGDA_ULSCALAR_INVALID;                    \
         qp_man.tx_wq.get_tail = NVSHMEMI_IBGDA_ULSCALAR_INVALID;                    \
+        qp_man.rx_wq.resv_head = NVSHMEMI_IBGDA_ULSCALAR_INVALID;                    \
+        qp_man.rx_wq.cons_idx = NVSHMEMI_IBGDA_ULSCALAR_INVALID;                    \
         qp_man.ibuf.head = NVSHMEMI_IBGDA_ULSCALAR_INVALID;                         \
         qp_man.ibuf.tail = NVSHMEMI_IBGDA_ULSCALAR_INVALID;                         \
     } while (0);
--
2.25.1