Commit 01137d16 authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

dist/net_rdma: checkpoint for rdma network channel proxy (WIP)

parent ab151b01
...@@ -31,5 +31,6 @@ experiments/build ...@@ -31,5 +31,6 @@ experiments/build
experiments/out experiments/out
experiments/local-config.sh experiments/local-config.sh
experiments/slurm experiments/slurm
dist/net_rdma
trace/process trace/process
mk/local.mk mk/local.mk
...@@ -41,6 +41,7 @@ VFLAGS = +1364-2005ext+v \ ...@@ -41,6 +41,7 @@ VFLAGS = +1364-2005ext+v \
$(eval $(call subdir,lib)) $(eval $(call subdir,lib))
$(eval $(call subdir,sims)) $(eval $(call subdir,sims))
$(eval $(call subdir,dist))
$(eval $(call subdir,doc)) $(eval $(call subdir,doc))
$(eval $(call subdir,images)) $(eval $(call subdir,images))
$(eval $(call subdir,trace)) $(eval $(call subdir,trace))
......
/*
* Copyright 2021 Max Planck Institute for Software Systems, and
* National University of Singapore
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "dist/net_rdma.h"
#include <assert.h>
#include <fcntl.h>
#include <getopt.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/mman.h>
#include <unistd.h>
#include <simbricks/proto/base.h>
#include "dist/utils.h"
/* Path of the shared memory file; set from the -s option in ParseArgs. */
const char *shm_path = NULL;
/* Size of the shared memory region in bytes; -S overrides it (given in MB). */
size_t shm_size = 256 * 1024 * 1024ULL; // 256MB
/* Base address of the mapped region; filled in by ShmCreate from main. */
void *shm_base = NULL;
/* File descriptor for the region, as returned by ShmCreate in main. */
static int shm_fd = -1;
/* Offset of the first byte not yet handed out by ShmAlloc. */
static size_t shm_alloc_off = 0;
/* True when -l was given: listen for the RDMA connection instead of
   connecting. */
bool mode_listen = false;
/* Number of entries in the peers array. */
size_t peer_num = 0;
/* Array of local simulator peers; grown one entry at a time by AddPeer. */
struct Peer *peers = NULL;
/* IPv4 address and port filled in from the positional IP and PORT
   arguments. */
struct sockaddr_in addr;
/* epoll instance created in main; IOLoop waits on it. */
int epfd = -1;
/**
 * Reserve `size` bytes from the global shared memory region.
 *
 * Allocation is a simple bump pointer: regions are handed out back to back
 * and are never freed.
 *
 * @param size number of bytes to reserve.
 * @param off  receives the offset of the reserved range relative to the start
 *             of the region; left untouched on failure.
 * @return 0 on success, 1 if the region does not have `size` bytes left.
 */
static int ShmAlloc(size_t size, uint64_t *off) {
  /* Compare against the remaining space instead of computing
     shm_alloc_off + size, which could wrap around for a huge size and
     wrongly pass the check. */
  if (shm_alloc_off > shm_size || size > shm_size - shm_alloc_off) {
    fprintf(stderr, "ShmAlloc: alloc of %zu bytes failed\n", size);
    return 1;
  }

  *off = shm_alloc_off;
  shm_alloc_off += size;
  return 0;
}
/* Write the command line synopsis and option summary to stderr. */
static void PrintUsage() {
  static const char kUsage[] =
      "Usage: net_rdma [OPTIONS] IP PORT\n"
      " -l: Listen instead of connecting\n"
      " -d DEV-SOCKET: network socket of a device simulator\n"
      " -n NET-SOCKET: network socket of a network simulator\n"
      " -s SHM-PATH: shared memory region path\n"
      " -S SHM-SIZE: shared memory region size in MB (default 256)\n";

  fputs(kUsage, stderr);
}
static bool AddPeer(const char *path, bool dev) {
struct Peer *peer = realloc(peers, sizeof(*peers) * (peer_num + 1));
if (!peer) {
perror("ParseArgs: realloc failed");
return false;
}
peers = peer;
peer += peer_num;
peer_num++;
if (!(peer->sock_path = strdup(path))) {
perror("ParseArgs: strdup failed");
return false;
}
peer->is_dev = dev;
peer->sock_fd = -1;
peer->shm_fd = -1;
return true;
}
/**
 * Parse the command line into the global configuration variables.
 *
 * Options: -l (listen), -d/-n (add device/network peer), -s (shm path),
 * -S (shm size in MB). Exactly two positional arguments must follow: the
 * IPv4 address and the port of the RDMA connection.
 *
 * @return 0 on success, 1 on any invalid or missing argument (a diagnostic
 *         and/or the usage text is printed to stderr).
 */
static int ParseArgs(int argc, char *argv[]) {
  const char *opts = "ld:n:s:S:";
  char *end = NULL;
  int c;

  while ((c = getopt(argc, argv, opts)) != -1) {
    switch (c) {
      case 'l':
        mode_listen = true;
        break;

      case 'd':
        if (!AddPeer(optarg, true))
          return 1;
        break;

      case 'n':
        if (!AddPeer(optarg, false))
          return 1;
        break;

      case 's':
        if (!(shm_path = strdup(optarg))) {
          perror("ParseArgs: strdup failed");
          return 1;
        }
        break;

      case 'S': {
        /* Reject non-numeric input, zero, and values whose byte count would
           not fit into a size_t. */
        unsigned long long mb = strtoull(optarg, &end, 10);
        if (end == optarg || *end != '\0' || mb == 0 ||
            mb > ((size_t) -1) / (1024 * 1024)) {
          fprintf(stderr, "ParseArgs: invalid shm size '%s'\n", optarg);
          PrintUsage();
          return 1;
        }
        shm_size = (size_t) mb * 1024 * 1024;
        break;
      }

      default:
        PrintUsage();
        return 1;
    }
  }

  if (optind + 2 != argc) {
    PrintUsage();
    return 1;
  }

  /* Without a path the shared memory file cannot be created later on, so
     fail here with a clear message. */
  if (!shm_path) {
    fprintf(stderr, "ParseArgs: shared memory path (-s) is required\n");
    PrintUsage();
    return 1;
  }

  const char *port_str = argv[optind + 1];
  unsigned long port = strtoul(port_str, &end, 10);
  if (end == port_str || *end != '\0' || port > 65535) {
    fprintf(stderr, "ParseArgs: invalid port '%s'\n", port_str);
    PrintUsage();
    return 1;
  }

  addr.sin_family = AF_INET;
  addr.sin_port = htons((uint16_t) port);
  if ((addr.sin_addr.s_addr = inet_addr(argv[optind])) == INADDR_NONE) {
    PrintUsage();
    return 1;
  }

  return 0;
}
/**
 * For every network peer: create its listening unix socket, wait for the
 * local network simulator to connect, and register the connection with the
 * global epoll instance (event data points at the peer).
 *
 * Blocks in accept() for each network peer in turn.
 *
 * @return 0 on success, 1 on the first failure.
 */
static int PeersInitNets() {
  for (size_t i = 0; i < peer_num; i++) {
    struct Peer *peer = &peers[i];
    if (peer->is_dev)
      continue;

    int lfd = UxsocketInit(peer->sock_path);
    if (lfd < 0) {
      perror("PeersInitNets: unix socket init failed");
      return 1;
    }

    peer->sock_fd = accept(lfd, NULL, NULL);
    if (peer->sock_fd < 0) {
      perror("PeersInitNets: accept failed");
      close(lfd);
      return 1;
    }
    /* Only a single connection is accepted per peer, so the listening
       socket is no longer needed; close it instead of leaking the fd. */
    close(lfd);

    struct epoll_event epev = {.events = EPOLLIN, .data.ptr = peer};
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, peer->sock_fd, &epev)) {
      perror("PeersInitNets: epoll_ctl failed");
      return 1;
    }
  }
  return 0;
}
/**
 * For every device peer: connect to the device simulator's unix socket and
 * register the connection with the global epoll instance (event data points
 * at the peer).
 *
 * @return 0 on success, 1 on the first failure.
 */
static int PeersInitDevs() {
  for (size_t i = 0; i < peer_num; i++) {
    struct Peer *peer = &peers[i];
    if (!peer->is_dev)
      continue;

    if ((peer->sock_fd = UxsocketConnect(peer->sock_path)) < 0)
      return 1;

    struct epoll_event epev = {.events = EPOLLIN, .data.ptr = peer};
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, peer->sock_fd, &epev)) {
      /* Report under this function's own name, not PeersInitNets. */
      perror("PeersInitDevs: epoll_ctl failed");
      return 1;
    }
  }
  return 0;
}
/**
 * Device-side peer: forward the network intro (received from the remote end
 * and stored in peer->net_intro) to the local device simulator, and set up
 * the parameters of the local queue to poll (the device-to-network queue).
 *
 * @param peer device peer whose dev_intro and net_intro are filled in.
 * @return 0 on success, 1 if the intro could not be sent completely.
 */
int PeerDevSendIntro(struct Peer *peer) {
  fprintf(stderr, "PeerDevSendIntro(%s)\n", peer->sock_path);

  /* The queues of a device peer live in the shared memory region that the
     device handed to us (mapped into peer->shm_base by PeerEvent), and the
     offsets in its intro are relative to that region. Do not replace
     peer->shm_fd / peer->shm_base with our own global region here: that
     would make local_base point into the wrong mapping. */
  struct SimbricksProtoNetDevIntro *di = &peer->dev_intro;
  peer->local_base = (void *) ((uintptr_t) peer->shm_base + di->d2n_offset);
  peer->local_elen = di->d2n_elen;
  peer->local_enum = di->d2n_nentries;

  struct SimbricksProtoNetNetIntro *ni = &peer->net_intro;
  ssize_t ret = send(peer->sock_fd, ni, sizeof(*ni), 0);
  if (ret < 0) {
    perror("PeerDevSendIntro: send failed");
    return 1;
  } else if (ret != (ssize_t) sizeof(*ni)) {
    fprintf(stderr, "PeerDevSendIntro: send incomplete\n");
    return 1;
  }
  return 0;
}
/**
 * Network-side peer: allocate both queues described by the device intro
 * (received from the remote end) in our global shared memory region, record
 * the local queue to poll (network-to-device), and send the device intro
 * together with the shared memory fd to the local network simulator.
 *
 * @param peer network peer whose dev_intro holds the queue geometry; its
 *             d2n_offset/n2d_offset fields are overwritten with the offsets
 *             allocated here.
 * @return 0 on success, 1 on allocation or send failure.
 */
int PeerNetSetupQueues(struct Peer *peer) {
  fprintf(stderr, "PeerNetSetupQueues(%s)\n", peer->sock_path);

  struct SimbricksProtoNetDevIntro *di = &peer->dev_intro;
  if (ShmAlloc(di->d2n_elen * di->d2n_nentries, &di->d2n_offset)) {
    fprintf(stderr, "PeerNetSetupQueues: ShmAlloc d2n failed\n");
    return 1;
  }
  /* The n2d queue must be sized with its own element length, not d2n's. */
  if (ShmAlloc(di->n2d_elen * di->n2d_nentries, &di->n2d_offset)) {
    fprintf(stderr, "PeerNetSetupQueues: ShmAlloc n2d failed\n");
    return 1;
  }

  peer->shm_fd = shm_fd;
  peer->shm_base = shm_base;
  peer->local_base = (void *) ((uintptr_t) shm_base + di->n2d_offset);
  peer->local_elen = di->n2d_elen;
  peer->local_enum = di->n2d_nentries;

  if (UxsocketSendFd(peer->sock_fd, di, sizeof(*di), peer->shm_fd)) {
    fprintf(stderr,
            "PeerNetSetupQueues: sending welcome message failed (%td)\n",
            peer - peers);
    return 1;
  }
  return 0;
}
/**
 * Handle an epoll event on a local simulator socket.
 *
 * The only expected event is the arrival of the simulator's intro message:
 * a device intro plus shared memory fd for device peers, a network intro for
 * network peers. The intro is stored in the peer, passed on via
 * RdmaPassIntro, and the peer is marked ready if the remote intro has
 * already been received.
 *
 * @param peer   peer the event belongs to.
 * @param events epoll event mask.
 * @return 0 on success, 1 on any error or unexpected event.
 */
static int PeerEvent(struct Peer *peer, uint32_t events) {
  fprintf(stderr, "PeerEvent(%s)\n", peer->sock_path);
  fflush(stdout);

  // disable peer if not an input event
  if (!(events & EPOLLIN)) {
    fprintf(stderr, "PeerEvent: non-input event, disabling peer (%s)\n",
            peer->sock_path);
    peer->ready = false;
    return 1;
  }

  // if we already have the intro, this is not expected
  if (peer->intro_valid_local) {
    fprintf(stderr, "PeerEvent: receive event after intro (%s)\n",
            peer->sock_path);
    return 1;
  }

  // receive intro message
  if (peer->is_dev) {
    if (UxsocketRecvFd(peer->sock_fd, &peer->dev_intro,
                       sizeof(peer->dev_intro), &peer->shm_fd))
      return 1;

    if (!(peer->shm_base = ShmMap(peer->shm_fd, &peer->shm_size)))
      return 1;
  } else {
    ssize_t ret = recv(peer->sock_fd, &peer->net_intro,
                       sizeof(peer->net_intro), 0);
    if (ret < 0) {
      perror("PeerEvent: recv failed");
      return 1;
    } else if (ret == 0) {
      /* recv returning 0 means the simulator closed the connection. */
      fprintf(stderr, "PeerEvent: peer closed connection (%s)\n",
              peer->sock_path);
      return 1;
    } else if (ret != (ssize_t) sizeof(peer->net_intro)) {
      fprintf(stderr, "PeerEvent: partial receive (%zd)\n", ret);
      return 1;
    }
  }

  peer->intro_valid_local = true;

  // pass intro along via RDMA
  if (RdmaPassIntro(peer))
    return 1;

  if (peer->intro_valid_remote) {
    printf("PeerEvent(%s): marking peer as ready\n", peer->sock_path);
    peer->ready = true;
  }
  return 0;
}
/* Entry point of the polling thread started by main. Currently a stub: it
   ignores its argument and exits immediately with a NULL result. */
static void *PollThread(void *data) {
  (void) data;
  return NULL;
}
/* Main event loop: wait on the global epoll instance forever and dispatch
   each event. Events carrying a peer pointer go to PeerEvent; events with a
   NULL pointer go to RdmaEvent. Returns 1 as soon as epoll_wait or a handler
   fails; never returns otherwise. */
static int IOLoop() {
  enum { kMaxEvents = 8 };

  for (;;) {
    struct epoll_event ready[kMaxEvents];
    int count = epoll_wait(epfd, ready, kMaxEvents, -1);
    if (count < 0) {
      perror("IOLoop: epoll_wait failed");
      return 1;
    }

    for (int idx = 0; idx < count; idx++) {
      struct Peer *target = ready[idx].data.ptr;
      int failed;
      if (target != NULL)
        failed = PeerEvent(target, ready[idx].events);
      else
        failed = RdmaEvent();

      if (failed)
        return 1;
    }

    fflush(stdout);
  }
}
/**
 * Program entry point: parse arguments, create the shared memory region and
 * the epoll instance, establish the RDMA connection (listening or
 * connecting), connect to the local network and device simulators, start
 * the polling thread, and run the I/O event loop.
 *
 * @return EXIT_FAILURE if any setup step or the event loop fails.
 */
int main(int argc, char *argv[]) {
  if (ParseArgs(argc, argv))
    return EXIT_FAILURE;

  /* Passing NULL for %s is undefined; print a placeholder instead. */
  fprintf(stderr, "pid=%d shm=%s\n", getpid(),
          shm_path ? shm_path : "(null)");

  if ((shm_fd = ShmCreate(shm_path, shm_size, &shm_base)) < 0)
    return EXIT_FAILURE;

  if ((epfd = epoll_create1(0)) < 0) {
    perror("epoll_create1 failed");
    return EXIT_FAILURE;
  }

  if (mode_listen) {
    if (RdmaListen(&addr))
      return EXIT_FAILURE;
  } else {
    if (RdmaConnect(&addr))
      return EXIT_FAILURE;
  }
  printf("RDMA connected\n");
  fflush(stdout);

  if (PeersInitNets())
    return EXIT_FAILURE;
  printf("Networks initialized\n");
  fflush(stdout);

  if (PeersInitDevs())
    return EXIT_FAILURE;
  printf("Devices initialized\n");
  fflush(stdout);

  /* pthread_create reports failure through its return value and does not
     set errno, so format that value rather than using perror. */
  pthread_t poll_thread;
  int ret = pthread_create(&poll_thread, NULL, PollThread, NULL);
  if (ret) {
    fprintf(stderr, "pthread_create failed (poll thread): %s\n",
            strerror(ret));
    return EXIT_FAILURE;
  }

  /* IOLoop only returns on error; propagate that as the exit status. */
  return IOLoop() ? EXIT_FAILURE : EXIT_SUCCESS;
}
/*
* Copyright 2021 Max Planck Institute for Software Systems, and
* National University of Singapore
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef DIST_NET_RDMA_NET_RDMA_H_
#define DIST_NET_RDMA_NET_RDMA_H_
#include <arpa/inet.h>
#include <stdbool.h>
#include <stddef.h>
#include <simbricks/proto/network.h>
/* State kept for one local simulator (device or network) that is bridged to
   the remote end over RDMA. Entries live in the global peers array. */
struct Peer {
/* Device intro: received from the local device simulator (device peer) or
   from the remote end via RDMA (network peer). */
struct SimbricksProtoNetDevIntro dev_intro;
/* Network intro: received from the local network simulator (network peer) or
   from the remote end via RDMA (device peer). */
struct SimbricksProtoNetNetIntro net_intro;
/* Path of the unix socket used to talk to the local simulator. */
const char *sock_path;
/* base address of the local queue we're polling.
(d2n or n2d depending on is_dev). */
void *local_base;
/* Element length and number of entries of the local queue, taken from the
   device intro. */
uint64_t local_elen;
uint64_t local_enum;
/* NOTE(review): presumably the current polling position in the local queue;
   not used in the code shown here. */
uint64_t local_pos;
// rkey and base address of the remote queue to write to
uint64_t remote_rkey;
uint64_t remote_base;
/* RDMA memory region for the shared memory of the queues on this end. Could
be either our own global SHM region if this is a network peer, or the SHM
region allocated by the device peer. */
struct ibv_mr *shm_mr;
/* Mapping (base address and size in bytes) of that shared memory region. */
void *shm_base;
size_t shm_size;
/* Connected unix socket to the local simulator; -1 until connected. */
int sock_fd;
/* File descriptor of the shared memory region; -1 until known. */
int shm_fd;
// is our local peer a device? (otherwise it's a network)
bool is_dev;
/* Set once the intro from the local simulator has been received. */
bool intro_valid_local;
/* Set once the intro from the remote end has been received via RDMA. */
bool intro_valid_remote;
// set true when the queue is ready for polling
volatile bool ready;
};
// configuration variables (defined in net_rdma.c, set up by argument parsing
// and main)
extern bool mode_listen;
extern const char *shm_path;
extern size_t shm_size;
extern void *shm_base;
extern size_t peer_num;
extern struct Peer *peers;
extern int epfd;
// Device peer: send the stored network intro to the local device simulator.
// Returns 0 on success, 1 on failure.
int PeerDevSendIntro(struct Peer *peer);
// Network peer: allocate the queues in shared memory and send the device
// intro plus shm fd to the local network simulator. Returns 0 on success.
int PeerNetSetupQueues(struct Peer *peer);
// Establish the RDMA connection by listening on / connecting to addr.
// Both return 0 on success, 1 on failure.
int RdmaListen(struct sockaddr_in *addr);
int RdmaConnect(struct sockaddr_in *addr);
// Send the peer's local intro to the remote end. Returns 0 on success (or if
// sending has to be deferred), 1 on failure.
int RdmaPassIntro(struct Peer *peer);
// Handle a completion queue event; called from the I/O loop when the
// completion channel becomes readable. Returns 0 on success, 1 on failure.
int RdmaEvent();
#endif // DIST_NET_RDMA_NET_RDMA_H_
\ No newline at end of file
/*
* Copyright 2021 Max Planck Institute for Software Systems, and
* National University of Singapore
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "dist/net_rdma.h"
#include <fcntl.h>
#include <rdma/rdma_cma.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
/* Number of message buffers posted as receive buffers, and number kept on
   the free list for sending. Together they size the msgs array; the first
   MSG_RXBUFS entries are the receive buffers. */
#define MSG_RXBUFS 16
#define MSG_TXBUFS 16
/* Control message exchanged over the RDMA connection to pass a simulator's
   intro to the other side. The struct is sent as-is (sizeof(*msg) bytes) and
   is packed, so both ends must be built with the same layout. */
struct NetRdmaMsg {
/* Payload: the device or network intro, selected by msg_type. While a buffer
   sits on the free list, the same storage holds the next-free link. */
union {
struct SimbricksProtoNetDevIntro dev;
struct SimbricksProtoNetNetIntro net;
struct NetRdmaMsg *next_free;
};
/* Index of the peer in the sender's peers array; the receiver uses it as an
   index into its own peers array. */
uint64_t id;
/* Sender's base address of the shared memory region holding the queues. */
uint64_t base_addr;
/* Offset (from base_addr) of the queue the receiver should write to. */
uint64_t queue_off;
/* Remote key of the sender's memory region. */
uint64_t rkey;
/* Which union member carries the payload. */
enum {
kMsgDev,
kMsgNet,
} msg_type;
} __attribute__ ((packed));
/* Connection manager event channel used during connection setup. */
static struct rdma_event_channel *cm_channel;
/* Connection parameters passed to rdma_accept / rdma_connect. */
static struct rdma_conn_param conn_param = { };
/* CM id of the established connection; its qp is used for all work
   requests. */
static struct rdma_cm_id *cm_id;
/* Protection domain that all memory regions are registered in. */
static struct ibv_pd *pd;
/* Single completion queue shared by the send and receive side of the qp. */
static struct ibv_cq *cq;
/* Completion channel whose fd is added to the epoll instance. */
static struct ibv_comp_channel *comp_chan;
/* Memory region covering the global shared memory region. */
static struct ibv_mr *mr_shm;
/* Memory region covering the msgs array. */
static struct ibv_mr *mr_msgs;
/* Queue pair attributes filled in by RdmaCommonInit. */
static struct ibv_qp_init_attr qp_attr = { };
/* Message buffers: the first MSG_RXBUFS are posted for receiving, the rest
   are send buffers managed through msgs_free. */
static struct NetRdmaMsg msgs[MSG_RXBUFS + MSG_TXBUFS];
/* Head of the singly linked free list of send buffers. */
static struct NetRdmaMsg *msgs_free = NULL;
/* Pop a send buffer off the free list. Returns NULL when the list is
   empty. */
static struct NetRdmaMsg *RdmaMsgAlloc() {
  struct NetRdmaMsg *head = msgs_free;
  if (head == NULL)
    return NULL;

  msgs_free = head->next_free;
  return head;
}
/* Push a send buffer onto the front of the free list. The link is stored in
   the buffer's payload union, so the previous payload is overwritten. */
static void RdmaMsgFree(struct NetRdmaMsg *msg) {
  struct NetRdmaMsg *old_head = msgs_free;

  msg->next_free = old_head;
  msgs_free = msg;
}
static int RdmMsgRxEnqueue(struct NetRdmaMsg *msg) {
struct ibv_sge sge = { };
sge.addr = (uintptr_t) msg;
sge.length = sizeof(*msg);
sge.lkey = mr_msgs->lkey;
struct ibv_recv_wr recv_wr = { };
recv_wr.wr_id = msg - msgs;
recv_wr.sg_list = &sge;
recv_wr.num_sge = 1;
struct ibv_recv_wr *bad_recv_wr;
if (ibv_post_recv(cm_id->qp, &recv_wr, &bad_recv_wr)) {
perror("RdmMsgRxEnqueue: ibv_post_recv failed");
return 1;
}
return 0;
}
/**
 * Process an intro message received from the remote end.
 *
 * Validates the peer id and message type (device peers expect a network
 * intro and vice versa), records the remote queue address and rkey, and then
 * continues the handshake with the local simulator: device peers get the
 * network intro forwarded, network peers get their queues set up. The peer
 * is marked ready once both the local and the remote intro are known.
 *
 * Aborts the process on a malformed or duplicate message.
 *
 * @param msg received message buffer.
 * @return 0 on success, 1 if a follow-up step failed.
 */
static int RdmaMsgRx(struct NetRdmaMsg *msg) {
  if (msg->id >= peer_num) {
    fprintf(stderr, "RdmaMsgRx: invalid peer id in message (%llu)\n",
            (unsigned long long) msg->id);
    abort();
  }

  struct Peer *peer = peers + msg->id;
  printf("RdmaMsgRx -> peer %s\n", peer->sock_path);

  if (peer->is_dev != (msg->msg_type == kMsgNet)) {
    fprintf(stderr, "RdmaMsgRx: unexpected message type (%u)\n",
            (unsigned) msg->msg_type);
    abort();
  }

  if (peer->intro_valid_remote) {
    fprintf(stderr, "RdmaMsgRx: received multiple messages (%llu)\n",
            (unsigned long long) msg->id);
    abort();
  }

  peer->remote_rkey = msg->rkey;
  peer->remote_base = msg->base_addr + msg->queue_off;
  peer->intro_valid_remote = true;

  if (peer->is_dev) {
    peer->net_intro = msg->net;
    if (PeerDevSendIntro(peer))
      return 1;
  } else {
    peer->dev_intro = msg->dev;
    if (PeerNetSetupQueues(peer))
      return 1;
    /* If the local network intro arrived first, passing it on was deferred
       until the queue offsets were known; do it now. */
    if (peer->intro_valid_local && RdmaPassIntro(peer))
      return 1;
  }

  if (peer->intro_valid_local) {
    fprintf(stderr, "RdmaMsgRx(%s): marking peer as ready\n",
            peer->sock_path);
    peer->ready = true;
  }
  return 0;
}
/**
 * Set up the verbs resources shared by the listening and the connecting
 * side: protection domain, completion channel and queue, memory regions for
 * the global shared memory and for the message buffers, and the queue pair.
 * Then arm completion notifications, post all receive buffers and put all
 * send buffers on the free list.
 *
 * Requires cm_id to be valid.
 *
 * @return 0 on success, 1 on the first failure.
 */
static int RdmaCommonInit() {
  if (!(pd = ibv_alloc_pd(cm_id->verbs))) {
    perror("RdmaCommonInit: ibv_alloc_pd failed");
    return 1;
  }

  if (!(comp_chan = ibv_create_comp_channel(cm_id->verbs))) {
    perror("RdmaCommonInit: ibv_create_comp_channel failed");
    return 1;
  }

  if (!(cq = ibv_create_cq(cm_id->verbs, 1024, NULL, comp_chan, 0))) {
    perror("RdmaCommonInit: ibv_create_cq failed");
    return 1;
  }

  /* The remote end writes directly into the shared memory region. */
  if (!(mr_shm = ibv_reg_mr(pd, shm_base, shm_size,
                            IBV_ACCESS_LOCAL_WRITE |
                            IBV_ACCESS_REMOTE_WRITE))) {
    perror("RdmaCommonInit: ibv_reg_mr shm failed");
    return 1;
  }

  if (!(mr_msgs = ibv_reg_mr(pd, msgs, sizeof(msgs),
                             IBV_ACCESS_LOCAL_WRITE))) {
    perror("RdmaCommonInit: ibv_reg_mr msgs failed");
    return 1;
  }

  qp_attr.cap.max_send_wr = MSG_TXBUFS;
  qp_attr.cap.max_send_sge = 1;
  qp_attr.cap.max_recv_wr = MSG_RXBUFS;
  qp_attr.cap.max_recv_sge = 1;
  qp_attr.send_cq = cq;
  qp_attr.recv_cq = cq;
  qp_attr.qp_type = IBV_QPT_RC;
  qp_attr.sq_sig_all = 1;

  if (rdma_create_qp(cm_id, pd, &qp_attr)) {
    perror("RdmaCommonInit: rdma_create_qp failed");
    return 1;
  }

  /* ibv_req_notify_cq returns the error code directly instead of setting
     errno, so report that value (and under this function's own name). */
  int ret = ibv_req_notify_cq(cq, 0);
  if (ret) {
    fprintf(stderr, "RdmaCommonInit: ibv_req_notify_cq failed: %s\n",
            strerror(ret));
    return 1;
  }

  fprintf(stderr, "Enqueue rx buffers\n");
  // post receive operations for all rx buffers
  for (int i = 0; i < MSG_RXBUFS; i++)
    if (RdmMsgRxEnqueue(&msgs[i]))
      return 1;

  // add tx buffers to freelist
  for (int i = 0; i < MSG_TXBUFS; i++)
    RdmaMsgFree(&msgs[MSG_RXBUFS + i]);

  return 0;
}
/**
 * Switch the completion channel's file descriptor to non-blocking mode and
 * register it with the global epoll instance. The event carries a NULL data
 * pointer, which is how the I/O loop tells it apart from peer sockets.
 *
 * @return 0 on success, 1 on failure.
 */
static int RdmaCommonSetNonblock() {
  /* F_GETFL can fail; OR-ing O_NONBLOCK into -1 would set every flag. */
  int flags = fcntl(comp_chan->fd, F_GETFL);
  if (flags < 0) {
    perror("RdmaCommonSetNonblock: fcntl get flags failed");
    return 1;
  }

  if (fcntl(comp_chan->fd, F_SETFL, flags | O_NONBLOCK)) {
    perror("RdmaCommonSetNonblock: fcntl set nonblock failed");
    return 1;
  }

  struct epoll_event epev = {.events = EPOLLIN, .data.ptr = NULL};
  if (epoll_ctl(epfd, EPOLL_CTL_ADD, comp_chan->fd, &epev)) {
    perror("RdmaCommonSetNonblock: epoll_ctl failed");
    return 1;
  }
  return 0;
}
int RdmaListen(struct sockaddr_in *addr) {
if (!(cm_channel = rdma_create_event_channel())) {
perror("RdmaListen: rdma_create_event_channel failed");
return 1;
}
struct rdma_cm_id *listen_id;
if (rdma_create_id(cm_channel, &listen_id, NULL, RDMA_PS_TCP)) {
perror("RdmaListen: rdma_create_id failed");
return 1;
}
if (rdma_bind_addr(listen_id, (struct sockaddr *) addr)) {
perror("RdmaListen: rdma_bind_addr failed");
return 1;
}
if (rdma_listen(listen_id, 1)) {
perror("RdmaListen: rdma_listen failed");
return 1;
}
fprintf(stderr, "RdmaListen: listen done\n");
struct rdma_cm_event *event;
if (rdma_get_cm_event(cm_channel, &event)) {
perror("RdmaListen: rdma_get_cm_event failed");
return 1;
}
if (event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
fprintf(stderr, "RdmaListen: unexpected event (%u)\n", event->event);
return 1;
}
cm_id = event->id;
rdma_ack_cm_event(event);
fprintf(stderr, "RdmaListen: got conn request\n");
if (RdmaCommonInit())
return 1;
conn_param.responder_resources = 1;
if (rdma_accept(cm_id, &conn_param)) {
perror("RdmaListen: rdma_accept failed");
return 1;
}
fprintf(stderr, "RdmaListen: accept done\n");
if (rdma_get_cm_event(cm_channel, &event)) {
perror("RdmaListen: rdma_get_cm_event failed");
return 1;
}
if (event->event != RDMA_CM_EVENT_ESTABLISHED) {
fprintf(stderr, "RdmaListen: unexpected event (%u)\n", event->event);
return 1;
}
rdma_ack_cm_event(event);
fprintf(stderr, "RdmaListen: conn established\n");
if (RdmaCommonSetNonblock())
return 1;
return 0;
}
/* Active side of connection setup: resolve the address and route of `addr`,
   initialize the verbs resources, connect, and wait until the connection is
   established. Finally hook the completion channel into the epoll loop.
   Each step waits for the matching connection manager event and fails on
   any other event. Returns 0 on success, 1 on failure. */
int RdmaConnect(struct sockaddr_in *addr) {
  struct rdma_cm_event *ev;

  cm_channel = rdma_create_event_channel();
  if (cm_channel == NULL) {
    perror("RdmaConnect: rdma_create_event_channel failed");
    return 1;
  }

  if (rdma_create_id(cm_channel, &cm_id, NULL, RDMA_PS_TCP) != 0) {
    perror("RdmaConnect: rdma_create_id failed");
    return 1;
  }

  /* Step 1: address resolution (5 s timeout). */
  if (rdma_resolve_addr(cm_id, NULL, (struct sockaddr *) addr, 5000) != 0) {
    perror("RdmaConnect: rdma_resolve_addr failed");
    return 1;
  }
  if (rdma_get_cm_event(cm_channel, &ev) != 0) {
    perror("RdmaConnect: rdma_get_cm_event failed (addr)");
    return 1;
  }
  if (ev->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
    fprintf(stderr, "RdmaConnect: unexpected event (%u instead of %u)\n",
            ev->event, RDMA_CM_EVENT_ADDR_RESOLVED);
    return 1;
  }
  rdma_ack_cm_event(ev);
  fprintf(stderr, "RdmaConnect: address resolved\n");

  /* Step 2: route resolution (5 s timeout). */
  if (rdma_resolve_route(cm_id, 5000) != 0) {
    perror("RdmaConnect: rdma_resolve_route failed");
    return 1;
  }
  if (rdma_get_cm_event(cm_channel, &ev) != 0) {
    perror("RdmaConnect: rdma_get_cm_event failed (route)");
    return 1;
  }
  if (ev->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
    fprintf(stderr, "RdmaConnect: unexpected event (%u instead of %u)\n",
            ev->event, RDMA_CM_EVENT_ROUTE_RESOLVED);
    return 1;
  }
  rdma_ack_cm_event(ev);
  fprintf(stderr, "RdmaConnect: route resolved\n");

  if (RdmaCommonInit() != 0)
    return 1;

  /* Step 3: connect and wait for the connection to be established. */
  conn_param.initiator_depth = 1;
  conn_param.retry_count = 7;
  if (rdma_connect(cm_id, &conn_param) != 0) {
    perror("RdmaConnect: rdma_connect failed");
    return 1;
  }
  fprintf(stderr, "RdmaConnect: connect issued\n");

  if (rdma_get_cm_event(cm_channel, &ev) != 0) {
    perror("RdmaConnect: rdma_get_cm_event failed (connect)");
    return 1;
  }
  if (ev->event != RDMA_CM_EVENT_ESTABLISHED) {
    fprintf(stderr, "RdmaConnect: unexpected event (%u)\n", ev->event);
    return 1;
  }
  rdma_ack_cm_event(ev);

  return RdmaCommonSetNonblock() != 0 ? 1 : 0;
}
int RdmaEvent() {
fprintf(stderr, "RdmaEvent [pid=%d]\n", getpid());
struct ibv_cq *ecq;
void *ectx;
if (ibv_get_cq_event(comp_chan, &ecq, &ectx)) {
perror("RdmaEvent: ibv_get_cq_event failed");
return 1;
}
ibv_ack_cq_events(ecq, 1);
if (ibv_req_notify_cq(cq, 0)) {
perror("RdmaEvent: ibv_req_notify_cq failed");
return 1;
}
int n;
do {
const size_t kNumWC = 8;
struct ibv_wc wcs[kNumWC];
if ((n = ibv_poll_cq(cq, kNumWC, wcs)) < 0) {
perror("RdmaEvent: ibv_poll_cq failed");
return 1;
}
fprintf(stderr, " n=%d\n", n);
for (int i = 0; i < n; i++) {
if (wcs[i].opcode == IBV_WC_SEND) {
fprintf(stderr, "Send done\n", n);
if (wcs[i].status != IBV_WC_SUCCESS) {
fprintf(stderr, "RdmaEvent: unsuccessful send (%u)\n", wcs[i].status);
abort();
}
// need to free the send buffer again
RdmaMsgFree(msgs + wcs[i].wr_id);
} else if ((wcs[i].opcode & IBV_WC_RECV)) {
fprintf(stderr, "Recv done\n", n);
if (wcs[i].status != IBV_WC_SUCCESS) {
fprintf(stderr, "RdmaEvent: unsuccessful recv (%u)\n", wcs[i].status);
abort();
}
struct NetRdmaMsg *msg = msgs + wcs[i].wr_id;
if (RdmaMsgRx(msg) || RdmMsgRxEnqueue(msg))
return 1;
} else {
fprintf(stderr, "RdmaEvent: unexpected opcode %u\n", wcs[i].opcode);
abort();
}
}
} while (n > 0);
fflush(stdout);
return 0;
}
int RdmaPassIntro(struct Peer *peer) {
fprintf(stderr, "RdmaPassIntro(%s)\n", peer->sock_path);
// device peers have sent us an SHM region, need to register this an as MR
if (peer->is_dev) {
if (!(peer->shm_mr = ibv_reg_mr(pd, peer->shm_base, peer->shm_size,
IBV_ACCESS_LOCAL_WRITE |
IBV_ACCESS_REMOTE_WRITE))) {
perror("RdmaPassIntro: ibv_reg_mr shm failed");
return 1;
}
} else {
/* on the network side we need to make sure we have received the device
intro from our RDMA peer, so we can include the queue position. */
if (!peer->intro_valid_remote) {
fprintf(stderr,
"RdmaPassIntro: skipping because remote intro not received\n");
return 0;
}
peer->shm_mr = mr_shm;
peer->shm_base = shm_base;
peer->shm_size = shm_size;
}
struct NetRdmaMsg *msg = RdmaMsgAlloc();
if (!msg)
return 1;
msg->id = peer - peers;
msg->base_addr = (uintptr_t) peer->shm_base;
msg->rkey = peer->shm_mr->rkey;
if (peer->is_dev) {
msg->msg_type = kMsgDev;
/* this is a device peer, meaning the remote side will write to the
network-to-device queue. */
msg->queue_off = peer->dev_intro.n2d_offset;
msg->dev = peer->dev_intro;
} else {
msg->msg_type = kMsgNet;
/* this is a network peer, meaning the remote side will write to the
device-to-network queue. */
msg->queue_off = peer->dev_intro.d2n_offset;
msg->net = peer->net_intro;
}
struct ibv_sge sge;
sge.addr = (uintptr_t) msg;
sge.length = sizeof(*msg);
sge.lkey = mr_msgs->lkey;
struct ibv_send_wr send_wr = { };
send_wr.wr_id = msg - msgs;
send_wr.opcode = IBV_WR_SEND;
send_wr.send_flags = IBV_SEND_SIGNALED;
send_wr.sg_list = &sge;
send_wr.num_sge = 1;
struct ibv_send_wr *bad_send_wr;
if (ibv_post_send(cm_id->qp, &send_wr, &bad_send_wr)) {
perror("RdmaPassIntro: ibv_post_send failed");
return 1;
}
fprintf(stderr, "RdmaPassIntro: ibv_post_send done\n");
return 0;
}
\ No newline at end of file
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# Build rules for the net_rdma proxy in this subdirectory.
# NOTE(review): subdir_pre.mk / subdir_post.mk presumably set up and tear down
# the per-directory variables such as $(d); confirm against mk/.
include mk/subdir_pre.mk
# Path of the resulting binary.
bin_net_rdma := $(d)net_rdma
# Object files that make up the binary.
OBJS := $(addprefix $(d), net_rdma.o rdma.o utils.o)
# The binary depends on its objects and is linked against the RDMA connection
# manager, verbs and pthread libraries (given as -l prerequisites).
$(bin_net_rdma): $(OBJS) -lrdmacm -libverbs -lpthread
# Files removed by the clean target.
CLEAN := $(bin_net_rdma) $(OBJS)
# The ALL assignment is commented out.
# NOTE(review): presumably this keeps the binary out of the default build
# while it is work in progress; confirm.
#ALL := $(bin_net_rdma)
include mk/subdir_post.mk
/*
* Copyright 2021 Max Planck Institute for Software Systems, and
* National University of Singapore
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "dist/utils.h"
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <unistd.h>
/**
 * Create a unix stream socket, bind it to `path` and start listening.
 *
 * @param path filesystem path of the socket; must fit into sun_path
 *             including the terminating NUL.
 * @return the listening socket's file descriptor, or -1 on failure (a
 *         diagnostic is printed to stderr).
 */
int UxsocketInit(const char *path) {
  struct sockaddr_un saun;

  /* Reject paths that do not fit before copying: an unchecked copy would
     write past the end of sun_path. */
  size_t path_len = strlen(path);
  if (path_len >= sizeof(saun.sun_path)) {
    fprintf(stderr, "uxsocket_init: socket path too long (%zu bytes)\n",
            path_len);
    return -1;
  }

  int fd = socket(AF_UNIX, SOCK_STREAM, 0);
  if (fd == -1) {
    perror("uxsocket_init: socket failed");
    return -1;
  }

  /* The memset provides the terminating NUL after the copied path. */
  memset(&saun, 0, sizeof(saun));
  saun.sun_family = AF_UNIX;
  memcpy(saun.sun_path, path, path_len);

  if (bind(fd, (struct sockaddr *)&saun, sizeof(saun))) {
    perror("uxsocket_init: bind failed");
    goto error_close;
  }

  if (listen(fd, 5)) {
    perror("uxsocket_init: listen failed");
    goto error_close;
  }

  return fd;

error_close:
  close(fd);
  return -1;
}
/**
 * Create a unix stream socket and connect it to the socket at `path`.
 *
 * @param path filesystem path of the socket to connect to; must fit into
 *             sun_path including the terminating NUL.
 * @return the connected socket's file descriptor, or -1 on failure (a
 *         diagnostic is printed to stderr).
 */
int UxsocketConnect(const char *path) {
  struct sockaddr_un saun;

  /* Reject paths that do not fit before copying: an unchecked copy would
     write past the end of sun_path. */
  size_t path_len = strlen(path);
  if (path_len >= sizeof(saun.sun_path)) {
    fprintf(stderr, "connect failed: socket path too long (%zu bytes)\n",
            path_len);
    return -1;
  }

  /* prepare and connect socket */
  memset(&saun, 0, sizeof(saun));
  saun.sun_family = AF_UNIX;
  memcpy(saun.sun_path, path, path_len);

  int fd = socket(AF_UNIX, SOCK_STREAM, 0);
  if (fd == -1) {
    perror("socket failed");
    return -1;
  }

  if (connect(fd, (struct sockaddr *)&saun, sizeof(saun)) != 0) {
    perror("connect failed");
    /* Do not leak the socket when the connection cannot be made. */
    close(fd);
    return -1;
  }

  return fd;
}
/**
 * Receive exactly `len` bytes of data plus one file descriptor passed as
 * SCM_RIGHTS ancillary data from the unix socket `fd`.
 *
 * @param fd   connected unix socket.
 * @param data buffer receiving the payload.
 * @param len  expected payload size in bytes.
 * @param pfd  receives the passed file descriptor on success.
 * @return 0 on success, -1 on error, short read, or if the message did not
 *         carry exactly one file descriptor.
 */
int UxsocketRecvFd(int fd, void *data, size_t len, int *pfd) {
  union {
    char buf[CMSG_SPACE(sizeof(int))];
    struct cmsghdr align;
  } u;
  struct iovec iov = {
      .iov_base = data,
      .iov_len = len,
  };
  struct msghdr msg = {
      .msg_name = NULL,
      .msg_namelen = 0,
      .msg_iov = &iov,
      .msg_iovlen = 1,
      .msg_control = u.buf,
      .msg_controllen = sizeof(u.buf),
      .msg_flags = 0,
  };

  ssize_t ret = recvmsg(fd, &msg, 0);
  if (ret < 0) {
    perror("recvmsg failed");
    return -1;
  }
  if (ret != (ssize_t) len) {
    /* errno is not meaningful here, so do not use perror. */
    fprintf(stderr, "recvmsg failed: short read (%zd of %zu bytes)\n", ret,
            len);
    return -1;
  }

  /* Validate the control message before touching its contents: it may be
     missing entirely, truncated, or of a different kind. */
  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
  if (cmsg == NULL || (msg.msg_flags & MSG_CTRUNC) ||
      cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS ||
      cmsg->cmsg_len != CMSG_LEN(sizeof(int))) {
    fprintf(stderr, "accessing ancillary data failed\n");
    return -1;
  }

  /* Copy the descriptor out bytewise; CMSG_DATA is not guaranteed to be
     suitably aligned for a direct int access. */
  memcpy(pfd, CMSG_DATA(cmsg), sizeof(int));
  return 0;
}
/* Send `len` bytes from `data` over the unix socket `connfd`. When `fd` is
   non-negative it is attached to the message as SCM_RIGHTS ancillary data;
   otherwise only the payload is sent. Returns 0 if exactly `len` bytes were
   sent, -1 otherwise (the sendmsg result is printed to stderr). */
int UxsocketSendFd(int connfd, void *data, size_t len, int fd) {
  union {
    char buf[CMSG_SPACE(sizeof(int))];
    struct cmsghdr align;
  } ctrl;

  struct iovec payload = {
      .iov_base = data,
      .iov_len = len,
  };

  struct msghdr hdr = {
      .msg_name = NULL,
      .msg_namelen = 0,
      .msg_iov = &payload,
      .msg_iovlen = 1,
      .msg_control = ctrl.buf,
      .msg_controllen = 0,
      .msg_flags = 0,
  };

  /* Only announce the control buffer when there is a descriptor to pass. */
  if (fd >= 0) {
    struct cmsghdr *rights = &ctrl.align;

    hdr.msg_controllen = sizeof(ctrl.buf);
    rights->cmsg_level = SOL_SOCKET;
    rights->cmsg_type = SCM_RIGHTS;
    rights->cmsg_len = CMSG_LEN(sizeof(int));
    *(int *)CMSG_DATA(rights) = fd;
  }

  ssize_t sent = sendmsg(connfd, &hdr, 0);
  if (sent == (ssize_t)len)
    return 0;

  fprintf(stderr, "tx == %zd\n", sent);
  return -1;
}
/* Create (or open) the file at `path`, resize it to `size` bytes, map it
   shared and read/write with pages pre-populated, and zero the mapping.
   On success stores the mapping address in `*addr` and returns the open file
   descriptor. On failure returns -1; if the failure happened after the file
   was opened, the descriptor is closed and the file is unlinked. */
int ShmCreate(const char *path, size_t size, void **addr) {
  int shm = open(path, O_CREAT | O_RDWR, 0666);
  if (shm == -1) {
    perror("util_create_shmsiszed: open failed");
    return -1;
  }

  void *mapping = MAP_FAILED;
  if (ftruncate(shm, size) != 0) {
    perror("util_create_shmsiszed: ftruncate failed");
  } else {
    mapping = mmap(NULL, size, PROT_READ | PROT_WRITE,
                   MAP_SHARED | MAP_POPULATE, shm, 0);
    if (mapping == MAP_FAILED)
      perror("util_create_shmsiszed: mmap failed");
  }

  if (mapping == MAP_FAILED) {
    close(shm);
    unlink(path);
    return -1;
  }

  memset(mapping, 0, size);
  *addr = mapping;
  return shm;
}
/* Map the whole file behind `shm_fd` shared and read/write. The size is taken
   from fstat. On success returns the mapping address and stores the size in
   `*psize`; on failure returns NULL and leaves `*psize` untouched. */
void *ShmMap(int shm_fd, size_t *psize) {
  struct stat info;

  if (fstat(shm_fd, &info) != 0) {
    perror("shm_map: fstat failed");
    return NULL;
  }

  void *mapping = mmap(NULL, info.st_size, PROT_READ | PROT_WRITE,
                       MAP_SHARED, shm_fd, 0);
  if (mapping != MAP_FAILED) {
    *psize = info.st_size;
    return mapping;
  }

  perror("shm_map: mmap failed");
  return NULL;
}
\ No newline at end of file
/*
* Copyright 2021 Max Planck Institute for Software Systems, and
* National University of Singapore
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef DIST_NET_RDMA_UTILS_H_
#define DIST_NET_RDMA_UTILS_H_
#include <stddef.h>
/* Create a unix stream socket bound to `path` and listening. Returns the
   listening fd, or -1 on failure. */
int UxsocketInit(const char *path);
/* Connect a new unix stream socket to `path`. Returns the connected fd, or
   -1 on failure. */
int UxsocketConnect(const char *path);
/* Receive `len` bytes into `data` plus one file descriptor (stored in
   `*pfd`) from socket `fd`. Returns 0 on success, -1 on failure. */
int UxsocketRecvFd(int fd, void *data, size_t len, int *pfd);
/* Send `len` bytes from `data` over `connfd`, attaching file descriptor `fd`
   if it is non-negative. Returns 0 on success, -1 on failure. */
int UxsocketSendFd(int connfd, void *data, size_t len, int fd);
/* Create and map a zeroed shared memory file of `size` bytes at `path`.
   Stores the mapping in `*addr` and returns the file descriptor, or -1 on
   failure. */
int ShmCreate(const char *path, size_t size, void **addr);
/* Map the whole file behind `shm_fd`. Returns the mapping and stores its size
   in `*size`, or returns NULL on failure. */
void *ShmMap(int shm_fd, size_t *size);
#endif // DIST_NET_RDMA_UTILS_H_
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment