Commit 0b3de18a authored by Jonas Kaufmann, committed by Antoine Kaufmann
Browse files

experiments/run.py: improve readability

There are now no more inline functions, and a proper main function replaces
the top-level code. Functionality-wise, nothing was changed: code was only
rearranged and some comments were added.
parent d3118b9b
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, # CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""This is the top-level module of the SimBricks orchestration framework that
users interact with."""
import argparse import argparse
import asyncio import asyncio
...@@ -32,11 +34,13 @@ import sys ...@@ -32,11 +34,13 @@ import sys
import typing as tp import typing as tp
from signal import SIGINT, signal from signal import SIGINT, signal
from simbricks.orchestration import exectools
from simbricks.orchestration.exectools import LocalExecutor, RemoteExecutor from simbricks.orchestration.exectools import LocalExecutor, RemoteExecutor
from simbricks.orchestration.experiment.experiment_environment import ExpEnv from simbricks.orchestration.experiment.experiment_environment import ExpEnv
from simbricks.orchestration.experiments import ( from simbricks.orchestration.experiments import (
DistributedExperiment, Experiment DistributedExperiment, Experiment
) )
from simbricks.orchestration.runtime import common as rt_common
from simbricks.orchestration.runtime.common import Run from simbricks.orchestration.runtime.common import Run
from simbricks.orchestration.runtime.distributed import ( from simbricks.orchestration.runtime.distributed import (
DistributedSimpleRuntime, auto_dist DistributedSimpleRuntime, auto_dist
...@@ -47,186 +51,186 @@ from simbricks.orchestration.runtime.local import ( ...@@ -47,186 +51,186 @@ from simbricks.orchestration.runtime.local import (
from simbricks.orchestration.runtime.slurm import SlurmRuntime from simbricks.orchestration.runtime.slurm import SlurmRuntime
# pylint: disable=redefined-outer-name def parse_args() -> argparse.Namespace:
def mkdir_if_not_exists(path): parser = argparse.ArgumentParser()
if not os.path.exists(path): # general arguments for experiments
os.mkdir(path) parser.add_argument(
parser = argparse.ArgumentParser()
parser.add_argument(
'experiments', 'experiments',
metavar='EXP', metavar='EXP',
type=str, type=str,
nargs='+', nargs='+',
help='Python modules to load the experiments from' help='Python modules to load the experiments from'
) )
parser.add_argument( parser.add_argument(
'--list', '--list',
action='store_const', action='store_const',
const=True, const=True,
default=False, default=False,
help='List available experiment names' help='List available experiment names'
) )
parser.add_argument( parser.add_argument(
'--filter', '--filter',
metavar='PATTERN', metavar='PATTERN',
type=str, type=str,
nargs='+', nargs='+',
help='Only run experiments matching the given Unix shell style patterns' help='Only run experiments matching the given Unix shell style patterns'
) )
parser.add_argument( parser.add_argument(
'--pickled', '--pickled',
action='store_const', action='store_const',
const=True, const=True,
default=False, default=False,
help='Interpret experiment modules as pickled runs instead of .py files' help='Interpret experiment modules as pickled runs instead of .py files'
) )
parser.add_argument( parser.add_argument(
'--runs', '--runs',
metavar='N', metavar='N',
type=int, type=int,
default=1, default=1,
help='Number of repetition of each experiment' help='Number of repetition of each experiment'
) )
parser.add_argument( parser.add_argument(
'--firstrun', metavar='N', type=int, default=1, help='ID for first run' '--firstrun', metavar='N', type=int, default=1, help='ID for first run'
) )
parser.add_argument( parser.add_argument(
'--force', '--force',
action='store_const', action='store_const',
const=True, const=True,
default=False, default=False,
help='Run experiments even if output already exists (overwrites output)' help='Run experiments even if output already exists (overwrites output)'
) )
parser.add_argument( parser.add_argument(
'--verbose', '--verbose',
action='store_const', action='store_const',
const=True, const=True,
default=False, default=False,
help='Verbose output, for example, print component simulators\' output' help='Verbose output, for example, print component simulators\' output'
) )
parser.add_argument( parser.add_argument(
'--pcap', '--pcap',
action='store_const', action='store_const',
const=True, const=True,
default=False, default=False,
help='Dump pcap file (if supported by component simulator)' help='Dump pcap file (if supported by component simulator)'
) )
g_env = parser.add_argument_group('Environment') # arguments for the experiment environment
g_env.add_argument( g_env = parser.add_argument_group('Environment')
g_env.add_argument(
'--repo', '--repo',
metavar='DIR', metavar='DIR',
type=str, type=str,
default='..', default='..',
help='SimBricks repository directory' help='SimBricks repository directory'
) )
g_env.add_argument( g_env.add_argument(
'--workdir', '--workdir',
metavar='DIR', metavar='DIR',
type=str, type=str,
default='./out/', default='./out/',
help='Work directory base' help='Work directory base'
) )
g_env.add_argument( g_env.add_argument(
'--outdir', '--outdir',
metavar='DIR', metavar='DIR',
type=str, type=str,
default='./out/', default='./out/',
help='Output directory base' help='Output directory base'
) )
g_env.add_argument( g_env.add_argument(
'--cpdir', '--cpdir',
metavar='DIR', metavar='DIR',
type=str, type=str,
default='./out/', default='./out/',
help='Checkpoint directory base' help='Checkpoint directory base'
) )
g_env.add_argument( g_env.add_argument(
'--hosts', '--hosts',
metavar='JSON_FILE', metavar='JSON_FILE',
type=str, type=str,
default=None, default=None,
help='List of hosts to use (json)' help='List of hosts to use (json)'
) )
g_env.add_argument( g_env.add_argument(
'--shmdir', '--shmdir',
metavar='DIR', metavar='DIR',
type=str, type=str,
default=None, default=None,
help='Shared memory directory base (workdir if not set)' help='Shared memory directory base (workdir if not set)'
) )
g_par = parser.add_argument_group('Parallel Runtime') # arguments for the parallel runtime
g_par.add_argument( g_par = parser.add_argument_group('Parallel Runtime')
g_par.add_argument(
'--parallel', '--parallel',
dest='runtime', dest='runtime',
action='store_const', action='store_const',
const='parallel', const='parallel',
default='sequential', default='sequential',
help='Use parallel instead of sequential runtime' help='Use parallel instead of sequential runtime'
) )
g_par.add_argument( g_par.add_argument(
'--cores', '--cores',
metavar='N', metavar='N',
type=int, type=int,
default=len(os.sched_getaffinity(0)), default=len(os.sched_getaffinity(0)),
help='Number of cores to use for parallel runs' help='Number of cores to use for parallel runs'
) )
g_par.add_argument( g_par.add_argument(
'--mem', '--mem',
metavar='N', metavar='N',
type=int, type=int,
default=None, default=None,
help='Memory limit for parallel runs (in MB)' help='Memory limit for parallel runs (in MB)'
) )
g_slurm = parser.add_argument_group('Slurm Runtime') # arguments for the slurm runtime
g_slurm.add_argument( g_slurm = parser.add_argument_group('Slurm Runtime')
g_slurm.add_argument(
'--slurm', '--slurm',
dest='runtime', dest='runtime',
action='store_const', action='store_const',
const='slurm', const='slurm',
default='sequential', default='sequential',
help='Use slurm instead of sequential runtime' help='Use slurm instead of sequential runtime'
) )
g_slurm.add_argument( g_slurm.add_argument(
'--slurmdir', '--slurmdir',
metavar='DIR', metavar='DIR',
type=str, type=str,
default='./slurm/', default='./slurm/',
help='Slurm communication directory' help='Slurm communication directory'
) )
g_dist = parser.add_argument_group('Distributed Runtime') # arguments for the distributed runtime
g_dist.add_argument( g_dist = parser.add_argument_group('Distributed Runtime')
g_dist.add_argument(
'--dist', '--dist',
dest='runtime', dest='runtime',
action='store_const', action='store_const',
const='dist', const='dist',
default='sequential', default='sequential',
help='Use sequential distributed runtime instead of local' help='Use sequential distributed runtime instead of local'
) )
g_dist.add_argument( g_dist.add_argument(
'--auto-dist', '--auto-dist',
action='store_const', action='store_const',
const=True, const=True,
default=False, default=False,
help='Automatically distribute non-distributed experiments' help='Automatically distribute non-distributed experiments'
) )
g_dist.add_argument( g_dist.add_argument(
'--proxy-type', '--proxy-type',
metavar='TYPE', metavar='TYPE',
type=str, type=str,
default='sockets', default='sockets',
help='Proxy type to use (sockets,rdma) for auto distribution' help='Proxy type to use (sockets,rdma) for auto distribution'
) )
args = parser.parse_args()
return parser.parse_args()
# pylint: disable=redefined-outer-name
def load_executors(path): def load_executors(path: str) -> tp.List[exectools.Executor]:
"""Load hosts list from json file and return list of executors.""" """Load hosts list from json file and return list of executors."""
with open(path, 'r', encoding='utf-8') as f: with open(path, 'r', encoding='utf-8') as f:
hosts = json.load(f) hosts = json.load(f)
...@@ -248,13 +252,7 @@ def load_executors(path): ...@@ -248,13 +252,7 @@ def load_executors(path):
return exs return exs
if args.hosts is None: def warn_multi_exec(executors: tp.List[exectools.Executor]):
executors = [LocalExecutor()]
else:
executors = load_executors(args.hosts)
def warn_multi_exec():
if len(executors) > 1: if len(executors) > 1:
print( print(
'Warning: multiple hosts specified, only using first one for now', 'Warning: multiple hosts specified, only using first one for now',
...@@ -262,32 +260,16 @@ def warn_multi_exec(): ...@@ -262,32 +260,16 @@ def warn_multi_exec():
) )
# initialize runtime
if args.runtime == 'parallel':
warn_multi_exec()
rt = LocalParallelRuntime(
cores=args.cores,
mem=args.mem,
verbose=args.verbose,
executor=executors[0]
)
elif args.runtime == 'slurm':
rt = SlurmRuntime(args.slurmdir, args, verbose=args.verbose)
elif args.runtime == 'dist':
rt = DistributedSimpleRuntime(executors, verbose=args.verbose)
else:
warn_multi_exec()
rt = LocalSimpleRuntime(verbose=args.verbose, executor=executors[0])
# pylint: disable=redefined-outer-name # pylint: disable=redefined-outer-name
def add_exp( def add_exp(
e: Experiment, e: Experiment,
rt: rt_common.Runtime,
run: int, run: int,
prereq: tp.Optional[Run], prereq: tp.Optional[Run],
create_cp: bool, create_cp: bool,
restore_cp: bool, restore_cp: bool,
no_simbricks: bool no_simbricks: bool,
args: argparse.Namespace
): ):
outpath = f'{args.outdir}/{e.name}-{run}.json' outpath = f'{args.outdir}/{e.name}-{run}.json'
if os.path.exists(outpath) and not args.force: if os.path.exists(outpath) and not args.force:
...@@ -314,8 +296,32 @@ def add_exp( ...@@ -314,8 +296,32 @@ def add_exp(
return run return run
# load experiments def main():
if not args.pickled: args = parse_args()
if args.hosts is None:
executors = [exectools.LocalExecutor()]
else:
executors = load_executors(args.hosts)
# initialize runtime
if args.runtime == 'parallel':
warn_multi_exec(executors)
rt = LocalParallelRuntime(
cores=args.cores,
mem=args.mem,
verbose=args.verbose,
executor=executors[0]
)
elif args.runtime == 'slurm':
rt = SlurmRuntime(args.slurmdir, args, verbose=args.verbose)
elif args.runtime == 'dist':
rt = DistributedSimpleRuntime(executors, verbose=args.verbose)
else:
warn_multi_exec(executors)
rt = LocalSimpleRuntime(verbose=args.verbose, executor=executors[0])
# load experiments
if not args.pickled:
# default: load python modules with experiments # default: load python modules with experiments
experiments = [] experiments = []
for path in args.experiments: for path in args.experiments:
...@@ -352,22 +358,32 @@ if not args.pickled: ...@@ -352,22 +358,32 @@ if not args.pickled:
if not match: if not match:
continue continue
# if this is an experiment with a checkpoint we might have to create it # if this is an experiment with a checkpoint we might have to create
# it
no_simbricks = e.no_simbricks no_simbricks = e.no_simbricks
if e.checkpoint: if e.checkpoint:
prereq = add_exp(e, 0, None, True, False, no_simbricks) prereq = add_exp(
e, rt, 0, None, True, False, no_simbricks, args
)
else: else:
prereq = None prereq = None
for run in range(args.firstrun, args.firstrun + args.runs): for run in range(args.firstrun, args.firstrun + args.runs):
add_exp(e, run, prereq, False, e.checkpoint, no_simbricks) add_exp(
else: e, rt, run, prereq, False, e.checkpoint, no_simbricks, args
)
else:
# otherwise load pickled run object # otherwise load pickled run object
for path in args.experiments: for path in args.experiments:
with open(path, 'rb') as f: with open(path, 'rb') as f:
rt.add_run(pickle.load(f)) rt.add_run(pickle.load(f))
# register interrupt handler # register interrupt handler
signal(SIGINT, lambda *_: rt.interrupt()) signal(SIGINT, lambda *_: rt.interrupt())
# invoke runtime to run experiments
asyncio.run(rt.start())
asyncio.run(rt.start()) if __name__ == '__main__':
main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment