Unverified Commit 99cf2cca authored by Jakob Görgen's avatar Jakob Görgen
Browse files

supported socket types method + new socket_wait and socket_cleanup impl

parent 23629578
......@@ -22,6 +22,7 @@
from __future__ import annotations
import asyncio
import enum
import pathlib
import shutil
......@@ -150,7 +151,11 @@ class Instantiation(util_base.IdObj):
new_socket = Socket(path=new_path, ty=new_ty)
return new_socket
def get_socket(self, interface: sys_base.Interface) -> Socket:
def get_socket(
self,
interface: sys_base.Interface,
supported_sock_types: set[SockType] = {SockType.LISTEN, SockType.CONNECT},
) -> Socket:
# check if already a socket is associated with this interface
socket = self._get_socket_by_interface(interface=interface)
if socket is not None:
......@@ -165,12 +170,44 @@ class Instantiation(util_base.IdObj):
# neither connecting nor listening side already created a socket, thus we
# create a completely new 'CONNECT' socket
sock_type = SockType.CONNECT
if len(supported_sock_types) > 1 or (
SockType.LISTEN not in supported_sock_types
and SockType.CONNECT not in supported_sock_types
):
raise Exception("cannot create a socket if no socket type is supported")
sock_type = (
SockType.CONNECT
if SockType.CONNECT in supported_sock_types
else SockType.LISTEN
)
sock_path = self._interface_to_sock_path(interface=interface)
new_socket = Socket(path=sock_path, ty=sock_type)
self._updated_tracker_mapping(interface=interface, socket=new_socket)
return new_socket
async def cleanup_sockets(
self,
sockets: list[tuple[command_executor.Executor, Socket]] = [],
) -> None:
# DISCLAIMER: that we pass the executor in here is an artifact of the
# sub-optimal distributed executions as we may need a remote executor to
# remove or create folders on other machines. In an ideal wolrd, we have
# some sort of runtime on each machine that executes thus making pasing
# an executor in here obsolete...
scs = []
for executor, sock in sockets:
scs.append(asyncio.create_task(executor.rmtree(path=sock._path)))
if len(scs) > 0:
await asyncio.gather(*scs)
async def wait_for_sockets(
self,
executor: command_executor.Executor = command_executor.LocalExecutor(),
sockets: list[Socket] = [],
) -> None:
wait_socks = map(lambda sock: sock._path, sockets)
await executor.await_files(wait_socks, verbose=self.verbose)
# TODO: add more methods constructing paths as required by methods in simulators or image handling classes
def wrkdir(self) -> str:
......
......@@ -55,7 +55,7 @@ class ExperimentBaseRunner(abc.ABC):
self._profile_int: int | None = None
self._out = sim_base.SimulationOutput(self._simulation)
self._running: list[tuple[sim_base.Simulator, simulation_executor.SimpleComponent]] = []
self._sockets: list[tuple[simulation_executor.Executor, str]] = []
self._sockets: list[tuple[simulation_executor.Executor, inst_base.Socket]] = []
self._wait_sims: list[simulation_executor.Component] = []
@abc.abstractmethod
......@@ -95,27 +95,28 @@ class ExperimentBaseRunner(abc.ABC):
self._running.append((sim, sc))
# add sockets for cleanup
for s in sim.sockets_cleanup(self.env): # TODO: FIXME
self._sockets.append((executor, s))
for sock in sim.sockets_cleanup(inst=self._instantiation):
self._sockets.append((executor, sock))
# Wait till sockets exist
wait_socks = sim.sockets_wait(self.env) # TODO: FIXME
if wait_socks:
wait_socks = sim.sockets_wait(inst=self._instantiation)
if len(wait_socks) > 0:
if self._verbose:
print(f'{self.exp.name}: waiting for sockets {name}')
await executor.await_files(wait_socks, verbose=self.verbose)
print(f'{self._simulation.name}: waiting for sockets {name}')
await self._instantiation.wait_for_sockets(executor=executor, sockets=wait_socks)
if self._verbose:
print(f'{self._simulation.name}: waited successfully for sockets {name}')
# add time delay if required
delay = sim.start_delay()
if delay > 0:
await asyncio.sleep(delay)
if sim.wait_terminate(self.env):
self.wait_sims.append(sc)
if sim.wait_terminate():
self._wait_sims.append(sc)
if self.verbose:
print(f'{self.exp.name}: started {name}')
print(f'{self._simulation.name}: started {name}')
async def before_wait(self) -> None:
pass
......@@ -181,11 +182,7 @@ class ExperimentBaseRunner(abc.ABC):
await sc.wait()
# remove all sockets
scs = []
for (executor, sock) in self._sockets:
scs.append(asyncio.create_task(executor.rmtree(sock)))
if scs:
await asyncio.gather(*scs)
await self._instantiation.cleanup_sockets(sockets=self._sockets)
# add all simulator components to the output
for sim, sc in self._running:
......
......@@ -46,14 +46,17 @@ class Simulator(utils_base.IdObj):
"""Base class for all simulators."""
def __init__(
self, simulation: sim_base.Simulation, relative_executable_path: str = ""
self,
simulation: sim_base.Simulation,
name: str = "",
elative_executable_path: str = "",
) -> None:
super().__init__()
self.name: str = name
self._relative_executable_path: str = relative_executable_path
self.extra_deps: list[Simulator] = []
self.name: str = ""
self.experiment: sim_base.Simulation = simulation
self._components: set[sys_conf.Component] = set()
self._relative_executable_path: str = relative_executable_path
@staticmethod
def filter_sockets(
......@@ -140,7 +143,9 @@ class Simulator(utils_base.IdObj):
if not self._chan_needs_instance(chan):
return None
# create the socket to listen on or connect to
socket = inst.get_socket(interface=interface)
socket = inst.get_socket(
interface=interface, supported_sock_types=self.supported_socket_types()
)
return socket
def _get_sockets(self, inst: inst_base.Instantiation) -> list[inst_base.Socket]:
......@@ -151,7 +156,13 @@ class Simulator(utils_base.IdObj):
if socket is None:
continue
sockets.append(socket)
return sockets
def _get_all_sockets_by_type(
self, inst: inst_base.Instantiation, sock_type: inst_base.SockType
) -> list[inst_base.Socket]:
sockets = self._get_sockets(inst=inst)
sockets = Simulator.filter_sockets(sockets=sockets, filter_type=sock_type)
return sockets
def _get_channel(self, chan: sys_conf.Channel) -> sim_chan.Channel | None:
......@@ -180,24 +191,24 @@ class Simulator(utils_base.IdObj):
"""Other simulators to execute before this one."""
return []
# TODO: overwrite in sub-classes to reflect that currently not all adapters support both listening and connecting
# In future version adapters should support both which would render this method obsolete
def supported_socket_types(self) -> set[inst_base.SockType]:
return {inst_base.SockType.LISTEN, inst_base.SockType.CONNECT}
# Sockets to be cleaned up: always the CONNECTING sockets
# pylint: disable=unused-argument
# TODO: FIXME
def sockets_cleanup(self, inst: inst_base.Instantiation) -> list[inst_base.Socket]:
sockets = []
for comp_spec in self._components:
for interface in comp_spec.interfaces():
socket = inst.get_socket(interface=interface)
if socket._type == inst_base.SockType.CONNECT:
sockets.append(socket)
return sockets
return self._get_all_sockets_by_type(
inst=inst, sock_type=inst_base.SockType.LISTEN
)
# sockets to wait for indicating the simulator is ready
# pylint: disable=unused-argument
# TODO: FIXME
def sockets_wait(self, env: exp_env.ExpEnv) -> list[str]:
return []
def sockets_wait(self, inst: inst_base.Instantiation) -> list[inst_base.Socket]:
return self._get_all_sockets_by_type(
inst=inst, sock_type=inst_base.SockType.LISTEN
)
def start_delay(self) -> int:
return 5
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment