import re

import pandas as pd


class RcclLogParser:
    """Collect RCCL/NCCL log lines and print a sectioned summary report.

    Typical use: create a parser (optionally restricted to some hosts and/or
    ranks), feed every raw log line to :meth:`collect`, then call
    :meth:`report` to print all report sections to stdout.
    """

    # Preferred line format:
    #   <host>:<pid>:<tid> [rank] NCCL INFO/WARN/ERROR <content>
    # <host> itself must not contain ':' so the match stops at the first colon.
    _PREFIXED_LINE_RE = re.compile(
        r"([^:\s]+):\d+:\d+\s+\[(\d+)\]\s+NCCL\s+(?:INFO|WARN|ERROR)\s+(.*)"
    )
    # Backward-compatible format for logs without the host/pid/tid prefix.
    _BARE_LINE_RE = re.compile(r"\[(\d+)\]\s+NCCL\s+(?:INFO|WARN|ERROR)\s+(.*)")

    # Regex alternation is ordered: "LL128" must be listed before "LL",
    # otherwise "LL128" is captured as "LL".
    _PROTOCOL_VALUES = r"Simple|LL128|LL"

    # Extracted columns that hold integers and are converted with to_numeric.
    _NUMERIC_COLUMNS = (
        "Pattern",
        "nbytes",
        "nchannels",
        "local",
        "send",
        "recv",
        "p2pnChannelsPerPeer",
        "p2pnChannels",
        "nChannelsMax",
        "crossNic",
        "nChannels",
        "sameChannels",
        "slicesteps",
        "nloops",
        "nsteps",
        "chunksize",
    )

    def __init__(self, verbose=False, hosts=None, ranks=None):
        """Create an empty parser.

        Args:
            verbose: If True, report sections keep their verbose-only columns
                and the channel transport section is printed.
            hosts: Optional collection of host names to keep; None or empty
                keeps every host.
            ranks: Optional collection of ranks (anything ``int()`` accepts)
                to keep; None or empty keeps every rank.
        """
        # Deduplicated set of (host, rank, content) tuples
        self.log_entries: set[tuple[str, int, str]] = set()
        # Verbosity flag used by report sections
        self._verbose = verbose
        # Filters
        self._hosts = hosts if hosts is not None else []
        self._ranks = [int(r) for r in ranks] if ranks is not None else []

    def collect(self, line):
        """Parse one raw log line and store it if it is an NCCL log line."""
        self._preprocess_line(line)

    def report(self):
        """Print every report section to stdout."""
        print(" RCCL Log Parser Report ".center(80, "="))
        print()
        self._report_sys()
        self._report_user_envs()
        self._report_topo_mapping_info()
        self._report_net_ib_info()
        self._report_gdr_rw_info()
        self._report_graph_info()
        self._report_channel_transport_info()
        self._report_collective_transfers()
        self._report_p2p_transfers()
        print(" End of Report ".center(80, "="))

    def _sorted_entries(self):
        """Return the collected entries in sorted order.

        ``log_entries`` is a set, so its iteration order can differ between
        runs; iterating a sorted copy keeps the report output reproducible.
        """
        return sorted(self.log_entries)

    def _preprocess_line(self, line):
        """Extract NCCL log lines with host/rank information.

        Lines that match neither supported format, or that are excluded by
        the host/rank filters, are ignored.
        """
        match = self._PREFIXED_LINE_RE.match(line)
        if match:
            host, rank, content = match.group(1), int(match.group(2)), match.group(3)
            if self._hosts and host not in self._hosts:
                return
            if self._ranks and rank not in self._ranks:
                return
            self.log_entries.add((host, rank, content))
            return

        # Fallback: no host available, so only the rank filter applies and
        # the host is recorded as "-".
        match = self._BARE_LINE_RE.search(line)
        if match:
            rank, content = int(match.group(1)), match.group(2)
            if self._ranks and rank not in self._ranks:
                return
            self.log_entries.add(("-", rank, content))

    def _report_sys(self):
        """Search patterns and print pre-defined strings if matched."""
        # Pattern -> output string, or None to print the log content as-is
        sys_patterns = {
            r"kernel version": None,
            r"ROCr version": None,
            r"RCCL version": None,
            r"Librccl path": None,
            r"iommu": None,
            r"Dmabuf feature disabled": "Dmabuf: disabled",
            r"Disabled GDRCopy": "GDRCopy: disabled",
            r"Using network IB": "NET/IB: enabled",
            r"NET/Plugin: Could not find: librccl-net.so": "NET/Plugin: internal",
            r"XDP is disabled": "XDP: disabled",
        }
        # Compile once instead of once per (entry, pattern) pair.
        compiled = [
            (re.compile(pattern, re.IGNORECASE), out)
            for pattern, out in sys_patterns.items()
        ]

        print("===> System Information:\n")
        reported = set()
        for _, _, content in self.log_entries:
            for regex, out in compiled:
                if regex.search(content):
                    reported.add(content if out is None else out)
                    break  # first matching pattern wins
        for line in sorted(reported):
            print(line)
        print()

    def _report_user_envs(self):
        """Search environment variables set by user."""
        print("===> User-defined Environment Variables:\n")
        env_vars = {}
        pattern = re.compile(r"((?:N|R)CCL_\w+)\s+set(?: by environment)? to\s+(.+)")
        for _, _, content in self.log_entries:
            m = pattern.search(content)
            if m:
                var_name, var_value = m.group(1), m.group(2)
                env_vars.setdefault(var_name, set()).add(var_value)

        for key, values in sorted(env_vars.items()):
            if len(values) == 1:
                print(f"{key}: {next(iter(values))}")
            else:
                print(
                    f"{key}: {', '.join(sorted(values))} (WARNING: Different values across ranks)"
                )
        print()

    def _report_net_ib_info(self):
        """Parse and print NET/IB GPU Direct RDMA HCA information."""
        print("===> NET/IB Info:\n")
        pattern_ib = re.compile(
            r"NET/IB\s+:\s+GPU Direct RDMA Enabled for HCA\s+(\d+)\s+'([^']+)'"
        )
        ib_rows = []
        for host, rank, content in self.log_entries:
            m = pattern_ib.search(content)
            if m:
                hca_no, hca_id = m.groups()
                ib_rows.append(
                    {
                        "host": host,
                        "rank": rank,
                        "hca_no": int(hca_no),
                        "hca_id": hca_id,
                        "gdr": 1,
                    }
                )

        if not ib_rows:
            print(" (No data found)\n")
            return

        columns = ["host", "rank", "hca_no", "hca_id", "gdr"]
        df_ib = pd.DataFrame(ib_rows, columns=columns).drop_duplicates()
        df_ib = df_ib.sort_values(by=["host", "rank", "hca_no", "hca_id"])
        print(df_ib.to_string(index=False))
        print()

    def _report_gdr_rw_info(self):
        """Parse and print GPU Direct RDMA read/write information."""
        print("===> GDR R/W Info:\n")
        pattern_gpu = re.compile(
            r"GPU Direct RDMA Enabled for GPU\s+(\S+)\s*/\s*"
            r"HCA\s+(\d+)\s*\(distance\s+([^)]*)\),\s*read\s+([01])"
        )
        # Splits expressions like "4 <= 7" into distance and max_distance
        pattern_dist = re.compile(r"^([+-]?\d+)\s*<=\s*([+-]?\d+)$")

        gpu_rows = []
        for host, rank, content in self.log_entries:
            m = pattern_gpu.search(content)
            if not m:
                continue
            gpu, hca_no, distance, read_flag = m.groups()
            distance_expr = distance.strip()
            m_dist = pattern_dist.match(distance_expr)
            if m_dist:
                distance_val, max_distance = m_dist.groups()
            else:
                distance_val, max_distance = distance_expr, "-"
            gpu_rows.append(
                {
                    "host": host,
                    "rank": rank,
                    "gpu": gpu,
                    "hca_no": int(hca_no),
                    "distance": distance_val,
                    "max_distance": max_distance,
                    "r/w": "read" if read_flag == "1" else "write",
                }
            )

        if not gpu_rows:
            print(" (No data found)\n")
            return

        columns = ["host", "rank", "gpu", "hca_no", "distance", "max_distance", "r/w"]
        df_gpu = pd.DataFrame(gpu_rows, columns=columns).drop_duplicates()
        df_gpu = df_gpu.sort_values(by=columns)
        print(df_gpu.to_string(index=False))
        print()

    def _extract_and_print(
        self, title, filter_func, fields, mandatory, verbose_cols, sort_cols, move_rank=True
    ):
        """
        Generic function to extract structured data from log lines and print as a table.

        This function handles the common workflow for tabular report sections
        (Graph Info, Ring/Tree Transfers, P2P Transfers). It does NOT apply to
        free-form sections like System Information or User-defined Environment
        Variables.

        Workflow:
            1. Filter relevant log lines
            2. Extract fields using regex patterns with validation
            3. Drop rows missing mandatory fields, then hide verbose-only columns
            4. Deduplicate and reorder columns for readability
            5. Sort and print the table

        Args:
            title: Section title to display (e.g., "Graph Info")
            filter_func: Function to filter relevant log lines (content -> bool)
            fields: Dict of {pattern: (col_name, value_pattern)} for field extraction
                - pattern: Regex pattern to match the field key (e.g., r"protocol")
                - col_name: Name of the DataFrame column
                - value_pattern: Regex pattern to validate/extract the field value.
                  Alternations are tried left to right, so longer alternatives
                  must come first.
            mandatory: List of column names that must not be NaN (rows missing
                any of these are dropped)
            verbose_cols: List of column names shown only in verbose mode
                (dropped from the table when not verbose)
            sort_cols: List of column names to sort by (in order)
            move_rank: If True, move the "host", "rank" and "protocol" columns
                (those that are present) to the front, in that order
        """
        print(f"===> {title}:\n")

        # Sorted iteration keeps tie ordering in the final table reproducible.
        data = [(h, r, c) for h, r, c in self._sorted_entries() if filter_func(c)]
        if not data:
            print(" (No data found)\n")
            return

        # Create DataFrame and extract all fields using regex with validation
        df = pd.DataFrame(data, columns=["host", "rank", "raw_log"])
        for pattern, (col_name, val_pattern) in fields.items():
            # Extract field with strict value validation using word boundary
            df[col_name] = df["raw_log"].str.extract(
                rf"\b{pattern}\s+({val_pattern})", expand=False
            )

        # Convert numeric fields to appropriate types
        numeric_present = [c for c in self._NUMERIC_COLUMNS if c in df.columns]
        for col in numeric_present:
            df[col] = pd.to_numeric(df[col], errors="coerce")

        # Validate mandatory fields BEFORE hiding verbose-only columns;
        # otherwise a mandatory column that is also verbose-only would never
        # be checked in non-verbose mode.
        required = [c for c in mandatory if c in df.columns]
        if required:
            df = df.dropna(subset=required)

        # Drop verbose columns if not verbose
        if not self._verbose:
            df = df.drop(columns=verbose_cols, errors="ignore")

        # The raw log is no longer needed; identical records collapse to one.
        df = df.drop(columns=["raw_log"]).drop_duplicates()

        if df.empty:
            print(" (No valid data found)\n")
            return

        # Reorder columns for better readability
        if move_rank:
            leading_cols = [c for c in ("host", "rank", "protocol") if c in df.columns]
            remaining_cols = [c for c in df.columns if c not in leading_cols]
            df = df[leading_cols + remaining_cols].copy()

        # Sort the data (numeric columns are still numeric at this point)
        sort_by = [c for c in sort_cols if c in df.columns]
        if "host" in df.columns and "host" not in sort_by:
            sort_by.insert(0, "host")
        if sort_by:
            df = df.sort_values(by=sort_by, kind="mergesort")

        # Format integer columns to avoid trailing .0
        for col in numeric_present:
            if col in df.columns:
                df[col] = df[col].apply(lambda x: str(int(x)) if pd.notna(x) else x)

        # Print the final table with NaN values replaced by "-"
        print(df.fillna("-").to_string(index=False))
        print()

    def _report_graph_info(self):
        """Print the graph search results (Pattern/crossNic/nChannels/...)."""
        # Pattern -> column with strict validation
        graph_info_fields = {
            r"Pattern": ("Pattern", r"\d+"),
            r"crossNic": ("crossNic", r"\d+"),
            r"nChannels": ("nChannels", r"\d+"),
            r"bw": ("bandwidth", r"[\d.]+/[\d.]+"),
            r"type": ("type", r"[\w/]+"),
            r"sameChannels": ("sameChannels", r"\d+"),
        }

        self._extract_and_print(
            title="Graph Info",
            filter_func=lambda c: "Pattern" in c and "crossNic" in c,
            fields=graph_info_fields,
            mandatory=["Pattern"],
            verbose_cols=["host", "rank"],
            sort_cols=["host", "rank", "Pattern"],
        )

    def _report_collective_transfers(self):
        """Print the unique collective (Ring/Tree) transfer configurations."""
        # Pattern -> column with strict validation
        cl_transfer_fields = {
            r"protocol": ("protocol", self._PROTOCOL_VALUES),
            r"nbytes": ("nbytes", r"\d+"),
            r"algorithm": ("algorithm", r"Tree|Ring"),
            r"slicesteps": ("slicesteps", r"\d+"),
            r"nchannels": ("nchannels", r"\d+"),
            r"nloops": ("nloops", r"\d+"),
            r"nsteps": ("nsteps", r"\d+"),
            r"chunksize": ("chunksize", r"\d+"),
        }

        self._extract_and_print(
            title="Unique Ring/Tree Transfers",
            filter_func=lambda c: "protocol" in c and "nbytes" in c,
            fields=cl_transfer_fields,
            mandatory=["protocol", "nbytes"],
            verbose_cols=["host", "rank"],
            sort_cols=["host", "rank", "nbytes", "protocol", "nchannels"],
        )

    def _report_p2p_transfers(self):
        """Print the unique point-to-point transfer configurations."""
        # Pattern -> column with strict validation
        p2p_fields = {
            r"p2p : rank": ("local", r"\d+"),
            r"send rank": ("send", r"\d+"),
            r"recv rank": ("recv", r"\d+"),
            r"p2pnChannelsPerPeer": ("p2pnChannelsPerPeer", r"\d+"),
            r"p2pnChannels": ("p2pnChannels", r"\d+"),
            r"nChannelsMax": ("nChannelsMax", r"\d+"),
            r"protocol": ("protocol", self._PROTOCOL_VALUES),
        }

        self._extract_and_print(
            title="Unique P2P Transfers",
            filter_func=lambda c: "p2p :" in c and "send rank" in c,
            fields=p2p_fields,
            mandatory=["local", "send", "recv"],
            verbose_cols=["host", "rank", "local", "send", "recv"],
            sort_cols=["host", "rank", "protocol", "local", "send", "recv"],
        )

    def _report_channel_transport_info(self):
        """Print per-channel sender/receiver transport info (verbose mode only)."""
        print("===> Channel Transport Info:\n")

        if not self._verbose:
            print(" (Skipped because verbose mode is not enabled)")
            print()
            return

        # Match pattern: Channel 00/0 : 2[5d000] -> 1[56000] [send] via NET/IB/6/GDRDMA
        # Group 1: channel (e.g., 00/0)
        # Group 2: src (e.g., 2)
        # Group 3: dst (e.g., 1)
        # Group 4: type (e.g., send or receive, optional)
        # Group 5: transport (e.g., P2P/IPC, NET/IB/6/GDRDMA)
        pattern = re.compile(
            r"Channel\s+(\d+/\d+)\s+:\s+(\d+)\[.*?\]\s+->\s+(\d+)\[.*?\]"
            r"(?: \[(\w+)\])?\s+via\s+([\w/]+)"
        )

        data = []
        # Sorted iteration keeps rows that tie on the sort key reproducible.
        for host, rank, content in self._sorted_entries():
            m = pattern.search(content)
            if not m:
                continue
            channel, src, dst, type_, transport = m.groups()
            data.append(
                {
                    "host": host,
                    "rank": rank,
                    "channel": channel,
                    "sender": int(src),
                    "receiver": int(dst),
                    "type": type_ if type_ else "-",
                    "transport": transport,
                }
            )

        if not data:
            print(" (No data found)\n")
            return

        df = pd.DataFrame(data).drop_duplicates()
        df = df.sort_values(by=["host", "rank", "channel", "sender", "receiver"])
        print(df.to_string(index=False))
        print()

    def _report_aggregated_info(
        self,
        title: str,
        filter_func,
        patterns: list,
        accumulate_fields: set,
        col_order: list,
    ):
        """
        Generic method for sections where multiple log lines contribute different
        fields to a single per-(host, rank) record.

        Unlike _extract_and_print (one line → all fields), this aggregates across
        lines: each line may fill one field of the record.

        Args:
            title: Section title.
            filter_func: Pre-filter for log content (content -> bool).
            patterns: List of (search_pattern, field, group_idx, literal).
                Patterns are searched case-insensitively; the first one that
                matches a line decides the field. The value is the stripped
                capture group ``group_idx``, or ``literal`` when ``group_idx``
                is None; an empty value becomes "-".
            accumulate_fields: Fields that collect values across multiple lines
                (stored as a set, printed as a sorted " | "-joined string).
                Other fields keep the value of the last matching line.
            col_order: Preferred column display order.
        """
        print(f"===> {title}:\n")

        compiled = [
            (re.compile(pattern, re.IGNORECASE), field, group_idx, literal)
            for pattern, field, group_idx, literal in patterns
        ]

        records: dict[tuple, dict] = {}
        # Sorted iteration makes "last matching line wins" reproducible.
        for host, rank, content in self._sorted_entries():
            if not filter_func(content):
                continue
            for regex, field, group_idx, literal in compiled:
                m = regex.search(content)
                if not m:
                    continue
                rec = records.setdefault((host, rank), {"host": host, "rank": rank})
                value = (literal if group_idx is None else m.group(group_idx).strip()) or "-"
                if field in accumulate_fields:
                    rec.setdefault(field, set()).add(value)
                else:
                    rec[field] = value
                break  # each line matches at most one pattern

        if not records:
            print(" (No data found)\n")
            return

        # Flatten accumulated sets → sorted string
        for rec in records.values():
            for field in accumulate_fields:
                if field in rec:
                    rec[field] = " | ".join(sorted(rec[field]))

        df = pd.DataFrame(list(records.values())).sort_values(by=["host", "rank"])
        ordered = [c for c in col_order if c in df.columns]
        remaining = [c for c in df.columns if c not in ordered]
        df = df[ordered + remaining]

        print(df.fillna("-").to_string(index=False))
        print()

    def _report_topo_mapping_info(self):
        """Print per-(host, rank) topology mapping file information."""
        # (search_pattern, field_name, capture_group_index_or_None, literal_value_or_None)
        # - capture_group_index: int → get regex group, None → use literal_value
        topo_mapping_patterns = [
            (r"No topo mapping file", "status", None, "no_file"),
            (r"environmental key word is (\S+)", "fingerprint", 1, None),
            (r"Loading topology mapping file (\S+)", "loaded", 1, None),
            (r"(?:parseing|parsing) topology mapping group[:\s]*(.*)", "parsed", 1, None),
            (r"skip topology mapping group:\s*([^,]+)", "skipped", 1, None),
        ]
        # Fields that should accumulate across multiple matching lines (per host/rank)
        # instead of being overwritten
        topo_mapping_accumulate_fields = {"skipped", "parsed"}

        self._report_aggregated_info(
            title="Topology Mapping File Info",
            filter_func=lambda c: any(
                s in c.lower()
                for s in ("topo mapping", "topology mapping", "environmental key word")
            ),
            patterns=topo_mapping_patterns,
            accumulate_fields=topo_mapping_accumulate_fields,
            col_order=["host", "rank", "status", "fingerprint", "loaded", "parsed", "skipped"],
        )