"vllm/vscode:/vscode.git/clone" did not exist on "8a7fe47d322920bdff1b1c3472fe7f423a73a23b"
cli.py 8.67 KB
Newer Older
1
2
3
4
5
6
7
8
9
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# Example cli using the Python bindings, similar to `dynamo-run`.
# Usage: `python cli.py in=text out=mistralrs <your-model>`.
# Must be in a virtualenv with the Dynamo bindings (or wheel) installed.

import argparse
import asyncio
10
import signal
11
12
13
14
15
16
17
18
import sys
from pathlib import Path

import uvloop

from dynamo.llm import EngineType, EntrypointArgs, make_engine, run_input
from dynamo.runtime import DistributedRuntime

19
20
21
# Module-level handles shared by run(), the output-streaming task and the
# cleanup/signal code: the engine child process, and the task echoing its output.
subprocess_ref = subprocess_task = None

22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117

def parse_args(argv=None):
    """Parse the ``in=``/``out=`` selectors and the argparse flags of this CLI.

    ``in=`` and ``out=`` are bare ``key=value`` tokens that argparse cannot
    express, so they are pulled out by hand first; every other token (flags and
    the positional model path) is handed to argparse.

    Args:
        argv: Tokens to parse, excluding the program name. Defaults to
            ``sys.argv[1:]``; pass an explicit list to parse something else.

    Returns:
        A dict with keys:
          - ``in_mode``: value of ``in=`` (default "text"); "batch" for
            ``in=batch:FILE``.
          - ``out_mode``: value of ``out=`` (default "echo").
          - ``batch_file``: FILE from ``in=batch:FILE``, otherwise None.
          - ``model_path``: the positional model argument, or None.
          - ``flags``: the argparse Namespace with all flag values.

    Raises:
        SystemExit: via ``parser.error`` (status 2) when ``in=``/``out=`` is
            empty, when ``in=batch:`` has no file, or when the model path is
            missing and ``out`` is not ``dyn``; also for argparse's own usage
            errors and ``--help``.
    """
    if argv is None:
        argv = sys.argv[1:]

    in_mode = "text"
    out_mode = "echo"
    batch_file = None  # Specific to in_mode="batch"

    # Tokens that argparse will process (flags and model path).
    argparse_args = []

    # --- Step 1: Manual pre-parsing for 'in=' and 'out=' ---
    for arg in argv:
        if arg.startswith("in="):
            in_val = arg[len("in=") :]
            if in_val.startswith("batch:"):
                in_mode = "batch"
                batch_file = in_val[len("batch:") :]
            else:
                in_mode = in_val
        elif arg.startswith("out="):
            out_mode = arg[len("out=") :]
        else:
            # Not 'in=' or 'out=', so either a flag or the model path.
            argparse_args.append(arg)

    # --- Step 2: Argparse for flags and the model path ---
    parser = argparse.ArgumentParser(
        description="Dynamo CLI: Connect inputs to an engine",
        formatter_class=argparse.RawTextHelpFormatter,  # To preserve multi-line help formatting
        # in=/out= never reach argparse, so document them here for --help.
        epilog=(
            "Input/output selectors are bare key=value tokens, not flags:\n"
            "  in=MODE          input mode (default: text)\n"
            "  in=batch:FILE    batch mode over FILE\n"
            "  out=ENGINE       engine to run (default: echo): echo, mistralrs,\n"
            "                   llamacpp, dyn, sglang, vllm, trtllm\n"
        ),
    )

    # model_name: Option<String>
    parser.add_argument("--model-name", type=str, help="Name of the model to load.")
    # model_config: Option<PathBuf>
    parser.add_argument(
        "--model-config", type=Path, help="Path to the model configuration file."
    )
    # context_length: Option<u32>
    parser.add_argument(
        "--context-length", type=int, help="Maximum context length for the model (u32)."
    )
    # template_file: Option<PathBuf>
    parser.add_argument(
        "--template-file",
        type=Path,
        help="Path to the template file for text generation.",
    )
    # kv_cache_block_size: Option<u32>
    parser.add_argument(
        "--kv-cache-block-size", type=int, help="KV cache block size (u32)."
    )
    # http_port: Option<u16>
    parser.add_argument("--http-port", type=int, help="HTTP port for the engine (u16).")

    # TODO: Not yet used here
    parser.add_argument(
        "--tensor-parallel-size",
        type=int,
        help="Tensor parallel size for the model (e.g., 4).",
    )

    # Positional model argument. It is optional for argparse (nargs='?')
    # because whether it is required depends on out_mode, checked below.
    parser.add_argument(
        "model",
        nargs="?",
        help="Path to the model (e.g., Qwen/Qwen3-0.6B).\n" "Required unless out=dyn.",
    )

    flags = parser.parse_args(argparse_args)

    # --- Step 3: Post-parsing validation ---
    # An empty selector would otherwise only fail later, far from the typo.
    if not in_mode:
        parser.error("in= requires a value, e.g. in=text or in=batch:FILE")
    if not out_mode:
        parser.error("out= requires a value, e.g. out=echo")

    if in_mode == "batch" and not batch_file:
        parser.error("Batch mode requires a file path: in=batch:FILE")

    if out_mode != "dyn" and flags.model is None:
        parser.error("Model path is required unless out=dyn.")

    return {
        "in_mode": in_mode,
        "out_mode": out_mode,
        "batch_file": batch_file,  # None unless in_mode is "batch"
        "model_path": flags.model,
        "flags": flags,
    }


118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
async def cleanup_subprocess_async(timeout=2):
    """Clean up the sglang/vllm/trtllm subprocess if it exists.

    If the process stored in ``subprocess_ref`` is still running, it is sent
    ``terminate()`` and waited on for up to ``timeout`` seconds. If it is still
    running after that, it is ``kill()``ed and waited on again. The global is
    then cleared so later calls are no-ops.

    All work goes through a local reference. This function is called from more
    than one place, and another call may clear the global while this one is
    suspended in an ``await``.

    Args:
        timeout: Seconds to wait after ``terminate()`` before escalating to
            ``kill()``.
    """
    global subprocess_ref
    proc = subprocess_ref
    if proc is None:
        return

    if proc.returncode is None:
        try:
            proc.terminate()
        except ProcessLookupError:
            pass  # exited between the returncode check and the signal
        try:
            await asyncio.wait_for(proc.wait(), timeout=timeout)
        except asyncio.TimeoutError:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
            await proc.wait()

    # Only cleanup once -- but never clear a different process stored meanwhile.
    if subprocess_ref is proc:
        subprocess_ref = None


def signal_handler():
    """Handle SIGINT/SIGTERM: ask the engine subprocess to stop, then exit.

    Registered on the running event loop by run(). ``terminate()`` is sent
    here, synchronously, because ``sys.exit`` raises ``SystemExit`` out of
    this loop callback. A task scheduled from here is not guaranteed to run
    before the loop shuts down, and the loop keeps only weak references to
    tasks made with ``create_task``. Waiting for the child, and escalating to
    ``kill()``, is left to ``cleanup_subprocess_async()``, which run() awaits
    in its ``finally``.

    Raises:
        SystemExit: always, with status 0.
    """
    proc = subprocess_ref
    if proc is not None and proc.returncode is None:
        try:
            proc.terminate()
        except ProcessLookupError:
            pass  # exited between the returncode check and the signal
    sys.exit(0)


139
def _engine_subprocess_cmd(out_mode, args):
    """Return the argv that launches ``<out_mode>_inc.py``; exit(1) if it is missing."""
    script_path = Path(__file__).parent / f"{out_mode}_inc.py"
    if not script_path.exists():
        print(f"Error: Script '{script_path}' not found", file=sys.stderr)
        sys.exit(1)

    cmd = [sys.executable, str(script_path)]
    if args["model_path"]:
        cmd.extend(["--model-path", args["model_path"]])

    flags = args["flags"]
    # `is not None`, matching how _entrypoint_kwargs decides what to forward.
    if flags.model_name is not None:
        cmd.extend(["--model-name", flags.model_name])
    if flags.context_length is not None:
        cmd.extend(["--context-length", str(flags.context_length)])
    if flags.kv_cache_block_size is not None:
        cmd.extend(["--kv-cache-block-size", str(flags.kv_cache_block_size)])
    return cmd


async def _stream_engine_output(cmd):
    """Start the engine subprocess and echo each output line as ``Engine: <line>``."""
    global subprocess_ref
    # Keep a local handle: cleanup_subprocess_async() may reset the global to
    # None while this task is still draining stdout.
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
    )
    subprocess_ref = proc

    try:
        if proc.stdout is not None:
            async for line in proc.stdout:
                # errors="replace": an undecodable byte must not kill this
                # reader, otherwise nobody drains the pipe and the child can
                # block once it fills.
                print(f"Engine: {line.decode(errors='replace').rstrip()}")
        await proc.wait()
    except asyncio.CancelledError:
        # Task was cancelled, terminate the subprocess
        await cleanup_subprocess_async()
        raise


def _entrypoint_kwargs(args):
    """Collect EntrypointArgs keyword arguments for every flag the user set."""
    flags = args["flags"]
    kwargs = {"model_path": args["model_path"]}
    for name in (
        "model_name",
        "model_config",
        "context_length",
        "template_file",
        "kv_cache_block_size",
        "http_port",
    ):
        value = getattr(flags, name)
        if value is not None:
            kwargs[name] = value
    return kwargs


async def run():
    """Parse the command line, build the requested engine, and drive it with the input.

    Steps:
      1. Install SIGINT/SIGTERM handlers on the running loop.
      2. Parse and validate ``in=``/``out=`` and the flags. This happens before
         the runtime is created, so ``--help`` and usage errors exit without
         any runtime setup.
      3. For out=sglang/vllm/trtllm, launch ``<out_mode>_inc.py`` from this
         file's directory as a child process, echo its output, and talk to it
         through the dynamic (``dyn``) engine.
      4. Build the engine with ``make_engine`` and hand it to ``run_input``.

    The child process is stopped in a ``finally`` that covers both engine
    construction and the input loop.

    Exits with status 1 if the engine script is missing or ``out=`` is not a
    supported engine.
    """
    global subprocess_task

    # Register signal handlers
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, signal_handler)  # Ctrl-C
    loop.add_signal_handler(signal.SIGTERM, signal_handler)  # kill

    # If we find cases where subprocess does not stop we may need this. Seem OK so far.
    # atexit.register(cleanup_subprocess)

    args = parse_args()
    out_mode = args["out_mode"]

    engine_type_map = {
        "echo": EngineType.Echo,
        "mistralrs": EngineType.MistralRs,
        "llamacpp": EngineType.LlamaCpp,
        "dyn": EngineType.Dynamic,
    }

    subprocess_cmd = None
    if out_mode in ("sglang", "vllm", "trtllm"):
        subprocess_cmd = _engine_subprocess_cmd(out_mode, args)
        # We talk to the subprocess over NATS, i.e. as the "dyn" engine.
        engine_type = engine_type_map["dyn"]
    else:
        engine_type = engine_type_map.get(out_mode)
        if engine_type is None:
            print(f"Unsupported output type: {out_mode}", file=sys.stderr)
            sys.exit(1)

    # parse_args splits in=batch:FILE into in_mode="batch" + batch_file. Rebuild
    # the batch:FILE form so the file path reaches run_input (it is otherwise
    # dropped). NOTE(review): assumes run_input parses the same spelling as in=.
    in_mode = args["in_mode"]
    if in_mode == "batch":
        in_mode = f"batch:{args['batch_file']}"

    runtime = DistributedRuntime(loop, False)

    try:
        if subprocess_cmd is not None:
            print(f"Starting {out_mode} subprocess: {' '.join(subprocess_cmd)}")
            # Keep the task reference for cleanup below (and so it is not GC'd).
            subprocess_task = asyncio.create_task(_stream_engine_output(subprocess_cmd))

        e = EntrypointArgs(engine_type, **_entrypoint_kwargs(args))
        engine = await make_engine(runtime, e)
        await run_input(runtime, in_mode, engine)
    finally:
        # Runs even if make_engine fails, so the child is never orphaned.
        await cleanup_subprocess_async()

        if subprocess_task is not None:
            subprocess_task.cancel()
            try:
                await subprocess_task
            except asyncio.CancelledError:
                pass
251
252
253
254


if __name__ == "__main__":
    # Script entry: drive the async entry point on a uvloop event loop.
    main_coroutine = run()
    uvloop.run(main_coroutine)