Commit 175ae3ab authored by Jonas Kaufmann, committed by Antoine Kaufmann
Browse files

more doc strings and type annotations

- adds description for netperf experiment
- reformat local.py
- reformat results/netperf.py
parent 6c0c2d58
...@@ -19,6 +19,10 @@ ...@@ -19,6 +19,10 @@
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, # CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Experiment, which simulates two hosts, one running a netperf server and the
other a netperf client, in order to measure latency and throughput between
them. The goal is to compare different simulators for the host, NIC, and
network in terms of simulated network throughput and latency."""
import simbricks.experiments as exp import simbricks.experiments as exp
import simbricks.simulators as sim import simbricks.simulators as sim
......
...@@ -28,6 +28,8 @@ import importlib.util ...@@ -28,6 +28,8 @@ import importlib.util
import json import json
import pickle import pickle
import fnmatch import fnmatch
import typing as tp
import simbricks.exectools as exectools import simbricks.exectools as exectools
import simbricks.experiments as exp import simbricks.experiments as exp
import simbricks.runtime as runtime import simbricks.runtime as runtime
...@@ -141,7 +143,11 @@ else: ...@@ -141,7 +143,11 @@ else:
warn_multi_exec() warn_multi_exec()
rt = runtime.LocalSimpleRuntime(verbose=args.verbose, exec=executors[0]) rt = runtime.LocalSimpleRuntime(verbose=args.verbose, exec=executors[0])
def add_exp(e, run, prereq, create_cp, restore_cp, no_simbricks):
def add_exp(
e: exp.Experiment, run: int, prereq: tp.Optional[runtime.Run],
create_cp: bool, restore_cp: bool, no_simbricks: bool
):
outpath = '%s/%s-%d.json' % (args.outdir, e.name, run) outpath = '%s/%s-%d.json' % (args.outdir, e.name, run)
if os.path.exists(outpath) and not args.force: if os.path.exists(outpath) and not args.force:
print('skip %s run %d' % (e.name, run)) print('skip %s run %d' % (e.name, run))
......
...@@ -257,7 +257,7 @@ class SimpleRemoteComponent(SimpleComponent): ...@@ -257,7 +257,7 @@ class SimpleRemoteComponent(SimpleComponent):
class Executor(object): class Executor(object):
ip = None ip = None
def create_component(self, label, parts, **kwargs): def create_component(self, label, parts, **kwargs) -> SimpleComponent:
raise NotImplementedError("Please Implement this method") raise NotImplementedError("Please Implement this method")
async def await_file(self, path, delay=0.05, verbose=False): async def await_file(self, path, delay=0.05, verbose=False):
......
...@@ -23,9 +23,9 @@ ...@@ -23,9 +23,9 @@
import asyncio import asyncio
import pathlib import pathlib
from simbricks.exectools import Executor
from simbricks.runtime.common import * from simbricks.runtime.common import *
import simbricks.experiments as exp import simbricks.experiments as exp
import simbricks.exectools as exectools
import simbricks.proxy as proxy import simbricks.proxy as proxy
class DistributedSimpleRuntime(Runtime): class DistributedSimpleRuntime(Runtime):
...@@ -35,13 +35,13 @@ class DistributedSimpleRuntime(Runtime): ...@@ -35,13 +35,13 @@ class DistributedSimpleRuntime(Runtime):
self.verbose = verbose self.verbose = verbose
self.execs = execs self.execs = execs
def add_run(self, run): def add_run(self, run: Run):
if not isinstance(run.experiment, exp.DistributedExperiment): if not isinstance(run.experiment, exp.DistributedExperiment):
raise RuntimeError('Only distributed experiments supported') raise RuntimeError('Only distributed experiments supported')
self.runnable.append(run) self.runnable.append(run)
async def do_run(self, run): async def do_run(self, run: Run):
runner = exp.ExperimentDistributedRunner(self.execs, run.experiment, runner = exp.ExperimentDistributedRunner(self.execs, run.experiment,
run.env, self.verbose) run.env, self.verbose)
for exec in self.execs: for exec in self.execs:
...@@ -58,7 +58,10 @@ class DistributedSimpleRuntime(Runtime): ...@@ -58,7 +58,10 @@ class DistributedSimpleRuntime(Runtime):
for run in self.runnable: for run in self.runnable:
asyncio.run(self.do_run(run)) asyncio.run(self.do_run(run))
def auto_dist(e, execs, proxy_type='sockets'):
def auto_dist(
e: Experiment, execs: tp.List[Executor], proxy_type: str = 'sockets'
):
""" Converts an Experiment into a DistributedExperiment. Assigns network to """ Converts an Experiment into a DistributedExperiment. Assigns network to
executor zero, and then round-robin assignment of hosts to executors, executor zero, and then round-robin assignment of hosts to executors,
while also assigning all nics for a host to the same executor. while also assigning all nics for a host to the same executor.
......
...@@ -22,24 +22,34 @@ ...@@ -22,24 +22,34 @@
import asyncio import asyncio
import pathlib import pathlib
import typing as tp
from simbricks.runtime.common import * from simbricks.runtime.common import *
import simbricks.experiments as exp import simbricks.experiments as exp
import simbricks.exectools as exectools import simbricks.exectools as exectools
class LocalSimpleRuntime(Runtime): class LocalSimpleRuntime(Runtime):
def __init__(self, verbose=False, exec=exectools.LocalExecutor()): """Execute runs locally in sequence."""
self.runnable = []
self.complete = [] def __init__(
self,
verbose=False,
exec: exectools.Executor = exectools.LocalExecutor()
):
self.runnable: tp.List[Run] = []
self.complete: tp.List[Run] = []
self.verbose = verbose self.verbose = verbose
self.exec = exec self.exec = exec
def add_run(self, run): def add_run(self, run: Run):
self.runnable.append(run) self.runnable.append(run)
async def do_run(self, run): async def do_run(self, run: Run):
runner = exp.ExperimentSimpleRunner(self.exec, run.experiment, run.env, """Actually executes `run`."""
self.verbose) runner = exp.ExperimentSimpleRunner(
self.exec, run.experiment, run.env, self.verbose
)
await run.prep_dirs(self.exec) await run.prep_dirs(self.exec)
await runner.prepare() await runner.prepare()
run.output = await runner.run() run.output = await runner.run()
...@@ -50,22 +60,32 @@ class LocalSimpleRuntime(Runtime): ...@@ -50,22 +60,32 @@ class LocalSimpleRuntime(Runtime):
f.write(run.output.dumps()) f.write(run.output.dumps())
def start(self): def start(self):
"""Execute the runs defined in `self.runnable`."""
for run in self.runnable: for run in self.runnable:
asyncio.run(self.do_run(run)) asyncio.run(self.do_run(run))
class LocalParallelRuntime(Runtime): class LocalParallelRuntime(Runtime):
def __init__(self, cores, mem=None, verbose=False, """Execute runs locally in parallel on multiple cores."""
exec=exectools.LocalExecutor()):
self.runs_noprereq = [] def __init__(
self.runs_prereq = [] self,
cores: int,
mem: tp.Optional[int] = None,
verbose=False,
exec: exectools.Executor = exectools.LocalExecutor()
):
self.runs_noprereq: tp.List[Run] = []
"""Runs with no prerequisite runs."""
self.runs_prereq: tp.List[Run] = []
"""Runs with prerequisite runs."""
self.complete = set() self.complete = set()
self.cores = cores self.cores = cores
self.mem = mem self.mem = mem
self.verbose = verbose self.verbose = verbose
self.exec = exec self.exec = exec
def add_run(self, run): def add_run(self, run: Run):
if run.experiment.resreq_cores() > self.cores: if run.experiment.resreq_cores() > self.cores:
raise Exception('Not enough cores available for run') raise Exception('Not enough cores available for run')
...@@ -77,10 +97,11 @@ class LocalParallelRuntime(Runtime): ...@@ -77,10 +97,11 @@ class LocalParallelRuntime(Runtime):
else: else:
self.runs_prereq.append(run) self.runs_prereq.append(run)
async def do_run(self, run): async def do_run(self, run: Run):
''' actually starts a run ''' """Actually executes `run`."""
runner = exp.ExperimentSimpleRunner(self.exec, run.experiment, run.env, runner = exp.ExperimentSimpleRunner(
self.verbose) self.exec, run.experiment, run.env, self.verbose
)
await run.prep_dirs(exec=self.exec) await run.prep_dirs(exec=self.exec)
await runner.prepare() await runner.prepare()
print('starting run ', run.name()) print('starting run ', run.name())
...@@ -93,11 +114,12 @@ class LocalParallelRuntime(Runtime): ...@@ -93,11 +114,12 @@ class LocalParallelRuntime(Runtime):
return run return run
async def wait_completion(self): async def wait_completion(self):
''' wait for any run to terminate and return ''' """Wait for any run to terminate and return."""
assert self.pending_jobs assert self.pending_jobs
done, self.pending_jobs = await asyncio.wait(self.pending_jobs, done, self.pending_jobs = await asyncio.wait(
return_when=asyncio.FIRST_COMPLETED) self.pending_jobs, return_when=asyncio.FIRST_COMPLETED
)
for run in done: for run in done:
run = await run run = await run
...@@ -105,8 +127,8 @@ class LocalParallelRuntime(Runtime): ...@@ -105,8 +127,8 @@ class LocalParallelRuntime(Runtime):
self.cores_used -= run.experiment.resreq_cores() self.cores_used -= run.experiment.resreq_cores()
self.mem_used -= run.experiment.resreq_mem() self.mem_used -= run.experiment.resreq_mem()
def enough_resources(self, run): def enough_resources(self, run: Run):
''' check if enough cores and mem are available for the run ''' """Check if enough cores and mem are available for the run."""
exp = run.experiment exp = run.experiment
if self.cores is not None: if self.cores is not None:
...@@ -121,13 +143,16 @@ class LocalParallelRuntime(Runtime): ...@@ -121,13 +143,16 @@ class LocalParallelRuntime(Runtime):
return enough_cores and enough_mem return enough_cores and enough_mem
def prereq_ready(self, run): def prereq_ready(self, run: Run):
"""Check if the prerequisite run for `run` has completed."""
if run.prereq is None: if run.prereq is None:
return True return True
return run.prereq in self.complete return run.prereq in self.complete
async def do_start(self): async def do_start(self):
"""Asynchronously execute the runs defined in `self.runs_noprereq +
self.runs_prereq`."""
#self.completions = asyncio.Queue() #self.completions = asyncio.Queue()
self.cores_used = 0 self.cores_used = 0
self.mem_used = 0 self.mem_used = 0
...@@ -156,4 +181,5 @@ class LocalParallelRuntime(Runtime): ...@@ -156,4 +181,5 @@ class LocalParallelRuntime(Runtime):
await self.wait_completion() await self.wait_completion()
def start(self): def start(self):
"""Execute all defined runs."""
asyncio.run(self.do_start()) asyncio.run(self.do_start())
...@@ -35,10 +35,10 @@ class SlurmRuntime(Runtime): ...@@ -35,10 +35,10 @@ class SlurmRuntime(Runtime):
self.verbose = verbose self.verbose = verbose
self.cleanup = cleanup self.cleanup = cleanup
def add_run(self, run): def add_run(self, run: Run):
self.runnable.append(run) self.runnable.append(run)
def prep_run(self, run): def prep_run(self, run: Run):
exp = run.experiment exp = run.experiment
e_idx = exp.name + f'-{run.index}' + '.exp' e_idx = exp.name + f'-{run.index}' + '.exp'
exp_path = os.path.join(self.slurmdir, e_idx) exp_path = os.path.join(self.slurmdir, e_idx)
......
...@@ -24,6 +24,7 @@ import math ...@@ -24,6 +24,7 @@ import math
import typing as tp import typing as tp
from simbricks.nodeconfig import NodeConfig from simbricks.nodeconfig import NodeConfig
from simbricks.experiment.experiment_environment import ExpEnv
class Simulator(object): class Simulator(object):
...@@ -32,30 +33,32 @@ class Simulator(object): ...@@ -32,30 +33,32 @@ class Simulator(object):
def __init__(self): def __init__(self):
self.extra_deps = [] self.extra_deps = []
# number of cores required for this simulator
def resreq_cores(self): def resreq_cores(self):
"""Number of cores required for this simulator."""
return 1 return 1
# memory required for this simulator (in MB)
def resreq_mem(self): def resreq_mem(self):
"""Memory required for this simulator (in MB)."""
return 64 return 64
def prep_cmds(self, env): def prep_cmds(self, env: ExpEnv) -> tp.List[str]:
"""Commands to run to prepare simulator."""
return [] return []
def run_cmd(self, env): def run_cmd(self, env: ExpEnv) -> tp.Optional[str]:
"""Command to run to execute simulator."""
return None return None
# Other simulators this one depends on
def dependencies(self): def dependencies(self):
"""Other simulators this one depends on."""
return [] return []
# Sockets to be cleaned up # Sockets to be cleaned up
def sockets_cleanup(self, env): def sockets_cleanup(self, env: ExpEnv):
return [] return []
# sockets to wait for indicating the simulator is ready # sockets to wait for indicating the simulator is ready
def sockets_wait(self, env): def sockets_wait(self, env: ExpEnv):
return [] return []
def start_delay(self): def start_delay(self):
...@@ -169,7 +172,7 @@ class NetSim(Simulator): ...@@ -169,7 +172,7 @@ class NetSim(Simulator):
class HostSim(Simulator): class HostSim(Simulator):
node_config: NodeConfig node_config: NodeConfig
"""Config for this node. """ """Config for the simulated host. """
name = '' name = ''
wait = False wait = False
""" """
...@@ -193,7 +196,7 @@ class HostSim(Simulator): ...@@ -193,7 +196,7 @@ class HostSim(Simulator):
def add_nic(self, dev: NICSim): def add_nic(self, dev: NICSim):
self.add_pcidev(dev) self.add_pcidev(dev)
def add_pcidev(self, dev: NICSim): def add_pcidev(self, dev: PCIDevSim):
dev.name = self.name + '.' + dev.name dev.name = self.name + '.' + dev.name
self.pcidevs.append(dev) self.pcidevs.append(dev)
......
...@@ -22,7 +22,6 @@ ...@@ -22,7 +22,6 @@
from utils.netperf import * from utils.netperf import *
import sys import sys
import os.path
from time import strftime from time import strftime
from time import gmtime from time import gmtime
...@@ -37,6 +36,7 @@ def fmt_lat(lat): ...@@ -37,6 +36,7 @@ def fmt_lat(lat):
else: else:
return '%d\\,$\\mu$s' % (int(x)) return '%d\\,$\\mu$s' % (int(x))
def fmt_tp(tp): def fmt_tp(tp):
if not tp: if not tp:
return '' return ''
...@@ -47,13 +47,14 @@ def fmt_tp(tp): ...@@ -47,13 +47,14 @@ def fmt_tp(tp):
else: else:
return '%d\\,M' % (int(x)) return '%d\\,M' % (int(x))
hosts = [('qemu','QK'), ('qt','QT'), ('gem5','G5')]
nics = [('i40e','IB'), ('cd_bm','CB'), ('cd_verilator','CV')] hosts = [('qemu', 'QK'), ('qt', 'QT'), ('gem5', 'G5')]
nets = [('switch','SW'), ('ns3','NS')] nics = [('i40e', 'IB'), ('cd_bm', 'CB'), ('cd_verilator', 'CV')]
nets = [('switch', 'SW'), ('ns3', 'NS')]
outdir = sys.argv[1] outdir = sys.argv[1]
for (h,h_l) in hosts: for (h, h_l) in hosts:
for (nic, nic_l) in nics: for (nic, nic_l) in nics:
for (net, net_l) in nets: for (net, net_l) in nets:
path = '%s/netperf-%s-%s-%s-1.json' % (outdir, h, net, nic) path = '%s/netperf-%s-%s-%s-1.json' % (outdir, h, net, nic)
...@@ -66,5 +67,7 @@ for (h,h_l) in hosts: ...@@ -66,5 +67,7 @@ for (h,h_l) in hosts:
tp = fmt_tp(data.get('throughput', '')) tp = fmt_tp(data.get('throughput', ''))
latMean = fmt_lat(data.get('latenyMean', '')) latMean = fmt_lat(data.get('latenyMean', ''))
latTail = fmt_lat(data.get('latenyTail', '')) latTail = fmt_lat(data.get('latenyTail', ''))
print(' %s & %s & %s & %s & %s & %s \\\\' % (h_l, nic_l, net_l, print(
tp, latMean, t)) ' %s & %s & %s & %s & %s & %s \\\\' %
(h_l, nic_l, net_l, tp, latMean, t)
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment