#!/usr/bin/env python3
"""Command-line wrapper that feeds RCCL log lines to ``RcclLogParser``.

Log lines come from one of three sources: piped stdin, one or more log
files, or the combined stdout/stderr of a wrapped command that is launched
with ``NCCL_DEBUG=INFO`` and ``NCCL_DEBUG_SUBSYS=ALL``.
"""
import argparse
import os
import subprocess
import sys

from .parser import RcclLogParser

# Environment variables consulted for the rank, in priority order.
_RANK_ENV_VARS = (
    "OMPI_COMM_WORLD_RANK",  # OpenMPI
    "PMI_RANK",  # MPICH / MVAPICH
    "SLURM_PROCID",  # Slurm
    "RANK",  # General / Torch
)


def get_mpi_rank() -> int:
    """Return this process's rank as read from common launcher variables.

    The variables in ``_RANK_ENV_VARS`` are checked in order and the first
    one holding a valid integer wins. Unset or malformed variables are
    skipped.

    Returns:
        The rank as an ``int``, or ``0`` when no variable provides one.
    """
    for var in _RANK_ENV_VARS:
        value = os.environ.get(var)
        if value is None:
            continue
        try:
            return int(value)
        except ValueError:
            # A malformed value (e.g. an empty string) must not crash the
            # wrapper; fall through to the next candidate variable.
            continue
    return 0


def _feed_lines(log_parser, lines, echo: bool) -> None:
    """Pass every line of *lines* to ``log_parser.collect``, echoing if asked."""
    for line in lines:
        if echo:
            # Lines keep their own newline, and flushing per line keeps the
            # pass-through output live while the source is still producing.
            print(line, end="", flush=True)
        log_parser.collect(line)


def _is_log_file(path: str) -> bool:
    """Return True when *path* is a regular file that is not executable."""
    # An existing executable (e.g. ./all_reduce_perf) must be run as a
    # command rather than read as a log, so the executable bit decides.
    return os.path.isfile(path) and not os.access(path, os.X_OK)


def main() -> None:
    """Run the wrapper and terminate the process via ``sys.exit``.

    Modes, chosen from the positional arguments:
      1. No arguments and stdin is not a TTY: parse lines from stdin.
      2. First argument is a non-executable regular file: parse every
         argument as a log file.
      3. Otherwise: run the arguments as a command and parse its output.

    Only rank 0 prints the report (and the help text). Exit status is 0
    after parsing stdin or files, the child's return code in wrap mode,
    1 on usage or launch errors, and 130 on ``KeyboardInterrupt``.
    """
    rank = get_mpi_rank()
    log_prefix = f"[Rank {rank}]"

    # Parse command line arguments
    arg_parser = argparse.ArgumentParser(
        description="RCCL Log Parser Wrapper\n\n"
        "Usage modes:\n"
        " 1. Pipe input: cat log.txt | xcl-lens\n"
        " 2. Read files: xcl-lens log1.txt log2.txt\n"
        " 3. Wrap command: xcl-lens ./all_reduce_perf",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    arg_parser.add_argument(
        "--summary", action="store_true", help="Print summary report only"
    )
    arg_parser.add_argument(
        "-v", "--verbose", action="store_true", help="Print verbose reports"
    )
    arg_parser.add_argument(
        "command",
        nargs=argparse.REMAINDER,
        help="Executable to run, or log files to read",
    )

    args = arg_parser.parse_args()
    cmd = args.command
    echo = not args.summary

    # Case 1: No command provided - check for stdin
    if not cmd:
        if sys.stdin.isatty():
            # Nothing piped in and nothing to run: show usage and fail.
            if rank == 0:
                arg_parser.print_help()
            sys.exit(1)
        try:
            # Decode leniently, like the log-file path below, so stray
            # non-UTF-8 bytes do not abort parsing. Guarded because stdin
            # may have been replaced by an object without reconfigure().
            if hasattr(sys.stdin, "reconfigure"):
                sys.stdin.reconfigure(errors="replace")
            log_parser = RcclLogParser()
            _feed_lines(log_parser, sys.stdin, echo)
            if rank == 0:
                log_parser.report(verbose=args.verbose)
            sys.exit(0)
        except KeyboardInterrupt:
            sys.exit(130)

    # Case 2: First argument is a (non-executable) file - treat all as logs
    if _is_log_file(cmd[0]):
        try:
            log_parser = RcclLogParser()
            for filename in cmd:
                if not os.path.isfile(filename):
                    print(f"{log_prefix} Error: File not found: {filename}")
                    sys.exit(1)
                with open(filename, encoding="utf-8", errors="replace") as f:
                    _feed_lines(log_parser, f, echo)
            if rank == 0:
                log_parser.report(verbose=args.verbose)
            sys.exit(0)
        except KeyboardInterrupt:
            sys.exit(130)

    # Case 3: Wrap a command. Copy the environment and inject the RCCL
    # debug variables so the child emits the log lines to be parsed.
    env = os.environ.copy()
    env["NCCL_DEBUG"] = "INFO"
    env["NCCL_DEBUG_SUBSYS"] = "ALL"

    print(f"{log_prefix} [Wrapper] Running command: {' '.join(cmd)}")

    try:
        log_parser = RcclLogParser()
        try:
            process = subprocess.Popen(
                cmd,
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # interleave stderr with stdout
                text=True,
                errors="replace",
                bufsize=1,  # line-buffered so output streams as produced
            )
        except FileNotFoundError:
            print(f"{log_prefix} Error: Command not found: {cmd[0]}")
            sys.exit(1)
        except OSError as exc:
            # e.g. permission denied or exec format error
            print(f"{log_prefix} Error: Cannot execute {cmd[0]}: {exc}")
            sys.exit(1)

        # The context manager closes the pipe and waits for the child,
        # also when the loop is interrupted by an exception.
        with process:
            _feed_lines(log_parser, process.stdout, echo)

        if rank == 0:
            log_parser.report(verbose=args.verbose)
        sys.exit(process.returncode)
    except KeyboardInterrupt:
        sys.exit(130)


if __name__ == "__main__":
    main()