"...composable_kernel.git" did not exist on "39430bfdeb7247f0fdf4ba07559619ab1ab9a415"
Commit 4b2fde4d authored by Antoine Kaufmann
Browse files

experiments: add ExperimentRunner instead of logic in Experiment class

Again preparation for distributed experiments.
parent a8fd4999
...@@ -59,133 +59,204 @@ class Experiment(object): ...@@ -59,133 +59,204 @@ class Experiment(object):
raise Exception('Duplicate net name') raise Exception('Duplicate net name')
self.networks.append(sim) self.networks.append(sim)
async def prepare(self, env, verbose=False, exec=exectools.LocalExecutor()): def resreq_mem(self):
mem = 0
for h in self.hosts:
mem += h.resreq_mem()
for n in self.nics:
mem += n.resreq_mem()
for n in self.networks:
mem += n.resreq_mem()
return mem
def resreq_cores(self):
    """ Return the total number of cores requested by all simulators.

    Adds up ``resreq_cores()`` of every entry in ``self.hosts``,
    ``self.nics`` and ``self.networks``, in that order. Returns 0 when
    all three lists are empty.
    """
    groups = (self.hosts, self.nics, self.networks)
    # Flatten first so the values are added in the same order as three
    # consecutive loops would add them.
    return sum(sim.resreq_cores() for group in groups for sim in group)
class ExperimentBaseRunner(object):
def __init__(self, exp, env, verbose):
    """ Set up the bookkeeping state for running one experiment.

    Args:
        exp: the experiment to run; its ``hosts``, ``nics``,
            ``networks`` and ``name`` are read by the runner methods.
        env: environment object handed to the simulators when their
            commands and paths are built.
        verbose: when true, the runner prints progress messages.
    """
    self.exp = exp
    self.env = env
    self.verbose = verbose
    # Output record for this run, created from the experiment.
    self.out = ExpOutput(exp)
    # (simulator, component) pairs for every component started so far.
    self.running = []
    # (executor, path) pairs for the NIC sockets to wait for / remove.
    self.sockets = []
    # Components of the hosts that are marked with ``wait``.
    self.wait_hosts = []
def sim_executor(self, sim):
    """ Return the executor that should be used for simulator `sim`.

    The base class has no executor; subclasses must override this.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError("Please implement this method")
async def before_nics(self):
    """ Hook awaited by run() right before the NIC simulators start.

    Does nothing by default; subclasses may override it.
    """
async def before_nets(self):
    """ Hook awaited by run() right before the network simulators start.

    Does nothing by default; subclasses may override it.
    """
async def before_hosts(self):
    """ Hook awaited by run() right before the host simulators start.

    Does nothing by default; subclasses may override it.
    """
async def before_wait(self):
    """ Hook awaited by run() right before waiting for hosts to finish.

    Does nothing by default; subclasses may override it.
    """
async def before_cleanup(self):
    """ Hook awaited by run() at the start of cleanup.

    It is awaited before the running components are interrupted.
    Does nothing by default; subclasses may override it.
    """
async def after_cleanup(self):
    """ Hook awaited by run() at the very end of cleanup.

    It is awaited after all simulator components have been added to
    the output. Does nothing by default; subclasses may override it.
    """
async def prepare(self):
# generate config tars # generate config tars
for host in self.hosts: for host in self.exp.hosts:
path = env.cfgtar_path(host) path = self.env.cfgtar_path(host)
if verbose: if self.verbose:
print('preparing config tar:', path) print('preparing config tar:', path)
host.node_config.make_tar(path) host.node_config.make_tar(path)
await exec.send_file(path, verbose) await self.sim_executor(host).send_file(path, self.verbose)
# prepare all simulators in parallel # prepare all simulators in parallel
sims = [] sims = []
for sim in self.hosts + self.nics + self.networks: for sim in self.exp.hosts + self.exp.nics + self.exp.networks:
prep_cmds = [pc for pc in sim.prep_cmds(env)] prep_cmds = [pc for pc in sim.prep_cmds(self.env)]
sims.append(exec.run_cmdlist('prepare_' + self.name, prep_cmds, exec = self.sim_executor(sim)
verbose=verbose)) sims.append(exec.run_cmdlist('prepare_' + self.exp.name, prep_cmds,
verbose=self.verbose))
await asyncio.wait(sims) await asyncio.wait(sims)
async def run(self, env, verbose=False, exec=exectools.LocalExecutor()): async def run_nics(self):
running = [] """ Start all NIC simulators. """
sockets = [] if self.verbose:
out = ExpOutput(self) print('%s: starting NICS' % self.exp.name)
for nic in self.exp.nics:
if self.verbose:
print('start NIC:', nic.run_cmd(self.env))
exec = self.sim_executor(nic)
sc = exec.create_component(nic.full_name(),
shlex.split(nic.run_cmd(self.env)), verbose=self.verbose,
canfail=True)
await sc.start()
self.running.append((nic, sc))
self.sockets.append((exec, self.env.nic_pci_path(nic)))
self.sockets.append((exec, self.env.nic_eth_path(nic)))
self.sockets.append((exec, self.env.nic_shm_path(nic)))
# Wait till all NIC sockets exist
if self.verbose:
print('%s: waiting for sockets' % self.exp.name)
for (exec, s) in self.sockets:
await exec.await_file(s, verbose=self.verbose)
# just a bit of a safety delay
await asyncio.sleep(0.5)
async def run_nets(self):
    """ Start all network simulators (typically one).

    For each entry of ``self.exp.networks`` a component is created on
    the executor returned by ``self.sim_executor(net)``, started, and
    recorded in ``self.running`` as a ``(net, component)`` pair.
    """
    if self.verbose:
        print('%s: starting networks' % self.exp.name)

    for net in self.exp.networks:
        # Build the command line once; it is used for both the log
        # message and the component arguments.
        cmd = net.run_cmd(self.env)
        if self.verbose:
            print('start Net:', cmd)

        # Named `executor` rather than `exec` to avoid shadowing the builtin.
        executor = self.sim_executor(net)
        sc = executor.create_component(net.full_name(), shlex.split(cmd),
                                       verbose=self.verbose, canfail=True)
        await sc.start()
        self.running.append((net, sc))
async def run_hosts(self):
    """ Start all host simulators.

    For each entry of ``self.exp.hosts`` a component is created on the
    executor returned by ``self.sim_executor(host)``, started, and
    recorded in ``self.running``. Components of hosts whose ``wait``
    attribute is true are also added to ``self.wait_hosts``. If a
    host's ``sleep`` attribute is positive, the runner sleeps that
    long before starting the next host.
    """
    if self.verbose:
        print('%s: starting hosts' % self.exp.name)

    for host in self.exp.hosts:
        # Build the command line once; it is used for both the log
        # message and the component arguments.
        cmd = host.run_cmd(self.env)
        if self.verbose:
            print('start Host:', cmd)

        # Named `executor` rather than `exec` to avoid shadowing the builtin.
        executor = self.sim_executor(host)
        sc = executor.create_component(host.full_name(), shlex.split(cmd),
                                       verbose=self.verbose, canfail=True)
        await sc.start()
        self.running.append((host, sc))

        if host.wait:
            self.wait_hosts.append(sc)

        if host.sleep > 0:
            await asyncio.sleep(host.sleep)
async def wait_for_hosts(self):
    """ Wait for hosts to terminate (the ones marked to wait on).

    Awaits ``wait()`` on every component in ``self.wait_hosts``, one
    after the other, in list order.
    """
    if self.verbose:
        print('%s: waiting for hosts to terminate' % self.exp.name)

    for sc in self.wait_hosts:
        await sc.wait()
async def run(self):
try: try:
out.set_start() self.out.set_start()
if verbose: await self.before_nics()
print('%s: starting NICS' % self.name) await self.run_nics()
for nic in self.nics:
if verbose: await self.before_nets()
print('start NIC:', nic.run_cmd(env)) await self.run_nets()
sc = exec.create_component(nic.full_name(),
shlex.split(nic.run_cmd(env)), verbose=verbose, await self.before_hosts()
canfail=True) await self.run_hosts()
await sc.start()
running.append((nic, sc)) await self.before_wait()
await self.wait_for_hosts()
sockets.append(env.nic_pci_path(nic))
sockets.append(env.nic_eth_path(nic))
sockets.append(env.nic_shm_path(nic))
if verbose:
print('%s: waiting for sockets' % self.name)
for s in sockets:
await exec.await_file(s, verbose=verbose)
await asyncio.sleep(0.5)
# start networks
for net in self.networks:
if verbose:
print('start Net:', net.run_cmd(env))
sc = exec.create_component(net.full_name(),
shlex.split(net.run_cmd(env)), verbose=verbose,
canfail=True)
await sc.start()
running.append((net, sc))
# start hosts
wait_hosts = []
for host in self.hosts:
if verbose:
print('start Host:', host.run_cmd(env))
sc = exec.create_component(host.full_name(),
shlex.split(host.run_cmd(env)), verbose=verbose,
canfail=True)
await sc.start()
running.append((host,sc))
if host.wait:
wait_hosts.append(sc)
if host.sleep > 0:
await asyncio.sleep(host.sleep)
if verbose:
print('%s: waiting for hosts to terminate' % self.name)
for sc in wait_hosts:
await sc.wait()
# wait for necessary hosts to terminate
except: except:
out.set_failed() self.out.set_failed()
traceback.print_exc() traceback.print_exc()
finally: finally:
out.set_end() self.out.set_end()
# shut things back down # shut things back down
if verbose: if self.verbose:
print('%s: cleaning up' % self.name) print('%s: cleaning up' % self.exp.name)
await self.before_cleanup()
# "interrupt, terminate, kill" all processes
scs = [] scs = []
for _,sc in running: for _,sc in self.running:
scs.append(sc.int_term_kill()) scs.append(sc.int_term_kill())
await asyncio.wait(scs) await asyncio.wait(scs)
for _,sc in running: # wait for all processes to terminate
for _,sc in self.running:
await sc.wait() await sc.wait()
for sock in sockets: # remove all sockets
for (exec,sock) in self.sockets:
await exec.rmtree(sock) await exec.rmtree(sock)
for sim,sc in running: # add all simulator components to the output
out.add_sim(sim, sc) for sim,sc in self.running:
return out self.out.add_sim(sim, sc)
await self.after_cleanup()
return self.out
class ExperimentSimpleRunner(ExperimentBaseRunner):
""" Simple experiment runner with just one executor. """
def __init__(self, exec, *args, **kwargs):
    """ Create a runner that uses one executor for every simulator.

    Args:
        exec: the executor to store on the runner as ``self.exec``.
        *args, **kwargs: forwarded unchanged to the base class
            constructor.
    """
    # The parameter keeps its name `exec` so existing callers that pass
    # it by keyword continue to work.
    self.exec = exec
    super().__init__(*args, **kwargs)
def resreq_mem(self): def sim_executor(self, sim):
mem = 0 return self.exec
for h in self.hosts:
mem += h.resreq_mem()
for n in self.nics:
mem += n.resreq_mem()
for n in self.networks:
mem += n.resreq_mem()
return mem
def resreq_cores(self):
cores = 0
for h in self.hosts:
cores += h.resreq_cores()
for n in self.nics:
cores += n.resreq_cores()
for n in self.networks:
cores += n.resreq_cores()
return cores
class ExpEnv(object): class ExpEnv(object):
def __init__(self, repo_path, workdir, cpdir): def __init__(self, repo_path, workdir, cpdir):
......
...@@ -38,11 +38,11 @@ class LocalSimpleRuntime(Runtime): ...@@ -38,11 +38,11 @@ class LocalSimpleRuntime(Runtime):
self.runnable.append(run) self.runnable.append(run)
async def do_run(self, run): async def do_run(self, run):
runner = exp.ExperimentSimpleRunner(self.exec, run.experiment, run.env,
self.verbose)
await run.prep_dirs(self.exec) await run.prep_dirs(self.exec)
await run.experiment.prepare(run.env, verbose=self.verbose, await runner.prepare()
exec=self.exec) run.output = await runner.run()
run.output = await run.experiment.run(run.env, verbose=self.verbose,
exec=self.exec)
self.complete.append(run) self.complete.append(run)
pathlib.Path(run.outpath).parent.mkdir(parents=True, exist_ok=True) pathlib.Path(run.outpath).parent.mkdir(parents=True, exist_ok=True)
...@@ -79,12 +79,12 @@ class LocalParallelRuntime(Runtime): ...@@ -79,12 +79,12 @@ class LocalParallelRuntime(Runtime):
async def do_run(self, run): async def do_run(self, run):
''' actually starts a run ''' ''' actually starts a run '''
runner = exp.ExperimentSimpleRunner(self.exec, run.experiment, run.env,
self.verbose)
await run.prep_dirs(exec=self.exec) await run.prep_dirs(exec=self.exec)
await run.experiment.prepare(run.env, verbose=self.verbose, await runner.prepare()
exec=self.exec)
print('starting run ', run.name()) print('starting run ', run.name())
run.output = await run.experiment.run(run.env, verbose=self.verbose, run.output = await runner.run()
exec=self.exec)
pathlib.Path(run.outpath).parent.mkdir(parents=True, exist_ok=True) pathlib.Path(run.outpath).parent.mkdir(parents=True, exist_ok=True)
with open(run.outpath, 'w') as f: with open(run.outpath, 'w') as f:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment