# runtime.py
import asyncio

3
4
5
import modes.experiments as exp

class Run(object):
    """A single execution of an experiment together with where its output goes.

    Attributes:
        experiment: the experiment to execute; its ``name`` is used by ``name()``.
        index: index of this run, shown in ``name()``.
        env: environment object handed to the experiment when it is run.
        outpath: path the run's serialized output is written to.
        output: result of the run; ``None`` until a runtime has executed it.
    """

    def __init__(self, experiment, index, env, outpath):
        self.experiment = experiment
        self.index = index
        self.env = env
        self.outpath = outpath
        # filled in by a runtime once the run has finished
        self.output = None

    def name(self):
        """Return a display name of the form ``<experiment name>[<index>]``."""
        return f'{self.experiment.name}[{self.index}]'

16
17
18
19
20
21
22
class Runtime(object):
    """Base interface for runtimes that execute experiment runs.

    Subclasses override ``add_run`` to queue a run and ``start`` to execute
    the queued runs. Both base implementations do nothing.
    """

    def add_run(self, run):
        """Queue ``run`` for execution (no-op in the base class)."""
        return None

    def start(self):
        """Execute all queued runs (no-op in the base class)."""
        return None

23

24
class LocalSimpleRuntime(Runtime):
    """Runtime that executes queued runs one after another in this process."""

    def __init__(self, verbose=False):
        # runs queued via add_run, in submission order
        self.runnable = []
        # runs that have finished and whose output file has been written
        self.complete = []
        self.verbose = verbose

    def add_run(self, run):
        """Queue ``run`` for sequential execution by ``start``."""
        self.runnable.append(run)

    def start(self):
        """Execute every queued run in order and write its output.

        Each run is executed via ``exp.run_exp_local``. The result is stored
        on ``run.output`` and written, as ``run.output.dumps()``, to
        ``run.outpath``. A run is recorded in ``self.complete`` only after
        its output file has been written.
        """
        for run in self.runnable:
            run.output = exp.run_exp_local(run.experiment, run.env,
                                           verbose=self.verbose)

            with open(run.outpath, 'w') as f:
                f.write(run.output.dumps())

            self.complete.append(run)
41

42

43
class LocalParallelRuntime(Runtime):
    """Runtime that executes queued runs concurrently on the local machine.

    Runs are started in submission order as long as the configured core and
    memory budgets allow it. Otherwise, starting the next run waits until
    earlier runs finish and release their resources.
    """

    def __init__(self, cores, mem=None, verbose=False):
        """
        Args:
            cores: total cores that concurrently executing runs may use, or
                None for no core limit.
            mem: total memory that concurrently executing runs may use (same
                units as ``experiment.resreq_mem()``), or None for no limit.
            verbose: forwarded to ``experiment.prepare`` and ``experiment.run``.
        """
        self.runnable = []
        self.complete = []
        self.cores = cores
        self.mem = mem
        self.verbose = verbose

    def add_run(self, run):
        """Queue ``run``, rejecting it if it can never fit in the budget.

        Raises:
            Exception: if the run needs more cores or more memory than the
                runtime has in total.
        """
        experiment = run.experiment
        if self.cores is not None and experiment.resreq_cores() > self.cores:
            raise Exception('Not enough cores available for run')

        if self.mem is not None and experiment.resreq_mem() > self.mem:
            raise Exception('Not enough memory available for run')

        self.runnable.append(run)

    async def do_run(self, run):
        """Prepare and execute ``run``, write its output, and return ``run``.

        The output is stored on ``run.output`` and written, as
        ``run.output.dumps()``, to ``run.outpath``.
        """
        await run.experiment.prepare(run.env, verbose=self.verbose)
        print('starting run ', run.name())
        run.output = await run.experiment.run(run.env, verbose=self.verbose)

        with open(run.outpath, 'w') as f:
            f.write(run.output.dumps())
        print('finished run ', run.name())
        return run

    async def wait_completion(self):
        """Wait until at least one pending run finishes, then release its resources.

        Finished runs are appended to ``self.complete``, and their cores and
        memory are returned to the budget. If a finished run raised, the
        exception propagates from here.
        """
        assert self.pending_jobs

        done, self.pending_jobs = await asyncio.wait(
            self.pending_jobs, return_when=asyncio.FIRST_COMPLETED)

        for task in done:
            run = task.result()
            self.complete.append(run)
            self.cores_used -= run.experiment.resreq_cores()
            self.mem_used -= run.experiment.resreq_mem()

    def enough_resources(self, run):
        """Return True if the currently free cores and memory suffice for ``run``."""
        # named ``experiment`` so it does not shadow the ``exp`` module alias
        experiment = run.experiment

        if self.cores is not None:
            free_cores = self.cores - self.cores_used
            enough_cores = free_cores >= experiment.resreq_cores()
        else:
            enough_cores = True

        if self.mem is not None:
            # compare free memory against the run's memory requirement
            free_mem = self.mem - self.mem_used
            enough_mem = free_mem >= experiment.resreq_mem()
        else:
            enough_mem = True

        return enough_cores and enough_mem

    async def do_start(self):
        """Start every queued run within the resource budget and await them all."""
        self.cores_used = 0
        self.mem_used = 0
        self.pending_jobs = set()

        for run in self.runnable:
            # block until earlier runs have released enough cores/memory
            while not self.enough_resources(run):
                print('waiting for resources')
                await self.wait_completion()

            self.cores_used += run.experiment.resreq_cores()
            self.mem_used += run.experiment.resreq_mem()

            # Wrap in a Task so the run is scheduled immediately;
            # asyncio.wait() rejects bare coroutines on Python 3.11+.
            job = asyncio.create_task(self.do_run(run))
            self.pending_jobs.add(job)

        # wait for all runs to finish
        while self.pending_jobs:
            await self.wait_completion()

    def start(self):
        """Execute all queued runs to completion, blocking the caller."""
        asyncio.run(self.do_start())