Commit 423b30f1 authored by Jonas Kaufmann's avatar Jonas Kaufmann
Browse files

symphony: prototype of user-facing interface for distributed simulations

pyexps/minimal_net_dist.py defines an experiment using the new interface
parent 3327b01b
from simbricks.orchestration import system
from simbricks.orchestration import simulation as sim
from simbricks.orchestration.helpers import simulation as sim_helpers
from simbricks.orchestration.helpers import instantiation as inst_helpers
from simbricks.orchestration.instantiation import proxy
from simbricks.orchestration.instantiation import fragment
from simbricks.orchestration.instantiation import socket
# Experiment definition: two Linux hosts, each with an Intel i40e NIC, joined
# by one Ethernet switch. The simulation is split into two fragments that are
# mapped to two different runners and bridged by a pair of TCP proxies.
sys = system.System()

# create a host instance and a NIC instance then install the NIC on the host
# host0 = system.CorundumLinuxHost(sys)
host0 = system.I40ELinuxHost(sys)
host0.add_disk(system.DistroDiskImage(h=host0, name="base"))
host0.add_disk(system.LinuxConfigDiskImage(h=host0))
nic0 = system.IntelI40eNIC(sys)
nic0.add_ipv4("10.0.0.1")
host0.connect_pcie_dev(nic0)

# create a host instance and a NIC instance then install the NIC on the host
host1 = system.I40ELinuxHost(sys)
host1.add_disk(system.DistroDiskImage(h=host1, name="base"))
host1.add_disk(system.LinuxConfigDiskImage(h=host1))
nic1 = system.IntelI40eNIC(sys)
nic1.add_ipv4("10.0.0.2")
host1.connect_pcie_dev(nic1)

# nic0 is attached to the switch through the helper; the switch port facing
# nic1 is created explicitly because the proxies below need a handle on it.
switch0 = system.EthSwitch(sys)
switch0.connect_eth_peer_if(nic0._eth_if)
switch0_nic1_if = system.EthInterface(switch0)
switch0.add_if(switch0_nic1_if)
switch0_nic1_channel = system.EthChannel(switch0_nic1_if, nic1._eth_if)

# configure the software to run on the host
ping_client_app = system.PingClient(host0, nic1._ip)
ping_client_app.wait = True
host0.add_app(ping_client_app)
host1.add_app(system.Sleep(host1, infinite=True))

simulation = sim_helpers.simple_simulation(
    sys,
    compmap={
        system.FullSystemHost: sim.QemuSim,
        system.IntelI40eNIC: sim.I40eNicSim,
        system.EthSwitch: sim.SwitchNet,
    },
)

instantiation = inst_helpers.simple_instantiation(simulation)
instantiations = [instantiation]

# Create fragments and assign simulators
# Fragment.add_simulators() takes simulators as *varargs*, so the sets have to
# be unpacked; passing the set itself would try to insert an unhashable set.
fragment0 = fragment.Fragment()
fragment0_sims = {simulation.find_sim(comp) for comp in [host0, nic0, switch0]}
fragment0.add_simulators(*fragment0_sims)

fragment1 = fragment.Fragment()
fragment1_sims = {simulation.find_sim(comp) for comp in [host1, nic1]}
fragment1.add_simulators(*fragment1_sims)

# Create proxies
# One proxy per fragment, each handling its end of the switch0 <-> nic1
# channel: fragment0 listens, fragment1 connects.
proxy0 = proxy.TCPProxy()
proxy0.add_interfaces(switch0_nic1_if)
proxy0.connection_mode = socket.SockType.LISTEN
fragment0.add_proxies(proxy0)

proxy1 = proxy.TCPProxy()
proxy1.add_interfaces(nic1._eth_if)
proxy1.connection_mode = socket.SockType.CONNECT
fragment1.add_proxies(proxy1)

# Instantiation declares simulation_fragments as a set, so assign a set.
instantiation.simulation_fragments = {fragment0, fragment1}

# Define runners
runner0_label = "runner0"
runner1_label = "runner1"

# Map simulation fragments to runners
instantiation.fragment_runner_map = {
    fragment0: runner0_label,
    fragment1: runner1_label,
}
......@@ -35,6 +35,7 @@ from simbricks.orchestration.system import mem as sys_mem
from simbricks.orchestration.system import eth as sys_eth
from simbricks.orchestration.system.host import disk_images
from simbricks.orchestration.instantiation import socket as inst_socket
from simbricks.orchestration.instantiation import fragment as inst_fragment
if typing.TYPE_CHECKING:
from simbricks.orchestration.simulation import base as sim_base
......@@ -72,6 +73,13 @@ class Instantiation():
):
self._id = next(self.__id_iter)
self.simulation: sim_base.Simulation = sim
self.simulation_fragments: set[inst_fragment.Fragment] = set()
self.fragment_runner_map: dict[inst_fragment.Fragment, str] = dict()
"""Map simulation fragment to runner label."""
self.runner_label: str | None = None
"""Label of runner we are executing on. Set by runner when fetching
run."""
self._simulation_fragment: inst_fragment.Fragment | None = None
self.env: InstantiationEnvironment | None = None
self.artifact_name: str = f"simbricks-artifact-{str(uuid.uuid4())}.zip"
self.artifact_paths: list[str] = []
......@@ -97,7 +105,7 @@ class Instantiation():
if self._executor is None:
raise Exception("you must set an executor")
return self._executor
@property
def create_artifact(self) -> bool:
    """Whether at least one artifact path has been registered."""
    artifact_count = len(self.artifact_paths)
    return artifact_count > 0
......@@ -326,6 +334,27 @@ class Instantiation():
)
self._restore_checkpoint = restore_checkpoint
@property
def fragment(self) -> inst_fragment.Fragment:
    """Return the simulation fragment this instantiation executes.

    The result is computed on first access and cached in
    ``_simulation_fragment``.

    * If no runner label is set or no fragments were defined, a single
      implicit fragment containing every simulator of the simulation is
      created.
    * Otherwise all fragments that ``fragment_runner_map`` assigns to
      ``runner_label`` are merged into one fragment.
    """
    if self._simulation_fragment is not None:
        return self._simulation_fragment

    if self.runner_label is None or not self.simulation_fragments:
        # Experiment does not define any simulation fragments, so
        # implicitly, we create one fragment that spans the whole simulation
        whole_simulation = inst_fragment.Fragment()
        # add_simulators() is variadic: unpack the collection instead of
        # handing it over as one (unhashable) element.
        whole_simulation.add_simulators(*self.simulation.all_simulators())
        self._simulation_fragment = whole_simulation
    else:
        assigned_fragments = [
            candidate
            for candidate, label in self.fragment_runner_map.items()
            if label == self.runner_label
        ]
        # merged() is variadic as well, so the list must be unpacked.
        self._simulation_fragment = inst_fragment.Fragment.merged(
            *assigned_fragments
        )
    return self._simulation_fragment
# TODO: this needs fixing...
def copy(self) -> Instantiation:
cop = Instantiation(sim=self.simulation)
......
# Copyright 2024 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from simbricks.utils import base as util_base
from simbricks.orchestration.instantiation import proxy
import typing as tp
if tp.TYPE_CHECKING:
from simbricks.orchestration.simulation import base as sim_base
from simbricks.orchestration.system import base as sys_base
class Fragment(util_base.IdObj):
    """A set of simulators together with the proxies assigned to them.

    Annotations that refer to ``sim_base`` / ``sys_base`` are written as
    strings because those modules are only imported under
    ``typing.TYPE_CHECKING`` and are therefore not available when the method
    signatures are evaluated at import time.
    """

    def __init__(self):
        super().__init__()
        # Both containers must exist right away: add_*() and all_*() operate
        # on them without any further initialisation.
        self._proxies: set[proxy.Proxy] = set()
        self._simulators: set["sim_base.Simulator"] = set()

    @staticmethod
    def merged(*fragments: "Fragment") -> "Fragment":
        """Return a new fragment holding the union of the given fragments.

        Args:
            *fragments: Fragments whose proxies and simulators are combined.
                The inputs are not modified.

        Returns:
            A freshly created fragment; it is empty when no fragments are
            passed.
        """
        merged_fragment = Fragment()
        for source in fragments:
            merged_fragment._proxies.update(source.all_proxies())
            merged_fragment._simulators.update(source.all_simulators())
        return merged_fragment

    def add_simulators(self, *sims: "sim_base.Simulator") -> None:
        """Add one or more simulators to this fragment."""
        self._simulators.update(sims)

    def add_proxies(self, *proxies: proxy.Proxy) -> None:
        """Add one or more proxies to this fragment."""
        self._proxies.update(proxies)

    def all_simulators(self) -> "set[sim_base.Simulator]":
        """Return the (live, mutable) set of simulators in this fragment."""
        return self._simulators

    def all_proxies(self) -> set[proxy.Proxy]:
        """Return the (live, mutable) set of proxies in this fragment."""
        return self._proxies

    def find_proxy_by_interface(
        self, interface: "sys_base.Interface"
    ) -> "proxy.Proxy | None":
        """Return the proxy whose interfaces contain `interface`, or `None`
        if no proxy in this fragment does."""
        # The loop variable is deliberately not called `proxy`, which would
        # shadow the imported module of the same name.
        for candidate in self._proxies:
            if interface in candidate.interfaces:
                return candidate
        return None

    def get_proxy_by_interface(
        self, interface: "sys_base.Interface"
    ) -> proxy.Proxy:
        """Same as `find_proxy_by_interface()` but raises a `RuntimeError` if
        the interface is not assigned to any proxy in this fragment."""
        found = self.find_proxy_by_interface(interface)
        if found is None:
            raise RuntimeError("Interface not assigned to any proxies in this fragment.")
        return found

    def interface_handled_by_proxy(self, interface: "sys_base.Interface") -> bool:
        """Return whether some proxy in this fragment lists `interface`."""
        return self.find_proxy_by_interface(interface) is not None
# Copyright 2022 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import typing
import simbricks.utils.base as util_base
from simbricks.orchestration.instantiation import socket as inst_socket
import abc
import typing
if typing.TYPE_CHECKING:
from simbricks.orchestration.instantiation import base as inst_base
import simbricks.orchestration.system.base as sys_base
class Proxy(util_base.IdObj, abc.ABC):
    """Abstract base class for proxies.

    Annotations that refer to ``inst_base`` / ``sys_base`` in method
    signatures are written as strings so they are not evaluated at import
    time; at least ``inst_base`` is only imported under
    ``typing.TYPE_CHECKING``.
    """

    def __init__(self):
        super().__init__()
        self._interfaces: list[sys_base.Interface] = []
        """
        The interfaces this proxy provides.
        Order is important here because proxies forward messages for SimBricks
        sockets in the order these sockets are passed on the command-line. So
        for two connecting proxies executing on separate runners, this order
        must be the same.
        """
        self.connection_mode: inst_socket.SockType = inst_socket.SockType.CONNECT

    @property
    def name(self) -> str:
        """Name derived from this object's id: ``proxy_<id>``."""
        return f"proxy_{self.id()}"

    @property
    def interfaces(self) -> "list[sys_base.Interface]":
        """The interfaces this proxy provides, in insertion order."""
        return self._interfaces

    def add_interfaces(self, *interfaces: "sys_base.Interface") -> None:
        """Append one or more interfaces, preserving the order given."""
        self._interfaces.extend(interfaces)

    @abc.abstractmethod
    def run_cmd(self) -> str:
        """Return the command for running this proxy; subclasses must
        implement this."""

    def sockets_wait(
        self, inst: "inst_base.Instantiation"
    ) -> "list[inst_socket.Socket]":
        """Return the sockets of type LISTEN among this proxy's interfaces.

        The socket for every interface is obtained through
        ``inst.update_get_socket()``; the result keeps the interface order.
        """
        wait_sockets = []
        for iface in self._interfaces:
            sock = inst.update_get_socket(iface)
            if sock.type == inst_socket.SockType.LISTEN:
                wait_sockets.append(sock)
        return wait_sockets

    def supported_socket_types(
        self,
        interface: "sys_base.Interface",
    ) -> set[inst_socket.SockType]:
        """Return the socket types usable for `interface`: both CONNECT and
        LISTEN, regardless of the interface passed."""
        return {inst_socket.SockType.CONNECT, inst_socket.SockType.LISTEN}
class DummyProxy(Proxy):
    """Proxy subclass that adds no state of its own.

    Construction is inherited unchanged from `Proxy`.
    NOTE(review): `run_cmd` is not implemented here; while it stays abstract
    in `Proxy` this class presumably cannot be instantiated - confirm.
    """
class TCPProxy(Proxy):
    """Proxy variant that declares an `ip` and a `port` attribute.

    Both attributes are only declared, not assigned: they do not exist on an
    instance until some other code sets them.
    NOTE(review): `run_cmd` is not implemented here either - confirm whether
    that is intentional for the prototype.
    """

    ip: str
    port: int
class RDMAProxy(Proxy):
    """Proxy variant that declares an `ip` and a `port` attribute.

    Both attributes are only declared, not assigned: they do not exist on an
    instance until some other code sets them.
    NOTE(review): `run_cmd` is not implemented here either - confirm whether
    that is intentional for the prototype.
    """

    ip: str
    port: int
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import typing as tp
from simbricks.orchestration.simulators import NICSim, Simulator
if tp.TYPE_CHECKING:
from simbricks.orchestration.experiment import experiment_environment
class SimProxy(Simulator):
    """Common base of the proxy simulators in this module."""

    def __init__(self) -> None:
        super().__init__()
        self.name = ''
        # The next field is set by the experiment runner.
        self.ip = ''
        # Subclasses that listen for connections flip this to True.
        self.listen = False

    def full_name(self) -> str:
        """Return the proxy name with a ``proxy.`` prefix."""
        prefix = 'proxy.'
        return prefix + self.name
class NetProxy(SimProxy):
    """Proxy for connections between NICs and networks."""

    def __init__(self) -> None:
        super().__init__()
        # List of tuples (nic, with_listener)
        self.nics: tp.List[tp.Tuple[NICSim, bool]] = []
        # List of tuples ((netL,netC), with_listener)
        self.n2ns: tp.List[tp.Tuple[tp.Tuple[Simulator, Simulator], bool]] = []
        # Shared memory size in GB.
        self.shm_size = 2048

    def start_delay(self) -> int:
        """Return the fixed start delay of 10."""
        delay = 10
        return delay
class NetProxyListener(NetProxy):
    """Listening side of a proxy pair.

    `connecter` is only declared here; it is assigned when a
    `NetProxyConnecter` is constructed for this listener. `add_nic()` and
    `add_n2n()` read it, so they need the connecter to exist already.
    """

    def __init__(self) -> None:
        super().__init__()
        self.port = 12345
        self.connecter: NetProxyConnecter
        self.listen = True

    def add_nic(self, nic: NICSim) -> None:
        """Register `nic` with this proxy, flagged True."""
        self.nics.append((nic, True))

        # the network this nic connects to now also depends on the peer
        nic.network.extra_deps.append(self.connecter)

    # add net2net connection with listening network on the listener side
    def add_n2n(self, net_c: Simulator, net_l: Simulator) -> None:
        """Register the network pair `(net_c, net_l)`, flagged True."""
        self.n2ns.append(((net_c, net_l), True))

        # the connecting network depends on our peer
        net_c.extra_deps.append(self.connecter)

    def dependencies(self) -> tp.List[Simulator]:
        """Return the flagged NICs followed by the second network of every
        flagged network pair."""
        deps = [nic for (nic, local) in self.nics if local]
        deps.extend(net_l for ((_, net_l), local) in self.n2ns if local)
        return deps

    def _listen_socket_paths(
        self, env: 'experiment_environment.ExpEnv'
    ) -> tp.List[str]:
        """Collect the socket paths of all unflagged entries, i.e. the ones
        `run_cmd_base()` passes with ``-L``."""
        socks = [
            env.nic_eth_path(nic) for (nic, local) in self.nics if not local
        ]
        socks.extend(
            env.n2n_eth_path(net_l, net_c)
            for ((net_c, net_l), local) in self.n2ns
            if not local
        )
        return socks

    def sockets_cleanup(self,
                        env: 'experiment_environment.ExpEnv') -> tp.List[str]:
        """Return the socket paths to clean up.

        Previously the collected paths were discarded and an empty list was
        returned unconditionally.
        """
        return self._listen_socket_paths(env)

    # sockets to wait for indicating the simulator is ready
    def sockets_wait(self,
                     env: 'experiment_environment.ExpEnv') -> tp.List[str]:
        """Return the socket paths to wait for (same set as for cleanup)."""
        return self._listen_socket_paths(env)

    def run_cmd_base(self, env: 'experiment_environment.ExpEnv') -> str:
        """Build the argument string shared by all listener commands.

        Layout: shared-memory path and size, one ``-C``/``-L`` option per NIC
        and per network pair (``-C`` for flagged entries), then the listen
        address ``0.0.0.0`` and `port`.
        """
        parts = [f'-s {env.proxy_shm_path(self)} '
                 f'-S {self.shm_size} ']
        for (nic, local) in self.nics:
            parts.append('-C ' if local else '-L ')
            parts.append(env.nic_eth_path(nic) + ' ')
        for ((net_c, net_l), local) in self.n2ns:
            parts.append('-C ' if local else '-L ')
            parts.append(env.n2n_eth_path(net_l, net_c) + ' ')
        parts.append(f' 0.0.0.0 {self.port}')
        return ''.join(parts)
class NetProxyConnecter(NetProxy):
    """Connecting side of a proxy pair.

    Construction links both sides: the listener's `connecter` attribute is
    set to this object, and `nics` / `n2ns` are the very same list objects as
    the listener's, so entries added on either side are visible on both.
    """

    def __init__(self, listener: NetProxyListener) -> None:
        super().__init__()
        self.listener = listener
        listener.connecter = self
        self.nics = listener.nics
        self.n2ns = listener.n2ns

    def add_nic(self, nic: NICSim) -> None:
        """Register `nic` with this proxy, flagged False."""
        self.nics.append((nic, False))

        # the network this nic connects to now also depends on the proxy
        nic.network.extra_deps.append(self.listener)

    # add net2net connection with listening network on the connection side
    def add_n2n(self, net_c: Simulator, net_l: Simulator) -> None:
        """Register the network pair `(net_c, net_l)`, flagged False."""
        self.n2ns.append(((net_c, net_l), False))

        # the connecting network depends on our peer
        net_c.extra_deps.append(self.listener)

    def dependencies(self) -> tp.List[Simulator]:
        """Return the listener, then the unflagged NICs, then the second
        network of every unflagged network pair."""
        deps: tp.List[Simulator] = [self.listener]
        deps.extend(nic for (nic, local) in self.nics if not local)
        deps.extend(net_l for ((_, net_l), local) in self.n2ns if not local)
        return deps

    def _listen_socket_paths(
        self, env: 'experiment_environment.ExpEnv'
    ) -> tp.List[str]:
        """Collect the socket paths of all flagged entries, i.e. the ones
        `run_cmd_base()` passes with ``-L``."""
        socks = [env.nic_eth_path(nic) for (nic, local) in self.nics if local]
        socks.extend(
            env.n2n_eth_path(net_l, net_c)
            for ((net_c, net_l), local) in self.n2ns
            if local
        )
        return socks

    def sockets_cleanup(self,
                        env: 'experiment_environment.ExpEnv') -> tp.List[str]:
        """Return the socket paths to clean up.

        Previously the collected paths were discarded and an empty list was
        returned unconditionally.
        """
        return self._listen_socket_paths(env)

    # sockets to wait for indicating the simulator is ready
    def sockets_wait(self,
                     env: 'experiment_environment.ExpEnv') -> tp.List[str]:
        """Return the socket paths to wait for (same set as for cleanup)."""
        return self._listen_socket_paths(env)

    def run_cmd_base(self, env: 'experiment_environment.ExpEnv') -> str:
        """Build the argument string shared by all connecter commands.

        Layout: shared-memory path and size, one ``-L``/``-C`` option per NIC
        and per network pair (``-L`` for flagged entries), then the
        listener's `ip` and `port`.
        """
        parts = [f'-s {env.proxy_shm_path(self)} '
                 f'-S {self.shm_size} ']
        for (nic, local) in self.nics:
            parts.append('-L ' if local else '-C ')
            parts.append(env.nic_eth_path(nic) + ' ')
        for ((net_c, net_l), local) in self.n2ns:
            parts.append('-L ' if local else '-C ')
            parts.append(env.n2n_eth_path(net_l, net_c) + ' ')
        parts.append(f' {self.listener.ip} {self.listener.port}')
        return ''.join(parts)
class RDMANetProxyListener(NetProxyListener):
    """Listener that is run through the ``net_rdma`` binary."""

    def run_cmd(self, env: 'experiment_environment.ExpEnv') -> str:
        """Return the ``net_rdma -l`` invocation followed by the shared
        listener arguments."""
        binary = f'{env.repodir}/dist/rdma/net_rdma -l '
        return binary + super().run_cmd_base(env)
class RDMANetProxyConnecter(NetProxyConnecter):
    """Connecter that is run through the ``net_rdma`` binary."""

    def run_cmd(self, env: 'experiment_environment.ExpEnv') -> str:
        """Return the ``net_rdma`` invocation followed by the shared
        connecter arguments."""
        binary = f'{env.repodir}/dist/rdma/net_rdma '
        return binary + super().run_cmd_base(env)
class SocketsNetProxyListener(NetProxyListener):
    """Listener that is run through the ``net_sockets`` binary."""

    def run_cmd(self, env: 'experiment_environment.ExpEnv') -> str:
        """Return the ``net_sockets -l`` invocation followed by the shared
        listener arguments."""
        binary = f'{env.repodir}/dist/sockets/net_sockets -l '
        return binary + super().run_cmd_base(env)
class SocketsNetProxyConnecter(NetProxyConnecter):
    """Connecter that is run through the ``net_sockets`` binary."""

    def run_cmd(self, env: 'experiment_environment.ExpEnv') -> str:
        """Return the ``net_sockets`` invocation followed by the shared
        connecter arguments."""
        binary = f'{env.repodir}/dist/sockets/net_sockets '
        return binary + super().run_cmd_base(env)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment