Commit ceb1af0f authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

experiments: rework experiment running with dynamic dependencies

Instead of static ordering of simulator types. This simpliefies the
experiment runners but also simplifies adding additional features.
Immeadiate goal is adding network-to-network connections
parent 8f67770c
......@@ -22,6 +22,7 @@
import os
import asyncio
import simbricks.utils.graphlib as graphlib
from collections import defaultdict
import simbricks.exectools as exectools
import simbricks.proxy
......@@ -123,19 +124,58 @@ class ExperimentBaseRunner(object):
self.out = ExpOutput(exp)
self.running = []
self.sockets = []
self.wait_hosts = []
self.wait_sims = []
def sim_executor(self, sim):
raise NotImplementedError("Please implement this method")
async def before_nics(self):
pass
def sim_graph(self):
sims = self.exp.all_simulators()
graph = {}
for sim in sims:
deps = sim.dependencies() + sim.extra_deps
graph[sim] = set()
for d in deps:
graph[sim].add(d)
return graph
async def before_nets(self):
pass
async def start_sim(self, sim):
""" Start a simulator and wait for it to be ready. """
async def before_hosts(self):
pass
name = sim.full_name()
if self.verbose:
print('%s: starting %s' % (self.exp.name, name))
# run simulator
exec = self.sim_executor(sim)
sc = exec.create_component(name,
shlex.split(sim.run_cmd(self.env)), verbose=self.verbose,
canfail=True)
await sc.start()
self.running.append((sim, sc))
# add sockets for cleanup
for s in sim.sockets_cleanup(self.env):
self.sockets.append((exec, s))
# Wait till sockets exist
wait_socks = sim.sockets_wait(self.env)
if wait_socks:
if self.verbose:
print('%s: waiting for sockets %s' % (self.exp.name, name))
await exec.await_files(wait_socks, verbose=self.verbose)
# add time delay if required
delay = sim.start_delay()
if delay > 0:
await asyncio.sleep(delay)
if sim.wait_terminate():
self.wait_sims.append(sc)
if self.verbose:
print('%s: started %s' % (self.exp.name, name))
async def before_wait(self):
pass
......@@ -160,107 +200,43 @@ class ExperimentBaseRunner(object):
# prepare all simulators in parallel
sims = []
for sim in self.exp.hosts + self.exp.nics + self.exp.networks:
for sim in self.exp.all_simulators():
prep_cmds = [pc for pc in sim.prep_cmds(self.env)]
exec = self.sim_executor(sim)
sims.append(exec.run_cmdlist('prepare_' + self.exp.name, prep_cmds,
verbose=self.verbose))
await asyncio.wait(sims)
async def run_nics(self):
""" Start all NIC simulators. """
if self.verbose:
print('%s: starting NICS' % self.exp.name)
starts = []
for nic in self.exp.nics:
if self.verbose:
print('start NIC:', nic.run_cmd(self.env))
exec = self.sim_executor(nic)
sc = exec.create_component(nic.full_name(),
shlex.split(nic.run_cmd(self.env)), verbose=self.verbose,
canfail=True)
starts.append(sc.start())
self.running.append((nic, sc))
self.sockets.append((exec, self.env.nic_pci_path(nic)))
self.sockets.append((exec, self.env.nic_eth_path(nic)))
self.sockets.append((exec, self.env.nic_shm_path(nic)))
await asyncio.wait(starts)
# Wait till all NIC sockets exist
if self.verbose:
print('%s: waiting for sockets' % self.exp.name)
byexec = defaultdict(lambda: [])
for (exec, s) in self.sockets:
byexec[exec].append(s)
for (exec, ss) in byexec.items():
await exec.await_files(ss, verbose=self.verbose)
# just a bit of a safety delay
await asyncio.sleep(0.5)
async def run_nets(self):
""" Start all network simulators (typically one). """
if self.verbose:
print('%s: starting networks' % self.exp.name)
starts = []
for net in self.exp.networks:
if self.verbose:
print('start Net:', net.run_cmd(self.env))
exec = self.sim_executor(net)
sc = exec.create_component(net.full_name(),
shlex.split(net.run_cmd(self.env)), verbose=self.verbose,
canfail=True)
starts.append(sc.start())
self.running.append((net, sc))
await asyncio.wait(starts)
async def run_hosts(self):
""" Start all host simulators. """
if self.verbose:
print('%s: starting hosts' % self.exp.name)
starts = []
for host in self.exp.hosts:
if self.verbose:
print('start Host:', host.run_cmd(self.env))
exec = self.sim_executor(host)
sc = exec.create_component(host.full_name(),
shlex.split(host.run_cmd(self.env)), verbose=self.verbose,
canfail=True)
starts.append(sc.start())
self.running.append((host,sc))
if host.wait:
self.wait_hosts.append(sc)
if host.sleep > 0:
await asyncio.sleep(host.sleep)
await asyncio.wait(starts)
async def wait_for_hosts(self):
""" Wait for hosts to terminate (the ones marked to wait on). """
async def wait_for_sims(self):
""" Wait for simulators to terminate (the ones marked to wait on). """
if self.verbose:
print('%s: waiting for hosts to terminate' % self.exp.name)
for sc in self.wait_hosts:
for sc in self.wait_sims:
await sc.wait()
async def run(self):
try:
self.out.set_start()
await self.before_nics()
await self.run_nics()
graph = self.sim_graph()
ts = graphlib.TopologicalSorter(graph)
ts.prepare()
while ts.is_active():
# start ready simulators in parallel
starts = []
sims = []
for sim in ts.get_ready():
starts.append(self.start_sim(sim))
sims.append(sim)
await self.before_nets()
await self.run_nets()
# wait for starts to complete
await asyncio.wait(starts)
await self.before_hosts()
await self.run_hosts()
for sim in sims:
ts.done(sim)
await self.before_wait()
await self.wait_for_hosts()
await self.wait_for_sims()
except:
self.out.set_failed()
traceback.print_exc()
......@@ -331,76 +307,6 @@ class ExperimentDistributedRunner(ExperimentBaseRunner):
await super().prepare()
def add_proxy_sockets(self, exec, proxy):
# add shared memory region for proxy
self.sockets.append((exec, self.env.proxy_shm_path(proxy)))
# add each listening unix socket
for (nic, local) in proxy.nics:
add = False
if (isinstance(proxy, simbricks.proxy.NetProxyConnecter) and local) \
or (isinstance(proxy, simbricks.proxy.NetProxyListener) \
and not local):
self.sockets.append((exec, self.env.nic_eth_path(nic)))
async def run_proxies_listeners(self):
""" Start all listening proxies. """
if self.verbose:
print('%s: starting listening proxies' % self.exp.name)
for proxy in self.exp.proxies_listen:
if self.verbose:
print('start listening proxy:', proxy.run_cmd(self.env))
exec = self.sim_executor(proxy)
sc = exec.create_component(proxy.full_name(),
shlex.split(proxy.run_cmd(self.env)), verbose=self.verbose,
canfail=True)
await sc.start()
self.running.append((proxy, sc))
self.add_proxy_sockets(exec, proxy)
await asyncio.sleep(10)
async def run_proxies_connecters(self):
""" Start all connecting proxies. """
if self.verbose:
print('%s: starting connecting proxies' % self.exp.name)
for proxy in self.exp.proxies_connect:
if self.verbose:
print('start connecting proxy:', proxy.run_cmd(self.env))
exec = self.sim_executor(proxy)
sc = exec.create_component(proxy.full_name(),
shlex.split(proxy.run_cmd(self.env)), verbose=self.verbose,
canfail=True)
await sc.start()
self.running.append((proxy, sc))
self.add_proxy_sockets(exec, proxy)
await asyncio.sleep(10)
async def wait_proxy_sockets(self):
""" Make sure all sockets exist. """
paths = {}
for proxy in itertools.chain(
self.exp.proxies_connect, self.exp.proxies_listen):
for (nic, local) in proxy.nics:
if local:
continue
exec = self.sim_executor(proxy)
if exec not in paths:
paths[exec] = []
paths[exec].append(self.env.nic_eth_path(nic))
for (exec, paths) in paths.items():
await exec.await_files(paths, verbose=self.verbose)
async def before_nets(self):
await self.run_proxies_listeners()
await self.run_proxies_connecters()
await self.wait_proxy_sockets()
await super().before_nets()
class ExpEnv(object):
def __init__(self, repo_path, workdir, cpdir):
......
......@@ -28,6 +28,9 @@ class SimProxy(Simulator):
ip = ''
listen = False
def __init__(self):
super().__init__()
def full_name(self):
return 'proxy.' + self.name
......@@ -39,32 +42,92 @@ class NetProxy(SimProxy):
# Shared memory size in GB
shm_size = 2048
def __init__(self):
super().__init__()
def start_delay(self):
return 10
class NetProxyListener(NetProxy):
connecter = None
def __init__(self):
super().__init__()
self.listen = True
self.nics = []
super().__init__()
def add_nic(self, nic):
self.nics.append((nic, True))
# the network this nic connects to now also depends on the peer
nic.network.extra_deps.append(self.connecter)
def dependencies(self):
deps = []
for (nic, local) in self.nics:
if local:
deps.append(nic)
return deps
def sockets_cleanup(self, env):
socks = []
for (nic, local) in self.nics:
if not local:
socks.append(env.nic_eth_path(nic))
return []
# sockets to wait for indicating the simulator is ready
def sockets_wait(self, env):
socks = []
for (nic, local) in self.nics:
if not local:
socks.append(env.nic_eth_path(nic))
return socks
class NetProxyConnecter(NetProxy):
listener = None
def __init__(self, listener):
super().__init__()
self.listener = listener
listener.connecter = self
self.nics = listener.nics
super().__init__()
def add_nic(self, nic):
self.nics.append((nic, False))
# the network this nic connects to now also depends on the proxy
nic.network.extra_deps.append(self.listener)
def dependencies(self):
deps = [self.listener]
for (nic, local) in self.nics:
if not local:
deps.append(nic)
return deps
def sockets_cleanup(self, env):
socks = []
for (nic, local) in self.nics:
if local:
socks.append(env.nic_eth_path(nic))
return []
# sockets to wait for indicating the simulator is ready
def sockets_wait(self, env):
socks = []
for (nic, local) in self.nics:
if local:
socks.append(env.nic_eth_path(nic))
return socks
class RDMANetProxyListener(NetProxyListener):
port = 12345
def __init__(self):
self.listen = True
super().__init__()
self.listen = True
def run_cmd(self, env):
cmd = (f'{env.repodir}/dist/rdma/net_rdma -l '
......
......@@ -23,6 +23,9 @@
import math
class Simulator(object):
def __init__(self):
self.extra_deps = []
# number of cores required for this simulator
def resreq_cores(self):
return 1
......@@ -37,6 +40,24 @@ class Simulator(object):
def run_cmd(self, env):
pass
# Other simulators this one depends on
def dependencies(self):
return []
# Sockets to be cleaned up
def sockets_cleanup(self, env):
return []
# sockets to wait for indicating the simulator is ready
def sockets_wait(self, env):
return []
def start_delay(self):
return 5
def wait_terminate(self):
return False
class HostSim(Simulator):
node_config = None
name = ''
......@@ -50,6 +71,7 @@ class HostSim(Simulator):
def __init__(self):
self.nics = []
super().__init__()
def full_name(self):
return 'host.' + self.name
......@@ -61,6 +83,16 @@ class HostSim(Simulator):
def set_config(self, nc):
self.node_config = nc
def dependencies(self):
deps = []
for nic in self.nics:
deps.append(nic)
deps.append(nic.network)
return deps
def wait_terminate(self):
return self.wait
class NICSim(Simulator):
network = None
name = ''
......@@ -70,6 +102,9 @@ class NICSim(Simulator):
pci_latency = 500
eth_latency = 500
def __init__(self):
super().__init__()
def set_network(self, net):
self.network = net
net.nics.append(self)
......@@ -87,6 +122,13 @@ class NICSim(Simulator):
def full_name(self):
return 'nic.' + self.name
def sockets_cleanup(self, env):
return [env.nic_pci_path(self), env.nic_eth_path(self),
env.nic_shm_path(self)]
def sockets_wait(self, env):
return [env.nic_pci_path(self), env.nic_eth_path(self)]
class NetSim(Simulator):
name = ''
opt = ''
......@@ -100,9 +142,22 @@ class NetSim(Simulator):
def full_name(self):
return 'net.' + self.name
def connect_sockets(self, env):
sockets = []
for n in self.nics:
sockets.append((n, env.nic_eth_path(n)))
return sockets
def dependencies(self):
return self.nics
class QemuHost(HostSim):
sync = False
def __init__(self):
super().__init__()
def resreq_cores(self):
if self.sync:
return 1
......@@ -165,6 +220,8 @@ class Gem5Host(HostSim):
cpu_type = 'TimingSimpleCPU'
sys_clock = '1GHz'
def __init__(self):
super().__init__()
def set_config(self, nc):
nc.sim = 'gem5'
......@@ -225,6 +282,9 @@ class Gem5Host(HostSim):
class CorundumVerilatorNIC(NICSim):
clock_freq = 250 # MHz
def __init__(self):
super().__init__()
def resreq_mem(self):
# this is a guess
return 512
......@@ -234,21 +294,31 @@ class CorundumVerilatorNIC(NICSim):
str(self.clock_freq))
class CorundumBMNIC(NICSim):
def __init__(self):
super().__init__()
def run_cmd(self, env):
return self.basic_run_cmd(env, '/corundum_bm/corundum_bm')
class I40eNIC(NICSim):
def __init__(self):
super().__init__()
def run_cmd(self, env):
return self.basic_run_cmd(env, '/i40e_bm/i40e_bm')
class WireNet(NetSim):
def __init__(self):
super().__init__()
def run_cmd(self, env):
assert len(self.nics) == 2
connects = self.connect_sockets()
assert len(connects) == 2
cmd = '%s/sims/net/wire/net_wire %s %s %d %d %d' % \
(env.repodir, env.nic_eth_path(self.nics[0]),
env.nic_eth_path(self.nics[1]),
(env.repodir, connects[0][1],
connects[1][1],
self.sync_mode, self.sync_period, self.eth_latency)
if len(env.pcap_file) > 0:
cmd += ' ' + env.pcap_file
......@@ -257,6 +327,9 @@ class WireNet(NetSim):
class SwitchNet(NetSim):
sync = True
def __init__(self):
super().__init__()
def run_cmd(self, env):
cmd = env.repodir + '/sims/net/switch/net_switch'
cmd += f' -m {self.sync_mode} -S {self.sync_period} -E {self.eth_latency}'
......@@ -266,26 +339,43 @@ class SwitchNet(NetSim):
if len(env.pcap_file) > 0:
cmd += ' -p ' + env.pcap_file
for n in self.nics:
cmd += ' -s ' + env.nic_eth_path(n)
for (_,n) in self.connect_sockets(env):
cmd += ' -s ' + n
for (_,n) in self.listen_sockets(env):
cmd += ' -h ' + n
return cmd
def sockets_cleanup(self, env):
# cleanup here will just have listening eth sockets, switch also creates
# shm regions for each with a "-shm" suffix
cleanup = []
for s in super().sockets_cleanup(env):
cleanup.append(s)
cleanup.append(s + '-shm')
return cleanup
class TofinoNet(NetSim):
def __init__(self):
super().__init__()
def run_cmd(self, env):
cmd = env.repodir + '/sims/tofino/tofino'
cmd += f' -m {self.sync_mode} -S {self.sync_period} -E {self.eth_latency}'
for n in self.nics:
cmd += ' -s ' + env.nic_eth_path(n)
for (_,n) in self.connect_sockets():
cmd += ' -s ' + n
return cmd
class NS3DumbbellNet(NetSim):
def __init__(self):
super().__init__()
def run_cmd(self, env):
ports = ''
for n in self.nics:
for (n,s) in self.connect_sockets():
if 'server' in n.name:
ports += '--CosimPortLeft=' + env.nic_eth_path(n) + ' '
ports += '--CosimPortLeft=' + s + ' '
else:
ports += '--CosimPortRight=' + env.nic_eth_path(n) + ' '
ports += '--CosimPortRight=' + s + ' '
cmd = env.repodir + '/sims/external/ns-3' + '/cosim-run.sh cosim cosim-dumbbell-example ' + ports + ' ' + self.opt
print(cmd)
......@@ -293,10 +383,13 @@ class NS3DumbbellNet(NetSim):
return cmd
class NS3BridgeNet(NetSim):
def __init__(self):
super().__init__()
def run_cmd(self, env):
ports = ''
for n in self.nics:
ports += '--CosimPort=' + env.nic_eth_path(n) + ' '
for (_,n) in self.connect_sockets():
ports += '--CosimPort=' + n + ' '
cmd = env.repodir + '/sims/external/ns-3' + '/cosim-run.sh cosim cosim-bridge-example ' + ports + ' ' + self.opt
print(cmd)
......@@ -304,15 +397,18 @@ class NS3BridgeNet(NetSim):
return cmd
class NS3SequencerNet(NetSim):
def __init__(self):
super().__init__()
def run_cmd(self, env):
ports = ''
for n in self.nics:
for (n,s) in self.connect_sockets():
if 'client' in n.name:
ports += '--ClientPort=' + env.nic_eth_path(n) + ' '
ports += '--ClientPort=' + s + ' '
elif 'replica' in n.name:
ports += '--ServerPort=' + env.nic_eth_path(n) + ' '
ports += '--ServerPort=' + s + ' '
elif 'sequencer' in n.name:
ports += '--EndhostSequencerPort=' + env.nic_eth_path(n) + ' '
ports += '--EndhostSequencerPort=' + s + ' '
else:
raise Exception('Wrong NIC type')
cmd = env.repodir + '/sims/external/ns-3' + '/cosim-run.sh sequencer sequencer-single-switch-example ' + ports + ' ' + self.opt
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment