memnic.cc 12.3 KB
Newer Older
GAO Bin's avatar
GAO Bin committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/*
 * Copyright 2022 Max Planck Institute for Software Systems, and
 * National University of Singapore
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */


#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cassert>
#include <ctime>
#include <iostream>
#include <vector>

39
40

#include <arpa/inet.h>
41
#include <netinet/udp.h>
42
43
44
#include <linux/ip.h>
#include <linux/if_ether.h>

GAO Bin's avatar
GAO Bin committed
45
46
47
48
49
#include <simbricks/base/cxxatomicfix.h>

extern "C" {
#include <simbricks/mem/if.h>
#include <simbricks/nicif/nicif.h>
50
#include <simbricks/mem/memop.h>
GAO Bin's avatar
GAO Bin committed
51
52
};

Hejing Li's avatar
Hejing Li committed
53
//#define MEMNIC_DEBUG 1
GAO Bin's avatar
GAO Bin committed
54
55
56

static int exiting = 0;
static uint64_t cur_ts = 0;
57
58
59
60
uint16_t src_port = 1;
uint16_t dest_port = 1;
uint32_t ip_addr = 0x0F0E0D0C;

61
62
63
64
65
66
union mac_addr_{
  uint64_t mac_64;
  uint8_t mac_byte[6];
};

union mac_addr_ mac_addr;
GAO Bin's avatar
GAO Bin committed
67
68
69
70
71
72
73
74
75

static void sigint_handler(int dummy) {
  exiting = 1;
}

static void sigusr1_handler(int dummy) {
    fprintf(stderr, "main_time = %lu\n", cur_ts);
}

76
77
78
79
80
bool MemNicIfInit(struct SimbricksMemIf *memif, struct SimbricksNetIf *netif,
                  const char *shm_path,
                  struct SimbricksBaseIfParams *memParams,
                  struct SimbricksBaseIfParams *netParams) {

GAO Bin's avatar
GAO Bin committed
81
  struct SimbricksBaseIf *membase = &memif->base;
82
83
84
85
86
87
88
89
90
91
92
93
94
95
  struct SimbricksBaseIf *netbase = &netif->base;

  // first allocate pool
  size_t shm_size = 0;
  if (memParams){
    shm_size += memParams->in_num_entries * memParams->in_entries_size;
    shm_size += memParams->out_num_entries * memParams->out_entries_size;
  }
  if (netParams){
    shm_size += netParams->in_num_entries * netParams->in_entries_size;
    shm_size += netParams->out_num_entries * netParams->out_entries_size;
  }
  
  std::string shm_path_ = shm_path;
GAO Bin's avatar
GAO Bin committed
96
97
  struct SimbricksBaseIfSHMPool pool_;
  memset(&pool_, 0, sizeof(pool_));
98
99
100
101
102
103
104
105
106
107

  if (SimbricksBaseIfSHMPoolCreate(&pool_, shm_path_.c_str(), shm_size) !=
      0) {
      perror("MemNicIfInit: SimbricksBaseIfSHMPoolCreate failed");
      return false;
    }

  struct SimBricksBaseIfEstablishData ests[2];
  struct SimbricksProtoMemHostIntro mem_intro;
  struct SimbricksProtoNetIntro net_intro;
GAO Bin's avatar
GAO Bin committed
108
109
  unsigned n_bifs = 0;

110
111
112
  memset(&net_intro, 0, sizeof(net_intro));

  // MemIf Init
GAO Bin's avatar
GAO Bin committed
113
  if (SimbricksBaseIfInit(membase, memParams)) {
114
    perror("MemIfInit: SimbricksBaseIfInit failed");
GAO Bin's avatar
GAO Bin committed
115
116
  }

117
118
  if (SimbricksBaseIfListen(membase, &pool_) != 0) {
    perror("MemifInit: SimbricksBaseIfListen failed");
GAO Bin's avatar
GAO Bin committed
119
120
121
    return false;
  }

122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
  memset(&mem_intro, 0, sizeof(mem_intro));
  ests[n_bifs].base_if = membase;
  ests[n_bifs].tx_intro = &mem_intro;
  ests[n_bifs].tx_intro_len = sizeof(mem_intro);
  ests[n_bifs].rx_intro = &mem_intro;
  ests[n_bifs].rx_intro_len = sizeof(mem_intro);
  n_bifs++;

  // NetIf Init
  if (SimbricksBaseIfInit(netbase, netParams)) {
    perror("NetIfInit: SimbricksBaseIfInit failed");
  }

  if (SimbricksBaseIfListen(netbase, &pool_) != 0) {
    perror("NetIfInit: SimbricksBaseIfListen failed");
GAO Bin's avatar
GAO Bin committed
137
138
139
    return false;
  }

140
141
142
143
144
145
146
147
148
149
150
  memset(&net_intro, 0, sizeof(net_intro));
  ests[n_bifs].base_if = netbase;
  ests[n_bifs].tx_intro = &net_intro;
  ests[n_bifs].tx_intro_len = sizeof(net_intro);
  ests[n_bifs].rx_intro = &net_intro;
  ests[n_bifs].rx_intro_len = sizeof(net_intro);
  n_bifs++;



  if (SimBricksBaseIfEstablish(ests, 2)) {
GAO Bin's avatar
GAO Bin committed
151
152
153
154
155
156
157
158
    fprintf(stderr, "SimBricksBaseIfEstablish failed\n");
    return false;
  }

  printf("done connecting\n");
  return true;
}

159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
static inline int SimbricksMemNicIfSync(struct SimbricksMemIf *memif,
                                        struct SimbricksNetIf *netif,
                                        uint64_t cur_ts) {
  return ((SimbricksMemIfM2HOutSync(memif, cur_ts) == 0 &&
           SimbricksNetIfOutSync(netif, cur_ts) == 0)
              ? 0
              : -1);
}

static inline uint64_t SimbricksMemNicIfNextTimestamp(
    struct SimbricksMemIf *memif, struct SimbricksNetIf *netif) {
  uint64_t net_in = SimbricksNetIfInTimestamp(netif);
  uint64_t mem_in = SimbricksMemIfH2MInTimestamp(memif);

  return (net_in < mem_in ? net_in : mem_in);
}

void ForwardToETH(SimbricksNetIf *netif, volatile union SimbricksProtoMemH2M *data, uint8_t type) {
  
GAO Bin's avatar
GAO Bin committed
178
179
180
  volatile union SimbricksProtoNetMsg *msg = SimbricksNetIfOutAlloc(netif, cur_ts);
  if (msg == NULL)
    return;
181

GAO Bin's avatar
GAO Bin committed
182
  volatile struct SimbricksProtoNetMsgPacket *packet = &msg->packet;
183
184

  // Add Ethernet header
185
186
187
188
189
190
191
192
193
  struct ethhdr *eth_hdr = (struct ethhdr *)packet->data;  

  //memcpy(eth_hdr->h_source, mac_addr.mac_byte, ETH_ALEN);
  int i = 0;
  for (i = 0; i < ETH_ALEN; i++){
    eth_hdr->h_source[i] = mac_addr.mac_byte[i];
    eth_hdr->h_dest[i] = 0xFF; // Keep destination to broadcast for now
  }
  
194
195
196
197
198
199
200
201
202
203
  eth_hdr->h_proto = htons(ETH_P_IP);

  // Add IP header
  struct iphdr *ip_hdr = (struct iphdr *)(eth_hdr + 1);
  ip_hdr->daddr = 0xFFFFFFFF;
  ip_hdr->saddr = ip_addr;
  ip_hdr->tot_len = sizeof(struct iphdr) + sizeof(struct udphdr) + sizeof(struct MemOp);
  if (type == SIMBRICKS_PROTO_MEM_H2M_MSG_WRITE){
    ip_hdr->tot_len += data->write.len;
  }
204
  ip_hdr->tot_len = htons(ip_hdr->tot_len);
205
206
207
208
209
210
211
212
213
214
  // Add UDP header
  struct udphdr *udp_hdr = (struct udphdr *)(ip_hdr + 1);
  udp_hdr->uh_sport = src_port;
  udp_hdr->uh_dport = dest_port;
  udp_hdr->uh_ulen = sizeof(struct udphdr) + sizeof(struct MemOp);
  if (type == SIMBRICKS_PROTO_MEM_H2M_MSG_WRITE){
    udp_hdr->uh_ulen += data->write.len;
  }
  udp_hdr->uh_sum = 0; // To update later

215
216
217
  packet->len = sizeof(struct ethhdr) + sizeof(struct iphdr) +
                 sizeof(struct udphdr) + sizeof(struct MemOp);

218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
  // Fill the MemOps struct in the payload
  struct MemOp *memop = (struct MemOp *)(udp_hdr + 1);
  void *payload;
  switch (type) {
    case SIMBRICKS_PROTO_MEM_H2M_MSG_READ:
      memop->OpType = type;
      memop->req_id = data->read.req_id;
      memop->as_id = data->read.as_id;
      memop->addr = data->read.addr;
      memop->len = data->read.len;
      break;
    case SIMBRICKS_PROTO_MEM_H2M_MSG_WRITE:
      memop->OpType = type;
      memop->req_id = data->write.req_id;
      memop->as_id = data->write.as_id;
      memop->addr = data->write.addr;
      memop->len = data->write.len;
      payload = (void *)(memop + 1);
      memcpy((void *)payload, (void *)data->write.data, data->write.len);
237
      packet->len += data->write.len;
238
239
240
241
      break;

    default:
      fprintf(stderr, "ForwardToETH: unsupported type=%u\n", type);
242
    
243
244
245
246

  }
  

GAO Bin's avatar
GAO Bin committed
247
248
249
  SimbricksNetIfOutSend(netif, msg, SIMBRICKS_PROTO_NET_MSG_PACKET);
}

250
251
252

void ForwardToMEM(SimbricksMemIf *memif, volatile struct SimbricksProtoNetMsgPacket *packet) {

GAO Bin's avatar
GAO Bin committed
253
  volatile union SimbricksProtoMemM2H *msg = SimbricksMemIfM2HOutAlloc(memif, cur_ts);
254

GAO Bin's avatar
GAO Bin committed
255
256
257
  if (msg == NULL)
    return;
  
258
259
260
261
262
263
264
265
266
267
  
  uint8_t type;
  struct ethhdr *eth_hdr = (struct ethhdr *)packet->data;
  struct iphdr *ip_hdr = (struct iphdr *)(eth_hdr + 1);
  struct udphdr *udp_hdr = (struct udphdr *)(ip_hdr + 1);
  struct MemOp *memop = (struct MemOp *)(udp_hdr + 1);
  void *data = (void *)(memop + 1);

  type = memop->OpType;

GAO Bin's avatar
GAO Bin committed
268
269
  switch (type){
    case SIMBRICKS_PROTO_MEM_M2H_MSG_READCOMP:
270
      volatile struct SimbricksProtoMemM2HReadcomp *rc;
GAO Bin's avatar
GAO Bin committed
271
      rc = &msg->readcomp;
272
273
274
      rc->req_id = memop->req_id;

      memcpy((void*)rc->data, (void*)data, memop->len);
GAO Bin's avatar
GAO Bin committed
275
276
277
278
      SimbricksMemIfM2HOutSend(memif, msg, SIMBRICKS_PROTO_MEM_M2H_MSG_READCOMP);
      break;
  
    case SIMBRICKS_PROTO_MEM_M2H_MSG_WRITECOMP:
279
      volatile struct SimbricksProtoMemM2HWritecomp *wc;
GAO Bin's avatar
GAO Bin committed
280
      wc = &msg->writecomp;
281
282
      wc->req_id = memop->req_id;

GAO Bin's avatar
GAO Bin committed
283
284
285
286
287
288
289
290
291
292
293
294
      SimbricksMemIfM2HOutSend(memif, msg, SIMBRICKS_PROTO_MEM_M2H_MSG_WRITECOMP);
      break;

    case SIMBRICKS_PROTO_MSG_TYPE_SYNC:
      break;

    default:
      fprintf(stderr, "poll_m2h: unsupported type=%u\n", type);
  }

}

295
void PollN2M(SimbricksNetIf *netif, struct SimbricksMemIf *memif, uint64_t cur_ts) {
GAO Bin's avatar
GAO Bin committed
296
297
298
299
300
301
302
303
304
  volatile union SimbricksProtoNetMsg *msg = SimbricksNetIfInPoll(netif, cur_ts);
  if (msg == NULL){
    return;
  }
  uint8_t type;

  type = SimbricksNetIfInType(netif, msg);
  switch (type) {
    case SIMBRICKS_PROTO_NET_MSG_PACKET:
305
      ForwardToMEM(memif, &msg->packet);
GAO Bin's avatar
GAO Bin committed
306
307
308
309
310
311
      break;

    case SIMBRICKS_PROTO_MSG_TYPE_SYNC:
      break;

    default:
312
      fprintf(stderr, "poll_n2m: unsupported type=%u\n", type);
GAO Bin's avatar
GAO Bin committed
313
314
315
316
317
318
319
320
  }

  SimbricksNetIfInDone(netif, msg);
}

void PollH2M(struct SimbricksMemIf *memif, SimbricksNetIf *netif, uint64_t cur_ts) {
  volatile union SimbricksProtoMemH2M *msg = SimbricksMemIfH2MInPoll(memif, cur_ts);

321
  if (msg == NULL) {
GAO Bin's avatar
GAO Bin committed
322
323
324
325
326
327
328
329
    return;
  }
  uint8_t type;

  type = SimbricksMemIfH2MInType(memif, msg);
  switch (type) {
    
    case SIMBRICKS_PROTO_MEM_H2M_MSG_READ:
Hejing Li's avatar
Hejing Li committed
330
      //printf("received read request\n");
331
      ForwardToETH(netif, msg, type);
GAO Bin's avatar
GAO Bin committed
332
333
334
      break;

    case SIMBRICKS_PROTO_MEM_H2M_MSG_WRITE:
Hejing Li's avatar
Hejing Li committed
335
      //printf("received write request\n");
336
      ForwardToETH(netif, msg, type);
GAO Bin's avatar
GAO Bin committed
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
      break;
    case SIMBRICKS_PROTO_MSG_TYPE_SYNC:
      break;
    default:
      fprintf(stderr, "poll_h2m: unsupported type=%u\n", type);
  }

  SimbricksMemIfH2MInDone(memif, msg);
}

int main(int argc, char *argv[]) {
  
  signal(SIGINT, sigint_handler);
  signal(SIGUSR1, sigusr1_handler);

  int sync_mem = 1, sync_net = 1;
353
354
355

  uint64_t ts_mem = 0;
  uint64_t ts_net = 0;
GAO Bin's avatar
GAO Bin committed
356
357
358
359
360
361
  const char *shmPath;

  struct SimbricksBaseIfParams memParams;
  struct SimbricksBaseIfParams netParams;

  struct SimbricksMemIf memif;
362
  struct SimbricksNetIf netif;
GAO Bin's avatar
GAO Bin committed
363
364
365
366
  
  SimbricksMemIfDefaultParams(&memParams);
  SimbricksNetIfDefaultParams(&netParams);

367
  
GAO Bin's avatar
GAO Bin committed
368
369
370
  if (argc < 4 || argc > 10) {
    fprintf(stderr,
            "Usage: memnic MEM-SOCKET NET-SOCKET"
371
            "SHM [MAC-ADDR] [SYNC-MODE] [START-TICK] [SYNC-PERIOD] [MEM-LATENCY]"
GAO Bin's avatar
GAO Bin committed
372
373
374
375
376
            "[ETH-LATENCY]\n");
    return -1;
  }

  if (argc >= 7)
377
     cur_ts = strtoull(argv[6], NULL, 0);
GAO Bin's avatar
GAO Bin committed
378
  if (argc >= 8)
379
380
    memParams.sync_interval = netParams.sync_interval =
         strtoull(argv[7], NULL, 0) * 1000ULL;
GAO Bin's avatar
GAO Bin committed
381
  if (argc >= 9)
382
383
384
    memParams.link_latency = strtoull(argv[8], NULL, 0) * 1000ULL;
  if (argc >= 10)
    netParams.link_latency = strtoull(argv[9], NULL, 0) * 1000ULL;
GAO Bin's avatar
GAO Bin committed
385
386
387
388

  memParams.sock_path = argv[1];
  netParams.sock_path = argv[2];
  shmPath = argv[3];
389
390
391
  mac_addr.mac_64 = strtoull(argv[4], NULL, 16);
  printf("mac_addr=%lx\n", mac_addr.mac_64);
  printf("mac_8: %X:%X:%X:%X:%X:%X\n", mac_addr.mac_byte[0], mac_addr.mac_byte[1],mac_addr.mac_byte[2],mac_addr.mac_byte[3],mac_addr.mac_byte[4],mac_addr.mac_byte[5]);
GAO Bin's avatar
GAO Bin committed
392
393
394
395
396

  memParams.sync_mode = kSimbricksBaseIfSyncOptional;
  netParams.sync_mode = kSimbricksBaseIfSyncOptional;
  memParams.blocking_conn = false;
  memif.base.sync = sync_mem;
397
  netif.base.sync = sync_net;
GAO Bin's avatar
GAO Bin committed
398
399


400
401
  if (!MemNicIfInit(&memif, &netif, shmPath, &memParams, &netParams)){
    fprintf(stderr, "MemNicIf init error happens");
GAO Bin's avatar
GAO Bin committed
402
403
404
405
406
407
408
    return -1;
  }

  

  fprintf(stderr, "start polling\n");
  while (!exiting){
409
410
    while (SimbricksMemNicIfSync(&memif, &netif, cur_ts)) {
      fprintf(stderr, "warn: SimbricksMemNicIfSync failed (memif=%lu)\n", cur_ts);
GAO Bin's avatar
GAO Bin committed
411
    }
412

GAO Bin's avatar
GAO Bin committed
413
    do {
414
415
416
417
418
419
      PollH2M(&memif, &netif, cur_ts);
      PollN2M(&netif, &memif, cur_ts);

      ts_mem = SimbricksMemIfH2MInTimestamp(&memif);
      ts_net = SimbricksNetIfInTimestamp(&netif);

GAO Bin's avatar
GAO Bin committed
420
    } while (!exiting && 
421
             ((sync_mem && ts_mem <= cur_ts) || (sync_net && ts_net <= cur_ts)));
GAO Bin's avatar
GAO Bin committed
422
423

    if (sync_mem && sync_net)
424
      cur_ts = ts_mem <= ts_net ? ts_mem : ts_net;
GAO Bin's avatar
GAO Bin committed
425
    else if (sync_mem)
426
      cur_ts = ts_mem;
GAO Bin's avatar
GAO Bin committed
427
    else if (sync_net)
428
      cur_ts = ts_net;
GAO Bin's avatar
GAO Bin committed
429
430

  }
431
432
433

  // Todo: cleanup

GAO Bin's avatar
GAO Bin committed
434
435
  return 0;
}