base.c 13.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*
 * Copyright 2021 Max Planck Institute for Software Systems, and
 * National University of Singapore
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

25
#include "dist/common/base.h"
26
27
28
29
30
31
32
33
34
35
36
37
38

#include <assert.h>
#include <fcntl.h>
#include <getopt.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/mman.h>
#include <unistd.h>

39
#include <simbricks/base/proto.h>
40

41
#include "dist/common/utils.h"
42

43
44
/* Number of transferred-but-unreported entries at which PollPeerTransfer()
 * requests a report. */
static const uint64_t kPollReportThreshold = 128;
/* Number of cleaned-but-unreported entries at which PollPeerCleanup()
 * requests a report. */
static const uint64_t kCleanReportThreshold = 128;
45
/* Upper bound on the number of queue entries PollPeerTransfer() picks up for
 * one peer in a single call. */
static const uint64_t kPollMax = 8;
46
/* Bounds the number of queue entries PollPeerCleanup() walks over for one
 * peer in a single call. */
static const uint64_t kCleanupMax = 16;
47

48
49
/* Size in bytes of the local shared memory region, as passed to BaseInit();
 * ShmAlloc() rejects allocations that would exceed it. */
static size_t shm_size;
/* Base address of the local shared memory region; handed to ShmCreate() as an
 * out-parameter in BaseInit(). */
void *shm_base;
50
51
52
53
54
55
/* File descriptor returned by ShmCreate(); -1 until BaseInit() has run. */
static int shm_fd = -1;
/* Offset of the next unallocated byte in the region; advanced by ShmAlloc(). */
static size_t shm_alloc_off = 0;

/* Number of valid elements in peers. */
size_t peer_num = 0;
/* Dynamically grown array of peers; extended by BasePeerAdd(). */
struct Peer *peers = NULL;

56
/* epoll descriptor supplied to BaseInit(); peer sockets are registered on it
 * with epoll_ctl(). */
static int epfd = -1;
57

58
int BaseInit(const char *shm_path_, size_t shm_size_, int epfd_) {
59
60
61
62
63
64
65
  shm_size = shm_size_;
  if ((shm_fd = ShmCreate(shm_path_, shm_size_, &shm_base)) < 0)
    return 1;

  epfd = epfd_;
  return 0;
}
66

67
static int ShmAlloc(size_t size, uint64_t *off) {
68
69
70
71
#ifdef DEBUG
  fprintf(stderr, "ShmAlloc(%zu)\n", size);
#endif

72
73
74
75
76
77
78
79
80
81
  if (shm_alloc_off + size > shm_size) {
    fprintf(stderr, "ShmAlloc: alloc of %zu bytes failed\n", size);
    return 1;
  }

  *off = shm_alloc_off;
  shm_alloc_off += size;
  return 0;
}

82
bool BasePeerAdd(const char *path, bool listener) {
83
84
  struct Peer *peer = realloc(peers, sizeof(*peers) * (peer_num + 1));
  if (!peer) {
85
    perror("NetPeerAdd: realloc failed");
86
87
88
89
90
91
    return false;
  }
  peers = peer;
  peer += peer_num;
  peer_num++;

92
  memset(peer, 0, sizeof(*peer));
93
  if (!(peer->sock_path = strdup(path))) {
94
    perror("NetPeerAdd: strdup failed");
95
96
    return false;
  }
97
  peer->is_listener = listener;
98
99
  peer->sock_fd = -1;
  peer->shm_fd = -1;
100
  peer->last_sent_pos = -1;
101
102
103
  return true;
}

104
int BaseListen() {
105
#ifdef DEBUG
106
  fprintf(stderr, "Creating listening sockets\n");
107
108
#endif

109
110
  for (size_t i = 0; i < peer_num; i++) {
    struct Peer *peer = &peers[i];
111
    if (!peer->is_listener)
112
113
      continue;

114
115
116
117
#ifdef DEBUG
    fprintf(stderr, "  Creating socket %s %zu\n", peer->sock_path, i);
#endif
    if ((peer->listen_fd = UxsocketInit(peer->sock_path)) < 0) {
118
119
120
121
122
123
124
      perror("PeersInitNets: unix socket init failed");
      return 1;
    }

    struct epoll_event epev;
    epev.events = EPOLLIN;
    epev.data.ptr = peer;
125
126
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, peer->listen_fd, &epev)) {
      perror("PeersInitNets: epoll_ctl accept failed");
127
128
129
      return 1;
    }
  }
130
131
132
133

#ifdef DEBUG
  fprintf(stderr, "PeerInitNets done\n");
#endif
134
135
136
  return 0;
}

137
int BaseConnect() {
138
139
140
141
#ifdef DEBUG
  fprintf(stderr, "Connecting to device sockets\n");
#endif

142
143
  for (size_t i = 0; i < peer_num; i++) {
    struct Peer *peer = &peers[i];
144
    if (peer->is_listener)
145
146
      continue;

147
148
149
150
#ifdef DEBUG
    fprintf(stderr, "  Connecting to socket %s %zu\n", peer->sock_path, i);
#endif

151
152
153
154
155
156
157
158
159
160
161
162
163
164
    if ((peer->sock_fd = UxsocketConnect(peer->sock_path)) < 0)
      return 1;

    struct epoll_event epev;
    epev.events = EPOLLIN;
    epev.data.ptr = peer;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, peer->sock_fd, &epev)) {
      perror("PeersInitNets: epoll_ctl failed");
      return 1;
    }
  }
  return 0;
}

165
166
167
168
int BasePeerSetupQueues(struct Peer *peer) {
  if (!peer->is_listener) {
    /* only need to set up queues for listeners */
    return 0;
169
170
  }

171
  struct SimbricksProtoListenerIntro *li =
Antoine Kaufmann's avatar
Antoine Kaufmann committed
172
      (struct SimbricksProtoListenerIntro *)peer->intro_remote;
173

174
#ifdef DEBUG
175
  fprintf(stderr, "PeerNetSetupQueues(%s)\n", peer->sock_path);
176
  fprintf(stderr, "  l2c_el=%lu l2c_n=%lu c2l_el=%lu c2l_n=%lu\n", li->l2c_elen,
Antoine Kaufmann's avatar
Antoine Kaufmann committed
177
          li->l2c_nentries, li->c2l_elen, li->c2l_nentries);
178
#endif
179

180
181
  if (ShmAlloc(li->l2c_elen * li->l2c_nentries, &li->l2c_offset)) {
    fprintf(stderr, "PeerNetSetupQueues: ShmAlloc l2c failed");
182
183
    return 1;
  }
184
185
  if (ShmAlloc(li->c2l_elen * li->c2l_nentries, &li->c2l_offset)) {
    fprintf(stderr, "PeerNetSetupQueues: ShmAlloc c2l failed");
186
187
188
189
190
    return 1;
  }
  peer->shm_fd = shm_fd;
  peer->shm_base = shm_base;

Antoine Kaufmann's avatar
Antoine Kaufmann committed
191
  peer->local_base = (void *)((uintptr_t)shm_base + li->c2l_offset);
Antoine Kaufmann's avatar
Antoine Kaufmann committed
192
  peer->local_offset = li->c2l_offset;
193
194
  peer->local_elen = li->c2l_elen;
  peer->local_enum = li->c2l_nentries;
195

Antoine Kaufmann's avatar
Antoine Kaufmann committed
196
  peer->cleanup_base = (void *)((uintptr_t)shm_base + li->l2c_offset);
Antoine Kaufmann's avatar
Antoine Kaufmann committed
197
  peer->cleanup_offset = li->l2c_offset;
198
199
200
201
202
  peer->cleanup_elen = li->l2c_elen;
  peer->cleanup_enum = li->l2c_nentries;

  return 0;
}
203

204
205
206
207
208
/**
 * Send the intro stored in peer->intro_remote over the peer's socket.
 *
 * For listener peers the peer's shared memory descriptor is passed along
 * with the message; for other peers no descriptor is attached (-1).
 *
 * If the socket is not connected yet, nothing is sent and success is
 * returned.
 *
 * @return 0 on success or when sending is deferred, 1 if sending failed.
 */
int BasePeerSendIntro(struct Peer *peer) {
#ifdef DEBUG
  fprintf(stderr, "PeerDevSendIntro(%s)\n", peer->sock_path);
#endif

  if (peer->sock_fd == -1) {
    /* The welcome message from our peer can arrive before the local
       connection to the simulator exists. In that case the message is kept
       and sent once the connection has been established. */
#ifdef DEBUG
    fprintf(stderr,
            "PeerNetSetupQueues: socket not ready yet, delaying "
            "send\n");
#endif
    return 0;
  }

  /* only listeners hand their shm descriptor to the other side */
  int fd_to_pass = -1;
  if (peer->is_listener) {
    fd_to_pass = peer->shm_fd;
  }

  if (UxsocketSendFd(peer->sock_fd, peer->intro_remote, peer->intro_remote_len,
                     fd_to_pass)) {
    perror("BasePeerSendIntro: send failed");
    return 1;
  }
  return 0;
}

Antoine Kaufmann's avatar
Antoine Kaufmann committed
230
231
int BasePeerReport(struct Peer *peer, uint32_t written_pos,
                   uint32_t clean_pos) {
232
  uint32_t pos = peer->local_pos_cleaned;
Antoine Kaufmann's avatar
Antoine Kaufmann committed
233
  if (written_pos == peer->cleanup_pos_last && clean_pos == pos)
234
235
236
237
238
239
240
241
242
    return 0;

#ifdef DEBUG
  fprintf(stderr, "PeerReport: peer %s written %u -> %u, cleaned %u -> %u\n",
          peer->sock_path, peer->cleanup_pos_last, written_pos,
          peer->local_pos_cleaned, clean_pos);
#endif

  peer->cleanup_pos_last = written_pos;
243
244
  while (pos != clean_pos) {
    void *entry = (peer->local_base + pos * peer->local_elen);
245
    volatile union SimbricksProtoBaseMsg *msg =
Antoine Kaufmann's avatar
Antoine Kaufmann committed
246
247
248
249
        (volatile union SimbricksProtoBaseMsg *)entry;
    msg->header.own_type =
        (msg->header.own_type & (~SIMBRICKS_PROTO_MSG_OWN_MASK)) |
        SIMBRICKS_PROTO_MSG_OWN_PRO;
250

251
252
253
    pos += 1;
    if (pos >= peer->local_enum)
      pos -= peer->local_enum;
254
  }
255
  peer->local_pos_cleaned = pos;
256
257
258
259

  return 0;
}

260
261
262
263
/**
 * Handle readiness on a listener peer's listening socket.
 *
 * Accepts the pending connection, closes the listening socket, registers the
 * connected socket with epoll and, if the remote intro has already been
 * received, forwards it right away via BasePeerSendIntro().
 *
 * @param peer listener peer whose listening socket became readable.
 * @return 0 on success, 1 if accept, epoll registration or sending the intro
 *         failed.
 */
static int PeerAcceptEvent(struct Peer *peer) {
#ifdef DEBUG
  fprintf(stderr, "PeerAcceptEvent(%s)\n", peer->sock_path);
#endif
  assert(peer->is_listener);

  if ((peer->sock_fd = accept(peer->listen_fd, NULL, NULL)) < 0) {
    perror("PeersInitNets: accept failed");
    return 1;
  }

#ifdef DEBUG
  /* %zu expects a size_t; the pointer difference itself is a ptrdiff_t */
  fprintf(stderr, "Accepted %zu\n", (size_t)(peer - peers));
#endif

  close(peer->listen_fd);

  struct epoll_event epev;
  epev.events = EPOLLIN;
  epev.data.ptr = peer;
  if (epoll_ctl(epfd, EPOLL_CTL_ADD, peer->sock_fd, &epev)) {
    perror("PeersInitNets: epoll_ctl failed");
    return 1;
  }

  /* we may have already received the welcome message from our remote peer. In
     that case, send it now. */
  if (peer->intro_valid_remote) {
#ifdef DEBUG
    fprintf(stderr, "PeerAcceptEvent(%s): sending welcome message\n",
            peer->sock_path);
#endif
    if (BasePeerSendIntro(peer)) {
      fprintf(stderr, "PeerAcceptEvent(%s): sending intro failed\n",
              peer->sock_path);
      return 1;
    }
  }
  return 0;
}

301
int BasePeerEvent(struct Peer *peer, uint32_t events) {
302
#ifdef DEBUG
303
  fprintf(stderr, "PeerEvent(%s)\n", peer->sock_path);
304
#endif
305
306
307
308
309
310
311
312
313

  // disable peer if not an input event
  if (!(events & EPOLLIN)) {
    fprintf(stderr, "PeerEvent: non-input event, disabling peer (%s)",
            peer->sock_path);
    peer->ready = false;
    return 1;
  }

314
  // if peer is network and not yet connected, this is an accept event
315
  if (peer->is_listener && peer->sock_fd == -1) {
316
317
318
    return PeerAcceptEvent(peer);
  }

319
320
321
322
323
324
325
326
  // if we already have the intro, this is not expected
  if (peer->intro_valid_local) {
    fprintf(stderr, "PeerEvent: receive event after intro (%s)\n",
            peer->sock_path);
    return 1;
  }

  // receive intro message
327
  ssize_t ret;
328
329
  if (!peer->is_listener) {
    /* not a listener, so we're expecting an fd for the shm region */
330
    ret = UxsocketRecvFd(peer->sock_fd, peer->intro_local,
Antoine Kaufmann's avatar
Antoine Kaufmann committed
331
                         sizeof(peer->intro_local), &peer->shm_fd);
332
    if (ret <= 0)
333
334
335
336
      return 1;

    if (!(peer->shm_base = ShmMap(peer->shm_fd, &peer->shm_size)))
      return 1;
337
338

    struct SimbricksProtoListenerIntro *li =
Antoine Kaufmann's avatar
Antoine Kaufmann committed
339
340
        (struct SimbricksProtoListenerIntro *)peer->intro_local;
    peer->local_base = (void *)((uintptr_t)peer->shm_base + li->l2c_offset);
Antoine Kaufmann's avatar
Antoine Kaufmann committed
341
    peer->local_offset = li->l2c_offset;
342
343
344
    peer->local_elen = li->l2c_elen;
    peer->local_enum = li->l2c_nentries;

Antoine Kaufmann's avatar
Antoine Kaufmann committed
345
    peer->cleanup_base = (void *)((uintptr_t)peer->shm_base + li->c2l_offset);
Antoine Kaufmann's avatar
Antoine Kaufmann committed
346
    peer->cleanup_offset = li->c2l_offset;
347
348
    peer->cleanup_elen = li->c2l_elen;
    peer->cleanup_enum = li->c2l_nentries;
349
  } else {
350
    /* as a listener, we use our local shm region, so no fd is sent to us */
Antoine Kaufmann's avatar
Antoine Kaufmann committed
351
    ret = recv(peer->sock_fd, peer->intro_local, sizeof(peer->intro_local), 0);
352
    if (ret <= 0) {
353
354
355
356
357
      perror("PeerEvent: recv failed");
      return 1;
    }
  }

358
  peer->intro_local_len = ret;
359
360
  peer->intro_valid_local = true;

361
  // pass intro along
362
  if (BaseOpPassIntro(peer))
363
364
365
    return 1;

  if (peer->intro_valid_remote) {
366
367
368
#ifdef DEBUG
    fprintf(stderr, "PeerEvent(%s): marking peer as ready\n", peer->sock_path);
#endif
369
370
371
372
373
    peer->ready = true;
  }
  return 0;
}

374
static inline void PollPeerTransfer(struct Peer *peer, bool *report) {
375
  uint32_t n;
376
  for (n = 0; n < kPollMax && peer->local_pos + n < peer->local_enum; n++) {
377
378
379
380
    // stop if we would pass the cleanup position
    if ((peer->local_pos + n + 1) % peer->local_enum ==
        peer->local_pos_cleaned) {
#ifdef DEBUG
Antoine Kaufmann's avatar
Antoine Kaufmann committed
381
382
      fprintf(stderr, "PollPeerTransfer: waiting for cleanup (%u %u)\n", n,
              peer->local_pos_cleaned);
383
384
385
386
#endif
      break;
    }

387
    void *entry = (peer->local_base + (peer->local_pos + n) * peer->local_elen);
388
    volatile union SimbricksProtoBaseMsg *msg =
Antoine Kaufmann's avatar
Antoine Kaufmann committed
389
        (volatile union SimbricksProtoBaseMsg *)entry;
390
391
    if ((msg->header.own_type & SIMBRICKS_PROTO_MSG_OWN_MASK) !=
        SIMBRICKS_PROTO_MSG_OWN_CON)
392
      break;
393
394
  }

395
  if (n > 0) {
396
#ifdef DEBUG
Antoine Kaufmann's avatar
Antoine Kaufmann committed
397
398
399
    fprintf(stderr, "PollPeerTransfer: transferring [%u,%u] (lpc=%u lpr=%u)\n",
            peer->local_pos, peer->local_pos + n, peer->local_pos_cleaned,
            peer->local_pos_reported);
400
#endif
401
    BaseOpPassEntries(peer, peer->local_pos, n);
402
    uint32_t newpos = peer->local_pos + n;
Antoine Kaufmann's avatar
Antoine Kaufmann committed
403
404
    peer->local_pos =
        (newpos < peer->local_enum ? newpos : newpos - peer->local_enum);
405

Antoine Kaufmann's avatar
Antoine Kaufmann committed
406
407
    uint64_t unreported =
        (peer->local_pos - peer->local_pos_reported) % peer->local_enum;
408
409
410
411
412
413
414
415
416
    if (unreported >= kPollReportThreshold)
      *report = true;
  }
}

static inline void PollPeerCleanup(struct Peer *peer, bool *report) {
  if (peer->cleanup_pos_next == peer->cleanup_pos_last)
    return;

417
418
419
420
  uint64_t cnt = 0;
  do {
    void *entry =
        (peer->cleanup_base + peer->cleanup_pos_next * peer->cleanup_elen);
421
    volatile union SimbricksProtoBaseMsg *msg =
Antoine Kaufmann's avatar
Antoine Kaufmann committed
422
        (volatile union SimbricksProtoBaseMsg *)entry;
423

424
425
    if ((msg->header.own_type & SIMBRICKS_PROTO_MSG_OWN_MASK) !=
        SIMBRICKS_PROTO_MSG_OWN_PRO)
426
427
      break;

Antoine Kaufmann's avatar
Antoine Kaufmann committed
428
#ifdef DEBUG
429
430
431
432
433
434
    fprintf(stderr, "PollPeerCleanup: peer %s has clean entry at %u\n",
            peer->sock_path, peer->cleanup_pos_next);
#endif
    peer->cleanup_pos_next += 1;
    if (peer->cleanup_pos_next >= peer->cleanup_enum)
      peer->cleanup_pos_next -= peer->cleanup_enum;
435
436
  } while (++cnt <= kCleanupMax &&
           peer->cleanup_pos_next != peer->cleanup_pos_last);
437

438
  if (cnt > 0) {
Antoine Kaufmann's avatar
Antoine Kaufmann committed
439
440
441
    uint64_t unreported =
        (peer->cleanup_pos_next - peer->cleanup_pos_reported) %
        peer->cleanup_enum;
442
443
444
445
446
    if (unreported >= kCleanReportThreshold)
      *report = true;
  }
}

447
void BasePoll() {
448
449
450
451
452
  bool report = false;
  for (size_t i = 0; i < peer_num; i++) {
    struct Peer *peer = &peers[i];
    if (!peer->ready)
      continue;
453

454
455
    PollPeerTransfer(peer, &report);
    PollPeerCleanup(peer, &report);
456
457
  }

458
  if (report)
459
    BaseOpPassReport();
460
461
}

Antoine Kaufmann's avatar
Antoine Kaufmann committed
462
void BaseEntryReceived(struct Peer *peer, uint32_t pos, void *data) {
463
#ifdef DEBUG
Antoine Kaufmann's avatar
Antoine Kaufmann committed
464
465
  fprintf(stderr, "BaseEntryReceived: pos=%u (cpr=%u cpl=%u)\n", pos,
          peer->cleanup_pos_reported, peer->cleanup_pos_last);
466
#endif
467

Antoine Kaufmann's avatar
Antoine Kaufmann committed
468
  uint64_t off = (uint64_t)pos * peer->cleanup_elen;
469
  void *entry = peer->cleanup_base + off;
470
  volatile union SimbricksProtoBaseMsg *msg =
Antoine Kaufmann's avatar
Antoine Kaufmann committed
471
472
      (volatile union SimbricksProtoBaseMsg *)entry;

473
  // first copy data after header
Antoine Kaufmann's avatar
Antoine Kaufmann committed
474
475
  memcpy((void *)(msg + 1), (uint8_t *)data + sizeof(*msg),
         peer->cleanup_elen - sizeof(*msg));
476
  // then copy header except for last byte
Antoine Kaufmann's avatar
Antoine Kaufmann committed
477
  memcpy((void *)msg, data, sizeof(*msg) - 1);
478
479
480
  // WMB()
  // now copy last byte
  volatile union SimbricksProtoBaseMsg *src_msg =
Antoine Kaufmann's avatar
Antoine Kaufmann committed
481
      (volatile union SimbricksProtoBaseMsg *)data;
482
483
  asm volatile("sfence" ::: "memory");
  msg->header.own_type = src_msg->header.own_type;
Antoine Kaufmann's avatar
Antoine Kaufmann committed
484
}