Commit 623eb3e5 authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

dist/net_rdma: increase robustness (add retries, signalling)

parent 192db8b4
......@@ -36,6 +36,7 @@
#define MSG_RXBUFS 512
#define MSG_TXBUFS 512
#define MAX_PEERS 32
#define SIG_THRESHOLD 32
struct NetRdmaReportMsg {
uint32_t written_pos[MAX_PEERS];
......@@ -74,6 +75,7 @@ static struct ibv_qp_init_attr qp_attr = { };
static struct NetRdmaMsg msgs[MSG_RXBUFS + MSG_TXBUFS];
pthread_spinlock_t freelist_spin;
static struct NetRdmaMsg *msgs_free = NULL;
static uint32_t last_signaled = 0;
static struct NetRdmaMsg *RdmaMsgAlloc() {
pthread_spin_lock(&freelist_spin);
......@@ -467,6 +469,8 @@ int RdmaEvent() {
struct NetRdmaMsg *msg = msgs + wcs[i].wr_id;
if (RdmaMsgRx(msg) || RdmMsgRxEnqueue(msg))
return 1;
} else if ((wcs[i].opcode & IBV_WC_RDMA_WRITE)) {
/* just a signalled write every once in a while to clear queue*/
} else {
fprintf(stderr, "RdmaEvent: unexpected opcode %u\n", wcs[i].opcode);
abort();
......@@ -557,24 +561,36 @@ int RdmaPassEntry(struct Peer *peer, uint32_t n) {
peer->local_base);
#endif
uint64_t pos = peer->local_pos * peer->local_elen;
struct ibv_sge sge;
sge.addr = (uintptr_t) (peer->local_base + pos);
sge.length = peer->local_elen * n;
sge.lkey = peer->shm_mr->lkey;
struct ibv_send_wr send_wr = { };
send_wr.wr_id = -1ULL;
send_wr.opcode = IBV_WR_RDMA_WRITE;
send_wr.wr.rdma.remote_addr = peer->remote_base + pos;
send_wr.wr.rdma.rkey = peer->remote_rkey;
send_wr.sg_list = &sge;
send_wr.num_sge = 1;
struct ibv_send_wr *bad_send_wr;
if (ibv_post_send(cm_id->qp, &send_wr, &bad_send_wr)) {
perror("RdmaPassEntry: ibv_post_send failed");
return 1;
bool triggerSig = ++last_signaled > SIG_THRESHOLD;
if (triggerSig)
last_signaled = 0;
while (1) {
uint64_t pos = peer->local_pos * peer->local_elen;
struct ibv_sge sge;
sge.addr = (uintptr_t) (peer->local_base + pos);
sge.length = peer->local_elen * n;
sge.lkey = peer->shm_mr->lkey;
struct ibv_send_wr send_wr = { };
send_wr.wr_id = -1ULL;
send_wr.opcode = IBV_WR_RDMA_WRITE;
if (triggerSig)
send_wr.send_flags = IBV_SEND_SIGNALED;
send_wr.wr.rdma.remote_addr = peer->remote_base + pos;
send_wr.wr.rdma.rkey = peer->remote_rkey;
send_wr.sg_list = &sge;
send_wr.num_sge = 1;
struct ibv_send_wr *bad_send_wr;
int ret = ibv_post_send(cm_id->qp, &send_wr, &bad_send_wr);
if (ret == 0) {
break;
} else if (ret != ENOMEM) {
fprintf(stderr, "RdmaPassEntry: ibv_post_send failed %d (%s)\n", ret,
strerror(ret));
return 1;
}
}
return 0;
}
......@@ -608,26 +624,31 @@ int RdmaPassReport() {
msg->report.written_pos[i] = peer->local_pos_reported;
}
struct ibv_sge sge;
sge.addr = (uintptr_t) msg;
sge.length = sizeof(*msg);
sge.lkey = mr_msgs->lkey;
struct ibv_send_wr send_wr = { };
send_wr.wr_id = msg - msgs;
send_wr.opcode = IBV_WR_SEND;
send_wr.send_flags = IBV_SEND_SIGNALED;
send_wr.sg_list = &sge;
send_wr.num_sge = 1;
struct ibv_send_wr *bad_send_wr;
if (ibv_post_send(cm_id->qp, &send_wr, &bad_send_wr)) {
perror("RdmaPassReport: ibv_post_send failed");
return 1;
last_signaled = 0;
while (1) {
struct ibv_sge sge;
sge.addr = (uintptr_t) msg;
sge.length = sizeof(*msg);
sge.lkey = mr_msgs->lkey;
struct ibv_send_wr send_wr = { };
send_wr.wr_id = msg - msgs;
send_wr.opcode = IBV_WR_SEND;
send_wr.send_flags = IBV_SEND_SIGNALED;
send_wr.sg_list = &sge;
send_wr.num_sge = 1;
struct ibv_send_wr *bad_send_wr;
int ret = ibv_post_send(cm_id->qp, &send_wr, &bad_send_wr);
if (ret == 0) {
break;
} else if (ret != ENOMEM) {
fprintf(stderr, "RdmaPassReport: ibv_post_send failed %u (%s)", ret,
strerror(ret));
return 1;
}
}
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaPassReport: ibv_post_send done\n");
#endif
return 0;
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment