# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""SuperBench Executor."""

import os
import json
from pathlib import Path

from omegaconf import ListConfig

from superbench.benchmarks import Platform, Framework, BenchmarkRegistry
from superbench.common.utils import SuperBenchLogger, logger, rotate_dir, stdout_logger
from superbench.common.devices import GPU
from superbench.monitor import Monitor


class SuperBenchExecutor():
    """SuperBench executor class."""
    def __init__(self, sb_config, sb_output_dir):
        """Initialize the executor.

        Args:
            sb_config (DictConfig): SuperBench config object.
            sb_output_dir (str): SuperBench output directory.
        """
        self._sb_config = sb_config
        self._output_path = Path(sb_output_dir).expanduser().resolve()
        self._sb_output_dir = sb_output_dir

        # Wire up file logging first so everything logged below is captured.
        self.__set_logger('sb-exec.log')
        self.__set_stdout_logger(self._output_path / 'sb-bench.log')

        logger.debug('Executor uses config: %s.', self._sb_config)
        logger.debug('Executor writes to: %s.', str(self._output_path))

        self.__validate_sb_config()
        self._sb_monitor_config = self._sb_config.superbench.monitor
        self._sb_benchmarks = self._sb_config.superbench.benchmarks
        self._sb_enabled = self.__get_enabled_benchmarks()
        logger.debug('Executor will execute: %s', self._sb_enabled)

    def __set_logger(self, filename):
        """Attach a file handler to the module logger.

        Args:
            filename (str): Log file name, created under the output directory.
        """
        log_file = self._output_path / filename
        SuperBenchLogger.add_handler(logger.logger, filename=str(log_file))
    def __set_stdout_logger(self, filename):
        """Redirect stdout and logger output into the given file.

        Args:
            filename (str): Log file name.
        """
        # Capture stdout of this rank into the file, then mirror logger records there too.
        stdout_logger.add_file_handler(filename)
        stdout_logger.start(self.__get_rank_id())
        SuperBenchLogger.add_handler(logger.logger, filename=filename)
    def __validate_sb_config(self):
        """Validate SuperBench config object.

        Raise:
            InvalidConfigError: If input config is invalid.
        """
        # TODO: add validation

    def __get_enabled_benchmarks(self):
        """Get enabled benchmarks list.

        Return:
            list: List of benchmarks which will be executed.
        """
74
        if 'enable' in self._sb_config.superbench and self._sb_config.superbench.enable:
75
76
77
78
79
            if isinstance(self._sb_config.superbench.enable, str):
                return [self._sb_config.superbench.enable]
            elif isinstance(self._sb_config.superbench.enable, (list, ListConfig)):
                return list(self._sb_config.superbench.enable)
        # TODO: may exist order issue
80
        return [k for k, v in self._sb_benchmarks.items() if 'enable' in v and v.enable]
81
82
83

    def __get_platform(self):
        """Detect running platform by environment.

        Return:
            Platform: Detected platform; Platform.CPU when no known GPU is found
            or detection raises.
        """
        try:
            # Map detected GPU vendor strings onto platforms; built inside the try
            # so any detection failure still degrades to CPU.
            vendor_to_platform = {
                'nvidia': Platform.CUDA,
                'amd': Platform.ROCM,
                'hygon': Platform.DTK,
                'amd-graphics': Platform.DIRECTX,
                'nvidia-graphics': Platform.DIRECTX,
            }
            vendor = GPU().vendor
            if vendor in vendor_to_platform:
                return vendor_to_platform[vendor]
        except Exception as e:
            logger.error(e)
        return Platform.CPU

    def __get_arguments(self, parameters):
        """Get command line arguments for argparse.

        Args:
            parameters (DictConfig): Parameters config dict.

        Return:
            str: Command line arguments.
        """
        argv = []
108
109
        if not parameters:
            return ''
110
111
112
        for name, val in parameters.items():
            if val is None:
                continue
113
114
115
            if isinstance(val, bool):
                if val:
                    argv.append('--{}'.format(name))
116
            elif isinstance(val, (str, int, float)):
117
118
119
120
121
                argv.append('--{} {}'.format(name, val))
            elif isinstance(val, (list, ListConfig)):
                argv.append('--{} {}'.format(name, ' '.join(val)))
        return ' '.join(argv)

122
    def __exec_benchmark(self, benchmark_full_name, context):
        """Launch benchmark for context.

        Args:
            benchmark_full_name (str): Benchmark full name.
            context (BenchmarkContext): Benchmark context to launch.

        Return:
            dict: Benchmark result, or None on failure / invalid context.
        """
        try:
            benchmark = BenchmarkRegistry.launch_benchmark(context)
            if not benchmark:
                logger.error('Executor failed in %s, invalid context.', benchmark_full_name)
                return None
            logger.info(
                'benchmark: %s, return code: %s, result: %s.', benchmark.name, benchmark.return_code,
                benchmark.result
            )
            if benchmark.return_code.value == 0:
                logger.info('Executor succeeded in %s.', benchmark_full_name)
            else:
                logger.error('Executor failed in %s.', benchmark_full_name)
            # Parse the serialized result and tag it with the full benchmark name.
            parsed_result = json.loads(benchmark.serialized_result)
            parsed_result['name'] = benchmark_full_name
            return parsed_result
        except Exception as e:
            logger.error(e)
            logger.error('Executor failed in %s.', benchmark_full_name)
        return None
    def __get_rank_id(self):
        """Get rank ID for current process.

        Return:
            int: Rank ID.
        """
159
        for rank_env in ['PROC_RANK', 'LOCAL_RANK', 'OMPI_COMM_WORLD_LOCAL_RANK']:
160
161
162
163
164
            if os.getenv(rank_env):
                return int(os.getenv(rank_env))

        return 0

165
166
167
168
169
    def __get_benchmark_dir(self, benchmark_name):
        """Get output directory for benchmark's current rank.

        Args:
            benchmark_name (str): Benchmark name.
170
171
172

        Return:
            Path: output directory.
173
        """
174
        return self._output_path / 'benchmarks' / benchmark_name / ('rank' + str(self.__get_rank_id()))
175

176
177
178
179
180
181
    def __create_benchmark_dir(self, benchmark_name):
        """Create output directory for benchmark.

        Args:
            benchmark_name (str): Benchmark name.

        Raises:
            Exception: Re-raised when the directory cannot be created.
        """
        benchmark_dir = self.__get_benchmark_dir(benchmark_name)
        # Rotate away any existing output from a previous run before recreating.
        rotate_dir(benchmark_dir)
        try:
            benchmark_dir.mkdir(mode=0o755, parents=True, exist_ok=True)
        except Exception:
            logger.exception('Failed to create output directory for benchmark %s.', benchmark_name)
            raise

    def __write_benchmark_results(self, benchmark_name, benchmark_results):
        """Write benchmark results.

        Args:
            benchmark_name (str): Benchmark name.
            benchmark_results (dict): Benchmark results.
        """
196
        with (self.__get_benchmark_dir(benchmark_name) / 'results.json').open(mode='w') as f:
197
            json.dump(benchmark_results, f, indent=2)
198

199
200
201
202
203
204
205
206
207
208
209
    def __get_monitor_path(self, benchmark_name):
        """Get the output file path for the monitor.

        Args:
            benchmark_name (str): Benchmark name.

        Return:
            str: monitor output file path.
        """
        return f'{self.__get_benchmark_dir(benchmark_name) / "monitor.jsonl"}'

210
211
212
213
214
215
    def exec(self):
        """Run the SuperBench benchmarks locally.

        For every enabled benchmark: create its per-rank output directory, chdir
        into it, optionally start a hardware monitor on rank 0, launch the
        benchmark once per configured framework (and per model for model
        benchmarks), then write the collected results and restore the cwd.
        """
        # Platform detection probes the GPU and is invariant for the whole run,
        # so do it once instead of once per monitor check / context creation.
        platform = self.__get_platform()
        for benchmark_name in self._sb_benchmarks:
            if benchmark_name not in self._sb_enabled:
                continue
            benchmark_config = self._sb_benchmarks[benchmark_name]
            benchmark_results = list()
            self.__create_benchmark_dir(benchmark_name)
            cwd = os.getcwd()
            os.chdir(self.__get_benchmark_dir(benchmark_name))

            monitor = None
            if self.__get_rank_id() == 0 and self._sb_monitor_config and self._sb_monitor_config.enable:
                if platform is not Platform.CPU:
                    monitor = Monitor(
                        None, int(self._sb_monitor_config.sample_duration or 10),
                        int(self._sb_monitor_config.sample_interval or 1), self.__get_monitor_path(benchmark_name)
                    )
                    monitor.start()
                else:
                    logger.warning('Monitor can not support CPU platform.')

            benchmark_real_name = benchmark_name.split(':')[0]
            # The argument string is identical for every framework/model of this
            # benchmark, so build it once.
            parameters = self.__get_arguments(
                {} if 'parameters' not in benchmark_config else benchmark_config.parameters
            )
            frameworks = benchmark_config.get('frameworks', [Framework.NONE.value])
            for framework in frameworks:
                if benchmark_real_name == 'model-benchmarks' or (
                    ':' not in benchmark_name and benchmark_name.endswith('_models')
                ):
                    # Model benchmarks launch one run per (framework, model) pair.
                    for model in benchmark_config.models:
                        full_name = f'{benchmark_name}/{framework}-{model}'
                        logger.info('Executor is going to execute %s.', full_name)
                        context = BenchmarkRegistry.create_benchmark_context(
                            model,
                            platform=platform,
                            framework=Framework(framework.lower()),
                            parameters=parameters
                        )
                        result = self.__exec_benchmark(full_name, context)
                        benchmark_results.append(result)
                else:
                    full_name = benchmark_name
                    logger.info('Executor is going to execute %s.', full_name)
                    context = BenchmarkRegistry.create_benchmark_context(
                        benchmark_real_name,
                        platform=platform,
                        framework=Framework(framework.lower()),
                        parameters=parameters
                    )
                    result = self.__exec_benchmark(full_name, context)
                    benchmark_results.append(result)

            if monitor:
                monitor.stop()
            # NOTE(review): stdout_logger.stop() inside the loop appears to halt
            # stdout capture after the first benchmark — confirm intended;
            # original position preserved here.
            stdout_logger.stop()
            self.__write_benchmark_results(benchmark_name, benchmark_results)
            os.chdir(cwd)