"include/ck/utility/math.hpp" did not exist on "1566b31736d191fe3a43dd5efa59968e44191729"
runtime.py 6.89 KB
Newer Older
1
import asyncio
2
3
4
import pickle
import os
import pathlib
5
import re
6

7
8
9
import modes.experiments as exp

class Run(object):
    """One execution of an experiment together with its bookkeeping.

    Attributes:
        experiment: the experiment object to execute; must expose `name`.
        index: number of this run, used to build the run's name.
        env: environment object handed to the experiment when it runs.
        outpath: path of the file the run's output is written to.
        output: output object of the run; None until a runtime sets it.
        prereq: another Run that has to complete first, or None.
    """

    def __init__(self, experiment, index, env, outpath, prereq=None):
        self.experiment = experiment
        self.index = index
        self.env = env
        self.outpath = outpath
        # filled in by the runtime once the experiment has been executed
        self.output = None
        self.prereq = prereq

    def name(self):
        """Return the run's name: '<experiment name>.<index>'."""
        return self.experiment.name + '.' + str(self.index)
20

21
22
23
24
25
26
27
class Runtime:
    """Interface shared by all runtimes: collect runs, then start them.

    Both methods are no-ops here; subclasses override them.
    """

    def add_run(self, run):
        """Register `run` with the runtime (does nothing in the base class)."""
        return None

    def start(self):
        """Execute the registered runs (does nothing in the base class)."""
        return None

28

29
class LocalSimpleRuntime(Runtime):
    """Runtime that executes its runs sequentially via `exp.run_exp_local`.

    Attributes:
        runnable: runs queued through add_run(), in insertion order.
        complete: runs that have been executed by start().
        verbose: passed through to `exp.run_exp_local`.
    """

    def __init__(self, verbose=False):
        self.runnable = []
        self.complete = []
        self.verbose = verbose

    def add_run(self, run):
        """Queue `run`; it is executed when start() is called."""
        self.runnable.append(run)

    def start(self):
        """Execute every queued run in order and write its output to disk.

        For each run the work and checkpoint directories are created if
        missing, the experiment is executed, and the dumped output is
        written to `run.outpath` (creating its parent directory if needed).
        """
        for run in self.runnable:
            pathlib.Path(run.env.workdir).mkdir(parents=True, exist_ok=True)
            pathlib.Path(run.env.cpdir).mkdir(parents=True, exist_ok=True)

            run.output = exp.run_exp_local(run.experiment, run.env,
                    verbose=self.verbose)
            self.complete.append(run)

            pathlib.Path(run.outpath).parent.mkdir(parents=True, exist_ok=True)
            with open(run.outpath, 'w') as f:
                f.write(run.output.dumps())
50

51

52
class LocalParallelRuntime(Runtime):
    """Runtime that executes runs concurrently within a resource budget.

    Runs are started as asyncio tasks as long as the cores (and, if a limit
    is given, the memory) they request fit into what is still unused. Runs
    with a prerequisite are only started once that prerequisite completed.

    Attributes:
        cores: number of cores available to all runs, or None for no limit.
        mem: amount of memory available to all runs, or None for no limit.
        verbose: passed through to the experiments' prepare() and run().
        complete: set of runs that have finished.
    """

    def __init__(self, cores, mem=None, verbose=False):
        self.runs_noprereq = []
        self.runs_prereq = []
        self.complete = set()
        self.cores = cores
        self.mem = mem
        self.verbose = verbose
        # scheduling state; reset at the beginning of do_start()
        self.cores_used = 0
        self.mem_used = 0
        self.pending_jobs = set()

    def add_run(self, run):
        """Queue `run` for execution.

        Raises:
            Exception: if the run on its own requests more cores or memory
                than this runtime has been given.
        """
        # cores may be None (no limit), same convention as enough_resources()
        if (self.cores is not None
                and run.experiment.resreq_cores() > self.cores):
            raise Exception('Not enough cores available for run')

        if self.mem is not None and run.experiment.resreq_mem() > self.mem:
            raise Exception('Not enough memory available for run')

        if run.prereq is None:
            self.runs_noprereq.append(run)
        else:
            self.runs_prereq.append(run)

    async def do_run(self, run):
        ''' actually starts a run '''
        pathlib.Path(run.env.workdir).mkdir(parents=True, exist_ok=True)
        pathlib.Path(run.env.cpdir).mkdir(parents=True, exist_ok=True)

        await run.experiment.prepare(run.env, verbose=self.verbose)
        print('starting run ', run.name())
        run.output = await run.experiment.run(run.env, verbose=self.verbose)

        pathlib.Path(run.outpath).parent.mkdir(parents=True, exist_ok=True)
        with open(run.outpath, 'w') as f:
            f.write(run.output.dumps())
        print('finished run ', run.name())
        return run

    async def wait_completion(self):
        ''' wait for any run to terminate and return '''
        assert self.pending_jobs

        done, self.pending_jobs = await asyncio.wait(self.pending_jobs,
                return_when=asyncio.FIRST_COMPLETED)

        for job in done:
            # the job is already finished; awaiting it yields the Run that
            # do_run() returned (or re-raises the exception it failed with)
            run = await job
            self.complete.add(run)
            # give the resources of the finished run back to the budget
            self.cores_used -= run.experiment.resreq_cores()
            self.mem_used -= run.experiment.resreq_mem()

    def enough_resources(self, run):
        ''' check if enough cores and mem are available for the run '''
        experiment = run.experiment

        if self.cores is not None:
            free_cores = self.cores - self.cores_used
            enough_cores = free_cores >= experiment.resreq_cores()
        else:
            enough_cores = True

        if self.mem is not None:
            free_mem = self.mem - self.mem_used
            enough_mem = free_mem >= experiment.resreq_mem()
        else:
            enough_mem = True

        return enough_cores and enough_mem

    def prereq_ready(self, run):
        ''' check if the run has no prerequisite or it already completed '''
        if run.prereq is None:
            return True

        return run.prereq in self.complete

    async def do_start(self):
        ''' schedule all queued runs and wait until every one finished '''
        self.cores_used = 0
        self.mem_used = 0
        self.pending_jobs = set()

        # runs without prerequisite go first so prerequisites get scheduled
        # before the runs depending on them
        runs = self.runs_noprereq + self.runs_prereq
        for run in runs:
            # check if we first have to wait for memory or cores
            while not self.enough_resources(run):
                print('waiting for resources')
                await self.wait_completion()

            # check if we first have to wait for the prerequisite run
            while not self.prereq_ready(run):
                if not self.pending_jobs:
                    # nothing is running that could still complete the
                    # prerequisite; waiting would never return
                    raise Exception('prerequisite of run %s cannot complete:'
                            ' no runs pending' % (run.name(),))
                print('waiting for prereq')
                await self.wait_completion()

            self.cores_used += run.experiment.resreq_cores()
            self.mem_used += run.experiment.resreq_mem()

            # asyncio.wait() only accepts tasks/futures (bare coroutines are
            # rejected since Python 3.11), so wrap the coroutine in a task
            job = asyncio.create_task(self.do_run(run))
            self.pending_jobs.add(job)

        # wait for all runs to finish
        while self.pending_jobs:
            await self.wait_completion()

    def start(self):
        ''' run the scheduler in a fresh event loop until all runs finished '''
        asyncio.run(self.do_start())
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170

class SlurmRuntime(Runtime):
    """Runtime that submits every run as a batch job through `sbatch`.

    Attributes:
        slurmdir: directory receiving the pickled runs, job scripts and logs.
        args: stored as given; not used by the methods of this class.
        verbose: stored as given; not used by the methods of this class.
        cleanup: if true, the job script removes the run's work directory
            after the run command finished.
    """

    def __init__(self, slurmdir, args, verbose=False, cleanup=True):
        self.runnable = []
        self.slurmdir = slurmdir
        self.args = args
        self.verbose = verbose
        self.cleanup = cleanup

    def add_run(self, run):
        """Queue `run`; it is submitted when start() is called."""
        self.runnable.append(run)

    def prep_run(self, run):
        """Write the pickled run and its batch script into `slurmdir`.

        Returns:
            Path of the generated batch script.
        """
        experiment = run.experiment
        exp_path = '%s/%s-%d.exp' % (self.slurmdir, experiment.name, run.index)
        exp_log = '%s/%s-%d.log' % (self.slurmdir, experiment.name, run.index)
        exp_script = '%s/%s-%d.sh' % (self.slurmdir, experiment.name,
                run.index)

        # write out pickled run; the prereq is detached only while pickling
        # (we don't want to pull in the prereq too) and restored afterwards
        # so the caller's run object is left unchanged
        prereq = run.prereq
        run.prereq = None
        try:
            with open(exp_path, 'wb') as f:
                pickle.dump(run, f)
        finally:
            run.prereq = prereq

        # create slurm batch script
        with open(exp_script, 'w') as f:
            f.write('#!/bin/sh\n')
            f.write('#SBATCH -o %s -e %s\n' % (exp_log, exp_log))
            f.write('#SBATCH -c %d\n' % (experiment.resreq_cores(),))
            f.write('#SBATCH --mem=%dM\n' % (experiment.resreq_mem(),))
            f.write('#SBATCH --job-name="%s"\n' % (run.name(),))
            if experiment.timeout is not None:
                # split the timeout (in seconds) into hours:minutes:seconds
                h = int(experiment.timeout / 3600)
                m = int((experiment.timeout % 3600) / 60)
                s = int(experiment.timeout % 60)
                f.write('#SBATCH --time=%02d:%02d:%02d\n' % (h, m, s))

            f.write('python3 run.py --pickled %s\n' % (exp_path))
            # remember the run's exit status so the cleanup below does not
            # replace it as the job's exit status
            f.write('status=$?\n')
            if self.cleanup:
                f.write('rm -rf %s\n' % (run.env.workdir))
            f.write('exit $status\n')

        return exp_script

    def start(self):
        """Submit all queued runs with `sbatch`, in the order they were added.

        A run with a prerequisite is submitted with an `afterok` dependency
        on the prerequisite's job id, so the prerequisite has to be submitted
        before it. The job id reported by `sbatch` is stored as `run.job_id`.

        Raises:
            Exception: if `sbatch` exits with a non-zero status or its output
                does not contain a job id.
        """
        pathlib.Path(self.slurmdir).mkdir(parents=True, exist_ok=True)

        jid_re = re.compile(r'Submitted batch job ([0-9]+)')

        for run in self.runnable:
            if run.prereq is None:
                dep_cmd = ''
            else:
                dep_cmd = '--dependency=afterok:' + str(run.prereq.job_id)

            script = self.prep_run(run)

            stream = os.popen('sbatch %s %s' % (dep_cmd, script))
            output = stream.read()
            # close() returns None only if the command exited with status 0
            result = stream.close()

            if result is not None:
                raise Exception('running sbatch failed')

            m = jid_re.search(output)
            if m is None:
                raise Exception('could not find job id in sbatch output')
            run.job_id = int(m.group(1))