runtime.py 8.4 KB
Newer Older
Antoine Kaufmann's avatar
Antoine Kaufmann committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

23
import asyncio
24
25
26
import pickle
import os
import pathlib
27
import shutil
28
import re
29

30
31
32
import modes.experiments as exp

class Run(object):
    """One execution of an experiment, bound to an output location.

    Bundles the experiment with a per-run index, the environment that
    supplies working/checkpoint directories, the path where serialized
    output is written, and an optional prerequisite run that must
    finish before this one starts.
    """

    def __init__(self, experiment, index, env, outpath, prereq=None):
        self.experiment = experiment
        self.index = index
        self.env = env
        self.outpath = outpath
        # filled in by the runtime once the experiment has executed
        self.output = None
        self.prereq = prereq

    def name(self):
        """Return the unique run name '<experiment name>.<index>'."""
        return '.'.join([self.experiment.name, str(self.index)])

    def prep_dirs(self):
        """Wipe and re-create this run's directories.

        The working directory is always reset; the checkpoint directory
        is only wiped when this run creates a checkpoint, then both are
        (re-)created.
        """
        shutil.rmtree(self.env.workdir, ignore_errors=True)
        if self.env.create_cp:
            shutil.rmtree(self.env.cpdir, ignore_errors=True)

        pathlib.Path(self.env.workdir).mkdir(parents=True, exist_ok=True)
        pathlib.Path(self.env.cpdir).mkdir(parents=True, exist_ok=True)

52
53
54
55
56
57
58
class Runtime(object):
    """Abstract interface for schedulers that execute queued runs."""

    def add_run(self, run):
        """Queue a run for execution (no-op in the base class)."""

    def start(self):
        """Execute all queued runs (no-op in the base class)."""

59

60
class LocalSimpleRuntime(Runtime):
    """Execute runs locally and sequentially, in insertion order."""

    def __init__(self, verbose=False):
        # runs still waiting to execute
        self.runnable = []
        # runs that have finished executing
        self.complete = []
        self.verbose = verbose

    def add_run(self, run):
        """Queue a run; it executes when start() is called."""
        self.runnable.append(run)

    def start(self):
        """Run every queued experiment one after another.

        Each run's directories are prepared first; after the experiment
        finishes, its serialized output is written to run.outpath.
        """
        for cur in self.runnable:
            cur.prep_dirs()
            cur.output = exp.run_exp_local(cur.experiment, cur.env,
                    verbose=self.verbose)
            self.complete.append(cur)

            pathlib.Path(cur.outpath).parent.mkdir(parents=True,
                    exist_ok=True)
            with open(cur.outpath, 'w') as out_file:
                out_file.write(cur.output.dumps())
79

80

81
class LocalParallelRuntime(Runtime):
    """Execute runs locally and in parallel, within core/memory limits.

    Runs without prerequisites are scheduled before runs with
    prerequisites; a run with a prerequisite only starts once that
    prerequisite has completed.
    """

    def __init__(self, cores, mem=None, verbose=False):
        """cores: total CPU cores available; mem: total memory budget
        (None = unlimited, same unit as resreq_mem()); verbose is
        passed through to experiment preparation/execution."""
        self.runs_noprereq = []
        self.runs_prereq = []
        self.complete = set()
        self.cores = cores
        self.mem = mem
        self.verbose = verbose

    def add_run(self, run):
        """Queue a run, rejecting it if it can never fit the limits.

        Raises Exception when the run's core or memory requirement
        exceeds the runtime's total budget.
        """
        if run.experiment.resreq_cores() > self.cores:
            raise Exception('Not enough cores available for run')

        if self.mem is not None and run.experiment.resreq_mem() > self.mem:
            raise Exception('Not enough memory available for run')

        if run.prereq is None:
            self.runs_noprereq.append(run)
        else:
            self.runs_prereq.append(run)

    async def do_run(self, run):
        ''' actually starts a run '''
        run.prep_dirs()

        await run.experiment.prepare(run.env, verbose=self.verbose)
        print('starting run ', run.name())
        run.output = await run.experiment.run(run.env, verbose=self.verbose)

        # persist serialized output before reporting the run finished
        pathlib.Path(run.outpath).parent.mkdir(parents=True, exist_ok=True)
        with open(run.outpath, 'w') as f:
            f.write(run.output.dumps())
        print('finished run ', run.name())
        return run

    async def wait_completion(self):
        ''' wait for any run to terminate and return '''
        assert self.pending_jobs

        done, self.pending_jobs = await asyncio.wait(self.pending_jobs,
                return_when=asyncio.FIRST_COMPLETED)

        for task in done:
            run = await task
            self.complete.add(run)
            # release the resources the finished run was holding
            self.cores_used -= run.experiment.resreq_cores()
            self.mem_used -= run.experiment.resreq_mem()

    def enough_resources(self, run):
        ''' check if enough cores and mem are available for the run '''
        # named `e` (not `exp`) to avoid shadowing the
        # modes.experiments module imported at file level
        e = run.experiment

        if self.cores is not None:
            enough_cores = (self.cores - self.cores_used) >= e.resreq_cores()
        else:
            enough_cores = True

        if self.mem is not None:
            enough_mem = (self.mem - self.mem_used) >= e.resreq_mem()
        else:
            enough_mem = True

        return enough_cores and enough_mem

    def prereq_ready(self, run):
        ''' check if the run's prerequisite (if any) has completed '''
        if run.prereq is None:
            return True

        return run.prereq in self.complete

    async def do_start(self):
        ''' schedule all runs and wait until they have all finished '''
        self.cores_used = 0
        self.mem_used = 0
        self.pending_jobs = set()

        runs = self.runs_noprereq + self.runs_prereq
        for run in runs:
            # check if we first have to wait for memory or cores
            while not self.enough_resources(run):
                print('waiting for resources')
                await self.wait_completion()

            # wait until the prerequisite run (if any) has completed
            while not self.prereq_ready(run):
                print('waiting for prereq')
                await self.wait_completion()

            self.cores_used += run.experiment.resreq_cores()
            self.mem_used += run.experiment.resreq_mem()

            # wrap the coroutine in a task: asyncio.wait() only accepts
            # tasks/futures (passing bare coroutines was deprecated in
            # 3.8 and removed in Python 3.11)
            job = asyncio.ensure_future(self.do_run(run))
            self.pending_jobs.add(job)

        # wait for all runs to finish
        while self.pending_jobs:
            await self.wait_completion()

    def start(self):
        """Synchronous entry point: drive do_start() to completion."""
        asyncio.run(self.do_start())
181
182
183
184
185
186
187
188
189
190
191
192
193
194

class SlurmRuntime(Runtime):
    """Submit runs to a SLURM cluster via sbatch.

    Each run is pickled into slurmdir together with a generated batch
    script that re-executes it through run.py; prerequisite runs are
    expressed as SLURM afterok job dependencies.
    """

    def __init__(self, slurmdir, args, verbose=False, cleanup=True):
        """slurmdir: directory for pickles/scripts/logs; cleanup: remove
        the run's workdir in the batch script after it finishes.

        NOTE(review): `args` is stored but never used in this class —
        confirm whether it was meant to be forwarded to sbatch."""
        self.runnable = []
        self.slurmdir = slurmdir
        self.args = args
        self.verbose = verbose
        self.cleanup = cleanup

    def add_run(self, run):
        """Queue a run for submission during start()."""
        self.runnable.append(run)

    def prep_run(self, run):
        """Write the pickled run and its sbatch script into slurmdir.

        Returns the path of the generated batch script. Side effect:
        clears run.prereq before pickling (the dependency is handled by
        SLURM, not by the unpickled run).
        """
        # named `e` (not `exp`) to avoid shadowing the
        # modes.experiments module imported at file level
        e = run.experiment

        exp_path = os.path.join(self.slurmdir, f'{e.name}-{run.index}.exp')
        exp_log = os.path.join(self.slurmdir, f'{e.name}-{run.index}.log')
        exp_script = os.path.join(self.slurmdir, f'{e.name}-{run.index}.sh')
        print(exp_path)
        print(exp_log)
        print(exp_script)

        # write out pickled run
        with open(exp_path, 'wb') as f:
            run.prereq = None # we don't want to pull in the prereq too
            pickle.dump(run, f)

        # create slurm batch script
        with open(exp_script, 'w') as f:
            f.write('#!/bin/sh\n')
            f.write('#SBATCH -o %s -e %s\n' % (exp_log, exp_log))
            f.write('#SBATCH -c %d\n' % (e.resreq_cores(),))
            f.write('#SBATCH --mem=%dM\n' % (e.resreq_mem(),))
            f.write('#SBATCH --job-name="%s"\n' % (run.name(),))
            if e.timeout is not None:
                # convert the timeout in seconds to SLURM's HH:MM:SS
                h = int(e.timeout / 3600)
                m = int((e.timeout % 3600) / 60)
                s = int(e.timeout % 60)
                f.write('#SBATCH --time=%02d:%02d:%02d\n' % (h, m, s))

            extra = ''
            if self.verbose:
                extra = '--verbose'

            f.write('python3 run.py %s --pickled %s\n' % (extra, exp_path))
            # preserve the experiment's exit status across the cleanup
            f.write('status=$?\n')
            if self.cleanup:
                f.write('rm -rf %s\n' % (run.env.workdir))
            f.write('exit $status\n')

        return exp_script

    def start(self):
        """Submit every queued run with sbatch.

        Records each submission's SLURM job id on the run (run.job_id)
        so later runs can depend on it. Raises Exception when sbatch
        fails or its output cannot be parsed.
        """
        pathlib.Path(self.slurmdir).mkdir(parents=True, exist_ok=True)

        jid_re = re.compile(r'Submitted batch job ([0-9]+)')

        for run in self.runnable:
            # build the dependency flag BEFORE prep_run, which clears
            # run.prereq as part of pickling
            if run.prereq is None:
                dep_cmd = ''
            else:
                dep_cmd = '--dependency=afterok:' + str(run.prereq.job_id)

            script = self.prep_run(run)

            stream = os.popen('sbatch %s %s' % (dep_cmd, script))
            output = stream.read()
            result = stream.close()

            if result is not None:
                raise Exception('running sbatch failed')

            m = jid_re.search(output)
            if m is None:
                # sbatch succeeded but printed something unexpected;
                # fail with context instead of an AttributeError on m
                raise Exception(
                    'could not parse sbatch output: ' + output)
            run.job_id = int(m.group(1))