"...resnet50_tensorflow.git" did not exist on "a59bd4b53ada189c6adc888cd94e897b5d467fde"
Commit 391e5cb8 authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

sims/net/switch: new API refactor

parent eb152aa7
......@@ -37,18 +37,16 @@
#include <unordered_map>
#include <vector>
#include <simbricks/base/cxxatomicfix.h>
extern "C" {
#include <simbricks/netif/netif.h>
#include <simbricks/network/if.h>
#include <simbricks/nicif/nicif.h>
#include <simbricks/proto/base.h>
};
//#define NETSWITCH_DEBUG
#define NETSWITCH_STAT
static uint64_t sync_period = (500 * 1000ULL); // 500ns
static uint64_t eth_latency = (500 * 1000ULL); // 500ns
static int sync_mode = SIMBRICKS_PROTO_SYNC_SIMBRICKS;
struct SimbricksBaseIfParams netParams;
static pcap_dumper_t *dumpfile = nullptr;
#ifdef NETSWITCH_STAT
......@@ -110,7 +108,6 @@ class Port {
virtual bool Connect(const char *path, int sync) = 0;
virtual bool IsSync() = 0;
virtual void Sync(uint64_t cur_ts) = 0;
virtual void AdvanceEpoch(uint64_t cur_ts) = 0;
virtual uint64_t NextTimestamp() = 0;
virtual enum RxPollState RxPacket(
const void *& data, size_t &len, uint64_t cur_ts) = 0;
......@@ -122,21 +119,22 @@ class Port {
/** Normal network switch port (conneting to a NIC) */
class NetPort : public Port {
protected:
struct SimbricksNetIf netif_;
volatile union SimbricksProtoNetD2N *rx_;
struct SimbricksNetIf netifObj_;
struct SimbricksNetIf *netif_;
volatile union SimbricksProtoNetMsg *rx_;
int sync_;
public:
NetPort() : rx_(nullptr), sync_(0) {
memset(&netif_, 0, sizeof(netif_));
NetPort() : netif_(&netifObj_), rx_(nullptr), sync_(0) {
memset(&netifObj_, 0, sizeof(netifObj_));
}
NetPort(const NetPort &other) : netif_(other.netif_), rx_(other.rx_),
sync_(other.sync_) {}
NetPort(const NetPort &other) : netifObj_(other.netifObj_),
netif_(&netifObj_), rx_(other.rx_), sync_(other.sync_) {}
virtual bool Connect(const char *path, int sync) override {
sync_ = sync;
return SimbricksNetIfInit(&netif_, path, &sync_) == 0;
return SimbricksNetIfInit(netif_, &netParams, path, &sync_) == 0;
}
virtual bool IsSync() override {
......@@ -144,32 +142,27 @@ class NetPort : public Port {
}
virtual void Sync(uint64_t cur_ts) override {
while (SimbricksNetIfN2DSync(&netif_, cur_ts, eth_latency, sync_period,
sync_mode));
}
virtual void AdvanceEpoch(uint64_t cur_ts) override {
SimbricksNetIfAdvanceEpoch(cur_ts, sync_period, sync_mode);
while (SimbricksNetIfOutSync(netif_, cur_ts));
}
virtual uint64_t NextTimestamp() override {
return SimbricksNetIfD2NTimestamp(&netif_);
return SimbricksNetIfInTimestamp(netif_);
}
virtual enum RxPollState RxPacket(
const void *& data, size_t &len, uint64_t cur_ts) override {
assert(rx_ == nullptr);
rx_ = SimbricksNetIfD2NPoll(&netif_, cur_ts);
rx_ = SimbricksNetIfInPoll(netif_, cur_ts);
if (!rx_)
return kRxPollFail;
uint8_t type = rx_->dummy.own_type & SIMBRICKS_PROTO_NET_D2N_MSG_MASK;
if (type == SIMBRICKS_PROTO_NET_D2N_MSG_SEND) {
data = (const void *)rx_->send.data;
len = rx_->send.len;
uint8_t type = SimbricksNetIfInType(netif_, rx_);
if (type == SIMBRICKS_PROTO_NET_MSG_PACKET) {
data = (const void *)rx_->packet.data;
len = rx_->packet.len;
return kRxPollSuccess;
} else if (type == SIMBRICKS_PROTO_NET_D2N_MSG_SYNC) {
} else if (type == SIMBRICKS_PROTO_MSG_TYPE_SYNC) {
return kRxPollSync;
} else {
fprintf(stderr, "switch_pkt: unsupported type=%u\n", type);
......@@ -180,139 +173,64 @@ class NetPort : public Port {
virtual void RxDone() override {
assert(rx_ != nullptr);
SimbricksNetIfD2NDone(&netif_, rx_);
SimbricksNetIfInDone(netif_, rx_);
rx_ = nullptr;
}
virtual bool TxPacket(
const void *data, size_t len, uint64_t cur_ts) override {
volatile union SimbricksProtoNetN2D *msg_to =
SimbricksNetIfN2DAlloc(&netif_, cur_ts, eth_latency);
volatile union SimbricksProtoNetMsg *msg_to =
SimbricksNetIfOutAlloc(netif_, cur_ts);
if (!msg_to && !sync_) {
return false;
} else if (!msg_to && sync_) {
while (!msg_to)
msg_to = SimbricksNetIfN2DAlloc(&netif_, cur_ts, eth_latency);
msg_to = SimbricksNetIfOutAlloc(netif_, cur_ts);
}
volatile struct SimbricksProtoNetN2DRecv *rx;
rx = &msg_to->recv;
volatile struct SimbricksProtoNetMsgPacket *rx;
rx = &msg_to->packet;
rx->len = len;
rx->port = 0;
memcpy((void *)rx->data, data, len);
// WMB();
rx->own_type =
SIMBRICKS_PROTO_NET_N2D_MSG_RECV | SIMBRICKS_PROTO_NET_N2D_OWN_DEV;
SimbricksNetIfOutSend(netif_, msg_to, SIMBRICKS_PROTO_NET_MSG_PACKET);
return true;
}
};
/** Hosting network switch port (connected to another network) */
class NetHostPort : public Port {
class NetHostPort : public NetPort {
protected:
struct SimbricksNicIf nicif_;
volatile union SimbricksProtoNetN2D *rx_;
int sync_;
public:
NetHostPort() : rx_(nullptr), sync_(0) {
NetHostPort() {
netif_ = &nicif_.net;
memset(&nicif_, 0, sizeof(nicif_));
}
NetHostPort(const NetHostPort &other) : nicif_(other.nicif_), rx_(other.rx_),
sync_(other.sync_) {}
NetHostPort(const NetHostPort &other) : NetPort(other), nicif_(other.nicif_) {
netif_ = &nicif_.net;
}
virtual bool Connect(const char *path, int sync) override {
sync_ = sync;
std::string shm_path = path;
shm_path += "-shm";
struct SimbricksNicIfParams params = {
.pci_socket_path = nullptr,
.eth_socket_path = path,
.shm_path = shm_path.c_str(),
.pci_latency = 0,
.eth_latency = eth_latency,
.sync_delay = sync_period,
.sync_pci = 0,
.sync_eth = sync,
.sync_mode = sync_mode,
};
struct SimbricksProtoPcieDevIntro di;
int ret = SimbricksNicIfInit(&nicif_, &params, &di);
sync_ = params.sync_eth;
struct SimbricksBaseIfParams params = netParams;
netParams.sock_path = path;
if (!sync)
netParams.sync_mode = kSimbricksBaseIfSyncDisabled;
int ret = SimbricksNicIfInit(&nicif_, shm_path.c_str(), &params, nullptr,
nullptr);
sync_ = SimbricksBaseIfSyncEnabled(&netif_->base);
return ret == 0;
}
virtual bool IsSync() override {
return sync_;
}
virtual void Sync(uint64_t cur_ts) override {
while (SimbricksNicIfSync(&nicif_, cur_ts));
}
virtual void AdvanceEpoch(uint64_t cur_ts) override {
SimbricksNicIfAdvanceEpoch(&nicif_, cur_ts);
}
virtual uint64_t NextTimestamp() override {
return SimbricksNicIfNextTimestamp(&nicif_);
}
virtual enum RxPollState RxPacket(
const void *& data, size_t &len, uint64_t cur_ts) override {
assert(rx_ == nullptr);
rx_ = SimbricksNicIfN2DPoll(&nicif_, cur_ts);
if (!rx_)
return kRxPollFail;
uint8_t type = rx_->dummy.own_type & SIMBRICKS_PROTO_NET_N2D_MSG_MASK;
if (type == SIMBRICKS_PROTO_NET_N2D_MSG_RECV) {
data = (const void *)rx_->recv.data;
len = rx_->recv.len;
return kRxPollSuccess;
} else if (type == SIMBRICKS_PROTO_NET_N2D_MSG_SYNC) {
return kRxPollSync;
} else {
fprintf(stderr, "switch_pkt: unsupported type=%u\n", type);
abort();
}
}
virtual void RxDone() override {
assert(rx_ != nullptr);
SimbricksNicIfN2DDone(&nicif_, rx_);
SimbricksNicIfN2DNext(&nicif_);
rx_ = nullptr;
}
virtual bool TxPacket(
const void *data, size_t len, uint64_t cur_ts) override {
volatile union SimbricksProtoNetD2N *msg_to =
SimbricksNicIfD2NAlloc(&nicif_, cur_ts);
if (!msg_to && !sync_) {
return false;
} else if (!msg_to && sync_) {
while (!msg_to)
msg_to = SimbricksNicIfD2NAlloc(&nicif_, cur_ts);
}
volatile struct SimbricksProtoNetD2NSend *rx;
rx = &msg_to->send;
rx->len = len;
rx->port = 0;
memcpy((void *)rx->data, data, len);
// WMB();
rx->own_type =
SIMBRICKS_PROTO_NET_D2N_MSG_SEND | SIMBRICKS_PROTO_NET_D2N_OWN_NET;
return true;
}
};
......@@ -449,8 +367,10 @@ int main(int argc, char *argv[]) {
int sync_eth = 1;
pcap_t *pc = nullptr;
SimbricksNetIfDefaultParams(&netParams);
// Parse command line argument
while ((c = getopt(argc, argv, "s:h:uS:E:m:p:")) != -1 && !bad_option) {
while ((c = getopt(argc, argv, "s:h:uS:E:p:")) != -1 && !bad_option) {
switch (c) {
case 's': {
NetPort *port = new NetPort;
......@@ -479,17 +399,11 @@ int main(int argc, char *argv[]) {
break;
case 'S':
sync_period = strtoull(optarg, NULL, 0) * 1000ULL;
netParams.sync_interval = strtoull(optarg, NULL, 0) * 1000ULL;
break;
case 'E':
eth_latency = strtoull(optarg, NULL, 0) * 1000ULL;
break;
case 'm':
sync_mode = strtol(optarg, NULL, 0);
assert(sync_mode == SIMBRICKS_PROTO_SYNC_SIMBRICKS ||
sync_mode == SIMBRICKS_PROTO_SYNC_BARRIER);
netParams.link_latency = strtoull(optarg, NULL, 0) * 1000ULL;
break;
case 'p':
......@@ -531,8 +445,6 @@ int main(int argc, char *argv[]) {
// Sync all interfaces
for (auto port : ports)
port->Sync(cur_ts);
for (auto port : ports)
port->AdvanceEpoch(cur_ts);
// Switch packets
uint64_t min_ts;
......@@ -550,8 +462,7 @@ int main(int argc, char *argv[]) {
// Update cur_ts
if (min_ts < ULLONG_MAX) {
// a bit broken but should probably do
cur_ts = SimbricksNetIfAdvanceTime(min_ts, sync_period, sync_mode);
cur_ts = min_ts;
}
}
......
......@@ -28,7 +28,7 @@ OBJS := $(d)net_switch.o
$(OBJS): CPPFLAGS := $(CPPFLAGS) -I$(d)include/
$(bin_net_switch): $(OBJS) $(lib_netif) $(lib_nicif) -lpcap
$(bin_net_switch): $(OBJS) $(lib_netif) $(lib_nicif) $(lib_base) -lpcap
CLEAN := $(bin_net_switch) $(OBJS)
ALL := $(bin_net_switch)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment