Commit bb82346e authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

sims/net/switch: parallel connection establishment

(uses SimBricksBaseIfEstablish)
parent 83d363ae
...@@ -95,70 +95,81 @@ struct hash<MAC> { ...@@ -95,70 +95,81 @@ struct hash<MAC> {
} // namespace std } // namespace std
/** Abstract base switch port */ /** Normal network switch port (conneting to a NIC) */
class Port { class NetPort {
public: public:
enum RxPollState { enum RxPollState {
kRxPollSuccess = 0, kRxPollSuccess = 0,
kRxPollFail = 1, kRxPollFail = 1,
kRxPollSync = 2, kRxPollSync = 2,
}; };
struct SimbricksNetIf netif_;
virtual ~Port() = default;
virtual bool Connect(const char *path, int sync) = 0;
virtual bool IsSync() = 0;
virtual void Sync(uint64_t cur_ts) = 0;
virtual uint64_t NextTimestamp() = 0;
virtual enum RxPollState RxPacket(
const void *& data, size_t &len, uint64_t cur_ts) = 0;
virtual void RxDone() = 0;
virtual bool TxPacket(const void *data, size_t len, uint64_t cur_ts) = 0;
};
/** Normal network switch port (conneting to a NIC) */
class NetPort : public Port {
protected: protected:
struct SimbricksNetIf netifObj_;
struct SimbricksNetIf *netif_;
volatile union SimbricksProtoNetMsg *rx_; volatile union SimbricksProtoNetMsg *rx_;
int sync_; int sync_;
const char *path_;
bool Init() {
struct SimbricksBaseIfParams params = netParams;
params.sync_mode = (sync_ ? kSimbricksBaseIfSyncOptional
: kSimbricksBaseIfSyncDisabled);
params.sock_path = path_;
params.blocking_conn = false;
if (SimbricksBaseIfInit(&netif_.base, &params)) {
perror("Init: SimbricksBaseIfInit failed");
return false;
}
return true;
}
public: public:
NetPort() : netif_(&netifObj_), rx_(nullptr), sync_(0) { NetPort(const char *path, int sync) : rx_(nullptr), sync_(sync), path_(path) {
memset(&netifObj_, 0, sizeof(netifObj_)); memset(&netif_, 0, sizeof(netif_));
} }
NetPort(const NetPort &other) : netifObj_(other.netifObj_), NetPort(const NetPort &other) : netif_(other.netif_),
netif_(&netifObj_), rx_(other.rx_), sync_(other.sync_) {} rx_(other.rx_), sync_(other.sync_), path_(other.path_) {}
virtual bool Prepare() {
if (!Init())
return false;
if (SimbricksBaseIfConnect(&netif_.base)) {
perror("Prepare: SimbricksBaseIfConnect failed");
return false;
}
return true;
}
virtual bool Connect(const char *path, int sync) override { virtual void Prepared() {
sync_ = sync; sync_ = SimbricksBaseIfSyncEnabled(&netif_.base);
return SimbricksNetIfInit(netif_, &netParams, path, &sync_) == 0;
} }
virtual bool IsSync() override { bool IsSync() {
return sync_; return sync_;
} }
virtual void Sync(uint64_t cur_ts) override { void Sync(uint64_t cur_ts) {
while (SimbricksNetIfOutSync(netif_, cur_ts)); while (SimbricksNetIfOutSync(&netif_, cur_ts));
} }
virtual uint64_t NextTimestamp() override { uint64_t NextTimestamp() {
return SimbricksNetIfInTimestamp(netif_); return SimbricksNetIfInTimestamp(&netif_);
} }
virtual enum RxPollState RxPacket( enum RxPollState RxPacket(
const void *& data, size_t &len, uint64_t cur_ts) override { const void *& data, size_t &len, uint64_t cur_ts) {
assert(rx_ == nullptr); assert(rx_ == nullptr);
rx_ = SimbricksNetIfInPoll(netif_, cur_ts); rx_ = SimbricksNetIfInPoll(&netif_, cur_ts);
if (!rx_) if (!rx_)
return kRxPollFail; return kRxPollFail;
uint8_t type = SimbricksNetIfInType(netif_, rx_); uint8_t type = SimbricksNetIfInType(&netif_, rx_);
if (type == SIMBRICKS_PROTO_NET_MSG_PACKET) { if (type == SIMBRICKS_PROTO_NET_MSG_PACKET) {
data = (const void *)rx_->packet.data; data = (const void *)rx_->packet.data;
len = rx_->packet.len; len = rx_->packet.len;
...@@ -171,22 +182,22 @@ class NetPort : public Port { ...@@ -171,22 +182,22 @@ class NetPort : public Port {
} }
} }
virtual void RxDone() override { void RxDone() {
assert(rx_ != nullptr); assert(rx_ != nullptr);
SimbricksNetIfInDone(netif_, rx_); SimbricksNetIfInDone(&netif_, rx_);
rx_ = nullptr; rx_ = nullptr;
} }
virtual bool TxPacket( bool TxPacket(
const void *data, size_t len, uint64_t cur_ts) override { const void *data, size_t len, uint64_t cur_ts) {
volatile union SimbricksProtoNetMsg *msg_to = volatile union SimbricksProtoNetMsg *msg_to =
SimbricksNetIfOutAlloc(netif_, cur_ts); SimbricksNetIfOutAlloc(&netif_, cur_ts);
if (!msg_to && !sync_) { if (!msg_to && !sync_) {
return false; return false;
} else if (!msg_to && sync_) { } else if (!msg_to && sync_) {
while (!msg_to) while (!msg_to)
msg_to = SimbricksNetIfOutAlloc(netif_, cur_ts); msg_to = SimbricksNetIfOutAlloc(&netif_, cur_ts);
} }
volatile struct SimbricksProtoNetMsgPacket *rx; volatile struct SimbricksProtoNetMsgPacket *rx;
rx = &msg_to->packet; rx = &msg_to->packet;
...@@ -194,53 +205,82 @@ class NetPort : public Port { ...@@ -194,53 +205,82 @@ class NetPort : public Port {
rx->port = 0; rx->port = 0;
memcpy((void *)rx->data, data, len); memcpy((void *)rx->data, data, len);
SimbricksNetIfOutSend(netif_, msg_to, SIMBRICKS_PROTO_NET_MSG_PACKET); SimbricksNetIfOutSend(&netif_, msg_to, SIMBRICKS_PROTO_NET_MSG_PACKET);
return true; return true;
} }
}; };
/** Hosting network switch port (connected to another network) */ /** Listening switch port (connected to by another network) */
class NetHostPort : public NetPort { class NetListenPort : public NetPort {
protected: protected:
struct SimbricksNicIf nicif_; struct SimbricksBaseIfSHMPool pool_;
public: public:
NetHostPort() { NetListenPort(const char *path, int sync) : NetPort(path, sync) {
netif_ = &nicif_.net; memset(&pool_, 0, sizeof(pool_));
memset(&nicif_, 0, sizeof(nicif_));
} }
NetHostPort(const NetHostPort &other) : NetPort(other), nicif_(other.nicif_) { NetListenPort(const NetListenPort &other) : NetPort(other),
netif_ = &nicif_.net; pool_(other.pool_) {
} }
virtual bool Connect(const char *path, int sync) override { virtual bool Prepare() override {
sync_ = sync; if (!Init())
std::string shm_path = path; return false;
std::string shm_path = path_;
shm_path += "-shm"; shm_path += "-shm";
struct SimbricksBaseIfParams params = netParams;
params.sock_path = path;
if (!sync)
params.sync_mode = kSimbricksBaseIfSyncDisabled;
int ret = SimbricksNicIfInit(&nicif_, shm_path.c_str(), &params, nullptr,
nullptr);
sync_ = SimbricksBaseIfSyncEnabled(&netif_->base);
return ret == 0;
}
virtual bool IsSync() override { if (SimbricksBaseIfSHMPoolCreate(&pool_, shm_path.c_str(),
return sync_; SimbricksBaseIfSHMSize(&netif_.base.params)) != 0) {
perror("Prepare: SimbricksBaseIfSHMPoolCreate failed");
return false;
}
if (SimbricksBaseIfListen(&netif_.base, &pool_) != 0) {
perror("Prepare: SimbricksBaseIfListen failed");
return false;
}
return true;
} }
}; };
static bool ConnectAll(std::vector<NetPort *> ports)
{
size_t n = ports.size();
struct SimBricksBaseIfEstablishData ests[n];
struct SimbricksProtoNetIntro intro;
printf("start connecting...\n");
for (size_t i = 0; i < n; i++) {
NetPort *p = ports[i];
ests[i].base_if = &p->netif_.base;
ests[i].tx_intro = &intro;
ests[i].tx_intro_len = sizeof(intro);
ests[i].rx_intro = &intro;
ests[i].rx_intro_len = sizeof(intro);
if (!p->Prepare())
return false;
}
if (SimBricksBaseIfEstablish(ests, n)) {
fprintf(stderr, "ConnectAll: SimBricksBaseIfEstablish failed\n");
return false;
}
printf("done connecting\n");
return true;
}
/* Global variables */ /* Global variables */
static uint64_t cur_ts = 0; static uint64_t cur_ts = 0;
static int exiting = 0; static int exiting = 0;
static const uint8_t bcast[6] = {0xFF}; static const uint8_t bcast[6] = {0xFF};
static const MAC bcast_addr(bcast); static const MAC bcast_addr(bcast);
static std::vector<Port *> ports; static std::vector<NetPort *> ports;
static std::unordered_map<MAC, int> mac_table; static std::unordered_map<MAC, int> mac_table;
static void sigint_handler(int dummy) { static void sigint_handler(int dummy) {
...@@ -260,7 +300,7 @@ static void sigusr2_handler(int dummy) { ...@@ -260,7 +300,7 @@ static void sigusr2_handler(int dummy) {
static void forward_pkt(const void *pkt_data, size_t pkt_len, size_t port_id, static void forward_pkt(const void *pkt_data, size_t pkt_len, size_t port_id,
size_t iport_id) { size_t iport_id) {
struct pcap_pkthdr ph; struct pcap_pkthdr ph;
Port &dest_port = *ports[port_id]; NetPort &dest_port = *ports[port_id];
// log to pcap file if initialized // log to pcap file if initialized
if (dumpfile) { if (dumpfile) {
...@@ -303,7 +343,7 @@ static void forward_pkt(const void *pkt_data, size_t pkt_len, size_t port_id, ...@@ -303,7 +343,7 @@ static void forward_pkt(const void *pkt_data, size_t pkt_len, size_t port_id,
fprintf(stderr, "forward_pkt: dropping packet on port %zu\n", port_id); fprintf(stderr, "forward_pkt: dropping packet on port %zu\n", port_id);
} }
static void switch_pkt(Port &port, size_t iport) { static void switch_pkt(NetPort &port, size_t iport) {
const void *pkt_data; const void *pkt_data;
size_t pkt_len; size_t pkt_len;
...@@ -314,8 +354,8 @@ static void switch_pkt(Port &port, size_t iport) { ...@@ -314,8 +354,8 @@ static void switch_pkt(Port &port, size_t iport) {
} }
#endif #endif
enum Port::RxPollState poll = port.RxPacket(pkt_data, pkt_len, cur_ts); enum NetPort::RxPollState poll = port.RxPacket(pkt_data, pkt_len, cur_ts);
if (poll == Port::kRxPollFail) { if (poll == NetPort::kRxPollFail) {
return; return;
} }
...@@ -326,7 +366,7 @@ static void switch_pkt(Port &port, size_t iport) { ...@@ -326,7 +366,7 @@ static void switch_pkt(Port &port, size_t iport) {
} }
#endif #endif
if (poll == Port::kRxPollSuccess) { if (poll == NetPort::kRxPollSuccess) {
// Get MAC addresses // Get MAC addresses
MAC dst((const uint8_t *)pkt_data), src((const uint8_t *)pkt_data + 6); MAC dst((const uint8_t *)pkt_data), src((const uint8_t *)pkt_data + 6);
// MAC learning // MAC learning
...@@ -348,7 +388,7 @@ static void switch_pkt(Port &port, size_t iport) { ...@@ -348,7 +388,7 @@ static void switch_pkt(Port &port, size_t iport) {
} }
} }
} }
} else if (poll == Port::kRxPollSync) { } else if (poll == NetPort::kRxPollSync) {
#ifdef NETSWITCH_STAT #ifdef NETSWITCH_STAT
d2n_poll_sync += 1; d2n_poll_sync += 1;
if (stat_flag){ if (stat_flag){
...@@ -374,23 +414,15 @@ int main(int argc, char *argv[]) { ...@@ -374,23 +414,15 @@ int main(int argc, char *argv[]) {
while ((c = getopt(argc, argv, "s:h:uS:E:p:")) != -1 && !bad_option) { while ((c = getopt(argc, argv, "s:h:uS:E:p:")) != -1 && !bad_option) {
switch (c) { switch (c) {
case 's': { case 's': {
NetPort *port = new NetPort; NetPort *port = new NetPort(optarg, sync_eth);
fprintf(stderr, "Switch connecting to: %s\n", optarg); fprintf(stderr, "Switch connecting to: %s\n", optarg);
if (!port->Connect(optarg, sync_eth)) {
fprintf(stderr, "connecting to %s failed\n", optarg);
return EXIT_FAILURE;
}
ports.push_back(port); ports.push_back(port);
break; break;
} }
case 'h': { case 'h': {
NetHostPort *port = new NetHostPort; NetListenPort *port = new NetListenPort(optarg, sync_eth);
fprintf(stderr, "Switch listening on: %s\n", optarg); fprintf(stderr, "Switch listening on: %s\n", optarg);
if (!port->Connect(optarg, sync_eth)) {
fprintf(stderr, "listening on %s failed\n", optarg);
return EXIT_FAILURE;
}
ports.push_back(port); ports.push_back(port);
break; break;
} }
...@@ -440,6 +472,8 @@ int main(int argc, char *argv[]) { ...@@ -440,6 +472,8 @@ int main(int argc, char *argv[]) {
signal(SIGUSR2, sigusr2_handler); signal(SIGUSR2, sigusr2_handler);
#endif #endif
if (!ConnectAll(ports))
return EXIT_FAILURE;
printf("start polling\n"); printf("start polling\n");
while (!exiting) { while (!exiting) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment