"nndet/vscode:/vscode.git/clone" did not exist on "938687e7f838020e7f0c758057ac46c830521d17"
Unverified Commit c13ed2a2 authored by Yuting Jiang, committed by GitHub

Analyzer: Initialization - Add baseline-based data diagnosis module (#242)

**Description**
Add data diagnosis module.

**Major Revision**
- Add DataDiagnosis class to support rule-based data diagnosis on the results summary jsonl file of multiple nodes (see the usage sketch below)
- Add RuleOp class to define rule operators
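A minimal usage sketch (hypothetical input/output paths, based on the `run` interface added below):

```python
from superbench.analyzer import DataDiagnosis

# Hypothetical paths for illustration only.
diag = DataDiagnosis()
diag.run(
    raw_data_file='outputs/results-summary.jsonl',  # per-node results summary (jsonl)
    rule_file='rule/default_rule.yaml',             # rules: function, criteria, categories, metrics
    baseline_file='baseline.json',                  # baseline value per metric (json)
    output_dir='outputs',                           # directory for the output file
    output_format='excel'                           # 'excel' or 'json'
)
```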
parent 213ab14b
@@ -138,11 +138,16 @@ def run(self):
'colorlog>=4.7.2',
'jinja2>=2.10.1',
'joblib>=1.0.1',
'jsonlines>=2.0.0',
'knack>=0.7.2',
'natsort>=7.1.1',
'openpyxl>=3.0.7',
'omegaconf==2.0.6',
'pandas>=1.1.5',
'pyyaml>=5.3',
'tcping>=0.1.1rc1',
'xlrd>=2.0.1',
'xlsxwriter>=1.3.8',
'xmltodict>=0.12.0',
],
extras_require={
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Exposes interfaces of SuperBench Analyzer."""
from superbench.analyzer.data_diagnosis import DataDiagnosis
from superbench.analyzer.diagnosis_rule_op import RuleOp, DiagnosisRuleType
__all__ = ['DataDiagnosis', 'DiagnosisRuleType', 'RuleOp']
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
"""A module for baseline-based data diagnosis."""
import re
from typing import Callable
import pandas as pd
from superbench.common.utils import logger
from superbench.analyzer.diagnosis_rule_op import RuleOp, DiagnosisRuleType
import superbench.analyzer.file_handler as file_handler
class DataDiagnosis():
"""The DataDiagnosis class to do the baseline-based data diagnosis."""
def __init__(self):
"""Init function."""
self._sb_rules = {}
self._metrics = {}
def _get_metrics_by_benchmarks(self, metrics_list):
"""Get mappings of benchmarks:metrics of metrics_list.
Args:
metrics_list (list): list of metrics
Returns:
dict: metrics organized by benchmarks
"""
benchmarks_metrics = {}
for metric in metrics_list:
benchmark = metric.split('/')[0]
if benchmark not in benchmarks_metrics:
benchmarks_metrics[benchmark] = set()
benchmarks_metrics[benchmark].add(metric)
return benchmarks_metrics
def _check_rules(self, rule, name):
"""Check the rule of the metric whether the formart is valid.
Args:
rule (dict): the rule
name (str): the rule name
Returns:
dict: the rule for the metric
"""
# check if rule is supported
if 'function' not in rule:
logger.log_and_raise(exception=Exception, msg='{} lack of function'.format(name))
if not isinstance(DiagnosisRuleType(rule['function']), DiagnosisRuleType):
logger.log_and_raise(exception=Exception, msg='{} invalid function name'.format(name))
# check rule format
if 'criteria' not in rule:
logger.log_and_raise(exception=Exception, msg='{} lack of criteria'.format(name))
if not isinstance(eval(rule['criteria']), Callable):
logger.log_and_raise(exception=Exception, msg='invalid criteria format')
if 'categories' not in rule:
logger.log_and_raise(exception=Exception, msg='{} lack of categories'.format(name))
if 'metrics' not in rule:
logger.log_and_raise(exception=Exception, msg='{} lack of metrics'.format(name))
if isinstance(rule['metrics'], str):
rule['metrics'] = [rule['metrics']]
return rule
def _get_baseline_of_metric(self, baseline, metric):
"""Get the baseline value of the metric.
Args:
baseline (dict): baseline defined in baseline file
metric (str): the full name of the metric
Returns:
numeric: the baseline value of the metric
"""
if metric in baseline:
return baseline[metric]
else:
# exclude rank info
short = metric.split(':')[0]
if short in baseline:
return baseline[short]
# baseline not defined
else:
logger.warning('DataDiagnosis: get baseline - {} baseline not found'.format(metric))
return -1
def _get_criteria(self, rule_file, baseline_file):
"""Get and generate criteria of metrics.
Read the rule file and baseline file. For each rule, match the metric regexes
listed in the rule against the full metric names from the raw data of each
benchmark in the rule, then merge the baseline values and the rule for the
matched metrics.
Args:
rule_file (str): The path of rule yaml file
baseline_file (str): The path of baseline json file
Returns:
bool: True if the criteria are successfully generated for all rules, otherwise False.
"""
try:
rules = file_handler.read_rules(rule_file)
baseline = file_handler.read_baseline(baseline_file)
if not rules or not baseline:
logger.error('DataDiagnosis: get criteria failed')
return False
self._sb_rules = {}
self._enable_metrics = []
benchmark_rules = rules['superbench']['rules']
for rule in benchmark_rules:
benchmark_rules[rule] = self._check_rules(benchmark_rules[rule], rule)
self._sb_rules[rule] = {}
self._sb_rules[rule]['function'] = benchmark_rules[rule]['function']
self._sb_rules[rule]['criteria'] = benchmark_rules[rule]['criteria']
self._sb_rules[rule]['categories'] = benchmark_rules[rule]['categories']
self._sb_rules[rule]['metrics'] = {}
single_rule_metrics = benchmark_rules[rule]['metrics']
benchmark_metrics = self._get_metrics_by_benchmarks(single_rule_metrics)
for benchmark_name in benchmark_metrics:
# get rules and criteria for each metric
for metric in self._metrics[benchmark_name]:
# metric full name in baseline
if metric in single_rule_metrics:
self._sb_rules[rule]['metrics'][metric] = self._get_baseline_of_metric(baseline, metric)
self._enable_metrics.append(metric)
continue
# metric full name not in baseline, use regex to match
for metric_regex in benchmark_metrics[benchmark_name]:
if re.search(metric_regex, metric):
self._sb_rules[rule]['metrics'][metric] = self._get_baseline_of_metric(baseline, metric)
self._enable_metrics.append(metric)
except Exception as e:
logger.error('DataDiagnosis: get criteria failed - {}'.format(str(e)))
return False
return True
def _run_diagnosis_rules_for_single_node(self, node):
"""Use rules to diagnosis single node data.
Use the rules defined in rule_file to diagnose the raw data of each node,
if the node violate any rule, label as defective node and save
the 'Category', 'Defective Details' and data summary of defective node.
Args:
node (str): the node to do the diagosis
Returns:
details_row (list): None if the node is not labeled as defective,
otherwise details of ['Category', 'Defective Details']
summary_data_row (dict): None if the node is not labeled as defective,
otherwise data summary of the metrics
"""
data_row = self._raw_data_df.loc[node]
issue_label = False
details = []
categories = set()
summary_data_row = pd.Series(index=self._enable_metrics, name=node, dtype=float)
# Check each rule
for rule in self._sb_rules:
# Get rule op function and run the rule
function_name = self._sb_rules[rule]['function']
rule_op = RuleOp.get_rule_func(DiagnosisRuleType(function_name))
pass_rule = rule_op(data_row, self._sb_rules[rule], summary_data_row, details, categories)
# label the node as defective
if not pass_rule:
issue_label = True
if issue_label:
# Add category information
general_cat_str = ','.join(categories)
details_cat_str = ','.join(details)
details_row = [general_cat_str, details_cat_str]
return details_row, summary_data_row
return None, None
def run_diagnosis_rules(self, rule_file, baseline_file):
"""Rule-based data diagnosis for multiple nodes' raw data.
Use the rules defined in the rule file to diagnose the raw data of each node;
if a node violates any rule, label it as a defective node and save
the 'Category', 'Defective Details' and processed data of the defective node.
Args:
rule_file (str): The path of rule yaml file
baseline_file (str): The path of baseline json file
Returns:
data_not_accept_df (DataFrame): defective nodes' detailed information
label_df (DataFrame): labels for all nodes
"""
try:
summary_columns = ['Category', 'Defective Details']
data_not_accept_df = pd.DataFrame(columns=summary_columns)
summary_details_df = pd.DataFrame()
label_df = pd.DataFrame(columns=['label'])
# check whether the raw data is empty
if len(self._raw_data_df) == 0:
logger.error('DataDiagnosis: empty raw data')
return data_not_accept_df, label_df
# get criteria
if not self._get_criteria(rule_file, baseline_file):
return data_not_accept_df, label_df
# run diagnosis rules for each node
for node in self._raw_data_df.index:
details_row, summary_data_row = self._run_diagnosis_rules_for_single_node(node)
if details_row:
data_not_accept_df.loc[node] = details_row
summary_details_df = summary_details_df.append(summary_data_row)
label_df.loc[node] = 1
else:
label_df.loc[node] = 0
# combine details for defective nodes
if len(data_not_accept_df) != 0:
data_not_accept_df = data_not_accept_df.join(summary_details_df)
data_not_accept_df = data_not_accept_df.sort_values(by=summary_columns, ascending=False)
except Exception as e:
logger.error('DataDiagnosis: run diagnosis rules failed, message: {}'.format(str(e)))
return data_not_accept_df, label_df
def run(self, raw_data_file, rule_file, baseline_file, output_dir, output_format='excel'):
"""Run the data diagnosis and output the results.
Args:
raw_data_file (str): the path of raw data jsonl file.
rule_file (str): The path of rule yaml file
baseline_file (str): The path of baseline json file
output_dir (str): the directory of output file
output_format (str): the format of the output, 'excel' or 'json'
"""
try:
self._raw_data_df = file_handler.read_raw_data(raw_data_file)
self._metrics = self._get_metrics_by_benchmarks(list(self._raw_data_df.columns))
logger.info('DataDiagnosis: Begin to process {} nodes'.format(len(self._raw_data_df)))
data_not_accept_df, label_df = self.run_diagnosis_rules(rule_file, baseline_file)
logger.info('DataDiagnosis: Process finished')
output_path = ''
if output_format == 'excel':
output_path = output_dir + '/diagnosis_summary.xlsx'
file_handler.output_excel(self._raw_data_df, data_not_accept_df, output_path, self._sb_rules)
elif output_format == 'json':
output_path = output_dir + '/diagnosis_summary.jsonl'
file_handler.output_json_data_not_accept(data_not_accept_df, output_path)
else:
logger.error('DataDiagnosis: output failed - unsupported output format')
logger.info('DataDiagnosis: Output results to {}'.format(output_path))
except Exception as e:
logger.error('DataDiagnosis: run failed - {}'.format(str(e)))
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
"""A module for data diagnosis rule ops."""
from typing import Dict, Callable
import pandas as pd
from superbench.benchmarks.context import Enum
from superbench.common.utils import logger
class DiagnosisRuleType(Enum):
"""The Enum class representing different rule ops."""
VARIANCE = 'variance'
VALUE = 'value'
class RuleOp:
"""RuleOp class to maintain all rule functions."""
functions: Dict[DiagnosisRuleType, Callable] = dict()
@classmethod
def add_rule_func(cls, rule_type):
"""Add rule fuction.
Args:
rule_type (DiagnosisRuleType): The type of rule function.
Return:
decorator (Callable): return the decorator to add the rule function.
"""
def decorator(func):
cls.functions[rule_type] = func
return func
return decorator
@classmethod
def get_rule_func(cls, rule_type):
"""Get rule fuction by rule_type.
Args:
rule_type (DiagnosisRuleType): The type of rule function.
Return:
func (Callable): rule function, None means invalid rule type.
"""
if rule_type in cls.functions:
return cls.functions[rule_type]
return None
@staticmethod
def variance(data_row, rule, summary_data_row, details, categories):
"""Rule op function of variance.
Each metric in the rule will calculate the variance ((val - baseline) / baseline),
and use the criteria in the rule to determine whether the metric's variance meets
the criteria; if any metric violates it, the rule is not passed.
Args:
data_row (pd.Series): raw data of the metrics
rule (dict): rule including function, criteria, metrics with their baseline values and categories
summary_data_row (pd.Series): results of the metrics processed after the function
details (list): defective details including data and rules
categories (set): categories of violated rules
Returns:
bool: whether the rule is passed
"""
pass_rule = True
# parse criteria and check if valid
if not isinstance(eval(rule['criteria'])(0), bool):
logger.log_and_raise(exception=Exception, msg='invalid criteria format')
# every metric should pass the rule
for metric in rule['metrics']:
violate_metric = False
# metric not in raw data or the value is None, which means the test is missing
if metric not in data_row or pd.isna(data_row[metric]):
pass_rule = False
details.append(metric + '_miss')
categories.add(rule['categories'])
else:
# check if metric pass the rule
val = data_row[metric]
baseline = rule['metrics'][metric]
if baseline == 0:
logger.log_and_raise(exception=Exception, msg='invalid baseline 0 in variance rule')
var = (val - baseline) / baseline
summary_data_row[metric] = var
violate_metric = eval(rule['criteria'])(var)
# add details and categories of the violation
if violate_metric:
pass_rule = False
info = '(B/L: {:.4f} VAL: {:.4f} VAR: {:.2f}% Rule:{})'.format(
baseline, val, var * 100, rule['criteria']
)
details.append(metric + info)
categories.add(rule['categories'])
return pass_rule
@staticmethod
def value(data_row, rule, summary_data_row, details, categories):
"""Rule op function of value.
Each metric in the rule will use the criteria in the rule
to determine whether the metric's value meets the criteria;
if any metric violates it, the rule is not passed.
Args:
data_row (pd.Series): raw data of the metrics
rule (dict): rule including function, criteria, metrics with their baseline values and categories
summary_data_row (pd.Series): results of the metrics processed after the function
details (list): defective details including data and rules
categories (set): categories of violated rules
Returns:
bool: whether the rule is passed
"""
pass_rule = True
# parse criteria and check if valid
if not isinstance(eval(rule['criteria'])(0), bool):
logger.log_and_raise(exception=Exception, msg='invalid criteria format')
# every metric should pass the rule
for metric in rule['metrics']:
violate_metric = False
# metric not in raw data or the value is None, which means the test is missing
if metric not in data_row or pd.isna(data_row[metric]):
pass_rule = False
details.append(metric + '_miss')
categories.add(rule['categories'])
else:
# check if metric pass the rule
val = data_row[metric]
summary_data_row[metric] = val
violate_metric = eval(rule['criteria'])(val)
# add details and categories of the violation
if violate_metric:
pass_rule = False
info = '(VAL: {:.4f} Rule:{})'.format(val, rule['criteria'])
details.append(metric + info)
categories.add(rule['categories'])
return pass_rule
RuleOp.add_rule_func(DiagnosisRuleType.VARIANCE)(RuleOp.variance)
RuleOp.add_rule_func(DiagnosisRuleType.VALUE)(RuleOp.value)
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
"""A module for file related functions in analyzer."""
from pathlib import Path
import re
import json
import jsonlines
import pandas as pd
import yaml
from superbench.common.utils import logger
def read_raw_data(raw_data_path):
"""Read raw data from raw_data_path and store them in raw_data_df.
Args:
raw_data_path (str): the path of raw data jsonl file
Returns:
DataFrame: raw data, node as index, metric name as columns
"""
p = Path(raw_data_path)
raw_data_df = pd.DataFrame()
if not p.is_file():
logger.error('DataDiagnosis: invalid raw data path - {}'.format(raw_data_path))
return raw_data_df
try:
with p.open(encoding='utf-8') as f:
for single_node_summary in jsonlines.Reader(f):
raw_data_df = raw_data_df.append(single_node_summary, ignore_index=True)
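# use the values of the 'node' column as the index labels, then drop the column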
raw_data_df = raw_data_df.rename(raw_data_df['node'])
raw_data_df = raw_data_df.drop(columns=['node'])
except Exception as e:
logger.error('Analyzer: invalid raw data format - {}'.format(str(e)))
return raw_data_df
def read_rules(rule_file=None):
"""Read rule from rule yaml file.
Args:
rule_file (str, optional): The path of rule yaml file. Defaults to None.
Returns:
dict: dict object read from yaml file
"""
default_rule_file = Path(__file__).parent / 'rule/default_rule.yaml'
p = Path(rule_file) if rule_file else default_rule_file
if not p.is_file():
logger.error('DataDiagnosis: invalid rule file path - {}'.format(str(p.resolve())))
return None
rules = None
with p.open() as f:
rules = yaml.load(f, Loader=yaml.SafeLoader)
return rules
def read_baseline(baseline_file):
"""Read baseline from baseline json file.
Args:
baseline_file (str): The path of baseline json file.
Returns:
dict: dict object read from json file
"""
p = Path(baseline_file)
if not p.is_file():
logger.error('DataDiagnosis: invalid baseline file path - {}'.format(str(p.resolve())))
return None
baseline = None
with p.open() as f:
baseline = json.load(f)
return baseline
def output_excel_raw_data(writer, raw_data_df, sheet_name):
"""Output raw data into 'sheet_name' excel page.
Args:
writer (pandas.ExcelWriter): excel writer handle using the xlsxwriter engine
raw_data_df (DataFrame): the DataFrame to output
sheet_name (str): sheet name of the excel
"""
# Output the raw data
if isinstance(raw_data_df, pd.DataFrame) and not raw_data_df.empty:
raw_data_df.to_excel(writer, sheet_name, index=True)
else:
logger.warning('DataDiagnosis: excel_data_output - {} data_df is empty.'.format(sheet_name))
def output_excel_data_not_accept(writer, data_not_accept_df, rules):
"""Output data_not_accept_df into 'Not Accept' excel page.
Args:
writer (pandas.ExcelWriter): excel writer handle using the xlsxwriter engine
data_not_accept_df (DataFrame): the DataFrame to output
rules (dict): the rules of DataDiagnosis
"""
# Get the xlsxwriter workbook objects and init the format
workbook = writer.book
color_format_red = workbook.add_format({'bg_color': '#FFC7CE', 'font_color': '#9C0006'})
percent_format = workbook.add_format({'num_format': '0.00%'})
# Output the not accept
if isinstance(data_not_accept_df, pd.DataFrame):
data_not_accept_df.to_excel(writer, 'Not Accept', index=True)
if not data_not_accept_df.empty:
row_start = 1
row_end = max(row_start, len(data_not_accept_df))
columns = list(data_not_accept_df.columns)
worksheet = writer.sheets['Not Accept']
for rule in rules:
for metric in rules[rule]['metrics']:
col_index = columns.index(metric)
# Apply percent format for the columns whose rules are variance type.
if rules[rule]['function'] == 'variance':
worksheet.conditional_format(
row_start,
col_index,
row_end,
col_index, # start_row, start_col, end_row, end_col
{
'type': 'no_blanks',
'format': percent_format
}
)
# Apply red format if the value violates the rule.
if rules[rule]['function'] == 'value' or rules[rule]['function'] == 'variance':
# match the comparison operator and threshold; longer operators first so '>=' is not matched as '>'
match = re.search(r'(>=|<=|==|!=|>|<)(.+)', rules[rule]['criteria'])
if not match:
continue
symbol = match.group(1)
condition = float(match.group(2))
worksheet.conditional_format(
row_start,
col_index,
row_end,
col_index, # start_row, start_col, end_row, end_col
{
'type': 'cell',
'criteria': symbol,
'value': condition,
'format': color_format_red
}
)
else:
logger.warning('DataDiagnosis: excel_data_output - data_not_accept_df is empty.')
else:
logger.warning('DataDiagnosis: excel_data_output - data_not_accept_df is not DataFrame.')
def output_excel(raw_data_df, data_not_accept_df, output_path, rules):
"""Output the raw_data_df and data_not_accept_df results into excel file.
Args:
raw_data_df (DataFrame): raw data
data_not_accept_df (DataFrame): defective nodes' detailed information
output_path (str): the path of output excel file
rules (dict): the rules of DataDiagnosis
"""
try:
writer = pd.ExcelWriter(output_path, engine='xlsxwriter')
# Check whether writer is valid
if not isinstance(writer, pd.ExcelWriter):
logger.error('DataDiagnosis: excel_data_output - invalid file path.')
return
output_excel_raw_data(writer, raw_data_df, 'Raw Data')
output_excel_data_not_accept(writer, data_not_accept_df, rules)
writer.save()
except Exception as e:
logger.error('DataDiagnosis: excel_data_output - {}'.format(str(e)))
def output_json_data_not_accept(data_not_accept_df, output_path):
"""Output data_not_accept_df into jsonl file.
Args:
data_not_accept_df (DataFrame): the DataFrame to output
output_path (str): the path of output jsonl file
"""
p = Path(output_path)
try:
# validate the DataFrame before converting it to json
if not isinstance(data_not_accept_df, pd.DataFrame):
logger.warning('DataDiagnosis: output json data - data_not_accept_df is not DataFrame.')
return
if data_not_accept_df.empty:
logger.warning('DataDiagnosis: output json data - data_not_accept_df is empty.')
return
data_not_accept_json = data_not_accept_df.to_json(orient='index')
data_not_accept = json.loads(data_not_accept_json)
with p.open('w') as f:
for node in data_not_accept:
line = data_not_accept[node]
line['Index'] = node
json_str = json.dumps(line)
f.write(json_str + '\n')
except Exception as e:
logger.error('DataDiagnosis: output json data failed, msg: {}'.format(str(e)))
{
"kernel-launch/event_overhead": 0.00596,
"kernel-launch/wall_overhead": 0.01026,
"kernel-launch/return_code": 0,
"mem-bw/H2D_Mem_BW": 25.6,
"mem-bw/D2H_Mem_BW": 24.3,
"mem-bw/D2D_Mem_BW": 1118.0,
"mem-bw/return_code": 0
}
\ No newline at end of file
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Tests for DataDiagnosis module."""
import json
import unittest
import yaml
from pathlib import Path
import pandas as pd
from superbench.analyzer import DataDiagnosis
import superbench.analyzer.file_handler as file_handler
class TestDataDiagnosis(unittest.TestCase):
"""Test for DataDiagnosis class."""
def setUp(self):
"""Method called to prepare the test fixture."""
self.output_excel_file = str(Path(__file__).parent.resolve()) + '/diagnosis_summary.xlsx'
self.test_rule_file_fake = str(Path(__file__).parent.resolve()) + '/test_rules_fake.yaml'
self.output_json_file = str(Path(__file__).parent.resolve()) + '/diagnosis_summary.jsonl'
def tearDown(self):
"""Method called after the test method has been called and the result recorded."""
for file in [self.output_excel_file, self.output_json_file, self.test_rule_file_fake]:
p = Path(file)
if p.is_file():
p.unlink()
def test_data_diagnosis(self):
"""Test for rule-based data diagnosis."""
# Test - read_raw_data and get_metrics_from_raw_data
# Positive case
test_raw_data = str(Path(__file__).parent.resolve()) + '/test_results.jsonl'
test_rule_file = str(Path(__file__).parent.resolve()) + '/test_rules.yaml'
test_baseline_file = str(Path(__file__).parent.resolve()) + '/test_baseline.json'
diag1 = DataDiagnosis()
diag1._raw_data_df = file_handler.read_raw_data(test_raw_data)
diag1._metrics = diag1._get_metrics_by_benchmarks(list(diag1._raw_data_df))
assert (len(diag1._raw_data_df) == 3)
# Negative case
test_raw_data_fake = str(Path(__file__).parent.resolve()) + '/test_results_fake.jsonl'
test_rule_file_fake = str(Path(__file__).parent.resolve()) + '/test_rules_fake.yaml'
diag2 = DataDiagnosis()
diag2._raw_data_df = file_handler.read_raw_data(test_raw_data_fake)
diag2._metrics = diag2._get_metrics_by_benchmarks(list(diag2._raw_data_df))
assert (len(diag2._raw_data_df) == 0)
assert (len(diag2._metrics) == 0)
# Test - read rules
rules = file_handler.read_rules(test_rule_file_fake)
assert (not rules)
rules = file_handler.read_rules(test_rule_file)
assert (rules)
# Test - _check_rules
# Negative case
false_rules = [
{
'criteria': 'lambda x:x>0',
'categories': 'KernelLaunch',
'metrics': ['kernel-launch/event_overhead:\\d+']
}, {
'criteria': 'lambda x:x>0',
'function': 'variance',
'metrics': ['kernel-launch/event_overhead:\\d+']
}, {
'categories': 'KernelLaunch',
'function': 'variance',
'metrics': ['kernel-launch/event_overhead:\\d+']
}, {
'criteria': 'lambda x:x>0',
'function': 'abb',
'categories': 'KernelLaunch',
'metrics': ['kernel-launch/event_overhead:\\d+']
}, {
'criteria': 'lambda x:x>0',
'function': 'abb',
'categories': 'KernelLaunch',
}, {
'criteria': 'x>5',
'function': 'abb',
'categories': 'KernelLaunch',
'metrics': ['kernel-launch/event_overhead:\\d+']
}
]
metric = 'kernel-launch/event_overhead:0'
for rules in false_rules:
self.assertRaises(Exception, diag1._check_rules, rules, metric)
# Positive case
true_rules = [
{
'categories': 'KernelLaunch',
'criteria': 'lambda x:x>0.05',
'function': 'variance',
'metrics': ['kernel-launch/event_overhead:\\d+']
}, {
'categories': 'KernelLaunch',
'criteria': 'lambda x:x<-0.05',
'function': 'variance',
'metrics': 'kernel-launch/event_overhead:\\d+'
}, {
'categories': 'KernelLaunch',
'criteria': 'lambda x:x>0',
'function': 'value',
'metrics': ['kernel-launch/event_overhead:\\d+']
}
]
for rules in true_rules:
assert (diag1._check_rules(rules, metric))
# Test - _get_baseline_of_metric
baseline = file_handler.read_baseline(test_baseline_file)
assert (diag1._get_baseline_of_metric(baseline, 'kernel-launch/event_overhead:0') == 0.00596)
assert (diag1._get_baseline_of_metric(baseline, 'kernel-launch/return_code') == 0)
assert (diag1._get_baseline_of_metric(baseline, 'mem-bw/H2D:0') == -1)
# Test - _get_criteria
# Negative case
assert (diag2._get_criteria(test_rule_file_fake, test_baseline_file) is False)
diag2 = DataDiagnosis()
diag2._raw_data_df = file_handler.read_raw_data(test_raw_data)
diag2._metrics = diag2._get_metrics_by_benchmarks(list(diag2._raw_data_df))
p = Path(test_rule_file)
with p.open() as f:
rules = yaml.load(f, Loader=yaml.SafeLoader)
rules['superbench']['rules']['fake'] = false_rules[0]
with open(test_rule_file_fake, 'w') as f:
yaml.dump(rules, f)
assert (diag1._get_criteria(test_rule_file_fake, test_baseline_file) is False)
# Positive case
assert (diag1._get_criteria(test_rule_file, test_baseline_file))
# Test - _run_diagnosis_rules_for_single_node
(details_row, summary_data_row) = diag1._run_diagnosis_rules_for_single_node('sb-validation-01')
assert (details_row)
(details_row, summary_data_row) = diag1._run_diagnosis_rules_for_single_node('sb-validation-02')
assert (not details_row)
# Test - run_diagnosis_rules
data_not_accept_df, label_df = diag1.run_diagnosis_rules(test_rule_file, test_baseline_file)
assert (len(label_df) == 3)
assert (label_df.loc['sb-validation-01']['label'] == 1)
assert (label_df.loc['sb-validation-02']['label'] == 0)
assert (label_df.loc['sb-validation-03']['label'] == 1)
node = 'sb-validation-01'
row = data_not_accept_df.loc[node]
assert (len(row) == 36)
assert (row['Category'] == 'KernelLaunch')
assert (
row['Defective Details'] ==
'kernel-launch/event_overhead:0(B/L: 0.0060 VAL: 0.1000 VAR: 1577.85% Rule:lambda x:x>0.05)'
)
node = 'sb-validation-03'
row = data_not_accept_df.loc[node]
assert (len(row) == 36)
assert ('FailedTest' in row['Category'])
assert ('mem-bw/return_code(VAL: 1.0000 Rule:lambda x:x>0)' in row['Defective Details'])
assert ('mem-bw/H2D_Mem_BW:0_miss' in row['Defective Details'])
assert (len(data_not_accept_df) == 2)
# Test - output in excel
file_handler.output_excel(diag1._raw_data_df, data_not_accept_df, self.output_excel_file, diag1._sb_rules)
excel_file = pd.ExcelFile(self.output_excel_file, engine='openpyxl')
data_sheet_name = 'Raw Data'
raw_data_df = excel_file.parse(data_sheet_name)
assert (len(raw_data_df) == 3)
data_sheet_name = 'Not Accept'
data_not_accept_read_from_excel = excel_file.parse(data_sheet_name)
assert (len(data_not_accept_read_from_excel) == 2)
assert ('Category' in data_not_accept_read_from_excel)
assert ('Defective Details' in data_not_accept_read_from_excel)
# Test - output in json
file_handler.output_json_data_not_accept(data_not_accept_df, self.output_json_file)
assert (Path(self.output_json_file).is_file())
with Path(self.output_json_file).open() as f:
data_not_accept_read_from_json = f.readlines()
assert (len(data_not_accept_read_from_json) == 2)
for line in data_not_accept_read_from_json:
json.loads(line)
assert ('Category' in line)
assert ('Defective Details' in line)
assert ('Index' in line)
This diff is collapsed.
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Tests for RuleOp module."""
import unittest
import pandas as pd
from superbench.analyzer import RuleOp, DiagnosisRuleType
class TestRuleOp(unittest.TestCase):
"""Test for Diagnosis Rule Ops."""
def test_rule_op(self):
"""Test for defined rule operators."""
# Test - get_rule_func
# Negative case
assert (not RuleOp.get_rule_func('fake'))
# Positive case
rule_op = RuleOp.get_rule_func(DiagnosisRuleType.VARIANCE)
assert (rule_op == RuleOp.variance)
# Test - variance and value rule function
# Check whether arguments are valid
# Negative case
details = []
categories = set()
summary_data_row = pd.Series(index=['kernel-launch/event_overhead:0'], dtype=float)
data = {'kernel-launch/event_overhead:0': 3.1, 'kernel-launch/event_overhead:1': 2}
data_row = pd.Series(data)
false_rule_and_baselines = [
{
'categories': 'KernelLaunch',
'criteria': '>',
'function': 'variance',
'metrics': {
'kernel-launch/event_overhead:0': 2
}
}, {
'categories': 'KernelLaunch',
'criteria': '5',
'function': 'variance',
'metrics': {
'kernel-launch/event_overhead:0': 2
}
}, {
'categories': 'KernelLaunch',
'criteria': '>5',
'function': 'variance',
'metrics': {
'kernel-launch/event_overhead:0': 2
}
}, {
'categories': 'KernelLaunch',
'criteria': 'lambda x:x+1',
'function': 'variance',
'metrics': {
'kernel-launch/event_overhead:0': 2
}
}
]
for rule in false_rule_and_baselines:
self.assertRaises(Exception, RuleOp.variance, data_row, rule, summary_data_row, details, categories)
self.assertRaises(Exception, RuleOp.value, data_row, rule, summary_data_row, details, categories)
# Positive case
true_baselines = [
{
'categories': 'KernelLaunch',
'criteria': 'lambda x:x>0.5',
'function': 'variance',
'metrics': {
'kernel-launch/event_overhead:0': 2,
'kernel-launch/event_overhead:1': 2
}
}, {
'categories': 'KernelLaunch',
'criteria': 'lambda x:x<-0.5',
'function': 'variance',
'metrics': {
'kernel-launch/event_overhead:0': 2,
'kernel-launch/event_overhead:1': 2
}
}, {
'categories': 'KernelLaunch2',
'criteria': 'lambda x:x>0',
'function': 'value',
'metrics': {
'kernel-launch/event_overhead:0': 0
}
}
]
# Check results
details = []
categories = set()
summary_data_row = pd.Series(index=['kernel-launch/event_overhead:0'], dtype=float)
# variance
data = {'kernel-launch/event_overhead:0': 3.1, 'kernel-launch/event_overhead:1': 2}
data_row = pd.Series(data)
pass_rule = rule_op(data_row, true_baselines[0], summary_data_row, details, categories)
assert (not pass_rule)
assert (categories == {'KernelLaunch'})
assert (details == ['kernel-launch/event_overhead:0(B/L: 2.0000 VAL: 3.1000 VAR: 55.00% Rule:lambda x:x>0.5)'])
data = {'kernel-launch/event_overhead:0': 1.5, 'kernel-launch/event_overhead:1': 1.5}
data_row = pd.Series(data)
pass_rule = rule_op(data_row, true_baselines[1], summary_data_row, details, categories)
assert (pass_rule)
assert (categories == {'KernelLaunch'})
# value
rule_op = RuleOp.get_rule_func(DiagnosisRuleType.VALUE)
pass_rule = rule_op(data_row, true_baselines[2], summary_data_row, details, categories)
assert (not pass_rule)
assert (categories == {'KernelLaunch', 'KernelLaunch2'})
assert ('kernel-launch/event_overhead:0(VAL: 1.5000 Rule:lambda x:x>0)' in details)
assert ('kernel-launch/event_overhead:0(B/L: 2.0000 VAL: 3.1000 VAR: 55.00% Rule:lambda x:x>0.5)' in details)
# SuperBench rules
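# Each rule gives a function ('variance' or 'value'), a criteria lambda,
# a categories label, and metric names treated as regexes; '\d+' matches
# per-rank suffixes such as ':0'.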
version: v0.3
superbench:
rules:
rule0:
function: variance
criteria: lambda x:x>0.05
categories: KernelLaunch
metrics:
- kernel-launch/event_overhead:\d+
- kernel-launch/wall_overhead:\d+
rule1:
function: variance
criteria: 'lambda x:x<-0.05'
categories: Mem
metrics:
- mem-bw/H2D_Mem_BW:\d+
- mem-bw/D2H_Mem_BW:\d+
failure_rule:
function: value
criteria: 'lambda x:x>0'
categories: FailedTest
metrics:
- kernel-launch/return_code
- mem-bw/return_code