file_handler.py 9.19 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

"""A module for file related functions in analyzer."""

from pathlib import Path
import re
import json

import jsonlines
import pandas as pd
import yaml
13
from openpyxl.styles import Alignment
14
import markdown
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

from superbench.common.utils import logger


def read_raw_data(raw_data_path):
    """Read raw data from raw_data_path and store them in raw_data_df.

    Args:
        raw_data_path (str): the path of raw data jsonl file

    Returns:
        DataFrame: raw data, node as index, metric name as columns
    """
    p = Path(raw_data_path)
    raw_data_df = pd.DataFrame()
    if not p.is_file():
31
32
33
        logger.log_and_raise(
            exception=FileNotFoundError, msg='FileHandler: invalid raw data path - {}'.format(raw_data_path)
        )
34
35
36
37

    try:
        with p.open(encoding='utf-8') as f:
            for single_node_summary in jsonlines.Reader(f):
38
                raw_data_df = pd.concat([raw_data_df, pd.DataFrame([single_node_summary])], axis=0, ignore_index=True)
39
40
41
        raw_data_df = raw_data_df.rename(raw_data_df['node'])
        raw_data_df = raw_data_df.drop(columns=['node'])
    except Exception as e:
42
        logger.log_and_raise(exception=IOError, msg='Analyzer: invalid raw data fomat - {}'.format(str(e)))
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
    return raw_data_df


def read_rules(rule_file=None):
    """Read rule from rule yaml file.

    Args:
        rule_file (str, optional): The path of rule yaml file. Defaults to None.

    Returns:
        dict: dict object read from yaml file
    """
    default_rule_file = Path(__file__).parent / 'rule/default_rule.yaml'
    p = Path(rule_file) if rule_file else default_rule_file
    if not p.is_file():
58
59
60
        logger.log_and_raise(
            exception=FileNotFoundError, msg='FileHandler: invalid rule file path - {}'.format(str(p.resolve()))
        )
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
    baseline = None
    with p.open() as f:
        baseline = yaml.load(f, Loader=yaml.SafeLoader)
    return baseline


def read_baseline(baseline_file):
    """Read baseline from baseline json file.

    Args:
        baseline_file (str): The path of baseline json file.

    Returns:
        dict: dict object read from json file
    """
    p = Path(baseline_file)
    if not p.is_file():
78
79
80
        logger.log_and_raise(
            exception=FileNotFoundError, msg='FileHandler: invalid baseline file path - {}'.format(str(p.resolve()))
        )
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
    baseline = None
    with p.open() as f:
        baseline = json.load(f)
    return baseline


def output_excel_raw_data(writer, raw_data_df, sheet_name):
    """Output raw data into 'sheet_name' excel page.

    Args:
        writer (xlsxwriter): xlsxwriter handle
        raw_data_df (DataFrame): the DataFrame to output
        sheet_name (str): sheet name of the excel
    """
    # Output the raw data
    if isinstance(raw_data_df, pd.DataFrame) and not raw_data_df.empty:
        raw_data_df.to_excel(writer, sheet_name, index=True)
    else:
99
        logger.warning('FileHandler: excel_data_output - {} data_df is empty.'.format(sheet_name))
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124


def output_excel_data_not_accept(writer, data_not_accept_df, rules):
    """Output data_not_accept_df into 'Not Accept' excel page.

    Args:
        writer (xlsxwriter): xlsxwriter handle
        data_not_accept_df (DataFrame): the DataFrame to output
        rules (dict): the rules of DataDiagnosis
    """
    # Get the xlsxwriter workbook objects and init the format
    workbook = writer.book
    color_format_red = workbook.add_format({'bg_color': '#FFC7CE', 'font_color': '#9C0006'})
    percent_format = workbook.add_format({'num_format': '0.00%'})

    # Output the not accept
    if isinstance(data_not_accept_df, pd.DataFrame):
        data_not_accept_df.to_excel(writer, 'Not Accept', index=True)
        if not data_not_accept_df.empty:
            row_start = 1
            row_end = max(row_start, len(data_not_accept_df))
            columns = list(data_not_accept_df.columns)
            worksheet = writer.sheets['Not Accept']

            for rule in rules:
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
                if 'function' in rules[rule]:
                    for metric in rules[rule]['metrics']:
                        # The column index of the metrics should start from 1
                        col_index = columns.index(metric) + 1
                        # Apply percent format for the columns whose rules are variance type.
                        if rules[rule]['function'] == 'variance':
                            worksheet.conditional_format(
                                row_start,
                                col_index,
                                row_end,
                                col_index,    # start_row, start_col, end_row, end_col
                                {
                                    'type': 'no_blanks',
                                    'format': percent_format
                                }
                            )
                        # Apply red format if the value violates the rule.
                        if rules[rule]['function'] == 'value' or rules[rule]['function'] == 'variance':
                            match = re.search(r'(>|<|<=|>=|==|!=)(.+)', rules[rule]['criteria'])
                            if not match:
                                continue
                            symbol = match.group(1)
                            condition = float(match.group(2))
                            worksheet.conditional_format(
                                row_start,
                                col_index,
                                row_end,
                                col_index,    # start_row, start_col, end_row, end_col
                                {
                                    'type': 'cell',
                                    'criteria': symbol,
                                    'value': condition,
                                    'format': color_format_red
                                }
                            )
160
161

        else:
162
            logger.warning('FileHandler: excel_data_output - data_not_accept_df is empty.')
163
    else:
164
        logger.log_and_raise(RuntimeError, msg='FileHandler: excel_data_output - data_not_accept_df is not DataFrame.')
165
166


167
def generate_md_table(data_df, header):
168
169
170
171
172
173
    """Generate table text in markdown format.

    | header[0] | header[1] |
    |     ----  | ----      |
    |     data  | data      |
    |     data  | data      |
174
175

    Args:
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
        data (DataFrame): the data in table
        header (list): the header of table

    Returns:
        list: lines of markdown table
    """
    lines = []
    data = data_df.values.tolist()
    max_width = len(max(data, key=len))
    header[len(header):max_width] = [' ' for i in range(max_width - len(header))]
    align = ['---' for i in range(max_width)]
    lines.append('| {} |\n'.format(' | '.join(header)))
    lines.append('| {} |\n'.format(' | '.join(align)))
    for line in data:
        full_line = [' ' for i in range(max_width)]
        full_line[0:len(line)] = [str(line[i]) for i in range(len(line))]
        lines.append('| {} |\n'.format(' | '.join(full_line)))
    return lines


def output_lines_in_md(lines, output_path):
    """Output lines in markdown format into a markdown file.

    Args:
        lines (list): lines in markdown format
        output_path (str): the path of output file
202
203
    """
    try:
204
        if len(lines) == 0:
205
            logger.warning('FileHandler: md_data_output is empty')
206
207
        with open(output_path, 'w') as f:
            f.writelines(lines)
208
    except Exception as e:
209
        logger.log_and_raise(exception=IOError, msg='FileHandler: md_data_output - {}'.format(str(e)))
210
211


212
213
def output_lines_in_html(lines, output_path):
    """Output markdown lines in html format file.
214
215

    Args:
216
217
        lines (list): lines in markdown format
        output_path (str): the path of output file
218
219
    """
    try:
220
        if len(lines) == 0:
221
            logger.warning('FileHandler: html_data_output is empty')
222
223
224
225
        lines = ''.join(lines)
        html_str = markdown.markdown(lines, extensions=['markdown.extensions.tables'])
        with open(output_path, 'w') as f:
            f.writelines(html_str)
226
    except Exception as e:
227
        logger.log_and_raise(exception=IOError, msg='FileHandler: html_data_output - {}'.format(str(e)))
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253


def merge_column_in_excel(ws, row, column):
    """Merge cells in the selected index of column with continuous same contents.

    Args:
        ws (worksheet): the worksheet of the excel to process
        row (int): the max row index to merge
        column (int): the index of the column to merge
    """
    dict_from = {}
    aligncenter = Alignment(horizontal='center', vertical='center')
    # record continuous row index (start, end) with the same content
    for row_index in range(1, row + 1):
        value = str(ws.cell(row_index, column).value)
        if value not in dict_from:
            dict_from[value] = [row_index, row_index]
        else:
            dict_from[value][1] = dict_from[value][1] + 1
    # merge the cells
    for value in dict_from.values():
        if value[0] != value[1]:
            ws.merge_cells(start_row=value[0], start_column=column, end_row=value[1], end_column=column)
    # align center for merged cells
    for i in range(1, row + 1):
        ws.cell(row=i, column=column).alignment = aligncenter