Commit 15cead28 authored by Jonas Kaufmann's avatar Jonas Kaufmann Committed by Antoine Kaufmann
Browse files

orchestration: prevent cancellation while terminating simulators and collecting output

parent 05bfbefb
......@@ -150,10 +150,41 @@ class ExperimentBaseRunner(ABC):
for sc in self.wait_sims:
await sc.wait()
async def terminate_collect_sims(self) -> ExpOutput:
"""Terminates all simulators and collects output."""
self.out.set_end()
if self.verbose:
print(f'{self.exp.name}: cleaning up')
await self.before_cleanup()
# "interrupt, terminate, kill" all processes
scs = []
for _, sc in self.running:
scs.append(asyncio.create_task(sc.int_term_kill()))
await asyncio.shield(asyncio.wait(scs))
# wait for all processes to terminate
for _, sc in self.running:
await asyncio.shield(sc.wait())
# remove all sockets
scs = []
for (executor, sock) in self.sockets:
scs.append(asyncio.create_task(executor.rmtree(sock)))
if scs:
await asyncio.shield(asyncio.wait(scs))
# add all simulator components to the output
for sim, sc in self.running:
self.out.add_sim(sim, sc)
await asyncio.shield(self.after_cleanup())
return self.out
async def run(self) -> ExpOutput:
try:
self.out.set_start()
graph = self.sim_graph()
ts = graphlib.TopologicalSorter(graph)
ts.prepare()
......@@ -181,38 +212,18 @@ class ExperimentBaseRunner(ABC):
self.out.set_failed()
traceback.print_exc()
finally:
self.out.set_end()
# shut things back down
if self.verbose:
print(f'{self.exp.name}: cleaning up')
await self.before_cleanup()
# "interrupt, terminate, kill" all processes
scs = []
for _, sc in self.running:
scs.append(asyncio.create_task(sc.int_term_kill()))
await asyncio.wait(scs)
# wait for all processes to terminate
for _, sc in self.running:
await sc.wait()
# remove all sockets
scs = []
for (executor, sock) in self.sockets:
scs.append(asyncio.create_task(executor.rmtree(sock)))
if scs:
await asyncio.wait(scs)
# add all simulator components to the output
for sim, sc in self.running:
self.out.add_sim(sim, sc)
await self.after_cleanup()
return self.out
# The bare except above guarantees that we always execute the following
# code, which terminates all simulators and produces a proper output
# file.
terminate_collect_task = asyncio.create_task(
self.terminate_collect_sims()
)
# prevent terminate_collect_task from being cancelled
while True:
try:
return await asyncio.shield(terminate_collect_task)
except asyncio.CancelledError:
pass
class ExperimentSimpleRunner(ExperimentBaseRunner):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment