Unverified Commit 99cd9d85 authored by Graham King, committed by GitHub

feat: dynamo-run <-> python interop (#934)

Adding this to a Python script makes it register on the network so that `dynamo-run` can discover it and send it requests:
```
from dynamo.llm import ModelType, register_llm

MODEL = "Qwen/Qwen2.5-0.5B-Instruct"
await register_llm(endpoint, MODEL, ModelType.Backend)
```

Full vllm example, with pre-processing in dynamo:
- `dynamo-run in=text out=dyn://dynamo.backend.generate`
- `cd lib/bindings/python/examples/hello_world`
- `python server_vllm.py`

This builds on top of the work to move the pre-processor to the ingress side. It means we can decouple Rust and Python, using NATS as the bus.
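
For reference, here is the shape of the traffic once pre-processing has moved to the ingress, sketched from the `server_vllm.py` example in this PR. Field names are taken from that script; the concrete values are illustrative only:

```
# What the Python worker's generate() handler receives: the prompt has
# already been templated and tokenized by the dynamo-run ingress.
request = {
    "token_ids": [9707, 11, 1879],              # prompt token ids
    "sampling_options": {"temperature": 0.7},
    "stop_conditions": {"max_tokens": 16},
}

# What the worker streams back: each item carries the newly generated
# token ids, and the final item sets a finish_reason.
chunk = {"token_ids": [40]}
final = {"token_ids": [], "finish_reason": "stop"}
```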

The `register_llm` call does this (a condensed sketch follows the list):

- Download the model from HF if necessary
- Load the model deployment card from the HF folder, or extract it from the GGUF file
- Push the tokenizer config etc. into the NATS object store so ingress can access it from a different machine
- Publish the model deployment card to etcd
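
Condensed, end-to-end sketch of a worker using `register_llm`, based on the `server_vllm.py` example in this PR. It also shows where `endpoint` in the snippet above comes from; the trivial echo handler is a stand-in for a real engine:

```
import asyncio

import uvloop

from dynamo.llm import ModelType, register_llm
from dynamo.runtime import DistributedRuntime, dynamo_worker


async def echo(request):
    # Stand-in handler: echo the prompt tokens back. A real worker calls
    # an engine here; see server_vllm.py in this PR.
    yield {"token_ids": request["token_ids"], "finish_reason": "stop"}


@dynamo_worker(static=False)
async def worker(runtime: DistributedRuntime):
    # Matches the dyn://dynamo.backend.generate address used by dynamo-run
    component = runtime.namespace("dynamo").component("backend")
    await component.create_service()
    endpoint = component.endpoint("generate")

    # Download from HF if needed, then publish the model deployment card
    # so ingress can discover this worker
    await register_llm(endpoint, "Qwen/Qwen2.5-0.5B-Instruct", ModelType.Backend)

    await endpoint.serve_endpoint(echo, None)


if __name__ == "__main__":
    uvloop.install()
    asyncio.run(worker())
```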
parent 829e1cf5
......@@ -1678,7 +1678,6 @@ dependencies = [
"dynamo-runtime",
"futures",
"futures-util",
"hf-hub",
"humantime",
"netlink-packet-route",
"rtnetlink",
......
......@@ -53,7 +53,6 @@ anyhow = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
hf-hub = { workspace = true }
humantime = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
......
......@@ -66,7 +66,7 @@ struct Entry {
pub async fn run(
runtime: Runtime,
flags: Flags,
maybe_card: Option<ModelDeploymentCard>,
card: ModelDeploymentCard,
input_jsonl: PathBuf,
engine_config: EngineConfig,
template: Option<RequestTemplate>,
......@@ -83,7 +83,7 @@ pub async fn run(
let prepared_engine = common::prepare_engine(runtime, flags, engine_config).await?;
let service_name_ref = Arc::new(prepared_engine.service_name);
let pre_processor = if let Some(card) = maybe_card {
let pre_processor = if card.has_tokenizer() {
Some(OpenAIPreprocessor::new(card).await?)
} else {
None
......
......@@ -153,11 +153,8 @@ pub async fn prepare_engine(
_cache_dir: cache_dir,
})
}
EngineConfig::StaticFull {
service_name,
engine,
card: _card,
} => {
EngineConfig::StaticFull { engine, model } => {
let service_name = model.service_name().to_string();
tracing::debug!("Model: {service_name} with engine pre-processing");
let engine = Arc::new(StreamingEngineAdapter::new(engine));
Ok(PreparedEngine {
......@@ -168,16 +165,16 @@ pub async fn prepare_engine(
})
}
EngineConfig::StaticCore {
service_name,
engine: inner_engine,
card,
model,
} => {
let pipeline = build_pipeline::<
NvCreateChatCompletionRequest,
NvCreateChatCompletionStreamResponse,
>(&card, inner_engine)
>(model.card(), inner_engine)
.await?;
let service_name = model.service_name().to_string();
tracing::debug!("Model: {service_name} with Dynamo pre-processing");
Ok(PreparedEngine {
service_name,
......@@ -236,7 +233,7 @@ mod tests {
#[tokio::test]
async fn test_build_chat_completions_pipeline_core_engine_succeeds() -> anyhow::Result<()> {
// Create test model card
let card = ModelDeploymentCard::from_local_path(HF_PATH, None).await?;
let card = ModelDeploymentCard::load(HF_PATH).await?;
let engine = dynamo_llm::engines::make_engine_core();
// Build pipeline for chat completions
......@@ -255,7 +252,7 @@ mod tests {
#[tokio::test]
async fn test_build_completions_pipeline_core_engine_succeeds() -> anyhow::Result<()> {
// Create test model card
let card = ModelDeploymentCard::from_local_path(HF_PATH, None).await?;
let card = ModelDeploymentCard::load(HF_PATH).await?;
let engine = dynamo_llm::engines::make_engine_core();
// Build pipeline for completions
......
......@@ -18,9 +18,6 @@ use std::{pin::Pin, sync::Arc};
use dynamo_llm::{
backend::Backend,
engines::StreamingEngineAdapter,
http::service::discovery::{ModelEntry, ModelNetworkName},
key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager},
model_card::{self, ModelDeploymentCard},
model_type::ModelType,
preprocessor::{BackendInput, BackendOutput},
types::{
......@@ -30,10 +27,10 @@ use dynamo_llm::{
Annotated,
},
};
use dynamo_runtime::engine::AsyncEngineStream;
use dynamo_runtime::pipeline::{
network::Ingress, Context, ManyOut, Operator, SegmentSource, ServiceBackend, SingleIn, Source,
};
use dynamo_runtime::{component::Endpoint, engine::AsyncEngineStream};
use dynamo_runtime::{protocols::Endpoint as EndpointId, DistributedRuntime};
use crate::EngineConfig;
......@@ -46,62 +43,50 @@ pub async fn run(
let cancel_token = distributed_runtime.primary_token().clone();
let endpoint_id: EndpointId = path.parse()?;
let component = distributed_runtime
.namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)?;
let endpoint = component
.service_builder()
.create()
.await?
.endpoint(&endpoint_id.name);
let (rt_fut, mut card) = match engine_config {
EngineConfig::StaticFull {
service_name,
engine,
mut card,
} => {
EngineConfig::StaticFull { engine, mut model } => {
let engine = Arc::new(StreamingEngineAdapter::new(engine));
card.requires_preprocessing = false;
let ingress_chat = Ingress::<
Context<NvCreateChatCompletionRequest>,
Pin<Box<dyn AsyncEngineStream<Annotated<NvCreateChatCompletionStreamResponse>>>>,
>::for_engine(engine)?;
let endpoint_chat = register(
distributed_runtime.clone(),
&service_name,
endpoint_id,
*card.clone(),
ModelType::Chat,
)
.await?;
let fut_chat = endpoint_chat
.endpoint_builder()
.handler(ingress_chat)
.start();
(fut_chat, card)
model.attach(&endpoint, ModelType::Chat).await?;
let fut_chat = endpoint.endpoint_builder().handler(ingress_chat).start();
(fut_chat, model.card().clone())
}
EngineConfig::StaticCore {
service_name,
engine: inner_engine,
mut card,
mut model,
} => {
// Pre-processing happens ingress-side, so requests arriving here are already pre-processed.
let frontend =
SegmentSource::<SingleIn<BackendInput>, ManyOut<Annotated<BackendOutput>>>::new();
let backend = Backend::from_mdc(*card.clone()).await?.into_operator();
let backend = Backend::from_mdc(model.card().clone())
.await?
.into_operator();
let engine = ServiceBackend::from_engine(inner_engine);
let pipeline = frontend
.link(backend.forward_edge())?
.link(engine)?
.link(backend.backward_edge())?
.link(frontend)?;
let ingress = Ingress::for_pipeline(pipeline)?;
card.requires_preprocessing = true;
let endpoint = register(
distributed_runtime.clone(),
&service_name,
endpoint_id,
*card.clone(),
ModelType::Backend,
)
.await?;
(endpoint.endpoint_builder().handler(ingress).start(), card)
model.attach(&endpoint, ModelType::Backend).await?;
let fut = endpoint.endpoint_builder().handler(ingress).start();
(fut, model.card().clone())
}
EngineConfig::Dynamic(_) => {
anyhow::bail!("Cannot use endpoint for both in and out");
......@@ -127,54 +112,3 @@ pub async fn run(
Ok(())
}
async fn register(
distributed_runtime: DistributedRuntime,
service_name: &str,
endpoint_id: EndpointId,
mut card: ModelDeploymentCard,
model_type: ModelType,
) -> anyhow::Result<Endpoint> {
let component = distributed_runtime
.namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)?;
let endpoint = component
.service_builder()
.create()
.await?
.endpoint(&endpoint_id.name);
// A static component doesn't have an etcd_client because it doesn't need to register
if let Some(etcd_client) = distributed_runtime.etcd_client() {
// Store model config files in NATS object store
let nats_client = distributed_runtime.nats_client();
card.move_to_nats(nats_client.clone()).await?;
// Publish the Model Deployment Card to etcd
let kvstore: Box<dyn KeyValueStore> =
Box::new(EtcdStorage::new(etcd_client.clone(), endpoint_id.clone()));
let card_store = Arc::new(KeyValueStoreManager::new(kvstore));
let key = card.slug().to_string();
card_store
.publish(model_card::BUCKET_NAME, None, &key, &mut card)
.await?;
// Publish our ModelEntry to etcd. This allows ingress to find the model card.
// (Why don't we put the model card directly under this key?)
let network_name = ModelNetworkName::from_local(&endpoint, etcd_client.lease_id());
tracing::debug!("Registering with etcd as {network_name}");
let model_registration = ModelEntry {
name: service_name.to_string(),
endpoint: endpoint_id.clone(),
model_type,
};
etcd_client
.kv_create(
network_name.to_string(),
serde_json::to_vec_pretty(&model_registration)?,
None, // use primary lease
)
.await?;
}
Ok(endpoint)
}
......@@ -71,36 +71,31 @@ pub async fn run(
}
}
}
EngineConfig::StaticFull {
service_name,
engine,
..
} => {
EngineConfig::StaticFull { engine, model } => {
let engine = Arc::new(StreamingEngineAdapter::new(engine));
let manager = http_service.model_manager();
manager.add_completions_model(&service_name, engine.clone())?;
manager.add_chat_completions_model(&service_name, engine)?;
manager.add_completions_model(model.service_name(), engine.clone())?;
manager.add_chat_completions_model(model.service_name(), engine)?;
}
EngineConfig::StaticCore {
service_name,
engine: inner_engine,
card,
model,
} => {
let manager = http_service.model_manager();
let chat_pipeline = common::build_pipeline::<
NvCreateChatCompletionRequest,
NvCreateChatCompletionStreamResponse,
>(&card, inner_engine.clone())
>(model.card(), inner_engine.clone())
.await?;
manager.add_chat_completions_model(&service_name, chat_pipeline)?;
manager.add_chat_completions_model(model.service_name(), chat_pipeline)?;
let cmpl_pipeline = common::build_pipeline::<CompletionRequest, CompletionResponse>(
&card,
model.card(),
inner_engine,
)
.await?;
manager.add_completions_model(&service_name, cmpl_pipeline)?;
manager.add_completions_model(model.service_name(), cmpl_pipeline)?;
}
EngineConfig::None => unreachable!(),
}
......
......@@ -15,17 +15,17 @@
#[cfg(any(feature = "vllm", feature = "sglang"))]
use std::{future::Future, pin::Pin};
use std::{io::Read, path::PathBuf, sync::Arc};
use std::{io::Read, sync::Arc};
use anyhow::Context;
use dynamo_llm::{
backend::ExecutionContext, engines::StreamingEngine, kv_router::publisher::KvMetricsPublisher,
model_card::model::ModelDeploymentCard,
LocalModel,
};
use dynamo_runtime::{protocols::Endpoint, DistributedRuntime};
mod flags;
pub use flags::Flags;
mod hub;
mod input;
#[cfg(any(feature = "vllm", feature = "sglang"))]
mod net;
......@@ -53,25 +53,20 @@ const PYTHON_STR_SCHEME: &str = "pystr:";
#[cfg(feature = "python")]
const PYTHON_TOK_SCHEME: &str = "pytok:";
/// Prefix for Hugging Face model repository
const HF_SCHEME: &str = "hf://";
pub enum EngineConfig {
/// A remote networked engine we don't know about yet
Dynamic(Endpoint),
/// A full-service engine does its own tokenization and prompt formatting.
StaticFull {
service_name: String,
engine: Arc<dyn StreamingEngine>,
card: Box<ModelDeploymentCard>,
model: Box<LocalModel>,
},
/// A core engine expects to be wrapped with pre/post processors that handle tokenization.
StaticCore {
service_name: String,
engine: ExecutionContext,
card: Box<ModelDeploymentCard>,
model: Box<LocalModel>,
},
/// vllm multi-node doesn't run an engine on nodes other than 0. 'ray' does all the work.
......@@ -93,104 +88,41 @@ pub async fn run(
#[allow(unused_variables)] zmq_socket_prefix: Option<String>,
) -> anyhow::Result<()> {
let cancel_token = runtime.primary_token();
// Turn relative paths into absolute paths and canonicalize them
let mut model_path = flags
let maybe_path = flags
.model_path_pos
.clone()
.or(flags.model_path_flag.clone())
.and_then(|p| {
// Check for hf:// prefix first
if let Some(hf_path) = p.to_string_lossy().strip_prefix(HF_SCHEME) {
return Some(PathBuf::from(hf_path));
}
if p.exists() {
p.canonicalize().ok()
} else {
Some(p)
}
});
// Serve the model under the name provided, or the name of the GGUF file or HF repo.
let mut model_name = flags
.model_name
.clone()
.or_else(|| {
model_path
.as_ref()
.and_then(|p| p.iter().next_back())
.map(|n| n.to_string_lossy().into_owned())
})
.or_else(|| {
if in_opt == Input::Text {
Some(INVISIBLE_MODEL_NAME.to_string())
} else {
None
}
});
.or(flags.model_path_flag.clone());
// If it's an HF repo download it
if let Some(inner_model_path) = model_path.as_ref() {
if !inner_model_path.exists() && !inner_model_path.is_absolute() {
model_name = Some(inner_model_path.display().to_string());
model_path = Some(hub::from_hf(inner_model_path).await?);
}
}
// Load the model deployment card, if any
// Only used by some engines, so without those feature flags it's unused.
#[allow(unused_variables)]
let maybe_card = match (&model_path, &flags.model_config) {
// --model-config takes precedence
(_, Some(model_config)) => {
match ModelDeploymentCard::from_local_path(model_config, model_name.as_deref()).await {
Ok(card) => Some(card),
Err(e) => {
tracing::error!(
"Failed to load model card from --model-config path {}: {e}",
model_config.display(),
);
None
}
}
}
// If --model-path is an HF repo use that
(Some(model_path), _) if model_path.is_dir() => {
match ModelDeploymentCard::from_local_path(model_path, model_name.as_deref()).await {
Ok(card) => Some(card),
Err(e) => {
tracing::error!(
"Failed to load model card from --model-path {}: {e}",
model_path.display(),
);
None
let local_model: LocalModel = match out_opt {
// If output is an endpoint we are the ingress and don't have a local model,
// but making an empty one cleans up the code.
Output::Endpoint(_) => Default::default(),
_ => {
match &maybe_path {
Some(model_path) => {
let maybe_model_name = if in_opt == Input::Text {
Some(INVISIBLE_MODEL_NAME.to_string())
} else {
flags.model_name.clone()
};
LocalModel::prepare(
model_path.to_str().context("Invalid UTF-8 in model path")?,
flags.model_config.as_deref(),
maybe_model_name.as_deref(),
)
.await?
}
}
}
(Some(model_path), _) if model_path.is_file() => {
match ModelDeploymentCard::from_gguf(model_path, model_name.as_deref()).await {
Ok(card) => Some(card),
Err(e) => {
tracing::error!(
"Failed to load model card from GGUF {}: {e}",
model_path.display(),
);
None
None => {
// echo_full engine doesn't need a path
Default::default()
}
}
}
// Otherwise we don't have one, but we only need it if we're tokenizing
_ => {
tracing::debug!(
"No model card path provided (neither --model-config nor --model-path)"
);
None
}
};
let dyn_input = match &in_opt {
Input::Endpoint(endpoint_path) => {
if model_path.as_ref().map(|mp| mp.is_file()).unwrap_or(false)
if maybe_path.as_ref().map(|mp| mp.is_file()).unwrap_or(false)
&& flags.model_config.is_none()
{
// TODO We need to convert the tokenizer extracted from the GGUF file into something we can
......@@ -222,60 +154,41 @@ pub async fn run(
None
};
// We may need it later
let card = local_model.card().clone();
// Create the engine matching `out`
let engine_config = match out_opt {
Output::EchoFull => {
let Some(model_name) = model_name else {
anyhow::bail!(
"Pass --model-name or --model-path so we know which model to imitate"
);
};
EngineConfig::StaticFull {
card: Box::new(ModelDeploymentCard::with_name_only(&model_name)),
service_name: model_name,
engine: dynamo_llm::engines::make_engine_full(),
}
Output::Endpoint(path) => {
let endpoint: Endpoint = path.parse()?;
EngineConfig::Dynamic(endpoint)
}
Output::EchoFull => EngineConfig::StaticFull {
model: Box::new(local_model),
engine: dynamo_llm::engines::make_engine_full(),
},
Output::EchoCore => {
let Some(mut card) = maybe_card.clone() else {
let card = local_model.card();
if !card.has_tokenizer() {
anyhow::bail!(
"out=echo_core need to find the tokenizer. Pass flag --model-path <path>"
);
};
EngineConfig::StaticCore {
service_name: card.service_name.clone(),
engine: dynamo_llm::engines::make_engine_core(),
card: Box::new(card),
model: Box::new(local_model),
}
}
Output::Endpoint(path) => {
let endpoint: Endpoint = path.parse()?;
EngineConfig::Dynamic(endpoint)
}
#[cfg(feature = "mistralrs")]
Output::MistralRs => {
let Some(model_path) = model_path else {
anyhow::bail!("out=mistralrs requires flag --model-path=<full-path-to-model-gguf>");
};
let Some(model_name) = model_name else {
unreachable!("We checked model_path earlier, and set model_name from model_path");
};
EngineConfig::StaticFull {
card: Box::new(ModelDeploymentCard::with_name_only(&model_name)),
service_name: model_name,
engine: dynamo_engine_mistralrs::make_engine(&model_path).await?,
}
}
Output::MistralRs => EngineConfig::StaticFull {
engine: dynamo_engine_mistralrs::make_engine(local_model.path()).await?,
model: Box::new(local_model),
},
#[cfg(feature = "sglang")]
Output::SgLang => {
let Some(model_path) = model_path else {
anyhow::bail!("out=sglang requires flag --model-path=<full-path-to-model-dir>");
};
if !model_path.is_dir() {
if !local_model.path().is_dir() {
anyhow::bail!("`--model-path should point at a HuggingFace repo checkout");
}
// Safety: Earlier we build maybe_card from model_path, which we checked right above
let card = maybe_card.clone().unwrap();
let Some(sock_prefix) = zmq_socket_prefix else {
anyhow::bail!("sglang requires zmq_socket_prefix");
};
......@@ -299,7 +212,7 @@ pub async fn run(
let (engine, sglang_process) = dynamo_engine_sglang::make_engine(
cancel_token.clone(),
&model_path,
local_model.path(),
&sock_prefix,
node_conf,
flags.tensor_parallel_size,
......@@ -311,9 +224,8 @@ pub async fn run(
let _ = sglang_process.await;
}));
EngineConfig::StaticCore {
service_name: card.service_name.clone(),
engine,
card: Box::new(card),
model: Box::new(local_model),
}
}
#[cfg(feature = "vllm")]
......@@ -321,16 +233,6 @@ pub async fn run(
if flags.base_gpu_id != 0 {
anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
}
let Some(model_path) = model_path else {
anyhow::bail!(
"out=vllm requires flag --model-path=<full-path-to-hf-repo-or-model-gguf>"
);
};
let Some(card) = maybe_card.clone() else {
anyhow::bail!(
"Unable to build tokenizer. out=vllm requires --model-path to be an HF repo with fast tokenizer (tokenizer.json) or a GGUF file"
);
};
let Some(sock_prefix) = zmq_socket_prefix else {
anyhow::bail!("vllm requires zmq_socket_prefix");
};
......@@ -368,7 +270,7 @@ pub async fn run(
// vllm multi-node: only the leader runs vllm
let (engine, vllm_future) = dynamo_engine_vllm0_7::make_leader_engine(
cancel_token.clone(),
&model_path,
local_model.path(),
&sock_prefix,
node_conf,
flags.tensor_parallel_size,
......@@ -380,9 +282,8 @@ pub async fn run(
let _ = vllm_future.await;
}));
EngineConfig::StaticCore {
service_name: card.service_name.clone(),
engine,
card: Box::new(card),
model: Box::new(local_model),
}
} else {
// Nodes rank > 0 only run 'ray'
......@@ -398,16 +299,6 @@ pub async fn run(
if flags.base_gpu_id != 0 {
anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
}
let Some(model_path) = model_path else {
anyhow::bail!(
"out=vllm requires flag --model-path=<full-path-to-hf-repo-or-model-gguf>"
);
};
let Some(card) = maybe_card.clone() else {
anyhow::bail!(
"Unable to build tokenizer. out=vllm requires --model-path to be an HF repo with fast tokenizer (tokenizer.json) or a GGUF file"
);
};
let node_conf = dynamo_llm::engines::MultiNodeConfig {
num_nodes: flags.num_nodes,
node_rank: flags.node_rank,
......@@ -415,71 +306,53 @@ pub async fn run(
};
let engine = dynamo_engine_vllm0_8::make_engine(
cancel_token.clone(),
&model_path,
local_model.path(),
node_conf,
flags.tensor_parallel_size,
flags.extra_engine_args.clone(),
)
.await?;
EngineConfig::StaticCore {
service_name: card.service_name.clone(),
engine,
card: Box::new(card),
model: Box::new(local_model),
}
}
#[cfg(feature = "llamacpp")]
Output::LlamaCpp => {
let Some(model_path) = model_path else {
anyhow::bail!("out=llamacpp requires flag --model-path=<full-path-to-model-gguf>");
};
if !model_path.is_file() {
if !local_model.path().is_file() {
anyhow::bail!("--model-path should refer to a GGUF file. llama_cpp does not support safetensors.");
}
let Some(card) = maybe_card.clone() else {
anyhow::bail!(
"Pass --model-config so we can find the tokenizer, should be an HF checkout."
);
};
let engine =
dynamo_engine_llamacpp::make_engine(cancel_token.clone(), &model_path).await?;
dynamo_engine_llamacpp::make_engine(cancel_token.clone(), local_model.path())
.await?;
EngineConfig::StaticCore {
service_name: card.service_name.clone(),
engine,
card: Box::new(card),
model: Box::new(local_model),
}
}
#[cfg(feature = "python")]
Output::PythonStr(path_str) => {
let Some(model_name) = &model_name else {
anyhow::bail!("Provide model service name as `--model-name <this>`");
};
let py_args = flags.as_vec(&path_str, model_name);
let card = local_model.card();
let py_args = flags.as_vec(&path_str, &card.service_name);
let p = std::path::PathBuf::from(path_str);
let engine =
dynamo_engine_python::make_string_engine(cancel_token.clone(), &p, py_args).await?;
EngineConfig::StaticFull {
service_name: model_name.to_string(),
engine,
card: Box::new(ModelDeploymentCard::with_name_only(model_name)),
model: Box::new(local_model),
}
}
#[cfg(feature = "python")]
Output::PythonTok(path_str) => {
let Some(card) = maybe_card.clone() else {
anyhow::bail!("Could not find tokenizer. Pass flag --model-path <path>");
};
let Some(model_name) = model_name else {
unreachable!("If we have a card we must have a model name");
};
let py_args = flags.as_vec(&path_str, &model_name);
let card = local_model.card();
let py_args = flags.as_vec(&path_str, &card.service_name);
let p = std::path::PathBuf::from(path_str);
let engine =
dynamo_engine_python::make_token_engine(cancel_token.clone(), &p, py_args).await?;
EngineConfig::StaticCore {
service_name: model_name.clone(),
engine,
card: Box::new(card),
model: Box::new(local_model),
}
}
};
......@@ -504,15 +377,8 @@ pub async fn run(
.await?;
}
Input::Batch(path) => {
crate::input::batch::run(
runtime.clone(),
flags,
maybe_card,
path,
engine_config,
template,
)
.await?;
crate::input::batch::run(runtime.clone(), flags, card, path, engine_config, template)
.await?;
}
Input::Endpoint(path) => {
let Some(dyn_input) = dyn_input else {
......
......@@ -1053,6 +1053,7 @@ dependencies = [
"futures",
"galil-seiferas",
"ggus",
"hf-hub",
"itertools 0.14.0",
"memmap2",
"minijinja",
......@@ -1195,6 +1196,15 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0"
[[package]]
name = "encoding_rs"
version = "0.8.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3"
dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "enum-as-inner"
version = "0.6.1"
......@@ -1870,15 +1880,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc03dcb0b0a83ae3f3363ec811014ae669f083e4e499c66602f447c4828737a1"
dependencies = [
"dirs",
"futures",
"http",
"indicatif",
"libc",
"log",
"num_cpus",
"rand 0.8.5",
"reqwest",
"serde",
"serde_json",
"thiserror 2.0.12",
"tokio",
"ureq",
"windows-sys 0.59.0",
]
......@@ -3580,8 +3593,10 @@ checksum = "d19c46a6fdd48bc4dab94b6103fccc55d34c67cc0ad04653aad4ea2a07cd7bbb"
dependencies = [
"base64 0.22.1",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"http-body-util",
......@@ -3605,6 +3620,7 @@ dependencies = [
"serde_json",
"serde_urlencoded",
"sync_wrapper",
"system-configuration",
"tokio",
"tokio-rustls",
"tokio-util",
......@@ -4180,6 +4196,27 @@ dependencies = [
"walkdir",
]
[[package]]
name = "system-configuration"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b"
dependencies = [
"bitflags 2.9.0",
"core-foundation 0.9.4",
"system-configuration-sys",
]
[[package]]
name = "system-configuration-sys"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "system-deps"
version = "6.2.2"
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# A very basic example of vllm worker handling pre-processed requests.
#
# Dynamo does the HTTP handling, prompt templating and tokenization, then forwards the
# request via NATS to this python script, which runs vllm.
#
# Set up a virtualenv with dynamo.llm, dynamo.runtime and vllm installed.
# In lib/bindings/python, `maturin develop` and `pip install -e .` should do it.
# Start nats and etcd:
# - nats-server -js
# - etcd
#
# Window 1: `python server_vllm.py`. Wait for log "Starting endpoint".
# Window 2: `dynamo-run out=dyn://dynamo.backend.generate`
import argparse
import asyncio
import sys
import uvloop
from vllm import SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.entrypoints.openai.api_server import (
build_async_engine_client_from_engine_args,
)
from vllm.inputs import TokensPrompt
from dynamo.llm import ModelType, register_llm
from dynamo.runtime import DistributedRuntime, dynamo_worker
DEFAULT_ENDPOINT = "dyn://dynamo.backend.generate"
DEFAULT_MODEL = "Qwen/Qwen2.5-0.5B-Instruct"
class Config:
"""Command line parameters or defaults"""
namespace: str
component: str
endpoint: str
model: str
class RequestHandler:
"""
Request handler for the generate endpoint
"""
def __init__(self, engine):
self.engine_client = engine
async def generate(self, request):
request_id = "1" # hello_world example only
# print(f"Received request: {request}")
prompt = TokensPrompt(prompt_token_ids=request["token_ids"])
sampling_params = SamplingParams(
temperature=request["sampling_options"]["temperature"],
# vllm defaults this to 16
max_tokens=request["stop_conditions"]["max_tokens"],
)
num_output_tokens_so_far = 0
gen = self.engine_client.generate(prompt, sampling_params, request_id)
async for res in gen:
# res is vllm's RequestOutput
# This is the expected way for a request to end.
# The new token ID will be eos, don't forward it.
if res.finished:
yield {"finish_reason": "stop", "token_ids": []}
break
if not res.outputs:
yield {"finish_reason": "error", "token_ids": []}
break
output = res.outputs[0]
next_total_toks = len(output.token_ids)
out = {"token_ids": output.token_ids[num_output_tokens_so_far:]}
if output.finish_reason:
out["finish_reason"] = output.finish_reason
if output.stop_reason:
out["stop_reason"] = output.stop_reason
yield out
num_output_tokens_so_far = next_total_toks
@dynamo_worker(static=False)
async def worker(runtime: DistributedRuntime):
await init(runtime, cmd_line_args())
async def init(runtime: DistributedRuntime, config: Config):
"""
Instantiate and serve
"""
component = runtime.namespace(config.namespace).component(config.component)
await component.create_service()
endpoint = component.endpoint(config.endpoint)
print("Started server instance")
await register_llm(endpoint, config.model, ModelType.Backend)
engine_args = AsyncEngineArgs(
model=config.model,
task="generate",
skip_tokenizer_init=True,
)
engine_context = build_async_engine_client_from_engine_args(engine_args)
engine_client = await engine_context.__aenter__()
# the server will gracefully shut down (i.e., let open TCP streams finish)
# after the lease is revoked
await endpoint.serve_endpoint(RequestHandler(engine_client).generate, None)
def cmd_line_args():
parser = argparse.ArgumentParser(
description="vLLM server integrated with Dynamo runtime."
)
parser.add_argument(
"--endpoint",
type=str,
default=DEFAULT_ENDPOINT,
help=f"Dynamo endpoint string in 'dyn://namespace.component.endpoint' format. Default: {DEFAULT_ENDPOINT}",
)
parser.add_argument(
"--model",
type=str,
default=DEFAULT_MODEL,
help=f"Path to disk model or HuggingFace model identifier to load. Default: {DEFAULT_MODEL}",
)
args = parser.parse_args()
config = Config()
config.model = args.model
endpoint_str = args.endpoint
if endpoint_str.startswith("dyn://"):
endpoint_str = endpoint_str[len("dyn://") :]
endpoint_parts = endpoint_str.split(".")
if len(endpoint_parts) != 3:
print(
f"Invalid endpoint format: '{args.endpoint}'. Expected 'dyn://namespace.component.endpoint' or 'namespace.component.endpoint'."
)
sys.exit(1)
parsed_namespace, parsed_component_name, parsed_endpoint_name = endpoint_parts
config.namespace = parsed_namespace
config.component = parsed_component_name
config.endpoint = parsed_endpoint_name
return config
if __name__ == "__main__":
uvloop.install()
asyncio.run(worker())
......@@ -51,6 +51,7 @@ const DEFAULT_ANNOTATED_SETTING: Option<bool> = Some(true);
fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
logging::init();
m.add_function(wrap_pyfunction!(log_message, m)?)?;
m.add_function(wrap_pyfunction!(register_llm, m)?)?;
m.add_class::<DistributedRuntime>()?;
m.add_class::<CancellationToken>()?;
......@@ -77,6 +78,7 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<http::HttpError>()?;
m.add_class::<http::HttpAsyncEngine>()?;
m.add_class::<EtcdKvCache>()?;
m.add_class::<ModelType>()?;
engine::add_to_module(m)?;
......@@ -97,6 +99,37 @@ fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32)
logging::log_message(level, message, module, file, line);
}
#[pyfunction]
#[pyo3(text_signature = "(endpoint, path, model_type)")]
fn register_llm<'p>(
py: Python<'p>,
endpoint: Endpoint,
path: &str,
model_type: ModelType,
) -> PyResult<Bound<'p, PyAny>> {
let model_type_obj = match model_type {
ModelType::Chat => llm_rs::model_type::ModelType::Chat,
ModelType::Completion => llm_rs::model_type::ModelType::Completion,
ModelType::Backend => llm_rs::model_type::ModelType::Backend,
};
let inner_path = path.to_string();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
// Download from HF, load the ModelDeploymentCard
let mut local_model = llm_rs::LocalModel::prepare(&inner_path, None, None)
.await
.map_err(to_pyerr)?;
// Advertise ourself on etcd so ingress can find us
local_model
.attach(&endpoint.inner, model_type_obj)
.await
.map_err(to_pyerr)?;
Ok(())
})
}
#[pyclass]
#[derive(Clone)]
struct EtcdKvCache {
......@@ -155,6 +188,15 @@ struct PyLease {
inner: rs::transports::etcd::Lease,
}
#[pyclass(eq, eq_int)]
#[derive(Clone, PartialEq)]
#[repr(i32)]
enum ModelType {
Chat = 1,
Completion = 2,
Backend = 3,
}
#[pymethods]
impl PyLease {
fn id(&self) -> i64 {
......
......@@ -26,16 +26,12 @@ impl ModelDeploymentCard {}
#[pymethods]
impl ModelDeploymentCard {
// Previously called "from_local_path"
#[staticmethod]
fn from_local_path(
path: String,
model_name: String,
py: Python<'_>,
) -> PyResult<Bound<'_, PyAny>> {
fn load(path: String, model_name: String, py: Python<'_>) -> PyResult<Bound<'_, PyAny>> {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let card = RsModelDeploymentCard::from_local_path(&path, Some(&model_name))
.await
.map_err(to_pyerr)?;
let mut card = RsModelDeploymentCard::load(&path).await.map_err(to_pyerr)?;
card.set_name(&model_name);
Ok(ModelDeploymentCard { inner: card })
})
}
......
......@@ -605,3 +605,12 @@ class HttpAsyncEngine:
"""
...
class ModelType:
"""What type of request this model needs: Chat, Component or Backend (pre-processed)"""
...
async def register_llm(endpoint: Endpoint, path: str, model_type: ModelType) -> None:
"""Attach the model at path to the given endpoint, and advertise it as model_type"""
...
......@@ -24,4 +24,6 @@ from dynamo._core import KvMetricsAggregator as KvMetricsAggregator
from dynamo._core import KvMetricsPublisher as KvMetricsPublisher
from dynamo._core import KvRecorder as KvRecorder
from dynamo._core import KvRouter as KvRouter
from dynamo._core import ModelType as ModelType
from dynamo._core import OverlapScores as OverlapScores
from dynamo._core import register_llm as register_llm
......@@ -46,6 +46,7 @@ derive_builder = {workspace = true }
either = { workspace = true }
etcd-client = { workspace = true }
futures = { workspace = true }
hf-hub = { workspace = true }
rand = { workspace = true }
prometheus = { workspace = true }
serde = { workspace = true }
......
......@@ -20,7 +20,8 @@ const IGNORED: [&str; 3] = [".gitattributes", "LICENSE", "README.md"];
/// Attempt to download a model from Hugging Face
/// Returns the directory it is in
pub async fn from_hf(name: &Path) -> anyhow::Result<PathBuf> {
pub async fn from_hf(name: impl AsRef<Path>) -> anyhow::Result<PathBuf> {
let name = name.as_ref();
let api = ApiBuilder::new().with_progress(true).build()?;
let model_name = name.display().to_string();
......
......@@ -24,8 +24,10 @@ pub mod disagg_router;
pub mod engines;
pub mod gguf;
pub mod http;
pub mod hub;
pub mod key_value_store;
pub mod kv_router;
mod local_model;
pub mod model_card;
pub mod model_type;
pub mod preprocessor;
......@@ -36,5 +38,7 @@ pub mod tokenizers;
pub mod tokens;
pub mod types;
pub use local_model::LocalModel;
#[cfg(feature = "cuda_kv")]
pub mod kv;
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::Context;
use dynamo_runtime::component::Endpoint;
use dynamo_runtime::traits::DistributedRuntimeProvider;
use crate::http::service::discovery::{ModelEntry, ModelNetworkName};
use crate::key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager};
use crate::model_card::{self, ModelDeploymentCard};
use crate::model_type::ModelType;
/// Prefix for Hugging Face model repository
const HF_SCHEME: &str = "hf://";
/// What we call a model if the user didn't provide a name. Usually this means the name
/// is invisible, for example in a text chat.
const DEFAULT_NAME: &str = "dynamo";
#[derive(Debug, Clone)]
pub struct LocalModel {
full_path: PathBuf,
card: ModelDeploymentCard,
}
impl Default for LocalModel {
fn default() -> Self {
LocalModel {
full_path: PathBuf::new(),
card: ModelDeploymentCard::with_name_only(DEFAULT_NAME),
}
}
}
impl LocalModel {
pub fn card(&self) -> &ModelDeploymentCard {
&self.card
}
pub fn path(&self) -> &Path {
&self.full_path
}
pub fn service_name(&self) -> &str {
&self.card.service_name
}
/// Make an LLM ready for use:
/// - Download it from Hugging Face (and NGC in future) if necessary
/// - Resolve the path
/// - Load its ModelDeploymentCard
/// - Name it correctly
///
/// The model name will depend on what "model_path" is:
/// - A folder: The last part of the folder name: "/data/llms/Qwen2.5-3B-Instruct" -> "Qwen2.5-3B-Instruct"
/// - A file: The GGUF filename: "/data/llms/Qwen2.5-3B-Instruct-Q6_K.gguf" -> "Qwen2.5-3B-Instruct-Q6_K.gguf"
/// - An HF repo: The HF repo name: "Qwen/Qwen2.5-3B-Instruct" stays the same
pub async fn prepare(
model_path: &str,
override_config: Option<&Path>,
override_name: Option<&str>,
) -> anyhow::Result<LocalModel> {
// Name it
// Check for hf:// prefix first, in case we really want an HF repo but it conflicts
// with a relative path.
let is_hf_repo =
model_path.starts_with(HF_SCHEME) || !fs::exists(model_path).unwrap_or(false);
let relative_path = model_path.trim_start_matches(HF_SCHEME);
let full_path = if is_hf_repo {
// HF download if necessary
super::hub::from_hf(relative_path).await?
} else {
fs::canonicalize(relative_path)?
};
let model_name = match override_name.map(|s| s.to_string()) {
Some(name) => name,
None => {
if is_hf_repo {
// HF repos use their full name ("org/name") not the folder name
relative_path.to_string()
} else {
full_path
.iter()
.next_back()
.map(|n| n.to_string_lossy().into_owned())
.with_context(|| {
format!("Invalid model path, too short: {}", full_path.display())
})?
}
}
};
// Load the ModelDeploymentCard
// --model-config takes precedence over --model-path
let model_config_path = override_config.unwrap_or(&full_path);
let mut card = ModelDeploymentCard::load(&model_config_path).await?;
card.set_name(&model_name);
Ok(LocalModel { full_path, card })
}
/// Attach this model to the endpoint. This registers it on the network,
/// allowing ingress to discover it.
pub async fn attach(
&mut self,
endpoint: &Endpoint,
model_type: ModelType,
) -> anyhow::Result<()> {
// A static component doesn't have an etcd_client because it doesn't need to register
let Some(etcd_client) = endpoint.drt().etcd_client() else {
anyhow::bail!("Cannot attach to static endpoint");
};
// Store model config files in NATS object store
let nats_client = endpoint.drt().nats_client();
self.card.move_to_nats(nats_client.clone()).await?;
// Publish the Model Deployment Card to etcd
let endpoint_id = endpoint.id();
let kvstore: Box<dyn KeyValueStore> =
Box::new(EtcdStorage::new(etcd_client.clone(), endpoint_id.clone()));
let card_store = Arc::new(KeyValueStoreManager::new(kvstore));
let key = self.card.slug().to_string();
card_store
.publish(model_card::BUCKET_NAME, None, &key, &mut self.card)
.await?;
// Publish our ModelEntry to etcd. This allows ingress to find the model card.
// (Why don't we put the model card directly under this key?)
let network_name = ModelNetworkName::from_local(endpoint, etcd_client.lease_id());
tracing::debug!("Registering with etcd as {network_name}");
let model_registration = ModelEntry {
name: self.service_name().to_string(),
endpoint: endpoint_id.clone(),
model_type,
};
etcd_client
.kv_create(
network_name.to_string(),
serde_json::to_vec_pretty(&model_registration)?,
None, // use primary lease
)
.await
}
}
......@@ -23,6 +23,25 @@ use std::path::Path;
use crate::model_card::model::{ModelInfoType, PromptFormatterArtifact, TokenizerKind};
impl ModelDeploymentCard {
/// Allow user to override the name we register this model under.
/// Corresponds to vllm's `--served-model-name`.
pub fn set_name(&mut self, name: &str) {
self.display_name = name.to_string();
self.service_name = name.to_string();
}
/// Build an in-memory ModelDeploymentCard from either:
/// - a folder containing config.json, tokenizer.json and tokenizer_config.json
/// - a GGUF file
pub async fn load(config_path: impl AsRef<Path>) -> anyhow::Result<ModelDeploymentCard> {
let config_path = config_path.as_ref();
if config_path.is_dir() {
Self::from_local_path(config_path).await
} else {
Self::from_gguf(config_path).await
}
}
/// Creates a ModelDeploymentCard from a local directory path.
///
/// Currently HuggingFace format is supported and following files are expected:
......@@ -38,10 +57,7 @@ impl ModelDeploymentCard {
/// - The path doesn't exist or isn't a directory
/// - The path contains invalid Unicode characters
/// - Required model files are missing or invalid
pub async fn from_local_path(
local_root_dir: impl AsRef<Path>,
model_name: Option<&str>,
) -> anyhow::Result<Self> {
async fn from_local_path(local_root_dir: impl AsRef<Path>) -> anyhow::Result<Self> {
let local_root_dir = local_root_dir.as_ref();
check_valid_local_repo_path(local_root_dir)?;
let repo_id = local_root_dir
......@@ -49,22 +65,18 @@ impl ModelDeploymentCard {
.to_str()
.ok_or_else(|| anyhow::anyhow!("Path contains invalid Unicode"))?
.to_string();
let model_name = model_name.unwrap_or(
local_root_dir
.file_name()
.and_then(|n| n.to_str())
.ok_or_else(|| anyhow::anyhow!("Invalid model directory name"))?,
);
let model_name = local_root_dir
.file_name()
.and_then(|n| n.to_str())
.ok_or_else(|| anyhow::anyhow!("Invalid model directory name"))?;
Self::from_repo(&repo_id, model_name).await
}
pub async fn from_gguf(gguf_file: &Path, model_name: Option<&str>) -> anyhow::Result<Self> {
let model_name = model_name.map(|s| s.to_string()).or_else(|| {
gguf_file
.iter()
.next_back()
.map(|n| n.to_string_lossy().to_string())
});
async fn from_gguf(gguf_file: &Path) -> anyhow::Result<Self> {
let model_name = gguf_file
.iter()
.next_back()
.map(|n| n.to_string_lossy().to_string());
let Some(model_name) = model_name else {
// I think this would only happen on an empty path
anyhow::bail!(
......@@ -81,19 +93,19 @@ impl ModelDeploymentCard {
prompt_context: None, // TODO - auto-detect prompt context
revision: 0,
last_published: None,
requires_preprocessing: false,
})
}
/// TODO: This will be implemented after nova-hub is integrated with the model-card
/// TODO: Attempt to auto-detect model type and construct an MDC from a NGC repo
pub async fn from_ngc_repo(_: &str) -> anyhow::Result<Self> {
#[allow(dead_code)]
async fn from_ngc_repo(_: &str) -> anyhow::Result<Self> {
Err(anyhow::anyhow!(
"ModelDeploymentCard::from_ngc_repo is not implemented"
))
}
pub async fn from_repo(repo_id: &str, model_name: &str) -> anyhow::Result<Self> {
async fn from_repo(repo_id: &str, model_name: &str) -> anyhow::Result<Self> {
Ok(Self {
display_name: model_name.to_string(),
service_name: model_name.to_string(),
......@@ -103,7 +115,6 @@ impl ModelDeploymentCard {
prompt_context: None, // TODO - auto-detect prompt context
revision: 0,
last_published: None,
requires_preprocessing: false,
})
}
}
......
......@@ -125,12 +125,6 @@ pub struct ModelDeploymentCard {
/// Incrementing count of how many times we published this card
#[serde(default, skip_serializing)]
pub revision: u64,
/// Does this model expect preprocessing (tokenization, etc) to be already done?
/// If this is true they get a BackendInput JSON. If this is false they get
/// an NvCreateChatCompletionRequest JSON.
#[serde(default)]
pub requires_preprocessing: bool,
}
impl ModelDeploymentCard {
......@@ -171,9 +165,7 @@ impl ModelDeploymentCard {
/// Load a model deployment card from a JSON file
pub fn load_from_json_file<P: AsRef<Path>>(file: P) -> std::io::Result<Self> {
let mut card: ModelDeploymentCard = serde_json::from_str(&std::fs::read_to_string(file)?)?;
card.requires_preprocessing = false;
Ok(card)
Ok(serde_json::from_str(&std::fs::read_to_string(file)?)?)
}
/// Load a model deployment card from a JSON string
......@@ -218,6 +210,12 @@ impl ModelDeploymentCard {
}
}
/// Is this a full model card with tokenizer?
/// There are cases where we have a placeholder card (see `with_name_only`).
pub fn has_tokenizer(&self) -> bool {
self.tokenizer.is_some()
}
pub fn tokenizer_hf(&self) -> anyhow::Result<HfTokenizer> {
match &self.tokenizer {
Some(TokenizerKind::HfTokenizerJson(file)) => {
......
......@@ -18,7 +18,7 @@ use dynamo_llm::model_card::model::ModelDeploymentCard;
#[tokio::test]
async fn test_sequence_factory() {
let mdc = ModelDeploymentCard::from_local_path("tests/data/sample-models/TinyLlama_v1.1", None)
let mdc = ModelDeploymentCard::load("tests/data/sample-models/TinyLlama_v1.1")
.await
.unwrap();
......