runtime.py 3.32 KB
Newer Older
1
2
import asyncio

3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import modes.experiments as exp

class Run:
    """One experiment execution request plus the slot for its result.

    Attributes set by the constructor:
        experiment: the experiment to execute.
        env: the environment handed to the experiment when it runs.
        outpath: path of the file the run's output is written to.
        output: result of the run; ``None`` until a runtime stores it.
    """

    def __init__(self, experiment, env, outpath):
        # Nothing has been executed yet, so there is no result to hold.
        self.output = None
        self.outpath = outpath
        self.env = env
        self.experiment = experiment

class Runtime:
    """Common interface of all runtimes; both methods do nothing here."""

    def add_run(self, run):
        """Register ``run`` for execution (ignored by this base class)."""
        return None

    def start(self):
        """Execute the registered runs (does nothing in this base class)."""
        return None

class LocalSimpleRuntime(Runtime):
    """Runtime that executes its runs one at a time, in the order added."""

    def __init__(self):
        # Runs waiting to be executed and runs whose experiment has returned.
        self.complete = []
        self.runnable = []

    def add_run(self, run):
        """Queue ``run`` so that :meth:`start` executes it."""
        self.runnable.append(run)

    def start(self):
        """Execute every queued run sequentially and write out its output."""
        for queued in self.runnable:
            self._execute(queued)

    def _execute(self, run):
        """Run one experiment locally, record it, then dump its output."""
        run.output = exp.run_exp_local(run.experiment, run.env)
        # The run counts as complete as soon as the experiment returned,
        # i.e. before its output file has been written.
        self.complete.append(run)

        with open(run.outpath, 'w') as out_file:
            out_file.write(run.output.dumps())
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114

class LocalParallelRuntime(Runtime):
    """Runtime that executes runs concurrently within a resource budget.

    Runs are started in the order they were added. A run is only started
    once the cores and memory it requests (``resreq_cores()`` /
    ``resreq_mem()`` of its experiment) fit into what is still unused of
    the configured budget; otherwise the runtime waits for running jobs to
    finish first.
    """

    def __init__(self, cores, mem=None):
        """Create a runtime with the given resource budget.

        Args:
            cores: total number of cores runs may occupy at once; ``None``
                disables the core limit.
            mem: total memory runs may occupy at once, in whatever unit
                ``resreq_mem()`` reports; ``None`` disables the limit.
        """
        self.runnable = []
        self.complete = []
        self.cores = cores
        self.mem = mem
        # Accounting state; reset at the beginning of every do_start().
        self.cores_used = 0
        self.mem_used = 0
        self.pending_jobs = set()

    def add_run(self, run):
        """Queue ``run`` for execution.

        Raises:
            Exception: if the run requests more cores or memory than the
                whole budget, i.e. it could never be scheduled.
        """
        experiment = run.experiment

        # A limit of None means "unlimited", matching enough_resources().
        if self.cores is not None and experiment.resreq_cores() > self.cores:
            raise Exception('Not enough cores available for run')

        if self.mem is not None and experiment.resreq_mem() > self.mem:
            raise Exception('Not enough memory available for run')

        self.runnable.append(run)

    async def do_run(self, run):
        ''' actually starts a run '''
        await run.experiment.prepare(run.env)
        print('starting run ', run)
        run.output = await run.experiment.run(run.env)
        with open(run.outpath, 'w') as f:
            f.write(run.output.dumps())
        print('finished run ', run)
        return run

    async def wait_completion(self):
        ''' wait for any run to terminate and return '''
        assert self.pending_jobs

        done, self.pending_jobs = await asyncio.wait(
            self.pending_jobs, return_when=asyncio.FIRST_COMPLETED)

        for task in done:
            # The task is already finished; awaiting it yields the Run that
            # do_run() returned (or re-raises the exception of the run).
            finished = await task
            self.complete.append(finished)
            # Hand the resources of the finished run back to the budget.
            self.cores_used -= finished.experiment.resreq_cores()
            self.mem_used -= finished.experiment.resreq_mem()

    def enough_resources(self, run):
        ''' check if enough cores and mem are available for the run '''
        # Not named `exp`, which is the module alias used elsewhere in the
        # file.
        experiment = run.experiment

        if self.cores is not None:
            free_cores = self.cores - self.cores_used
            enough_cores = free_cores >= experiment.resreq_cores()
        else:
            enough_cores = True

        if self.mem is not None:
            # Memory is checked against memory usage and the memory request,
            # independently of the core accounting above.
            free_mem = self.mem - self.mem_used
            enough_mem = free_mem >= experiment.resreq_mem()
        else:
            enough_mem = True

        return enough_cores and enough_mem

    async def do_start(self):
        """Schedule all queued runs and wait until every one has finished."""
        self.cores_used = 0
        self.mem_used = 0
        self.pending_jobs = set()

        for run in self.runnable:
            # check if we first have to wait for memory or cores
            while not self.enough_resources(run):
                print('waiting for resources')
                await self.wait_completion()

            self.cores_used += run.experiment.resreq_cores()
            self.mem_used += run.experiment.resreq_mem()

            # Wrap the coroutine in a Task: asyncio.wait() only accepts
            # tasks/futures, and a task starts executing immediately instead
            # of only once somebody waits on it.
            job = asyncio.create_task(self.do_run(run))
            self.pending_jobs.add(job)

        # wait for all runs to finish
        while self.pending_jobs:
            await self.wait_completion()

    def start(self):
        """Run the scheduler to completion in a fresh asyncio event loop."""
        asyncio.run(self.do_start())