# Copyright (c) Microsoft Corporation. # Licensed under the MIT License. """SuperBench Executor.""" import os import json from pathlib import Path from omegaconf import ListConfig from superbench.benchmarks import Platform, Framework, BenchmarkRegistry from superbench.common.utils import SuperBenchLogger, logger, rotate_dir class SuperBenchExecutor(): """SuperBench executor class.""" def __init__(self, sb_config, sb_output_dir): """Initilize. Args: sb_config (DictConfig): SuperBench config object. sb_output_dir (str): SuperBench output directory. """ self._sb_config = sb_config self._sb_output_dir = sb_output_dir self._output_path = Path(sb_output_dir).expanduser().resolve() self.__set_logger('sb-exec.log') logger.info('Executor uses config: %s.', self._sb_config) logger.info('Executor writes to: %s.', str(self._output_path)) self.__validate_sb_config() self._sb_benchmarks = self._sb_config.superbench.benchmarks self._sb_enabled = self.__get_enabled_benchmarks() logger.info('Executor will execute: %s', self._sb_enabled) def __set_logger(self, filename): """Set logger and add file handler. Args: filename (str): Log file name. """ SuperBenchLogger.add_handler(logger.logger, filename=str(self._output_path / filename)) def __validate_sb_config(self): """Validate SuperBench config object. Raise: InvalidConfigError: If input config is invalid. """ # TODO: add validation def __get_enabled_benchmarks(self): """Get enabled benchmarks list. Return: list: List of benchmarks which will be executed. """ if self._sb_config.superbench.enable: if isinstance(self._sb_config.superbench.enable, str): return [self._sb_config.superbench.enable] elif isinstance(self._sb_config.superbench.enable, (list, ListConfig)): return list(self._sb_config.superbench.enable) # TODO: may exist order issue return [k for k, v in self._sb_benchmarks.items() if v.enable] def __get_platform(self): """Detect runninng platform by environment.""" # TODO: check devices and env vars return Platform.CUDA def __get_arguments(self, parameters): """Get command line arguments for argparse. Args: parameters (DictConfig): Parameters config dict. Return: str: Command line arguments. """ argv = [] if not parameters: return '' for name, val in parameters.items(): if val is None: continue if isinstance(val, bool) and val: argv.append('--{}'.format(name)) elif isinstance(val, (str, int, float)): argv.append('--{} {}'.format(name, val)) elif isinstance(val, (list, ListConfig)): argv.append('--{} {}'.format(name, ' '.join(val))) return ' '.join(argv) def __exec_benchmark(self, context, log_suffix): """Launch benchmark for context. Args: context (BenchmarkContext): Benchmark context to launch. log_suffix (str): Log string suffix. Return: dict: Benchmark results. """ try: benchmark = BenchmarkRegistry.launch_benchmark(context) if benchmark: logger.info( 'benchmark: %s, return code: %s, result: %s.', benchmark.name, benchmark.return_code, benchmark.result ) if benchmark.return_code.value == 0: logger.info('Executor succeeded in %s.', log_suffix) else: logger.error('Executor failed in %s.', log_suffix) return json.loads(benchmark.serialized_result) else: logger.error('Executor failed in %s, invalid context.', log_suffix) except Exception as e: logger.error(e) logger.error('Executor failed in %s.', log_suffix) return None def __get_benchmark_dir(self, benchmark_name): """Get output directory for benchmark's current rank. Args: benchmark_name (str): Benchmark name. """ benchmark_output_dir = self._output_path / 'benchmarks' / benchmark_name for rank_env in ['PROC_RANK', 'LOCAL_RANK']: if os.getenv(rank_env): return benchmark_output_dir / 'rank{}'.format(os.getenv(rank_env)) return benchmark_output_dir / 'rank0' def __create_benchmark_dir(self, benchmark_name): """Create output directory for benchmark. Args: benchmark_name (str): Benchmark name. """ rotate_dir(self.__get_benchmark_dir(benchmark_name)) try: self.__get_benchmark_dir(benchmark_name).mkdir(mode=0o755, parents=True, exist_ok=True) except Exception: logger.exception('Failed to create output directory for benchmark %s.', benchmark_name) raise def __write_benchmark_results(self, benchmark_name, benchmark_results): """Write benchmark results. Args: benchmark_name (str): Benchmark name. benchmark_results (dict): Benchmark results. """ with (self.__get_benchmark_dir(benchmark_name) / 'results.json').open(mode='w') as f: json.dump(benchmark_results, f, indent=2) def exec(self): """Run the SuperBench benchmarks locally.""" for benchmark_name in self._sb_benchmarks: if benchmark_name not in self._sb_enabled: continue benchmark_config = self._sb_benchmarks[benchmark_name] benchmark_results = {} self.__create_benchmark_dir(benchmark_name) for framework in benchmark_config.frameworks or [Framework.NONE.value]: if benchmark_name.endswith('_models'): for model in benchmark_config.models: log_suffix = 'model-benchmark {}: {}/{}'.format(benchmark_name, framework, model) logger.info('Executor is going to execute %s.', log_suffix) context = BenchmarkRegistry.create_benchmark_context( model, platform=self.__get_platform(), framework=Framework(framework.lower()), parameters=self.__get_arguments(benchmark_config.parameters) ) result = self.__exec_benchmark(context, log_suffix) if framework != Framework.NONE.value: benchmark_results['{}/{}'.format(framework, model)] = result else: benchmark_results[model] = result else: log_suffix = 'micro-benchmark {}'.format(benchmark_name) logger.info('Executor is going to execute %s.', log_suffix) context = BenchmarkRegistry.create_benchmark_context( benchmark_name, platform=self.__get_platform(), framework=Framework(framework.lower()), parameters=self.__get_arguments(benchmark_config.parameters) ) result = self.__exec_benchmark(context, log_suffix) if framework != Framework.NONE.value: benchmark_results[framework] = result else: benchmark_results = result self.__write_benchmark_results(benchmark_name, benchmark_results)