Commit 8743b701 authored by one's avatar one
Browse files

[xcl-lens] Support GDR info

parent 317eab29
...@@ -5,9 +5,12 @@ import pandas as pd ...@@ -5,9 +5,12 @@ import pandas as pd
class RcclLogParser: class RcclLogParser:
def __init__(self): def __init__(self):
# (rank, content) -> None # (host, rank, content) -> None
self.log_entries = dict() self.log_entries = dict()
# Verbosity flag used by report sections
self._verbose = False
# Pattern -> output string or as-is # Pattern -> output string or as-is
self.sys_patterns = { self.sys_patterns = {
r"kernel version": None, r"kernel version": None,
...@@ -61,29 +64,44 @@ class RcclLogParser: ...@@ -61,29 +64,44 @@ class RcclLogParser:
print(" RCCL Log Parser Report ".center(80, "=")) print(" RCCL Log Parser Report ".center(80, "="))
print() print()
# Remember verbosity for sub-sections
self._verbose = verbose
self._report_sys() self._report_sys()
self._report_user_envs() self._report_user_envs()
self._report_gdr_info()
self._report_graph_info() self._report_graph_info()
self._report_channel_transport_info(verbose) self._report_channel_transport_info()
self._report_cl_transfers() self._report_collective_transfers()
self._report_p2p_transfers() self._report_p2p_transfers()
print(" End of Report ".center(80, "=")) print(" End of Report ".center(80, "="))
def _preprocess_line(self, line): def _preprocess_line(self, line):
"""Extract and validate NCCL log lines with rank information""" """Extract NCCL log lines with host/rank information."""
# Match lines that have a valid NCCL log format with rank # Preferred format:
# Pattern: [rank] NCCL INFO/WARN/ERROR followed by content # <host>:<pid>:<tid> [rank] NCCL INFO/WARN/ERROR <content>
# where <host> itself does NOT contain ':' (so we always stop at first colon)
match = re.search(
r"^([^:\s]+):\d+:\d+\s+\[(\d+)\]\s+NCCL\s+(?:INFO|WARN|ERROR)\s+(.*)",
line,
)
if match:
host, rank, content = match.group(1), int(match.group(2)), match.group(3)
self.log_entries[(host, rank, content)] = None
return
# Backward-compatible fallback for logs without host/pid/tid prefix
match = re.search(r"\[(\d+)\]\s+NCCL\s+(?:INFO|WARN|ERROR)\s+(.*)", line) match = re.search(r"\[(\d+)\]\s+NCCL\s+(?:INFO|WARN|ERROR)\s+(.*)", line)
if match: if match:
rank, content = int(match.group(1)), match.group(2) rank, content = int(match.group(1)), match.group(2)
self.log_entries[(rank, content)] = None self.log_entries[("-", rank, content)] = None
def _report_sys(self): def _report_sys(self):
"""Search patterns and print pre-defined strings if matched""" """Search patterns and print pre-defined strings if matched"""
print("===> System Information:\n") print("===> System Information:\n")
reported = set() reported = set()
for (_, content), _ in self.log_entries.items(): for (_, _, content), _ in self.log_entries.items():
for pattern, out in self.sys_patterns.items(): for pattern, out in self.sys_patterns.items():
if re.search(pattern, content, re.IGNORECASE): if re.search(pattern, content, re.IGNORECASE):
reported.add(out if out is not None else content) reported.add(out if out is not None else content)
...@@ -97,7 +115,7 @@ class RcclLogParser: ...@@ -97,7 +115,7 @@ class RcclLogParser:
print("===> User-defined Environment Variables:\n") print("===> User-defined Environment Variables:\n")
env_vars = {} env_vars = {}
pattern = re.compile(r"((?:N|R)CCL_\w+)\s+set(?: by environment)? to\s+(.+)") pattern = re.compile(r"((?:N|R)CCL_\w+)\s+set(?: by environment)? to\s+(.+)")
for (_, content), _ in self.log_entries.items(): for (_, _, content), _ in self.log_entries.items():
m = pattern.search(content) m = pattern.search(content)
if m: if m:
var_name, var_value = m.group(1), m.group(2) var_name, var_value = m.group(1), m.group(2)
...@@ -111,6 +129,89 @@ class RcclLogParser: ...@@ -111,6 +129,89 @@ class RcclLogParser:
) )
print() print()
def _report_gdr_info(self):
"""Parse and print GPU Direct RDMA (GDR) related information."""
print("===> GDR Info:\n")
# Part 1: NET/IB : GPU Direct RDMA Enabled for HCA <hca_no> '<hca_id>'
ib_rows = []
pattern_ib = re.compile(
r"NET/IB\s+:\s+GPU Direct RDMA Enabled for HCA\s+(\d+)\s+'([^']+)'"
)
for (host, rank, content), _ in self.log_entries.items():
m = pattern_ib.search(content)
if m:
hca_no, hca_id = m.groups()
ib_rows.append(
{
"host": host,
"rank": rank,
"hca_no": int(hca_no),
"hca_id": hca_id,
"gdr": 1,
}
)
print(" NET/IB : GPU Direct RDMA Enabled for:\n")
if ib_rows:
df_ib = pd.DataFrame(ib_rows)
df_ib.drop_duplicates(inplace=True)
df_ib.sort_values(by=["host", "rank", "hca_no", "hca_id"], inplace=True)
df_ib = df_ib[["host", "rank", "hca_no", "hca_id", "gdr"]]
if not self._verbose:
df_ib = df_ib.drop(columns=["host"])
df_ib.drop_duplicates(inplace=True)
print(df_ib.to_string(index=False))
print()
else:
print(" (No data found)\n")
# Part 2: GPU Direct RDMA Enabled for GPU <gpu> / HCA <hca_no> (distance <expr>), read <0|1>
gpu_rows = []
pattern_gpu = re.compile(
r"GPU Direct RDMA Enabled for GPU\s+(\S+)\s*/\s*HCA\s+(\d+)\s*\(distance\s+([^)]*)\),\s*read\s+([01])"
)
for (host, rank, content), _ in self.log_entries.items():
m = pattern_gpu.search(content)
if m:
gpu, hca_no, distance, read_flag = m.groups()
rw = "read" if read_flag == "1" else "write"
distance_expr = distance.strip()
# Split expressions like "4 <= 7" into distance and max_distance
m_dist = re.match(r"^([+-]?\d+)\s*<=\s*([+-]?\d+)$", distance_expr)
if m_dist:
distance_val, max_distance = m_dist.groups()
else:
distance_val, max_distance = distance_expr, "-"
gpu_rows.append(
{
"host": host,
"rank": rank,
"gpu": gpu,
"hca_no": int(hca_no),
"distance": distance_val,
"max_distance": max_distance,
"r/w": rw,
}
)
print(" GPU Direct RDMA Enabled for GPU:\n")
if gpu_rows:
df_gpu = pd.DataFrame(gpu_rows)
df_gpu.drop_duplicates(inplace=True)
df_gpu.sort_values(
by=["host", "rank", "gpu", "hca_no", "distance", "max_distance", "r/w"],
inplace=True,
)
df_gpu = df_gpu[["host", "rank", "gpu", "hca_no", "distance", "max_distance", "r/w"]]
if not self._verbose:
df_gpu = df_gpu.drop(columns=["host"])
df_gpu.drop_duplicates(inplace=True)
print(df_gpu.to_string(index=False))
print()
else:
print(" (No data found)\n")
def _extract_and_print(self, title, filter_func, fields, mandatory, sort_cols, move_rank=True): def _extract_and_print(self, title, filter_func, fields, mandatory, sort_cols, move_rank=True):
""" """
Generic function to extract structured data from log lines and print as a table. Generic function to extract structured data from log lines and print as a table.
...@@ -140,13 +241,13 @@ class RcclLogParser: ...@@ -140,13 +241,13 @@ class RcclLogParser:
print(f"===> {title}:\n") print(f"===> {title}:\n")
# Filter relevant log lines using the provided filter function # Filter relevant log lines using the provided filter function
data = [(r, c) for (r, c), _ in self.log_entries.items() if filter_func(c)] data = [(h, r, c) for (h, r, c), _ in self.log_entries.items() if filter_func(c)]
if not data: if not data:
print(" (No data found)\n") print(" (No data found)\n")
return return
# Create DataFrame and extract all fields using regex with validation # Create DataFrame and extract all fields using regex with validation
df = pd.DataFrame(data, columns=["rank", "raw_log"]) df = pd.DataFrame(data, columns=["host", "rank", "raw_log"])
for pattern, (col_name, val_pattern) in fields.items(): for pattern, (col_name, val_pattern) in fields.items():
# Extract field with strict value validation using word boundary # Extract field with strict value validation using word boundary
df[col_name] = df["raw_log"].str.extract( df[col_name] = df["raw_log"].str.extract(
...@@ -181,6 +282,9 @@ class RcclLogParser: ...@@ -181,6 +282,9 @@ class RcclLogParser:
mandatory = [c for c in mandatory if c in df.columns] mandatory = [c for c in mandatory if c in df.columns]
df.dropna(subset=mandatory, inplace=True) # Remove rows missing mandatory fields df.dropna(subset=mandatory, inplace=True) # Remove rows missing mandatory fields
df.drop(columns=["raw_log"], inplace=True) # No longer need raw log df.drop(columns=["raw_log"], inplace=True) # No longer need raw log
if not self._verbose and "host" in df.columns:
df = df.drop(columns=["host"])
df.drop_duplicates(inplace=True) # Deduplicate identical records df.drop_duplicates(inplace=True) # Deduplicate identical records
if df.empty: if df.empty:
...@@ -190,17 +294,23 @@ class RcclLogParser: ...@@ -190,17 +294,23 @@ class RcclLogParser:
# Reorder columns for better readability # Reorder columns for better readability
if move_rank: if move_rank:
cols = df.columns.tolist() cols = df.columns.tolist()
if "host" in cols:
cols.remove("host")
cols.remove("rank") cols.remove("rank")
# Move protocol to second position if present # Move protocol to second position if present
if "protocol" in cols: if "protocol" in cols:
cols.remove("protocol") cols.remove("protocol")
cols.insert(0, "protocol") cols.insert(0, "protocol")
# Always move rank to first position # Always move host and rank to front
cols.insert(0, "rank") cols.insert(0, "rank")
if "host" in cols:
cols.insert(0, "host")
df = df[cols] df = df[cols]
# Sort the data # Sort the data
sort_cols = [c for c in sort_cols if c in df.columns] sort_cols = [c for c in sort_cols if c in df.columns]
if "host" in df.columns and "host" not in sort_cols:
sort_cols.insert(0, "host")
if sort_cols: if sort_cols:
df.sort_values(by=sort_cols, inplace=True) df.sort_values(by=sort_cols, inplace=True)
...@@ -222,7 +332,7 @@ class RcclLogParser: ...@@ -222,7 +332,7 @@ class RcclLogParser:
sort_cols=["rank", "Pattern"], sort_cols=["rank", "Pattern"],
) )
def _report_channel_transport_info(self, verbose=False): def _report_channel_transport_info(self):
print("===> Channel Transport Info:\n") print("===> Channel Transport Info:\n")
data = [] data = []
...@@ -237,12 +347,13 @@ class RcclLogParser: ...@@ -237,12 +347,13 @@ class RcclLogParser:
r"(?: \[(\w+)\])?\s+via\s+([\w/]+)" r"(?: \[(\w+)\])?\s+via\s+([\w/]+)"
) )
for (rank, content), _ in self.log_entries.items(): for (host, rank, content), _ in self.log_entries.items():
m = pattern.search(content) m = pattern.search(content)
if m: if m:
channel, src, dst, type_, transport = m.groups() channel, src, dst, type_, transport = m.groups()
data.append( data.append(
{ {
"host": host,
"rank": rank, "rank": rank,
"channel": channel, "channel": channel,
"sender": int(src), "sender": int(src),
...@@ -257,14 +368,14 @@ class RcclLogParser: ...@@ -257,14 +368,14 @@ class RcclLogParser:
return return
df = pd.DataFrame(data) df = pd.DataFrame(data)
df.sort_values(by=["rank", "channel", "sender", "receiver"], inplace=True) df.sort_values(by=["host", "rank", "channel", "sender", "receiver"], inplace=True)
if not verbose: if not self._verbose:
df = df.drop(columns=["channel", "sender", "receiver"]) df = df.drop(columns=["host", "channel", "sender", "receiver"])
df.drop_duplicates(inplace=True) df.drop_duplicates(inplace=True)
print(df.to_string(index=False)) print(df.to_string(index=False))
print() print()
def _report_cl_transfers(self): def _report_collective_transfers(self):
self._extract_and_print( self._extract_and_print(
title="Unique Ring/Tree Transfers", title="Unique Ring/Tree Transfers",
filter_func=lambda c: "protocol" in c and "nbytes" in c, filter_func=lambda c: "protocol" in c and "nbytes" in c,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment