# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""SuperBench Runner."""

import random
from pathlib import Path

from joblib import Parallel, delayed
from omegaconf import ListConfig, OmegaConf

from superbench.common.utils import SuperBenchLogger, logger
from superbench.runner.ansible import AnsibleClient


class SuperBenchRunner():
    """SuperBench runner class."""
    def __init__(self, sb_config, docker_config, ansible_config, sb_output_dir):
        """Initialize.

        Args:
            sb_config (DictConfig): SuperBench config object.
            docker_config (DictConfig): Docker config object.
            ansible_config (DictConfig): Ansible config object.
            sb_output_dir (str): SuperBench output directory.
        """
        self._sb_config = sb_config
        self._docker_config = docker_config
        self._ansible_config = ansible_config
        self._sb_output_dir = sb_output_dir
        # Resolve to an absolute path so log/config files land in a stable location
        # regardless of the process working directory.
        self._output_path = Path(sb_output_dir).expanduser().resolve()
        self._ansible_client = AnsibleClient(ansible_config)

        # File logging must be attached before any of the logger.info calls below.
        self.__set_logger('sb-run.log')
        logger.info('Runner uses config: %s.', self._sb_config)
        logger.info('Runner writes to: %s.', str(self._output_path))

        # Validate/default the config before computing the enabled benchmark list.
        self._sb_benchmarks = self._sb_config.superbench.benchmarks
        self.__validate_sb_config()
        self._sb_enabled_benchmarks = self.__get_enabled_benchmarks()
        logger.info('Runner will run: %s', self._sb_enabled_benchmarks)
    def __set_logger(self, filename):
        """Attach a file handler for the given log file to the shared runner logger.

        Args:
            filename (str): Log file name.
        """
        log_file = self._output_path / filename
        SuperBenchLogger.add_handler(logger.logger, filename=str(log_file))
    def __validate_sb_config(self):
        """Validate SuperBench config object.

        Raise:
            InvalidConfigError: If input config is invalid.
        """
        # TODO: add validation and defaulting
58
59
        if not self._sb_config.superbench.env:
            self._sb_config.superbench.env = {}
60
61
62
63
64
65
66
67
68
69
70
71
72
        for name in self._sb_benchmarks:
            if not self._sb_benchmarks[name].modes:
                self._sb_benchmarks[name].modes = []
            for idx, mode in enumerate(self._sb_benchmarks[name].modes):
                if mode.name == 'local':
                    if not mode.proc_num:
                        self._sb_benchmarks[name].modes[idx].proc_num = 1
                    if not mode.prefix:
                        self._sb_benchmarks[name].modes[idx].prefix = ''
                elif mode.name == 'torch.distributed':
                    if not mode.proc_num:
                        self._sb_benchmarks[name].modes[idx].proc_num = 8

73
74
75
76
77
78
79
80
81
82
83
84
85
    def __get_enabled_benchmarks(self):
        """Get enabled benchmarks list.

        Return:
            list: List of benchmarks which will be executed.
        """
        if self._sb_config.superbench.enable:
            if isinstance(self._sb_config.superbench.enable, str):
                return [self._sb_config.superbench.enable]
            elif isinstance(self._sb_config.superbench.enable, (list, ListConfig)):
                return list(self._sb_config.superbench.enable)
        return [k for k, v in self._sb_benchmarks.items() if v.enable]

86
    def __get_mode_command(self, benchmark_name, mode):
87
88
89
        """Get runner command for given mode.

        Args:
90
            benchmark_name (str): Benchmark name.
91
92
93
94
95
            mode (DictConfig): Runner mode.

        Return:
            str: Runner command.
        """
96
97
98
99
        exec_command = ('sb exec --output-dir {output_dir} -c sb.config.yaml -C superbench.enable={name}').format(
            name=benchmark_name,
            output_dir=self._sb_output_dir,
        )
100
101
102
103
104
105
        mode_command = exec_command
        if mode.name == 'local':
            mode_command = '{prefix} {command}'.format(
                prefix=mode.prefix.format(proc_rank=mode.proc_rank, proc_num=mode.proc_num),
                command=exec_command,
            )
106
            mode_command = f'PROC_RANK={mode.proc_rank} {mode_command.strip()}'
107
        elif mode.name == 'torch.distributed':
108
109
            # TODO: replace with torch.distributed.run in v1.9
            # TODO: only supports node_num=1 and node_num=all currently
110
            mode_command = (
111
112
113
114
                'python3 -m torch.distributed.launch '
                '--use_env --no_python --nproc_per_node={proc_num} '
                '--nnodes={node_num} --node_rank=$NODE_RANK '
                '--master_addr=$MASTER_ADDR --master_port=$MASTER_PORT '
115
                '{command} {torch_distributed_suffix}'
116
            ).format(
117
118
119
120
121
122
123
                proc_num=mode.proc_num,
                node_num=1 if mode.node_num == 1 else '$NNODES',
                command=exec_command,
                torch_distributed_suffix=(
                    'superbench.benchmarks.{name}.parameters.distributed_impl=ddp '
                    'superbench.benchmarks.{name}.parameters.distributed_backend=nccl'
                ).format(name=benchmark_name),
124
            )
125
        return mode_command.strip()
126

127
128
129
130
131
    def deploy(self):    # pragma: no cover
        """Deploy SuperBench environment via the deploy.yaml Ansible playbook."""
        logger.info('Preparing SuperBench environment.')
        # Random high port (16384-32767) for the container's SSH endpoint.
        ssh_port = random.randint(1 << 14, (1 << 15) - 1)
        extravars = {
            'ssh_port': ssh_port,
            'output_dir': str(self._output_path),
            'docker_image': self._docker_config.image,
            'gpu_vendor': 'nvidia',
        }
        # Registry credentials are forwarded only when both username and password are set.
        if self._docker_config.username and self._docker_config.password:
            extravars['docker_registry'] = self._docker_config.registry
            extravars['docker_username'] = self._docker_config.username
            extravars['docker_password'] = self._docker_config.password
        playbook = self._ansible_client.get_playbook_config('deploy.yaml', extravars=extravars)
        self._ansible_client.run(playbook)
    def check_env(self):    # pragma: no cover
        """Check SuperBench environment via the check_env.yaml Ansible playbook."""
        logger.info('Checking SuperBench environment.')
        # Persist the resolved config so remote runs use exactly these settings.
        OmegaConf.save(config=self._sb_config, f=str(self._output_path / 'sb.config.yaml'))
        # Render superbench.env as KEY=VALUE lines for the playbook.
        env_lines = '\n'.join(f'{k}={v}' for k, v in self._sb_config.superbench.env.items())
        playbook = self._ansible_client.get_playbook_config(
            'check_env.yaml',
            extravars={
                'output_dir': str(self._output_path),
                'env': env_lines,
            }
        )
        self._ansible_client.run(playbook)
    def fetch_results(self):    # pragma: no cover
        """Fetch benchmark results on all nodes into the local 'nodes' directory."""
        nodes_dir = self._output_path / 'nodes'
        try:
            nodes_dir.mkdir(mode=0o755, parents=True, exist_ok=True)
        except Exception:
            # Record the traceback, then propagate — results cannot land anywhere else.
            logger.exception('Failed to create directory %s.', str(nodes_dir))
            raise
        playbook = self._ansible_client.get_playbook_config(
            'fetch_results.yaml',
            extravars={
                'sb_output_dir': self._sb_output_dir,
                'absolute_output_dir': str(self._output_path),
            }
        )
        self._ansible_client.run(playbook)
    def _run_proc(self, benchmark_name, mode, vars):
        """Run the process.

        Args:
            benchmark_name (str): Benchmark name.
            mode (DictConfig): Runner mode.
            vars (dict): Process variables merged into *mode* (e.g. proc_rank).

        Returns:
            int: Process return code.
        """
        # NOTE(review): this mutates the shared mode config in place; presumably safe
        # because joblib copies arguments into worker processes — confirm if the
        # parallel backend ever switches to threads.
        mode.update(vars)
        logger.info('Runner is going to run %s in %s mode, proc rank %d.', benchmark_name, mode.name, mode.proc_rank)
        command = self.__get_mode_command(benchmark_name, mode)
        # Export everything from sb.env inside the container before running the command.
        shell_command = (
            'docker exec sb-workspace bash -c '
            "'set -o allexport && source sb.env && set +o allexport && {command}'"
        ).format(command=command)
        shell_config = self._ansible_client.get_shell_config(shell_command)
        return self._ansible_client.run(shell_config, sudo=True)
    def run(self):
        """Run the SuperBench benchmarks distributedly."""
        self.check_env()
        for benchmark_name, benchmark_config in self._sb_benchmarks.items():
            if benchmark_name not in self._sb_enabled_benchmarks:
                continue
            for mode in benchmark_config.modes:
                if mode.name == 'local':
                    # One process per rank; run them serially unless parallel is enabled.
                    job_count = mode.proc_num if mode.parallel else 1
                    Parallel(n_jobs=job_count)(
                        delayed(self._run_proc)(benchmark_name, mode, {'proc_rank': rank})
                        for rank in range(mode.proc_num)
                    )
                elif mode.name == 'torch.distributed':
                    # torch.distributed.launch handles per-node worker fan-out itself.
                    self._run_proc(benchmark_name, mode, {'proc_rank': 0})
            self.fetch_results()