# File: ib_validation_performance.py
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

"""Module of the IB performance benchmarks."""

import os

from superbench.common.utils import logger
from superbench.common.utils import network
from superbench.benchmarks import BenchmarkRegistry, ReturnCode
from superbench.common.devices import GPU
from superbench.benchmarks.micro_benchmarks import MicroBenchmarkWithInvoke


class IBBenchmark(MicroBenchmarkWithInvoke):
    """The IB validation performance benchmark class."""
    def __init__(self, name, parameters=''):
        """Initialize the IB traffic benchmark.

        Args:
            name (str): benchmark name.
            parameters (str): benchmark parameters.
        """
        super().__init__(name, parameters)

        # Name of the binary that is invoked to run the ib commands.
        self._bin_name = 'ib_validation'
        # ib commands accepted by the --commands argument.
        self.__support_ib_commands = [
            'ib_write_bw',
            'ib_read_bw',
            'ib_send_bw',
            'ib_write_lat',
            'ib_read_lat',
            'ib_send_lat',
        ]
        # Traffic patterns accepted by the --pattern argument.
        self.__patterns = [
            'one-to-one',
            'one-to-many',
            'many-to-one',
        ]
        # Default location of the generated config file: the current working directory.
        self.__config_path = '{}/config.txt'.format(os.getcwd())
        # "<host>_<host>" labels, one per pair of the config file, filled during preprocessing.
        self.__config = []

    def add_parser_arguments(self):
        """Add the IB traffic specific arguments on top of the base class arguments."""
        super().add_parser_arguments()

        self._parser.add_argument(
            '--ib_index',
            type=int,
            default=0,
            required=False,
            help='The index of ib device.',
        )
        self._parser.add_argument(
            '--iters',
            type=int,
            default=5000,
            required=False,
            help='The iterations of running ib command',
        )
        # No default message size: when omitted, '-a' is passed to the ib command instead of '-s <size>'.
        self._parser.add_argument(
            '--msg_size',
            type=int,
            default=None,
            required=False,
            help='The message size of running ib command, e.g., 8388608.',
        )
        self._parser.add_argument(
            '--commands',
            type=str,
            nargs='+',
            default=['ib_write_bw'],
            help='The ib command used to run, e.g., {}.'.format(' '.join(self.__support_ib_commands)),
        )
        # Join the pattern names with a space so that the help text lists them separately
        # instead of gluing them into a single unreadable word.
        self._parser.add_argument(
            '--pattern',
            type=str,
            default='one-to-one',
            required=False,
            help='Test IB traffic pattern type, e.g., {}.'.format(' '.join(self.__patterns)),
        )
        # When no config file is given, one is generated from --pattern.
        self._parser.add_argument(
            '--config',
            type=str,
            default=None,
            required=False,
            help='The path of config file on the target machines',
        )
        self._parser.add_argument(
            '--bidirectional', action='store_true', default=False, help='Measure bidirectional bandwidth.'
        )
        self._parser.add_argument(
            '--gpu_index', type=int, default=None, required=False, help='Test Use GPUDirect with the gpu index.'
        )
        self._parser.add_argument(
            '--hostfile',
            type=str,
            default='/root/hostfile',
            required=False,
            help='The path of hostfile on the target machines',
        )

    def __one_to_many(self, n):
        """Generate one-to-many pattern config.

        There are a total of n rounds
        In each round, The i-th participant will be paired as a client with the remaining n-1 servers.

        Args:
            n (int): the number of participants.

        Returns:
            list: the generated config list, each item in the list is a str like "0,1;2,3".
        """
        config = []
        for client in range(n):
            row = []
            for server in range(n):
                if server != client:
                    pair = '{},{}'.format(server, client)
                    row.append(pair)
            row = ';'.join(row)
            config.append(row)
        return config

    def __many_to_one(self, n):
        """Generate many-to-one pattern config.

        There are a total of n rounds
        In each round, The i-th participant will be paired as a server with the remaining n-1 clients.

        Args:
            n (int): the number of participants.

        Returns:
            list: the generated config list, each item in the list is a str like "0,1;2,3".
        """
        config = []
        for server in range(n):
            row = []
            for client in range(n):
                if server != client:
                    pair = '{},{}'.format(server, client)
                    row.append(pair)
            row = ';'.join(row)
            config.append(row)
        return config

    def __fully_one_to_one(self, n):
        """Generate one-to-one pattern config.

        One-to-one means that each participant plays every other participant once.
        The algorithm refers circle method of Round-robin tournament in
        https://en.wikipedia.org/wiki/Round-robin_tournament.
        if n is even, there are a total of n-1 rounds, with n/2 pair of 2 unique participants in each round.
        If n is odd, there will be n rounds, each with n-1/2 pairs, and one participant rotating empty in that round.
        In each round, pair up two by two from the beginning to the middle as (begin, end),(begin+1,end-1)...
        Then, all the participants except the beginning shift left one position, and repeat the previous step.

        Args:
            n (int): the number of participants.

        Returns:
            list: the generated config list, each item in the list is a str like "0,1;2,3".
        """
        config = []
        candidates = list(range(n))
        # Add a fake participant if n is odd
        if n % 2 == 1:
            candidates.append(-1)
        count = len(candidates)
        non_moving = [candidates[0]]
        for _ in range(count - 1):
            pairs = [
                '{},{}'.format(candidates[i], candidates[count - i - 1]) for i in range(0, count // 2)
                if candidates[i] != -1 and candidates[count - i - 1] != -1
            ]
            row = ';'.join(pairs)
            config.append(row)
            robin = candidates[2:] + candidates[1:2]
            candidates = non_moving + robin
        return config

    def gen_traffic_pattern(self, n, mode, config_file_path):
        """Generate traffic pattern into config file.

        Args:
            n (int): the number of nodes.
            mode (str): the traffic mode, including 'one-to-one', 'one-to-many', 'many-to-one'.
            config_file_path (str): the path of config file to generate.
        """
        config = []
        if mode == 'one-to-many':
            config = self.__one_to_many(n)
        elif mode == 'many-to-one':
            config = self.__many_to_one(n)
        elif mode == 'one-to-one':
            config = self.__fully_one_to_one(n)
        with open(config_file_path, 'w') as f:
            for line in config:
                f.write(line + '\n')

    def __prepare_config(self, node_num):
        """Prepare and read config file.

        Args:
            node_num (int): the number of nodes.

        Returns:
            True if the config is not empty and valid.
        """
        try:
            # Generate the config file if not define
            if self._args.config is None:
                self.gen_traffic_pattern(node_num, self._args.pattern, self.__config_path)
            # Use the config file defined in args
            else:
                self.__config_path = self._args.config
210
211
212
            # Read the hostfile
            with open(self._args.hostfile, 'r') as f:
                hosts = f.readlines()
213
214
215
            # Read the config file and check if it's empty and valid
            with open(self.__config_path, 'r') as f:
                lines = f.readlines()
216
217
218
219
220
221
222
223
224
225
            for line in lines:
                pairs = line.strip().strip(';').split(';')
                # Check format of config
                for pair in pairs:
                    pair = pair.split(',')
                    if len(pair) != 2:
                        return False
                    pair[0] = int(pair[0])
                    pair[1] = int(pair[1])
                    self.__config.append('{}_{}'.format(hosts[pair[0]].strip(), hosts[pair[1]].strip()))
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
        except BaseException as e:
            self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
            logger.error('Failed to generate and check config - benchmark: {}, message: {}.'.format(self._name, str(e)))
            return False
        if len(self.__config) == 0:
            self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
            logger.error('No valid config - benchmark: {}.'.format(self._name))
            return False
        return True

    def __prepare_general_ib_command_params(self):
        """Prepare general params for ib commands.

        Also lower-cases the command names stored in the --commands argument.

        Returns:
            Str of ib command params if arguments are valid, otherwise False.
        """
        # Format the ib command type
        self._args.commands = [command.lower() for command in self._args.commands]
        # Add message size for ib command: '-a' when no explicit size is given
        if self._args.msg_size is None:
            msg_size = '-a'
        else:
            msg_size = '-s ' + str(self._args.msg_size)
        # Add GPUDirect for ib command, the flag depends on the GPU vendor
        gpu_enable = ''
        if self._args.gpu_index is not None:
            gpu = GPU()
            if gpu.vendor == 'nvidia':
                gpu_enable = ' --use_cuda={gpu_index}'.format(gpu_index=str(self._args.gpu_index))
            elif gpu.vendor == 'amd':
                gpu_enable = ' --use_rocm={gpu_index}'.format(gpu_index=str(self._args.gpu_index))
            else:
                self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
                logger.error('No GPU found - benchmark: {}'.format(self._name))
                return False
        # Generate ib command params
        try:
            command_params = '-F --iters={iter} -d {device} {size}{gpu}'.format(
                iter=str(self._args.iters),
                # Only the part before ':' of the device entry is passed to '-d'
                device=network.get_ib_devices()[self._args.ib_index].split(':')[0],
                size=msg_size,
                gpu=gpu_enable
            )
        except Exception as e:
            # Covers both a failing device query and an out-of-range --ib_index
            self._result.set_return_code(ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE)
            logger.error('Getting ib devices failure - benchmark: {}, message: {}.'.format(self._name, str(e)))
            return False
        return command_params

    def _preprocess(self):
        """Validate the arguments and build the commands to run.

        Runs the base class preprocessing, reads the node count from the MPI environment,
        prepares the config file and the shared ib command params, then appends one
        command line per requested ib command.

        Return:
            True if _preprocess() succeed.
        """
        if not super()._preprocess():
            return False

        self._args.pattern = self._args.pattern.lower()

        # The node count comes from the MPI environment
        world_size = os.getenv('OMPI_COMM_WORLD_SIZE')
        if not world_size:
            self._result.set_return_code(ReturnCode.MICROBENCHMARK_MPI_INIT_FAILURE)
            logger.error('No MPI environment - benchmark: {}.'.format(self._name))
            return False
        node_num = int(world_size)

        # Generate and check config
        if not self.__prepare_config(node_num):
            return False

        # Params shared by every ib command
        command_params = self.__prepare_general_ib_command_params()
        if not command_params:
            return False

        # One command line per requested ib command
        for ib_command in self._args.commands:
            if ib_command not in self.__support_ib_commands:
                self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
                supported = ' '.join(self.__support_ib_commands)
                logger.error(
                    'Unsupported ib command - benchmark: {}, command: {}, expected: {}.'.format(
                        self._name, ib_command, supported
                    )
                )
                return False

            prefix = '{command} {command_params}'.format(command=ib_command, command_params=command_params)
            # The bidirectional flag is only added to bandwidth commands
            if self._args.bidirectional and 'bw' in ib_command:
                prefix += ' -b'

            parts = [
                os.path.join(self._args.bin_dir, self._bin_name),
                '--hostfile',
                self._args.hostfile,
                '--cmd_prefix',
                '"' + prefix + '"',
                '--input_config',
                self.__config_path,
            ]
            self._commands.append(' '.join(parts))

        return True

    def _process_raw_result(self, cmd_idx, raw_output):    # noqa: C901
        """Function to parse raw results and save the summarized results.

          self._result.add_raw_data() and self._result.add_result() need to be called to save the results.

        Args:
            cmd_idx (int): the index of command corresponding with the raw_output.
            raw_output (str): raw output string of the micro-benchmark.

        Return:
            True if the raw output string is valid and result can be extracted.
        """
        self._result.add_raw_data('raw_output_' + self._args.commands[cmd_idx], raw_output)

        # If it's invoked by MPI and rank is not 0, no result is expected
        if os.getenv('OMPI_COMM_WORLD_RANK'):
            rank = int(os.getenv('OMPI_COMM_WORLD_RANK'))
            if rank > 0:
                return True

        valid = False
        content = raw_output.splitlines()
        line_index = 0
        config_index = 0
351
352
        command = self._args.commands[cmd_idx]
        suffix = command.split('_')[-1]
353
354
355
356
357
358
359
360
361
362
363
364
        try:
            result_index = -1
            for index, line in enumerate(content):
                if 'results' in line:
                    result_index = index + 1
                    break
            if result_index == -1:
                valid = False
            else:
                content = content[result_index:]
                for line in content:
                    line = list(filter(None, line.strip().split(',')))
365
                    pair_index = 0
366
                    for item in line:
367
368
369
370
371
372
373
374
375
376
377
                        metric = '{command}_{line}_{pair}_{host}_{suffix}'.format(
                            command=command,
                            line=str(line_index),
                            pair=pair_index,
                            host=self.__config[config_index],
                            suffix=suffix
                        )
                        value = float(item)
                        if 'bw' in command:
                            value = value / 1000
                        self._result.add_result(metric, value)
378
379
                        valid = True
                        config_index += 1
380
                        pair_index += 1
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
                    line_index += 1
        except Exception:
            valid = False
        if valid is False or config_index != len(self.__config):
            logger.error(
                'The result format is invalid - round: {}, benchmark: {}, raw output: {}.'.format(
                    self._curr_run_index, self._name, raw_output
                )
            )
            return False

        return True


# Register the IBBenchmark class in the benchmark registry under the name 'ib-traffic'.
BenchmarkRegistry.register_benchmark('ib-traffic', IBBenchmark)