base.c 13.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*
 * Copyright 2021 Max Planck Institute for Software Systems, and
 * National University of Singapore
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

25
#include "dist/common/base.h"
26
27
28
29
30
31
32
33
34
35
36
37
38

#include <assert.h>
#include <fcntl.h>
#include <getopt.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/mman.h>
#include <unistd.h>

39
#include <simbricks/base/proto.h>
40

41
#include "dist/common/utils.h"
42

43
44
/* Once this many entries have been transferred (resp. cleaned) without being
 * reported, the polling loop requests a report via BaseOpPassReport. */
static const uint64_t kPollReportThreshold = 128;
static const uint64_t kCleanReportThreshold = 128;

/* Per-peer limits on the work done in a single BasePoll round. */
static const uint64_t kPollMax = 8;
static const uint64_t kCleanupMax = 16;

/* Local shared-memory region; listener queues are bump-allocated from it. */
static int shm_fd = -1;
static size_t shm_size;
static size_t shm_alloc_off = 0;
void *shm_base;

/* All peers registered through BasePeerAdd. */
size_t peer_num = 0;
struct Peer *peers = NULL;

/* epoll instance that the peer sockets are registered with (set in BaseInit). */
static int epfd = -1;
57

58
int BaseInit(const char *shm_path_, size_t shm_size_, int epfd_) {
59
60
61
62
63
64
65
  shm_size = shm_size_;
  if ((shm_fd = ShmCreate(shm_path_, shm_size_, &shm_base)) < 0)
    return 1;

  epfd = epfd_;
  return 0;
}
66

67
static int ShmAlloc(size_t size, uint64_t *off) {
68
69
70
71
#ifdef DEBUG
  fprintf(stderr, "ShmAlloc(%zu)\n", size);
#endif

72
73
74
75
76
77
78
79
80
81
  if (shm_alloc_off + size > shm_size) {
    fprintf(stderr, "ShmAlloc: alloc of %zu bytes failed\n", size);
    return 1;
  }

  *off = shm_alloc_off;
  shm_alloc_off += size;
  return 0;
}

82
bool BasePeerAdd(const char *path, bool listener) {
83
84
  struct Peer *peer = realloc(peers, sizeof(*peers) * (peer_num + 1));
  if (!peer) {
85
    perror("NetPeerAdd: realloc failed");
86
87
88
89
90
91
    return false;
  }
  peers = peer;
  peer += peer_num;
  peer_num++;

92
  memset(peer, 0, sizeof(*peer));
93
  if (!(peer->sock_path = strdup(path))) {
94
    perror("NetPeerAdd: strdup failed");
95
96
    return false;
  }
97
  peer->is_listener = listener;
98
99
  peer->sock_fd = -1;
  peer->shm_fd = -1;
100
  peer->last_sent_pos = -1;
101
102
103
104
  return true;
}


105
int BaseListen() {
106
#ifdef DEBUG
107
  fprintf(stderr, "Creating listening sockets\n");
108
109
#endif

110
111
  for (size_t i = 0; i < peer_num; i++) {
    struct Peer *peer = &peers[i];
112
    if (!peer->is_listener)
113
114
      continue;

115
116
117
118
#ifdef DEBUG
    fprintf(stderr, "  Creating socket %s %zu\n", peer->sock_path, i);
#endif
    if ((peer->listen_fd = UxsocketInit(peer->sock_path)) < 0) {
119
120
121
122
123
124
125
      perror("PeersInitNets: unix socket init failed");
      return 1;
    }

    struct epoll_event epev;
    epev.events = EPOLLIN;
    epev.data.ptr = peer;
126
127
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, peer->listen_fd, &epev)) {
      perror("PeersInitNets: epoll_ctl accept failed");
128
129
130
      return 1;
    }
  }
131
132
133
134

#ifdef DEBUG
  fprintf(stderr, "PeerInitNets done\n");
#endif
135
136
137
  return 0;
}

138
int BaseConnect() {
139
140
141
142
#ifdef DEBUG
  fprintf(stderr, "Connecting to device sockets\n");
#endif

143
144
  for (size_t i = 0; i < peer_num; i++) {
    struct Peer *peer = &peers[i];
145
    if (peer->is_listener)
146
147
      continue;

148
149
150
151
#ifdef DEBUG
    fprintf(stderr, "  Connecting to socket %s %zu\n", peer->sock_path, i);
#endif

152
153
154
155
156
157
158
159
160
161
162
163
164
165
    if ((peer->sock_fd = UxsocketConnect(peer->sock_path)) < 0)
      return 1;

    struct epoll_event epev;
    epev.events = EPOLLIN;
    epev.data.ptr = peer;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, peer->sock_fd, &epev)) {
      perror("PeersInitNets: epoll_ctl failed");
      return 1;
    }
  }
  return 0;
}

166
167
168
169
int BasePeerSetupQueues(struct Peer *peer) {
  if (!peer->is_listener) {
    /* only need to set up queues for listeners */
    return 0;
170
171
  }

172
173
  struct SimbricksProtoListenerIntro *li =
      (struct SimbricksProtoListenerIntro *) peer->intro_remote;
174

175
#ifdef DEBUG
176
  fprintf(stderr, "PeerNetSetupQueues(%s)\n", peer->sock_path);
177
178
  fprintf(stderr, "  l2c_el=%lu l2c_n=%lu c2l_el=%lu c2l_n=%lu\n", li->l2c_elen,
      li->l2c_nentries, li->c2l_elen, li->c2l_nentries);
179
#endif
180

181
182
  if (ShmAlloc(li->l2c_elen * li->l2c_nentries, &li->l2c_offset)) {
    fprintf(stderr, "PeerNetSetupQueues: ShmAlloc l2c failed");
183
184
    return 1;
  }
185
186
  if (ShmAlloc(li->c2l_elen * li->c2l_nentries, &li->c2l_offset)) {
    fprintf(stderr, "PeerNetSetupQueues: ShmAlloc c2l failed");
187
188
189
190
191
    return 1;
  }
  peer->shm_fd = shm_fd;
  peer->shm_base = shm_base;

192
  peer->local_base = (void *) ((uintptr_t) shm_base + li->c2l_offset);
Antoine Kaufmann's avatar
Antoine Kaufmann committed
193
  peer->local_offset = li->c2l_offset;
194
195
  peer->local_elen = li->c2l_elen;
  peer->local_enum = li->c2l_nentries;
196

197
  peer->cleanup_base = (void *) ((uintptr_t) shm_base + li->l2c_offset);
Antoine Kaufmann's avatar
Antoine Kaufmann committed
198
  peer->cleanup_offset = li->l2c_offset;
199
200
201
202
203
  peer->cleanup_elen = li->l2c_elen;
  peer->cleanup_enum = li->l2c_nentries;

  return 0;
}
204

205
206
207
208
209
210
/* Sends the intro stored in peer->intro_remote (intro_remote_len bytes) to the
 * local simulator over peer->sock_fd using UxsocketSendFd. For listener peers
 * the shared-memory fd is passed along with it; otherwise -1 is given.
 *
 * If the local socket is not connected yet (sock_fd == -1), nothing is sent
 * and 0 is returned (see the comment below). Returns 1 if sending fails and 0
 * otherwise. */
int BasePeerSendIntro(struct Peer *peer) {
#ifdef DEBUG
  fprintf(stderr, "PeerDevSendIntro(%s)\n", peer->sock_path);
#endif

  if (peer->sock_fd != -1) {
    int fd_to_pass = -1;
    if (peer->is_listener)
      fd_to_pass = peer->shm_fd;

    if (UxsocketSendFd(peer->sock_fd, peer->intro_remote,
                       peer->intro_remote_len, fd_to_pass)) {
      perror("BasePeerSendIntro: send failed");
      return 1;
    }
    return 0;
  }

  /* We can receive the welcome message from our peer before our local
     connection to the simulator is established. In this case we hold the
     message till the connection is established and send it then. */
#ifdef DEBUG
  fprintf(stderr, "PeerNetSetupQueues: socket not ready yet, delaying "
      "send\n");
#endif
  return 0;
}

230
int BasePeerReport(struct Peer *peer, uint32_t written_pos, uint32_t clean_pos) {
231
  uint32_t pos = peer->local_pos_cleaned;
232
  if (written_pos == peer->cleanup_pos_last &&
233
      clean_pos == pos)
234
235
236
237
238
239
240
241
242
    return 0;

#ifdef DEBUG
  fprintf(stderr, "PeerReport: peer %s written %u -> %u, cleaned %u -> %u\n",
          peer->sock_path, peer->cleanup_pos_last, written_pos,
          peer->local_pos_cleaned, clean_pos);
#endif

  peer->cleanup_pos_last = written_pos;
243
244
  while (pos != clean_pos) {
    void *entry = (peer->local_base + pos * peer->local_elen);
245
246
247
248
    volatile union SimbricksProtoBaseMsg *msg =
        (volatile union SimbricksProtoBaseMsg *) entry;
      msg->header.own_type = (msg->header.own_type &
          (~SIMBRICKS_PROTO_MSG_OWN_MASK)) | SIMBRICKS_PROTO_MSG_OWN_PRO;
249

250
251
252
    pos += 1;
    if (pos >= peer->local_enum)
      pos -= peer->local_enum;
253
  }
254
  peer->local_pos_cleaned = pos;
255
256
257
258

  return 0;
}

259
260
261
262
/* Handles the incoming connection on a listener peer's listening socket.
 *
 * Accepts the connection into peer->sock_fd and closes the listening socket,
 * so only one connection is accepted per listener. The connected socket is
 * added to the epoll set. If the remote intro is already known
 * (intro_valid_remote), it is forwarded right away via BasePeerSendIntro.
 * Returns 0 on success, 1 on failure. */
static int PeerAcceptEvent(struct Peer *peer) {
#ifdef DEBUG
  fprintf(stderr, "PeerAcceptEvent(%s)\n", peer->sock_path);
#endif
  assert(peer->is_listener);

  if ((peer->sock_fd = accept(peer->listen_fd, NULL, NULL)) < 0) {
    perror("PeersInitNets: accept failed");
    return 1;
  }

#ifdef DEBUG
  /* peer - peers is a ptrdiff_t; cast it to match %zu */
  fprintf(stderr, "Accepted %zu\n", (size_t) (peer - peers));
#endif

  /* Closing the listening fd also removes it from the epoll set, assuming no
     duplicate of it exists (see epoll(7)). Reset the field so the stale fd
     number is never used or closed again. */
  close(peer->listen_fd);
  peer->listen_fd = -1;

  struct epoll_event epev;
  epev.events = EPOLLIN;
  epev.data.ptr = peer;
  if (epoll_ctl(epfd, EPOLL_CTL_ADD, peer->sock_fd, &epev)) {
    perror("PeersInitNets: epoll_ctl failed");
    return 1;
  }

  /* we may have already received the welcome message from our remote peer. In
     that case, send it now. */
  if (peer->intro_valid_remote) {
#ifdef DEBUG
    fprintf(stderr, "PeerAcceptEvent(%s): sending welcome message\n",
        peer->sock_path);
#endif
    if (BasePeerSendIntro(peer)) {
      fprintf(stderr, "PeerAcceptEvent(%s): sending intro failed\n",
          peer->sock_path);
      return 1;
    }
  }
  return 0;
}

300
int BasePeerEvent(struct Peer *peer, uint32_t events) {
301
#ifdef DEBUG
302
  fprintf(stderr, "PeerEvent(%s)\n", peer->sock_path);
303
#endif
304
305
306
307
308
309
310
311
312

  // disable peer if not an input event
  if (!(events & EPOLLIN)) {
    fprintf(stderr, "PeerEvent: non-input event, disabling peer (%s)",
            peer->sock_path);
    peer->ready = false;
    return 1;
  }

313
  // if peer is network and not yet connected, this is an accept event
314
  if (peer->is_listener && peer->sock_fd == -1) {
315
316
317
    return PeerAcceptEvent(peer);
  }

318
319
320
321
322
323
324
325
  // if we already have the intro, this is not expected
  if (peer->intro_valid_local) {
    fprintf(stderr, "PeerEvent: receive event after intro (%s)\n",
            peer->sock_path);
    return 1;
  }

  // receive intro message
326
  ssize_t ret;
327
328
  if (!peer->is_listener) {
    /* not a listener, so we're expecting an fd for the shm region */
329
330
331
    ret = UxsocketRecvFd(peer->sock_fd, peer->intro_local,
        sizeof(peer->intro_local), &peer->shm_fd);
    if (ret <= 0)
332
333
334
335
      return 1;

    if (!(peer->shm_base = ShmMap(peer->shm_fd, &peer->shm_size)))
      return 1;
336
337
338
339

    struct SimbricksProtoListenerIntro *li =
      (struct SimbricksProtoListenerIntro *) peer->intro_local;
    peer->local_base = (void *) ((uintptr_t) peer->shm_base + li->l2c_offset);
Antoine Kaufmann's avatar
Antoine Kaufmann committed
340
    peer->local_offset = li->l2c_offset;
341
342
343
344
    peer->local_elen = li->l2c_elen;
    peer->local_enum = li->l2c_nentries;

    peer->cleanup_base = (void *) ((uintptr_t) peer->shm_base + li->c2l_offset);
Antoine Kaufmann's avatar
Antoine Kaufmann committed
345
    peer->cleanup_offset = li->c2l_offset;
346
347
    peer->cleanup_elen = li->c2l_elen;
    peer->cleanup_enum = li->c2l_nentries;
348
  } else {
349
    /* as a listener, we use our local shm region, so no fd is sent to us */
350
    ret = recv(peer->sock_fd, peer->intro_local,
351
352
        sizeof(peer->intro_local), 0);
    if (ret <= 0) {
353
354
355
356
357
      perror("PeerEvent: recv failed");
      return 1;
    }
  }

358
  peer->intro_local_len = ret;
359
360
  peer->intro_valid_local = true;

361
  // pass intro along
362
  if (BaseOpPassIntro(peer))
363
364
365
    return 1;

  if (peer->intro_valid_remote) {
366
367
368
#ifdef DEBUG
    fprintf(stderr, "PeerEvent(%s): marking peer as ready\n", peer->sock_path);
#endif
369
370
371
372
373
    peer->ready = true;
  }
  return 0;
}

374
static inline void PollPeerTransfer(struct Peer *peer, bool *report) {
375
  uint32_t n;
376
  for (n = 0; n < kPollMax && peer->local_pos + n < peer->local_enum; n++) {
377
378
379
380
381
    // stop if we would pass the cleanup position
    if ((peer->local_pos + n + 1) % peer->local_enum ==
        peer->local_pos_cleaned) {
#ifdef DEBUG
      fprintf(stderr, "PollPeerTransfer: waiting for cleanup (%u %u)\n",
382
              n, peer->local_pos_cleaned);
383
384
385
386
#endif
      break;
    }

387
    void *entry = (peer->local_base + (peer->local_pos + n) * peer->local_elen);
388
389
390
391
    volatile union SimbricksProtoBaseMsg *msg =
        (volatile union SimbricksProtoBaseMsg *) entry;
    if ((msg->header.own_type & SIMBRICKS_PROTO_MSG_OWN_MASK) !=
        SIMBRICKS_PROTO_MSG_OWN_CON)
392
      break;
393
394
  }

395
  if (n > 0) {
396
397
398
399
400
#ifdef DEBUG
      fprintf(stderr, "PollPeerTransfer: transferring [%u,%u] (lpc=%u lpr=%u)\n",
              peer->local_pos, peer->local_pos + n, peer->local_pos_cleaned,
              peer->local_pos_reported);
#endif
401
    BaseOpPassEntries(peer, peer->local_pos, n);
402
403
404
405
    uint32_t newpos = peer->local_pos + n;
    peer->local_pos = (newpos < peer->local_enum ?
                       newpos :
                       newpos - peer->local_enum);
406
407
408
409
410
411
412
413
414
415
416
417

    uint64_t unreported = (peer->local_pos - peer->local_pos_reported) %
                          peer->local_enum;
    if (unreported >= kPollReportThreshold)
      *report = true;
  }
}

static inline void PollPeerCleanup(struct Peer *peer, bool *report) {
  if (peer->cleanup_pos_next == peer->cleanup_pos_last)
    return;

418
419
420
421
  uint64_t cnt = 0;
  do {
    void *entry =
        (peer->cleanup_base + peer->cleanup_pos_next * peer->cleanup_elen);
422
423
    volatile union SimbricksProtoBaseMsg *msg =
        (volatile union SimbricksProtoBaseMsg *) entry;
424

425
426
    if ((msg->header.own_type & SIMBRICKS_PROTO_MSG_OWN_MASK) !=
        SIMBRICKS_PROTO_MSG_OWN_PRO)
427
428
429
      break;

  #ifdef DEBUG
430
431
432
433
434
435
    fprintf(stderr, "PollPeerCleanup: peer %s has clean entry at %u\n",
            peer->sock_path, peer->cleanup_pos_next);
#endif
    peer->cleanup_pos_next += 1;
    if (peer->cleanup_pos_next >= peer->cleanup_enum)
      peer->cleanup_pos_next -= peer->cleanup_enum;
436
437
  } while (++cnt <= kCleanupMax &&
           peer->cleanup_pos_next != peer->cleanup_pos_last);
438

439
  if (cnt > 0) {
440
441
442
443
444
445
446
    uint64_t unreported = (peer->cleanup_pos_next - peer->cleanup_pos_reported)
                          % peer->cleanup_enum;
    if (unreported >= kCleanReportThreshold)
      *report = true;
  }
}

447
void BasePoll() {
448
449
450
451
452
  bool report = false;
  for (size_t i = 0; i < peer_num; i++) {
    struct Peer *peer = &peers[i];
    if (!peer->ready)
      continue;
453

454
455
    PollPeerTransfer(peer, &report);
    PollPeerCleanup(peer, &report);
456
457
  }

458
  if (report)
459
    BaseOpPassReport();
460
461
}

462
void BaseEntryReceived(struct Peer *peer, uint32_t pos, void *data)
463
{
464
465
466
467
#ifdef DEBUG
  fprintf(stderr, "BaseEntryReceived: pos=%u (cpr=%u cpl=%u)\n",
          pos, peer->cleanup_pos_reported, peer->cleanup_pos_last);
#endif
468

469
470
  uint64_t off = (uint64_t) pos * peer->cleanup_elen;
  void *entry = peer->cleanup_base + off;
471
472
473
474
475
476
477
478
479
480
481
482
483
484
  volatile union SimbricksProtoBaseMsg *msg =
        (volatile union SimbricksProtoBaseMsg *) entry;
  
  // first copy data after header
  memcpy((void *) (msg + 1), (uint8_t *) data + sizeof(*msg),
          peer->cleanup_elen - sizeof(*msg));
  // then copy header except for last byte
  memcpy((void *) msg, data, sizeof(*msg) - 1);
  // WMB()
  // now copy last byte
  volatile union SimbricksProtoBaseMsg *src_msg =
        (volatile union SimbricksProtoBaseMsg *) data;
  asm volatile("sfence" ::: "memory");
  msg->header.own_type = src_msg->header.own_type;
485
}