Unverified Commit 0993db75 authored by one, committed by GitHub

Runner: Add local numactl GPU affinity support (#6)

- Add `numactl` support for local runner modes, including `cpunodebind`, `membind`, and `physcpubind`.
- Add `gpu_affinity` resolution through `sb node topo --get gpu-numa-affinity --gpu-id`.
- Add `sb node topo` support for GPU NUMA topology queries.
- Update BW1000 config to use the new local `numactl` semantics.
- Document the new `numactl` mode fields and limitations.
parent 800b962a
......@@ -371,6 +371,7 @@ node_num: int
env: dict
mca: dict
prefix: str
numactl: dict
parallel: bool
```
......@@ -401,6 +402,7 @@ Some attributes may only be suitable for specific mode.
| `proc_num` | ✓ | ✓ | ✓ |
| `node_num` | ✘ | ✓ | ✓ |
| `prefix` | ✓ | ✘ | ✘ |
| `numactl` | ✓ | ✘ | ✘ |
| `env` | ✓ | ✓ | ✓ |
| `mca` | ✘ | ✘ | ✓ |
| `parallel` | ✓ | ✘ | ✘ |
......@@ -437,6 +439,33 @@ Available variables in formatted string include:
So `prefix: CUDA_VISIBLE_DEVICES={proc_rank}` will be expressed as `CUDA_VISIBLE_DEVICES=0`, `CUDA_VISIBLE_DEVICES=1`, etc.
### `numactl`
`numactl` binding to use in `local` mode. The generated `numactl` command runs after `prefix` and before the benchmark command.
```yaml
numactl:
cpunodebind: gpu_affinity
membind: gpu_affinity
physcpubind: null
gpu_id: "{proc_rank}"
```
Supported fields:
+ `cpunodebind`: CPU NUMA node for `numactl -N`.
+ `membind`: memory NUMA node for `numactl -m`.
+ `physcpubind`: physical CPU list for `numactl -C`.
+ `gpu_id`: GPU id used when `cpunodebind` or `membind` is GPU-relative. Defaults to `{proc_rank}`.
`cpunodebind` and `membind` can be explicit NUMA node values, YAML lists, formatted strings using `proc_rank` and `proc_num`, or `gpu_affinity`. GPU-relative values are evaluated on the target node through `sb node topo --get gpu-numa-affinity --gpu-id`.
`physcpubind` can be an explicit CPU list or a formatted string using `proc_rank` and `proc_num`. It does not support `gpu_affinity`.
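For illustration, a mode can mix GPU-relative node bindings with a templated CPU list. The sketch below mirrors the unit-test config; the 16-CPUs-per-rank arithmetic is an assumption about the machine, not a general recommendation:

```yaml
numactl:
  cpunodebind: gpu_affinity
  membind: gpu_affinity
  physcpubind: "$(({proc_rank}*16))-$(({proc_rank}*16+15))"
```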
Limitations:
+ `numactl` is only supported in `local` mode.
+ `gpu_affinity` requires `sb node topo --get gpu-numa-affinity` to work on the target node.
+ `gpu_id` defaults to `{proc_rank}`. Set it explicitly if the local process rank does not match the GPU id used by node topology.
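For reference, a config like the sketch above with `proc_rank=1` expands to a command of roughly this shape (adapted from the runner unit test; `sb exec` stands in for the benchmark command):

```bash
SB_GPU_NUMA_AFFINITY=$(sb node topo --get gpu-numa-affinity --gpu-id 1) && \
  PROC_RANK=1 HIP_VISIBLE_DEVICES=1 \
  numactl -N ${SB_GPU_NUMA_AFFINITY} -m ${SB_GPU_NUMA_AFFINITY} -C $((1*16))-$((1*16+15)) \
  sb exec
```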
### `env`
Environment variables to use in the mode, as a flat key-value dictionary.
......
......@@ -28,6 +28,7 @@ def load_command_table(self, args):
g.command('list-parameters', 'benchmark_list_params_command_handler')
with CommandGroup(self, 'node', 'superbench.cli._node_handler#{}') as g:
g.command('info', 'info_command_handler')
g.command('topo', 'topo_command_handler')
with CommandGroup(self, 'result', 'superbench.cli._result_handler#{}') as g:
g.command('diagnosis', 'diagnosis_command_handler')
g.command('summary', 'summary_command_handler')
......@@ -81,6 +82,10 @@ def load_arguments(self, command):
with ArgumentsContext(self, 'benchmark') as ac:
ac.argument('name', options_list=('--name', '-n'), type=str, help='Benchmark name or regular expression.')
with ArgumentsContext(self, 'node topo') as ac:
ac.argument('get', options_list=('--get', ), type=str, help='Topology field to get.')
ac.argument('gpu_id', options_list=('--gpu-id', ), type=int, help='GPU id.')
with ArgumentsContext(self, 'result') as ac:
ac.argument('raw_data_file', options_list=('--data-file', '-d'), type=str, help='Path to raw data file.')
ac.argument('rule_file', options_list=('--rule-file', '-r'), type=str, help='Path to rule file.')
......
......@@ -109,6 +109,16 @@
text: {cli_name} node info
""".format(cli_name=CLI_NAME)
helps['node topo'] = """
type: command
short-summary: Get node topology information.
examples:
- name: get GPU NUMA map
text: {cli_name} node topo --get gpu-numa-map
- name: get GPU NUMA affinity
text: {cli_name} node topo --get gpu-numa-affinity --gpu-id 0
""".format(cli_name=CLI_NAME)
helps['result'] = """
type: group
short-summary: Process or analyze the results of SuperBench benchmarks.
......
......@@ -6,8 +6,14 @@
from pathlib import Path
import json
from knack.util import CLIError
from superbench.tools import SystemInfo
from superbench.common.utils import create_sb_output_dir
from superbench.common.utils.gpu_topology import (
get_gpu_numa_affinity,
get_gpu_numa_map,
)
def info_command_handler(output_dir=None):
......@@ -28,3 +34,21 @@ def info_command_handler(output_dir=None):
except Exception as ex:
raise RuntimeError('Failed to get node info.') from ex
return info
def topo_command_handler(get=None, gpu_id=None):
"""Get node topology information.
Args:
get (str): Topology field to get.
gpu_id (int): GPU id.
"""
if get == 'gpu-numa-map':
print(json.dumps(get_gpu_numa_map()))
return
if get != 'gpu-numa-affinity':
raise CLIError('Unsupported topology field: {}.'.format(get))
if gpu_id is None:
raise CLIError('--gpu-id is required for {}.'.format(get))
print(get_gpu_numa_affinity(gpu_id))
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""GPU topology utilities."""
import json
import re
from superbench.common.utils.process import run_command
def _validate_numa_node_list(value):
"""Validate a numactl NUMA node list."""
value = str(value)
if not value:
raise ValueError('empty NUMA node list')
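    # Accept comma-separated items, each either a single node id or an ascending inclusive range like 2-3.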
for item in value.split(','):
if re.fullmatch(r'\d+', item):
continue
match = re.fullmatch(r'(\d+)-(\d+)', item)
if match and int(match.group(1)) <= int(match.group(2)):
continue
raise ValueError('invalid NUMA node list: {}'.format(value))
def get_gpu_numa_map():
"""Get NUMA topology for all local GPUs.
Returns:
dict: GPU NUMA topology keyed by GPU id.
"""
output = run_command('hy-smi --showtoponuma --json', quiet=True)
if output.returncode != 0:
raise RuntimeError('Failed to get GPU NUMA topology from hy-smi - message: {}'.format(output.stdout))
try:
hygon_topology = json.loads(output.stdout)
gpu_numa_map = {}
for card, card_topology in hygon_topology.items():
match = re.fullmatch(r'card(\d+)', card)
if not match:
continue
gpu_id = int(match.group(1))
numa_node = card_topology['(Topology) Numa Node']
numa_affinity = card_topology.get('(Topology) Numa Affinity', numa_node)
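            # Validate the parsed values: numa_node must be an integer, numa_affinity a numactl node list.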
int(numa_node)
_validate_numa_node_list(numa_affinity)
gpu_numa_map[gpu_id] = {
'numa_node': numa_node,
'numa_affinity': numa_affinity,
}
if not gpu_numa_map:
raise ValueError('no card topology found')
    except Exception as e:
        raise RuntimeError('Failed to parse GPU NUMA topology from hy-smi - message: {}'.format(e)) from e
return gpu_numa_map
def get_gpu_numa_affinity(gpu_id):
"""Get NUMA affinity for a GPU.
Args:
gpu_id (int): GPU id.
Returns:
str: GPU NUMA affinity.
"""
try:
gpu_id = int(gpu_id)
return get_gpu_numa_map()[gpu_id]['numa_affinity']
    except Exception as e:
        raise RuntimeError('Failed to get GPU NUMA affinity - gpu_id: {}, message: {}'.format(gpu_id, e)) from e
......@@ -11,6 +11,9 @@ superbench:
- name: local
proc_num: 8
prefix: HIP_VISIBLE_DEVICES={proc_rank}
numactl:
cpunodebind: gpu_affinity
membind: gpu_affinity
parallel: yes
default_pytorch_mode: &default_pytorch_mode
enable: false
......
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Build numactl command fragments for runner modes."""
from omegaconf import ListConfig
GPU_AFFINITY = 'gpu_affinity'
GPU_NUMA_AFFINITY_ENV = 'SB_GPU_NUMA_AFFINITY'
def _format_template_value(value, mode):
"""Format a mode template value."""
if isinstance(value, str):
return value.format(proc_rank=mode.proc_rank, proc_num=mode.proc_num)
if isinstance(value, (list, tuple, ListConfig)):
return ','.join(_format_template_value(item, mode) for item in value)
return str(value)
def _is_disabled_value(value):
"""Return whether a config value disables the corresponding option."""
return value is None or value is False or (isinstance(value, str) and value.lower() in ['none', 'null', 'false'])
def _resolve_node_value(value, mode):
"""Resolve a numactl NUMA-node value.
Args:
value: numactl node binding config value.
mode (DictConfig): Runner mode.
Returns:
tuple[str | None, bool]: Resolved value and whether it uses GPU affinity.
"""
if _is_disabled_value(value):
return None, False
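    # gpu_affinity defers resolution to the target node: emit a shell variable reference that the setup command populates at run time.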
if isinstance(value, str) and value.lower() == GPU_AFFINITY:
return '${%s}' % GPU_NUMA_AFFINITY_ENV, True
return _format_template_value(value, mode), False
def _resolve_cpu_value(value, mode):
"""Resolve a numactl CPU-list value."""
if _is_disabled_value(value):
return None
if isinstance(value, str) and value.lower() == GPU_AFFINITY:
raise ValueError('gpu_affinity is not supported for numactl.physcpubind.')
return _format_template_value(value, mode)
def get_local_numactl_command(mode):
"""Get setup and numactl command fragments for local mode.
Args:
mode (DictConfig): Runner mode.
Returns:
tuple[str, str]: Setup command and numactl command.
"""
if 'numactl' not in mode:
return '', ''
numactl_config = mode.numactl
if numactl_config is None:
return '', ''
cpunodebind, cpunodebind_uses_gpu = _resolve_node_value(numactl_config.get('cpunodebind', None), mode)
membind, membind_uses_gpu = _resolve_node_value(numactl_config.get('membind', None), mode)
physcpubind = _resolve_cpu_value(numactl_config.get('physcpubind', None), mode)
if cpunodebind is None and membind is None and physcpubind is None:
return '', ''
setup_command = ''
if cpunodebind_uses_gpu or membind_uses_gpu:
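        # Query the GPU's NUMA affinity on the target node at run time and expose it through the SB_GPU_NUMA_AFFINITY environment variable.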
gpu_id = _format_template_value(numactl_config.get('gpu_id', '{proc_rank}'), mode)
setup_command = '{}=$(sb node topo --get gpu-numa-affinity --gpu-id {})'.format(
GPU_NUMA_AFFINITY_ENV,
gpu_id,
)
numactl_parts = ['numactl']
if cpunodebind is not None:
numactl_parts.extend(['-N', cpunodebind])
if membind is not None:
numactl_parts.extend(['-m', membind])
if physcpubind is not None:
numactl_parts.extend(['-C', physcpubind])
return setup_command, ' '.join(numactl_parts)
......@@ -21,6 +21,7 @@
from superbench.common.utils.lazy_import import LazyImport
from superbench.benchmarks import ReduceType, Reducer
from superbench.monitor import MonitorRecord
from superbench.runner.numactl import get_local_numactl_command
AnsibleClient = LazyImport('superbench.runner.ansible', 'AnsibleClient')
......@@ -158,14 +159,19 @@ def __get_mode_command(self, benchmark_name, mode, timeout=None):
) if enable_nsys and mode.proc_rank == 0 else ''
# Build the command parts, only including trace if it's not empty
command_parts = []
setup_command, numactl_command = get_local_numactl_command(mode)
prefix = mode.prefix.format(proc_rank=mode.proc_rank, proc_num=mode.proc_num)
if prefix:
command_parts.append(prefix)
if numactl_command:
command_parts.append(numactl_command)
if trace_command:
command_parts.append(trace_command)
command_parts.append(exec_command)
mode_command = ' '.join(command_parts)
mode_command = f'PROC_RANK={mode.proc_rank} {mode_command}'
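            # Chain the run-time GPU affinity lookup before the benchmark command so it executes on the target node.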
if setup_command:
mode_command = f'{setup_command} && {mode_command}'
elif mode.name == 'torch.distributed':
# TODO: replace with torch.distributed.run in v1.9
# TODO: only supports node_num=1 and node_num=all currently
......
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""CLI node handler test."""
import io
import unittest
import contextlib
from unittest import mock
from knack.util import CLIError
import superbench.cli._node_handler as node_handler
class CLINodeHandlerTestCase(unittest.TestCase):
"""A class for node handler test cases."""
@mock.patch('superbench.cli._node_handler.get_gpu_numa_map')
def test_topo_command_handler_gpu_numa_map(self, mock_get_gpu_numa_map):
"""Test topo command handler gets GPU NUMA map."""
mock_get_gpu_numa_map.return_value = {
1: {
'numa_node': '0',
'numa_affinity': '1',
},
}
stdout = io.StringIO()
with contextlib.redirect_stdout(stdout):
node_handler.topo_command_handler(get='gpu-numa-map')
self.assertEqual(stdout.getvalue(), '{"1": {"numa_node": "0", "numa_affinity": "1"}}\n')
@mock.patch('superbench.cli._node_handler.get_gpu_numa_affinity')
def test_topo_command_handler_gpu_numa_affinity(self, mock_get_gpu_numa_affinity):
"""Test topo command handler gets GPU NUMA affinity."""
mock_get_gpu_numa_affinity.return_value = '1'
stdout = io.StringIO()
with contextlib.redirect_stdout(stdout):
node_handler.topo_command_handler(get='gpu-numa-affinity', gpu_id=1)
self.assertEqual(stdout.getvalue(), '1\n')
mock_get_gpu_numa_affinity.assert_called_once_with(1)
def test_topo_command_handler_invalid_get(self):
"""Test topo command handler rejects invalid get value."""
with self.assertRaises(CLIError):
node_handler.topo_command_handler(get='invalid', gpu_id=1)
def test_topo_command_handler_missing_gpu_id(self):
"""Test topo command handler requires gpu_id."""
with self.assertRaises(CLIError):
node_handler.topo_command_handler(get='gpu-numa-affinity')
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Tests for GPU topology utilities."""
import json
import unittest
from unittest import mock
from superbench.common.utils.gpu_topology import (
get_gpu_numa_affinity,
get_gpu_numa_map,
)
class GpuTopologyTest(unittest.TestCase):
"""Test GPU topology utilities."""
@mock.patch('superbench.common.utils.gpu_topology.run_command')
def test_get_gpu_numa_map(self, mock_run_command):
"""Test get_gpu_numa_map parses hy-smi output."""
mock_run_command.return_value.returncode = 0
mock_run_command.return_value.stdout = json.dumps(
{
'card0': {
'(Topology) Numa Node': '3',
'(Topology) Numa Affinity': '3',
},
'card1': {
'(Topology) Numa Node': '1',
'(Topology) Numa Affinity': '1,2',
},
'card2': {
'(Topology) Numa Node': '2',
'(Topology) Numa Affinity': '2-3',
},
}
)
self.assertEqual(
get_gpu_numa_map(), {
0: {
'numa_node': '3',
'numa_affinity': '3',
},
1: {
'numa_node': '1',
'numa_affinity': '1,2',
},
2: {
'numa_node': '2',
'numa_affinity': '2-3',
},
}
)
mock_run_command.assert_called_once_with('hy-smi --showtoponuma --json', quiet=True)
@mock.patch('superbench.common.utils.gpu_topology.run_command')
def test_get_gpu_numa_map_command_failure(self, mock_run_command):
"""Test get_gpu_numa_map command failure."""
mock_run_command.return_value.returncode = 1
mock_run_command.return_value.stdout = 'hy-smi failed'
with self.assertRaisesRegex(RuntimeError, 'Failed to get GPU NUMA topology from hy-smi'):
get_gpu_numa_map()
@mock.patch('superbench.common.utils.gpu_topology.run_command')
def test_get_gpu_numa_map_parse_failure(self, mock_run_command):
"""Test get_gpu_numa_map parse failure."""
mock_run_command.return_value.returncode = 0
mock_run_command.return_value.stdout = json.dumps({'card0': {}})
with self.assertRaisesRegex(RuntimeError, 'Failed to parse GPU NUMA topology from hy-smi'):
get_gpu_numa_map()
@mock.patch('superbench.common.utils.gpu_topology.run_command')
def test_get_gpu_numa_map_invalid_affinity(self, mock_run_command):
"""Test get_gpu_numa_map rejects invalid NUMA affinity."""
mock_run_command.return_value.returncode = 0
mock_run_command.return_value.stdout = json.dumps(
{
'card0': {
'(Topology) Numa Node': '0',
'(Topology) Numa Affinity': '0,a',
},
}
)
with self.assertRaisesRegex(RuntimeError, 'invalid NUMA node list'):
get_gpu_numa_map()
@mock.patch('superbench.common.utils.gpu_topology.get_gpu_numa_map')
def test_get_gpu_numa_affinity(self, mock_get_gpu_numa_map):
"""Test get_gpu_numa_affinity returns GPU NUMA affinity."""
mock_get_gpu_numa_map.return_value = {
1: {
'numa_node': '0',
'numa_affinity': '1',
},
}
self.assertEqual(get_gpu_numa_affinity(1), '1')
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Tests for numactl command builder."""
import unittest
from omegaconf import OmegaConf
from superbench.runner.numactl import get_local_numactl_command
class NumactlTestCase(unittest.TestCase):
"""A class for numactl command builder test cases."""
def test_get_local_numactl_command_without_config(self):
"""Test no numactl command is generated without config."""
mode = OmegaConf.create({
'name': 'local',
'proc_num': 2,
'proc_rank': 1,
})
self.assertEqual(get_local_numactl_command(mode), ('', ''))
def test_get_local_numactl_command_gpu_affinity(self):
"""Test GPU affinity generates setup command and node bindings."""
mode = OmegaConf.create(
{
'name': 'local',
'proc_num': 2,
'proc_rank': 1,
'numactl': {
'cpunodebind': 'gpu_affinity',
'membind': 'gpu_affinity',
},
}
)
self.assertEqual(
get_local_numactl_command(mode), (
'SB_GPU_NUMA_AFFINITY=$(sb node topo --get gpu-numa-affinity --gpu-id 1)',
'numactl -N ${SB_GPU_NUMA_AFFINITY} -m ${SB_GPU_NUMA_AFFINITY}',
)
)
def test_get_local_numactl_command_template_values(self):
"""Test template values are formatted with local process variables."""
mode = OmegaConf.create(
{
'name': 'local',
'proc_num': 8,
'proc_rank': 6,
'numactl': {
'cpunodebind': '$(({proc_rank}/2))',
'membind': '$(({proc_num}/4))',
'physcpubind': '$(({proc_rank}*16))-$(({proc_rank}*16+15))',
},
}
)
self.assertEqual(
get_local_numactl_command(mode), ('', 'numactl -N $((6/2)) -m $((8/4)) -C $((6*16))-$((6*16+15))')
)
def test_get_local_numactl_command_list_values(self):
"""Test list values are formatted as numactl node and CPU lists."""
mode = OmegaConf.create(
{
'name': 'local',
'proc_num': 8,
'proc_rank': 6,
'numactl': {
'cpunodebind': [0, 1],
'membind': ['{proc_rank}', 7],
'physcpubind': ['0-15', '32-47'],
},
}
)
self.assertEqual(get_local_numactl_command(mode), ('', 'numactl -N 0,1 -m 6,7 -C 0-15,32-47'))
def test_get_local_numactl_command_disabled_values(self):
"""Test disabled values do not generate numactl options."""
mode = OmegaConf.create(
{
'name': 'local',
'proc_num': 2,
'proc_rank': 1,
'numactl': {
'cpunodebind': 'none',
'membind': False,
'physcpubind': None,
},
}
)
self.assertEqual(get_local_numactl_command(mode), ('', ''))
def test_get_local_numactl_command_rejects_gpu_affinity_for_physcpubind(self):
"""Test physcpubind rejects GPU affinity."""
mode = OmegaConf.create(
{
'name': 'local',
'proc_num': 2,
'proc_rank': 1,
'numactl': {
'physcpubind': 'gpu_affinity',
},
}
)
with self.assertRaisesRegex(ValueError, 'gpu_affinity is not supported for numactl.physcpubind'):
get_local_numactl_command(mode)
......@@ -597,6 +597,40 @@ def fake_get_shell_config(cmd):
self.assertIn('docker exec sb-custom bash -lc', captured['cmd'])
@mock.patch('superbench.runner.ansible.AnsibleClient.run')
def test_run_proc_injects_local_numactl_physcpubind(self, mock_ansible_client_run):
"""Test _run_proc injects local numactl command."""
mock_ansible_client_run.return_value = 0
self.runner._sb_benchmarks = {'foo': {}}
captured = {}
def fake_get_shell_config(cmd):
captured['cmd'] = cmd
return {'module_args': cmd, 'cmdline': '', 'host_pattern': 'localhost', 'module': 'shell'}
self.runner._ansible_client.get_shell_config = fake_get_shell_config
mode = OmegaConf.create(
{
'name': 'local',
'proc_num': 2,
'env': {},
'prefix': 'HIP_VISIBLE_DEVICES={proc_rank}',
'numactl': {
'cpunodebind': 'gpu_affinity',
'membind': 'gpu_affinity',
'physcpubind': '$(({proc_rank}*16))-$(({proc_rank}*16+15))',
},
}
)
self.runner._run_proc('foo', mode, {'proc_rank': 1})
self.assertIn(
'SB_GPU_NUMA_AFFINITY=$(sb node topo --get gpu-numa-affinity --gpu-id 1) && '
'PROC_RANK=1 HIP_VISIBLE_DEVICES=1 numactl -N ${SB_GPU_NUMA_AFFINITY} '
'-m ${SB_GPU_NUMA_AFFINITY} -C $((1*16))-$((1*16+15)) sb exec', captured['cmd']
)
@mock.patch('superbench.runner.ansible.AnsibleClient.run')
def test_run_proc_no_docker_keeps_tmp_env_source(self, mock_ansible_client_run):
"""Test _run_proc still sources /tmp/sb.env in no_docker mode."""
......