#!/usr/bin/env python3
"""Generate a DCU-to-NIC topology mapping file.

The script collects CPU information from ``lscpu``, NIC information from
``ibstat`` and ``/sys/class/infiniband``, and DCU NUMA placement from
``hy-smi --showtopo``.  It pairs DCUs with NICs that sit on the same NUMA
node, writes the resulting mapping to a file and prints the
``NCCL_TOPO_MAPPING_FILE`` export line that points at it.
"""
import subprocess
import xml.etree.ElementTree as ET
import os
import re
import argparse

# sysfs attribute templates; ``{}`` is replaced by the device name.
NIC_NUMA_FILE = "/sys/class/infiniband/{}/device/numa_node"
NIC_HCA_TYPE_FILE = "/sys/class/infiniband/{}/hca_type"
HY_SMI_TOPO_CMD = "hy-smi --showtopo"
IBSTAT_CMD = "ibstat"

# ANSI colour escape sequences used for console output.
RESET = "\033[0m"
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"

# Each DCU pair takes its effective NUMA node from the listed DCU.
DCU_NUMA_RULES = {
    (0, 1): 0,
    (2, 3): 3,
    (4, 5): 4,
    (6, 7): 7,
}

# NUMA nodes -> NUMA group id.
NUMA_GROUP_RULES = {
    (0, 1, 2, 3): 0,
    (4, 5, 6, 7): 1,
}

GPU_ARCH = "gfx936"
GPU_COUNT = 8

# Matches one "HCU[<id>] ... Numa Node <n>" line of the hy-smi output.  The
# colon after the label is optional so that both "Numa Node 3" and
# "Numa Node: 3" are accepted.
# NOTE(review): the exact hy-smi line format is not visible here - confirm.
_DCU_NUMA_RE = re.compile(
    r'HCU\[(\d+)\].*?Numa Node\s*:?\s*(\d+)', re.IGNORECASE
)


def _nic_sort_key(name):
    """Return a sort key ordering NIC names by the integer after the first '_'.

    Names that carry no such integer (for example ``mlx5_bond_0``) sort after
    all indexed names, alphabetically, instead of raising ``ValueError``.
    """
    parts = name.split('_')
    try:
        return (0, int(parts[1]), name)
    except (IndexError, ValueError):
        return (1, 0, name)


def _parse_lscpu(text):
    """Extract ``(architecture, vendor)`` from ``lscpu`` output.

    Either element is ``None`` when the corresponding field is absent (or, for
    the vendor, empty or ``Unknown``).  Labels are compared exactly so that a
    line such as ``BIOS Vendor ID:`` is not mistaken for ``Vendor ID:``.
    """
    architecture = None
    vendor = None
    for line in text.split('\n'):
        # Tolerate a full-width colon as the label separator.
        label, sep, value = line.replace('\uff1a', ':').partition(':')
        if not sep:
            continue
        label = label.strip()
        value = value.strip()
        if label in ('Architecture', '架构'):
            architecture = value
        elif label in ('Vendor ID', '厂商 ID'):
            if value and value != 'Unknown':
                vendor = 'HygonGenuine' if 'Hygon' in value else value
    return architecture, vendor


def get_cpu_info():
    """Query ``lscpu`` for the CPU architecture and vendor.

    Returns:
        dict with keys ``architecture``, ``vendor`` and ``success``.  The
        defaults (``x86_64`` / ``HygonGenuine``) are kept for fields that
        could not be read; ``success`` is True only when a vendor was found.
    """
    result = {
        'architecture': 'x86_64',
        'vendor': 'HygonGenuine',
        'success': False
    }
    try:
        # Force the C locale so the field labels are the English ones.
        output = subprocess.run(
            ['lscpu'], capture_output=True, text=True, check=True,
            env=dict(os.environ, LC_ALL='C'),
        )
        architecture, vendor = _parse_lscpu(output.stdout)
        if architecture is not None:
            result['architecture'] = architecture
        if vendor is not None:
            result['vendor'] = vendor
            result['success'] = True
    except Exception as e:
        # Best effort: the caller falls back to the defaults above.
        print(f"{YELLOW}Warning: Failed to get CPU info: {e}{RESET}")
    return result


def _parse_ibstat(text):
    """Parse ``ibstat`` output into ``{nic: {'rate', 'link_layer'}}``.

    A ``CA '<name>'`` line starts a new NIC; later ``Rate:`` and
    ``Link layer:`` lines are attributed to it (the last value seen wins).
    """
    nics = {}
    current_nic = None
    for line in text.split('\n'):
        if line.startswith("CA '"):
            current_nic = line.split("'")[1]
            nics[current_nic] = {'rate': None, 'link_layer': None}
        elif 'Rate:' in line and current_nic:
            nics[current_nic]['rate'] = line.split(':', 1)[1].strip()
        elif 'Link layer:' in line and current_nic:
            nics[current_nic]['link_layer'] = line.split(':', 1)[1].strip()
    return nics


def get_nic_info_from_ibstat():
    """Run ``ibstat`` and collect rate and link layer per NIC.

    Returns:
        dict with keys ``nics`` (see ``_parse_ibstat``), ``success`` and
        ``error`` (a message string when the command failed, else None).
    """
    result = {
        'nics': {},
        'success': False,
        'error': None
    }
    try:
        output = subprocess.run(
            [IBSTAT_CMD], capture_output=True, text=True, check=True
        )
        result['nics'] = _parse_ibstat(output.stdout)
        result['success'] = True
    except FileNotFoundError:
        result['error'] = "ibstat command not found"
        print(f"{YELLOW}Warning: ibstat not found, using sysfs fallback{RESET}")
    except Exception as e:
        result['error'] = str(e)
        print(f"{YELLOW}Warning: Failed to get NIC info from ibstat: {e}{RESET}")
    return result


def get_nic_info(nic_type=None):
    """Read HCA type and NUMA node of every ``mlx5_*`` device from sysfs.

    Args:
        nic_type: accepted for interface compatibility; not used here
            (filtering is done by ``filter_nics_by_type``).

    Returns:
        ``{nic_name: {'hca_type': str, 'numa': int}}``.  Devices whose
        attributes cannot be read are skipped with a warning.  An empty dict
        is returned when the sysfs directory cannot be listed.
    """
    nic_info_map = {}
    infiniband_path = "/sys/class/infiniband"
    try:
        device_names = os.listdir(infiniband_path)
    except OSError as e:
        print(f"{YELLOW}Warning: Cannot list {infiniband_path}: {e}{RESET}")
        return nic_info_map

    # Sorted so that the resulting dict has a deterministic order.
    for fname in sorted(device_names, key=_nic_sort_key):
        if not fname.startswith("mlx5_"):
            continue
        try:
            with open(NIC_HCA_TYPE_FILE.format(fname), 'r') as f:
                hca_type = f.read().strip()
        except (OSError, ValueError) as e:
            print(f"Warning: Failed to read hca_type for {fname}: {e}")
            continue
        try:
            with open(NIC_NUMA_FILE.format(fname), 'r') as f:
                numa = int(f.read().strip())
        except (OSError, ValueError) as e:
            print(f"Warning: Failed to read NUMA for {fname}: {e}")
            continue
        nic_info_map[fname] = {
            'hca_type': hca_type,
            'numa': numa
        }
    return nic_info_map


def filter_nics_by_type(nic_info_map, nic_type):
    """Return the entries of *nic_info_map* whose ``hca_type`` equals *nic_type*.

    The NIC names before and after filtering are printed.
    """
    filtered = {
        nic: info for nic, info in nic_info_map.items()
        if info['hca_type'] == nic_type
    }
    original_nics = list(nic_info_map)
    filtered_nics = list(filtered)
    print(f"\n[Filter] Filtered by NIC type: {nic_type}")
    print(f" Original NICs: {original_nics}")
    print(f" Filtered NICs: {filtered_nics}")
    return filtered


def _parse_dcu_numa(text):
    """Extract ``{dcu_id: numa_node}`` from ``hy-smi --showtopo`` output."""
    dcu_numa_map = {}
    for line in text.split('\n'):
        match = _DCU_NUMA_RE.search(line)
        if match:
            dcu_numa_map[int(match.group(1))] = int(match.group(2))
    return dcu_numa_map


def get_dcu_numa():
    """Run ``hy-smi --showtopo`` and return ``{dcu_id: numa_node}``.

    Returns an empty dict (after printing an error) when the command fails.
    """
    try:
        output = subprocess.run(
            HY_SMI_TOPO_CMD.split(), capture_output=True, text=True, check=True
        )
    except Exception as e:
        print(f"Error getting DCU NUMA info: {e}")
        return {}
    return _parse_dcu_numa(output.stdout)


def get_effective_numa(dcu_id, dcu_numa_map):
    """Return the NUMA node used for *dcu_id* after applying DCU_NUMA_RULES.

    A DCU listed in a rule pair takes the NUMA node of that pair's
    representative DCU, falling back to its own node when the representative
    is missing from *dcu_numa_map*.  Returns None when *dcu_id* itself has no
    entry in *dcu_numa_map*.
    """
    physical_numa = dcu_numa_map.get(dcu_id)
    if physical_numa is None:
        return None
    for dcu_pair, effective_dcu in DCU_NUMA_RULES.items():
        if dcu_id in dcu_pair:
            return dcu_numa_map.get(effective_dcu, physical_numa)
    return physical_numa


def map_numa_to_group(numa):
    """Map a NUMA node to its group per NUMA_GROUP_RULES (0 if unlisted)."""
    for numa_range, group in NUMA_GROUP_RULES.items():
        if numa in numa_range:
            return group
    return 0


def group_nics_by_numa(nic_info_map):
    """Return ``{numa_node: [nic_name, ...]}`` built from *nic_info_map*."""
    numa_nics = {}
    for nic, info in nic_info_map.items():
        numa_nics.setdefault(info['numa'], []).append(nic)
    return numa_nics


def group_dcus_by_effective_numa(dcu_numa_map):
    """Return ``{effective_numa: [dcu_id, ...]}`` for DCU ids 0..GPU_COUNT-1.

    DCUs that have no entry in *dcu_numa_map* are collected under the key
    ``None``.
    """
    numa_dcus = {}
    for dcu_id in range(GPU_COUNT):
        effective_numa = get_effective_numa(dcu_id, dcu_numa_map)
        numa_dcus.setdefault(effective_numa, []).append(dcu_id)
    return numa_dcus


def generate_group_name(cpu_info, nic_info_ibstat, nic_info_map, nic_type_filter):
    """Build the group name string describing this host's topology.

    Args:
        cpu_info: result of ``get_cpu_info``.
        nic_info_ibstat: result of ``get_nic_info_from_ibstat``.
        nic_info_map: result of ``get_nic_info``.
        nic_type_filter: HCA type used to select the NICs whose link layer is
            reported, or None to consider all of them.

    Returns:
        The generated name, or ``"topo"`` when CPU or ibstat information is
        incomplete or no ``mlx5_`` NIC is present.
    """
    if not cpu_info['success'] or not nic_info_ibstat['success']:
        print(f"{YELLOW}Warning: Some information is incomplete, using 'topo' as group name{RESET}")
        return "topo"

    architecture = cpu_info['architecture']
    vendor = cpu_info['vendor']

    filtered_nics = {
        k: v for k, v in nic_info_map.items()
        if nic_type_filter is None or v['hca_type'] == nic_type_filter
    }
    all_mlx_nics = {
        k: v for k, v in nic_info_map.items() if k.startswith('mlx5_')
    }
    all_mlx_count = len(all_mlx_nics)
    if all_mlx_count == 0:
        print(f"{YELLOW}Warning: No mlx NICs found, using 'topo' as group name{RESET}")
        return "topo"

    nic_prefix = "mlx5"

    # Link layer comes from the type-filtered NICs only.
    link_layers = {
        info['link_layer']
        for nic, info in nic_info_ibstat['nics'].items()
        if nic in filtered_nics and info.get('link_layer')
    }
    if len(link_layers) == 1:
        link_type = next(iter(link_layers))
    elif len(link_layers) > 1:
        link_type = "Mixed"
    else:
        link_type = "Unknown"

    # One rate per mlx5 NIC, in NIC-index order; 0 stands for "unknown".
    rates = []
    for nic in sorted(all_mlx_nics, key=_nic_sort_key):
        rate = nic_info_ibstat['nics'].get(nic, {}).get('rate')
        if not rate:
            rates.append(0)
            continue
        try:
            rates.append(int(float(rate)))
        except (TypeError, ValueError):
            rates.append(0)

    nic_count = all_mlx_count
    rate_str = "-".join(map(str, rates)) if rates else "0"
    vendor_clean = vendor.replace(" ", "_")
    link_type_clean = link_type.replace(" ", "_")
    group_name = f"{GPU_ARCH}_{GPU_COUNT}_{architecture}_{vendor_clean}_{nic_prefix}_{nic_count}_{link_type_clean}_{rate_str}_1_8_1"
    return group_name


def generate_xml_mapping(nic_info_map, dcu_numa_map, nic_type, group_name):
    """Pair DCUs with NICs per NUMA node and render the mapping text.

    Within each NUMA node the DCUs (ascending id) and the NICs (ascending
    index) are split into consecutive pairs; the i-th DCU pair is combined
    with the i-th NIC pair.  NICs on NUMA nodes that host no DCU are dropped
    and reported on stdout.

    Args:
        nic_info_map: ``{nic_name: {'hca_type', 'numa'}}`` of the NICs to map.
        dcu_numa_map: ``{dcu_id: numa_node}``.
        nic_type: currently unused.
        group_name: currently unused.

    Returns:
        ``(text, summary)`` where ``summary`` is
        ``{group: {'dcus': [...], 'nics': [...]}}``.
    """
    numa_nics = group_nics_by_numa(nic_info_map)
    numa_dcus = group_dcus_by_effective_numa(dcu_numa_map)

    # Keep only NICs that share a NUMA node with at least one DCU.
    valid_numas = set(numa_dcus)
    filtered_numa_nics = {
        numa: nics for numa, nics in numa_nics.items() if numa in valid_numas
    }
    if len(filtered_numa_nics) < len(numa_nics):
        dropped_numas = set(numa_nics) - valid_numas
        print(f"\n[Filter] Dropped NUMAs (no corresponding GPU): {dropped_numas}")
        for numa in sorted(dropped_numas):
            print(f" NUMA {numa}: {numa_nics[numa]}")

    # group -> {numa node -> NICs in index order}
    numa_to_nics = {}
    for original_numa, nics in filtered_numa_nics.items():
        group = map_numa_to_group(original_numa)
        numa_to_nics.setdefault(group, {})[original_numa] = sorted(
            nics, key=_nic_sort_key
        )

    group_pci_groups = {}
    for group_numa in sorted(set(NUMA_GROUP_RULES.values())):
        group_pci_groups[group_numa] = []
        if group_numa not in numa_to_nics:
            # No NIC in this group: nothing to pair.
            continue

        all_dcus = [
            (dcu, orig_numa)
            for orig_numa, dcus in numa_dcus.items()
            if map_numa_to_group(orig_numa) == group_numa
            for dcu in dcus
        ]
        all_dcus.sort(key=lambda item: item[0])

        # Insertion order == order in which each NUMA node is first seen when
        # walking the DCUs by ascending id.
        dcus_by_numa = {}
        for dcu, orig_numa in all_dcus:
            dcus_by_numa.setdefault(orig_numa, []).append(dcu)

        for orig_numa, dcus_sorted in dcus_by_numa.items():
            nics_sorted = numa_to_nics[group_numa].get(orig_numa, [])
            for start in range(0, len(dcus_sorted), 2):
                group_pci_groups[group_numa].append({
                    'dcus': dcus_sorted[start:start + 2],
                    'nics': nics_sorted[start:start + 2]
                })

    # NOTE(review): every line emitted below consists of whitespace only; the
    # element markup that presumably belongs in these strings is not present
    # in this source, and ``nic_type`` / ``group_name`` are not interpolated
    # anywhere.  Confirm against the intended file format before relying on
    # the output.
    parts = ['\n', ' \n']
    for group_numa in sorted(group_pci_groups):
        parts.append(' \n')
        for pci in group_pci_groups[group_numa]:
            parts.append(' \n')
            for _gpu in sorted(pci['dcus']):
                parts.append(' \n')
            for _nic in sorted(pci['nics'], key=_nic_sort_key):
                parts.append(' \n')
            parts.append(' \n')
        parts.append(' \n')
    parts.append(' \n')
    parts.append('\n')
    final_xml = ''.join(parts)

    pci_groups_summary = {}
    for group_numa, pci_list in group_pci_groups.items():
        pci_groups_summary[group_numa] = {
            'dcus': sorted(d for p in pci_list for d in p['dcus']),
            'nics': sorted(
                (n for p in pci_list for n in p['nics']), key=_nic_sort_key
            )
        }
    return final_xml, pci_groups_summary


def main():
    """Command-line entry point: gather data, write the mapping, print a report."""
    parser = argparse.ArgumentParser(description='DCU-NIC Topology Mapping Generator')
    parser.add_argument('--nic-type', type=str, default='MT4129',
                        help='Filter NICs by HCA type (e.g., MT4129 for CX7)')
    parser.add_argument('--output', type=str, default='./topo_mapping_custom.xml',
                        help='Output XML file path')
    args = parser.parse_args()

    banner = f"{BLUE}{'=' * 60}{RESET}"

    print(banner)
    print(f"{BLUE}DCU-NIC Topology Mapping Generator{RESET}")
    print(banner)
    print(f"\n{GREEN}[Config] NIC Type Filter: {args.nic_type}{RESET}")

    print(f"\n{GREEN}[Step 1] Getting CPU info...{RESET}")
    cpu_info = get_cpu_info()
    if cpu_info['success']:
        print(f" Architecture: {cpu_info['architecture']}")
        print(f" Vendor: {cpu_info['vendor']}")
    else:
        print(f" {YELLOW}Warning: Failed to get CPU info{RESET}")

    print(f"\n{GREEN}[Step 2] Getting NIC info from ibstat...{RESET}")
    nic_info_ibstat = get_nic_info_from_ibstat()
    if nic_info_ibstat['success']:
        print(f" Successfully retrieved info for {len(nic_info_ibstat['nics'])} NICs")
        for nic in sorted(nic_info_ibstat['nics'], key=_nic_sort_key):
            info = nic_info_ibstat['nics'][nic]
            # The keys always exist (possibly as None), so a .get() default
            # would never apply; use ``or`` to show N/A for missing values.
            rate = info.get('rate') or 'N/A'
            link_layer = info.get('link_layer') or 'N/A'
            print(f" {nic}: Rate={rate}, Link={link_layer}")
    else:
        print(f" {YELLOW}Warning: {nic_info_ibstat['error']}{RESET}")

    print(f"\n{GREEN}[Step 3] Getting NIC info (type + NUMA)...{RESET}")
    nic_info_map = get_nic_info()
    print(f"\n All detected NICs:")
    for nic in sorted(nic_info_map, key=_nic_sort_key):
        info = nic_info_map[nic]
        print(f" {nic} -> HCA Type: {info['hca_type']}, NUMA: {info['numa']}")

    print(f"\n{GREEN}[Step 4] Filtering NICs by type...{RESET}")
    filtered_nic_info = filter_nics_by_type(nic_info_map, args.nic_type)

    print(f"\n{GREEN}[Step 5] Getting DCU NUMA mapping from hy-smi...{RESET}")
    dcu_numa_map = get_dcu_numa()
    for dcu, numa in sorted(dcu_numa_map.items()):
        effective = get_effective_numa(dcu, dcu_numa_map)
        group_numa = map_numa_to_group(effective)
        print(f" DCU {dcu} -> Physical NUMA {numa}, Effective NUMA {effective}, Group NUMA {group_numa}")

    # Steps 6 and 7 only describe the rule tables; print them from the
    # constants so the text cannot drift from the rules actually applied.
    print(f"\n{GREEN}[Step 6] Applying DCU NUMA rules...{RESET}")
    for dcu_pair, effective_dcu in DCU_NUMA_RULES.items():
        print(f" DCU {','.join(map(str, dcu_pair))} -> use DCU {effective_dcu}'s NUMA")

    print(f"\n{GREEN}[Step 7] Applying NUMA group rules...{RESET}")
    for numa_range, group in NUMA_GROUP_RULES.items():
        print(f" NUMA {','.join(map(str, numa_range))} -> Group {group}")

    print(f"\n{GREEN}[Step 8] Generating group name...{RESET}")
    group_name = generate_group_name(cpu_info, nic_info_ibstat, nic_info_map, args.nic_type)
    print(f" Group name: {group_name}")

    print(f"\n{GREEN}[Step 9] Generating XML mapping...{RESET}")
    xml_content, pci_groups = generate_xml_mapping(
        filtered_nic_info, dcu_numa_map, args.nic_type, group_name
    )

    output_file = args.output
    with open(output_file, 'w') as f:
        f.write(xml_content)
    print(f"\n{GREEN}[Step 10] Output written to: {output_file}{RESET}")

    print(f"\n{banner}")
    print(f"{BLUE}Generated XML Content:{RESET}")
    print(banner)
    print(xml_content)

    print(f"\n{banner}")
    print(f"{BLUE}Summary (NIC -> GPU mapping by NUMA Group):{RESET}")
    print(banner)
    for group_numa in sorted(pci_groups):
        group = pci_groups[group_numa]
        dcus = group['dcus']
        nics = group['nics']
        print(f" NUMA Group {group_numa}: NICs {nics} <-> GPUs {dcus}")

    if group_name == "topo":
        print(f"\n{RED}Warning: Some information was incomplete!{RESET}")
        print(f"{YELLOW}Please manually check and update the group name if needed.{RESET}")
        print(f"{YELLOW}You can try using the current topology mapping first:{RESET}")
    else:
        print(f"\n{GREEN}Group name generated successfully!{RESET}")
        print(f"{YELLOW}You can try using the current topology mapping:{RESET}")
    print(f"\n {GREEN}export NCCL_TOPO_MAPPING_FILE={output_file}{RESET}")


if __name__ == "__main__":
    main()