Commit f444ddb4 authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

dist/sockets: new API refactor

parent 31d9a0d1
......@@ -22,7 +22,7 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "dist/common/net.h"
#include "dist/common/base.h"
#include <assert.h>
#include <fcntl.h>
......@@ -36,7 +36,7 @@
#include <sys/mman.h>
#include <unistd.h>
#include <simbricks/proto/base.h>
#include <simbricks/base/proto.h>
#include "dist/common/utils.h"
......@@ -55,7 +55,7 @@ struct Peer *peers = NULL;
static int epfd = -1;
int NetInit(const char *shm_path_, size_t shm_size_, int epfd_) {
int BaseInit(const char *shm_path_, size_t shm_size_, int epfd_) {
shm_size = shm_size_;
if ((shm_fd = ShmCreate(shm_path_, shm_size_, &shm_base)) < 0)
return 1;
......@@ -79,7 +79,7 @@ static int ShmAlloc(size_t size, uint64_t *off) {
return 0;
}
bool NetPeerAdd(const char *path, bool dev) {
bool BasePeerAdd(const char *path, bool listener) {
struct Peer *peer = realloc(peers, sizeof(*peers) * (peer_num + 1));
if (!peer) {
perror("NetPeerAdd: realloc failed");
......@@ -94,7 +94,7 @@ bool NetPeerAdd(const char *path, bool dev) {
perror("NetPeerAdd: strdup failed");
return false;
}
peer->is_dev = dev;
peer->is_listener = listener;
peer->sock_fd = -1;
peer->shm_fd = -1;
peer->last_sent_pos = -1;
......@@ -102,14 +102,14 @@ bool NetPeerAdd(const char *path, bool dev) {
}
int NetListen() {
int BaseListen() {
#ifdef DEBUG
fprintf(stderr, "Creating net listening sockets\n");
fprintf(stderr, "Creating listening sockets\n");
#endif
for (size_t i = 0; i < peer_num; i++) {
struct Peer *peer = &peers[i];
if (peer->is_dev)
if (!peer->is_listener)
continue;
#ifdef DEBUG
......@@ -135,14 +135,14 @@ int NetListen() {
return 0;
}
int NetConnect() {
int BaseConnect() {
#ifdef DEBUG
fprintf(stderr, "Connecting to device sockets\n");
#endif
for (size_t i = 0; i < peer_num; i++) {
struct Peer *peer = &peers[i];
if (!peer->is_dev)
if (peer->is_listener)
continue;
#ifdef DEBUG
......@@ -163,59 +163,47 @@ int NetConnect() {
return 0;
}
int NetPeerSendDevIntro(struct Peer *peer) {
#ifdef DEBUG
fprintf(stderr, "PeerDevSendIntro(%s)\n", peer->sock_path);
#endif
struct SimbricksProtoNetDevIntro *di = &peer->dev_intro;
peer->local_base = (void *) ((uintptr_t) peer->shm_base + di->d2n_offset);
peer->local_elen = di->d2n_elen;
peer->local_enum = di->d2n_nentries;
peer->cleanup_base = (void *) ((uintptr_t) peer->shm_base + di->n2d_offset);
peer->cleanup_elen = di->n2d_elen;
peer->cleanup_enum = di->n2d_nentries;
struct SimbricksProtoNetNetIntro *ni = &peer->net_intro;
ssize_t ret = send(peer->sock_fd, ni, sizeof(*ni), 0);
if (ret < 0) {
perror("PeerDevSendIntro: send failed");
return 1;
} else if (ret != (ssize_t) sizeof(*ni)) {
fprintf(stderr, "PeerDevSendIntro: send incomplete\n");
return 1;
}
int BasePeerSetupQueues(struct Peer *peer) {
if (!peer->is_listener) {
/* only need to set up queues for listeners */
return 0;
}
}
int NetPeerSetupNetQueues(struct Peer *peer) {
struct SimbricksProtoNetDevIntro *di = &peer->dev_intro;
struct SimbricksProtoListenerIntro *li =
(struct SimbricksProtoListenerIntro *) peer->intro_remote;
#ifdef DEBUG
fprintf(stderr, "PeerNetSetupQueues(%s)\n", peer->sock_path);
fprintf(stderr, " d2n_el=%lu d2n_n=%lu n2d_el=%lu n2d_n=%lu\n", di->d2n_elen,
di->d2n_nentries, di->n2d_elen, di->n2d_nentries);
fprintf(stderr, " l2c_el=%lu l2c_n=%lu c2l_el=%lu c2l_n=%lu\n", li->l2c_elen,
li->l2c_nentries, li->c2l_elen, li->c2l_nentries);
#endif
if (ShmAlloc(di->d2n_elen * di->d2n_nentries, &di->d2n_offset)) {
fprintf(stderr, "PeerNetSetupQueues: ShmAlloc d2n failed");
if (ShmAlloc(li->l2c_elen * li->l2c_nentries, &li->l2c_offset)) {
fprintf(stderr, "PeerNetSetupQueues: ShmAlloc l2c failed");
return 1;
}
if (ShmAlloc(di->n2d_elen * di->n2d_nentries, &di->n2d_offset)) {
fprintf(stderr, "PeerNetSetupQueues: ShmAlloc n2d failed");
if (ShmAlloc(li->c2l_elen * li->c2l_nentries, &li->c2l_offset)) {
fprintf(stderr, "PeerNetSetupQueues: ShmAlloc c2l failed");
return 1;
}
peer->shm_fd = shm_fd;
peer->shm_base = shm_base;
peer->local_base = (void *) ((uintptr_t) shm_base + di->n2d_offset);
peer->local_elen = di->n2d_elen;
peer->local_enum = di->n2d_nentries;
peer->local_base = (void *) ((uintptr_t) shm_base + li->c2l_offset);
peer->local_elen = li->c2l_elen;
peer->local_enum = li->c2l_nentries;
peer->cleanup_base = (void *) ((uintptr_t) shm_base + li->l2c_offset);
peer->cleanup_elen = li->l2c_elen;
peer->cleanup_enum = li->l2c_nentries;
peer->cleanup_base = (void *) ((uintptr_t) shm_base + di->d2n_offset);
peer->cleanup_elen = di->d2n_elen;
peer->cleanup_enum = di->d2n_nentries;
return 0;
}
int BasePeerSendIntro(struct Peer *peer) {
#ifdef DEBUG
fprintf(stderr, "PeerDevSendIntro(%s)\n", peer->sock_path);
#endif
if (peer->sock_fd == -1) {
/* We can receive the welcome message from our peer before our local
......@@ -225,18 +213,19 @@ int NetPeerSetupNetQueues(struct Peer *peer) {
fprintf(stderr, "PeerNetSetupQueues: socket not ready yet, delaying "
"send\n");
#endif
return 0;
return 1;
}
if (UxsocketSendFd(peer->sock_fd, di, sizeof(*di), peer->shm_fd)) {
fprintf(stderr, "PeerNetSetupQueues: sending welcome message failed (%lu)",
peer - peers);
int shm_fd = (peer->is_listener ? peer->shm_fd : -1);
if (UxsocketSendFd(peer->sock_fd, peer->intro_remote, peer->intro_remote_len,
shm_fd)) {
perror("BasePeerSendIntro: send failed");
return 1;
}
return 0;
}
int NetPeerReport(struct Peer *peer, uint32_t written_pos, uint32_t clean_pos) {
int BasePeerReport(struct Peer *peer, uint32_t written_pos, uint32_t clean_pos) {
uint32_t pos = peer->local_pos_cleaned;
if (written_pos == peer->cleanup_pos_last &&
clean_pos == pos)
......@@ -280,13 +269,10 @@ int NetPeerReport(struct Peer *peer, uint32_t written_pos, uint32_t clean_pos) {
peer->cleanup_pos_last = written_pos;
while (pos != clean_pos) {
void *entry = (peer->local_base + pos * peer->local_elen);
if (peer->is_dev) {
struct SimbricksProtoNetD2NDummy *d2n = entry;
d2n->own_type = SIMBRICKS_PROTO_NET_D2N_OWN_DEV;
} else {
struct SimbricksProtoNetN2DDummy *n2d = entry;
n2d->own_type = SIMBRICKS_PROTO_NET_N2D_OWN_NET;
}
volatile union SimbricksProtoBaseMsg *msg =
(volatile union SimbricksProtoBaseMsg *) entry;
msg->header.own_type = (msg->header.own_type &
(~SIMBRICKS_PROTO_MSG_OWN_MASK)) | SIMBRICKS_PROTO_MSG_OWN_PRO;
pos += 1;
if (pos >= peer->local_enum)
......@@ -301,7 +287,7 @@ static int PeerAcceptEvent(struct Peer *peer) {
#ifdef DEBUG
fprintf(stderr, "PeerAcceptEvent(%s)\n", peer->sock_path);
#endif
assert(!peer->is_dev);
assert(peer->is_listener);
if ((peer->sock_fd = accept(peer->listen_fd, NULL, NULL)) < 0) {
perror("PeersInitNets: accept failed");
......@@ -329,17 +315,16 @@ static int PeerAcceptEvent(struct Peer *peer) {
fprintf(stderr, "PeerAcceptEvent(%s): sending welcome message\n",
peer->sock_path);
#endif
if (UxsocketSendFd(peer->sock_fd, &peer->dev_intro, sizeof(peer->dev_intro),
peer->shm_fd)) {
fprintf(stderr, "PeerAcceptEvent: sending welcome message failed (%lu)",
peer - peers);
if (BasePeerSendIntro(peer)) {
fprintf(stderr, "PeerAcceptEvent(%s): sending intro failed\n",
peer->sock_path);
return 1;
}
}
return 0;
}
int NetPeerEvent(struct Peer *peer, uint32_t events) {
int BasePeerEvent(struct Peer *peer, uint32_t events) {
#ifdef DEBUG
fprintf(stderr, "PeerEvent(%s)\n", peer->sock_path);
#endif
......@@ -353,7 +338,7 @@ int NetPeerEvent(struct Peer *peer, uint32_t events) {
}
// if peer is network and not yet connected, this is an accept event
if (!peer->is_dev && peer->sock_fd == -1) {
if (peer->is_listener && peer->sock_fd == -1) {
return PeerAcceptEvent(peer);
}
......@@ -365,29 +350,28 @@ int NetPeerEvent(struct Peer *peer, uint32_t events) {
}
// receive intro message
if (peer->is_dev) {
if (UxsocketRecvFd(peer->sock_fd, &peer->dev_intro, sizeof(peer->dev_intro),
&peer->shm_fd))
if (!peer->is_listener) {
/* not a listener, so we're expecting an fd for the shm region */
if (UxsocketRecvFd(peer->sock_fd, peer->intro_local,
sizeof(peer->intro_local), &peer->shm_fd))
return 1;
if (!(peer->shm_base = ShmMap(peer->shm_fd, &peer->shm_size)))
return 1;
} else {
ssize_t ret = recv(peer->sock_fd, &peer->net_intro, sizeof(peer->net_intro),
0);
if (ret < 0) {
/* as a listener, we use our local shm region, so no fd is sent to us */
ssize_t ret = recv(peer->sock_fd, peer->intro_local,
sizeof(peer->intro_local), 0);
if (ret <= 0) {
perror("PeerEvent: recv failed");
return 1;
} else if (ret != (ssize_t) sizeof(peer->net_intro)) {
fprintf(stderr, "PeerEvent: partial receive (%zd)\n", ret);
return 1;
}
}
peer->intro_valid_local = true;
// pass intro along
if (NetOpPassIntro(peer))
if (BaseOpPassIntro(peer))
return 1;
if (peer->intro_valid_remote) {
......@@ -415,22 +399,15 @@ static inline void PollPeerTransfer(struct Peer *peer, bool *report) {
}
void *entry = (peer->local_base + (peer->local_pos + n) * peer->local_elen);
bool ready;
if (peer->is_dev) {
struct SimbricksProtoNetD2NDummy *d2n = entry;
ready = (d2n->own_type & SIMBRICKS_PROTO_NET_D2N_OWN_MASK) ==
SIMBRICKS_PROTO_NET_D2N_OWN_NET;
} else {
struct SimbricksProtoNetN2DDummy *n2d = entry;
ready = (n2d->own_type & SIMBRICKS_PROTO_NET_N2D_OWN_MASK) ==
SIMBRICKS_PROTO_NET_N2D_OWN_DEV;
}
if (!ready)
volatile union SimbricksProtoBaseMsg *msg =
(volatile union SimbricksProtoBaseMsg *) entry;
if ((msg->header.own_type & SIMBRICKS_PROTO_MSG_OWN_MASK) !=
SIMBRICKS_PROTO_MSG_OWN_CON)
break;
}
if (n > 0) {
NetOpPassEntries(peer, peer->local_pos, n);
BaseOpPassEntries(peer, peer->local_pos, n);
uint32_t newpos = peer->local_pos + n;
peer->local_pos = (newpos < peer->local_enum ?
newpos :
......@@ -447,22 +424,15 @@ static inline void PollPeerCleanup(struct Peer *peer, bool *report) {
if (peer->cleanup_pos_next == peer->cleanup_pos_last)
return;
bool ready;
uint64_t cnt = 0;
do {
void *entry =
(peer->cleanup_base + peer->cleanup_pos_next * peer->cleanup_elen);
if (peer->is_dev) {
struct SimbricksProtoNetN2DDummy *n2d = entry;
ready = (n2d->own_type & SIMBRICKS_PROTO_NET_N2D_OWN_MASK) ==
SIMBRICKS_PROTO_NET_N2D_OWN_NET;
} else {
struct SimbricksProtoNetD2NDummy *d2n = entry;
ready = (d2n->own_type & SIMBRICKS_PROTO_NET_D2N_OWN_MASK) ==
SIMBRICKS_PROTO_NET_D2N_OWN_DEV;
}
volatile union SimbricksProtoBaseMsg *msg =
(volatile union SimbricksProtoBaseMsg *) entry;
if (!ready)
if ((msg->header.own_type & SIMBRICKS_PROTO_MSG_OWN_MASK) !=
SIMBRICKS_PROTO_MSG_OWN_PRO)
break;
#ifdef DEBUG
......@@ -483,7 +453,7 @@ static inline void PollPeerCleanup(struct Peer *peer, bool *report) {
}
}
void NetPoll() {
void BasePoll() {
bool report = false;
for (size_t i = 0; i < peer_num; i++) {
struct Peer *peer = &peers[i];
......@@ -495,10 +465,10 @@ void NetPoll() {
}
if (report)
NetOpPassReport();
BaseOpPassReport();
}
void NetEntryReceived(struct Peer *peer, uint32_t pos, void *data)
void BaseEntryReceived(struct Peer *peer, uint32_t pos, void *data)
{
// validate position for debugging:
if ((peer->cleanup_pos_reported <= peer->cleanup_pos_last &&
......@@ -514,30 +484,18 @@ void NetEntryReceived(struct Peer *peer, uint32_t pos, void *data)
uint64_t off = (uint64_t) pos * peer->cleanup_elen;
void *entry = peer->cleanup_base + off;
volatile union SimbricksProtoBaseMsg *msg =
(volatile union SimbricksProtoBaseMsg *) entry;
if (peer->is_dev) {
volatile struct SimbricksProtoNetD2NDummy *d2n = entry;
// first copy data after header
memcpy((void *) (d2n + 1), (uint8_t *) data + sizeof(*d2n),
peer->cleanup_elen - sizeof(*d2n));
// then copy header except for last byte
memcpy((void *) d2n, data, sizeof(*d2n) - 1);
// WMB()
// now copy last byte
volatile struct SimbricksProtoNetD2NDummy *src_d2n = data;
asm volatile("sfence" ::: "memory");
d2n->own_type = src_d2n->own_type;
} else {
volatile struct SimbricksProtoNetN2DDummy *n2d = entry;
// first copy data after header
memcpy((void *) (n2d + 1), (uint8_t *) data + sizeof(*n2d),
peer->cleanup_elen - sizeof(*n2d));
memcpy((void *) (msg + 1), (uint8_t *) data + sizeof(*msg),
peer->cleanup_elen - sizeof(*msg));
// then copy header except for last byte
memcpy((void *) n2d, data, sizeof(*n2d) - 1);
memcpy((void *) msg, data, sizeof(*msg) - 1);
// WMB()
// now copy last byte
volatile struct SimbricksProtoNetN2DDummy *src_n2d = data;
volatile union SimbricksProtoBaseMsg *src_msg =
(volatile union SimbricksProtoBaseMsg *) data;
asm volatile("sfence" ::: "memory");
n2d->own_type = src_n2d->own_type;
}
msg->header.own_type = src_msg->header.own_type;
}
\ No newline at end of file
......@@ -22,18 +22,16 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef DIST_COMMON_NET_H_
#define DIST_COMMON_NET_H_
#ifndef DIST_COMMON_BASE_H_
#define DIST_COMMON_BASE_H_
#include <arpa/inet.h>
#include <stdbool.h>
#include <stddef.h>
#include <simbricks/proto/network.h>
struct Peer {
/* base address of the local queue we're polling.
(d2n or n2d depending on is_dev). */
/* base address of the local queue we're polling. */
uint8_t *local_base;
uint32_t local_elen;
uint32_t local_enum;
......@@ -66,9 +64,6 @@ struct Peer {
// last cleanup position reported to peer
uint32_t cleanup_pos_reported;
struct SimbricksProtoNetDevIntro dev_intro __attribute__ ((aligned (8)));
struct SimbricksProtoNetNetIntro net_intro __attribute__ ((aligned (8)));
const char *sock_path;
// opaque value, e.g. to be used by rdma proxy for memory region
......@@ -80,35 +75,43 @@ struct Peer {
int sock_fd;
int shm_fd;
// is our local peer a device? (otherwise it's a network)
bool is_dev;
// is our local peer a listener?
bool is_listener;
bool intro_valid_local;
bool intro_valid_remote;
// set true when the queue is ready for polling
volatile bool ready;
/* intro received from our local peer */
bool intro_valid_local;
uint8_t intro_local[2048];
size_t intro_local_len;
/* intro received through proxy channel */
bool intro_valid_remote;
uint8_t intro_remote[2048];
size_t intro_remote_len;
};
extern void *shm_base;
extern size_t peer_num;
extern struct Peer *peers;
int NetInit(const char *shm_path_, size_t shm_size_, int epfd_);
bool NetPeerAdd(const char *path, bool dev);
struct Peer *NetPeerLookup(uint32_t id);
int NetListen(void);
int NetConnect(void);
void NetPoll(void);
int NetPeerSendDevIntro(struct Peer *peer);
int NetPeerSetupNetQueues(struct Peer *peer);
int NetPeerReport(struct Peer *peer, uint32_t written_pos, uint32_t clean_pos);
int NetPeerEvent(struct Peer *peer, uint32_t events);
void NetEntryReceived(struct Peer *peer, uint32_t pos, void *data);
int BaseInit(const char *shm_path_, size_t shm_size_, int epfd_);
bool BasePeerAdd(const char *path, bool listener);
struct Peer *BasePeerLookup(uint32_t id);
int BaseListen(void);
int BaseConnect(void);
void BasePoll(void);
int BasePeerSetupQueues(struct Peer *peer);
int BasePeerSendIntro(struct Peer *peer);
int BasePeerReport(struct Peer *peer, uint32_t written_pos, uint32_t clean_pos);
int BasePeerEvent(struct Peer *peer, uint32_t events);
void BaseEntryReceived(struct Peer *peer, uint32_t pos, void *data);
// To be implemented in proxy implementation
int NetOpPassIntro(struct Peer *peer);
int NetOpPassEntries(struct Peer *peer, uint32_t pos, uint32_t n);
int NetOpPassReport();
int BaseOpPassIntro(struct Peer *peer);
int BaseOpPassEntries(struct Peer *peer, uint32_t pos, uint32_t n);
int BaseOpPassReport();
#endif // DIST_NET_RDMA_H_
#endif // DIST_COMMON_BASE_H_
......@@ -88,7 +88,7 @@ int UxsocketConnect(const char *path) {
return fd;
}
int UxsocketRecvFd(int fd, void *data, size_t len, int *pfd) {
ssize_t UxsocketRecvFd(int fd, void *data, size_t len, int *pfd) {
int *ppfd;
ssize_t ret;
struct cmsghdr *cmsg;
......@@ -110,7 +110,7 @@ int UxsocketRecvFd(int fd, void *data, size_t len, int *pfd) {
.msg_flags = 0,
};
if ((ret = recvmsg(fd, &msg, 0)) != (ssize_t) len) {
if ((ret = recvmsg(fd, &msg, 0)) <= 0) {
perror("recvmsg failed");
return -1;
}
......@@ -123,7 +123,7 @@ int UxsocketRecvFd(int fd, void *data, size_t len, int *pfd) {
}
*pfd = *ppfd;
return 0;
return ret;
}
int UxsocketSendFd(int connfd, void *data, size_t len, int fd) {
......
......@@ -26,10 +26,11 @@
#define DIST_UTILS_H_
#include <stddef.h>
#include <sys/types.h>
int UxsocketInit(const char *path);
int UxsocketConnect(const char *path);
int UxsocketRecvFd(int fd, void *data, size_t len, int *pfd);
ssize_t UxsocketRecvFd(int fd, void *data, size_t len, int *pfd);
int UxsocketSendFd(int connfd, void *data, size_t len, int fd);
int ShmCreate(const char *path, size_t size, void **addr);
......
......@@ -25,7 +25,7 @@ include mk/subdir_pre.mk
bin_net_rdma := $(d)rdma/net_rdma
bin_net_sockets := $(d)sockets/net_sockets
COMMON_OBJS := $(addprefix $(d)common/, net.o utils.o)
COMMON_OBJS := $(addprefix $(d)common/, base.o utils.o)
RDMA_OBJS := $(addprefix $(d)rdma/, net_rdma.o rdma.o rdma_cm.o rdma_ib.o)
SOCKETS_OBJS := $(addprefix $(d)sockets/, net_sockets.o)
......
......@@ -36,10 +36,10 @@
#include <sys/mman.h>
#include <unistd.h>
#include <simbricks/proto/base.h>
#include <simbricks/proto/network.h>
#include <simbricks/base/proto.h>
#include <simbricks/network/proto.h>
#include "dist/common/net.h"
#include "dist/common/base.h"
#include "dist/common/utils.h"
//#define SOCK_DEBUG
......@@ -49,6 +49,11 @@
#define TXBUF_SIZE (128 * 1024)
#define TXBUF_NUM 16
struct SockIntroMsg {
uint32_t payload_len;
uint8_t data[];
} __attribute__((packed));
struct SockReportMsg {
uint32_t written_pos[MAX_PEERS];
uint32_t clean_pos[MAX_PEERS];
......@@ -62,8 +67,7 @@ struct SockEntriesMsg {
} __attribute__((packed));
enum SockMsgType {
kMsgDev,
kMsgNet,
kMsgIntro,
kMsgReport,
kMsgEntries,
};
......@@ -74,8 +78,7 @@ struct SockMsg {
uint32_t msg_id;
uint32_t id;
union {
struct SimbricksProtoNetDevIntro dev_intro;
struct SimbricksProtoNetNetIntro net_intro;
struct SockIntroMsg intro;
struct SockReportMsg report;
struct SockEntriesMsg entries;
struct SockMsg *next_free;
......@@ -102,15 +105,15 @@ pthread_spinlock_t freelist_spin;
static void PrintUsage() {
fprintf(stderr,
"Usage: net_sockets [OPTIONS] IP PORT\n"
" -l: Listen instead of connecting\n"
" -d DEV-SOCKET: network socket of a device simulator\n"
" -n NET-SOCKET: network socket of a network simulator\n"
" -l: Listen instead of connecting on socket\n"
" -L LISTEN-SOCKET: listening socket for a simulator\n"
" -C CONN-SOCKET: connecting socket for a simulator\n"
" -s SHM-PATH: shared memory region path\n"
" -S SHM-SIZE: shared memory region size in MB (default 256)\n");
}
static int ParseArgs(int argc, char *argv[]) {
const char *opts = "ld:n:s:S:";
const char *opts = "lL:C:s:S:";
int c;
while ((c = getopt(argc, argv, opts)) != -1) {
......@@ -119,13 +122,13 @@ static int ParseArgs(int argc, char *argv[]) {
mode_listen = true;
break;
case 'd':
if (!NetPeerAdd(optarg, true))
case 'L':
if (!BasePeerAdd(optarg, true))
return 1;
break;
case 'n':
if (!NetPeerAdd(optarg, false))
case 'C':
if (!BasePeerAdd(optarg, false))
return 1;
break;
......@@ -289,41 +292,43 @@ static int SockConnect(struct sockaddr_in *addr) {
}
static int SockMsgRxIntro(struct SockMsg *msg) {
struct SockIntroMsg *intro_msg = &msg->intro;
if (msg->id >= peer_num) {
fprintf(stderr, "SockMsgRxIntro: invalid peer id in message (%u)\n",
msg->id);
abort();
}
if (msg->msg_len <
offsetof(struct SockMsg, intro.data) + intro_msg->payload_len) {
fprintf(stderr, "SockMsgRxIntro: message too short for payload len\n");
abort();
}
struct Peer *peer = peers + msg->id;
#ifdef SOCK_DEBUG
fprintf(stderr, "SockMsgRxIntro -> peer %s\n", peer->sock_path);
#endif
if (peer->is_dev != (msg->msg_type == kMsgNet)) {
fprintf(stderr, "SockMsgRxIntro: unexpetced message type (%u)\n",
msg->msg_type);
abort();
}
if (peer->intro_valid_remote) {
fprintf(stderr, "SockMsgRxIntro: received multiple messages (%u)\n",
msg->id);
abort();
}
if (intro_msg->payload_len > (uint32_t) sizeof(peer->intro_remote)) {
fprintf(stderr, "SockMsgRxIntro: Intro longer than buffer\n");
abort();
}
peer->intro_valid_remote = true;
if (peer->is_dev) {
peer->net_intro = msg->net_intro;
if (NetPeerSendDevIntro(peer))
return 1;
} else {
peer->dev_intro = msg->dev_intro;
if (NetPeerSetupNetQueues(peer))
return 1;
if (peer->intro_valid_local && NetOpPassIntro(peer))
return 1;
peer->intro_remote_len = intro_msg->payload_len;
memcpy(peer->intro_remote, intro_msg->data, intro_msg->payload_len);
if (BasePeerSetupQueues(peer)) {
fprintf(stderr, "SockMsgRxIntro(%s): queue setup failed\n",
peer->sock_path);
abort();
}
if (BasePeerSendIntro(peer))
return 1;
if (peer->intro_valid_local) {
fprintf(stderr, "SockMsgRxIntro(%s): marking peer as ready\n",
......@@ -345,7 +350,7 @@ static int SockMsgRxReport(struct SockMsg *msg) {
fprintf(stderr, "SockMsgRxReport: invalid ready peer number %zu\n", i);
abort();
}
NetPeerReport(&peers[i], msg->report.written_pos[i],
BasePeerReport(&peers[i], msg->report.written_pos[i],
msg->report.clean_pos[i]);
}
return 0;
......@@ -383,7 +388,7 @@ static int SockMsgRxEntries(struct SockMsg *msg) {
uint32_t i;
for (i = 0; i < entries->num_entries; i++)
NetEntryReceived(peer, entries->pos + i,
BaseEntryReceived(peer, entries->pos + i,
entries->data + (i * peer->cleanup_elen));
return 0;
}
......@@ -393,7 +398,7 @@ static int SockMsgRx(struct SockMsg *msg) {
fprintf(stderr, "SockMsgRx(mi=%u t=%u i=%u l=%u)\n", msg->msg_id,
msg->msg_type, msg->id, msg->msg_len);
#endif
if (msg->msg_type == kMsgDev || msg->msg_type == kMsgNet)
if (msg->msg_type == kMsgIntro)
return SockMsgRxIntro(msg);
else if (msg->msg_type == kMsgReport)
return SockMsgRxReport(msg);
......@@ -474,50 +479,40 @@ static int SockSend(struct SockMsg *msg) {
return 0;
}
int NetOpPassIntro(struct Peer *peer) {
int BaseOpPassIntro(struct Peer *peer) {
#ifdef SOCK_DEBUG
fprintf(stderr, "NetOpPassIntro(%s)\n", peer->sock_path);
fprintf(stderr, "BaseOpPassIntro(%s)\n", peer->sock_path);
#endif
if (!peer->is_dev && !peer->intro_valid_remote) {
fprintf(stderr,
"NetOpPassIntro: skipping because remote intro not received\n");
return 0;
}
struct SockMsg *msg = SockMsgAlloc();
if (!msg)
return 1;
msg->msg_len = sizeof(*msg);
msg->msg_len = offsetof(struct SockMsg, entries.data) + peer->intro_local_len;
msg->id = peer - peers;
if (peer->is_dev) {
msg->msg_type = kMsgDev;
msg->dev_intro = peer->dev_intro;
} else {
msg->msg_type = kMsgNet;
msg->net_intro = peer->net_intro;
}
msg->msg_type = kMsgIntro;
msg->intro.payload_len = peer->intro_local_len;
memcpy(msg->intro.data, peer->intro_local, peer->intro_local_len);
int ret = SockSend(msg);
SockMsgFree(msg);
return ret;
}
int NetOpPassEntries(struct Peer *peer, uint32_t pos, uint32_t n) {
int BaseOpPassEntries(struct Peer *peer, uint32_t pos, uint32_t n) {
#ifdef SOCK_DEBUG
fprintf(stderr, "NetOpPassEntires(%s, n=%zu, pos=%u)\n", peer->sock_path, n,
fprintf(stderr, "BaseOpPassEntries(%s, n=%zu, pos=%u)\n", peer->sock_path, n,
pos);
#endif
if (n * peer->local_elen > TXBUF_SIZE) {
fprintf(stderr,
"NetOpPassEntries: tx buffer too small (%u) for n (%u) entries\n",
"BaseOpPassEntries: tx buffer too small (%u) for n (%u) entries\n",
TXBUF_SIZE, n);
abort();
}
if ((peer->last_sent_pos + 1) % peer->local_enum != pos) {
fprintf(stderr, "NetOpPassEntries: entry sent repeatedly: p=%u n=%u\n",
fprintf(stderr, "BaseOpPassEntries: entry sent repeatedly: p=%u n=%u\n",
pos, n);
abort();
}
......@@ -552,12 +547,12 @@ int NetOpPassEntries(struct Peer *peer, uint32_t pos, uint32_t n) {
return ret;
}
int NetOpPassReport() {
int BaseOpPassReport() {
#ifdef SOCK_DEBUG
fprintf(stderr, "NetOpPassReport()\n");
fprintf(stderr, "BaseOpPassReport()\n");
#endif
if (peer_num > MAX_PEERS) {
fprintf(stderr, "NetOpPassReport: peer_num (%zu) larger than max (%u)\n",
fprintf(stderr, "BaseOpPassReport: peer_num (%zu) larger than max (%u)\n",
peer_num, MAX_PEERS);
abort();
}
......@@ -596,7 +591,7 @@ int NetOpPassReport() {
static void *PollThread(void *data) {
while (true)
NetPoll();
BasePoll();
return NULL;
}
......@@ -615,10 +610,9 @@ static int IOLoop() {
for (int i = 0; i < n; i++) {
struct Peer *peer = evs[i].data.ptr;
if (peer && NetPeerEvent(peer, evs[i].events))
if (peer && BasePeerEvent(peer, evs[i].events))
return 1;
else if (!peer && SockEvent(evs[i].events
))
else if (!peer && SockEvent(evs[i].events))
return 1;
}
......@@ -642,10 +636,10 @@ int main(int argc, char *argv[]) {
if (SockAllocInit())
return EXIT_FAILURE;
if (NetInit(shm_path, shm_size, epfd))
if (BaseInit(shm_path, shm_size, epfd))
return EXIT_FAILURE;
if (NetListen())
if (BaseListen())
return EXIT_FAILURE;
if (mode_listen) {
......@@ -658,7 +652,7 @@ int main(int argc, char *argv[]) {
printf("Socket connected\n");
fflush(stdout);
if (NetConnect())
if (BaseConnect())
return EXIT_FAILURE;
printf("Peers initialized\n");
fflush(stdout);
......
......@@ -28,14 +28,16 @@ doxygen_srcs := $(wildcard $(d)/*.h)
sphinx_outdir := $(d)_build
sphinx_srcs := $(wildcard $(d)/*.rst $(d)/*/*.rst)
documentation: $(doxygen_outdir) $(sphinx_outdir)
documentation: $(doxygen_outdir)/ready $(sphinx_outdir)/ready
.PHONY: documentation
$(doxygen_outdir): $(d)Doxyfile $(doxygen_srcs)
$(doxygen_outdir)/ready: $(d)Doxyfile $(doxygen_srcs)
cd $(base_dir). && doxygen doc/Doxyfile
touch $@
$(sphinx_outdir): $(d)conf.py $(sphinx_srcs)
$(sphinx_outdir)/ready: $(d)conf.py $(sphinx_srcs)
cd $(base_dir). && sphinx-build doc/ doc/_build
touch $@
CLEAN := $(doxygen_outdir) $(sphinx_outdir)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment