Commit 645e0d2f authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

experiments: support for basic distributed experiments

Includes a new DistributedExperiment class, a runner for distributed
experiments, a basic sequential runtime, and an example experiment
"dist_netperf" (although scaling beyond 2 hosts is currently still
broken in the RDMA proxy).
parent 168ee22d
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import simbricks.experiments as exp
import simbricks.simulators as sim
import simbricks.proxy as proxy
import simbricks.nodeconfig as node
# Parameter space: one experiment is generated for every combination of
# host simulator, NIC simulator and number of netperf clients.
host_types = ['qemu', 'gem5', 'qt']
nic_types = ['i40e', 'cd_bm', 'cd_verilator']
n_clients = [1, 4, 8, 16, 32]
experiments = []


def qemu_timing():
    """ Host factory for the 'qt' host type: a QemuHost with sync enabled. """
    h = sim.QemuHost()
    h.sync = True
    return h


for host_type in host_types:
    for nic_type in nic_types:
        for n in n_clients:
            # every experiment is spread over exactly two simulation hosts
            e = exp.DistributedExperiment(
                f'dist_netperf-{host_type}-{nic_type}-{n}', 2)
            net = sim.SwitchNet()
            e.add_network(net)

            # pick the host simulator factory
            if host_type == 'gem5':
                host_class = sim.Gem5Host
                e.checkpoint = True
            elif host_type == 'qt':
                host_class = qemu_timing
            elif host_type == 'qemu':
                host_class = sim.QemuHost
            else:
                raise NameError(host_type)

            # pick the NIC simulator together with the matching node config
            if nic_type == 'cd_verilator':
                nic_class, nc_class = (sim.CorundumVerilatorNIC,
                                       node.CorundumLinuxNode)
            elif nic_type == 'cd_bm':
                nic_class, nc_class = (sim.CorundumBMNIC,
                                       node.CorundumLinuxNode)
            elif nic_type == 'i40e':
                nic_class, nc_class = sim.I40eNIC, node.I40eLinuxNode
            else:
                raise NameError(nic_type)

            # one netperf server, n netperf clients pointed at it
            servers = sim.create_basic_hosts(
                e, 1, 'server', net, nic_class, host_class, nc_class,
                node.NetperfServer)
            clients = sim.create_basic_hosts(
                e, n, 'client', net, nic_class, host_class, nc_class,
                node.NetperfClient, ip_start=2)
            server = servers[0]
            for c in clients:
                c.wait = True
                c.node_config.app.server_ip = server.node_config.ip

            # proxy pair: the listener and the connecter that dials it
            lp = proxy.RDMANetProxyListener()
            lp.name = 'listener'
            e.add_proxy(lp)
            cp = proxy.RDMANetProxyConnecter(lp)
            cp.name = 'connecter'
            e.add_proxy(cp)

            # listener, network and server (with its NIC) live on host 0,
            # the connecter on host 1
            for first_host_sim in (lp, net, server, server.nics[0]):
                e.assign_sim_host(first_host_sim, 0)
            e.assign_sim_host(cp, 1)

            # clients alternate between the two hosts, starting with host 1;
            # only NICs that are not on host 0 are added to the connecter
            for idx, c in enumerate(clients):
                k = (idx + 1) % 2
                client_nic = c.nics[0]
                e.assign_sim_host(c, k)
                e.assign_sim_host(client_nic, k)
                if k != 0:
                    cp.add_nic(client_nic)

            experiments.append(e)
...@@ -86,6 +86,10 @@ g_slurm.add_argument('--slurm', dest='runtime', action='store_const', ...@@ -86,6 +86,10 @@ g_slurm.add_argument('--slurm', dest='runtime', action='store_const',
g_slurm.add_argument('--slurmdir', metavar='DIR', type=str, g_slurm.add_argument('--slurmdir', metavar='DIR', type=str,
default='./slurm/', help='Slurm communication directory') default='./slurm/', help='Slurm communication directory')
# Options for the distributed runtime get their own section in --help.
g_dist = parser.add_argument_group('Distributed Runtime')
# Register --dist on the group created above (previously it was added to
# the parallel-runtime group, leaving this group empty).
g_dist.add_argument('--dist', dest='runtime', action='store_const',
        const='dist', default='sequential',
        help='Use sequential distributed runtime instead of local')
args = parser.parse_args() args = parser.parse_args()
...@@ -124,6 +128,8 @@ if args.runtime == 'parallel': ...@@ -124,6 +128,8 @@ if args.runtime == 'parallel':
verbose=args.verbose, exec=executors[0]) verbose=args.verbose, exec=executors[0])
elif args.runtime == 'slurm': elif args.runtime == 'slurm':
rt = runtime.SlurmRuntime(args.slurmdir, args, verbose=args.verbose) rt = runtime.SlurmRuntime(args.slurmdir, args, verbose=args.verbose)
elif args.runtime == 'dist':
rt = runtime.DistributedSimpleRuntime(executors, verbose=args.verbose)
else: else:
warn_multi_exec() warn_multi_exec()
rt = runtime.LocalSimpleRuntime(verbose=args.verbose, exec=executors[0]) rt = runtime.LocalSimpleRuntime(verbose=args.verbose, exec=executors[0])
......
...@@ -25,6 +25,7 @@ import asyncio ...@@ -25,6 +25,7 @@ import asyncio
import simbricks.exectools as exectools import simbricks.exectools as exectools
import shlex import shlex
import time import time
import itertools
import json import json
import traceback import traceback
...@@ -59,26 +60,58 @@ class Experiment(object): ...@@ -59,26 +60,58 @@ class Experiment(object):
raise Exception('Duplicate net name') raise Exception('Duplicate net name')
self.networks.append(sim) self.networks.append(sim)
def all_simulators(self):
    """ Iterate over all simulators of the experiment: hosts first, then
    NICs, then networks. """
    sim_groups = (self.hosts, self.nics, self.networks)
    return itertools.chain(*sim_groups)
def resreq_mem(self): def resreq_mem(self):
mem = 0 mem = 0
for h in self.hosts: for s in self.all_simulators():
mem += h.resreq_mem() mem += s.resreq_mem()
for n in self.nics:
mem += n.resreq_mem()
for n in self.networks:
mem += n.resreq_mem()
return mem return mem
def resreq_cores(self): def resreq_cores(self):
cores = 0 cores = 0
for h in self.hosts: for s in self.all_simulators():
cores += h.resreq_cores() cores += s.resreq_cores()
for n in self.nics:
cores += n.resreq_cores()
for n in self.networks:
cores += n.resreq_cores()
return cores return cores
class DistributedExperiment(Experiment):
    """ Experiment whose simulators are mapped onto several hosts.

    Besides the simulators of the base experiment it tracks listening and
    connecting proxies, and a mapping from each simulator to a host ID. """
    # number of hosts the experiment is spread over
    num_hosts = 1
    # dict: simulator -> host ID, filled through assign_sim_host()
    host_mapping = None
    # proxies whose `listen` attribute is true
    proxies_listen = None
    # all other proxies
    proxies_connect = None

    def __init__(self, name, num_hosts):
        # per-instance state is set up before the base initializer runs
        self.num_hosts = num_hosts
        self.host_mapping = {}
        self.proxies_listen, self.proxies_connect = [], []
        super().__init__(name)

    def add_proxy(self, proxy):
        """ Register a proxy, sorted by whether it listens or connects. """
        bucket = self.proxies_listen if proxy.listen else self.proxies_connect
        bucket.append(proxy)

    def all_simulators(self):
        """ Simulators of the base experiment followed by all proxies. """
        base_sims = super().all_simulators()
        return itertools.chain(
            base_sims, self.proxies_listen, self.proxies_connect)

    def assign_sim_host(self, sim, host):
        """ Assign host ID (< self.num_hosts) for a simulator. """
        assert 0 <= host < self.num_hosts
        self.host_mapping[sim] = host

    def all_sims_assigned(self):
        """ Check if all simulators are assigned to a host. """
        return all(s in self.host_mapping for s in self.all_simulators())
class ExperimentBaseRunner(object): class ExperimentBaseRunner(object):
def __init__(self, exp, env, verbose): def __init__(self, exp, env, verbose):
...@@ -249,6 +282,7 @@ class ExperimentBaseRunner(object): ...@@ -249,6 +282,7 @@ class ExperimentBaseRunner(object):
await self.after_cleanup() await self.after_cleanup()
return self.out return self.out
class ExperimentSimpleRunner(ExperimentBaseRunner): class ExperimentSimpleRunner(ExperimentBaseRunner):
""" Simple experiment runner with just one executor. """ """ Simple experiment runner with just one executor. """
def __init__(self, exec, *args, **kwargs): def __init__(self, exec, *args, **kwargs):
...@@ -258,6 +292,75 @@ class ExperimentSimpleRunner(ExperimentBaseRunner): ...@@ -258,6 +292,75 @@ class ExperimentSimpleRunner(ExperimentBaseRunner):
def sim_executor(self, sim): def sim_executor(self, sim):
return self.exec return self.exec
class ExperimentDistributedRunner(ExperimentBaseRunner):
    """ Experiment runner that spreads simulators over multiple executors.

    Each simulator runs on the executor whose index is the host ID that
    the experiment's host_mapping holds for it (see sim_executor). """
    def __init__(self, execs, *args, **kwargs):
        """ execs: list of executors indexed by host ID; the remaining
        arguments are passed on to ExperimentBaseRunner. """
        self.execs = execs
        super().__init__(*args, **kwargs)
        # there must be at least one executor per host of the experiment
        assert self.exp.num_hosts <= len(execs)

    def sim_executor(self, sim):
        """ Return the executor for the host `sim` is assigned to.

        Raises KeyError if `sim` has no entry in the host mapping. """
        h_id = self.exp.host_mapping[sim]
        return self.execs[h_id]

    async def prepare(self):
        """ Check the host assignment and set the proxy IPs, then run the
        base class preparation. """
        # make sure all simulators are assigned to an executor
        assert(self.exp.all_sims_assigned())
        # set IP addresses for proxies based on assigned executors
        for p in itertools.chain(
                self.exp.proxies_listen, self.exp.proxies_connect):
            exec = self.sim_executor(p)
            p.ip = exec.ip
        await super().prepare()

    async def run_proxies_listeners(self):
        """ Start all listening proxies. """
        if self.verbose:
            print('%s: starting listening proxies' % self.exp.name)
        for proxy in self.exp.proxies_listen:
            if self.verbose:
                print('start listening proxy:', proxy.run_cmd(self.env))
            # launch the proxy on the executor of its assigned host
            exec = self.sim_executor(proxy)
            sc = exec.create_component(proxy.full_name(),
                    shlex.split(proxy.run_cmd(self.env)), verbose=self.verbose,
                    canfail=True)
            await sc.start()
            # record the started component together with its proxy
            self.running.append((proxy, sc))
        # NOTE(review): the original indentation is not visible here; the
        # pause is assumed to happen once after the loop -- confirm.
        await asyncio.sleep(0.5)

    async def run_proxies_connecters(self):
        """ Start all connecting proxies. """
        if self.verbose:
            print('%s: starting connecting proxies' % self.exp.name)
        for proxy in self.exp.proxies_connect:
            if self.verbose:
                print('start connecting proxy:', proxy.run_cmd(self.env))
            # launch the proxy on the executor of its assigned host
            exec = self.sim_executor(proxy)
            sc = exec.create_component(proxy.full_name(),
                    shlex.split(proxy.run_cmd(self.env)), verbose=self.verbose,
                    canfail=True)
            await sc.start()
            # record the started component together with its proxy
            self.running.append((proxy, sc))
        # NOTE(review): the original indentation is not visible here; the
        # pause is assumed to happen once after the loop -- confirm.
        await asyncio.sleep(0.5)

    async def wait_proxy_sockets(self):
        """ Placeholder: currently does nothing. """
        # TODO
        pass

    async def before_nets(self):
        """ Start the listening proxies, then the connecting proxies, before
        handing over to the base class hook. """
        await self.run_proxies_listeners()
        await self.run_proxies_connecters()
        await self.wait_proxy_sockets()
        await super().before_nets()
class ExpEnv(object): class ExpEnv(object):
def __init__(self, repo_path, workdir, cpdir): def __init__(self, repo_path, workdir, cpdir):
self.repodir = os.path.abspath(repo_path) self.repodir = os.path.abspath(repo_path)
...@@ -291,6 +394,9 @@ class ExpEnv(object): ...@@ -291,6 +394,9 @@ class ExpEnv(object):
def nic_shm_path(self, sim): def nic_shm_path(self, sim):
return '%s/nic.shm.%s' % (self.workdir, sim.name) return '%s/nic.shm.%s' % (self.workdir, sim.name)
def proxy_shm_path(self, sim):
    """ Return the `proxy.shm.<name>` path for `sim` below the workdir. """
    path_fields = (self.workdir, sim.name)
    return '%s/proxy.shm.%s' % path_fields
def gem5_outdir(self, sim): def gem5_outdir(self, sim):
return '%s/gem5-out.%s' % (self.workdir, sim.name) return '%s/gem5-out.%s' % (self.workdir, sim.name)
......
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from simbricks.simulators import Simulator
class SimProxy(Simulator):
    """ Common base class for proxy simulators. """
    name = ''
    # set by the experiment runner
    ip = ''
    # True for proxies that listen, False for proxies that connect
    listen = False

    def full_name(self):
        """ Return the proxy name prefixed with 'proxy.'. """
        prefix = 'proxy.'
        return prefix + self.name
class NetProxy(SimProxy):
    """ Proxy for connections between NICs and networks. """
    # List of tuples (nic, with_listener). Left as None here; the listener
    # and connecter subclasses set it to a list in their __init__.
    nics = None
class NetProxyListener(NetProxy):
    """ Listening side of a network proxy; owns the list of proxied NICs. """

    def __init__(self):
        self.listen = True
        self.nics = []
        super().__init__()

    def add_nic(self, nic):
        """ Add a NIC, flagged as belonging to the listener side. """
        entry = (nic, True)
        self.nics.append(entry)
class NetProxyConnecter(NetProxy):
    """ Connecting side of a network proxy, paired with one listener. """
    # listener this proxy connects to
    listener = None

    def __init__(self, listener):
        self.listener = listener
        # same list object as the listener's, so both sides see every NIC
        self.nics = listener.nics
        super().__init__()

    def add_nic(self, nic):
        """ Add a NIC, flagged as belonging to the connecter side. """
        entry = (nic, False)
        self.nics.append(entry)
class RDMANetProxyListener(NetProxyListener):
    """ Listening network proxy run through the `net_rdma` binary. """
    # port passed on the command line; connecters read it from here
    port = 12345

    def __init__(self):
        self.listen = True
        super().__init__()

    def run_cmd(self, env):
        """ Build the `net_rdma -l` command line for this listener.

        NICs flagged as listener-side are passed with `-d`, all others with
        `-n`; address 0.0.0.0 and the port come last. """
        pieces = [
            f'{env.repodir}/dist/net_rdma -l ',
            f'-s {env.proxy_shm_path(self)} ',
        ]
        for nic, on_listener in self.nics:
            flag = '-d ' if on_listener else '-n '
            pieces.append(flag + env.nic_eth_path(nic) + ' ')
        pieces.append(f' 0.0.0.0 {self.port}')
        return ''.join(pieces)
class RDMANetProxyConnecter(NetProxyConnecter):
    """ Connecting network proxy run through the `net_rdma` binary. """

    def __init__(self, listener):
        super().__init__(listener)

    def run_cmd(self, env):
        """ Build the `net_rdma` command line for this connecter.

        The flags mirror the listener's: listener-side NICs are passed with
        `-n`, all others with `-d`; the listener's ip and port come last. """
        pieces = [
            f'{env.repodir}/dist/net_rdma ',
            f'-s {env.proxy_shm_path(self)} ',
        ]
        for nic, on_listener in self.nics:
            flag = '-n ' if on_listener else '-d '
            pieces.append(flag + env.nic_eth_path(nic) + ' ')
        pieces.append(f' {self.listener.ip} {self.listener.port}')
        return ''.join(pieces)
\ No newline at end of file
...@@ -23,3 +23,4 @@ ...@@ -23,3 +23,4 @@
from simbricks.runtime.common import (Run, Runtime) from simbricks.runtime.common import (Run, Runtime)
from simbricks.runtime.local import (LocalSimpleRuntime, LocalParallelRuntime) from simbricks.runtime.local import (LocalSimpleRuntime, LocalParallelRuntime)
from simbricks.runtime.slurm import SlurmRuntime from simbricks.runtime.slurm import SlurmRuntime
from simbricks.runtime.distributed import DistributedSimpleRuntime
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import asyncio
import pathlib
from simbricks.runtime.common import *
import simbricks.experiments as exp
import simbricks.exectools as exectools
class DistributedSimpleRuntime(Runtime):
    """ Runtime that executes distributed runs one after another. """

    def __init__(self, execs, verbose=False):
        self.execs = execs
        self.verbose = verbose
        # runs waiting to be executed / runs that have finished
        self.runnable = []
        self.complete = []

    def add_run(self, run):
        """ Queue a run for execution by start(). """
        self.runnable.append(run)

    async def do_run(self, run):
        """ Execute a single run and write its output to run.outpath. """
        runner = exp.ExperimentDistributedRunner(
            self.execs, run.experiment, run.env, self.verbose)
        # directories are prepared through every executor
        for executor in self.execs:
            await run.prep_dirs(executor)
        await runner.prepare()
        run.output = await runner.run()
        self.complete.append(run)

        # make sure the output directory exists before writing the result
        out_dir = pathlib.Path(run.outpath).parent
        out_dir.mkdir(parents=True, exist_ok=True)
        with open(run.outpath, 'w') as f:
            f.write(run.output.dumps())

    def start(self):
        """ Execute all queued runs sequentially. """
        for pending in self.runnable:
            asyncio.run(self.do_run(pending))
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment