Unverified Commit 2741f329 authored by Jakob Görgen's avatar Jakob Görgen
Browse files

Started refactoring of runtime to support new orchestration framework

parent a20ed8c2
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""This is the top-level module of the SimBricks orchestration framework that
users interact with."""
import argparse
import asyncio
import fnmatch
import importlib
import importlib.util
import json
import os
import pickle
import signal
import sys
from simbricks.orchestration import exectools
from simbricks.orchestration import experiments as exps
from simbricks.orchestration import runtime
from simbricks.orchestration.experiment import experiment_environment
from simbricks.orchestration.simulation import base as sim_base
from simbricks.orchestration.instantiation import base as inst_base
from simbricks.orchestration.runtime_new import runs
from simbricks.orchestration.runtime_new import command_executor
from simbricks.orchestration.runtime_new import simulation_executor
def parse_args() -> argparse.Namespace:
    """Define and evaluate the command-line interface of the orchestration
    framework.

    Returns the parsed argument namespace. Boolean switches use
    ``store_true`` (behaviorally identical to ``store_const`` with
    ``const=True, default=False``).
    """
    parser = argparse.ArgumentParser()

    # positional experiment modules plus generic run options
    parser.add_argument(
        "experiments",
        metavar="EXP",
        type=str,
        nargs="+",
        help="Python modules to load the experiments from",
    )
    parser.add_argument(
        "--list",
        action="store_true",
        help="List available experiment names",
    )
    parser.add_argument(
        "--filter",
        metavar="PATTERN",
        type=str,
        nargs="+",
        help="Only run experiments matching the given Unix shell style patterns",
    )
    parser.add_argument(
        "--pickled",
        action="store_true",
        help="Interpret experiment modules as pickled runs instead of .py files",
    )
    parser.add_argument(
        "--runs",
        metavar="N",
        type=int,
        default=1,
        help="Number of repetition of each experiment",
    )
    parser.add_argument(
        "--firstrun", metavar="N", type=int, default=1, help="ID for first run"
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Run experiments even if output already exists (overwrites output)",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Verbose output, for example, print component simulators' output",
    )
    parser.add_argument(
        "--pcap",
        action="store_true",
        help="Dump pcap file (if supported by component simulator)",
    )
    parser.add_argument(
        "--profile-int",
        metavar="S",
        type=int,
        default=None,
        help="Enable periodic sigusr1 to each simulator every S seconds.",
    )

    # directories and host configuration for the experiment environment
    env_group = parser.add_argument_group("Environment")
    env_group.add_argument(
        "--repo",
        metavar="DIR",
        type=str,
        default=os.path.dirname(__file__) + "/..",
        help="SimBricks repository directory",
    )
    # the three directory bases share the same shape, so define them in one go
    for dir_flag, dir_help in (
        ("--workdir", "Work directory base"),
        ("--outdir", "Output directory base"),
        ("--cpdir", "Checkpoint directory base"),
    ):
        env_group.add_argument(
            dir_flag, metavar="DIR", type=str, default="./out/", help=dir_help
        )
    env_group.add_argument(
        "--hosts",
        metavar="JSON_FILE",
        type=str,
        default=None,
        help="List of hosts to use (json)",
    )
    env_group.add_argument(
        "--shmdir",
        metavar="DIR",
        type=str,
        default=None,
        help="Shared memory directory base (workdir if not set)",
    )

    # options specific to the parallel runtime
    par_group = parser.add_argument_group("Parallel Runtime")
    par_group.add_argument(
        "--parallel",
        dest="runtime",
        action="store_const",
        const="parallel",
        default="sequential",
        help="Use parallel instead of sequential runtime",
    )
    par_group.add_argument(
        "--cores",
        metavar="N",
        type=int,
        default=len(os.sched_getaffinity(0)),
        help="Number of cores to use for parallel runs",
    )
    par_group.add_argument(
        "--mem",
        metavar="N",
        type=int,
        default=None,
        help="Memory limit for parallel runs (in MB)",
    )

    # options specific to the slurm runtime
    slurm_group = parser.add_argument_group("Slurm Runtime")
    slurm_group.add_argument(
        "--slurm",
        dest="runtime",
        action="store_const",
        const="slurm",
        default="sequential",
        help="Use slurm instead of sequential runtime",
    )
    slurm_group.add_argument(
        "--slurmdir",
        metavar="DIR",
        type=str,
        default="./slurm/",
        help="Slurm communication directory",
    )

    # options specific to the distributed runtime
    dist_group = parser.add_argument_group("Distributed Runtime")
    dist_group.add_argument(
        "--dist",
        dest="runtime",
        action="store_const",
        const="dist",
        default="sequential",
        help="Use sequential distributed runtime instead of local",
    )
    dist_group.add_argument(
        "--auto-dist",
        action="store_true",
        help="Automatically distribute non-distributed experiments",
    )
    dist_group.add_argument(
        "--proxy-type",
        metavar="TYPE",
        type=str,
        default="sockets",
        help="Proxy type to use (sockets,rdma) for auto distribution",
    )

    return parser.parse_args()
def load_executors(path: str) -> list[command_executor.Executor]:
    """Load hosts list from json file and return list of executors.

    Each host entry is a dict with a "type" of "local" or "remote". Remote
    entries additionally carry "host" and "workdir", plus optional
    "ssh_args"/"scp_args" lists. An optional "ip" is attached to the
    executor (left as the Executor default None when absent).

    Raises RuntimeError for an unknown host type.
    """
    with open(path, "r", encoding="utf-8") as f:
        hosts = json.load(f)

    exs = []
    for h in hosts:
        if h["type"] == "local":
            ex = command_executor.LocalExecutor()
        elif h["type"] == "remote":
            ex = command_executor.RemoteExecutor(h["host"], h["workdir"])
            # optional extra arguments for the ssh/scp invocations
            ex.ssh_extra_args += h.get("ssh_args", [])
            ex.scp_extra_args += h.get("scp_args", [])
        else:
            raise RuntimeError('invalid host type "' + h["type"] + '"')
        # robustness fix: "ip" is optional; previously h["ip"] raised
        # KeyError although Executor.__init__ already defaults ip to None
        ex.ip = h.get("ip")
        exs.append(ex)
    return exs
def warn_multi_exec(executors: list[command_executor.Executor]):
    """Emit a warning on stderr when more than one host was configured,
    since only the first one is currently used."""
    if len(executors) <= 1:
        return
    print(
        "Warning: multiple hosts specified, only using first one for now",
        file=sys.stderr,
    )
def add_exp(
    simulation: sim_base.Simulation,
    rt: runs.base.Runtime,
    run_number: int,
    prereq: runs.base.Run | None,
    create_cp: bool,
    restore_cp: bool,
    args: argparse.Namespace,
) -> runs.base.Run | None:
    """Create a Run for `simulation` and register it with runtime `rt`.

    Returns the created run, or None when output for this run already
    exists and --force was not given. (fix: the return annotation claimed
    InstantiationEnvironment, and the skip message printed the undefined
    name `e`.)
    """
    outpath = f"{args.outdir}/{simulation.name}-{run_number}.json"
    if os.path.exists(outpath) and not args.force:
        print(f"skip {simulation.name} run {run_number}")
        return None

    workdir = f"{args.workdir}/{simulation.name}/{run_number}"
    cpdir = f"{args.cpdir}/{simulation.name}/0"

    # fix: the original assigned `env.shm_base` before any `env` existed and
    # referenced a conditionally-bound `shmdir`; compute shm_base up front
    shm_base = ""  # TODO: sensible default when --shmdir is not given
    if args.shmdir is not None:
        shm_base = os.path.abspath(f"{args.shmdir}/{simulation.name}/{run_number}")

    # TODO: user can specify output base
    output_base = ""
    tmp_sim_files = ""  # TODO

    inst_env = inst_base.InstantiationEnvironment(
        repo_path="",
        workdir=workdir,
        cpdir=cpdir,
        create_cp=create_cp,
        restore_cp=restore_cp,
        shm_base=shm_base,
        output_base=output_base,
        tmp_simulation_files=tmp_sim_files,
    )
    run = runs.base.Run(
        simulation=simulation,
        inst_env=inst_env,
        prereq=prereq,
    )
    rt.add_run(run)
    return run
def main():
    """Entry point: parse arguments, build the runtime, load experiments
    and execute them."""
    args = parse_args()

    # set up the executors the runtime dispatches commands to
    if args.hosts is None:
        executors = [command_executor.LocalExecutor()]
    else:
        executors = load_executors(args.hosts)

    # initialize runtime
    if args.runtime == "parallel":
        warn_multi_exec(executors)
        rt = runs.LocalParallelRuntime(
            cores=args.cores, mem=args.mem, verbose=args.verbose, executor=executors[0]
        )
    elif args.runtime == "slurm":
        rt = runs.SlurmRuntime(args.slurmdir, args, verbose=args.verbose)
    elif args.runtime == "dist":
        rt = runs.DistributedSimpleRuntime(executors, verbose=args.verbose)
    else:
        warn_multi_exec(executors)
        rt = runs.LocalSimpleRuntime(verbose=args.verbose, executor=executors[0])

    if args.profile_int:
        rt.enable_profiler(args.profile_int)

    # load experiments
    if not args.pickled:
        # default: load python modules with experiments

        class ExperimentModuleLoadError(Exception):
            """Raised when an experiment module cannot be imported."""
            # fix: hoisted out of the loop; no need to redefine per module

        simulations: list[sim_base.Simulation] = []
        for path in args.experiments:
            modname, _ = os.path.splitext(os.path.basename(path))
            spec = importlib.util.spec_from_file_location(modname, path)
            if spec is None:
                raise ExperimentModuleLoadError("spec is None")
            mod = importlib.util.module_from_spec(spec)
            if spec.loader is None:
                raise ExperimentModuleLoadError("spec.loader is None")
            spec.loader.exec_module(mod)
            simulations += mod.experiments

        if args.list:
            for sim in simulations:
                print(sim.name)
            sys.exit(0)

        for sim in simulations:
            # TODO: do we want a distributed Simulation class? --> probably
            # not, choose slightly different abstraction
            if args.auto_dist and not isinstance(sim, exps.DistributedExperiment):
                sim = runtime.auto_dist(sim, executors, args.proxy_type)

            # apply filter if any specified
            if args.filter and len(args.filter) > 0:
                if not any(fnmatch.fnmatch(sim.name, f) for f in args.filter):
                    continue

            # if this is an experiment with a checkpoint we might have to
            # create it first
            # TODO: what to do / how to handle checkpointing
            if sim.checkpoint:
                # fix: was `add_exp(e, ...)` with undefined name `e`
                prereq = add_exp(sim, rt, 0, None, True, False, args)
            else:
                prereq = None

            for run in range(args.firstrun, args.firstrun + args.runs):
                add_exp(sim, rt, run, prereq, False, sim.checkpoint, args)
    else:
        # otherwise load pickled run object
        for path in args.experiments:
            with open(path, "rb") as f:
                rt.add_run(pickle.load(f))

    # register interrupt handler so Ctrl-C stops the runtime cleanly
    signal.signal(signal.SIGINT, lambda *_: rt.interrupt())

    # invoke runtime to run experiments
    asyncio.run(rt.start())
...@@ -20,11 +20,15 @@ ...@@ -20,11 +20,15 @@
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from __future__ import annotations
import enum import enum
import pathlib import pathlib
from simbricks.orchestration.utils import base import shutil
from simbricks.orchestration import simulation from simbricks.orchestration.utils import base as util_base
from simbricks.orchestration import system from simbricks.orchestration.simulation import base as sim_base
from simbricks.orchestration.system import base as sys_base
from simbricks.orchestration.runtime_new import command_executor
class SockType(enum.Enum): class SockType(enum.Enum):
...@@ -39,80 +43,86 @@ class Socket: ...@@ -39,80 +43,86 @@ class Socket:
self._type = ty self._type = ty
class InstantiationEnvironment(base.IdObj): class InstantiationEnvironment(util_base.IdObj):
def __init__( def __init__(
self, self,
repo_path: str = pathlib.Path(__file__).parents[3].resolve(), repo_path: str = pathlib.Path(__file__).parents[3].resolve(),
workdir: str = pathlib.Path(), workdir: str = pathlib.Path().resolve(),
cpdir: str = pathlib.Path(), cpdir: str = pathlib.Path().resolve(),
shm_base: str = pathlib.Path(), create_cp: bool = False,
output_base: str = pathlib.Path(), restore_cp: bool = False,
tmp_simulation_files: str = pathlib.Path(), shm_base: str = pathlib.Path().resolve(),
output_base: str = pathlib.Path().resolve(),
tmp_simulation_files: str = pathlib.Path().resolve(),
): ):
super().__init__() super().__init__()
# TODO: add more parameters that wont change during instantiation # TODO: add more parameters that wont change during instantiation
self._repodir: str = pathlib.Path(repo_path).absolute() self._repodir: str = pathlib.Path(repo_path).absolute()
self._workdir: str = pathlib.Path(workdir).absolute() self._workdir: str = pathlib.Path(workdir).absolute()
self._cpdir: str = pathlib.Path(cpdir).absolute() self._cpdir: str = pathlib.Path(self._workdir).joinpath(cpdir).absolute()
self._shm_base: str = pathlib.Path(workdir).joinpath(shm_base).absolute() self._shm_base: str = pathlib.Path(self._workdir).joinpath(shm_base).absolute()
self._output_base: str = pathlib.Path(workdir).joinpath(output_base).absolute() self._output_base: str = (
pathlib.Path(self._workdir).joinpath(output_base).absolute()
)
self._tmp_simulation_files: str = ( self._tmp_simulation_files: str = (
pathlib.Path(workdir).joinpath(tmp_simulation_files).absolute() pathlib.Path(self._workdir).joinpath(tmp_simulation_files).absolute()
) )
self._create_cp = create_cp
self._restore_cp = restore_cp
class Instantiation(base.IdObj): class Instantiation(util_base.IdObj):
def __init__( def __init__(
self, simulation, env: InstantiationEnvironment = InstantiationEnvironment() self,
simulation: sim_base.Simulation,
env: InstantiationEnvironment = InstantiationEnvironment(),
): ):
super().__init__() super().__init__()
self._simulation = simulation self._simulation: sim_base.Simulation = simulation
self._env: InstantiationEnvironment = env self._env: InstantiationEnvironment = env
self._socket_per_interface: dict[system.base.Interface, Socket] = {} self._socket_per_interface: dict[sys_base.Interface, Socket] = {}
@staticmethod @staticmethod
def is_absolute_exists(path: str) -> bool: def is_absolute_exists(path: str) -> bool:
path = pathlib.Path(path) path = pathlib.Path(path)
return path.is_absolute() and path.is_file() return path.is_absolute() and path.is_file()
def _get_chan_by_interface( def _get_chan_by_interface(self, interface: sys_base.Interface) -> sys_base.Channel:
self, interface: system.base.Interface
) -> system.Channel:
if not interface.is_connected(): if not interface.is_connected():
raise Exception( raise Exception(
"cannot determine channel by interface, interface isn't connecteds" "cannot determine channel by interface, interface isn't connecteds"
) )
return interface.channel return interface.channel
def _get_opposing_interface(self, interface: system.Interface) -> system.Interface: def _get_opposing_interface(
self, interface: sys_base.Interface
) -> sys_base.Interface:
channel = self._get_chan_by_interface(interface=interface) channel = self._get_chan_by_interface(interface=interface)
return channel.a if channel.a is not interface else channel.b return channel.a if channel.a is not interface else channel.b
def _updated_tracker_mapping( def _updated_tracker_mapping(
self, interface: system.base.Interface, socket: Socket self, interface: sys_base.Interface, socket: Socket
) -> None: ) -> None:
# update interface mapping # update interface mapping
if interface in self._socket_per_interface: if interface in self._socket_per_interface:
raise Exception("an interface cannot be associated with two sockets") raise Exception("an interface cannot be associated with two sockets")
self._socket_per_interface[interface] = socket self._socket_per_interface[interface] = socket
def _get_socket_by_interface( def _get_socket_by_interface(self, interface: sys_base.Interface) -> Socket | None:
self, interface: system.base.Interface
) -> Socket | None:
if interface not in self._socket_per_interface: if interface not in self._socket_per_interface:
return None return None
return self._socket_per_interface[interface] return self._socket_per_interface[interface]
def _get_opposing_socket_by_interface( def _get_opposing_socket_by_interface(
self, interface: system.base.Interface self, interface: sys_base.Interface
) -> Socket | None: ) -> Socket | None:
opposing_interface = self._get_opposing_interface(interface=interface) opposing_interface = self._get_opposing_interface(interface=interface)
socket = self._get_socket_by_interface(interface=opposing_interface) socket = self._get_socket_by_interface(interface=opposing_interface)
return socket return socket
def _interface_to_sock_path(self, interface: system.base.Interface) -> str: def _interface_to_sock_path(self, interface: sys_base.Interface) -> str:
basepath = pathlib.Path(self._env._workdir) basepath = pathlib.Path(self._env._workdir)
channel = self._get_chan_by_interface(interface=interface) channel = self._get_chan_by_interface(interface=interface)
...@@ -140,7 +150,7 @@ class Instantiation(base.IdObj): ...@@ -140,7 +150,7 @@ class Instantiation(base.IdObj):
new_socket = Socket(path=new_path, ty=new_ty) new_socket = Socket(path=new_path, ty=new_ty)
return new_socket return new_socket
def get_socket(self, interface: system.base.Interface) -> Socket: def get_socket(self, interface: sys_base.Interface) -> Socket:
# check if already a socket is associated with this interface # check if already a socket is associated with this interface
socket = self._get_socket_by_interface(interface=interface) socket = self._get_socket_by_interface(interface=interface)
if socket is not None: if socket is not None:
...@@ -163,9 +173,56 @@ class Instantiation(base.IdObj): ...@@ -163,9 +173,56 @@ class Instantiation(base.IdObj):
# TODO: add more methods constructing paths as required by methods in simulators or image handling classes # TODO: add more methods constructing paths as required by methods in simulators or image handling classes
def prepare_environment(self) -> None: def wrkdir(self) -> str:
TODO return pathlib.Path(self._env._workdir).absolute()
raise Exception("not implemented")
def shm_base_dir(self) -> str:
return pathlib.Path(self._env._shm_base).absolute()
def checkpointing_enabled(self) -> bool:
# TODO: not sure if needed wanted here like this
return self._simulation.checkpointing_enabled()
def create_cp(self) -> bool:
return self._env._create_cp
def cpdir(self) -> str:
return pathlib.Path(self._env._cpdir).absolute()
def wrkdir(self) -> str:
return pathlib.Path(self._env._workdir).absolute()
async def prepare_directories(
self, executor: command_executor.Executor = command_executor.LocalExecutor()
) -> None:
# DISCLAIMER: that we poass the executor in here is an artifact of the
# sub-optimal distributed executions as we may need a remote executor to
# remove or create folders on other machines. In an ideal wolrd, we have
# some sort of runtime on each machine that executes thus making pasing
# an executor in here obsolete...
wrkdir = self._instantiation.wrkdir()
shutil.rmtree(wrkdir, ignore_errors=True)
await executor.rmtree(wrkdir)
shm_base = self.shm_base_dir()
shutil.rmtree(shm_base, ignore_errors=True)
await executor.rmtree(shm_base)
cpdir = self.cpdir()
if self.create_cp():
shutil.rmtree(cpdir, ignore_errors=True)
await executor.rmtree(cpdir)
pathlib.Path(wrkdir).mkdir(parents=True, exist_ok=True)
await executor.mkdir(wrkdir)
pathlib.Path(cpdir).mkdir(parents=True, exist_ok=True)
await executor.mkdir(cpdir)
pathlib.Path(shm_base).mkdir(parents=True, exist_ok=True)
await executor.mkdir(shm_base)
def _join_paths( def _join_paths(
self, base: str = "", relative_path: str = "", enforce_existence=True self, base: str = "", relative_path: str = "", enforce_existence=True
......
# Copyright 2022 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\ No newline at end of file
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import abc
import asyncio
import os
import pathlib
import re
import shlex
import shutil
import signal
import typing as tp
from asyncio.subprocess import Process
class Component(object):
    """A subprocess executed asynchronously, with line-buffered collection of
    its stdout and stderr.

    Subclasses hook into the lifecycle by overriding `started`, `terminated`,
    `process_out` and `process_err`.
    """

    def __init__(self, cmd_parts: tp.List[str], with_stdin=False):
        self.is_ready = False
        # complete decoded lines seen so far
        self.stdout: tp.List[str] = []
        # bytes of a partial line not yet terminated by '\n'
        self.stdout_buf = bytearray()
        self.stderr: tp.List[str] = []
        self.stderr_buf = bytearray()
        self.cmd_parts = cmd_parts
        self.with_stdin = with_stdin
        # both are assigned in start()
        self._proc: Process
        self._terminate_future: asyncio.Task

    def _parse_buf(self, buf: bytearray, data: bytes) -> tp.List[str]:
        """Append `data` to `buf` and split off all complete lines.

        Empty or None `data` marks EOF, in which case any trailing partial
        line in `buf` is flushed as a final line as well.
        """
        if data:
            buf.extend(data)
        lines = []
        start = 0
        for i in range(0, len(buf)):
            if buf[i] == ord('\n'):
                lines.append(buf[start:i].decode('utf-8'))
                start = i + 1
        del buf[0:start]
        # fix: the original guarded `data is not None` above but then called
        # len(data), raising TypeError for data=None; `not data` covers both
        # None and b''
        if not data and len(buf) > 0:
            lines.append(buf.decode('utf-8'))
        return lines

    async def _consume_out(self, data: bytes) -> None:
        """Feed a stdout chunk through the line parser and notify hooks."""
        eof = len(data) == 0
        ls = self._parse_buf(self.stdout_buf, data)
        if len(ls) > 0 or eof:
            await self.process_out(ls, eof=eof)
        self.stdout.extend(ls)

    async def _consume_err(self, data: bytes) -> None:
        """Feed a stderr chunk through the line parser and notify hooks."""
        eof = len(data) == 0
        ls = self._parse_buf(self.stderr_buf, data)
        if len(ls) > 0 or eof:
            await self.process_err(ls, eof=eof)
        self.stderr.extend(ls)

    async def _read_stream(self, stream: asyncio.StreamReader, fn):
        """Forward each chunk of `stream` to `fn`; readline() returns b'' on
        EOF, which is forwarded too so consumers observe the EOF."""
        while True:
            bs = await stream.readline()
            await fn(bs)
            if not bs:
                return

    async def _waiter(self) -> None:
        """Drain both output streams, wait for process exit, then fire the
        `terminated` hook."""
        stdout_handler = asyncio.create_task(
            self._read_stream(self._proc.stdout, self._consume_out)
        )
        stderr_handler = asyncio.create_task(
            self._read_stream(self._proc.stderr, self._consume_err)
        )
        rc = await self._proc.wait()
        await asyncio.gather(stdout_handler, stderr_handler)
        await self.terminated(rc)

    async def send_input(self, bs: bytes, eof=False) -> None:
        """Write `bs` to the process's stdin; close the pipe when `eof`."""
        self._proc.stdin.write(bs)
        if eof:
            self._proc.stdin.close()

    async def start(self) -> None:
        """Spawn the subprocess and begin collecting its output."""
        if self.with_stdin:
            stdin = asyncio.subprocess.PIPE
        else:
            stdin = asyncio.subprocess.DEVNULL

        self._proc = await asyncio.create_subprocess_exec(
            *self.cmd_parts,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=stdin,
        )
        self._terminate_future = asyncio.create_task(self._waiter())
        await self.started()

    async def wait(self) -> None:
        """
        Wait for running process to finish and output to be collected.

        On cancellation, the `CancelledError` is propagated but this component
        keeps running.
        """
        await asyncio.shield(self._terminate_future)

    async def interrupt(self) -> None:
        """Sends an interrupt signal."""
        if self._proc.returncode is None:
            self._proc.send_signal(signal.SIGINT)

    async def terminate(self) -> None:
        """Sends a terminate signal."""
        if self._proc.returncode is None:
            self._proc.terminate()

    async def kill(self) -> None:
        """Sends a kill signal."""
        if self._proc.returncode is None:
            self._proc.kill()

    async def int_term_kill(self, delay: int = 5) -> None:
        """Attempts to stop this component by sending signals in the following
        order: interrupt, terminate, kill."""
        await self.interrupt()
        try:
            await asyncio.wait_for(self._proc.wait(), delay)
            return
        # before Python 3.11, asyncio.wait_for() throws asyncio.TimeoutError
        except (TimeoutError, asyncio.TimeoutError):
            print(
                f'terminating component {self.cmd_parts[0]} '
                f'pid {self._proc.pid}',
                flush=True
            )
            await self.terminate()

        try:
            await asyncio.wait_for(self._proc.wait(), delay)
            return
        except (TimeoutError, asyncio.TimeoutError):
            print(
                f'killing component {self.cmd_parts[0]} '
                f'pid {self._proc.pid}',
                flush=True
            )
            await self.kill()
            await self._proc.wait()

    async def sigusr1(self) -> None:
        """Sends a SIGUSR1 signal (e.g. to trigger simulator profiling)."""
        if self._proc.returncode is None:
            self._proc.send_signal(signal.SIGUSR1)

    # lifecycle hooks for subclasses; no-ops by default

    async def started(self) -> None:
        pass

    async def terminated(self, rc) -> None:
        pass

    async def process_out(self, lines: tp.List[str], eof: bool) -> None:
        pass

    async def process_err(self, lines: tp.List[str], eof: bool) -> None:
        pass
class SimpleComponent(Component):
    """Component with a label that optionally echoes its output and raises
    when the command exits with a non-zero code (unless `canfail`)."""

    def __init__(
        self,
        label: str,
        cmd_parts: tp.List[str],
        *args,
        verbose=True,
        canfail=False,
        **kwargs
    ) -> None:
        self.label = label
        self.verbose = verbose
        self.canfail = canfail
        # note: Component.__init__ assigns self.cmd_parts; the original also
        # assigned it here redundantly
        super().__init__(cmd_parts, *args, **kwargs)

    async def process_out(self, lines: tp.List[str], eof: bool) -> None:
        if self.verbose:
            # fix: the original printed the whole `lines` list once per
            # element instead of each individual line
            for line in lines:
                print(self.label, 'OUT:', line, flush=True)

    async def process_err(self, lines: tp.List[str], eof: bool) -> None:
        if self.verbose:
            # fix: same list-instead-of-line bug as process_out
            for line in lines:
                print(self.label, 'ERR:', line, flush=True)

    async def terminated(self, rc: int) -> None:
        if self.verbose:
            print(self.label, 'TERMINATED:', rc, flush=True)
        if not self.canfail and rc != 0:
            raise RuntimeError('Command Failed: ' + str(self.cmd_parts))
class SimpleRemoteComponent(SimpleComponent):
    """`SimpleComponent` that runs its command on a remote host via ssh.

    The remote command is wrapped so that it first echoes its own PID; that
    PID is parsed out of the output stream and later used to deliver signals
    through separate `ssh ... kill` invocations, since signaling the local
    ssh client would not reach the remote process.
    """

    def __init__(
        self,
        host_name: str,
        label: str,
        cmd_parts: tp.List[str],
        *args,
        cwd: tp.Optional[str] = None,
        ssh_extra_args: tp.Optional[tp.List[str]] = None,
        **kwargs
    ) -> None:
        if ssh_extra_args is None:
            ssh_extra_args = []
        self.host_name = host_name
        self.extra_flags = ssh_extra_args

        # add a wrapper to print the PID
        remote_parts = ['echo', 'PID', '$$', '&&']
        if cwd is not None:
            # if necessary add a CD command
            remote_parts += ['cd', cwd, '&&']

        # escape actual command parts
        cmd_parts = list(map(shlex.quote, cmd_parts))

        # use exec to make sure the command we run keeps the PIDS
        remote_parts += ['exec'] + cmd_parts

        # wrap up command in ssh invocation
        parts = self._ssh_cmd(remote_parts)
        super().__init__(label, parts, *args, **kwargs)
        # resolved with the remote PID once the `PID <n>` line is observed
        self._pid_fut: tp.Optional[asyncio.Future] = None

    def _ssh_cmd(self, parts: tp.List[str]) -> tp.List[str]:
        """SSH invocation of command for this host."""
        return [
            'ssh',
            '-o',
            'UserKnownHostsFile=/dev/null',
            '-o',
            'StrictHostKeyChecking=no'
        ] + self.extra_flags + [self.host_name, '--'] + parts

    async def start(self) -> None:
        """Start this command (includes waiting for its pid)."""
        self._pid_fut = asyncio.get_running_loop().create_future()
        await super().start()
        await self._pid_fut

    async def process_out(self, lines: tp.List[str], eof: bool) -> None:
        """Scans output and set PID future once PID line found."""
        if not self._pid_fut.done():
            # filter the PID marker line out of the visible output
            newlines = []
            pid_re = re.compile(r'^PID\s+(\d+)\s*$')
            for l in lines:
                m = pid_re.match(l)
                if m:
                    pid = int(m.group(1))
                    self._pid_fut.set_result(pid)
                else:
                    newlines.append(l)
            lines = newlines

        if eof and not self._pid_fut.done():
            # cancel PID future if it's not going to happen
            print('PID not found but EOF already found:', self.label)
            self._pid_fut.cancel()
        await super().process_out(lines, eof)

    async def _kill_cmd(self, sig: str) -> None:
        """Send signal to command by running ssh kill -$sig $PID."""
        cmd_parts = self._ssh_cmd([
            'kill', '-' + sig, str(self._pid_fut.result())
        ])
        proc = await asyncio.create_subprocess_exec(*cmd_parts)
        await proc.wait()

    async def interrupt(self) -> None:
        """Sends SIGINT to the remote process (via ssh kill)."""
        await self._kill_cmd('INT')

    async def terminate(self) -> None:
        """Sends SIGTERM to the remote process (via ssh kill)."""
        await self._kill_cmd('TERM')

    async def kill(self) -> None:
        """Sends SIGKILL to the remote process (via ssh kill)."""
        await self._kill_cmd('KILL')
class Executor(abc.ABC):
    """Abstract interface for running commands and managing files, either
    locally or on a remote host."""

    def __init__(self) -> None:
        # IP address of this executor's host; optional, may stay None
        self.ip = None

    @abc.abstractmethod
    def create_component(
        self, label: str, parts: tp.List[str], **kwargs
    ) -> 'SimpleComponent':
        """Create a component that runs the command `parts` under this
        executor. (Annotation quoted to avoid a forward-reference problem.)"""

    @abc.abstractmethod
    async def await_file(self, path: str, delay=0.05, verbose=False) -> None:
        """Wait until `path` exists on this executor's host."""

    @abc.abstractmethod
    async def send_file(self, path: str, verbose=False) -> None:
        """Make the local file `path` available on this executor's host."""

    @abc.abstractmethod
    async def mkdir(self, path: str, verbose=False) -> None:
        """Create directory `path`, including parents."""

    @abc.abstractmethod
    async def rmtree(self, path: str, verbose=False) -> None:
        """Recursively delete `path`."""

    async def run_cmdlist(
        self, label: str, cmds: tp.List[str], verbose=True
    ) -> None:
        """Run the given command strings sequentially.

        Each command is labeled `label.<index>`. (fix: the original set
        `i = 0` and never incremented it, so every command got label `.0`.)
        """
        for i, cmd in enumerate(cmds):
            cmd_c = self.create_component(
                label + '.' + str(i), shlex.split(cmd), verbose=verbose
            )
            await cmd_c.start()
            await cmd_c.wait()

    async def await_files(self, paths: tp.List[str], *args, **kwargs) -> None:
        """Wait for all given paths concurrently."""
        waiters = [
            asyncio.create_task(self.await_file(p, *args, **kwargs))
            for p in paths
        ]
        await asyncio.gather(*waiters)
class LocalExecutor(Executor):
    """Executor that performs all operations directly on the local machine."""

    def create_component(
        self, label: str, parts: tp.List[str], **kwargs
    ) -> SimpleComponent:
        return SimpleComponent(label, parts, **kwargs)

    async def await_file(
        self, path: str, delay=0.05, verbose=False, timeout=30
    ) -> None:
        """Poll the filesystem until `path` exists or `timeout` seconds of
        accumulated waiting have elapsed."""
        if verbose:
            print(f'await_file({path})')
        waited = 0
        while not os.path.exists(path):
            if waited >= timeout:
                raise TimeoutError()
            await asyncio.sleep(delay)
            waited += delay

    async def send_file(self, path: str, verbose=False) -> None:
        """No-op: the file already resides on the local machine."""
        pass

    async def mkdir(self, path: str, verbose=False) -> None:
        pathlib.Path(path).mkdir(parents=True, exist_ok=True)

    async def rmtree(self, path: str, verbose=False) -> None:
        # directories are removed recursively, plain files unlinked;
        # a non-existent path is silently ignored
        if os.path.isdir(path):
            shutil.rmtree(path, ignore_errors=True)
        elif os.path.exists(path):
            os.unlink(path)
class RemoteExecutor(Executor):
    """Executor that runs commands and file operations on a remote host
    through ssh/scp."""

    def __init__(self, host_name: str, workdir: str) -> None:
        super().__init__()
        self.host_name = host_name
        # remote working directory commands are run in
        self.cwd = workdir
        self.ssh_extra_args = []
        self.scp_extra_args = []

    def create_component(
        self, label: str, parts: tp.List[str], **kwargs
    ) -> SimpleRemoteComponent:
        return SimpleRemoteComponent(
            self.host_name,
            label,
            parts,
            cwd=self.cwd,
            ssh_extra_args=self.ssh_extra_args,
            **kwargs
        )

    async def await_file(
        self, path: str, delay=0.05, verbose=False, timeout=30
    ) -> None:
        """Poll remotely (via a shell loop over ssh) until `path` exists,
        failing after roughly `timeout` seconds."""
        if verbose:
            print(f'{self.host_name}.await_file({path}) started')
        # number of polling iterations before giving up
        to_its = int(timeout / delay)
        # fix: the original applied printf-style `% (path, to_its, delay)` to
        # an f-string with no % placeholders (TypeError) and used the invalid
        # format spec `{to_its:u}` (ValueError on a float); plain f-string
        # interpolation with an integer iteration count is all that's needed
        loop_cmd = (
            f'i=0 ; while [ ! -e {path} ] ; do '
            f'if [ $i -ge {to_its} ] ; then exit 1 ; fi ; '
            f'sleep {delay} ; '
            'i=$(($i+1)) ; done; exit 0'
        )
        parts = ['/bin/sh', '-c', loop_cmd]
        sc = self.create_component(
            f"{self.host_name}.await_file('{path}')",
            parts,
            canfail=False,
            verbose=verbose
        )
        await sc.start()
        await sc.wait()

    # TODO: Implement optimized await_files()

    async def send_file(self, path: str, verbose=False) -> None:
        """Copy the local file `path` to the same path on the remote host."""
        parts = [
            'scp',
            '-o',
            'UserKnownHostsFile=/dev/null',
            '-o',
            'StrictHostKeyChecking=no'
        ] + self.scp_extra_args + [path, f'{self.host_name}:{path}']
        sc = SimpleComponent(
            f'{self.host_name}.send_file("{path}")',
            parts,
            canfail=False,
            verbose=verbose
        )
        await sc.start()
        await sc.wait()

    async def mkdir(self, path: str, verbose=False) -> None:
        sc = self.create_component(
            f"{self.host_name}.mkdir('{path}')", ['mkdir', '-p', path],
            canfail=False,
            verbose=verbose
        )
        await sc.start()
        await sc.wait()

    async def rmtree(self, path: str, verbose=False) -> None:
        sc = self.create_component(
            f'{self.host_name}.rmtree("{path}")', ['rm', '-rf', path],
            canfail=False,
            verbose=verbose
        )
        await sc.start()
        await sc.wait()
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from simbricks.orchestration.runtime.common import Run, Runtime
from simbricks.orchestration.runtime.distributed import (
DistributedSimpleRuntime, auto_dist
)
from simbricks.orchestration.runtime.local import (
LocalParallelRuntime, LocalSimpleRuntime
)
from simbricks.orchestration.runtime.slurm import SlurmRuntime
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# Allow own class to be used as type for a method's argument
from __future__ import annotations
import itertools
import pathlib
import shutil
import typing as tp
import abc
from simbricks.orchestration.exectools import LocalExecutor
from simbricks.orchestration.experiment.experiment_environment import ExpEnv
from simbricks.orchestration.experiment.experiment_output import ExpOutput
from simbricks.orchestration.experiments import Experiment
from simbricks.orchestration.simulation import base as sim_base
from simbricks.orchestration.instantiation import base as inst_base
from simbricks.orchestration.runtime_new import command_executor
class Run:
    """A single execution of a simulation within an instantiation.

    Each run receives a unique, monotonically increasing number that is
    used to derive its name.
    """

    # shared counter handing out unique run numbers
    __run_nr = itertools.count()

    def __init__(
        self,
        simulation: sim_base.Simulation,
        instantiation: inst_base.Instantiation,
        prereq: Run | None = None,
        output: sim_base.SimulationOutput | None = None,
        job_id: int | None = None,
    ):
        """
        Args:
            simulation: The simulation to execute.
            instantiation: Instantiation context (directories etc.).
            prereq: Run that must complete before this one may start.
            output: Pre-existing output of this run, if any.
            job_id: Slurm job id, if scheduled through Slurm.
        """
        self._simulation: sim_base.Simulation = simulation
        self._run_nr = next(self.__run_nr)
        self._instantiation: inst_base.Instantiation = instantiation
        self._output: sim_base.SimulationOutput | None = output
        self._prereq: Run | None = prereq
        self._job_id: int | None = job_id  # Slurm job id

    def name(self) -> str:
        """Unique name of this run: '<simulation name>.<run number>'."""
        return self._simulation.name + "." + str(self._run_nr)

    async def prep_dirs(
        self, executor: command_executor.Executor | None = None
    ) -> None:
        """Prepare this run's working directories via `executor`.

        Defaults to a fresh LocalExecutor. BUG FIX: the previous default
        `command_executor.LocalExecutor()` was evaluated once at function
        definition time and shared across all calls.
        """
        if executor is None:
            executor = command_executor.LocalExecutor()
        await self._instantiation.prepare_directories(executor=executor)
class Runtime(metaclass=abc.ABCMeta):
    """Base class for managing the execution of multiple runs."""

    def __init__(self) -> None:
        # set once an interrupt has been signaled
        self._interrupted = False
        # seconds between profiler signals; None disables profiling
        self._profile_int: int | None = None

    @abc.abstractmethod
    def add_run(self, run: Run) -> None:
        """Queue `run` for execution."""

    @abc.abstractmethod
    async def start(self) -> None:
        """Execute all queued runs."""

    @abc.abstractmethod
    def interrupt_handler(self) -> None:
        """React to an interrupt.

        All currently running simulators should be stopped cleanly and
        their output collected.
        """

    def interrupt(self) -> None:
        """Signal interrupt to the runtime (idempotent)."""
        if self._interrupted:
            # invoking the handler again would trigger repeated
            # CancelledError
            return
        self._interrupted = True
        self.interrupt_handler()

    def enable_profiler(self, profile_int: int) -> None:
        """Enable periodic profiling every `profile_int` seconds."""
        self._profile_int = profile_int
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import asyncio
import typing as tp
from simbricks.orchestration import proxy
from simbricks.orchestration.exectools import Executor
from simbricks.orchestration.experiments import DistributedExperiment, Experiment
from simbricks.orchestration.runtime_new import simulation_executor
from simbricks.orchestration.runtime_new.runs import base
class DistributedSimpleRuntime(base.Runtime):
    """Execute distributed runs locally, one at a time."""

    def __init__(self, executors, verbose=False) -> None:
        """
        Args:
            executors: Executors for the participating simulation hosts.
            verbose: Print progress information while executing runs.
        """
        super().__init__()
        self.runnable: list[base.Run] = []
        self.complete: list[base.Run] = []
        self.verbose = verbose
        self.executors = executors
        self._running: asyncio.Task

    def add_run(self, run: base.Run) -> None:
        """Queue `run`; only distributed experiments are accepted."""
        if not isinstance(run.experiment, DistributedExperiment):
            raise RuntimeError("Only distributed experiments supported")
        self.runnable.append(run)

    async def do_run(self, run: base.Run) -> None:
        """Prepare directories and execute a single distributed run."""
        # NOTE(review): run.experiment/run.env/run.output/run.outpath are
        # the pre-refactoring Run API; base.Run now stores
        # _simulation/_instantiation/_output -- verify and port.
        runner = simulation_executor.ExperimentDistributedRunner(
            self.executors,
            # we ensure the correct type in add_run()
            tp.cast(DistributedExperiment, run.experiment),
            run.env,
            self.verbose,
        )
        # BUG FIX: base.Runtime defines `_profile_int`; `self.profile_int`
        # raised AttributeError here.
        if self._profile_int:
            runner._profile_int = self._profile_int
        try:
            for executor in self.executors:
                await run.prep_dirs(executor)
            await runner.prepare()
        except asyncio.CancelledError:
            # it is safe to just exit here because we are not running any
            # simulators yet
            return
        run.output = await runner.run()  # already handles CancelledError
        self.complete.append(run)
        # if the log is huge, this step takes some time
        if self.verbose:
            print(f"Writing collected output of run {run.name()} to JSON file ...")
        run.output.dump(run.outpath)

    async def start(self) -> None:
        """Execute the queued runs sequentially, honoring interrupts."""
        for run in self.runnable:
            if self._interrupted:
                return
            self._running = asyncio.create_task(self.do_run(run))
            await self._running

    def interrupt_handler(self) -> None:
        if self._running:
            self._running.cancel()
# TODO: fix this function (still targets the old Experiment/Executor API)
def auto_dist(
    e: Experiment, execs: list[Executor], proxy_type: str = "sockets"
) -> DistributedExperiment:
    """
    Converts an Experiment into a DistributedExperiment.

    Assigns network to executor zero, and then round-robin assignment of hosts
    to executors, while also assigning all nics for a host to the same executor.

    Args:
        e: The experiment to distribute.
        execs: Available executors; only the first two are currently used.
        proxy_type: Inter-host proxy transport, "sockets" or "rdma".

    Returns:
        A two-host DistributedExperiment mirroring `e`'s settings.

    Raises:
        RuntimeError: If fewer than two executors are given or the proxy
            type is unknown.
    """
    if len(execs) < 2:
        raise RuntimeError("auto_dist needs at least two hosts")
    elif len(execs) > 2:
        print("Warning: currently auto_dist only uses the first two hosts")
    # choose the proxy implementation for the requested transport
    if proxy_type == "sockets":
        proxy_listener_c = proxy.SocketsNetProxyListener
        proxy_connecter_c = proxy.SocketsNetProxyConnecter
    elif proxy_type == "rdma":
        proxy_listener_c = proxy.RDMANetProxyListener
        proxy_connecter_c = proxy.RDMANetProxyConnecter
    else:
        raise RuntimeError("Unknown proxy type specified")
    # Create the distributed experiment (fixed at two simulation hosts)
    de = DistributedExperiment(e.name, 2)
    # carry over experiment-wide settings
    de.timeout = e.timeout
    de.checkpoint = e.checkpoint
    de.no_simbricks = e.no_simbricks
    de.metadata = e.metadata.copy()
    # create listening proxy on host 0
    lp = proxy_listener_c()
    lp.name = "listener"
    de.add_proxy(lp)
    de.assign_sim_host(lp, 0)
    # assign networks to first host
    for net in e.networks:
        de.add_network(net)
        de.assign_sim_host(net, 0)
    # create connecting proxy on host 1
    cp = proxy_connecter_c(lp)
    cp.name = "connecter"
    de.add_proxy(cp)
    de.assign_sim_host(cp, 1)
    # round-robin assignment for hosts
    k = 0
    for h in e.hosts:
        de.add_host(h)
        de.assign_sim_host(h, k)
        # all NICs of a host live on the same executor as the host
        for nic in h.nics:
            de.assign_sim_host(nic, k)
            # NICs placed on host 1 are reached through the connecting proxy
            if k != 0:
                cp.add_nic(nic)
        k = (k + 1) % 2
    for nic in e.nics:
        de.add_nic(nic)
    return de
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from __future__ import annotations
import asyncio
from simbricks.orchestration import exectools
from simbricks.orchestration.runners import ExperimentSimpleRunner
from simbricks.orchestration.runtime.common import Run, Runtime
from simbricks.orchestration.runtime_new import simulation_executor
from simbricks.orchestration.runtime_new import command_executor
from simbricks.orchestration.runtime_new.runs import base as run_base
class LocalSimpleRuntime(run_base.Runtime):
    """Execute runs locally in sequence."""

    def __init__(
        self,
        verbose=False,
        executor: command_executor.Executor | None = None,
    ):
        """
        Args:
            verbose: Print progress information while executing runs.
            executor: Command executor to use; defaults to a fresh
                LocalExecutor. BUG FIX: the previous default
                `LocalExecutor()` was a single shared instance created at
                definition time.
        """
        super().__init__()
        self._runnable: list[run_base.Run] = []
        self._complete: list[run_base.Run] = []
        self._verbose: bool = verbose
        self._executor: command_executor.Executor = (
            executor if executor is not None else command_executor.LocalExecutor()
        )
        self._running: asyncio.Task | None = None

    def add_run(self, run: run_base.Run) -> None:
        """Queue `run` for sequential execution."""
        self._runnable.append(run)

    async def do_run(self, run: run_base.Run) -> None:
        """Actually executes `run`."""
        try:
            # NOTE(review): run._inst_env does not exist on run_base.Run
            # (which stores _instantiation) -- verify after refactoring.
            # BUG FIX: self.executor/self.verbose/self.profile_int were
            # wrong names for the underscore-prefixed attributes.
            runner = simulation_executor.ExperimentSimpleRunner(
                self._executor, run._simulation, run._inst_env, self._verbose
            )
            if self._profile_int:
                runner._profile_int = self._profile_int
            await run.prep_dirs(self._executor)
            await runner.prepare()
        except asyncio.CancelledError:
            # it is safe to just exit here because we are not running any
            # simulators yet
            return
        run._output = await runner.run()  # handles CancelledError
        self._complete.append(run)
        # if the log is huge, this step takes some time
        if self._verbose:
            print(f"Writing collected output of run {run.name()} to JSON file ...")
        # TODO: FIXME -- run.outpath does not exist on run_base.Run yet
        run._output.dump(run.outpath)

    async def start(self) -> None:
        """Execute the runs defined in `self._runnable`."""
        for run in self._runnable:
            if self._interrupted:
                return
            self._running = asyncio.create_task(self.do_run(run))
            await self._running

    def interrupt_handler(self) -> None:
        if self._running:
            self._running.cancel()
class LocalParallelRuntime(run_base.Runtime):
    """Execute runs locally in parallel on multiple cores."""

    def __init__(
        self,
        cores: int,
        mem: int | None = None,
        verbose: bool = False,
        executor: command_executor.Executor | None = None,
    ):
        """
        Args:
            cores: Number of CPU cores available to runs.
            mem: Available memory in MB, or None for unlimited.
            verbose: Print progress information while executing runs.
            executor: Command executor to use; defaults to a fresh
                LocalExecutor. BUG FIX: the previous default
                `LocalExecutor()` was a single shared instance created at
                definition time.
        """
        super().__init__()
        self._runs_noprereq: list[run_base.Run] = []  # runs without prerequisites
        self._runs_prereq: list[run_base.Run] = []  # runs with a prerequisite run
        self._complete: set[run_base.Run] = set()
        self._cores: int = cores
        self._mem: int | None = mem
        self._verbose: bool = verbose
        self._executor = (
            executor if executor is not None else command_executor.LocalExecutor()
        )
        self._pending_jobs: set[asyncio.Task] = set()
        self._starter_task: asyncio.Task
        # resource bookkeeping; also reset at the start of do_start()
        self._cores_used = 0
        self._mem_used = 0

    def add_run(self, run: run_base.Run) -> None:
        """Queue `run`, rejecting it if it can never fit the resources."""
        if run._simulation.resreq_cores() > self._cores:
            raise RuntimeError("Not enough cores available for run")
        if self._mem is not None and run._simulation.resreq_mem() > self._mem:
            raise RuntimeError("Not enough memory available for run")
        if run._prereq is None:
            self._runs_noprereq.append(run)
        else:
            self._runs_prereq.append(run)

    async def do_run(self, run: run_base.Run) -> run_base.Run | None:
        """Actually executes `run`.

        Returns the run, or None if it was cancelled before any simulator
        was started.
        """
        try:
            # NOTE(review): run._inst_env does not exist on run_base.Run
            # (which stores _instantiation) -- verify after refactoring.
            runner = simulation_executor.ExperimentSimpleRunner(
                self._executor, run._simulation, run._inst_env, self._verbose
            )
            if self._profile_int is not None:
                runner._profile_int = self._profile_int
            await run.prep_dirs(executor=self._executor)
            await runner.prepare()
        except asyncio.CancelledError:
            # it is safe to just exit here because we are not running any
            # simulators yet
            return None
        print("starting run ", run.name())
        run._output = await runner.run()  # already handles CancelledError
        # if the log is huge, this step takes some time
        # BUG FIX: `self.verbose` raised AttributeError (attribute is
        # `_verbose`).
        if self._verbose:
            print(f"Writing collected output of run {run.name()} to JSON file ...")
        # BUG FIX: `run._output.dump(output path)` was a syntax error.
        # TODO: FIXME -- run.outpath does not exist on run_base.Run yet.
        run._output.dump(run.outpath)
        print("finished run ", run.name())
        return run

    async def wait_completion(self) -> None:
        """Wait for any run to terminate and release its resources."""
        assert self._pending_jobs
        done, self._pending_jobs = await asyncio.wait(
            self._pending_jobs, return_when=asyncio.FIRST_COMPLETED
        )
        for r_awaitable in done:
            run = await r_awaitable
            if run is None:
                # do_run() was cancelled during preparation; at this point
                # the whole runtime is being torn down, so per-run resource
                # accounting no longer matters
                continue
            self._complete.add(run)
            self._cores_used -= run._simulation.resreq_cores()
            self._mem_used -= run._simulation.resreq_mem()

    def enough_resources(self, run: run_base.Run) -> bool:
        """Check if enough cores and mem are available for the run."""
        simulation = run._simulation  # pylint: disable=redefined-outer-name
        if self._cores is not None:
            enough_cores = (self._cores - self._cores_used) >= simulation.resreq_cores()
        else:
            enough_cores = True
        # BUG FIX: `self.mem`/`self.mem_used` raised AttributeError
        # (attributes are `_mem`/`_mem_used`).
        if self._mem is not None:
            enough_mem = (self._mem - self._mem_used) >= simulation.resreq_mem()
        else:
            enough_mem = True
        return enough_cores and enough_mem

    def prereq_ready(self, run: run_base.Run) -> bool:
        """Check if the prerequisite run for `run` has completed."""
        if run._prereq is None:
            return True
        return run._prereq in self._complete

    async def do_start(self) -> None:
        """Asynchronously execute the runs defined in
        `self._runs_noprereq + self._runs_prereq`."""
        self._cores_used = 0
        self._mem_used = 0
        all_runs = self._runs_noprereq + self._runs_prereq
        for run in all_runs:
            # if necessary, wait for enough memory or cores
            while not self.enough_resources(run):
                print("waiting for resources")
                await self.wait_completion()
            # if necessary, wait for prerequisite runs to complete
            while not self.prereq_ready(run):
                print("waiting for prereq")
                await self.wait_completion()
            self._cores_used += run._simulation.resreq_cores()
            self._mem_used += run._simulation.resreq_mem()
            job = asyncio.create_task(self.do_run(run))
            self._pending_jobs.add(job)
        # wait for all runs to finish
        await asyncio.gather(*self._pending_jobs)

    async def start(self) -> None:
        """Execute all defined runs."""
        self._starter_task = asyncio.create_task(self.do_start())
        try:
            await self._starter_task
        except asyncio.CancelledError:
            for job in self._pending_jobs:
                job.cancel()
            # wait for all runs to finish
            await asyncio.gather(*self._pending_jobs)

    def interrupt_handler(self) -> None:
        self._starter_task.cancel()
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import asyncio
import os
import pathlib
import pickle
import re
import typing as tp
from simbricks.orchestration.runtime.common import Run, Runtime
from simbricks.orchestration.runtime_new import runs
class SlurmRuntime(Runtime):
    """Execute runs by submitting each one as a Slurm batch job."""

    def __init__(self, slurmdir, args, verbose=False, cleanup=True) -> None:
        """
        Args:
            slurmdir: Directory for pickled runs, batch scripts, and logs.
            args: Parsed command-line arguments.
            verbose: Pass --verbose to the per-run invocation of run.py.
            cleanup: Remove the run's workdir after the job finishes.
        """
        super().__init__()
        self.runnable: tp.List[Run] = []
        self.slurmdir = slurmdir
        self.args = args
        self.verbose = verbose
        self.cleanup = cleanup
        self._start_task: asyncio.Task

    def add_run(self, run: Run) -> None:
        """Queue `run` for submission to Slurm."""
        self.runnable.append(run)

    def prep_run(self, run: Run) -> str:
        """Pickle `run` and write its sbatch script.

        Returns:
            Path of the generated batch script.
        """
        # NOTE(review): run.experiment/run.index/run.env/run.prereq belong
        # to the pre-refactoring Run API; the new Run stores
        # _simulation/_prereq/_job_id -- verify and port.
        exp = run.experiment
        e_idx = exp.name + f'-{run.index}' + '.exp'
        exp_path = os.path.join(self.slurmdir, e_idx)
        log_idx = exp.name + f'-{run.index}' + '.log'
        exp_log = os.path.join(self.slurmdir, log_idx)
        sc_idx = exp.name + f'-{run.index}' + '.sh'
        exp_script = os.path.join(self.slurmdir, sc_idx)
        print(exp_path)
        print(exp_log)
        print(exp_script)
        # write out pickled run
        # BUG FIX: binary mode must not specify an encoding; passing
        # encoding='utf-8' with 'wb' raises ValueError.
        with open(exp_path, 'wb') as f:
            run.prereq = None  # we don't want to pull in the prereq too
            pickle.dump(run, f)
        # create slurm batch script
        with open(exp_script, 'w', encoding='utf-8') as f:
            f.write('#!/bin/sh\n')
            f.write(f'#SBATCH -o {exp_log} -e {exp_log}\n')
            #f.write('#SBATCH -c %d\n' % (exp.resreq_cores(),))
            f.write(f'#SBATCH --mem={exp.resreq_mem()}M\n')
            f.write(f'#SBATCH --job-name="{run.name()}"\n')
            f.write('#SBATCH --exclude=spyder[01-05],spyder16\n')
            f.write('#SBATCH -c 32\n')
            f.write('#SBATCH --nodes=1\n')
            if exp.timeout is not None:
                # split the timeout into HH:MM:SS for Slurm's --time
                h = int(exp.timeout / 3600)
                m = int((exp.timeout % 3600) / 60)
                s = int(exp.timeout % 60)
                f.write(f'#SBATCH --time={h:02d}:{m:02d}:{s:02d}\n')
            extra = ''
            if self.verbose:
                extra = '--verbose'
            f.write(f'python3 run.py {extra} --pickled {exp_path}\n')
            f.write('status=$?\n')
            if self.cleanup:
                f.write(f'rm -rf {run.env.workdir}\n')
            f.write('exit $status\n')
        return exp_script

    async def _do_start(self) -> None:
        """Submit every queued run via sbatch, recording its job id."""
        pathlib.Path(self.slurmdir).mkdir(parents=True, exist_ok=True)
        jid_re = re.compile(r'Submitted batch job ([0-9]+)')
        for run in self.runnable:
            if run.prereq is None:
                dep_cmd = ''
            else:
                # only start once the prerequisite job succeeded
                dep_cmd = '--dependency=afterok:' + str(run.prereq.job_id)
            script = self.prep_run(run)
            stream = os.popen(f'sbatch {dep_cmd} {script}')
            output = stream.read()
            result = stream.close()
            if result is not None:
                raise RuntimeError('running sbatch failed')
            m = jid_re.search(output)
            if m is None:
                raise RuntimeError('cannot retrieve id of submitted job')
            run.job_id = int(m.group(1))

    async def start(self) -> None:
        """Submit all runs; on cancellation, scancel submitted jobs."""
        self._start_task = asyncio.create_task(self._do_start())
        try:
            await self._start_task
        except asyncio.CancelledError:
            # stop all runs that have already been scheduled
            # (existing slurm job id)
            job_ids = []
            for run in self.runnable:
                if run.job_id:
                    job_ids.append(str(run.job_id))
            scancel_process = await asyncio.create_subprocess_shell(
                f"scancel {' '.join(job_ids)}"
            )
            await scancel_process.wait()

    def interrupt_handler(self) -> None:
        self._start_task.cancel()
# Copyright 2022 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import asyncio
import itertools
import shlex
import traceback
import typing as tp
import abc
from simbricks.orchestration.exectools import (
Component, Executor, SimpleComponent
)
from simbricks.orchestration.experiment.experiment_environment import ExpEnv
from simbricks.orchestration.experiment.experiment_output import ExpOutput
from simbricks.orchestration.experiments import (
DistributedExperiment, Experiment
)
from simbricks.orchestration.simulators import Simulator
from simbricks.orchestration.utils import graphlib
from simbricks.orchestration.simulation import base as sim_base
from simbricks.orchestration.instantiation import base as inst_base
from simbricks.orchestration.runtime_new import simulation_executor
from simbricks.orchestration.runtime_new import command_executor
class ExperimentBaseRunner(abc.ABC):
    """Drives one simulation: starts its simulators in dependency order,
    waits for them, then tears everything down while collecting output."""

    def __init__(
        self,
        simulation: sim_base.Simulation,
        inst_env: inst_base.InstantiationEnvironment,
        verbose: bool,
    ) -> None:
        # BUG FIX: the original assignments referenced undefined names
        # `exp` and `env` instead of the parameters `simulation` and
        # `inst_env` (NameError on construction).
        self._simulation: sim_base.Simulation = simulation
        self._inst_env: inst_base.InstantiationEnvironment = inst_env
        self._verbose: bool = verbose
        self._profile_int: int | None = None
        # TODO: ExpOutput belongs to the old experiment API; replace with
        # the new simulation output type.
        self._out = ExpOutput(simulation)
        # (simulator, component) pairs of simulators currently running
        # BUG FIX: the component/executor types come from command_executor,
        # not simulation_executor.
        self._running: list[tuple[sim_base.Simulator, command_executor.SimpleComponent]] = []
        # sockets to remove on cleanup, with the executor that owns them
        self._sockets: list[tuple[command_executor.Executor, str]] = []
        # components whose termination we wait for
        self._wait_sims: list[command_executor.Component] = []

    @abc.abstractmethod
    def sim_executor(self, simulator: sim_base.Simulator) -> command_executor.Executor:
        """Return the executor responsible for running `simulator`."""

    def sim_graph(self) -> dict[sim_base.Simulator, set[sim_base.Simulator]]:
        """Dependency graph: simulator -> simulators it must wait for."""
        sims = self._simulation.all_simulators()
        graph = {}
        for sim in sims:
            deps = sim.dependencies() + sim.extra_deps
            print(f'deps of {sim}: {sim.dependencies()}')
            graph[sim] = set()
            for d in deps:
                graph[sim].add(d)
        return graph

    async def start_sim(self, sim: sim_base.Simulator) -> None:
        """Start a simulator and wait for it to be ready."""
        name = sim.full_name()
        if self._verbose:
            print(f'{self._simulation.name}: starting {name}')
        run_cmd = sim.run_cmd(self._inst_env)
        if run_cmd is None:
            # simulator without its own process (e.g. represented by
            # another simulator)
            if self._verbose:
                print(f'{self._simulation.name}: started dummy {name}')
            return
        # run simulator
        executor = self.sim_executor(sim)
        sc = executor.create_component(
            name, shlex.split(run_cmd), verbose=self._verbose, canfail=True
        )
        await sc.start()
        self._running.append((sim, sc))
        # add sockets for cleanup
        for s in sim.sockets_cleanup(self._inst_env):
            self._sockets.append((executor, s))
        # Wait till sockets exist
        wait_socks = sim.sockets_wait(self._inst_env)
        if wait_socks:
            if self._verbose:
                print(f'{self._simulation.name}: waiting for sockets {name}')
            await executor.await_files(wait_socks, verbose=self._verbose)
        # add time delay if required
        delay = sim.start_delay()
        if delay > 0:
            await asyncio.sleep(delay)
        if sim.wait_terminate(self._inst_env):
            self._wait_sims.append(sc)
        if self._verbose:
            print(f'{self._simulation.name}: started {name}')

    async def before_wait(self) -> None:
        """Hook invoked right before waiting for simulators to finish."""

    async def before_cleanup(self) -> None:
        """Hook invoked right before terminating the simulators."""

    async def after_cleanup(self) -> None:
        """Hook invoked after all cleanup has finished."""

    async def prepare(self) -> None:
        """Distribute config tars and run every simulator's prep commands."""
        # generate config tars
        copies = []
        for host in self._simulation.hosts:
            path = self._inst_env.cfgtar_path(host)
            if self._verbose:
                print('preparing config tar:', path)
            host.node_config.make_tar(self._inst_env, path)
            executor = self.sim_executor(host)
            task = asyncio.create_task(executor.send_file(path, self._verbose))
            copies.append(task)
        await asyncio.gather(*copies)
        # prepare all simulators in parallel
        sims = []
        for sim in self._simulation.all_simulators():
            prep_cmds = list(sim.prep_cmds(self._inst_env))
            executor = self.sim_executor(sim)
            task = asyncio.create_task(
                executor.run_cmdlist(
                    'prepare_' + self._simulation.name,
                    prep_cmds,
                    verbose=self._verbose
                )
            )
            sims.append(task)
        await asyncio.gather(*sims)

    async def wait_for_sims(self) -> None:
        """Wait for simulators to terminate (the ones marked to wait on)."""
        if self._verbose:
            print(f'{self._simulation.name}: waiting for hosts to terminate')
        for sc in self._wait_sims:
            await sc.wait()

    async def terminate_collect_sims(self) -> ExpOutput:
        """Terminates all simulators and collects output."""
        self._out.set_end()
        if self._verbose:
            print(f'{self._simulation.name}: cleaning up')
        await self.before_cleanup()
        # "interrupt, terminate, kill" all processes
        scs = []
        for _, sc in self._running:
            scs.append(asyncio.create_task(sc.int_term_kill()))
        await asyncio.gather(*scs)
        # wait for all processes to terminate
        for _, sc in self._running:
            await sc.wait()
        # remove all sockets
        scs = []
        for (executor, sock) in self._sockets:
            scs.append(asyncio.create_task(executor.rmtree(sock)))
        if scs:
            await asyncio.gather(*scs)
        # add all simulator components to the output
        for sim, sc in self._running:
            self._out.add_sim(sim, sc)
        await self.after_cleanup()
        return self._out

    async def profiler(self):
        """Periodically signal SIGUSR1 to all running simulators."""
        assert self._profile_int
        while True:
            await asyncio.sleep(self._profile_int)
            for (_, sc) in self._running:
                await sc.sigusr1()

    async def run(self) -> ExpOutput:
        """Run the whole simulation and return its collected output."""
        profiler_task = None
        try:
            self._out.set_start()
            graph = self.sim_graph()
            print(graph)
            ts = graphlib.TopologicalSorter(graph)
            ts.prepare()
            while ts.is_active():
                # start ready simulators in parallel
                starting = []
                sims = []
                for sim in ts.get_ready():
                    starting.append(asyncio.create_task(self.start_sim(sim)))
                    sims.append(sim)
                # wait for starts to complete
                await asyncio.gather(*starting)
                for sim in sims:
                    ts.done(sim)
            if self._profile_int:
                profiler_task = asyncio.create_task(self.profiler())
            await self.before_wait()
            await self.wait_for_sims()
        except asyncio.CancelledError:
            if self._verbose:
                print(f'{self._simulation.name}: interrupted')
            self._out.set_interrupted()
        except:  # pylint: disable=bare-except
            self._out.set_failed()
            traceback.print_exc()
        if profiler_task:
            try:
                profiler_task.cancel()
            except asyncio.CancelledError:
                pass
        # The bare except above guarantees that we always execute the
        # following code, which terminates all simulators and produces a
        # proper output file.
        terminate_collect_task = asyncio.create_task(
            self.terminate_collect_sims()
        )
        # prevent terminate_collect_task from being cancelled
        while True:
            try:
                return await asyncio.shield(terminate_collect_task)
            except asyncio.CancelledError as e:
                print(e)
class ExperimentSimpleRunner(ExperimentBaseRunner):
    """Runner that executes every simulator on a single executor."""

    def __init__(self, executor: command_executor.Executor, *args, **kwargs) -> None:
        self._executor = executor
        super().__init__(*args, **kwargs)

    def sim_executor(self, sim: sim_base.Simulator) -> command_executor.Executor:
        """All simulators share the one configured executor."""
        return self._executor
class ExperimentDistributedRunner(ExperimentBaseRunner):
    """Runner that distributes simulators across multiple executors,
    one per simulation host."""

    # TODO: FIXME -- still built around the old DistributedExperiment API.
    def __init__(
        self,
        execs: dict[int, Executor],
        exp: DistributedExperiment,
        *args,
        **kwargs,
    ) -> None:
        """
        Args:
            execs: Executor for each simulation host id (the ids used in
                exp.host_mapping).
            exp: The distributed experiment to run.
        """
        # BUG FIX: the annotation `dict[sim -> Executor]` was a Python
        # syntax error; executors are keyed by the integer host id.
        self.execs = execs
        super().__init__(exp, *args, **kwargs)
        self.exp = exp  # overrides the type in the base class
        assert self.exp.num_hosts <= len(execs)

    def sim_executor(self, sim) -> Executor:
        """Look up the executor assigned to `sim`'s simulation host."""
        h_id = self.exp.host_mapping[sim]
        return self.execs[h_id]

    async def prepare(self) -> None:
        # make sure all simulators are assigned to an executor
        assert self.exp.all_sims_assigned()
        # set IP addresses for proxies based on assigned executors
        for p in itertools.chain(
            self.exp.proxies_listen, self.exp.proxies_connect
        ):
            executor = self.sim_executor(p)
            p.ip = executor.ip
        await super().prepare()
...@@ -22,11 +22,13 @@ ...@@ -22,11 +22,13 @@
from __future__ import annotations from __future__ import annotations
import abc import abc
import time
import typing as tp import typing as tp
import simbricks.orchestration.system as sys_conf import simbricks.orchestration.system as sys_conf
from simbricks.orchestration.experiment import experiment_environment_new as exp_env from simbricks.orchestration.experiment import experiment_environment_new as exp_env
from simbricks.orchestration.instantiation import base as inst_base from simbricks.orchestration.instantiation import base as inst_base
from simbricks.orchestration.utils import base as utils_base from simbricks.orchestration.utils import base as utils_base
from simbricks.orchestration.runtime_new import command_executor
if tp.TYPE_CHECKING: if tp.TYPE_CHECKING:
from simbricks.orchestration.simulation import ( from simbricks.orchestration.simulation import (
...@@ -171,6 +173,7 @@ class Simulator(utils_base.IdObj): ...@@ -171,6 +173,7 @@ class Simulator(utils_base.IdObj):
"""Command to execute this simulator.""" """Command to execute this simulator."""
return "" return ""
# TODO: FIXME
def dependencies(self) -> list[Simulator]: def dependencies(self) -> list[Simulator]:
"""Other simulators to execute before this one.""" """Other simulators to execute before this one."""
return [] return []
...@@ -198,8 +201,11 @@ class Simulator(utils_base.IdObj): ...@@ -198,8 +201,11 @@ class Simulator(utils_base.IdObj):
def wait_terminate(self) -> bool: def wait_terminate(self) -> bool:
return False return False
def supports_checkpointing(self) -> bool:
return False
class Simulation(object): class Simulation(utils_base.IdObj):
""" """
Base class for all simulation experiments. Base class for all simulation experiments.
...@@ -207,6 +213,7 @@ class Simulation(object): ...@@ -207,6 +213,7 @@ class Simulation(object):
""" """
def __init__(self, name: str) -> None: def __init__(self, name: str) -> None:
super().__init__()
self.name = name self.name = name
""" """
This experiment's name. This experiment's name.
...@@ -223,9 +230,6 @@ class Simulation(object): ...@@ -223,9 +230,6 @@ class Simulation(object):
by first running in a less accurate mode, then checkpointing the system by first running in a less accurate mode, then checkpointing the system
state after boot and running simulations from there. state after boot and running simulations from there.
""" """
self.no_simbricks = False
"""If `true`, no simbricks adapters are used in any of the
simulators."""
self.hosts: list[HostSim] = [] self.hosts: list[HostSim] = []
"""The host simulators to run.""" """The host simulators to run."""
self.pcidevs: list[PCIDevSim] = [] self.pcidevs: list[PCIDevSim] = []
...@@ -304,6 +308,8 @@ class Simulation(object): ...@@ -304,6 +308,8 @@ class Simulation(object):
def all_simulators(self) -> tp.Iterable[Simulator]: def all_simulators(self) -> tp.Iterable[Simulator]:
"""Returns all simulators defined to run in this experiment.""" """Returns all simulators defined to run in this experiment."""
# TODO: FIXME
raise Exception("needs fixed implementation")
return itertools.chain( return itertools.chain(
self.hosts, self.pcidevs, self.memdevs, self.netmems, self.networks self.hosts, self.pcidevs, self.memdevs, self.netmems, self.networks
) )
...@@ -324,8 +330,57 @@ class Simulation(object): ...@@ -324,8 +330,57 @@ class Simulation(object):
def find_sim(self, comp: sys_conf.Component) -> sim_base.Simulator: def find_sim(self, comp: sys_conf.Component) -> sim_base.Simulator:
"""Returns the used simulator object for the system component.""" """Returns the used simulator object for the system component."""
for c, sim in self.sys_sim_map.items(): if comp not in self.sys_sim_map:
if c == comp: raise Exception("Simulator Not Found")
return sim return self.sys_sim_map[comp]
raise Exception("Simulator Not Found") def enable_checkpointing_if_supported() -> None:
raise Exception("not implemented")
def is_checkpointing_enabled(self) -> bool:
raise Exception("not implemented")
class SimulationOutput:
"""Manages an experiment's output."""
def __init__(self, sim: Simulation) -> None:
self._sim_name: str = sim.name
self._start_time: float = None
self._end_time: float = None
self._success: bool = True
self._interrupted: bool = False
self._metadata = exp.metadata
self._sims: dict[str, dict[str, str | list[str]]] = {}
def set_start(self) -> None:
self._start_time = time.time()
def set_end(self) -> None:
self.end_time = time.time()
def set_failed(self) -> None:
self.success = False
def set_interrupted(self) -> None:
self.success = False
self.interrupted = True
def add_sim(self, sim: Simulator, comp: command_executor.Component) -> None:
obj = {
"class": sim.__class__.__name__,
"cmd": comp.cmd_parts,
"stdout": comp.stdout,
"stderr": comp.stderr,
}
self._sims[sim.full_name()] = obj
def dump(self, outpath: str) -> None:
pathlib.Path(outpath).parent.mkdir(parents=True, exist_ok=True)
with open(outpath, "w", encoding="utf-8") as file:
json.dump(self.__dict__, file, indent=4)
def load(self, file: str) -> None:
with open(file, "r", encoding="utf-8") as fp:
for k, v in json.load(fp).items():
self.__dict__[k] = v
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment