Unverified Commit 6b0ca1cb authored by Yifan Xiong, committed by GitHub

Runner - Support local mode in runner (#88)

* Support local mode in runner.
parent 44c5103b
@@ -136,6 +136,7 @@ def run(self):
'ansible_base>=2.10.9;os_name=="posix"',
'ansible_runner>=1.4.7',
'colorlog>=4.7.2',
'joblib>=1.0.1',
'knack>=0.7.2',
'omegaconf>=2.0.6',
],
......
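The new joblib dependency backs the local-mode fan-out introduced below. A minimal sketch of the `Parallel`/`delayed` pattern the runner relies on (`run_rank` is a hypothetical stand-in for the per-rank work):

```python
from joblib import Parallel, delayed

def run_rank(proc_rank):
    """Hypothetical per-rank workload."""
    return proc_rank * proc_rank

# n_jobs > 1 runs ranks concurrently; n_jobs=1 runs them one by one,
# which is how the runner honors `parallel: no`.
results = Parallel(n_jobs=4)(delayed(run_rank)(rank) for rank in range(8))
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]
```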
@@ -4,6 +4,11 @@ superbench:
benchmarks:
matmul:
enable: true
modes:
- name: local
proc_num: 8
prefix: CUDA_VISIBLE_DEVICES={proc_rank}
parallel: no
frameworks:
- pytorch
parameters:
......
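Each `modes` entry describes how one benchmark is launched: `proc_num` processes, each with the `prefix` template expanded per rank, and `parallel: no` forcing the ranks to run serially. A sketch of the per-rank prefix expansion, which is plain `str.format` as in `__get_mode_command` below:

```python
prefix = 'CUDA_VISIBLE_DEVICES={proc_rank}'
for proc_rank in range(8):
    # Extra keyword arguments for unused placeholders (e.g. proc_num) are accepted.
    print(prefix.format(proc_rank=proc_rank, proc_num=8))
# CUDA_VISIBLE_DEVICES=0 ... CUDA_VISIBLE_DEVICES=7
```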
@@ -6,6 +6,7 @@
import random
from pathlib import Path
from joblib import Parallel, delayed
from omegaconf import ListConfig, OmegaConf
from superbench.common.utils import SuperBenchLogger, logger
@@ -34,6 +35,7 @@ def __init__(self, sb_config, docker_config, ansible_config, output_dir):
logger.info('Runner writes to: %s.', self._output_dir)
self._sb_benchmarks = self._sb_config.superbench.benchmarks
self.__validate_sb_config()
self._sb_enabled_benchmarks = self.__get_enabled_benchmarks()
logger.info('Runner will run: %s', self._sb_enabled_benchmarks)
@@ -45,6 +47,26 @@ def __set_logger(self, filename):
"""
SuperBenchLogger.add_handler(logger.logger, filename=str(Path(self._output_dir) / filename))
def __validate_sb_config(self):
"""Validate SuperBench config object.
Raises:
InvalidConfigError: If input config is invalid.
"""
# TODO: add validation and defaulting
for name in self._sb_benchmarks:
if not self._sb_benchmarks[name].modes:
self._sb_benchmarks[name].modes = []
for idx, mode in enumerate(self._sb_benchmarks[name].modes):
if mode.name == 'local':
if not mode.proc_num:
self._sb_benchmarks[name].modes[idx].proc_num = 1
if not mode.prefix:
self._sb_benchmarks[name].modes[idx].prefix = ''
elif mode.name == 'torch.distributed':
if not mode.proc_num:
self._sb_benchmarks[name].modes[idx].proc_num = 8
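In effect, missing mode fields are filled with per-mode defaults: `local` falls back to `proc_num=1` and an empty prefix, `torch.distributed` to `proc_num=8`. A standalone sketch of the same defaulting against an OmegaConf object:

```python
from omegaconf import OmegaConf

# Keys are present but unset, as in a sparse user config.
cfg = OmegaConf.create({'modes': [{'name': 'local', 'proc_num': None, 'prefix': None}]})
for mode in cfg.modes:
    if mode.name == 'local':
        mode.proc_num = mode.proc_num or 1
        mode.prefix = mode.prefix or ''
print(OmegaConf.to_yaml(cfg))
```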
def __get_enabled_benchmarks(self):
"""Get enabled benchmarks list.
@@ -58,29 +80,42 @@ def __get_enabled_benchmarks(self):
return list(self._sb_config.superbench.enable)
return [k for k, v in self._sb_benchmarks.items() if v.enable]
def __get_mode_command(self, mode, exec_command):
def __get_mode_command(self, benchmark_name, mode):
"""Get runner command for given mode.
Args:
benchmark_name (str): Benchmark name.
mode (DictConfig): Runner mode.
exec_command (str): Executor command.
Returns:
str: Runner command.
"""
if mode.name == 'torch.distributed':
exec_command = ('sb exec -c sb.config.yaml -C superbench.enable={name}').format(name=benchmark_name)
mode_command = exec_command
if mode.name == 'local':
mode_command = '{prefix} {command}'.format(
prefix=mode.prefix.format(proc_rank=mode.proc_rank, proc_num=mode.proc_num),
command=exec_command,
)
elif mode.name == 'torch.distributed':
# TODO: replace with torch.distributed.run in v1.9
# TODO: only supports node_num=1 and node_num=all currently
return (
mode_command = (
'python3 -m torch.distributed.launch '
'--use_env --no_python --nproc_per_node={proc_num} '
'--nnodes={node_num} --node_rank=$NODE_RANK '
'--master_addr=$MASTER_ADDR --master_port=$MASTER_PORT '
'{command}'
'{command} {torch_distributed_suffix}'
).format(
proc_num=mode.proc_num or 8, node_num=1 if mode.node_num == 1 else '$NNODES', command=exec_command
proc_num=mode.proc_num,
node_num=1 if mode.node_num == 1 else '$NNODES',
command=exec_command,
torch_distributed_suffix=(
'superbench.benchmarks.{name}.parameters.distributed_impl=ddp '
'superbench.benchmarks.{name}.parameters.distributed_backend=nccl'
).format(name=benchmark_name),
)
return exec_command
return mode_command.strip()
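For reference, a worked example of the local-mode branch above, using the prefix from the test case further down (rank 6 of 8):

```python
# Prefix placeholders are filled first, then prepended to the exec command;
# shell constructs like $((...)) pass through str.format untouched.
mode_prefix = 'CUDA_VISIBLE_DEVICES={proc_rank} numactl -c $(({proc_rank}/2))'
exec_command = 'sb exec -c sb.config.yaml -C superbench.enable=foo'
command = '{prefix} {command}'.format(
    prefix=mode_prefix.format(proc_rank=6, proc_num=8),
    command=exec_command,
).strip()
print(command)
# CUDA_VISIBLE_DEVICES=6 numactl -c $((6/2)) sb exec -c sb.config.yaml -C superbench.enable=foo
```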
def deploy(self): # pragma: no cover
"""Deploy SuperBench environment."""
@@ -109,32 +144,43 @@ def check_env(self): # pragma: no cover
self._ansible_client.get_playbook_config('check_env.yaml', extravars={'output_dir': self._output_dir})
)
def _run_proc(self, benchmark_name, mode, vars):
"""Run the process.
Args:
benchmark_name (str): Benchmark name.
mode (DictConfig): Runner mode.
vars (dict): Process variables.
Returns:
int: Process return code.
"""
mode.update(vars)
logger.info('Runner is going to run %s in %s mode, proc rank %d.', benchmark_name, mode.name, mode.proc_rank)
rc = self._ansible_client.run(
self._ansible_client.get_shell_config(
(
'docker exec sb-workspace bash -c '
'"set -o allexport && source sb.env && set +o allexport && {command}"'
).format(command=self.__get_mode_command(benchmark_name, mode), )
),
sudo=True
)
return rc
def run(self):
"""Run the SuperBench benchmarks distributedly."""
self.check_env()
runner_command = (
'docker exec sb-workspace bash -c '
'"set -o allexport && source sb.env && set +o allexport && {}"'
)
for benchmark_name in self._sb_benchmarks:
if benchmark_name not in self._sb_enabled_benchmarks:
continue
benchmark_config = self._sb_benchmarks[benchmark_name]
for mode in benchmark_config.modes or []:
if mode.name == 'torch.distributed':
logger.info('Runner is going to run %s.', benchmark_name)
self._ansible_client.run(
self._ansible_client.get_shell_config(
runner_command.format(
self.__get_mode_command(
mode, (
'sb exec -c sb.config.yaml -C '
'superbench.enable={name} '
'superbench.benchmarks.{name}.parameters.distributed_impl=ddp '
'superbench.benchmarks.{name}.parameters.distributed_backend=nccl'
).format(name=benchmark_name)
)
),
sudo=True
)
for mode in benchmark_config.modes:
if mode.name == 'local':
Parallel(n_jobs=mode.proc_num if mode.parallel else 1)(
delayed(self._run_proc)(benchmark_name, mode, {
'proc_rank': proc_rank
}) for proc_rank in range(mode.proc_num)
)
elif mode.name == 'torch.distributed':
self._run_proc(benchmark_name, mode, {'proc_rank': 0})
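A distilled sketch of the dispatch in `run()` above (plain dicts instead of DictConfig, `run_proc` standing in for `_run_proc`): local mode fans out one call per rank, concurrently only when `parallel` is set, while torch.distributed issues a single rank-0 call and lets `torch.distributed.launch` spawn the workers.

```python
from joblib import Parallel, delayed

def dispatch(run_proc, benchmark_name, mode):
    if mode['name'] == 'local':
        # One call per rank; serial when parallel is falsy (n_jobs=1).
        Parallel(n_jobs=mode['proc_num'] if mode['parallel'] else 1)(
            delayed(run_proc)(benchmark_name, mode, {'proc_rank': rank})
            for rank in range(mode['proc_num'])
        )
    elif mode['name'] == 'torch.distributed':
        # Single launcher invocation; the launch module forks the workers.
        run_proc(benchmark_name, mode, {'proc_rank': 0})
```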
@@ -7,6 +7,7 @@
import shutil
import tempfile
from pathlib import Path
from unittest import mock
from omegaconf import OmegaConf
@@ -36,56 +37,78 @@ def test_get_mode_command(self):
"""Test __get_mode_command."""
test_cases = [
{
'benchmark_name': 'foo',
'mode': {
'name': 'non_exist',
},
'exec_command': 'sb exec',
'expected_command': 'sb exec',
'expected_command': 'sb exec -c sb.config.yaml -C superbench.enable=foo',
},
{
'benchmark_name': 'foo',
'mode': {
'name': 'torch.distributed',
'name': 'local',
'proc_num': 1,
'prefix': '',
},
'exec_command':
'sb exec',
'expected_command': (
'python3 -m torch.distributed.launch '
'--use_env --no_python --nproc_per_node=8 '
'--nnodes=$NNODES --node_rank=$NODE_RANK '
'--master_addr=$MASTER_ADDR --master_port=$MASTER_PORT '
'sb exec'
),
'expected_command': 'sb exec -c sb.config.yaml -C superbench.enable=foo',
},
{
'benchmark_name':
'foo',
'mode': {
'name': 'local',
'proc_num': 8,
'proc_rank': 6,
'prefix': 'CUDA_VISIBLE_DEVICES={proc_rank} numactl -c $(({proc_rank}/2))'
},
'expected_command':
('CUDA_VISIBLE_DEVICES=6 numactl -c $((6/2)) '
'sb exec -c sb.config.yaml -C superbench.enable=foo'),
},
{
'benchmark_name': 'foo',
'mode': {
'name': 'local',
'proc_num': 16,
'proc_rank': 1,
'prefix': 'RANK={proc_rank} NUM={proc_num}'
},
'expected_command': 'RANK=1 NUM=16 sb exec -c sb.config.yaml -C superbench.enable=foo',
},
{
'benchmark_name':
'foo',
'mode': {
'name': 'torch.distributed',
'proc_num': 1,
'node_num': 'all',
},
'exec_command':
'sb exec',
'expected_command': (
'python3 -m torch.distributed.launch '
'--use_env --no_python --nproc_per_node=1 '
'--nnodes=$NNODES --node_rank=$NODE_RANK '
'--master_addr=$MASTER_ADDR --master_port=$MASTER_PORT '
'sb exec'
'sb exec -c sb.config.yaml -C superbench.enable=foo '
'superbench.benchmarks.foo.parameters.distributed_impl=ddp '
'superbench.benchmarks.foo.parameters.distributed_backend=nccl'
),
},
{
'benchmark_name':
'foo',
'mode': {
'name': 'torch.distributed',
'proc_num': 8,
'node_num': 1,
},
'exec_command':
'sb exec',
'expected_command': (
'python3 -m torch.distributed.launch '
'--use_env --no_python --nproc_per_node=8 '
'--nnodes=1 --node_rank=$NODE_RANK '
'--master_addr=$MASTER_ADDR --master_port=$MASTER_PORT '
'sb exec'
'sb exec -c sb.config.yaml -C superbench.enable=foo '
'superbench.benchmarks.foo.parameters.distributed_impl=ddp '
'superbench.benchmarks.foo.parameters.distributed_backend=nccl'
),
},
]
@@ -93,11 +116,21 @@ def test_get_mode_command(self):
with self.subTest(msg='Testing with case', test_case=test_case):
self.assertEqual(
self.runner._SuperBenchRunner__get_mode_command(
OmegaConf.create(test_case['mode']), test_case['exec_command']
test_case['benchmark_name'], OmegaConf.create(test_case['mode'])
), test_case['expected_command']
)
def test_run(self):
"""Test run."""
def test_run_empty_benchmarks(self):
"""Test run empty benchmarks, nothing should happen."""
self.runner._sb_enabled_benchmarks = []
self.runner.run()
@mock.patch('superbench.runner.ansible.AnsibleClient.run')
def test_run_default_benchmarks(self, mock_ansible_client_run):
"""Test run default benchmarks, mock AnsibleClient.run function.
Args:
mock_ansible_client_run (function): Mocked AnsibleClient.run function.
"""
mock_ansible_client_run.return_value = 0
self.runner.run()