"custom_nodes/comfyui-easy-use/README.md" did not exist on "57b0ad8e820e370e608810364d80d8212d2407e9"
runner.py 2.94 KB
Newer Older
one's avatar
one committed
1
2
3
4
5
6
7
import os
import subprocess
import sys

from .parser import RcclLogParser


one's avatar
one committed
8
9
10
11
12
13
14
15
16
def run_with_input(
    *,
    command: list[str],
    summary: bool,
    verbose: bool,
    hosts: list[str] | None,
    ranks: list[str] | None,
    rank: int,
):
    """Pick the input mode implied by ``command`` and run it.

    Three modes are supported:

    1. ``command`` is empty: read log lines from stdin when stdin is not a
       terminal; otherwise there is nothing to do and ``None`` is returned.
    2. ``command[0]`` names an existing regular file: every element of
       ``command`` is treated as a log file to parse.
    3. Anything else: ``command`` is executed as a subprocess and its
       output is parsed.

    Returns whatever the selected handler returns (an integer status), or
    ``None`` when no command was given and stdin is interactive.

    NOTE(review): an executable that exists on disk (e.g. ``./train.sh``)
    also satisfies the file check and is therefore read as a log file
    rather than executed -- confirm that this is the intended precedence.
    """
    # Mode 1: nothing on the command line, so the only possible source is
    # piped stdin.
    if not command:
        if sys.stdin.isatty():
            return None
        return _process_stdin(
            summary=summary, verbose=verbose, hosts=hosts, ranks=ranks, rank=rank
        )

    log_prefix = f"[Rank {rank}]"
    first_arg = command[0]

    # Mode 2: the first argument is an existing file, so the whole argument
    # list is a list of log files.
    if os.path.isfile(first_arg):
        return _process_files(
            summary=summary,
            verbose=verbose,
            hosts=hosts,
            ranks=ranks,
            rank=rank,
            log_prefix=log_prefix,
            filenames=command,
        )

    # Mode 3: run the arguments as a command.
    return _execute_command(
        summary=summary, verbose=verbose, rank=rank, log_prefix=log_prefix, cmd=command
    )
one's avatar
one committed
45
46


one's avatar
one committed
47
48
49
50
def _process_stdin(
    *, summary: bool, verbose: bool, hosts: list[str] | None, ranks: list[str] | None, rank: int
):
    """Feed every line read from stdin to an ``RcclLogParser``.

    Unless ``summary`` is set, each line is also echoed to stdout unchanged
    (and flushed immediately) so the tool can sit in the middle of a pipe.
    Only rank 0 prints the parser's report. Always returns 0.
    """
    log_parser = RcclLogParser(verbose=verbose, hosts=hosts, ranks=ranks)
    echo_lines = not summary

    for raw_line in sys.stdin:
        if echo_lines:
            # Lines keep their own newline, hence end="".
            print(raw_line, end="", flush=True)
        log_parser.collect(raw_line)

    if rank == 0:
        log_parser.report()

    return 0


one's avatar
one committed
61
62
63
64
65
66
67
68
69
70
71
def _process_files(
    *,
    summary: bool,
    verbose: bool,
    hosts: list[str] | None,
    ranks: list[str] | None,
    rank: int,
    log_prefix: str,
    filenames: list[str],
):
    rccl_parser = RcclLogParser(verbose=verbose, hosts=hosts, ranks=ranks)
one's avatar
one committed
72
73
74
75
76
77
    for filename in filenames:
        if not os.path.isfile(filename):
            print(f"{log_prefix} Error: File not found: {filename}")
            return 1
        with open(filename, encoding="utf-8", errors="replace") as f:
            for line in f:
one's avatar
one committed
78
                if not summary:
one's avatar
one committed
79
80
81
82
                    print(f"{line}", end="", flush=True)
                rccl_parser.collect(line)

    if rank == 0:
one's avatar
one committed
83
        rccl_parser.report()
one's avatar
one committed
84
85
86
    return 0


one's avatar
one committed
87
def _execute_command(*, summary: bool, verbose: bool, rank: int, log_prefix: str, cmd: list[str]):
    """Run ``cmd`` as a subprocess and parse its combined stdout/stderr.

    The child inherits the current environment with ``NCCL_DEBUG=INFO`` and
    ``NCCL_DEBUG_SUBSYS=ALL`` forced on (presumably so the library emits
    the log lines the parser consumes -- confirm against the parser).
    Unless ``summary`` is set, each output line is echoed to stdout as it
    arrives. Only rank 0 prints the parser's report.

    Returns the child's exit code.

    NOTE(review): unlike the stdin/file handlers, the parser here is built
    without arguments and ``verbose`` is passed to ``report()`` instead;
    host/rank filters are not available in this mode. Confirm this
    asymmetry is intended.
    """
    env = os.environ.copy()
    env["NCCL_DEBUG"] = "INFO"
    env["NCCL_DEBUG_SUBSYS"] = "ALL"

    print(f"{log_prefix} [Wrapper] Running command: {' '.join(cmd)}")

    parser = RcclLogParser()

    # The context manager closes the pipe and waits for the child even if
    # the read loop raises, so no pipe or zombie process is leaked.
    # errors="replace" keeps an undecodable byte in the child's output from
    # aborting the wrapper with UnicodeDecodeError mid-stream.
    with subprocess.Popen(
        cmd,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        errors="replace",
        bufsize=1,  # line-buffered so output is relayed as it is produced
    ) as process:
        # stdout is a pipe because of stdout=PIPE; the guard only narrows
        # the Optional type.
        if process.stdout is not None:
            for line in process.stdout:
                if not summary:
                    print(line, end="", flush=True)
                parser.collect(line)

    # Leaving the ``with`` block has already waited for the child, so
    # ``returncode`` is populated here.
    if rank == 0:
        parser.report(verbose=verbose)

    return process.returncode