Commit 013f612a authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

experiments: more parallelism when starting experiments

Especially important for large scale un-synchronized experiments
parent 35fa9495
......@@ -281,6 +281,12 @@ class Executor(object):
await cmdC.start()
await cmdC.wait()
async def await_files(self, paths, delay=0.05, verbose=False):
xs = []
for p in paths:
xs.append(self.await_file(p, delay=delay, verbose=verbose))
await asyncio.wait(xs)
class LocalExecutor(Executor):
def create_component(self, label, parts, **kwargs):
return SimpleComponent(label, parts, **kwargs)
......
......@@ -22,6 +22,7 @@
import os
import asyncio
from collections import defaultdict
import simbricks.exectools as exectools
import shlex
import time
......@@ -147,12 +148,14 @@ class ExperimentBaseRunner(object):
async def prepare(self):
# generate config tars
copies = []
for host in self.exp.hosts:
path = self.env.cfgtar_path(host)
if self.verbose:
print('preparing config tar:', path)
host.node_config.make_tar(path)
await self.sim_executor(host).send_file(path, self.verbose)
copies.append(self.sim_executor(host).send_file(path, self.verbose))
await asyncio.wait(copies)
# prepare all simulators in parallel
sims = []
......@@ -167,6 +170,7 @@ class ExperimentBaseRunner(object):
""" Start all NIC simulators. """
if self.verbose:
print('%s: starting NICS' % self.exp.name)
starts = []
for nic in self.exp.nics:
if self.verbose:
print('start NIC:', nic.run_cmd(self.env))
......@@ -174,18 +178,22 @@ class ExperimentBaseRunner(object):
sc = exec.create_component(nic.full_name(),
shlex.split(nic.run_cmd(self.env)), verbose=self.verbose,
canfail=True)
await sc.start()
starts.append(sc.start())
self.running.append((nic, sc))
self.sockets.append((exec, self.env.nic_pci_path(nic)))
self.sockets.append((exec, self.env.nic_eth_path(nic)))
self.sockets.append((exec, self.env.nic_shm_path(nic)))
await asyncio.wait(starts)
# Wait till all NIC sockets exist
if self.verbose:
print('%s: waiting for sockets' % self.exp.name)
byexec = defaultdict(lambda: [])
for (exec, s) in self.sockets:
await exec.await_file(s, verbose=self.verbose)
byexec[exec].append(s)
for (exec, ss) in byexec.items():
await exec.await_files(ss, verbose=self.verbose)
# just a bit of a safety delay
await asyncio.sleep(0.5)
......@@ -194,6 +202,7 @@ class ExperimentBaseRunner(object):
""" Start all network simulators (typically one). """
if self.verbose:
print('%s: starting networks' % self.exp.name)
starts = []
for net in self.exp.networks:
if self.verbose:
print('start Net:', net.run_cmd(self.env))
......@@ -202,13 +211,15 @@ class ExperimentBaseRunner(object):
sc = exec.create_component(net.full_name(),
shlex.split(net.run_cmd(self.env)), verbose=self.verbose,
canfail=True)
await sc.start()
starts.append(sc.start())
self.running.append((net, sc))
await asyncio.wait(starts)
async def run_hosts(self):
""" Start all host simulators. """
if self.verbose:
print('%s: starting hosts' % self.exp.name)
starts = []
for host in self.exp.hosts:
if self.verbose:
print('start Host:', host.run_cmd(self.env))
......@@ -217,7 +228,7 @@ class ExperimentBaseRunner(object):
sc = exec.create_component(host.full_name(),
shlex.split(host.run_cmd(self.env)), verbose=self.verbose,
canfail=True)
await sc.start()
starts.append(sc.start())
self.running.append((host,sc))
if host.wait:
......@@ -225,6 +236,7 @@ class ExperimentBaseRunner(object):
if host.sleep > 0:
await asyncio.sleep(host.sleep)
await asyncio.wait(starts)
async def wait_for_hosts(self):
""" Wait for hosts to terminate (the ones marked to wait on). """
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment