import os import subprocess import sys from .parser import RcclLogParser def run_with_input(args, rank): """Handle all three input modes: stdin, files, or command execution""" log_prefix = f"[Rank {rank}]" cmd = args.command # Case 1: No command provided - check for stdin if not cmd: if not sys.stdin.isatty(): return _process_stdin(args, rank) else: return None # Case 2: Check if first argument is an existing file (treat as log file) if os.path.isfile(cmd[0]): return _process_files(args, rank, log_prefix, cmd) # Case 3: Execute as command return _execute_command(args, rank, log_prefix, cmd) def _process_stdin(args, rank): rccl_parser = RcclLogParser() for line in sys.stdin: if not args.summary: print(f"{line}", end="", flush=True) rccl_parser.collect(line) if rank == 0: rccl_parser.report(verbose=args.verbose) return 0 def _process_files(args, rank, log_prefix, filenames): rccl_parser = RcclLogParser() for filename in filenames: if not os.path.isfile(filename): print(f"{log_prefix} Error: File not found: {filename}") return 1 with open(filename, encoding="utf-8", errors="replace") as f: for line in f: if not args.summary: print(f"{line}", end="", flush=True) rccl_parser.collect(line) if rank == 0: rccl_parser.report(verbose=args.verbose) return 0 def _execute_command(args, rank, log_prefix, cmd): env = os.environ.copy() env["NCCL_DEBUG"] = "INFO" env["NCCL_DEBUG_SUBSYS"] = "ALL" print(f"{log_prefix} [Wrapper] Running command: {' '.join(cmd)}") parser = RcclLogParser() process = subprocess.Popen( cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, ) for line in process.stdout: if not args.summary: print(f"{line}", end="", flush=True) parser.collect(line) process.wait() if rank == 0: parser.report(verbose=args.verbose) return process.returncode