Unverified Commit 8eb3a0ac authored by Jakob Görgen's avatar Jakob Görgen
Browse files

minor changes

parent b5ef0c65
...@@ -292,6 +292,10 @@ class Instantiation(util_base.IdObj): ...@@ -292,6 +292,10 @@ class Instantiation(util_base.IdObj):
) )
return path return path
# TODO: fixme
def cfgtar_path(self, sim: Simulator) -> str:
    """Return the path of the config tar archive belonging to ``sim``.

    The archive is named ``cfg.<sim.name>.tar`` and is located directly
    under ``self.workdir``.
    """
    archive_name = f'cfg.{sim.name}.tar'
    return f'{self.workdir}/{archive_name}'
def join_tmp_base(self, relative_path: str) -> str: def join_tmp_base(self, relative_path: str) -> str:
return self._join_paths( return self._join_paths(
base=self._env._tmp_simulation_files, base=self._env._tmp_simulation_files,
......
...@@ -33,7 +33,6 @@ from simbricks.orchestration.utils import graphlib ...@@ -33,7 +33,6 @@ from simbricks.orchestration.utils import graphlib
from simbricks.orchestration.simulation import base as sim_base from simbricks.orchestration.simulation import base as sim_base
from simbricks.orchestration.instantiation import base as inst_base from simbricks.orchestration.instantiation import base as inst_base
from simbricks.orchestration.runtime_new import simulation_executor
from simbricks.orchestration.runtime_new import command_executor from simbricks.orchestration.runtime_new import command_executor
...@@ -45,12 +44,12 @@ class ExperimentBaseRunner(abc.ABC): ...@@ -45,12 +44,12 @@ class ExperimentBaseRunner(abc.ABC):
self._verbose: bool = verbose self._verbose: bool = verbose
self._profile_int: int | None = None self._profile_int: int | None = None
self._out = sim_base.SimulationOutput(self._simulation) self._out = sim_base.SimulationOutput(self._simulation)
self._running: list[tuple[sim_base.Simulator, simulation_executor.SimpleComponent]] = [] self._running: list[tuple[sim_base.Simulator, command_executor.SimpleComponent]] = []
self._sockets: list[tuple[simulation_executor.Executor, inst_base.Socket]] = [] self._sockets: list[tuple[command_executor.Executor, inst_base.Socket]] = []
self._wait_sims: list[simulation_executor.Component] = [] self._wait_sims: list[command_executor.Component] = []
@abc.abstractmethod @abc.abstractmethod
def sim_executor(self, simulator: sim_base.Simulator) -> simulation_executor.Executor: def sim_executor(self, simulator: sim_base.Simulator) -> command_executor.Executor:
pass pass
def sim_graph(self) -> dict[sim_base.Simulator, set[sim_base.Simulator]]: def sim_graph(self) -> dict[sim_base.Simulator, set[sim_base.Simulator]]:
...@@ -124,18 +123,17 @@ class ExperimentBaseRunner(abc.ABC): ...@@ -124,18 +123,17 @@ class ExperimentBaseRunner(abc.ABC):
# TODO: FIXME # TODO: FIXME
for host in self.exp.hosts: for host in self.exp.hosts:
path = self.env.cfgtar_path(host) path = self.env.cfgtar_path(host)
if self.verbose: if self._verbose:
print('preparing config tar:', path) print('preparing config tar:', path)
# TODO: FIXME # TODO: FIXME
host.node_config.make_tar(self.env, path) host.node_config.make_tar(self.env, path)
executor = self.sim_executor(host) executor = self.sim_executor(host)
task = asyncio.create_task(executor.send_file(path, self.verbose)) task = asyncio.create_task(executor.send_file(path, self._verbose))
copies.append(task) copies.append(task)
await asyncio.gather(*copies) await asyncio.gather(*copies)
# prepare all simulators in parallel # prepare all simulators in parallel
sims = [] sims = []
# TODO: FIXME
for sim in self._simulation.all_simulators(): for sim in self._simulation.all_simulators():
prep_cmds = list(sim.prep_cmds(inst=self._instantiation)) prep_cmds = list(sim.prep_cmds(inst=self._instantiation))
executor = self.sim_executor(sim) executor = self.sim_executor(sim)
...@@ -149,16 +147,16 @@ class ExperimentBaseRunner(abc.ABC): ...@@ -149,16 +147,16 @@ class ExperimentBaseRunner(abc.ABC):
async def wait_for_sims(self) -> None: async def wait_for_sims(self) -> None:
"""Wait for simulators to terminate (the ones marked to wait on).""" """Wait for simulators to terminate (the ones marked to wait on)."""
if self.verbose: if self._verbose:
print(f'{self.exp.name}: waiting for hosts to terminate') print(f'{self._simulation.name}: waiting for hosts to terminate')
for sc in self.wait_sims: for sc in self._wait_sims:
await sc.wait() await sc.wait()
async def terminate_collect_sims(self) -> sim_base.SimulationOutput: async def terminate_collect_sims(self) -> sim_base.SimulationOutput:
"""Terminates all simulators and collects output.""" """Terminates all simulators and collects output."""
self._out.set_end() self._out.set_end()
if self.verbose: if self._verbose:
print(f'{self.exp.name}: cleaning up') print(f'{self._simulation.name}: cleaning up')
await self.before_cleanup() await self.before_cleanup()
...@@ -261,14 +259,14 @@ class ExperimentDistributedRunner(ExperimentBaseRunner): ...@@ -261,14 +259,14 @@ class ExperimentDistributedRunner(ExperimentBaseRunner):
# TODO: FIXME # TODO: FIXME
def __init__( def __init__(
self, execs: dict[sim -> Executor], exp: DistributedExperiment, *args, **kwargs self, execs: dict[sim -> command_executor.Executor], exp: DistributedExperiment, *args, **kwargs
) -> None: ) -> None:
self.execs = execs self.execs = execs
super().__init__(exp, *args, **kwargs) super().__init__(exp, *args, **kwargs)
self.exp = exp # overrides the type in the base class self.exp = exp # overrides the type in the base class
assert self.exp.num_hosts <= len(execs) assert self.exp.num_hosts <= len(execs)
def sim_executor(self, sim) -> Executor: def sim_executor(self, sim) -> command_executor.Executor:
h_id = self.exp.host_mapping[sim] h_id = self.exp.host_mapping[sim]
return self.execs[h_id] return self.execs[h_id]
......
...@@ -29,6 +29,7 @@ import typing as tp ...@@ -29,6 +29,7 @@ import typing as tp
import simbricks.orchestration.system as sys_conf import simbricks.orchestration.system as sys_conf
from simbricks.orchestration.experiment import experiment_environment_new as exp_env from simbricks.orchestration.experiment import experiment_environment_new as exp_env
from simbricks.orchestration.instantiation import base as inst_base from simbricks.orchestration.instantiation import base as inst_base
from simbricks.orchestration.simulation import channel as sim_chan
from simbricks.orchestration.utils import base as utils_base from simbricks.orchestration.utils import base as utils_base
from simbricks.orchestration.runtime_new import command_executor from simbricks.orchestration.runtime_new import command_executor
...@@ -55,7 +56,7 @@ class Simulator(utils_base.IdObj): ...@@ -55,7 +56,7 @@ class Simulator(utils_base.IdObj):
self.name: str = name self.name: str = name
self._relative_executable_path: str = relative_executable_path self._relative_executable_path: str = relative_executable_path
self.extra_deps: list[Simulator] = [] self.extra_deps: list[Simulator] = []
self.experiment: sim_base.Simulation = simulation self._simulation: sim_base.Simulation = simulation
self._components: set[sys_conf.Component] = set() self._components: set[sys_conf.Component] = set()
@staticmethod @staticmethod
...@@ -78,6 +79,24 @@ class Simulator(utils_base.IdObj): ...@@ -78,6 +79,24 @@ class Simulator(utils_base.IdObj):
) )
return listen, connect return listen, connect
# helper method for simulators that do not support
# multiple sync periods etc. Should become eventually
# at some point in the future...
@staticmethod
def get_unique_latency_period_sync(
channels: list[sim_chan.Channel],
) -> tuple[int, int, bool]:
eth_latency = None
sync_period = None
run_sync = False
for channel in channels:
sync_period = min(sync_period, channel.sync_period)
run_sync = run_sync or channel._synchronized
eth_latency = max(eth_latency, channel.sys_channel.eth_latency)
if eth_latency is None or sync_period is None:
raise Exception("could not determine eth_latency and sync_period")
return eth_latency, sync_period, sync
def resreq_cores(self) -> int: def resreq_cores(self) -> int:
""" """
Number of cores this simulator requires during execution. Number of cores this simulator requires during execution.
...@@ -107,7 +126,7 @@ class Simulator(utils_base.IdObj): ...@@ -107,7 +126,7 @@ class Simulator(utils_base.IdObj):
if comp in self._components: if comp in self._components:
raise Exception("cannot add the same specification twice to a simulator") raise Exception("cannot add the same specification twice to a simulator")
self._components.add(comp) self._components.add(comp)
self.experiment.add_spec_sim_map(comp, self) self._simulation.add_spec_sim_map(comp, self)
def _chan_needs_instance(self, chan: sys_conf.Channel) -> bool: def _chan_needs_instance(self, chan: sys_conf.Channel) -> bool:
if ( if (
...@@ -245,19 +264,19 @@ class Simulation(utils_base.IdObj): ...@@ -245,19 +264,19 @@ class Simulation(utils_base.IdObj):
by first running in a less accurate mode, then checkpointing the system by first running in a less accurate mode, then checkpointing the system
state after boot and running simulations from there. state after boot and running simulations from there.
""" """
self.hosts: list[HostSim] = [] # self.hosts: list[HostSim] = []
"""The host simulators to run.""" # """The host simulators to run."""
self.pcidevs: list[PCIDevSim] = [] # self.pcidevs: list[PCIDevSim] = []
"""The PCIe device simulators to run.""" # """The PCIe device simulators to run."""
self.memdevs: list[MemDevSim] = [] # self.memdevs: list[MemDevSim] = []
"""The memory device simulators to run.""" # """The memory device simulators to run."""
self.netmems: list[NetMemSim] = [] # self.netmems: list[NetMemSim] = []
"""The network memory simulators to run.""" # """The network memory simulators to run."""
self.networks: list[NetSim] = [] # self.networks: list[NetSim] = []
"""The network simulators to run.""" # """The network simulators to run."""
self.metadata: dict[str, tp.Any] = {} self.metadata: dict[str, tp.Any] = {}
self.sys_sim_map: dict[sys_conf.Component, Simulator] = {} self._sys_sim_map: dict[sys_conf.Component, Simulator] = {}
"""System component and its simulator pairs""" """System component and its simulator pairs"""
self._chan_map: dict[sys_conf.Channel, Channel] = {} self._chan_map: dict[sys_conf.Channel, Channel] = {}
...@@ -265,9 +284,9 @@ class Simulation(utils_base.IdObj): ...@@ -265,9 +284,9 @@ class Simulation(utils_base.IdObj):
def add_spec_sim_map(self, sys: sys_conf.Component, sim: Simulator): def add_spec_sim_map(self, sys: sys_conf.Component, sim: Simulator):
"""Add a mapping from specification to simulation instance""" """Add a mapping from specification to simulation instance"""
if sys in self.sys_sim_map: if sys in self._sys_sim_map:
raise Exception("system component is already mapped by simulator") raise Exception("system component is already mapped by simulator")
self.sys_sim_map[sys] = sim self._sys_sim_map[sys] = sim
def is_channel_instantiated(self, chan: Channel) -> bool: def is_channel_instantiated(self, chan: Channel) -> bool:
return chan in self._chan_map return chan in self._chan_map
...@@ -280,52 +299,18 @@ class Simulation(utils_base.IdObj): ...@@ -280,52 +299,18 @@ class Simulation(utils_base.IdObj):
self._chan_map[chan] = channel self._chan_map[chan] = channel
return channel return channel
@property def all_simulators(self) -> set[Simulator]:
def nics(self):
return filter(lambda pcidev: pcidev.is_nic(), self.pcidevs)
def add_host(self, sim: HostSim) -> None:
"""Add a host simulator to the experiment."""
for h in self.hosts:
if h.name == sim.name:
raise ValueError("Duplicate host name")
self.hosts.append(sim)
def add_nic(self, sim: NICSim | I40eMultiNIC):
"""Add a NIC simulator to the experiment."""
self.add_pcidev(sim)
def add_pcidev(self, sim: PCIDevSim) -> None:
"""Add a PCIe device simulator to the experiment."""
for d in self.pcidevs:
if d.name == sim.name:
raise ValueError("Duplicate pcidev name")
self.pcidevs.append(sim)
def add_memdev(self, sim: simulators.MemDevSim):
for d in self.memdevs:
if d.name == sim.name:
raise ValueError("Duplicate memdev name")
self.memdevs.append(sim)
def add_netmem(self, sim: simulators.NetMemSim):
for d in self.netmems:
if d.name == sim.name:
raise ValueError("Duplicate netmems name")
self.netmems.append(sim)
def add_network(self, sim: NetSim) -> None:
"""Add a network simulator to the experiment."""
for n in self.networks:
if n.name == sim.name:
raise ValueError("Duplicate net name")
self.networks.append(sim)
def all_simulators(self) -> tp.Iterable[Simulator]:
"""Returns all simulators defined to run in this experiment.""" """Returns all simulators defined to run in this experiment."""
return itertools.chain( simulators = set(map(lambda kv: kv[1], self._sys_sim_map.items()))
self.hosts, self.pcidevs, self.memdevs, self.netmems, self.networks return simulators
)
# TODO: FIXME, either by filtering using specification type or simulator type
def nics(self) -> list[Simulator]:
    """Return the NIC simulators of this simulation (not implemented yet).

    The stub previously lacked ``self``, so calling it on an instance
    failed with a ``TypeError``. It now fails explicitly, in line with the
    other unimplemented methods in this file, instead of returning ``None``
    where a list is promised.

    Raises:
        NotImplementedError: Always, until the filtering is implemented.
    """
    raise NotImplementedError("not implemented")
# TODO FIXME, either by filtering using specification type or simulator type
def hosts(self) -> list[Simulator]:
    """Return the host simulators of this simulation (not implemented yet).

    The stub previously lacked ``self``, so calling it on an instance
    failed with a ``TypeError``. It now fails explicitly, in line with the
    other unimplemented methods in this file, instead of returning ``None``
    where a list is promised.

    Raises:
        NotImplementedError: Always, until the filtering is implemented.
    """
    raise NotImplementedError("not implemented")
def resreq_mem(self) -> int: def resreq_mem(self) -> int:
"""Memory required to run all simulators in this experiment.""" """Memory required to run all simulators in this experiment."""
...@@ -343,13 +328,15 @@ class Simulation(utils_base.IdObj): ...@@ -343,13 +328,15 @@ class Simulation(utils_base.IdObj):
def find_sim(self, comp: sys_conf.Component) -> sim_base.Simulator: def find_sim(self, comp: sys_conf.Component) -> sim_base.Simulator:
"""Returns the used simulator object for the system component.""" """Returns the used simulator object for the system component."""
if comp not in self.sys_sim_map: if comp not in self._sys_sim_map:
raise Exception("Simulator Not Found") raise Exception("Simulator Not Found")
return self.sys_sim_map[comp] return self._sys_sim_map[comp]
# TODO: FIXME
def enable_checkpointing_if_supported() -> None: def enable_checkpointing_if_supported() -> None:
raise Exception("not implemented") raise Exception("not implemented")
# TODO: FIXME
def is_checkpointing_enabled(self) -> bool: def is_checkpointing_enabled(self) -> bool:
raise Exception("not implemented") raise Exception("not implemented")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment