Unverified commit ce2027ee, authored by Jakob Görgen
Browse files

fixed socket tuple error + fixed `ip addr add` command

parent af98d725
...@@ -57,8 +57,10 @@ ethchannel0 = system.EthChannel(switch.eth_ifs[0], nic0._eth_if) ...@@ -57,8 +57,10 @@ ethchannel0 = system.EthChannel(switch.eth_ifs[0], nic0._eth_if)
ethchannel1 = system.EthChannel(switch.eth_ifs[1], nic1._eth_if) ethchannel1 = system.EthChannel(switch.eth_ifs[1], nic1._eth_if)
# configure the software to run on the host # configure the software to run on the host
host0.add_app(system.NetperfClient(host0, nic1._ip)) # host0.add_app(system.NetperfClient(host0, nic1._ip))
host1.add_app(system.NetperfServer(host1)) # host1.add_app(system.NetperfServer(host1))
host0.add_app(system.PingClient(host0, nic1._ip))
host1.add_app(system.Sleep(host1, infinite=True))
""" """
Execution Config Execution Config
...@@ -103,9 +105,11 @@ for host_type in host_types: ...@@ -103,9 +105,11 @@ for host_type in host_types:
host_inst0 = host_sim(simulation) host_inst0 = host_sim(simulation)
host_inst0.add(host0) host_inst0.add(host0)
host_inst0.wait_terminate = True host_inst0.wait_terminate = True
host_inst0.cpu_type = 'X86KvmCPU'
host_inst1 = host_sim(simulation) host_inst1 = host_sim(simulation)
host_inst1.add(host1) host_inst1.add(host1)
host_inst1.cpu_type = 'X86KvmCPU'
nic_inst0 = nic_sim(simulation) nic_inst0 = nic_sim(simulation)
nic_inst0.add(nic0) nic_inst0.add(nic0)
...@@ -116,9 +120,9 @@ for host_type in host_types: ...@@ -116,9 +120,9 @@ for host_type in host_types:
net_inst = net_sim(simulation) net_inst = net_sim(simulation)
net_inst.add(switch) net_inst.add(switch)
sim_helpers.enable_sync_simulation( # sim_helpers.enable_sync_simulation(
simulation=simulation, amount=500, ratio=sim.Time.Nanoseconds # simulation=simulation, amount=500, ratio=sim.Time.Nanoseconds
) # )
print(simulation.name + " all simulators:") print(simulation.name + " all simulators:")
sims = simulation.all_simulators() sims = simulation.all_simulators()
......
...@@ -26,7 +26,6 @@ import asyncio ...@@ -26,7 +26,6 @@ import asyncio
import itertools import itertools
import shlex import shlex
import traceback import traceback
import typing as tp
import abc import abc
from simbricks.orchestration.utils import graphlib from simbricks.orchestration.utils import graphlib
...@@ -39,46 +38,42 @@ from simbricks.orchestration.runtime_new import command_executor ...@@ -39,46 +38,42 @@ from simbricks.orchestration.runtime_new import command_executor
class ExperimentBaseRunner(abc.ABC): class ExperimentBaseRunner(abc.ABC):
def __init__(self, simulation: sim_base.Simulation, instantiation: inst_base.Instantiation, verbose: bool) -> None: def __init__(
self,
simulation: sim_base.Simulation,
instantiation: inst_base.Instantiation,
verbose: bool,
) -> None:
self._simulation: sim_base.Simulation = simulation self._simulation: sim_base.Simulation = simulation
self._instantiation: inst_base.Instantiation = instantiation self._instantiation: inst_base.Instantiation = instantiation
self._verbose: bool = verbose self._verbose: bool = verbose
self._profile_int: int | None = None self._profile_int: int | None = None
self._out = output.SimulationOutput(self._simulation) self._out = output.SimulationOutput(self._simulation)
self._running: list[tuple[sim_base.Simulator, command_executor.SimpleComponent]] = [] self._running: list[
self._sockets: list[tuple[command_executor.Executor, inst_base.Socket]] = [] tuple[sim_base.Simulator, command_executor.SimpleComponent]
] = []
self._sockets: list[inst_base.Socket] = []
self._wait_sims: list[command_executor.Component] = [] self._wait_sims: list[command_executor.Component] = []
@abc.abstractmethod @abc.abstractmethod
def sim_executor(self, simulator: sim_base.Simulator) -> command_executor.Executor: def sim_executor(self, simulator: sim_base.Simulator) -> command_executor.Executor:
pass pass
# def sim_graph(self) -> dict[sim_base.Simulator, set[sim_base.Simulator]]:
# sims = self._simulation.all_simulators()
# graph = {}
# for sim in sims:
# deps = sim.dependencies() + sim.extra_deps
# print(f'deps of {sim}: {sim.dependencies()}')
# graph[sim] = set()
# for d in deps:
# graph[sim].add(d)
# return graph
async def start_sim(self, sim: sim_base.Simulator) -> None: async def start_sim(self, sim: sim_base.Simulator) -> None:
"""Start a simulator and wait for it to be ready.""" """Start a simulator and wait for it to be ready."""
name = sim.full_name() name = sim.full_name()
if self._verbose: if self._verbose:
print(f'{self._simulation.name}: starting {name}') print(f"{self._simulation.name}: starting {name}")
run_cmd = sim.run_cmd(self._instantiation) run_cmd = sim.run_cmd(self._instantiation)
if run_cmd is None: if run_cmd is None:
if self._verbose: if self._verbose:
print(f'{self._simulation.name}: started dummy {name}') print(f"{self._simulation.name}: started dummy {name}")
return return
# run simulator # run simulator
executor = self.sim_executor(sim) executor = self._instantiation.executor
sc = executor.create_component( sc = executor.create_component(
name, shlex.split(run_cmd), verbose=self._verbose, canfail=True name, shlex.split(run_cmd), verbose=self._verbose, canfail=True
) )
...@@ -87,16 +82,18 @@ class ExperimentBaseRunner(abc.ABC): ...@@ -87,16 +82,18 @@ class ExperimentBaseRunner(abc.ABC):
# add sockets for cleanup # add sockets for cleanup
for sock in sim.sockets_cleanup(inst=self._instantiation): for sock in sim.sockets_cleanup(inst=self._instantiation):
self._sockets.append((executor, sock)) self._sockets.append(sock)
# Wait till sockets exist # Wait till sockets exist
wait_socks = sim.sockets_wait(inst=self._instantiation) wait_socks = sim.sockets_wait(inst=self._instantiation)
if len(wait_socks) > 0: if len(wait_socks) > 0:
if self._verbose: if self._verbose:
print(f'{self._simulation.name}: waiting for sockets {name}') print(f"{self._simulation.name}: waiting for sockets {name}")
await self._instantiation.wait_for_sockets(sockets=wait_socks) await self._instantiation.wait_for_sockets(sockets=wait_socks)
if self._verbose: if self._verbose:
print(f'{self._simulation.name}: waited successfully for sockets {name}') print(
f"{self._simulation.name}: waited successfully for sockets {name}"
)
# add time delay if required # add time delay if required
delay = sim.start_delay() delay = sim.start_delay()
...@@ -107,7 +104,7 @@ class ExperimentBaseRunner(abc.ABC): ...@@ -107,7 +104,7 @@ class ExperimentBaseRunner(abc.ABC):
self._wait_sims.append(sc) self._wait_sims.append(sc)
if self._verbose: if self._verbose:
print(f'{self._simulation.name}: started {name}') print(f"{self._simulation.name}: started {name}")
async def before_wait(self) -> None: async def before_wait(self) -> None:
pass pass
...@@ -119,42 +116,15 @@ class ExperimentBaseRunner(abc.ABC): ...@@ -119,42 +116,15 @@ class ExperimentBaseRunner(abc.ABC):
pass pass
async def prepare(self) -> None: async def prepare(self) -> None:
# generate config tars
# copies = []
# for host in self.exp.hosts:
# path = self.env.cfgtar_path(host)
# if self._verbose:
# print('preparing config tar:', path)
# # TODO: FIXME
# host.node_config.make_tar(self.env, path)
# executor = self.sim_executor(host)
# task = asyncio.create_task(executor.send_file(path, self._verbose))
# copies.append(task)
# await asyncio.gather(*copies)
# TODO: FIXME # TODO: FIXME
executor = command_executor.LocalExecutor() executor = command_executor.LocalExecutor()
self._instantiation.executor = executor self._instantiation.executor = executor
await self._instantiation.prepare() await self._instantiation.prepare()
# prepare all simulators in parallel
# sims = []
# for sim in self._simulation.all_simulators():
# sim.prep_tar(self._instantiation)
# prep_cmds = list(sim.prep_cmds(inst=self._instantiation))
# executor = self.sim_executor(sim)
# task = asyncio.create_task(
# executor.run_cmdlist(
# 'prepare_' + self._simulation.name, prep_cmds, verbose=self._verbose
# )
# )
# sims.append(task)
# await asyncio.gather(*sims)
async def wait_for_sims(self) -> None: async def wait_for_sims(self) -> None:
"""Wait for simulators to terminate (the ones marked to wait on).""" """Wait for simulators to terminate (the ones marked to wait on)."""
if self._verbose: if self._verbose:
print(f'{self._simulation.name}: waiting for hosts to terminate') print(f"{self._simulation.name}: waiting for hosts to terminate")
for sc in self._wait_sims: for sc in self._wait_sims:
await sc.wait() await sc.wait()
...@@ -162,7 +132,7 @@ class ExperimentBaseRunner(abc.ABC): ...@@ -162,7 +132,7 @@ class ExperimentBaseRunner(abc.ABC):
"""Terminates all simulators and collects output.""" """Terminates all simulators and collects output."""
self._out.set_end() self._out.set_end()
if self._verbose: if self._verbose:
print(f'{self._simulation.name}: cleaning up') print(f"{self._simulation.name}: cleaning up")
await self.before_cleanup() await self.before_cleanup()
...@@ -190,7 +160,7 @@ class ExperimentBaseRunner(abc.ABC): ...@@ -190,7 +160,7 @@ class ExperimentBaseRunner(abc.ABC):
assert self._profile_int assert self._profile_int
while True: while True:
await asyncio.sleep(self._profile_int) await asyncio.sleep(self._profile_int)
for (_, sc) in self._running: for _, sc in self._running:
await sc.sigusr1() await sc.sigusr1()
async def run(self) -> output.SimulationOutput: async def run(self) -> output.SimulationOutput:
...@@ -222,7 +192,7 @@ class ExperimentBaseRunner(abc.ABC): ...@@ -222,7 +192,7 @@ class ExperimentBaseRunner(abc.ABC):
await self.wait_for_sims() await self.wait_for_sims()
except asyncio.CancelledError: except asyncio.CancelledError:
if self._verbose: if self._verbose:
print(f'{self._simulation.name}: interrupted') print(f"{self._simulation.name}: interrupted")
self._out.set_interrupted() self._out.set_interrupted()
except: # pylint: disable=bare-except except: # pylint: disable=bare-except
self._out.set_failed() self._out.set_failed()
...@@ -236,9 +206,7 @@ class ExperimentBaseRunner(abc.ABC): ...@@ -236,9 +206,7 @@ class ExperimentBaseRunner(abc.ABC):
# The bare except above guarantees that we always execute the following # The bare except above guarantees that we always execute the following
# code, which terminates all simulators and produces a proper output # code, which terminates all simulators and produces a proper output
# file. # file.
terminate_collect_task = asyncio.create_task( terminate_collect_task = asyncio.create_task(self.terminate_collect_sims())
self.terminate_collect_sims()
)
# prevent terminate_collect_task from being cancelled # prevent terminate_collect_task from being cancelled
while True: while True:
try: try:
...@@ -263,9 +231,7 @@ class ExperimentDistributedRunner(ExperimentBaseRunner): ...@@ -263,9 +231,7 @@ class ExperimentDistributedRunner(ExperimentBaseRunner):
"""Simple experiment runner with just one executor.""" """Simple experiment runner with just one executor."""
# TODO: FIXME # TODO: FIXME
def __init__( def __init__(self, execs, exp: DistributedExperiment, *args, **kwargs) -> None:
self, execs, exp: DistributedExperiment, *args, **kwargs
) -> None:
self.execs = execs self.execs = execs
super().__init__(exp, *args, **kwargs) super().__init__(exp, *args, **kwargs)
self.exp = exp # overrides the type in the base class self.exp = exp # overrides the type in the base class
...@@ -280,9 +246,7 @@ class ExperimentDistributedRunner(ExperimentBaseRunner): ...@@ -280,9 +246,7 @@ class ExperimentDistributedRunner(ExperimentBaseRunner):
assert self.exp.all_sims_assigned() assert self.exp.all_sims_assigned()
# set IP addresses for proxies based on assigned executors # set IP addresses for proxies based on assigned executors
for p in itertools.chain( for p in itertools.chain(self.exp.proxies_listen, self.exp.proxies_connect):
self.exp.proxies_listen, self.exp.proxies_connect
):
executor = self.sim_executor(p) executor = self.sim_executor(p)
p.ip = executor.ip p.ip = executor.ip
......
...@@ -30,6 +30,7 @@ from simbricks.orchestration.instantiation import base as inst_base ...@@ -30,6 +30,7 @@ from simbricks.orchestration.instantiation import base as inst_base
if tp.TYPE_CHECKING: if tp.TYPE_CHECKING:
from simbricks.orchestration.system import host as sys_host from simbricks.orchestration.system import host as sys_host
class Application(abc.ABC): class Application(abc.ABC):
def __init__(self, h: sys_host.Host) -> None: def __init__(self, h: sys_host.Host) -> None:
self.host = h self.host = h
...@@ -54,7 +55,7 @@ class BaseLinuxApplication(abc.ABC): ...@@ -54,7 +55,7 @@ class BaseLinuxApplication(abc.ABC):
if self.end_delay is None: if self.end_delay is None:
return [] return []
else: else:
return [f'sleep {self.start_delay}'] return [f"sleep {self.start_delay}"]
def config_files(self, inst: inst_base.Instantiation) -> dict[str, tp.IO]: def config_files(self, inst: inst_base.Instantiation) -> dict[str, tp.IO]:
""" """
...@@ -75,7 +76,7 @@ class BaseLinuxApplication(abc.ABC): ...@@ -75,7 +76,7 @@ class BaseLinuxApplication(abc.ABC):
if self.end_delay is None: if self.end_delay is None:
return [] return []
else: else:
return [f'sleep {self.end_delay}'] return [f"sleep {self.end_delay}"]
def strfile(self, s: str) -> io.BytesIO: def strfile(self, s: str) -> io.BytesIO:
""" """
...@@ -89,21 +90,24 @@ class BaseLinuxApplication(abc.ABC): ...@@ -89,21 +90,24 @@ class BaseLinuxApplication(abc.ABC):
class PingClient(BaseLinuxApplication): class PingClient(BaseLinuxApplication):
def __init__(self, h: sys_host.LinuxHost, server_ip: str = '192.168.64.1') -> None: def __init__(self, h: sys_host.LinuxHost, server_ip: str = "192.168.64.1") -> None:
super().__init__(h) super().__init__(h)
self.server_ip = server_ip self.server_ip = server_ip
def run_cmds(self, inst: inst_base.Instantiation) -> tp.List[str]: def run_cmds(self, inst: inst_base.Instantiation) -> tp.List[str]:
return [f'ping {self.server_ip} -c 10'] return [f"ping {self.server_ip} -c 10"]
class Sleep(BaseLinuxApplication): class Sleep(BaseLinuxApplication):
def __init__(self, h: sys_host.LinuxHost, delay: float = 10) -> None: def __init__(self, h: sys_host.LinuxHost, delay: float = 10, infinite: bool = False) -> None:
super().__init__(h) super().__init__(h)
self.infinite: bool = infinite
self.delay = delay self.delay = delay
def run_cmds(self, inst: inst_base.Instantiation) -> list[str]: def run_cmds(self, inst: inst_base.Instantiation) -> list[str]:
return [f'sleep {self.delay}'] if self.infinite:
return [f"sleep infinity"]
return [f"sleep {self.delay}"]
class NetperfServer(BaseLinuxApplication): class NetperfServer(BaseLinuxApplication):
...@@ -111,11 +115,11 @@ class NetperfServer(BaseLinuxApplication): ...@@ -111,11 +115,11 @@ class NetperfServer(BaseLinuxApplication):
super().__init__(h) super().__init__(h)
def run_cmds(self, inst: inst_base.Instantiation) -> list[str]: def run_cmds(self, inst: inst_base.Instantiation) -> list[str]:
return ['netserver', 'sleep infinity'] return ["netserver", "sleep infinity"]
class NetperfClient(BaseLinuxApplication): class NetperfClient(BaseLinuxApplication):
def __init__(self, h: sys_host.LinuxHost, server_ip: str = '192.168.64.1') -> None: def __init__(self, h: sys_host.LinuxHost, server_ip: str = "192.168.64.1") -> None:
super().__init__(h) super().__init__(h)
self.server_ip = server_ip self.server_ip = server_ip
self.duration_tp = 10 self.duration_tp = 10
...@@ -123,11 +127,11 @@ class NetperfClient(BaseLinuxApplication): ...@@ -123,11 +127,11 @@ class NetperfClient(BaseLinuxApplication):
def run_cmds(self, inst: inst_base.Instantiation) -> list[str]: def run_cmds(self, inst: inst_base.Instantiation) -> list[str]:
return [ return [
'netserver', "netserver",
'sleep 0.5', "sleep 0.5",
f'netperf -H {self.server_ip} -l {self.duration_tp}', f"netperf -H {self.server_ip} -l {self.duration_tp}",
( (
f'netperf -H {self.server_ip} -l {self.duration_lat} -t TCP_RR' f"netperf -H {self.server_ip} -l {self.duration_lat} -t TCP_RR"
' -- -o mean_latency,p50_latency,p90_latency,p99_latency' " -- -o mean_latency,p50_latency,p90_latency,p99_latency"
) ),
] ]
\ No newline at end of file
...@@ -168,19 +168,27 @@ class LinuxHost(BaseLinuxHost): ...@@ -168,19 +168,27 @@ class LinuxHost(BaseLinuxHost):
def __init__(self, sys) -> None: def __init__(self, sys) -> None:
super().__init__(sys) super().__init__(sys)
self.drivers: list[str] = [] self.drivers: list[str] = []
self.hostname: str | None = "ubuntu"
def cleanup_cmds(self, inst: instantiation.Instantiation) -> list[str]: def cleanup_cmds(self, inst: instantiation.Instantiation) -> list[str]:
return super().cleanup_cmds(inst) + ["poweroff -f"] return super().cleanup_cmds(inst) + ["poweroff -f"]
def prepare_pre_cp(self, inst: instantiation.Instantiation) -> list[str]: def prepare_pre_cp(self, inst: instantiation.Instantiation) -> list[str]:
"""Commands to run to prepare node before checkpointing.""" """Commands to run to prepare node before checkpointing."""
return [ cmds = [
"set -x", "set -x",
"export HOME=/root", "export HOME=/root",
"export LANG=en_US", "export LANG=en_US",
'export PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:' 'export PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:'
+ '/usr/bin:/sbin:/bin:/usr/games:/usr/local/games"', + '/usr/bin:/sbin:/bin:/usr/games:/usr/local/games"',
] + super().prepare_pre_cp(inst) ]
if self.hostname is not None:
cmds += [
f"hostname -b {self.hostname}",
f'echo "127.0.1.1 {self.hostname}\n" >> /etc/hosts',
]
cmds += super().prepare_pre_cp(inst)
return cmds
def prepare_post_cp(self, inst) -> list[str]: def prepare_post_cp(self, inst) -> list[str]:
cmds = super().prepare_post_cp(inst) cmds = super().prepare_post_cp(inst)
...@@ -215,7 +223,7 @@ class LinuxHost(BaseLinuxHost): ...@@ -215,7 +223,7 @@ class LinuxHost(BaseLinuxHost):
# Add IP addresses if included # Add IP addresses if included
assert com._ip is not None assert com._ip is not None
cmds.append(f"ip addr add {com._ip} dev {ifn}") cmds.append(f"ip addr add {com._ip}/24 dev {ifn}")
return cmds return cmds
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment