Commit d3f566f7 authored by Jonas Kaufmann, committed by Antoine Kaufmann
Browse files

move experiment runners to their own module to remove import cycle

parent 7017638b
...@@ -30,8 +30,9 @@ import pickle ...@@ -30,8 +30,9 @@ import pickle
import sys import sys
import typing as tp import typing as tp
import simbricks.experiments as exp from simbricks.exectools import LocalExecutor, RemoteExecutor
from simbricks import exectools from simbricks.experiment.experiment_environment import ExpEnv
from simbricks.experiments import DistributedExperiment, Experiment
from simbricks.runtime.common import Run from simbricks.runtime.common import Run
from simbricks.runtime.distributed import DistributedSimpleRuntime, auto_dist from simbricks.runtime.distributed import DistributedSimpleRuntime, auto_dist
from simbricks.runtime.local import LocalParallelRuntime, LocalSimpleRuntime from simbricks.runtime.local import LocalParallelRuntime, LocalSimpleRuntime
...@@ -221,9 +222,9 @@ def load_executors(path): ...@@ -221,9 +222,9 @@ def load_executors(path):
exs = [] exs = []
for h in hosts: for h in hosts:
if h['type'] == 'local': if h['type'] == 'local':
ex = exectools.LocalExecutor() ex = LocalExecutor()
elif h['type'] == 'remote': elif h['type'] == 'remote':
ex = exectools.RemoteExecutor(h['host'], h['workdir']) ex = RemoteExecutor(h['host'], h['workdir'])
if 'ssh_args' in h: if 'ssh_args' in h:
ex.ssh_extra_args += h['ssh_args'] ex.ssh_extra_args += h['ssh_args']
if 'scp_args' in h: if 'scp_args' in h:
...@@ -236,7 +237,7 @@ def load_executors(path): ...@@ -236,7 +237,7 @@ def load_executors(path):
if args.hosts is None: if args.hosts is None:
executors = [exectools.LocalExecutor()] executors = [LocalExecutor()]
else: else:
executors = load_executors(args.hosts) executors = load_executors(args.hosts)
...@@ -269,7 +270,7 @@ else: ...@@ -269,7 +270,7 @@ else:
# pylint: disable=redefined-outer-name # pylint: disable=redefined-outer-name
def add_exp( def add_exp(
e: exp.Experiment, e: Experiment,
run: int, run: int,
prereq: tp.Optional[Run], prereq: tp.Optional[Run],
create_cp: bool, create_cp: bool,
...@@ -286,7 +287,7 @@ def add_exp( ...@@ -286,7 +287,7 @@ def add_exp(
if args.shmdir is not None: if args.shmdir is not None:
shmdir = f'{args.shmdir}/{e.name}/{run}' shmdir = f'{args.shmdir}/{e.name}/{run}'
env = exp.ExpEnv(args.repo, workdir, cpdir) env = ExpEnv(args.repo, workdir, cpdir)
env.create_cp = create_cp env.create_cp = create_cp
env.restore_cp = restore_cp env.restore_cp = restore_cp
env.no_simbricks = no_simbricks env.no_simbricks = no_simbricks
...@@ -326,7 +327,7 @@ if not args.pickled: ...@@ -326,7 +327,7 @@ if not args.pickled:
sys.exit(0) sys.exit(0)
for e in experiments: for e in experiments:
if args.auto_dist and not isinstance(e, exp.DistributedExperiment): if args.auto_dist and not isinstance(e, DistributedExperiment):
e = auto_dist(e, executors, args.proxy_type) e = auto_dist(e, executors, args.proxy_type)
# apply filter if any specified # apply filter if any specified
if (args.filter) and (len(args.filter) > 0): if (args.filter) and (len(args.filter) > 0):
......
...@@ -23,11 +23,13 @@ ...@@ -23,11 +23,13 @@
import json import json
import time import time
from simbricks.experiments import Experiment
class ExpOutput(object): class ExpOutput(object):
"""Manages an experiment's output.""" """Manages an experiment's output."""
def __init__(self, exp): def __init__(self, exp: Experiment):
self.exp_name = exp.name self.exp_name = exp.name
self.metadata = exp.metadata self.metadata = exp.metadata
self.start_time = None self.start_time = None
......
...@@ -20,21 +20,13 @@ ...@@ -20,21 +20,13 @@
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import asyncio
import itertools import itertools
import shlex
import traceback
import typing as tp import typing as tp
from abc import ABC, abstractmethod
from simbricks.exectools import Executor, SimpleComponent
from simbricks.experiment.experiment_environment import ExpEnv
from simbricks.experiment.experiment_output import ExpOutput
from simbricks.proxy import NetProxyConnecter, NetProxyListener from simbricks.proxy import NetProxyConnecter, NetProxyListener
from simbricks.simulators import ( from simbricks.simulators import (
HostSim, I40eMultiNIC, NetSim, NICSim, PCIDevSim, Simulator HostSim, I40eMultiNIC, NetSim, NICSim, PCIDevSim, Simulator
) )
from simbricks.utils import graphlib
class Experiment(object): class Experiment(object):
...@@ -142,210 +134,3 @@ class DistributedExperiment(Experiment): ...@@ -142,210 +134,3 @@ class DistributedExperiment(Experiment):
if s not in self.host_mapping: if s not in self.host_mapping:
return False return False
return True return True
class ExperimentBaseRunner(ABC):
    """Base class that starts, supervises and tears down the simulators of
    one experiment.

    Subclasses decide where each simulator runs by implementing
    `sim_executor()`.
    """

    def __init__(self, exp: Experiment, env: ExpEnv, verbose: bool):
        self.exp = exp
        self.env = env
        self.verbose = verbose
        self.out = ExpOutput(exp)
        # (simulator, component) pairs for every simulator that was started
        self.running: tp.List[tp.Tuple[Simulator, SimpleComponent]] = []
        # (executor, path) pairs that are removed again during cleanup
        self.sockets = []
        # components whose termination `wait_for_sims()` waits for
        self.wait_sims = []

    @staticmethod
    async def _await_all(awaitables):
        """Run `awaitables` concurrently and wait until all have finished.

        Replaces bare `asyncio.wait()` calls, which raise `ValueError` for an
        empty collection and no longer accept plain coroutines on recent
        Python versions. Every awaitable is allowed to finish before the
        first failure (if any) is re-raised, so nothing is left half-started
        in the background and errors are not silently dropped.
        """
        results = await asyncio.gather(*awaitables, return_exceptions=True)
        for res in results:
            if isinstance(res, BaseException):
                raise res

    @abstractmethod
    def sim_executor(self, sim: Simulator) -> Executor:
        """Return the executor that `sim` is run on."""
        pass

    def sim_graph(self):
        """Build the start-order graph: simulator -> set of its dependencies.

        Dependencies are the simulator's `dependencies()` plus its
        `extra_deps`.
        """
        return {
            sim: set(sim.dependencies() + sim.extra_deps)
            for sim in self.exp.all_simulators()
        }

    async def start_sim(self, sim: Simulator):
        """Start a simulator and wait for it to be ready."""

        name = sim.full_name()
        if self.verbose:
            print(f'{self.exp.name}: starting {name}')

        run_cmd = sim.run_cmd(self.env)
        if run_cmd is None:
            # nothing to execute for this simulator
            if self.verbose:
                print(f'{self.exp.name}: started dummy {name}')
            return

        # run simulator
        executor = self.sim_executor(sim)
        sc = executor.create_component(
            name, shlex.split(run_cmd), verbose=self.verbose, canfail=True
        )
        await sc.start()
        self.running.append((sim, sc))

        # add sockets for cleanup
        for s in sim.sockets_cleanup(self.env):
            self.sockets.append((executor, s))

        # Wait till sockets exist
        wait_socks = sim.sockets_wait(self.env)
        if wait_socks:
            if self.verbose:
                print(f'{self.exp.name}: waiting for sockets {name}')

            await executor.await_files(wait_socks, verbose=self.verbose)

        # add time delay if required
        delay = sim.start_delay()
        if delay > 0:
            await asyncio.sleep(delay)

        if sim.wait_terminate():
            self.wait_sims.append(sc)

        if self.verbose:
            print(f'{self.exp.name}: started {name}')

    async def before_wait(self):
        """Hook awaited by `run()` once all simulators have been started."""
        pass

    async def before_cleanup(self):
        """Hook awaited by `run()` right before processes are shut down."""
        pass

    async def after_cleanup(self):
        """Hook awaited by `run()` after cleanup has finished."""
        pass

    async def prepare(self):
        """Create and send the hosts' config tars, then run every simulator's
        preparation commands.

        Works for experiments without hosts or simulators. A failing copy or
        preparation command is re-raised after all of them have finished.
        """
        # generate config tars
        copies = []
        for host in self.exp.hosts:
            path = self.env.cfgtar_path(host)
            if self.verbose:
                print('preparing config tar:', path)
            host.node_config.make_tar(path)
            copies.append(self.sim_executor(host).send_file(path, self.verbose))
        await self._await_all(copies)

        # prepare all simulators in parallel
        sims = []
        for sim in self.exp.all_simulators():
            prep_cmds = list(sim.prep_cmds(self.env))
            executor = self.sim_executor(sim)
            sims.append(
                executor.run_cmdlist(
                    'prepare_' + self.exp.name, prep_cmds, verbose=self.verbose
                )
            )
        await self._await_all(sims)

    async def wait_for_sims(self):
        """Wait for simulators to terminate (the ones marked to wait on)."""
        if self.verbose:
            print(f'{self.exp.name}: waiting for hosts to terminate')
        for sc in self.wait_sims:
            await sc.wait()

    async def run(self):
        """Start all simulators in dependency order, wait for the marked ones
        to terminate, then clean up.

        Any error (including a simulator that fails to start) marks the
        output as failed and prints the traceback; cleanup always runs.

        Returns:
            The `ExpOutput` collected for this run.
        """
        try:
            self.out.set_start()

            graph = self.sim_graph()
            ts = graphlib.TopologicalSorter(graph)
            ts.prepare()
            while ts.is_active():
                # start ready simulators in parallel
                sims = list(ts.get_ready())

                # wait for starts to complete
                await self._await_all([self.start_sim(sim) for sim in sims])

                for sim in sims:
                    ts.done(sim)

            await self.before_wait()
            await self.wait_for_sims()
        except:  # pylint: disable=bare-except
            self.out.set_failed()
            traceback.print_exc()

        finally:
            self.out.set_end()

            # shut things back down
            if self.verbose:
                print(f'{self.exp.name}: cleaning up')

            await self.before_cleanup()

            # "interrupt, terminate, kill" all processes
            # Best effort: one failing shutdown must not abort the rest of
            # the cleanup, and nothing may have been started at all.
            await asyncio.gather(
                *(sc.int_term_kill() for _, sc in self.running),
                return_exceptions=True
            )

            # wait for all processes to terminate
            for _, sc in self.running:
                await sc.wait()

            # remove all sockets
            await asyncio.gather(
                *(executor.rmtree(sock) for (executor, sock) in self.sockets),
                return_exceptions=True
            )

            # add all simulator components to the output
            for sim, sc in self.running:
                self.out.add_sim(sim, sc)

            await self.after_cleanup()
        return self.out
class ExperimentSimpleRunner(ExperimentBaseRunner):
    """Experiment runner that places every simulator on one executor."""

    def __init__(self, executor: Executor, *args, **kwargs):
        # Remaining arguments are forwarded unchanged to the base runner.
        self.executor = executor
        super().__init__(*args, **kwargs)

    def sim_executor(self, sim: Simulator):
        """Return the single executor, independent of `sim`."""
        return self.executor
class ExperimentDistributedRunner(ExperimentBaseRunner):
    """Experiment runner that spreads simulators over several executors.

    The executor for a simulator is selected through the experiment's
    `host_mapping`.
    """

    def __init__(self, execs, exp: DistributedExperiment, *args, **kwargs):
        self.execs = execs
        super().__init__(exp, *args, **kwargs)
        self.exp = exp  # overrides the type in the base class
        assert self.exp.num_hosts <= len(execs)

    def sim_executor(self, sim):
        """Return the executor whose index `host_mapping` stores for `sim`."""
        return self.execs[self.exp.host_mapping[sim]]

    async def prepare(self):
        """Check the simulator assignment, set the proxies' IP addresses and
        then run the base class preparation."""
        # make sure all simulators are assigned to an executor
        assert self.exp.all_sims_assigned()

        # set IP addresses for proxies based on assigned executors
        all_proxies = itertools.chain(
            self.exp.proxies_listen, self.exp.proxies_connect
        )
        for prox in all_proxies:
            prox.ip = self.sim_executor(prox).ip

        await super().prepare()
# Copyright 2022 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import asyncio
import itertools
import shlex
import traceback
import typing as tp
from abc import ABC, abstractmethod
from simbricks.exectools import Executor, SimpleComponent
from simbricks.experiment.experiment_environment import ExpEnv
from simbricks.experiment.experiment_output import ExpOutput
from simbricks.experiments import DistributedExperiment, Experiment
from simbricks.simulators import Simulator
from simbricks.utils import graphlib
class ExperimentBaseRunner(ABC):
    """Base class that starts, supervises and tears down the simulators of
    one experiment.

    Subclasses decide where each simulator runs by implementing
    `sim_executor()`.
    """

    def __init__(self, exp: Experiment, env: ExpEnv, verbose: bool):
        self.exp = exp
        self.env = env
        self.verbose = verbose
        self.out = ExpOutput(exp)
        # (simulator, component) pairs for every simulator that was started
        self.running: tp.List[tp.Tuple[Simulator, SimpleComponent]] = []
        # (executor, path) pairs that are removed again during cleanup
        self.sockets = []
        # components whose termination `wait_for_sims()` waits for
        self.wait_sims = []

    @staticmethod
    async def _await_all(awaitables):
        """Run `awaitables` concurrently and wait until all have finished.

        Replaces bare `asyncio.wait()` calls, which raise `ValueError` for an
        empty collection and no longer accept plain coroutines on recent
        Python versions. Every awaitable is allowed to finish before the
        first failure (if any) is re-raised, so nothing is left half-started
        in the background and errors are not silently dropped.
        """
        results = await asyncio.gather(*awaitables, return_exceptions=True)
        for res in results:
            if isinstance(res, BaseException):
                raise res

    @abstractmethod
    def sim_executor(self, sim: Simulator) -> Executor:
        """Return the executor that `sim` is run on."""
        pass

    def sim_graph(self):
        """Build the start-order graph: simulator -> set of its dependencies.

        Dependencies are the simulator's `dependencies()` plus its
        `extra_deps`.
        """
        return {
            sim: set(sim.dependencies() + sim.extra_deps)
            for sim in self.exp.all_simulators()
        }

    async def start_sim(self, sim: Simulator):
        """Start a simulator and wait for it to be ready."""

        name = sim.full_name()
        if self.verbose:
            print(f'{self.exp.name}: starting {name}')

        run_cmd = sim.run_cmd(self.env)
        if run_cmd is None:
            # nothing to execute for this simulator
            if self.verbose:
                print(f'{self.exp.name}: started dummy {name}')
            return

        # run simulator
        executor = self.sim_executor(sim)
        sc = executor.create_component(
            name, shlex.split(run_cmd), verbose=self.verbose, canfail=True
        )
        await sc.start()
        self.running.append((sim, sc))

        # add sockets for cleanup
        for s in sim.sockets_cleanup(self.env):
            self.sockets.append((executor, s))

        # Wait till sockets exist
        wait_socks = sim.sockets_wait(self.env)
        if wait_socks:
            if self.verbose:
                print(f'{self.exp.name}: waiting for sockets {name}')

            await executor.await_files(wait_socks, verbose=self.verbose)

        # add time delay if required
        delay = sim.start_delay()
        if delay > 0:
            await asyncio.sleep(delay)

        if sim.wait_terminate():
            self.wait_sims.append(sc)

        if self.verbose:
            print(f'{self.exp.name}: started {name}')

    async def before_wait(self):
        """Hook awaited by `run()` once all simulators have been started."""
        pass

    async def before_cleanup(self):
        """Hook awaited by `run()` right before processes are shut down."""
        pass

    async def after_cleanup(self):
        """Hook awaited by `run()` after cleanup has finished."""
        pass

    async def prepare(self):
        """Create and send the hosts' config tars, then run every simulator's
        preparation commands.

        Works for experiments without hosts or simulators. A failing copy or
        preparation command is re-raised after all of them have finished.
        """
        # generate config tars
        copies = []
        for host in self.exp.hosts:
            path = self.env.cfgtar_path(host)
            if self.verbose:
                print('preparing config tar:', path)
            host.node_config.make_tar(path)
            copies.append(self.sim_executor(host).send_file(path, self.verbose))
        await self._await_all(copies)

        # prepare all simulators in parallel
        sims = []
        for sim in self.exp.all_simulators():
            prep_cmds = list(sim.prep_cmds(self.env))
            executor = self.sim_executor(sim)
            sims.append(
                executor.run_cmdlist(
                    'prepare_' + self.exp.name, prep_cmds, verbose=self.verbose
                )
            )
        await self._await_all(sims)

    async def wait_for_sims(self):
        """Wait for simulators to terminate (the ones marked to wait on)."""
        if self.verbose:
            print(f'{self.exp.name}: waiting for hosts to terminate')
        for sc in self.wait_sims:
            await sc.wait()

    async def run(self):
        """Start all simulators in dependency order, wait for the marked ones
        to terminate, then clean up.

        Any error (including a simulator that fails to start) marks the
        output as failed and prints the traceback; cleanup always runs.

        Returns:
            The `ExpOutput` collected for this run.
        """
        try:
            self.out.set_start()

            graph = self.sim_graph()
            ts = graphlib.TopologicalSorter(graph)
            ts.prepare()
            while ts.is_active():
                # start ready simulators in parallel
                sims = list(ts.get_ready())

                # wait for starts to complete
                await self._await_all([self.start_sim(sim) for sim in sims])

                for sim in sims:
                    ts.done(sim)

            await self.before_wait()
            await self.wait_for_sims()
        except:  # pylint: disable=bare-except
            self.out.set_failed()
            traceback.print_exc()

        finally:
            self.out.set_end()

            # shut things back down
            if self.verbose:
                print(f'{self.exp.name}: cleaning up')

            await self.before_cleanup()

            # "interrupt, terminate, kill" all processes
            # Best effort: one failing shutdown must not abort the rest of
            # the cleanup, and nothing may have been started at all.
            await asyncio.gather(
                *(sc.int_term_kill() for _, sc in self.running),
                return_exceptions=True
            )

            # wait for all processes to terminate
            for _, sc in self.running:
                await sc.wait()

            # remove all sockets
            await asyncio.gather(
                *(executor.rmtree(sock) for (executor, sock) in self.sockets),
                return_exceptions=True
            )

            # add all simulator components to the output
            for sim, sc in self.running:
                self.out.add_sim(sim, sc)

            await self.after_cleanup()
        return self.out
class ExperimentSimpleRunner(ExperimentBaseRunner):
    """Experiment runner that places every simulator on one executor."""

    def __init__(self, executor: Executor, *args, **kwargs):
        # Remaining arguments are forwarded unchanged to the base runner.
        self.executor = executor
        super().__init__(*args, **kwargs)

    def sim_executor(self, sim: Simulator):
        """Return the single executor, independent of `sim`."""
        return self.executor
class ExperimentDistributedRunner(ExperimentBaseRunner):
    """Experiment runner that spreads simulators over several executors.

    The executor for a simulator is selected through the experiment's
    `host_mapping`.
    """

    def __init__(self, execs, exp: DistributedExperiment, *args, **kwargs):
        self.execs = execs
        super().__init__(exp, *args, **kwargs)
        self.exp = exp  # overrides the type in the base class
        assert self.exp.num_hosts <= len(execs)

    def sim_executor(self, sim):
        """Return the executor whose index `host_mapping` stores for `sim`."""
        return self.execs[self.exp.host_mapping[sim]]

    async def prepare(self):
        """Check the simulator assignment, set the proxies' IP addresses and
        then run the base class preparation."""
        # make sure all simulators are assigned to an executor
        assert self.exp.all_sims_assigned()

        # set IP addresses for proxies based on assigned executors
        all_proxies = itertools.chain(
            self.exp.proxies_listen, self.exp.proxies_connect
        )
        for prox in all_proxies:
            prox.ip = self.sim_executor(prox).ip

        await super().prepare()
...@@ -28,7 +28,7 @@ import shutil ...@@ -28,7 +28,7 @@ import shutil
import typing as tp import typing as tp
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from simbricks import exectools from simbricks.exectools import LocalExecutor
from simbricks.experiment.experiment_environment import ExpEnv from simbricks.experiment.experiment_environment import ExpEnv
from simbricks.experiment.experiment_output import ExpOutput from simbricks.experiment.experiment_output import ExpOutput
from simbricks.experiments import Experiment from simbricks.experiments import Experiment
...@@ -55,7 +55,7 @@ class Run(object): ...@@ -55,7 +55,7 @@ class Run(object):
def name(self): def name(self):
return self.experiment.name + '.' + str(self.index) return self.experiment.name + '.' + str(self.index)
async def prep_dirs(self, executor=exectools.LocalExecutor()): async def prep_dirs(self, executor=LocalExecutor()):
shutil.rmtree(self.env.workdir, ignore_errors=True) shutil.rmtree(self.env.workdir, ignore_errors=True)
await executor.rmtree(self.env.workdir) await executor.rmtree(self.env.workdir)
shutil.rmtree(self.env.shm_base, ignore_errors=True) shutil.rmtree(self.env.shm_base, ignore_errors=True)
......
...@@ -24,9 +24,10 @@ import asyncio ...@@ -24,9 +24,10 @@ import asyncio
import pathlib import pathlib
import typing as tp import typing as tp
import simbricks.experiments as exp
from simbricks import proxy from simbricks import proxy
from simbricks.exectools import Executor from simbricks.exectools import Executor
from simbricks.experiments import DistributedExperiment, Experiment
from simbricks.runners import ExperimentDistributedRunner
from simbricks.runtime.common import Run, Runtime from simbricks.runtime.common import Run, Runtime
...@@ -39,16 +40,16 @@ class DistributedSimpleRuntime(Runtime): ...@@ -39,16 +40,16 @@ class DistributedSimpleRuntime(Runtime):
self.executors = executors self.executors = executors
def add_run(self, run: Run): def add_run(self, run: Run):
if not isinstance(run.experiment, exp.DistributedExperiment): if not isinstance(run.experiment, DistributedExperiment):
raise RuntimeError('Only distributed experiments supported') raise RuntimeError('Only distributed experiments supported')
self.runnable.append(run) self.runnable.append(run)
async def do_run(self, run: Run): async def do_run(self, run: Run):
runner = exp.ExperimentDistributedRunner( runner = ExperimentDistributedRunner(
self.executors, self.executors,
# we ensure the correct type in add_run() # we ensure the correct type in add_run()
tp.cast(exp.DistributedExperiment, run.experiment), tp.cast(DistributedExperiment, run.experiment),
run.env, run.env,
self.verbose self.verbose
) )
...@@ -68,7 +69,7 @@ class DistributedSimpleRuntime(Runtime): ...@@ -68,7 +69,7 @@ class DistributedSimpleRuntime(Runtime):
def auto_dist( def auto_dist(
e: exp.Experiment, execs: tp.List[Executor], proxy_type: str = 'sockets' e: Experiment, execs: tp.List[Executor], proxy_type: str = 'sockets'
): ):
""" """
Converts an Experiment into a DistributedExperiment. Converts an Experiment into a DistributedExperiment.
...@@ -92,7 +93,7 @@ def auto_dist( ...@@ -92,7 +93,7 @@ def auto_dist(
raise RuntimeError('Unknown proxy type specified') raise RuntimeError('Unknown proxy type specified')
# Create the distributed experiment # Create the distributed experiment
de = exp.DistributedExperiment(e.name, 2) de = DistributedExperiment(e.name, 2)
de.timeout = e.timeout de.timeout = e.timeout
de.checkpoint = e.checkpoint de.checkpoint = e.checkpoint
de.no_simbricks = e.no_simbricks de.no_simbricks = e.no_simbricks
......
...@@ -24,8 +24,8 @@ import asyncio ...@@ -24,8 +24,8 @@ import asyncio
import pathlib import pathlib
import typing as tp import typing as tp
import simbricks.experiments as exp
from simbricks import exectools from simbricks import exectools
from simbricks.runners import ExperimentSimpleRunner
from simbricks.runtime.common import Run, Runtime from simbricks.runtime.common import Run, Runtime
...@@ -47,7 +47,7 @@ class LocalSimpleRuntime(Runtime): ...@@ -47,7 +47,7 @@ class LocalSimpleRuntime(Runtime):
async def do_run(self, run: Run): async def do_run(self, run: Run):
"""Actually executes `run`.""" """Actually executes `run`."""
runner = exp.ExperimentSimpleRunner( runner = ExperimentSimpleRunner(
self.executor, run.experiment, run.env, self.verbose self.executor, run.experiment, run.env, self.verbose
) )
await run.prep_dirs(self.executor) await run.prep_dirs(self.executor)
...@@ -99,7 +99,7 @@ class LocalParallelRuntime(Runtime): ...@@ -99,7 +99,7 @@ class LocalParallelRuntime(Runtime):
async def do_run(self, run: Run): async def do_run(self, run: Run):
"""Actually executes `run`.""" """Actually executes `run`."""
runner = exp.ExperimentSimpleRunner( runner = ExperimentSimpleRunner(
self.executor, run.experiment, run.env, self.verbose self.executor, run.experiment, run.env, self.verbose
) )
await run.prep_dirs(executor=self.executor) await run.prep_dirs(executor=self.executor)
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment