Unverified Commit 6eedf111 authored by Jakob Görgen's avatar Jakob Görgen
Browse files

symphony/runner: runner handles events + non-buffering handling of command line output

parent ae8dadcd
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import sys
import asyncio import asyncio
import json import json
import pathlib import pathlib
...@@ -33,106 +34,218 @@ from simbricks import client ...@@ -33,106 +34,218 @@ from simbricks import client
from .settings import runner_settings as runset from .settings import runner_settings as runset
from simbricks.utils import artifatcs as art from simbricks.utils import artifatcs as art
verbose = True
class ConsoleLineListener(command_executor.OutputListener):
# TODO: FIXME, create a custom listener for the runner to register + create backend endpoint to update the output etc.
async def periodically_update(rc: client.RunnerClient, run_id: int, # TODO: make actually use of this
listeners: list[tuple[str, command_executor.LegacyOutputListener]]) -> None: def __init__(self, rc: client.RunnerClient, run_id: int, prefix: str = ""):
try: super().__init__()
while True: self._prefix: str = prefix
all_out: list[str] = [] self._rc: client.RunnerClient = rc
sim_outs: list[tuple[str, bool, str]] = [] self._run_id: int = run_id
for listener in listeners:
all_out.extend(listener[1].merged_output) async def handel_out(self, lines: list[str]) -> None:
listener[1].merged_output = [] if len(lines) < 1:
sim_outs.append((listener[0], False, listener[1].stdout)) return
sim_outs.append((listener[0], True, listener[1].stderr)) await self._rc.send_out(self._run_id, self._prefix, False, lines)
listener[1].stdout = []
listener[1].stderr = [] async def handel_err(self, lines: list[str]) -> None:
if len(lines) < 1:
if len(all_out) > 0: return
#print(all_out) await self._rc.send_out(self._run_id, self._prefix, True, lines)
await rc.update_run(run_id, "running", json.dumps(all_out))
def toJSON(self) -> dict:
for sim_out in sim_outs: json_obj = super().toJSON()
if len(sim_out[2]) > 0: json_obj.update({"prefix": self._prefix})
await rc.send_out(run_id, sim_out[0], sim_out[1], sim_out[2]) return json_obj
await asyncio.sleep(0.5)
class Runner:
except asyncio.CancelledError:
pass class Run:
def __init__(self, ident: int, inst: inst_base.Instantiation):
async def run_instantiation(sc: client.SimBricksClient, rc: client.RunnerClient, run_id: int, inst: inst_base.Instantiation) -> None: self.id: int = ident
self.inst: inst_base.Instantiation = inst
await rc.update_run(run_id, "running", "") self.exec_task: asyncio.Task | None = None
self.cancelled: bool = False
executor = command_executor.LocalExecutor()
runner = simulation_executor.SimulationSimpleRunner(executor, inst, verbose) def __init__(self, base_url: str, workdir: str, namespace: str, ident: int):
await runner.prepare() self._base_url: str = base_url
self._workdir: pathlib.Path = pathlib.Path(workdir).resolve()
listeners = [] self._namespace: str = namespace
for sim in inst.simulation.all_simulators(): self._ident: int = ident
listener = command_executor.LegacyOutputListener() self._base_client = client.BaseClient(base_url=base_url)
runner.add_listener(sim, listener) self._namespace_client = client.NSClient(base_client=self._base_client, namespace=namespace)
listeners.append((sim.name, listener)) self._sb_client = client.SimBricksClient(self._namespace_client)
self._rc = client.RunnerClient(self._namespace_client, ident)
update_task = asyncio.create_task(periodically_update(rc=rc, run_id=run_id, listeners=listeners))
d, p = await asyncio.wait([asyncio.create_task(runner.run()), update_task], return_when=asyncio.FIRST_COMPLETED) self._console = Console()
for pending in p: self._to_run_queue: asyncio.Queue = asyncio.Queue() # queue of run ids to run next
pending.cancel() self._cur_run: Runner.Run | None = None # currently executed run
output_path = inst.get_simulation_output_path() async def _fetch_and_assemble_run(self, to_fetch_run_id: int) -> Run:
list(d)[0].result().dump(outpath=output_path) run_obj_list = await self._rc.filter_get_runs(run_id=to_fetch_run_id, state="pending")
if not run_obj_list or len(run_obj_list) != 1:
if inst.create_artifact: raise Exception(f"could not fetch run with id {to_fetch_run_id} that is still 'pending'")
art.create_artifact( run_obj = run_obj_list[0]
artifact_name=inst.artifact_name, paths_to_include=inst.artifact_paths
) run_id = to_fetch_run_id
await sc.set_run_artifact(run_id, inst.artifact_name) self._console.log(f"Preparing run {run_id}")
await rc.update_run(run_id, "completed", json.dumps(list(d)[0].result().toJSON())) run_workdir = self._workdir / f"run-{run_id}"
run_workdir.mkdir(parents=True)
inst_obj = await self._sb_client.get_instantiation(run_obj["instantiation_id"])
sim_obj = await self._sb_client.get_simulation(inst_obj["simulation_id"])
sys_obj = await self._sb_client.get_system(sim_obj["system_id"])
system = sys_base.System.fromJSON(json.loads(sys_obj["sb_json"]))
simulation = sim_base.Simulation.fromJSON(system, json.loads(sim_obj["sb_json"]))
env = inst_base.InstantiationEnvironment(workdir=run_workdir) # TODO
inst = inst_base.Instantiation(sim=simulation)
inst.env = env
inst.preserve_tmp_folder = False
inst.create_checkpoint = True
# inst.artifact_paths = [f"{run_workdir}/output"] # create an artifact
inst.artifact_paths = [] # create NO artifact
run = Runner.Run(ident=run_id, inst=inst)
return run
async def _handel_events(self) -> None:
try:
while True:
# fetch all events not handeled yet
events = list(await self._rc.get_events(run_id=None, action=None, limit=None, event_status="pending"))
if self._cur_run:
run_events = list(
await self._rc.get_events(
run_id=self._cur_run.id,
action=None,
limit=None,
event_status="pending",
)
)
events.extend(run_events)
# handel the fetched events
for event in events:
event_id = event["id"]
self._console.log(f"try to handel event {event}")
event_status = "completed"
match event["action"]:
case "kill":
if not self._cur_run or not self._cur_run.exec_task:
event_status = "cancelled"
else:
self._cur_run.cancelled = True
self._cur_run.exec_task.cancel()
case "heartbeat":
await self._rc.send_heartbeat()
case "start_run":
to_fetch_id = event["run_id"] if event["run_id"] else None
if not to_fetch_id:
event_status = "cancelled"
else:
await self._to_run_queue.put(to_fetch_id)
case "simulation_status":
event_status = "cancelled"
run_id = event["run_id"] if event["run_id"] else None
if not self._cur_run or not self._cur_run.exec_task or not run_id:
event_status = "cancelled"
else:
# TODO: implement
self._console.log("handling of the simulation_status event is not implemented yet")
await self._rc.update_runner_event(
event_id=event_id, event_status=event_status, action=None, run_id=None
)
self._console.log(f"handeled event {event}")
await asyncio.sleep(3)
except asyncio.CancelledError:
pass
async def _execute_run(self) -> None:
    """Execute queued runs one after another until this coroutine is cancelled.

    For every run id taken from ``self._to_run_queue`` the run is fetched and
    assembled, reported as ``running``, executed through a
    ``SimulationSimpleRunner`` and finally reported as ``completed`` or
    ``error`` (``cancelled`` if it was killed on request).

    A kill request cancels ``exec_task``; awaiting that task then raises
    ``asyncio.CancelledError``, which is not an ``Exception`` subclass. It is
    handled explicitly so that killing one run does not stop the whole loop.
    Cancellation of this coroutine itself reports the current run (if any) as
    ``cancelled`` and returns. Any other error escaping the per-run handling
    is logged and terminates the process with exit code 1.
    """
    try:
        while True:
            to_fetch_id = await self._to_run_queue.get()
            self._cur_run = await self._fetch_and_assemble_run(to_fetch_run_id=to_fetch_id)
            assert self._cur_run
            run = self._cur_run

            try:
                self._console.log(f"Starting run {run.id}")
                await self._rc.update_run(run.id, "running", "")

                executor = command_executor.LocalExecutor()
                runner = simulation_executor.SimulationSimpleRunner(executor, run.inst, runset().verbose)
                await runner.prepare()

                # NOTE(review): the listeners are created without a prefix, so
                # the output is sent with an empty prefix -- confirm whether
                # the simulator name should be passed here.
                for sim in run.inst.simulation.all_simulators():
                    runner.add_listener(sim, ConsoleLineListener(rc=self._rc, run_id=run.id))

                # expose the task so that a "kill" event can cancel it
                simulation_task = asyncio.create_task(runner.run())
                run.exec_task = simulation_task
                res = await simulation_task

                output_path = run.inst.get_simulation_output_path()
                res.dump(outpath=output_path)
                if run.inst.create_artifact:
                    art.create_artifact(
                        artifact_name=run.inst.artifact_name,
                        paths_to_include=run.inst.artifact_paths,
                    )
                    await self._sb_client.set_run_artifact(run.id, run.inst.artifact_name)

                status = "error" if res.failed() else "completed"
                await self._rc.update_run(run.id, status, output="")
                self._console.log(f"Finished run {run.id}")
            except asyncio.CancelledError:
                if not run.cancelled:
                    # this coroutine itself is being cancelled: let the outer
                    # handler report the run and stop the loop
                    raise
                # only the simulation task was killed on request: report it
                # and keep serving further runs
                await self._rc.update_run(run.id, state="cancelled", output="")
                self._console.log(f"stopped execution of run {run.id} cancelled")
            except Exception:
                status = "cancelled" if run.cancelled else "error"
                await self._rc.update_run(run.id, state=status, output="")
                self._console.log(f"stopped execution of run {run.id} {status}")

            self._cur_run = None
    except asyncio.CancelledError:
        if self._cur_run:
            await self._rc.update_run(self._cur_run.id, "cancelled", output="")
        return
    except Exception as error:
        self._console.log(f"encountered fatal error: {error}")
        sys.exit(1)
async def run(self) -> None:
    """Run the run-execution and event-handling loops until one of them stops.

    Both loops are started as tasks. As soon as the first one finishes, the
    other one is cancelled and awaited so that it can complete its own
    cancellation handling before this coroutine returns. If this coroutine
    is cancelled itself, both tasks are cancelled.
    """
    with self._console.status("[bold green]Waiting for valid run..."):
        execute_runs_task = asyncio.create_task(self._execute_run())
        handel_events_task = asyncio.create_task(self._handel_events())
        try:
            _, pending = await asyncio.wait(
                [execute_runs_task, handel_events_task], return_when=asyncio.FIRST_COMPLETED
            )
            # map() only builds a lazy iterator, so cancel explicitly
            for task in pending:
                task.cancel()
            # wait until the cancelled tasks actually finished; their
            # outcome (including CancelledError) is deliberately ignored
            await asyncio.gather(*pending, return_exceptions=True)
        except asyncio.CancelledError:
            execute_runs_task.cancel()
            handel_events_task.cancel()
async def amain(): async def amain():
base_client = client.BaseClient(base_url=runset().base_url) runner = Runner(
namespace_client = client.NSClient(base_client=base_client, namespace=runset().namespace) base_url=runset().base_url,
sb_client = client.SimBricksClient(namespace_client) workdir=pathlib.Path("./runner-work").resolve(),
rc = client.RunnerClient(namespace_client, runset().runner_id) namespace=runset().namespace,
ident=runset().runner_id,
workdir = pathlib.Path("./runner-work").resolve() )
console = Console() await runner.run()
with console.status(f"[bold green]Waiting for valid run...") as status:
while True:
run_obj = await rc.next_run()
if not run_obj:
await asyncio.sleep(5)
continue
run_id = run_obj["id"]
console.log(f"Preparing run {run_id}")
run_workdir = workdir / f"run-{run_id}"
run_workdir.mkdir(parents=True)
inst_obj = await sb_client.get_instantiation(run_obj["instantiation_id"])
sim_obj = await sb_client.get_simulation(inst_obj["simulation_id"])
sys_obj = await sb_client.get_system(sim_obj["system_id"])
system = sys_base.System.fromJSON(json.loads(sys_obj["sb_json"]))
simulation = sim_base.Simulation.fromJSON(system, json.loads(sim_obj["sb_json"]))
# TODO: set from args
env = inst_base.InstantiationEnvironment(workdir=run_workdir)
inst = inst_base.Instantiation(sim=simulation)
inst.env = env
inst.preserve_tmp_folder = False
inst.create_checkpoint = True
inst.artifact_paths = [f"{run_workdir}/output"]
console.log(f"Starting run {run_id}")
await run_instantiation(sb_client, rc, run_id, inst)
console.log(f"Finished run {run_id}")
def main(): def main():
......
...@@ -30,7 +30,9 @@ class RunnerSettings(BaseSettings): ...@@ -30,7 +30,9 @@ class RunnerSettings(BaseSettings):
auth_dev_url: str = "https://auth.simbricks.io/realms/SimBricks/protocol/openid-connect/auth/device" auth_dev_url: str = "https://auth.simbricks.io/realms/SimBricks/protocol/openid-connect/auth/device"
namespace: str = "foo/bar/baz" namespace: str = "foo/bar/baz"
runner_id: int = 42 runner_id: int = 1
verbose: bool = True
@lru_cache @lru_cache
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment