Unverified Commit a72c6391 authored by Jakob Görgen's avatar Jakob Görgen
Browse files

started adjustment of do_run methods + FIXME identification

parent d8d53650
......@@ -266,3 +266,10 @@ class Instantiation(util_base.IdObj):
return self._join_paths(
base=self._env._tmp_simulation_files, relative_path=filename
)
def get_simulation_output_path(self, run_number: int) -> str:
return self._join_paths(
base=self._env._output_base,
relative_path=f"/{self._simulation.name}-{run_number}.json",
enforce_existence=False,
)
......@@ -20,6 +20,8 @@
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from __future__ import annotations
import asyncio
import typing as tp
......@@ -27,39 +29,46 @@ from simbricks.orchestration import proxy
from simbricks.orchestration.exectools import Executor
from simbricks.orchestration.experiments import DistributedExperiment, Experiment
from simbricks.orchestration.runtime_new import simulation_executor
from simbricks.orchestration.runtime_new.runs import base
from simbricks.orchestration.runtime_new import command_executor
from simbricks.orchestration.runtime_new.runs import base as run_base
from simbricks.orchestration.simulation import base as sim_base
class DistributedSimpleRuntime(base.Runtime):
def __init__(
    self,
    executors: dict[sim_base.Simulator, command_executor.Executor],
    verbose: bool = False,
) -> None:
    """Runtime executing distributed runs through per-simulator executors.

    Args:
        executors: Maps each simulator to the executor that launches it.
        verbose: Print progress information while running.
    """
    super().__init__()
    # runs still to execute / runs that finished
    self._runnable: list[run_base.Run] = []
    self._complete: list[run_base.Run] = []
    self._verbose: bool = verbose
    self._executors: dict[sim_base.Simulator, command_executor.Executor] = executors
    # task wrapping the currently executing run; None while idle
    self._running: asyncio.Task | None = None
def add_run(self, run: run_base.Run) -> None:
    """Queue a run; only distributed experiments are supported."""
    # TODO: FIXME -- checks run._simulation against DistributedExperiment;
    # confirm this is the intended attribute after the rename
    if not isinstance(run._simulation, DistributedExperiment):
        raise RuntimeError("Only distributed experiments supported")
    self._runnable.append(run)
async def do_run(self, run: run_base.Run) -> None:
    """Execute a single distributed run and dump its collected output."""
    # TODO: FIXME Distributed Experiments needs fixing
    runner = simulation_executor.ExperimentDistributedRunner(
        self._executors,
        # we ensure the correct type in add_run()
        tp.cast(DistributedExperiment, run._simulation),
        run._instantiation,
        self._verbose,
    )
    if self._profile_int:
        runner._profile_int = self._profile_int

    try:
        # NOTE(review): iterating the executor dict yields its keys --
        # confirm prep_dirs() expects those and not the executor values
        for executor in self._executors:
            await run.prep_dirs(executor)
        await runner.prepare()
    except asyncio.CancelledError:
        # interrupted during preparation, we did not start any
        # simulators yet
        return

    run._output = await runner.run()  # already handles CancelledError
    self._complete.append(run)

    # if the log is huge, this step takes some time
    if self._verbose:
        print(f"Writing collected output of run {run.name()} to JSON file ...")
    output_path = run._instantiation.get_simulation_output_path(
        run_number=run._run_nr
    )
    run._output.dump(outpath=output_path)
async def start(self) -> None:
for run in self.runnable:
for run in self._runnable:
if self._interrupted:
return
......@@ -84,11 +97,11 @@ class DistributedSimpleRuntime(base.Runtime):
await self._running
def interrupt_handler(self) -> None:
    """Cancel the currently executing run task, if any."""
    if self._running is not None:
        self._running.cancel()
# TODO: fix this function
# TODO: FIXME
def auto_dist(
e: Experiment, execs: list[Executor], proxy_type: str = "sockets"
) -> DistributedExperiment:
......
......@@ -72,9 +72,11 @@ class LocalSimpleRuntime(run_base.Runtime):
# if the log is huge, this step takes some time
if self._verbose:
print(f"Writing collected output of run {run.name()} to JSON file ...")
# TODO: FIXME
run._output.dump(run.outpath)
output_path = run._instantiation.get_simulation_output_path(
run_number=run._run_nr
)
run._output.dump(outpath=output_path)
async def start(self) -> None:
"""Execute the runs defined in `self.runnable`."""
......@@ -147,7 +149,11 @@ class LocalParallelRuntime(run_base.Runtime):
# if the log is huge, this step takes some time
if self.verbose:
print(f"Writing collected output of run {run.name()} to JSON file ...")
run._output.dump(output path) # TODO: FIXME
output_path = run._instantiation.get_simulation_output_path(
run_number=run._run_nr
)
run._output.dump(outpath=output_path)
print("finished run ", run.name())
return run
......
......@@ -20,6 +20,8 @@
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from __future__ import annotations
import asyncio
import os
import pathlib
......@@ -29,94 +31,97 @@ import typing as tp
from simbricks.orchestration.runtime.common import Run, Runtime
from simbricks.orchestration.runtime_new import runs
class SlurmRuntime(Runtime):
def __init__(self, slurmdir, args, verbose=False, cleanup=True) -> None:
super().__init__()
self.runnable: tp.List[Run] = []
self.slurmdir = slurmdir
self.args = args
self.verbose = verbose
self.cleanup = cleanup
from simbricks.orchestration.runtime_new.runs import base as run_base
self._start_task: asyncio.Task
def add_run(self, run: Run) -> None:
self.runnable.append(run)
class SlurmRuntime(run_base.Runtime):
def prep_run(self, run: Run) -> str:
exp = run.experiment
e_idx = exp.name + f'-{run.index}' + '.exp'
exp_path = os.path.join(self.slurmdir, e_idx)
log_idx = exp.name + f'-{run.index}' + '.log'
exp_log = os.path.join(self.slurmdir, log_idx)
sc_idx = exp.name + f'-{run.index}' + '.sh'
exp_script = os.path.join(self.slurmdir, sc_idx)
def __init__(
    self, slurmdir: str, args, verbose: bool = False, cleanup: bool = True
) -> None:
    """Runtime that submits runs as slurm batch jobs.

    Args:
        slurmdir: Directory for per-run pickle, log, and sbatch script files.
        args: Extra arguments held by the runtime.  NOTE(review): type and
            usage not visible in this view -- confirm against callers.
        verbose: Pass --verbose to the spawned run.py invocation.
        cleanup: Remove the run's working directory when the job finishes.
    """
    super().__init__()
    self._runnable: list[run_base.Run] = []  # runs queued for submission
    self._slurmdir: str = slurmdir
    self._args = args
    self._verbose: bool = verbose
    self._cleanup: bool = cleanup
    # task running _do_start(); None until start() is called
    self._start_task: asyncio.Task | None = None
def add_run(self, run: run_base.Run) -> None:
    """Queue a run for submission to slurm when start() is invoked."""
    self._runnable.append(run)
def prep_run(self, run: run_base.Run) -> str:
    """Write the pickled run and its slurm batch script to the slurm dir.

    Args:
        run: The run to prepare; its prerequisite link is cleared before
            pickling so the prereq is not pulled into the pickle.

    Returns:
        Path to the generated sbatch script.
    """
    simulation = run._simulation
    e_idx = simulation.name + f"-{run._run_nr}" + ".exp"
    exp_path = os.path.join(self._slurmdir, e_idx)
    log_idx = simulation.name + f"-{run._run_nr}" + ".log"
    exp_log = os.path.join(self._slurmdir, log_idx)
    sc_idx = simulation.name + f"-{run._run_nr}" + ".sh"
    exp_script = os.path.join(self._slurmdir, sc_idx)

    print(exp_path)
    print(exp_log)
    print(exp_script)

    # write out pickled run; binary mode must not specify an encoding
    # (open(..., "wb", encoding=...) raises ValueError)
    with open(exp_path, "wb") as f:
        run._prereq = None  # we don't want to pull in the prereq too
        pickle.dump(run, f)

    # create slurm batch script
    with open(exp_script, "w", encoding="utf-8") as f:
        f.write("#!/bin/sh\n")
        f.write(f"#SBATCH -o {exp_log} -e {exp_log}\n")
        # f.write('#SBATCH -c %d\n' % (exp.resreq_cores(),))
        f.write(f"#SBATCH --mem={simulation.resreq_mem()}M\n")
        f.write(f'#SBATCH --job-name="{run.name()}"\n')
        f.write("#SBATCH --exclude=spyder[01-05],spyder16\n")
        f.write("#SBATCH -c 32\n")
        f.write("#SBATCH --nodes=1\n")
        # TODO: FIXME, timeout within simulation?!
        # NOTE(review): assumes the timeout moved onto the simulation with
        # the rename from `exp` -- confirm the attribute exists there
        timeout = simulation.timeout
        if timeout is not None:
            h = int(timeout / 3600)
            m = int((timeout % 3600) / 60)
            s = int(timeout % 60)
            f.write(f"#SBATCH --time={h:02d}:{m:02d}:{s:02d}\n")

        extra = ""
        if self._verbose:
            extra = "--verbose"

        f.write(f"python3 run.py {extra} --pickled {exp_path}\n")
        f.write("status=$?\n")
        if self._cleanup:
            f.write(f"rm -rf {run._instantiation.wrkdir()}\n")
        f.write("exit $status\n")

    return exp_script
async def _do_start(self) -> None:
    """Submit all queued runs to slurm via sbatch and record their job ids."""
    pathlib.Path(self._slurmdir).mkdir(parents=True, exist_ok=True)

    jid_re = re.compile(r"Submitted batch job ([0-9]+)")

    for run in self._runnable:
        # chain onto the prerequisite job if one was scheduled before
        if run._prereq is None:
            dep_cmd = ""
        else:
            dep_cmd = "--dependency=afterok:" + str(run._prereq._job_id)

        script = self.prep_run(run)

        stream = os.popen(f"sbatch {dep_cmd} {script}")
        output = stream.read()
        result = stream.close()
        if result is not None:
            raise RuntimeError("running sbatch failed")
        m = jid_re.search(output)
        if m is None:
            raise RuntimeError("cannot retrieve id of submitted job")
        run._job_id = int(m.group(1))
async def start(self) -> None:
    """Kick off submission of all queued runs as a background task."""
    self._start_task = asyncio.create_task(self._do_start())
......@@ -126,9 +131,9 @@ class SlurmRuntime(Runtime):
# stop all runs that have already been scheduled
# (existing slurm job id)
job_ids = []
for run in self.runnable:
if run.job_id:
job_ids.append(str(run.job_id))
for run in self._runnable:
if run._job_id is not None:
job_ids.append(str(run._job_id))
scancel_process = await asyncio.create_subprocess_shell(
f"scancel {' '.join(job_ids)}"
......
......@@ -20,6 +20,8 @@
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from __future__ import annotations
import asyncio
import itertools
import shlex
......@@ -46,12 +48,12 @@ from simbricks.orchestration.runtime_new import command_executor
class ExperimentBaseRunner(abc.ABC):
def __init__(
    self,
    simulation: sim_base.Simulation,
    instantiation: inst_base.Instantiation,
    verbose: bool,
) -> None:
    """Base runner driving one simulation instantiation.

    Args:
        simulation: The simulation whose simulators are executed.
        instantiation: Concrete instantiation (paths, sockets) of it.
        verbose: Print progress information while running.
    """
    self._simulation: sim_base.Simulation = simulation
    self._instantiation: inst_base.Instantiation = instantiation
    self._verbose: bool = verbose
    # seconds between profiling signals; None disables the profiler
    self._profile_int: int | None = None
    self._out = sim_base.SimulationOutput(self._simulation)
    # (simulator, component) pairs for all started simulators
    self._running: list[
        tuple[sim_base.Simulator, simulation_executor.SimpleComponent]
    ] = []
    # (executor, socket path) pairs to remove on termination
    self._sockets: list[tuple[simulation_executor.Executor, str]] = []
    self._wait_sims: list[simulation_executor.Component] = []
......@@ -75,12 +77,12 @@ class ExperimentBaseRunner(abc.ABC):
"""Start a simulator and wait for it to be ready."""
name = sim.full_name()
if self.verbose:
if self._verbose:
print(f'{self._simulation.name}: starting {name}')
run_cmd = sim.run_cmd(self.env)
run_cmd = sim.run_cmd(self._instantiation)
if run_cmd is None:
if self.verbose:
if self._verbose:
print(f'{self._simulation.name}: started dummy {name}')
return
......@@ -90,16 +92,16 @@ class ExperimentBaseRunner(abc.ABC):
name, shlex.split(run_cmd), verbose=self.verbose, canfail=True
)
await sc.start()
self.running.append((sim, sc))
self._running.append((sim, sc))
# add sockets for cleanup
for s in sim.sockets_cleanup(self.env):
self.sockets.append((executor, s))
for s in sim.sockets_cleanup(self.env): # TODO: FIXME
self._sockets.append((executor, s))
# Wait till sockets exist
wait_socks = sim.sockets_wait(self.env)
wait_socks = sim.sockets_wait(self.env) # TODO: FIXME
if wait_socks:
if self.verbose:
if self._verbose:
print(f'{self.exp.name}: waiting for sockets {name}')
await executor.await_files(wait_socks, verbose=self.verbose)
......@@ -127,10 +129,12 @@ class ExperimentBaseRunner(abc.ABC):
async def prepare(self) -> None:
# generate config tars
copies = []
# TODO: FIXME
for host in self.exp.hosts:
path = self.env.cfgtar_path(host)
if self.verbose:
print('preparing config tar:', path)
# TODO: FIXME
host.node_config.make_tar(self.env, path)
executor = self.sim_executor(host)
task = asyncio.create_task(executor.send_file(path, self.verbose))
......@@ -139,12 +143,13 @@ class ExperimentBaseRunner(abc.ABC):
# prepare all simulators in parallel
sims = []
for sim in self.exp.all_simulators():
prep_cmds = list(sim.prep_cmds(self.env))
# TODO: FIXME
for sim in self._simulation.all_simulators():
prep_cmds = list(sim.prep_cmds(inst=self._instantiation))
executor = self.sim_executor(sim)
task = asyncio.create_task(
executor.run_cmdlist(
'prepare_' + self.exp.name, prep_cmds, verbose=self.verbose
'prepare_' + self._simulation.name, prep_cmds, verbose=self._verbose
)
)
sims.append(task)
......@@ -157,9 +162,9 @@ class ExperimentBaseRunner(abc.ABC):
for sc in self.wait_sims:
await sc.wait()
async def terminate_collect_sims(self) -> sim_base.SimulationOutput:
    """Terminates all simulators and collects output."""
    self._out.set_end()

    if self._verbose:
        print(f"{self._simulation.name}: cleaning up")

    # "interrupt, terminate, kill" all processes
    scs = []
    for _, sc in self._running:
        scs.append(asyncio.create_task(sc.int_term_kill()))
    await asyncio.gather(*scs)

    # wait for all processes to terminate
    for _, sc in self._running:
        await sc.wait()

    # remove all sockets
    scs = []
    for executor, sock in self._sockets:
        scs.append(asyncio.create_task(executor.rmtree(sock)))
    if scs:
        await asyncio.gather(*scs)

    # add all simulator components to the output
    for sim, sc in self._running:
        self._out.add_sim(sim, sc)

    await self.after_cleanup()
    return self._out
async def profiler(self):
    """Periodically signal all running simulator components (SIGUSR1).

    Runs until cancelled; requires self._profile_int to be set.
    """
    assert self._profile_int
    while True:
        await asyncio.sleep(self._profile_int)
        for _, sc in self._running:
            await sc.sigusr1()
async def run(self) -> ExpOutput:
async def run(self) -> sim_base.SimulationOutput:
profiler_task = None
try:
self.out.set_start()
self._out.set_start()
graph = self.sim_graph()
print(graph)
ts = graphlib.TopologicalSorter(graph)
......@@ -219,16 +224,16 @@ class ExperimentBaseRunner(abc.ABC):
for sim in sims:
ts.done(sim)
if self.profile_int:
if self._profile_int:
profiler_task = asyncio.create_task(self.profiler())
await self.before_wait()
await self.wait_for_sims()
except asyncio.CancelledError:
if self.verbose:
print(f'{self.exp.name}: interrupted')
self.out.set_interrupted()
if self._verbose:
print(f'{self._simulation.name}: interrupted')
self._out.set_interrupted()
except: # pylint: disable=bare-except
self.out.set_failed()
self._out.set_failed()
traceback.print_exc()
if profiler_task:
......@@ -262,6 +267,7 @@ class ExperimentSimpleRunner(ExperimentBaseRunner):
return self._executor
# TODO: FIXME
class ExperimentDistributedRunner(ExperimentBaseRunner):
"""Simple experiment runner with just one executor."""
......
......@@ -21,6 +21,7 @@
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from __future__ import annotations
import abc
import time
import typing as tp
......@@ -94,7 +95,7 @@ class Simulator(utils_base.IdObj):
return ""
# pylint: disable=unused-argument
def prep_cmds(self, inst: inst_base.Instantiation) -> list[str]:
    """Commands to prepare execution of this simulator.

    Base implementation requires no preparation and returns no commands.
    """
    return []
......@@ -169,7 +170,7 @@ class Simulator(utils_base.IdObj):
# pylint: disable=unused-argument
@abc.abstractmethod
def run_cmd(self, inst: inst_base.Instantiation) -> str:
    """Command to execute this simulator.

    Abstract; concrete simulators must return the shell command to run.
    """
    return ""
......@@ -180,6 +181,7 @@ class Simulator(utils_base.IdObj):
# Sockets to be cleaned up: always the CONNECTING sockets
# pylint: disable=unused-argument
# TODO: FIXME
def sockets_cleanup(self, inst: inst_base.Instantiation) -> list[inst_base.Socket]:
sockets = []
for comp_spec in self._components:
......@@ -192,6 +194,7 @@ class Simulator(utils_base.IdObj):
# sockets to wait for indicating the simulator is ready
# pylint: disable=unused-argument
# TODO: FIXME
def sockets_wait(self, env: exp_env.ExpEnv) -> list[str]:
    """Return socket paths to await before treating this simulator as ready.

    Base implementation waits for nothing.  NOTE(review): signature still
    takes the old exp_env.ExpEnv while sibling methods were moved to
    inst_base.Instantiation -- confirm before relying on `env`.
    """
    return []
......@@ -357,14 +360,14 @@ class SimulationOutput:
self._start_time = time.time()
def set_end(self) -> None:
    """Record the wall-clock end time of the simulation."""
    self._end_time = time.time()
def set_failed(self) -> None:
    """Mark the simulation as unsuccessful."""
    self._success = False
def set_interrupted(self) -> None:
    """Mark the simulation as interrupted, which also implies failure."""
    self._success = False
    self._interrupted = True
def add_sim(self, sim: Simulator, comp: command_executor.Component) -> None:
obj = {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment