Unverified Commit 2fabac52 authored by Yang Wang's avatar Yang Wang Committed by GitHub
Browse files

Auto generate ibstat file by pssh (#402)

**Description**
As MPI can not be inited twice in one same process (by py and c)
Also, MPI env initialized by mpi4py can not be reused in C env

To avoid MPI init issue introduced from mpi4py, rewrite gen_ibstat_file function to generate ibstat file leveraged by pssh

**Major Revision**
- Rewrite gen_ibstat_file function to generate ibstat file leveraged by pssh

**Minor Revision**
- Remove mpi4py dependency

Tested the functionality of topo-aware on 36 nodes cluster
parent 8afaa376
...@@ -155,6 +155,7 @@ def run(self): ...@@ -155,6 +155,7 @@ def run(self):
'omegaconf==2.0.6', 'omegaconf==2.0.6',
'openpyxl>=3.0.7', 'openpyxl>=3.0.7',
'pandas>=1.1.5', 'pandas>=1.1.5',
'pssh @ git+https://github.com/lilydjwg/pssh.git@v2.3.4',
'pyyaml>=5.3', 'pyyaml>=5.3',
'requests>=2.27.1', 'requests>=2.27.1',
'seaborn>=0.11.2', 'seaborn>=0.11.2',
...@@ -169,8 +170,8 @@ def run(self): ...@@ -169,8 +170,8 @@ def run(self):
**x, **x,
'develop': x['dev'] + x['test'], 'develop': x['dev'] + x['test'],
'cpuworker': x['torch'], 'cpuworker': x['torch'],
'amdworker': x['torch'] + x['ort'] + x['mpi'], 'amdworker': x['torch'] + x['ort'],
'nvworker': x['torch'] + x['ort'] + x['mpi'] + x['nvidia'], 'nvworker': x['torch'] + x['ort'] + x['nvidia'],
} }
)( )(
{ {
...@@ -199,7 +200,6 @@ def run(self): ...@@ -199,7 +200,6 @@ def run(self):
'onnx>=1.10.2', 'onnx>=1.10.2',
'onnxruntime-gpu==1.10.0', 'onnxruntime-gpu==1.10.0',
], ],
'mpi': ['mpi4py>=3.1.3'],
'nvidia': ['py3nvml>=0.2.6'], 'nvidia': ['py3nvml>=0.2.6'],
} }
), ),
......
...@@ -254,7 +254,7 @@ def __prepare_config(self): ...@@ -254,7 +254,7 @@ def __prepare_config(self):
if not self._args.hostfile: if not self._args.hostfile:
self._args.hostfile = os.path.join(os.environ.get('SB_WORKSPACE', '.'), 'hostfile') self._args.hostfile = os.path.join(os.environ.get('SB_WORKSPACE', '.'), 'hostfile')
with open(self._args.hostfile, 'r') as f: with open(self._args.hostfile, 'r') as f:
hosts = f.readlines() hosts = f.read().splitlines()
# Generate the config file if not define # Generate the config file if not define
if self._args.config is None: if self._args.config is None:
self.gen_traffic_pattern(hosts, self._args.pattern, self.__config_path) self.gen_traffic_pattern(hosts, self._args.pattern, self.__config_path)
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
import re import re
import os import os
from pathlib import Path from pathlib import Path
from time import sleep
import networkx as nx import networkx as nx
...@@ -35,37 +36,34 @@ def search(self, pattern, string, flags=0): ...@@ -35,37 +36,34 @@ def search(self, pattern, string, flags=0):
return self.matched return self.matched
def gen_ibstat_file(ibstat_file): def gen_ibstat_file(host_list, ibstat_file):
"""Generate ibstat file for each node with specified path. """Generate ibstat file in each node with specified path.
Args: Args:
host_list (list): list of VM read from hostfile.
ibstat_file (str): path of ibstat output. ibstat_file (str): path of ibstat output.
""" """
from mpi4py import MPI try:
# Only exec on rank0
if not MPI.Is_initialized(): if os.environ.get('OMPI_COMM_WORLD_NODE_RANK') == '0' and os.environ.get('OMPI_COMM_WORLD_LOCAL_RANK') == '0':
MPI.Init() pssh_cmd = "pssh -i -t 5 -p 512 -x '-o StrictHostKeyChecking=no' -H '{}' ".format(' '.join(host_list))
cmd = "'cat /sys/class/infiniband/*/sys_image_guid | tr -d :'" \
comm = MPI.COMM_WORLD r"| sed -e 's/^.*\[SUCCESS\]/VM_hostname/g;s/^.*\[FAILURE\]/VM_hostname/g' | cut -d ' ' -f 1,2"
name = MPI.Get_processor_name() output = os.popen(pssh_cmd + cmd).read()
# Generate ibstat file
# The command to fetch ibstat info ibstate_file_path = Path(ibstat_file)
cmd = r"ibstat | grep -Po 'System image GUID: \K\S+$'" with ibstate_file_path.open(mode='w') as f:
output = os.popen(cmd) f.write(output)
ibstat = 'VM_hostname ' + name + '\n' + str(output.read()) scp_cmd = "pscp -t 5 -p 512 -H '{0}' {1} {1}".format(' '.join(host_list), ibstat_file)
# Distribute ibstat file for others
# Fetch all ibstate from each node errorn = os.system(scp_cmd)
ibstats = comm.allgather(ibstat) if errorn != 0:
logger.error('Failed to distribute ibstate file')
ibstate_file_path = Path(ibstat_file) else:
# Wait for rank0 done
# Filter the duplicate info sleep(5)
ibstat_infos = set(ibstats) except BaseException as e:
logger.error('Failed to generate ibstate file, message: {}.'.format(str(e)))
with ibstate_file_path.open(mode='w') as f:
for ibstat_info in ibstat_infos:
f.write(ibstat_info)
MPI.Finalize()
def gen_topo_aware_config(host_list, ibstat_file, ibnetdiscover_file, min_dist, max_dist): # noqa: C901 def gen_topo_aware_config(host_list, ibstat_file, ibnetdiscover_file, min_dist, max_dist): # noqa: C901
...@@ -91,7 +89,9 @@ def gen_topo_aware_config(host_list, ibstat_file, ibnetdiscover_file, min_dist, ...@@ -91,7 +89,9 @@ def gen_topo_aware_config(host_list, ibstat_file, ibnetdiscover_file, min_dist,
if not ibstat_file: if not ibstat_file:
ibstat_file = os.path.join(os.environ.get('SB_WORKSPACE', '.'), 'ib_traffic_topo_aware_ibstat.txt') ibstat_file = os.path.join(os.environ.get('SB_WORKSPACE', '.'), 'ib_traffic_topo_aware_ibstat.txt')
gen_ibstat_file(ibstat_file) gen_ibstat_file(host_list, ibstat_file)
# sync all the rank
sleep(5)
if not Path(ibstat_file).exists(): if not Path(ibstat_file).exists():
logger.error('ibstat file does not exist.') logger.error('ibstat file does not exist.')
...@@ -125,8 +125,8 @@ def gen_topo_aware_config(host_list, ibstat_file, ibnetdiscover_file, min_dist, ...@@ -125,8 +125,8 @@ def gen_topo_aware_config(host_list, ibstat_file, ibnetdiscover_file, min_dist,
r = quick_regexp() r = quick_regexp()
if r.search(r'^(VM_hostname)\s+(.+)', line): if r.search(r'^(VM_hostname)\s+(.+)', line):
vmhost = r.groups[1] vmhost = r.groups[1]
elif r.search(r'^(0x)(.+)', line): elif r.search(r'^(?!0{16})([a-f0-9]{16})$', line):
sysimgguid = r.groups[1] sysimgguid = r.groups[0]
sysimgguid_to_vmhost[sysimgguid] = vmhost sysimgguid_to_vmhost[sysimgguid] = vmhost
except BaseException as e: except BaseException as e:
logger.error('Failed to read ibstate file, message: {}.'.format(str(e))) logger.error('Failed to read ibstate file, message: {}.'.format(str(e)))
......
VM_hostname vma414bbc00005I VM_hostname vma414bbc00005I
0x0ff08c4321664e96 0ff08c4321664e96
VM_hostname vma414bbc00005J VM_hostname vma414bbc00005J
0x0ff08c43217299f2 0ff08c43217299f2
VM_hostname vma414bbc00005K VM_hostname vma414bbc00005K
0x0ff08c4321729742 0ff08c4321729742
VM_hostname vma414bbc00005L VM_hostname vma414bbc00005L
0x0ff08c4321729986 0ff08c4321729986
VM_hostname vma414bbc00005M VM_hostname vma414bbc00005M
0x1c34da03005baca4 1c34da03005baca4
VM_hostname vma414bbc00005N VM_hostname vma414bbc00005N
0x0ff08c432166275a 0ff08c432166275a
VM_hostname vma414bbc00005O VM_hostname vma414bbc00005O
0x0ff08c4321664b66 0ff08c4321664b66
VM_hostname vma414bbc00005P VM_hostname vma414bbc00005P
0x0ff08c432166274e 0ff08c432166274e
VM_hostname vma414bbc00005Q VM_hostname vma414bbc00005Q
0x0ff08c4321664f2a 0ff08c4321664f2a
VM_hostname vma414bbc00005R VM_hostname vma414bbc00005R
0x043f720300e61112 043f720300e61112
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment