runtime.py 3.46 KB
Newer Older
1
2
import asyncio

3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import modes.experiments as exp

class Run(object):
    """A single request to execute one experiment in one environment.

    ``outpath`` is where a runtime writes the serialized result, and
    ``output`` starts empty and is assigned by the runtime that executes
    the run.
    """

    def __init__(self, experiment, env, outpath):
        self.experiment, self.env, self.outpath = experiment, env, outpath
        # Filled in by a runtime once the experiment has been executed.
        self.output = None

class Runtime(object):
    """Common interface for runtimes that execute ``Run`` objects.

    Concrete runtimes override both hooks; the base versions do nothing.
    """

    def add_run(self, run):
        """Queue *run* for later execution (no-op in the base class)."""

    def start(self):
        """Execute every queued run (no-op in the base class)."""

19

20
class LocalSimpleRuntime(Runtime):
    """Runtime that executes queued runs sequentially via ``exp.run_exp_local``."""

    def __init__(self, verbose=False):
        # Runs waiting to execute, and runs that have finished executing.
        self.runnable = []
        self.complete = []
        # Forwarded as ``verbose=`` to exp.run_exp_local for every run.
        self.verbose = verbose

    def add_run(self, run):
        """Append *run* to the execution queue."""
        self.runnable += [run]

    def start(self):
        """Execute every queued run, one at a time, in queue order.

        Each run's result is stored on ``run.output``; the run is then
        recorded in ``complete`` and ``output.dumps()`` is written to
        ``run.outpath``.
        """
        for queued in self.runnable:
            result = exp.run_exp_local(queued.experiment, queued.env,
                                       verbose=self.verbose)
            queued.output = result
            self.complete.append(queued)

            with open(queued.outpath, 'w') as out_file:
                out_file.write(queued.output.dumps())
37

38

39
class LocalParallelRuntime(Runtime):
    """Runtime that executes runs concurrently on the local machine.

    Runs are started in queue order as soon as enough of the configured core
    and memory budget is free; otherwise scheduling waits for an earlier run
    to finish and release its resources.
    """

    def __init__(self, cores, mem=None, verbose=False):
        """
        Args:
            cores: total cores concurrent runs may use; ``None`` disables
                the core limit.
            mem: total memory concurrent runs may use, in the same unit as
                ``experiment.resreq_mem()``; ``None`` disables the limit.
            verbose: forwarded to ``experiment.prepare`` and
                ``experiment.run``.
        """
        self.runnable = []
        self.complete = []
        self.cores = cores
        self.mem = mem
        self.verbose = verbose

    def add_run(self, run):
        """Queue *run*, rejecting it if it can never fit in the budget.

        Raises:
            Exception: if the run alone needs more cores or memory than the
                whole runtime has; such a run could never be scheduled.
        """
        # Guard on None to match enough_resources(), where None = unlimited.
        if self.cores is not None and \
                run.experiment.resreq_cores() > self.cores:
            raise Exception('Not enough cores available for run')

        if self.mem is not None and run.experiment.resreq_mem() > self.mem:
            raise Exception('Not enough memory available for run')

        self.runnable.append(run)

    async def do_run(self, run):
        ''' actually starts a run '''
        await run.experiment.prepare(run.env, verbose=self.verbose)
        print('starting run ', run)
        run.output = await run.experiment.run(run.env, verbose=self.verbose)

        with open(run.outpath, 'w') as f:
            f.write(run.output.dumps())
        print('finished run ', run)
        return run

    async def wait_completion(self):
        ''' wait for any run to terminate and return '''
        assert self.pending_jobs

        done, self.pending_jobs = await asyncio.wait(
            self.pending_jobs, return_when=asyncio.FIRST_COMPLETED)

        for task in done:
            # result() re-raises if the run failed, like awaiting the task.
            run = task.result()
            self.complete.append(run)
            # Release the resources reserved for this run in do_start().
            self.cores_used -= run.experiment.resreq_cores()
            self.mem_used -= run.experiment.resreq_mem()

    def enough_resources(self, run):
        ''' check if enough cores and mem are available for the run '''
        # Named `experiment` to avoid shadowing the module-level `exp`.
        experiment = run.experiment

        if self.cores is not None:
            if (self.cores - self.cores_used) < experiment.resreq_cores():
                return False

        if self.mem is not None:
            # Memory headroom must be computed from memory, not core, usage.
            if (self.mem - self.mem_used) < experiment.resreq_mem():
                return False

        return True

    async def do_start(self):
        """Schedule all queued runs within budget and wait for all of them."""
        self.cores_used = 0
        self.mem_used = 0
        self.pending_jobs = set()

        for run in self.runnable:
            # check if we first have to wait for memory or cores
            while not self.enough_resources(run):
                print('waiting for resources')
                await self.wait_completion()

            self.cores_used += run.experiment.resreq_cores()
            self.mem_used += run.experiment.resreq_mem()

            # Wrap in a Task: asyncio.wait() rejects bare coroutines on
            # Python 3.11+ and deprecated them from 3.8.
            self.pending_jobs.add(asyncio.create_task(self.do_run(run)))

        # wait for all runs to finish
        while self.pending_jobs:
            await self.wait_completion()

    def start(self):
        """Run the scheduler to completion on a fresh event loop."""
        asyncio.run(self.do_start())