Unverified Commit 2bf27924 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

feat(python): Python bindings for the Dynamo CLI tools (#1799)

parent 3e3ff934
......@@ -1789,6 +1789,7 @@ dependencies = [
"dynamo-engine-mistralrs",
"dynamo-llm",
"dynamo-runtime",
"either",
"futures",
"futures-util",
"libc",
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Example cli using the Python bindings, similar to `dynamo-run`.
# Usage: `python cli.py in=text out=mistralrs <your-model>`.
# Must be in a virtualenv with the Dynamo bindings (or wheel) installed.
import argparse
import asyncio
import sys
from pathlib import Path
import uvloop
from dynamo.llm import EngineType, EntrypointArgs, make_engine, run_input
from dynamo.runtime import DistributedRuntime
def parse_args():
in_mode = "text"
out_mode = "echo"
batch_file = None # Specific to in_mode="batch"
# List to hold arguments that argparse will process (flags and model path)
argparse_args = []
# --- Step 1: Manual Pre-parsing for 'in=' and 'out=' ---
# Iterate through sys.argv[1:] to extract in= and out=
# and collect remaining arguments for argparse.
for arg in sys.argv[1:]:
if arg.startswith("in="):
in_val = arg[len("in=") :]
if in_val.startswith("batch:"):
in_mode = "batch"
batch_file = in_val[len("batch:") :]
else:
in_mode = in_val
elif arg.startswith("out="):
out_mode = arg[len("out=") :]
else:
# This argument is not 'in=' or 'out=', so it's either a flag or the model path
argparse_args.append(arg)
# --- Step 2: Argparse for flags and the model path ---
parser = argparse.ArgumentParser(
description="Dynamo CLI: Connect inputs to an engine",
formatter_class=argparse.RawTextHelpFormatter, # To preserve multi-line help formatting
)
# model_name: Option<String>
parser.add_argument("--model-name", type=str, help="Name of the model to load.")
# model_config: Option<PathBuf>
parser.add_argument(
"--model-config", type=Path, help="Path to the model configuration file."
)
# context_length: Option<u32>
parser.add_argument(
"--context-length", type=int, help="Maximum context length for the model (u32)."
)
# template_file: Option<PathBuf>
parser.add_argument(
"--template-file",
type=Path,
help="Path to the template file for text generation.",
)
# kv_cache_block_size: Option<u32>
parser.add_argument(
"--kv-cache-block-size", type=int, help="KV cache block size (u32)."
)
# http_port: Option<u16>
parser.add_argument("--http-port", type=int, help="HTTP port for the engine (u16).")
# TODO: Not yet used here
parser.add_argument(
"--tensor-parallel-size",
type=int,
help="Tensor parallel size for the model (e.g., 4).",
)
# Add the positional model argument.
# It's made optional (nargs='?') because its requirement depends on 'out_mode',
# which is handled in post-parsing validation.
parser.add_argument(
"model",
nargs="?", # Make it optional for argparse, we'll validate manually
help="Path to the model (e.g., Qwen/Qwen3-0.6B).\n" "Required unless out=dyn.",
)
# Parse the arguments that were not 'in=' or 'out='
flags = parser.parse_args(argparse_args)
# --- Step 3: Post-parsing Validation and Final Assignment ---
# Validate 'batch' mode requires a file path
if in_mode == "batch" and not batch_file:
parser.error("Batch mode requires a file path: in=batch:FILE")
# Validate model path requirement based on 'out_mode'
if out_mode != "dyn" and flags.model is None:
parser.error("Model path is required unless out=dyn.")
# Consolidate all parsed arguments into a dictionary
parsed_args = {
"in_mode": in_mode,
"out_mode": out_mode,
"batch_file": batch_file, # Will be None if in_mode is not "batch"
"model_path": flags.model,
"flags": flags,
}
return parsed_args
async def run():
loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, False)
args = parse_args()
engine_type_map = {
"echo": EngineType.Echo,
"mistralrs": EngineType.MistralRs,
"llamacpp": EngineType.LlamaCpp,
"dyn": EngineType.Dynamic,
}
out_mode = args["out_mode"]
engine_type = engine_type_map.get(out_mode)
if engine_type is None:
print(f"Unsupported output type: {out_mode}")
sys.exit(1)
# TODO: The "vllm", "sglang" and "trtllm" cases should call Python directly
entrypoint_kwargs = {"model_path": args["model_path"]}
flags = args["flags"]
if flags.model_name is not None:
entrypoint_kwargs["model_name"] = flags.model_name
if flags.model_config is not None:
entrypoint_kwargs["model_config"] = flags.model_config
if flags.context_length is not None:
entrypoint_kwargs["context_length"] = flags.context_length
if flags.template_file is not None:
entrypoint_kwargs["template_file"] = flags.template_file
if flags.kv_cache_block_size is not None:
entrypoint_kwargs["kv_cache_block_size"] = flags.kv_cache_block_size
if flags.http_port is not None:
entrypoint_kwargs["http_port"] = flags.http_port
e = EntrypointArgs(engine_type, **entrypoint_kwargs)
engine = await make_engine(runtime, e)
await run_input(runtime, args["in_mode"], engine)
if __name__ == "__main__":
uvloop.run(run())
......@@ -34,6 +34,7 @@ anyhow = { workspace = true }
async-openai = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
either = { workspace = true }
futures = { workspace = true }
libc = { workspace = true }
serde = { workspace = true }
......
......@@ -11,6 +11,7 @@ use dynamo_llm::local_model::{LocalModel, LocalModelBuilder};
use dynamo_runtime::CancellationToken;
mod flags;
use either::Either;
pub use flags::Flags;
mod opt;
pub use dynamo_llm::request_template::RequestTemplate;
......@@ -41,14 +42,19 @@ pub async fn run(
.kv_cache_block_size(flags.kv_cache_block_size)
// Only set if user provides. Usually loaded from tokenizer_config.json
.context_length(flags.context_length)
.http_port(flags.http_port)
.http_port(Some(flags.http_port))
.router_config(flags.router_config())
.request_template(flags.request_template.clone());
// If `in=dyn` we want the trtllm/sglang/vllm subprocess to listen on that endpoint.
// If not, then the endpoint isn't exposed so we let LocalModel invent one.
let mut rt = Either::Left(runtime.clone());
if let Input::Endpoint(path) = &in_opt {
builder.endpoint_id(path.parse().with_context(|| path.clone())?);
builder.endpoint_id(Some(path.parse().with_context(|| path.clone())?));
let distributed_runtime =
dynamo_runtime::DistributedRuntime::from_settings(runtime.clone()).await?;
rt = Either::Right(distributed_runtime);
};
let local_model = builder.build().await?;
......@@ -70,8 +76,7 @@ pub async fn run(
//
// Run in from an input
//
dynamo_llm::entrypoint::input::run_input(in_opt, runtime, engine_config).await?;
dynamo_llm::entrypoint::input::run_input(rt, in_opt, engine_config).await?;
// Allow engines to ask main thread to wait on an extra future.
// We use this to stop the vllm and sglang sub-process
......
This diff is collapsed.
......@@ -36,21 +36,26 @@ crate-type = ["cdylib", "rlib"]
[features]
default = []
block-manager = ["dynamo-llm/block-manager", "dep:dlpark"]
mistralrs = ["dep:dynamo-engine-mistralrs"]
llamacpp = ["dep:dynamo-engine-llamacpp"]
[dependencies]
dynamo-llm = { path = "../../llm" }
dynamo-runtime = { path = "../../runtime" }
dynamo-engine-mistralrs = { path = "../../engines/mistralrs", features = ["cuda"], optional = true }
dynamo-engine-llamacpp = { path = "../../engines/llamacpp", features = ["cuda", "dynamic-link"], optional = true }
anyhow = { version = "1" }
async-openai = { version = "0.29.0" }
async-stream = { version = "0.3" }
async-trait = { version = "0.1" }
either = { version = "1.13", features = ["serde"] }
futures = { version = "0.3" }
once_cell = { version = "1.20.3" }
serde = { version = "1" }
serde_json = { version = "1.0.138" }
thiserror = { version = "2.0" }
tokio = { version = "1", features = ["full"] }
tokio = { version = "1.46.0", features = ["full"] }
tokio-stream = { version = "0" }
tokio-util = { version = "0.7" }
tracing = { version = "0" }
......
......@@ -46,6 +46,26 @@ uv pip install maturin
maturin develop --uv
```
5. Experimental: To allow using mistral.rs and llama.cpp via the bindings, build with feature flags:
```
maturin develop --features mistralrs,llamacpp --release
```
`--release` is optional. It builds slower but the resulting library is significantly faster.
See `examples/cli/cli.py` for usage.
They will both be built for CUDA by default. If you see a runtime error `CUDA_ERROR_STUB_LIBRARY` this is because
the stub `libcuda.so` is earlier on the library search path than the real libcuda. Try removing the `rpath` from the library:
```
patchelf --set-rpath '' _core.cpython-312-x86_64-linux-gnu.so
```
If you include the `llamacpp` feature flag, `libllama.so` and `libggml.so` (and family) will need to be available at runtime.
## Run Examples
### Prerequisite
......
......@@ -63,6 +63,8 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(llm::kv::compute_block_hash_for_seq_py, m)?)?;
m.add_function(wrap_pyfunction!(log_message, m)?)?;
m.add_function(wrap_pyfunction!(register_llm, m)?)?;
m.add_function(wrap_pyfunction!(llm::entrypoint::make_engine, m)?)?;
m.add_function(wrap_pyfunction!(llm::entrypoint::run_input, m)?)?;
m.add_class::<DistributedRuntime>()?;
m.add_class::<CancellationToken>()?;
......@@ -73,6 +75,9 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<EtcdClient>()?;
m.add_class::<AsyncResponseStream>()?;
m.add_class::<llm::disagg_router::DisaggregatedRouter>()?;
m.add_class::<llm::entrypoint::EntrypointArgs>()?;
m.add_class::<llm::entrypoint::EngineConfig>()?;
m.add_class::<llm::entrypoint::EngineType>()?;
m.add_class::<llm::kv::WorkerMetricsPublisher>()?;
m.add_class::<llm::model_card::ModelDeploymentCard>()?;
m.add_class::<llm::preprocessor::OAIChatPreprocessor>()?;
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// This module provides a high-performance interface that bridges Python
/// applications with the Rust-powered Dynamo LLM runtime.
......@@ -41,6 +29,7 @@ use super::*;
pub mod backend;
pub mod block_manager;
pub mod disagg_router;
pub mod entrypoint;
pub mod kv;
pub mod model_card;
pub mod nats;
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::fmt::Display;
use std::path::PathBuf;
use pyo3::{exceptions::PyException, prelude::*};
use dynamo_llm::entrypoint::input::Input;
use dynamo_llm::entrypoint::EngineConfig as RsEngineConfig;
use dynamo_llm::local_model::{LocalModel, LocalModelBuilder};
use dynamo_runtime::protocols::Endpoint as EndpointId;
#[pyclass(eq, eq_int)]
#[derive(Clone, Debug, PartialEq)]
#[repr(i32)]
pub enum EngineType {
Echo = 1,
MistralRs = 2,
LlamaCpp = 3,
Dynamic = 4,
}
#[pyclass]
#[derive(Clone, Debug)]
pub(crate) struct EntrypointArgs {
engine_type: EngineType,
model_path: Option<PathBuf>,
model_name: Option<String>,
model_config: Option<PathBuf>,
endpoint_id: Option<EndpointId>,
context_length: Option<u32>,
template_file: Option<PathBuf>,
//router_config: Option<RouterConfig>,
kv_cache_block_size: Option<u32>,
http_port: Option<u16>,
}
#[pymethods]
impl EntrypointArgs {
#[allow(clippy::too_many_arguments)]
#[new]
#[pyo3(signature = (engine_type, model_path=None, model_name=None, model_config=None, endpoint_id=None, context_length=None, template_file=None, kv_cache_block_size=None, http_port=None))]
pub fn new(
engine_type: EngineType,
model_path: Option<PathBuf>,
model_name: Option<String>, // e.g. "dyn://namespace.component.endpoint"
model_config: Option<PathBuf>,
endpoint_id: Option<String>,
context_length: Option<u32>,
template_file: Option<PathBuf>,
//router_config: Option<RouterConfig>,
kv_cache_block_size: Option<u32>,
http_port: Option<u16>,
) -> PyResult<Self> {
let endpoint_id_obj: Option<EndpointId> = match endpoint_id {
Some(eid) => Some(eid.parse().map_err(|_| {
PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
"Invalid endpoint_id format: {eid}"
))
})?),
None => None,
};
Ok(EntrypointArgs {
engine_type,
model_path,
model_name,
model_config,
endpoint_id: endpoint_id_obj,
context_length,
template_file,
//router_config,
kv_cache_block_size,
http_port,
})
}
}
#[pyclass]
#[derive(Clone)]
pub(crate) struct EngineConfig {
inner: RsEngineConfig,
}
#[pyfunction]
#[pyo3(signature = (distributed_runtime, args))]
pub fn make_engine<'p>(
py: Python<'p>,
distributed_runtime: super::DistributedRuntime,
args: EntrypointArgs,
) -> PyResult<Bound<'p, PyAny>> {
let mut builder = LocalModelBuilder::default();
builder
.model_path(args.model_path)
.model_name(args.model_name)
.model_config(args.model_config)
.endpoint_id(args.endpoint_id)
.context_length(args.context_length)
.request_template(args.template_file)
.kv_cache_block_size(args.kv_cache_block_size)
.http_port(args.http_port);
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let local_model = builder.build().await.map_err(to_pyerr)?;
let inner = select_engine(distributed_runtime, args.engine_type, local_model)
.await
.map_err(to_pyerr)?;
Ok(EngineConfig { inner })
})
}
async fn select_engine(
#[allow(unused_variables)] distributed_runtime: super::DistributedRuntime,
engine_type: EngineType,
local_model: LocalModel,
) -> anyhow::Result<RsEngineConfig> {
let inner = match engine_type {
EngineType::Echo => {
// There is no validation for the echo engine
RsEngineConfig::StaticFull {
model: Box::new(local_model),
engine: dynamo_llm::engines::make_engine_full(),
}
}
EngineType::Dynamic => RsEngineConfig::Dynamic(Box::new(local_model)),
EngineType::MistralRs => {
#[cfg(feature = "mistralrs")]
{
RsEngineConfig::StaticFull {
engine: dynamo_engine_mistralrs::make_engine(&local_model).await?,
model: Box::new(local_model),
}
}
#[cfg(not(feature = "mistralrs"))]
{
anyhow::bail!(
"mistralrs engine is not enabled. Rebuild bindings with `--features mistralrs`"
);
}
}
EngineType::LlamaCpp => {
#[cfg(feature = "llamacpp")]
{
RsEngineConfig::StaticCore {
engine: dynamo_engine_llamacpp::make_engine(
distributed_runtime.inner.primary_token(),
&local_model,
)
.await?,
model: Box::new(local_model),
}
}
#[cfg(not(feature = "llamacpp"))]
{
anyhow::bail!(
"llamacpp engine is not enabled. Rebuild bindings with `--features llamacpp`"
);
}
}
};
Ok(inner)
}
#[pyfunction]
#[pyo3(signature = (distributed_runtime, input, engine_config))]
pub fn run_input<'p>(
py: Python<'p>,
distributed_runtime: super::DistributedRuntime,
input: &str,
engine_config: EngineConfig,
) -> PyResult<Bound<'p, PyAny>> {
let input_enum: Input = input.parse().map_err(to_pyerr)?;
pyo3_async_runtimes::tokio::future_into_py(py, async move {
dynamo_llm::entrypoint::input::run_input(
either::Either::Right(distributed_runtime.inner.clone()),
input_enum,
engine_config.inner,
)
.await
.map_err(to_pyerr)?;
Ok(())
})
}
pub fn to_pyerr<E>(err: E) -> PyErr
where
E: Display,
{
PyException::new_err(format!("{}", err))
}
......@@ -839,6 +839,18 @@ async def register_llm(model_type: ModelType, endpoint: Endpoint, model_path: st
"""Attach the model at path to the given endpoint, and advertise it as model_type"""
...
class EngineConfig:
"""Holds internal configuration for a Dynamo engine."""
...
async def make_engine(args: EntrypointArgs) -> EngineConfig:
"""Make an engine matching the args"""
...
async def run_input(runtime: DistributedRuntime, input: str, engine_config: EngineConfig) -> None:
"""Start an engine, connect it to an input, and run until stopped."""
...
class NatsQueue:
"""
A queue implementation using NATS JetStream for task distribution
......@@ -1144,3 +1156,11 @@ class ZmqKvEventListener:
ValueError: If events cannot be serialized to JSON
"""
...
class EntrypointArgs:
"""
Settings to connect an input to a worker and run them.
Use by `dynamo run`.
"""
...
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# flake8: noqa
import logging
......@@ -24,6 +14,8 @@ except ImportError:
from dynamo._core import ApproxKvIndexer as ApproxKvIndexer
from dynamo._core import DisaggregatedRouter as DisaggregatedRouter
from dynamo._core import EngineType
from dynamo._core import EntrypointArgs as EntrypointArgs
from dynamo._core import ForwardPassMetrics as ForwardPassMetrics
from dynamo._core import HttpAsyncEngine as HttpAsyncEngine
from dynamo._core import HttpError as HttpError
......@@ -43,7 +35,9 @@ from dynamo._core import ZmqKvEventListener as ZmqKvEventListener
from dynamo._core import ZmqKvEventPublisher as ZmqKvEventPublisher
from dynamo._core import ZmqKvEventPublisherConfig as ZmqKvEventPublisherConfig
from dynamo._core import compute_block_hash_for_seq_py as compute_block_hash_for_seq_py
from dynamo._core import make_engine
from dynamo._core import register_llm as register_llm
from dynamo._core import run_input
try:
from dynamo.llm.tensorrtllm import ( # noqa: F401
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[package]
name = "dynamo-engine-llamacpp"
......@@ -30,6 +18,8 @@ cuda = ["llama-cpp-2/cuda"]
metal = ["llama-cpp-2/metal"]
vulkan = ["llama-cpp-2/vulkan"]
openmp = ["llama-cpp-2/openmp"]
# We cannot link libllama into a `.so`, so the bindings need this
dynamic-link = ["llama-cpp-2/dynamic-link"]
[dependencies]
dynamo-runtime = { workspace = true }
......
......@@ -31,6 +31,7 @@ impl RouterConfig {
}
}
#[derive(Clone)]
pub enum EngineConfig {
/// Remote networked engines
Dynamic(Box<LocalModel>),
......
......@@ -11,6 +11,7 @@ use std::{
fmt,
io::{IsTerminal as _, Read as _},
path::PathBuf,
str::FromStr,
};
pub mod batch;
......@@ -19,7 +20,8 @@ pub mod endpoint;
pub mod http;
pub mod text;
use dynamo_runtime::{protocols::ENDPOINT_SCHEME, DistributedRuntime};
use dynamo_runtime::protocols::ENDPOINT_SCHEME;
use either::Either;
const BATCH_PREFIX: &str = "batch:";
......@@ -42,6 +44,14 @@ pub enum Input {
Batch(PathBuf),
}
impl FromStr for Input {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Input::try_from(s)
}
}
impl TryFrom<&str> for Input {
type Error = anyhow::Error;
......@@ -87,28 +97,36 @@ impl Default for Input {
/// Run the given engine (EngineConfig) connected to an input.
/// Does not return until the input exits.
/// For Input::Endpoint pass a DistributedRuntime. For everything else pass either a Runtime or a
/// DistributedRuntime.
pub async fn run_input(
rt: Either<dynamo_runtime::Runtime, dynamo_runtime::DistributedRuntime>,
in_opt: Input,
runtime: dynamo_runtime::Runtime,
engine_config: super::EngineConfig,
) -> anyhow::Result<()> {
let runtime = match &rt {
Either::Left(rt) => rt.clone(),
Either::Right(drt) => drt.runtime().clone(),
};
match in_opt {
Input::Http => {
http::run(runtime.clone(), engine_config).await?;
http::run(runtime, engine_config).await?;
}
Input::Text => {
text::run(runtime.clone(), None, engine_config).await?;
text::run(runtime, None, engine_config).await?;
}
Input::Stdin => {
let mut prompt = String::new();
std::io::stdin().read_to_string(&mut prompt).unwrap();
text::run(runtime.clone(), Some(prompt), engine_config).await?;
text::run(runtime, Some(prompt), engine_config).await?;
}
Input::Batch(path) => {
batch::run(runtime.clone(), path, engine_config).await?;
batch::run(runtime, path, engine_config).await?;
}
Input::Endpoint(path) => {
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
let Either::Right(distributed_runtime) = rt else {
anyhow::bail!("Input::Endpoint requires passing a DistributedRuntime");
};
endpoint::run(distributed_runtime, path, engine_config).await?;
}
}
......
......@@ -80,8 +80,8 @@ impl LocalModelBuilder {
self
}
pub fn endpoint_id(&mut self, endpoint_id: EndpointId) -> &mut Self {
self.endpoint_id = Some(endpoint_id);
pub fn endpoint_id(&mut self, endpoint_id: Option<EndpointId>) -> &mut Self {
self.endpoint_id = endpoint_id;
self
}
......@@ -96,8 +96,9 @@ impl LocalModelBuilder {
self
}
pub fn http_port(&mut self, port: u16) -> &mut Self {
self.http_port = port;
/// Passing None resets it to default
pub fn http_port(&mut self, port: Option<u16>) -> &mut Self {
self.http_port = port.unwrap_or(DEFAULT_HTTP_PORT);
self
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment