"lib/bindings/python/vscode:/vscode.git/clone" did not exist on "6a0e67ed781497289d2f12c156f723ebf874b8aa"
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# Example cli using the Python bindings, similar to `dynamo-run`.
#
# Usage: `python cli.py in=text out=echo <your-model>`.
# `in` can be:
# - "http": OpenAI compliant HTTP server
# - "text": Interactive text chat
# - "batch:<file.jsonl>": Run all the prompts in the JSONL file, write out to a jsonl in current dir.
# - "stdin": Allows you to pipe something in: `echo prompt | python cli.py in=stdin out=...`
# - "dyn://name": Connect to nats/etcd and listen for requests from frontend.
#
# `out` can be:
# - "dyn": Run as the frontend node. Auto-discover workers and route traffic to them.
# - "sglang", "vllm", "trtllm", "echo": An LLM worker.
#
# Must be in a virtualenv with the Dynamo bindings (or wheel) installed.
#
# There is no provided llama.cpp engine here, but there is one in components/llama_cpp/. It would be
# easy enough to copy the few Python lines from there to here and add an `out=llama_cpp`.

import argparse
import asyncio
import sys
from pathlib import Path

import uvloop

from dynamo.llm import EngineType, EntrypointArgs, make_engine, run_input
from dynamo.runtime import DistributedRuntime


def parse_args(argv=None):
    """Parse the command line into input/output modes, model path and flags.

    The `in=...` and `out=...` tokens are not argparse-style options, so they
    are extracted by hand first. Every other token (the `--flags` and the
    positional model path) is handed to argparse.

    Args:
        argv: Argument list to parse, without the program name. When None
            (the default) `sys.argv[1:]` is used.

    Returns:
        A dict with the keys:
        - "in_mode": value of `in=` ("text" if absent); "batch" when the value
          was given as `batch:<file>`.
        - "out_mode": value of `out=` ("echo" if absent).
        - "batch_file": the `<file>` part of `in=batch:<file>`, else None.
        - "model_path": the positional model argument, or None.
        - "flags": the argparse Namespace holding all `--` options.

    Raises:
        SystemExit: raised by argparse on `--help`, on unknown or malformed
            flags, when batch mode has no file, or when the model path is
            missing and `out` is not "dyn".
    """
    if argv is None:
        argv = sys.argv[1:]

    in_mode = "text"
    out_mode = "echo"
    batch_file = None  # Specific to in_mode="batch"

    # Arguments that argparse will process (flags and model path)
    argparse_args = []

    # --- Step 1: Manual pre-parsing for 'in=' and 'out=' ---
    # If a token is repeated, the last occurrence wins.
    for arg in argv:
        if arg.startswith("in="):
            in_val = arg[len("in=") :]
            if in_val.startswith("batch:"):
                in_mode = "batch"
                batch_file = in_val[len("batch:") :]
            else:
                in_mode = in_val
                # Drop any file left over from an earlier in=batch:... token so
                # batch_file is only ever set while in_mode is "batch".
                batch_file = None
        elif arg.startswith("out="):
            out_mode = arg[len("out=") :]
        else:
            # Neither 'in=' nor 'out=': a flag, a flag value, or the model path
            argparse_args.append(arg)

    # --- Step 2: Argparse for flags and the model path ---
    parser = argparse.ArgumentParser(
        description="Dynamo example CLI: Connect inputs to an engine",
        usage="python cli.py in=text out=echo <your-model>",
        formatter_class=argparse.RawTextHelpFormatter,  # To preserve multi-line help formatting
    )

    # model_name: Option<String>
    parser.add_argument("--model-name", type=str, help="Name of the model to load.")
    # model_config: Option<PathBuf>
    parser.add_argument(
        "--model-config", type=Path, help="Path to the model configuration file."
    )
    # context_length: Option<u32>
    parser.add_argument(
        "--context-length", type=int, help="Maximum context length for the model (u32)."
    )
    # template_file: Option<PathBuf>
    parser.add_argument(
        "--template-file",
        type=Path,
        help="Path to the template file for text generation.",
    )
    # kv_cache_block_size: Option<u32>
    parser.add_argument(
        "--kv-cache-block-size", type=int, help="KV cache block size (u32)."
    )
    # http_port: Option<u16>
    parser.add_argument("--http-port", type=int, help="HTTP port for the engine (u16).")

    # The positional model argument is optional for argparse (nargs='?')
    # because whether it is required depends on 'out_mode'; that rule is
    # enforced in the post-parsing validation below.
    parser.add_argument(
        "model",
        nargs="?",
        help="Path to the model (e.g., Qwen/Qwen3-0.6B).\n" "Required unless out=dyn.",
    )

    # Parse the arguments that were not 'in=' or 'out='
    flags = parser.parse_args(argparse_args)

    # --- Step 3: Post-parsing validation and final assignment ---

    # 'batch' mode requires a non-empty file path (also catches a bare in=batch)
    if in_mode == "batch" and not batch_file:
        parser.error("Batch mode requires a file path: in=batch:FILE")

    # The model path is required for every output except 'dyn'
    if out_mode != "dyn" and flags.model is None:
        parser.error("Model path is required unless out=dyn.")

    return {
        "in_mode": in_mode,
        "out_mode": out_mode,
        "batch_file": batch_file,  # None unless in_mode is "batch"
        "model_path": flags.model,
        "flags": flags,
    }


async def run():
    """Build the engine selected by `out=` and drive it from the `in=` input.

    Steps: parse the command line, map `out` to an EngineType, create the
    DistributedRuntime on the running event loop, build the engine with
    make_engine, then hand control to run_input.

    Raises:
        SystemExit: on argument errors (raised inside parse_args), or with
            status 1 when `out` names an engine this script does not map.
    """
    # Parse and validate before creating the runtime so that `--help` and bad
    # arguments exit without initializing anything.
    args = parse_args()

    engine_type_map = {
        "echo": EngineType.Echo,
        "dyn": EngineType.Dynamic,
    }
    out_mode = args["out_mode"]
    engine_type = engine_type_map.get(out_mode)
    if engine_type is None:
        print(f"Unsupported output type: {out_mode}")
        sys.exit(1)

    # parse_args() splits `in=batch:<file>` into in_mode="batch" plus
    # batch_file. run_input only takes a single input string, so re-join them;
    # otherwise the file path never reaches it.
    # NOTE(review): assumes run_input accepts the same "batch:<file>" form as
    # the `in=` CLI token -- confirm against the bindings.
    input_spec = args["in_mode"]
    if input_spec == "batch":
        input_spec = f"batch:{args['batch_file']}"

    entrypoint_kwargs = {"model_path": args["model_path"]}

    # Forward only the flags the user actually supplied; unset ones are left
    # out of the keyword arguments entirely.
    flags = args["flags"]
    optional_flag_names = (
        "model_name",
        "model_config",
        "context_length",
        "template_file",
        "kv_cache_block_size",
        "http_port",
    )
    for name in optional_flag_names:
        value = getattr(flags, name)
        if value is not None:
            entrypoint_kwargs[name] = value

    loop = asyncio.get_running_loop()
    # NOTE(review): the meaning of the second positional argument (False) is
    # not visible in this file -- confirm against DistributedRuntime.
    runtime = DistributedRuntime(loop, False)

    e = EntrypointArgs(engine_type, **entrypoint_kwargs)
    engine = await make_engine(runtime, e)

    await run_input(runtime, input_spec, engine)


if __name__ == "__main__":
    # Script entry point: create the top-level coroutine and let uvloop run
    # it to completion.
    main_coro = run()
    uvloop.run(main_coro)