Commit 02d08801 authored by one's avatar one
Browse files

[xcl-lens] Refactor RcclLogParser to use structured logging with validation

parent bf904971
...@@ -5,7 +5,8 @@ import pandas as pd ...@@ -5,7 +5,8 @@ import pandas as pd
class RcclLogParser: class RcclLogParser:
def __init__(self): def __init__(self):
self.output = set() # (rank, content) -> None
self.log_entries = dict()
self.raw_lines = set() self.raw_lines = set()
# Pattern -> output string or as-is # Pattern -> output string or as-is
...@@ -19,37 +20,37 @@ class RcclLogParser: ...@@ -19,37 +20,37 @@ class RcclLogParser:
r"Disabled GDRCopy": "GDRCopy: disabled", r"Disabled GDRCopy": "GDRCopy: disabled",
} }
# Pattern -> column # Pattern -> column with strict validation
self.graph_info_fields = { self.graph_info_fields = {
r"Pattern": "Pattern", r"Pattern": ("Pattern", r"\d+"),
r"crossNic": "crossNic", r"crossNic": ("crossNic", r"\d+"),
r"nChannels": "nChannels", r"nChannels": ("nChannels", r"\d+"),
r"bw": "bandwidth", r"bw": ("bandwidth", r"[\d.]+/[\d.]+"),
r"type": "type", r"type": ("type", r"[\w/]+"),
r"sameChannels": "sameChannels", r"sameChannels": ("sameChannels", r"\d+"),
} }
# Pattern -> column # Pattern -> column with strict validation
self.cl_transfer_fields = { self.cl_transfer_fields = {
r"protocol": "protocol", r"protocol": ("protocol", r"Simple|LL|LL128"),
r"nbytes": "nbytes", r"nbytes": ("nbytes", r"\d+"),
r"algorithm": "algorithm", r"algorithm": ("algorithm", r"Tree|Ring"),
r"slicesteps": "slicesteps", r"slicesteps": ("slicesteps", r"\d+"),
r"nchannels": "nchannels", r"nchannels": ("nchannels", r"\d+"),
r"nloops": "nloops", r"nloops": ("nloops", r"\d+"),
r"nsteps": "nsteps", r"nsteps": ("nsteps", r"\d+"),
r"chunksize": "chunksize", r"chunksize": ("chunksize", r"\d+"),
} }
# Pattern -> column # Pattern -> column with strict validation
self.p2p_fields = { self.p2p_fields = {
r"p2p : rank": "local", r"p2p : rank": ("local", r"\d+"),
r"send rank": "send", r"send rank": ("send", r"\d+"),
r"recv rank": "recv", r"recv rank": ("recv", r"\d+"),
r"p2pnChannelsPerPeer": "p2pnChannelsPerPeer", r"p2pnChannelsPerPeer": ("p2pnChannelsPerPeer", r"\d+"),
r"p2pnChannels": "p2pnChannels", r"p2pnChannels": ("p2pnChannels", r"\d+"),
r"nChannelsMax": "nChannelsMax", r"nChannelsMax": ("nChannelsMax", r"\d+"),
r"protocol": "protocol", r"protocol": ("protocol", r"Simple|LL|LL128"),
} }
def collect(self, line): def collect(self, line):
...@@ -71,121 +72,88 @@ class RcclLogParser: ...@@ -71,121 +72,88 @@ class RcclLogParser:
print(" End of Report ".center(80, "=")) print(" End of Report ".center(80, "="))
def _preprocess_line(self, line): def _preprocess_line(self, line):
match = re.search(r"\[\d+\]\s+NCCL\s+(?:INFO|WARN|ERROR)\s+(.*)", line) """Extract and validate NCCL log lines with rank information"""
# Match lines that have a valid NCCL log format with rank
# Pattern: [rank] NCCL INFO/WARN/ERROR followed by content
match = re.search(r"\[(\d+)\]\s+NCCL\s+(?:INFO|WARN|ERROR)\s+(.*)", line)
if match: if match:
self.output.add(match.group(1)) rank, content = int(match.group(1)), match.group(2)
if len(content) >= 20:
self.log_entries[(rank, content)] = None
def _report_sys(self): def _report_sys(self):
"""Search patterns and print pre-defined strings if matched""" """Search patterns and print pre-defined strings if matched"""
print("===> System Information:\n") print("===> System Information:\n")
reported_lines = [] reported = set()
for line in self.output: for (_, content), _ in self.log_entries.items():
for pattern, output in self.sys_patterns.items(): for pattern, out in self.sys_patterns.items():
if re.search(pattern, line, re.IGNORECASE): if re.search(pattern, content, re.IGNORECASE):
reported_lines.append(output if output else line) reported.add(out if out else content)
break break
for line in reported_lines: for line in sorted(reported):
print(line) print(line)
print() print()
def _report_user_envs(self): def _report_user_envs(self):
"""Search environment variables set by user""" """Search environment variables set by user"""
print("===> User-defined Environment Variables:\n") print("===> User-defined Environment Variables:\n")
env_vars = {}
pattern = re.compile(r"(\w+)\s+set by environment to\s+(.+)") pattern = re.compile(r"(\w+)\s+set by environment to\s+(.+)")
for line in self.output: for (_, content), _ in self.log_entries.items():
m = pattern.search(line) m = pattern.search(content)
if m: if m:
print(f"{m.group(1)}: {m.group(2)}") env_vars[m.group(1)] = m.group(2)
for key, value in sorted(env_vars.items()):
print(f"{key}: {value}")
print() print()
def _report_graph_info(self): def _extract_and_print(self, title, filter_func, fields, mandatory, sort_cols, move_rank=True):
"""Extract graph information""" """
print("===> Graph Info:\n") Generic function to extract structured data from log lines and print as a table.
# Filter lines by looking for 'Pattern' and 'crossNic' This function handles the common workflow for tabular report sections like
filtered_lines = [line for line in self.output if "Pattern" in line and "crossNic" in line] (Graph Info, Ring/Tree Transfers, P2P Transfers). Does NOT apply to
free-form sections like System Information or User-defined Environment Variables.
if not filtered_lines:
print(" (No graph info found)\n") Workflow:
1. Filter relevant log lines
2. Extract fields using regex patterns with validation
3. Clean and validate the data
4. Reorder columns for readability
5. Sort and print the table
Args:
title: Section title to display (e.g., "Graph Info")
filter_func: Function to filter relevant log lines (content -> bool)
fields: Dict of {pattern: (col_name, value_pattern)} for field extraction
- pattern: Regex pattern to match the field key (e.g., r"protocol")
- col_name: Name of the DataFrame column
- value_pattern: Regex pattern to validate/extract the field value
mandatory: List of column names that must not be NaN (drop rows missing these)
sort_cols: List of column names to sort by (in order)
move_rank: If True, move "rank" column to front and "protocol" to second if present
"""
print(f"===> {title}:\n")
# Filter relevant log lines using the provided filter function
data = [(r, c) for (r, c), _ in self.log_entries.items() if filter_func(c)]
if not data:
print(" (No data found)\n")
return return
df = pd.DataFrame(filtered_lines, columns=["raw_log"]) # Create DataFrame and extract all fields using regex with validation
df = pd.DataFrame(data, columns=["rank", "raw_log"])
# Extract each field independently (order-agnostic) for pattern, (col_name, val_pattern) in fields.items():
# Values are comma-separated, so use [^,\s]+ to exclude trailing commas # Extract field with strict value validation using word boundary
for pattern, col_name in self.graph_info_fields.items(): df[col_name] = df["raw_log"].str.extract(
df[col_name] = df["raw_log"].str.extract(rf"\b{pattern}\s+([^,\s]+)", expand=False) rf"\b{pattern}\s+({val_pattern})", expand=False
)
# Type conversion for correct sorting
if "Pattern" in df.columns: # Convert numeric fields to appropriate types
df["Pattern"] = pd.to_numeric(df["Pattern"], errors="coerce") numeric_columns = [
"Pattern",
# Clean up "nbytes",
df.drop(columns=["raw_log"], inplace=True) "nchannels",
df.drop_duplicates(inplace=True)
df.sort_values(by="Pattern", ascending=False, inplace=True)
print(df.fillna("-").to_string(index=False))
print()
def _report_cl_transfers(self):
"""Extract non-P2P transfer arguments"""
print("===> Unique Ring/Tree Transfers:\n")
# Filter lines by looking for 'protocol' and 'nbytes'
raw_lines = [line for line in self.output if "protocol" in line and "nbytes" in line]
if not raw_lines:
print(" (No transfer patterns found)\n")
return
df = pd.DataFrame(raw_lines, columns=["raw_log"])
# Extract all fields using a single loop
for pattern, col_name in self.cl_transfer_fields.items():
df[col_name] = df["raw_log"].str.extract(rf"\b{pattern}\s+(\S+)", expand=False)
# Type conversion for correct sorting
for field in ["nbytes", "nchannels"]:
if field in df.columns:
df[field] = pd.to_numeric(df[field], errors="coerce")
# Drop rows where mandatory fields are missing
mandatory_cols = [c for c in ["protocol", "nbytes"] if c in df.columns]
df.dropna(subset=mandatory_cols, inplace=True)
# Clean up
df.drop(columns=["raw_log"], inplace=True)
df.drop_duplicates(inplace=True)
sort_cols = ["nbytes", "protocol", "nchannels"]
sort_cols = [col for col in sort_cols if col in df.columns]
if sort_cols:
df.sort_values(by=sort_cols, inplace=True)
# Fill NaNs with "-" and print
print(df.fillna("-").to_string(index=False))
print()
def _report_p2p_transfers(self):
"""Extract P2P transfer details"""
print("===> Unique P2P Transfers:\n")
# Filter lines by looking for 'p2p :' and 'send rank'
raw_lines = [line for line in self.output if "p2p :" in line and "send rank" in line]
if not raw_lines:
print(" (No P2P transfers found)\n")
return
# Extract all fields using a single loop
df = pd.DataFrame(raw_lines, columns=["raw_log"])
for pattern, col_name in self.p2p_fields.items():
df[col_name] = df["raw_log"].str.extract(rf"{pattern}\s+(\S+)", expand=False)
# Type conversion for correct sorting
numeric_cols = [
"local", "local",
"send", "send",
"recv", "recv",
...@@ -193,26 +161,65 @@ class RcclLogParser: ...@@ -193,26 +161,65 @@ class RcclLogParser:
"p2pnChannels", "p2pnChannels",
"nChannelsMax", "nChannelsMax",
] ]
for col in numeric_cols: for col in numeric_columns:
if col in df.columns: if col in df.columns:
df[col] = pd.to_numeric(df[col], errors="coerce") df[col] = pd.to_numeric(df[col], errors="coerce")
# Clean up # Clean data - drop invalid rows and duplicates
df.drop(columns=["raw_log"], inplace=True) # Only keep columns that actually exist in the DataFrame
df.drop_duplicates(inplace=True) mandatory = [c for c in mandatory if c in df.columns]
df.dropna(subset=mandatory, inplace=True) # Remove rows missing mandatory fields
df.drop(columns=["raw_log"], inplace=True) # No longer need raw log
df.drop_duplicates(inplace=True) # Deduplicate identical records
if df.empty:
print(" (No valid data found)\n")
return
# Reorder columns for better readability
if move_rank:
cols = df.columns.tolist()
cols.remove("rank")
# Move protocol to second position if present
if "protocol" in cols:
cols.remove("protocol")
cols.insert(0, "protocol")
# Always move rank to first position
cols.insert(0, "rank")
df = df[cols]
sort_cols = ["protocol", "local", "send", "recv"] # Sort the data
sort_cols = [c for c in sort_cols if c in df.columns] sort_cols = [c for c in sort_cols if c in df.columns]
if sort_cols: if sort_cols:
df.sort_values(by=sort_cols, inplace=True) df.sort_values(by=sort_cols, inplace=True)
# Move 'protocol' to the first column # Print the final table with NaN values replaced by "-"
cols = df.columns.tolist()
if "protocol" in cols:
cols.remove("protocol")
cols.insert(0, "protocol")
df = df[cols]
# Fill NaNs with "-" and print
print(df.fillna("-").to_string(index=False)) print(df.fillna("-").to_string(index=False))
print() print()
def _report_graph_info(self):
self._extract_and_print(
title="Graph Info",
filter_func=lambda c: "Pattern" in c and "crossNic" in c,
fields=self.graph_info_fields,
mandatory=["Pattern"],
sort_cols=["rank", "Pattern"],
)
def _report_cl_transfers(self):
self._extract_and_print(
title="Unique Ring/Tree Transfers",
filter_func=lambda c: "protocol" in c and "nbytes" in c,
fields=self.cl_transfer_fields,
mandatory=["protocol", "nbytes"],
sort_cols=["rank", "nbytes", "protocol", "nchannels"],
)
def _report_p2p_transfers(self):
self._extract_and_print(
title="Unique P2P Transfers",
filter_func=lambda c: "p2p :" in c and "send rank" in c,
fields=self.p2p_fields,
mandatory=["local", "send", "recv"],
sort_cols=["rank", "protocol", "local", "send", "recv"],
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment