Commit eaa624d8 authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

dist/net_rdma: cleanup now works too

With this longer tests (5sec iperf with kvm) work without problems now.
parent ef2f2d88
......@@ -40,6 +40,9 @@
#include "dist/utils.h"
static const uint64_t kPollReportThreshold = 128;
static const uint64_t kCleanReportThreshold = 128;
const char *shm_path = NULL;
size_t shm_size = 256 * 1024 * 1024ULL; // 256MB
void *shm_base = NULL;
......@@ -195,13 +198,19 @@ static int PeersInitDevs() {
}
int PeerDevSendIntro(struct Peer *peer) {
#ifdef DEBUG
fprintf(stderr, "PeerDevSendIntro(%s)\n", peer->sock_path);
#endif
struct SimbricksProtoNetDevIntro *di = &peer->dev_intro;
peer->local_base = (void *) ((uintptr_t) peer->shm_base + di->d2n_offset);
peer->local_elen = di->d2n_elen;
peer->local_enum = di->d2n_nentries;
peer->cleanup_base = (void *) ((uintptr_t) peer->shm_base + di->n2d_offset);
peer->cleanup_elen = di->n2d_elen;
peer->cleanup_enum = di->n2d_nentries;
struct SimbricksProtoNetNetIntro *ni = &peer->net_intro;
ssize_t ret = send(peer->sock_fd, ni, sizeof(*ni), 0);
if (ret < 0) {
......@@ -215,7 +224,9 @@ int PeerDevSendIntro(struct Peer *peer) {
}
int PeerNetSetupQueues(struct Peer *peer) {
#ifdef DEBUG
fprintf(stderr, "PeerNetSetupQueues(%s)\n", peer->sock_path);
#endif
struct SimbricksProtoNetDevIntro *di = &peer->dev_intro;
if (ShmAlloc(di->d2n_elen * di->d2n_nentries, &di->d2n_offset)) {
......@@ -233,6 +244,10 @@ int PeerNetSetupQueues(struct Peer *peer) {
peer->local_elen = di->n2d_elen;
peer->local_enum = di->n2d_nentries;
peer->cleanup_base = (void *) ((uintptr_t) shm_base + di->d2n_offset);
peer->cleanup_elen = di->d2n_elen;
peer->cleanup_enum = di->d2n_nentries;
if (UxsocketSendFd(peer->sock_fd, di, sizeof(*di), peer->shm_fd)) {
fprintf(stderr, "PeerNetSetupQueues: sending welcome message failed (%lu)",
peer - peers);
......@@ -241,9 +256,41 @@ int PeerNetSetupQueues(struct Peer *peer) {
return 0;
}
int PeerReport(struct Peer *peer, uint32_t written_pos, uint32_t clean_pos) {
if (written_pos == peer->cleanup_pos_last &&
clean_pos == peer->local_pos_cleaned)
return 0;
#ifdef DEBUG
fprintf(stderr, "PeerReport: peer %s written %u -> %u, cleaned %u -> %u\n",
peer->sock_path, peer->cleanup_pos_last, written_pos,
peer->local_pos_cleaned, clean_pos);
#endif
peer->cleanup_pos_last = written_pos;
while (peer->local_pos_cleaned != clean_pos) {
void *entry =
(peer->local_base + peer->local_pos_cleaned * peer->local_elen);
if (peer->is_dev) {
struct SimbricksProtoNetD2NDummy *d2n = entry;
d2n->own_type = SIMBRICKS_PROTO_NET_D2N_OWN_DEV;
} else {
struct SimbricksProtoNetN2DDummy *n2d = entry;
n2d->own_type = SIMBRICKS_PROTO_NET_N2D_OWN_NET;
}
peer->local_pos_cleaned += 1;
if (peer->local_pos_cleaned >= peer->local_enum)
peer->local_pos_cleaned -= peer->local_enum;
}
return 0;
}
static int PeerEvent(struct Peer *peer, uint32_t events) {
#ifdef DEBUG
fprintf(stderr, "PeerEvent(%s)\n", peer->sock_path);
fflush(stdout);
#endif
// disable peer if not an input event
if (!(events & EPOLLIN)) {
......@@ -293,12 +340,8 @@ static int PeerEvent(struct Peer *peer, uint32_t events) {
return 0;
}
static void *PollThread(void *data) {
while (true) {
for (size_t i = 0; i < peer_num; i++) {
struct Peer *peer = &peers[i];
if (!peer->ready)
continue;
static inline void PollPeerTransfer(struct Peer *peer, bool *report) {
// XXX: consider batching this to forward multiple entries at once if possible
void *entry = (peer->local_base + peer->local_pos * peer->local_elen);
bool ready;
......@@ -317,8 +360,64 @@ static void *PollThread(void *data) {
peer->local_pos += 1;
if (peer->local_pos >= peer->local_enum)
peer->local_pos -= peer->local_enum;
uint64_t unreported = (peer->local_pos - peer->local_pos_reported) %
peer->local_enum;
if (unreported >= kPollReportThreshold)
*report = true;
}
}
static inline void PollPeerCleanup(struct Peer *peer, bool *report) {
// XXX: could also be batched
if (peer->cleanup_pos_next == peer->cleanup_pos_last)
return;
void *entry =
(peer->cleanup_base + peer->cleanup_pos_next * peer->cleanup_elen);
bool ready;
if (peer->is_dev) {
struct SimbricksProtoNetN2DDummy *n2d = entry;
ready = (n2d->own_type & SIMBRICKS_PROTO_NET_N2D_OWN_MASK) ==
SIMBRICKS_PROTO_NET_N2D_OWN_NET;
} else {
struct SimbricksProtoNetD2NDummy *d2n = entry;
ready = (d2n->own_type & SIMBRICKS_PROTO_NET_D2N_OWN_MASK) ==
SIMBRICKS_PROTO_NET_D2N_OWN_DEV;
}
if (ready) {
#ifdef DEBUG
fprintf(stderr, "PollPeerCleanup: peer %s has clean entry at %u\n",
peer->sock_path, peer->cleanup_pos_next);
#endif
peer->cleanup_pos_next += 1;
if (peer->cleanup_pos_next >= peer->cleanup_enum)
peer->cleanup_pos_next -= peer->cleanup_enum;
uint64_t unreported = (peer->cleanup_pos_next - peer->cleanup_pos_reported)
% peer->cleanup_enum;
if (unreported >= kCleanReportThreshold)
*report = true;
}
}
static void *PollThread(void *data) {
while (true) {
// poll queue for transferring entries
bool report = false;
for (size_t i = 0; i < peer_num; i++) {
struct Peer *peer = &peers[i];
if (!peer->ready)
continue;
PollPeerTransfer(peer, &report);
PollPeerCleanup(peer, &report);
}
if (report)
RdmaPassReport();
}
return NULL;
}
......@@ -349,7 +448,10 @@ int main(int argc, char *argv[]) {
if (ParseArgs(argc, argv))
return EXIT_FAILURE;
#ifdef DEBUG
fprintf(stderr, "pid=%d shm=%s\n", getpid(), shm_path);
#endif
if ((shm_fd = ShmCreate(shm_path, shm_size, &shm_base)) < 0)
return EXIT_FAILURE;
......
......@@ -32,21 +32,44 @@
#include <simbricks/proto/network.h>
struct Peer {
struct SimbricksProtoNetDevIntro dev_intro;
struct SimbricksProtoNetNetIntro net_intro;
const char *sock_path;
/* base address of the local queue we're polling.
(d2n or n2d depending on is_dev). */
uint8_t *local_base;
uint64_t local_elen;
uint64_t local_enum;
uint64_t local_pos;
uint32_t local_elen;
uint32_t local_enum;
uint32_t local_pos;
// last position reported to our peer
uint32_t local_pos_reported;
// last position cleaned
uint32_t local_pos_cleaned;
// rkey and base address of the remote queue to write to
uint64_t remote_rkey;
uint64_t remote_base;
/* For cleanup we poll the queue just to see when entries get freed. We need
to know up to where we can poll, i.e. what entries the peer has written to.
The peer communicates this position periodically and we store it in
`cleanup_pos_last`. `cleanup_pos_next` refers to the next entry we will
poll. Finally we need to report the freed positions to our peer again, so
the peer can mark these entries as unused in it's local queue, we again do
this periodically and keep track of the last communicated position in
`cleanup_pos_reported`. */
uint8_t *cleanup_base;
uint32_t cleanup_elen;
uint32_t cleanup_enum;
// next position to be cleaned up
uint32_t cleanup_pos_next;
// first entry not ready for cleanup yet
volatile uint32_t cleanup_pos_last;
// last cleanup position reported to peer
uint32_t cleanup_pos_reported;
struct SimbricksProtoNetDevIntro dev_intro;
struct SimbricksProtoNetNetIntro net_intro;
const char *sock_path;
/* RDMA memory region for the shared memory of the queues on this end. Could
be either our own global SHM region if this is a network peer, or the SHM
region allocated by the device peer. */
......@@ -78,11 +101,13 @@ extern int epfd;
int PeerDevSendIntro(struct Peer *peer);
int PeerNetSetupQueues(struct Peer *peer);
int PeerReport(struct Peer *peer, uint32_t written_pos, uint32_t clean_pos);
int RdmaListen(struct sockaddr_in *addr);
int RdmaConnect(struct sockaddr_in *addr);
int RdmaPassIntro(struct Peer *peer);
int RdmaPassEntry(struct Peer *peer);
int RdmaPassReport();
int RdmaEvent();
#endif // DIST_NET_RDMA_NET_RDMA_H_
\ No newline at end of file
......@@ -33,11 +33,19 @@
#define MSG_RXBUFS 16
#define MSG_TXBUFS 16
#define MAX_PEERS 32
struct NetRdmaReportMsg {
uint32_t written_pos[MAX_PEERS];
uint32_t clean_pos[MAX_PEERS];
bool valid[MAX_PEERS];
} __attribute__ ((packed));
struct NetRdmaMsg {
union {
struct SimbricksProtoNetDevIntro dev;
struct SimbricksProtoNetNetIntro net;
struct NetRdmaReportMsg report;
struct NetRdmaMsg *next_free;
};
uint64_t id;
......@@ -47,6 +55,7 @@ struct NetRdmaMsg {
enum {
kMsgDev,
kMsgNet,
kMsgReport,
} msg_type;
} __attribute__ ((packed));
......@@ -95,7 +104,7 @@ static int RdmMsgRxEnqueue(struct NetRdmaMsg *msg) {
return 0;
}
static int RdmaMsgRx(struct NetRdmaMsg *msg) {
static int RdmaMsgRxIntro(struct NetRdmaMsg *msg) {
if (msg->id >= peer_num) {
fprintf(stderr, "RdmMsgRx: invalid peer id in message (%lu)\n", msg->id);
abort();
......@@ -136,6 +145,30 @@ static int RdmaMsgRx(struct NetRdmaMsg *msg) {
return 0;
}
static int RdmaMsgRxReport(struct NetRdmaMsg *msg) {
for (size_t i = 0; i < MAX_PEERS && i < peer_num; i++) {
if (!msg->report.valid[i])
continue;
if (i >= peer_num) {
fprintf(stderr, "RdmaMsgRxReport: invalid ready peer number %zu\n", i);
abort();
}
PeerReport(&peers[i], msg->report.written_pos[i], msg->report.clean_pos[i]);
}
return 0;
}
static int RdmaMsgRx(struct NetRdmaMsg *msg) {
if (msg->msg_type == kMsgDev || msg->msg_type == kMsgNet)
return RdmaMsgRxIntro(msg);
else if (msg->msg_type == kMsgReport)
return RdmaMsgRxReport(msg);
fprintf(stderr, "RdmaMsgRx: unexpected message type = %u\n", msg->msg_type);
abort();
}
static int RdmaCommonInit() {
if (!(pd = ibv_alloc_pd(cm_id->verbs))) {
perror("RdmaCommonInit: ibv_alloc_pd failed");
......@@ -181,7 +214,9 @@ static int RdmaCommonInit() {
perror("RdmMsgRxEnqueue: ibv_req_notify_cq failed");
return 1;
}
#ifdef RDMA_DEBUG
fprintf(stderr, "Enqueue rx buffers\n");
#endif
// post receive operations for all rx buffers
for (int i = 0; i < MSG_RXBUFS; i++)
if (RdmMsgRxEnqueue(&msgs[i]))
......@@ -234,7 +269,9 @@ int RdmaListen(struct sockaddr_in *addr) {
return 1;
}
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaListen: listen done\n");
#endif
struct rdma_cm_event *event;
if (rdma_get_cm_event(cm_channel, &event)) {
perror("RdmaListen: rdma_get_cm_event failed");
......@@ -247,7 +284,9 @@ int RdmaListen(struct sockaddr_in *addr) {
cm_id = event->id;
rdma_ack_cm_event(event);
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaListen: got conn request\n");
#endif
if (RdmaCommonInit())
return 1;
......@@ -258,7 +297,10 @@ int RdmaListen(struct sockaddr_in *addr) {
return 1;
}
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaListen: accept done\n");
#endif
if (rdma_get_cm_event(cm_channel, &event)) {
perror("RdmaListen: rdma_get_cm_event failed");
return 1;
......@@ -268,7 +310,10 @@ int RdmaListen(struct sockaddr_in *addr) {
return 1;
}
rdma_ack_cm_event(event);
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaListen: conn established\n");
#endif
if (RdmaCommonSetNonblock())
return 1;
......@@ -302,7 +347,10 @@ int RdmaConnect(struct sockaddr_in *addr) {
return 1;
}
rdma_ack_cm_event(event);
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaConnect: address resolved\n");
#endif
if (rdma_resolve_route(cm_id, 5000)) {
perror("RdmaConnect: rdma_resolve_route failed");
......@@ -318,7 +366,10 @@ int RdmaConnect(struct sockaddr_in *addr) {
return 1;
}
rdma_ack_cm_event(event);
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaConnect: route resolved\n");
#endif
if (RdmaCommonInit())
return 1;
......@@ -330,7 +381,10 @@ int RdmaConnect(struct sockaddr_in *addr) {
return 1;
}
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaConnect: connect issued\n");
#endif
if (rdma_get_cm_event(cm_channel, &event)) {
perror("RdmaConnect: rdma_get_cm_event failed (connect)");
return 1;
......@@ -348,7 +402,9 @@ int RdmaConnect(struct sockaddr_in *addr) {
}
int RdmaEvent() {
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaEvent [pid=%d]\n", getpid());
#endif
struct ibv_cq *ecq;
void *ectx;
......@@ -371,10 +427,15 @@ int RdmaEvent() {
perror("RdmaEvent: ibv_poll_cq failed");
return 1;
}
#ifdef RDMA_DEBUG
fprintf(stderr, " n=%d\n", n);
#endif
for (int i = 0; i < n; i++) {
if (wcs[i].opcode == IBV_WC_SEND) {
#ifdef RDMA_DEBUG
fprintf(stderr, "Send done\n", n);
#endif
if (wcs[i].status != IBV_WC_SUCCESS) {
fprintf(stderr, "RdmaEvent: unsuccessful send (%u)\n", wcs[i].status);
abort();
......@@ -383,7 +444,9 @@ int RdmaEvent() {
// need to free the send buffer again
RdmaMsgFree(msgs + wcs[i].wr_id);
} else if ((wcs[i].opcode & IBV_WC_RECV)) {
#ifdef RDMA_DEBUG
fprintf(stderr, "Recv done\n", n);
#endif
if (wcs[i].status != IBV_WC_SUCCESS) {
fprintf(stderr, "RdmaEvent: unsuccessful recv (%u)\n", wcs[i].status);
......@@ -404,7 +467,9 @@ int RdmaEvent() {
}
int RdmaPassIntro(struct Peer *peer) {
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaPassIntro(%s)\n", peer->sock_path);
#endif
// device peers have sent us an SHM region, need to register this an as MR
if (peer->is_dev) {
......@@ -466,15 +531,20 @@ int RdmaPassIntro(struct Peer *peer) {
perror("RdmaPassIntro: ibv_post_send failed");
return 1;
}
fprintf(stderr, "RdmaPassIntro: ibv_post_send done\n");
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaPassIntro: ibv_post_send done\n");
#endif
return 0;
}
int RdmaPassEntry(struct Peer *peer) {
fprintf(stderr, "RdmaPassEntry(%s,%lu)\n", peer->sock_path, peer->local_pos);
fprintf(stderr, " remote_base=%lx local_base=%lx\n", peer->remote_base,
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaPassEntry(%s,%u)\n", peer->sock_path, peer->local_pos);
fprintf(stderr, " remote_base=%lx local_base=%p\n", peer->remote_base,
peer->local_base);
#endif
uint64_t pos = peer->local_pos * peer->local_elen;
struct ibv_sge sge;
sge.addr = (uintptr_t) (peer->local_base + pos);
......@@ -496,3 +566,56 @@ int RdmaPassEntry(struct Peer *peer) {
}
return 0;
}
int RdmaPassReport() {
if (peer_num > MAX_PEERS) {
fprintf(stderr, "RdmaPassReport: peer_num (%zu) larger than max (%u)\n",
peer_num, MAX_PEERS);
abort();
}
struct NetRdmaMsg *msg = RdmaMsgAlloc();
if (!msg)
return 1;
msg->msg_type = kMsgReport;
for (size_t i = 0; i < MAX_PEERS; i++) {
if (i >= peer_num) {
msg->report.valid[i] = false;
continue;
}
struct Peer *peer = &peers[i];
msg->report.valid[i] = peer->ready;
if (!peer->ready)
continue;
peer->cleanup_pos_reported = peer->cleanup_pos_next;
msg->report.clean_pos[i] = peer->cleanup_pos_reported;
peer->local_pos_reported = peer->local_pos;
msg->report.written_pos[i] = peer->local_pos_reported;
}
struct ibv_sge sge;
sge.addr = (uintptr_t) msg;
sge.length = sizeof(*msg);
sge.lkey = mr_msgs->lkey;
struct ibv_send_wr send_wr = { };
send_wr.wr_id = msg - msgs;
send_wr.opcode = IBV_WR_SEND;
send_wr.send_flags = IBV_SEND_SIGNALED;
send_wr.sg_list = &sge;
send_wr.num_sge = 1;
struct ibv_send_wr *bad_send_wr;
if (ibv_post_send(cm_id->qp, &send_wr, &bad_send_wr)) {
perror("RdmaPassReport: ibv_post_send failed");
return 1;
}
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaPassReport: ibv_post_send done\n");
#endif
return 0;
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment