runner.py 2.17 KB
Newer Older
one's avatar
one committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import os
import subprocess
import sys

from .parser import RcclLogParser


def run_with_input(args, rank):
    """Dispatch to the right input handler based on ``args.command``.

    Three modes are supported:

    * empty/absent command and stdin is not a terminal -> read from stdin;
    * first command element names an existing file -> treat every element
      as a log file to read;
    * anything else -> execute the command and read its output.

    Returns whatever the selected handler returns, or ``None`` when no
    command was given and stdin is an interactive terminal (no input).
    """
    prefix = f"[Rank {rank}]"
    command = args.command

    if not command:
        # With an interactive terminal there is nothing piped in to read.
        if sys.stdin.isatty():
            return None
        return _process_stdin(args, rank)

    # An existing file as the first argument means "parse these log files";
    # otherwise the argument list is a command line to run.
    if os.path.isfile(command[0]):
        handler = _process_files
    else:
        handler = _execute_command
    return handler(args, rank, prefix, command)


def _process_stdin(args, rank):
    """Feed every line of standard input to a fresh ``RcclLogParser``.

    Each line is echoed to stdout unless ``args.summary`` is truthy.
    After stdin is exhausted, rank 0 (only) asks the parser for its
    report, passing ``args.verbose`` through. Always returns 0.
    """
    collector = RcclLogParser()

    for raw_line in sys.stdin:
        echo_line = not args.summary
        if echo_line:
            # Lines keep their own newline, so suppress print's.
            print(raw_line, end="", flush=True)
        collector.collect(raw_line)

    is_reporting_rank = rank == 0
    if is_reporting_rank:
        collector.report(verbose=args.verbose)

    return 0


def _process_files(args, rank, log_prefix, filenames):
    """Feed the lines of each file in ``filenames`` to one ``RcclLogParser``.

    Files are read in order as UTF-8 with undecodable bytes replaced.
    Each line is echoed to stdout unless ``args.summary`` is truthy.

    Returns 1 (after printing an error tagged with ``log_prefix``) as soon
    as an entry is not an existing regular file; files before it have
    already been consumed and no report is produced. Otherwise rank 0
    (only) prints the parser report and 0 is returned.
    """
    collector = RcclLogParser()

    for path in filenames:
        exists = os.path.isfile(path)
        if not exists:
            print(f"{log_prefix} Error: File not found: {path}")
            return 1

        with open(path, encoding="utf-8", errors="replace") as handle:
            for raw_line in handle:
                echo_line = not args.summary
                if echo_line:
                    # Lines keep their own newline, so suppress print's.
                    print(raw_line, end="", flush=True)
                collector.collect(raw_line)

    is_reporting_rank = rank == 0
    if is_reporting_rank:
        collector.report(verbose=args.verbose)

    return 0


def _execute_command(args, rank, log_prefix, cmd):
    """Run ``cmd`` as a child process and parse its combined output.

    The child inherits a copy of the current environment with
    ``NCCL_DEBUG=INFO`` and ``NCCL_DEBUG_SUBSYS=ALL`` forced on. Its stderr
    is merged into stdout, and the stream is read line by line: every line
    is passed to a fresh ``RcclLogParser`` and, unless ``args.summary`` is
    truthy, echoed to stdout.

    Args:
        args: Options object; ``summary`` and ``verbose`` are read.
        rank: Rank of this process; only rank 0 prints the parser report.
        log_prefix: Prefix for the wrapper's own status line.
        cmd: Argument list of the command to execute.

    Returns:
        The exit code of the child process.
    """
    env = os.environ.copy()
    env["NCCL_DEBUG"] = "INFO"
    env["NCCL_DEBUG_SUBSYS"] = "ALL"

    print(f"{log_prefix} [Wrapper] Running command: {' '.join(cmd)}")

    parser = RcclLogParser()
    # The context manager closes the pipe and reaps the child even if the
    # read loop raises (e.g. on interrupt), so no handle or zombie leaks.
    with subprocess.Popen(
        cmd,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        # Tolerate undecodable bytes in the child's output instead of
        # aborting the wrapper; matches how log files are read.
        errors="replace",
        bufsize=1,  # line-buffered so output is relayed as it arrives
    ) as process:
        for line in process.stdout:
            if not args.summary:
                # Lines keep their own newline, so suppress print's.
                print(line, end="", flush=True)
            parser.collect(line)

        process.wait()

    if rank == 0:
        parser.report(verbose=args.verbose)

    return process.returncode