Unverified Commit a0494ee3 authored by Jakob Görgen's avatar Jakob Görgen
Browse files

support for gem5 checkpointing + execution of multiple runs

parent e724a160
...@@ -14,7 +14,7 @@ This scripts generates the experiments with all the combinations of different ex ...@@ -14,7 +14,7 @@ This scripts generates the experiments with all the combinations of different ex
host_types = ["gem5"] host_types = ["gem5"]
nic_types = ["i40e"] nic_types = ["i40e"]
net_types = ["switch"] net_types = ["switch"]
experiments = [] instantiations: list[inst.Instantiation] = []
sys = system.System() sys = system.System()
...@@ -121,7 +121,8 @@ for host_type in host_types: ...@@ -121,7 +121,8 @@ for host_type in host_types:
else: else:
raise NameError(net_type) raise NameError(net_type)
host_inst0 = sim.QemuSim(simulation) host_inst0 = sim.Gem5Sim(simulation)
# host_inst0 = sim.QemuSim(simulation)
host_inst0.add(host0) host_inst0.add(host0)
host_inst0.name = "Client-Host" host_inst0.name = "Client-Host"
# host_inst0.wait_terminate = True # host_inst0.wait_terminate = True
...@@ -129,7 +130,7 @@ for host_type in host_types: ...@@ -129,7 +130,7 @@ for host_type in host_types:
# host_inst1 = sim.Gem5Sim(simulation) # host_inst1 = sim.Gem5Sim(simulation)
host_inst1 = sim.QemuSim(simulation) host_inst1 = sim.QemuSim(simulation)
host_inst1.name = "Server-Simulator" host_inst1.name = "Server-Host"
host_inst1.add(host1) host_inst1.add(host1)
# host_inst1.cpu_type = 'X86KvmCPU' # host_inst1.cpu_type = 'X86KvmCPU'
...@@ -158,4 +159,7 @@ for host_type in host_types: ...@@ -158,4 +159,7 @@ for host_type in host_types:
for s in sims: for s in sims:
print(s) print(s)
experiments.append(simulation) instance = inst.Instantiation(sim=simulation)
instance.preserve_tmp_folder = False
instance.create_checkpoint = True
instantiations.append(instance)
...@@ -34,14 +34,14 @@ import signal ...@@ -34,14 +34,14 @@ import signal
import sys import sys
from simbricks.orchestration import exectools from simbricks.orchestration import exectools
from simbricks.orchestration.experiment import experiment_environment
from simbricks.orchestration.simulation import base as sim_base from simbricks.orchestration.simulation import base as sim_base
from simbricks.orchestration.simulation import output from simbricks.orchestration.simulation import output as sim_out
from simbricks.orchestration.instantiation import base as inst_base from simbricks.orchestration.instantiation import base as inst_base
from simbricks.orchestration.runtime_new import runs from simbricks.orchestration.runtime_new.runs import base as runs_base
from simbricks.orchestration.runtime_new.runs import base as runs_base
from simbricks.orchestration.runtime_new.runs import local as rt_local
from simbricks.orchestration.runtime_new import command_executor from simbricks.orchestration.runtime_new import command_executor
from simbricks.orchestration.runtime_new import simulation_executor
def parse_args() -> argparse.Namespace: def parse_args() -> argparse.Namespace:
...@@ -261,56 +261,14 @@ def warn_multi_exec(executors: list[command_executor.Executor]): ...@@ -261,56 +261,14 @@ def warn_multi_exec(executors: list[command_executor.Executor]):
def add_exp( def add_exp(
simulation: sim_base.Simulation, instantiation: inst_base.Instantiation,
rt: runs.base.Runtime, prereq: runs_base.Run | None,
run_number: int, rt: runs_base.Runtime,
prereq: runs.base.Run | None, ) -> runs_base.Run:
create_cp: bool,
restore_cp: bool,
args: argparse.Namespace,
):
outpath = f"{args.outdir}/{simulation.name}-{run_number}.json"
if os.path.exists(outpath) and not args.force:
print(f"skip {simulation.name} run {run_number}")
return None
workdir = f"{args.workdir}/{simulation.name}/{run_number}"
cpdir = f"{args.workdir}/{simulation.name}/0"
if args.shmdir is not None:
shmdir = f"{args.shmdir}/{simulation.name}/{run_number}"
shm_base = "" # TODO
if args.shmdir is not None:
env.shm_base = os.path.abspath(shmdir)
# TODO: user can specify output base
output_base = ""
tmp_sim_files = "" # TODO
inst_env = inst_base.InstantiationEnvironment(
repo_path=args.repo,
# workdir=workdir,
# cpdir=cpdir,
# create_cp=create_cp,
# restore_cp=restore_cp,
# shm_base=shm_base,
# output_base=output_base,
# tmp_simulation_files=tmp_sim_files,
)
inst_ = inst_base.Instantiation(sim=simulation, env=inst_env)
output_ = output.SimulationOutput(simulation)
run = runs.base.Run(
simulation=simulation,
instantiation=inst_,
prereq=prereq,
output = output_
)
output = sim_out.SimulationOutput(instantiation.simulation)
run = runs_base.Run(instantiation=instantiation, prereq=prereq, output=output)
rt.add_run(run) rt.add_run(run)
return run return run
...@@ -322,18 +280,18 @@ def main(): ...@@ -322,18 +280,18 @@ def main():
executors = load_executors(args.hosts) executors = load_executors(args.hosts)
# initialize runtime # initialize runtime
if args.runtime == "parallel": if args.runtime == "parallel": # TODO: FIXME
warn_multi_exec(executors) warn_multi_exec(executors)
rt = runs.LocalParallelRuntime( rt = rt_local.LocalParallelRuntime(
cores=args.cores, mem=args.mem, verbose=args.verbose, executor=executors[0] cores=args.cores, mem=args.mem, verbose=args.verbose, executor=executors[0]
) )
elif args.runtime == "slurm": # elif args.runtime == "slurm":
rt = runs.SlurmRuntime(args.slurmdir, args, verbose=args.verbose) # rt = runs.SlurmRuntime(args.slurmdir, args, verbose=args.verbose)
elif args.runtime == "dist": # elif args.runtime == "dist":
rt = runs.DistributedSimpleRuntime(executors, verbose=args.verbose) # rt = runs.DistributedSimpleRuntime(executors, verbose=args.verbose)
else: else:
warn_multi_exec(executors) warn_multi_exec(executors)
rt = runs.LocalSimpleRuntime(verbose=args.verbose, executor=executors[0]) rt = rt_local.LocalSimpleRuntime(verbose=args.verbose, executor=executors[0])
if args.profile_int: if args.profile_int:
rt.enable_profiler(args.profile_int) rt.enable_profiler(args.profile_int)
...@@ -341,7 +299,7 @@ def main(): ...@@ -341,7 +299,7 @@ def main():
# load experiments # load experiments
if not args.pickled: if not args.pickled:
# default: load python modules with experiments # default: load python modules with experiments
simulations: list[sim_base.Simulation] = [] instantiations: list[inst_base.Instantiation] = []
for path in args.experiments: for path in args.experiments:
modname, _ = os.path.splitext(os.path.basename(path)) modname, _ = os.path.splitext(os.path.basename(path))
...@@ -355,23 +313,22 @@ def main(): ...@@ -355,23 +313,22 @@ def main():
if spec.loader is None: if spec.loader is None:
raise ExperimentModuleLoadError("spec.loader is None") raise ExperimentModuleLoadError("spec.loader is None")
spec.loader.exec_module(mod) spec.loader.exec_module(mod)
simulations += mod.experiments instantiations += mod.instantiations
if args.list: if args.list:
for sim in simulations: for inst in instantiations:
print(sim.name) print(inst.simulation.name)
sys.exit(0) sys.exit(0)
for sim in simulations: for inst in instantiations:
# TODO: do we want a sitributed SImulation class? --> probably not, choose slightly different abstraction # if args.auto_dist and not isinstance(sim, sim_base.DistributedExperiment):
if args.auto_dist and not isinstance(sim, sim_base.DistributedExperiment): # sim = runs_base.auto_dist(sim, executors, args.proxy_type)
sim = runs.auto_dist(sim, executors, args.proxy_type)
# apply filter if any specified # apply filter if any specified
if (args.filter) and (len(args.filter) > 0): if (args.filter) and (len(args.filter) > 0):
match = False match = False
for f in args.filter: for f in args.filter:
match = fnmatch.fnmatch(sim.name, f) match = fnmatch.fnmatch(inst.simulation.name, f)
if match: if match:
break break
...@@ -380,19 +337,30 @@ def main(): ...@@ -380,19 +337,30 @@ def main():
# if this is an experiment with a checkpoint we might have to create # if this is an experiment with a checkpoint we might have to create
# it # it
# TODO: what to do / how to handel checkpointing prereq = None
if sim.checkpoint: if (
prereq = add_exp(sim, rt, 0, None, True, False, args) inst.create_checkpoint
else: and inst.simulation.any_supports_checkpointing()
prereq = None ):
checkpointing_inst = inst.copy()
for run in range(args.firstrun, args.firstrun + args.runs): checkpointing_inst.restore_checkpoint = False
add_exp(sim, rt, run, prereq, False, sim.checkpoint, args) checkpointing_inst.create_checkpoint = True
else: inst.create_checkpoint = False
# otherwise load pickled run object inst.restore_checkpoint = True
for path in args.experiments:
with open(path, "rb") as f: prereq = add_exp(instantiation=checkpointing_inst, rt=rt, prereq=None)
rt.add_run(pickle.load(f))
for index in range(args.firstrun, args.firstrun + args.runs):
inst_copy = inst.copy()
inst_copy.preserve_tmp_folder = False
if index == args.firstrun + args.runs - 1:
inst_copy._preserve_checkpoints = False
add_exp(instantiation=inst_copy, rt=rt, prereq=prereq)
# else:
# # otherwise load pickled run object
# for path in args.experiments:
# with open(path, "rb") as f:
# rt.add_run(pickle.load(f))
# register interrupt handler # register interrupt handler
signal.signal(signal.SIGINT, lambda *_: rt.interrupt()) signal.signal(signal.SIGINT, lambda *_: rt.interrupt())
......
...@@ -19,3 +19,5 @@ ...@@ -19,3 +19,5 @@
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, # CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from simbricks.orchestration.instantiation.base import *
...@@ -22,7 +22,6 @@ ...@@ -22,7 +22,6 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import enum import enum
import pathlib import pathlib
import shutil import shutil
...@@ -32,7 +31,6 @@ from simbricks.orchestration.system import base as sys_base ...@@ -32,7 +31,6 @@ from simbricks.orchestration.system import base as sys_base
from simbricks.orchestration.system import pcie as sys_pcie from simbricks.orchestration.system import pcie as sys_pcie
from simbricks.orchestration.system import mem as sys_mem from simbricks.orchestration.system import mem as sys_mem
from simbricks.orchestration.system import eth as sys_eth from simbricks.orchestration.system import eth as sys_eth
from simbricks.orchestration.system.host import base as sys_host
from simbricks.orchestration.system.host import disk_images from simbricks.orchestration.system.host import disk_images
from simbricks.orchestration.runtime_new import command_executor from simbricks.orchestration.runtime_new import command_executor
...@@ -57,57 +55,23 @@ class InstantiationEnvironment(util_base.IdObj): ...@@ -57,57 +55,23 @@ class InstantiationEnvironment(util_base.IdObj):
def __init__( def __init__(
self, self,
repo_path: str = pathlib.Path(__file__).parents[3].resolve(), repo_path: str = pathlib.Path(__file__).parents[4].resolve(),
workdir: str | None = None, workdir: str | None = None,
output_base: str | None = None,
cpdir: str | None = None,
create_cp: bool = False,
restore_cp: bool = False,
shm_base: str | None = None,
tmp_simulation_files: str | None = None,
qemu_img_path: str | None = None,
qemu_path: str | None = None,
): ):
super().__init__() super().__init__()
self._repodir: str = pathlib.Path(repo_path).resolve() self._repodir: str = pathlib.Path(repo_path).resolve()
self._workdir: str = ( self._workdir: str = (
workdir if workdir else pathlib.Path(f"{self._repodir}/wrkdir").resolve() workdir if workdir else pathlib.Path(f"{self._repodir}/wrkdir").resolve()
) )
self._output_base: str = ( self._output_base: str = pathlib.Path(f"{self._workdir}/output").resolve()
output_base self._tmp_simulation_files: str = pathlib.Path(f"{self._workdir}/tmp").resolve()
if output_base self._imgdir: str = pathlib.Path(f"{self._tmp_simulation_files}/imgs").resolve()
else pathlib.Path(f"{self._workdir}/output").resolve() self._cpdir: str = pathlib.Path(
) f"{self._tmp_simulation_files}/checkpoints"
self._cpdir: str = ( ).resolve()
cpdir self._shm_base: str = pathlib.Path(
if cpdir f"{self._tmp_simulation_files}/shm"
else pathlib.Path(f"{self._output_base}/checkpoints").resolve() ).resolve()
)
self._shm_base: str = (
shm_base if shm_base else pathlib.Path(f"{self._workdir}/shm").resolve()
)
self._tmp_simulation_files: str = (
tmp_simulation_files
if tmp_simulation_files
else (pathlib.Path(f"{self._workdir}/tmp").resolve())
)
self._create_cp: bool = create_cp
self._restore_cp: bool = restore_cp
self._qemu_img_path: str = (
qemu_img_path
if qemu_img_path
else pathlib.Path(
f"{self._repodir}/sims/external/qemu/build/qemu-img"
).resolve()
)
self._qemu_path: str = (
qemu_path
if qemu_path
else pathlib.Path(
f"{self._repodir}/sims/external/qemu/build/x86_64-softmmu/qemu-system-x86_64"
).resolve()
)
class Instantiation(util_base.IdObj): class Instantiation(util_base.IdObj):
...@@ -118,13 +82,14 @@ class Instantiation(util_base.IdObj): ...@@ -118,13 +82,14 @@ class Instantiation(util_base.IdObj):
env: InstantiationEnvironment = InstantiationEnvironment(), env: InstantiationEnvironment = InstantiationEnvironment(),
): ):
super().__init__() super().__init__()
self._simulation: sim_base.Simulation = sim self.simulation: sim_base.Simulation = sim
self._env: InstantiationEnvironment = env self.env: InstantiationEnvironment = env
self._executor: command_executor.Executor | None = None self._executor: command_executor.Executor | None = None
self._create_checkpoint: bool = False
self._restore_checkpoint: bool = False
self._preserve_checkpoints: bool = True
self.preserve_tmp_folder: bool = False
self._socket_per_interface: dict[sys_base.Interface, Socket] = {} self._socket_per_interface: dict[sys_base.Interface, Socket] = {}
self._simulation_topo: (
dict[sys_base.Interface, set[sys_base.Interface]] | None
) = None
self._sim_dependency: ( self._sim_dependency: (
dict[sim_base.Simulator, set[sim_base.Simulator]] | None dict[sim_base.Simulator, set[sim_base.Simulator]] | None
) = None ) = None
...@@ -144,12 +109,6 @@ class Instantiation(util_base.IdObj): ...@@ -144,12 +109,6 @@ class Instantiation(util_base.IdObj):
def executor(self, executor: command_executor.Executor): def executor(self, executor: command_executor.Executor):
self._executor = executor self._executor = executor
def qemu_img_path(self) -> str:
return self._env._qemu_img_path
def qemu_path(self) -> str:
return self._env._qemu_path
def _get_opposing_interface( def _get_opposing_interface(
self, interface: sys_base.Interface self, interface: sys_base.Interface
) -> sys_base.Interface: ) -> sys_base.Interface:
...@@ -203,7 +162,7 @@ class Instantiation(util_base.IdObj): ...@@ -203,7 +162,7 @@ class Instantiation(util_base.IdObj):
raise Exception("cannot create socket path for given interface type") raise Exception("cannot create socket path for given interface type")
assert queue_type is not None assert queue_type is not None
print(f"_interface_to_sock_path: self._env._shm_base={self.shm_base_dir()}") print(f"_interface_to_sock_path: self.env._shm_base={self.shm_base_dir()}")
return self._join_paths( return self._join_paths(
base=self.shm_base_dir(), base=self.shm_base_dir(),
relative_path=f"{queue_type}-{queue_ident}", relative_path=f"{queue_type}-{queue_ident}",
...@@ -311,7 +270,7 @@ class Instantiation(util_base.IdObj): ...@@ -311,7 +270,7 @@ class Instantiation(util_base.IdObj):
self._get_socket(interface=sim_b, socket_type=SockType.LISTEN) self._get_socket(interface=sim_b, socket_type=SockType.LISTEN)
# build dependency graph # build dependency graph
for sim in self._simulation.all_simulators(): for sim in self.simulation.all_simulators():
for comp in sim._components: for comp in sim._components:
for sim_inf in comp.interfaces(): for sim_inf in comp.interfaces():
if self._opposing_interface_within_same_sim(interface=sim_inf): if self._opposing_interface_within_same_sim(interface=sim_inf):
...@@ -324,21 +283,10 @@ class Instantiation(util_base.IdObj): ...@@ -324,21 +283,10 @@ class Instantiation(util_base.IdObj):
def sim_dependencies(self) -> dict[sim_base.Simulator, set[sim_base.Simulator]]: def sim_dependencies(self) -> dict[sim_base.Simulator, set[sim_base.Simulator]]:
if self._sim_dependency is not None: if self._sim_dependency is not None:
return self._sim_dependency return self._sim_dependency
self._build_simulation_topology() self._build_simulation_topology()
assert self._sim_dependency is not None assert self._sim_dependency is not None
return self._sim_dependency return self._sim_dependency
async def cleanup_sockets(
self,
sockets: list[Socket] = [],
) -> None:
scs = []
for sock in sockets:
scs.append(asyncio.create_task(self.executor.rmtree(path=sock._path)))
if len(scs) > 0:
await asyncio.gather(*scs)
async def wait_for_sockets( async def wait_for_sockets(
self, self,
sockets: list[Socket] = [], sockets: list[Socket] = [],
...@@ -346,68 +294,92 @@ class Instantiation(util_base.IdObj): ...@@ -346,68 +294,92 @@ class Instantiation(util_base.IdObj):
wait_socks = list(map(lambda sock: sock._path, sockets)) wait_socks = list(map(lambda sock: sock._path, sockets))
await self.executor.await_files(wait_socks, verbose=True) await self.executor.await_files(wait_socks, verbose=True)
# TODO: add more methods constructing paths as required by methods in simulators or image handling classes @property
def create_checkpoint(self) -> bool:
# TODO: fix paths to support mutliple exeriment runs etc. """
def wrkdir(self) -> str: Whether to use checkpoint and restore for simulators.
return pathlib.Path(self._env._workdir).resolve()
The most common use-case for this is accelerating host simulator startup
def shm_base_dir(self) -> str: by first running in a less accurate mode, then checkpointing the system
return pathlib.Path(self._env._shm_base).resolve() state after boot and running simulations from there.
"""
def create_cp(self) -> bool: assert (self._create_checkpoint ^ self._restore_checkpoint) or (
return self._env._create_cp not self._create_checkpoint and not self._restore_checkpoint
)
def restore_cp(self) -> bool: return self._create_checkpoint
return self._env._restore_cp
def cpdir(self) -> str: @create_checkpoint.setter
return pathlib.Path(self._env._cpdir).resolve() def create_checkpoint(self, create_checkpoint: bool) -> None:
assert (self._create_checkpoint ^ self._restore_checkpoint) or (
not self._create_checkpoint and not self._restore_checkpoint
)
self._create_checkpoint = create_checkpoint
def wrkdir(self) -> str: @property
return pathlib.Path(self._env._workdir).resolve() def restore_checkpoint(self) -> bool:
assert (self._create_checkpoint ^ self._restore_checkpoint) or (
not self._create_checkpoint and not self._restore_checkpoint
)
return self._restore_checkpoint
def tmp_dir(self) -> str: @restore_checkpoint.setter
return pathlib.Path(self._env._tmp_simulation_files).resolve() def restore_checkpoint(self, restore_checkpoint: bool) -> None:
assert (self._create_checkpoint ^ self._restore_checkpoint) or (
not self._create_checkpoint and not self._restore_checkpoint
)
self._restore_checkpoint = restore_checkpoint
async def prepare(self) -> None: def copy(self) -> Instantiation:
wrkdir = self.wrkdir() copy = Instantiation(sim=self.simulation, env=self.env)
print(f"wrkdir={wrkdir}") return copy
shutil.rmtree(wrkdir, ignore_errors=True)
await self.executor.rmtree(wrkdir)
shm_base = self.shm_base_dir() def out_base_dir(self) -> str:
print(f"shm_base={shm_base}") return pathlib.Path(
shutil.rmtree(shm_base, ignore_errors=True) f"{self.env._output_base}/{self.simulation.name}" # /{self.run._run_nr}"
await self.executor.rmtree(shm_base) ).resolve()
cpdir = self.cpdir() def shm_base_dir(self) -> str:
print(f"cpdir={cpdir}") return pathlib.Path(
if self.create_cp(): f"{self.env._shm_base}/{self.simulation.name}" # /{self.run._run_nr}"
shutil.rmtree(cpdir, ignore_errors=True) ).resolve()
await self.executor.rmtree(cpdir)
tmpdir = self.tmp_dir() def imgs_dir(self) -> str:
print(f"tmpdir={tmpdir}") return pathlib.Path(
shutil.rmtree(tmpdir, ignore_errors=True) f"{self.env._imgdir}/{self.simulation.name}" # /{self.run._run_nr}"
await self.executor.rmtree(tmpdir) ).resolve()
pathlib.Path(wrkdir).mkdir(parents=True, exist_ok=True) def cpdir(self) -> str:
await self.executor.mkdir(wrkdir) return pathlib.Path(
f"{self.env._cpdir}/{self.simulation.name}" # /{self.run._run_nr}"
).resolve()
pathlib.Path(cpdir).mkdir(parents=True, exist_ok=True) def wrkdir(self) -> str:
await self.executor.mkdir(cpdir) return pathlib.Path(
f"{self.env._workdir}/{self.simulation.name}" # /{self.run._run_nr}"
).resolve()
pathlib.Path(shm_base).mkdir(parents=True, exist_ok=True) async def prepare(self) -> None:
await self.executor.mkdir(shm_base) to_prepare = [self.shm_base_dir(), self.imgs_dir()]
if not self.create_checkpoint and not self.restore_checkpoint:
to_prepare.append(self.cpdir())
for tp in to_prepare:
shutil.rmtree(tp, ignore_errors=True)
await self.executor.rmtree(tp)
pathlib.Path(tmpdir).mkdir(parents=True, exist_ok=True) pathlib.Path(tp).mkdir(parents=True, exist_ok=True)
await self.executor.mkdir(tmpdir) await self.executor.mkdir(tp)
await self._simulation.prepare(inst=self) await self.simulation.prepare(inst=self)
async def cleanup(self) -> None: async def cleanup(self) -> None:
pass # TODO: implement cleanup functionality (e.g. delete ) if self.preserve_tmp_folder:
return
to_delete = [self.shm_base_dir(), self.imgs_dir()]
if not self._preserve_checkpoints:
to_delete.append(self.cpdir())
for td in to_delete:
shutil.rmtree(td, ignore_errors=True)
await self.executor.rmtree(td)
def _join_paths( def _join_paths(
self, base: str = "", relative_path: str = "", enforce_existence=False self, base: str = "", relative_path: str = "", enforce_existence=False
...@@ -424,12 +396,12 @@ class Instantiation(util_base.IdObj): ...@@ -424,12 +396,12 @@ class Instantiation(util_base.IdObj):
def join_repo_base(self, relative_path: str) -> str: def join_repo_base(self, relative_path: str) -> str:
return self._join_paths( return self._join_paths(
base=self._env._repodir, relative_path=relative_path, enforce_existence=True base=self.env._repodir, relative_path=relative_path, enforce_existence=True
) )
def join_output_base(self, relative_path: str) -> str: def join_output_base(self, relative_path: str) -> str:
return self._join_paths( return self._join_paths(
base=self._env._output_base, base=self.out_base_dir(),
relative_path=relative_path, relative_path=relative_path,
enforce_existence=True, enforce_existence=True,
) )
...@@ -438,7 +410,7 @@ class Instantiation(util_base.IdObj): ...@@ -438,7 +410,7 @@ class Instantiation(util_base.IdObj):
if Instantiation.is_absolute_exists(hd_name_or_path): if Instantiation.is_absolute_exists(hd_name_or_path):
return hd_name_or_path return hd_name_or_path
path = self._join_paths( path = self._join_paths(
base=self._env._repodir, base=self.env._repodir,
relative_path=f"images/output-{hd_name_or_path}/{hd_name_or_path}", relative_path=f"images/output-{hd_name_or_path}/{hd_name_or_path}",
enforce_existence=True, enforce_existence=True,
) )
...@@ -447,23 +419,29 @@ class Instantiation(util_base.IdObj): ...@@ -447,23 +419,29 @@ class Instantiation(util_base.IdObj):
def cfgtar_path(self, sim: sim_base.Simulator) -> str: def cfgtar_path(self, sim: sim_base.Simulator) -> str:
return f"{self.wrkdir()}/cfg.{sim.name}.tar" return f"{self.wrkdir()}/cfg.{sim.name}.tar"
def join_tmp_base(self, relative_path: str) -> str: # def join_tmp_base(self, relative_path: str) -> str:
# return self._join_paths(
# base=self.tmp_dir(),
# relative_path=relative_path,
# )
def join_imgs_path(self, relative_path: str) -> str:
return self._join_paths( return self._join_paths(
base=self.tmp_dir(), base=self.imgs_dir(),
relative_path=relative_path, relative_path=relative_path,
) )
def dynamic_img_path(self, img: disk_images.DiskImage, format: str) -> str: def dynamic_img_path(self, img: disk_images.DiskImage, format: str) -> str:
filename = f"{img._id}.{format}" filename = f"{img._id}.{format}"
return self._join_paths( return self._join_paths(
base=self.tmp_dir(), base=self.imgs_dir(),
relative_path=filename, relative_path=filename,
) )
def hdcopy_path(self, img: disk_images.DiskImage, format: str) -> str: def hdcopy_path(self, img: disk_images.DiskImage, format: str) -> str:
filename = f"{img._id}_hdcopy.{format}" filename = f"{img._id}_hdcopy.{format}"
return self._join_paths( return self._join_paths(
base=self.tmp_dir(), base=self.imgs_dir(),
relative_path=filename, relative_path=filename,
) )
...@@ -475,7 +453,7 @@ class Instantiation(util_base.IdObj): ...@@ -475,7 +453,7 @@ class Instantiation(util_base.IdObj):
def get_simmulator_output_dir(self, sim: sim_base.Simulator) -> str: def get_simmulator_output_dir(self, sim: sim_base.Simulator) -> str:
dir_path = f"output.{sim.full_name()}-{sim._id}" dir_path = f"output.{sim.full_name()}-{sim._id}"
return self._join_paths(base=self._env._output_base, relative_path=dir_path) return self._join_paths(base=self.out_base_dir(), relative_path=dir_path)
def get_simulator_shm_pool_path(self, sim: sim_base.Simulator) -> str: def get_simulator_shm_pool_path(self, sim: sim_base.Simulator) -> str:
return self._join_paths( return self._join_paths(
...@@ -483,10 +461,10 @@ class Instantiation(util_base.IdObj): ...@@ -483,10 +461,10 @@ class Instantiation(util_base.IdObj):
relative_path=f"{sim.full_name()}-shm-pool-{sim._id}", relative_path=f"{sim.full_name()}-shm-pool-{sim._id}",
) )
def get_simulation_output_path(self, run_number: int) -> str: def get_simulation_output_path(self, run_nr: int) -> str:
return self._join_paths( return self._join_paths(
base=self._env._output_base, base=self.out_base_dir(),
relative_path=f"out-{run_number}.json", relative_path=f"{run_nr}/out.json",
) )
def find_sim_by_interface( def find_sim_by_interface(
...@@ -496,4 +474,4 @@ class Instantiation(util_base.IdObj): ...@@ -496,4 +474,4 @@ class Instantiation(util_base.IdObj):
def find_sim_by_spec(self, spec: sys_base.Component) -> sim_base.Simulator: def find_sim_by_spec(self, spec: sys_base.Component) -> sim_base.Simulator:
util_base.has_expected_type(spec, sys_base.Component) util_base.has_expected_type(spec, sys_base.Component)
return self._simulation.find_sim(spec) return self.simulation.find_sim(spec)
...@@ -85,144 +85,144 @@ class LocalSimpleRuntime(Runtime): ...@@ -85,144 +85,144 @@ class LocalSimpleRuntime(Runtime):
self._running.cancel() self._running.cancel()
class LocalParallelRuntime(Runtime): # class LocalParallelRuntime(Runtime):
"""Execute runs locally in parallel on multiple cores.""" # """Execute runs locally in parallel on multiple cores."""
def __init__( # def __init__(
self, # self,
cores: int, # cores: int,
mem: tp.Optional[int] = None, # mem: tp.Optional[int] = None,
verbose=False, # verbose=False,
executor: exectools.Executor = exectools.LocalExecutor() # executor: exectools.Executor = exectools.LocalExecutor()
): # ):
super().__init__() # super().__init__()
self.runs_noprereq: tp.List[Run] = [] # self.runs_noprereq: tp.List[Run] = []
"""Runs with no prerequesite runs.""" # """Runs with no prerequesite runs."""
self.runs_prereq: tp.List[Run] = [] # self.runs_prereq: tp.List[Run] = []
"""Runs with prerequesite runs.""" # """Runs with prerequesite runs."""
self.complete: tp.Set[Run] = set() # self.complete: tp.Set[Run] = set()
self.cores = cores # self.cores = cores
self.mem = mem # self.mem = mem
self.verbose = verbose # self.verbose = verbose
self.executor = executor # self.executor = executor
self._pending_jobs: tp.Set[asyncio.Task] = set() # self._pending_jobs: tp.Set[asyncio.Task] = set()
self._starter_task: asyncio.Task # self._starter_task: asyncio.Task
def add_run(self, run: Run) -> None: # def add_run(self, run: Run) -> None:
if run.experiment.resreq_cores() > self.cores: # if run.experiment.resreq_cores() > self.cores:
raise RuntimeError('Not enough cores available for run') # raise RuntimeError('Not enough cores available for run')
if self.mem is not None and run.experiment.resreq_mem() > self.mem: # if self.mem is not None and run.experiment.resreq_mem() > self.mem:
raise RuntimeError('Not enough memory available for run') # raise RuntimeError('Not enough memory available for run')
if run.prereq is None: # if run.prereq is None:
self.runs_noprereq.append(run) # self.runs_noprereq.append(run)
else: # else:
self.runs_prereq.append(run) # self.runs_prereq.append(run)
async def do_run(self, run: Run) -> tp.Optional[Run]: # async def do_run(self, run: Run) -> tp.Optional[Run]:
"""Actually executes `run`.""" # """Actually executes `run`."""
try: # try:
runner = ExperimentSimpleRunner( # runner = ExperimentSimpleRunner(
self.executor, run.experiment, run.env, self.verbose # self.executor, run.experiment, run.env, self.verbose
) # )
if self.profile_int: # if self.profile_int:
runner.profile_int = self.profile_int # runner.profile_int = self.profile_int
await run.prep_dirs(executor=self.executor) # await run.prep_dirs(executor=self.executor)
await runner.prepare() # await runner.prepare()
except asyncio.CancelledError: # except asyncio.CancelledError:
# it is safe to just exit here because we are not running any # # it is safe to just exit here because we are not running any
# simulators yet # # simulators yet
return None # return None
print('starting run ', run.name()) # print('starting run ', run.name())
run.output = await runner.run() # already handles CancelledError # run.output = await runner.run() # already handles CancelledError
# if the log is huge, this step takes some time # # if the log is huge, this step takes some time
if self.verbose: # if self.verbose:
print( # print(
f'Writing collected output of run {run.name()} to JSON file ...' # f'Writing collected output of run {run.name()} to JSON file ...'
) # )
run.output.dump(run.outpath) # run.output.dump(run.outpath)
print('finished run ', run.name()) # print('finished run ', run.name())
return run # return run
async def wait_completion(self) -> None: # async def wait_completion(self) -> None:
"""Wait for any run to terminate and return.""" # """Wait for any run to terminate and return."""
assert self._pending_jobs # assert self._pending_jobs
done, self._pending_jobs = await asyncio.wait( # done, self._pending_jobs = await asyncio.wait(
self._pending_jobs, return_when=asyncio.FIRST_COMPLETED # self._pending_jobs, return_when=asyncio.FIRST_COMPLETED
) # )
for run in done: # for run in done:
run = await run # run = await run
self.complete.add(run) # self.complete.add(run)
self.cores_used -= run.experiment.resreq_cores() # self.cores_used -= run.experiment.resreq_cores()
self.mem_used -= run.experiment.resreq_mem() # self.mem_used -= run.experiment.resreq_mem()
def enough_resources(self, run: Run) -> bool: # def enough_resources(self, run: Run) -> bool:
"""Check if enough cores and mem are available for the run.""" # """Check if enough cores and mem are available for the run."""
exp = run.experiment # pylint: disable=redefined-outer-name # exp = run.experiment # pylint: disable=redefined-outer-name
if self.cores is not None: # if self.cores is not None:
enough_cores = (self.cores - self.cores_used) >= exp.resreq_cores() # enough_cores = (self.cores - self.cores_used) >= exp.resreq_cores()
else: # else:
enough_cores = True # enough_cores = True
if self.mem is not None: # if self.mem is not None:
enough_mem = (self.mem - self.mem_used) >= exp.resreq_mem() # enough_mem = (self.mem - self.mem_used) >= exp.resreq_mem()
else: # else:
enough_mem = True # enough_mem = True
return enough_cores and enough_mem # return enough_cores and enough_mem
def prereq_ready(self, run: Run) -> bool: # def prereq_ready(self, run: Run) -> bool:
"""Check if the prerequesite run for `run` has completed.""" # """Check if the prerequesite run for `run` has completed."""
if run.prereq is None: # if run.prereq is None:
return True # return True
return run.prereq in self.complete # return run.prereq in self.complete
async def do_start(self) -> None: # async def do_start(self) -> None:
"""Asynchronously execute the runs defined in `self.runs_noprereq + # """Asynchronously execute the runs defined in `self.runs_noprereq +
self.runs_prereq.""" # self.runs_prereq."""
#self.completions = asyncio.Queue() # #self.completions = asyncio.Queue()
self.cores_used = 0 # self.cores_used = 0
self.mem_used = 0 # self.mem_used = 0
runs = self.runs_noprereq + self.runs_prereq # runs = self.runs_noprereq + self.runs_prereq
for run in runs: # for run in runs:
# if necessary, wait for enough memory or cores # # if necessary, wait for enough memory or cores
while not self.enough_resources(run): # while not self.enough_resources(run):
print('waiting for resources') # print('waiting for resources')
await self.wait_completion() # await self.wait_completion()
# if necessary, wait for prerequesite runs to complete # # if necessary, wait for prerequesite runs to complete
while not self.prereq_ready(run): # while not self.prereq_ready(run):
print('waiting for prereq') # print('waiting for prereq')
await self.wait_completion() # await self.wait_completion()
self.cores_used += run.experiment.resreq_cores() # self.cores_used += run.experiment.resreq_cores()
self.mem_used += run.experiment.resreq_mem() # self.mem_used += run.experiment.resreq_mem()
job = asyncio.create_task(self.do_run(run)) # job = asyncio.create_task(self.do_run(run))
self._pending_jobs.add(job) # self._pending_jobs.add(job)
# wait for all runs to finish # # wait for all runs to finish
await asyncio.gather(*self._pending_jobs) # await asyncio.gather(*self._pending_jobs)
async def start(self) -> None: # async def start(self) -> None:
"""Execute all defined runs.""" # """Execute all defined runs."""
self._starter_task = asyncio.create_task(self.do_start()) # self._starter_task = asyncio.create_task(self.do_start())
try: # try:
await self._starter_task # await self._starter_task
except asyncio.CancelledError: # except asyncio.CancelledError:
for job in self._pending_jobs: # for job in self._pending_jobs:
job.cancel() # job.cancel()
# wait for all runs to finish # # wait for all runs to finish
await asyncio.gather(*self._pending_jobs) # await asyncio.gather(*self._pending_jobs)
def interrupt_handler(self) -> None: # def interrupt_handler(self) -> None:
self._starter_task.cancel() # self._starter_task.cancel()
...@@ -27,9 +27,7 @@ import itertools ...@@ -27,9 +27,7 @@ import itertools
import abc import abc
from simbricks.orchestration.simulation import output from simbricks.orchestration.simulation import output
from simbricks.orchestration.simulation import base as sim_base
from simbricks.orchestration.instantiation import base as inst_base from simbricks.orchestration.instantiation import base as inst_base
from simbricks.orchestration.runtime_new import command_executor
class Run: class Run:
...@@ -39,22 +37,22 @@ class Run: ...@@ -39,22 +37,22 @@ class Run:
def __init__( def __init__(
self, self,
simulation: sim_base.Simulation,
instantiation: inst_base.Instantiation, instantiation: inst_base.Instantiation,
prereq: Run | None = None, prereq: Run | None = None,
output: output.SimulationOutput | None = None, output: output.SimulationOutput | None = None,
job_id: int | None = None, job_id: int | None = None,
cp: bool = False,
): ):
self._simulation: sim_base.Simulation = simulation self.instantiation: inst_base.Instantiation = instantiation
self._run_nr = next(self.__run_nr) self._run_nr = next(self.__run_nr)
self._instantiation: inst_base.Instantiation = instantiation
self._output: output.SimulationOutput | None = output self._output: output.SimulationOutput | None = output
self._prereq: Run | None = prereq self._prereq: Run | None = prereq
self._job_id: int | None = job_id self._job_id: int | None = job_id
self.checkpoint: bool = cp
"""Slurm job id.""" """Slurm job id."""
def name(self) -> str: def name(self) -> str:
return self._simulation.name + "." + str(self._run_nr) return self.instantiation.simulation.name + "." + str(self._run_nr)
class Runtime(metaclass=abc.ABCMeta): class Runtime(metaclass=abc.ABCMeta):
......
...@@ -27,6 +27,7 @@ import asyncio ...@@ -27,6 +27,7 @@ import asyncio
from simbricks.orchestration.runtime_new import simulation_executor from simbricks.orchestration.runtime_new import simulation_executor
from simbricks.orchestration.runtime_new import command_executor from simbricks.orchestration.runtime_new import command_executor
from simbricks.orchestration.runtime_new.runs import base as run_base from simbricks.orchestration.runtime_new.runs import base as run_base
from simbricks.orchestration.instantiation import base as inst_base
class LocalSimpleRuntime(run_base.Runtime): class LocalSimpleRuntime(run_base.Runtime):
...@@ -50,8 +51,8 @@ class LocalSimpleRuntime(run_base.Runtime): ...@@ -50,8 +51,8 @@ class LocalSimpleRuntime(run_base.Runtime):
async def do_run(self, run: run_base.Run) -> None: async def do_run(self, run: run_base.Run) -> None:
"""Actually executes `run`.""" """Actually executes `run`."""
try: try:
runner = simulation_executor.ExperimentSimpleRunner( runner = simulation_executor.SimulationSimpleRunner(
self._executor, run._simulation, run._instantiation, self._verbose self._executor, run.instantiation, self._verbose
) )
if self._profile_int: if self._profile_int:
runner.profile_int = self.profile_int runner.profile_int = self.profile_int
...@@ -68,11 +69,13 @@ class LocalSimpleRuntime(run_base.Runtime): ...@@ -68,11 +69,13 @@ class LocalSimpleRuntime(run_base.Runtime):
if self._verbose: if self._verbose:
print(f"Writing collected output of run {run.name()} to JSON file ...") print(f"Writing collected output of run {run.name()} to JSON file ...")
output_path = run._instantiation.get_simulation_output_path( output_path = run.instantiation.get_simulation_output_path(
run_number=run._run_nr run_nr=run._run_nr
) )
run._output.dump(outpath=output_path) run._output.dump(outpath=output_path)
await runner.cleanup()
async def start(self) -> None: async def start(self) -> None:
"""Execute the runs defined in `self.runnable`.""" """Execute the runs defined in `self.runnable`."""
for run in self._runnable: for run in self._runnable:
...@@ -112,10 +115,13 @@ class LocalParallelRuntime(run_base.Runtime): ...@@ -112,10 +115,13 @@ class LocalParallelRuntime(run_base.Runtime):
self._starter_task: asyncio.Task self._starter_task: asyncio.Task
def add_run(self, run: run_base.Run) -> None: def add_run(self, run: run_base.Run) -> None:
if run._simulation.resreq_cores() > self._cores: if run.instantiation.simulation.resreq_cores() > self._cores:
raise RuntimeError("Not enough cores available for run") raise RuntimeError("Not enough cores available for run")
if self._mem is not None and run._simulation.resreq_mem() > self._mem: if (
self._mem is not None
and run.instantiation.simulation.resreq_mem() > self._mem
):
raise RuntimeError("Not enough memory available for run") raise RuntimeError("Not enough memory available for run")
if run._prereq is None: if run._prereq is None:
...@@ -126,8 +132,8 @@ class LocalParallelRuntime(run_base.Runtime): ...@@ -126,8 +132,8 @@ class LocalParallelRuntime(run_base.Runtime):
async def do_run(self, run: run_base.Run) -> run_base.Run | None: async def do_run(self, run: run_base.Run) -> run_base.Run | None:
"""Actually executes `run`.""" """Actually executes `run`."""
try: try:
runner = simulation_executor.ExperimentSimpleRunner( runner = simulation_executor.SimulationSimpleRunner(
self._executor, run._simulation, run._inst_env, self._verbose self._executor, run.instantiation, self._verbose
) )
if self._profile_int is not None: if self._profile_int is not None:
runner._profile_int = self._profile_int runner._profile_int = self._profile_int
...@@ -141,13 +147,16 @@ class LocalParallelRuntime(run_base.Runtime): ...@@ -141,13 +147,16 @@ class LocalParallelRuntime(run_base.Runtime):
run._output = await runner.run() # already handles CancelledError run._output = await runner.run() # already handles CancelledError
# if the log is huge, this step takes some time # if the log is huge, this step takes some time
if self.verbose: if self._verbose:
print(f"Writing collected output of run {run.name()} to JSON file ...") print(f"Writing collected output of run {run.name()} to JSON file ...")
output_path = run._instantiation.get_simulation_output_path( output_path = run.instantiation.get_simulation_output_path(
run_number=run._run_nr run_number=run._run_nr
) )
run._output.dump(outpath=output_path) run._output.dump(outpath=output_path)
await runner.cleanup()
print("finished run ", run.name()) print("finished run ", run.name())
return run return run
...@@ -162,12 +171,14 @@ class LocalParallelRuntime(run_base.Runtime): ...@@ -162,12 +171,14 @@ class LocalParallelRuntime(run_base.Runtime):
for r_awaitable in done: for r_awaitable in done:
run = await r_awaitable run = await r_awaitable
self._complete.add(run) self._complete.add(run)
self._cores_used -= run._simulation.resreq_cores() self._cores_used -= run.instantiation.simulation.resreq_cores()
self._mem_used -= run._simulation.resreq_mem() self._mem_used -= run.instantiation.simulation.resreq_mem()
def enough_resources(self, run: run_base.Run) -> bool: def enough_resources(self, run: run_base.Run) -> bool:
"""Check if enough cores and mem are available for the run.""" """Check if enough cores and mem are available for the run."""
simulation = run._simulation # pylint: disable=redefined-outer-name simulation = (
run.instantiation.simulation
) # pylint: disable=redefined-outer-name
if self._cores is not None: if self._cores is not None:
enough_cores = (self._cores - self._cores_used) >= simulation.resreq_cores() enough_cores = (self._cores - self._cores_used) >= simulation.resreq_cores()
...@@ -207,8 +218,8 @@ class LocalParallelRuntime(run_base.Runtime): ...@@ -207,8 +218,8 @@ class LocalParallelRuntime(run_base.Runtime):
print("waiting for prereq") print("waiting for prereq")
await self.wait_completion() await self.wait_completion()
self._cores_used += run._simulation.resreq_cores() self._cores_used += run.instantiation.simulation.resreq_cores()
self._mem_used += run._simulation.resreq_mem() self._mem_used += run.instantiation.simulation.resreq_mem()
job = asyncio.create_task(self.do_run(run)) job = asyncio.create_task(self.do_run(run))
self._pending_jobs.add(job) self._pending_jobs.add(job)
......
...@@ -36,19 +36,17 @@ from simbricks.orchestration.instantiation import base as inst_base ...@@ -36,19 +36,17 @@ from simbricks.orchestration.instantiation import base as inst_base
from simbricks.orchestration.runtime_new import command_executor from simbricks.orchestration.runtime_new import command_executor
class ExperimentBaseRunner(abc.ABC): class SimulationBaseRunner(abc.ABC):
def __init__( def __init__(
self, self,
simulation: sim_base.Simulation,
instantiation: inst_base.Instantiation, instantiation: inst_base.Instantiation,
verbose: bool, verbose: bool,
) -> None: ) -> None:
self._simulation: sim_base.Simulation = simulation
self._instantiation: inst_base.Instantiation = instantiation self._instantiation: inst_base.Instantiation = instantiation
self._verbose: bool = verbose self._verbose: bool = verbose
self._profile_int: int | None = None self._profile_int: int | None = None
self._out = output.SimulationOutput(self._simulation) self._out = output.SimulationOutput(self._instantiation.simulation)
self._running: list[ self._running: list[
tuple[sim_base.Simulator, command_executor.SimpleComponent] tuple[sim_base.Simulator, command_executor.SimpleComponent]
] = [] ] = []
...@@ -64,16 +62,16 @@ class ExperimentBaseRunner(abc.ABC): ...@@ -64,16 +62,16 @@ class ExperimentBaseRunner(abc.ABC):
name = sim.full_name() name = sim.full_name()
if self._verbose: if self._verbose:
print(f"{self._simulation.name}: starting {name}") print(f"{self._instantiation.simulation.name}: starting {name}")
run_cmd = sim.run_cmd(self._instantiation) run_cmd = sim.run_cmd(self._instantiation)
if run_cmd is None: if run_cmd is None:
if self._verbose: if self._verbose:
print(f"{self._simulation.name}: started dummy {name}") print(f"{self._instantiation.simulation.name}: started dummy {name}")
return return
# run simulator # run simulator
executor = self._instantiation.executor executor = self._instantiation.executor # TODO: this should be a function or something
sc = executor.create_component( sc = executor.create_component(
name, shlex.split(run_cmd), verbose=self._verbose, canfail=True name, shlex.split(run_cmd), verbose=self._verbose, canfail=True
) )
...@@ -88,11 +86,11 @@ class ExperimentBaseRunner(abc.ABC): ...@@ -88,11 +86,11 @@ class ExperimentBaseRunner(abc.ABC):
wait_socks = sim.sockets_wait(inst=self._instantiation) wait_socks = sim.sockets_wait(inst=self._instantiation)
if len(wait_socks) > 0: if len(wait_socks) > 0:
if self._verbose: if self._verbose:
print(f"{self._simulation.name}: waiting for sockets {name}") print(f"{self._instantiation.simulation.name}: waiting for sockets {name}")
await self._instantiation.wait_for_sockets(sockets=wait_socks) await self._instantiation.wait_for_sockets(sockets=wait_socks)
if self._verbose: if self._verbose:
print( print(
f"{self._simulation.name}: waited successfully for sockets {name}" f"{self._instantiation.simulation.name}: waited successfully for sockets {name}"
) )
# add time delay if required # add time delay if required
...@@ -104,7 +102,7 @@ class ExperimentBaseRunner(abc.ABC): ...@@ -104,7 +102,7 @@ class ExperimentBaseRunner(abc.ABC):
self._wait_sims.append(sc) self._wait_sims.append(sc)
if self._verbose: if self._verbose:
print(f"{self._simulation.name}: started {name}") print(f"{self._instantiation.simulation.name}: started {name}")
async def before_wait(self) -> None: async def before_wait(self) -> None:
pass pass
...@@ -124,15 +122,15 @@ class ExperimentBaseRunner(abc.ABC): ...@@ -124,15 +122,15 @@ class ExperimentBaseRunner(abc.ABC):
async def wait_for_sims(self) -> None: async def wait_for_sims(self) -> None:
"""Wait for simulators to terminate (the ones marked to wait on).""" """Wait for simulators to terminate (the ones marked to wait on)."""
if self._verbose: if self._verbose:
print(f"{self._simulation.name}: waiting for hosts to terminate") print(f"{self._instantiation.simulation.name}: waiting for hosts to terminate")
for sc in self._wait_sims: for sc in self._wait_sims:
await sc.wait() await sc.wait()
async def terminate_collect_sims(self) -> output.SimulationOutput: async def terminate_collect_sims(self) -> None: # output.SimulationOutput:
"""Terminates all simulators and collects output.""" """Terminates all simulators and collects output."""
self._out.set_end() self._out.set_end()
if self._verbose: if self._verbose:
print(f"{self._simulation.name}: cleaning up") print(f"{self._instantiation.simulation.name}: cleaning up")
await self.before_cleanup() await self.before_cleanup()
...@@ -146,9 +144,6 @@ class ExperimentBaseRunner(abc.ABC): ...@@ -146,9 +144,6 @@ class ExperimentBaseRunner(abc.ABC):
for _, sc in self._running: for _, sc in self._running:
await sc.wait() await sc.wait()
# remove all sockets
await self._instantiation.cleanup_sockets(sockets=self._sockets)
# add all simulator components to the output # add all simulator components to the output
for sim, sc in self._running: for sim, sc in self._running:
self._out.add_sim(sim, sc) self._out.add_sim(sim, sc)
...@@ -215,8 +210,11 @@ class ExperimentBaseRunner(abc.ABC): ...@@ -215,8 +210,11 @@ class ExperimentBaseRunner(abc.ABC):
print(e) print(e)
pass pass
async def cleanup(self) -> None:
await self._instantiation.cleanup()
class ExperimentSimpleRunner(ExperimentBaseRunner): class SimulationSimpleRunner(SimulationBaseRunner):
"""Simple experiment runner with just one executor.""" """Simple experiment runner with just one executor."""
def __init__(self, executor: command_executor.Executor, *args, **kwargs) -> None: def __init__(self, executor: command_executor.Executor, *args, **kwargs) -> None:
...@@ -227,27 +225,27 @@ class ExperimentSimpleRunner(ExperimentBaseRunner): ...@@ -227,27 +225,27 @@ class ExperimentSimpleRunner(ExperimentBaseRunner):
return self._executor return self._executor
class ExperimentDistributedRunner(ExperimentBaseRunner): # class ExperimentDistributedRunner(ExperimentBaseRunner):
"""Simple experiment runner with just one executor.""" # """Simple experiment runner with just one executor."""
# TODO: FIXME # # TODO: FIXME
def __init__(self, execs, exp: DistributedExperiment, *args, **kwargs) -> None: # def __init__(self, execs, exp: DistributedExperiment, *args, **kwargs) -> None:
self.execs = execs # self.execs = execs
super().__init__(exp, *args, **kwargs) # super().__init__(exp, *args, **kwargs)
self.exp = exp # overrides the type in the base class # self.exp = exp # overrides the type in the base class
assert self.exp.num_hosts <= len(execs) # assert self.exp.num_hosts <= len(execs)
def sim_executor(self, sim) -> command_executor.Executor: # def sim_executor(self, sim) -> command_executor.Executor:
h_id = self.exp.host_mapping[sim] # h_id = self.exp.host_mapping[sim]
return self.execs[h_id] # return self.execs[h_id]
async def prepare(self) -> None: # async def prepare(self) -> None:
# make sure all simulators are assigned to an executor # # make sure all simulators are assigned to an executor
assert self.exp.all_sims_assigned() # assert self.exp.all_sims_assigned()
# set IP addresses for proxies based on assigned executors # # set IP addresses for proxies based on assigned executors
for p in itertools.chain(self.exp.proxies_listen, self.exp.proxies_connect): # for p in itertools.chain(self.exp.proxies_listen, self.exp.proxies_connect):
executor = self.sim_executor(p) # executor = self.sim_executor(p)
p.ip = executor.ip # p.ip = executor.ip
await super().prepare() # await super().prepare()
...@@ -287,14 +287,6 @@ class Simulation(utils_base.IdObj): ...@@ -287,14 +287,6 @@ class Simulation(utils_base.IdObj):
""" """
self.timeout: int | None = None self.timeout: int | None = None
"""Timeout for experiment in seconds.""" """Timeout for experiment in seconds."""
self.checkpoint = False
"""
Whether to use checkpoint and restore for simulators.
The most common use-case for this is accelerating host simulator startup
by first running in a less accurate mode, then checkpointing the system
state after boot and running simulations from there.
"""
self.metadata: dict[str, tp.Any] = {} self.metadata: dict[str, tp.Any] = {}
self._sys_sim_map: dict[sys_conf.Component, Simulator] = {} self._sys_sim_map: dict[sys_conf.Component, Simulator] = {}
...@@ -368,10 +360,10 @@ class Simulation(utils_base.IdObj): ...@@ -368,10 +360,10 @@ class Simulation(utils_base.IdObj):
promises.append(sim.prepare(inst=inst)) promises.append(sim.prepare(inst=inst))
await asyncio.gather(*promises) await asyncio.gather(*promises)
# TODO: FIXME def any_supports_checkpointing(self) -> bool:
def enable_checkpointing_if_supported() -> None: if (
raise Exception("not implemented") len(list(filter(lambda sim: sim.supports_checkpointing(), self._sim_list)))
> 0
# TODO: FIXME ):
def is_checkpointing_enabled(self) -> bool: return True
raise Exception("not implemented") return False
...@@ -70,6 +70,9 @@ class Gem5Sim(HostSim): ...@@ -70,6 +70,9 @@ class Gem5Sim(HostSim):
self._variant: str = "fast" self._variant: str = "fast"
self._sys_clock: str = "1GHz" # TODO: move to system module self._sys_clock: str = "1GHz" # TODO: move to system module
def supports_checkpointing(self) -> bool:
return True
def resreq_cores(self) -> int: def resreq_cores(self) -> int:
return 1 return 1
...@@ -96,7 +99,7 @@ class Gem5Sim(HostSim): ...@@ -96,7 +99,7 @@ class Gem5Sim(HostSim):
def run_cmd(self, inst: inst_base.Instantiation) -> str: def run_cmd(self, inst: inst_base.Instantiation) -> str:
cpu_type = self.cpu_type cpu_type = self.cpu_type
if inst.create_cp(): if inst.create_checkpoint:
cpu_type = self.cpu_type_cp cpu_type = self.cpu_type_cp
full_sys_hosts = self.filter_components_by_type(ty=sys_host.FullSystemHost) full_sys_hosts = self.filter_components_by_type(ty=sys_host.FullSystemHost)
...@@ -126,10 +129,10 @@ class Gem5Sim(HostSim): ...@@ -126,10 +129,10 @@ class Gem5Sim(HostSim):
# if self.node_config.kcmd_append: # if self.node_config.kcmd_append:
# cmd += f'--command-line-append="{self.node_config.kcmd_append}" ' # cmd += f'--command-line-append="{self.node_config.kcmd_append}" '
if inst.create_cp(): if inst.create_checkpoint:
cmd += "--max-checkpoints=1 " cmd += "--max-checkpoints=1 "
if inst.restore_cp(): if inst.restore_checkpoint:
cmd += "-r 1 " cmd += "-r 1 "
latency, sync_period, run_sync = ( latency, sync_period, run_sync = (
...@@ -153,7 +156,7 @@ class Gem5Sim(HostSim): ...@@ -153,7 +156,7 @@ class Gem5Sim(HostSim):
f":latency={latency}ns" f":latency={latency}ns"
f":sync_interval={sync_period}ns" f":sync_interval={sync_period}ns"
) )
if run_sync: if run_sync and not inst.create_checkpoint:
cmd += ":sync" cmd += ":sync"
cmd += " " cmd += " "
...@@ -173,7 +176,7 @@ class Gem5Sim(HostSim): ...@@ -173,7 +176,7 @@ class Gem5Sim(HostSim):
f":latency={latency}ns" f":latency={latency}ns"
f":sync_interval={sync_period}ns" f":sync_interval={sync_period}ns"
) )
if run_sync: if run_sync and not inst.create_checkpoint:
cmd += ":sync" cmd += ":sync"
cmd += " " cmd += " "
......
...@@ -138,7 +138,6 @@ class SwitchNet(NetSim): ...@@ -138,7 +138,6 @@ class SwitchNet(NetSim):
for sock in listen: for sock in listen:
cmd += " -h " + sock._path cmd += " -h " + sock._path
print(f"SWITCH NET CMD!!! ===== {cmd}")
return cmd return cmd
...@@ -227,9 +226,6 @@ class NS3DumbbellNet(SimpleNS3Sim): ...@@ -227,9 +226,6 @@ class NS3DumbbellNet(SimpleNS3Sim):
cmd += f"--SimbricksPortRight={sock._path} " cmd += f"--SimbricksPortRight={sock._path} "
# TODO cmd += f"{self.opt}" # TODO cmd += f"{self.opt}"
print(
f"!!!!!!!!!!!!!!!!!!!!!! NS3DumbbellNet run_cmd: {cmd} !!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
)
return cmd return cmd
...@@ -256,7 +252,4 @@ class NS3BridgeNet(SimpleNS3Sim): ...@@ -256,7 +252,4 @@ class NS3BridgeNet(SimpleNS3Sim):
cmd += f"--SimbricksPort={sock._path} " cmd += f"--SimbricksPort={sock._path} "
# TODO cmd += f"{self.opt}" # TODO cmd += f"{self.opt}"
print(
f"!!!!!!!!!!!!!!!!!!!!!! NS3BridgeNet run_cmd: {cmd} !!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
)
return cmd return cmd
...@@ -20,12 +20,16 @@ ...@@ -20,12 +20,16 @@
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from simbricks.orchestration.simulation import base as sim_base from __future__ import annotations
from simbricks.orchestration.runtime_new import command_executor
import json
import time import time
import pathlib import pathlib
import json import typing
from simbricks.orchestration.runtime_new import command_executor
if typing.TYPE_CHECKING:
from simbricks.orchestration.simulation import base as sim_base
class SimulationOutput: class SimulationOutput:
"""Manages an experiment's output.""" """Manages an experiment's output."""
......
...@@ -43,7 +43,7 @@ class BaseLinuxApplication(abc.ABC): ...@@ -43,7 +43,7 @@ class BaseLinuxApplication(abc.ABC):
self.host = h self.host = h
self.start_delay: float | None = None self.start_delay: float | None = None
self.end_delay: float | None = None self.end_delay: float | None = None
self.wait = True self.wait = False
@abc.abstractmethod @abc.abstractmethod
def run_cmds(self, inst: inst_base.Instantiation) -> list[str]: def run_cmds(self, inst: inst_base.Instantiation) -> list[str]:
......
...@@ -138,8 +138,8 @@ class BaseLinuxHost(FullSystemHost): ...@@ -138,8 +138,8 @@ class BaseLinuxHost(FullSystemHost):
) )
def config_str(self, inst: instantiation.Instantiation) -> str: def config_str(self, inst: instantiation.Instantiation) -> str:
if inst.create_cp(): sim = inst.find_sim_by_spec(spec=self)
sim = inst.find_sim_by_spec(spec=self) if inst.create_checkpoint:
cp_cmd = sim.checkpoint_commands() cp_cmd = sim.checkpoint_commands()
else: else:
cp_cmd = [] cp_cmd = []
......
...@@ -50,7 +50,7 @@ class DiskImage(utils_base.IdObj): ...@@ -50,7 +50,7 @@ class DiskImage(utils_base.IdObj):
async def make_qcow_copy(self, inst: inst_base.Instantiation, format: str) -> str: async def make_qcow_copy(self, inst: inst_base.Instantiation, format: str) -> str:
disk_path = pathlib.Path(self.path(inst=inst, format=format)) disk_path = pathlib.Path(self.path(inst=inst, format=format))
copy_path = inst.join_tmp_base(relative_path=f"hdcopy.{self._id}") copy_path = inst.join_imgs_path(relative_path=f"hdcopy.{self._id}")
prep_cmds = [ prep_cmds = [
( (
f"{inst.join_repo_base(relative_path=self._qemu_img_exec)} create -f qcow2 -o " f"{inst.join_repo_base(relative_path=self._qemu_img_exec)} create -f qcow2 -o "
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment