Commit c398b350 authored by Jialin Li
Browse files

tofino: new tofino adapter with proper model timing

parent 34b17f08
......@@ -22,75 +22,164 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <arpa/inet.h>
#include <fcntl.h>
#include <linux/if_packet.h>
#include <net/ethernet.h>
#include <net/if.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>
#include <chrono>
#include <climits>
#include <csignal>
#include <cstdio>
#include <cstring>
#include <climits>
#include <cassert>
#include <cstdlib>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <arpa/inet.h>
#include <net/if.h>
#include <net/ethernet.h>
#include <linux/if_packet.h>
#include <cstring>
#include <fstream>
#include <iostream>
#include <set>
#include <string>
#include <vector>
extern "C" {
#include <simbricks/netif/netif.h>
#include <simbricks/proto/base.h>
};
#include <utils/json.hpp>
//#define DEBUG
using json = nlohmann::json;
typedef long long int ts_t;
static uint64_t sync_period = (500 * 1000ULL);
static uint64_t eth_latency = (500 * 1000ULL); // 500ns
static uint64_t cur_ts = 0;
static const int log_wait_limit_ms = 10; // 10ms
static ts_t sync_period = (500 * 1000ULL);
static ts_t eth_latency = (500 * 1000ULL); // 500ns
static int sync_mode = SIMBRICKS_PROTO_SYNC_SIMBRICKS;
static ts_t cur_ts = 0;
// Shutdown flag: set by the SIGINT/SIGTERM handler and polled by the main
// loop. It is declared volatile std::sig_atomic_t because that is the type
// the language guarantees can be safely written from an asynchronous signal
// handler and then observed by the interrupted code; a plain int is not.
static volatile std::sig_atomic_t exiting = 0;
static std::vector<struct SimbricksNetIf> nsifs;
static std::vector<int> tofino_fds;
static std::ifstream log_ifs;
static std::string log_line;
static const int flush_msg_sz = 14;
static char flush_msg[flush_msg_sz] = {0x0};
// Signal handler installed for SIGINT and SIGTERM: raises the global
// `exiting` flag so the polling loops terminate. The signal number itself is
// not used.
static void sigint_handler(int dummy) {
  (void)dummy;
  exiting = 1;
}
static void dev_to_switch(struct SimbricksNetIf *nsif, size_t port) {
volatile union SimbricksProtoNetD2N *msg_from =
SimbricksNetIfD2NPoll(nsif, cur_ts);
if (msg_from == nullptr) {
return;
// One packet reported on a Tofino egress port, as extracted from the model's
// JSON log. Initialised positionally as {port, latency}, so field order
// matters.
struct packet_info {
// Egress port number taken from the log record's packet.port field.
unsigned int port;
// Egress time minus ingress time, both derived from the log's sim_time
// field after dividing by 100000000.
ts_t latency;
};
// A unit of deferred work kept in the event queue until the adapter's current
// time reaches `time`.
struct event {
// Timestamp at which the event becomes due (compared against cur_ts).
ts_t time;
// true: `msg` is sent to the Tofino model on `port`.
// false: a packet is read from the Tofino socket for `port` and forwarded to
// the peer attached to that port.
bool to_switch;
// Index into both the peer interface list and the Tofino socket list.
unsigned int port;
// Raw packet bytes; only filled in for to_switch events.
std::string msg;
};
// Strict weak ordering for events: earlier `time` sorts first. Only the
// timestamp takes part in the comparison, so two events carrying the same
// time compare as equivalent regardless of port or direction.
struct classcomp {
bool operator()(const struct event &lhs, const struct event &rhs) const {
return lhs.time < rhs.time;
}
uint8_t type = msg_from->dummy.own_type & SIMBRICKS_PROTO_NET_D2N_MSG_MASK;
if (type == SIMBRICKS_PROTO_NET_D2N_MSG_SEND) {
volatile struct SimbricksProtoNetD2NSend *tx;
tx = &msg_from->send;
if (send(tofino_fds.at(port), (const void *)tx->data, tx->len, 0) < tx->len) {
fprintf(stderr, "tofino: failed to forward packet to switch\n");
abort();
};
// Pending events, ordered by due time. This must be a multiset: classcomp
// compares timestamps only, so in a std::set a second event with the same
// timestamp is considered a duplicate and insert() silently discards it,
// which loses that packet. A multiset keeps every event and preserves
// insertion order among events with equal timestamps.
std::multiset<struct event, classcomp> event_queue;
// Reads the next complete line of the Tofino log into the global `log_line`.
//
// A read that stops before a newline (end of file reached, or the local
// buffer filled up) still appends whatever was extracted to `log_line`; the
// stream state is then cleared and the read is retried, so a line is
// assembled across several attempts. Callers are expected to clear
// `log_line` once they have consumed a complete line.
//
// limit_ms: how long to keep retrying, in milliseconds. A negative value
//           retries indefinitely. At least one read is always attempted.
// Returns true when a full line (terminated by a newline) has been read, and
// false when the time budget expired first; in that case any partial data
// stays in `log_line` and is extended by the next call.
static bool get_tofino_log_line(int limit_ms) {
  // steady_clock is monotonic, so the timeout cannot be stretched or cut
  // short by wall-clock adjustments the way a system_clock interval can.
  using clock = std::chrono::steady_clock;
  using std::chrono::duration_cast;
  using std::chrono::milliseconds;

  char buf[16384];
  const clock::time_point start = clock::now();
  clock::time_point now;

  do {
    // Drop eof/fail bits left by the previous attempt so getline() reads
    // again instead of failing immediately.
    log_ifs.clear();
    // getline() always null-terminates buf, even when nothing was extracted.
    log_ifs.getline(buf, sizeof(buf));
    log_line.append(buf);
    now = clock::now();
  } while (!log_ifs.good() &&
           (limit_ms < 0 ||
            duration_cast<milliseconds>(now - start).count() < limit_ms));

  return log_ifs.good();
}
// Collects egress records from the Tofino log and appends them to `pkts`.
//
// Log lines are read one at a time, each waited for up to log_wait_limit_ms,
// and parsed as JSON. A record that has a "context" object with gress ==
// "egress" and component == "port" yields one packet_info: the port comes
// from packet.port and the latency is sim_time / 100000000 minus
// `ingress_ts`. Reading stops when no complete line arrives within the wait
// limit, or when a record whose "message" starts with "Ingress" is seen.
//
// ingress_ts: ingress timestamp of the packet being tracked, in the same
//             scaled units as the computed egress time.
// pkts:       output vector; entries are appended, existing ones are kept.
static void get_egress_pkts(ts_t ingress_ts,
std::vector<struct packet_info> &pkts) {
while (get_tofino_log_line(log_wait_limit_ms)) {
json j = json::parse(log_line);
log_line.clear();
if (j.contains("context")) {
auto context = j.at("context");
if (context.at("gress").get<std::string>().compare("egress") == 0 &&
context.at("component").get<std::string>().compare("port") == 0) {
unsigned int port = j.at("packet").at("port").get<unsigned int>();
ts_t latency = j.at("sim_time").get<ts_t>() / 100000000 - ingress_ts;
struct packet_info info = {port, latency};
pkts.push_back(info);
}
} else if (type == SIMBRICKS_PROTO_NET_D2N_MSG_SYNC) {
} else {
fprintf(stderr, "tofino: unsupported type=%u\n", type);
abort();
} else if (j.contains("message") &&
j.at("message").get<std::string>().compare(0, 7, "Ingress") ==
0) {
break;
}
SimbricksNetIfD2NDone(nsif, msg_from);
}
}
// Determines which egress ports the Tofino model emitted the most recently
// injected packet on, and with what latency, by following the model's log.
//
// The function first blocks (without a time limit) until it finds a log
// record whose "message" starts with "Ingress" and takes that record's
// sim_time / 100000000 as the ingress timestamp. It then gathers egress
// records, sends a dummy message to the model to make it flush its log, and
// gathers egress records once more.
//
// Returns one packet_info per egress record found.
static std::vector<struct packet_info> get_tofino_output() {
  std::vector<struct packet_info> pkts;

  // First, get packet ingress time
  ts_t ingress_ts = -1;
  while (ingress_ts < 0) {
    // With a negative limit this only returns once a full line is available.
    get_tofino_log_line(-1);
    json j = json::parse(log_line);
    log_line.clear();
    if (j.contains("message") &&
        j.at("message").get<std::string>().compare(0, 7, "Ingress") == 0) {
      ingress_ts = j.at("sim_time").get<ts_t>() / 100000000;
    }
  }

  // Next, get egress time for each port
  get_egress_pkts(ingress_ts, pkts);

  // Send a malformatted message to force log flushing. A failure here is not
  // fatal (the second pass below simply times out), but it is reported
  // instead of being silently ignored, since it can cause egress records to
  // be missed.
  if (send(tofino_fds.at(0), flush_msg, flush_msg_sz, 0) < 0) {
    perror("tofino: failed to send log flush message");
  }
  get_egress_pkts(ingress_ts, pkts);

  return pkts;
}
// Returns the smallest timestamp reported by SimbricksNetIfD2NTimestamp()
// across all peer interfaces.
//
// The minimum is tracked in a single pass; this runs in the polling loop, so
// it avoids building a temporary container on every call. With no peers
// there is nothing to wait for, so LLONG_MAX is returned rather than
// dereferencing an empty container.
static ts_t get_min_peer_time() {
  if (nsifs.empty()) {
    return LLONG_MAX;
  }

  uint64_t earliest = ULLONG_MAX;
  for (auto &nsif : nsifs) {
    const uint64_t peer_ts = SimbricksNetIfD2NTimestamp(&nsif);
    if (peer_ts < earliest) {
      earliest = peer_ts;
    }
  }
  return earliest;
}
static void switch_to_dev(size_t port) {
static void switch_to_dev(int port) {
static const int BUFFER_SIZE = 2048;
char buf[BUFFER_SIZE];
volatile union SimbricksProtoNetN2D *msg_to;
struct sockaddr_ll addr;
socklen_t addr_len;
ssize_t n;
ssize_t n = recvfrom(tofino_fds.at(port), buf, BUFFER_SIZE, 0,
(struct sockaddr*)&addr, &addr_len);
if (n <= 0 || addr.sll_pkttype == PACKET_OUTGOING) {
return;
#ifdef DEBUG
printf("forward packet to peer %u at time %llu\n", port, cur_ts);
#endif
while ((n = recvfrom(tofino_fds.at(port), buf, BUFFER_SIZE, 0,
(struct sockaddr *)&addr, &addr_len)) <= 0 ||
addr.sll_pkttype == PACKET_OUTGOING) {
;
}
msg_to = SimbricksNetIfN2DAlloc(&nsifs[port], cur_ts, eth_latency);
if (msg_to != NULL) {
if (msg_to != nullptr) {
volatile struct SimbricksProtoNetN2DRecv *rx;
rx = &msg_to->recv;
rx->len = n;
......@@ -105,17 +194,97 @@ static void switch_to_dev(size_t port) {
}
}
static void process_event(const struct event &e) {
if (e.to_switch) {
#ifdef DEBUG
printf("process to_switch event from peer %u at time %llu\n", e.port,
e.time);
#endif
if (send(tofino_fds.at(e.port), e.msg.data(), e.msg.length(), 0) <
(long int)e.msg.length()) {
fprintf(stderr, "tofino: failed to forward packet to switch\n");
abort();
}
for (const auto &pkt : get_tofino_output()) {
if (pkt.port < nsifs.size()) {
auto &nsif = nsifs.at(pkt.port);
if (nsif.sync) {
struct event de;
de.time = cur_ts + pkt.latency;
de.to_switch = false;
de.port = pkt.port;
event_queue.insert(de);
#ifdef DEBUG
printf("add to_dev event to peer %u at time %llu to queue\n", de.port,
de.time);
#endif
} else {
switch_to_dev(pkt.port);
}
}
}
} else {
switch_to_dev(e.port);
}
}
static void recv_from_peer(int port) {
struct SimbricksNetIf *nsif = &nsifs.at(port);
volatile union SimbricksProtoNetD2N *msg_from =
SimbricksNetIfD2NPoll(nsif, cur_ts);
if (msg_from == nullptr) {
return;
}
uint8_t type = msg_from->dummy.own_type & SIMBRICKS_PROTO_NET_D2N_MSG_MASK;
if (type == SIMBRICKS_PROTO_NET_D2N_MSG_SEND) {
struct event e;
e.time = msg_from->send.timestamp;
e.to_switch = true;
e.port = port;
e.msg = std::string((const char *)msg_from->send.data, msg_from->send.len);
#ifdef DEBUG
printf("received packet from peer %u at time %llu\n", port, e.time);
#endif
if (nsif->sync) {
event_queue.insert(e);
#ifdef DEBUG
printf("add to_switch event from peer %u at time %llu to queue\n", port,
e.time);
#endif
} else {
process_event(e);
}
} else if (type == SIMBRICKS_PROTO_NET_D2N_MSG_SYNC) {
} else {
fprintf(stderr, "tofino: unsupported type=%u\n", type);
abort();
}
SimbricksNetIfD2NDone(nsif, msg_from);
}
// Runs every queued event whose due time is not later than cur_ts, in queue
// order, and removes each one after it has run.
//
// process_event() may insert new events while the head is being processed.
// Insertion into the queue does not invalidate existing iterators, so the
// iterator of the event being processed is saved and that exact element is
// erased afterwards. Re-fetching begin() after processing could remove a
// freshly inserted event that sorts first, and leave the already processed
// one in the queue to run again.
static void process_event_queue() {
  while (!event_queue.empty()) {
    const auto head = event_queue.begin();
    if (head->time > cur_ts) {
      break;
    }
    process_event(*head);
    event_queue.erase(head);
  }
}
int main(int argc, char *argv[]) {
int c;
int bad_option = 0;
int sync_mode = SIMBRICKS_PROTO_SYNC_SIMBRICKS;
int sync = 1;
struct SimbricksNetIf nsif;
std::string tofino_log;
// Parse command line argument
while ((c = getopt(argc, argv, "s:S:E:m:")) != -1 && !bad_option) {
while ((c = getopt(argc, argv, "s:S:E:m:t:u")) != -1 && !bad_option) {
switch (c) {
case 's':
struct SimbricksNetIf nsif;
if (SimbricksNetIfInit(&nsif, optarg, &sync) != 0) {
fprintf(stderr, "connecting to %s failed\n", optarg);
return EXIT_FAILURE;
......@@ -137,6 +306,14 @@ int main(int argc, char *argv[]) {
sync_mode == SIMBRICKS_PROTO_SYNC_BARRIER);
break;
case 't':
tofino_log = std::string(optarg);
break;
case 'u':
sync = 0;
break;
default:
fprintf(stderr, "unknown option %c\n", c);
bad_option = 1;
......@@ -144,16 +321,29 @@ int main(int argc, char *argv[]) {
}
}
if (nsifs.empty() || bad_option) {
if (!sync) {
for (auto &nsif : nsifs) {
nsif.sync = 0;
}
}
if (nsifs.empty() || tofino_log.empty() || bad_option) {
fprintf(stderr,
"Usage: tofino [-S SYNC-PERIOD] [-E ETH-LATENCY] "
"-s SOCKET-A [-s SOCKET-B ...]\n");
"-t TOFINO-LOG-PATH -s SOCKET-A [-s SOCKET-B ...]\n");
return EXIT_FAILURE;
}
signal(SIGINT, sigint_handler);
signal(SIGTERM, sigint_handler);
// Open Tofino log file
log_ifs.open(tofino_log.c_str(), std::ifstream::in);
if (!log_ifs.good()) {
fprintf(stderr, "Failed to open tofino log file %s\n", tofino_log.c_str());
abort();
}
// Create sockets for Tofino model interfaces
for (size_t port = 0; port < nsifs.size(); port++) {
int fd = socket(PF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
......@@ -163,7 +353,7 @@ int main(int argc, char *argv[]) {
}
char ifname[16];
sprintf(ifname, "veth%ld", port*2+1);
sprintf(ifname, "veth%ld", port * 2 + 1);
struct ifreq ifopts;
memset(&ifopts, 0, sizeof(ifopts));
strcpy(ifopts.ifr_name, ifname);
......@@ -173,7 +363,8 @@ int main(int argc, char *argv[]) {
}
int sockopt = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &sockopt, sizeof(sockopt)) == -1) {
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &sockopt, sizeof(sockopt)) ==
-1) {
fprintf(stderr, "Failed to set socket option SO_REUSEADDR");
abort();
}
......@@ -196,7 +387,7 @@ int main(int argc, char *argv[]) {
tofino_fds.push_back(fd);
}
printf("start polling\n");
fprintf(stderr, "start polling\n");
while (!exiting) {
// Sync all interfaces
for (auto &nsif : nsifs) {
......@@ -209,24 +400,15 @@ int main(int argc, char *argv[]) {
SimbricksNetIfAdvanceEpoch(cur_ts, sync_period, sync_mode);
// Switch packets
uint64_t min_ts;
do {
min_ts = ULLONG_MAX;
for (size_t port = 0; port < nsifs.size(); port++) {
auto &nsif = nsifs.at(port);
dev_to_switch(&nsif, port);
if (nsif.sync) {
uint64_t ts = SimbricksNetIfD2NTimestamp(&nsif);
min_ts = ts < min_ts ? ts : min_ts;
ts_t min_ts = 0;
while (!exiting && min_ts <= cur_ts) {
for (int port = 0; port < (int)nsifs.size(); port++) {
recv_from_peer(port);
}
min_ts = get_min_peer_time();
process_event_queue();
}
for (size_t port = 0; port < nsifs.size(); port++) {
switch_to_dev(port);
}
} while (!exiting && (min_ts <= cur_ts));
// Update cur_ts
if (min_ts < ULLONG_MAX) {
if (min_ts > cur_ts) {
cur_ts = SimbricksNetIfAdvanceTime(min_ts, sync_period, sync_mode);
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment