base.py 13.2 KB
Newer Older
Hejing Li's avatar
Hejing Li committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Copyright 2024 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

23
from __future__ import annotations
24

25
import abc
Hejing Li's avatar
Hejing Li committed
26
import itertools
27
import time
28
import asyncio
29
30
import typing as tp
import simbricks.orchestration.system as sys_conf
31
import simbricks.orchestration.system.host as sys_host_conf
Jakob Görgen's avatar
Jakob Görgen committed
32
33
34
import simbricks.orchestration.instantiation.base as inst_base
import simbricks.orchestration.simulation.channel as sim_chan
import simbricks.orchestration.utils.base as utils_base
35

36
if tp.TYPE_CHECKING:
37
38
39
40
41
42
43
    from simbricks.orchestration.simulation import (
        Channel,
        HostSim,
        PCIDevSim,
        NetSim,
        base as sim_base,
    )
Hejing Li's avatar
Hejing Li committed
44

45

46
class Simulator(utils_base.IdObj):
Hejing Li's avatar
Hejing Li committed
47
48
    """Base class for all simulators."""

49
    def __init__(
        self,
        simulation: sim_base.Simulation,
        executable: str,
        name: str = "",
    ) -> None:
        """Initialize the simulator and register it with `simulation`.

        Args:
            simulation: Simulation this simulator belongs to. The new instance
                is passed to its `add_sim()` at the end of construction.
            executable: Value stored in `_executable`.
            name: Value stored in `name`.
        """
        super().__init__()

        # Values handed in by the caller.
        self.name: str = name
        self._executable = executable
        self._simulation: sim_base.Simulation = simulation

        # State that starts out empty / at its default.
        self._components: set[sys_conf.Component] = set()
        self._wait: bool = False
        self._extra_args: str | None = None
        self._start_tick = 0
        """Timestamp at which the simulation is started. Useful when the
        simulator is attached only at a later point in time and has to
        synchronize with already connected simulators, e.g. when certain
        simulators are attached only after a checkpoint has been taken."""

        # Register last, once all attributes exist.
        simulation.add_sim(self)
Hejing Li's avatar
Hejing Li committed
69

Jakob Görgen's avatar
Jakob Görgen committed
70
71
72
73
74
75
76
77
    @property
    def extra_args(self) -> str | None:
        """Return the stored extra-argument string, or None if none was set."""
        return self._extra_args

    @extra_args.setter
    def extra_args(self, extra_args: str | None) -> None:
        """Store `extra_args` as this simulator's extra-argument string."""
        self._extra_args = extra_args

78
79
80
81
82
83
84
85
86
87
88
    @staticmethod
    def filter_sockets(
        sockets: list[inst_base.Socket],
        filter_type: inst_base.SockType = inst_base.SockType.LISTEN,
    ) -> list[inst_base.Socket]:
        """Return the sockets whose `_type` equals `filter_type`.

        Args:
            sockets: Sockets to select from.
            filter_type: Socket type to keep; defaults to `SockType.LISTEN`.

        Returns:
            A new list with the matching sockets in their original order.
        """
        # Build a real list rather than a lazy `filter` object, so the result
        # matches the annotation and can be iterated more than once.
        return [sock for sock in sockets if sock._type == filter_type]

    @staticmethod
    def split_sockets_by_type(
        sockets: list[inst_base.Socket],
    ) -> tuple[list[inst_base.Socket], list[inst_base.Socket]]:
        """Partition `sockets` into listening and connecting sockets.

        Args:
            sockets: Sockets to partition.

        Returns:
            A `(listen, connect)` tuple: the sockets of type `SockType.LISTEN`
            and those of type `SockType.CONNECT`, each as a list.
        """
        # list() guarantees both halves are real lists, as the return annotation
        # promises, even if the filter helper yields a lazy iterable.
        listen = list(
            Simulator.filter_sockets(
                sockets=sockets, filter_type=inst_base.SockType.LISTEN
            )
        )
        connect = list(
            Simulator.filter_sockets(
                sockets=sockets, filter_type=inst_base.SockType.CONNECT
            )
        )
        return listen, connect

Jakob Görgen's avatar
Jakob Görgen committed
98
99
100
101
102
103
104
    # Helper method for simulators that do not support
    # multiple sync periods etc. Should eventually become
    # obsolete at some point in the future...
    @staticmethod
    def get_unique_latency_period_sync(
        channels: list[sim_chan.Channel],
    ) -> tuple[int, int, bool]:
105
        latency = None
Jakob Görgen's avatar
Jakob Görgen committed
106
107
108
109
110
        sync_period = None
        run_sync = False
        for channel in channels:
            sync_period = min(sync_period, channel.sync_period)
            run_sync = run_sync or channel._synchronized
111
112
            latency = max(latency, channel.sys_channel.latency)
        if latency is None or sync_period is None:
Jakob Görgen's avatar
Jakob Görgen committed
113
            raise Exception("could not determine eth_latency and sync_period")
114
        return latency, sync_period, run_sync
115

Jakob Görgen's avatar
Jakob Görgen committed
116
117
    # Type variable linking the `ty` argument of the component filter helpers
    # to the element type of the list they are annotated to return.
    T = tp.TypeVar("T")

Jakob Görgen's avatar
Jakob Görgen committed
118
    def filter_components_by_pred(
        self,
        pred: tp.Callable[[sys_conf.Component], bool],
        ty: type[T] = sys_conf.Component,
    ) -> list[T]:
        """Return the components of this simulator for which `pred` holds.

        Args:
            pred: Predicate called once per component.
            ty: Only used for typing the result; it is not evaluated here.

        Returns:
            A new list of all components accepted by `pred`.
        """
        return [comp for comp in self._components if pred(comp)]

Jakob Görgen's avatar
Jakob Görgen committed
125
126
127
128
    def filter_components_by_type(self, ty: type[T]) -> list[T]:
        """Return all components of this simulator that are instances of `ty`."""

        def is_wanted(comp: sys_conf.Component) -> bool:
            return isinstance(comp, ty)

        return self.filter_components_by_pred(pred=is_wanted, ty=ty)
Jakob Görgen's avatar
Jakob Görgen committed
129

130
131
132
    def components(self) -> set[sys_conf.Component]:
        """Return the set of components assigned to this simulator.

        The internal set itself is returned, not a copy.
        """
        return self._components

Hejing Li's avatar
Hejing Li committed
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
    def resreq_cores(self) -> int:
        """
        Return how many cores this simulator needs while it is running.

        The value is used when scheduling multiple runs and experiments. This
        base implementation always reports a single core.
        """
        return 1

    def resreq_mem(self) -> int:
        """
        Return the amount of memory in MB this simulator needs while running.

        The value is used when scheduling multiple runs and experiments. This
        base implementation always reports 64.
        """
        return 64

    def full_name(self) -> str:
        """Return the simulator's full name; the base class returns ""."""
        return ""
Hejing Li's avatar
Hejing Li committed
152

153
    def add(self, comp: sys_conf.Component) -> None:
        """Assign the component `comp` to this simulator.

        The component is stored locally and the owning simulation is told that
        this simulator handles it.

        Raises:
            Exception: If `comp` was already added to this simulator.
        """
        already_added = comp in self._components
        if already_added:
            raise Exception("cannot add the same specification twice to a simulator")

        self._components.add(comp)
        self._simulation.add_spec_sim_map(comp, self)
158

159
    def _chan_needs_instance(self, chan: sys_conf.Channel) -> bool:
        """Tell whether `chan` leaves this simulator.

        Returns:
            False if the components on both ends of `chan` belong to this
            simulator, True otherwise.
        """
        owned = self._components
        is_internal = chan.a.component in owned and chan.b.component in owned
        return not is_internal

167
    def _get_my_interface(self, chan: sys_conf.Channel) -> sys_conf.Interface:
        """Return the interface of `chan` whose component is in this simulator.

        Raises:
            Exception: If none of the channel's interfaces belongs to one of
                this simulator's components.
        """
        found = None
        for candidate in chan.interfaces():
            if candidate.component not in self._components:
                continue
            # At most one end of the channel is expected to be ours.
            assert found is None
            found = candidate

        if found is not None:
            return found
        raise Exception(
            "unable to find channel interface for simulators specification"
        )

179
180
181
182
183
    def _get_sys_chan(self, interface: sys_conf.Interface) -> sys_conf.Channel:
        """Return the channel attached to `interface`.

        Raises:
            Exception: If the interface is not connected.
        """
        if interface.is_connected():
            return interface.channel
        raise Exception("interface does not need a channel as it is not connected")

184
    def _get_socket(
        self, inst: inst_base.Instantiation, interface: sys_conf.Interface
    ) -> inst_base.Socket | None:
        """Return the socket for `interface`, or None for internal channels.

        Args:
            inst: Instantiation asked for the socket.
            interface: Connected interface of one of this simulator's components.

        Returns:
            The socket obtained from `inst`, or None when the interface's
            channel stays inside this simulator and needs no instantiation.
        """
        sys_chan = self._get_sys_chan(interface=interface)
        if self._chan_needs_instance(sys_chan):
            # Socket to listen on or to connect to, limited to the socket
            # types this simulator supports.
            return inst.get_socket(
                interface=interface,
                supported_sock_types=self.supported_socket_types(),
            )
        return None
197

198
    def _get_sockets(self, inst: inst_base.Instantiation) -> list[inst_base.Socket]:
        """Collect the sockets of all interfaces of this simulator's components.

        Interfaces for which `_get_socket` yields None are left out.
        """
        candidates = (
            self._get_socket(inst=inst, interface=interface)
            for comp_spec in self._components
            for interface in comp_spec.interfaces()
        )
        return [sock for sock in candidates if sock is not None]
207

208
209
210
211
212
    def _get_all_sockets_by_type(
        self, inst: inst_base.Instantiation, sock_type: inst_base.SockType
    ) -> list[inst_base.Socket]:
        """Return this simulator's sockets that are of type `sock_type`.

        Args:
            inst: Instantiation used to obtain the sockets.
            sock_type: Socket type to select.

        Returns:
            A list of the matching sockets.
        """
        sockets = self._get_sockets(inst=inst)
        # list() makes sure callers receive a real list, as annotated, rather
        # than a lazy iterable that can only be consumed once.
        return list(Simulator.filter_sockets(sockets=sockets, filter_type=sock_type))

    def _get_channel(self, chan: sys_conf.Channel) -> sim_chan.Channel | None:
        """Return the simulation channel for `chan`, or None if it is internal.

        For channels that need an instance, the channel is retrieved from (or
        created by) the owning simulation.
        """
        if not self._chan_needs_instance(chan):
            return None
        return self._simulation.retrieve_or_create_channel(chan=chan)

220
    def get_channels(self) -> list[sim_chan.Channel]:
        """Return the simulation channels of all components of this simulator.

        Channels for which `_get_channel` yields None are left out.
        """
        candidates = (
            self._get_channel(chan=sys_chan)
            for comp_spec in self._components
            for sys_chan in comp_spec.channels()
        )
        return [channel for channel in candidates if channel is not None]
230

Hejing Li's avatar
Hejing Li committed
231
    # pylint: disable=unused-argument
232
    @abc.abstractmethod
    def run_cmd(self, inst: inst_base.Instantiation) -> str:
        """Return the command used to execute this simulator.

        Declared abstract; this base implementation returns an empty string.
        """
        return ""
236

237
238
    def checkpoint_commands(self) -> list[str]:
        """Return the checkpoint commands; the base class has none."""
        return list()
Hejing Li's avatar
Hejing Li committed
239

240
    @abc.abstractmethod
    def supported_socket_types(self) -> set[inst_base.SockType]:
        """Return the socket types this simulator supports.

        Declared abstract; this base implementation returns an empty set.
        """
        # Return a set, as annotated, instead of an empty list.
        return set()
243

244
    # Sockets to be cleaned up: always the LISTENING sockets
Hejing Li's avatar
Hejing Li committed
245
    # pylint: disable=unused-argument
246
    def sockets_cleanup(self, inst: inst_base.Instantiation) -> list[inst_base.Socket]:
        """Return the sockets to clean up: this simulator's LISTEN sockets."""
        listen_type = inst_base.SockType.LISTEN
        return self._get_all_sockets_by_type(inst=inst, sock_type=listen_type)
Hejing Li's avatar
Hejing Li committed
250
251
252

    # sockets to wait for indicating the simulator is ready
    # pylint: disable=unused-argument
253
254
255
256
    def sockets_wait(self, inst: inst_base.Instantiation) -> list[inst_base.Socket]:
        """Return the sockets to wait for: this simulator's LISTEN sockets."""
        listen_type = inst_base.SockType.LISTEN
        return self._get_all_sockets_by_type(inst=inst, sock_type=listen_type)
Hejing Li's avatar
Hejing Li committed
257
258
259
260

    def start_delay(self) -> int:
        """Return the start delay of this simulator; the base class uses 5."""
        return 5

261
    # TODO: FIXME
Hejing Li's avatar
Hejing Li committed
262
    def wait_terminate(self) -> bool:
        """Return the `_wait` flag of this simulator."""
        return self._wait
264

265
266
267
    def supports_checkpointing(self) -> bool:
        """Tell whether checkpointing is supported; the base class says no."""
        return False

268
269
270
271
    async def prepare(self, inst: inst_base.Instantiation) -> None:
        """Run `prepare(inst=inst)` of every component and await them together."""
        pending = []
        for comp in self._components:
            pending.append(comp.prepare(inst=inst))
        await asyncio.gather(*pending)

272

273
class Simulation(utils_base.IdObj):
274
275
276
277
278
279
280
    """
    Base class for all simulation experiments.

    Contains the simulators to be run and experiment-wide parameters.
    """

    def __init__(self, name: str) -> None:
        """Create an empty simulation called `name`."""
        super().__init__()
        self.name = name
        """
        Name of this experiment.

        Can be used to run only a selection of experiments.
        """
        self.timeout: int | None = None
        """Timeout for the experiment in seconds."""
        self.checkpoint = False
        """
        Whether checkpoint and restore is used for simulators.

        The most common use-case is speeding up host simulator startup: boot in
        a less accurate mode, checkpoint the system state after boot, and run
        the simulations from that checkpoint.
        """
        self.metadata: dict[str, tp.Any] = {}

        self._sys_sim_map: dict[sys_conf.Component, Simulator] = {}
        """Maps each system component to the simulator it is assigned to."""

        self._chan_map: dict[sys_conf.Channel, sim_chan.Channel] = {}
        """Maps each channel specification to its instantiated channel."""

        self._sim_list: list[Simulator] = []
        """All simulators registered via `add_sim()`, in registration order."""

309
    def add_sim(self, sim: Simulator):
        """Register the simulator `sim` with this simulation.

        Raises:
            Exception: If `sim` has already been registered.
        """
        if sim in self._sim_list:
            # Message previously misspelled "Simulator".
            raise Exception("Simulator is already added")
        self._sim_list.append(sim)

314
315
    def add_spec_sim_map(self, sys: sys_conf.Component, sim: Simulator):
        """Record that the system component `sys` is simulated by `sim`.

        Raises:
            Exception: If `sys` is already mapped to a simulator.
        """
        already_mapped = sys in self._sys_sim_map
        if already_mapped:
            raise Exception("system component is already mapped by simulator")
        self._sys_sim_map[sys] = sim
319

320
    def is_channel_instantiated(self, chan: sys_conf.Channel) -> bool:
        """Tell whether a simulation channel already exists for `chan`."""
        known = chan in self._chan_map
        return known

Hejing Li's avatar
Hejing Li committed
323
    def retrieve_or_create_channel(self, chan: sys_conf.Channel) -> sim_chan.Channel:
        """Return the simulation channel for `chan`, creating it on first use.

        A newly created channel is remembered, so later calls with the same
        `chan` return the same object.
        """
        if not self.is_channel_instantiated(chan):
            self._chan_map[chan] = sim_chan.Channel(chan)
        return self._chan_map[chan]

331
332
    def all_simulators(self) -> list[Simulator]:
        """Return the list of registered simulators.

        The internal list itself is returned, not a copy.
        """
        return self._sim_list
Jakob Görgen's avatar
Jakob Görgen committed
333

334
335
336
337
338
339
340
341
342
343
    def get_all_channels(self, lazy: bool = False) -> list[Channel]:
        """Return the simulation channels of this simulation.

        Args:
            lazy: If True, only return the channels instantiated so far.
                Otherwise ask every simulator for its channels and concatenate
                the answers in simulator order.
        """
        if lazy:
            return list(self._chan_map.values())

        # NOTE(review): the per-simulator lists are concatenated as they are, so
        # a channel reported by more than one simulator appears more than once.
        return [
            channel
            for sim in self.all_simulators()
            for channel in sim.get_channels()
        ]

344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
    def resreq_mem(self) -> int:
        """Return the summed memory requirement of all simulators."""
        return sum(sim.resreq_mem() for sim in self.all_simulators())

    def resreq_cores(self) -> int:
        """Return the summed core requirement of all simulators."""
        return sum(sim.resreq_cores() for sim in self.all_simulators())

    def find_sim(self, comp: sys_conf.Component) -> sim_base.Simulator:
        """Look up the simulator that the system component `comp` is mapped to.

        Raises:
            Exception: If no simulator is mapped to `comp`.
        """
        utils_base.has_expected_type(comp, sys_conf.Component)
        if comp in self._sys_sim_map:
            return self._sys_sim_map[comp]
        raise Exception(f"Simulator not found for component: {comp}")
364

365
366
367
368
369
370
    async def prepare(self, inst: inst_base.Instantiation) -> None:
        """Run `prepare(inst=inst)` of every simulator and await them together."""
        pending = [sim.prepare(inst=inst) for sim in self._sim_list]
        await asyncio.gather(*pending)

Jakob Görgen's avatar
Jakob Görgen committed
371
    # TODO: FIXME
372
373
374
    def enable_checkpointing_if_supported(self) -> None:
        """Placeholder for enabling checkpointing; not implemented yet.

        Raises:
            Exception: Always, with the message "not implemented".
        """
        # `self` is required so that calling this on an instance reaches the
        # "not implemented" error instead of failing with a TypeError.
        raise Exception("not implemented")

Jakob Görgen's avatar
Jakob Görgen committed
375
    # TODO: FIXME
376
377
    def is_checkpointing_enabled(self) -> bool:
        """Placeholder for querying checkpointing; not implemented yet.

        Raises:
            Exception: Always, with the message "not implemented".
        """
        raise Exception("not implemented")