Commit eacbf429 authored by Jonas Kaufmann
Browse files

simbricks-run: update to work with changes to simulation executor, throw out...

simbricks-run: update to work with changes to simulation executor, throw out all things that are no longer supported
parent 588595ae
...@@ -20,25 +20,22 @@ ...@@ -20,25 +20,22 @@
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""This is the top-level module of the SimBricks orchestration framework that """This is the top-level module of the SimBricks orchestration framework that
users interact with.""" users interact with for running simulations locally."""
import argparse import argparse
import asyncio import asyncio
import fnmatch import fnmatch
import importlib import importlib
import importlib.util import importlib.util
import json
import os import os
import pathlib
import signal import signal
import sys import sys
from simbricks.runtime import output as sim_out
from simbricks.orchestration.instantiation import base as inst_base from simbricks.orchestration.instantiation import base as inst_base
from simbricks.runtime.runs import base as runs_base from simbricks.runtime import output as sim_out
from simbricks.runtime.runs import base as runs_base from simbricks.runtime.runs import base as runs_base
from simbricks.runtime.runs import local as rt_local from simbricks.runtime.runs import local as rt_local
from simbricks.runtime import command_executor
def parse_args() -> argparse.Namespace: def parse_args() -> argparse.Namespace:
...@@ -65,13 +62,6 @@ def parse_args() -> argparse.Namespace: ...@@ -65,13 +62,6 @@ def parse_args() -> argparse.Namespace:
nargs="+", nargs="+",
help="Only run experiments matching the given Unix shell style patterns", help="Only run experiments matching the given Unix shell style patterns",
) )
parser.add_argument(
"--pickled",
action="store_const",
const=True,
default=False,
help="Interpret experiment modules as pickled runs instead of .py files",
)
parser.add_argument( parser.add_argument(
"--runs", "--runs",
metavar="N", metavar="N",
...@@ -79,9 +69,7 @@ def parse_args() -> argparse.Namespace: ...@@ -79,9 +69,7 @@ def parse_args() -> argparse.Namespace:
default=1, default=1,
help="Number of repetition of each experiment", help="Number of repetition of each experiment",
) )
parser.add_argument( parser.add_argument("--firstrun", metavar="N", type=int, default=1, help="ID for first run")
"--firstrun", metavar="N", type=int, default=1, help="ID for first run"
)
parser.add_argument( parser.add_argument(
"--force", "--force",
action="store_const", action="store_const",
...@@ -116,45 +104,17 @@ def parse_args() -> argparse.Namespace: ...@@ -116,45 +104,17 @@ def parse_args() -> argparse.Namespace:
g_env.add_argument( g_env.add_argument(
"--repo", "--repo",
metavar="DIR", metavar="DIR",
type=str, type=pathlib.Path,
default=os.path.dirname(__file__) + "/..", default=pathlib.Path("/simbricks"),
help="SimBricks repository directory", help="SimBricks repository directory",
) )
g_env.add_argument( g_env.add_argument(
"--workdir", "--workdir",
metavar="DIR", metavar="DIR",
type=str, type=pathlib.Path,
default="./out/", default=pathlib.Path("./out/"),
help="Work directory base", help="Work directory base",
) )
g_env.add_argument(
"--outdir",
metavar="DIR",
type=str,
default="./out/",
help="Output directory base",
)
g_env.add_argument(
"--cpdir",
metavar="DIR",
type=str,
default="./out/",
help="Checkpoint directory base",
)
g_env.add_argument(
"--hosts",
metavar="JSON_FILE",
type=str,
default=None,
help="List of hosts to use (json)",
)
g_env.add_argument(
"--shmdir",
metavar="DIR",
type=str,
default=None,
help="Shared memory directory base (workdir if not set)",
)
# arguments for the parallel runtime # arguments for the parallel runtime
g_par = parser.add_argument_group("Parallel Runtime") g_par = parser.add_argument_group("Parallel Runtime")
...@@ -181,89 +141,17 @@ def parse_args() -> argparse.Namespace: ...@@ -181,89 +141,17 @@ def parse_args() -> argparse.Namespace:
help="Memory limit for parallel runs (in MB)", help="Memory limit for parallel runs (in MB)",
) )
# arguments for the slurm runtime
g_slurm = parser.add_argument_group("Slurm Runtime")
g_slurm.add_argument(
"--slurm",
dest="runtime",
action="store_const",
const="slurm",
default="sequential",
help="Use slurm instead of sequential runtime",
)
g_slurm.add_argument(
"--slurmdir",
metavar="DIR",
type=str,
default="./slurm/",
help="Slurm communication directory",
)
# arguments for the distributed runtime
g_dist = parser.add_argument_group("Distributed Runtime")
g_dist.add_argument(
"--dist",
dest="runtime",
action="store_const",
const="dist",
default="sequential",
help="Use sequential distributed runtime instead of local",
)
g_dist.add_argument(
"--auto-dist",
action="store_const",
const=True,
default=False,
help="Automatically distribute non-distributed experiments",
)
g_dist.add_argument(
"--proxy-type",
metavar="TYPE",
type=str,
default="sockets",
help="Proxy type to use (sockets,rdma) for auto distribution",
)
return parser.parse_args() return parser.parse_args()
def load_executors(path: str) -> list[command_executor.Executor]:
    """Read the JSON hosts file at `path` and build one executor per entry.

    Each entry must provide a "type" of either "local" or "remote" plus an
    "ip". Remote entries additionally need "host" and "workdir" and may carry
    optional "ssh_args" / "scp_args" that are appended to the executor's extra
    argument lists.

    Raises:
        RuntimeError: if an entry has a "type" other than "local" or "remote".
    """
    with open(path, "r", encoding="utf-8") as hosts_file:
        host_entries = json.load(hosts_file)

    executors = []
    for entry in host_entries:
        if entry["type"] == "local":
            executor = command_executor.LocalExecutor()
        elif entry["type"] == "remote":
            executor = command_executor.RemoteExecutor(entry["host"], entry["workdir"])
            # extra ssh/scp arguments are optional per host
            for key, attr in (("ssh_args", "ssh_extra_args"), ("scp_args", "scp_extra_args")):
                if key in entry:
                    setattr(executor, attr, getattr(executor, attr) + entry[key])
        else:
            raise RuntimeError('invalid host type "' + entry["type"] + '"')

        # the ip is recorded for every executor, local or remote
        executor.ip = entry["ip"]
        executors.append(executor)
    return executors
def warn_multi_exec(executors: list[command_executor.Executor]):
    """Warn on stderr if more than one executor was passed in.

    Does nothing when `executors` holds zero or one element.
    """
    if len(executors) <= 1:
        return

    message = "Warning: multiple hosts specified, only using first one for now"
    print(message, file=sys.stderr)
def add_exp( def add_exp(
instantiation: inst_base.Instantiation, instantiation: inst_base.Instantiation,
prereq: runs_base.Run | None, prereq: runs_base.Run | None,
rt: runs_base.Runtime, rt: runs_base.Runtime,
args: argparse.Namespace,
) -> runs_base.Run: ) -> runs_base.Run:
env = inst_base.InstantiationEnvironment() # TODO: set from args env = inst_base.InstantiationEnvironment(args.workdir, args.repo)
instantiation.env = env instantiation.env = env
output = sim_out.SimulationOutput(instantiation.simulation) output = sim_out.SimulationOutput(instantiation.simulation)
...@@ -274,90 +162,71 @@ def add_exp( ...@@ -274,90 +162,71 @@ def add_exp(
def main(): def main():
args = parse_args() args = parse_args()
if args.hosts is None:
executors = [command_executor.LocalExecutor()]
else:
executors = load_executors(args.hosts)
# initialize runtime # initialize runtime
if args.runtime == "parallel": if args.runtime == "parallel":
warn_multi_exec(executors) rt = rt_local.LocalParallelRuntime(cores=args.cores, mem=args.mem, verbose=args.verbose)
rt = rt_local.LocalParallelRuntime(
cores=args.cores, mem=args.mem, verbose=args.verbose, executor=executors[0]
)
# elif args.runtime == "slurm":
# rt = runs.SlurmRuntime(args.slurmdir, args, verbose=args.verbose)
# elif args.runtime == "dist":
# rt = runs.DistributedSimpleRuntime(executors, verbose=args.verbose)
else: else:
warn_multi_exec(executors) rt = rt_local.LocalSimpleRuntime(verbose=args.verbose)
rt = rt_local.LocalSimpleRuntime(verbose=args.verbose, executor=executors[0])
if args.profile_int: if args.profile_int:
rt.enable_profiler(args.profile_int) rt.enable_profiler(args.profile_int)
# load experiments # load python modules with experiments
if not args.pickled: instantiations: list[inst_base.Instantiation] = []
# default: load python modules with experiments for path in args.experiments:
instantiations: list[inst_base.Instantiation] = [] modname, _ = os.path.splitext(os.path.basename(path))
for path in args.experiments:
modname, _ = os.path.splitext(os.path.basename(path))
class ExperimentModuleLoadError(Exception):
pass
spec = importlib.util.spec_from_file_location(modname, path) class ExperimentModuleLoadError(Exception):
if spec is None: pass
raise ExperimentModuleLoadError("spec is None")
mod = importlib.util.module_from_spec(spec)
if spec.loader is None:
raise ExperimentModuleLoadError("spec.loader is None")
spec.loader.exec_module(mod)
instantiations += mod.instantiations
if args.list: spec = importlib.util.spec_from_file_location(modname, path)
for inst in instantiations: if spec is None:
print(inst.simulation.name) raise ExperimentModuleLoadError("spec is None")
sys.exit(0) mod = importlib.util.module_from_spec(spec)
if spec.loader is None:
raise ExperimentModuleLoadError("spec.loader is None")
spec.loader.exec_module(mod)
instantiations += mod.instantiations
if args.list:
for inst in instantiations: for inst in instantiations:
# if args.auto_dist and not isinstance(sim, sim_base.DistributedExperiment): print(inst.simulation.name)
# sim = runs_base.auto_dist(sim, executors, args.proxy_type) sys.exit(0)
# apply filter if any specified for inst in instantiations:
if (args.filter) and (len(args.filter) > 0): # if args.auto_dist and not isinstance(sim, sim_base.DistributedExperiment):
match = False # sim = runs_base.auto_dist(sim, executors, args.proxy_type)
for f in args.filter:
match = fnmatch.fnmatch(inst.simulation.name, f) # apply filter if any specified
if match: if (args.filter) and (len(args.filter) > 0):
break match = False
for f in args.filter:
if not match: match = fnmatch.fnmatch(inst.simulation.name, f)
continue if match:
break
# if this is an experiment with a checkpoint we might have to create
# it if not match:
prereq = None continue
if inst.create_checkpoint and inst.simulation.any_supports_checkpointing():
checkpointing_inst = inst.copy() # if this is an experiment with a checkpoint we might have to create
checkpointing_inst.restore_checkpoint = False # it
checkpointing_inst.create_checkpoint = True prereq = None
inst.create_checkpoint = False if inst.create_checkpoint and inst.simulation.any_supports_checkpointing():
inst.restore_checkpoint = True checkpointing_inst = inst.copy()
checkpointing_inst.restore_checkpoint = False
prereq = add_exp(instantiation=checkpointing_inst, rt=rt, prereq=None) checkpointing_inst.create_checkpoint = True
inst.create_checkpoint = False
for index in range(args.firstrun, args.firstrun + args.runs): inst.restore_checkpoint = True
inst_copy = inst.copy()
inst_copy.preserve_tmp_folder = False prereq = add_exp(instantiation=checkpointing_inst, rt=rt, prereq=None, args=args)
if index == args.firstrun + args.runs - 1:
inst_copy._preserve_checkpoints = False for index in range(args.firstrun, args.firstrun + args.runs):
add_exp(instantiation=inst_copy, rt=rt, prereq=prereq) inst_copy = inst.copy()
# else: inst_copy.preserve_tmp_folder = False
# # otherwise load pickled run object if index == args.firstrun + args.runs - 1:
# for path in args.experiments: inst_copy._preserve_checkpoints = False
# with open(path, "rb") as f: add_exp(instantiation=inst_copy, rt=rt, prereq=prereq, args=args)
# rt.add_run(pickle.load(f))
# register interrupt handler # register interrupt handler
signal.signal(signal.SIGINT, lambda *_: rt.interrupt()) signal.signal(signal.SIGINT, lambda *_: rt.interrupt())
......
...@@ -23,11 +23,97 @@ ...@@ -23,11 +23,97 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import typing
from simbricks.utils import artifatcs as art from simbricks.runtime import simulation_executor as sim_exec
from simbricks.runtime import simulation_executor
from simbricks.runtime import command_executor
from simbricks.runtime.runs import base as run_base from simbricks.runtime.runs import base as run_base
from simbricks.utils import artifatcs as art
if typing.TYPE_CHECKING:
from simbricks.orchestration.instantiation import base as inst_base
from simbricks.orchestration.instantiation import proxy as inst_proxy
from simbricks.orchestration.simulation import base as sim_base
class LocalSimulationExecutorCallbacks(sim_exec.SimulationExecutorCallbacks):
    """Executor callbacks that echo commands and their output to stdout.

    Every callback first awaits the parent class implementation. Afterwards,
    and only when `verbose` was set, the event is printed with a bracketed
    prefix: "prepare", the simulator's full name or the proxy's name.
    """

    def __init__(self, instantiation: inst_base.Instantiation, verbose: bool):
        super().__init__(instantiation)
        self._instantiation = instantiation
        self._verbose = verbose

    # ---------------------------------------
    # Callbacks related to whole simulation -
    # ---------------------------------------

    async def simulation_prepare_cmd_start(self, cmd: str) -> None:
        """Print the prepare command `cmd`, marked with a leading "+"."""
        await super().simulation_prepare_cmd_start(cmd)
        if not self._verbose:
            return
        print(f"+ [prepare] {cmd}")

    async def simulation_prepare_cmd_stdout(self, cmd: str, lines: list[str]) -> None:
        """Print each stdout line received for a prepare command."""
        await super().simulation_prepare_cmd_stdout(cmd, lines)
        if not self._verbose:
            return
        for out_line in lines:
            print(f"[prepare] {out_line}")

    async def simulation_prepare_cmd_stderr(self, cmd: str, lines: list[str]) -> None:
        """Print each stderr line received for a prepare command."""
        await super().simulation_prepare_cmd_stderr(cmd, lines)
        if not self._verbose:
            return
        for err_line in lines:
            print(f"[prepare] {err_line}")

    # -----------------------------
    # Simulator-related callbacks -
    # -----------------------------

    async def simulator_started(self, sim: sim_base.Simulator, cmd: str) -> None:
        """Print the command `cmd` reported for `sim`, marked with "+"."""
        await super().simulator_started(sim, cmd)
        if not self._verbose:
            return
        print(f"+ [{sim.full_name()}] {cmd}")

    async def simulator_exited(self, sim: sim_base.Simulator, exit_code: int) -> None:
        """Print the exit code reported for `sim`, marked with "-"."""
        await super().simulator_exited(sim, exit_code)
        if not self._verbose:
            return
        print(f"- [{sim.full_name()}] exited with code {exit_code}")

    async def simulator_stdout(self, sim: sim_base.Simulator, lines: list[str]) -> None:
        """Print each stdout line of `sim`, prefixed with its full name."""
        await super().simulator_stdout(sim, lines)
        if not self._verbose:
            return
        for out_line in lines:
            print(f"[{sim.full_name()}] {out_line}")

    async def simulator_stderr(self, sim: sim_base.Simulator, lines: list[str]) -> None:
        """Print each stderr line of `sim`, prefixed with its full name."""
        await super().simulator_stderr(sim, lines)
        if not self._verbose:
            return
        for err_line in lines:
            print(f"[{sim.full_name()}] {err_line}")

    # -------------------------
    # Proxy-related callbacks -
    # -------------------------

    async def proxy_started(self, proxy: inst_proxy.Proxy, cmd: str) -> None:
        """Print the command `cmd` reported for `proxy`, marked with "+"."""
        await super().proxy_started(proxy, cmd)
        if not self._verbose:
            return
        print(f"+ [{proxy.name}] {cmd}")

    async def proxy_exited(self, proxy: inst_proxy.Proxy, exit_code: int) -> None:
        """Print the exit code reported for `proxy`, marked with "-"."""
        await super().proxy_exited(proxy, exit_code)
        if not self._verbose:
            return
        print(f"- [{proxy.name}] exited with code {exit_code}")

    async def proxy_stdout(self, proxy: inst_proxy.Proxy, lines: list[str]) -> None:
        """Print each stdout line of `proxy`, prefixed with its name."""
        await super().proxy_stdout(proxy, lines)
        if not self._verbose:
            return
        for out_line in lines:
            print(f"[{proxy.name}] {out_line}")

    async def proxy_stderr(self, proxy: inst_proxy.Proxy, lines: list[str]) -> None:
        """Print each stderr line of `proxy`, prefixed with its name."""
        await super().proxy_stderr(proxy, lines)
        if not self._verbose:
            return
        for err_line in lines:
            print(f"[{proxy.name}] {err_line}")
class LocalSimpleRuntime(run_base.Runtime): class LocalSimpleRuntime(run_base.Runtime):
...@@ -36,13 +122,11 @@ class LocalSimpleRuntime(run_base.Runtime): ...@@ -36,13 +122,11 @@ class LocalSimpleRuntime(run_base.Runtime):
def __init__( def __init__(
self, self,
verbose=False, verbose=False,
executor: command_executor.Executor = command_executor.LocalExecutor(),
): ):
super().__init__() super().__init__()
self._runnable: list[run_base.Run] = [] self._runnable: list[run_base.Run] = []
self._complete: list[run_base.Run] = [] self._complete: list[run_base.Run] = []
self._verbose: bool = verbose self._verbose: bool = verbose
self._executor: command_executor.Executor = executor
self._running: asyncio.Task | None = None self._running: asyncio.Task | None = None
def add_run(self, run: run_base.Run) -> None: def add_run(self, run: run_base.Run) -> None:
...@@ -52,18 +136,17 @@ class LocalSimpleRuntime(run_base.Runtime): ...@@ -52,18 +136,17 @@ class LocalSimpleRuntime(run_base.Runtime):
"""Actually executes `run`.""" """Actually executes `run`."""
try: try:
runner = simulation_executor.SimulationSimpleRunner(self._executor, run.instantiation, self._verbose) callbacks = LocalSimulationExecutorCallbacks(run.instantiation, self._verbose)
if self._profile_int: sim_executor = sim_exec.SimulationExecutor(
runner.profile_int = self.profile_int run.instantiation, callbacks, self._verbose, self._profile_int
await runner.prepare() )
for sim in run.instantiation.simulation.all_simulators(): await sim_executor.prepare()
runner.add_listener(sim, command_executor.LegacyOutputListener())
except asyncio.CancelledError: except asyncio.CancelledError:
# it is safe to just exit here because we are not running any # it is safe to just exit here because we are not running any
# simulators yet # simulators yet
return return
run._output = await runner.run() # handles CancelledError run._output = await sim_executor.run() # handles CancelledError
self._complete.append(run) self._complete.append(run)
# if the log is huge, this step takes some time # if the log is huge, this step takes some time
...@@ -75,10 +158,11 @@ class LocalSimpleRuntime(run_base.Runtime): ...@@ -75,10 +158,11 @@ class LocalSimpleRuntime(run_base.Runtime):
run._output.dump(outpath=output_path) run._output.dump(outpath=output_path)
if run.instantiation.create_artifact: if run.instantiation.create_artifact:
art.create_artifact( art.create_artifact(
artifact_name=run.instantiation.artifact_name, paths_to_include=run.instantiation.artifact_paths artifact_name=run.instantiation.artifact_name,
paths_to_include=run.instantiation.artifact_paths,
) )
await runner.cleanup() await sim_executor.cleanup()
async def start(self) -> None: async def start(self) -> None:
"""Execute the runs defined in `self.runnable`.""" """Execute the runs defined in `self.runnable`."""
...@@ -102,7 +186,6 @@ class LocalParallelRuntime(run_base.Runtime): ...@@ -102,7 +186,6 @@ class LocalParallelRuntime(run_base.Runtime):
cores: int, cores: int,
mem: int | None = None, mem: int | None = None,
verbose: bool = False, verbose: bool = False,
executor: command_executor.Executor = command_executor.LocalExecutor(),
): ):
super().__init__() super().__init__()
self._runs_noprereq: list[run_base.Run] = [] self._runs_noprereq: list[run_base.Run] = []
...@@ -113,7 +196,6 @@ class LocalParallelRuntime(run_base.Runtime): ...@@ -113,7 +196,6 @@ class LocalParallelRuntime(run_base.Runtime):
self._cores: int = cores self._cores: int = cores
self._mem: int | None = mem self._mem: int | None = mem
self._verbose: bool = verbose self._verbose: bool = verbose
self._executor = executor
self._pending_jobs: set[asyncio.Task] = set() self._pending_jobs: set[asyncio.Task] = set()
self._starter_task: asyncio.Task self._starter_task: asyncio.Task
...@@ -133,19 +215,17 @@ class LocalParallelRuntime(run_base.Runtime): ...@@ -133,19 +215,17 @@ class LocalParallelRuntime(run_base.Runtime):
async def do_run(self, run: run_base.Run) -> run_base.Run | None: async def do_run(self, run: run_base.Run) -> run_base.Run | None:
"""Actually executes `run`.""" """Actually executes `run`."""
try: try:
runner = simulation_executor.SimulationSimpleRunner(self._executor, run.instantiation, self._verbose) sim_executor = sim_exec.SimulationExecutor(run.instantiation, self._verbose)
if self._profile_int is not None: if self._profile_int is not None:
runner._profile_int = self._profile_int sim_executor._profile_int = self._profile_int
await runner.prepare() await sim_executor.prepare()
for sim in run.instantiation.simulation.all_simulators():
runner.add_listener(sim, command_executor.LegacyOutputListener())
except asyncio.CancelledError: except asyncio.CancelledError:
# it is safe to just exit here because we are not running any # it is safe to just exit here because we are not running any
# simulators yet # simulators yet
return None return None
print("starting run ", run.name()) print("starting run ", run.name())
run._output = await runner.run() # already handles CancelledError run._output = await sim_executor.run() # already handles CancelledError
# if the log is huge, this step takes some time # if the log is huge, this step takes some time
if self._verbose: if self._verbose:
...@@ -154,7 +234,7 @@ class LocalParallelRuntime(run_base.Runtime): ...@@ -154,7 +234,7 @@ class LocalParallelRuntime(run_base.Runtime):
output_path = run.instantiation.get_simulation_output_path() output_path = run.instantiation.get_simulation_output_path()
run._output.dump(outpath=output_path) run._output.dump(outpath=output_path)
await runner.cleanup() await sim_executor.cleanup()
print("finished run ", run.name()) print("finished run ", run.name())
return run return run
...@@ -163,7 +243,9 @@ class LocalParallelRuntime(run_base.Runtime): ...@@ -163,7 +243,9 @@ class LocalParallelRuntime(run_base.Runtime):
"""Wait for any run to terminate and return.""" """Wait for any run to terminate and return."""
assert self._pending_jobs assert self._pending_jobs
done, self._pending_jobs = await asyncio.wait(self._pending_jobs, return_when=asyncio.FIRST_COMPLETED) done, self._pending_jobs = await asyncio.wait(
self._pending_jobs, return_when=asyncio.FIRST_COMPLETED
)
for r_awaitable in done: for r_awaitable in done:
run = await r_awaitable run = await r_awaitable
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment