Unverified Commit fb7d4a73 authored by Yifan Xiong's avatar Yifan Xiong Committed by GitHub
Browse files

Runner - Fetch benchmarks results on all nodes (#116)

Fetch benchmarks results on all nodes, will rsync after each benchmark.
The results directory structure on control node is as follows:

```
outputs/
└── datetime
├── nodes
│   └── node-0
│       ├── benchmarks
│       │   └── benchmark-0
│       │       └── rank-0
│       │           └── results.json
│       └── sb-exec.log
├── sb-run.log
└── sb.config.yaml
```
parent 60ba63bb
......@@ -21,5 +21,5 @@ postinstall:
ifeq ($(shell which ansible-galaxy),)
$(error 'Cannot find ansible-galaxy')
else
ansible-galaxy collection install ansible.utils community.crypto
ansible-galaxy collection install ansible.posix ansible.utils community.crypto
endif
......@@ -4,9 +4,17 @@
"""Exposes the interface of SuperBench common utilities."""
from superbench.common.utils.logging import SuperBenchLogger, logger
from superbench.common.utils.file_handler import create_sb_output_dir, get_sb_config
from superbench.common.utils.file_handler import rotate_dir, create_sb_output_dir, get_sb_config
from superbench.common.utils.lazy_import import LazyImport
nv_helper = LazyImport('superbench.common.utils.nvidia_helper')
__all__ = ['SuperBenchLogger', 'logger', 'create_sb_output_dir', 'get_sb_config', 'LazyImport', 'nv_helper']
__all__ = [
'LazyImport',
'SuperBenchLogger',
'create_sb_output_dir',
'get_sb_config',
'logger',
'nv_helper',
'rotate_dir',
]
......@@ -3,12 +3,34 @@
"""Utilities for file."""
import itertools
from pathlib import Path
from datetime import datetime
import yaml
from omegaconf import OmegaConf
from superbench.common.utils import logger
def rotate_dir(target_dir):
    """Rotate a directory aside if it exists and is not empty.

    Renames ``target_dir`` to the first unused ``<name>.<i>`` sibling
    (``i`` = 1, 2, ...) so that a fresh directory can later be created in
    its place. A missing or empty directory is left untouched.

    Args:
        target_dir (str or Path): Target directory path. The original code
            documented ``str`` but required a ``Path``; both are accepted now.

    Raises:
        Exception: Re-raised (after logging) if the rename fails.
    """
    # Accept plain strings as well as Path objects.
    target_dir = Path(target_dir)
    try:
        if target_dir.is_dir() and any(target_dir.iterdir()):
            logger.warning('Directory %s is not empty.', str(target_dir))
            # Probe backup names until one is free, then rename.
            for i in itertools.count(start=1):
                backup_dir = target_dir.with_name(f'{target_dir.name}.{i}')
                if not backup_dir.is_dir():
                    target_dir.rename(backup_dir)
                    break
    except Exception:
        logger.exception('Failed to rotate directory %s.', str(target_dir))
        raise
def create_sb_output_dir(output_dir=None):
"""Create output directory.
......@@ -24,7 +46,11 @@ def create_sb_output_dir(output_dir=None):
if not output_dir:
output_dir = str(Path('outputs', datetime.now().strftime('%Y-%m-%d_%H-%M-%S')))
output_path = Path(output_dir).expanduser().resolve()
try:
output_path.mkdir(mode=0o755, parents=True, exist_ok=True)
except Exception:
logger.exception('Failed to create directory %s.', str(output_path))
raise
return output_dir
......
......@@ -3,14 +3,14 @@
"""SuperBench Executor."""
import os
import json
import itertools
from pathlib import Path
from omegaconf import ListConfig
from superbench.benchmarks import Platform, Framework, BenchmarkRegistry
from superbench.common.utils import SuperBenchLogger, logger
from superbench.common.utils import SuperBenchLogger, logger, rotate_dir
class SuperBenchExecutor():
......@@ -122,21 +122,31 @@ def __exec_benchmark(self, context, log_suffix):
logger.error('Executor failed in %s.', log_suffix)
return None
def __create_benchmark_dir(self, benchmark_name):
"""Create output directory for benchmark.
def __get_benchmark_dir(self, benchmark_name):
"""Get output directory for benchmark's current rank.
Args:
benchmark_name (str): Benchmark name.
"""
benchmark_output_dir = self._output_path / 'benchmarks' / benchmark_name
if benchmark_output_dir.is_dir() and any(benchmark_output_dir.iterdir()):
logger.warning('Benchmark output directory %s is not empty.', str(benchmark_output_dir))
for i in itertools.count(start=1):
backup_dir = benchmark_output_dir.with_name('{}.{}'.format(benchmark_name, i))
if not backup_dir.is_dir():
benchmark_output_dir.rename(backup_dir)
for rank_env in ['PROC_RANK', 'LOCAL_RANK']:
if os.getenv(rank_env):
benchmark_output_dir /= 'rank{}'.format(os.getenv(rank_env))
break
benchmark_output_dir.mkdir(mode=0o755, parents=True, exist_ok=True)
return benchmark_output_dir
def __create_benchmark_dir(self, benchmark_name):
    """Create output directory for benchmark.

    Any existing non-empty directory for this benchmark is rotated aside
    first, then the directory for the current rank is created.

    Args:
        benchmark_name (str): Benchmark name.

    Raises:
        Exception: Re-raised (after logging) if the directory cannot be created.
    """
    benchmark_root = self._output_path / 'benchmarks' / benchmark_name
    rotate_dir(benchmark_root)
    try:
        rank_dir = self.__get_benchmark_dir(benchmark_name)
        rank_dir.mkdir(mode=0o755, parents=True, exist_ok=True)
    except Exception:
        logger.exception('Failed to create output directory for benchmark %s.', benchmark_name)
        raise
def __write_benchmark_results(self, benchmark_name, benchmark_results):
    """Write benchmark results.

    Serializes the results as JSON into ``results.json`` inside the
    benchmark's per-rank output directory.

    Args:
        benchmark_name (str): Benchmark name.
        benchmark_results (dict): Benchmark results.
    """
    with (self.__get_benchmark_dir(benchmark_name) / 'results.json').open(mode='w') as f:
        json.dump(benchmark_results, f, indent=2)
def exec(self):
......
# Ansible playbook: pull benchmark outputs from every node back to the
# control node, into <absolute_output_dir>/nodes/<hostname>/.
- name: Fetch Results
  hosts: all
  # Facts are gathered so that ansible_hostname is available for `dest` below.
  gather_facts: true
  vars:
    # Remote working directory where benchmarks were executed.
    workspace: '{{ ansible_user_dir }}/sb-workspace'
  tasks:
    - name: Synchronize Output Directory
      # rsync in pull mode: copy the remote output directory's contents
      # (trailing slash on src) into the per-node directory locally.
      ansible.posix.synchronize:
        mode: pull
        src: '{{ workspace }}/{{ sb_output_dir }}/'
        dest: '{{ absolute_output_dir }}/nodes/{{ ansible_hostname }}'
......@@ -103,6 +103,7 @@ def __get_mode_command(self, benchmark_name, mode):
prefix=mode.prefix.format(proc_rank=mode.proc_rank, proc_num=mode.proc_num),
command=exec_command,
)
mode_command = f'PROC_RANK={mode.proc_rank} {mode_command.strip()}'
elif mode.name == 'torch.distributed':
# TODO: replace with torch.distributed.run in v1.9
# TODO: only supports node_num=1 and node_num=all currently
......@@ -156,6 +157,23 @@ def check_env(self): # pragma: no cover
)
)
def fetch_results(self):    # pragma: no cover
    """Fetch benchmark results on all nodes.

    Creates the local ``nodes`` directory under the output path, then runs
    the ``fetch_results.yaml`` playbook to pull each node's results into it.

    Raises:
        Exception: Re-raised (after logging) if the local directory cannot
            be created.
    """
    nodes_dir = self._output_path / 'nodes'
    try:
        nodes_dir.mkdir(mode=0o755, parents=True, exist_ok=True)
    except Exception:
        logger.exception('Failed to create directory %s.', str(nodes_dir))
        raise
    playbook = self._ansible_client.get_playbook_config(
        'fetch_results.yaml',
        extravars={
            'sb_output_dir': self._sb_output_dir,
            'absolute_output_dir': str(self._output_path),
        }
    )
    self._ansible_client.run(playbook)
def _run_proc(self, benchmark_name, mode, vars):
"""Run the process.
......@@ -196,3 +214,4 @@ def run(self):
)
elif mode.name == 'torch.distributed':
self._run_proc(benchmark_name, mode, {'proc_rank': 0})
self.fetch_results()
......@@ -47,14 +47,16 @@ def test_get_mode_command(self):
f'sb exec --output-dir {self.sb_output_dir} -c sb.config.yaml -C superbench.enable=foo',
},
{
'benchmark_name': 'foo',
'benchmark_name':
'foo',
'mode': {
'name': 'local',
'proc_num': 1,
'proc_rank': 0,
'prefix': '',
},
'expected_command':
f'sb exec --output-dir {self.sb_output_dir} -c sb.config.yaml -C superbench.enable=foo',
f'PROC_RANK=0 sb exec --output-dir {self.sb_output_dir} -c sb.config.yaml -C superbench.enable=foo',
},
{
'benchmark_name':
......@@ -66,7 +68,7 @@ def test_get_mode_command(self):
'prefix': 'CUDA_VISIBLE_DEVICES={proc_rank} numactl -c $(({proc_rank}/2))'
},
'expected_command': (
'CUDA_VISIBLE_DEVICES=6 numactl -c $((6/2)) '
'PROC_RANK=6 CUDA_VISIBLE_DEVICES=6 numactl -c $((6/2)) '
f'sb exec --output-dir {self.sb_output_dir} -c sb.config.yaml -C superbench.enable=foo'
),
},
......@@ -79,8 +81,10 @@ def test_get_mode_command(self):
'proc_rank': 1,
'prefix': 'RANK={proc_rank} NUM={proc_num}'
},
'expected_command':
f'RANK=1 NUM=16 sb exec --output-dir {self.sb_output_dir} -c sb.config.yaml -C superbench.enable=foo',
'expected_command': (
'PROC_RANK=1 RANK=1 NUM=16 '
f'sb exec --output-dir {self.sb_output_dir} -c sb.config.yaml -C superbench.enable=foo'
),
},
{
'benchmark_name':
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment