Commit 32fd378d authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

experiments: add support for dependency (for checkpoints)

parent 6d803cca
......@@ -7,12 +7,13 @@ import re
import modes.experiments as exp
class Run(object):
def __init__(self, experiment, index, env, outpath):
def __init__(self, experiment, index, env, outpath, prereq=None):
self.experiment = experiment
self.index = index
self.env = env
self.outpath = outpath
self.output = None
self.prereq = prereq
def name(self):
return self.experiment.name + '.' + str(self.index)
......@@ -46,8 +47,9 @@ class LocalSimpleRuntime(Runtime):
class LocalParallelRuntime(Runtime):
def __init__(self, cores, mem=None, verbose=False):
self.runnable = []
self.complete = []
self.runs_noprereq = []
self.runs_prereq = []
self.complete = set()
self.cores = cores
self.mem = mem
self.verbose = verbose
......@@ -59,7 +61,10 @@ class LocalParallelRuntime(Runtime):
if self.mem is not None and run.experiment.resreq_mem() > self.mem:
raise Exception('Not enough memory available for run')
self.runnable.append(run)
if run.prereq is None:
self.runs_noprereq.append(run)
else:
self.runs_prereq.append(run)
async def do_run(self, run):
''' actually starts a run '''
......@@ -80,7 +85,7 @@ class LocalParallelRuntime(Runtime):
for run in done:
run = await run
self.complete.append(run)
self.complete.add(run)
self.cores_used -= run.experiment.resreq_cores()
self.mem_used -= run.experiment.resreq_mem()
......@@ -100,18 +105,30 @@ class LocalParallelRuntime(Runtime):
return enough_cores and enough_mem
def prereq_ready(self, run):
if run.prereq is None:
return True
return run.prereq in self.complete
async def do_start(self):
#self.completions = asyncio.Queue()
self.cores_used = 0
self.mem_used = 0
self.pending_jobs = set()
for run in self.runnable:
runs = self.runs_noprereq + self.runs_prereq
for run in runs:
# check if we first have to wait for memory or cores
while not self.enough_resources(run):
while not self.enough_resources(run) or not self.prereq_ready(run):
print('waiting for resources')
await self.wait_completion()
# check if we first have to wait for memory or cores
while not self.prereq_ready(run):
print('waiting for prereq')
await self.wait_completion()
self.cores_used += run.experiment.resreq_cores()
self.mem_used += run.experiment.resreq_mem()
......@@ -177,9 +194,15 @@ class SlurmRuntime(Runtime):
jid_re = re.compile(r'Submitted batch job ([0-9]+)')
for run in self.runnable:
if run.prereq is None:
dep_cmd = ''
else:
dep_cmd = '--dependency=afterok:' + str(run.prereq.job_id)
script = self.prep_run(run)
stream = os.popen('sbatch ' + script)
cmd = 'sbatch ' + script
stream = os.popen('sbatch %s %s' % (dep_cmd, script))
output = stream.read()
result = stream.close()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment