Commit 9031dc48 authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

experiments: add parallel runtime

parent 8be8b838
import asyncio
import modes.experiments as exp
class Run(object):
......@@ -29,3 +31,84 @@ class LocalSimpleRuntime(Runtime):
with open(run.outpath, 'w') as f:
f.write(run.output.dumps())
class LocalParallelRuntime(Runtime):
def __init__(self, cores, mem=None):
self.runnable = []
self.complete = []
self.cores = cores
self.mem = mem
def add_run(self, run):
if run.experiment.resreq_cores() > self.cores:
raise Exception('Not enough cores available for run')
if self.mem is not None and run.experiment.resreq_mem() > self.mem:
raise Exception('Not enough memory available for run')
self.runnable.append(run)
async def do_run(self, run):
''' actually starts a run '''
await run.experiment.prepare(run.env)
print('starting run ', run)
run.output = await run.experiment.run(run.env)
with open(run.outpath, 'w') as f:
f.write(run.output.dumps())
print('finished run ', run)
return run
#await self.completions.put(run)
async def wait_completion(self):
''' wait for any run to terminate and return '''
assert self.pending_jobs
done, self.pending_jobs = await asyncio.wait(self.pending_jobs,
return_when=asyncio.FIRST_COMPLETED)
for run in done:
run = await run
self.complete.append(run)
self.cores_used -= run.experiment.resreq_cores()
self.mem_used -= run.experiment.resreq_mem()
def enough_resources(self, run):
''' check if enough cores and mem are available for the run '''
exp = run.experiment
if self.cores is not None:
enough_cores = (self.cores - self.cores_used) >= exp.resreq_cores()
else:
enough_cores = True
if self.mem is not None:
enough_mem = (self.mem - self.cores_used) >= exp.resreq_cores()
else:
enough_mem = True
return enough_cores and enough_mem
async def do_start(self):
#self.completions = asyncio.Queue()
self.cores_used = 0
self.mem_used = 0
self.pending_jobs = set()
for run in self.runnable:
# check if we first have to wait for memory or cores
while not self.enough_resources(run):
print('waiting for resources')
await self.wait_completion()
self.cores_used += run.experiment.resreq_cores()
self.mem_used += run.experiment.resreq_mem()
job = self.do_run(run)
self.pending_jobs.add(job)
# wait for all runs to finish
while self.pending_jobs:
await self.wait_completion()
def start(self):
asyncio.run(self.do_start())
......@@ -15,14 +15,26 @@ parser = argparse.ArgumentParser()
parser.add_argument('experiments', metavar='EXP', type=str, nargs='+',
help='An experiment file to run')
parser.add_argument('--runs', metavar='N', type=int, default=1,
help='Number of runs')
parser.add_argument('--repo', metavar='DIR', type=str,
help='Number of repetition for each experiment')
g_env = parser.add_argument_group('Environment')
g_env.add_argument('--repo', metavar='DIR', type=str,
default='..', help='Repo directory')
parser.add_argument('--workdir', metavar='DIR', type=str,
g_env.add_argument('--workdir', metavar='DIR', type=str,
default='./out/', help='Work directory base')
parser.add_argument('--outdir', metavar='DIR', type=str,
g_env.add_argument('--outdir', metavar='DIR', type=str,
default='./out/', help='Output directory base')
g_par = parser.add_argument_group('Parallel Runtime')
g_par.add_argument('--parallel', dest='runtime', action='store_const',
const='parallel', default='sequential',
help='Use parallel instead of sequential runtime')
g_par.add_argument('--cores', metavar='N', type=int,
default=len(os.sched_getaffinity(0)),
help='Number of cores to use for parallel runs')
g_par.add_argument('--mem', metavar='N', type=int, default=None,
help='Memory limit for parallel runs (in MB)')
args = parser.parse_args()
experiments = []
......@@ -41,7 +53,10 @@ for path in args.experiments:
mkdir_if_not_exists(args.workdir)
mkdir_if_not_exists(args.outdir)
runtime = runtime.LocalSimpleRuntime()
if args.runtime == 'parallel':
rt = runtime.LocalParallelRuntime(cores=args.cores, mem=args.mem)
else:
rt = runtime.LocalSimpleRuntime()
for e in experiments:
workdir_base = '%s/%s' % (args.workdir, e.name)
......@@ -57,6 +72,6 @@ for e in experiments:
mkdir_if_not_exists(workdir)
env = exp.ExpEnv(args.repo, workdir)
runtime.add_run(Run(e, env, outpath))
rt.add_run(runtime.Run(e, env, outpath))
runtime.start()
rt.start()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment