Commit 52c0cb9c authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

dist: refactor proxy to pull out common code

Preparation to add additional proxy implementations.
parent ea065866
...@@ -31,6 +31,6 @@ experiments/build ...@@ -31,6 +31,6 @@ experiments/build
experiments/out experiments/out
experiments/local-config.sh experiments/local-config.sh
experiments/slurm experiments/slurm
dist/net_rdma dist/rdma/net_rdma
trace/process trace/process
mk/local.mk mk/local.mk
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/ */
#include "dist/net_rdma.h" #include "dist/common/net.h"
#include <assert.h> #include <assert.h>
#include <fcntl.h> #include <fcntl.h>
...@@ -38,29 +38,30 @@ ...@@ -38,29 +38,30 @@
#include <simbricks/proto/base.h> #include <simbricks/proto/base.h>
#include "dist/utils.h" #include "dist/common/utils.h"
static const uint64_t kPollReportThreshold = 128; static const uint64_t kPollReportThreshold = 128;
static const uint64_t kCleanReportThreshold = 128; static const uint64_t kCleanReportThreshold = 128;
static const uint64_t kPollMax = 8; static const uint64_t kPollMax = 8;
const char *shm_path = NULL; static size_t shm_size;
size_t shm_size = 256 * 1024 * 1024ULL; // 256MB void *shm_base;
void *shm_base = NULL;
static int shm_fd = -1; static int shm_fd = -1;
static size_t shm_alloc_off = 0; static size_t shm_alloc_off = 0;
bool mode_listen = false;
size_t peer_num = 0; size_t peer_num = 0;
struct Peer *peers = NULL; struct Peer *peers = NULL;
struct sockaddr_in addr;
int epfd = -1; static int epfd = -1;
const char *ib_devname = NULL; int NetInit(const char *shm_path_, size_t shm_size_, int epfd_) {
bool ib_connect = false; shm_size = shm_size_;
uint8_t ib_port = 1; if ((shm_fd = ShmCreate(shm_path_, shm_size_, &shm_base)) < 0)
int ib_sgid_idx = -1; return 1;
epfd = epfd_;
return 0;
}
static int ShmAlloc(size_t size, uint64_t *off) { static int ShmAlloc(size_t size, uint64_t *off) {
#ifdef DEBUG #ifdef DEBUG
...@@ -77,20 +78,10 @@ static int ShmAlloc(size_t size, uint64_t *off) { ...@@ -77,20 +78,10 @@ static int ShmAlloc(size_t size, uint64_t *off) {
return 0; return 0;
} }
static void PrintUsage() { bool NetPeerAdd(const char *path, bool dev) {
fprintf(stderr,
"Usage: net_rdma [OPTIONS] IP PORT\n"
" -l: Listen instead of connecting\n"
" -d DEV-SOCKET: network socket of a device simulator\n"
" -n NET-SOCKET: network socket of a network simulator\n"
" -s SHM-PATH: shared memory region path\n"
" -S SHM-SIZE: shared memory region size in MB (default 256)\n");
}
static bool AddPeer(const char *path, bool dev) {
struct Peer *peer = realloc(peers, sizeof(*peers) * (peer_num + 1)); struct Peer *peer = realloc(peers, sizeof(*peers) * (peer_num + 1));
if (!peer) { if (!peer) {
perror("ParseArgs: realloc failed"); perror("NetPeerAdd: realloc failed");
return false; return false;
} }
peers = peer; peers = peer;
...@@ -98,7 +89,7 @@ static bool AddPeer(const char *path, bool dev) { ...@@ -98,7 +89,7 @@ static bool AddPeer(const char *path, bool dev) {
peer_num++; peer_num++;
if (!(peer->sock_path = strdup(path))) { if (!(peer->sock_path = strdup(path))) {
perror("ParseArgs: strdup failed"); perror("NetPeerAdd: strdup failed");
return false; return false;
} }
peer->is_dev = dev; peer->is_dev = dev;
...@@ -107,73 +98,6 @@ static bool AddPeer(const char *path, bool dev) { ...@@ -107,73 +98,6 @@ static bool AddPeer(const char *path, bool dev) {
return true; return true;
} }
static int ParseArgs(int argc, char *argv[]) {
const char *opts = "ld:n:s:S:D:ip:g:";
int c;
while ((c = getopt(argc, argv, opts)) != -1) {
switch (c) {
case 'l':
mode_listen = true;
break;
case 'd':
if (!AddPeer(optarg, true))
return 1;
break;
case 'n':
if (!AddPeer(optarg, false))
return 1;
break;
case 's':
if (!(shm_path = strdup(optarg))) {
perror("ParseArgs: strdup failed");
return 1;
}
break;
case 'S':
shm_size = strtoull(optarg, NULL, 10) * 1024 * 1024;
break;
case 'D':
ib_devname = optarg;
break;
case 'i':
ib_connect = true;
break;
case 'p':
ib_port = strtoull(optarg, NULL, 10);
break;
case 'g':
ib_sgid_idx = strtoull(optarg, NULL, 10);
break;
default:
PrintUsage();
return 1;
}
}
if (optind + 2 != argc) {
PrintUsage();
return 1;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(strtoul(argv[optind + 1], NULL, 10));
if ((addr.sin_addr.s_addr = inet_addr(argv[optind])) == INADDR_NONE) {
PrintUsage();
return 1;
}
return 0;
}
static int PeersInitNets() { static int PeersInitNets() {
#ifdef DEBUG #ifdef DEBUG
...@@ -236,7 +160,17 @@ static int PeersInitDevs() { ...@@ -236,7 +160,17 @@ static int PeersInitDevs() {
return 0; return 0;
} }
int PeerDevSendIntro(struct Peer *peer) { int NetConnect() {
if (PeersInitNets())
return 1;
if (PeersInitDevs())
return 1;
return 0;
}
int NetPeerSendDevIntro(struct Peer *peer) {
#ifdef DEBUG #ifdef DEBUG
fprintf(stderr, "PeerDevSendIntro(%s)\n", peer->sock_path); fprintf(stderr, "PeerDevSendIntro(%s)\n", peer->sock_path);
#endif #endif
...@@ -262,7 +196,7 @@ int PeerDevSendIntro(struct Peer *peer) { ...@@ -262,7 +196,7 @@ int PeerDevSendIntro(struct Peer *peer) {
return 0; return 0;
} }
int PeerNetSetupQueues(struct Peer *peer) { int NetPeerSetupNetQueues(struct Peer *peer) {
struct SimbricksProtoNetDevIntro *di = &peer->dev_intro; struct SimbricksProtoNetDevIntro *di = &peer->dev_intro;
#ifdef DEBUG #ifdef DEBUG
...@@ -309,7 +243,7 @@ int PeerNetSetupQueues(struct Peer *peer) { ...@@ -309,7 +243,7 @@ int PeerNetSetupQueues(struct Peer *peer) {
return 0; return 0;
} }
int PeerReport(struct Peer *peer, uint32_t written_pos, uint32_t clean_pos) { int NetPeerReport(struct Peer *peer, uint32_t written_pos, uint32_t clean_pos) {
if (written_pos == peer->cleanup_pos_last && if (written_pos == peer->cleanup_pos_last &&
clean_pos == peer->local_pos_cleaned) clean_pos == peer->local_pos_cleaned)
return 0; return 0;
...@@ -382,7 +316,7 @@ static int PeerAcceptEvent(struct Peer *peer) { ...@@ -382,7 +316,7 @@ static int PeerAcceptEvent(struct Peer *peer) {
return 0; return 0;
} }
static int PeerEvent(struct Peer *peer, uint32_t events) { int NetPeerEvent(struct Peer *peer, uint32_t events) {
#ifdef DEBUG #ifdef DEBUG
fprintf(stderr, "PeerEvent(%s)\n", peer->sock_path); fprintf(stderr, "PeerEvent(%s)\n", peer->sock_path);
#endif #endif
...@@ -429,8 +363,8 @@ static int PeerEvent(struct Peer *peer, uint32_t events) { ...@@ -429,8 +363,8 @@ static int PeerEvent(struct Peer *peer, uint32_t events) {
peer->intro_valid_local = true; peer->intro_valid_local = true;
// pass intro along via RDMA // pass intro along
if (RdmaPassIntro(peer)) if (NetOpPassIntro(peer))
return 1; return 1;
if (peer->intro_valid_remote) { if (peer->intro_valid_remote) {
...@@ -463,7 +397,7 @@ static inline void PollPeerTransfer(struct Peer *peer, bool *report) { ...@@ -463,7 +397,7 @@ static inline void PollPeerTransfer(struct Peer *peer, bool *report) {
} }
if (n > 0) { if (n > 0) {
RdmaPassEntry(peer, n); NetOpPassEntries(peer, n);
peer->local_pos += n; peer->local_pos += n;
if (peer->local_pos >= peer->local_enum) if (peer->local_pos >= peer->local_enum)
peer->local_pos -= peer->local_enum; peer->local_pos -= peer->local_enum;
...@@ -510,88 +444,17 @@ static inline void PollPeerCleanup(struct Peer *peer, bool *report) { ...@@ -510,88 +444,17 @@ static inline void PollPeerCleanup(struct Peer *peer, bool *report) {
} }
} }
static void *PollThread(void *data) { void NetPoll() {
while (true) { bool report = false;
// poll queue for transferring entries for (size_t i = 0; i < peer_num; i++) {
bool report = false; struct Peer *peer = &peers[i];
for (size_t i = 0; i < peer_num; i++) { if (!peer->ready)
struct Peer *peer = &peers[i]; continue;
if (!peer->ready)
continue;
PollPeerTransfer(peer, &report);
PollPeerCleanup(peer, &report);
}
if (report)
RdmaPassReport();
}
return NULL;
}
static int IOLoop() {
while (1) {
const size_t kNumEvs = 8;
struct epoll_event evs[kNumEvs];
int n = epoll_wait(epfd, evs, kNumEvs, -1);
if (n < 0) {
perror("IOLoop: epoll_wait failed");
return 1;
}
for (int i = 0; i < n; i++) {
struct Peer *peer = evs[i].data.ptr;
if (peer && PeerEvent(peer, evs[i].events))
return 1;
else if (!peer && RdmaEvent())
return 1;
}
fflush(stdout);
}
}
int main(int argc, char *argv[]) {
if (ParseArgs(argc, argv))
return EXIT_FAILURE;
#ifdef DEBUG
fprintf(stderr, "pid=%d shm=%s\n", getpid(), shm_path);
#endif
if ((shm_fd = ShmCreate(shm_path, shm_size, &shm_base)) < 0)
return EXIT_FAILURE;
if ((epfd = epoll_create1(0)) < 0) {
perror("epoll_create1 failed");
return EXIT_FAILURE;
}
if (mode_listen) {
if (RdmaListen(&addr))
return EXIT_FAILURE;
} else {
if (RdmaConnect(&addr))
return EXIT_FAILURE;
}
printf("RDMA connected\n");
fflush(stdout);
if (PeersInitNets())
return EXIT_FAILURE;
printf("Networks initialized\n");
fflush(stdout);
if (PeersInitDevs()) PollPeerTransfer(peer, &report);
return EXIT_FAILURE; PollPeerCleanup(peer, &report);
printf("Devices initialized\n");
fflush(stdout);
pthread_t poll_thread;
if (pthread_create(&poll_thread, NULL, PollThread, NULL)) {
perror("pthread_create failed (poll thread)");
return EXIT_FAILURE;
} }
return IOLoop(); if (report)
} NetOpPassReport();
}
\ No newline at end of file
...@@ -22,8 +22,8 @@ ...@@ -22,8 +22,8 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/ */
#ifndef DIST_NET_RDMA_H_ #ifndef DIST_COMMON_NET_H_
#define DIST_NET_RDMA_H_ #define DIST_COMMON_NET_H_
#include <arpa/inet.h> #include <arpa/inet.h>
#include <stdbool.h> #include <stdbool.h>
...@@ -70,10 +70,8 @@ struct Peer { ...@@ -70,10 +70,8 @@ struct Peer {
struct SimbricksProtoNetNetIntro net_intro; struct SimbricksProtoNetNetIntro net_intro;
const char *sock_path; const char *sock_path;
/* RDMA memory region for the shared memory of the queues on this end. Could // opaque value, e.g. to be used by rdma proxy for memory region
be either our own global SHM region if this is a network peer, or the SHM void *shm_opaque;
region allocated by the device peer. */
struct ibv_mr *shm_mr;
void *shm_base; void *shm_base;
size_t shm_size; size_t shm_size;
...@@ -91,28 +89,23 @@ struct Peer { ...@@ -91,28 +89,23 @@ struct Peer {
volatile bool ready; volatile bool ready;
}; };
// configuration variables
extern bool mode_listen;
extern const char *shm_path;
extern size_t shm_size;
extern void *shm_base; extern void *shm_base;
extern size_t peer_num; extern size_t peer_num;
extern struct Peer *peers; extern struct Peer *peers;
extern int epfd;
extern const char *ib_devname; int NetInit(const char *shm_path_, size_t shm_size_, int epfd_);
extern bool ib_connect; bool NetPeerAdd(const char *path, bool dev);
extern uint8_t ib_port; struct Peer *NetPeerLookup(uint32_t id);
extern int ib_sgid_idx; int NetConnect(void);
void NetPoll(void);
int PeerDevSendIntro(struct Peer *peer); int NetPeerSendDevIntro(struct Peer *peer);
int PeerNetSetupQueues(struct Peer *peer); int NetPeerSetupNetQueues(struct Peer *peer);
int PeerReport(struct Peer *peer, uint32_t written_pos, uint32_t clean_pos); int NetPeerReport(struct Peer *peer, uint32_t written_pos, uint32_t clean_pos);
int NetPeerEvent(struct Peer *peer, uint32_t events);
int RdmaListen(struct sockaddr_in *addr);
int RdmaConnect(struct sockaddr_in *addr); // To be implemented in proxy implementation
int RdmaPassIntro(struct Peer *peer); int NetOpPassIntro(struct Peer *peer);
int RdmaPassEntry(struct Peer *peer, uint32_t n); int NetOpPassEntries(struct Peer *peer, size_t n);
int RdmaPassReport(); int NetOpPassReport();
int RdmaEvent();
#endif // DIST_NET_RDMA_H_ #endif // DIST_NET_RDMA_H_
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/ */
#include "dist/utils.h" #include "dist/common/utils.h"
#include <fcntl.h> #include <fcntl.h>
#include <pthread.h> #include <pthread.h>
......
/*
* Copyright 2021 Max Planck Institute for Software Systems, and
* National University of Singapore
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "dist/rdma/net_rdma.h"
#include <assert.h>
#include <fcntl.h>
#include <getopt.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/mman.h>
#include <unistd.h>
#include <simbricks/proto/base.h>
#include "dist/common/utils.h"
const char *shm_path = NULL;
size_t shm_size = 256 * 1024 * 1024ULL; // 256MB
bool mode_listen = false;
struct sockaddr_in addr;
int epfd = -1;
const char *ib_devname = NULL;
bool ib_connect = false;
uint8_t ib_port = 1;
int ib_sgid_idx = -1;
static void PrintUsage() {
fprintf(stderr,
"Usage: net_rdma [OPTIONS] IP PORT\n"
" -l: Listen instead of connecting\n"
" -d DEV-SOCKET: network socket of a device simulator\n"
" -n NET-SOCKET: network socket of a network simulator\n"
" -s SHM-PATH: shared memory region path\n"
" -S SHM-SIZE: shared memory region size in MB (default 256)\n");
}
static int ParseArgs(int argc, char *argv[]) {
const char *opts = "ld:n:s:S:D:ip:g:";
int c;
while ((c = getopt(argc, argv, opts)) != -1) {
switch (c) {
case 'l':
mode_listen = true;
break;
case 'd':
if (!NetPeerAdd(optarg, true))
return 1;
break;
case 'n':
if (!NetPeerAdd(optarg, false))
return 1;
break;
case 's':
if (!(shm_path = strdup(optarg))) {
perror("ParseArgs: strdup failed");
return 1;
}
break;
case 'S':
shm_size = strtoull(optarg, NULL, 10) * 1024 * 1024;
break;
case 'D':
ib_devname = optarg;
break;
case 'i':
ib_connect = true;
break;
case 'p':
ib_port = strtoull(optarg, NULL, 10);
break;
case 'g':
ib_sgid_idx = strtoull(optarg, NULL, 10);
break;
default:
PrintUsage();
return 1;
}
}
if (optind + 2 != argc) {
PrintUsage();
return 1;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(strtoul(argv[optind + 1], NULL, 10));
if ((addr.sin_addr.s_addr = inet_addr(argv[optind])) == INADDR_NONE) {
PrintUsage();
return 1;
}
return 0;
}
static void *PollThread(void *data) {
while (true)
NetPoll();
return NULL;
}
static int IOLoop() {
while (1) {
const size_t kNumEvs = 8;
struct epoll_event evs[kNumEvs];
int n = epoll_wait(epfd, evs, kNumEvs, -1);
if (n < 0) {
perror("IOLoop: epoll_wait failed");
return 1;
}
for (int i = 0; i < n; i++) {
struct Peer *peer = evs[i].data.ptr;
if (peer && NetPeerEvent(peer, evs[i].events))
return 1;
else if (!peer && RdmaEvent())
return 1;
}
fflush(stdout);
}
}
int main(int argc, char *argv[]) {
if (ParseArgs(argc, argv))
return EXIT_FAILURE;
#ifdef DEBUG
fprintf(stderr, "pid=%d shm=%s\n", getpid(), shm_path);
#endif
if ((epfd = epoll_create1(0)) < 0) {
perror("epoll_create1 failed");
return EXIT_FAILURE;
}
if (NetInit(shm_path, shm_size, epfd))
return EXIT_FAILURE;
if (mode_listen) {
if (RdmaListen(&addr))
return EXIT_FAILURE;
} else {
if (RdmaConnect(&addr))
return EXIT_FAILURE;
}
printf("RDMA connected\n");
fflush(stdout);
if (NetConnect())
return EXIT_FAILURE;
printf("Peers initialized\n");
fflush(stdout);
pthread_t poll_thread;
if (pthread_create(&poll_thread, NULL, PollThread, NULL)) {
perror("pthread_create failed (poll thread)");
return EXIT_FAILURE;
}
return IOLoop();
}
/*
* Copyright 2021 Max Planck Institute for Software Systems, and
* National University of Singapore
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef DIST_NET_RDMA_H_
#define DIST_NET_RDMA_H_
#include "dist/common/net.h"
#include <arpa/inet.h>
#include <stdbool.h>
#include <stddef.h>
#include <simbricks/proto/network.h>
// configuration variables
extern size_t shm_size;
extern int epfd;
extern const char *ib_devname;
extern bool ib_connect;
extern uint8_t ib_port;
extern int ib_sgid_idx;
int RdmaListen(struct sockaddr_in *addr);
int RdmaConnect(struct sockaddr_in *addr);
int RdmaEvent();
#endif // DIST_NET_RDMA_H_
...@@ -22,8 +22,8 @@ ...@@ -22,8 +22,8 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/ */
#include "dist/rdma.h" #include "dist/rdma/rdma.h"
#include "dist/net_rdma.h" #include "dist/rdma/net_rdma.h"
#include <fcntl.h> #include <fcntl.h>
#include <infiniband/verbs.h> #include <infiniband/verbs.h>
...@@ -137,13 +137,13 @@ static int RdmaMsgRxIntro(struct NetRdmaMsg *msg) { ...@@ -137,13 +137,13 @@ static int RdmaMsgRxIntro(struct NetRdmaMsg *msg) {
peer->intro_valid_remote = true; peer->intro_valid_remote = true;
if (peer->is_dev) { if (peer->is_dev) {
peer->net_intro = msg->net; peer->net_intro = msg->net;
if (PeerDevSendIntro(peer)) if (NetPeerSendDevIntro(peer))
return 1; return 1;
} else { } else {
peer->dev_intro = msg->dev; peer->dev_intro = msg->dev;
if (PeerNetSetupQueues(peer)) if (NetPeerSetupNetQueues(peer))
return 1; return 1;
if (peer->intro_valid_local && RdmaPassIntro(peer)) if (peer->intro_valid_local && NetOpPassIntro(peer))
return 1; return 1;
} }
...@@ -163,7 +163,8 @@ static int RdmaMsgRxReport(struct NetRdmaMsg *msg) { ...@@ -163,7 +163,8 @@ static int RdmaMsgRxReport(struct NetRdmaMsg *msg) {
fprintf(stderr, "RdmaMsgRxReport: invalid ready peer number %zu\n", i); fprintf(stderr, "RdmaMsgRxReport: invalid ready peer number %zu\n", i);
abort(); abort();
} }
PeerReport(&peers[i], msg->report.written_pos[i], msg->report.clean_pos[i]); NetPeerReport(&peers[i], msg->report.written_pos[i],
msg->report.clean_pos[i]);
} }
return 0; return 0;
} }
...@@ -369,17 +370,17 @@ int RdmaEvent() { ...@@ -369,17 +370,17 @@ int RdmaEvent() {
return 0; return 0;
} }
int RdmaPassIntro(struct Peer *peer) { int NetOpPassIntro(struct Peer *peer) {
#ifdef RDMA_DEBUG #ifdef RDMA_DEBUG
fprintf(stderr, "RdmaPassIntro(%s)\n", peer->sock_path); fprintf(stderr, "NetOpPassIntro(%s)\n", peer->sock_path);
#endif #endif
// device peers have sent us an SHM region, need to register this an as MR // device peers have sent us an SHM region, need to register this an as MR
if (peer->is_dev) { if (peer->is_dev) {
if (!(peer->shm_mr = ibv_reg_mr(pd, peer->shm_base, peer->shm_size, if (!(peer->shm_opaque = ibv_reg_mr(pd, peer->shm_base, peer->shm_size,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_LOCAL_WRITE |
IBV_ACCESS_REMOTE_WRITE))) { IBV_ACCESS_REMOTE_WRITE))) {
perror("RdmaPassIntro: ibv_reg_mr shm failed"); perror("NetOpPassIntro: ibv_reg_mr shm failed");
return 1; return 1;
} }
} else { } else {
...@@ -387,11 +388,11 @@ int RdmaPassIntro(struct Peer *peer) { ...@@ -387,11 +388,11 @@ int RdmaPassIntro(struct Peer *peer) {
intro from our RDMA peer, so we can include the queue position. */ intro from our RDMA peer, so we can include the queue position. */
if (!peer->intro_valid_remote) { if (!peer->intro_valid_remote) {
fprintf(stderr, fprintf(stderr,
"RdmaPassIntro: skipping because remote intro not received\n"); "NetOpPassIntro: skipping because remote intro not received\n");
return 0; return 0;
} }
peer->shm_mr = mr_shm; peer->shm_opaque = mr_shm;
peer->shm_base = shm_base; peer->shm_base = shm_base;
peer->shm_size = shm_size; peer->shm_size = shm_size;
} }
...@@ -402,7 +403,8 @@ int RdmaPassIntro(struct Peer *peer) { ...@@ -402,7 +403,8 @@ int RdmaPassIntro(struct Peer *peer) {
msg->id = peer - peers; msg->id = peer - peers;
msg->base_addr = (uintptr_t) peer->shm_base; msg->base_addr = (uintptr_t) peer->shm_base;
msg->rkey = peer->shm_mr->rkey; struct ibv_mr *mr = peer->shm_opaque;
msg->rkey = mr->rkey;
if (peer->is_dev) { if (peer->is_dev) {
msg->msg_type = kMsgDev; msg->msg_type = kMsgDev;
/* this is a device peer, meaning the remote side will write to the /* this is a device peer, meaning the remote side will write to the
...@@ -441,9 +443,10 @@ int RdmaPassIntro(struct Peer *peer) { ...@@ -441,9 +443,10 @@ int RdmaPassIntro(struct Peer *peer) {
return 0; return 0;
} }
int RdmaPassEntry(struct Peer *peer, uint32_t n) { int NetOpPassEntries(struct Peer *peer, size_t n) {
#ifdef RDMA_DEBUG #ifdef RDMA_DEBUG
fprintf(stderr, "RdmaPassEntry(%s,%u)\n", peer->sock_path, peer->local_pos); fprintf(stderr, "NetOpPassEntries(%s,%u)\n", peer->sock_path,
peer->local_pos);
fprintf(stderr, " remote_base=%lx local_base=%p\n", peer->remote_base, fprintf(stderr, " remote_base=%lx local_base=%p\n", peer->remote_base,
peer->local_base); peer->local_base);
#endif #endif
...@@ -457,7 +460,8 @@ int RdmaPassEntry(struct Peer *peer, uint32_t n) { ...@@ -457,7 +460,8 @@ int RdmaPassEntry(struct Peer *peer, uint32_t n) {
struct ibv_sge sge; struct ibv_sge sge;
sge.addr = (uintptr_t) (peer->local_base + pos); sge.addr = (uintptr_t) (peer->local_base + pos);
sge.length = peer->local_elen * n; sge.length = peer->local_elen * n;
sge.lkey = peer->shm_mr->lkey; struct ibv_mr *mr = peer->shm_opaque;
sge.lkey = mr->lkey;
struct ibv_send_wr send_wr = { }; struct ibv_send_wr send_wr = { };
send_wr.wr_id = -1ULL; send_wr.wr_id = -1ULL;
...@@ -474,7 +478,7 @@ int RdmaPassEntry(struct Peer *peer, uint32_t n) { ...@@ -474,7 +478,7 @@ int RdmaPassEntry(struct Peer *peer, uint32_t n) {
if (ret == 0) { if (ret == 0) {
break; break;
} else if (ret != ENOMEM) { } else if (ret != ENOMEM) {
fprintf(stderr, "RdmaPassEntry: ibv_post_send failed %d (%s)\n", ret, fprintf(stderr, "NetOpPassEntries: ibv_post_send failed %d (%s)\n", ret,
strerror(ret)); strerror(ret));
return 1; return 1;
} }
...@@ -482,9 +486,9 @@ int RdmaPassEntry(struct Peer *peer, uint32_t n) { ...@@ -482,9 +486,9 @@ int RdmaPassEntry(struct Peer *peer, uint32_t n) {
return 0; return 0;
} }
int RdmaPassReport() { int NetOpPassReport() {
if (peer_num > MAX_PEERS) { if (peer_num > MAX_PEERS) {
fprintf(stderr, "RdmaPassReport: peer_num (%zu) larger than max (%u)\n", fprintf(stderr, "NetOpPassReport: peer_num (%zu) larger than max (%u)\n",
peer_num, MAX_PEERS); peer_num, MAX_PEERS);
abort(); abort();
} }
...@@ -531,7 +535,7 @@ int RdmaPassReport() { ...@@ -531,7 +535,7 @@ int RdmaPassReport() {
if (ret == 0) { if (ret == 0) {
break; break;
} else if (ret != ENOMEM) { } else if (ret != ENOMEM) {
fprintf(stderr, "RdmaPassReport: ibv_post_send failed %u (%s)", ret, fprintf(stderr, "NetOpPassReport: ibv_post_send failed %u (%s)", ret,
strerror(ret)); strerror(ret));
return 1; return 1;
} }
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
#ifndef DIST_RDMA_H_ #ifndef DIST_RDMA_H_
#define DIST_RDMA_H_ #define DIST_RDMA_H_
#include "dist/net_rdma.h" #include "dist/rdma/net_rdma.h"
#include <infiniband/verbs.h> #include <infiniband/verbs.h>
......
...@@ -22,8 +22,8 @@ ...@@ -22,8 +22,8 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/ */
#include "dist/rdma.h" #include "dist/rdma/rdma.h"
#include "dist/net_rdma.h" #include "dist/rdma/net_rdma.h"
#include <rdma/rdma_cma.h> #include <rdma/rdma_cma.h>
#include <stdio.h> #include <stdio.h>
......
...@@ -22,8 +22,8 @@ ...@@ -22,8 +22,8 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/ */
#include "dist/rdma.h" #include "dist/rdma/rdma.h"
#include "dist/net_rdma.h" #include "dist/rdma/net_rdma.h"
#include <rdma/rdma_cma.h> #include <rdma/rdma_cma.h>
#include <stdio.h> #include <stdio.h>
......
...@@ -22,9 +22,10 @@ ...@@ -22,9 +22,10 @@
include mk/subdir_pre.mk include mk/subdir_pre.mk
bin_net_rdma := $(d)net_rdma bin_net_rdma := $(d)rdma/net_rdma
OBJS := $(addprefix $(d), net_rdma.o rdma.o rdma_cm.o rdma_ib.o utils.o) OBJS := $(addprefix $(d), rdma/net_rdma.o rdma/rdma.o rdma/rdma_cm.o \
rdma/rdma_ib.o common/net.o common/utils.o)
$(bin_net_rdma): $(OBJS) -lrdmacm -libverbs -lpthread $(bin_net_rdma): $(OBJS) -lrdmacm -libverbs -lpthread
......
...@@ -67,7 +67,7 @@ class RDMANetProxyListener(NetProxyListener): ...@@ -67,7 +67,7 @@ class RDMANetProxyListener(NetProxyListener):
super().__init__() super().__init__()
def run_cmd(self, env): def run_cmd(self, env):
cmd = (f'{env.repodir}/dist/net_rdma -l ' cmd = (f'{env.repodir}/dist/rdma/net_rdma -l '
f'-s {env.proxy_shm_path(self)} ' f'-s {env.proxy_shm_path(self)} '
f'-S {self.shm_size} ') f'-S {self.shm_size} ')
for (nic, local) in self.nics: for (nic, local) in self.nics:
...@@ -81,7 +81,7 @@ class RDMANetProxyConnecter(NetProxyConnecter): ...@@ -81,7 +81,7 @@ class RDMANetProxyConnecter(NetProxyConnecter):
super().__init__(listener) super().__init__(listener)
def run_cmd(self, env): def run_cmd(self, env):
cmd = (f'{env.repodir}/dist/net_rdma ' cmd = (f'{env.repodir}/dist/rdma/net_rdma '
f'-s {env.proxy_shm_path(self)} ' f'-s {env.proxy_shm_path(self)} '
f'-S {self.shm_size} ') f'-S {self.shm_size} ')
for (nic, local) in self.nics: for (nic, local) in self.nics:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment