"...composable_kernel.git" did not exist on "f584ab0c545ade05ae793a8b36fa282d47d0f698"
Unverified Commit ef6700c5 authored by Jakob Görgen's avatar Jakob Görgen
Browse files

symphony/runner: proper kill/simulation_status command + execute multiple runs concurrently

parent ccb3e6bc
...@@ -24,7 +24,7 @@ import sys ...@@ -24,7 +24,7 @@ import sys
import asyncio import asyncio
import json import json
import pathlib import pathlib
from rich.console import Console import logging
from simbricks.runtime import simulation_executor from simbricks.runtime import simulation_executor
from simbricks.orchestration.instantiation import base as inst_base from simbricks.orchestration.instantiation import base as inst_base
from simbricks.orchestration.system import base as sys_base from simbricks.orchestration.system import base as sys_base
...@@ -37,7 +37,6 @@ from simbricks.utils import artifatcs as art ...@@ -37,7 +37,6 @@ from simbricks.utils import artifatcs as art
class ConsoleLineListener(command_executor.OutputListener): class ConsoleLineListener(command_executor.OutputListener):
# TODO: make actually use of this
def __init__(self, rc: client.RunnerClient, run_id: int, prefix: str = ""): def __init__(self, rc: client.RunnerClient, run_id: int, prefix: str = ""):
super().__init__() super().__init__()
self._prefix: str = prefix self._prefix: str = prefix
...@@ -60,14 +59,16 @@ class ConsoleLineListener(command_executor.OutputListener): ...@@ -60,14 +59,16 @@ class ConsoleLineListener(command_executor.OutputListener):
return json_obj return json_obj
class Runner: class Run:
def __init__(
    self,
    run_id: int,
    inst: inst_base.Instantiation,
    runner: simulation_executor.SimulationSimpleRunner,
):
    """Bundle everything the runner tracks for one run.

    Args:
        run_id: Identifier of the run.
        inst: Instantiation that belongs to this run.
        runner: Simulation runner used to execute the instantiation.
    """
    # Values handed in by the caller.
    self.runner: simulation_executor.SimulationSimpleRunner = runner
    self.inst: inst_base.Instantiation = inst
    self.run_id: int = run_id
    # Bookkeeping state: no task is attached yet and the run starts out as
    # not cancelled.
    self.exec_task: asyncio.Task | None = None
    self.cancelled: bool = False
class Run: class Runner:
def __init__(self, ident: int, inst: inst_base.Instantiation):
self.id: int = ident
self.inst: inst_base.Instantiation = inst
self.exec_task: asyncio.Task | None = None
self.cancelled: bool = False
def __init__(self, base_url: str, workdir: str, namespace: str, ident: int): def __init__(self, base_url: str, workdir: str, namespace: str, ident: int):
self._base_url: str = base_url self._base_url: str = base_url
...@@ -79,19 +80,20 @@ class Runner: ...@@ -79,19 +80,20 @@ class Runner:
self._sb_client = client.SimBricksClient(self._namespace_client) self._sb_client = client.SimBricksClient(self._namespace_client)
self._rc = client.RunnerClient(self._namespace_client, ident) self._rc = client.RunnerClient(self._namespace_client, ident)
self._console = Console() # self._cur_run: Run | None = None # currently executed run
self._to_run_queue: asyncio.Queue = asyncio.Queue() # queue of run ids to run next # self._to_run_queue: asyncio.Queue = asyncio.Queue() # queue of run ids to run next
self._cur_run: Runner.Run | None = None # currently executed run self._run_map: dict[int, Run] = {}
async def _fetch_and_assemble_run(self, to_fetch_run_id: int) -> Run: async def _fetch_assemble_inst(self, run_id: int) -> inst_base.Instantiation:
run_obj_list = await self._rc.filter_get_runs(run_id=to_fetch_run_id, state="pending") LOGGER.debug(f"fetch and assemble instantiation related to run {run_id}")
run_obj_list = await self._rc.filter_get_runs(run_id=run_id, state="pending")
if not run_obj_list or len(run_obj_list) != 1: if not run_obj_list or len(run_obj_list) != 1:
raise Exception(f"could not fetch run with id {to_fetch_run_id} that is still 'pending'") msg = f"could not fetch run with id {run_id} that is still 'pending'"
LOGGER.error(msg)
raise Exception(msg)
run_obj = run_obj_list[0] run_obj = run_obj_list[0]
run_id = to_fetch_run_id
self._console.log(f"Preparing run {run_id}")
run_workdir = self._workdir / f"run-{run_id}" run_workdir = self._workdir / f"run-{run_id}"
run_workdir.mkdir(parents=True) run_workdir.mkdir(parents=True)
...@@ -108,133 +110,172 @@ class Runner: ...@@ -108,133 +110,172 @@ class Runner:
inst.create_checkpoint = True inst.create_checkpoint = True
# inst.artifact_paths = [f"{run_workdir}/output"] # create an artifact # inst.artifact_paths = [f"{run_workdir}/output"] # create an artifact
inst.artifact_paths = [] # create NO artifact inst.artifact_paths = [] # create NO artifact
return inst
run = Runner.Run(ident=run_id, inst=inst) async def _prepare_run(self, run_id: int) -> Run:
LOGGER.debug(f"prepare run {run_id}")
inst = await self._fetch_assemble_inst(run_id=run_id)
executor = command_executor.LocalExecutor()
runner = simulation_executor.SimulationSimpleRunner(executor, inst, runset().verbose)
await runner.prepare()
for sim in inst.simulation.all_simulators():
listener = ConsoleLineListener(rc=self._rc, run_id=run_id)
runner.add_listener(sim, listener)
run = Run(run_id=run_id, inst=inst, runner=runner)
return run return run
async def _start_run(self, run: Run) -> None:
    """Execute a prepared run and report its outcome to the backend.

    The run is marked "running", the simulation is executed as its own task,
    its output is dumped, an artifact is created when the instantiation asks
    for one, and the final state ("completed" or "error") is reported.

    Args:
        run: The prepared run to execute.

    Raises:
        asyncio.CancelledError: Re-raised after the run has been reported as
            "cancelled".
        Exception: Any other failure is re-raised after the run has been
            reported as "error".
    """
    sim_task: asyncio.Task | None = None
    try:
        LOGGER.info("start run %s", run.run_id)
        await self._rc.update_run(run.run_id, "running", "")

        sim_task = asyncio.create_task(run.runner.run())
        res = await sim_task

        output_path = run.inst.get_simulation_output_path()
        res.dump(outpath=output_path)

        if run.inst.create_artifact:
            art.create_artifact(
                artifact_name=run.inst.artifact_name,
                paths_to_include=run.inst.artifact_paths,
            )
            await self._sb_client.set_run_artifact(run.run_id, run.inst.artifact_name)

        status = "error" if res.failed() else "completed"
        await self._rc.update_run(run.run_id, status, output="")

        # NOTE(review): cleanup only happens on this success path; confirm
        # whether the cancel/error paths below need it as well.
        await run.runner.cleanup()
        LOGGER.info("finished run %s", run.run_id)

    except asyncio.CancelledError:
        LOGGER.debug("_start_run: handling cancellation of run %s", run.run_id)
        if sim_task:
            sim_task.cancel()
        await self._rc.update_run(run.run_id, state="cancelled", output="")
        LOGGER.info("cancelled execution of run %s", run.run_id)
        raise

    except Exception as ex:
        # Log before talking to the backend: if the status update itself
        # fails, the original error must not be lost.
        LOGGER.error("error while executing run %s: %s", run.run_id, ex)
        if sim_task:
            sim_task.cancel()
        await self._rc.update_run(run_id=run.run_id, state="error", output="")
        # Bare raise re-raises the active exception unchanged.
        raise
async def _cancel_all_tasks(self) -> None:
    """Cancel every unfinished run task and wait until all of them stopped.

    Runs without a task, or whose task is already done, are skipped. All
    remaining tasks are cancelled first and only then awaited together, so
    the cancellation of one task cannot prevent the others from being
    cancelled. Exceptions raised by the tasks while shutting down (including
    ``asyncio.CancelledError``) are collected and not re-raised.
    """
    pending = [
        run.exec_task
        for run in self._run_map.values()
        if run.exec_task is not None and not run.exec_task.done()
    ]
    for task in pending:
        task.cancel()
    if pending:
        # return_exceptions=True keeps one task's CancelledError or failure
        # from aborting the wait for the remaining tasks.
        await asyncio.gather(*pending, return_exceptions=True)
async def _handel_events(self) -> None: async def _handel_events(self) -> None:
try: try:
while True: while True:
# fetch all events not handeled yet # fetch all events not handeled yet
events = list(await self._rc.get_events(run_id=None, action=None, limit=None, event_status="pending")) events = list(await self._rc.get_events(run_id=None, action=None, limit=None, event_status="pending"))
if self._cur_run: for run_id in list(self._run_map.keys()):
run = self._run_map[run_id]
# check if run finished and cleanup map
if run.exec_task.done():
run = self._run_map.pop(run_id)
await run.exec_task
LOGGER.debug(f"removed run {run_id} from run_map")
assert run_id not in self._run_map
continue
# only fetch events in case run is not finished yet
run_events = list( run_events = list(
await self._rc.get_events( await self._rc.get_events(
run_id=self._cur_run.id, run_id=run_id,
action=None, action=None,
limit=None, limit=None,
event_status="pending", event_status="pending",
) )
) )
events.extend(run_events) events.extend(run_events)
LOGGER.debug(f"events fetched ({len(events)}): {events}")
# handel the fetched events # handel the fetched events
for event in events: for event in events:
event_id = event["id"] event_id = event["id"]
self._console.log(f"try to handel event {event}") run_id = event["run_id"] if event["run_id"] else None
LOGGER.debug(f"try to handel event {event}")
event_status = "completed" event_status = "completed"
match event["action"]: match event["action"]:
case "kill": case "kill":
if not self._cur_run or not self._cur_run.exec_task: if run_id and not run_id in self._run_map:
event_status = "cancelled" event_status = "cancelled"
else: else:
self._cur_run.cancelled = True run = self._run_map[run_id]
self._cur_run.exec_task.cancel() run.exec_task.cancel()
await run.exec_task
LOGGER.debug(f"executed kill to cancel execution of run {run_id}")
case "heartbeat": case "heartbeat":
await self._rc.send_heartbeat() await self._rc.send_heartbeat()
LOGGER.debug(f"send heartbeat")
case "start_run": case "start_run":
to_fetch_id = event["run_id"] if event["run_id"] else None if not run_id or run_id in self._run_map:
if not to_fetch_id: LOGGER.debug(f"cannot start run, no run id or run with given id is being executed")
event_status = "cancelled" event_status = "cancelled"
else: else:
await self._to_run_queue.put(to_fetch_id) run = await self._prepare_run(run_id=run_id)
run.exec_task = asyncio.create_task(self._start_run(run=run))
self._run_map[run_id] = run
LOGGER.debug(f"started execution of run {run_id}")
case "simulation_status": case "simulation_status":
event_status = "cancelled" if not run_id or not run_id in self._run_map:
run_id = event["run_id"] if event["run_id"] else None
if not self._cur_run or not self._cur_run.exec_task or not run_id:
event_status = "cancelled" event_status = "cancelled"
else: else:
# TODO: implement run = self._run_map[run_id]
self._console.log("handling of the simulation_status event is not implemented yet") await run.runner.sigusr1()
LOGGER.debug(f"send sigusr1 to run {run_id}")
await self._rc.update_runner_event( await self._rc.update_runner_event(
event_id=event_id, event_status=event_status, action=None, run_id=None event_id=event_id, event_status=event_status, action=None, run_id=None
) )
self._console.log(f"handeled event {event}") LOGGER.info(f"handeled event {event_id}")
await asyncio.sleep(3) await asyncio.sleep(3)
except asyncio.CancelledError: except asyncio.CancelledError:
pass await self._cancel_all_tasks()
async def _execute_run(self) -> None: except Exception as exc:
try: await self._cancel_all_tasks()
while True: LOGGER.error(f"an error occured while running: {exc}")
to_fetch_id = await self._to_run_queue.get() raise exc
self._cur_run = await self._fetch_and_assemble_run(to_fetch_run_id=to_fetch_id)
assert self._cur_run
try:
self._console.log(f"Starting run {self._cur_run.id}")
await self._rc.update_run(self._cur_run.id, "running", "")
executor = command_executor.LocalExecutor()
runner = simulation_executor.SimulationSimpleRunner(executor, self._cur_run.inst, runset().verbose)
await runner.prepare()
listeners = [] async def run(self) -> None:
for sim in self._cur_run.inst.simulation.all_simulators(): LOGGER.info("STARTED RUNNER")
listener = ConsoleLineListener(rc=self._rc, run_id=self._cur_run.id) LOGGER.debug(
runner.add_listener(sim, listener) f" runner params: base_url={self._base_url}, workdir={self._workdir}, namespace={self._namespace}, _ident={self._ident}"
listeners.append((sim.name, listener)) )
simulation_task = asyncio.create_task(runner.run())
self._cur_run.exec_task = simulation_task
res = await simulation_task
output_path = self._cur_run.inst.get_simulation_output_path()
res.dump(outpath=output_path)
if self._cur_run.inst.create_artifact:
art.create_artifact(
artifact_name=self._cur_run.inst.artifact_name,
paths_to_include=self._cur_run.inst.artifact_paths,
)
await self._sb_client.set_run_artifact(self._cur_run.id, self._cur_run.inst.artifact_name)
status = "error" if res.failed() else "completed"
await self._rc.update_run(self._cur_run.id, status, output="")
self._console.log(f"Finished run {self._cur_run.id}")
except Exception as err:
if self._cur_run:
status = "cancelled" if self._cur_run.cancelled else "error"
await self._rc.update_run(self._cur_run.id, state=status, output="")
self._console.log(f"stopped execution of run {self._cur_run.id} {status}")
else:
raise err
self._cur_run = None
except asyncio.CancelledError: # execute_runs_task = asyncio.create_task(self._execute_run())
if self._cur_run: # handel_events_task = asyncio.create_task(self._handel_events())
await self._rc.update_run(self._cur_run.id, "cancelled", output="") try:
return await self._handel_events()
except Exception as error: # _, pending = await asyncio.wait(
self._console.log(f"encountered fatal error: {error}") # [execute_runs_task, handel_events_task], return_when=asyncio.FIRST_COMPLETED
# )
# map(lambda t: t.cancel(), pending)
except Exception as exc:
LOGGER.error(f"fatal error {exc}")
sys.exit(1) sys.exit(1)
# execute_runs_task.cancel()
# handel_events_task.cancel()
async def run(self) -> None: LOGGER.info("TERMINATED RUNNER")
with self._console.status(f"[bold green]Waiting for valid run...") as status:
execute_runs_task = asyncio.create_task(self._execute_run())
handel_events_task = asyncio.create_task(self._handel_events())
try:
_, pending = await asyncio.wait(
[execute_runs_task, handel_events_task], return_when=asyncio.FIRST_COMPLETED
)
map(lambda t: t.cancel(), pending)
except asyncio.CancelledError:
execute_runs_task.cancel()
handel_events_task.cancel()
async def amain(): async def amain():
...@@ -248,6 +289,16 @@ async def amain(): ...@@ -248,6 +289,16 @@ async def amain():
await runner.run() await runner.run()
def setup_logger() -> logging.Logger:
    """Configure root logging from the runner settings and return this module's logger.

    The log level is read from ``runset().log_level``; records are formatted
    as "<timestamp> - <level> - <message>".

    Returns:
        The logger named after this module.
    """
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        level=runset().log_level,
    )
    return logging.getLogger(__name__)
LOGGER = setup_logger()
def main(): def main():
asyncio.run(amain()) asyncio.run(amain())
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
from functools import lru_cache from functools import lru_cache
from pydantic_settings import BaseSettings from pydantic_settings import BaseSettings
class RunnerSettings(BaseSettings): class RunnerSettings(BaseSettings):
base_url: str = "https://app.simbricks.io/api" base_url: str = "https://app.simbricks.io/api"
auth_client_id: str = "api.auth.simbricks.io" auth_client_id: str = "api.auth.simbricks.io"
...@@ -33,8 +34,9 @@ class RunnerSettings(BaseSettings): ...@@ -33,8 +34,9 @@ class RunnerSettings(BaseSettings):
runner_id: int = 1 runner_id: int = 1
verbose: bool = True verbose: bool = True
log_level: str = "DEBUG"
@lru_cache @lru_cache
def runner_settings() -> RunnerSettings: def runner_settings() -> RunnerSettings:
return RunnerSettings(_env_file="runner.env", _env_file_encoding="utf-8") return RunnerSettings(_env_file="runner.env", _env_file_encoding="utf-8")
\ No newline at end of file
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment