"...composable_kernel.git" did not exist on "c20a75b07da6053cbbd07451d4ff27a95e30212e"
Commit eca65b73 authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

experiments: WIP for python run scripts

parent 1eb0180c
......@@ -2,6 +2,8 @@
*.a
*.ko
.*.cmd
*.pyc
__pycache__/
_vimrc_local.vim
dummy_nic/dummy_nic
corundum/obj_dir
......
import modes.experiments as exp
import modes.simulators as sim
import modes.nodeconfig as node
e = exp.Experiment('qemu-i40e-pair')
net = sim.SwitchNet()
e.add_network(net)
nic_a = sim.I40eNIC()
nic_a.set_network(net)
e.add_nic(nic_a)
host_a = sim.QemuHost()
host_a.name = 'server'
host_a.node_config = node.I40eLinuxNode()
host_a.node_config.ip = '10.0.0.1'
host_a.node_config.app = node.IperfTCPServer()
host_a.add_nic(nic_a)
e.add_host(host_a)
for i in range (0, 2):
nic_b = sim.I40eNIC()
nic_b.set_network(net)
e.add_nic(nic_b)
host_b = sim.QemuHost()
host_b.name = 'client.%d' % i
host_b.wait = True
host_b.node_config = node.I40eLinuxNode()
host_b.node_config.ip = '10.0.0.%d' % (2 + i)
host_b.node_config.app = node.IperfTCPClient()
host_b.add_nic(nic_b)
e.add_host(host_b)
env = exp.ExpEnv('..', './work')
out = exp.run_exp_local(e, env)
print(out.dumps())
import asyncio
import shlex
import os
import signal
class HostConfig(object):
def __init__(self, name, ip, mac, sudopwd, other={}):
self.name = name
self.ip = ip
self.used_ip = ip
self.mac = mac
self.sudo_pwd = sudopwd
self.other = other.copy()
class Component(object):
def __init__(self, cmd_parts, with_stdin=False):
self.is_ready = False
self.stdout = []
self.stdout_buf = bytearray()
self.stderr = []
self.stderr_buf = bytearray()
self.cmd_parts = cmd_parts
#print(cmd_parts)
self.with_stdin = with_stdin
def _parse_buf(self, buf, data):
if data is not None:
buf.extend(data)
lines = []
start = 0
for i in range(0, len(buf)):
if buf[i] == ord('\n'):
l = buf[start:i].decode('utf-8')
lines.append(l)
start = i + 1
del buf[0:start]
if len(data) == 0 and len(buf) > 0:
lines.append(buf.decode('utf-8'))
return lines
async def _consume_out(self, data):
eof = len(data) == 0
ls = self._parse_buf(self.stdout_buf, data)
if len(ls) > 0 or eof:
await self.process_out(ls, eof=eof)
self.stdout = self.stdout + ls
async def _consume_err(self, data):
eof = len(data) == 0
ls = self._parse_buf(self.stderr_buf, data)
if len(ls) > 0 or eof:
await self.process_err(ls, eof=eof)
self.stderr = self.stderr + ls
async def _read_stream(self, stream, fn):
while True:
bs = await stream.readline()
if bs:
await fn(bs)
else:
await fn(bs)
return
async def _waiter(self):
out_handlers = asyncio.ensure_future(asyncio.wait([
self._read_stream(self.proc.stdout, self._consume_out),
self._read_stream(self.proc.stderr, self._consume_err)]))
rc = await self.proc.wait()
await out_handlers
await self.terminated(rc)
return rc
async def send_input(self, bs, eof=False):
self.proc.stdin.write(bs)
if eof:
self.proc.stdin.close()
async def start(self):
if self.with_stdin:
stdin = asyncio.subprocess.PIPE
else:
stdin = None
self.proc = await asyncio.create_subprocess_exec(*self.cmd_parts,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=stdin,
)
self.terminate_future = asyncio.ensure_future(self._waiter())
await self.started()
async def wait(self):
await self.terminate_future
async def interrupt(self):
if self.terminate_future.done():
return
self.proc.send_signal(signal.SIGINT)
async def terminate(self):
if self.terminate_future.done():
return
self.proc.terminate()
async def kill(self):
self.proc.terminate()
async def int_term_kill(self, delay=5):
await self.interrupt()
_,pending = await asyncio.wait([self.terminate_future], timeout=delay)
if len(pending) != 0:
print('terminating')
await self.terminate()
_,pending = await asyncio.wait([self.terminate_future],
timeout=delay)
if len(pending) != 0:
print('killing')
await self.kill()
async def started(self):
pass
async def terminated(self, rc):
pass
async def process_out(self, lines, eof):
pass
async def process_err(self, lines, eof):
pass
class RemoteComp(Component):
def __init__(self, host, cmd_parts, cwd=None, **kwargs):
if cwd is not None:
cmd_parts = ['cd', cwd, '&&',
'(' + (' '.join(map(shlex.quote, cmd_parts))) + ')']
parts = [
'ssh',
'-o',
'UserKnownHostsFile=/dev/null',
'-o',
'StrictHostKeyChecking=no',
host.name,
'--'] + cmd_parts
#print(parts)
super().__init__(parts, **kwargs)
class SimpleComponent(Component):
def __init__(self, label, cmd_parts, verbose=True, canfail=False,
*args, **kwargs):
self.label = label
self.verbose = verbose
self.canfail = canfail
self.cmd_parts = cmd_parts
super().__init__(cmd_parts, *args, **kwargs)
async def process_out(self, lines, eof):
if self.verbose:
for l in lines:
print(self.label, 'OUT:', lines)
async def process_err(self, lines, eof):
if self.verbose:
for l in lines:
print(self.label, 'ERR:', lines)
async def terminated(self, rc):
if self.verbose:
print(self.label, 'TERMINATED:', rc)
if not self.canfail and rc != 0:
raise Exception('Command Failed: ' + str(self.cmd_parts))
# runs the list of commands as strings sequentially
async def run_cmdlist(label, cmds, verbose=True):
i = 0
for cmd in cmds:
cmdC = SimpleComponent(label + '.' + str(i), shlex.split(cmd),
verbose=verbose)
await cmdC.start()
await cmdC.wait()
async def await_file(path, delay=0.05):
print('await_file(%s)' % path)
while not os.path.exists(path):
await asyncio.sleep(delay)
import os
import asyncio
import modes.exectools as exectools
import shlex
import time
import json
class Experiment(object):
name = None
timeout = None
checkpoint = False
def __init__(self, name):
self.name = name
self.hosts = []
self.nics = []
self.networks = []
def add_host(self, sim):
self.hosts.append(sim)
def add_nic(self, sim):
self.nics.append(sim)
def add_network(self, sim):
self.networks.append(sim)
async def prepare(self, env):
# generate config tars
for host in self.hosts:
path = env.cfgtar_path(host)
print('preparing config tar:', path)
host.node_config.make_tar(path)
# prepare all simulators in parallel
sims = []
for sim in self.hosts + self.nics + self.networks:
prep_cmds = [pc for pc in sim.prep_cmds(env)]
sims.append(exectools.run_cmdlist('prepare_' + self.name, prep_cmds))
await asyncio.wait(sims)
async def run(self, env):
running = []
sockets = []
out = ExpOutput(self)
try:
out.set_start()
print('%s: starting NICS' % self.name)
for nic in self.nics:
print('start NIC:', nic.run_cmd(env))
sc = exectools.SimpleComponent(nic.full_name(),
shlex.split(nic.run_cmd(env)))
await sc.start()
running.append((nic, sc))
sockets.append(env.nic_pci_path(nic))
sockets.append(env.nic_eth_path(nic))
sockets.append(env.nic_shm_path(nic))
print('%s: waiting for sockets' % self.name)
for s in sockets:
await exectools.await_file(s)
# start networks
for net in self.networks:
print('start Net:', net.run_cmd(env))
sc = exectools.SimpleComponent(net.full_name(),
shlex.split(net.run_cmd(env)))
await sc.start()
running.append((net, sc))
# start hosts
wait_hosts = []
for host in self.hosts:
print('start Host:', host.run_cmd(env))
sc = exectools.SimpleComponent(host.full_name(),
shlex.split(host.run_cmd(env)))
await sc.start()
running.append((host,sc))
if host.wait:
wait_hosts.append(sc)
print('%s: waiting for hosts to terminate' % self.name)
for sc in wait_hosts:
await sc.wait()
# wait for necessary hosts to terminate
except:
out.set_failed()
finally:
out.set_end()
# shut things back down
print('%s: cleaning up' % self.name)
scs = []
for _,sc in running:
scs.append(sc.int_term_kill())
await asyncio.wait(scs)
for _,sc in running:
await sc.wait()
for sock in sockets:
os.remove(sock)
for sim,sc in running:
out.add_sim(sim, sc)
return out
def resreq_mem(self):
mem = 0
for h in self.hosts:
mem += h.resreq_mem()
for n in self.nics:
mem += n.resreq_mem()
for n in self.networks:
mem += n.resreq_mem()
return mem
def resreq_cores(self):
cores = 0
for h in self.hosts:
cores += h.resreq_cores()
for n in self.nics:
cores += n.resreq_cores()
for n in self.networks:
cores += n.resreq_cores()
return cores
class ExpEnv(object):
def __init__(self, repo_path, workdir):
self.repodir = os.path.abspath(repo_path)
self.workdir = os.path.abspath(workdir)
self.qemu_img_path = self.repodir + '/qemu/qemu-img'
self.qemu_path = self.repodir + '/qemu/x86_64-softmmu/qemu-system-x86_64'
self.qemu_kernel_path = self.repodir + '/images/bzImage'
def hdcopy_path(self, sim):
return '%s/hdcopy.%s.%d' % (self.workdir, sim.name, id(sim))
def hd_path(self, hd_name):
return '%s/images/output-%s/%s' % (self.repodir, hd_name, hd_name)
def cfgtar_path(self, sim):
return '%s/cfg.%s.%d.tar' % (self.workdir, sim.name, id(sim))
def nic_pci_path(self, sim):
return '%s/nic.pci.%s.%d' % (self.workdir, sim.name, id(sim))
def nic_eth_path(self, sim):
return '%s/nic.eth.%s.%d' % (self.workdir, sim.name, id(sim))
def nic_shm_path(self, sim):
return '%s/nic.shm.%s.%d' % (self.workdir, sim.name, id(sim))
class ExpOutput(object):
def __init__(self, exp):
self.exp_name = exp.name
self.start_time = None
self.end_time = None
self.sims = {}
self.success = True
def set_start(self):
self.start_time = time.time()
def set_end(self):
self.end = time.time()
def set_failed(self):
self.success = False
def add_sim(self, sim, comp):
obj = {
'class': sim.__class__.__name__,
'cmd': comp.cmd_parts,
'stdout': comp.stdout,
'stderr': comp.stderr,
}
self.sims[sim.full_name()] = obj
def dumps(self):
return json.dumps(self.__dict__)
def run_exp_local(exp, env):
if os.path.exists(env.workdir):
raise Exception('Workdir already exists')
os.mkdir(env.workdir)
asyncio.run(exp.prepare(env))
return asyncio.run(exp.run(env))
import tarfile
import io
class NodeConfig(object):
sim = 'qemu'
ip = '10.0.0.1'
prefix = 24
cores = 1
memory = 512
app = None
def config_str(self):
if self.sim == 'qemu':
cp_es = []
exit_es = ['poweroff -f']
else:
cp_es = ['m5 checkpoint']
exit_es = ['m5 exit']
es = self.prepare_pre_cp() + cp_es + self.prepare_post_cp() + \
self.run_cmds() + self.cleanup_cmds() + exit_es
return '\n'.join(es)
def make_tar(self, path):
tar = tarfile.open(path, 'w:')
# add main run script
cfg_i = tarfile.TarInfo('guest/run.sh')
cfg_i.mode = 0o777
cfg_f = self.strfile(self.config_str())
cfg_f.seek(0, io.SEEK_END)
cfg_i.size = cfg_f.tell()
cfg_f.seek(0, io.SEEK_SET)
tar.addfile(tarinfo=cfg_i, fileobj=cfg_f)
cfg_f.close()
# add additional config files
for (n,f) in self.config_files().items():
f_i = tarfile.TarInfo('guest/' + n)
f_i.mode = 0o777
f.seek(0, io.SEEK_END)
f_i.size = f.tell()
f.seek(0, io.SEEK_SET)
tar.addfile(tarinfo=f_i, fileobj=f)
f.close()
tar.close()
def prepare_pre_cp(self):
return [
'export HOME=/root',
'export LANG=en_US',
'export PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:' + \
'/usr/bin:/sbin:/bin:/usr/games:/usr/local/games"'
]
def prepare_post_cp(self):
return []
def run_cmds(self):
return self.app.run_cmds(self)
def cleanup_cmds(self):
return []
def config_files(self):
return {}
def strfile(self, s):
return io.BytesIO(bytes(s, encoding='UTF-8'))
class AppConfig(object):
def run_cmds(self, node):
return []
class LinuxNode(NodeConfig):
ifname = 'eth0'
def __init__(self):
self.drivers = []
def prepare_post_cp(self):
l = []
for d in self.drivers:
if d[0] == '/':
l.append('insmod ' + d)
else:
l.append('modprobe ' + d)
l.append('ip link set dev ' + self.ifname + ' up')
l.append('ip addr add %s/%d dev %s' %
(self.ip, self.prefix, self.ifname))
return super().prepare_post_cp() + l
class I40eLinuxNode(LinuxNode):
def __init__(self):
super().__init__()
self.drivers.append('i40e')
class CorundumLinuxNode(LinuxNode):
def __init__(self):
super().__init__()
self.drivers.append('/tmp/guest/mqnic.ko')
def config_files(self):
m = {'mqnic.ko': open('../images/mqnic/mqnic.ko', 'r')}
return {**m, **super().config_files()}
class MtcpNode(NodeConfig):
pci_dev = '0000:00:02.0'
num_hugepages = 4096
def prepare_pre_cp(self):
return super().prepare_pre_cp() + [
'mount -t proc proc /proc',
'mount -t sysfs sysfs /sys',
'mkdir -p /dev/hugepages',
'mount -t hugetlbfs nodev /dev/hugepages',
'mkdir -p /dev/shm',
'mount -t tmpfs tmpfs /dev/shm'
'echo ' + str(self.num_hugepages) + ' > /sys/devices/system/' + \
'node/node0/hugepages/hugepages-2048kB/nr_hugepages',
]
def prepare_post_cp(self):
return super().prepare_post_cp() + [
'insmod /root/mtcp/dpdk/x86_64-native-linuxapp-gcc/kmod/igb_uio.ko',
'/root/mtcp/dpdk/usertools/dpdk-devbind.py -b igb_uio ' +
self.pci_dev,
'insmod /root/mtcp/dpdk-iface-kmod/dpdk_iface.ko',
'/root/mtcp/dpdk-iface-kmod/dpdk_iface_main',
'ip link set dev dpdk0 up',
'ip addr add %s/%d dev dpdk0' % (self.ip, self.prefix)
]
def config_files(self):
m = {'mtcp.conf': self.strfile("io = dpdk\n"
"num_cores = " + str(self.cores) + "\n"
"num_mem_ch = 4\n"
"port = dpdk0\n"
"max_concurrency = 4096\n"
"max_num_buffers = 4096\n"
"rcvbuf = 8192\n"
"sndbuf = 8192\n"
"tcp_timeout = 10\n"
"tcp_timewait = 0\n"
"stat_print = dpdk0\n")}
return {**m, **super().config_files()}
class TASNode(NodeConfig):
pci_dev = '0000:00:02.0'
num_hugepages = 4096
def prepare_pre_cp(self):
return super().prepare_pre_cp() + [
'mount -t proc proc /proc',
'mount -t sysfs sysfs /sys',
'mkdir -p /dev/hugepages',
'mount -t hugetlbfs nodev /dev/hugepages',
'mkdir -p /dev/shm',
'mount -t tmpfs tmpfs /dev/shm'
'echo ' + str(self.num_hugepages) + ' > /sys/devices/system/' + \
'node/node0/hugepages/hugepages-2048kB/nr_hugepages',
]
def prepare_post_cp(self):
return super().prepare_post_cp() + [
'insmod /root/dpdk/lib/modules/5.4.46/extra/dpdk/igb_uio.ko',
'/root/mtcp/dpdk/usertools/dpdk-devbind.py -b igb_uio ' +
self.pci_dev,
'insmod /root/mtcp/dpdk-iface-kmod/dpdk_iface.ko',
'/root/mtcp/dpdk-iface-kmod/dpdk_iface_main',
'ip link set dev dpdk0 up',
'ip addr add %s/%d dev dpdk0' % (self.ip, self.prefix)
]
class IperfTCPServer(AppConfig):
def run_cmds(self, node):
return ['iperf -s -l 32M -w 32M']
class IperfUDPServer(AppConfig):
def run_cmds(self, node):
return ['iperf -s -u']
class IperfTCPClient(AppConfig):
server_ip = '10.0.0.1'
procs = 1
def run_cmds(self, node):
return ['iperf -l 32M -w 32M -c ' + self.server_ip + ' -i 1 -P ' +
str(self.procs)]
class IperfUDPClient(AppConfig):
server_ip = '10.0.0.1'
rate = '150m'
def run_cmds(self, node):
return ['iperf -c ' + self.server_ip + ' -u -b ' + self.rate]
class Simulator(object):
# number of cores required for this simulator
def resreq_cores(self):
return 1
# memory required for this simulator (in MB)
def resreq_mem(self):
return 64
def prep_cmds(self, env):
return []
def run_cmd(self, env):
pass
class HostSim(Simulator):
node_config = None
disk_image = 'base'
name = ''
wait = False
def __init__(self):
self.nics = []
def full_name(self):
return 'host.%s.%d' % (self.name, id(self))
def add_nic(self, nic):
self.nics.append(nic)
class NICSim(Simulator):
network = None
name = ''
def set_network(self, net):
self.network = net
net.nics.append(self)
def basic_run_cmd(self, env, name):
return '%s/%s %s %s %s' % \
(env.repodir, name, env.nic_pci_path(self), env.nic_eth_path(self),
env.nic_shm_path(self))
def full_name(self):
return 'nic.%s.%d' % (self.name, id(self))
class NetSim(Simulator):
name = ''
def __init__(self):
self.nics = []
def full_name(self):
return 'net.%s.%d' % (self.name, id(self))
class QemuHost(HostSim):
mem = 16 * 1024 # 16G
def resreq_cores(self):
return self.node_config.cores + 1
def resreq_mem(self):
return 4096
def prep_cmds(self, env):
to_path = env.hdcopy_path(self)
return [f'{env.qemu_img_path} create -f qcow2 -o '
f'backing_file="{env.hd_path(self.disk_image)}" '
f'{env.hdcopy_path(self)}']
def run_cmd(self, env):
cmd = (f'{env.qemu_path} -machine q35 -cpu host -serial mon:stdio '
'-display none -enable-kvm -nic none '
f'-kernel {env.qemu_kernel_path} '
f'-drive file={env.hdcopy_path(self)},if=ide,index=0,media=disk '
f'-drive file={env.cfgtar_path(self)},if=ide,index=1,media=disk,'
'driver=raw '
'-append "earlyprintk=ttyS0 console=ttyS0 root=/dev/sda1 '
'init=/home/ubuntu/guestinit.sh rw" '
f'-m {self.mem} -smp {self.node_config.cores} ')
if len(self.nics) > 0:
assert len(self.nics) == 1
cmd += f'-chardev socket,path={env.nic_pci_path(self.nics[0])},'
cmd += 'id=cosimcd '
cmd += '-device cosim-pci,chardev=cosimcd '
return cmd
class Gem5Host(HostSim):
mem = 16 * 1024 # 16G
cpu_type_cp = 'X86KvmCPU'
cpu_type = 'TimingSimpleCPU'
def resreq_cores(self):
return 1
def resreq_mem(self):
return 4096
def run_cmd(self, env):
cpu_type = self.cpu_type
if env.create_cp:
cpu_type = self.cpu_type_cp
self.cp_cpu_type
cmd = (f'{env.gem5_path} --outdir={env.gem5_outdir(self)} '
f'{env.gem5_py_path} --caches --l2cache --l3cache '
'--l1d_size=32kB --l1i_size=32kB --l2_size=2MB --l3_size=32MB '
'--cacheline_size=64 --cpu-clock=3GHz '
f'--checkpoint-dir={env.gem5_cpdir(self)} '
f'--kernel={env.gem5_kernel_path} '
f'--disk-image={env.hd_raw_path(self.disk_image)} '
f'--disk-image={env.cfgtar_path(self)} '
f'--cpu-type={cpu_type} --mem-size={self.mem}MB '
'--ddio-enabled --ddio-way-part=8 --mem-type=DDR4_2400_16x4 ')
if env.restore_cp:
cmd += '-r 0 '
if len(self.nics) > 0:
assert len(self.nics) == 1
nic = self.nics[0]
cmd += f'--cosim-pci={env.nic_pci_path(nic)} '
cmd += f'--cosim-shm={env.nic_shm_path(nic)} '
if cpu_type == 'TimingSimpleCPU':
cmd += '--cosim-sync '
if isinstance(nic, I40eNIC):
cmd += '--cosim-type=i40e '
return cmd
class CorundumVerilatorNIC(NICSim):
def resreq_mem(self):
# this is a guess
return 512
def run_cmd(self, env):
return self.basic_run_cmd(env, 'corundum/corundum_verilator')
class CorundumBMNIC(NICSim):
def run_cmd(self, env):
return self.basic_run_cmd(env, 'corundum_bm/corundum_bm')
class I40eNIC(NICSim):
def run_cmd(self, env):
return self.basic_run_cmd(env, 'i40e_bm/i40e_bm')
class WireNet(NetSim):
def run_cmd(self, env):
assert len(self.nics) == 2
return '%s/net_wire/net_wire %s %s' % \
(env.repodir, env.nic_eth_path(self.nics[0]),
env.nic_eth_path(self.nics[1]))
class SwitchNet(NetSim):
def run_cmd(self, env):
cmd = env.repodir + '/net_switch/net_switch'
for n in self.nics:
cmd += ' -s ' + env.nic_eth_path(n)
return cmd
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment