cli.py 5.31 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# Example cli using the Python bindings, similar to `dynamo-run`.
# Usage: `python cli.py in=text out=mistralrs <your-model>`.
# Must be in a virtualenv with the Dynamo bindings (or wheel) installed.

import argparse
import asyncio
import sys
from pathlib import Path

import uvloop

from dynamo.llm import EngineType, EntrypointArgs, make_engine, run_input
from dynamo.runtime import DistributedRuntime


def parse_args():
    in_mode = "text"
    out_mode = "echo"
    batch_file = None  # Specific to in_mode="batch"

    # List to hold arguments that argparse will process (flags and model path)
    argparse_args = []

    # --- Step 1: Manual Pre-parsing for 'in=' and 'out=' ---
    # Iterate through sys.argv[1:] to extract in= and out=
    # and collect remaining arguments for argparse.
    for arg in sys.argv[1:]:
        if arg.startswith("in="):
            in_val = arg[len("in=") :]
            if in_val.startswith("batch:"):
                in_mode = "batch"
                batch_file = in_val[len("batch:") :]
            else:
                in_mode = in_val
        elif arg.startswith("out="):
            out_mode = arg[len("out=") :]
        else:
            # This argument is not 'in=' or 'out=', so it's either a flag or the model path
            argparse_args.append(arg)

    # --- Step 2: Argparse for flags and the model path ---
    parser = argparse.ArgumentParser(
        description="Dynamo CLI: Connect inputs to an engine",
        formatter_class=argparse.RawTextHelpFormatter,  # To preserve multi-line help formatting
    )

    # model_name: Option<String>
    parser.add_argument("--model-name", type=str, help="Name of the model to load.")
    # model_config: Option<PathBuf>
    parser.add_argument(
        "--model-config", type=Path, help="Path to the model configuration file."
    )
    # context_length: Option<u32>
    parser.add_argument(
        "--context-length", type=int, help="Maximum context length for the model (u32)."
    )
    # template_file: Option<PathBuf>
    parser.add_argument(
        "--template-file",
        type=Path,
        help="Path to the template file for text generation.",
    )
    # kv_cache_block_size: Option<u32>
    parser.add_argument(
        "--kv-cache-block-size", type=int, help="KV cache block size (u32)."
    )
    # http_port: Option<u16>
    parser.add_argument("--http-port", type=int, help="HTTP port for the engine (u16).")

    # TODO: Not yet used here
    parser.add_argument(
        "--tensor-parallel-size",
        type=int,
        help="Tensor parallel size for the model (e.g., 4).",
    )

    # Add the positional model argument.
    # It's made optional (nargs='?') because its requirement depends on 'out_mode',
    # which is handled in post-parsing validation.
    parser.add_argument(
        "model",
        nargs="?",  # Make it optional for argparse, we'll validate manually
        help="Path to the model (e.g., Qwen/Qwen3-0.6B).\n" "Required unless out=dyn.",
    )

    # Parse the arguments that were not 'in=' or 'out='
    flags = parser.parse_args(argparse_args)

    # --- Step 3: Post-parsing Validation and Final Assignment ---

    # Validate 'batch' mode requires a file path
    if in_mode == "batch" and not batch_file:
        parser.error("Batch mode requires a file path: in=batch:FILE")

    # Validate model path requirement based on 'out_mode'
    if out_mode != "dyn" and flags.model is None:
        parser.error("Model path is required unless out=dyn.")

    # Consolidate all parsed arguments into a dictionary
    parsed_args = {
        "in_mode": in_mode,
        "out_mode": out_mode,
        "batch_file": batch_file,  # Will be None if in_mode is not "batch"
        "model_path": flags.model,
        "flags": flags,
    }

    return parsed_args


async def run():
    loop = asyncio.get_running_loop()
    runtime = DistributedRuntime(loop, False)

    args = parse_args()

    engine_type_map = {
        "echo": EngineType.Echo,
        "mistralrs": EngineType.MistralRs,
        "llamacpp": EngineType.LlamaCpp,
        "dyn": EngineType.Dynamic,
    }
    out_mode = args["out_mode"]
    engine_type = engine_type_map.get(out_mode)
    if engine_type is None:
        print(f"Unsupported output type: {out_mode}")
        sys.exit(1)

    # TODO: The "vllm", "sglang" and "trtllm" cases should call Python directly

    entrypoint_kwargs = {"model_path": args["model_path"]}

    flags = args["flags"]
    if flags.model_name is not None:
        entrypoint_kwargs["model_name"] = flags.model_name
    if flags.model_config is not None:
        entrypoint_kwargs["model_config"] = flags.model_config
    if flags.context_length is not None:
        entrypoint_kwargs["context_length"] = flags.context_length
    if flags.template_file is not None:
        entrypoint_kwargs["template_file"] = flags.template_file
    if flags.kv_cache_block_size is not None:
        entrypoint_kwargs["kv_cache_block_size"] = flags.kv_cache_block_size
    if flags.http_port is not None:
        entrypoint_kwargs["http_port"] = flags.http_port

    e = EntrypointArgs(engine_type, **entrypoint_kwargs)
    engine = await make_engine(runtime, e)
    await run_input(runtime, args["in_mode"], engine)


if __name__ == "__main__":
    uvloop.run(run())