Commit 84985d3f authored by Ryan Olson, committed by GitHub

refactor: migrate engines to standalone crates (#453)



Moved each engine under `lib/llm/src/engines` into its own crate, e.g. `lib/engines/mistralrs`. This allows the `dynamo-llm` crate to be published, since it no longer carries any GitHub (git) dependencies.

The only engines left in `dynamo-llm` are the demo `echo` engines.
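
For consumers, the practical effect is sketched below (assuming the `dynamo_llm::engines` module path and the constructor names shown in this diff): the demo engines come straight from `dynamo-llm`, while real backends are pulled in as separate crates such as `dynamo-engine-vllm`.

```rust
// Sketch only, not a verbatim API guarantee: `make_engine_core` and
// `make_engine_full` are the echo constructors added in this commit, and the
// `dynamo_llm::engines` path assumes that module remains public.
use dynamo_llm::engines::{make_engine_core, make_engine_full};

fn demo_engines() {
    // Token-level echo: accepts pre-processed requests (`BackendInput`) and
    // streams the same token ids back, one per TOKEN_ECHO_DELAY tick.
    let _core = make_engine_core();

    // Chat-level echo: accepts OpenAI-style chat requests and echoes the last
    // user message back, character by character.
    let _full = make_engine_full();
}

// Real engines now live in their own crates; a consumer's Cargo.toml would
// depend on them directly, e.g. (hypothetical path):
//   dynamo-engine-vllm = { path = "lib/engines/vllm" }
```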
Co-authored-by: Graham King <grahamk@nvidia.com>
parent 6eb10540
@@ -14,7 +14,7 @@
// limitations under the License.
use super::*;
use crate::protocols::{
use dynamo_llm::protocols::{
common::{self},
TokenIdType,
};
......
@@ -15,7 +15,7 @@
use std::sync::Arc;
use crate::backend::ExecutionContext;
use dynamo_llm::backend::ExecutionContext;
use dynamo_runtime::pipeline::error as pipeline_error;
pub mod executor;
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[package]
name = "dynamo-engine-vllm"
version.workspace = true
edition.workspace = true
description.workspace = true
authors.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
keywords.workspace = true
[dependencies]
dynamo-runtime = { workspace = true }
dynamo-llm = { workspace = true }
anyhow = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
async_zmq = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
async-openai = "0.27.2"
pyo3 = { version = "0.23.3", default-features = false, features = [
"macros",
"experimental-async",
"experimental-inspect",
"py-clone",
] }
regex = "1"
serde-pickle = "1.2.0"
@@ -18,14 +18,15 @@ use std::path::{Path, PathBuf};
use async_stream::stream;
use async_trait::async_trait;
use crate::engines::vllm::worker;
use crate::engines::MultiNodeConfig;
use crate::protocols::common::llm_backend::{BackendInput, LLMEngineOutput};
use dynamo_llm::engines::MultiNodeConfig;
use dynamo_llm::protocols::common::llm_backend::{BackendInput, LLMEngineOutput};
use dynamo_runtime::engine::{AsyncEngine, AsyncEngineContextProvider, ResponseStream};
use dynamo_runtime::pipeline::{Error, ManyOut, SingleIn};
use dynamo_runtime::protocols::annotated::Annotated;
use dynamo_runtime::runtime::CancellationToken;
use crate::worker;
pub struct VllmEngine {
cancel_token: CancellationToken,
worker: worker::VllmWorker,
......
@@ -19,11 +19,13 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use pyo3::prelude::*;
use dynamo_runtime::pipeline::error as pipeline_error;
use dynamo_runtime::CancellationToken;
use crate::backend::ExecutionContext;
use crate::engines::MultiNodeConfig;
use dynamo_llm::backend::ExecutionContext;
use dynamo_llm::engines::MultiNodeConfig;
mod engine;
use engine::VllmEngine;
@@ -144,3 +146,22 @@ impl Future for StopFuture {
}
}
}
#[cfg(target_os = "macos")]
fn fix_venv(venv: String, py: Python<'_>) -> anyhow::Result<()> {
let version_info = py.version_info();
let sys: PyObject = py.import("sys")?.into();
let sys_path = sys.getattr(py, "path")?;
let venv_path = format!(
"{venv}/lib/python{}.{}/site-packages",
version_info.major, version_info.minor
);
// TODO: This should go _before_ the site-packages
sys_path.call_method1(py, "append", (venv_path,))?;
Ok(())
}
#[cfg(not(target_os = "macos"))]
fn fix_venv(_venv: String, _py: Python<'_>) -> anyhow::Result<()> {
Ok(())
}
@@ -22,7 +22,6 @@ use thiserror::Error;
use tokio::io::AsyncBufReadExt;
use tokio::select;
use tokio::time;
use tracing;
use dynamo_runtime::CancellationToken;
......
@@ -18,7 +18,7 @@ use std::env;
use std::ffi::CString;
use std::path::{Path, PathBuf};
use crate::engines::MultiNodeConfig;
use dynamo_llm::engines::MultiNodeConfig;
const PY_START_ENGINE: &str = include_str!("vllm_inc.py");
@@ -34,7 +34,7 @@ pub fn run_subprocess(
) -> anyhow::Result<()> {
pyo3::prepare_freethreaded_python(); // or enable feature "auto-initialize"
if let Ok(venv) = env::var("VIRTUAL_ENV") {
let _ = Python::with_gil(|py| crate::engines::fix_venv(venv, py));
let _ = Python::with_gil(|py| crate::fix_venv(venv, py));
}
let model_path_str = model_path.display().to_string();
let extra_engine_args_str = &extra_engine_args
......
@@ -33,11 +33,11 @@ use tokio::io::AsyncBufReadExt;
use tokio::sync::mpsc::{error::SendError, Sender};
use tokio::task::JoinHandle;
use crate::engines::MultiNodeConfig;
use crate::kv_router::protocols::ForwardPassMetrics;
use crate::protocols::common::llm_backend::LLMEngineOutput;
use crate::protocols::common::preprocessor::PreprocessedRequest;
use crate::protocols::common::FinishReason;
use dynamo_llm::engines::MultiNodeConfig;
use dynamo_llm::kv_router::protocols::ForwardPassMetrics;
use dynamo_llm::protocols::common::llm_backend::LLMEngineOutput;
use dynamo_llm::protocols::common::preprocessor::PreprocessedRequest;
use dynamo_llm::protocols::common::FinishReason;
/// Wait this long for the vllm sub-process to stop after we send it a KILL
const VLLM_STOP_TIMEOUT: Duration = Duration::from_millis(1500);
@@ -167,7 +167,7 @@ pub async fn start(
) -> anyhow::Result<VllmWorker> {
pyo3::prepare_freethreaded_python(); // or enable feature "auto-initialize"
if let Ok(venv) = env::var("VIRTUAL_ENV") {
let _ = Python::with_gil(|py| crate::engines::fix_venv(venv, py));
let _ = Python::with_gil(|py| crate::fix_venv(venv, py));
}
let py_imports = Arc::new(python_imports());
......
@@ -28,17 +28,7 @@ description = "Dynamo LLM Library"
default = []
cuda_kv = ["dep:cudarc", "dep:ndarray"]
llamacpp = ["dep:llama-cpp-2"]
mistralrs = ["dep:mistralrs"]
python = ["dep:pyo3-async-runtimes", "dep:pythonize"]
sglang = []
sentencepiece = ["dep:sentencepiece"]
trtllm = []
vllm = []
cuda = ["mistralrs/cuda", "llama-cpp-2/cuda", "candle-core/cuda"]
metal = ["mistralrs/metal", "llama-cpp-2/metal"]
vulkan = ["llama-cpp-2/vulkan"]
[dependencies]
# repo
@@ -80,14 +70,6 @@ rayon = "1"
# kv_cuda
cudarc = { git = "https://github.com/coreylowman/cudarc.git", rev = "8c52e735b55bf8e979e1a16bd85e3dfe4f87c9fe", features = ["cuda-12040"], optional = true }
ndarray = { version = "0.16", optional = true }
# half = "2.4.1"
pyo3 = { version = "0.23.3", default-features = false, features = [
"macros",
"experimental-async",
"experimental-inspect",
"py-clone",
] }
# protocols
unicode-segmentation = "1.12"
@@ -95,17 +77,6 @@ unicode-segmentation = "1.12"
# http-service
axum = "0.8"
# mistralrs
indexmap = { version = "2.6" }
mistralrs = { git = "https://github.com/EricLBuehler/mistral.rs.git", rev = "aaafc2ef", optional = true }
# sglang
libc = "0.2"
serde-pickle = "1.2.0"
# llamacpp
llama-cpp-2 = { version = "0.1.102", optional = true }
# tokenizers
tokenizers = { version = "0.21.1", default-features = false, features = [
"onig",
@@ -125,19 +96,6 @@ erased-serde = { version = "0.4" }
itertools = { version = "0.14.0" }
minijinja = { version = "2.3.1", features = ["loader"] }
minijinja-contrib = { version = "2.3.1", features = ["pycompat"] }
semver = { version = "1", features = ["serde"] }
# trtllm
serde_repr = "0.1"
# python
pyo3-async-runtimes = { version = "0.23.0", optional = true, default-features = false, features = [
"attributes",
"testing",
"tokio-runtime",
"unstable-streams",
] }
pythonize = { version = "0.23", optional = true }
# GGUF
ggus = "0.4.0"
@@ -156,7 +114,3 @@ insta = { version = "1.41", features = [
"redactions",
"filters",
] }
[build-dependencies]
bindgen = "0.70"
cmake = "0.1"
@@ -131,66 +131,3 @@ fn get_cuda_root_or_default() -> String {
}
}
}
#[cfg(feature = "trtllm")]
fn main() {
extern crate bindgen;
use cmake::Config;
use std::env;
use std::path::PathBuf;
let installed_headers = "/usr/local/include/nvidia/nvllm/nvllm_trt.h";
let local_headers = "../bindings/cpp/nvllm-trt/include/nvidia/nvllm/nvllm_trt.h";
let headers_path;
if PathBuf::from(installed_headers).exists() {
headers_path = installed_headers;
println!("cargo:warning=nvllm found. Building with installed version...");
println!("cargo:rustc-link-search=native=/usr/local/lib");
println!("cargo:rustc-link-search=native=/opt/tensorrt_llm/lib");
println!("cargo:rustc-link-lib=dylib=nvllm_trt");
println!("cargo:rustc-link-lib=dylib=tensorrt_llm");
println!("cargo:rustc-link-lib=dylib=tensorrt_llm_nvrtc_wrapper");
println!("cargo:rustc-link-lib=dylib=nvinfer_plugin_tensorrt_llm");
println!("cargo:rustc-link-lib=dylib=decoder_attention");
println!("cargo:rerun-if-changed=/usr/local/lib");
} else if PathBuf::from(local_headers).exists() {
headers_path = local_headers;
println!("cargo:warning=nvllm not found. Building stub version...");
let dst = Config::new("../bindings/cpp/nvllm-trt")
.define("USE_STUBS", "ON")
.no_build_target(true)
.build();
println!("cargo:warning=building stubs in {}", dst.display());
let dst = dst.canonicalize().unwrap();
println!("cargo:rustc-link-search=native={}/build", dst.display());
println!("cargo:rustc-link-lib=dylib=nvllm_trt");
println!("cargo:rustc-link-lib=dylib=tensorrt_llm");
println!("cargo:rerun-if-changed=../bindings/cpp/nvllm-trt");
} else {
panic!("nvllm_trt.h not found");
}
// generate bindings for the trtllm c api
let bindings = bindgen::Builder::default()
.header(headers_path)
.generate()
.expect("Unable to generate bindings");
// Write the bindings to a file
let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
bindings
.write_to_file(out_path.join("bindings.rs"))
.expect("Could not write bindings!");
// // Build protobuf
// tonic_build::configure()
// .build_server(false)
// .compile_protos(&["../../proto/trtllm.proto"], &["../../proto"])
// .expect("Failed to compile protos");
}
@@ -13,23 +13,29 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(feature = "mistralrs")]
pub mod mistralrs;
use std::env;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;
#[cfg(feature = "sglang")]
pub mod sglang;
use async_stream::stream;
use async_trait::async_trait;
#[cfg(feature = "llamacpp")]
pub mod llamacpp;
use dynamo_runtime::engine::{AsyncEngine, AsyncEngineContextProvider, ResponseStream};
use dynamo_runtime::pipeline::{Error, ManyOut, SingleIn};
use dynamo_runtime::protocols::annotated::Annotated;
#[cfg(feature = "vllm")]
pub mod vllm;
use crate::backend::ExecutionContext;
use crate::preprocessor::BackendInput;
use crate::protocols::common::llm_backend::LLMEngineOutput;
use crate::protocols::openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
};
use crate::types::openai::chat_completions::OpenAIChatCompletionsStreamingEngine;
#[cfg(feature = "trtllm")]
pub mod trtllm;
#[cfg(feature = "python")]
pub mod python;
//
// The engines are each in their own crate under `lib/engines`
//
#[derive(Debug, Clone)]
pub struct MultiNodeConfig {
@@ -51,31 +57,122 @@ impl Default for MultiNodeConfig {
}
}
#[cfg(any(feature = "sglang", feature = "vllm", feature = "python"))]
use pyo3::prelude::*;
/// On Mac embedded Python interpreters do not pick up the virtual env.
#[cfg(all(
target_os = "macos",
any(feature = "sglang", feature = "vllm", feature = "python")
))]
fn fix_venv(venv: String, py: Python<'_>) -> anyhow::Result<()> {
let version_info = py.version_info();
let sys: PyObject = py.import("sys")?.into();
let sys_path = sys.getattr(py, "path")?;
let venv_path = format!(
"{venv}/lib/python{}.{}/site-packages",
version_info.major, version_info.minor
);
// TODO: This should go _before_ the site-packages
sys_path.call_method1(py, "append", (venv_path,))?;
Ok(())
//
// Example echo engines
//
/// How long to sleep between echoed tokens.
/// Default is 10ms which gives us 100 tok/s.
/// Can be configured via the DYN_TOKEN_ECHO_DELAY_MS environment variable.
pub static TOKEN_ECHO_DELAY: LazyLock<Duration> = LazyLock::new(|| {
const DEFAULT_DELAY_MS: u64 = 10;
let delay_ms = env::var("DYN_TOKEN_ECHO_DELAY_MS")
.ok()
.and_then(|val| val.parse::<u64>().ok())
.unwrap_or(DEFAULT_DELAY_MS);
Duration::from_millis(delay_ms)
});
/// Engine that accepts pre-processed requests and echoes the tokens back as the response.
/// The response will include the full prompt template.
/// Useful for testing pre-processing.
struct EchoEngineCore {}
pub fn make_engine_core() -> ExecutionContext {
Arc::new(EchoEngineCore {})
}
#[async_trait]
impl AsyncEngine<SingleIn<BackendInput>, ManyOut<Annotated<LLMEngineOutput>>, Error>
for EchoEngineCore
{
async fn generate(
&self,
incoming_request: SingleIn<BackendInput>,
) -> Result<ManyOut<Annotated<LLMEngineOutput>>, Error> {
let (request, context) = incoming_request.into_parts();
let ctx = context.context();
let output = stream! {
for tok in request.token_ids {
tokio::time::sleep(*TOKEN_ECHO_DELAY).await;
yield delta_core(tok);
}
yield Annotated::from_data(LLMEngineOutput::stop());
};
Ok(ResponseStream::new(Box::pin(output), ctx))
}
}
#[cfg(all(
target_os = "linux",
any(feature = "sglang", feature = "vllm", feature = "python")
))]
fn fix_venv(_venv: String, _py: Python<'_>) -> anyhow::Result<()> {
Ok(())
fn delta_core(tok: u32) -> Annotated<LLMEngineOutput> {
let delta = LLMEngineOutput {
token_ids: vec![tok],
tokens: None,
text: None,
cum_log_probs: None,
log_probs: None,
finish_reason: None,
};
Annotated::from_data(delta)
}
/// Engine that accepts un-preprocessed requests and echoes the prompt back as the response.
/// Useful for testing ingress such as service-http.
struct EchoEngineFull {}
pub fn make_engine_full() -> OpenAIChatCompletionsStreamingEngine {
Arc::new(EchoEngineFull {})
}
#[async_trait]
impl
AsyncEngine<
SingleIn<NvCreateChatCompletionRequest>,
ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
Error,
> for EchoEngineFull
{
async fn generate(
&self,
incoming_request: SingleIn<NvCreateChatCompletionRequest>,
) -> Result<ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, Error> {
let (request, context) = incoming_request.transfer(());
let deltas = request.response_generator();
let ctx = context.context();
let req = request.inner.messages.into_iter().last().unwrap();
let prompt = match req {
async_openai::types::ChatCompletionRequestMessage::User(user_msg) => {
match user_msg.content {
async_openai::types::ChatCompletionRequestUserMessageContent::Text(prompt) => {
prompt
}
_ => anyhow::bail!("Invalid request content field, expected Content::Text"),
}
}
_ => anyhow::bail!("Invalid request type, expected User message"),
};
let output = stream! {
let mut id = 1;
for c in prompt.chars() {
// we are returning characters not tokens, so there will be some postprocessing overhead
tokio::time::sleep(*TOKEN_ECHO_DELAY).await;
let inner = deltas.create_choice(0, Some(c.to_string()), None, None);
let response = NvCreateChatCompletionStreamResponse {
inner,
};
yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None };
id += 1;
}
let inner = deltas.create_choice(0, None, Some(async_openai::types::FinishReason::Stop), None);
let response = NvCreateChatCompletionStreamResponse {
inner,
};
yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None };
};
Ok(ResponseStream::new(Box::pin(output), ctx))
}
}