Commit f1f4cc1c authored by Jonas Kaufmann, committed by Antoine Kaufmann
Browse files

orchestration/exectools.py: fix getting stuck on terminating components

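Before Python 3.11, `asyncio.wait_for()` raises `asyncio.TimeoutError`,
which is a different class from the builtin `TimeoutError`, so the
`except TimeoutError:` handlers in `Component` never matched. Both
exception types are now caught.

The reason we didn't spot this earlier is that exceptions were swallowed
by the call to `asyncio.wait()` in
`runners.py:ExperimentBaseRunner.terminate_collect_sims()` when invoking
`int_term_kill()`. Instead, we now use `asyncio.gather()`, which
forwards exceptions to the calling coroutine.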
parent 8b2fefc3
...@@ -148,7 +148,8 @@ class Component(object): ...@@ -148,7 +148,8 @@ class Component(object):
try: try:
await asyncio.wait_for(self._proc.wait(), delay) await asyncio.wait_for(self._proc.wait(), delay)
return return
except TimeoutError: # before Python 3.11, asyncio.wait_for() throws asyncio.TimeoutError -_-
except (TimeoutError, asyncio.TimeoutError):
print( print(
f'terminating component {self.cmd_parts[0]} ' f'terminating component {self.cmd_parts[0]} '
f'pid {self._proc.pid}', f'pid {self._proc.pid}',
...@@ -159,14 +160,13 @@ class Component(object): ...@@ -159,14 +160,13 @@ class Component(object):
try: try:
await asyncio.wait_for(self._proc.wait(), delay) await asyncio.wait_for(self._proc.wait(), delay)
return return
except TimeoutError: except (TimeoutError, asyncio.TimeoutError):
print( print(
f'killing component {self.cmd_parts[0]} ' f'killing component {self.cmd_parts[0]} '
f'pid {self._proc.pid}', f'pid {self._proc.pid}',
flush=True flush=True
) )
await self.kill() await self.kill()
await self._proc.wait() await self._proc.wait()
async def started(self) -> None: async def started(self) -> None:
......
...@@ -162,24 +162,24 @@ class ExperimentBaseRunner(ABC): ...@@ -162,24 +162,24 @@ class ExperimentBaseRunner(ABC):
scs = [] scs = []
for _, sc in self.running: for _, sc in self.running:
scs.append(asyncio.create_task(sc.int_term_kill())) scs.append(asyncio.create_task(sc.int_term_kill()))
await asyncio.shield(asyncio.wait(scs)) await asyncio.gather(*scs)
# wait for all processes to terminate # wait for all processes to terminate
for _, sc in self.running: for _, sc in self.running:
await asyncio.shield(sc.wait()) await sc.wait()
# remove all sockets # remove all sockets
scs = [] scs = []
for (executor, sock) in self.sockets: for (executor, sock) in self.sockets:
scs.append(asyncio.create_task(executor.rmtree(sock))) scs.append(asyncio.create_task(executor.rmtree(sock)))
if scs: if scs:
await asyncio.shield(asyncio.wait(scs)) await asyncio.wait(scs)
# add all simulator components to the output # add all simulator components to the output
for sim, sc in self.running: for sim, sc in self.running:
self.out.add_sim(sim, sc) self.out.add_sim(sim, sc)
await asyncio.shield(self.after_cleanup()) await self.after_cleanup()
return self.out return self.out
async def run(self) -> ExpOutput: async def run(self) -> ExpOutput:
...@@ -222,7 +222,8 @@ class ExperimentBaseRunner(ABC): ...@@ -222,7 +222,8 @@ class ExperimentBaseRunner(ABC):
while True: while True:
try: try:
return await asyncio.shield(terminate_collect_task) return await asyncio.shield(terminate_collect_task)
except asyncio.CancelledError: except asyncio.CancelledError as e:
print(e)
pass pass
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment