# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

"""Module of the NV Bandwidth Test."""

import os
import re

from superbench.common.utils import logger
from superbench.benchmarks import BenchmarkRegistry, Platform
from superbench.benchmarks.micro_benchmarks import MicroBenchmarkWithInvoke


class NvBandwidthBenchmark(MicroBenchmarkWithInvoke):
    """The NV Bandwidth Test benchmark class."""
    def __init__(self, name, parameters=''):
        """Constructor.

        Args:
            name (str): benchmark name.
            parameters (str): benchmark parameters.
        """
        super().__init__(name, parameters)

        self._bin_name = 'nvbandwidth'

    def add_parser_arguments(self):
        """Add the specified arguments."""
        super().add_parser_arguments()

        self._parser.add_argument(
            '--buffer_size',
            type=int,
            default=64,
            required=False,
            help='Memcpy buffer size in MiB. Default is 64.',
        )

        self._parser.add_argument(
            '--test_cases',
            type=str,
            default='',
            required=False,
            help=(
                'Specify the test case(s) to run, either by name or index. By default, all test cases are executed. '
                'Example: --test_cases 0,1,2,19,20'
            ),
        )

        self._parser.add_argument(
            '--skip_verification',
            action='store_true',
            help='Skips data verification after copy. Default is False.',
        )

        self._parser.add_argument(
            '--disable_affinity',
            action='store_true',
            help='Disable automatic CPU affinity control. Default is False.',
        )

        self._parser.add_argument(
            '--use_mean',
            action='store_true',
            help='Use mean instead of median for results. Default is False.',
        )

        self._parser.add_argument(
            '--num_loops',
            type=int,
            default=3,
            required=False,
            help='Number of iterations to run for each test. Default is 3.',
        )

    def _preprocess(self):
        """Preprocess/preparation operations before the benchmarking.

        Return:
            True if _preprocess() succeeds.
        """
        if not super()._preprocess():
            return False

        if not self._set_binary_path():
            return False

        # Construct the command for nvbandwidth
        command = os.path.join(self._args.bin_dir, self._bin_name)

        if self._args.buffer_size:
            command += f' --bufferSize {self._args.buffer_size}'

        if self._args.test_cases:
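            # A comma-separated list like '0,1,2' is passed to nvbandwidth as '--testcase 0 1 2'.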
            command += ' --testcase ' + ' '.join([testcase.strip() for testcase in self._args.test_cases.split(',')])

        if self._args.skip_verification:
            command += ' --skipVerification'

        if self._args.disable_affinity:
            command += ' --disableAffinity'

        if self._args.use_mean:
            command += ' --useMean'

        if self._args.num_loops:
            command += f' --testSamples {self._args.num_loops}'
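
        # With the default arguments, the composed command looks like (illustrative):
        #   <bin_dir>/nvbandwidth --bufferSize 64 --testSamples 3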

        self._commands.append(command)

        return True

    def _process_raw_line(self, line, parse_status):
        """Process a single line of raw output from the nvbandwidth benchmark.

        This function updates the `parse_status` dictionary with parsed results from the given `line`.
        It detects the start of a test, parses matrix headers and rows, and extracts summary results.

        Args:
            line (str): A single line of raw output from the benchmark.
            parse_status (dict): A dictionary to maintain the current parsing state and results. It should contain:
                - 'test_name' (str): The name of the current test being parsed.
                - 'benchmark_type' (str): 'bw' or 'lat'; a non-empty value also indicates matrix data is being parsed.
                - 'matrix_header' (list): The header of the matrix being parsed.
                - 'results' (dict): A dictionary to store the parsed results.

        Return:
            None
        """
        # Regular expressions for summary line and matrix header detection
        block_start_pattern = re.compile(r'^Running\s+(.+)$')
        summary_pattern = re.compile(r'SUM (\S+) (\d+\.\d+)')
        matrix_header_line = re.compile(r'^(memcpy|memory latency)')
        matrix_row_pattern = re.compile(r'^\s*\d')
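
        # Illustrative shapes of the lines each pattern matches (inferred from the
        # parsing logic below, not verbatim nvbandwidth output):
        #   Running host_to_device_memcpy_ce.                    <- block start
        #   memcpy CE CPU(row) -> GPU(column) bandwidth (GB/s)   <- matrix header line
        #   0        1        2                                  <- matrix column header
        #   0   55.11    55.23    55.07                          <- matrix row
        #   SUM host_to_device_memcpy_ce 165.41                  <- summary line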

        line = line.strip()

        # Detect the start of a test, e.g. 'Running host_to_device_memcpy_ce.'
        block_start_match = block_start_pattern.match(line)
        if block_start_match:
            # Lowercase the captured test name and strip the trailing period.
            parse_status['test_name'] = block_start_match.group(1).lower()[:-1]
            return

        # Detect the start of matrix data
        if parse_status['test_name'] and matrix_header_line.match(line):
            parse_status['benchmark_type'] = 'bw' if 'bandwidth' in line else 'lat'
            return

        # Parse the matrix header
        if (
            parse_status['test_name'] and parse_status['benchmark_type'] and not parse_status['matrix_header']
            and matrix_row_pattern.match(line)
        ):
            parse_status['matrix_header'] = line.split()
            return

        # Parse matrix rows
        if parse_status['test_name'] and parse_status['benchmark_type'] and matrix_row_pattern.match(line):
            row_data = line.split()
            row_index = row_data[0]
            for col_index, value in enumerate(row_data[1:], start=1):
                col_header = parse_status['matrix_header'][col_index - 1]
                test_name = parse_status['test_name']
                benchmark_type = parse_status['benchmark_type']
                metric_name = f'{test_name}_cpu{row_index}_gpu{col_header}_{benchmark_type}'
                parse_status['results'][metric_name] = float(value)
            return

        # Parse summary results
        summary_match = summary_pattern.search(line)
        if summary_match:
            value = float(summary_match.group(2))
            test_name = parse_status['test_name']
            benchmark_type = parse_status['benchmark_type']
            parse_status['results'][f'{test_name}_sum_{benchmark_type}'] = value

            # Reset parsing state for next test
            parse_status['test_name'] = ''
            parse_status['benchmark_type'] = None
            parse_status['matrix_header'].clear()

    def _process_raw_result(self, cmd_idx, raw_output):
        """Function to parse raw results and save the summarized results.

        self._result.add_raw_data() and self._result.add_result() need to be called to save the results.

        Args:
            cmd_idx (int): the index of command corresponding with the raw_output.
            raw_output (str): raw output string of the micro-benchmark.

        Return:
            True if the raw output string is valid and result can be extracted.
        """
        try:
            self._result.add_raw_data('raw_output_' + str(cmd_idx), raw_output, self._args.log_raw_data)
            content = raw_output.splitlines()
            parsing_status = {
                'results': {},
                'benchmark_type': None,
                'matrix_header': [],
                'test_name': '',
            }
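            # After the parsing loop, parsing_status['results'] maps metric names to
            # values, e.g. {'host_to_device_memcpy_ce_cpu0_gpu0_bw': 55.11} (illustrative).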

            for line in content:
                self._process_raw_line(line, parsing_status)

            if not parsing_status['results']:
                self._result.add_raw_data('nvbandwidth', 'No valid results found', self._args.log_raw_data)
                return False

            # Store parsed results
            for metric, value in parsing_status['results'].items():
                self._result.add_result(metric, value)

            return True
        except Exception as e:
            logger.error(
                'The result format is invalid - round: {}, benchmark: {}, raw output: {}, message: {}.'.format(
                    self._curr_run_index, self._name, raw_output, str(e)
                )
            )
            self._result.add_result('abort', 1)
            return False


BenchmarkRegistry.register_benchmark('nvbandwidth', NvBandwidthBenchmark, platform=Platform.CUDA)
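
# Minimal usage sketch (assumes a CUDA host with the nvbandwidth binary available in
# the benchmark's bin_dir; create_benchmark_context/launch_benchmark are the
# registry's public helpers):
#
#   context = BenchmarkRegistry.create_benchmark_context(
#       'nvbandwidth', platform=Platform.CUDA, parameters='--test_cases 0,1 --num_loops 5'
#   )
#   benchmark = BenchmarkRegistry.launch_benchmark(context)
#   if benchmark:
#       print(benchmark.result)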