Commit 588595ae authored by Jonas Kaufmann's avatar Jonas Kaufmann
Browse files

symphony/runtime: add simulation executor callbacks, rework simulation...

symphony/runtime: add simulation executor callbacks, rework simulation executor, command executor and output handling
parent b86ff3a4
...@@ -65,7 +65,7 @@ class DiskImage(utils_base.IdObj): ...@@ -65,7 +65,7 @@ class DiskImage(utils_base.IdObj):
f"{copy_path}" f"{copy_path}"
) )
] ]
await inst._cmd_executor.simulator_prepare_run_cmds(sim, prep_cmds) await inst._cmd_executor.exec_simulator_prepare_cmds(sim, prep_cmds)
return copy_path return copy_path
@staticmethod @staticmethod
...@@ -199,7 +199,9 @@ class LinuxConfigDiskImage(DynamicDiskImage): ...@@ -199,7 +199,9 @@ class LinuxConfigDiskImage(DynamicDiskImage):
def available_formats(self) -> list[str]: def available_formats(self) -> list[str]:
return ["raw"] return ["raw"]
async def make_qcow_copy(self, inst: inst_base.Instantiation, format: str) -> str: async def make_qcow_copy(
self, inst: inst_base.Instantiation, format: str, sim: sim_base.Simulator
) -> str:
return self.path(inst=inst, format=format) return self.path(inst=inst, format=format)
async def _prepare_format(self, inst: inst_base.Instantiation, format: str) -> None: async def _prepare_format(self, inst: inst_base.Instantiation, format: str) -> None:
......
...@@ -22,89 +22,41 @@ ...@@ -22,89 +22,41 @@
from __future__ import annotations from __future__ import annotations
import abc
import asyncio import asyncio
import shlex
import signal import signal
import typing import typing
import shlex
import collections
from asyncio.subprocess import Process from asyncio.subprocess import Process
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from simbricks.orchestration.instantiation import proxy as inst_proxy
from simbricks.orchestration.simulation import base as sim_base from simbricks.orchestration.simulation import base as sim_base
from simbricks.runtime import simulation_executor as sim_exec
class OutputListener:
def __init__(self):
self.cmd_parts: list[str] = []
@abc.abstractmethod
async def handel_err(self, lines: list[str]) -> None:
pass
@abc.abstractmethod
async def handel_out(self, lines: list[str]) -> None:
pass
def toJSON(self) -> dict:
return {
"cmd": self.cmd_parts,
}
class LegacyOutputListener(OutputListener):
def __init__(self):
super().__init__()
self.stdout: list[str] = []
self.stderr: list[str] = []
self.merged_output: list[str] = []
def _add_to_lists(self, extend: list[str], to_add_to: list[str]) -> None:
if isinstance(extend, list):
to_add_to.extend(extend)
self.merged_output.extend(extend)
else:
raise Exception("ComponentOutputHandle: can only add str or list[str] to outputs")
async def handel_out(self, lines: list[str]) -> None:
self._add_to_lists(extend=lines, to_add_to=self.stdout)
async def handel_err(self, lines: list[str]) -> None:
self._add_to_lists(extend=lines, to_add_to=self.stderr)
def toJSON(self) -> dict:
json_obj = super().toJSON()
json_obj.update(
{
"stdout": self.stdout,
"stderr": self.stderr,
"merged_output": self.merged_output,
}
)
return json_obj
class CommandExecutor: class CommandExecutor:
def __init__(self, cmd_parts: list[str], label: str, canfail=False): def __init__(
self.is_ready = False self,
self.stdout_buf = bytearray() cmd: str,
self.stderr_buf = bytearray() label: str,
self.cmd_parts: list[str] = cmd_parts started_callback: typing.Callable[[], typing.Awaitable[None]],
self._output_handler: list[OutputListener] = [] exited_callback: typing.Callable[[int], typing.Awaitable[None]],
self.label = label stdout_callback: typing.Callable[[list[str]], typing.Awaitable[None]],
self.canfail = canfail stderr_callback: typing.Callable[[list[str]], typing.Awaitable[None]],
self.cmd_parts = cmd_parts ):
self._stdout_buf = bytearray()
self._stderr_buf = bytearray()
self._cmd_parts = shlex.split(cmd)
self._label = label
self._started_cb = started_callback
self._exited_cb = exited_callback
self._stdout_cb = stdout_callback
self._stderr_cb = stderr_callback
self._proc: Process self._proc: Process
self._terminate_future: asyncio.Task self._terminate_future: asyncio.Task
def subscribe(self, listener: OutputListener) -> None:
listener.cmd_parts = self.cmd_parts
self._output_handler.append(listener)
def _parse_buf(self, buf: bytearray, data: bytes) -> list[str]: def _parse_buf(self, buf: bytearray, data: bytes) -> list[str]:
if data is not None: if data is not None:
buf.extend(data) buf.extend(data)
...@@ -121,41 +73,39 @@ class CommandExecutor: ...@@ -121,41 +73,39 @@ class CommandExecutor:
lines.append(buf.decode("utf-8")) lines.append(buf.decode("utf-8"))
return lines return lines
async def _consume_out(self, data: bytes) -> None: async def _consume_stdout(self, data: bytes) -> None:
eof = len(data) == 0 eof = len(data) == 0
ls = self._parse_buf(self.stdout_buf, data) ls = self._parse_buf(self._stdout_buf, data)
if len(ls) > 0 or eof: if len(ls) > 0 or eof:
await self.process_out(ls, eof=eof) await self._stdout_cb(ls)
for h in self._output_handler:
await h.handel_out(ls)
async def _consume_err(self, data: bytes) -> None: async def _consume_stderr(self, data: bytes) -> None:
eof = len(data) == 0 eof = len(data) == 0
ls = self._parse_buf(self.stderr_buf, data) ls = self._parse_buf(self._stderr_buf, data)
if len(ls) > 0 or eof: if len(ls) > 0 or eof:
await self.process_err(ls, eof=eof) await self._stderr_cb(ls)
for h in self._output_handler:
await h.handel_err(ls)
async def _read_stream(self, stream: asyncio.StreamReader, fn): async def _consume_stream_loop(
self, stream: asyncio.StreamReader, consume_fn: typing.Callable[[bytes], None]
) -> None:
while True: while True:
bs = await stream.readline() bs = await stream.readline()
if bs: if bs:
await fn(bs) await consume_fn(bs)
else: else:
await fn(bs) await consume_fn(bs)
return return
async def _waiter(self) -> None: async def _waiter(self) -> None:
stdout_handler = asyncio.create_task( stdout_handler = asyncio.create_task(
self._read_stream(self._proc.stdout, self._consume_out) self._consume_stream_loop(self._proc.stdout, self._consume_stdout)
) )
stderr_handler = asyncio.create_task( stderr_handler = asyncio.create_task(
self._read_stream(self._proc.stderr, self._consume_err) self._consume_stream_loop(self._proc.stderr, self._consume_stderr)
) )
rc = await self._proc.wait() rc = await self._proc.wait()
await asyncio.gather(stdout_handler, stderr_handler) await asyncio.gather(stdout_handler, stderr_handler)
await self.terminated(rc) await self._exited_cb(rc)
async def send_input(self, bs: bytes, eof=False) -> None: async def send_input(self, bs: bytes, eof=False) -> None:
self._proc.stdin.write(bs) self._proc.stdin.write(bs)
...@@ -164,11 +114,12 @@ class CommandExecutor: ...@@ -164,11 +114,12 @@ class CommandExecutor:
async def start(self) -> None: async def start(self) -> None:
self._proc = await asyncio.create_subprocess_exec( self._proc = await asyncio.create_subprocess_exec(
*self.cmd_parts, *self._cmd_parts,
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL,
) )
await self._started_cb()
self._terminate_future = asyncio.create_task(self._waiter()) self._terminate_future = asyncio.create_task(self._waiter())
async def wait(self) -> None: async def wait(self) -> None:
...@@ -205,7 +156,7 @@ class CommandExecutor: ...@@ -205,7 +156,7 @@ class CommandExecutor:
# before Python 3.11, asyncio.wait_for() throws asyncio.TimeoutError -_- # before Python 3.11, asyncio.wait_for() throws asyncio.TimeoutError -_-
except (TimeoutError, asyncio.TimeoutError): except (TimeoutError, asyncio.TimeoutError):
print( print(
f"terminating component {self.cmd_parts[0]} pid" f" {self._proc.pid}", f"terminating component {self._cmd_parts[0]} pid" f" {self._proc.pid}",
flush=True, flush=True,
) )
await self.terminate() await self.terminate()
...@@ -215,51 +166,84 @@ class CommandExecutor: ...@@ -215,51 +166,84 @@ class CommandExecutor:
return return
except (TimeoutError, asyncio.TimeoutError): except (TimeoutError, asyncio.TimeoutError):
print( print(
f"killing component {self.cmd_parts[0]} pid {self._proc.pid}", f"killing component {self._cmd_parts[0]} pid {self._proc.pid}",
flush=True, flush=True,
) )
await self.kill() await self.kill()
await self._proc.wait() await self._proc.wait()
async def sigusr1(self) -> None: async def sigusr1(self) -> None:
"""Sends an interrupt signal.""" """Sends an SIGUSR1 signal."""
if self._proc.returncode is None: if self._proc.returncode is None:
self._proc.send_signal(signal.SIGUSR1) self._proc.send_signal(signal.SIGUSR1)
async def process_out(self, lines: list[str], eof: bool) -> None:
# TODO
pass
async def process_err(self, lines: list[str], eof: bool) -> None: class CommandExecutorFactory:
# TODO
pass
async def terminated(self, rc: int) -> None: def __init__(self, sim_exec_cbs: sim_exec.SimulationExecutorCallbacks):
# TODO self._sim_exec_cbs = sim_exec_cbs
if not self.canfail and rc != 0:
raise RuntimeError("Command Failed: " + str(self.cmd_parts))
async def exec_generic_prepare_cmds(self, cmds: list[str]) -> None:
for cmd in cmds:
class CommandExecutorFactory: async def started_cb() -> None:
await self._sim_exec_cbs.simulation_prepare_cmd_start(cmd)
def __init__(self): async def exited_cb(exit_code: int) -> None:
self._sim_executors: collections.defaultdict[sim_base.Simulator, set[CommandExecutor]] = ( if exit_code != 0:
collections.defaultdict(set) raise RuntimeError(f"The following prepare command failed: {cmd}")
)
self._generic_prepare_executors: set[CommandExecutor] = set()
async def generic_prepare_run_cmds(self, cmds: list[str]) -> None: async def stdout_cb(lines: list[str]) -> None:
for cmd in cmds: await self._sim_exec_cbs.simulation_prepare_cmd_stdout(cmd, lines)
executor = CommandExecutor(shlex.split(cmd), "prepare")
self._generic_prepare_executors.add(executor) async def stderr_cb(lines: list[str]) -> None:
await self._sim_exec_cbs.simulation_prepare_cmd_stderr(cmd, lines)
executor = CommandExecutor(
cmd, "simulation_prepare", started_cb, exited_cb, stdout_cb, stderr_cb
)
await executor.start() await executor.start()
await executor.wait() await executor.wait()
self._generic_prepare_executors.remove(executor)
async def simulator_prepare_run_cmds(self, sim: sim_base.Simulator, cmds: list[str]) -> None: async def exec_simulator_prepare_cmds(self, sim: sim_base.Simulator, cmds: list[str]) -> None:
for cmd in cmds: for cmd in cmds:
executor = CommandExecutor(shlex.split(cmd), sim.full_name()) executor = await self.start_simulator(sim, cmd)
self._sim_executors[sim].add(executor)
await executor.start()
await executor.wait() await executor.wait()
self._sim_executors[sim].remove(executor)
async def start_simulator(self, sim: sim_base.Simulator, cmd) -> CommandExecutor:
async def started_cb() -> None:
await self._sim_exec_cbs.simulator_started(sim, cmd)
async def exited_cb(exit_code: int) -> None:
await self._sim_exec_cbs.simulator_exited(sim, exit_code)
async def stdout_cb(lines: list[str]) -> None:
await self._sim_exec_cbs.simulator_stdout(sim, lines)
async def stderr_cb(lines: list[str]) -> None:
await self._sim_exec_cbs.simulator_stderr(sim, lines)
executor = CommandExecutor(
cmd, sim.full_name(), started_cb, exited_cb, stdout_cb, stderr_cb
)
await executor.start()
return executor
async def start_proxy(self, proxy: inst_proxy.Proxy, cmd) -> CommandExecutor:
async def started_cb(cmd: str) -> None:
await self._sim_exec_cbs.proxy_started(proxy, cmd)
async def exited_cb(exit_code: int) -> None:
await self._sim_exec_cbs.proxy_exited(proxy, exit_code)
async def stdout_cb(lines: list[str]) -> None:
await self._sim_exec_cbs.proxy_stdout(proxy, lines)
async def stderr_cb(lines: list[str]) -> None:
await self._sim_exec_cbs.proxy_stderr(proxy, lines)
executor = CommandExecutor(
cmd, f"proxy_{proxy.id()}", started_cb, exited_cb, stdout_cb, stderr_cb
)
await executor.start()
return executor
...@@ -22,27 +22,66 @@ ...@@ -22,27 +22,66 @@
from __future__ import annotations from __future__ import annotations
import collections
import enum
import json import json
import time
import pathlib import pathlib
import time
import typing import typing
from simbricks.runtime import command_executor
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from simbricks.orchestration.instantiation import proxy as inst_proxy
from simbricks.orchestration.simulation import base as sim_base from simbricks.orchestration.simulation import base as sim_base
# TODO (Jonas) generic prepare
class SimulationExitState(enum.Enum):
SUCCESS = 0
FAILED = 1
INTERRUPTED = 2
class ProcessOutput:
def __init__(self, cmd: str):
self.cmd = cmd
self.stdout: list[str] = []
self.stderr: list[str] = []
self.merged: list[str] = []
def append_stdout(self, lines: list[str]) -> None:
self.stdout.extend(lines)
self.merged.extend(lines)
def append_stderr(self, lines: list[str]) -> None:
self.stderr.extend(lines)
self.merged.extend(lines)
def toJSON(self) -> dict:
return {
"cmd": self.cmd,
"stdout": self.stdout,
"stderr": self.stderr,
"merged_output": self.merged,
}
class SimulationOutput: class SimulationOutput:
"""Manages an experiment's output.""" """Manages an experiment's output."""
def __init__(self, sim: sim_base.Simulation) -> None: def __init__(self, sim: sim_base.Simulation) -> None:
self._sim_name: str = sim.name self._simulation_name: str = sim.name
self._start_time: float | None = None self._start_time: float | None = None
self._end_time: float | None = None self._end_time: float | None = None
self._success: bool = True self._success: bool = True
self._interrupted: bool = False self._interrupted: bool = False
self._metadata = sim.metadata self._metadata = sim.metadata
self._sims: dict[sim_base.Simulator, command_executor.OutputListener] = {} self._generic_prepare_output: dict[str, ProcessOutput] = {}
self._simulator_output: collections.defaultdict[sim_base.Simulator, list[ProcessOutput]] = (
collections.defaultdict(list)
)
self._proxy_output: collections.defaultdict[inst_proxy.Proxy, list[ProcessOutput]] = {}
def is_ended(self) -> bool: def is_ended(self) -> bool:
return self._end_time or self._interrupted return self._end_time or self._interrupted
...@@ -50,51 +89,93 @@ class SimulationOutput: ...@@ -50,51 +89,93 @@ class SimulationOutput:
def set_start(self) -> None: def set_start(self) -> None:
self._start_time = time.time() self._start_time = time.time()
def set_end(self) -> None: def set_end(self, exit_state: SimulationExitState) -> None:
self._end_time = time.time() self._end_time = time.time()
match exit_state:
def set_failed(self) -> None: case SimulationExitState.SUCCESS:
self._success = False self._success = True
case SimulationExitState.FAILED:
self._success = False
case SimulationExitState.INTERRUPTED:
self._success = False
self._interrupted = True
case _:
raise RuntimeError("Unknown simulation exit state")
def failed(self) -> bool: def failed(self) -> bool:
return not self._success return not self._success
def set_interrupted(self) -> None:
self._success = False
self._interrupted = True
def add_mapping(self, sim: sim_base.Simulator, output_handel: command_executor.OutputListener) -> None: # generic prepare command execution
assert sim not in self._sims def add_generic_prepare_cmd(self, cmd: str) -> None:
self._sims[sim] = output_handel self._generic_prepare_output[cmd] = ProcessOutput(cmd)
def generic_prepare_cmd_stdout(self, cmd: str, lines: list[str]) -> None:
assert cmd in self._generic_prepare_output
self._generic_prepare_output[cmd].append_stdout(lines)
def generic_prepare_cmd_stderr(self, cmd: str, lines: list[str]) -> None:
assert cmd in self._generic_prepare_output
self._generic_prepare_output[cmd].append_stderr(lines)
# simulator execution
def set_simulator_cmd(self, sim: sim_base.Simulator, cmd: str) -> None:
self._simulator_output[sim].append(ProcessOutput(cmd))
def get_output_listener(self, sim: sim_base.Simulator) -> command_executor.OutputListener: def append_simulator_stdout(self, sim: sim_base.Simulator, lines: list[str]) -> None:
if sim not in self._sims: assert sim in self._simulator_output
raise Exception("not output handel for simulator found") assert self._simulator_output[sim]
return self._sims[sim] self._simulator_output[sim][-1].append_stdout(lines)
def get_all_listeners(self) -> list[command_executor.OutputListener]: def append_simulator_stderr(self, sim: sim_base.Simulator, lines: list[str]) -> None:
return list(self._sims.values()) assert sim in self._simulator_output
assert self._simulator_output[sim]
self._simulator_output[sim][-1].append_stderr(lines)
def set_proxy_cmd(self, proxy: inst_proxy.Proxy, cmd: str) -> None:
self._proxy_output[proxy].append(ProcessOutput(cmd))
def append_proxy_stdout(self, proxy: inst_proxy.Proxy, lines: list[str]) -> None:
assert proxy in self._proxy_output
self._proxy_output[proxy][-1].append_stdout(lines)
def append_proxy_stderr(self, proxy: inst_proxy.Proxy, lines: list[str]) -> None:
assert proxy in self._proxy_output
self._proxy_output[proxy][-1].append_stderr(lines)
def toJSON(self) -> dict: def toJSON(self) -> dict:
json_obj = {} json_obj = {}
json_obj["_sim_name"] = self._sim_name json_obj["_sim_name"] = self._simulation_name
json_obj["_start_time"] = self._start_time json_obj["_start_time"] = self._start_time
json_obj["_end_time"] = self._end_time json_obj["_end_time"] = self._end_time
json_obj["_success"] = self._success json_obj["_success"] = self._success
json_obj["_interrupted"] = self._interrupted json_obj["_interrupted"] = self._interrupted
json_obj["_metadata"] = self._metadata json_obj["_metadata"] = self._metadata
for sim, out in self._sims.items(): # TODO (Jonas) Change backend to reflect multiple commands executed
json_obj[sim.full_name()] = out.toJSON() json_obj_out_list = []
json_obj["class"] = sim.__class__.__name__ for _, proc_out in self._generic_prepare_output.items():
json_obj_out_list.append(proc_out.toJSON())
json_obj["generic_prepare"] = json_obj_out_list
for sim, proc_list in self._simulator_output.items():
json_obj_out_list = []
for proc_out in proc_list:
json_obj_out_list.append(proc_out.toJSON())
json_obj[sim.full_name()] = {
"class": sim.__class__.__name__,
"output": json_obj_out_list,
}
for proxy, proc_list in self._proxy_output.items():
json_obj_out_list = []
for proc_out in proc_list:
json_obj_out_list.append(proc_out.toJSON())
json_obj[proxy.name] = {
"class": proxy.__class__.__name__,
"output": json_obj_out_list
}
return json_obj return json_obj
def dump(self, outpath: str) -> None: def dump(self, outpath: str) -> None:
json_obj = self.toJSON() json_obj = self.toJSON()
pathlib.Path(outpath).parent.mkdir(parents=True, exist_ok=True) pathlib.Path(outpath).parent.mkdir(parents=True, exist_ok=True)
with open(outpath, "w", encoding="utf-8") as file: with open(outpath, "w", encoding="utf-8") as file:
json.dump(json_obj, file, indent=4) json.dump(json_obj, file, indent=2)
# def load(self, file: str) -> None:
# with open(file, "r", encoding="utf-8") as fp:
# for k, v in json.load(fp).items():
# self.__dict__[k] = v
...@@ -23,74 +23,131 @@ ...@@ -23,74 +23,131 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import shlex import itertools
import traceback import traceback
import typing import typing
from simbricks.runtime import output
from simbricks.orchestration.simulation import base as sim_base
from simbricks.orchestration.instantiation import base as inst_base from simbricks.orchestration.instantiation import base as inst_base
from simbricks.orchestration.instantiation import dependency_topology as dep_topo
from simbricks.orchestration.instantiation import socket as inst_socket from simbricks.orchestration.instantiation import socket as inst_socket
from simbricks.orchestration.simulation import base as sim_base
from simbricks.runtime import command_executor as cmd_exec from simbricks.runtime import command_executor as cmd_exec
from simbricks.runtime import output
from simbricks.utils import graphlib from simbricks.utils import graphlib
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from simbricks.runtime import simulation_executor_callbacks from simbricks.orchestration.instantiation import proxy as inst_proxy
class SimulationExecutorCallbacks:
def __init__(self, instantiation: inst_base.Instantiation) -> None:
self._instantiation = instantiation
self._output: output.SimulationOutput = output.SimulationOutput(
self._instantiation.simulation
)
@property
def output(self) -> output.SimulationOutput:
return self._output
# ---------------------------------------
# Callbacks related to whole simulation -
# ---------------------------------------
async def simulation_started(self) -> None:
self._output.set_start()
async def simulation_prepare_cmd_start(self, cmd: str) -> None:
self._output.add_generic_prepare_cmd(cmd)
async def simulation_prepare_cmd_stdout(self, cmd: str, lines: list[str]) -> None:
self._output.generic_prepare_cmd_stdout(cmd, lines)
async def simulation_prepare_cmd_stderr(self, cmd: str, lines: list[str]) -> None:
self._output.generic_prepare_cmd_stderr(cmd, lines)
async def simulation_exited(self, state: output.SimulationExitState) -> None:
self._output.set_end(state)
# -----------------------------
# Simulator-related callbacks -
# -----------------------------
async def simulator_started(self, sim: sim_base.Simulator, cmd: str) -> None:
self._output.set_simulator_cmd(sim, cmd)
async def simulator_ready(self, sim: sim_base.Simulator) -> None:
pass
async def simulator_exited(self, sim: sim_base.Simulator, exit_code: int) -> None:
pass
async def simulator_stdout(self, sim: sim_base.Simulator, lines: list[str]) -> None:
self._output.append_simulator_stdout(sim, lines)
async def simulator_stderr(self, sim: sim_base.Simulator, lines: list[str]) -> None:
self._output.append_simulator_stderr(sim, lines)
# -------------------------
# Proxy-related callbacks -
# -------------------------
async def proxy_started(self, proxy: inst_proxy.Proxy, cmd: str) -> None:
self._output.set_proxy_cmd(proxy, cmd)
async def proxy_ready(self, proxy: inst_proxy.Proxy) -> None:
pass
async def proxy_exited(self, proxy: inst_proxy.Proxy, exit_code: int) -> None:
pass
async def proxy_stdout(self, proxy: inst_proxy.Proxy, lines: list[str]) -> None:
self._output.append_proxy_stdout(proxy, lines)
async def proxy_stderr(self, proxy: inst_proxy.Proxy, lines: list[str]) -> None:
self._output.append_proxy_stderr(proxy, lines)
class SimulationBaseRunner():
# TODO (Jonas) Rename this to InstantiationExecutor class SimulationExecutor:
def __init__( def __init__(
self, self,
instantiation: inst_base.Instantiation, instantiation: inst_base.Instantiation,
callbacks: simulation_executor_callbacks.ExecutorCallbacks, callbacks: SimulationExecutorCallbacks,
verbose: bool, verbose: bool,
profile_int=None,
) -> None: ) -> None:
self._instantiation: inst_base.Instantiation = instantiation self._instantiation: inst_base.Instantiation = instantiation
self._callbacks = callbacks self._callbacks = callbacks
self._verbose: bool = verbose self._verbose: bool = verbose
self._profile_int: int | None = None self._profile_int: int | None = profile_int
self._out: output.SimulationOutput = output.SimulationOutput(self._instantiation.simulation) self._running_sims: dict[sim_base.Simulator, cmd_exec.CommandExecutor] = {}
self._out_listener: dict[sim_base.Simulator, cmd_exec.OutputListener] = {} self._running_proxies: dict[inst_proxy.Proxy, cmd_exec.CommandExecutor] = {}
self._running: list[tuple[sim_base.Simulator, cmd_exec.CommandExecutor]] = []
self._sockets: list[inst_socket.Socket] = []
self._wait_sims: list[cmd_exec.CommandExecutor] = [] self._wait_sims: list[cmd_exec.CommandExecutor] = []
self._cmd_executor = cmd_exec.CommandExecutorFactory() self._cmd_executor = cmd_exec.CommandExecutorFactory(callbacks)
def sim_listener(self, sim: sim_base.Simulator) -> cmd_exec.OutputListener:
if sim not in self._out_listener:
raise Exception(f"no listener specified for simulator {sim.id()}")
return self._out_listener[sim]
def add_listener(self, sim: sim_base.Simulator, listener: cmd_exec.OutputListener) -> None:
self._out_listener[sim] = listener
self._out.add_mapping(sim, listener)
async def start_sim(self, sim: sim_base.Simulator) -> None:
"""Start a simulator and wait for it to be ready."""
name = sim.full_name() async def _start_proxy(self, proxy: inst_proxy.Proxy) -> None:
if self._verbose: """Start a proxy and wait for it to be ready."""
print(f"{self._instantiation.simulation.name}: starting {name}") cmd_exec = await self._cmd_executor.start_simulator(proxy, proxy.run_cmd())
self._running_proxies[proxy] = cmd_exec
run_cmd = sim.run_cmd(self._instantiation) # Wait till sockets exist
if run_cmd is None: wait_socks = proxy.sockets_wait(inst=self._instantiation)
if self._verbose: for sock in wait_socks:
print(f"{self._instantiation.simulation.name}: started dummy {name}") await sock.wait()
return self._callbacks.proxy_ready(proxy)
# run simulator async def _start_sim(self, sim: sim_base.Simulator) -> None:
cmds = shlex.split(run_cmd) """Start a simulator and wait for it to be ready."""
cmd_exec = cmd_exec.CommandExecutor(cmds, name, self._verbose, True) name = sim.full_name()
if listener := self.sim_listener(sim=sim): cmd_exec = await self._cmd_executor.start_simulator(sim, sim.run_cmd(self._instantiation))
cmd_exec.subscribe(listener=listener) self._running_sims[sim] = cmd_exec
await cmd_exec.start()
self._running.append((sim, cmd_exec))
# add sockets for cleanup # give simulator time to start if indicated
for sock in sim.sockets_cleanup(inst=self._instantiation): delay = sim.start_delay()
self._sockets.append(sock) if delay > 0:
await asyncio.sleep(delay)
# Wait till sockets exist # Wait till sockets exist
wait_socks = sim.sockets_wait(inst=self._instantiation) wait_socks = sim.sockets_wait(inst=self._instantiation)
...@@ -100,90 +157,87 @@ class SimulationBaseRunner(): ...@@ -100,90 +157,87 @@ class SimulationBaseRunner():
for sock in wait_socks: for sock in wait_socks:
await sock.wait() await sock.wait()
if self._verbose: if self._verbose:
print(f"{self._instantiation.simulation.name}: waited successfully for sockets {name}") print(
f"{self._instantiation.simulation.name}: waited successfully for sockets {name}"
# add time delay if required )
delay = sim.start_delay() await self._callbacks.simulator_ready(sim)
if delay > 0:
await asyncio.sleep(delay)
if sim.wait_terminate: if sim.wait_terminate:
self._wait_sims.append(cmd_exec) self._wait_sims.append(cmd_exec)
if self._verbose:
print(f"{self._instantiation.simulation.name}: started {name}")
async def prepare(self) -> None: async def prepare(self) -> None:
self._instantiation._cmd_executor = self._cmd_executor self._instantiation._cmd_executor = self._cmd_executor
await self._instantiation.prepare() await self._instantiation.prepare()
async def wait_for_sims(self) -> None: async def terminate_collect_sims(self) -> None:
"""Wait for simulators to terminate (the ones marked to wait on)."""
if self._verbose:
print(f"{self._instantiation.simulation.name}: waiting for hosts to terminate")
for sc in self._wait_sims:
await sc.wait()
async def terminate_collect_sims(self) -> output.SimulationOutput:
"""Terminates all simulators and collects output.""" """Terminates all simulators and collects output."""
self._out.set_end()
if self._verbose: if self._verbose:
print(f"{self._instantiation.simulation.name}: cleaning up") print(f"{self._instantiation.simulation.name}: cleaning up")
# "interrupt, terminate, kill" all processes # Interrupt, then terminate, then kill all processes. Do this in parallel so user does not
# have to wait unnecessaryily long.
scs = [] scs = []
for _, sc in self._running: for exec in itertools.chain(self._running_sims.values(), self._running_proxies.values()):
scs.append(asyncio.create_task(sc.int_term_kill())) scs.append(asyncio.create_task(exec.int_term_kill()))
await asyncio.gather(*scs) await asyncio.gather(*scs)
# wait for all processes to terminate # wait for all processes to terminate
for _, sc in self._running: for exec in itertools.chain(self._running_sims.values(), self._running_proxies.values()):
await sc.wait() await exec.wait()
return self._out
async def sigusr1(self) -> None:
for _, sc in self._running:
await sc.sigusr1()
async def profiler(self): async def _profiler(self) -> None:
assert self._profile_int assert self._profile_int
while True: while True:
await asyncio.sleep(self._profile_int) await asyncio.sleep(self._profile_int)
await self.sigusr1() for sim, exec in self._running_sims.items():
await exec.sigusr1()
async def run(self) -> output.SimulationOutput: async def run(self) -> output.SimulationOutput:
profiler_task = None profiler_task = None
try: try:
self._out.set_start() await self._callbacks.simulation_started()
graph = self._instantiation.sim_dependencies() graph = self._instantiation.sim_dependencies()
print(graph)
ts = graphlib.TopologicalSorter(graph) ts = graphlib.TopologicalSorter(graph)
ts.prepare() ts.prepare()
while ts.is_active(): while ts.is_active():
# start ready simulators in parallel # start ready simulators in parallel
starting = [] starting = []
sims = [] topo_comps = []
for sim in ts.get_ready(): for comp in ts.get_ready():
starting.append(asyncio.create_task(self.start_sim(sim))) comp: dep_topo.TopologyComponent
sims.append(sim) match comp.type:
case dep_topo.TopologyComponentType.SIMULATOR:
starting.append(
asyncio.create_task(self._start_sim(comp.get_simulator()))
)
case dep_topo.TopologyComponentType.PROXY:
starting.append(
asyncio.create_task(self._start_proxy(comp.get_proxy()))
)
case _:
raise RuntimeError("Unhandled topology component type")
topo_comps.append(comp)
# wait for starts to complete # wait for starts to complete
await asyncio.gather(*starting) await asyncio.gather(*starting)
for sim in sims: for comp in topo_comps:
ts.done(sim) ts.done(comp)
if self._profile_int: if self._profile_int:
profiler_task = asyncio.create_task(self.profiler()) profiler_task = asyncio.create_task(self._profiler())
await self.wait_for_sims()
# wait until all simulators indicated to be awaited exit
for sc in self._wait_sims:
await sc.wait()
await self._callbacks.simulation_exited(output.SimulationExitState.SUCCESS)
except asyncio.CancelledError: except asyncio.CancelledError:
if self._verbose: if self._verbose:
print(f"{self._instantiation.simulation.name}: interrupted") print(f"{self._instantiation.simulation.name}: interrupted")
self._out.set_interrupted() await self._callbacks.simulation_exited(output.SimulationExitState.INTERRUPTED)
except: # pylint: disable=bare-except except: # pylint: disable=bare-except
self._out.set_failed() await self._callbacks.simulation_exited(output.SimulationExitState.FAILED)
traceback.print_exc() traceback.print_exc()
if profiler_task: if profiler_task:
...@@ -191,6 +245,7 @@ class SimulationBaseRunner(): ...@@ -191,6 +245,7 @@ class SimulationBaseRunner():
profiler_task.cancel() profiler_task.cancel()
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
# The bare except above guarantees that we always execute the following # The bare except above guarantees that we always execute the following
# code, which terminates all simulators and produces a proper output # code, which terminates all simulators and produces a proper output
# file. # file.
...@@ -198,10 +253,10 @@ class SimulationBaseRunner(): ...@@ -198,10 +253,10 @@ class SimulationBaseRunner():
# prevent terminate_collect_task from being cancelled # prevent terminate_collect_task from being cancelled
while True: while True:
try: try:
return await asyncio.shield(terminate_collect_task) await asyncio.shield(terminate_collect_task)
return self._callbacks.output
except asyncio.CancelledError as e: except asyncio.CancelledError as e:
print(e) print(e)
pass
async def cleanup(self) -> None: async def cleanup(self) -> None:
await self._instantiation.cleanup() await self._instantiation.cleanup()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment