Unverified Commit 1910a3af authored by Jakob Görgen's avatar Jakob Görgen
Browse files

added prepare methods to simulation and system module

parent b3606a54
......@@ -31,8 +31,9 @@ from simbricks.orchestration.system import base as sys_base
from simbricks.orchestration.system import pcie as sys_pcie
from simbricks.orchestration.system import mem as sys_mem
from simbricks.orchestration.system import eth as sys_eth
from simbricks.orchestration.system.host import base as sys_host
from simbricks.orchestration.system.host import disk_images
from simbricks.orchestration.simulation import base as sim_base
from simbricks.orchestration.simulation.host import disk_images
from simbricks.orchestration.runtime_new import command_executor
......@@ -60,6 +61,8 @@ class InstantiationEnvironment(util_base.IdObj):
shm_base: str = pathlib.Path().resolve(),
output_base: str = pathlib.Path().resolve(),
tmp_simulation_files: str = pathlib.Path().resolve(),
qemu_img_path: str | None = None,
qemu_path: str | None = None,
):
super().__init__()
# TODO: add more parameters that won't change during instantiation
......@@ -73,8 +76,23 @@ class InstantiationEnvironment(util_base.IdObj):
self._tmp_simulation_files: str = (
pathlib.Path(self._workdir).joinpath(tmp_simulation_files).absolute()
)
self._create_cp = create_cp
self._restore_cp = restore_cp
self._create_cp: bool = create_cp
self._restore_cp: bool = restore_cp
self._qemu_img_path: str = (
qemu_img_path
if qemu_img_path
else pathlib.Path(
f"{self._repodir}/sims/external/qemu/build/qemu-img"
).resolve()
)
self._qemu_path: str = (
qemu_path
if qemu_path
else pathlib.Path(
f"{self._repodir}/sims/external/qemu/build/x86_64-softmmu/qemu-system-x86_64"
).resolve()
)
class Instantiation(util_base.IdObj):
......@@ -85,8 +103,9 @@ class Instantiation(util_base.IdObj):
env: InstantiationEnvironment = InstantiationEnvironment(),
):
super().__init__()
self.simulation: sim_base.Simulation = sim
self._simulation: sim_base.Simulation = sim
self._env: InstantiationEnvironment = env
self._executor: command_executor.Executor | None = None
self._socket_per_interface: dict[sys_base.Interface, Socket] = {}
@staticmethod
......@@ -94,6 +113,25 @@ class Instantiation(util_base.IdObj):
path = pathlib.Path(path)
return path.is_absolute() and path.is_file()
@property
def executor(self):
    """Return the executor assigned to this instantiation.

    Raises:
        Exception: if no executor has been assigned yet.
    """
    current = self._executor
    if current is not None:
        return current
    raise Exception("you must set an executor")


@executor.setter
def executor(self, executor: command_executor.Executor):
    """Assign the executor that later calls on this instantiation will use."""
    self._executor = executor
def restore_cp(self) -> bool:
    """Return the environment's ``_restore_cp`` flag."""
    env = self._env
    return env._restore_cp
def qemu_img_path(self) -> str:
    """Return the ``qemu-img`` location stored in the environment."""
    env = self._env
    return env._qemu_img_path
def qemu_path(self) -> str:
    """Return the QEMU executable location stored in the environment."""
    env = self._env
    return env._qemu_path
def _get_chan_by_interface(self, interface: sys_base.Interface) -> sys_base.Channel:
if not interface.is_connected():
raise Exception(
......@@ -201,26 +239,20 @@ class Instantiation(util_base.IdObj):
async def cleanup_sockets(
self,
sockets: list[tuple[command_executor.Executor, Socket]] = [],
sockets: list[Socket] = [],
) -> None:
# DISCLAIMER: that we pass the executor in here is an artifact of the
# sub-optimal distributed executions as we may need a remote executor to
# remove or create folders on other machines. In an ideal world, we have
# some sort of runtime on each machine that executes thus making passing
# an executor in here obsolete...
scs = []
for executor, sock in sockets:
scs.append(asyncio.create_task(executor.rmtree(path=sock._path)))
for sock in sockets:
scs.append(asyncio.create_task(self.executor.rmtree(path=sock._path)))
if len(scs) > 0:
await asyncio.gather(*scs)
async def wait_for_sockets(
self,
executor: command_executor.Executor = command_executor.LocalExecutor(),
sockets: list[Socket] = [],
) -> None:
wait_socks = list(map(lambda sock: sock._path, sockets))
await executor.await_files(wait_socks, verbose=True)
await self.executor.await_files(wait_socks, verbose=True)
# TODO: add more methods constructing paths as required by methods in simulators or image handling classes
......@@ -239,40 +271,33 @@ class Instantiation(util_base.IdObj):
def wrkdir(self) -> str:
    """Return the environment's working directory in absolute form.

    Note that the value handed back is the ``pathlib.Path`` produced by
    ``Path.absolute()``, not a plain string.
    """
    workdir = pathlib.Path(self._env._workdir)
    return workdir.absolute()
async def prepare_directories(
self, executor: command_executor.Executor = command_executor.LocalExecutor()
) -> None:
# DISCLAIMER: that we pass the executor in here is an artifact of the
# sub-optimal distributed executions as we may need a remote executor to
# remove or create folders on other machines. In an ideal world, we have
# some sort of runtime on each machine that executes thus making passing
# an executor in here obsolete...
async def prepare(self) -> None:
wrkdir = self.wrkdir()
shutil.rmtree(wrkdir, ignore_errors=True)
await executor.rmtree(wrkdir)
await self.executor.rmtree(wrkdir)
shm_base = self.shm_base_dir()
shutil.rmtree(shm_base, ignore_errors=True)
await executor.rmtree(shm_base)
await self.executor.rmtree(shm_base)
cpdir = self.cpdir()
if self.create_cp():
shutil.rmtree(cpdir, ignore_errors=True)
await executor.rmtree(cpdir)
await self.executor.rmtree(cpdir)
pathlib.Path(wrkdir).mkdir(parents=True, exist_ok=True)
await executor.mkdir(wrkdir)
await self.executor.mkdir(wrkdir)
pathlib.Path(cpdir).mkdir(parents=True, exist_ok=True)
await executor.mkdir(cpdir)
await self.executor.mkdir(cpdir)
pathlib.Path(shm_base).mkdir(parents=True, exist_ok=True)
await executor.mkdir(shm_base)
await self.executor.mkdir(shm_base)
await self._simulation.prepare(inst=self)
def _join_paths(
self, base: str = "", relative_path: str = "", enforce_existence=True
self, base: str = "", relative_path: str = "", enforce_existence=False
) -> str:
path = pathlib.Path(base)
joined = path.joinpath(relative_path)
......@@ -309,18 +334,33 @@ class Instantiation(util_base.IdObj):
return self._join_paths(
base=self._env._tmp_simulation_files,
relative_path=relative_path,
enforce_existence=False,
)
def dynamic_img_path(self, img: disk_images.DiskImage, format: str) -> str:
    """Return the path of a dynamically generated disk image.

    The file is named ``<img._id>.<format>`` and is joined onto the
    environment's temporary simulation files directory.

    Args:
        img: Disk image whose ``_id`` identifies the file.
        format: Image format, used as the file extension.

    Returns:
        Whatever ``_join_paths`` produces for that directory and file name.
    """
    # Interpolate rather than concatenate with ``+``: adding a non-string id
    # to a str raises TypeError.
    filename = f"{img._id}.{format}"
    return self._join_paths(
        base=self._env._tmp_simulation_files,
        relative_path=filename,
    )
def hdcopy_path(self, img: disk_images.DiskImage, format: str) -> str:
    """Return the path of the hard-disk copy belonging to ``img``.

    The file is named ``<img._id>_hdcopy.<format>`` and is joined onto the
    environment's temporary simulation files directory.

    Args:
        img: Disk image whose ``_id`` identifies the file.
        format: Image format, used as the file extension.

    Returns:
        Whatever ``_join_paths`` produces for that directory and file name.
    """
    # Interpolate rather than concatenate with ``+``: adding a non-string id
    # to a str raises TypeError.
    filename = f"{img._id}_hdcopy.{format}"
    return self._join_paths(
        base=self._env._tmp_simulation_files,
        relative_path=filename,
    )
def cpdir_subdir(self, sim: sim_base.Simulator) -> str:
    """Return the checkpoint sub-directory for ``sim``.

    The directory is named ``checkpoint.<sim.name>-<sim._id>`` and is joined
    onto ``self.cpdir()`` without requiring it to exist.

    Args:
        sim: Simulator whose ``name`` and ``_id`` form the directory name.

    Returns:
        Whatever ``_join_paths`` produces for the checkpoint directory and
        the per-simulator directory name.
    """
    # The name must stay relative: a segment starting with "/" is absolute,
    # and pathlib's joinpath then discards the base, placing the directory
    # at the filesystem root instead of below cpdir().
    dir_name = f"checkpoint.{sim.name}-{sim._id}"
    return self._join_paths(
        base=self.cpdir(), relative_path=dir_name, enforce_existence=False
    )
def get_simulation_output_path(self, run_number: int) -> str:
    """Return the path of the JSON output file for run ``run_number``.

    The file is named ``out-<run_number>.json`` and is joined onto the
    environment's output base directory without requiring it to exist.
    """
    output_file = f"out-{run_number}.json"
    output_base = self._env._output_base
    return self._join_paths(
        base=output_base,
        relative_path=output_file,
        enforce_existence=False,
    )
def find_sim_by_spec(self, spec: sys_host.FullSystemHost) -> sim_base.Simulator:
    """Look up ``spec`` via the wrapped simulation's ``find_sim``."""
    simulation = self._simulation
    return simulation.find_sim(spec)
......@@ -25,6 +25,7 @@ from __future__ import annotations
import abc
import itertools
import time
import asyncio
import typing as tp
import simbricks.orchestration.system as sys_conf
import simbricks.orchestration.system.host as sys_host_conf
......@@ -54,9 +55,10 @@ class Simulator(utils_base.IdObj):
super().__init__()
self.name: str = name
self._relative_executable_path: str = relative_executable_path
self.extra_deps: list[Simulator] = []
# self.extra_deps: list[Simulator] = []
self._simulation: sim_base.Simulation = simulation
self._components: set[sys_conf.Component] = set()
self._wait: bool = False
simulation.add_sim(self)
@staticmethod
......@@ -70,7 +72,7 @@ class Simulator(utils_base.IdObj):
@staticmethod
def split_sockets_by_type(
sockets: list[inst_base.Socket],
) -> tuple[sockets : list[inst_base.Socket], sockets : list[inst_base.Socket]]:
) -> tuple[list[inst_base.Socket], list[inst_base.Socket]]:
listen = Simulator.filter_sockets(
sockets=sockets, filter_type=inst_base.SockType.LISTEN
)
......@@ -95,7 +97,10 @@ class Simulator(utils_base.IdObj):
eth_latency = max(eth_latency, channel.sys_channel.eth_latency)
if eth_latency is None or sync_period is None:
raise Exception("could not determine eth_latency and sync_period")
return eth_latency, sync_period, sync
return eth_latency, sync_period, run_sync
def filter_components_by_type(self, ty) -> list[sys_conf.Component]:
    """Return this simulator's components that are instances of ``ty``."""
    return [comp for comp in self._components if isinstance(comp, ty)]
def resreq_cores(self) -> int:
"""
......@@ -117,13 +122,11 @@ class Simulator(utils_base.IdObj):
"""Full name of the simulator."""
return ""
def prep_tar(self, inst) -> None:
pass
# TODO: move into prepare method
# pylint: disable=unused-argument
def prep_cmds(self, inst: inst_base.Instantiation) -> list[str]:
"""Commands to prepare execution of this simulator."""
return []
# def prep_cmds(self, inst: inst_base.Instantiation) -> list[str]:
# """Commands to prepare execution of this simulator."""
# return []
def add(self, comp: sys_conf.Component) -> None:
if comp in self._components:
......@@ -209,11 +212,6 @@ class Simulator(utils_base.IdObj):
"""Command to execute this simulator."""
return ""
# TODO: FIXME
def dependencies(self) -> list[Simulator]:
"""Other simulators to execute before this one."""
return []
# TODO: overwrite in sub-classes to reflect that currently not all adapters support both listening and connecting
# In future version adapters should support both which would render this method obsolete
# TODO: FIXME, this is still a little bit broken, as it might be important to create
......@@ -239,11 +237,15 @@ class Simulator(utils_base.IdObj):
return 5
def wait_terminate(self) -> bool:
return False
return self._wait
def supports_checkpointing(self) -> bool:
return False
async def prepare(self, inst: inst_base.Instantiation) -> None:
    """Prepare every component of this simulator and wait for all of them.

    Each component's ``prepare`` coroutine is created with ``inst`` and the
    whole batch is awaited through ``asyncio.gather``.
    """
    pending = []
    for component in self._components:
        pending.append(component.prepare(inst=inst))
    await asyncio.gather(*pending)
class Simulation(utils_base.IdObj):
"""
......@@ -336,6 +338,12 @@ class Simulation(utils_base.IdObj):
raise Exception("Simulator Not Found")
return self._sys_sim_map[comp]
async def prepare(self, inst: inst_base.Instantiation) -> None:
    """Prepare every simulator in ``_sim_list`` and wait for all of them.

    Each simulator's ``prepare`` coroutine is created with ``inst`` and the
    whole batch is awaited through ``asyncio.gather``.
    """
    await asyncio.gather(*(sim.prepare(inst=inst) for sim in self._sim_list))
# TODO: FIXME
def enable_checkpointing_if_supported(self) -> None:
    """Placeholder that is not implemented yet.

    Takes ``self`` so that calling it on an instance reaches the explicit
    "not implemented" error instead of failing on the argument count.

    Raises:
        Exception: always.
    """
    raise Exception("not implemented")
......
......@@ -23,6 +23,7 @@
import io
import tarfile
import math
import asyncio
import typing as tp
import simbricks.orchestration.simulation.base as sim_base
import simbricks.orchestration.system.pcie as system_pcie
......@@ -31,18 +32,17 @@ from simbricks.orchestration.instantiation import base as inst_base
from simbricks.orchestration.experiment.experiment_environment_new import ExpEnv
if tp.TYPE_CHECKING:
from simbricks.orchestration.system.host.base import (Host)
from simbricks.orchestration.system.host.base import Host
class HostSim(sim_base.Simulator):
def __init__(self, e: sim_base.Simulation):
super().__init__(e)
self.wait = True
self.name = f'{self._id}'
self.name = f"{self._id}"
def full_name(self) -> str:
return 'host.' + self.name
return "host." + self.name
def dependencies(self) -> tp.List[sim_base.Simulator]:
deps = []
......@@ -59,42 +59,14 @@ class HostSim(sim_base.Simulator):
deps.append(self._simulation.find_sim(peer_if.component))
return deps
def add(self, host: 'Host'):
def add(self, host: "Host"):
super().add(host)
def config_str(self) -> str:
    """Return the configuration string of this host simulator.

    This implementation has nothing to contribute and returns an empty
    string, matching the declared ``str`` return type (it previously
    returned an empty list).
    """
    return ""
def make_tar(self, path: str) -> None:
# TODO: update it to make multiple tar files for each host component
# Make tar file for the first host component
# One tar file for all the hosts in the simulator.
with tarfile.open(path, 'w:') as tar:
# add main run script
cfg_i = tarfile.TarInfo('guest/run.sh')
cfg_i.mode = 0o777
cfg_f = self.strfile(self.config_str())
cfg_f.seek(0, io.SEEK_END)
cfg_i.size = cfg_f.tell()
cfg_f.seek(0, io.SEEK_SET)
tar.addfile(tarinfo=cfg_i, fileobj=cfg_f)
cfg_f.close()
# add additional config files
host = self.hosts[0]
for (n, f) in host.config_files().items():
f_i = tarfile.TarInfo('guest/' + n)
f_i.mode = 0o777
f.seek(0, io.SEEK_END)
f_i.size = f.tell()
f.seek(0, io.SEEK_SET)
tar.addfile(tarinfo=f_i, fileobj=f)
f.close()
def wait_terminate(self) -> bool:
return self.wait
def supported_image_formats(self) -> list[str]:
    """Return the disk image formats this host simulator can use.

    This implementation is a placeholder. It takes ``self`` because it is
    invoked on simulator instances; without it the call fails on the
    argument count before reaching the explicit error below.

    Raises:
        Exception: always.
    """
    raise Exception("implement me")
class Gem5Sim(HostSim):
......@@ -102,11 +74,11 @@ class Gem5Sim(HostSim):
def __init__(self, e: sim_base.Simulation):
super().__init__(e)
self.name = super().full_name()
self.cpu_type_cp = 'X86KvmCPU'
self.cpu_type = 'TimingSimpleCPU'
self.cpu_type_cp = "X86KvmCPU"
self.cpu_type = "TimingSimpleCPU"
self.extra_main_args: list[str] = []
self.extra_config_args: list[str] = []
self.variant = 'fast'
self.variant = "fast"
self.modify_checkpoint_tick = True
self.wait = True
......@@ -116,35 +88,14 @@ class Gem5Sim(HostSim):
def resreq_mem(self) -> int:
return 4096
# TODO: remove it
def config_str(self) -> str:
cp_es = [] if self.nockp else ['m5 checkpoint']
exit_es = ['m5 exit']
host = self.hosts[0]
es = host.prepare_pre_cp() + host.app.prepare_pre_cp(self) + cp_es + \
host.prepare_post_cp() + host.app.prepare_post_cp(self) + \
host.run_cmds() + host.cleanup_cmds() + exit_es
return '\n'.join(es)
def prep_tar(self, inst: inst_base.Instantiation) -> None:
path = inst.cfgtar_path(self)
print(self.name, ' preparing config tar:', path)
for c in self._components:
for d in c.disks:
d.prepare_image_path(inst, path)
def prep_cmds(self, inst: inst_base.Instantiation) -> tp.List[str]:
cmds = [f'mkdir -p {inst._env._cpdir}']
if inst._env._restore_cp and self.modify_checkpoint_tick:
cmds.append(
f'python3 {inst.utilsdir}/modify_gem5_cp_tick.py --tick 0 '
f'--cpdir {inst.gem5_cpdir(self)}'
)
return cmds
async def prepare(self, inst: inst_base.Instantiation) -> None:
    """Run the parent preparation, then create this simulator's checkpoint dir.

    The directory comes from ``inst.cpdir_subdir`` and is created with a
    ``mkdir -p`` command run through the instantiation's executor.
    """
    await super().prepare(inst=inst)

    checkpoint_dir = inst.cpdir_subdir(sim=self)
    mkdir_cmds = [f'mkdir -p {checkpoint_dir}']
    # Schedule the command list as a task and wait for it to finish.
    mkdir_task = asyncio.create_task(
        inst.executor.run_cmdlist(label="prepare", cmds=mkdir_cmds, verbose=True)
    )
    await mkdir_task
def run_cmd(self, inst: inst_base.Instantiation) -> str:
cpu_type = self.cpu_type
......@@ -152,26 +103,25 @@ class Gem5Sim(HostSim):
cpu_type = self.cpu_type_cp
# TODO
cmd = f'{env.gem5_path(self.variant)} --outdir={env.gem5_outdir(self)} '
cmd += ' '.join(self.extra_main_args)
cmd = f"{env.gem5_path(self.variant)} --outdir={env.gem5_outdir(self)} "
cmd += " ".join(self.extra_main_args)
cmd += (
f' {env.gem5_py_path} --caches --l2cache '
'--l1d_size=32kB --l1i_size=32kB --l2_size=32MB '
'--l1d_assoc=8 --l1i_assoc=8 --l2_assoc=16 '
f'--cacheline_size=64 --cpu-clock={self.hosts[0].cpu_freq}'
f' --sys-clock={self.hosts[0].sys_clock} '
f'--checkpoint-dir={env.gem5_cpdir(self)} '
f'--kernel={env.gem5_kernel_path} '
f'--disk-image={env.hd_raw_path(self.hosts[0].disk_image)} '
f'--disk-image={env.cfgtar_path(self)} '
f'--cpu-type={cpu_type} --mem-size={self.hosts[0].memory}MB '
f'--num-cpus={self.hosts[0].cores} '
'--mem-type=DDR4_2400_16x4 '
f" {env.gem5_py_path} --caches --l2cache "
"--l1d_size=32kB --l1i_size=32kB --l2_size=32MB "
"--l1d_assoc=8 --l1i_assoc=8 --l2_assoc=16 "
f"--cacheline_size=64 --cpu-clock={self.hosts[0].cpu_freq}"
f" --sys-clock={self.hosts[0].sys_clock} "
f"--checkpoint-dir={env.gem5_cpdir(self)} "
f"--kernel={env.gem5_kernel_path} "
f"--disk-image={env.hd_raw_path(self.hosts[0].disk_image)} "
f"--disk-image={env.cfgtar_path(self)} "
f"--cpu-type={cpu_type} --mem-size={self.hosts[0].memory}MB "
f"--num-cpus={self.hosts[0].cores} "
"--mem-type=DDR4_2400_16x4 "
)
for dev in self.hosts[0].ifs: # TODO
if (dev == dev.channel.a):
if dev == dev.channel.a:
peer_if = dev.channel.b
else:
peer_if = dev.channel.a
......@@ -179,26 +129,25 @@ class Gem5Sim(HostSim):
peer_sim = self.experiment.find_sim(peer_if)
chn_sim = self.experiment.find_sim(dev.channel)
cmd += (
f'--simbricks-pci=connect:{env.dev_pci_path(peer_sim)}'
f':latency={dev.channel.latency}ns'
f':sync_interval={chn_sim.sync_period}ns'
f"--simbricks-pci=connect:{env.dev_pci_path(peer_sim)}"
f":latency={dev.channel.latency}ns"
f":sync_interval={chn_sim.sync_period}ns"
)
# if cpu_type == 'TimingSimpleCPU' and: #TODO: FIXME
# cmd += ':sync'
cmd += ' '
cmd += " "
return cmd
def wait_terminate(self) -> bool:
return self.wait
class QemuSim(HostSim):
def __init__(self, e: sim_base.Simulation):
super().__init__(e)
def resreq_cores(self) -> int:
if self.sync:
return 1
......@@ -209,56 +158,65 @@ class QemuSim(HostSim):
def resreq_mem(self) -> int:
return 8192
def config_str(self) -> str:
cp_es = ['echo ready to checkpoint']
exit_es = ['poweroff -f']
es = self.hosts[0].prepare_pre_cp() + self.hosts[0].app.prepare_pre_cp(self) + cp_es + \
self.hosts[0].prepare_post_cp() + self.hosts[0].app.prepare_post_cp(self) + \
self.hosts[0].run_cmds() + self.hosts[0].cleanup_cmds() + exit_es
return '\n'.join(es)
async def prepare(self, inst: inst_base.Instantiation) -> None:
    """Run the parent preparation, then create a qcow2 copy for every disk.

    For each disk of each ``FullSystemHost`` component a ``qemu-img create``
    command is built. It creates the file at ``inst.hdcopy_path`` with the
    disk's own qcow2 image as backing file. All commands are then run
    through the instantiation's executor.

    Args:
        inst: Instantiation providing paths and the executor.
    """
    await super().prepare(inst=inst)

    # NOTE(review): `system` is not among the imports visible in this view;
    # confirm the module is imported under that name.
    full_sys_hosts = tp.cast(
        list[system.FullSystemHost],
        self.filter_components_by_type(ty=system.FullSystemHost),
    )

    prep_cmds = []
    for fsh in full_sys_hosts:
        disks = tp.cast(list[system.DiskImage], fsh.disks)
        for disk in disks:
            # Resolve both paths first: reusing the enclosing quote character
            # inside an f-string replacement field is a SyntaxError before
            # Python 3.12.
            backing_file = disk.path(inst=inst, format="qcow2")
            hd_copy = inst.hdcopy_path(img=disk, format="qcow2")
            prep_cmds.append(
                f"{inst.qemu_img_path()} create -f qcow2 -o "
                f'backing_file="{backing_file}" '
                f"{hd_copy}"
            )

    task = asyncio.create_task(
        inst.executor.run_cmdlist(label="prepare", cmds=prep_cmds, verbose=True)
    )
    await task
def run_cmd(self, env: ExpEnv) -> str:
accel = ',accel=kvm:tcg' if not self.sync else ''
accel = ",accel=kvm:tcg" if not self.sync else ""
if self.hosts[0].disks[0].kcmd_append:
kcmd_append = ' ' + self.hosts[0].kcmd_append
kcmd_append = " " + self.hosts[0].kcmd_append
else:
kcmd_append = ''
kcmd_append = ""
cmd = (
f'{env.qemu_path} -machine q35{accel} -serial mon:stdio '
'-cpu Skylake-Server -display none -nic none '
f'-kernel {env.qemu_kernel_path} '
f'-drive file={env.hdcopy_path(self)},if=ide,index=0,media=disk '
f'-drive file={env.cfgtar_path(self)},if=ide,index=1,media=disk,'
'driver=raw '
f"{env.qemu_path} -machine q35{accel} -serial mon:stdio "
"-cpu Skylake-Server -display none -nic none "
f"-kernel {env.qemu_kernel_path} "
f"-drive file={env.hdcopy_path(self)},if=ide,index=0,media=disk "
f"-drive file={env.cfgtar_path(self)},if=ide,index=1,media=disk,"
"driver=raw "
'-append "earlyprintk=ttyS0 console=ttyS0 root=/dev/sda1 '
f'init=/home/ubuntu/guestinit.sh rw{kcmd_append}" '
f'-m {self.hosts[0].memory} -smp {self.hosts[0].cores} '
f"-m {self.hosts[0].memory} -smp {self.hosts[0].cores} "
)
if self.sync:
unit = self.hosts[0].cpu_freq[-3:]
if unit.lower() == 'ghz':
if unit.lower() == "ghz":
base = 0
elif unit.lower() == 'mhz':
elif unit.lower() == "mhz":
base = 3
else:
raise ValueError('cpu frequency specified in unsupported unit')
raise ValueError("cpu frequency specified in unsupported unit")
num = float(self.hosts[0].cpu_freq[:-3])
shift = base - int(math.ceil(math.log(num, 2)))
cmd += f' -icount shift={shift},sleep=off '
cmd += f" -icount shift={shift},sleep=off "
for dev in self.hosts[0].ifs:
if (dev == dev.channel.a):
if dev == dev.channel.a:
peer_if = dev.channel.b
else:
peer_if = dev.channel.a
......@@ -266,17 +224,17 @@ class QemuSim(HostSim):
peer_sim = self.experiment.find_sim(peer_if)
chn_sim = self.experiment.find_sim(dev.channel)
cmd += f'-device simbricks-pci,socket={env.dev_pci_path(peer_sim)}'
cmd += f"-device simbricks-pci,socket={env.dev_pci_path(peer_sim)}"
if self.sync:
cmd += ',sync=on'
cmd += f',pci-latency={dev.channel.latency}'
cmd += f',sync-period={chn_sim.sync_period}'
cmd += ",sync=on"
cmd += f",pci-latency={dev.channel.latency}"
cmd += f",sync-period={chn_sim.sync_period}"
# if self.sync_drift is not None:
# cmd += f',sync-drift={self.sync_drift}'
# if self.sync_offset is not None:
# cmd += f',sync-offset={self.sync_offset}'
else:
cmd += ',sync=off'
cmd += ' '
cmd += ",sync=off"
cmd += " "
return cmd
......@@ -113,7 +113,7 @@ class SwitchNet(NetSim):
def run_cmd(self, inst: inst_base.Instantiation) -> str:
channels = self._get_channels(inst=inst)
eth_latency, sync_period, sync = (
eth_latency, sync_period, run_sync = (
sim_base.Simulator.get_unique_latency_period_sync(channels=channels)
)
......
......@@ -24,6 +24,7 @@ from __future__ import annotations
import abc
import typing as tp
from simbricks.orchestration.utils import base as util_base
from simbricks.orchestration.instantiation import base as inst_base
class System:
......@@ -57,6 +58,9 @@ class Component(util_base.IdObj):
def channels(self) -> list[Channel]:
    """Return the channel of every connected interface of this component."""
    connected = []
    for iface in self.interfaces():
        if iface.is_connected():
            connected.append(iface.channel)
    return connected
async def prepare(self, inst: inst_base.Instantiation) -> None:
    """Preparation hook; this implementation does nothing."""
    return None
class Interface(util_base.IdObj):
def __init__(self, c: Component) -> None:
......
......@@ -21,7 +21,6 @@
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from simbricks.orchestration.system import base
from simbricks.orchestration.system import pcie
from simbricks.orchestration.utils import base as utils_base
......
......@@ -35,7 +35,7 @@ class Application(abc.ABC):
# Note AK: Maybe we can factor most of the duplicate calls with the host out
# into a separate module.
class BaseLinuxApplication(abc.ABC):
class BaseLinuxApplication(abc.ABC): # TODO: FIXME!!!
def __init__(self, h: 'LinuxHost') -> None:
self.host = h
self.start_delay: float | None = None
......
......@@ -22,13 +22,15 @@
import typing as tp
import io
import asyncio
from os import path
import simbricks.orchestration.instantiation.base as instantiation
from simbricks.orchestration.system import base as base
from simbricks.orchestration.system import eth as eth
from simbricks.orchestration.system.host import app
if tp.TYPE_CHECKING:
from simbricks.orchestration.system import (eth, mem, pcie)
from simbricks.orchestration.system import eth, mem, pcie
from simbricks.orchestration.system.host import disk_images
......@@ -53,19 +55,23 @@ class FullSystemHost(Host):
super().__init__(s)
self.memory = 512
self.cores = 1
self.cpu_freq = '3GHz'
self.disks: list['DiskImage'] = []
self.cpu_freq = "3GHz"
self.disks: list["DiskImage"] = []
def add_disk(self, disk: 'DiskImage') -> None:
def add_disk(self, disk: "DiskImage") -> None:
self.disks.append(disk)
async def prepare(self, inst: instantiation.Instantiation) -> None:
    """Prepare every disk attached to this host and wait for all of them.

    Each disk's ``prepare`` coroutine is created with ``inst`` and the whole
    batch is awaited through ``asyncio.gather``.
    """
    await asyncio.gather(*(disk.prepare(inst) for disk in self.disks))
class BaseLinuxHost(FullSystemHost):
def __init__(self, s: base.System) -> None:
super().__init__(s)
self.applications: list[app.BaseLinuxApplication] = []
self.load_modules = []
self.kcmd_append = ''
self.kcmd_append = ""
def add_app(self, a: app.BaseLinuxApplication) -> None:
self.applications.append(a)
......@@ -73,8 +79,9 @@ class BaseLinuxHost(FullSystemHost):
def _concat_app_cmds(
self,
inst: instantiation.Instantiation,
mapper: tp.Callable[['BaseLinuxApplication', instantiation.Instantiation],
list[str]]
mapper: tp.Callable[
["BaseLinuxApplication", instantiation.Instantiation], list[str]
],
) -> list[str]:
"""
Generate command list from applications by applying `mapper` to each
......@@ -111,7 +118,6 @@ class BaseLinuxHost(FullSystemHost):
"""Commands to run to prepare node before checkpointing."""
return self._concat_app_cmds(inst, app.BaseLinuxApplication.prepare_pre_cp)
def prepare_post_cp(self, inst: instantiation.Instantiation) -> list[str]:
"""Commands to run to prepare node after checkpoint restore."""
return self._concat_app_cmds(inst, app.BaseLinuxApplication.prepare_post_cp)
......@@ -122,11 +128,17 @@ class BaseLinuxHost(FullSystemHost):
else:
cp_cmd = []
es = self.prepare_pre_cp(inst) + self.applications[0].prepare_pre_cp(inst) + \
cp_cmd + \
self.prepare_post_cp(inst) + self.applications[0].prepare_post_cp(inst) + \
self.run_cmds(inst) + self.cleanup_cmds(inst)
return '\n'.join(es)
# TODO: FIXME
es = (
self.prepare_pre_cp(inst)
+ self.applications[0].prepare_pre_cp(inst)
+ cp_cmd
+ self.prepare_post_cp(inst)
+ self.applications[0].prepare_post_cp(inst)
+ self.run_cmds(inst)
+ self.cleanup_cmds(inst)
)
return "\n".join(es)
def strfile(self, s: str) -> io.BytesIO:
"""
......@@ -136,7 +148,7 @@ class BaseLinuxHost(FullSystemHost):
Using this, you can create a file with the string as its content on the
simulated node.
"""
return io.BytesIO(bytes(s, encoding='UTF-8'))
return io.BytesIO(bytes(s, encoding="UTF-8"))
class LinuxHost(BaseLinuxHost):
......@@ -145,64 +157,63 @@ class LinuxHost(BaseLinuxHost):
self.drivers: list[str] = []
def cleanup_cmds(self, inst: instantiation.Instantiation) -> list[str]:
return super().cleanup_cmds(inst) + ['poweroff -f']
return super().cleanup_cmds(inst) + ["poweroff -f"]
def prepare_pre_cp(self, inst: instantiation.Instantiation) -> list[str]:
"""Commands to run to prepare node before checkpointing."""
return [
'set -x',
'export HOME=/root',
'export LANG=en_US',
'export PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:' + \
'/usr/bin:/sbin:/bin:/usr/games:/usr/local/games"'
"set -x",
"export HOME=/root",
"export LANG=en_US",
'export PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:'
+ '/usr/bin:/sbin:/bin:/usr/games:/usr/local/games"',
] + super().prepare_pre_cp(inst)
def prepare_post_cp(self, inst) -> tp.List[str]:
l = []
for d in self.drivers:
if d[0] == '/':
l.append(f'insmod {d}')
if d[0] == "/":
l.append(f"insmod {d}")
else:
l.append(f'modprobe {d}')
l.append(f"modprobe {d}")
eth_i = 0
for i in self.interfaces():
# Get ifname parameter if set, otherwise default to ethX
if isinstance(i, eth.EthSimpleNIC):
ifn = f'eth{eth_i}'
ifn = f"eth{eth_i}"
eth_i += 1
else:
continue
# Force MAC if requested
if 'force_mac_addr' in i.parameters:
mac = i.parameters['force_mac_addr']
l.append(f'ip link set dev {ifn} address '
f'{mac}')
if "force_mac_addr" in i.parameters:
mac = i.parameters["force_mac_addr"]
l.append(f"ip link set dev {ifn} address " f"{mac}")
# Bring interface up
l.append(f'ip link set dev {ifn} up')
l.append(f"ip link set dev {ifn} up")
# Add IP addresses if included
if 'ipv4_addrs' in i.parameters:
for a in i.parameters['ipv4_addrs']:
l.append(f'ip addr add {a} dev {ifn}')
if "ipv4_addrs" in i.parameters:
for a in i.parameters["ipv4_addrs"]:
l.append(f"ip addr add {a} dev {ifn}")
return super().prepare_post_cp(inst) + l
class I40ELinuxHost(LinuxHost):
def __init__(self, sys) -> None:
super().__init__(sys)
self.drivers.append('i40e')
self.drivers.append("i40e")
def checkpoint_commands(self) -> list[str]:
return ['m5 checkpoint']
return ["m5 checkpoint"]
class CorundumLinuxHost(LinuxHost):
def __init__(self, sys) -> None:
super().__init__(sys)
self.drivers.append('/tmp/guest/mqnic.ko')
self.drivers.append("/tmp/guest/mqnic.ko")
def config_files(self, inst: instantiation.Instantiation) -> tp.Dict[str, tp.IO]:
m = {'mqnic.ko': open('../images/mqnic/mqnic.ko', 'rb')}
m = {"mqnic.ko": open("../images/mqnic/mqnic.ko", "rb")}
return {**m, **super().config_files()}
......@@ -25,13 +25,14 @@ import io
import os.path
import tarfile
import typing as tp
from simbricks.orchestration.utils import base as utils_base
from simbricks.orchestration.instantiation import base as inst_base
from simbricks.orchestration.experiment import experiment_environment as expenv
if tp.TYPE_CHECKING:
from simbricks.orchestration.system.host import base
class DiskImage(abc.ABC):
class DiskImage(utils_base.IdObj):
def __init__(self, h: 'Host') -> None:
self.host = None | str
......@@ -43,12 +44,12 @@ class DiskImage(abc.ABC):
def path(self, inst: inst_base.Instantiation, format: str) -> str:
return
async def prepare_format(self, inst: inst_base.Instantiation, format: str) -> str:
async def _prepare_format(self, inst: inst_base.Instantiation, format: str) -> None:
pass
async def prepare(self, inst: inst_base.Instantiation) -> None:
# Find first supported disk image format in order of simulator pref.
sim = inst.simulation.find_sim(self.host)
sim = inst.find_sim_by_spec(self.host)
format = None
av_fmt = self.available_formats()
for f in sim.supported_image_formats():
......@@ -59,7 +60,7 @@ class DiskImage(abc.ABC):
if format is None:
raise Exception('No supported image format found')
await self.prepare_format(inst, format)
await self._prepare_format(inst, format)
# Disk image where user just provides a path
......@@ -100,14 +101,14 @@ class DistroDiskImage(DiskImage):
# Abstract base class for dynamically generated images
class DynamicDiskImage(DiskImage):
def __init__(self, h: 'FullSystemHost', path: str) -> None:
def __init__(self, h: 'FullSystemHost') -> None:
super().__init__(h)
def path(self, inst: inst_base.Instantiation, format: str) -> str:
    """Return this image's location as computed by ``inst.dynamic_img_path``."""
    image_path = inst.dynamic_img_path(self, format)
    return image_path
@abc.abstractmethod
async def prepare_format(self, inst: inst_base.Instantiation, format: str) -> str:
async def _prepare_format(self, inst: inst_base.Instantiation, format: str) -> None:
pass
# Builds the Tar with the commands to run etc.
......@@ -119,7 +120,7 @@ class LinuxConfigDiskImage(DynamicDiskImage):
def available_formats(self) -> list[str]:
return ["raw"]
async def prepare_format(self, inst: inst_base.Instantiation, format: str) -> None:
async def _prepare_format(self, inst: inst_base.Instantiation, format: str) -> None:
path = self.path(inst, format)
with tarfile.open(path, 'w:') as tar:
# add main run script
......@@ -155,6 +156,6 @@ class PackerDiskImage(DynamicDiskImage):
def available_formats(self) -> list[str]:
return ["raw", "qcow"]
async def prepare_image_path(self, inst: inst_base.Instantiation, format: str) -> str:
async def _prepare_format(self, inst: inst_base.Instantiation, format: str) -> None:
# TODO: invoke packer to build the image if necessary
pass
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment