"magic_pdf/vscode:/vscode.git/clone" did not exist on "b7e9d454e9e9f818883e249a8d50200675bf9752"
base.py 12.4 KB
Newer Older
Hejing Li's avatar
Hejing Li committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Copyright 2024 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

23
from __future__ import annotations
24

25
import abc
Hejing Li's avatar
Hejing Li committed
26
import itertools
27
import time
28
import asyncio
29
30
import typing as tp
import simbricks.orchestration.system as sys_conf
31
import simbricks.orchestration.system.host as sys_host_conf
Jakob Görgen's avatar
Jakob Görgen committed
32
33
34
import simbricks.orchestration.instantiation.base as inst_base
import simbricks.orchestration.simulation.channel as sim_chan
import simbricks.orchestration.utils.base as utils_base
35

36
if tp.TYPE_CHECKING:
37
38
39
40
41
42
43
    from simbricks.orchestration.simulation import (
        Channel,
        HostSim,
        PCIDevSim,
        NetSim,
        base as sim_base,
    )
Hejing Li's avatar
Hejing Li committed
44

45

46
class Simulator(utils_base.IdObj):
    """Base class for all simulators.

    A simulator registers itself with its owning simulation on construction
    and keeps the set of system components it has been asked to simulate.
    From those components it derives the channels and sockets it needs.
    """

    def __init__(
        self,
        simulation: sim_base.Simulation,
        name: str = "",
        relative_executable_path: str = "",
    ) -> None:
        """Create the simulator and register it with `simulation`.

        Args:
            simulation: Simulation this simulator belongs to; `add_sim` is
                called on it with the new instance.
            name: Name of this simulator.
            relative_executable_path: Stored as-is on the instance; not
                interpreted by this base class.
        """
        super().__init__()
        self.name: str = name
        self._relative_executable_path: str = relative_executable_path
        self._simulation: sim_base.Simulation = simulation
        self._components: set[sys_conf.Component] = set()
        self._wait: bool = False  # TODO: FIXME
        simulation.add_sim(self)

    @staticmethod
    def filter_sockets(
        sockets: list[inst_base.Socket],
        filter_type: inst_base.SockType = inst_base.SockType.LISTEN,
    ) -> list[inst_base.Socket]:
        """Return the sockets from `sockets` whose type equals `filter_type`.

        A real list is returned (not a lazy `filter` iterator), so the result
        can be iterated more than once, indexed and measured with `len`.
        """
        return [sock for sock in sockets if sock._type == filter_type]

    @staticmethod
    def split_sockets_by_type(
        sockets: list[inst_base.Socket],
    ) -> tuple[list[inst_base.Socket], list[inst_base.Socket]]:
        """Split `sockets` into a `(listen, connect)` pair of lists."""
        listen = Simulator.filter_sockets(
            sockets=sockets, filter_type=inst_base.SockType.LISTEN
        )
        connect = Simulator.filter_sockets(
            sockets=sockets, filter_type=inst_base.SockType.CONNECT
        )
        return listen, connect

    # Helper method for simulators that do not support multiple sync
    # periods etc. Should eventually become unnecessary at some point in the
    # future...
    @staticmethod
    def get_unique_latency_period_sync(
        channels: list[sim_chan.Channel],
    ) -> tuple[int, int, bool]:
        """Collapse the parameters of several channels into a single triple.

        Args:
            channels: Channels to aggregate over.

        Returns:
            `(latency, sync_period, run_sync)` where `latency` is the largest
            `sys_channel.latency`, `sync_period` is the smallest
            `sync_period`, and `run_sync` is true if any channel is
            synchronized.

        Raises:
            Exception: If `channels` is empty, since no latency or sync
                period can be determined then.
        """
        # Materialize once so that any iterable can be traversed repeatedly.
        channels = list(channels)
        if not channels:
            raise Exception("could not determine eth_latency and sync_period")

        # min()/max() must only ever see real values: seeding the running
        # result with None would raise a TypeError on the first comparison.
        latency = max(channel.sys_channel.latency for channel in channels)
        sync_period = min(channel.sync_period for channel in channels)
        run_sync = any(channel._synchronized for channel in channels)
        return latency, sync_period, run_sync

    def filter_components_by_type(self, ty) -> list[sys_conf.Component]:
        """Return the components of this simulator that are instances of `ty`."""
        return [comp for comp in self._components if isinstance(comp, ty)]

    def components(self) -> set[sys_conf.Component]:
        """Return the set of components assigned to this simulator."""
        return self._components

    def resreq_cores(self) -> int:
        """
        Number of cores this simulator requires during execution.

        This is used for scheduling multiple runs and experiments.
        """
        return 1

    def resreq_mem(self) -> int:
        """
        Amount of memory in MB this simulator requires during execution.

        This is used for scheduling multiple runs and experiments.
        """
        return 64

    def full_name(self) -> str:
        """Full name of the simulator."""
        return ""

    def add(self, comp: sys_conf.Component) -> None:
        """Assign `comp` to this simulator and record the mapping.

        Raises:
            Exception: If `comp` was already added to this simulator.
        """
        if comp in self._components:
            raise Exception("cannot add the same specification twice to a simulator")
        self._components.add(comp)
        self._simulation.add_spec_sim_map(comp, self)

    def _chan_needs_instance(self, chan: sys_conf.Channel) -> bool:
        """Return False only if both ends of `chan` belong to this simulator."""
        both_ends_internal = (
            chan.a.component in self._components
            and chan.b.component in self._components
        )
        return not both_ends_internal

    def _get_my_interface(self, chan: sys_conf.Channel) -> sys_conf.Interface:
        """Return the interface of `chan` whose component is simulated here.

        Raises:
            Exception: If none of the channel's interfaces belongs to a
                component of this simulator.
        """
        interface = None
        for inter in chan.interfaces():
            if inter.component in self._components:
                # At most one end of the channel may be owned by us here.
                assert interface is None
                interface = inter
        if interface is None:
            raise Exception(
                "unable to find channel interface for simulators specification"
            )
        return interface

    def _get_sys_chan(self, interface: sys_conf.Interface) -> sys_conf.Channel:
        """Return the channel attached to `interface`.

        Raises:
            Exception: If `interface` is not connected.
        """
        if not interface.is_connected():
            raise Exception("interface does not need a channel as it is not connected")
        return interface.channel

    def _get_socket(
        self, inst: inst_base.Instantiation, interface: sys_conf.Interface
    ) -> inst_base.Socket | None:
        """Return the socket for `interface`, or None for internal channels."""
        # get the channel associated with this interface
        chan = self._get_sys_chan(interface=interface)
        # a channel that is internal to this simulator needs no instantiation
        if not self._chan_needs_instance(chan):
            return None
        # create the socket to listen on or connect to
        socket = inst.get_socket(
            interface=interface, supported_sock_types=self.supported_socket_types()
        )
        return socket

    def _get_sockets(self, inst: inst_base.Instantiation) -> list[inst_base.Socket]:
        """Collect the sockets of all interfaces of all assigned components."""
        sockets = []
        for comp_spec in self._components:
            for interface in comp_spec.interfaces():
                socket = self._get_socket(inst=inst, interface=interface)
                if socket is None:
                    continue
                sockets.append(socket)
        return sockets

    def _get_all_sockets_by_type(
        self, inst: inst_base.Instantiation, sock_type: inst_base.SockType
    ) -> list[inst_base.Socket]:
        """Return this simulator's sockets restricted to type `sock_type`."""
        sockets = self._get_sockets(inst=inst)
        sockets = Simulator.filter_sockets(sockets=sockets, filter_type=sock_type)
        return sockets

    def _get_channel(self, chan: sys_conf.Channel) -> sim_chan.Channel | None:
        """Return the simulation channel for `chan`, or None if it is internal."""
        if self._chan_needs_instance(chan):
            return self._simulation.retrieve_or_create_channel(chan=chan)
        return None

    def get_channels(self) -> list[sim_chan.Channel]:
        """Return the simulation channels of all assigned components.

        Channels that are internal to this simulator are skipped.
        """
        channels = []
        for comp_spec in self._components:
            comp_sys_channels = comp_spec.channels()
            for chan in comp_sys_channels:
                channel = self._get_channel(chan=chan)
                if channel is None:
                    continue
                channels.append(channel)
        return channels

    # pylint: disable=unused-argument
    @abc.abstractmethod
    def run_cmd(self, inst: inst_base.Instantiation) -> str:
        """Command to execute this simulator."""
        return ""

    def checkpoint_commands(self) -> list[str]:
        """Return checkpoint commands; the base implementation has none."""
        return []

    @abc.abstractmethod
    def supported_socket_types(self) -> set[inst_base.SockType]:
        """Return the socket types this simulator supports.

        The base implementation returns an empty set.
        """
        return set()

    # Sockets to be cleaned up: the LISTEN-type sockets.
    # NOTE(review): this comment previously said "CONNECTING" while the code
    # selects LISTEN sockets -- confirm which one is intended.
    # pylint: disable=unused-argument
    def sockets_cleanup(self, inst: inst_base.Instantiation) -> list[inst_base.Socket]:
        """Return the sockets that have to be cleaned up."""
        return self._get_all_sockets_by_type(
            inst=inst, sock_type=inst_base.SockType.LISTEN
        )

    # sockets to wait for indicating the simulator is ready
    # pylint: disable=unused-argument
    def sockets_wait(self, inst: inst_base.Instantiation) -> list[inst_base.Socket]:
        """Return the sockets whose presence signals the simulator is ready."""
        return self._get_all_sockets_by_type(
            inst=inst, sock_type=inst_base.SockType.LISTEN
        )

    def start_delay(self) -> int:
        """Return the start delay of this simulator.

        NOTE(review): the unit is not visible here (presumably seconds) --
        confirm against the caller.
        """
        return 5

    # TODO: FIXME
    def wait_terminate(self) -> bool:
        """Return the value of the internal `_wait` flag."""
        return self._wait

    def supports_checkpointing(self) -> bool:
        """Return whether checkpointing is supported; False by default."""
        return False

    async def prepare(self, inst: inst_base.Instantiation) -> None:
        """Concurrently run `prepare` of every assigned component."""
        promises = [comp.prepare(inst=inst) for comp in self._components]
        await asyncio.gather(*promises)

246

247
class Simulation(utils_base.IdObj):
    """
    Base class for all simulation experiments.

    Contains the simulators to be run and experiment-wide parameters.
    """

    def __init__(self, name: str) -> None:
        """Create an empty simulation called `name`."""
        super().__init__()
        self.name = name
        """
        This experiment's name.

        Can be used to run only a selection of experiments.
        """
        self.timeout: int | None = None
        """Timeout for experiment in seconds."""
        self.checkpoint = False
        """
        Whether to use checkpoint and restore for simulators.

        The most common use-case for this is accelerating host simulator startup
        by first running in a less accurate mode, then checkpointing the system
        state after boot and running simulations from there.
        """
        self.metadata: dict[str, tp.Any] = {}

        self._sys_sim_map: dict[sys_conf.Component, Simulator] = {}
        """System component and its simulator pairs"""

        self._chan_map: dict[sys_conf.Channel, sim_chan.Channel] = {}
        """Channel spec and its instantiation"""

        self._sim_list: list[Simulator] = []
        """Simulators registered with this simulation, in insertion order"""

    def add_sim(self, sim: Simulator):
        """Register `sim` with this simulation.

        Raises:
            Exception: If `sim` is already registered.
        """
        if sim in self._sim_list:
            raise Exception("Simulator is already added")
        self._sim_list.append(sim)

    def add_spec_sim_map(self, sys: sys_conf.Component, sim: Simulator):
        """Add a mapping from specification to simulation instance.

        Raises:
            Exception: If `sys` is already mapped to a simulator.
        """
        if sys in self._sys_sim_map:
            raise Exception("system component is already mapped by simulator")
        self._sys_sim_map[sys] = sim

    def is_channel_instantiated(self, chan: sys_conf.Channel) -> bool:
        """Return whether a simulation channel already exists for `chan`."""
        return chan in self._chan_map

    def retrieve_or_create_channel(self, chan: sys_conf.Channel) -> sim_chan.Channel:
        """Return the simulation channel for `chan`, creating it on first use."""
        if self.is_channel_instantiated(chan):
            return self._chan_map[chan]

        channel = sim_chan.Channel(chan)
        self._chan_map[chan] = channel
        return channel

    def all_simulators(self) -> list[Simulator]:
        """Return the list of registered simulators."""
        return self._sim_list

    def get_all_channels(self, lazy: bool = False) -> list[Channel]:
        """Return the simulation channels of this simulation.

        Args:
            lazy: If true, only return the channels that have already been
                created. Otherwise ask every simulator for its channels,
                which creates missing ones on the way.

        NOTE(review): in the non-lazy case the per-simulator lists are simply
        concatenated, so a channel returned by more than one simulator shows
        up more than once -- confirm whether callers rely on that.
        """
        if lazy:
            return list(self._chan_map.values())

        all_channels = []
        for sim in self.all_simulators():
            all_channels.extend(sim.get_channels())
        return all_channels

    def resreq_mem(self) -> int:
        """Memory required to run all simulators in this experiment."""
        return sum(sim.resreq_mem() for sim in self.all_simulators())

    def resreq_cores(self) -> int:
        """Number of Cores required to run all simulators in this experiment."""
        return sum(sim.resreq_cores() for sim in self.all_simulators())

    def find_sim(self, comp: sys_conf.Component) -> sim_base.Simulator:
        """Returns the used simulator object for the system component.

        Raises:
            Exception: If no simulator has been mapped to `comp`.
        """
        utils_base.has_expected_type(comp, sys_conf.Component)
        if comp not in self._sys_sim_map:
            raise Exception(f"Simulator not found for component: {comp}")
        return self._sys_sim_map[comp]

    async def prepare(self, inst: inst_base.Instantiation) -> None:
        """Concurrently run `prepare` of every registered simulator."""
        promises = [sim.prepare(inst=inst) for sim in self._sim_list]
        await asyncio.gather(*promises)

    # TODO: FIXME
    def enable_checkpointing_if_supported(self) -> None:
        """Not implemented yet.

        Raises:
            NotImplementedError: Always.
        """
        # NotImplementedError is an Exception subclass, so existing
        # `except Exception` handlers keep working.
        raise NotImplementedError("not implemented")

    # TODO: FIXME
    def is_checkpointing_enabled(self) -> bool:
        """Not implemented yet.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("not implemented")