Unverified commit 45d06647 authored by Hongtao Zhang, committed by GitHub

Bugfix - nvbandwidth benchmark needs to handle N/A values (#675)



**Description**

1. Fixed the bug where the nvbandwidth benchmark did not handle 'N/A' values
in the nvbandwidth command output.
2. Changed the test case input format from a comma-separated string to a list of test case names.
3. Added an nvbandwidth configuration example to the default config files.
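
For context, nvbandwidth reports each test as a bandwidth or latency matrix and prints 'N/A' for unmeasured self-to-self paths; the old parser passed every matrix cell straight to `float()`, which raises on those cells. A minimal sketch of the fixed row handling (test name, header, and values below are illustrative, not taken from a real run):

```python
# Minimal sketch of the 'N/A' fix in matrix-row parsing (illustrative data).
header = ['0', '1']                    # GPU column indices from the matrix header
row = '0 N/A 1576.63'.split()          # row 0: 'N/A' marks the GPU0 -> GPU0 self path
row_index, values = row[0], row[1:]

results = {}
for col_index, value in enumerate(values):
    if value == 'N/A':                 # old code called float('N/A') here and raised
        continue                       # fix: skip unmeasured self-to-self entries
    metric = f'device_to_device_memcpy_read_ce_gpu{row_index}_gpu{header[col_index]}_bw'
    results[metric] = float(value)

print(results)  # {'device_to_device_memcpy_read_ce_gpu0_gpu1_bw': 1576.63}
```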

---------
Co-authored-by: hongtaozhang <hongtaozhang@microsoft.com>
Co-authored-by: Yifan Xiong <yifan.xiong@microsoft.com>
parent 7af7c0b7
@@ -13,10 +13,10 @@
if __name__ == '__main__':
context = BenchmarkRegistry.create_benchmark_context(
'nvbandwidth',
- platform=Platform.CPU,
+ platform=Platform.CUDA,
parameters=(
'--buffer_size 128 '
- '--test_cases 0,1,19,20 '
+ '--test_cases host_to_device_memcpy_ce device_to_host_bidirectional_memcpy_ce '
'--skip_verification '
'--disable_affinity '
'--use_mean '
@@ -4,15 +4,23 @@
"""Module of the NV Bandwidth Test."""
import os
+ import subprocess
import re
from superbench.common.utils import logger
- from superbench.benchmarks import BenchmarkRegistry, Platform
+ from superbench.benchmarks import BenchmarkRegistry, Platform, ReturnCode
from superbench.benchmarks.micro_benchmarks import MicroBenchmarkWithInvoke
class NvBandwidthBenchmark(MicroBenchmarkWithInvoke):
"""The NV Bandwidth Test benchmark class."""
+ # Regular expressions for detecting test block starts, matrix headers and rows,
+ # summary lines, and unsupported test cases
+ re_block_start_pattern = re.compile(r'^Running\s+(.+)$')
+ re_matrix_header_line = re.compile(r'^(memcpy|memory latency)')
+ re_matrix_row_pattern = re.compile(r'^\s*\d')
+ re_summary_pattern = re.compile(r'SUM (\S+) (\d+\.\d+)')
+ re_unsupported_pattern = re.compile(r'ERROR: Testcase (\S+) not found!')
def __init__(self, name, parameters=''):
"""Constructor.
@@ -38,12 +46,14 @@ def add_parser_arguments(self):
self._parser.add_argument(
'--test_cases',
+ nargs='+',
type=str,
- default='',
+ default=[],
required=False,
help=(
- 'Specify the test case(s) to run, either by name or index. By default, all test cases are executed. '
- 'Example: --test_cases 0,1,2,19,20'
+ 'Specify the test case(s) to execute by name only. '
+ 'If no specific test case is specified, all test cases will be executed by default. '
+ 'Supported test cases are: ' + ', '.join(self._get_all_test_cases())
),
)
@@ -92,7 +102,9 @@ def _preprocess(self):
command += f' --bufferSize {self._args.buffer_size}'
if self._args.test_cases:
- command += ' --testcase ' + ' '.join([testcase.strip() for testcase in self._args.test_cases.split(',')])
+ command += ' --testcase ' + ' '.join(self._args.test_cases)
+ else:
+ self._args.test_cases = self._get_all_test_cases()
if self._args.skip_verification:
command += ' --skipVerification'
@@ -111,72 +123,79 @@ def _preprocess(self):
return True
def _process_raw_line(self, line, parse_status):
"""Process a single line of raw output from the nvbandwidth benchmark.
This function updates the `parse_status` dictionary with parsed results from the given `line`.
It detects the start of a test, parses matrix headers and rows, and extracts summary results.
"""Process a raw line of text and update the parse status accordingly.
Args:
- line (str): A single line of raw output from the benchmark.
- parse_status (dict): A dictionary to maintain the current parsing state and results. It should contain:
- - 'test_name' (str): The name of the current test being parsed.
- - 'benchmark_type' (str): 'bw' or 'lat'. It also indicates whether matrix data is being parsed.
- - 'matrix_header' (list): The header of the matrix being parsed.
- - 'results' (dict): A dictionary to store the parsed results.
+ line (str): The raw line of text to be processed.
+ parse_status (dict): A dictionary containing the current parsing status,
+ which will be updated based on the content of the line.
- Return:
+ Returns:
None
"""
- # Regular expressions for summary line and matrix header detection
- block_start_pattern = re.compile(r'^Running\s+(.+)$')
- summary_pattern = re.compile(r'SUM (\S+) (\d+\.\d+)')
- matrix_header_line = re.compile(r'^(memcpy|memory latency)')
- matrix_row_pattern = re.compile(r'^\s*\d')
line = line.strip()
+ # Detect unsupported test cases
+ if self.re_unsupported_pattern.match(line):
+ parse_status['unsupported_testcases'].add(self.re_unsupported_pattern.match(line).group(1).lower())
+ return
# Detect the start of a test
- if block_start_pattern.match(line):
- parse_status['test_name'] = block_start_pattern.match(line).group(1).lower()[:-1]
+ if self.re_block_start_pattern.match(line):
+ parse_status['test_name'] = self.re_block_start_pattern.match(line).group(1).lower()[:-1]
+ parse_status['executed_testcases'].add(parse_status['test_name'])
return
# Detect the start of matrix data
- if parse_status['test_name'] and matrix_header_line.match(line):
+ if parse_status['test_name'] and self.re_matrix_header_line.match(line):
parse_status['benchmark_type'] = 'bw' if 'bandwidth' in line else 'lat'
+ # Parse the row and column name
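+ # Assumed header shape (illustrative): 'memcpy CE CPU(row) -> GPU(column) bandwidth (GB/s)';
+ # the three characters before '(row)' and '(column)' become the endpoint names, e.g. 'cpu'/'gpu'.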
+ tmp_idx = line.find('(row)')
+ parse_status['matrix_row'] = line[tmp_idx - 3:tmp_idx].lower()
+ tmp_idx = line.find('(column)')
+ parse_status['matrix_col'] = line[tmp_idx - 3:tmp_idx].lower()
return
# Parse the matrix header
if (
parse_status['test_name'] and parse_status['benchmark_type'] and not parse_status['matrix_header']
- and matrix_row_pattern.match(line)
+ and self.re_matrix_row_pattern.match(line)
):
parse_status['matrix_header'] = line.split()
return
# Parse matrix rows
- if parse_status['test_name'] and parse_status['benchmark_type'] and matrix_row_pattern.match(line):
+ if parse_status['test_name'] and parse_status['benchmark_type'] and self.re_matrix_row_pattern.match(line):
row_data = line.split()
row_index = row_data[0]
for col_index, value in enumerate(row_data[1:], start=1):
+ # Skip 'N/A' values; 'N/A' indicates the test path is self to self.
+ if value == 'N/A':
+ continue
col_header = parse_status['matrix_header'][col_index - 1]
test_name = parse_status['test_name']
benchmark_type = parse_status['benchmark_type']
- metric_name = f'{test_name}_cpu{row_index}_gpu{col_header}_{benchmark_type}'
+ row_name = parse_status['matrix_row']
+ col_name = parse_status['matrix_col']
+ metric_name = f'{test_name}_{row_name}{row_index}_{col_name}{col_header}_{benchmark_type}'
parse_status['results'][metric_name] = float(value)
return
# Parse summary results
- summary_match = summary_pattern.search(line)
- if summary_match:
- value = float(summary_match.group(2))
+ if self.re_summary_pattern.match(line):
+ value = self.re_summary_pattern.match(line).group(2)
test_name = parse_status['test_name']
benchmark_type = parse_status['benchmark_type']
- parse_status['results'][f'{test_name}_sum_{benchmark_type}'] = value
+ parse_status['results'][f'{test_name}_sum_{benchmark_type}'] = float(value)
# Reset parsing state for next test
parse_status['test_name'] = ''
parse_status['benchmark_type'] = None
parse_status['matrix_header'].clear()
+ parse_status['matrix_row'] = ''
+ parse_status['matrix_col'] = ''
return
def _process_raw_result(self, cmd_idx, raw_output):
"""Function to parse raw results and save the summarized results.
@@ -195,22 +214,45 @@ def _process_raw_result(self, cmd_idx, raw_output):
content = raw_output.splitlines()
parsing_status = {
'results': {},
+ 'executed_testcases': set(),
+ 'unsupported_testcases': set(),
'benchmark_type': None,
'matrix_header': [],
'test_name': '',
+ 'matrix_row': '',
+ 'matrix_col': '',
}
for line in content:
self._process_raw_line(line, parsing_status)
+ return_code = ReturnCode.SUCCESS
+ # Log unsupported test cases
+ for testcase in parsing_status['unsupported_testcases']:
+ logger.warning(f'Test case {testcase} is not supported.')
+ return_code = ReturnCode.INVALID_ARGUMENT
+ self._result.add_raw_data(testcase, 'Not supported', self._args.log_raw_data)
+ # Check if the test case was waived
+ for testcase in self._args.test_cases:
+ if (
+ testcase not in parsing_status['unsupported_testcases']
+ and testcase not in parsing_status['executed_testcases']
+ ):
+ logger.warning(f'Test case {testcase} was waived.')
+ self._result.add_raw_data(testcase, 'waived', self._args.log_raw_data)
+ return_code = ReturnCode.INVALID_ARGUMENT
if not parsing_status['results']:
self._result.add_raw_data('nvbandwidth', 'No valid results found', self._args.log_raw_data)
+ return_code = ReturnCode.MICROBENCHMARK_RESULT_PARSING_FAILURE
return False
# Store parsed results
for metric, value in parsing_status['results'].items():
self._result.add_result(metric, value)
+ self._result.set_return_code(return_code)
return True
except Exception as e:
logger.error(
@@ -221,5 +263,29 @@ def _process_raw_result(self, cmd_idx, raw_output):
self._result.add_result('abort', 1)
return False
+ @staticmethod
+ def _get_all_test_cases():
+ command = 'nvbandwidth -l'
+ test_case_pattern = re.compile(r'(\d+),\s+([\w_]+):')
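+ # The pattern assumes 'nvbandwidth -l' lists one test per line,
+ # e.g. '0, host_to_device_memcpy_ce:', capturing the index and the name.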
+ try:
+ # Execute the command and capture output
+ result = subprocess.run(command, shell=True, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ # Check the return code
+ if result.returncode != 0:
+ logger.error(f'{command} failed with return code {result.returncode}')
+ return []
+ if result.stderr:
+ logger.error(f'{command} failed with {result.stderr}')
+ return []
+ # Parse the output
+ return [name for _, name in test_case_pattern.findall(result.stdout)]
+ except Exception as e:
+ logger.error(f'Failed to get all test case names: {e}')
+ return []
BenchmarkRegistry.register_benchmark('nvbandwidth', NvBandwidthBenchmark, platform=Platform.CUDA)
@@ -134,6 +134,22 @@ superbench:
copy_type:
- sm
- dma
+ nvbandwidth:
+ enable: true
+ modes:
+ - name: local
+ parallel: no
+ parameters:
+ buffer_size: 128
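+ # Test names as printed by 'nvbandwidth -l'; if omitted, all supported test cases run.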
+ test_cases:
+ - host_to_device_memcpy_ce
+ - device_to_host_memcpy_ce
+ - host_to_device_memcpy_sm
+ - device_to_host_memcpy_sm
+ num_loops: 18
+ skip_verification: false
+ disable_affinity: false
+ use_mean: false
kernel-launch:
<<: *default_local_mode
gemm-flops:
@@ -22,8 +22,7 @@ def setUpClass(cls):
def test_nvbandwidth_preprocess(self):
"""Test NV Bandwidth benchmark preprocess."""
benchmark_name = 'nvbandwidth'
- (benchmark_class,
- predefine_params) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(benchmark_name, Platform.CUDA)
+ (benchmark_class, _) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(benchmark_name, Platform.CUDA)
assert (benchmark_class)
# Test preprocess with default parameters
@@ -34,7 +33,7 @@ def test_nvbandwidth_preprocess(self):
# Test preprocess with specified parameters
parameters = (
'--buffer_size 256 '
- '--test_cases 0,1,2,19,20 '
+ '--test_cases host_to_device_memcpy_ce device_to_host_bidirectional_memcpy_ce '
'--skip_verification '
'--disable_affinity '
'--use_mean '
@@ -47,7 +46,7 @@ def test_nvbandwidth_preprocess(self):
# Check command
assert (1 == len(benchmark._commands))
assert ('--bufferSize 256' in benchmark._commands[0])
- assert ('--testcase 0 1 2 19 20' in benchmark._commands[0])
+ assert ('--testcase host_to_device_memcpy_ce device_to_host_bidirectional_memcpy_ce' in benchmark._commands[0])
assert ('--skipVerification' in benchmark._commands[0])
assert ('--disableAffinity' in benchmark._commands[0])
assert ('--useMean' in benchmark._commands[0])
@@ -57,8 +56,7 @@
def test_nvbandwidth_result_parsing_real_output(self, results):
"""Test NV Bandwidth benchmark result parsing."""
benchmark_name = 'nvbandwidth'
- (benchmark_class,
- predefine_params) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(benchmark_name, Platform.CUDA)
+ (benchmark_class, _) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(benchmark_name, Platform.CUDA)
assert (benchmark_class)
benchmark = benchmark_class(benchmark_name, parameters='')
@@ -78,3 +76,91 @@ def test_nvbandwidth_result_parsing_real_output(self, results):
assert benchmark.result['device_to_host_memcpy_ce_sum_bw'][0] == 607.26
assert benchmark.result['host_device_latency_sm_cpu0_gpu0_lat'][0] == 772.58
assert benchmark.result['host_device_latency_sm_sum_lat'][0] == 772.58
+ def test_nvbandwidth_process_raw_result_unsupported_testcases(self):
+ """Test NV Bandwidth benchmark result parsing with unsupported test cases."""
+ benchmark_name = 'nvbandwidth'
+ (benchmark_class, _) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(benchmark_name, Platform.CUDA)
+ assert (benchmark_class)
+ benchmark = benchmark_class(benchmark_name, parameters='')
+ # Preprocess and validate command
+ assert benchmark._preprocess()
+ # Mock raw output with unsupported test cases
+ raw_output = """
+ ERROR: Testcase unsupported_testcase_1 not found!
+ ERROR: Testcase unsupported_testcase_2 not found!
+ """
+ # Parse the provided raw output
+ assert not benchmark._process_raw_result(0, raw_output)
+ # Validate unsupported test cases
+ assert 'unsupported_testcase_1' in benchmark._result.raw_data
+ assert benchmark._result.raw_data['unsupported_testcase_1'][0] == 'Not supported'
+ assert 'unsupported_testcase_2' in benchmark._result.raw_data
+ assert benchmark._result.raw_data['unsupported_testcase_2'][0] == 'Not supported'
+ def test_nvbandwidth_process_raw_result_waived_testcases(self):
+ """Test NV Bandwidth benchmark result parsing with waived test cases."""
+ benchmark_name = 'nvbandwidth'
+ (benchmark_class, _) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(benchmark_name, Platform.CUDA)
+ assert (benchmark_class)
+ benchmark = benchmark_class(benchmark_name, parameters='')
+ # Preprocess and validate command
+ assert benchmark._preprocess()
+ # Mock raw output with no executed test cases
+ raw_output = """
+ """
+ # Set test cases to include some that will be waived
+ benchmark._args.test_cases = ['waived_testcase_1', 'waived_testcase_2']
+ # Parse the provided raw output
+ assert not benchmark._process_raw_result(0, raw_output)
+ # Validate waived test cases
+ assert 'waived_testcase_1' in benchmark._result.raw_data
+ assert benchmark._result.raw_data['waived_testcase_1'][0] == 'waived'
+ assert 'waived_testcase_2' in benchmark._result.raw_data
+ assert benchmark._result.raw_data['waived_testcase_2'][0] == 'waived'
+ def test_get_all_test_cases(self):
+ """Test _get_all_test_cases method."""
+ benchmark_name = 'nvbandwidth'
+ (benchmark_class, _) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(benchmark_name, Platform.CUDA)
+ assert (benchmark_class)
+ benchmark = benchmark_class(benchmark_name, parameters='')
+ # Mock subprocess.run for successful execution with valid output
+ with unittest.mock.patch('subprocess.run') as mock_run:
+ mock_run.return_value.returncode = 0
+ mock_run.return_value.stdout = (
+ '1, host_to_device_memcpy_ce:\n'
+ '2, device_to_host_bidirectional_memcpy_ce:'
+ )
+ mock_run.return_value.stderr = ''
+ test_cases = benchmark._get_all_test_cases()
+ assert test_cases == ['host_to_device_memcpy_ce', 'device_to_host_bidirectional_memcpy_ce']
+ # Mock subprocess.run for execution with non-zero return code
+ with unittest.mock.patch('subprocess.run') as mock_run:
+ mock_run.return_value.returncode = 1
+ mock_run.return_value.stdout = ''
+ mock_run.return_value.stderr = 'Error'
+ test_cases = benchmark._get_all_test_cases()
+ assert test_cases == []
+ # Mock subprocess.run for execution with error message in stderr
+ with unittest.mock.patch('subprocess.run') as mock_run:
+ mock_run.return_value.returncode = 0
+ mock_run.return_value.stdout = ''
+ mock_run.return_value.stderr = 'Error'
+ test_cases = benchmark._get_all_test_cases()
+ assert test_cases == []