run.py 9.93 KB
Newer Older
Antoine Kaufmann's avatar
Antoine Kaufmann committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

23
import argparse
24
import asyncio
Jonas Kaufmann's avatar
Jonas Kaufmann committed
25
import fnmatch
26
import importlib
Hejing Li's avatar
Hejing Li committed
27
import importlib.util
28
import json
Jonas Kaufmann's avatar
Jonas Kaufmann committed
29
import os
30
import pickle
Jonas Kaufmann's avatar
Jonas Kaufmann committed
31
import sys
32
import typing as tp
33
from signal import SIGINT, signal
34

35
36
37
38
39
40
41
42
43
44
45
46
47
from simbricks.orchestration.exectools import LocalExecutor, RemoteExecutor
from simbricks.orchestration.experiment.experiment_environment import ExpEnv
from simbricks.orchestration.experiments import (
    DistributedExperiment, Experiment
)
from simbricks.orchestration.runtime.common import Run
from simbricks.orchestration.runtime.distributed import (
    DistributedSimpleRuntime, auto_dist
)
from simbricks.orchestration.runtime.local import (
    LocalParallelRuntime, LocalSimpleRuntime
)
from simbricks.orchestration.runtime.slurm import SlurmRuntime
Jonas Kaufmann's avatar
Jonas Kaufmann committed
48
49


50
# pylint: disable=redefined-outer-name
51
52
53
54
def mkdir_if_not_exists(path):
    """Create the directory `path` unless something already exists there.

    The directory is created directly and an "already exists" error is
    ignored, rather than checking for existence first. This avoids a race
    when another process creates the same directory between the check and
    the creation.

    Args:
        path: Directory to create. Parent directories are not created.

    Raises:
        FileNotFoundError: If a parent directory of `path` does not exist.
    """
    try:
        os.mkdir(path)
    except FileExistsError:
        # Something is already present at `path`; nothing to do.
        pass

55

56
# Command line interface: positional experiment modules plus general options.
parser = argparse.ArgumentParser()
parser.add_argument(
    'experiments',
    nargs='+',
    type=str,
    metavar='EXP',
    help='Python modules to load the experiments from'
)
parser.add_argument(
    '--list', action='store_true', help='List available experiment names'
)
parser.add_argument(
    '--filter',
    nargs='+',
    type=str,
    metavar='PATTERN',
    help='Only run experiments matching the given Unix shell style patterns'
)
parser.add_argument(
    '--pickled',
    action='store_true',
    help='Interpret experiment modules as pickled runs instead of .py files'
)
parser.add_argument(
    '--runs',
    type=int,
    default=1,
    metavar='N',
    help='Number of repetition of each experiment'
)
parser.add_argument(
    '--firstrun', type=int, default=1, metavar='N', help='ID for first run'
)
parser.add_argument(
    '--force',
    action='store_true',
    help='Run experiments even if output already exists (overwrites output)'
)
parser.add_argument(
    '--verbose',
    action='store_true',
    help="Verbose output, for example, print component simulators' output"
)
parser.add_argument(
    '--pcap',
    action='store_true',
    help='Dump pcap file (if supported by component simulator)'
)
116
117

g_env = parser.add_argument_group('Environment')
# One row per environment option, in the order they appear in --help:
# (flag, metavar, default, help text). All of them take a string value.
for _flag, _metavar, _default, _help in (
    ('--repo', 'DIR', '..', 'SimBricks repository directory'),
    ('--workdir', 'DIR', './out/', 'Work directory base'),
    ('--outdir', 'DIR', './out/', 'Output directory base'),
    ('--cpdir', 'DIR', './out/', 'Checkpoint directory base'),
    ('--hosts', 'JSON_FILE', None, 'List of hosts to use (json)'),
    (
        '--shmdir',
        'DIR',
        None,
        'Shared memory directory base (workdir if not set)'
    ),
):
    g_env.add_argument(
        _flag, metavar=_metavar, type=str, default=_default, help=_help
    )
160

161
# Default core count for parallel runs: the number of CPUs this process is
# allowed to run on. os.sched_getaffinity() is not available on every
# platform, so fall back to the total CPU count (or 1 if unknown) instead
# of crashing while the argument parser is being built.
try:
    _default_cores = len(os.sched_getaffinity(0))
except AttributeError:
    _default_cores = os.cpu_count() or 1

g_par = parser.add_argument_group('Parallel Runtime')
g_par.add_argument(
    '--parallel',
    dest='runtime',
    action='store_const',
    const='parallel',
    default='sequential',
    help='Use parallel instead of sequential runtime'
)
g_par.add_argument(
    '--cores',
    metavar='N',
    type=int,
    default=_default_cores,
    help='Number of cores to use for parallel runs'
)
g_par.add_argument(
    '--mem',
    metavar='N',
    type=int,
    default=None,
    help='Memory limit for parallel runs (in MB)'
)
184

185
# Options for the Slurm runtime. --slurm shares the 'runtime' destination
# with the other runtime selection flags.
g_slurm = parser.add_argument_group('Slurm Runtime')
g_slurm.add_argument(
    '--slurm',
    action='store_const',
    const='slurm',
    default='sequential',
    dest='runtime',
    help='Use slurm instead of sequential runtime'
)
g_slurm.add_argument(
    '--slurmdir',
    type=str,
    default='./slurm/',
    metavar='DIR',
    help='Slurm communication directory'
)
201

202
# Options for the distributed runtime. --dist shares the 'runtime'
# destination with the other runtime selection flags.
g_dist = parser.add_argument_group('Distributed Runtime')
g_dist.add_argument(
    '--dist',
    action='store_const',
    const='dist',
    default='sequential',
    dest='runtime',
    help='Use sequential distributed runtime instead of local'
)
g_dist.add_argument(
    '--auto-dist',
    action='store_true',
    help='Automatically distribute non-distributed experiments'
)
g_dist.add_argument(
    '--proxy-type',
    type=str,
    default='sockets',
    metavar='TYPE',
    help='Proxy type to use (sockets,rdma) for auto distribution'
)

# All options are registered; parse the command line.
args = parser.parse_args()
226

227

228
# pylint: disable=redefined-outer-name
229
def load_executors(path):
    """Load hosts list from json file and return list of executors.

    The file must contain a JSON list of host objects. Each object needs a
    'type' of either 'local' or 'remote', and an 'ip'. Remote hosts also
    need 'host' and 'workdir', and may carry optional 'ssh_args' and
    'scp_args' that are appended to the executor's extra argument lists.

    Args:
        path: Path of the JSON hosts file.

    Returns:
        List with one executor per host entry, in file order.

    Raises:
        RuntimeError: If a host entry has an unknown 'type'.
        KeyError: If a host entry lacks a required key.
    """
    # Only the parsing needs the file; close it before building executors.
    with open(path, 'r', encoding='utf-8') as f:
        hosts = json.load(f)

    exs = []
    for h in hosts:
        host_type = h['type']
        if host_type == 'local':
            ex = LocalExecutor()
        elif host_type == 'remote':
            ex = RemoteExecutor(h['host'], h['workdir'])
            if 'ssh_args' in h:
                ex.ssh_extra_args += h['ssh_args']
            if 'scp_args' in h:
                ex.scp_extra_args += h['scp_args']
        else:
            # Format instead of concatenating so that a non-string type
            # (e.g. a number) still yields this error, not a TypeError.
            raise RuntimeError(f'invalid host type "{host_type}"')
        ex.ip = h['ip']
        exs.append(ex)
    return exs

Jonas Kaufmann's avatar
Jonas Kaufmann committed
250

251
# Without a hosts file, everything runs through a single local executor.
executors = (
    [LocalExecutor()] if args.hosts is None else load_executors(args.hosts)
)

Jonas Kaufmann's avatar
Jonas Kaufmann committed
256

257
258
def warn_multi_exec():
    """Print a warning to stderr if more than one executor is configured."""
    if len(executors) <= 1:
        return
    print(
        'Warning: multiple hosts specified, only using first one for now',
        file=sys.stderr
    )

264

265
# initialize runtime
266
if args.runtime == 'slurm':
    rt = SlurmRuntime(args.slurmdir, args, verbose=args.verbose)
elif args.runtime == 'dist':
    rt = DistributedSimpleRuntime(executors, verbose=args.verbose)
else:
    # Both local runtimes are given only the first executor, so warn if
    # more than one was configured.
    warn_multi_exec()
    if args.runtime == 'parallel':
        rt = LocalParallelRuntime(
            cores=args.cores,
            mem=args.mem,
            verbose=args.verbose,
            executor=executors[0]
        )
    else:
        rt = LocalSimpleRuntime(verbose=args.verbose, executor=executors[0])
281

282

283
# pylint: disable=redefined-outer-name
284
def add_exp(
    e: Experiment,
    run: int,
    prereq: tp.Optional[Run],
    create_cp: bool,
    restore_cp: bool,
    no_simbricks: bool
):
    """Set up one run of experiment `e` and register it with the runtime.

    Args:
        e: Experiment to run.
        run: Run number; used in the output file name and work directory.
        prereq: Passed through to the created `Run`, or None.
        create_cp: Value for the environment's `create_cp` attribute.
        restore_cp: Value for the environment's `restore_cp` attribute.
        no_simbricks: Value for the environment's `no_simbricks` attribute.

    Returns:
        The `Run` added to the runtime, or None if the run was skipped
        because its output file already exists and --force was not given.
    """
    outpath = f'{args.outdir}/{e.name}-{run}.json'
    if not args.force and os.path.exists(outpath):
        print(f'skip {e.name} run {run}')
        return None

    workdir = f'{args.workdir}/{e.name}/{run}'
    # The checkpoint directory always uses index 0, independent of `run`.
    env = ExpEnv(args.repo, workdir, f'{args.cpdir}/{e.name}/0')
    env.create_cp = create_cp
    env.restore_cp = restore_cp
    env.no_simbricks = no_simbricks
    env.pcap_file = (workdir + '/pcap') if args.pcap else ''
    if args.shmdir is not None:
        env.shm_base = os.path.abspath(f'{args.shmdir}/{e.name}/{run}')

    new_run = Run(e, run, env, outpath, prereq)
    rt.add_run(new_run)
    return new_run

Jonas Kaufmann's avatar
Jonas Kaufmann committed
316

317
318
319
320
321
322
323
# load experiments
if args.pickled:
    # load pickled run objects
    # NOTE(review): pickle.load can execute arbitrary code; only pass files
    # from trusted sources with --pickled.
    for path in args.experiments:
        with open(path, 'rb') as f:
            rt.add_run(pickle.load(f))
else:
    # default: load python modules with experiments

    class ExperimentModuleLoadError(Exception):
        """Raised when an experiment module cannot be set up for import."""

    experiments = []
    for path in args.experiments:
        modname, _ = os.path.splitext(os.path.basename(path))
        spec = importlib.util.spec_from_file_location(modname, path)
        if spec is None:
            raise ExperimentModuleLoadError('spec is None')
        mod = importlib.util.module_from_spec(spec)
        if spec.loader is None:
            raise ExperimentModuleLoadError('spec.loader is None')
        spec.loader.exec_module(mod)
        experiments += mod.experiments

    if args.list:
        for e in experiments:
            print(e.name)
        sys.exit(0)

    for e in experiments:
        if args.auto_dist and not isinstance(e, DistributedExperiment):
            e = auto_dist(e, executors, args.proxy_type)

        # apply filter if any specified: keep the experiment only if its
        # name matches at least one pattern
        if args.filter and not any(
            fnmatch.fnmatch(e.name, pattern) for pattern in args.filter
        ):
            continue

        # if this is an experiment with a checkpoint we might have to create
        # it; that is registered as run 0 and the regular runs get it as
        # their prerequisite
        no_simbricks = e.no_simbricks
        prereq = (
            add_exp(e, 0, None, True, False, no_simbricks)
            if e.checkpoint else None
        )

        for run in range(args.firstrun, args.firstrun + args.runs):
            add_exp(e, run, prereq, False, e.checkpoint, no_simbricks)
369

370
# register interrupt handler
371
def _on_sigint(*_):
    """Relay SIGINT to the runtime by calling its interrupt() method."""
    rt.interrupt()


signal(SIGINT, _on_sigint)

asyncio.run(rt.start())