Commit 26a56993 authored by Jonas Kaufmann's avatar Jonas Kaufmann
Browse files

symphony/runner/__main__.py: add callbacks to update simulator and proxy state...

symphony/runner/__main__.py: add callbacks to update simulator and proxy state and push output on/to backend
parent a4a71d8e
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
import aiohttp import aiohttp
import datetime
import typing import typing
import contextlib import contextlib
import json import json
...@@ -547,21 +548,81 @@ class RunnerClient: ...@@ -547,21 +548,81 @@ class RunnerClient:
async with self.put(url=f"/update_run/{run_id}", json=obj) as resp: async with self.put(url=f"/update_run/{run_id}", json=obj) as resp:
await resp.json() await resp.json()
async def send_out( async def update_state_simulator(
self, run_id: int, sim_id: int, sim_name: str, state: str, cmd: str
) -> None:
obj = {
"run_id": run_id,
"simulator_id": sim_id,
"simulator_name": sim_name,
"state": state,
"command": cmd,
}
async with self.post(url=f"/run/{run_id}/simulator/{sim_id}/state", json=obj) as resp:
await resp.json()
async def update_proxy(self, run_id: int, proxy_id: int, state: str, cmd: str) -> None:
obj = {"run_id": run_id, "proxy_id": proxy_id, "state": state, "cmd": cmd}
async with self.put(url=f"/{run_id}/proxy/{proxy_id}/state", json=obj) as resp:
await resp.json()
async def send_out_simulation(
self,
run_id: int,
cmd: int,
is_stderr: bool,
output: list[str],
) -> None:
objs = []
for line in output:
obj = {
"run_id": run_id,
"cmd": cmd,
"is_stderr": is_stderr,
"output": line,
}
objs.append(obj)
async with self.post(url=f"/{run_id}/simulation/console", json=objs) as resp:
_ = await resp.json()
async def send_out_simulator(
self,
run_id: int,
sim_id: int,
sim_name: str,
is_stderr: bool,
output: list[str],
created_at: datetime.datetime,
) -> None:
objs = []
for line in output:
obj = {
"run_id": run_id,
"simulator_id": sim_id,
"simulator_name": sim_name,
"is_stderr": is_stderr,
"output": line,
"created_at": str(created_at),
}
objs.append(obj)
async with self.post(url=f"/run/{run_id}/simulator/{sim_id}/console", json=objs) as resp:
_ = await resp.json()
async def send_out_proxy(
self, self,
run_id: int, run_id: int,
simulator: str, proxy_id: int,
stderr: bool, is_stderr: bool,
output: list[str], output: list[str],
) -> None: ) -> None:
objs = [] objs = []
for line in output: for line in output:
obj = { obj = {
"run_id": run_id, "run_id": run_id,
"simulator": simulator, "proxy_id": proxy_id,
"stderr": stderr, "is_stderr": is_stderr,
"output": line, "output": line,
} }
objs.append(obj) objs.append(obj)
async with self.post(url=f"/{run_id}/console", json=objs) as resp: async with self.post(url=f"/{run_id}/proxy/{proxy_id}/console", json=objs) as resp:
ret = await resp.json() _ = await resp.json()
...@@ -19,21 +19,157 @@ ...@@ -19,21 +19,157 @@
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, # CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from __future__ import annotations
import asyncio import asyncio
import datetime
import json import json
import logging import logging
import pathlib import pathlib
import sys import sys
import typing
import uuid import uuid
from simbricks import client from simbricks import client
from simbricks.orchestration.instantiation import base as inst_base from simbricks.orchestration.instantiation import base as inst_base
from simbricks.orchestration.simulation import base as sim_base from simbricks.orchestration.simulation import base as sim_base
from simbricks.orchestration.system import base as sys_base from simbricks.orchestration.system import base as sys_base
from simbricks.runtime import simulation_executor from simbricks.runner import settings
from simbricks.runtime import simulation_executor as sim_exec
from simbricks.utils import artifatcs as art from simbricks.utils import artifatcs as art
from .settings import runner_settings as runset if typing.TYPE_CHECKING:
from simbricks.orchestration.instantiation import proxy as inst_proxy
class RunnerSimulationExecutorCallbacks(sim_exec.SimulationExecutorCallbacks):
def __init__(
self,
instantiation: inst_base.Instantiation,
rc: client.RunnerClient,
run_id: int,
):
super().__init__(instantiation)
self._instantiation = instantiation
self._client: client.RunnerClient = rc
self._run_id: int = run_id
self._active_simulator_cmd: dict[sim_base.Simulator, str] = {}
self._active_proxy_cmd: dict[inst_proxy.Proxy, str] = {}
# ---------------------------------------
# Callbacks related to whole simulation -
# ---------------------------------------
async def simulation_prepare_cmd_start(self, cmd: str) -> None:
LOGGER.debug(f"+ [prepare] {cmd}")
# TODO (Jonas) Send executed prepare command to backend
async def simulation_prepare_cmd_stdout(self, cmd: str, lines: list[str]) -> None:
super().simulation_prepare_cmd_stdout(cmd, lines)
for line in lines:
LOGGER.debug(f"[prepare] {line}")
await self._client.send_out_simulation(self._run_id, cmd, False, lines)
async def simulation_prepare_cmd_stderr(self, cmd: str, lines: list[str]) -> None:
super().simulation_prepare_cmd_stderr(cmd, lines)
for line in lines:
LOGGER.debug(f"[prepare] {line}")
await self._client.send_out_simulation(self._run_id, cmd, True, lines)
# -----------------------------
# Simulator-related callbacks -
# -----------------------------
async def simulator_prepare_started(self, sim: sim_base.Simulator, cmd: str) -> None:
self._active_simulator_cmd[sim] = cmd
LOGGER.debug(f"+ [{sim.full_name()}] {cmd}")
await self._client.update_state_simulator(
self._run_id, sim.id(), sim.full_name(), "preparing", cmd
)
async def simulator_prepare_exited(self, sim: sim_base.Simulator, exit_code: int) -> None:
self._active_simulator_cmd.pop(sim)
LOGGER.debug(f"- [{sim.full_name()}] exited with code {exit_code}")
# TODO (Jonas) Report to backend if prepare command fails
async def simulator_prepare_stdout(self, sim: sim_base.Simulator, lines: list[str]) -> None:
for line in lines:
LOGGER.debug(f"[{sim.full_name()}] {line}")
await self._client.send_out_simulator(
self._run_id, sim.id(), sim.full_name(), False, lines, datetime.datetime.now()
)
async def simulator_prepare_stderr(self, sim: sim_base.Simulator, lines: list[str]) -> None:
for line in lines:
LOGGER.debug(f"[{sim.full_name()}] {line}")
await self._client.send_out_simulator(
self._run_id, sim.id(), sim.full_name(), True, lines, datetime.datetime.now()
)
async def simulator_started(self, sim: sim_base.Simulator, cmd: str) -> None:
self._active_simulator_cmd[sim] = cmd
LOGGER.debug(f"+ [{sim.full_name()}] {cmd}")
await self._client.update_state_simulator(
self._run_id, sim.id(), sim.full_name(), "starting", cmd
)
async def simulator_ready(self, sim: sim_base.Simulator) -> None:
LOGGER.debug(f"[{sim.full_name()}] has started successfully")
await self._client.update_state_simulator(
self._run_id, sim.id(), sim.full_name(), "running", self._active_simulator_cmd[sim]
)
async def simulator_exited(self, sim: sim_base.Simulator, exit_code: int) -> None:
cmd = self._active_simulator_cmd.pop(sim)
LOGGER.debug(f"- [{sim.full_name()}] exited with code {exit_code}")
await self._client.update_state_simulator(
self._run_id, sim.id(), sim.full_name(), "terminated", cmd
)
async def simulator_stdout(self, sim: sim_base.Simulator, lines: list[str]) -> None:
for line in lines:
LOGGER.debug(f"[{sim.full_name()}] {line}")
await self._client.send_out_simulator(
self._run_id, sim.id(), sim.full_name(), False, lines, datetime.datetime.now()
)
async def simulator_stderr(self, sim: sim_base.Simulator, lines: list[str]) -> None:
for line in lines:
LOGGER.debug(f"[{sim.full_name()}] {line}")
await self._client.send_out_simulator(
self._run_id, sim.id(), sim.full_name(), True, lines, datetime.datetime.now()
)
# -------------------------
# Proxy-related callbacks -
# -------------------------
async def proxy_started(self, proxy: inst_proxy.Proxy, cmd: str) -> None:
self._active_proxy_cmd[proxy] = cmd
LOGGER.debug(f"+ [{proxy.name}] {cmd}")
await self._client.update_state_proxy(self._run_id, proxy.id(), "starting", cmd)
async def proxy_ready(self, proxy: inst_proxy.Proxy) -> None:
LOGGER.debug(f"[{proxy.name}] has started successfully")
await self._client.update_state_proxy(
self._run_id, proxy.id(), "running", self._active_proxy_cmd[proxy]
)
async def proxy_exited(self, proxy: inst_proxy.Proxy, exit_code: int) -> None:
cmd = self._active_proxy_cmd.pop(proxy)
LOGGER.debug(f"- [{proxy.name}] exited with code {exit_code}")
await self._client.update_state_proxy(self._run_id, proxy.id(), "terminated", cmd)
async def proxy_stdout(self, proxy: inst_proxy.Proxy, lines: list[str]) -> None:
for line in lines:
LOGGER.debug(f"[{proxy.name}] {line}")
await self._client.send_out_proxy(self._run_id, proxy.id(), False, lines)
async def proxy_stderr(self, proxy: inst_proxy.Proxy, lines: list[str]) -> None:
for line in lines:
LOGGER.debug(f"[{proxy.name}] {line}")
await self._client.send_out_proxy(self._run_id, proxy.id(), True, lines)
class Run: class Run:
...@@ -41,12 +177,14 @@ class Run: ...@@ -41,12 +177,14 @@ class Run:
self, self,
run_id: int, run_id: int,
inst: inst_base.Instantiation, inst: inst_base.Instantiation,
runner: simulation_executor.SimulationExecutor, callbacks: RunnerSimulationExecutorCallbacks,
): runner: sim_exec.SimulationExecutor,
) -> None:
self.run_id: int = run_id self.run_id: int = run_id
self.inst: inst_base.Instantiation = inst self.inst: inst_base.Instantiation = inst
self.callbacks: RunnerSimulationExecutorCallbacks = callbacks
self.cancelled: bool = False self.cancelled: bool = False
self.runner: simulation_executor.SimulationExecutor = runner self.runner: sim_exec.SimulationExecutor = runner
self.exec_task: asyncio.Task | None = None self.exec_task: asyncio.Task | None = None
...@@ -104,12 +242,11 @@ class Runner: ...@@ -104,12 +242,11 @@ class Runner:
LOGGER.debug(f"prepare run {run_id}") LOGGER.debug(f"prepare run {run_id}")
inst = await self._fetch_assemble_inst(run_id=run_id) inst = await self._fetch_assemble_inst(run_id=run_id)
callbacks = RunnerSimulationExecutorCallbacks(inst, self._rc, run_id)
callbacks = simulation_executor.SimulationExecutorCallbacks(inst) runner = sim_exec.SimulationExecutor(inst, callbacks, settings.RunnerSettings().verbose)
runner = simulation_executor.SimulationExecutor(inst, callbacks, runset().verbose)
await runner.prepare() await runner.prepare()
run = Run(run_id=run_id, inst=inst, runner=runner) run = Run(run_id=run_id, inst=inst, runner=runner, callbacks=callbacks)
return run return run
async def _start_run(self, run: Run) -> None: async def _start_run(self, run: Run) -> None:
...@@ -270,17 +407,17 @@ class Runner: ...@@ -270,17 +407,17 @@ class Runner:
async def amain(): async def amain():
runner = Runner( runner = Runner(
base_url=runset().base_url, base_url=settings.RunnerSettings().base_url,
workdir=pathlib.Path("./runner-work").resolve(), workdir=pathlib.Path("./runner-work").resolve(),
namespace=runset().namespace, namespace=settings.RunnerSettings().namespace,
ident=runset().runner_id, ident=settings.RunnerSettings().runner_id,
) )
await runner.run() await runner.run()
def setup_logger() -> logging.Logger: def setup_logger() -> logging.Logger:
level = runset().log_level level = settings.RunnerSettings().log_level
logging.basicConfig( logging.basicConfig(
level=level, format="%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" level=level, format="%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
) )
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment