Commit 8f1eda06 authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

dist: factor out rdma cm code to separate functions/file

To add manual InfiniBand initialization as an alternative in a next step.
parent 1635c937
...@@ -22,11 +22,12 @@ ...@@ -22,11 +22,12 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/ */
#include "dist/rdma.h"
#include "dist/net_rdma.h" #include "dist/net_rdma.h"
#include <fcntl.h> #include <fcntl.h>
#include <infiniband/verbs.h>
#include <pthread.h> #include <pthread.h>
#include <rdma/rdma_cma.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <sys/epoll.h> #include <sys/epoll.h>
...@@ -62,9 +63,8 @@ struct NetRdmaMsg { ...@@ -62,9 +63,8 @@ struct NetRdmaMsg {
} msg_type; } msg_type;
} __attribute__((packed)); } __attribute__((packed));
static struct rdma_event_channel *cm_channel; static struct ibv_context *ib_ctx;
static struct rdma_conn_param conn_param = { }; static struct ibv_qp *qp;
static struct rdma_cm_id *cm_id;
static struct ibv_pd *pd; static struct ibv_pd *pd;
static struct ibv_cq *cq; static struct ibv_cq *cq;
static struct ibv_comp_channel *comp_chan; static struct ibv_comp_channel *comp_chan;
...@@ -105,7 +105,7 @@ static int RdmMsgRxEnqueue(struct NetRdmaMsg *msg) { ...@@ -105,7 +105,7 @@ static int RdmMsgRxEnqueue(struct NetRdmaMsg *msg) {
recv_wr.sg_list = &sge; recv_wr.sg_list = &sge;
recv_wr.num_sge = 1; recv_wr.num_sge = 1;
struct ibv_recv_wr *bad_recv_wr; struct ibv_recv_wr *bad_recv_wr;
if (ibv_post_recv(cm_id->qp, &recv_wr, &bad_recv_wr)) { if (ibv_post_recv(qp, &recv_wr, &bad_recv_wr)) {
perror("RdmMsgRxEnqueue: ibv_post_recv failed"); perror("RdmMsgRxEnqueue: ibv_post_recv failed");
return 1; return 1;
} }
...@@ -178,23 +178,25 @@ static int RdmaMsgRx(struct NetRdmaMsg *msg) { ...@@ -178,23 +178,25 @@ static int RdmaMsgRx(struct NetRdmaMsg *msg) {
abort(); abort();
} }
static int RdmaCommonInit() { int RdmaCommonInit(struct ibv_context *ctx) {
ib_ctx = ctx;
if (pthread_spin_init(&freelist_spin, PTHREAD_PROCESS_PRIVATE)) { if (pthread_spin_init(&freelist_spin, PTHREAD_PROCESS_PRIVATE)) {
perror("RdmaCommonInit: pthread_spin_init failed"); perror("RdmaCommonInit: pthread_spin_init failed");
return 1; return 1;
} }
if (!(pd = ibv_alloc_pd(cm_id->verbs))) { if (!(pd = ibv_alloc_pd(ib_ctx))) {
perror("RdmaCommonInit: ibv_alloc_pd failed"); perror("RdmaCommonInit: ibv_alloc_pd failed");
return 1; return 1;
} }
if (!(comp_chan = ibv_create_comp_channel(cm_id->verbs))) { if (!(comp_chan = ibv_create_comp_channel(ib_ctx))) {
perror("RdmaCommonInit: ibv_create_comp_channel failed"); perror("RdmaCommonInit: ibv_create_comp_channel failed");
return 1; return 1;
} }
if (!(cq = ibv_create_cq(cm_id->verbs, 1024, NULL, comp_chan, 0))) { if (!(cq = ibv_create_cq(ib_ctx, 1024, NULL, comp_chan, 0))) {
perror("RdmaCommonInit: ibv_create_cq failed"); perror("RdmaCommonInit: ibv_create_cq failed");
return 1; return 1;
} }
...@@ -218,14 +220,13 @@ static int RdmaCommonInit() { ...@@ -218,14 +220,13 @@ static int RdmaCommonInit() {
qp_attr.send_cq = cq; qp_attr.send_cq = cq;
qp_attr.recv_cq = cq; qp_attr.recv_cq = cq;
qp_attr.qp_type = IBV_QPT_RC; qp_attr.qp_type = IBV_QPT_RC;
if (!(qp = RdmaCMCreateQP(pd, &qp_attr))) {
if (rdma_create_qp(cm_id, pd, &qp_attr)) { fprintf(stderr, "RdmaCommonInit: RdmaCMCreateQP failed\n");
perror("RdmaCommonInit: rdma_create_qp failed");
return 1; return 1;
} }
if (ibv_req_notify_cq(cq, 0)) { if (ibv_req_notify_cq(cq, 0)) {
perror("RdmMsgRxEnqueue: ibv_req_notify_cq failed"); perror("RdmaCommonInit: ibv_req_notify_cq failed");
return 1; return 1;
} }
#ifdef RDMA_DEBUG #ifdef RDMA_DEBUG
...@@ -262,73 +263,9 @@ static int RdmaCommonSetNonblock() { ...@@ -262,73 +263,9 @@ static int RdmaCommonSetNonblock() {
} }
int RdmaListen(struct sockaddr_in *addr) { int RdmaListen(struct sockaddr_in *addr) {
if (!(cm_channel = rdma_create_event_channel())) { if (RdmaCMListen(addr))
perror("RdmaListen: rdma_create_event_channel failed");
return 1;
}
struct rdma_cm_id *listen_id;
if (rdma_create_id(cm_channel, &listen_id, NULL, RDMA_PS_TCP)) {
perror("RdmaListen: rdma_create_id failed");
return 1;
}
if (rdma_bind_addr(listen_id, (struct sockaddr *) addr)) {
perror("RdmaListen: rdma_bind_addr failed");
return 1;
}
if (rdma_listen(listen_id, 1)) {
perror("RdmaListen: rdma_listen failed");
return 1;
}
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaListen: listen done\n");
#endif
struct rdma_cm_event *event;
if (rdma_get_cm_event(cm_channel, &event)) {
perror("RdmaListen: rdma_get_cm_event failed");
return 1;
}
if (event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
fprintf(stderr, "RdmaListen: unexpected event (%u)\n", event->event);
return 1;
}
cm_id = event->id;
rdma_ack_cm_event(event);
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaListen: got conn request\n");
#endif
if (RdmaCommonInit())
return 1; return 1;
conn_param.responder_resources = 1;
if (rdma_accept(cm_id, &conn_param)) {
perror("RdmaListen: rdma_accept failed");
return 1;
}
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaListen: accept done\n");
#endif
if (rdma_get_cm_event(cm_channel, &event)) {
perror("RdmaListen: rdma_get_cm_event failed");
return 1;
}
if (event->event != RDMA_CM_EVENT_ESTABLISHED) {
fprintf(stderr, "RdmaListen: unexpected event (%u)\n", event->event);
return 1;
}
rdma_ack_cm_event(event);
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaListen: conn established\n");
#endif
if (RdmaCommonSetNonblock()) if (RdmaCommonSetNonblock())
return 1; return 1;
...@@ -336,78 +273,8 @@ int RdmaListen(struct sockaddr_in *addr) { ...@@ -336,78 +273,8 @@ int RdmaListen(struct sockaddr_in *addr) {
} }
int RdmaConnect(struct sockaddr_in *addr) { int RdmaConnect(struct sockaddr_in *addr) {
if (!(cm_channel = rdma_create_event_channel())) { if (RdmaCMConnect(addr))
perror("RdmaConnect: rdma_create_event_channel failed");
return 1; return 1;
}
if (rdma_create_id(cm_channel, &cm_id, NULL, RDMA_PS_TCP)) {
perror("RdmaConnect: rdma_create_id failed");
return 1;
}
if (rdma_resolve_addr(cm_id, NULL, (struct sockaddr *) addr, 5000)) {
perror("RdmaConnect: rdma_resolve_addr failed");
return 1;
}
struct rdma_cm_event *event;
if (rdma_get_cm_event(cm_channel, &event)) {
perror("RdmaConnect: rdma_get_cm_event failed (addr)");
return 1;
}
if (event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
fprintf(stderr, "RdmaConnect: unexpected event (%u instead of %u)\n",
event->event, RDMA_CM_EVENT_ADDR_RESOLVED);
return 1;
}
rdma_ack_cm_event(event);
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaConnect: address resolved\n");
#endif
if (rdma_resolve_route(cm_id, 5000)) {
perror("RdmaConnect: rdma_resolve_route failed");
return 1;
}
if (rdma_get_cm_event(cm_channel, &event)) {
perror("RdmaConnect: rdma_get_cm_event failed (route)");
return 1;
}
if (event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
fprintf(stderr, "RdmaConnect: unexpected event (%u instead of %u)\n",
event->event, RDMA_CM_EVENT_ROUTE_RESOLVED);
return 1;
}
rdma_ack_cm_event(event);
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaConnect: route resolved\n");
#endif
if (RdmaCommonInit())
return 1;
conn_param.initiator_depth = 1;
conn_param.retry_count = 7;
if (rdma_connect(cm_id, &conn_param)) {
perror("RdmaConnect: rdma_connect failed");
return 1;
}
#ifdef RDMA_DEBUG
fprintf(stderr, "RdmaConnect: connect issued\n");
#endif
if (rdma_get_cm_event(cm_channel, &event)) {
perror("RdmaConnect: rdma_get_cm_event failed (connect)");
return 1;
}
if (event->event != RDMA_CM_EVENT_ESTABLISHED) {
fprintf(stderr, "RdmaConnect: unexpected event (%u)\n", event->event);
return 1;
}
rdma_ack_cm_event(event);
if (RdmaCommonSetNonblock()) if (RdmaCommonSetNonblock())
return 1; return 1;
...@@ -543,7 +410,7 @@ int RdmaPassIntro(struct Peer *peer) { ...@@ -543,7 +410,7 @@ int RdmaPassIntro(struct Peer *peer) {
send_wr.num_sge = 1; send_wr.num_sge = 1;
struct ibv_send_wr *bad_send_wr; struct ibv_send_wr *bad_send_wr;
if (ibv_post_send(cm_id->qp, &send_wr, &bad_send_wr)) { if (ibv_post_send(qp, &send_wr, &bad_send_wr)) {
perror("RdmaPassIntro: ibv_post_send failed"); perror("RdmaPassIntro: ibv_post_send failed");
return 1; return 1;
} }
...@@ -583,7 +450,7 @@ int RdmaPassEntry(struct Peer *peer, uint32_t n) { ...@@ -583,7 +450,7 @@ int RdmaPassEntry(struct Peer *peer, uint32_t n) {
send_wr.num_sge = 1; send_wr.num_sge = 1;
struct ibv_send_wr *bad_send_wr; struct ibv_send_wr *bad_send_wr;
int ret = ibv_post_send(cm_id->qp, &send_wr, &bad_send_wr); int ret = ibv_post_send(qp, &send_wr, &bad_send_wr);
if (ret == 0) { if (ret == 0) {
break; break;
} else if (ret != ENOMEM) { } else if (ret != ENOMEM) {
...@@ -640,7 +507,7 @@ int RdmaPassReport() { ...@@ -640,7 +507,7 @@ int RdmaPassReport() {
send_wr.num_sge = 1; send_wr.num_sge = 1;
struct ibv_send_wr *bad_send_wr; struct ibv_send_wr *bad_send_wr;
int ret = ibv_post_send(cm_id->qp, &send_wr, &bad_send_wr); int ret = ibv_post_send(qp, &send_wr, &bad_send_wr);
if (ret == 0) { if (ret == 0) {
break; break;
} else if (ret != ENOMEM) { } else if (ret != ENOMEM) {
......
/*
* Copyright 2021 Max Planck Institute for Software Systems, and
* National University of Singapore
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef DIST_RDMA_H_
#define DIST_RDMA_H_
#include "dist/net_rdma.h"
#include <infiniband/verbs.h>
/**
 * Transport-independent verbs setup on device context `ctx`.
 * The visible implementation allocates a protection domain, a completion
 * channel and a completion queue on `ctx`, and obtains the queue pair through
 * RdmaCMCreateQP(). Returns 1 if a step fails.
 */
int RdmaCommonInit(struct ibv_context *ctx);
/**
 * Passive side of connection setup via the RDMA connection manager: binds and
 * listens on `addr`, waits for one connection request, runs RdmaCommonInit()
 * on that connection's verbs context, accepts, and waits until the connection
 * is established. Returns 0 on success, 1 on failure.
 */
int RdmaCMListen(struct sockaddr_in *addr);
/**
 * Active side of connection setup via the RDMA connection manager: resolves
 * address and route for `addr`, runs RdmaCommonInit() on the resulting verbs
 * context, connects, and waits until the connection is established.
 * Returns 0 on success, 1 on failure.
 */
int RdmaCMConnect(struct sockaddr_in *addr);
/**
 * Creates the queue pair on the connection manager id set up by
 * RdmaCMListen()/RdmaCMConnect(), using protection domain `pd` and the
 * attributes in `attr`. Returns the queue pair, or NULL on failure.
 */
struct ibv_qp *RdmaCMCreateQP(struct ibv_pd *pd,
struct ibv_qp_init_attr *attr);
#endif // DIST_RDMA_H_
/*
* Copyright 2021 Max Planck Institute for Software Systems, and
* National University of Singapore
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "dist/rdma.h"
#include "dist/net_rdma.h"
#include <rdma/rdma_cma.h>
#include <stdio.h>
#include <stdlib.h>
/* Event channel on which all connection manager events are received. */
static struct rdma_event_channel *cm_channel;
/*
 * Connection parameters passed to rdma_accept()/rdma_connect(). `{0}` rather
 * than `{ }`: an empty brace initializer is not valid ISO C before C23.
 */
static struct rdma_conn_param conn_param = {0};
/* Id of the single connection; set by RdmaCMListen() or RdmaCMConnect(). */
static struct rdma_cm_id *cm_id;
/**
 * Passive (server) side of connection setup via the RDMA connection manager.
 *
 * Creates the event channel, binds a listening id to `addr` and blocks until
 * a connection request arrives. The id carried by that request becomes the
 * file-wide `cm_id`; RdmaCommonInit() is run on its verbs context before the
 * connection is accepted and the ESTABLISHED event is awaited.
 *
 * Every event obtained from rdma_get_cm_event() is acknowledged, including
 * unexpected ones on the error paths.
 *
 * NOTE(review): the listening id and the event channel are not released, on
 * either the success or the failure paths - presumably the process exits on
 * failure; confirm with the caller.
 *
 * @param addr Local address the listening id is bound to.
 * @return 0 once the connection is established, 1 on any failure.
 */
int RdmaCMListen(struct sockaddr_in *addr) {
  if (!(cm_channel = rdma_create_event_channel())) {
    perror("RdmaListen: rdma_create_event_channel failed");
    return 1;
  }

  struct rdma_cm_id *listen_id;
  if (rdma_create_id(cm_channel, &listen_id, NULL, RDMA_PS_TCP)) {
    perror("RdmaListen: rdma_create_id failed");
    return 1;
  }

  if (rdma_bind_addr(listen_id, (struct sockaddr *) addr)) {
    perror("RdmaListen: rdma_bind_addr failed");
    return 1;
  }

  if (rdma_listen(listen_id, 1)) {
    perror("RdmaListen: rdma_listen failed");
    return 1;
  }
#ifdef RDMA_DEBUG
  fprintf(stderr, "RdmaListen: listen done\n");
#endif

  struct rdma_cm_event *event;
  if (rdma_get_cm_event(cm_channel, &event)) {
    perror("RdmaListen: rdma_get_cm_event failed");
    return 1;
  }
  if (event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
    fprintf(stderr, "RdmaListen: unexpected event (%u)\n", event->event);
    /* Report first, then ack: the event must not be touched after the ack. */
    rdma_ack_cm_event(event);
    return 1;
  }
  /* Read the id out of the event before acknowledging it. */
  cm_id = event->id;
  rdma_ack_cm_event(event);
#ifdef RDMA_DEBUG
  fprintf(stderr, "RdmaListen: got conn request\n");
#endif

  if (RdmaCommonInit(cm_id->verbs)) {
    return 1;
  }

  conn_param.responder_resources = 1;
  if (rdma_accept(cm_id, &conn_param)) {
    perror("RdmaListen: rdma_accept failed");
    return 1;
  }
#ifdef RDMA_DEBUG
  fprintf(stderr, "RdmaListen: accept done\n");
#endif

  if (rdma_get_cm_event(cm_channel, &event)) {
    perror("RdmaListen: rdma_get_cm_event failed");
    return 1;
  }
  if (event->event != RDMA_CM_EVENT_ESTABLISHED) {
    fprintf(stderr, "RdmaListen: unexpected event (%u)\n", event->event);
    rdma_ack_cm_event(event);
    return 1;
  }
  rdma_ack_cm_event(event);
#ifdef RDMA_DEBUG
  fprintf(stderr, "RdmaListen: conn established\n");
#endif

  return 0;
}
/**
 * Active (client) side of connection setup via the RDMA connection manager.
 *
 * Creates the event channel and the file-wide `cm_id`, resolves the address
 * and then the route to `addr` (each with a 5000 ms timeout), runs
 * RdmaCommonInit() on the resulting verbs context, issues the connect and
 * waits for the ESTABLISHED event.
 *
 * Every event obtained from rdma_get_cm_event() is acknowledged, including
 * unexpected ones on the error paths.
 *
 * NOTE(review): the id and the event channel are not released on failure -
 * presumably the process exits; confirm with the caller.
 *
 * @param addr Remote address to connect to.
 * @return 0 once the connection is established, 1 on any failure.
 */
int RdmaCMConnect(struct sockaddr_in *addr) {
  if (!(cm_channel = rdma_create_event_channel())) {
    perror("RdmaConnect: rdma_create_event_channel failed");
    return 1;
  }

  if (rdma_create_id(cm_channel, &cm_id, NULL, RDMA_PS_TCP)) {
    perror("RdmaConnect: rdma_create_id failed");
    return 1;
  }

  /* Step 1: address resolution. */
  if (rdma_resolve_addr(cm_id, NULL, (struct sockaddr *) addr, 5000)) {
    perror("RdmaConnect: rdma_resolve_addr failed");
    return 1;
  }
  struct rdma_cm_event *event;
  if (rdma_get_cm_event(cm_channel, &event)) {
    perror("RdmaConnect: rdma_get_cm_event failed (addr)");
    return 1;
  }
  if (event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
    fprintf(stderr, "RdmaConnect: unexpected event (%u instead of %u)\n",
            event->event, RDMA_CM_EVENT_ADDR_RESOLVED);
    /* Report first, then ack: the event must not be touched after the ack. */
    rdma_ack_cm_event(event);
    return 1;
  }
  rdma_ack_cm_event(event);
#ifdef RDMA_DEBUG
  fprintf(stderr, "RdmaConnect: address resolved\n");
#endif

  /* Step 2: route resolution. */
  if (rdma_resolve_route(cm_id, 5000)) {
    perror("RdmaConnect: rdma_resolve_route failed");
    return 1;
  }
  if (rdma_get_cm_event(cm_channel, &event)) {
    perror("RdmaConnect: rdma_get_cm_event failed (route)");
    return 1;
  }
  if (event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
    fprintf(stderr, "RdmaConnect: unexpected event (%u instead of %u)\n",
            event->event, RDMA_CM_EVENT_ROUTE_RESOLVED);
    rdma_ack_cm_event(event);
    return 1;
  }
  rdma_ack_cm_event(event);
#ifdef RDMA_DEBUG
  fprintf(stderr, "RdmaConnect: route resolved\n");
#endif

  /* Step 3: verbs resources, set up before the connect is issued. */
  if (RdmaCommonInit(cm_id->verbs)) {
    return 1;
  }

  /* Step 4: connect and wait for establishment. */
  conn_param.initiator_depth = 1;
  conn_param.retry_count = 7;
  if (rdma_connect(cm_id, &conn_param)) {
    perror("RdmaConnect: rdma_connect failed");
    return 1;
  }
#ifdef RDMA_DEBUG
  fprintf(stderr, "RdmaConnect: connect issued\n");
#endif

  if (rdma_get_cm_event(cm_channel, &event)) {
    perror("RdmaConnect: rdma_get_cm_event failed (connect)");
    return 1;
  }
  if (event->event != RDMA_CM_EVENT_ESTABLISHED) {
    fprintf(stderr, "RdmaConnect: unexpected event (%u)\n", event->event);
    rdma_ack_cm_event(event);
    return 1;
  }
  rdma_ack_cm_event(event);

  return 0;
}
/**
 * Creates the queue pair for the connection manager id held in `cm_id`.
 *
 * `cm_id` is only set by RdmaCMListen()/RdmaCMConnect(). This function is
 * exported, so a call made before either of those has produced an id is
 * rejected here instead of handing a NULL id to rdma_create_qp().
 *
 * @param pd   Protection domain the queue pair is created in.
 * @param attr Initial queue pair attributes, passed through to
 *             rdma_create_qp().
 * @return The queue pair now attached to `cm_id`, or NULL on failure.
 */
struct ibv_qp *RdmaCMCreateQP(struct ibv_pd *pd,
                              struct ibv_qp_init_attr *attr) {
  if (!cm_id) {
    fprintf(stderr, "RdmaCMCreateQP: no connection id available\n");
    return NULL;
  }

  if (rdma_create_qp(cm_id, pd, attr)) {
    perror("RdmaCommonInit: rdma_create_qp failed");
    return NULL;
  }

  return cm_id->qp;
}
...@@ -24,7 +24,7 @@ include mk/subdir_pre.mk ...@@ -24,7 +24,7 @@ include mk/subdir_pre.mk
bin_net_rdma := $(d)net_rdma bin_net_rdma := $(d)net_rdma
OBJS := $(addprefix $(d), net_rdma.o rdma.o utils.o) OBJS := $(addprefix $(d), net_rdma.o rdma.o rdma_cm.o utils.o)
$(bin_net_rdma): $(OBJS) -lrdmacm -libverbs -lpthread $(bin_net_rdma): $(OBJS) -lrdmacm -libverbs -lpthread
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment