Commit 574d4ccb authored by Jonas Kaufmann's avatar Jonas Kaufmann Committed by Antoine Kaufmann
Browse files

implement Ctrl+C handling for LocalParallelRuntime

parent 8b8280b7
...@@ -97,12 +97,15 @@ class LocalParallelRuntime(Runtime): ...@@ -97,12 +97,15 @@ class LocalParallelRuntime(Runtime):
"""Runs with no prerequesite runs.""" """Runs with no prerequesite runs."""
self.runs_prereq: tp.List[Run] = [] self.runs_prereq: tp.List[Run] = []
"""Runs with prerequesite runs.""" """Runs with prerequesite runs."""
self.complete = set() self.complete: tp.Set[Run] = set()
self.cores = cores self.cores = cores
self.mem = mem self.mem = mem
self.verbose = verbose self.verbose = verbose
self.executor = executor self.executor = executor
self._pending_jobs: tp.Set[asyncio.Task] = set()
self._starter_task: asyncio.Task
def add_run(self, run: Run): def add_run(self, run: Run):
if run.experiment.resreq_cores() > self.cores: if run.experiment.resreq_cores() > self.cores:
raise Exception('Not enough cores available for run') raise Exception('Not enough cores available for run')
...@@ -117,13 +120,19 @@ class LocalParallelRuntime(Runtime): ...@@ -117,13 +120,19 @@ class LocalParallelRuntime(Runtime):
async def do_run(self, run: Run): async def do_run(self, run: Run):
"""Actually executes `run`.""" """Actually executes `run`."""
runner = ExperimentSimpleRunner( try:
self.executor, run.experiment, run.env, self.verbose runner = ExperimentSimpleRunner(
) self.executor, run.experiment, run.env, self.verbose
await run.prep_dirs(executor=self.executor) )
await runner.prepare() await run.prep_dirs(executor=self.executor)
await runner.prepare()
except asyncio.CancelledError:
# it is safe to just exit here because we are not running any
# simulators yet
return
print('starting run ', run.name()) print('starting run ', run.name())
run.output = await runner.run() run.output = await runner.run() # already handles CancelledError
pathlib.Path(run.outpath).parent.mkdir(parents=True, exist_ok=True) pathlib.Path(run.outpath).parent.mkdir(parents=True, exist_ok=True)
with open(run.outpath, 'w', encoding='utf-8') as f: with open(run.outpath, 'w', encoding='utf-8') as f:
...@@ -133,10 +142,10 @@ class LocalParallelRuntime(Runtime): ...@@ -133,10 +142,10 @@ class LocalParallelRuntime(Runtime):
async def wait_completion(self): async def wait_completion(self):
"""Wait for any run to terminate and return.""" """Wait for any run to terminate and return."""
assert self.pending_jobs assert self._pending_jobs
done, self.pending_jobs = await asyncio.wait( done, self._pending_jobs = await asyncio.wait(
self.pending_jobs, return_when=asyncio.FIRST_COMPLETED self._pending_jobs, return_when=asyncio.FIRST_COMPLETED
) )
for run in done: for run in done:
...@@ -174,16 +183,15 @@ class LocalParallelRuntime(Runtime): ...@@ -174,16 +183,15 @@ class LocalParallelRuntime(Runtime):
#self.completions = asyncio.Queue() #self.completions = asyncio.Queue()
self.cores_used = 0 self.cores_used = 0
self.mem_used = 0 self.mem_used = 0
self.pending_jobs = set()
runs = self.runs_noprereq + self.runs_prereq runs = self.runs_noprereq + self.runs_prereq
for run in runs: for run in runs:
# check if we first have to wait for memory or cores # if necessary, wait for enough memory or cores
while not self.enough_resources(run): while not self.enough_resources(run):
print('waiting for resources') print('waiting for resources')
await self.wait_completion() await self.wait_completion()
# check if we first have to wait for memory or cores # if necessary, wait for prerequesite runs to complete
while not self.prereq_ready(run): while not self.prereq_ready(run):
print('waiting for prereq') print('waiting for prereq')
await self.wait_completion() await self.wait_completion()
...@@ -191,17 +199,23 @@ class LocalParallelRuntime(Runtime): ...@@ -191,17 +199,23 @@ class LocalParallelRuntime(Runtime):
self.cores_used += run.experiment.resreq_cores() self.cores_used += run.experiment.resreq_cores()
self.mem_used += run.experiment.resreq_mem() self.mem_used += run.experiment.resreq_mem()
job = self.do_run(run) job = asyncio.create_task(self.do_run(run))
self.pending_jobs.add(job) self._pending_jobs.add(job)
# wait for all runs to finish # wait for all runs to finish
while self.pending_jobs: await asyncio.wait(self._pending_jobs)
await self.wait_completion()
async def start(self): async def start(self):
"""Execute all defined runs.""" """Execute all defined runs."""
asyncio.run(self.do_start()) self._starter_task = asyncio.create_task(self.do_start())
try:
await self._starter_task
except asyncio.CancelledError:
for job in self._pending_jobs:
job.cancel()
# wait for all runs to finish
await asyncio.wait(self._pending_jobs)
def interrupt(self): def interrupt(self):
return super().interrupt() super().interrupt()
# TODO implement this self._starter_task.cancel()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment