Unverified commit 28fd481c authored by Graham King, committed by GitHub

feat(dynamo-run): vllm and sglang subprocess engines (#954)

New vllm and sglang engines that run in a sub-process. Will hopefully replace the existing embedded python engines.
    
Why?
    
  - Pure Python, does not require knowing Rust to work on it. Much simpler to maintain.
  - No embedded Python interpreter which avoids linking libpython and avoids the MacOS virtualenv issues.
  - Should have better performance as it's "native" vllm / sglang.
  - Works with any version of vllm (including v1!) and sglang. Less upgrade struggle.
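A minimal smoke test of the new path, assuming a model checkout on disk and nats/etcd running (a sketch: the command shape follows the docs below, and the model path is illustrative):

```
dynamo-run in=http out=vllm ~/llm_models/Llama-3.2-3B-Instruct/
dynamo-run in=http out=sglang ~/llm_models/Llama-3.2-3B-Instruct/
```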
parent 9f0e12a0
@@ -1679,7 +1679,9 @@ dependencies = [
"futures",
"futures-util",
"humantime",
"libc",
"netlink-packet-route",
"regex",
"rtnetlink",
"serde",
"serde_json",
...
@@ -59,6 +59,7 @@ etcd-client = { version = "0.14" }
futures = { version = "0.3" }
hf-hub = { version = "0.4.2", default-features = false, features = ["tokio", "rustls-tls"] }
humantime = { version = "2.2.0" }
libc = { version = "0.2" }
prometheus = { version = "0.14" }
rand = { version = "0.9.0" }
serde = { version = "1", features = ["derive"] }
...
@@ -239,16 +239,16 @@ Inside that virtualenv:
./dynamo-run in=http out=vllm ~/llm_models/Llama-3.2-3B-Instruct-Q6_K.gguf
```
Note that vllm GGUF handling is very slow. Prefer llamacpp.
**Multi-node:**

vllm uses [ray](https://docs.vllm.ai/en/latest/serving/distributed_serving.html#running-vllm-on-multiple-nodes) for pipeline parallel inference. Dynamo does not change or manage that.

Head node (the one running `dynamo-run`): `ray start --head --port=6379 --dashboard-host 0.0.0.0`
Each worker node: `ray start --address='<HEAD_NODE_IP>:6379'`

Remember to pass dynamo-run `--tensor-parallel-size <total-gpus-across-cluster>`, which is often constrained by a model dimension such as being a divisor of the number of attention heads.
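Putting it together, a two-node run might look like this (a sketch only: the model path is illustrative and `--tensor-parallel-size 16` assumes 8 GPUs per node):

```
# Head node (also runs dynamo-run)
ray start --head --port=6379 --dashboard-host 0.0.0.0
dynamo-run in=http out=vllm ~/llm_models/Llama-3.2-3B-Instruct/ --tensor-parallel-size 16

# Each worker node
ray start --address='<HEAD_NODE_IP>:6379'
```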
To pass extra arguments to the vllm engine see [Extra engine arguments](#extra_engine_arguments) below.
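For reference, `--extra-engine-args` points at a plain JSON file whose keys are merged into the engine constructor arguments (vllm's `AsyncEngineArgs` or sglang's `ServerArgs`), with the JSON taking precedence. The keys below are illustrative:

```
{
  "gpu_memory_utilization": 0.8,
  "max_model_len": 4096
}
```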
...
@@ -54,6 +54,7 @@ async-stream = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
humantime = { workspace = true }
libc = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }
@@ -66,6 +67,7 @@ async-openai = { version = "0.27.2" }
clap = { version = "4.5", features = ["derive", "env"] }
dialoguer = { version = "0.11", default-features = false, features = ["editor", "history"] }
futures-util = { version = "0.3" }
regex = "1"
[target.x86_64-unknown-linux-gnu.dependencies]
netlink-packet-route = { version = "0.19", optional = true }
...
@@ -202,6 +202,7 @@ pub async fn run(
tokens_out,
tokens_out / cmp::max(elapsed.as_secs(), 1),
);
cancel_token.cancel(); // stop everything else
Ok(())
}
...
@@ -99,7 +99,9 @@ pub async fn run(
}
EngineConfig::None => unreachable!(),
}
http_service.run(runtime.primary_token()).await?;
runtime.shutdown(); // Cancel primary token
Ok(())
}
/// Spawns a task that watches for new models in etcd at network_prefix,
...
@@ -191,6 +191,7 @@ async fn main_loop(
break;
}
}
cancel_token.cancel(); // stop everything else
println!();
Ok(())
}
@@ -15,14 +15,14 @@
#[cfg(any(feature = "vllm", feature = "sglang"))]
use std::{future::Future, pin::Pin};
use std::{io::Read, sync::Arc, time::Duration};
use anyhow::Context;
use dynamo_llm::{
backend::ExecutionContext, engines::StreamingEngine, kv_router::publisher::KvMetricsPublisher,
LocalModel,
};
use dynamo_runtime::{protocols::Endpoint, CancellationToken, DistributedRuntime};
mod flags;
pub use flags::Flags;
@@ -32,11 +32,7 @@ mod net;
mod opt;
pub use dynamo_llm::request_template::RequestTemplate;
pub use opt::{Input, Output};
mod subprocess;
/// When `in=text` the user doesn't need to know the model name, and doesn't need to provide it on
/// the command line. Hence it's optional, and defaults to this.
@@ -45,6 +41,8 @@ const INVISIBLE_MODEL_NAME: &str = "dynamo-run";
/// The component name for the KV publisher, if used
const KV_PUBLISHER_COMPONENT: &str = "kvpublisher";
const CHILD_STOP_TIMEOUT: Duration = Duration::from_secs(2);
/// How we identify a python string endpoint
#[cfg(feature = "python")]
const PYTHON_STR_SCHEME: &str = "pystr:";
@@ -97,6 +95,8 @@ pub async fn run(
// If output is an endpoint we are ingress and don't have a local model, but making an
// empty one cleans up the code.
Output::Endpoint(_) => Default::default(),
// All other output types have a local model
_ => {
match &maybe_path {
Some(model_path) => {
@@ -143,7 +143,6 @@ pub async fn run(
_ => None,
};
#[cfg(any(feature = "vllm", feature = "sglang"))]
let mut extra: Option<Pin<Box<dyn Future<Output = ()> + Send>>> = None; // vllm and sglang sub-process let mut extra: Option<Pin<Box<dyn Future<Output = ()> + Send>>> = None; // vllm and sglang sub-process
let template = if let Some(path) = flags.request_template.as_ref() { let template = if let Some(path) = flags.request_template.as_ref() {
@@ -184,8 +183,42 @@ pub async fn run(
engine: dynamo_engine_mistralrs::make_engine(local_model.path()).await?,
model: Box::new(local_model),
},
#[cfg(feature = "sglang")]
Output::SgLang => {
if !local_model.path().is_dir() {
// TODO Does sglang support GGUF? Can we make it work?
anyhow::bail!("`--model-path` should point at a HuggingFace repo checkout");
}
let (py_script, mut child) = match subprocess::start(
subprocess::sglang::PY,
local_model.path(),
flags.tensor_parallel_size,
if flags.base_gpu_id == 0 {
None
} else {
Some(flags.base_gpu_id)
},
flags.extra_engine_args.as_deref(),
)
.await
{
Ok(x) => x,
Err(err) => {
anyhow::bail!("Failed starting sglang sub-process: {err}");
}
};
let cancel_token = cancel_token.clone();
// Sub-process cleanup
extra = Some(Box::pin(async move {
stopper(cancel_token, child, py_script).await;
}));
let endpoint: Endpoint = subprocess::ENDPOINT.parse()?;
EngineConfig::Dynamic(endpoint)
}
#[cfg(feature = "sglang")]
Output::SgLangLegacy => {
if !local_model.path().is_dir() {
anyhow::bail!("`--model-path` should point at a HuggingFace repo checkout");
}
@@ -295,7 +328,7 @@ pub async fn run(
}
#[cfg(feature = "vllm")]
Output::Vllm0_8 => {
if flags.base_gpu_id != 0 {
anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
}
@@ -318,6 +351,35 @@ pub async fn run(
}
}
// No feature flag: this variant runs as a sub-process, so it's very cheap to include
Output::Vllm => {
if flags.base_gpu_id != 0 {
anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
}
let (py_script, mut child) = match subprocess::start(
subprocess::vllm::PY,
local_model.path(),
flags.tensor_parallel_size,
None, // base_gpu_id. vllm uses CUDA_VISIBLE_DEVICES instead
flags.extra_engine_args.as_deref(),
)
.await
{
Ok(x) => x,
Err(err) => {
anyhow::bail!("Failed starting vllm sub-process: {err}");
}
};
let cancel_token = cancel_token.clone();
// Sub-process cleanup
extra = Some(Box::pin(async move {
stopper(cancel_token, child, py_script).await;
}));
let endpoint: Endpoint = subprocess::ENDPOINT.parse()?;
EngineConfig::Dynamic(endpoint)
}
#[cfg(feature = "llamacpp")] #[cfg(feature = "llamacpp")]
Output::LlamaCpp => { Output::LlamaCpp => {
if !local_model.path().is_file() { if !local_model.path().is_file() {
@@ -394,11 +456,50 @@ pub async fn run(
}
}
// Allow engines to ask main thread to wait on an extra future.
// We use this to stop the vllm and sglang sub-process
if let Some(extra) = extra {
extra.await;
}
Ok(())
}
/// Wait for cancel_token to be cancelled, then stop the child as gracefully as possible.
/// Keeps the TempPath alive until the child is stopped.
async fn stopper(
cancel_token: CancellationToken,
mut child: tokio::process::Child,
py_script: tempfile::TempPath,
) {
cancel_token.cancelled().await;
// Ask subprocess to stop gracefully
if let Some(pid) = child.id() {
unsafe { libc::kill(pid as i32, libc::SIGTERM) };
}
tokio::select! {
exit = child.wait() => {
tracing::trace!("vllm sub-process graceful exit");
match exit {
Ok(exit_status) if exit_status.success() => {}
Ok(exit_status) => {
// This is nearly always 15 (SIGTERM)
tracing::trace!("vllm sub-process non-0 exit: {exit_status}");
}
Err(err) => {
tracing::warn!("vllm sub-process error getting exit status: {err}");
}
}
}
_ = tokio::time::sleep(CHILD_STOP_TIMEOUT) => {
// It didn't stop in time, kill it
child.kill().await.expect("Failed killing vllm subprocess");
let _ = child.wait().await;
}
}
// This temporary file contains the python script running the engine. It deletes on drop.
// Keep it alive until the engine has stopped.
drop(py_script);
}
@@ -15,7 +15,7 @@
use std::{fmt, io::IsTerminal as _, path::PathBuf};
use dynamo_runtime::protocols::ENDPOINT_SCHEME;
const BATCH_PREFIX: &str = "batch:";
@@ -52,8 +52,7 @@ impl TryFrom<&str> for Input {
"stdin" => Ok(Input::Stdin),
"none" => Ok(Input::None),
endpoint_path if endpoint_path.starts_with(ENDPOINT_SCHEME) => {
Ok(Input::Endpoint(endpoint_path.to_string()))
}
batch_patch if batch_patch.starts_with(BATCH_PREFIX) => {
let path = batch_patch.strip_prefix(BATCH_PREFIX).unwrap();
@@ -103,6 +102,9 @@ pub enum Output {
MistralRs,
#[cfg(feature = "sglang")]
/// Deprecated
SgLangLegacy,
/// Run inference using sglang
SgLang,
@@ -110,8 +112,8 @@ pub enum Output {
/// Run inference using llama.cpp
LlamaCpp,
// Start vllm in a sub-process connecting via nats
// Sugar for `python vllm_inc.py --endpoint <thing> --model <thing>`
Vllm,
#[cfg(feature = "vllm")]
@@ -145,13 +147,15 @@ impl TryFrom<&str> for Output {
"mistralrs" => Ok(Output::MistralRs),
#[cfg(feature = "sglang")]
"sglang_legacy" => Ok(Output::SgLangLegacy),
"sglang" => Ok(Output::SgLang), "sglang" => Ok(Output::SgLang),
#[cfg(feature = "llamacpp")] #[cfg(feature = "llamacpp")]
"llamacpp" | "llama_cpp" => Ok(Output::LlamaCpp), "llamacpp" | "llama_cpp" => Ok(Output::LlamaCpp),
#[cfg(feature = "vllm")]
"vllm" => Ok(Output::Vllm), "vllm" => Ok(Output::Vllm),
#[cfg(feature = "vllm")] #[cfg(feature = "vllm")]
"vllm0_8" => Ok(Output::Vllm0_8), "vllm0_8" => Ok(Output::Vllm0_8),
#[cfg(feature = "vllm")] #[cfg(feature = "vllm")]
@@ -193,13 +197,15 @@ impl fmt::Display for Output {
Output::MistralRs => "mistralrs",
#[cfg(feature = "sglang")]
Output::SgLangLegacy => "sglang_legacy",
Output::SgLang => "sglang", Output::SgLang => "sglang",
#[cfg(feature = "llamacpp")] #[cfg(feature = "llamacpp")]
Output::LlamaCpp => "llamacpp", Output::LlamaCpp => "llamacpp",
#[cfg(feature = "vllm")]
Output::Vllm => "vllm", Output::Vllm => "vllm",
#[cfg(feature = "vllm")] #[cfg(feature = "vllm")]
Output::Vllm0_8 => "vllm0_8", Output::Vllm0_8 => "vllm0_8",
#[cfg(feature = "vllm")] #[cfg(feature = "vllm")]
@@ -222,27 +228,11 @@ impl fmt::Display for Output {
/// Returns the engine to use if user did not say on cmd line.
/// Nearly always defaults to mistralrs which has no dependencies and we include by default.
/// If built with --no-default-features, default to subprocess vllm.
#[allow(unused_assignments, unused_mut)]
impl Default for Output {
fn default() -> Self {
let mut out = Output::Vllm;
#[cfg(feature = "mistralrs")] #[cfg(feature = "mistralrs")]
{ {
@@ -267,14 +257,15 @@ impl Output {
out.push(Output::LlamaCpp.to_string());
}
out.push(Output::SgLang.to_string());
#[cfg(feature = "sglang")] #[cfg(feature = "sglang")]
{ {
out.push(Output::SgLang.to_string()); out.push(Output::SgLangLegacy.to_string());
} }
out.push(Output::Vllm.to_string());
#[cfg(feature = "vllm")] #[cfg(feature = "vllm")]
{ {
out.push(Output::Vllm.to_string());
out.push(Output::Vllm0_7.to_string()); out.push(Output::Vllm0_7.to_string());
out.push(Output::Vllm0_8.to_string()); out.push(Output::Vllm0_8.to_string());
} }
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::io::Write;
use std::path::Path;
use std::process::Stdio;
use std::sync::LazyLock;
use anyhow::Context;
use regex::Regex;
use tokio::io::AsyncBufReadExt;
pub mod sglang;
pub mod vllm;
/// Internal endpoint to connect the subprocess over etcd/nats
pub const ENDPOINT: &str = "dyn://dynamo.internal.worker";
pub async fn start(
// The Python code to run
py_script: &'static str,
// Path to folder or file with model weights
model_path: &Path,
// How many GPUs to use
tensor_parallel_size: u32,
// sglang which GPU to start from, on a multi-GPU system
// vllm uses CUDA_VISIBLE_DEVICES
base_gpu_id: Option<u32>,
// Path to a JSON file containing extra arguments to the backend engine
extra_engine_args: Option<&Path>,
) -> anyhow::Result<(tempfile::TempPath, tokio::process::Child)> {
let mut tmp = tempfile::NamedTempFile::new()?;
// Writes on Linux don't block
tmp.write_all(py_script.as_bytes())?;
let script_path = tmp.into_temp_path();
let mut args = vec![
script_path.to_string_lossy().to_string(),
"--endpoint".to_string(),
ENDPOINT.to_string(),
"--model".to_string(),
model_path.to_string_lossy().to_string(),
"--tensor-parallel-size".to_string(),
tensor_parallel_size.to_string(),
];
// sglang only
if let Some(base_gpu_id) = base_gpu_id {
args.push("--base-gpu-id".to_string());
args.push(base_gpu_id.to_string());
}
if let Some(extra_engine_args) = extra_engine_args {
args.push("--extra-engine-args".to_string());
args.push(extra_engine_args.to_string_lossy().to_string());
}
let mut cmd = tokio::process::Command::new("python3");
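// kill_on_drop is disabled: `stopper` sends SIGTERM and only escalates to kill
// after CHILD_STOP_TIMEOUT, giving the engine a chance to exit cleanly.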
cmd.kill_on_drop(false)
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped());
let mut child = cmd
.spawn()
.with_context(|| format!("Failed running: '{}'", pretty_cmd(&cmd)))?;
// Safety: We set stdout/stderr a few lines above
let stdout = tokio::io::BufReader::new(child.stdout.take().unwrap());
let stderr = tokio::io::BufReader::new(child.stderr.take().unwrap());
tokio::spawn(async move {
let mut lines = stdout.lines();
while let Ok(Some(line)) = lines.next_line().await {
tracing::debug!("{}", strip_log_prefix(&line));
}
});
tokio::spawn(async move {
let mut lines = stderr.lines();
while let Ok(Some(line)) = lines.next_line().await {
tracing::debug!("{}", strip_log_prefix(&line));
}
});
// We must keep temp path alive, it deletes on drop
Ok((script_path, child))
}
pub fn pretty_cmd(c: &tokio::process::Command) -> String {
format!(
"{} {}",
c.as_std().get_program().to_string_lossy(),
c.as_std()
.get_args()
.map(|x| x.to_string_lossy())
.collect::<Vec<std::borrow::Cow<'_, str>>>()
.join(" ")
)
}
// Thanks Gemini
static LOG_PREFIX_RE: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(
r"^(?:(?:[A-Z]+ \d{2}-\d{2} \d{2}:\d{2}:\d{2})|(?:\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\])) (.*)$"
).unwrap()
// ^ Start of the line.
// (?: Non-capturing group for the two prefix alternatives.
// (?: Non-capturing group for the first prefix type.
// [A-Z]+ One or more uppercase letters (log level).
// (single space) A literal space.
// \d{2}-\d{2} Date like MM-DD.
// (single space) A literal space.
// \d{2}:\d{2}:\d{2} Time like HH:MM:SS.
// ) End of first prefix type group.
// | OR
// (?: Non-capturing group for the second prefix type.
// \[ A literal opening square bracket.
// \d{4}-\d{2}-\d{2} Date like YYYY-MM-DD.
// (single space) A literal space.
// \d{2}:\d{2}:\d{2} Time like HH:MM:SS.
// \] A literal closing square bracket.
// ) End of second prefix type group.
// ) End of the alternatives group.
// (single space) A literal space. This is the space BEFORE the message.
// (.*) Capture group 1: The rest of the line (the message).
// $ End of the line.
});
/// Strips the log level, date, and time from the start of a log line.
///
/// # Examples
/// ```
/// let line = "INFO 05-06 09:38:50 [async_llm.py:252] Added request 1";
/// assert_eq!(strip_log_prefix(line), "[async_llm.py:252] Added request 1");
///
/// let line_no_prefix = "This is a normal line.";
/// assert_eq!(strip_log_prefix(line_no_prefix), "This is a normal line.");
/// ```
fn strip_log_prefix(line: &str) -> Cow<'_, str> {
if let Some(captures) = LOG_PREFIX_RE.captures(line) {
// `captures.get(0)` would be the entire matched prefix + message.
// `captures.get(1)` is the first capture group, which is `(.*)`, the message itself.
if let Some(message_match) = captures.get(1) {
return Cow::Borrowed(message_match.as_str());
}
}
// If the regex doesn't match, or somehow the capture group is not found (shouldn't happen with (.*))
// return the original line.
Cow::Borrowed(line)
}
#[cfg(test)]
mod tests {
use super::strip_log_prefix;
#[test]
fn test_strip_log_prefix() {
let line = "INFO 05-06 09:38:50 [async_llm.py:252] Added request 1";
let expected = "[async_llm.py:252] Added request 1";
assert_eq!(strip_log_prefix(line), expected);
let line = "Just a regular line.";
assert_eq!(strip_log_prefix(line), line);
let line = "INFO this is not a full prefix";
assert_eq!(strip_log_prefix(line), line);
let line = "[2025-05-06 11:58:51] Capture cuda graph bs [1, 2, 4, 8]";
assert_eq!(strip_log_prefix(line), "Capture cuda graph bs [1, 2, 4, 8]");
}
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Source code of the SGLang sub-process
pub const PY: &str = include_str!("sglang_inc.py");
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# A very basic example of an sglang worker handling pre-processed requests.
#
# Dynamo does the HTTP handling, prompt templating and tokenization, then forwards the
# request via NATS to this python script, which runs sglang.
#
# Set up a virtualenv with dynamo.llm, dynamo.runtime and sglang[all] installed;
# in lib/bindings/python, `maturin develop` and `pip install -e .` should do it.
# Start nats and etcd:
#  - nats-server -js
#  - etcd
#
# Window 1: `python sglang_inc.py`. Wait for log "Starting endpoint".
# Window 2: `dynamo-run out=dyn://dynamo.backend.generate`
import argparse
import asyncio
import json
import logging
import sys
import sglang
import uvloop
from sglang.srt.server_args import ServerArgs
from dynamo.llm import ModelType, register_llm
from dynamo.runtime import DistributedRuntime, dynamo_worker
DEFAULT_ENDPOINT = "dyn://dynamo.backend.generate"
DEFAULT_MODEL = "Qwen/Qwen2.5-0.5B-Instruct"
class Config:
"""Command line parameters or defaults"""
namespace: str
component: str
endpoint: str
model: str
base_gpu_id: int
tensor_parallel_size: int
extra_engine_args: str
class RequestHandler:
"""
Request handler for the generate endpoint
"""
def __init__(self, engine):
self.engine_client = engine
async def generate(self, request):
# print(f"Received request: {request}")
sampling_params = {
"temperature": request["sampling_options"]["temperature"],
# sglang defaults this to 128
"max_new_tokens": request["stop_conditions"]["max_tokens"],
}
num_output_tokens_so_far = 0
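# sglang streams cumulative output_ids; track how many tokens we have already
# forwarded so each response carries only the newly generated ones.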
gen = await self.engine_client.async_generate(
input_ids=request["token_ids"], sampling_params=sampling_params, stream=True
)
async for res in gen:
# res is a dict
finish_reason = res["meta_info"]["finish_reason"]
if finish_reason:
# Don't forward the stop token
out = {"token_ids": [], "finish_reason": finish_reason["type"]}
else:
next_total_toks = len(res["output_ids"])
out = {"token_ids": res["output_ids"][num_output_tokens_so_far:]}
yield out
num_output_tokens_so_far = next_total_toks
@dynamo_worker(static=False)
async def worker(runtime: DistributedRuntime):
await init(runtime, cmd_line_args())
async def init(runtime: DistributedRuntime, config: Config):
"""
Instantiate and serve
"""
component = runtime.namespace(config.namespace).component(config.component)
await component.create_service()
endpoint = component.endpoint(config.endpoint)
print("Started server instance")
await register_llm(endpoint, config.model, ModelType.Backend)
arg_map = {
"model_path": config.model,
"skip_tokenizer_init": True,
"tp_size": config.tensor_parallel_size,
"base_gpu_id": config.base_gpu_id,
}
if config.extra_engine_args != "":
json_map = {}
# extra_engine_args is a filename
try:
with open(config.extra_engine_args) as f:
json_map = json.load(f)
except FileNotFoundError:
logging.error(f"File {config.extra_engine_args} not found.")
except json.JSONDecodeError as e:
logging.error(f"Invalid JSON in {config.extra_engine_args}: {e}")
logging.debug(f"Adding extra engine arguments: {json_map}")
arg_map = {**arg_map, **json_map} # json_map gets precedence
engine_args = ServerArgs(**arg_map)
engine_client = sglang.Engine(server_args=engine_args)
# the server will gracefully shut down (i.e., allow open TCP streams to finish)
# after the lease is revoked
await endpoint.serve_endpoint(RequestHandler(engine_client).generate, None)
def cmd_line_args():
parser = argparse.ArgumentParser(
description="SGLang server integrated with Dynamo LLM."
)
parser.add_argument(
"--endpoint",
type=str,
default=DEFAULT_ENDPOINT,
help=f"Dynamo endpoint string in 'dyn://namespace.component.endpoint' format. Default: {DEFAULT_ENDPOINT}",
)
parser.add_argument(
"--model",
type=str,
default=DEFAULT_MODEL,
help=f"Path to disk model or HuggingFace model identifier to load. Default: {DEFAULT_MODEL}",
)
parser.add_argument(
"--base-gpu-id",
type=int,
default=0,
help="The base GPU ID to start allocating GPUs from. Useful when running multiple instances on the same machine.",
)
parser.add_argument(
"--tensor-parallel-size", type=int, default=1, help="Number of GPUs to use."
)
parser.add_argument(
"--extra-engine-args",
type=str,
default="",
help="Path to a JSON file containing additional keyword arguments to pass to the SGLang Engine.",
)
args = parser.parse_args()
config = Config()
config.model = args.model
endpoint_str = args.endpoint.replace("dyn://", "", 1)
endpoint_parts = endpoint_str.split(".")
if len(endpoint_parts) != 3:
print(
f"Invalid endpoint format: '{args.endpoint}'. Expected 'dyn://namespace.component.endpoint' or 'namespace.component.endpoint'."
)
sys.exit(1)
parsed_namespace, parsed_component_name, parsed_endpoint_name = endpoint_parts
config.namespace = parsed_namespace
config.component = parsed_component_name
config.endpoint = parsed_endpoint_name
config.base_gpu_id = args.base_gpu_id
config.tensor_parallel_size = args.tensor_parallel_size
config.extra_engine_args = args.extra_engine_args
return config
if __name__ == "__main__":
uvloop.install()
asyncio.run(worker())
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Source code of the VLLM sub-process
pub const PY: &str = include_str!("vllm_inc.py");
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# A very basic example of a vllm worker handling pre-processed requests.
#
# Dynamo does the HTTP handling, prompt templating and tokenization, then forwards the
# request via NATS to this python script, which runs vllm.
#
# Set up a virtualenv with dynamo.llm, dynamo.runtime and vllm installed;
# in lib/bindings/python, `maturin develop` and `pip install -e .` should do it.
# Start nats and etcd:
#  - nats-server -js
#  - etcd
#
# Window 1: `python vllm_inc.py`. Wait for log "Starting endpoint".
# Window 2: `dynamo-run out=dyn://dynamo.backend.generate`
import argparse
import asyncio
import json
import logging
import sys
import uvloop
from vllm import SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.entrypoints.openai.api_server import (
build_async_engine_client_from_engine_args,
)
from vllm.inputs import TokensPrompt
from dynamo.llm import ModelType, register_llm
from dynamo.runtime import DistributedRuntime, dynamo_worker
DEFAULT_ENDPOINT = "dyn://dynamo.backend.generate"
DEFAULT_MODEL = "Qwen/Qwen2.5-0.5B-Instruct"
# TODO this should match DYN_LOG level
logging.basicConfig(level=logging.INFO)
class Config:
"""Command line parameters or defaults"""
namespace: str
component: str
endpoint: str
model: str
tensor_parallel_size: int
extra_engine_args: str
class RequestHandler:
"""
Request handler for the generate endpoint
"""
def __init__(self, engine):
self.engine_client = engine
async def generate(self, request):
request_id = "1" # hello_world example only
logging.debug(f"Received request: {request}")
prompt = TokensPrompt(prompt_token_ids=request["token_ids"])
sampling_params = SamplingParams(
temperature=request["sampling_options"]["temperature"],
# vllm defaults this to 16
max_tokens=request["stop_conditions"]["max_tokens"],
)
num_output_tokens_so_far = 0
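# vllm's RequestOutput carries cumulative token_ids; track what we have already
# sent so each response forwards only the new tokens.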
gen = self.engine_client.generate(prompt, sampling_params, request_id)
async for res in gen:
# res is vllm's RequestOutput
# This is the expected way for a request to end.
# The new token ID will be eos, don't forward it.
if res.finished:
yield {"finish_reason": "stop", "token_ids": []}
break
if not res.outputs:
yield {"finish_reason": "error", "token_ids": []}
break
output = res.outputs[0]
next_total_toks = len(output.token_ids)
out = {"token_ids": output.token_ids[num_output_tokens_so_far:]}
if output.finish_reason:
out["finish_reason"] = output.finish_reason
if output.stop_reason:
out["stop_reason"] = output.stop_reason
yield out
num_output_tokens_so_far = next_total_toks
@dynamo_worker(static=False)
async def worker(runtime: DistributedRuntime):
await init(runtime, cmd_line_args())
async def init(runtime: DistributedRuntime, config: Config):
"""
Instantiate and serve
"""
component = runtime.namespace(config.namespace).component(config.component)
await component.create_service()
endpoint = component.endpoint(config.endpoint)
logging.info("Started server instance")
await register_llm(endpoint, config.model, ModelType.Backend)
arg_map = {
"model": config.model,
"task": "generate",
"tensor_parallel_size": config.tensor_parallel_size,
"skip_tokenizer_init": True,
}
if config.extra_engine_args != "":
json_map = {}
# extra_engine_args is a filename
try:
with open(config.extra_engine_args) as f:
json_map = json.load(f)
except FileNotFoundError:
logging.error(f"File {config.extra_engine_args} not found.")
except json.JSONDecodeError as e:
logging.error(f"Invalid JSON in {config.extra_engine_args}: {e}")
logging.debug(f"Adding extra engine arguments: {json_map}")
arg_map = {**arg_map, **json_map} # json_map gets precedence
engine_args = AsyncEngineArgs(**arg_map)
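# Enter the engine client context manually. It is never exited here; the parent
# dynamo-run process stops this worker with SIGTERM when shutting down.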
engine_context = build_async_engine_client_from_engine_args(engine_args)
engine_client = await engine_context.__aenter__()
# the server will gracefully shut down (i.e., allow open TCP streams to finish)
# after the lease is revoked
await endpoint.serve_endpoint(RequestHandler(engine_client).generate, None)
def cmd_line_args():
parser = argparse.ArgumentParser(
description="vLLM server integrated with Dynamo LLM."
)
parser.add_argument(
"--endpoint",
type=str,
default=DEFAULT_ENDPOINT,
help=f"Dynamo endpoint string in 'dyn://namespace.component.endpoint' format. Default: {DEFAULT_ENDPOINT}",
)
parser.add_argument(
"--model",
type=str,
default=DEFAULT_MODEL,
help=f"Path to disk model or HuggingFace model identifier to load. Default: {DEFAULT_MODEL}",
)
parser.add_argument(
"--tensor-parallel-size", type=int, default=1, help="Number of GPUs to use."
)
parser.add_argument(
"--extra-engine-args",
type=str,
default="",
help="Path to a JSON file containing additional keyword arguments to pass to the vLLM AsyncLLMEngine.",
)
args = parser.parse_args()
config = Config()
config.model = args.model
endpoint_str = args.endpoint.replace("dyn://", "", 1)
endpoint_parts = endpoint_str.split(".")
if len(endpoint_parts) != 3:
logging.error(
f"Invalid endpoint format: '{args.endpoint}'. Expected 'dyn://namespace.component.endpoint' or 'namespace.component.endpoint'."
)
sys.exit(1)
parsed_namespace, parsed_component_name, parsed_endpoint_name = endpoint_parts
config.namespace = parsed_namespace
config.component = parsed_component_name
config.endpoint = parsed_endpoint_name
config.tensor_parallel_size = args.tensor_parallel_size
config.extra_engine_args = args.extra_engine_args
return config
if __name__ == "__main__":
uvloop.install()
asyncio.run(worker())
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# A very basic example of an sglang worker handling pre-processed requests.
#
# Dynamo does the HTTP handling, prompt templating and tokenization, then forwards the
# request via NATS to this python script, which runs sglang.
#
# Set up a virtualenv with dynamo.llm, dynamo.runtime and sglang[all] installed;
# in lib/bindings/python, `maturin develop` and `pip install -e .` should do it.
# Start nats and etcd:
#  - nats-server -js
#  - etcd
#
# Window 1: `python server_sglang.py`. Wait for log "Starting endpoint".
# Window 2: `dynamo-run out=dyn://dynamo.backend.generate`
import argparse
import asyncio
import sys
import sglang
import uvloop
from sglang.srt.server_args import ServerArgs
from dynamo.llm import ModelType, register_llm
from dynamo.runtime import DistributedRuntime, dynamo_worker
DEFAULT_ENDPOINT = "dyn://dynamo.backend.generate"
DEFAULT_MODEL = "Qwen/Qwen2.5-0.5B-Instruct"
class Config:
"""Command line parameters or defaults"""
namespace: str
component: str
endpoint: str
model: str
class RequestHandler:
"""
Request handler for the generate endpoint
"""
def __init__(self, engine):
self.engine_client = engine
async def generate(self, request):
# print(f"Received request: {request}")
sampling_params = {
"temperature": request["sampling_options"]["temperature"],
# sglang defaults this to 128
"max_new_tokens": request["stop_conditions"]["max_tokens"],
}
num_output_tokens_so_far = 0
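# output_ids is cumulative across the stream; remember how many tokens we have
# already forwarded so each yield carries only the new ones.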
gen = await self.engine_client.async_generate(
input_ids=request["token_ids"], sampling_params=sampling_params, stream=True
)
async for res in gen:
# res is a dict
finish_reason = res["meta_info"]["finish_reason"]
if finish_reason:
# Don't forward the stop token
out = {"token_ids": [], "finish_reason": finish_reason["type"]}
else:
next_total_toks = len(res["output_ids"])
out = {"token_ids": res["output_ids"][num_output_tokens_so_far:]}
yield out
num_output_tokens_so_far = next_total_toks
@dynamo_worker(static=False)
async def worker(runtime: DistributedRuntime):
await init(runtime, cmd_line_args())
async def init(runtime: DistributedRuntime, config: Config):
"""
Instantiate and serve
"""
component = runtime.namespace(config.namespace).component(config.component)
await component.create_service()
endpoint = component.endpoint(config.endpoint)
print("Started server instance")
await register_llm(endpoint, config.model, ModelType.Backend)
engine_args = ServerArgs(
model_path=config.model,
skip_tokenizer_init=True,
)
engine_client = sglang.Engine(server_args=engine_args)
# the server will gracefully shut down (i.e., allow open TCP streams to finish)
# after the lease is revoked
await endpoint.serve_endpoint(RequestHandler(engine_client).generate, None)
def cmd_line_args():
parser = argparse.ArgumentParser(
description="SGLang server integrated with Dynamo runtime."
)
parser.add_argument(
"--endpoint",
type=str,
default=DEFAULT_ENDPOINT,
help=f"Dynamo endpoint string in 'dyn://namespace.component.endpoint' format. Default: {DEFAULT_ENDPOINT}",
)
parser.add_argument(
"--model",
type=str,
default=DEFAULT_MODEL,
help=f"Path to disk model or HuggingFace model identifier to load. Default: {DEFAULT_MODEL}",
)
args = parser.parse_args()
config = Config()
config.model = args.model
endpoint_str = args.endpoint.replace("dyn://", "", 1)
endpoint_parts = endpoint_str.split(".")
if len(endpoint_parts) != 3:
print(
f"Invalid endpoint format: '{args.endpoint}'. Expected 'dyn://namespace.component.endpoint' or 'namespace.component.endpoint'."
)
sys.exit(1)
parsed_namespace, parsed_component_name, parsed_endpoint_name = endpoint_parts
config.namespace = parsed_namespace
config.component = parsed_component_name
config.endpoint = parsed_endpoint_name
return config
if __name__ == "__main__":
uvloop.install()
asyncio.run(worker())
@@ -151,10 +151,7 @@ def cmd_line_args():
config = Config()
config.model = args.model
endpoint_str = args.endpoint.replace("dyn://", "", 1)
endpoint_parts = endpoint_str.split(".")
if len(endpoint_parts) != 3:
print(
...
@@ -29,6 +29,11 @@ const DEFAULT_COMPONENT: &str = "C";
const DEFAULT_ENDPOINT: &str = "E";
/// How we identify a namespace/component/endpoint URL.
/// Technically the '://' is not part of the scheme but it eliminates several string
/// concatenations.
pub const ENDPOINT_SCHEME: &str = "dyn://";
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct Component {
pub name: String,
@@ -158,9 +163,13 @@ impl FromStr for Endpoint {
/// assert_eq!(endpoint.namespace, "namespace");
/// assert_eq!(endpoint.component, "component");
/// assert_eq!(endpoint.name, "endpoint");
/// let endpoint: Endpoint = "dyn://namespace/component/endpoint".parse().unwrap();
/// // same as above
/// assert_eq!(endpoint.name, "endpoint");
/// ```
fn from_str(s: &str) -> Result<Self, Self::Err> {
let cleaned = s.strip_prefix(ENDPOINT_SCHEME).unwrap_or(s);
Ok(Endpoint::from(cleaned))
}
}
...