base.c 13.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*
 * Copyright 2021 Max Planck Institute for Software Systems, and
 * National University of Singapore
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

25
#include "dist/common/base.h"
26
27
28
29
30
31
32
33
34
35
36
37
38

#include <assert.h>
#include <fcntl.h>
#include <getopt.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/mman.h>
#include <unistd.h>

39
#include <simbricks/base/proto.h>
40

41
#include "dist/common/utils.h"
42

43
44
/* PollPeerTransfer() requests a report once at least this many transferred
   entries have not been reported yet. */
static const uint64_t kPollReportThreshold = 128;
/* PollPeerCleanup() requests a report once at least this many cleaned entries
   have not been reported yet. */
static const uint64_t kCleanReportThreshold = 128;
45
/* Upper bound on the number of entries PollPeerTransfer() passes on per call. */
static const uint64_t kPollMax = 8;
46
/* Per-call bound used in the loop condition of PollPeerCleanup(). */
static const uint64_t kCleanupMax = 16;
47

48
49
/* Size of the local shared memory region as passed to BaseInit(); ShmAlloc()
   uses it as the allocation limit. */
static size_t shm_size;
/* Base address of the local shared memory region, filled in by ShmCreate()
   during BaseInit(). Not static, so visible to other translation units. */
void *shm_base;
50
51
52
53
54
55
/* Descriptor returned by ShmCreate(); -1 until BaseInit() succeeds. */
static int shm_fd = -1;
/* Offset of the next free byte handed out by ShmAlloc(). */
static size_t shm_alloc_off = 0;

/* Number of valid elements in `peers`. */
size_t peer_num = 0;
/* Peer table, grown with realloc() by BasePeerAdd(). */
struct Peer *peers = NULL;

56
/* epoll descriptor supplied to BaseInit(); peer sockets are registered on it. */
static int epfd = -1;
57

58
int BaseInit(const char *shm_path_, size_t shm_size_, int epfd_) {
59
60
61
62
63
64
65
  shm_size = shm_size_;
  if ((shm_fd = ShmCreate(shm_path_, shm_size_, &shm_base)) < 0)
    return 1;

  epfd = epfd_;
  return 0;
}
66

67
static int ShmAlloc(size_t size, uint64_t *off) {
68
69
70
71
#ifdef DEBUG
  fprintf(stderr, "ShmAlloc(%zu)\n", size);
#endif

72
73
74
75
76
77
78
79
80
81
  if (shm_alloc_off + size > shm_size) {
    fprintf(stderr, "ShmAlloc: alloc of %zu bytes failed\n", size);
    return 1;
  }

  *off = shm_alloc_off;
  shm_alloc_off += size;
  return 0;
}

82
bool BasePeerAdd(const char *path, bool listener) {
83
84
  struct Peer *peer = realloc(peers, sizeof(*peers) * (peer_num + 1));
  if (!peer) {
85
    perror("NetPeerAdd: realloc failed");
86
87
88
89
90
91
    return false;
  }
  peers = peer;
  peer += peer_num;
  peer_num++;

92
  memset(peer, 0, sizeof(*peer));
93
  if (!(peer->sock_path = strdup(path))) {
94
    perror("NetPeerAdd: strdup failed");
95
96
    return false;
  }
97
  peer->is_listener = listener;
98
99
  peer->sock_fd = -1;
  peer->shm_fd = -1;
100
  peer->last_sent_pos = -1;
101
102
103
  return true;
}

104
int BaseListen() {
105
#ifdef DEBUG
106
  fprintf(stderr, "Creating listening sockets\n");
107
108
#endif

109
110
  for (size_t i = 0; i < peer_num; i++) {
    struct Peer *peer = &peers[i];
111
    if (!peer->is_listener)
112
113
      continue;

114
115
116
117
#ifdef DEBUG
    fprintf(stderr, "  Creating socket %s %zu\n", peer->sock_path, i);
#endif
    if ((peer->listen_fd = UxsocketInit(peer->sock_path)) < 0) {
118
119
120
121
122
123
124
      perror("PeersInitNets: unix socket init failed");
      return 1;
    }

    struct epoll_event epev;
    epev.events = EPOLLIN;
    epev.data.ptr = peer;
125
126
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, peer->listen_fd, &epev)) {
      perror("PeersInitNets: epoll_ctl accept failed");
127
128
129
      return 1;
    }
  }
130
131
132
133

#ifdef DEBUG
  fprintf(stderr, "PeerInitNets done\n");
#endif
134
135
136
  return 0;
}

137
int BaseConnect() {
138
139
140
141
#ifdef DEBUG
  fprintf(stderr, "Connecting to device sockets\n");
#endif

142
143
  for (size_t i = 0; i < peer_num; i++) {
    struct Peer *peer = &peers[i];
144
    if (peer->is_listener)
145
146
      continue;

147
148
149
150
#ifdef DEBUG
    fprintf(stderr, "  Connecting to socket %s %zu\n", peer->sock_path, i);
#endif

151
152
153
154
155
156
157
158
159
160
161
162
163
164
    if ((peer->sock_fd = UxsocketConnect(peer->sock_path)) < 0)
      return 1;

    struct epoll_event epev;
    epev.events = EPOLLIN;
    epev.data.ptr = peer;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, peer->sock_fd, &epev)) {
      perror("PeersInitNets: epoll_ctl failed");
      return 1;
    }
  }
  return 0;
}

165
166
167
168
int BasePeerSetupQueues(struct Peer *peer) {
  if (!peer->is_listener) {
    /* only need to set up queues for listeners */
    return 0;
169
170
  }

171
  struct SimbricksProtoListenerIntro *li =
Antoine Kaufmann's avatar
Antoine Kaufmann committed
172
      (struct SimbricksProtoListenerIntro *)peer->intro_remote;
173

174
#ifdef DEBUG
175
  fprintf(stderr, "PeerNetSetupQueues(%s)\n", peer->sock_path);
176
  fprintf(stderr, "  l2c_el=%lu l2c_n=%lu c2l_el=%lu c2l_n=%lu\n", li->l2c_elen,
Antoine Kaufmann's avatar
Antoine Kaufmann committed
177
          li->l2c_nentries, li->c2l_elen, li->c2l_nentries);
178
#endif
179

180
181
  uint64_t l2c_offset = 0;
  if (ShmAlloc(li->l2c_elen * li->l2c_nentries, &l2c_offset)) {
182
    fprintf(stderr, "PeerNetSetupQueues: ShmAlloc l2c failed");
183
184
    return 1;
  }
185
186
187
188
  li->l2c_offset = l2c_offset;

  uint64_t c2l_offset = 0;
  if (ShmAlloc(li->c2l_elen * li->c2l_nentries, &c2l_offset)) {
189
    fprintf(stderr, "PeerNetSetupQueues: ShmAlloc c2l failed");
190
191
    return 1;
  }
192
193
  li->c2l_offset = c2l_offset;

194
195
196
  peer->shm_fd = shm_fd;
  peer->shm_base = shm_base;

Antoine Kaufmann's avatar
Antoine Kaufmann committed
197
  peer->local_base = (void *)((uintptr_t)shm_base + li->c2l_offset);
Antoine Kaufmann's avatar
Antoine Kaufmann committed
198
  peer->local_offset = li->c2l_offset;
199
200
  peer->local_elen = li->c2l_elen;
  peer->local_enum = li->c2l_nentries;
201

Antoine Kaufmann's avatar
Antoine Kaufmann committed
202
  peer->cleanup_base = (void *)((uintptr_t)shm_base + li->l2c_offset);
Antoine Kaufmann's avatar
Antoine Kaufmann committed
203
  peer->cleanup_offset = li->l2c_offset;
204
205
206
207
208
  peer->cleanup_elen = li->l2c_elen;
  peer->cleanup_enum = li->l2c_nentries;

  return 0;
}
209

210
211
212
213
214
/**
 * Send the stored remote intro (peer->intro_remote) to the local peer socket.
 *
 * For listener peers the shared memory descriptor is passed along with the
 * message; otherwise -1 is passed as the descriptor argument.
 *
 * @param peer peer whose socket the intro is sent on.
 * @return 0 on success or when the socket is not connected yet (the send is
 *         skipped in that case), 1 if UxsocketSendFd() failed.
 */
int BasePeerSendIntro(struct Peer *peer) {
#ifdef DEBUG
  fprintf(stderr, "PeerDevSendIntro(%s)\n", peer->sock_path);
#endif

  /* The intro from the remote side can arrive before the local connection
     exists. Nothing is sent then; PeerAcceptEvent() calls this function again
     once the connection has been accepted. */
  if (peer->sock_fd == -1) {
#ifdef DEBUG
    fprintf(stderr,
            "PeerNetSetupQueues: socket not ready yet, delaying "
            "send\n");
#endif
    return 0;
  }

  int fd_to_pass = -1;
  if (peer->is_listener) {
    fd_to_pass = peer->shm_fd;
  }

  if (UxsocketSendFd(peer->sock_fd, peer->intro_remote, peer->intro_remote_len,
                     fd_to_pass) != 0) {
    perror("BasePeerSendIntro: send failed");
    return 1;
  }

  return 0;
}

Antoine Kaufmann's avatar
Antoine Kaufmann committed
236
237
int BasePeerReport(struct Peer *peer, uint32_t written_pos,
                   uint32_t clean_pos) {
238
  uint32_t pos = peer->local_pos_cleaned;
Antoine Kaufmann's avatar
Antoine Kaufmann committed
239
  if (written_pos == peer->cleanup_pos_last && clean_pos == pos)
240
241
242
243
244
245
246
247
248
    return 0;

#ifdef DEBUG
  fprintf(stderr, "PeerReport: peer %s written %u -> %u, cleaned %u -> %u\n",
          peer->sock_path, peer->cleanup_pos_last, written_pos,
          peer->local_pos_cleaned, clean_pos);
#endif

  peer->cleanup_pos_last = written_pos;
249
250
  while (pos != clean_pos) {
    void *entry = (peer->local_base + pos * peer->local_elen);
251
    volatile union SimbricksProtoBaseMsg *msg =
Antoine Kaufmann's avatar
Antoine Kaufmann committed
252
253
254
255
        (volatile union SimbricksProtoBaseMsg *)entry;
    msg->header.own_type =
        (msg->header.own_type & (~SIMBRICKS_PROTO_MSG_OWN_MASK)) |
        SIMBRICKS_PROTO_MSG_OWN_PRO;
256

257
258
259
    pos += 1;
    if (pos >= peer->local_enum)
      pos -= peer->local_enum;
260
  }
261
  peer->local_pos_cleaned = pos;
262
263
264
265

  return 0;
}

266
267
268
269
/**
 * Handle readiness on a listener peer's listening socket: accept the
 * connection, close the listening socket, and register the accepted socket
 * with epoll. If the remote intro has already been received, it is sent on
 * the new connection right away.
 *
 * @param peer listener peer whose listening socket became readable.
 * @return 0 on success, 1 if accept(), epoll_ctl() or sending the intro
 *         failed.
 */
static int PeerAcceptEvent(struct Peer *peer) {
#ifdef DEBUG
  fprintf(stderr, "PeerAcceptEvent(%s)\n", peer->sock_path);
#endif
  assert(peer->is_listener);

  if ((peer->sock_fd = accept(peer->listen_fd, NULL, NULL)) < 0) {
    perror("PeersInitNets: accept failed");
    return 1;
  }

#ifdef DEBUG
  /* peer - peers is a ptrdiff_t, which takes the 't' length modifier. */
  fprintf(stderr, "Accepted %td\n", peer - peers);
#endif

  /* Only a single connection is accepted per listener peer. */
  close(peer->listen_fd);

  struct epoll_event epev;
  epev.events = EPOLLIN;
  epev.data.ptr = peer;
  if (epoll_ctl(epfd, EPOLL_CTL_ADD, peer->sock_fd, &epev)) {
    perror("PeersInitNets: epoll_ctl failed");
    return 1;
  }

  /* we may have already received the welcome message from our remote peer. In
     that case, send it now. */
  if (peer->intro_valid_remote) {
#ifdef DEBUG
    fprintf(stderr, "PeerAcceptEvent(%s): sending welcome message\n",
            peer->sock_path);
#endif
    if (BasePeerSendIntro(peer)) {
      fprintf(stderr, "PeerAcceptEvent(%s): sending intro failed\n",
              peer->sock_path);
      return 1;
    }
  }
  return 0;
}

307
int BasePeerEvent(struct Peer *peer, uint32_t events) {
308
#ifdef DEBUG
309
  fprintf(stderr, "PeerEvent(%s)\n", peer->sock_path);
310
#endif
311
312
313
314
315
316
317
318
319

  // disable peer if not an input event
  if (!(events & EPOLLIN)) {
    fprintf(stderr, "PeerEvent: non-input event, disabling peer (%s)",
            peer->sock_path);
    peer->ready = false;
    return 1;
  }

320
  // if peer is network and not yet connected, this is an accept event
321
  if (peer->is_listener && peer->sock_fd == -1) {
322
323
324
    return PeerAcceptEvent(peer);
  }

325
326
327
328
329
330
331
332
  // if we already have the intro, this is not expected
  if (peer->intro_valid_local) {
    fprintf(stderr, "PeerEvent: receive event after intro (%s)\n",
            peer->sock_path);
    return 1;
  }

  // receive intro message
333
  ssize_t ret;
334
335
  if (!peer->is_listener) {
    /* not a listener, so we're expecting an fd for the shm region */
336
    ret = UxsocketRecvFd(peer->sock_fd, peer->intro_local,
Antoine Kaufmann's avatar
Antoine Kaufmann committed
337
                         sizeof(peer->intro_local), &peer->shm_fd);
338
    if (ret <= 0)
339
340
341
342
      return 1;

    if (!(peer->shm_base = ShmMap(peer->shm_fd, &peer->shm_size)))
      return 1;
343
344

    struct SimbricksProtoListenerIntro *li =
Antoine Kaufmann's avatar
Antoine Kaufmann committed
345
346
        (struct SimbricksProtoListenerIntro *)peer->intro_local;
    peer->local_base = (void *)((uintptr_t)peer->shm_base + li->l2c_offset);
Antoine Kaufmann's avatar
Antoine Kaufmann committed
347
    peer->local_offset = li->l2c_offset;
348
349
350
    peer->local_elen = li->l2c_elen;
    peer->local_enum = li->l2c_nentries;

Antoine Kaufmann's avatar
Antoine Kaufmann committed
351
    peer->cleanup_base = (void *)((uintptr_t)peer->shm_base + li->c2l_offset);
Antoine Kaufmann's avatar
Antoine Kaufmann committed
352
    peer->cleanup_offset = li->c2l_offset;
353
354
    peer->cleanup_elen = li->c2l_elen;
    peer->cleanup_enum = li->c2l_nentries;
355
  } else {
356
    /* as a listener, we use our local shm region, so no fd is sent to us */
Antoine Kaufmann's avatar
Antoine Kaufmann committed
357
    ret = recv(peer->sock_fd, peer->intro_local, sizeof(peer->intro_local), 0);
358
    if (ret <= 0) {
359
360
361
362
363
      perror("PeerEvent: recv failed");
      return 1;
    }
  }

364
  peer->intro_local_len = ret;
365
366
  peer->intro_valid_local = true;

367
  // pass intro along
368
  if (BaseOpPassIntro(peer))
369
370
371
    return 1;

  if (peer->intro_valid_remote) {
372
373
374
#ifdef DEBUG
    fprintf(stderr, "PeerEvent(%s): marking peer as ready\n", peer->sock_path);
#endif
375
376
377
378
379
    peer->ready = true;
  }
  return 0;
}

380
static inline void PollPeerTransfer(struct Peer *peer, bool *report) {
381
  uint32_t n;
382
  for (n = 0; n < kPollMax && peer->local_pos + n < peer->local_enum; n++) {
383
384
385
386
    // stop if we would pass the cleanup position
    if ((peer->local_pos + n + 1) % peer->local_enum ==
        peer->local_pos_cleaned) {
#ifdef DEBUG
Antoine Kaufmann's avatar
Antoine Kaufmann committed
387
388
      fprintf(stderr, "PollPeerTransfer: waiting for cleanup (%u %u)\n", n,
              peer->local_pos_cleaned);
389
390
391
392
#endif
      break;
    }

393
    void *entry = (peer->local_base + (peer->local_pos + n) * peer->local_elen);
394
    volatile union SimbricksProtoBaseMsg *msg =
Antoine Kaufmann's avatar
Antoine Kaufmann committed
395
        (volatile union SimbricksProtoBaseMsg *)entry;
396
397
    if ((msg->header.own_type & SIMBRICKS_PROTO_MSG_OWN_MASK) !=
        SIMBRICKS_PROTO_MSG_OWN_CON)
398
      break;
399
400
  }

401
  if (n > 0) {
402
#ifdef DEBUG
Antoine Kaufmann's avatar
Antoine Kaufmann committed
403
404
405
    fprintf(stderr, "PollPeerTransfer: transferring [%u,%u] (lpc=%u lpr=%u)\n",
            peer->local_pos, peer->local_pos + n, peer->local_pos_cleaned,
            peer->local_pos_reported);
406
#endif
407
    BaseOpPassEntries(peer, peer->local_pos, n);
408
    uint32_t newpos = peer->local_pos + n;
Antoine Kaufmann's avatar
Antoine Kaufmann committed
409
410
    peer->local_pos =
        (newpos < peer->local_enum ? newpos : newpos - peer->local_enum);
411

Antoine Kaufmann's avatar
Antoine Kaufmann committed
412
413
    uint64_t unreported =
        (peer->local_pos - peer->local_pos_reported) % peer->local_enum;
414
415
416
417
418
419
420
421
422
    if (unreported >= kPollReportThreshold)
      *report = true;
  }
}

static inline void PollPeerCleanup(struct Peer *peer, bool *report) {
  if (peer->cleanup_pos_next == peer->cleanup_pos_last)
    return;

423
424
425
426
  uint64_t cnt = 0;
  do {
    void *entry =
        (peer->cleanup_base + peer->cleanup_pos_next * peer->cleanup_elen);
427
    volatile union SimbricksProtoBaseMsg *msg =
Antoine Kaufmann's avatar
Antoine Kaufmann committed
428
        (volatile union SimbricksProtoBaseMsg *)entry;
429

430
431
    if ((msg->header.own_type & SIMBRICKS_PROTO_MSG_OWN_MASK) !=
        SIMBRICKS_PROTO_MSG_OWN_PRO)
432
433
      break;

Antoine Kaufmann's avatar
Antoine Kaufmann committed
434
#ifdef DEBUG
435
436
437
438
439
440
    fprintf(stderr, "PollPeerCleanup: peer %s has clean entry at %u\n",
            peer->sock_path, peer->cleanup_pos_next);
#endif
    peer->cleanup_pos_next += 1;
    if (peer->cleanup_pos_next >= peer->cleanup_enum)
      peer->cleanup_pos_next -= peer->cleanup_enum;
441
442
  } while (++cnt <= kCleanupMax &&
           peer->cleanup_pos_next != peer->cleanup_pos_last);
443

444
  if (cnt > 0) {
Antoine Kaufmann's avatar
Antoine Kaufmann committed
445
446
447
    uint64_t unreported =
        (peer->cleanup_pos_next - peer->cleanup_pos_reported) %
        peer->cleanup_enum;
448
449
450
451
452
    if (unreported >= kCleanReportThreshold)
      *report = true;
  }
}

453
void BasePoll() {
454
455
456
457
458
  bool report = false;
  for (size_t i = 0; i < peer_num; i++) {
    struct Peer *peer = &peers[i];
    if (!peer->ready)
      continue;
459

460
461
    PollPeerTransfer(peer, &report);
    PollPeerCleanup(peer, &report);
462
463
  }

464
  if (report)
465
    BaseOpPassReport();
466
467
}

Antoine Kaufmann's avatar
Antoine Kaufmann committed
468
void BaseEntryReceived(struct Peer *peer, uint32_t pos, void *data) {
469
#ifdef DEBUG
Antoine Kaufmann's avatar
Antoine Kaufmann committed
470
471
  fprintf(stderr, "BaseEntryReceived: pos=%u (cpr=%u cpl=%u)\n", pos,
          peer->cleanup_pos_reported, peer->cleanup_pos_last);
472
#endif
473

Antoine Kaufmann's avatar
Antoine Kaufmann committed
474
  uint64_t off = (uint64_t)pos * peer->cleanup_elen;
475
  void *entry = peer->cleanup_base + off;
476
  volatile union SimbricksProtoBaseMsg *msg =
Antoine Kaufmann's avatar
Antoine Kaufmann committed
477
478
      (volatile union SimbricksProtoBaseMsg *)entry;

479
  // first copy data after header
Antoine Kaufmann's avatar
Antoine Kaufmann committed
480
481
  memcpy((void *)(msg + 1), (uint8_t *)data + sizeof(*msg),
         peer->cleanup_elen - sizeof(*msg));
482
  // then copy header except for last byte
Antoine Kaufmann's avatar
Antoine Kaufmann committed
483
  memcpy((void *)msg, data, sizeof(*msg) - 1);
484
485
486
  // WMB()
  // now copy last byte
  volatile union SimbricksProtoBaseMsg *src_msg =
Antoine Kaufmann's avatar
Antoine Kaufmann committed
487
      (volatile union SimbricksProtoBaseMsg *)data;
488
489
  asm volatile("sfence" ::: "memory");
  msg->header.own_type = src_msg->header.own_type;
Antoine Kaufmann's avatar
Antoine Kaufmann committed
490
}