"...composable_kernel_rocm.git" did not exist on "e8cddfdc3bc7fbdec765ee0bfbb391ef7173b455"
Commit 256fd7c0 authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

sims/net/switch: move communicating and syncing ports to class

Aim is to introduce different types of ports.
parent 4ee3ce69
...@@ -47,6 +47,7 @@ extern "C" { ...@@ -47,6 +47,7 @@ extern "C" {
static uint64_t sync_period = (500 * 1000ULL); // 500ns static uint64_t sync_period = (500 * 1000ULL); // 500ns
static uint64_t eth_latency = (500 * 1000ULL); // 500ns static uint64_t eth_latency = (500 * 1000ULL); // 500ns
static int sync_mode = SIMBRICKS_PROTO_SYNC_SIMBRICKS;
static pcap_dumper_t *dumpfile = nullptr; static pcap_dumper_t *dumpfile = nullptr;
#ifdef NETSWITCH_STAT #ifdef NETSWITCH_STAT
...@@ -66,9 +67,9 @@ static int stat_flag = 0; ...@@ -66,9 +67,9 @@ static int stat_flag = 0;
/* MAC address type */ /* MAC address type */
struct MAC { struct MAC {
const volatile uint8_t *data; const uint8_t *data;
explicit MAC(const volatile uint8_t *data) : data(data) { explicit MAC(const uint8_t *data) : data(data) {
} }
bool operator==(const MAC &other) const { bool operator==(const MAC &other) const {
...@@ -93,12 +94,106 @@ struct hash<MAC> { ...@@ -93,12 +94,106 @@ struct hash<MAC> {
}; };
} // namespace std } // namespace std
class Port {
protected:
struct SimbricksNetIf netif_;
volatile union SimbricksProtoNetD2N *rx_;
int sync_;
public:
Port() : rx_(nullptr), sync_(0) {
memset(&netif_, 0, sizeof(netif_));
}
Port(const Port &other) : netif_(other.netif_), rx_(other.rx_),
sync_(other.sync_) {}
virtual ~Port() = default;
virtual bool Connect(const char *path, int sync) {
sync_ = sync;
return SimbricksNetIfInit(&netif_, path, &sync_) == 0;
}
virtual bool IsSync() {
return sync_;
}
virtual void Sync(uint64_t cur_ts) {
if (SimbricksNetIfN2DSync(&netif_, cur_ts, eth_latency, sync_period,
sync_mode) != 0) {
fprintf(stderr, "SimbricksNetIfN2DSync failed\n");
abort();
}
}
virtual void AdvanceEpoch(uint64_t cur_ts) {
SimbricksNetIfAdvanceEpoch(cur_ts, sync_period, sync_mode);
}
virtual uint64_t NextTimestamp() {
return SimbricksNetIfD2NTimestamp(&netif_);
}
enum RxPollState {
kRxPollSuccess = 0,
kRxPollFail = 1,
kRxPollSync = 2,
};
virtual enum RxPollState RxPacket(
const void *& data, size_t &len, uint64_t cur_ts) {
assert(rx_ == nullptr);
rx_ = SimbricksNetIfD2NPoll(&netif_, cur_ts);
if (!rx_)
return kRxPollFail;
uint8_t type = rx_->dummy.own_type & SIMBRICKS_PROTO_NET_D2N_MSG_MASK;
if (type == SIMBRICKS_PROTO_NET_D2N_MSG_SEND) {
data = (const void *)rx_->send.data;
len = rx_->send.len;
return kRxPollSuccess;
} else if (type == SIMBRICKS_PROTO_NET_D2N_MSG_SYNC) {
return kRxPollSync;
} else {
fprintf(stderr, "switch_pkt: unsupported type=%u\n", type);
abort();
}
}
virtual void RxDone() {
assert(rx_ != nullptr);
SimbricksNetIfD2NDone(&netif_, rx_);
rx_ = nullptr;
}
virtual bool TxPacket(const void *data, size_t len, uint64_t cur_ts) {
volatile union SimbricksProtoNetN2D *msg_to =
SimbricksNetIfN2DAlloc(&netif_, cur_ts, eth_latency);
if (!msg_to)
return false;
volatile struct SimbricksProtoNetN2DRecv *rx;
rx = &msg_to->recv;
rx->len = len;
rx->port = 0;
memcpy((void *)rx->data, data, len);
// WMB();
rx->own_type =
SIMBRICKS_PROTO_NET_N2D_MSG_RECV | SIMBRICKS_PROTO_NET_N2D_OWN_DEV;
return true;
}
};
/* Global variables */ /* Global variables */
static uint64_t cur_ts = 0; static uint64_t cur_ts = 0;
static int exiting = 0; static int exiting = 0;
static const volatile uint8_t bcast[6] = {0xFF}; static const uint8_t bcast[6] = {0xFF};
static const MAC bcast_addr(bcast); static const MAC bcast_addr(bcast);
static std::vector<struct SimbricksNetIf> nsifs; static std::vector<Port> ports;
static std::unordered_map<MAC, int> mac_table; static std::unordered_map<MAC, int> mac_table;
static void sigint_handler(int dummy) { static void sigint_handler(int dummy) {
...@@ -115,28 +210,26 @@ static void sigusr2_handler(int dummy) { ...@@ -115,28 +210,26 @@ static void sigusr2_handler(int dummy) {
} }
#endif #endif
static void forward_pkt(volatile struct SimbricksProtoNetD2NSend *tx, static void forward_pkt(const void *pkt_data, size_t pkt_len, size_t port_id) {
size_t port) {
volatile union SimbricksProtoNetN2D *msg_to;
struct pcap_pkthdr ph; struct pcap_pkthdr ph;
Port &dest_port = ports[port_id];
// log to pcap file if initialized // log to pcap file if initialized
if (dumpfile) { if (dumpfile) {
memset(&ph, 0, sizeof(ph)); memset(&ph, 0, sizeof(ph));
ph.ts.tv_sec = cur_ts / 1000000000000ULL; ph.ts.tv_sec = cur_ts / 1000000000000ULL;
ph.ts.tv_usec = (cur_ts % 1000000000000ULL) / 1000ULL; ph.ts.tv_usec = (cur_ts % 1000000000000ULL) / 1000ULL;
ph.caplen = tx->len; ph.caplen = pkt_len;
ph.len = tx->len; ph.len = pkt_len;
pcap_dump((unsigned char *)dumpfile, &ph, (unsigned char *)tx->data); pcap_dump((unsigned char *)dumpfile, &ph, (unsigned char *)pkt_data);
} }
// print sending tick: [packet type] source_IP -> dest_IP len: // print sending tick: [packet type] source_IP -> dest_IP len:
#ifdef NETSWITCH_DEBUG #ifdef NETSWITCH_DEBUG
uint16_t eth_proto; uint16_t eth_proto;
struct ethhdr *hdr; struct ethhdr *hdr;
struct iphdr *iph; struct iphdr *iph;
hdr = (struct ethhdr*)tx->data; hdr = (struct ethhdr*)pkt_data;
eth_proto = ntohs(hdr->h_proto); eth_proto = ntohs(hdr->h_proto);
iph = (struct iphdr *)(hdr + 1); iph = (struct iphdr *)(hdr + 1);
fprintf(stderr, "%20lu: ", cur_ts); fprintf(stderr, "%20lu: ", cur_ts);
...@@ -154,26 +247,14 @@ static void forward_pkt(volatile struct SimbricksProtoNetD2NSend *tx, ...@@ -154,26 +247,14 @@ static void forward_pkt(volatile struct SimbricksProtoNetD2NSend *tx,
fprintf(stderr, "%8X -> %8X len: %lu\n ", iph->saddr, iph->daddr, iph->tot_len + sizeof(struct ethhdr)); fprintf(stderr, "%8X -> %8X len: %lu\n ", iph->saddr, iph->daddr, iph->tot_len + sizeof(struct ethhdr));
#endif #endif
if (!dest_port.TxPacket(pkt_data, pkt_len, cur_ts))
msg_to = SimbricksNetIfN2DAlloc(&nsifs[port], cur_ts, eth_latency); fprintf(stderr, "forward_pkt: dropping packet on port %zu\n", port_id);
if (msg_to != NULL) {
volatile struct SimbricksProtoNetN2DRecv *rx;
rx = &msg_to->recv;
rx->len = tx->len;
rx->port = 0;
memcpy((void *)rx->data, (void *)tx->data, tx->len);
// WMB();
rx->own_type =
SIMBRICKS_PROTO_NET_N2D_MSG_RECV | SIMBRICKS_PROTO_NET_N2D_OWN_DEV;
} else {
fprintf(stderr, "forward_pkt: dropping packet\n");
}
} }
static void switch_pkt(struct SimbricksNetIf *nsif, size_t iport) { static void switch_pkt(Port &port, size_t iport) {
volatile union SimbricksProtoNetD2N *msg_from = const void *pkt_data;
SimbricksNetIfD2NPoll(nsif, cur_ts); size_t pkt_len;
#ifdef NETSWITCH_STAT #ifdef NETSWITCH_STAT
d2n_poll_total += 1; d2n_poll_total += 1;
if (stat_flag){ if (stat_flag){
...@@ -181,7 +262,8 @@ static void switch_pkt(struct SimbricksNetIf *nsif, size_t iport) { ...@@ -181,7 +262,8 @@ static void switch_pkt(struct SimbricksNetIf *nsif, size_t iport) {
} }
#endif #endif
if (msg_from == NULL) { enum Port::RxPollState poll = port.RxPacket(pkt_data, pkt_len, cur_ts);
if (poll == Port::kRxPollFail) {
return; return;
} }
...@@ -192,12 +274,9 @@ static void switch_pkt(struct SimbricksNetIf *nsif, size_t iport) { ...@@ -192,12 +274,9 @@ static void switch_pkt(struct SimbricksNetIf *nsif, size_t iport) {
} }
#endif #endif
uint8_t type = msg_from->dummy.own_type & SIMBRICKS_PROTO_NET_D2N_MSG_MASK; if (poll == Port::kRxPollSuccess) {
if (type == SIMBRICKS_PROTO_NET_D2N_MSG_SEND) {
volatile struct SimbricksProtoNetD2NSend *tx;
tx = &msg_from->send;
// Get MAC addresses // Get MAC addresses
MAC dst(tx->data), src(tx->data + 6); MAC dst((const uint8_t *)pkt_data), src((const uint8_t *)pkt_data + 6);
// MAC learning // MAC learning
if (!(src == bcast_addr)) { if (!(src == bcast_addr)) {
mac_table[src] = iport; mac_table[src] = iport;
...@@ -205,17 +284,17 @@ static void switch_pkt(struct SimbricksNetIf *nsif, size_t iport) { ...@@ -205,17 +284,17 @@ static void switch_pkt(struct SimbricksNetIf *nsif, size_t iport) {
// L2 forwarding // L2 forwarding
if (mac_table.count(dst) > 0) { if (mac_table.count(dst) > 0) {
size_t eport = mac_table.at(dst); size_t eport = mac_table.at(dst);
forward_pkt(tx, eport); forward_pkt(pkt_data, pkt_len, eport);
} else { } else {
// Broadcast // Broadcast
for (size_t eport = 0; eport < nsifs.size(); eport++) { for (size_t eport = 0; eport < ports.size(); eport++) {
if (eport != iport) { if (eport != iport) {
// Do not forward to ingress port // Do not forward to ingress port
forward_pkt(tx, eport); forward_pkt(pkt_data, pkt_len, eport);
} }
} }
} }
} else if (type == SIMBRICKS_PROTO_NET_D2N_MSG_SYNC) { } else if (poll == Port::kRxPollSync) {
#ifdef NETSWITCH_STAT #ifdef NETSWITCH_STAT
d2n_poll_sync += 1; d2n_poll_sync += 1;
if (stat_flag){ if (stat_flag){
...@@ -223,31 +302,29 @@ static void switch_pkt(struct SimbricksNetIf *nsif, size_t iport) { ...@@ -223,31 +302,29 @@ static void switch_pkt(struct SimbricksNetIf *nsif, size_t iport) {
} }
#endif #endif
} else { } else {
fprintf(stderr, "switch_pkt: unsupported type=%u\n", type); fprintf(stderr, "switch_pkt: unsupported poll result=%u\n", poll);
abort(); abort();
} }
SimbricksNetIfD2NDone(nsif, msg_from); port.RxDone();
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
int c; int c;
int bad_option = 0; int bad_option = 0;
int sync_eth = 1; int sync_eth = 1;
int sync_mode = SIMBRICKS_PROTO_SYNC_SIMBRICKS;
pcap_t *pc = nullptr; pcap_t *pc = nullptr;
// Parse command line argument // Parse command line argument
while ((c = getopt(argc, argv, "s:uS:E:m:p:")) != -1 && !bad_option) { while ((c = getopt(argc, argv, "s:uS:E:m:p:")) != -1 && !bad_option) {
switch (c) { switch (c) {
case 's': { case 's': {
struct SimbricksNetIf nsif; Port port;
int sync = sync_eth;
fprintf(stderr, "Switch connecting to: %s\n", optarg); fprintf(stderr, "Switch connecting to: %s\n", optarg);
if (SimbricksNetIfInit(&nsif, optarg, &sync) != 0) { if (!port.Connect(optarg, sync_eth)) {
fprintf(stderr, "connecting to %s failed\n", optarg); fprintf(stderr, "connecting to %s failed\n", optarg);
return EXIT_FAILURE; return EXIT_FAILURE;
} }
nsifs.push_back(nsif); ports.push_back(port);
break; break;
} }
...@@ -287,7 +364,7 @@ int main(int argc, char *argv[]) { ...@@ -287,7 +364,7 @@ int main(int argc, char *argv[]) {
} }
} }
if (nsifs.empty() || bad_option) { if (ports.empty() || bad_option) {
fprintf(stderr, fprintf(stderr,
"Usage: net_switch [-S SYNC-PERIOD] [-E ETH-LATENCY] " "Usage: net_switch [-S SYNC-PERIOD] [-E ETH-LATENCY] "
"-s SOCKET-A [-s SOCKET-B ...]\n"); "-s SOCKET-A [-s SOCKET-B ...]\n");
...@@ -306,24 +383,20 @@ int main(int argc, char *argv[]) { ...@@ -306,24 +383,20 @@ int main(int argc, char *argv[]) {
printf("start polling\n"); printf("start polling\n");
while (!exiting) { while (!exiting) {
// Sync all interfaces // Sync all interfaces
for (auto &nsif : nsifs) { for (auto &port : ports)
if (SimbricksNetIfN2DSync(&nsif, cur_ts, eth_latency, sync_period, port.Sync(cur_ts);
sync_mode) != 0) { for (auto &port : ports)
fprintf(stderr, "SimbricksNetIfN2DSync failed\n"); port.AdvanceEpoch(cur_ts);
abort();
}
}
SimbricksNetIfAdvanceEpoch(cur_ts, sync_period, sync_mode);
// Switch packets // Switch packets
uint64_t min_ts; uint64_t min_ts;
do { do {
min_ts = ULLONG_MAX; min_ts = ULLONG_MAX;
for (size_t port = 0; port < nsifs.size(); port++) { for (size_t port_i = 0; port_i < ports.size(); port_i++) {
auto &nsif = nsifs.at(port); auto &port = ports.at(port_i);
switch_pkt(&nsif, port); switch_pkt(port, port_i);
if (nsif.sync) { if (port.IsSync()) {
uint64_t ts = SimbricksNetIfD2NTimestamp(&nsif); uint64_t ts = port.NextTimestamp();
min_ts = ts < min_ts ? ts : min_ts; min_ts = ts < min_ts ? ts : min_ts;
} }
} }
...@@ -331,6 +404,7 @@ int main(int argc, char *argv[]) { ...@@ -331,6 +404,7 @@ int main(int argc, char *argv[]) {
// Update cur_ts // Update cur_ts
if (min_ts < ULLONG_MAX) { if (min_ts < ULLONG_MAX) {
// a bit broken but should probably do
cur_ts = SimbricksNetIfAdvanceTime(min_ts, sync_period, sync_mode); cur_ts = SimbricksNetIfAdvanceTime(min_ts, sync_period, sync_mode);
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment