import os import subprocess import sys from .parser import RcclLogParser def run_with_input( *, command: list[str], summary: bool, verbose: bool, hosts: list[str] | None, ranks: list[str] | None, rank: int, ): """Handle all three input modes: stdin, files, or command execution""" log_prefix = f"[Rank {rank}]" # Case 1: No command provided - check for stdin if not command: if not sys.stdin.isatty(): return _process_stdin( summary=summary, verbose=verbose, hosts=hosts, ranks=ranks, rank=rank ) else: return None # Case 2: Check if first argument is an existing file (treat as log file) if os.path.isfile(command[0]): return _process_files( summary=summary, verbose=verbose, hosts=hosts, ranks=ranks, rank=rank, log_prefix=log_prefix, filenames=command, ) # Case 3: Execute as command return _execute_command( summary=summary, verbose=verbose, rank=rank, log_prefix=log_prefix, cmd=command ) def _process_stdin( *, summary: bool, verbose: bool, hosts: list[str] | None, ranks: list[str] | None, rank: int ): rccl_parser = RcclLogParser(verbose=verbose, hosts=hosts, ranks=ranks) for line in sys.stdin: if not summary: print(f"{line}", end="", flush=True) rccl_parser.collect(line) if rank == 0: rccl_parser.report() return 0 def _process_files( *, summary: bool, verbose: bool, hosts: list[str] | None, ranks: list[str] | None, rank: int, log_prefix: str, filenames: list[str], ): rccl_parser = RcclLogParser(verbose=verbose, hosts=hosts, ranks=ranks) for filename in filenames: if not os.path.isfile(filename): print(f"{log_prefix} Error: File not found: {filename}") return 1 with open(filename, encoding="utf-8", errors="replace") as f: for line in f: if not summary: print(f"{line}", end="", flush=True) rccl_parser.collect(line) if rank == 0: rccl_parser.report() return 0 def _execute_command(*, summary: bool, verbose: bool, rank: int, log_prefix: str, cmd: list[str]): env = os.environ.copy() env["NCCL_DEBUG"] = "INFO" env["NCCL_DEBUG_SUBSYS"] = "ALL" print(f"{log_prefix} [Wrapper] Running command: {' '.join(cmd)}") parser = RcclLogParser() process = subprocess.Popen( cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, ) if process.stdout is not None: for line in process.stdout: if not summary: print(f"{line}", end="", flush=True) parser.collect(line) process.wait() if rank == 0: parser.report(verbose=verbose) return process.returncode