/*
 * Copyright 2021 Max Planck Institute for Software Systems, and
 * National University of Singapore
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

#include "dist/rdma/net_rdma.h"

#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <getopt.h>
#include <limits.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/mman.h>
#include <unistd.h>

#include <simbricks/base/proto.h>

#include "dist/common/utils.h"

// Shared memory region handed to BaseInit() in main().
const char *shm_path = NULL;     // -s; stays NULL unless given
size_t shm_size = 256ULL << 20;  // -S (given in MB); default 256 MB, in bytes

// Endpoint for the RDMA connection, taken from the positional IP PORT.
bool mode_listen = false;  // -l: RdmaListen() instead of RdmaConnect()
struct sockaddr_in addr;

// epoll instance polled by IOLoop(); -1 until main() creates it.
int epfd = -1;

// RDMA device configuration, set from -D / -i / -p / -g.
const char *ib_devname = NULL;
bool ib_connect = false;
uint8_t ib_port = 1;
int ib_sgid_idx = -1;

// Prints the command-line synopsis to stderr. Every option accepted by
// ParseArgs() ("lL:C:s:S:D:ip:g:") is listed, including the RDMA device
// options (-D, -i, -p, -g).
static void PrintUsage(void) {
  fprintf(stderr,
          "Usage: net_rdma [OPTIONS] IP PORT\n"
          "    -l: Listen instead of connecting\n"
          "    -L LISTEN-SOCKET: listening socket for a simulator\n"
          "    -C CONN-SOCKET: connecting socket for a simulator\n"
          "    -s SHM-PATH: shared memory region path\n"
          "    -S SHM-SIZE: shared memory region size in MB (default 256)\n"
          "    -D IB-DEVICE: RDMA device name\n"
          "    -i: enable IB connect mode\n"
          "    -p IB-PORT: RDMA device port (default 1)\n"
          "    -g SGID-IDX: source GID index\n");
}

// Parses a non-negative decimal integer from `s` into *out. The whole string
// must be consumed and the value must not exceed `max`; strtoull overflow
// (ERANGE) is rejected too. Any '-' is rejected up front because strtoull
// would otherwise silently negate (wrap) the value. Returns true on success;
// *out is left untouched on failure.
static bool ParseUnsigned(const char *s, unsigned long long max,
                          unsigned long long *out) {
  if (strchr(s, '-'))
    return false;

  char *end = NULL;
  errno = 0;
  unsigned long long v = strtoull(s, &end, 10);
  if (end == s || *end != '\0' || errno == ERANGE || v > max)
    return false;

  *out = v;
  return true;
}

// Parses the command line into the file-scope configuration globals.
//
// Options: -l, -L SOCK, -C SOCK, -s PATH, -S MB, -D DEV, -i, -p PORT, -g IDX,
// followed by exactly two positional arguments IP PORT, which are stored in
// `addr`. -L/-C register simulator peers via BasePeerAdd(). Numeric values are
// validated and range-checked against the type they are stored in instead of
// being silently truncated.
//
// Returns 0 on success and 1 on error; most error paths print a diagnostic or
// the usage text to stderr first.
static int ParseArgs(int argc, char *argv[]) {
  const char *opts = "lL:C:s:S:D:ip:g:";
  unsigned long long val;
  int c;

  while ((c = getopt(argc, argv, opts)) != -1) {
    switch (c) {
      case 'l':
        mode_listen = true;
        break;

      case 'L':
        if (!BasePeerAdd(optarg, true))
          return 1;
        break;

      case 'C':
        if (!BasePeerAdd(optarg, false))
          return 1;
        break;

      case 's':
        if (!(shm_path = strdup(optarg))) {
          perror("ParseArgs: strdup failed");
          return 1;
        }
        break;

      case 'S':
        // The size is given in MB. Bound it so that the conversion to bytes
        // cannot overflow size_t, and reject an empty region.
        if (!ParseUnsigned(optarg, SIZE_MAX / (1024 * 1024), &val) ||
            val == 0) {
          fprintf(stderr, "ParseArgs: invalid shared memory size '%s'\n",
                  optarg);
          return 1;
        }
        shm_size = (size_t)val * 1024 * 1024;
        break;

      case 'D':
        ib_devname = optarg;
        break;

      case 'i':
        ib_connect = true;
        break;

      case 'p':
        // ib_port is a uint8_t; larger values used to be silently truncated.
        if (!ParseUnsigned(optarg, UINT8_MAX, &val)) {
          fprintf(stderr, "ParseArgs: invalid IB port '%s'\n", optarg);
          return 1;
        }
        ib_port = (uint8_t)val;
        break;

      case 'g':
        if (!ParseUnsigned(optarg, INT_MAX, &val)) {
          fprintf(stderr, "ParseArgs: invalid GID index '%s'\n", optarg);
          return 1;
        }
        ib_sgid_idx = (int)val;
        break;

      default:
        PrintUsage();
        return 1;
    }
  }

  if (optind + 2 != argc) {
    PrintUsage();
    return 1;
  }

  // Positional arguments: IP PORT, later passed to RdmaListen/RdmaConnect.
  // The port must fit in 16 bits rather than being silently wrapped.
  if (!ParseUnsigned(argv[optind + 1], UINT16_MAX, &val)) {
    fprintf(stderr, "ParseArgs: invalid port '%s'\n", argv[optind + 1]);
    PrintUsage();
    return 1;
  }
  addr.sin_family = AF_INET;
  addr.sin_port = htons((uint16_t)val);
  if ((addr.sin_addr.s_addr = inet_addr(argv[optind])) == INADDR_NONE) {
    PrintUsage();
    return 1;
  }

  return 0;
}

// Start routine for the poll thread created in main(): calls BasePoll() in an
// endless loop and never returns. `data` is unused (main() passes NULL).
static void *PollThread(void *data) {
  (void)data;
  for (;;)
    BasePoll();
}

// Main-thread event loop. Waits on `epfd` and dispatches each ready event:
// events whose data.ptr is non-NULL are passed to BasePeerEvent() for that
// Peer, and events with a NULL data.ptr are handled by RdmaEvent(). stdout is
// flushed after every batch.
//
// Returns 1 as soon as epoll_wait() fails (other than by signal interruption)
// or a handler returns non-zero; otherwise it loops forever.
static int IOLoop(void) {
  // An enum constant is a true constant expression, so `evs` is an ordinary
  // array rather than a VLA, and the count needs no size_t -> int narrowing.
  enum { kNumEvs = 8 };
  struct epoll_event evs[kNumEvs];

  for (;;) {
    int n = epoll_wait(epfd, evs, kNumEvs, -1);
    if (n < 0) {
      // A signal delivered during the wait is not an error; wait again.
      if (errno == EINTR)
        continue;
      perror("IOLoop: epoll_wait failed");
      return 1;
    }

    for (int i = 0; i < n; i++) {
      struct Peer *peer = evs[i].data.ptr;
      if (peer) {
        if (BasePeerEvent(peer, evs[i].events))
          return 1;
      } else if (RdmaEvent()) {
        return 1;
      }
    }

    fflush(stdout);
  }
}

int main(int argc, char *argv[]) {
  if (ParseArgs(argc, argv))
    return EXIT_FAILURE;

#ifdef DEBUG
  fprintf(stderr, "pid=%d shm=%s\n", getpid(), shm_path);
#endif

  if ((epfd = epoll_create1(0)) < 0) {
    perror("epoll_create1 failed");
    return EXIT_FAILURE;
  }

Antoine Kaufmann's avatar
Antoine Kaufmann committed
175
  if (BaseInit(shm_path, shm_size, epfd))
176
177
    return EXIT_FAILURE;

Antoine Kaufmann's avatar
Antoine Kaufmann committed
178
  if (BaseListen())
179
180
    return EXIT_FAILURE;

181
182
183
184
185
186
187
188
189
190
  if (mode_listen) {
    if (RdmaListen(&addr))
      return EXIT_FAILURE;
  } else {
    if (RdmaConnect(&addr))
      return EXIT_FAILURE;
  }
  printf("RDMA connected\n");
  fflush(stdout);

Antoine Kaufmann's avatar
Antoine Kaufmann committed
191
  if (BaseConnect())
192
193
194
195
196
197
198
199
200
201
202
203
    return EXIT_FAILURE;
  printf("Peers initialized\n");
  fflush(stdout);

  pthread_t poll_thread;
  if (pthread_create(&poll_thread, NULL, PollThread, NULL)) {
    perror("pthread_create failed (poll thread)");
    return EXIT_FAILURE;
  }

  return IOLoop();
}