"...resnet50_tensorflow.git" did not exist on "f25d4ae88c4c76e9cf1027abb06b5346034c6be1"
Commit 7d951bee authored by Jonas Kaufmann
Browse files

symphony/runtime: simplify simulation_executor.py and command_executor.py

parent f5f9e30c
......@@ -23,7 +23,6 @@
from __future__ import annotations
import pathlib
import shutil
import typing
import itertools
import copy
......@@ -39,10 +38,11 @@ from simbricks.orchestration.instantiation import (
fragment as inst_fragment,
dependency_topology as inst_dep_topo,
)
from simbricks.utils import file as util_file
if typing.TYPE_CHECKING:
from simbricks.orchestration.simulation import base as sim_base
from simbricks.runtime import command_executor
from simbricks.runtime import command_executor as cmd_exec
class InstantiationEnvironment(util_base.IdObj):
......@@ -82,7 +82,6 @@ class Instantiation:
self.env: InstantiationEnvironment | None = None
self.artifact_name: str = f"simbricks-artifact-{str(uuid.uuid4())}.zip"
self.artifact_paths: list[str] = []
self._executor: command_executor.Executor | None = None
self._create_checkpoint: bool = False
self._restore_checkpoint: bool = False
self._preserve_checkpoints: bool = True
......@@ -91,25 +90,22 @@ class Instantiation:
self._socket_per_interface: dict[sys_base.Interface, inst_socket.Socket] = {}
# NOTE: temporary data structure
self._sim_dependency: dict[sim_base.Simulator, set[sim_base.Simulator]] | None = None
self._cmd_executor: cmd_exec.CommandExecutorFactory | None = None
@staticmethod
def is_absolute_exists(path: str) -> bool:
    """Tell whether ``path`` is absolute and names an existing file.

    Returns:
        ``True`` only if ``path`` is an absolute path and ``Path.is_file()``
        reports a file at that location; ``False`` otherwise.
    """
    candidate = pathlib.Path(path)
    # A relative path is rejected without touching the file system.
    if not candidate.is_absolute():
        return False
    return candidate.is_file()
@property
def executor(self):
if self._executor is None:
raise Exception("you must set an executor")
return self._executor
@property
def create_artifact(self) -> bool:
    """Whether at least one entry is present in ``artifact_paths``."""
    return bool(self.artifact_paths)
@executor.setter
def executor(self, executor: command_executor.Executor):
self._executor = executor
@property
def command_executor(self) -> cmd_exec.CommandExecutorFactory:
    """Return the command executor factory stored in ``_cmd_executor``.

    Raises:
        RuntimeError: if ``_cmd_executor`` is still ``None``.
    """
    factory = self._cmd_executor
    if factory is not None:
        return factory
    raise RuntimeError(f"{type(self).__name__}._cmd_executor should be set")
def _get_opposing_interface(self, interface: sys_base.Interface) -> sys_base.Interface:
opposing_inf = interface.get_opposing_interface()
......@@ -221,13 +217,6 @@ class Instantiation:
self._sim_dependency = inst_dep_topo.build_simulation_topology(self)
return self._sim_dependency
async def wait_for_sockets(
self,
sockets: list[inst_socket.Socket] = [],
) -> None:
wait_socks = list(map(lambda sock: sock._path, sockets))
await self.executor.await_files(wait_socks, verbose=True)
@property
def create_checkpoint(self) -> bool:
"""
......@@ -318,11 +307,8 @@ class Instantiation:
if not self.create_checkpoint and not self.restore_checkpoint:
to_prepare.append(self.cpdir())
for tp in to_prepare:
shutil.rmtree(tp, ignore_errors=True)
await self.executor.rmtree(tp)
pathlib.Path(tp).mkdir(parents=True, exist_ok=True)
await self.executor.mkdir(tp)
util_file.rmtree(tp)
util_file.mkdir(tp)
await self.simulation.prepare(inst=self)
......@@ -333,8 +319,7 @@ class Instantiation:
if not self._preserve_checkpoints:
to_delete.append(self.cpdir())
for td in to_delete:
shutil.rmtree(td, ignore_errors=True)
await self.executor.rmtree(td)
util_file.rmtree(td)
def _join_paths(self, base: str = "", relative_path: str = "", enforce_existence=False) -> str:
if relative_path.startswith("/"):
......
......@@ -22,6 +22,7 @@
import enum
from simbricks.utils import base as util_base
from simbricks.utils import file as util_file
class SockType(enum.Enum):
......@@ -39,3 +40,7 @@ class Socket(util_base.IdObj):
@property
def type(self) -> SockType:
return self._type
async def wait(self):
    """Wait until this socket becomes available.

    Delegates to ``util_file.await_file`` with the socket's ``_path``.
    """
    socket_path = self._path
    await util_file.await_file(socket_path)
......@@ -23,14 +23,13 @@
from __future__ import annotations
import math
import asyncio
import simbricks.orchestration.simulation.base as sim_base
import simbricks.orchestration.system as system
from simbricks.orchestration.instantiation import base as inst_base
from simbricks.orchestration.system import host as sys_host
from simbricks.orchestration.system import pcie as sys_pcie
from simbricks.orchestration.system import mem as sys_mem
from simbricks.utils import base as utils_base
from simbricks.utils import base as utils_base, file as util_file
from simbricks.orchestration.instantiation import socket as inst_socket
......@@ -117,12 +116,7 @@ class Gem5Sim(HostSim):
async def prepare(self, inst: inst_base.Instantiation) -> None:
await super().prepare(inst=inst)
prep_cmds = [f"mkdir -p {inst.cpdir_subdir(sim=self)}"]
task = asyncio.create_task(
inst.executor.run_cmdlist(label="prepare", cmds=prep_cmds, verbose=True)
)
await task
util_file.mkdir(inst.cpdir_subdir(sim=self))
def checkpoint_commands(self) -> list[str]:
return ["m5 checkpoint"]
......@@ -276,6 +270,7 @@ class QemuSim(HostSim):
copy_path = await disk.make_qcow_copy(
inst=inst,
format=format,
sim=self
)
assert copy_path is not None
d.append((copy_path, format))
......
......@@ -33,6 +33,7 @@ from simbricks.orchestration.system import base as sys_base
if tp.TYPE_CHECKING:
from simbricks.orchestration.system import host as sys_host
from simbricks.orchestration.simulation import base as sim_base
class DiskImage(utils_base.IdObj):
......@@ -49,7 +50,12 @@ class DiskImage(utils_base.IdObj):
def path(self, inst: inst_base.Instantiation, format: str) -> str:
raise Exception("must be overwritten")
async def make_qcow_copy(self, inst: inst_base.Instantiation, format: str) -> str:
async def make_qcow_copy(
self,
inst: inst_base.Instantiation,
format: str,
sim: sim_base.Simulator
) -> str:
disk_path = pathlib.Path(self.path(inst=inst, format=format))
copy_path = inst.join_imgs_path(relative_path=f"hdcopy.{self._id}")
prep_cmds = [
......@@ -59,7 +65,7 @@ class DiskImage(utils_base.IdObj):
f"{copy_path}"
)
]
await inst.executor.run_cmdlist(label="prepare", cmds=prep_cmds, verbose=True)
await inst._cmd_executor.simulator_prepare_run_cmds(sim, prep_cmds)
return copy_path
@staticmethod
......
......@@ -24,14 +24,15 @@ from __future__ import annotations
import abc
import asyncio
import os
import pathlib
import shlex
import shutil
import signal
import typing as tp
import typing
import shlex
import collections
from asyncio.subprocess import Process
if typing.TYPE_CHECKING:
from simbricks.orchestration.simulation import base as sim_base
class OutputListener:
......@@ -85,15 +86,17 @@ class LegacyOutputListener(OutputListener):
return json_obj
class Component(object):
class CommandExecutor:
def __init__(self, cmd_parts: tp.List[str], with_stdin=False):
def __init__(self, cmd_parts: list[str], label: str, canfail=False):
self.is_ready = False
self.stdout_buf = bytearray()
self.stderr_buf = bytearray()
self.cmd_parts: list[str] = cmd_parts
self._output_handler: list[OutputListener] = []
self.with_stdin: bool = with_stdin
self.label = label
self.canfail = canfail
self.cmd_parts = cmd_parts
self._proc: Process
self._terminate_future: asyncio.Task
......@@ -102,7 +105,7 @@ class Component(object):
listener.cmd_parts = self.cmd_parts
self._output_handler.append(listener)
def _parse_buf(self, buf: bytearray, data: bytes) -> tp.List[str]:
def _parse_buf(self, buf: bytearray, data: bytes) -> list[str]:
if data is not None:
buf.extend(data)
lines = []
......@@ -144,8 +147,12 @@ class Component(object):
return
async def _waiter(self) -> None:
stdout_handler = asyncio.create_task(self._read_stream(self._proc.stdout, self._consume_out))
stderr_handler = asyncio.create_task(self._read_stream(self._proc.stderr, self._consume_err))
stdout_handler = asyncio.create_task(
self._read_stream(self._proc.stdout, self._consume_out)
)
stderr_handler = asyncio.create_task(
self._read_stream(self._proc.stderr, self._consume_err)
)
rc = await self._proc.wait()
await asyncio.gather(stdout_handler, stderr_handler)
await self.terminated(rc)
......@@ -156,19 +163,13 @@ class Component(object):
self._proc.stdin.close()
async def start(self) -> None:
if self.with_stdin:
stdin = asyncio.subprocess.PIPE
else:
stdin = asyncio.subprocess.DEVNULL
self._proc = await asyncio.create_subprocess_exec(
*self.cmd_parts,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=stdin,
stdin=asyncio.subprocess.DEVNULL,
)
self._terminate_future = asyncio.create_task(self._waiter())
await self.started()
async def wait(self) -> None:
"""
......@@ -203,14 +204,20 @@ class Component(object):
return
# before Python 3.11, asyncio.wait_for() throws asyncio.TimeoutError -_-
except (TimeoutError, asyncio.TimeoutError):
print(f"terminating component {self.cmd_parts[0]} " f"pid {self._proc.pid}", flush=True)
print(
f"terminating component {self.cmd_parts[0]} pid" f" {self._proc.pid}",
flush=True,
)
await self.terminate()
try:
await asyncio.wait_for(self._proc.wait(), delay)
return
except (TimeoutError, asyncio.TimeoutError):
print(f"killing component {self.cmd_parts[0]} " f"pid {self._proc.pid}", flush=True)
print(
f"killing component {self.cmd_parts[0]} pid {self._proc.pid}",
flush=True,
)
await self.kill()
await self._proc.wait()
......@@ -219,111 +226,40 @@ class Component(object):
if self._proc.returncode is None:
self._proc.send_signal(signal.SIGUSR1)
async def started(self) -> None:
async def process_out(self, lines: list[str], eof: bool) -> None:
# TODO
pass
async def terminated(self, rc) -> None:
async def process_err(self, lines: list[str], eof: bool) -> None:
# TODO
pass
async def process_out(self, lines: tp.List[str], eof: bool) -> None:
pass
async def process_err(self, lines: tp.List[str], eof: bool) -> None:
pass
class SimpleComponent(Component):
def __init__(self, label: str, cmd_parts: tp.List[str], *args, verbose=True, canfail=False, **kwargs) -> None:
self.label = label
self.verbose = verbose
self.canfail = canfail
self.cmd_parts = cmd_parts
super().__init__(cmd_parts, *args, **kwargs)
async def process_out(self, lines: tp.List[str], eof: bool) -> None:
if self.verbose:
for _ in lines:
print(self.label, "OUT:", lines, flush=True)
async def process_err(self, lines: tp.List[str], eof: bool) -> None:
if self.verbose:
for _ in lines:
print(self.label, "ERR:", lines, flush=True)
async def terminated(self, rc: int) -> None:
if self.verbose:
print(self.label, "TERMINATED:", rc, flush=True)
# TODO
if not self.canfail and rc != 0:
raise RuntimeError("Command Failed: " + str(self.cmd_parts))
class Executor(abc.ABC):
class CommandExecutorFactory:
def __init__(self) -> None:
self.ip = None
@abc.abstractmethod
def create_component(self, label: str, parts: tp.List[str], **kwargs) -> SimpleComponent:
pass
@abc.abstractmethod
async def await_file(self, path: str, delay=0.05, verbose=False) -> None:
pass
@abc.abstractmethod
async def send_file(self, path: str, verbose=False) -> None:
pass
@abc.abstractmethod
async def mkdir(self, path: str, verbose=False) -> None:
pass
@abc.abstractmethod
async def rmtree(self, path: str, verbose=False) -> None:
pass
def __init__(self):
self._sim_executors: collections.defaultdict[sim_base.Simulator, set[CommandExecutor]] = (
collections.defaultdict(set)
)
self._generic_prepare_executors: set[CommandExecutor] = set()
# runs the list of commands as strings sequentially
async def run_cmdlist(self, label: str, cmds: tp.List[str], verbose=True) -> None:
i = 0
async def generic_prepare_run_cmds(self, cmds: list[str]) -> None:
for cmd in cmds:
cmd_c = self.create_component(label + "." + str(i), shlex.split(cmd), verbose=verbose)
await cmd_c.start()
await cmd_c.wait()
async def await_files(self, paths: tp.List[str], *args, **kwargs) -> None:
xs = []
for p in paths:
waiter = asyncio.create_task(self.await_file(p, *args, **kwargs))
xs.append(waiter)
await asyncio.gather(*xs)
class LocalExecutor(Executor):
executor = CommandExecutor(shlex.split(cmd), "prepare")
self._generic_prepare_executors.add(executor)
await executor.start()
await executor.wait()
self._generic_prepare_executors.remove(executor)
def create_component(self, label: str, parts: list[str], **kwargs) -> SimpleComponent:
return SimpleComponent(label, parts, **kwargs)
async def await_file(self, path: str, delay=0.05, verbose=False, timeout=30) -> None:
if verbose:
print(f"await_file({path})")
t = 0
while not os.path.exists(path):
if t >= timeout:
raise TimeoutError()
await asyncio.sleep(delay)
t += delay
async def send_file(self, path: str, verbose=False) -> None:
# locally we do not need to do anything
pass
async def mkdir(self, path: str, verbose=False) -> None:
pathlib.Path(path).mkdir(parents=True, exist_ok=True)
async def rmtree(self, path: str, verbose=False) -> None:
if os.path.isdir(path):
shutil.rmtree(path, ignore_errors=True)
elif os.path.exists(path):
os.unlink(path)
async def simulator_prepare_run_cmds(self, sim: sim_base.Simulator, cmds: list[str]) -> None:
    """Run the prepare commands of ``sim`` sequentially.

    Every command string is split with ``shlex.split`` and executed through
    its own ``CommandExecutor`` labelled with ``sim.full_name()``. While a
    command is being started and awaited, its executor is registered in
    ``self._sim_executors[sim]``.

    Args:
        sim: Simulator the commands belong to; used as the bookkeeping key.
        cmds: Command lines to run, one after another, in the given order.

    Raises:
        Exception: whatever ``CommandExecutor.start()`` or ``wait()`` raises
            is propagated; remaining commands are not run.
    """
    for cmd in cmds:
        executor = CommandExecutor(shlex.split(cmd), sim.full_name())
        self._sim_executors[sim].add(executor)
        try:
            await executor.start()
            await executor.wait()
        except Exception:
            # Without this, a command that fails would leave its executor
            # registered forever. Only ``Exception`` is handled on purpose:
            # on cancellation the executor stays registered.
            # NOTE(review): presumably consumers of ``_sim_executors`` use it
            # to reach still-running commands -- confirm.
            self._sim_executors[sim].discard(executor)
            raise
        self._sim_executors[sim].remove(executor)
......@@ -25,45 +25,48 @@ from __future__ import annotations
import asyncio
import shlex
import traceback
import abc
import typing
from simbricks.runtime import output
from simbricks.orchestration.simulation import base as sim_base
from simbricks.orchestration.instantiation import base as inst_base
from simbricks.orchestration.instantiation import socket as inst_socket
from simbricks.runtime import command_executor
from simbricks.runtime import command_executor as cmd_exec
from simbricks.utils import graphlib
if typing.TYPE_CHECKING:
from simbricks.runtime import simulation_executor_callbacks
class SimulationBaseRunner(abc.ABC):
class SimulationBaseRunner():
# TODO (Jonas) Rename this to InstantiationExecutor
def __init__(
self,
instantiation: inst_base.Instantiation,
callbacks: simulation_executor_callbacks.ExecutorCallbacks,
verbose: bool,
) -> None:
self._instantiation: inst_base.Instantiation = instantiation
self._callbacks = callbacks
self._verbose: bool = verbose
self._profile_int: int | None = None
self._out: output.SimulationOutput = output.SimulationOutput(self._instantiation.simulation)
self._out_listener: dict[sim_base.Simulator, command_executor.OutputListener] = {}
self._running: list[tuple[sim_base.Simulator, command_executor.SimpleComponent]] = []
self._out_listener: dict[sim_base.Simulator, cmd_exec.OutputListener] = {}
self._running: list[tuple[sim_base.Simulator, cmd_exec.CommandExecutor]] = []
self._sockets: list[inst_socket.Socket] = []
self._wait_sims: list[command_executor.Component] = []
@abc.abstractmethod
def sim_executor(self, simulator: sim_base.Simulator) -> command_executor.Executor:
pass
self._wait_sims: list[cmd_exec.CommandExecutor] = []
self._cmd_executor = cmd_exec.CommandExecutorFactory()
def sim_listener(self, sim: sim_base.Simulator) -> command_executor.OutputListener:
def sim_listener(self, sim: sim_base.Simulator) -> cmd_exec.OutputListener:
    """Look up the output listener registered for ``sim``.

    Raises:
        Exception: if no listener has been registered for ``sim``.
    """
    registered = self._out_listener
    if sim in registered:
        return registered[sim]
    raise Exception(f"no listener specified for simulator {sim.id()}")
def add_listener(self, sim: sim_base.Simulator, listener: command_executor.OutputListener) -> None:
def add_listener(self, sim: sim_base.Simulator, listener: cmd_exec.OutputListener) -> None:
    """Register ``listener`` for ``sim``.

    The pair is stored in ``self._out_listener`` and then passed on to
    ``self._out.add_mapping``. An earlier listener for ``sim`` is replaced in
    ``self._out_listener``.
    """
    self._out_listener.update({sim: listener})
    self._out.add_mapping(sim, listener)
async def start_sim(self, sim: sim_base.Simulator) -> None:
"""Start a simulator and wait for it to be ready."""
......@@ -78,13 +81,12 @@ class SimulationBaseRunner(abc.ABC):
return
# run simulator
executor = self._instantiation.executor # TODO: this should be a function or something
cmds = shlex.split(run_cmd)
sc = executor.create_component(name, cmds, verbose=self._verbose, canfail=True)
cmd_exec = cmd_exec.CommandExecutor(cmds, name, self._verbose, True)
if listener := self.sim_listener(sim=sim):
sc.subscribe(listener=listener)
await sc.start()
self._running.append((sim, sc))
cmd_exec.subscribe(listener=listener)
await cmd_exec.start()
self._running.append((sim, cmd_exec))
# add sockets for cleanup
for sock in sim.sockets_cleanup(inst=self._instantiation):
......@@ -95,7 +97,8 @@ class SimulationBaseRunner(abc.ABC):
if len(wait_socks) > 0:
if self._verbose:
print(f"{self._instantiation.simulation.name}: waiting for sockets {name}")
await self._instantiation.wait_for_sockets(sockets=wait_socks)
for sock in wait_socks:
await sock.wait()
if self._verbose:
print(f"{self._instantiation.simulation.name}: waited successfully for sockets {name}")
......@@ -105,24 +108,13 @@ class SimulationBaseRunner(abc.ABC):
await asyncio.sleep(delay)
if sim.wait_terminate:
self._wait_sims.append(sc)
self._wait_sims.append(cmd_exec)
if self._verbose:
print(f"{self._instantiation.simulation.name}: started {name}")
async def before_wait(self) -> None:
pass
async def before_cleanup(self) -> None:
pass
async def after_cleanup(self) -> None:
pass
async def prepare(self) -> None:
# TODO: FIXME
executor = command_executor.LocalExecutor()
self._instantiation.executor = executor
self._instantiation._cmd_executor = self._cmd_executor
await self._instantiation.prepare()
async def wait_for_sims(self) -> None:
......@@ -138,8 +130,6 @@ class SimulationBaseRunner(abc.ABC):
if self._verbose:
print(f"{self._instantiation.simulation.name}: cleaning up")
await self.before_cleanup()
# "interrupt, terminate, kill" all processes
scs = []
for _, sc in self._running:
......@@ -150,7 +140,6 @@ class SimulationBaseRunner(abc.ABC):
for _, sc in self._running:
await sc.wait()
await self.after_cleanup()
return self._out
async def sigusr1(self) -> None:
......@@ -188,7 +177,6 @@ class SimulationBaseRunner(abc.ABC):
if self._profile_int:
profiler_task = asyncio.create_task(self.profiler())
await self.before_wait()
await self.wait_for_sims()
except asyncio.CancelledError:
if self._verbose:
......@@ -217,40 +205,3 @@ class SimulationBaseRunner(abc.ABC):
async def cleanup(self) -> None:
    """Await the ``cleanup()`` coroutine of the wrapped instantiation."""
    instantiation = self._instantiation
    await instantiation.cleanup()
class SimulationSimpleRunner(SimulationBaseRunner):
"""Simple experiment runner with just one executor."""
def __init__(self, executor: command_executor.Executor, *args, **kwargs) -> None:
self._executor = executor
super().__init__(*args, **kwargs)
def sim_executor(self, sim: sim_base.Simulator) -> command_executor.Executor:
return self._executor
# class ExperimentDistributedRunner(ExperimentBaseRunner):
# """Simple experiment runner with just one executor."""
# # TODO: FIXME
# def __init__(self, execs, exp: DistributedExperiment, *args, **kwargs) -> None:
# self.execs = execs
# super().__init__(exp, *args, **kwargs)
# self.exp = exp # overrides the type in the base class
# assert self.exp.num_hosts <= len(execs)
# def sim_executor(self, sim) -> command_executor.Executor:
# h_id = self.exp.host_mapping[sim]
# return self.execs[h_id]
# async def prepare(self) -> None:
# # make sure all simulators are assigned to an executor
# assert self.exp.all_sims_assigned()
# # set IP addresses for proxies based on assigned executors
# for p in itertools.chain(self.exp.proxies_listen, self.exp.proxies_connect):
# executor = self.sim_executor(p)
# p.ip = executor.ip
# await super().prepare()
# Copyright 2025 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Utility functions for operations on files and directories."""
import os
import asyncio
import pathlib
import shutil
async def await_file(path: str, delay=0.05, verbose=False, timeout=30) -> None:
    """Poll until ``path`` exists on the file system.

    Args:
        path: File system path to wait for.
        delay: Seconds to sleep between two existence checks.
        verbose: When true, print a line naming the awaited path before
            polling starts.
        timeout: Maximum number of seconds to wait for ``path``.

    Raises:
        TimeoutError: if ``path`` still does not exist once ``timeout``
            seconds have elapsed.
    """
    if verbose:
        print(f"await_file({path})")
    # Measure elapsed time on the event loop's monotonic clock instead of
    # summing the requested sleep durations. The sum drifts, ignores time
    # spent outside of sleep(), and never advances when ``delay`` is 0.
    clock = asyncio.get_running_loop().time
    deadline = clock() + timeout
    while not os.path.exists(path):
        if clock() >= deadline:
            raise TimeoutError(f"timed out after {timeout}s waiting for {path}")
        await asyncio.sleep(delay)
def mkdir(path: str) -> None:
    """Create directory ``path`` together with any missing parents.

    An already existing directory at ``path`` is not an error.
    """
    directory = pathlib.Path(path)
    directory.mkdir(parents=True, exist_ok=True)
def rmtree(path: str) -> None:
    """Remove ``path``, whether it is a directory tree, a file, or a symlink.

    A real directory is deleted recursively, ignoring errors. Anything else,
    including a symbolic link of any kind, is unlinked. A ``path`` that does
    not exist is silently accepted.

    Args:
        path: File system path to remove.
    """
    # Symlinks are unlinked rather than passed to shutil.rmtree(): rmtree
    # refuses to operate on a symlink, and with ignore_errors=True that
    # refusal would be swallowed, leaving the link behind.
    if os.path.isdir(path) and not os.path.islink(path):
        shutil.rmtree(path, ignore_errors=True)
        return
    try:
        # Unlinking without a prior exists() check also removes dangling
        # symlinks and avoids a race with concurrent deletion.
        os.unlink(path)
    except FileNotFoundError:
        pass
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment