import re

import pandas as pd


class RcclLogParser:
    """Parse RCCL (NCCL) debug log lines and print a structured report.

    Feed raw log lines one at a time to :meth:`collect`; then call
    :meth:`report` to print several sections derived from the collected
    lines: system information, user-set environment variables, graph
    info, ring/tree transfers, and P2P transfers.
    """

    # Compiled once: these patterns run on every collected line/entry.
    # Valid lines look like "[rank] NCCL INFO|WARN|ERROR <content>".
    _LINE_RE = re.compile(r"\[(\d+)\]\s+NCCL\s+(?:INFO|WARN|ERROR)\s+(.*)")
    _ENV_RE = re.compile(r"(\w+)\s+set by environment to\s+(.+)")

    # Entries with shorter content are considered noise and dropped.
    _MIN_CONTENT_LEN = 20

    # Columns coerced to numeric for sorting and rendered as plain
    # integers (no trailing ".0") in the printed tables.
    _NUMERIC_COLUMNS = (
        "Pattern",
        "nbytes",
        "nchannels",
        "local",
        "send",
        "recv",
        "p2pnChannelsPerPeer",
        "p2pnChannels",
        "nChannelsMax",
        "crossNic",
        "nChannels",
        "sameChannels",
        "slicesteps",
        "nloops",
        "nsteps",
        "chunksize",
    )

    def __init__(self):
        # (rank, content) -> None; the dict keys act as an
        # insertion-ordered, de-duplicated set of log entries.
        self.log_entries = dict()

        # Pattern -> replacement output string, or None to print the
        # matched log content as-is.
        self.sys_patterns = {
            r"kernel version": None,
            r"ROCr version": None,
            r"RCCL version": None,
            r"Librccl path": None,
            r"iommu": None,
            r"Dmabuf feature disabled": "Dmabuf: disabled",
            r"Disabled GDRCopy": "GDRCopy: disabled",
        }

        # Pattern -> (column name, value-validation pattern).
        self.graph_info_fields = {
            r"Pattern": ("Pattern", r"\d+"),
            r"crossNic": ("crossNic", r"\d+"),
            r"nChannels": ("nChannels", r"\d+"),
            r"bw": ("bandwidth", r"[\d.]+/[\d.]+"),
            r"type": ("type", r"[\w/]+"),
            r"sameChannels": ("sameChannels", r"\d+"),
        }

        # Pattern -> (column name, value-validation pattern).
        self.cl_transfer_fields = {
            r"protocol": ("protocol", r"Simple|LL|LL128"),
            r"nbytes": ("nbytes", r"\d+"),
            r"algorithm": ("algorithm", r"Tree|Ring"),
            r"slicesteps": ("slicesteps", r"\d+"),
            r"nchannels": ("nchannels", r"\d+"),
            r"nloops": ("nloops", r"\d+"),
            r"nsteps": ("nsteps", r"\d+"),
            r"chunksize": ("chunksize", r"\d+"),
        }

        # Pattern -> (column name, value-validation pattern).
        self.p2p_fields = {
            r"p2p : rank": ("local", r"\d+"),
            r"send rank": ("send", r"\d+"),
            r"recv rank": ("recv", r"\d+"),
            r"p2pnChannelsPerPeer": ("p2pnChannelsPerPeer", r"\d+"),
            r"p2pnChannels": ("p2pnChannels", r"\d+"),
            r"nChannelsMax": ("nChannelsMax", r"\d+"),
            r"protocol": ("protocol", r"Simple|LL|LL128"),
        }

    def collect(self, line):
        """Ingest one raw log line; lines without rank info are ignored."""
        self._preprocess_line(line)

    def report(self):
        """Print the full report for all collected log lines."""
        print(" RCCL Log Parser Report ".center(80, "="))
        print()
        self._report_sys()
        self._report_user_envs()
        self._report_graph_info()
        self._report_cl_transfers()
        self._report_p2p_transfers()
        print(" End of Report ".center(80, "="))

    def _preprocess_line(self, line):
        """Extract and validate NCCL log lines with rank information.

        Keeps only lines matching "[rank] NCCL INFO/WARN/ERROR <content>"
        whose content is long enough to be meaningful; duplicates collapse
        naturally via the (rank, content) dict key.
        """
        match = self._LINE_RE.search(line)
        if match:
            rank, content = int(match.group(1)), match.group(2)
            if len(content) >= self._MIN_CONTENT_LEN:
                self.log_entries[(rank, content)] = None

    def _report_sys(self):
        """Search patterns and print pre-defined strings if matched."""
        print("===> System Information:\n")
        reported = set()
        for _, content in self.log_entries:
            for pattern, out in self.sys_patterns.items():
                if re.search(pattern, content, re.IGNORECASE):
                    # Use the canned summary when one is defined,
                    # otherwise echo the raw log content.
                    reported.add(out if out is not None else content)
                    break
        for line in sorted(reported):
            print(line)
        print()

    def _report_user_envs(self):
        """Print environment variables explicitly set by the user.

        Warns when the same variable carries different values across
        ranks.
        """
        print("===> User-defined Environment Variables:\n")
        env_vars = {}
        for _, content in self.log_entries:
            m = self._ENV_RE.search(content)
            if m:
                var_name, var_value = m.group(1), m.group(2)
                env_vars.setdefault(var_name, set()).add(var_value)
        for key, values in sorted(env_vars.items()):
            if len(values) == 1:
                print(f"{key}: {next(iter(values))}")
            else:
                print(
                    f"{key}: {', '.join(sorted(values))} (WARNING: Different values across ranks)"
                )
        print()

    def _extract_and_print(self, title, filter_func, fields, mandatory, sort_cols, move_rank=True):
        """
        Generic function to extract structured data from log lines and
        print it as a table.

        Handles the common workflow for tabular report sections (Graph
        Info, Ring/Tree Transfers, P2P Transfers). Does NOT apply to
        free-form sections like System Information or User-defined
        Environment Variables.

        Workflow:
            1. Filter relevant log lines
            2. Extract fields using regex patterns with validation
            3. Clean and validate the data
            4. Reorder columns for readability
            5. Sort and print the table

        Args:
            title: Section title to display (e.g., "Graph Info")
            filter_func: Function to filter relevant log lines (content -> bool)
            fields: Dict of {pattern: (col_name, value_pattern)} for field extraction
                - pattern: Regex pattern to match the field key (e.g., r"protocol")
                - col_name: Name of the DataFrame column
                - value_pattern: Regex pattern to validate/extract the field value
            mandatory: List of column names that must not be NaN (drop rows missing these)
            sort_cols: List of column names to sort by (in order)
            move_rank: If True, move "rank" column to front and "protocol" to second if present
        """
        print(f"===> {title}:\n")

        # 1. Keep only the log lines relevant to this section.
        data = [(r, c) for r, c in self.log_entries if filter_func(c)]
        if not data:
            print(" (No data found)\n")
            return

        # 2. Extract every field with strict value validation; the word
        #    boundary prevents matching the key inside a longer token.
        df = pd.DataFrame(data, columns=["rank", "raw_log"])
        for pattern, (col_name, val_pattern) in fields.items():
            df[col_name] = df["raw_log"].str.extract(
                rf"\b{pattern}\s+({val_pattern})", expand=False
            )

        # Coerce numeric fields so sorting is numeric, not lexicographic.
        for col in self._NUMERIC_COLUMNS:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors="coerce")

        # 3. Clean: drop rows missing mandatory fields, then deduplicate.
        #    Only enforce mandatory columns that actually exist.
        mandatory = [c for c in mandatory if c in df.columns]
        df.dropna(subset=mandatory, inplace=True)
        df.drop(columns=["raw_log"], inplace=True)  # No longer needed
        df.drop_duplicates(inplace=True)
        if df.empty:
            print(" (No valid data found)\n")
            return

        # 4. Reorder columns for readability: rank first, protocol
        #    second when present.
        if move_rank:
            cols = df.columns.tolist()
            cols.remove("rank")
            if "protocol" in cols:
                cols.remove("protocol")
                cols.insert(0, "protocol")
            cols.insert(0, "rank")
            df = df[cols]

        # 5. Sort by whichever requested columns exist.
        sort_cols = [c for c in sort_cols if c in df.columns]
        if sort_cols:
            df.sort_values(by=sort_cols, inplace=True)

        # Render integers without a trailing ".0" (to_numeric produced
        # floats wherever NaNs were present).
        for col in self._NUMERIC_COLUMNS:
            if col in df.columns:
                df[col] = df[col].apply(lambda x: str(int(x)) if pd.notna(x) else x)

        # Print the final table with NaN values shown as "-".
        print(df.fillna("-").to_string(index=False))
        print()

    def _report_graph_info(self):
        """Tabulate graph topology info (Pattern/crossNic/nChannels/...)."""
        self._extract_and_print(
            title="Graph Info",
            filter_func=lambda c: "Pattern" in c and "crossNic" in c,
            fields=self.graph_info_fields,
            mandatory=["Pattern"],
            sort_cols=["rank", "Pattern"],
        )

    def _report_cl_transfers(self):
        """Tabulate unique collective (Ring/Tree) transfer records."""
        self._extract_and_print(
            title="Unique Ring/Tree Transfers",
            filter_func=lambda c: "protocol" in c and "nbytes" in c,
            fields=self.cl_transfer_fields,
            mandatory=["protocol", "nbytes"],
            sort_cols=["rank", "nbytes", "protocol", "nchannels"],
        )

    def _report_p2p_transfers(self):
        """Tabulate unique point-to-point transfer records."""
        self._extract_and_print(
            title="Unique P2P Transfers",
            filter_func=lambda c: "p2p :" in c and "send rank" in c,
            fields=self.p2p_fields,
            mandatory=["local", "send", "recv"],
            sort_cols=["rank", "protocol", "local", "send", "recv"],
        )