Commit 1ff4e4c9 authored by Jonas Kaufmann's avatar Jonas Kaufmann Committed by Antoine Kaufmann
Browse files

fix orchestration framework to run on Python3.11

Starting with Python3.11, asyncio.wait() doesn't accept coroutines anymore. Instead, we have to wrap every coroutine in a tasks.
parent 7a091d47
...@@ -100,14 +100,14 @@ class Component(object): ...@@ -100,14 +100,14 @@ class Component(object):
return return
async def _waiter(self): async def _waiter(self):
out_handlers = asyncio.ensure_future( stdout_handler = asyncio.create_task(
asyncio.wait([ self._read_stream(self._proc.stdout, self._consume_out)
self._read_stream(self._proc.stdout, self._consume_out), )
self._read_stream(self._proc.stderr, self._consume_err) stderr_handler = asyncio.create_task(
]) self._read_stream(self._proc.stderr, self._consume_err)
) )
rc = await self._proc.wait() rc = await self._proc.wait()
await out_handlers await asyncio.wait([stdout_handler, stderr_handler])
await self.terminated(rc) await self.terminated(rc)
async def send_input(self, bs, eof=False): async def send_input(self, bs, eof=False):
...@@ -158,21 +158,29 @@ class Component(object): ...@@ -158,21 +158,29 @@ class Component(object):
"""Attempts to stop this component by sending signals in the following """Attempts to stop this component by sending signals in the following
order: interrupt, terminate, kill.""" order: interrupt, terminate, kill."""
await self.interrupt() await self.interrupt()
_, pending = await asyncio.wait([self._proc.wait()], timeout=delay) try:
if len(pending) != 0: await asyncio.wait_for(self._proc.wait(), delay)
return
except TimeoutError:
print( print(
f'terminating component {self.cmd_parts[0]} ' f'terminating component {self.cmd_parts[0]} '
f'pid {self._proc.pid}' f'pid {self._proc.pid}',
flush=True
) )
await self.terminate() await self.terminate()
_, pending = await asyncio.wait([self._proc.wait()], timeout=delay)
if len(pending) != 0: try:
print( await asyncio.wait_for(self._proc.wait(), delay)
f'killing component {self.cmd_parts[0]} ' return
f'pid {self._proc.pid}' except TimeoutError:
) print(
await self.kill() f'killing component {self.cmd_parts[0]} '
await self._proc.wait() f'pid {self._proc.pid}',
flush=True
)
await self.kill()
await self._proc.wait()
async def started(self): async def started(self):
pass pass
...@@ -339,7 +347,9 @@ class Executor(object): ...@@ -339,7 +347,9 @@ class Executor(object):
async def await_files(self, paths, *args, **kwargs): async def await_files(self, paths, *args, **kwargs):
xs = [] xs = []
for p in paths: for p in paths:
xs.append(self.await_file(p, *args, **kwargs)) waiter = asyncio.create_task(self.await_file(p, *args, **kwargs))
xs.append(waiter)
await asyncio.wait(xs) await asyncio.wait(xs)
......
...@@ -125,7 +125,9 @@ class ExperimentBaseRunner(ABC): ...@@ -125,7 +125,9 @@ class ExperimentBaseRunner(ABC):
if self.verbose: if self.verbose:
print('preparing config tar:', path) print('preparing config tar:', path)
host.node_config.make_tar(path) host.node_config.make_tar(path)
copies.append(self.sim_executor(host).send_file(path, self.verbose)) executor = self.sim_executor(host)
task = asyncio.create_task(executor.send_file(path, self.verbose))
copies.append(task)
await asyncio.wait(copies) await asyncio.wait(copies)
# prepare all simulators in parallel # prepare all simulators in parallel
...@@ -133,11 +135,12 @@ class ExperimentBaseRunner(ABC): ...@@ -133,11 +135,12 @@ class ExperimentBaseRunner(ABC):
for sim in self.exp.all_simulators(): for sim in self.exp.all_simulators():
prep_cmds = list(sim.prep_cmds(self.env)) prep_cmds = list(sim.prep_cmds(self.env))
executor = self.sim_executor(sim) executor = self.sim_executor(sim)
sims.append( task = asyncio.create_task(
executor.run_cmdlist( executor.run_cmdlist(
'prepare_' + self.exp.name, prep_cmds, verbose=self.verbose 'prepare_' + self.exp.name, prep_cmds, verbose=self.verbose
) )
) )
sims.append(task)
await asyncio.wait(sims) await asyncio.wait(sims)
async def wait_for_sims(self): async def wait_for_sims(self):
...@@ -156,14 +159,14 @@ class ExperimentBaseRunner(ABC): ...@@ -156,14 +159,14 @@ class ExperimentBaseRunner(ABC):
ts.prepare() ts.prepare()
while ts.is_active(): while ts.is_active():
# start ready simulators in parallel # start ready simulators in parallel
starts = [] starting = []
sims = [] sims = []
for sim in ts.get_ready(): for sim in ts.get_ready():
starts.append(self.start_sim(sim)) starting.append(asyncio.create_task(self.start_sim(sim)))
sims.append(sim) sims.append(sim)
# wait for starts to complete # wait for starts to complete
await asyncio.wait(starts) await asyncio.wait(starting)
for sim in sims: for sim in sims:
ts.done(sim) ts.done(sim)
...@@ -190,7 +193,7 @@ class ExperimentBaseRunner(ABC): ...@@ -190,7 +193,7 @@ class ExperimentBaseRunner(ABC):
# "interrupt, terminate, kill" all processes # "interrupt, terminate, kill" all processes
scs = [] scs = []
for _, sc in self.running: for _, sc in self.running:
scs.append(sc.int_term_kill()) scs.append(asyncio.create_task(sc.int_term_kill()))
await asyncio.wait(scs) await asyncio.wait(scs)
# wait for all processes to terminate # wait for all processes to terminate
...@@ -200,7 +203,7 @@ class ExperimentBaseRunner(ABC): ...@@ -200,7 +203,7 @@ class ExperimentBaseRunner(ABC):
# remove all sockets # remove all sockets
scs = [] scs = []
for (executor, sock) in self.sockets: for (executor, sock) in self.sockets:
scs.append(executor.rmtree(sock)) scs.append(asyncio.create_task(executor.rmtree(sock)))
if scs: if scs:
await asyncio.wait(scs) await asyncio.wait(scs)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment