# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""SuperBench Executor."""

6
import os
7
import json
8
9
10
11
12
from pathlib import Path

from omegaconf import ListConfig

from superbench.benchmarks import Platform, Framework, BenchmarkRegistry
13
from superbench.common.utils import SuperBenchLogger, logger, rotate_dir
14
15
16
17


class SuperBenchExecutor():
    """SuperBench executor class.

    Reads the benchmark section of a SuperBench config, launches every enabled
    benchmark through ``BenchmarkRegistry`` and writes the results of each
    benchmark to ``<output dir>/benchmarks/<name>/rank<N>/results.json``.
    """
    def __init__(self, sb_config, sb_output_dir):
        """Initialize.

        Args:
            sb_config (DictConfig): SuperBench config object.
            sb_output_dir (str): SuperBench output directory.
        """
        self._sb_config = sb_config
        self._sb_output_dir = sb_output_dir
        self._output_path = Path(sb_output_dir).expanduser().resolve()

        # The file handler must be attached before the first log call below.
        self.__set_logger('sb-exec.log')
        logger.info('Executor uses config: %s.', self._sb_config)
        logger.info('Executor writes to: %s.', str(self._output_path))

        self.__validate_sb_config()
        self._sb_benchmarks = self._sb_config.superbench.benchmarks
        self._sb_enabled = self.__get_enabled_benchmarks()
        logger.info('Executor will execute: %s', self._sb_enabled)

    def __set_logger(self, filename):
        """Set logger and add file handler.

        Args:
            filename (str): Log file name, created inside the output directory.
        """
        SuperBenchLogger.add_handler(logger.logger, filename=str(self._output_path / filename))

    def __validate_sb_config(self):
        """Validate SuperBench config object.

        Currently a placeholder which accepts every config.

        Raise:
            InvalidConfigError: If input config is invalid.
        """
        # TODO: add validation

    def __get_enabled_benchmarks(self):
        """Get enabled benchmarks list.

        A non-empty top-level ``superbench.enable`` (single name or list of names)
        takes precedence; otherwise every benchmark whose own ``enable`` is truthy
        is selected.

        Return:
            list: List of benchmarks which will be executed.
        """
        enable = self._sb_config.superbench.enable
        if enable:
            if isinstance(enable, str):
                return [enable]
            if isinstance(enable, (list, ListConfig)):
                return list(enable)
        # TODO: may exist order issue
        return [k for k, v in self._sb_benchmarks.items() if v.enable]

    def __get_platform(self):
        """Detect running platform by environment.

        Return:
            Platform: Always ``Platform.CUDA`` until real detection is implemented.
        """
        # TODO: check devices and env vars
        return Platform.CUDA

    def __get_arguments(self, parameters):
        """Get command line arguments for argparse.

        Args:
            parameters (DictConfig): Parameters config dict.

        Return:
            str: Command line arguments, empty string if there are no parameters.
        """
        if not parameters:
            return ''
        argv = []
        for name, val in parameters.items():
            if val is None:
                continue
            if isinstance(val, bool):
                # bool is a subclass of int, so it has to be handled before the
                # scalar branch; a False flag is omitted instead of being
                # rendered as '--name False'.
                if val:
                    argv.append('--{}'.format(name))
            elif isinstance(val, (str, int, float)):
                argv.append('--{} {}'.format(name, val))
            elif isinstance(val, (list, ListConfig)):
                # Items may be numbers, str.join only accepts strings.
                argv.append('--{} {}'.format(name, ' '.join(str(item) for item in val)))
        return ' '.join(argv)

    def __exec_benchmark(self, context, log_suffix):
        """Launch benchmark for context.

        Args:
            context (BenchmarkContext): Benchmark context to launch.
            log_suffix (str): Log string suffix.

        Return:
            dict: Benchmark results, None if the context is invalid or the launch raised.
        """
        try:
            benchmark = BenchmarkRegistry.launch_benchmark(context)
            if benchmark:
                logger.info(
                    'benchmark: %s, return code: %s, result: %s.', benchmark.name, benchmark.return_code,
                    benchmark.result
                )
                if benchmark.return_code.value == 0:
                    logger.info('Executor succeeded in %s.', log_suffix)
                else:
                    logger.error('Executor failed in %s.', log_suffix)
                # Results are returned even for a non-zero return code.
                return json.loads(benchmark.serialized_result)
            else:
                logger.error('Executor failed in %s, invalid context.', log_suffix)
        except Exception as e:
            # Best effort: one failing benchmark must not abort the remaining ones.
            logger.error(e)
            logger.error('Executor failed in %s.', log_suffix)
        return None

    def __get_benchmark_dir(self, benchmark_name):
        """Get output directory for benchmark's current rank.

        The rank is read from the first non-empty environment variable among
        PROC_RANK and LOCAL_RANK, and defaults to 0.

        Args:
            benchmark_name (str): Benchmark name.

        Return:
            Path: Output directory of the benchmark for the current rank.
        """
        benchmark_output_dir = self._output_path / 'benchmarks' / benchmark_name
        for rank_env in ['PROC_RANK', 'LOCAL_RANK']:
            rank = os.getenv(rank_env)
            if rank:
                return benchmark_output_dir / 'rank{}'.format(rank)
        return benchmark_output_dir / 'rank0'

    def __create_benchmark_dir(self, benchmark_name):
        """Create output directory for benchmark.

        Args:
            benchmark_name (str): Benchmark name.
        """
        benchmark_dir = self.__get_benchmark_dir(benchmark_name)
        # Rotate the directory first, then create it again for this run.
        rotate_dir(benchmark_dir)
        try:
            benchmark_dir.mkdir(mode=0o755, parents=True, exist_ok=True)
        except Exception:
            logger.exception('Failed to create output directory for benchmark %s.', benchmark_name)
            raise

    def __write_benchmark_results(self, benchmark_name, benchmark_results):
        """Write benchmark results.

        Args:
            benchmark_name (str): Benchmark name.
            benchmark_results (dict): Benchmark results.
        """
        with (self.__get_benchmark_dir(benchmark_name) / 'results.json').open(mode='w') as f:
            json.dump(benchmark_results, f, indent=2)

    def exec(self):
        """Run the SuperBench benchmarks locally."""
        for benchmark_name in self._sb_benchmarks:
            if benchmark_name not in self._sb_enabled:
                continue
            benchmark_config = self._sb_benchmarks[benchmark_name]
            benchmark_results = {}
            self.__create_benchmark_dir(benchmark_name)
            # Benchmarks without frameworks run once with the NONE framework.
            for framework in benchmark_config.frameworks or [Framework.NONE.value]:
                if benchmark_name.endswith('_models'):
                    # Model benchmarks run once per (framework, model) pair.
                    for model in benchmark_config.models:
                        log_suffix = 'model-benchmark {}: {}/{}'.format(benchmark_name, framework, model)
                        logger.info('Executor is going to execute %s.', log_suffix)
                        context = BenchmarkRegistry.create_benchmark_context(
                            model,
                            platform=self.__get_platform(),
                            framework=Framework(framework.lower()),
                            parameters=self.__get_arguments(benchmark_config.parameters)
                        )
                        result = self.__exec_benchmark(context, log_suffix)
                        if framework != Framework.NONE.value:
                            benchmark_results['{}/{}'.format(framework, model)] = result
                        else:
                            benchmark_results[model] = result
                else:
                    log_suffix = 'micro-benchmark {}'.format(benchmark_name)
                    logger.info('Executor is going to execute %s.', log_suffix)
                    context = BenchmarkRegistry.create_benchmark_context(
                        benchmark_name,
                        platform=self.__get_platform(),
                        framework=Framework(framework.lower()),
                        parameters=self.__get_arguments(benchmark_config.parameters)
                    )
                    result = self.__exec_benchmark(context, log_suffix)
                    if framework != Framework.NONE.value:
                        benchmark_results[framework] = result
                    else:
                        # Without a framework the result itself is the whole record.
                        benchmark_results = result
            self.__write_benchmark_results(benchmark_name, benchmark_results)