# ib_validation_performance.py
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

"""Module of the IB performance benchmarks."""

import os

from superbench.common.utils import logger
9
from superbench.common.utils import gen_pair_wise_config, gen_topo_aware_config
10
11
12
from superbench.benchmarks import BenchmarkRegistry, ReturnCode
from superbench.common.devices import GPU
from superbench.benchmarks.micro_benchmarks import MicroBenchmarkWithInvoke
13
from superbench.common.utils import network
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30


class IBBenchmark(MicroBenchmarkWithInvoke):
    """The IB validation performance benchmark class."""
    def __init__(self, name, parameters=''):
        """Constructor.

        Args:
            name (str): benchmark name.
            parameters (str): benchmark parameters.
        """
        super().__init__(name, parameters)

        # Name of the launcher binary that is joined with bin_dir when commands are built.
        self._bin_name = 'ib_validation'
        # Supported perftest commands: every operation for bandwidth first, then for latency.
        self.__support_ib_commands = [
            f'ib_{operation}_{measure}' for measure in ('bw', 'lat') for operation in ('write', 'read', 'send')
        ]
        # Accepted values of --direction, written as "<sender memory>-to-<receiver memory>".
        self.__support_directions = ['gpu-to-gpu', 'cpu-to-cpu', 'cpu-to-gpu', 'gpu-to-cpu']
        # Traffic patterns listed in the --pattern help text.
        self.__patterns = ['one-to-one', 'one-to-many', 'many-to-one', 'topo-aware']
        # Default location of the generated config file: config.txt in the current working directory.
        working_dir = os.getcwd()
        self.__config_path = os.path.join(working_dir, 'config.txt')
        # Host pairs ("<host>_<host>") parsed from the config file; filled during preprocessing.
        self.__config = []

    def add_parser_arguments(self):
        """Add the specified arguments.

        Registers the IB/GPU/NUMA device options, the perftest options and the
        traffic-pattern options on top of the arguments added by the parent class.

        Note: --msg_size, --command and --direction use nargs='+' but keep scalar
        defaults, so their parsed value is a scalar when the option is omitted and
        a list when it is given on the command line.
        """
        super().add_parser_arguments()

        self._parser.add_argument(
            '--ib_dev',
            type=str,
            default='mlx5_0',
            required=False,
            help='The IB device, e.g., mlx5_0, mlx5_$LOCAL_RANK, mlx5_$((LOCAL_RANK/2)), etc.',
        )
        self._parser.add_argument(
            '--set_ib_devices',
            action='store_true',
            default=False,
            help='Set irregular IB devices automatically according to the local rank. \
            If IB devices are not able to be probed, use env IB_DEVICES to set them manually.',
        )
        self._parser.add_argument(
            '--gpu_dev',
            type=str,
            default=None,
            required=False,
            help='The GPU device, e.g., 0, $LOCAL_RANK, $((LOCAL_RANK/2)), etc.',
        )
        self._parser.add_argument(
            '--numa_dev',
            type=str,
            default=None,
            required=False,
            help='The NUMA node to bind, e.g., 0, $LOCAL_RANK, $((LOCAL_RANK/2)), etc.',
        )
        self._parser.add_argument(
            '--timeout',
            type=int,
            default=120,
            required=False,
            help='Timeout in seconds for each perftest command in case ib is too slow.',
        )
        # perftest configurations
        self._parser.add_argument(
            '--iters',
            type=int,
            default=5000,
            required=False,
            help='The iterations of perftest command',
        )
        self._parser.add_argument(
            '--msg_size',
            type=int,
            nargs='+',
            default=8388608,
            required=False,
            help='The message size of perftest command, e.g., 8388608.',
        )
        self._parser.add_argument(
            '--bidirectional', action='store_true', default=False, help='Measure bidirectional bandwidth.'
        )
        self._parser.add_argument(
            '--command',
            type=str,
            nargs='+',
            default='ib_write_bw',
            required=False,
            help='The perftest command to use, e.g., {}.'.format(' '.join(self.__support_ib_commands)),
        )
        # customized configurations
        self._parser.add_argument(
            '--pattern',
            type=str,
            default='one-to-one',
            # Join with a space so the pattern names stay readable in the help output.
            help='IB traffic pattern type, e.g., {}.'.format(' '.join(self.__patterns)),
        )
        self._parser.add_argument(
            '--config',
            type=str,
            default=None,
            required=False,
            help='The path of config file on the target machines.',
        )
        self._parser.add_argument(
            '--hostfile',
            type=str,
            default=None,
            required=False,
            help='The path of hostfile on the target machines.',
        )
        self._parser.add_argument(
            '--min_dist',
            type=int,
            default=2,
            required=False,
            help='The minimum distance of VM pair in topo-aware pattern',
        )
        self._parser.add_argument(
            '--max_dist',
            type=int,
            default=6,
            required=False,
            help='The maximum distance of VM pair in topo-aware pattern',
        )
        self._parser.add_argument(
            '--ibstat',
            type=str,
            default=None,
            required=False,
            help='The path of ibstat output',
        )
        self._parser.add_argument(
            '--ibnetdiscover',
            type=str,
            default=None,
            required=False,
            help='The path of ibnetdiscover output',
        )
        self._parser.add_argument(
            '--direction',
            type=str,
            nargs='+',
            default='gpu-to-gpu',
            required=False,
            help='The direction of traffic pattern, e.g., gpu-to-gpu, cpu-to-cpu, cpu-to-gpu, gpu-to-cpu'
        )

    def __one_to_many(self, n):
        """Generate one-to-many pattern config.

        There are a total of n rounds
        In each round, The i-th participant will be paired as a client with the remaining n-1 servers.

        Args:
            n (int): the number of participants.

        Returns:
            list: the generated config list, each item in the list is a str like "0,1;2,3".
        """
        config = []
        for client in range(n):
            row = []
            for server in range(n):
                if server != client:
                    pair = '{},{}'.format(server, client)
                    row.append(pair)
            row = ';'.join(row)
            config.append(row)
        return config

    def __many_to_one(self, n):
        """Generate many-to-one pattern config.

        There are a total of n rounds
        In each round, The i-th participant will be paired as a server with the remaining n-1 clients.

        Args:
            n (int): the number of participants.

        Returns:
            list: the generated config list, each item in the list is a str like "0,1;2,3".
        """
        config = []
        for server in range(n):
            row = []
            for client in range(n):
                if server != client:
                    pair = '{},{}'.format(server, client)
                    row.append(pair)
            row = ';'.join(row)
            config.append(row)
        return config

206
    def gen_traffic_pattern(self, hosts, mode, config_file_path):
207
208
209
        """Generate traffic pattern into config file.

        Args:
210
211
            hosts (list): the list of VM hostnames read from hostfile.
            mode (str): the traffic mode, including 'one-to-one', 'one-to-many', 'many-to-one', 'topo-aware'.
212
213
214
            config_file_path (str): the path of config file to generate.
        """
        config = []
215
        n = len(hosts)
216
217
218
219
220
        if mode == 'one-to-many':
            config = self.__one_to_many(n)
        elif mode == 'many-to-one':
            config = self.__many_to_one(n)
        elif mode == 'one-to-one':
221
            config = gen_pair_wise_config(n)
222
223
224
225
        elif mode == 'topo-aware':
            config = gen_topo_aware_config(
                hosts, self._args.ibstat, self._args.ibnetdiscover, self._args.min_dist, self._args.max_dist
            )
226
227
228
229
        with open(config_file_path, 'w') as f:
            for line in config:
                f.write(line + '\n')

230
    def __prepare_config(self):
        """Prepare and read config file.

        Reads the hostfile (defaulting to "$SB_WORKSPACE/hostfile", or "./hostfile"
        when SB_WORKSPACE is unset), generates the config file from --pattern when
        --config is not given, then parses the config file. Each config line holds
        ';'-separated pairs and each pair holds two ','-separated indices into the
        hostfile lines. Every pair is stored as "<host>_<host>" in the parsed config.

        Any failure (unreadable file, malformed pair, non-integer or out-of-range
        index, empty config) sets the return code to INVALID_ARGUMENT and is logged.

        Returns:
            True if the config is not empty and valid.
        """
        # Build into a local list so a failed or repeated call never leaves stale or partial pairs behind.
        parsed_pairs = []
        try:
            # Read the hostfile
            if not self._args.hostfile:
                self._args.hostfile = os.path.join(os.environ.get('SB_WORKSPACE', '.'), 'hostfile')
            with open(self._args.hostfile, 'r') as f:
                hosts = f.read().splitlines()
            # Generate the config file if not define
            if self._args.config is None:
                self.gen_traffic_pattern(hosts, self._args.pattern, self.__config_path)
            # Use the config file defined in args
            else:
                self.__config_path = self._args.config
            # Read the config file and check if it's empty and valid
            with open(self.__config_path, 'r') as f:
                lines = f.readlines()
            for line in lines:
                for pair in line.strip().strip(';').split(';'):
                    fields = pair.split(',')
                    # Check format of config
                    if len(fields) != 2:
                        raise ValueError('invalid pair "{}" in config line "{}"'.format(pair, line.strip()))
                    first, second = int(fields[0]), int(fields[1])
                    # A negative index would silently wrap around to the end of the hosts list.
                    for index in (first, second):
                        if not 0 <= index < len(hosts):
                            raise ValueError(
                                'host index {} is out of range, the hostfile has {} hosts'.format(index, len(hosts))
                            )
                    parsed_pairs.append('{}_{}'.format(hosts[first].strip(), hosts[second].strip()))
        except Exception as e:
            # Exception rather than BaseException, so KeyboardInterrupt/SystemExit still propagate.
            self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
            logger.error('Failed to generate and check config - benchmark: {}, message: {}.'.format(self._name, str(e)))
            return False
        self.__config = parsed_pairs
        if len(self.__config) == 0:
            self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
            logger.error('No valid config - benchmark: {}.'.format(self._name))
            return False
        return True

    def __prepare_general_ib_command_params(self, msg_size, device='cpu'):
272
273
274
275
276
277
        """Prepare general params for ib commands.

        Returns:
            Str of ib command params if arguments are valid, otherwise False.
        """
        # Add message size for ib command
278
        msg_size = f'-s {msg_size}' if msg_size > 0 else '-a'
279
        # Add GPUDirect for ib command
280
        gpu_dev = ''
281
282
283
284
        if device == 'gpu' and self._args.gpu_dev is not None:
            gpu = GPU()
            if gpu.vendor == 'nvidia':
                gpu_dev = f'--use_cuda={self._args.gpu_dev}'
one's avatar
one committed
285
            elif gpu.vendor == 'amd' or gpu.vendor == 'hygon':
286
287
288
289
290
                gpu_dev = f'--use_rocm={self._args.gpu_dev}'
            else:
                self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
                logger.error('No GPU found - benchmark: {}'.format(self._name))
                return False
291
        # Generate ib command params
292
        command_params = f'-F -n {self._args.iters} -d {self._args.ib_dev} {msg_size} {gpu_dev}'
293
294
295
296
297
298
299
300
301
302
        if self._args.set_ib_devices:
            ib_devices = network.get_ib_devices()
            local_rank = int(os.getenv('OMPI_COMM_WORLD_LOCAL_RANK', 0))
            if local_rank >= len(ib_devices):
                self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
                logger.error(
                    f'Local rank {local_rank} exceeds IB devices ({len(ib_devices)}) - benchmark: {self._name}'
                )
                return False
            command_params = f'-F -n {self._args.iters} -d {ib_devices[local_rank].split(":")[0]} {msg_size} {gpu_dev}'
303
        command_params = f'{command_params.strip()} --report_gbits'
304
305
        return command_params

306
    def _preprocess(self):    # noqa: C901
        """Preprocess/preparation operations before the benchmarking.

        Prepares the config, then appends one ib_validation command to
        self._commands for every combination of message size, perftest command and
        direction. The perftest command, message size and direction of each
        appended command are recorded at the same index in
        self._commands_ib_commands, self._commands_msg_size and
        self._commands_direction.

        Every validation failure sets the return code to INVALID_ARGUMENT and logs
        the reason before returning False.

        Return:
            True if _preprocess() succeed.
        """
        if not super()._preprocess():
            return False

        # Generate and check config
        if not self.__prepare_config():
            return False

        self._commands_ib_commands = []
        self._commands_msg_size = []
        self._commands_direction = []

        # These options hold a scalar when their default is used; normalize them to lists once.
        if not isinstance(self._args.msg_size, list):
            self._args.msg_size = [self._args.msg_size]
        if isinstance(self._args.command, str):
            self._args.command = [self._args.command]
        if not isinstance(self._args.direction, list):
            self._args.direction = [self._args.direction]

        launcher = os.path.join(self._args.bin_dir, self._bin_name)
        for msg_size in self._args.msg_size:
            if msg_size < 0:
                self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
                logger.error('Invalid message size - benchmark: {}, message size: {}.'.format(self._name, msg_size))
                return False
            # Prepare general params for ib commands
            cpu_command_params = self.__prepare_general_ib_command_params(msg_size)
            gpu_command_params = self.__prepare_general_ib_command_params(msg_size, 'gpu')
            if not cpu_command_params or (self._args.gpu_dev and not gpu_command_params):
                return False
            # Generate commands
            for requested_command in self._args.command:
                # Normalize the case before validating so e.g. 'IB_WRITE_BW' is accepted.
                ib_command = requested_command.lower()
                if ib_command not in self.__support_ib_commands:
                    self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
                    logger.error(
                        'Unsupported ib command - benchmark: {}, command: {}, expected: {}.'.format(
                            self._name, requested_command, ' '.join(self.__support_ib_commands)
                        )
                    )
                    return False
                perftest_bin = os.path.join(self._args.bin_dir, ib_command)
                cpu_ib_command_prefix = f'{perftest_bin} {cpu_command_params}'
                gpu_ib_command_prefix = f'{perftest_bin} {gpu_command_params}'
                if self._args.numa_dev is not None:
                    cpu_ib_command_prefix = f'numactl -N {self._args.numa_dev} {cpu_ib_command_prefix}'
                    gpu_ib_command_prefix = f'numactl -N {self._args.numa_dev} {gpu_ib_command_prefix}'
                # The bidirectional flag is only added to the bandwidth commands.
                if 'bw' in ib_command and self._args.bidirectional:
                    cpu_ib_command_prefix += ' -b'
                    gpu_ib_command_prefix += ' -b'
                for direction in self._args.direction:
                    if direction not in self.__support_directions:
                        self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
                        logger.error(
                            'Unsupported direction - benchmark: {}, direction: {}, expected: {}.'.format(
                                self._name, direction, ' '.join(self.__support_directions)
                            )
                        )
                        return False
                    # The text before "-to-" selects the sender prefix, the text after it the receiver prefix.
                    send_prefix = cpu_ib_command_prefix if 'cpu-to' in direction else gpu_ib_command_prefix
                    recv_prefix = cpu_ib_command_prefix if 'to-cpu' in direction else gpu_ib_command_prefix
                    command = (
                        f"{launcher} --send_cmd_prefix '{send_prefix}' --recv_cmd_prefix '{recv_prefix}'"
                        f' --timeout {self._args.timeout}'
                        f' --hostfile {self._args.hostfile} --input_config {self.__config_path}'
                    )
                    self._commands.append(command)
                    self._commands_ib_commands.append(ib_command)
                    self._commands_msg_size.append(msg_size)
                    self._commands_direction.append(direction)

        return True

    def _process_raw_result(self, cmd_idx, raw_output):    # noqa: C901
        """Function to parse raw results and save the summarized results.

          self._result.add_raw_data() and self._result.add_result() need to be called to save the results.

        Parsing starts on the line after the first line containing 'results'. Each
        following line is split on ',' into pair results and each pair result on
        ' ' into per-rank values. Every non-empty pair field consumes one entry of
        the parsed config, and the output is valid only if at least one value was
        stored and all config entries were consumed.

        Args:
            cmd_idx (int): the index of command corresponding with the raw_output.
            raw_output (str): raw output string of the micro-benchmark.

        Return:
            True if the raw output string is valid and result can be extracted.
        """
        command = self._commands_ib_commands[cmd_idx]
        msg_size = self._commands_msg_size[cmd_idx]
        direction = self._commands_direction[cmd_idx]
        self._result.add_raw_data(f'raw_output_{command}_{msg_size}_{direction}', raw_output, self._args.log_raw_data)

        # If it's invoked by MPI and rank is not 0, no result is expected
        mpi_rank = os.getenv('OMPI_COMM_WORLD_RANK')
        if mpi_rank and int(mpi_rank) > 0:
            return True

        parsed_any = False
        consumed_pairs = 0
        output_lines = raw_output.splitlines()
        try:
            header_pos = next((pos for pos, text in enumerate(output_lines) if 'results' in text), None)
            if header_pos is not None:
                metric_prefix = f'{command}_{msg_size}_{direction}'
                for line_index, text in enumerate(output_lines[header_pos + 1:]):
                    pair_fields = [field for field in text.strip().split(',') if field]
                    for pair_index, pair_field in enumerate(pair_fields):
                        rank_tokens = [token for token in pair_field.strip().split(' ') if token]
                        for rank_index, token in enumerate(rank_tokens):
                            metric = f'{metric_prefix}_{line_index}_{pair_index}:' \
                                + f'{self.__config[consumed_pairs]}:{rank_index}'
                            value = float(token)
                            # Only valid (non-negative) bandwidth values are divided by 8.
                            if 'bw' in command and value >= 0.0:
                                value = value / 8.0
                            self._result.add_result(metric, value)
                            parsed_any = True
                        consumed_pairs += 1
        except Exception:
            parsed_any = False

        if parsed_any and consumed_pairs == len(self.__config):
            return True

        logger.error(
            'The result format is invalid - round: {}, benchmark: {}, raw output: {}.'.format(
                self._curr_run_index, self._name, raw_output
            )
        )
        return False


# Register IBBenchmark with the benchmark registry under the name 'ib-traffic'.
BenchmarkRegistry.register_benchmark('ib-traffic', IBBenchmark)