"...composable_kernel.git" did not exist on "792b62dd6fc9a0649bbf5b14c42caa684e000b8d"
Commit 1efbda52 authored by Jonas Kaufmann's avatar Jonas Kaufmann Committed by Antoine Kaufmann
Browse files

implement Ctrl+C handling for SlurmRuntime

parent 574d4ccb
...@@ -51,6 +51,8 @@ class Run(object): ...@@ -51,6 +51,8 @@ class Run(object):
self.outpath = outpath self.outpath = outpath
self.output: tp.Optional[ExpOutput] = None self.output: tp.Optional[ExpOutput] = None
self.prereq = prereq self.prereq = prereq
self.job_id: tp.Optional[int] = None
"""Slurm job id."""
def name(self): def name(self):
return self.experiment.name + '.' + str(self.index) return self.experiment.name + '.' + str(self.index)
......
...@@ -20,10 +20,12 @@ ...@@ -20,10 +20,12 @@
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import asyncio
import os import os
import pathlib import pathlib
import pickle import pickle
import re import re
import typing as tp
from simbricks.runtime.common import Run, Runtime from simbricks.runtime.common import Run, Runtime
...@@ -32,12 +34,14 @@ class SlurmRuntime(Runtime): ...@@ -32,12 +34,14 @@ class SlurmRuntime(Runtime):
def __init__(self, slurmdir, args, verbose=False, cleanup=True): def __init__(self, slurmdir, args, verbose=False, cleanup=True):
super().__init__() super().__init__()
self.runnable = [] self.runnable: tp.List[Run] = []
self.slurmdir = slurmdir self.slurmdir = slurmdir
self.args = args self.args = args
self.verbose = verbose self.verbose = verbose
self.cleanup = cleanup self.cleanup = cleanup
self._start_task: asyncio.Task
def add_run(self, run: Run): def add_run(self, run: Run):
self.runnable.append(run) self.runnable.append(run)
...@@ -88,7 +92,7 @@ class SlurmRuntime(Runtime): ...@@ -88,7 +92,7 @@ class SlurmRuntime(Runtime):
return exp_script return exp_script
async def start(self): async def _do_start(self):
pathlib.Path(self.slurmdir).mkdir(parents=True, exist_ok=True) pathlib.Path(self.slurmdir).mkdir(parents=True, exist_ok=True)
jid_re = re.compile(r'Submitted batch job ([0-9]+)') jid_re = re.compile(r'Submitted batch job ([0-9]+)')
...@@ -111,6 +115,23 @@ class SlurmRuntime(Runtime): ...@@ -111,6 +115,23 @@ class SlurmRuntime(Runtime):
m = jid_re.search(output) m = jid_re.search(output)
run.job_id = int(m.group(1)) run.job_id = int(m.group(1))
async def start(self):
self._start_task = asyncio.create_task(self._do_start())
try:
await self._start_task
except asyncio.CancelledError:
# stop all runs that have already been scheduled
# (existing slurm job id)
job_ids = []
for run in self.runnable:
if run.job_id:
job_ids.append(str(run.job_id))
scancel_process = await asyncio.create_subprocess_shell(
f"scancel {' '.join(job_ids)}"
)
await scancel_process.wait()
def interrupt(self): def interrupt(self):
return super().interrupt() super().interrupt()
# TODO implement this self._start_task.cancel()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment