# Copyright 2024 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

from __future__ import annotations

import abc
import asyncio
import itertools
import time
import typing as tp

import simbricks.orchestration.system as sys_conf
import simbricks.orchestration.system.host as sys_host_conf
import simbricks.orchestration.instantiation.base as inst_base
import simbricks.orchestration.simulation.channel as sim_chan
import simbricks.orchestration.utils.base as utils_base

if tp.TYPE_CHECKING:
    from simbricks.orchestration.simulation import (
        Channel,
        HostSim,
        PCIDevSim,
        NetSim,
        base as sim_base,
    )


class Simulator(utils_base.IdObj):
    """
    Base class for all simulators.

    A simulator registers itself with its `Simulation` on construction and
    keeps track of the system components that were assigned to it via
    `add()`.
    """

    def __init__(
        self,
        simulation: sim_base.Simulation,
        name: str = "",
        relative_executable_path: str = "",
    ) -> None:
        """
        Create the simulator and register it with `simulation`.

        Args:
            simulation: Simulation this simulator belongs to; the new
                simulator is added to it through `add_sim()`.
            name: Name of the simulator.
            relative_executable_path: Stored as-is for use by subclasses.
        """
        super().__init__()
        self.name: str = name
        self._relative_executable_path: str = relative_executable_path
        self._simulation: sim_base.Simulation = simulation
        # System components simulated by this simulator (filled by `add()`).
        self._components: set[sys_conf.Component] = set()
        # Value reported by `wait_terminate()`.
        self._wait: bool = False  # TODO: FIXME
        simulation.add_sim(self)

    @staticmethod
    def filter_sockets(
        sockets: list[inst_base.Socket],
        filter_type: inst_base.SockType = inst_base.SockType.LISTEN,
    ) -> list[inst_base.Socket]:
        """
        Return the sockets whose type equals `filter_type`.

        The input order is preserved and `sockets` itself is not modified.
        """
        # Build a real list: a lazy `filter` object can only be iterated once
        # and supports neither `len()` nor indexing, which contradicts the
        # declared return type.
        return [sock for sock in sockets if sock._type == filter_type]

    @staticmethod
    def split_sockets_by_type(
        sockets: list[inst_base.Socket],
    ) -> tuple[list[inst_base.Socket], list[inst_base.Socket]]:
        """Split `sockets` into a `(listen, connect)` pair of lists."""
        listen = Simulator.filter_sockets(
            sockets=sockets, filter_type=inst_base.SockType.LISTEN
        )
        connect = Simulator.filter_sockets(
            sockets=sockets, filter_type=inst_base.SockType.CONNECT
        )
        return listen, connect

    @staticmethod
    def get_unique_latency_period_sync(
        channels: list[sim_chan.Channel],
    ) -> tuple[int, int, bool]:
        """
        Reduce the parameters of several channels to a single set of values.

        Helper for simulators that do not support multiple sync periods etc.
        NOTE(review): the original remark on this helper is truncated;
        presumably it is meant to become obsolete eventually.

        Returns:
            A tuple `(latency, sync_period, run_sync)` holding the largest
            latency of the underlying system channels, the smallest sync
            period, and whether at least one channel is synchronized.

        Raises:
            Exception: If `channels` is empty, so no value can be determined.
        """
        latency = None
        sync_period = None
        run_sync = False
        for channel in channels:
            # `min`/`max` cannot compare against the initial `None`, so the
            # first channel seeds both values.
            if sync_period is None:
                sync_period = channel.sync_period
            else:
                sync_period = min(sync_period, channel.sync_period)
            run_sync = run_sync or channel._synchronized
            chan_latency = channel.sys_channel.latency
            if latency is None:
                latency = chan_latency
            else:
                latency = max(latency, chan_latency)
        if latency is None or sync_period is None:
            raise Exception("could not determine eth_latency and sync_period")
        return latency, sync_period, run_sync

    def filter_components_by_pred(
        self, pred: tp.Callable[[sys_conf.Component], bool]
    ) -> list[sys_conf.Component]:
        """Return the components of this simulator for which `pred` holds."""
        return [comp for comp in self._components if pred(comp)]

    def filter_components_by_type(self, ty) -> list[sys_conf.Component]:
        """Return the components of this simulator that are instances of `ty`."""
        return self.filter_components_by_pred(lambda comp: isinstance(comp, ty))

    def components(self) -> set[sys_conf.Component]:
        """Return the set of components assigned to this simulator (no copy)."""
        return self._components

    def resreq_cores(self) -> int:
        """
        Number of cores this simulator requires during execution.

        This is used for scheduling multiple runs and experiments.
        """
        return 1

    def resreq_mem(self) -> int:
        """
        Amount of memory in MB this simulator requires during execution.

        This is used for scheduling multiple runs and experiments.
        """
        return 64

    def full_name(self) -> str:
        """Full name of the simulator."""
        return ""

    def add(self, comp: sys_conf.Component) -> None:
        """
        Assign the system component `comp` to this simulator.

        The component is also registered with the simulation through
        `add_spec_sim_map()`.

        Raises:
            Exception: If `comp` was already added to this simulator, or if
                registering the mapping with the simulation fails.
        """
        if comp in self._components:
            raise Exception("cannot add the same specification twice to a simulator")
        # Register with the simulation first: should that raise, this
        # simulator must not be left claiming a component it does not own.
        self._simulation.add_spec_sim_map(comp, self)
        self._components.add(comp)

    def _chan_needs_instance(self, chan: sys_conf.Channel) -> bool:
        """
        Tell whether `chan` crosses the boundary of this simulator.

        Returns `False` only if the components on both ends of the channel
        (`chan.a` and `chan.b`) belong to this simulator.
        """
        both_internal = (
            chan.a.component in self._components
            and chan.b.component in self._components
        )
        return not both_internal

    def _get_my_interface(self, chan: sys_conf.Channel) -> sys_conf.Interface:
        """
        Return the interface of `chan` whose component belongs to this simulator.

        Raises:
            Exception: If none of the channel's interfaces belongs to one of
                this simulator's components.
            AssertionError: If more than one interface matches.
        """
        interface = None
        for inter in chan.interfaces():
            if inter.component in self._components:
                assert interface is None
                interface = inter
        if interface is None:
            raise Exception(
                "unable to find channel interface for simulators specification"
            )
        return interface

    def _get_sys_chan(self, interface: sys_conf.Interface) -> sys_conf.Channel:
        """
        Return the system channel `interface` is connected to.

        Raises:
            Exception: If the interface is not connected.
        """
        if not interface.is_connected():
            raise Exception("interface does not need a channel as it is not connected")
        return interface.channel

    def _get_socket(
        self, inst: inst_base.Instantiation, interface: sys_conf.Interface
    ) -> inst_base.Socket | None:
        """
        Return the socket for `interface`, or `None` if none is needed.

        No socket is needed when the interface's channel is internal to this
        simulator (see `_chan_needs_instance()`).
        """
        # get the channel associated with this interface
        chan = self._get_sys_chan(interface=interface)
        # simulator-internal channels do not need an instantiation
        if not self._chan_needs_instance(chan):
            return None
        # obtain the socket to listen on or connect to from the instantiation
        return inst.get_socket(
            interface=interface, supported_sock_types=self.supported_socket_types()
        )

    def _get_sockets(self, inst: inst_base.Instantiation) -> list[inst_base.Socket]:
        """Return the sockets of all interfaces of this simulator's components."""
        sockets = []
        for comp_spec in self._components:
            for interface in comp_spec.interfaces():
                socket = self._get_socket(inst=inst, interface=interface)
                # interfaces on simulator-internal channels have no socket
                if socket is not None:
                    sockets.append(socket)
        return sockets

    def _get_all_sockets_by_type(
        self, inst: inst_base.Instantiation, sock_type: inst_base.SockType
    ) -> list[inst_base.Socket]:
        """Return this simulator's sockets that are of type `sock_type`."""
        sockets = self._get_sockets(inst=inst)
        return Simulator.filter_sockets(sockets=sockets, filter_type=sock_type)

    def _get_channel(self, chan: sys_conf.Channel) -> sim_chan.Channel | None:
        """
        Return the simulation channel for the system channel `chan`.

        The channel is retrieved from (or created by) the simulation. `None`
        is returned for channels that are internal to this simulator.
        """
        if self._chan_needs_instance(chan):
            return self._simulation.retrieve_or_create_channel(chan=chan)
        return None

    def get_channels(self) -> list[sim_chan.Channel]:
        """
        Return the simulation channels of all components of this simulator.

        Channels internal to this simulator are skipped.
        """
        channels = []
        for comp_spec in self._components:
            for chan in comp_spec.channels():
                channel = self._get_channel(chan=chan)
                if channel is not None:
                    channels.append(channel)
        return channels

    # pylint: disable=unused-argument
    @abc.abstractmethod
    def run_cmd(self, inst: inst_base.Instantiation) -> str:
        """Command to execute this simulator."""
        return ""

    def checkpoint_commands(self) -> list[str]:
        """Return checkpoint-related commands; none by default."""
        return []

    @abc.abstractmethod
    def supported_socket_types(self) -> set[inst_base.SockType]:
        """Return the socket types this simulator supports."""
        # Return an empty set (not a list) to match the declared return type.
        return set()

    # pylint: disable=unused-argument
    def sockets_cleanup(self, inst: inst_base.Instantiation) -> list[inst_base.Socket]:
        """
        Return the sockets to be cleaned up: this simulator's LISTEN sockets.

        NOTE(review): an earlier comment on this method claimed that the
        CONNECTING sockets are cleaned up, while the code selects the LISTEN
        sockets. The code is left unchanged; confirm which one is intended.
        """
        return self._get_all_sockets_by_type(
            inst=inst, sock_type=inst_base.SockType.LISTEN
        )

    # pylint: disable=unused-argument
    def sockets_wait(self, inst: inst_base.Instantiation) -> list[inst_base.Socket]:
        """Return the sockets to wait for, indicating the simulator is ready."""
        return self._get_all_sockets_by_type(
            inst=inst, sock_type=inst_base.SockType.LISTEN
        )

    def start_delay(self) -> int:
        """Return the start delay of this simulator (default: 5)."""
        return 5

    # TODO: FIXME
    def wait_terminate(self) -> bool:
        """Return the value of the internal `_wait` flag."""
        return self._wait

    def supports_checkpointing(self) -> bool:
        """Tell whether this simulator supports checkpointing (default: no)."""
        return False

    async def prepare(self, inst: inst_base.Instantiation) -> None:
        """Concurrently run `prepare()` of all components of this simulator."""
        promises = [comp.prepare(inst=inst) for comp in self._components]
        await asyncio.gather(*promises)


class Simulation(utils_base.IdObj):
    """
    Base class for all simulation experiments.

    Contains the simulators to be run and experiment-wide parameters.
    """

    def __init__(self, name: str) -> None:
        """Create an empty simulation called `name`."""
        super().__init__()
        self.name = name
        """
        This experiment's name.

        Can be used to run only a selection of experiments.
        """
        self.timeout: int | None = None
        """Timeout for experiment in seconds."""
        self.checkpoint = False
        """
        Whether to use checkpoint and restore for simulators.

        The most common use-case for this is accelerating host simulator startup
        by first running in a less accurate mode, then checkpointing the system
        state after boot and running simulations from there.
        """
        self.metadata: dict[str, tp.Any] = {}

        self._sys_sim_map: dict[sys_conf.Component, Simulator] = {}
        """System component and its simulator pairs"""

        self._chan_map: dict[sys_conf.Channel, sim_chan.Channel] = {}
        """Channel spec and its instantiation"""

        self._sim_list: list[Simulator] = []
        """All simulators added to this simulation, in insertion order"""

    def add_sim(self, sim: Simulator):
        """
        Add the simulator `sim` to this simulation.

        Raises:
            Exception: If `sim` was already added.
        """
        if sim in self._sim_list:
            raise Exception("Simulaotr is already added")
        self._sim_list.append(sim)

    def add_spec_sim_map(self, sys: sys_conf.Component, sim: Simulator):
        """
        Add a mapping from specification to simulation instance.

        Raises:
            Exception: If the component `sys` is already mapped to a simulator.
        """
        if sys in self._sys_sim_map:
            raise Exception("system component is already mapped by simulator")
        self._sys_sim_map[sys] = sim

    def is_channel_instantiated(self, chan: sys_conf.Channel) -> bool:
        """Tell whether a simulation channel already exists for `chan`."""
        return chan in self._chan_map

    def retrieve_or_create_channel(self, chan: sys_conf.Channel) -> sim_chan.Channel:
        """
        Return the simulation channel belonging to the system channel `chan`.

        The channel is created and remembered on first use; later calls with
        the same `chan` return the same object.
        """
        if self.is_channel_instantiated(chan):
            return self._chan_map[chan]

        channel = sim_chan.Channel(chan)
        self._chan_map[chan] = channel
        return channel

    def all_simulators(self) -> list[Simulator]:
        """Return the list of simulators of this simulation (no copy)."""
        return self._sim_list

    def get_all_channels(self, lazy: bool = False) -> list[sim_chan.Channel]:
        """
        Return the simulation channels of this simulation.

        Args:
            lazy: If true, only return the channels instantiated so far.
                Otherwise the channels are collected from every simulator via
                `get_channels()`.

        NOTE(review): in the non-lazy case the per-simulator results are
        simply concatenated, so a channel reported by more than one simulator
        presumably shows up more than once. Confirm that callers expect this.
        """
        if lazy:
            return list(self._chan_map.values())

        all_channels = []
        for sim in self.all_simulators():
            all_channels.extend(sim.get_channels())
        return all_channels

    def resreq_mem(self) -> int:
        """Memory required to run all simulators in this experiment."""
        return sum(sim.resreq_mem() for sim in self.all_simulators())

    def resreq_cores(self) -> int:
        """Number of Cores required to run all simulators in this experiment."""
        return sum(sim.resreq_cores() for sim in self.all_simulators())

    def find_sim(self, comp: sys_conf.Component) -> Simulator:
        """
        Returns the used simulator object for the system component.

        Raises:
            Exception: If no simulator is mapped to `comp`.
        """
        utils_base.has_expected_type(comp, sys_conf.Component)
        if comp not in self._sys_sim_map:
            raise Exception(f"Simulator not found for component: {comp}")
        return self._sys_sim_map[comp]

    async def prepare(self, inst: inst_base.Instantiation) -> None:
        """Concurrently run `prepare()` of all simulators of this simulation."""
        promises = [sim.prepare(inst=inst) for sim in self._sim_list]
        await asyncio.gather(*promises)

    # TODO: FIXME
    def enable_checkpointing_if_supported(self) -> None:
        """
        Not implemented yet; always raises.

        Declared with `self` so that calling it on an instance reports
        "not implemented" instead of failing with a `TypeError` about an
        unexpected positional argument.
        """
        raise Exception("not implemented")

    # TODO: FIXME
    def is_checkpointing_enabled(self) -> bool:
        """Not implemented yet; always raises."""
        raise Exception("not implemented")