Unverified Commit 8b4f613a authored by Yifan Xiong, committed by GitHub

Runner - Support torch.distributed mode in runner (#81)

* Support `torch.distributed` mode in runner.
* Support specifying `proc_num` and `node_num` in `torch.distributed` mode.
parent 87f6b371
@@ -10,6 +10,10 @@ superbench:
num_steps: 2048
bert_models:
enable: true
modes:
- name: torch.distributed
proc_num: 8
node_num: all
frameworks:
- pytorch
models:
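For context, here is a minimal standalone sketch (not part of this commit) of how a `modes` entry like the one above can be read with OmegaConf. The keys `name`, `proc_num`, and `node_num` come from the diff; the nesting under `superbench.benchmarks` is inferred from the `sb exec` override strings used later, and the fallback defaults are assumptions mirroring the runner code below.

```python
from omegaconf import OmegaConf

# Illustrative config fragment shaped like the YAML hunk above.
config = OmegaConf.create(
    """
    superbench:
      benchmarks:
        bert_models:
          enable: true
          modes:
            - name: torch.distributed
              proc_num: 8
              node_num: all
    """
)

for mode in config.superbench.benchmarks.bert_models.modes:
    # Assumed fallbacks: 8 processes per node, run on all nodes.
    print(mode.name, mode.get('proc_num', 8), mode.get('node_num', 'all'))
```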
@@ -58,6 +58,30 @@ def __get_enabled_benchmarks(self):
return list(self._sb_config.superbench.enable)
return [k for k, v in self._sb_benchmarks.items() if v.enable]
def __get_mode_command(self, mode, exec_command):
"""Get runner command for given mode.
Args:
mode (DictConfig): Runner mode.
exec_command (str): Executor command.
Returns:
str: Runner command.
"""
if mode.name == 'torch.distributed':
# TODO: replace with torch.distributed.run in v1.9
# TODO: only supports node_num=1 and node_num=all currently
return (
'python3 -m torch.distributed.launch '
'--use_env --no_python --nproc_per_node={proc_num} '
'--nnodes={node_num} --node_rank=$NODE_RANK '
'--master_addr=$MASTER_ADDR --master_port=$MASTER_PORT '
'{command}'
).format(
proc_num=mode.proc_num or 8, node_num=1 if mode.node_num == 1 else '$NNODES', command=exec_command
)
return exec_command
def deploy(self): # pragma: no cover
"""Deploy SuperBench environment."""
logger.info('Preparing SuperBench environment.')
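A note on the placeholders above: `$NNODES`, `$NODE_RANK`, `$MASTER_ADDR`, and `$MASTER_PORT` are left literal in the string returned by `__get_mode_command` and are only expanded by the shell inside the container, which sources `sb.env` before executing (see `run()` below). A rough sketch of that expansion with made-up values, assuming those variables are defined in `sb.env`:

```python
import os

# Command produced by __get_mode_command for proc_num=8, node_num=all.
command = (
    'python3 -m torch.distributed.launch '
    '--use_env --no_python --nproc_per_node=8 '
    '--nnodes=$NNODES --node_rank=$NODE_RANK '
    '--master_addr=$MASTER_ADDR --master_port=$MASTER_PORT '
    'sb exec'
)

# Example values only; in practice the shell inside the container provides them.
os.environ.update({'NNODES': '2', 'NODE_RANK': '0', 'MASTER_ADDR': '10.0.0.1', 'MASTER_PORT': '29500'})
print(os.path.expandvars(command))
# python3 -m torch.distributed.launch --use_env --no_python --nproc_per_node=8
# --nnodes=2 --node_rank=0 --master_addr=10.0.0.1 --master_port=29500 sb exec
```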
@@ -86,11 +110,31 @@ def check_env(self): # pragma: no cover
)
def run(self):
"""Run the SuperBench benchmarks distributedly.
Raises:
NotImplementedError: Not implemented yet.
"""
logger.info(self._sb_config)
logger.error('Work in progress, not implemented yet.')
pass
"""Run the SuperBench benchmarks distributedly."""
self.check_env()
runner_command = (
'docker exec sb-workspace bash -c '
'"set -o allexport && source sb.env && set +o allexport && {}"'
)
for benchmark_name in self._sb_benchmarks:
if benchmark_name not in self._sb_enabled_benchmarks:
continue
benchmark_config = self._sb_benchmarks[benchmark_name]
for mode in benchmark_config.modes or []:
if mode.name == 'torch.distributed':
logger.info('Runner is going to run %s.', benchmark_name)
self._ansible_client.run(
self._ansible_client.get_shell_config(
runner_command.format(
self.__get_mode_command(
mode, (
'sb exec -c sb.config.yaml -C '
'superbench.enable={name} '
'superbench.benchmarks.{name}.parameters.distributed_impl=ddp '
'superbench.benchmarks.{name}.parameters.distributed_backend=nccl'
).format(name=benchmark_name)
)
),
sudo=True
)
)
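Putting the pieces together, this sketch re-assembles the full string that `run()` hands to Ansible for one enabled benchmark; the benchmark name `bert_models` is borrowed from the config hunk above, and nothing here is a separate code path, it only makes the nesting of the three strings visible.

```python
# Hypothetical end-to-end assembly for the benchmark 'bert_models'.
runner_command = (
    'docker exec sb-workspace bash -c '
    '"set -o allexport && source sb.env && set +o allexport && {}"'
)
exec_command = (
    'sb exec -c sb.config.yaml -C '
    'superbench.enable={name} '
    'superbench.benchmarks.{name}.parameters.distributed_impl=ddp '
    'superbench.benchmarks.{name}.parameters.distributed_backend=nccl'
).format(name='bert_models')
launch_command = (
    'python3 -m torch.distributed.launch '
    '--use_env --no_python --nproc_per_node=8 '
    '--nnodes=$NNODES --node_rank=$NODE_RANK '
    '--master_addr=$MASTER_ADDR --master_port=$MASTER_PORT '
) + exec_command
print(runner_command.format(launch_command))
```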
@@ -61,7 +61,7 @@ def test_sb_exec(self):
def test_sb_run(self):
"""Test sb run."""
self.cmd('sb run --host-list localhost --config-override superbench.enable=none', checks=[NoneCheck()])
def test_sb_run_no_docker_auth(self):
"""Test sb run, only --docker-username argument, should fail."""
@@ -32,6 +32,72 @@ def test_set_logger(self):
expected_log_file = Path(self.runner._output_dir) / 'sb-run.log'
self.assertTrue(expected_log_file.is_file())
def test_get_mode_command(self):
"""Test __get_mode_command."""
test_cases = [
{
'mode': {
'name': 'non_exist',
},
'exec_command': 'sb exec',
'expected_command': 'sb exec',
},
{
'mode': {
'name': 'torch.distributed',
},
'exec_command':
'sb exec',
'expected_command': (
'python3 -m torch.distributed.launch '
'--use_env --no_python --nproc_per_node=8 '
'--nnodes=$NNODES --node_rank=$NODE_RANK '
'--master_addr=$MASTER_ADDR --master_port=$MASTER_PORT '
'sb exec'
),
},
{
'mode': {
'name': 'torch.distributed',
'proc_num': 1,
'node_num': 'all',
},
'exec_command':
'sb exec',
'expected_command': (
'python3 -m torch.distributed.launch '
'--use_env --no_python --nproc_per_node=1 '
'--nnodes=$NNODES --node_rank=$NODE_RANK '
'--master_addr=$MASTER_ADDR --master_port=$MASTER_PORT '
'sb exec'
),
},
{
'mode': {
'name': 'torch.distributed',
'proc_num': 8,
'node_num': 1,
},
'exec_command':
'sb exec',
'expected_command': (
'python3 -m torch.distributed.launch '
'--use_env --no_python --nproc_per_node=8 '
'--nnodes=1 --node_rank=$NODE_RANK '
'--master_addr=$MASTER_ADDR --master_port=$MASTER_PORT '
'sb exec'
),
},
]
for test_case in test_cases:
with self.subTest(msg='Testing with case', test_case=test_case):
self.assertEqual(
self.runner._SuperBenchRunner__get_mode_command(
OmegaConf.create(test_case['mode']), test_case['exec_command']
), test_case['expected_command']
)
def test_run(self):
"""Test run."""
self.runner._sb_enabled_benchmarks = []
self.runner.run()