Commit 43f3a022 authored by one

[xcl-lens] Support topo mapping file info

parent 0286389b
 import re
-from typing import Final
 import pandas as pd


 class RcclLogParser:
-    # Pattern -> output string or as-is
-    sys_patterns: Final = {
-        r"kernel version": None,
-        r"ROCr version": None,
-        r"RCCL version": None,
-        r"Librccl path": None,
-        r"iommu": None,
-        r"Dmabuf feature disabled": "Dmabuf: disabled",
-        r"Disabled GDRCopy": "GDRCopy: disabled",
-        r"Using network IB": "NET/IB: enabled",
-        r"NET/Plugin: Could not find: librccl-net.so": "NET/Plugin: internal",
-    }
-    # Pattern -> column with strict validation
-    graph_info_fields: Final = {
-        r"Pattern": ("Pattern", r"\d+"),
-        r"crossNic": ("crossNic", r"\d+"),
-        r"nChannels": ("nChannels", r"\d+"),
-        r"bw": ("bandwidth", r"[\d.]+/[\d.]+"),
-        r"type": ("type", r"[\w/]+"),
-        r"sameChannels": ("sameChannels", r"\d+"),
-    }
-    # Pattern -> column with strict validation
-    cl_transfer_fields: Final = {
-        r"protocol": ("protocol", r"Simple|LL|LL128"),
-        r"nbytes": ("nbytes", r"\d+"),
-        r"algorithm": ("algorithm", r"Tree|Ring"),
-        r"slicesteps": ("slicesteps", r"\d+"),
-        r"nchannels": ("nchannels", r"\d+"),
-        r"nloops": ("nloops", r"\d+"),
-        r"nsteps": ("nsteps", r"\d+"),
-        r"chunksize": ("chunksize", r"\d+"),
-    }
-    # Pattern -> column with strict validation
-    p2p_fields: Final = {
-        r"p2p : rank": ("local", r"\d+"),
-        r"send rank": ("send", r"\d+"),
-        r"recv rank": ("recv", r"\d+"),
-        r"p2pnChannelsPerPeer": ("p2pnChannelsPerPeer", r"\d+"),
-        r"p2pnChannels": ("p2pnChannels", r"\d+"),
-        r"nChannelsMax": ("nChannelsMax", r"\d+"),
-        r"protocol": ("protocol", r"Simple|LL|LL128"),
-    }
     def __init__(self, verbose=False, hosts=None, ranks=None):
         # Deduplicated set of (host, rank, content) tuples
         self.log_entries: set[tuple[str, int, str]] = set()
@@ -71,6 +24,7 @@ class RcclLogParser:
         self._report_sys()
         self._report_user_envs()
+        self._report_topo_mapping_info()
         self._report_net_ib_info()
         self._report_gdr_rw_info()
         self._report_graph_info()
@@ -108,10 +62,24 @@ class RcclLogParser:
     def _report_sys(self):
         """Search patterns and print pre-defined strings if matched"""
+        # Pattern -> output string or as-is
+        sys_patterns = {
+            r"kernel version": None,
+            r"ROCr version": None,
+            r"RCCL version": None,
+            r"Librccl path": None,
+            r"iommu": None,
+            r"Dmabuf feature disabled": "Dmabuf: disabled",
+            r"Disabled GDRCopy": "GDRCopy: disabled",
+            r"Using network IB": "NET/IB: enabled",
+            r"NET/Plugin: Could not find: librccl-net.so": "NET/Plugin: internal",
+            r"XDP is disabled": "XDP: disabled",
+        }
         print("===> System Information:\n")
         reported = set()
         for _, _, content in self.log_entries:
-            for pattern, out in self.sys_patterns.items():
+            for pattern, out in sys_patterns.items():
                 if re.search(pattern, content, re.IGNORECASE):
                     reported.add(out if out is not None else content)
                     break
@@ -321,30 +289,60 @@ class RcclLogParser:
         print()
     def _report_graph_info(self):
+        # Pattern -> column with strict validation
+        graph_info_fields = {
+            r"Pattern": ("Pattern", r"\d+"),
+            r"crossNic": ("crossNic", r"\d+"),
+            r"nChannels": ("nChannels", r"\d+"),
+            r"bw": ("bandwidth", r"[\d.]+/[\d.]+"),
+            r"type": ("type", r"[\w/]+"),
+            r"sameChannels": ("sameChannels", r"\d+"),
+        }
         self._extract_and_print(
             title="Graph Info",
             filter_func=lambda c: "Pattern" in c and "crossNic" in c,
-            fields=self.graph_info_fields,
+            fields=graph_info_fields,
             mandatory=["Pattern"],
             verbose_cols=["host", "rank"],
             sort_cols=["host", "rank", "Pattern"],
         )
     def _report_collective_transfers(self):
+        # Pattern -> column with strict validation
+        cl_transfer_fields = {
+            r"protocol": ("protocol", r"Simple|LL|LL128"),
+            r"nbytes": ("nbytes", r"\d+"),
+            r"algorithm": ("algorithm", r"Tree|Ring"),
+            r"slicesteps": ("slicesteps", r"\d+"),
+            r"nchannels": ("nchannels", r"\d+"),
+            r"nloops": ("nloops", r"\d+"),
+            r"nsteps": ("nsteps", r"\d+"),
+            r"chunksize": ("chunksize", r"\d+"),
+        }
         self._extract_and_print(
             title="Unique Ring/Tree Transfers",
             filter_func=lambda c: "protocol" in c and "nbytes" in c,
-            fields=self.cl_transfer_fields,
+            fields=cl_transfer_fields,
             mandatory=["protocol", "nbytes"],
            verbose_cols=["host", "rank"],
             sort_cols=["host", "rank", "nbytes", "protocol", "nchannels"],
         )
     def _report_p2p_transfers(self):
+        # Pattern -> column with strict validation
+        p2p_fields = {
+            r"p2p : rank": ("local", r"\d+"),
+            r"send rank": ("send", r"\d+"),
+            r"recv rank": ("recv", r"\d+"),
+            r"p2pnChannelsPerPeer": ("p2pnChannelsPerPeer", r"\d+"),
+            r"p2pnChannels": ("p2pnChannels", r"\d+"),
+            r"nChannelsMax": ("nChannelsMax", r"\d+"),
+            r"protocol": ("protocol", r"Simple|LL|LL128"),
+        }
         self._extract_and_print(
             title="Unique P2P Transfers",
             filter_func=lambda c: "p2p :" in c and "send rank" in c,
-            fields=self.p2p_fields,
+            fields=p2p_fields,
             mandatory=["local", "send", "recv"],
             verbose_cols=["host", "rank", "local", "send", "recv"],
             sort_cols=["host", "rank", "protocol", "local", "send", "recv"],
@@ -396,3 +394,89 @@ class RcclLogParser:
df.sort_values(by=["host", "rank", "channel", "sender", "receiver"], inplace=True) df.sort_values(by=["host", "rank", "channel", "sender", "receiver"], inplace=True)
print(df.to_string(index=False)) print(df.to_string(index=False))
print() print()
+    def _report_aggregated_info(
+        self,
+        title: str,
+        filter_func,
+        patterns: list,
+        accumulate_fields: set,
+        col_order: list,
+    ):
+        """
+        Generic method for sections where multiple log lines contribute different
+        fields to a single per-(host, rank) record.
+
+        Unlike _extract_and_print (one line → all fields), this aggregates
+        across lines: each line may fill one field of the record.
+
+        Args:
+            title: Section title.
+            filter_func: Pre-filter for log content (content -> bool).
+            patterns: List of (search_pattern, field, group_idx, literal).
+            accumulate_fields: Fields that collect values across multiple lines (stored as set).
+            col_order: Preferred column display order.
+        """
+        print(f"===> {title}:\n")
+        records: dict[tuple, dict] = {}
+        for host, rank, content in self.log_entries:
+            if not filter_func(content):
+                continue
+            for pattern, field, group_idx, literal in patterns:
+                m = re.search(pattern, content, re.IGNORECASE)
+                if m:
+                    key = (host, rank)
+                    rec = records.setdefault(key, {"host": host, "rank": rank})
+                    value = (literal if group_idx is None else m.group(group_idx).strip()) or "-"
+                    if field in accumulate_fields:
+                        rec.setdefault(field, set()).add(value)
+                    else:
+                        rec[field] = value
+                    break  # each line matches at most one pattern
+        if not records:
+            print("  (No data found)\n")
+            return
+        # Flatten accumulated sets → sorted string
+        for rec in records.values():
+            for field in accumulate_fields:
+                if field in rec:
+                    rec[field] = " | ".join(sorted(rec[field]))
+        df = pd.DataFrame(list(records.values()))
+        df.sort_values(by=["host", "rank"], inplace=True)
+        ordered = [c for c in col_order if c in df.columns]
+        remaining = [c for c in df.columns if c not in ordered]
+        df = df[ordered + remaining]
+        print(df.fillna("-").to_string(index=False))
+        print()
+
+    def _report_topo_mapping_info(self):
+        # Each entry is (search_pattern, field_name, capture_group_index, literal_value):
+        # a capture_group_index of None means literal_value is used instead of a regex group.
+        # "parseing" is kept alongside "parsing" to tolerate a misspelling in the log output.
+        topo_mapping_patterns = [
+            (r"No topo mapping file", "status", None, "no_file"),
+            (r"environmental key word is (\S+)", "fingerprint", 1, None),
+            (r"Loading topology mapping file (\S+)", "loaded", 1, None),
+            (r"(?:parseing|parsing) topology mapping group[:\s]*(.*)", "parsed", 1, None),
+            (r"skip topology mapping group:\s*([^,]+)", "skipped", 1, None),
+        ]
+        # Fields that accumulate across multiple matching lines (per host/rank)
+        # rather than being overwritten by the last match.
+        topo_mapping_accumulate_fields = {"skipped", "parsed"}
+        self._report_aggregated_info(
+            title="Topology Mapping File Info",
+            filter_func=lambda c: any(
+                s in c.lower()
+                for s in ("topo mapping", "topology mapping", "environmental key word")
+            ),
+            patterns=topo_mapping_patterns,
+            accumulate_fields=topo_mapping_accumulate_fields,
+            col_order=["host", "rank", "status", "fingerprint", "loaded", "parsed", "skipped"],
+        )