Unverified Commit e4eeda0a authored by Yang Wang's avatar Yang Wang Committed by GitHub
Browse files

Runner - support 'pattern' in 'mpi' mode to run tasks in parallel (#430)

* add mpi-parallels mode

* update according to comments

* fix and update doc

* update

* merge into 'mpi' mode

* update according to comments

* fix testcases

* fix ansible

* regard pattern as field

* update

* fix flake8 version

* add flake8 range

* remove map-by from host config

* update comments
parent 3c97381f
...@@ -401,6 +401,7 @@ Some attributes may only be suitable for specific mode. ...@@ -401,6 +401,7 @@ Some attributes may only be suitable for specific mode.
| `env` | ✓ | ✓ | ✓ | | `env` | ✓ | ✓ | ✓ |
| `mca` | ✘ | ✘ | ✓ | | `mca` | ✘ | ✘ | ✓ |
| `parallel` | ✓ | ✘ | ✘ | | `parallel` | ✓ | ✘ | ✘ |
| `pattern` | ✘ | ✘ | ✓ |
* accepted values: `local | torch.distributed | mpi` * accepted values: `local | torch.distributed | mpi`
* default value: `local` * default value: `local`
...@@ -454,3 +455,12 @@ Whether run benchmarks in parallel (all ranks at the same time) or in sequence ( ...@@ -454,3 +455,12 @@ Whether run benchmarks in parallel (all ranks at the same time) or in sequence (
Only available for `local` mode. Only available for `local` mode.
* default value: `yes` * default value: `yes`
### `pattern`
Pattern variables to run benchmarks with nodes in a specified traffic pattern combination, given as a flattened key-value dictionary.
Only available for `mpi` mode.
Available variables in the formatted string include:
+ `name`
* accepted values: `all-nodes`
...@@ -188,7 +188,7 @@ def run(self): ...@@ -188,7 +188,7 @@ def run(self):
'test': [ 'test': [
'flake8-docstrings>=1.5.0', 'flake8-docstrings>=1.5.0',
'flake8-quotes>=3.2.0', 'flake8-quotes>=3.2.0',
'flake8>=3.8.4', 'flake8>=3.8.4, <6.0.0',
'mypy>=0.800', 'mypy>=0.800',
'pydocstyle>=5.1.1', 'pydocstyle>=5.1.1',
'pytest-cov>=2.11.1', 'pytest-cov>=2.11.1',
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
from superbench.common.utils.lazy_import import LazyImport from superbench.common.utils.lazy_import import LazyImport
from superbench.common.utils.process import run_command from superbench.common.utils.process import run_command
from superbench.common.utils.topo_aware import gen_topo_aware_config from superbench.common.utils.topo_aware import gen_topo_aware_config
from superbench.common.utils.gen_traffic_pattern_config import gen_tarffic_pattern_host_group
device_manager = LazyImport('superbench.common.utils.device_manager') device_manager = LazyImport('superbench.common.utils.device_manager')
...@@ -24,4 +25,5 @@ ...@@ -24,4 +25,5 @@
'rotate_dir', 'rotate_dir',
'run_command', 'run_command',
'gen_topo_aware_config', 'gen_topo_aware_config',
'gen_tarffic_pattern_host_group',
] ]
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Utilities for traffic pattern config."""
from superbench.common.utils import logger
def gen_all_nodes_config(n):
    """Generate the config for an all-nodes traffic pattern.

    Args:
        n (int): the number of participants.

    Returns:
        list: a single-item list like ['0,1,2,3'] naming every rank from
            0 to n-1, or an empty list when n is not positive.
    """
    # Guard clause: a non-positive participant count yields no config.
    if n <= 0:
        logger.warning('n is not positive')
        return []
    # One group containing every rank index, comma separated.
    return [','.join(str(rank) for rank in range(n))]
def __convert_config_to_host_group(config, host_list):
    """Map a traffic pattern config onto concrete hostnames.

    Args:
        config (list): the traffic pattern config; each item is a str of
            semicolon-separated groups of comma-separated host indexes,
            e.g. '0,1;2,3'.
        host_list (list): the list of hostnames read from hostfile.

    Returns:
        list: the host groups converted from the traffic pattern config.
    """
    host_groups = []
    for entry in config:
        # Drop surrounding whitespace and any trailing ';' before splitting
        # into groups, then resolve each index to its hostname.
        groups = entry.strip().strip(';').split(';')
        host_groups.append([[host_list[int(idx)] for idx in grp.split(',')] for grp in groups])
    return host_groups
def gen_tarffic_pattern_host_group(host_list, pattern):
    """Generate host groups from the specified traffic pattern.

    NOTE(review): 'tarffic' is a typo for 'traffic', but this name is
    public API imported by the runner and tests, so it is kept as-is
    for backward compatibility.

    Args:
        host_list (list): the list of hostnames read from hostfile.
        pattern (DictConfig): the mpi pattern dict.

    Returns:
        list: the host group generated from the traffic pattern.
    """
    # Dispatch on the pattern name; an unknown pattern logs an error and
    # falls through with an empty config, producing an empty host group.
    if pattern.name == 'all-nodes':
        config = gen_all_nodes_config(len(host_list))
    else:
        logger.error('Unsupported traffic pattern: {}'.format(pattern.name))
        config = []
    return __convert_config_to_host_group(config, host_list)
...@@ -56,9 +56,16 @@ ...@@ -56,9 +56,16 @@
- '{{ workspace }}/sb.env' - '{{ workspace }}/sb.env'
- /tmp/sb.env - /tmp/sb.env
become: yes become: yes
- name: Updating Hostfile - name: Updating Hostfile to Remote
copy: copy:
content: "{{ sb_nodes | join('\n') }}\n" content: "{{ sb_nodes | join('\n') }}\n"
dest: '{{ workspace }}/hostfile' dest: '{{ workspace }}/hostfile'
mode: 0644 mode: 0644
become: yes become: yes
- name: Generating Hostfile to Local
delegate_to: localhost
run_once: true
copy:
content: "{{ sb_nodes | join('\n') }}\n"
dest: '{{ output_dir }}/hostfile'
mode: 0644
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
from joblib import Parallel, delayed from joblib import Parallel, delayed
from omegaconf import ListConfig, OmegaConf from omegaconf import ListConfig, OmegaConf
from superbench.common.utils import SuperBenchLogger, logger from superbench.common.utils import SuperBenchLogger, logger, gen_tarffic_pattern_host_group
from superbench.runner.ansible import AnsibleClient from superbench.runner.ansible import AnsibleClient
from superbench.benchmarks import ReduceType, Reducer from superbench.benchmarks import ReduceType, Reducer
from superbench.monitor import MonitorRecord from superbench.monitor import MonitorRecord
...@@ -109,6 +109,7 @@ def __get_mode_command(self, benchmark_name, mode, timeout=None): ...@@ -109,6 +109,7 @@ def __get_mode_command(self, benchmark_name, mode, timeout=None):
benchmark_name (str): Benchmark name. benchmark_name (str): Benchmark name.
mode (DictConfig): Runner mode. mode (DictConfig): Runner mode.
timeout (int): The timeout value in seconds. timeout (int): The timeout value in seconds.
host_list (list): The specified Host node list.
Return: Return:
str: Runner command. str: Runner command.
...@@ -143,12 +144,13 @@ def __get_mode_command(self, benchmark_name, mode, timeout=None): ...@@ -143,12 +144,13 @@ def __get_mode_command(self, benchmark_name, mode, timeout=None):
'mpirun ' # use default OpenMPI in image 'mpirun ' # use default OpenMPI in image
'-tag-output ' # tag mpi output with [jobid,rank]<stdout/stderr> prefix '-tag-output ' # tag mpi output with [jobid,rank]<stdout/stderr> prefix
'-allow-run-as-root ' # allow mpirun to run when executed by root user '-allow-run-as-root ' # allow mpirun to run when executed by root user
'{host_list} ' # use prepared hostfile and launch {proc_num} processes on each node '{host_list} ' # use prepared hostfile or specify nodes and launch {proc_num} processes on each node
'-bind-to numa ' # bind processes to numa '-bind-to numa ' # bind processes to numa
'{mca_list} {env_list} {command}' '{mca_list} {env_list} {command}'
).format( ).format(
host_list=f'-host localhost:{mode.proc_num}' host_list=f'-host localhost:{mode.proc_num}' if mode.node_num == 1 else
if mode.node_num == 1 else f'-hostfile hostfile -map-by ppr:{mode.proc_num}:node', f'-hostfile hostfile -map-by ppr:{mode.proc_num}:node' if mode.host_list is None else '-host ' +
','.join(f'{host}:{mode.proc_num}' for host in mode.host_list),
mca_list=' '.join(f'-mca {k} {v}' for k, v in mode.mca.items()), mca_list=' '.join(f'-mca {k} {v}' for k, v in mode.mca.items()),
env_list=' '.join( env_list=' '.join(
f'-x {k}={str(v).format(proc_rank=mode.proc_rank, proc_num=mode.proc_num)}' f'-x {k}={str(v).format(proc_rank=mode.proc_rank, proc_num=mode.proc_num)}'
...@@ -444,7 +446,21 @@ def run(self): ...@@ -444,7 +446,21 @@ def run(self):
) )
ansible_rc = sum(rc_list) ansible_rc = sum(rc_list)
elif mode.name == 'torch.distributed' or mode.name == 'mpi': elif mode.name == 'torch.distributed' or mode.name == 'mpi':
ansible_rc = self._run_proc(benchmark_name, mode, {'proc_rank': 0}) if not mode.pattern:
ansible_rc = self._run_proc(benchmark_name, mode, {'proc_rank': 0})
else:
with open(self._output_path / 'hostfile', 'r') as f:
host_list = f.read().splitlines()
pattern_hostx = gen_tarffic_pattern_host_group(host_list, mode.pattern)
for host_groups in pattern_hostx:
para_rc_list = Parallel(n_jobs=len(host_groups))(
delayed(self._run_proc)
(benchmark_name, mode, vars={
'proc_rank': 0,
'host_list': host_group,
}) for host_group in host_groups
)
ansible_rc = ansible_rc + sum(para_rc_list)
else: else:
logger.warning('Unknown mode %s.', mode.name) logger.warning('Unknown mode %s.', mode.name)
if ansible_rc != 0: if ansible_rc != 0:
......
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Tests for traffic pattern config generation module."""
import argparse
import unittest
from superbench.common.utils import gen_tarffic_pattern_host_group
class GenConfigTest(unittest.TestCase):
    """Test the utils for generating traffic pattern config."""
    def test_gen_tarffic_pattern_host_group(self):
        """Test generating the host group from a specified traffic pattern."""
        # Test under 8 nodes.
        hostx = ['node0', 'node1', 'node2', 'node3', 'node4', 'node5', 'node6', 'node7']
        # Build the pattern object directly instead of going through an
        # ArgumentParser: parse_known_args() reads sys.argv, so the old
        # parser-based setup could misbehave under test runners that pass
        # their own command-line flags.
        pattern = argparse.Namespace(name='all-nodes')
        # 'all-nodes' yields a single group containing every host.
        expected_host_group = [[['node0', 'node1', 'node2', 'node3', 'node4', 'node5', 'node6', 'node7']]]
        self.assertEqual(gen_tarffic_pattern_host_group(hostx, pattern), expected_host_group)
...@@ -196,12 +196,39 @@ def test_get_mode_command(self): ...@@ -196,12 +196,39 @@ def test_get_mode_command(self):
f'sb exec --output-dir {self.sb_output_dir} -c sb.config.yaml -C superbench.enable=foo' f'sb exec --output-dir {self.sb_output_dir} -c sb.config.yaml -C superbench.enable=foo'
), ),
}, },
{
'benchmark_name':
'foo',
'mode': {
'name': 'mpi',
'proc_num': 8,
'proc_rank': 1,
'mca': {},
'pattern': {
'name': 'all-nodes',
},
'env': {
'PATH': None,
'LD_LIBRARY_PATH': None,
},
},
'expected_command': (
'mpirun -tag-output -allow-run-as-root -host node0:8,node1:8 -bind-to numa '
' -x PATH -x LD_LIBRARY_PATH '
f'sb exec --output-dir {self.sb_output_dir} -c sb.config.yaml -C superbench.enable=foo'
),
},
] ]
for test_case in test_cases: for test_case in test_cases:
with self.subTest(msg='Testing with case', test_case=test_case): with self.subTest(msg='Testing with case', test_case=test_case):
mode = OmegaConf.create(test_case['mode'])
if 'pattern' in test_case['mode']:
mode.update({'host_list': ['node0', 'node1']})
self.assertEqual( self.assertEqual(
self.runner._SuperBenchRunner__get_mode_command( self.runner._SuperBenchRunner__get_mode_command(
test_case['benchmark_name'], OmegaConf.create(test_case['mode']) test_case['benchmark_name'],
mode,
), test_case['expected_command'] ), test_case['expected_command']
) )
...@@ -210,9 +237,14 @@ def test_get_mode_command(self): ...@@ -210,9 +237,14 @@ def test_get_mode_command(self):
index = test_case['expected_command'].find('sb exec') index = test_case['expected_command'].find('sb exec')
expected_command = test_case['expected_command'][:index] + timeout_str + test_case['expected_command'][ expected_command = test_case['expected_command'][:index] + timeout_str + test_case['expected_command'][
index:] index:]
mode = OmegaConf.create(test_case['mode'])
if 'pattern' in test_case['mode']:
mode.update({'host_list': ['node0', 'node1']})
self.assertEqual( self.assertEqual(
self.runner._SuperBenchRunner__get_mode_command( self.runner._SuperBenchRunner__get_mode_command(
test_case['benchmark_name'], OmegaConf.create(test_case['mode']), test_case['timeout'] test_case['benchmark_name'],
mode,
test_case['timeout'],
), expected_command ), expected_command
) )
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment