Commit 232c94f8 authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

experiments: add remote execution components

parent 6a57ee10
...@@ -21,9 +21,10 @@ ...@@ -21,9 +21,10 @@
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import asyncio import asyncio
import shlex
import os import os
import pathlib import pathlib
import re
import shlex
import shutil import shutil
import signal import signal
...@@ -154,22 +155,6 @@ class Component(object): ...@@ -154,22 +155,6 @@ class Component(object):
async def process_err(self, lines, eof): async def process_err(self, lines, eof):
pass pass
class RemoteComp(Component):
def __init__(self, host, cmd_parts, cwd=None, **kwargs):
if cwd is not None:
cmd_parts = ['cd', cwd, '&&',
'(' + (' '.join(map(shlex.quote, cmd_parts))) + ')']
parts = [
'ssh',
'-o',
'UserKnownHostsFile=/dev/null',
'-o',
'StrictHostKeyChecking=no',
host.name,
'--'] + cmd_parts
#print(parts)
super().__init__(parts, **kwargs)
class SimpleComponent(Component): class SimpleComponent(Component):
def __init__(self, label, cmd_parts, verbose=True, canfail=False, def __init__(self, label, cmd_parts, verbose=True, canfail=False,
...@@ -196,6 +181,78 @@ class SimpleComponent(Component): ...@@ -196,6 +181,78 @@ class SimpleComponent(Component):
if not self.canfail and rc != 0: if not self.canfail and rc != 0:
raise Exception('Command Failed: ' + str(self.cmd_parts)) raise Exception('Command Failed: ' + str(self.cmd_parts))
class SimpleRemoteComponent(SimpleComponent):
def __init__(self, host_name, label, cmd_parts, cwd=None, *args, **kwargs):
self.host_name = host_name
# add a wrapper to print the PID
remote_parts = ['echo', 'PID', '$$', '&&']
if cwd is not None:
# if necessary add a CD command
remote_parts += ['cd', cwd, '&&']
# escape actual command parts
cmd_parts = list(map(shlex.quote, cmd_parts))
# use exec to make sure the command we run keeps the PIDS
remote_parts += ['exec'] + cmd_parts
# wrap up command in ssh invocation
parts = self._ssh_cmd(remote_parts)
super().__init__(label, parts, *args, **kwargs)
def _ssh_cmd(self, parts):
""" SSH invocation of command for this host. """
return [
'ssh',
'-o',
'UserKnownHostsFile=/dev/null',
'-o',
'StrictHostKeyChecking=no',
self.host_name,
'--'] + parts
async def start(self):
""" Start this command (includes waiting for its pid. """
self.pid_fut = asyncio.get_running_loop().create_future()
await super().start()
await self.pid_fut
async def process_out(self, lines, eof):
""" Scans output and set PID future once PID line found. """
if not self.pid_fut.done():
newlines = []
pid_re = re.compile(r'^PID\s+(\d+)\s*$')
for l in lines:
m = pid_re.match(l)
if m:
pid = int(m.group(1))
self.pid_fut.set_result(pid)
else:
newlines.append(l)
lines = newlines
if eof and not self.pid_fut.done():
# cancel PID future if it's not going to happen
print('PID not found but EOF already found:', self.label)
self.pid_fut.cancel()
await super().process_out(lines, eof)
async def _kill_cmd(self, sig):
""" Send signal to command by running ssh kill -$sig $PID. """
cmd_parts = self._ssh_cmd(['kill', '-' + sig,
str(self.pid_fut.result())])
proc = await asyncio.create_subprocess_exec(*cmd_parts)
await proc.wait()
async def interrupt(self):
await self._kill_cmd('INT')
async def terminate(self):
await self._kill_cmd('TERM')
async def kill(self):
await self._kill_cmd('KILL')
class Executor(object): class Executor(object):
def create_component(self, label, parts, **kwargs): def create_component(self, label, parts, **kwargs):
...@@ -241,3 +298,46 @@ class LocalExecutor(Executor): ...@@ -241,3 +298,46 @@ class LocalExecutor(Executor):
async def rmtree(self, path, verbose=False): async def rmtree(self, path, verbose=False):
shutil.rmtree(path, ignore_errors=True) shutil.rmtree(path, ignore_errors=True)
class RemoteExecutor(Executor):
def __init__(self, host_name, workdir):
self.host_name = host_name
self.cwd = workdir
def create_component(self, label, parts, **kwargs):
return SimpleRemoteComponent(self.host_name, label, parts,
cwd=self.cwd, **kwargs)
async def await_file(self, path, delay=0.05, verbose=False):
loop_cmd = 'while [ ! -e %s ] ; do sleep %f ; done' % (path, delay)
parts = ['/bin/sh', '-c', loop_cmd]
sc = self.create_component("%s.await_file('%s')" % (self.host_name,
path), parts, canfail=False, verbose=verbose)
await sc.start()
await sc.wait()
async def send_file(self, path, verbose):
parts = [
'scp',
'-o',
'UserKnownHostsFile=/dev/null',
'-o',
'StrictHostKeyChecking=no',
path,
'%s:%s' % (self.host_name, path)]
sc = SimpleComponent("%s.send_file('%s')" % (
self.host_name, path), parts, canfail=False, verbose=verbose)
await sc.start()
await sc.wait()
async def mkdir(self, path, verbose=False):
sc = self.create_component("%s.mkdir('%s')" % (self.host_name, path),
['mkdir', '-p', path], canfail=False, verbose=verbose)
await sc.start()
await sc.wait()
async def rmtree(self, path, verbose=False):
sc = self.create_component("%s.rmtree('%s')" % (self.host_name, path),
['rm', '-rf', path], canfail=False, verbose=verbose)
await sc.start()
await sc.wait()
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment