Commit 35b309ba authored by one's avatar one
Browse files

[xcl-lens] Refactor main.py

parent 6e26d46d
#!/usr/bin/env python3
import argparse
import os
import subprocess
import sys
from .parser import RcclLogParser
from .runner import run_with_input
from .utils import get_mpi_rank
def get_mpi_rank():
    """
    Return this process's rank as an int, read from the environment.

    The variables below are checked in order and the first one holding a
    valid integer wins. A variable that is set but does not parse as an
    integer (for example an empty string) is skipped instead of raising
    ValueError. Returns 0 when no variable yields a rank.
    """
    # Common MPI Rank environment variables, in lookup priority order
    rank_vars = (
        "OMPI_COMM_WORLD_RANK",  # OpenMPI
        "PMI_RANK",  # MPICH / MVAPICH
        "SLURM_PROCID",  # Slurm
        "RANK",  # General / Torch
    )
    for var in rank_vars:
        value = os.environ.get(var)
        if value is None:
            continue
        try:
            return int(value)
        except ValueError:
            # Malformed value: fall through to the next candidate.
            continue
    return 0
def main():
rank = get_mpi_rank()
log_prefix = f"[Rank {rank}]"
# Parse command line arguments
def create_parser():
parser = argparse.ArgumentParser(
description="RCCL Log Parser Wrapper\n\n"
"Usage modes:\n"
......@@ -45,87 +21,25 @@ def main():
parser.add_argument(
"command", nargs=argparse.REMAINDER, help="Executable to run, or log files to read"
)
return parser
args = parser.parse_args()
cmd = args.command
# Case 1: No command provided - check for stdin
if not cmd:
if not sys.stdin.isatty():
try:
rccl_parser = RcclLogParser()
for line in sys.stdin:
if not args.summary:
print(f"{line}", end="", flush=True)
rccl_parser.collect(line)
def main():
rank = get_mpi_rank()
if rank == 0:
rccl_parser.report(verbose=args.verbose)
sys.exit(0)
except KeyboardInterrupt:
sys.exit(130)
else:
if rank == 0:
parser.print_help()
sys.exit(1)
parser = create_parser()
args = parser.parse_args()
# Case 2: Check if first argument is an existing file (treat as log file)
if os.path.isfile(cmd[0]):
try:
rccl_parser = RcclLogParser()
for filename in cmd:
if not os.path.isfile(filename):
print(f"{log_prefix} Error: File not found: {filename}")
sys.exit(1)
with open(filename, encoding="utf-8", errors="replace") as f:
for line in f:
if not args.summary:
print(f"{line}", end="", flush=True)
rccl_parser.collect(line)
if rank == 0:
rccl_parser.report(verbose=args.verbose)
sys.exit(0)
exit_code = run_with_input(args, rank)
if exit_code is not None:
sys.exit(exit_code)
except KeyboardInterrupt:
sys.exit(130)
# Get the environment variables
env = os.environ.copy()
# Inject RCCL environment variables
env["NCCL_DEBUG"] = "INFO"
env["NCCL_DEBUG_SUBSYS"] = "ALL"
print(f"{log_prefix} [Wrapper] Running command: {' '.join(cmd)}")
try:
parser = RcclLogParser()
process = subprocess.Popen(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
)
# Collect all output lines
for line in process.stdout:
if not args.summary:
print(f"{line}", end="", flush=True)
parser.collect(line)
process.wait()
# If we got here, no command was provided and stdin is a tty
if rank == 0:
parser.report(verbose=args.verbose)
sys.exit(process.returncode)
except KeyboardInterrupt:
sys.exit(130)
except FileNotFoundError:
print(f"{log_prefix} Error: Command not found: {cmd[0]}")
parser.print_help()
sys.exit(1)
......
import os
import subprocess
import sys
from .parser import RcclLogParser
def run_with_input(args, rank):
    """Dispatch to the stdin, log-file, or command-execution handler.

    Returns the selected handler's exit code, or None when no command was
    given and stdin is a terminal, in which case nothing is read.
    """
    prefix = f"[Rank {rank}]"
    command = args.command

    if command:
        # An existing file as the first argument selects log-file mode;
        # anything else is treated as an executable to launch.
        handler = _process_files if os.path.isfile(command[0]) else _execute_command
        return handler(args, rank, prefix, command)

    # No positional arguments: only piped stdin can supply input.
    if sys.stdin.isatty():
        return None
    return _process_stdin(args, rank)
def _process_stdin(args, rank):
    """Feed every line of stdin to a fresh RcclLogParser and return 0.

    Each line is echoed to stdout as it arrives unless ``args.summary`` is
    set. Only rank 0 prints the parser's report.
    """
    log_parser = RcclLogParser()
    for text in sys.stdin:
        if not args.summary:
            # Lines keep their own newline, so suppress print's.
            print(text, end="", flush=True)
        log_parser.collect(text)

    if rank != 0:
        return 0
    log_parser.report(verbose=args.verbose)
    return 0
def _process_files(args, rank, log_prefix, filenames):
    """Parse the given log files in order with one shared RcclLogParser.

    Each line is echoed to stdout unless ``args.summary`` is set. If a path
    is not an existing file, an error is printed and 1 is returned at that
    point, with no report. Otherwise rank 0 prints the report and 0 is
    returned.
    """
    log_parser = RcclLogParser()
    for path in filenames:
        if os.path.isfile(path):
            # Undecodable bytes are replaced rather than aborting the parse.
            with open(path, encoding="utf-8", errors="replace") as handle:
                for text in handle:
                    if not args.summary:
                        print(text, end="", flush=True)
                    log_parser.collect(text)
            continue
        print(f"{log_prefix} Error: File not found: {path}")
        return 1

    if rank != 0:
        return 0
    log_parser.report(verbose=args.verbose)
    return 0
def _execute_command(args, rank, log_prefix, cmd):
    """Run ``cmd`` with RCCL debug logging enabled and parse its output.

    The child gets a copy of the current environment with NCCL_DEBUG=INFO
    and NCCL_DEBUG_SUBSYS=ALL set, and its stderr is merged into stdout.
    Each output line is echoed (unless ``args.summary`` is set) and handed
    to an RcclLogParser. Rank 0 prints the parser's report after the child
    exits.

    Returns:
        The child's return code, or 1 if the executable could not be found.
    """
    env = os.environ.copy()
    # Inject RCCL environment variables
    env["NCCL_DEBUG"] = "INFO"
    env["NCCL_DEBUG_SUBSYS"] = "ALL"
    print(f"{log_prefix} [Wrapper] Running command: {' '.join(cmd)}")
    parser = RcclLogParser()
    try:
        process = subprocess.Popen(
            cmd,
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,  # line-buffered so output streams as it is produced
        )
    except FileNotFoundError:
        # Report a missing executable as an error exit code instead of
        # letting the exception escape as a traceback.
        print(f"{log_prefix} Error: Command not found: {cmd[0]}")
        return 1
    # The context manager closes the stdout pipe and waits for the child,
    # even if reading or parsing raises.
    with process:
        for line in process.stdout:
            if not args.summary:
                print(line, end="", flush=True)
            parser.collect(line)
    if rank == 0:
        parser.report(verbose=args.verbose)
    return process.returncode
import os
def get_mpi_rank():
    """
    Try to get the rank ID from common environment variables.

    The variables are checked in the order listed and the first one that
    holds a valid integer is returned as an int. A variable that is set but
    is not an integer (for example an empty string) is ignored instead of
    raising ValueError. If no rank is found, return 0.
    """
    rank_vars = (
        "OMPI_COMM_WORLD_RANK",
        "PMI_RANK",
        "SLURM_PROCID",
        "RANK",
    )
    for var in rank_vars:
        raw = os.environ.get(var)
        if raw is None:
            continue
        try:
            return int(raw)
        except ValueError:
            # Unparseable value: try the next variable.
            continue
    return 0
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment