main.py 3.77 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#!/usr/bin/env python3

import argparse
import os
import subprocess
import sys

from .parser import RcclLogParser


def get_mpi_rank():
    """
    Return the rank of the current process as an int.

    The rank is read from the first usable variable among the common
    launcher environment variables (OpenMPI, MPICH/MVAPICH, Slurm, and the
    generic ``RANK`` used by Torch-style launchers). A variable that is set
    but does not hold an integer (e.g. an empty string) is skipped instead
    of aborting the program. If no variable yields a rank, 0 is returned.
    """
    # Checked in order; the first variable holding a valid integer wins.
    rank_vars = (
        "OMPI_COMM_WORLD_RANK",  # OpenMPI
        "PMI_RANK",  # MPICH / MVAPICH
        "SLURM_PROCID",  # Slurm
        "RANK",  # General / Torch
    )

    for var in rank_vars:
        value = os.environ.get(var)
        if value is None:
            continue
        try:
            return int(value)
        except ValueError:
            # Set but not numeric: fall through to the next candidate.
            continue
    return 0


def _consume_lines(lines, log_parser, echo):
    """Feed every line of *lines* to *log_parser*, echoing it first if *echo*."""
    for line in lines:
        if echo:
            print(line, end="", flush=True)
        log_parser.collect(line)


def _is_log_file(path):
    """Return True if *path* should be read as a log file rather than executed."""
    if not os.path.isfile(path):
        return False
    # On POSIX an executable file is a command to wrap (e.g. ./all_reduce_perf),
    # not a log. Elsewhere the execute bit is not meaningful, so any existing
    # file is treated as a log.
    return os.name != "posix" or not os.access(path, os.X_OK)


def main():
    """
    Command-line entry point.

    Depending on the arguments, one of three modes is used:

    1. No positional arguments and stdin is not a terminal: parse stdin.
    2. The first positional argument is an existing (non-executable) file:
       parse every positional argument as a log file.
    3. Otherwise: run the positional arguments as a command with
       ``NCCL_DEBUG=INFO`` and ``NCCL_DEBUG_SUBSYS=ALL`` injected into its
       environment, and parse its combined stdout/stderr.

    Input lines are echoed unless ``--summary`` is given, and the report is
    printed by rank 0 only.

    This function always terminates via ``sys.exit``: 0 on success for modes
    1 and 2, the child's return code for mode 3, 1 on usage or file/command
    errors, and 130 when interrupted with Ctrl-C.
    """
    rank = get_mpi_rank()
    log_prefix = f"[Rank {rank}]"

    # Parse command line arguments
    parser = argparse.ArgumentParser(
        description="RCCL Log Parser Wrapper\n\n"
        "Usage modes:\n"
        "  1. Pipe input:    cat log.txt | xcl-lens\n"
        "  2. Read files:    xcl-lens log1.txt log2.txt\n"
        "  3. Wrap command:  xcl-lens ./all_reduce_perf",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--summary", action="store_true", help="Print summary report only")
    parser.add_argument("-v", "--verbose", action="store_true", help="Print verbose reports")
    parser.add_argument(
        "command", nargs=argparse.REMAINDER, help="Executable to run, or log files to read"
    )

    args = parser.parse_args()

    cmd = args.command
    echo = not args.summary

    # Case 1: No command provided - check for stdin
    if not cmd:
        if sys.stdin.isatty():
            # Nothing piped in and nothing to run: show usage once.
            if rank == 0:
                parser.print_help()
            sys.exit(1)
        try:
            rccl_parser = RcclLogParser()
            _consume_lines(sys.stdin, rccl_parser, echo)
            if rank == 0:
                rccl_parser.report(verbose=args.verbose)
        except KeyboardInterrupt:
            sys.exit(130)
        sys.exit(0)

    # Case 2: First argument is an existing log file - read all arguments as files
    if _is_log_file(cmd[0]):
        try:
            rccl_parser = RcclLogParser()
            for filename in cmd:
                if not os.path.isfile(filename):
                    print(f"{log_prefix} Error: File not found: {filename}")
                    sys.exit(1)
                with open(filename, encoding="utf-8", errors="replace") as f:
                    _consume_lines(f, rccl_parser, echo)
            if rank == 0:
                rccl_parser.report(verbose=args.verbose)
        except KeyboardInterrupt:
            sys.exit(130)
        sys.exit(0)

    # Case 3: Wrap the command and parse its output
    env = os.environ.copy()

    # Inject RCCL environment variables
    env["NCCL_DEBUG"] = "INFO"
    env["NCCL_DEBUG_SUBSYS"] = "ALL"

    print(f"{log_prefix} [Wrapper] Running command: {' '.join(cmd)}")

    rccl_parser = RcclLogParser()

    # Only the spawn itself is guarded, so errors raised while parsing are not
    # misreported as a missing command.
    try:
        process = subprocess.Popen(
            cmd,
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            errors="replace",  # same decoding policy as the log-file mode
            bufsize=1,
        )
    except FileNotFoundError:
        print(f"{log_prefix} Error: Command not found: {cmd[0]}")
        sys.exit(1)
    except OSError as exc:
        # e.g. permission denied or exec format error
        print(f"{log_prefix} Error: Cannot execute {cmd[0]}: {exc}")
        sys.exit(1)

    try:
        # The context manager closes the pipe and reaps the child.
        with process:
            _consume_lines(process.stdout, rccl_parser, echo)
            process.wait()

        if rank == 0:
            rccl_parser.report(verbose=args.verbose)
    except KeyboardInterrupt:
        # Do not leave the wrapped command running after Ctrl-C.
        if process.poll() is None:
            process.terminate()
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
        sys.exit(130)

    sys.exit(process.returncode)


# Run the CLI only when this module is executed directly, not when imported.
if __name__ == "__main__":
    main()