Unverified Commit 0a1d1fbe authored by Graham King's avatar Graham King Committed by GitHub
Browse files

feat(dynamo-llm): Remove bring-your-own-engine (#1216)

It was removed from the docs in 0.2.1 and replaced with writing a [standalone Python engine](https://github.com/ai-dynamo/dynamo/blob/main/docs/guides/dynamo_run.md#writing-your-own-engine-in-python).

Also remove the associated `dynamo-run` feature `python`.

Releasing this in 0.3.0 will resolve #784 and #1109.
parent edc6fdea
...@@ -155,18 +155,6 @@ version = "1.5.0" ...@@ -155,18 +155,6 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "async-channel"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a"
dependencies = [
"concurrent-queue",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
[[package]] [[package]]
name = "async-nats" name = "async-nats"
version = "0.40.0" version = "0.40.0"
...@@ -961,15 +949,6 @@ version = "1.0.3" ...@@ -961,15 +949,6 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
[[package]]
name = "concurrent-queue"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
dependencies = [
"crossbeam-utils",
]
[[package]] [[package]]
name = "console" name = "console"
version = "0.15.11" version = "0.15.11"
...@@ -1579,27 +1558,6 @@ dependencies = [ ...@@ -1579,27 +1558,6 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "dynamo-engine-python"
version = "0.2.1"
dependencies = [
"anyhow",
"async-openai",
"async-stream",
"async-trait",
"dynamo-llm",
"dynamo-runtime",
"pyo3",
"pyo3-async-runtimes",
"pythonize",
"serde",
"serde_json",
"thiserror 2.0.12",
"tokio",
"tokio-stream",
"tracing",
]
[[package]] [[package]]
name = "dynamo-llm" name = "dynamo-llm"
version = "0.2.1" version = "0.2.1"
...@@ -1682,7 +1640,6 @@ dependencies = [ ...@@ -1682,7 +1640,6 @@ dependencies = [
"dialoguer", "dialoguer",
"dynamo-engine-llamacpp", "dynamo-engine-llamacpp",
"dynamo-engine-mistralrs", "dynamo-engine-mistralrs",
"dynamo-engine-python",
"dynamo-llm", "dynamo-llm",
"dynamo-runtime", "dynamo-runtime",
"futures", "futures",
...@@ -1974,27 +1931,6 @@ dependencies = [ ...@@ -1974,27 +1931,6 @@ dependencies = [
"tower-service", "tower-service",
] ]
[[package]]
name = "event-listener"
version = "5.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae"
dependencies = [
"concurrent-queue",
"parking",
"pin-project-lite",
]
[[package]]
name = "event-listener-strategy"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93"
dependencies = [
"event-listener",
"pin-project-lite",
]
[[package]] [[package]]
name = "eventsource-stream" name = "eventsource-stream"
version = "0.2.3" version = "0.2.3"
...@@ -3179,12 +3115,6 @@ dependencies = [ ...@@ -3179,12 +3115,6 @@ dependencies = [
"web-time", "web-time",
] ]
[[package]]
name = "indoc"
version = "2.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4c7245a08504955605670dbf141fceab975f15ca21570696aebe9d2e71576bd"
[[package]] [[package]]
name = "inlinable_string" name = "inlinable_string"
version = "0.1.15" version = "0.1.15"
...@@ -3232,15 +3162,6 @@ dependencies = [ ...@@ -3232,15 +3162,6 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "inventory"
version = "0.3.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab08d7cd2c5897f2c949e5383ea7c7db03fb19130ffcfbf7eda795137ae3cb83"
dependencies = [
"rustversion",
]
[[package]] [[package]]
name = "iovec" name = "iovec"
version = "0.1.4" version = "0.1.4"
...@@ -3705,15 +3626,6 @@ dependencies = [ ...@@ -3705,15 +3626,6 @@ dependencies = [
"autocfg", "autocfg",
] ]
[[package]]
name = "memoffset"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "metal" name = "metal"
version = "0.27.0" version = "0.27.0"
...@@ -4120,7 +4032,7 @@ dependencies = [ ...@@ -4120,7 +4032,7 @@ dependencies = [
"bitflags 1.3.2", "bitflags 1.3.2",
"cfg-if 1.0.0", "cfg-if 1.0.0",
"libc", "libc",
"memoffset 0.7.1", "memoffset",
"pin-utils", "pin-utils",
] ]
...@@ -4468,12 +4380,6 @@ dependencies = [ ...@@ -4468,12 +4380,6 @@ dependencies = [
"unicode-width 0.2.0", "unicode-width 0.2.0",
] ]
[[package]]
name = "parking"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba"
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.12.3" version = "0.12.3"
...@@ -4964,107 +4870,6 @@ dependencies = [ ...@@ -4964,107 +4870,6 @@ dependencies = [
"version_check", "version_check",
] ]
[[package]]
name = "pyo3"
version = "0.23.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7778bffd85cf38175ac1f545509665d0b9b92a198ca7941f131f85f7a4f9a872"
dependencies = [
"cfg-if 1.0.0",
"indoc",
"libc",
"memoffset 0.9.1",
"once_cell",
"portable-atomic",
"pyo3-build-config",
"pyo3-ffi",
"pyo3-macros",
"unindent",
]
[[package]]
name = "pyo3-async-runtimes"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "977dc837525cfd22919ba6a831413854beb7c99a256c03bf8624ad707e45810e"
dependencies = [
"async-channel",
"clap",
"futures",
"inventory",
"once_cell",
"pin-project-lite",
"pyo3",
"pyo3-async-runtimes-macros",
"tokio",
]
[[package]]
name = "pyo3-async-runtimes-macros"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2df2884957d2476731f987673befac5d521dff10abb0a7cbe12015bc7702fe9"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "pyo3-build-config"
version = "0.23.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94f6cbe86ef3bf18998d9df6e0f3fc1050a8c5efa409bf712e661a4366e010fb"
dependencies = [
"once_cell",
"target-lexicon",
]
[[package]]
name = "pyo3-ffi"
version = "0.23.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9f1b4c431c0bb1c8fb0a338709859eed0d030ff6daa34368d3b152a63dfdd8d"
dependencies = [
"libc",
"pyo3-build-config",
]
[[package]]
name = "pyo3-macros"
version = "0.23.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbc2201328f63c4710f68abdf653c89d8dbc2858b88c5d88b0ff38a75288a9da"
dependencies = [
"proc-macro2",
"pyo3-macros-backend",
"quote",
"syn 2.0.100",
]
[[package]]
name = "pyo3-macros-backend"
version = "0.23.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fca6726ad0f3da9c9de093d6f116a93c1a38e417ed73bf138472cf4064f72028"
dependencies = [
"heck 0.5.0",
"proc-macro2",
"pyo3-build-config",
"quote",
"syn 2.0.100",
]
[[package]]
name = "pythonize"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91a6ee7a084f913f98d70cdc3ebec07e852b735ae3059a1500db2661265da9ff"
dependencies = [
"pyo3",
"serde",
]
[[package]] [[package]]
name = "qoi" name = "qoi"
version = "0.4.1" version = "0.4.1"
...@@ -7146,12 +6951,6 @@ version = "0.1.1" ...@@ -7146,12 +6951,6 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"
[[package]]
name = "unindent"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7264e107f553ccae879d21fbea1d6724ac785e8c3bfc762137959b5802826ef3"
[[package]] [[package]]
name = "unsafe-libyaml" name = "unsafe-libyaml"
version = "0.2.11" version = "0.2.11"
......
...@@ -112,7 +112,7 @@ dynamo-build: ...@@ -112,7 +112,7 @@ dynamo-build:
COPY deploy/ deploy/ COPY deploy/ deploy/
ENV CARGO_TARGET_DIR=/workspace/target ENV CARGO_TARGET_DIR=/workspace/target
RUN cargo build --release --locked --features llamacpp,python,cuda && \ RUN cargo build --release --locked --features llamacpp,cuda && \
cargo doc --no-deps cargo doc --no-deps
# Create symlinks for wheel building # Create symlinks for wheel building
......
...@@ -46,7 +46,7 @@ ARG CARGO_BUILD_JOBS ...@@ -46,7 +46,7 @@ ARG CARGO_BUILD_JOBS
ENV CARGO_TARGET_DIR=/workspace/target ENV CARGO_TARGET_DIR=/workspace/target
RUN cargo build --release --locked --features mistralrs,python && \ RUN cargo build --release --locked && \
cargo doc --no-deps && \ cargo doc --no-deps && \
cp target/release/dynamo-run /usr/local/bin && \ cp target/release/dynamo-run /usr/local/bin && \
cp target/release/http /usr/local/bin && \ cp target/release/http /usr/local/bin && \
......
...@@ -275,7 +275,7 @@ COPY launch /workspace/launch ...@@ -275,7 +275,7 @@ COPY launch /workspace/launch
COPY deploy/sdk /workspace/deploy/sdk COPY deploy/sdk /workspace/deploy/sdk
# Build Rust crate binaries packaged with the wheel # Build Rust crate binaries packaged with the wheel
RUN cargo build --release --locked --features mistralrs,python \ RUN cargo build --release --locked \
-p dynamo-run \ -p dynamo-run \
-p llmctl \ -p llmctl \
# Multiple http named crates are present in dependencies, need to specify the path # Multiple http named crates are present in dependencies, need to specify the path
......
...@@ -368,7 +368,7 @@ COPY launch /workspace/launch ...@@ -368,7 +368,7 @@ COPY launch /workspace/launch
COPY deploy/sdk /workspace/deploy/sdk COPY deploy/sdk /workspace/deploy/sdk
# Build Rust crate binaries packaged with the wheel # Build Rust crate binaries packaged with the wheel
RUN cargo build --release --locked --features mistralrs,python,dynamo-llm/block-manager \ RUN cargo build --release --locked --features dynamo-llm/block-manager \
-p dynamo-run \ -p dynamo-run \
-p llmctl \ -p llmctl \
# Multiple http named crates are present in dependencies, need to specify the path # Multiple http named crates are present in dependencies, need to specify the path
......
...@@ -29,7 +29,6 @@ description = "Dynamo Run CLI" ...@@ -29,7 +29,6 @@ description = "Dynamo Run CLI"
default = ["mistralrs"] default = ["mistralrs"]
mistralrs = ["dep:dynamo-engine-mistralrs"] mistralrs = ["dep:dynamo-engine-mistralrs"]
llamacpp = ["dep:dynamo-engine-llamacpp"] llamacpp = ["dep:dynamo-engine-llamacpp"]
python = ["dep:dynamo-engine-python"]
cuda = ["dynamo-engine-llamacpp/cuda", "dynamo-engine-mistralrs/cuda"] cuda = ["dynamo-engine-llamacpp/cuda", "dynamo-engine-mistralrs/cuda"]
metal = ["dynamo-engine-llamacpp/metal", "dynamo-engine-mistralrs/metal"] metal = ["dynamo-engine-llamacpp/metal", "dynamo-engine-mistralrs/metal"]
...@@ -41,7 +40,6 @@ dynamo-runtime = { workspace = true } ...@@ -41,7 +40,6 @@ dynamo-runtime = { workspace = true }
dynamo-engine-llamacpp = { path = "../../lib/engines/llamacpp", optional = true } dynamo-engine-llamacpp = { path = "../../lib/engines/llamacpp", optional = true }
dynamo-engine-mistralrs = { path = "../../lib/engines/mistralrs", optional = true } dynamo-engine-mistralrs = { path = "../../lib/engines/mistralrs", optional = true }
dynamo-engine-python = { path = "../../lib/engines/python", optional = true }
anyhow = { workspace = true } anyhow = { workspace = true }
async-stream = { workspace = true } async-stream = { workspace = true }
......
...@@ -18,10 +18,6 @@ mod subprocess; ...@@ -18,10 +18,6 @@ mod subprocess;
const CHILD_STOP_TIMEOUT: Duration = Duration::from_secs(2); const CHILD_STOP_TIMEOUT: Duration = Duration::from_secs(2);
/// How we identify a python string endpoint
#[cfg(feature = "python")]
const PYTHON_STR_SCHEME: &str = "pystr:";
/// Where we will attach the vllm/sglang subprocess. Invisible to users. /// Where we will attach the vllm/sglang subprocess. Invisible to users.
pub const INTERNAL_ENDPOINT: &str = "dyn://dynamo.internal.worker"; pub const INTERNAL_ENDPOINT: &str = "dyn://dynamo.internal.worker";
...@@ -270,18 +266,6 @@ pub async fn run( ...@@ -270,18 +266,6 @@ pub async fn run(
model: Box::new(local_model), model: Box::new(local_model),
} }
} }
#[cfg(feature = "python")]
Output::PythonStr(path_str) => {
let card = local_model.card();
let py_args = flags.as_vec(&path_str, &card.service_name);
let p = std::path::PathBuf::from(path_str);
let engine =
dynamo_engine_python::make_string_engine(cancel_token.clone(), &p, py_args).await?;
EngineConfig::StaticFull {
engine,
model: Box::new(local_model),
}
}
}; };
match in_opt { match in_opt {
......
...@@ -26,11 +26,11 @@ dynamo-run is a single binary that wires together the various inputs (http, text ...@@ -26,11 +26,11 @@ dynamo-run is a single binary that wires together the various inputs (http, text
Example: Example:
- cargo build --features cuda -p dynamo-run - cargo build --features cuda -p dynamo-run
- cd target/debug - cd target/debug
- ./dynamo-run Qwen/Qwen2.5-3B-Instruct - ./dynamo-run Qwen/Qwen3-0.6B
- OR: ./dynamo-run /data/models/Llama-3.2-1B-Instruct-Q4_K_M.gguf - OR: ./dynamo-run /data/models/Llama-3.2-1B-Instruct-Q4_K_M.gguf
"#; "#;
const USAGE: &str = "USAGE: dynamo-run in=[http|text|dyn://<path>|batch:<folder>] out=ENGINE_LIST|dyn://<path> [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--context-length=N] [--kv-cache-block-size=16] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0] [--extra-engine-args=args.json] [--router-mode random|round-robin|kv]"; const USAGE: &str = "USAGE: dynamo-run in=[http|text|dyn://<path>|batch:<folder>] out=ENGINE_LIST|dyn [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--context-length=N] [--kv-cache-block-size=16] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0] [--extra-engine-args=args.json] [--router-mode random|round-robin|kv]";
fn main() -> anyhow::Result<()> { fn main() -> anyhow::Result<()> {
// Set log level based on verbosity flag // Set log level based on verbosity flag
......
...@@ -107,13 +107,6 @@ pub enum Output { ...@@ -107,13 +107,6 @@ pub enum Output {
// Start vllm in a sub-process connecting via nats // Start vllm in a sub-process connecting via nats
// Sugar for `python vllm_inc.py --endpoint <thing> --model <thing>` // Sugar for `python vllm_inc.py --endpoint <thing> --model <thing>`
Vllm, Vllm,
/// Run inference using a user supplied python file that accepts and returns
/// strings. It does its own pre-processing.
#[cfg(feature = "python")]
PythonStr(String),
// DEVELOPER NOTE
// If you add an engine add it to `available_engines` below, and to Default if it makes sense
} }
impl TryFrom<&str> for Output { impl TryFrom<&str> for Output {
...@@ -145,14 +138,6 @@ impl TryFrom<&str> for Output { ...@@ -145,14 +138,6 @@ impl TryFrom<&str> for Output {
Ok(Output::Dynamic) Ok(Output::Dynamic)
} }
#[cfg(feature = "python")]
python_str_gen if python_str_gen.starts_with(crate::PYTHON_STR_SCHEME) => {
let path = python_str_gen
.strip_prefix(crate::PYTHON_STR_SCHEME)
.unwrap();
Ok(Output::PythonStr(path.to_string()))
}
e => Err(anyhow::anyhow!("Invalid out= option '{e}'")), e => Err(anyhow::anyhow!("Invalid out= option '{e}'")),
} }
} }
...@@ -175,9 +160,6 @@ impl fmt::Display for Output { ...@@ -175,9 +160,6 @@ impl fmt::Display for Output {
Output::EchoCore => "echo_core", Output::EchoCore => "echo_core",
Output::Dynamic => "dyn", Output::Dynamic => "dyn",
#[cfg(feature = "python")]
Output::PythonStr(_) => "pystr",
}; };
write!(f, "{s}") write!(f, "{s}")
} }
...@@ -218,11 +200,6 @@ impl Output { ...@@ -218,11 +200,6 @@ impl Output {
out.push(Output::Trtllm.to_string()); out.push(Output::Trtllm.to_string());
out.push(Output::Vllm.to_string()); out.push(Output::Vllm.to_string());
#[cfg(feature = "python")]
{
out.push(Output::PythonStr("file.py".to_string()).to_string());
}
out out
} }
} }
...@@ -1064,27 +1064,6 @@ dependencies = [ ...@@ -1064,27 +1064,6 @@ dependencies = [
"bytemuck", "bytemuck",
] ]
[[package]]
name = "dynamo-engine-python"
version = "0.2.1"
dependencies = [
"anyhow",
"async-openai",
"async-stream",
"async-trait",
"dynamo-llm",
"dynamo-runtime",
"pyo3",
"pyo3-async-runtimes",
"pythonize",
"serde",
"serde_json",
"thiserror 2.0.12",
"tokio",
"tokio-stream",
"tracing",
]
[[package]] [[package]]
name = "dynamo-llm" name = "dynamo-llm"
version = "0.2.1" version = "0.2.1"
...@@ -1150,8 +1129,11 @@ dependencies = [ ...@@ -1150,8 +1129,11 @@ dependencies = [
name = "dynamo-py3" name = "dynamo-py3"
version = "0.2.1" version = "0.2.1"
dependencies = [ dependencies = [
"anyhow",
"async-openai",
"async-stream",
"async-trait",
"dlpark", "dlpark",
"dynamo-engine-python",
"dynamo-llm", "dynamo-llm",
"dynamo-runtime", "dynamo-runtime",
"futures", "futures",
......
...@@ -40,8 +40,11 @@ block-manager = ["dynamo-llm/block-manager", "dep:dlpark"] ...@@ -40,8 +40,11 @@ block-manager = ["dynamo-llm/block-manager", "dep:dlpark"]
[dependencies] [dependencies]
dynamo-llm = { path = "../../llm" } dynamo-llm = { path = "../../llm" }
dynamo-runtime = { path = "../../runtime" } dynamo-runtime = { path = "../../runtime" }
dynamo-engine-python = { path = "../../engines/python" }
anyhow = { version = "1" }
async-stream = { version = "0.3" }
async-trait = { version = "0.1" }
async-openai = "0.27.2"
futures = { version = "0.3" } futures = { version = "0.3" }
once_cell = { version = "1.20.3" } once_cell = { version = "1.20.3" }
serde = { version = "1" } serde = { version = "1" }
......
...@@ -15,17 +15,22 @@ ...@@ -15,17 +15,22 @@
use std::sync::Arc; use std::sync::Arc;
use dynamo_engine_python::PythonServerStreamingEngine; use pyo3::prelude::*;
use dynamo_runtime::CancellationToken; use pyo3_async_runtimes::TaskLocals;
use pythonize::{depythonize, pythonize};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
pub use dynamo_runtime::{ pub use dynamo_runtime::{
pipeline::{async_trait, AsyncEngine, Data, ManyOut, SingleIn}, pipeline::{
async_trait, AsyncEngine, AsyncEngineContextProvider, Data, ManyOut, ResponseStream,
SingleIn,
},
protocols::annotated::Annotated, protocols::annotated::Annotated,
Error, Result, CancellationToken, Error, Result,
}; };
pub use serde::{Deserialize, Serialize}; pub use serde::{Deserialize, Serialize};
use pyo3::prelude::*;
/// Add bindings from this crate to the provided module /// Add bindings from this crate to the provided module
pub fn add_to_module(m: &Bound<'_, PyModule>) -> PyResult<()> { pub fn add_to_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PythonAsyncEngine>()?; m.add_class::<PythonAsyncEngine>()?;
...@@ -102,3 +107,186 @@ where ...@@ -102,3 +107,186 @@ where
self.0.generate(request).await self.0.generate(request).await
} }
} }
/// Streaming engine backed by a Python generator callable.
///
/// `generate` calls `generator` with the pythonized request and turns the returned
/// Python object into a Rust stream using `event_loop` as the task-local event loop.
#[derive(Clone)]
pub struct PythonServerStreamingEngine {
// Stored at construction; not read by any code visible in this section.
_cancel_token: CancellationToken,
// Python callable invoked once per request with the pythonized request.
generator: Arc<PyObject>,
// Python event loop object handed to `TaskLocals::new` for every request.
event_loop: Arc<PyObject>,
}
impl PythonServerStreamingEngine {
pub fn new(
cancel_token: CancellationToken,
generator: Arc<PyObject>,
event_loop: Arc<PyObject>,
) -> Self {
PythonServerStreamingEngine {
_cancel_token: cancel_token,
generator,
event_loop,
}
}
}
/// Reasons a single item from the Python stream could not be turned into a response.
/// Each variant carries the underlying error rendered as a string.
#[derive(Debug, thiserror::Error)]
enum ResponseProcessingError {
/// The stream yielded a Python error instead of a value.
#[error("python exception: {0}")]
PythonException(String),
/// `depythonize` could not convert the yielded Python object into the response type.
#[error("deserialize error: {0}")]
DeserializeError(String),
/// The blocking task used to deserialize under the GIL did not complete (join error).
#[error("gil offload error: {0}")]
OffloadError(String),
}
#[async_trait]
impl<Req, Resp> AsyncEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>, Error>
    for PythonServerStreamingEngine
where
    Req: Data + Serialize,
    Resp: Data + for<'de> Deserialize<'de>,
{
    /// Call the Python generator with `request` and expose its output as a response stream.
    ///
    /// The request is pythonized and passed to the generator on a blocking thread. A spawned
    /// task then drains the resulting Python stream and forwards every processed item through
    /// a bounded channel (capacity 128). The first item that fails to process is forwarded as
    /// an annotated error and ends the forwarding task.
    ///
    /// # Errors
    /// Fails if the blocking task cannot be joined, or if pythonizing the request, calling the
    /// generator, or converting its result into a stream raises a Python error.
    async fn generate(&self, request: SingleIn<Req>) -> Result<ManyOut<Annotated<Resp>>, Error> {
        // Split the payload from its request context.
        let (request, context) = request.transfer(());
        let ctx = context.context();
        let id = context.id().to_string();
        tracing::trace!("processing request: {}", id);

        // Channel bridging the task that drains the Python stream and the returned Rust stream.
        let (tx, rx) = mpsc::channel::<Annotated<Resp>>(128);

        // Handles moved onto the blocking thread.
        let generator = self.generator.clone();
        let event_loop = self.event_loop.clone();

        // Taking the GIL behaves like taking a mutex: doing so directly inside a tokio async
        // task could stall that worker thread for an unbounded time. Under low contention this
        // would not matter, but contention cannot be predicted, so the GIL is always acquired
        // on a blocking thread and the cost of the hop is accepted.
        let py_stream = tokio::task::spawn_blocking(move || {
            Python::with_gil(|py| {
                let py_request = pythonize(py, &request)?;
                let py_gen = generator.call1(py, (py_request,))?;
                let locals = TaskLocals::new(event_loop.bind(py).clone());
                pyo3_async_runtimes::tokio::into_stream_with_locals_v1(
                    locals,
                    py_gen.into_bound(py),
                )
            })
        })
        .await??;
        let py_stream = Box::pin(py_stream);

        // Drain the Python stream in its own task. Any processing error is converted into an
        // annotated error, sent downstream, and terminates the task.
        let request_id = id;
        tokio::spawn(async move {
            tracing::debug!(
                request_id,
                "starting task to process python async generator stream"
            );

            let mut items = py_stream;
            let mut count = 0;

            while let Some(item) = items.next().await {
                count += 1;
                tracing::trace!(
                    request_id,
                    "processing the {}th item from python async generator",
                    count
                );

                // `done` is set when this item is an error and must be the last one forwarded.
                let (response, done) = match process_item::<Resp>(item).await {
                    Ok(response) => (response, false),
                    Err(err) => {
                        let msg = match &err {
                            ResponseProcessingError::DeserializeError(e) => {
                                // Ask the generator to stop. This has no effect on the Python
                                // side today because the context is not passed to the python
                                // async generator.
                                // todo: add task-local context to the python async generator
                                ctx.stop_generating();
                                format!("critical error: invalid response object from python async generator; application-logic-mismatch: {}", e)
                            }
                            ResponseProcessingError::PythonException(e) => {
                                format!("a python exception was caught while processing the async generator: {}", e)
                            }
                            ResponseProcessingError::OffloadError(e) => {
                                format!("critical error: failed to offload the python async generator to a new thread: {}", e)
                            }
                        };
                        (Annotated::from_error(msg), true)
                    }
                };

                if tx.send(response).await.is_err() {
                    tracing::trace!(
                        request_id,
                        "error forwarding annotated response to channel; channel is closed"
                    );
                    break;
                }

                if done {
                    tracing::debug!(
                        request_id,
                        "early termination of python async generator stream task"
                    );
                    break;
                }
            }

            tracing::debug!(
                request_id,
                "finished processing python async generator stream"
            );
        });

        let responses = ReceiverStream::new(rx);
        Ok(ResponseStream::new(Box::pin(responses), context.context()))
    }
}
/// Turn one item yielded by the Python stream into an `Annotated<Resp>`.
///
/// # Errors
/// * `PythonException` - the item is a Python error; its traceback is displayed first.
/// * `OffloadError` - the blocking deserialization task could not be joined.
/// * `DeserializeError` - `depythonize` rejected the Python object.
async fn process_item<Resp>(
    item: Result<Py<PyAny>, PyErr>,
) -> Result<Annotated<Resp>, ResponseProcessingError>
where
    Resp: Data + for<'de> Deserialize<'de>,
{
    let py_obj = match item {
        Ok(obj) => obj,
        Err(py_err) => {
            // NOTE(review): the blank line goes to stdout ahead of the traceback; confirm
            // this is intended output formatting rather than leftover debugging.
            println!();
            Python::with_gil(|py| py_err.display(py));
            return Err(ResponseProcessingError::PythonException(
                py_err.to_string(),
            ));
        }
    };

    // Deserialization needs the GIL, so it runs on a blocking thread.
    let joined = tokio::task::spawn_blocking(move || {
        Python::with_gil(|py| depythonize::<Resp>(&py_obj.into_bound(py)))
    })
    .await;

    let decoded = joined.map_err(|e| ResponseProcessingError::OffloadError(e.to_string()))?;
    let data = decoded.map_err(|e| ResponseProcessingError::DeserializeError(e.to_string()))?;

    Ok(Annotated::from_data(data))
}
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[package]
name = "dynamo-engine-python"
version.workspace = true
edition.workspace = true
description.workspace = true
authors.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
keywords.workspace = true
[dependencies]
dynamo-runtime = { workspace = true }
dynamo-llm = { workspace = true }
anyhow = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tracing = { workspace = true }
async-openai = "0.27.2"
pyo3 = { version = "0.23.3", default-features = false, features = [
"macros",
"experimental-async",
"experimental-inspect",
"py-clone",
] }
pyo3-async-runtimes = { version = "0.23.0", default-features = false, features = [
"attributes",
"testing",
"tokio-runtime",
"unstable-streams",
] }
pythonize = { version = "0.23" }
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ffi::CStr;
use std::{env, path::Path, sync::Arc};
use anyhow::Context;
use dynamo_runtime::pipeline::error as pipeline_error;
pub use dynamo_runtime::{
error,
pipeline::{
async_trait, AsyncEngine, AsyncEngineContextProvider, Data, ManyOut, ResponseStream,
SingleIn,
},
protocols::annotated::Annotated,
CancellationToken, Error, Result,
};
use pyo3::prelude::*;
use pyo3::types::{IntoPyDict, PyDict};
use pyo3_async_runtimes::TaskLocals;
use pythonize::{depythonize, pythonize};
pub use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio::sync::oneshot::Sender;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use dynamo_llm::engines::{EngineDispatcher, StreamingEngine};
/// Python snippet that runs a user file and wraps the result in a module-like object.
///
/// The snippet expects two globals to be provided by the caller: `file_path` (path of the
/// file to run) and `sys_argv` (assigned to `sys.argv`). It:
/// 1. inserts the file's directory at the front of `sys.path` if it is not already present,
/// 2. runs the file with `runpy.run_path(..., run_name='__main__')`,
/// 3. wraps the resulting dict in a `Module` instance whose async `generate` delegates to
///    the file's own `generate` (looked up by key, so the file must define it),
/// 4. binds that instance to the name `module`.
///
/// NOTE(review): `functools` and `types` are imported but not referenced in the snippet.
const PY_IMPORT: &CStr = cr#"
import runpy
import sys
import os
import functools
import types
module_dir = os.path.dirname(file_path)
if module_dir not in sys.path:
sys.path.insert(0, module_dir)
sys.argv = sys_argv
module_dict = runpy.run_path(file_path, run_name='__main__')
# Create a module class with the generate function
class Module:
def __init__(self, module_dict):
self.__dict__.update(module_dict)
self._generate_func = module_dict['generate']
async def generate(self, request):
async for response in self._generate_func(request):
yield response
# Create module instance and store it in globals
module = Module(module_dict)
globals()['module'] = module
"#;
/// Build a string-in / string-out engine whose work is done by a Python file.
///
/// Initializes the embedded Python interpreter, attempts `fix_venv` when the `VIRTUAL_ENV`
/// environment variable is set (a failure there is only logged as a warning), creates the
/// Python-backed engine from `py_file` and `py_args`, and wraps it in an `EngineDispatcher`.
///
/// # Errors
/// Returns whatever error `new_engine` reports.
pub async fn make_string_engine(
    cancel_token: CancellationToken,
    py_file: &Path,
    py_args: Vec<String>,
) -> pipeline_error::Result<Arc<dyn StreamingEngine>> {
    pyo3::prepare_freethreaded_python();

    let virtual_env = env::var("VIRTUAL_ENV");
    if let Ok(venv) = virtual_env {
        Python::with_gil(|py| {
            if let Err(e) = fix_venv(venv, py) {
                tracing::warn!("failed to fix venv: {}", e);
            }
        });
    }

    let python_engine = new_engine(cancel_token, py_file, py_args).await?;
    let dispatcher: Arc<dyn StreamingEngine> = Arc::new(EngineDispatcher::new(python_engine));
    Ok(dispatcher)
}
/// Streaming engine backed by a Python generator callable.
///
/// `generate` calls `generator` with the pythonized request and turns the returned
/// Python object into a Rust stream using `event_loop` as the task-local event loop.
#[derive(Clone)]
pub struct PythonServerStreamingEngine {
// Stored at construction; not read by any code visible in this section.
_cancel_token: CancellationToken,
// Python callable invoked once per request with the pythonized request.
generator: Arc<PyObject>,
// Python event loop object handed to `TaskLocals::new` for every request.
event_loop: Arc<PyObject>,
}
/// Start the Python event loop, load the user's file and build the engine around its
/// `generate` attribute.
///
/// # Errors
/// Fails if the event loop is never delivered by the blocking task, if the file cannot be
/// loaded (the error is annotated with the file path), or if the loaded module has no
/// `generate` attribute.
async fn new_engine(
    cancel_token: CancellationToken,
    py_file: &Path,
    py_args: Vec<String>,
) -> anyhow::Result<PythonServerStreamingEngine> {
    // The event loop runs on a dedicated blocking thread and is handed back over a oneshot.
    let (loop_tx, loop_rx) = tokio::sync::oneshot::channel();
    tokio::task::spawn_blocking(move || run_asyncio(loop_tx));
    let event_loop = loop_rx.await?;

    let user_module =
        python_file_to_module(py_file, py_args).with_context(|| py_file.display().to_string())?;

    let generator =
        Python::with_gil(|py| user_module.getattr(py, "generate").context("generate"))?;

    let engine = PythonServerStreamingEngine::new(cancel_token, Arc::new(generator), event_loop);
    Ok(engine)
}
impl PythonServerStreamingEngine {
pub fn new(
cancel_token: CancellationToken,
generator: Arc<PyObject>,
event_loop: Arc<PyObject>,
) -> Self {
PythonServerStreamingEngine {
_cancel_token: cancel_token,
generator,
event_loop,
}
}
}
/// Start asyncio event loop and block on it forever
fn run_asyncio(tx: Sender<Arc<PyObject>>) {
let event_loop: PyObject = Python::with_gil(|py| {
let aio: PyObject = py.import("asyncio").unwrap().into();
aio.call_method0(py, "new_event_loop").unwrap()
});
let event_loop = Arc::new(event_loop);
let _ = tx.send(event_loop.clone());
Python::with_gil(|py| {
let _ = event_loop.call_method0(py, "run_forever");
});
}
/// Run the Python file at `p` via `PY_IMPORT` and return the resulting `module` object.
///
/// The file name (when `p` has one) is prepended to `py_args`, and the combined list is
/// passed to the snippet as `sys_argv`; the path itself is passed as `file_path`.
///
/// # Errors
/// Every failure is returned with context rather than panicking: converting the arguments
/// to Python objects, building the globals dict, executing the snippet, looking up `module`
/// in the snippet's locals, and extracting it.
fn python_file_to_module(p: &Path, mut py_args: Vec<String>) -> Result<PyObject> {
    if let Some(filename) = p.file_name() {
        py_args.insert(0, filename.to_string_lossy().to_string());
    }

    let module: PyObject = Python::with_gil(|py| {
        let py_file_path: PyObject = p
            .display()
            .to_string()
            .into_pyobject(py)
            .context("file_path into_pyobject")?
            .into();
        let py_sys_argv: PyObject = py_args
            .into_pyobject(py)
            .context("sys_argv into_pyobject")?
            .into();

        let globals = [("file_path", py_file_path), ("sys_argv", py_sys_argv)]
            .into_py_dict(py)
            .context("into_py_dict")?;
        let locals = PyDict::new(py);

        py.run(PY_IMPORT, Some(&globals), Some(&locals))
            .context("PY_IMPORT")?;

        // `get_item` can fail (lookup error) or return `None` (name not defined); both are
        // reported as errors.
        let module = locals
            .get_item("module")
            .context("get_item module")?
            .context("get module after import")?;
        module.extract().context("extract")
    })?;

    Ok(module)
}
/// Reasons a single item from the Python stream could not be turned into a response.
/// Each variant carries a message string that is interpolated into its `#[error]` text.
#[derive(Debug, thiserror::Error)]
enum ResponseProcessingError {
/// A Python exception was raised while producing the item.
#[error("python exception: {0}")]
PythonException(String),
/// The item could not be deserialized into the response type.
#[error("deserialize error: {0}")]
DeserializeError(String),
// NOTE(review): presumably raised when the blocking task that takes the GIL fails to
// complete — the producing code is outside this view; confirm.
#[error("gil offload error: {0}")]
OffloadError(String),
}
#[async_trait]
impl<Req, Resp> AsyncEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>, Error>
    for PythonServerStreamingEngine
where
    Req: Data + Serialize,
    Resp: Data + for<'de> Deserialize<'de>,
{
    /// Sends `request` to the Python generator and returns its output as a stream
    /// of annotated responses.
    ///
    /// The request is converted to a Python object and passed to the generator on a
    /// blocking task. The resulting Python async generator is drained by a spawned
    /// tokio task, which forwards every item (or a single terminating error
    /// annotation) through a bounded channel that backs the returned stream.
    ///
    /// # Errors
    ///
    /// Fails if the blocking task cannot be joined, or if converting the request,
    /// calling the generator, or wrapping it as a stream raises on the Python side.
    async fn generate(&self, request: SingleIn<Req>) -> Result<ManyOut<Annotated<Resp>>, Error> {
        // Split the payload from its context.
        let (request, context) = request.transfer(());
        let ctx = context.context();
        let request_id = context.id().to_string();
        tracing::trace!("processing request: {}", request_id);

        // Channel bridging the task that drains the Python generator and the Rust
        // stream handed back to the caller.
        let (tx, rx) = mpsc::channel::<Annotated<Resp>>(128);

        // Owned handles to the Python objects so they can move into the blocking task.
        let generator = self.generator.clone();
        let event_loop = self.event_loop.clone();

        // Taking the GIL behaves like taking a mutex: doing it directly inside an
        // async task may stall the executor thread for an unbounded time. When the
        // GIL is uncontended that would be harmless, but under heavy contention it
        // degrades performance badly, and contention cannot be predicted. So the
        // GIL-holding work always runs on a blocking task and we accept the cost.
        let py_stream = tokio::task::spawn_blocking(move || {
            Python::with_gil(|py| {
                let py_request = pythonize(py, &request)?;
                let async_gen = generator.call1(py, (py_request,))?;
                let locals = TaskLocals::new(event_loop.bind(py).clone());
                pyo3_async_runtimes::tokio::into_stream_with_locals_v1(
                    locals,
                    async_gen.into_bound(py),
                )
            })
        })
        .await??;
        let mut py_stream = Box::pin(py_stream);

        // Drain the Python stream in the background. The first failure is turned
        // into an annotated error, forwarded, and then ends the task.
        tokio::spawn(async move {
            tracing::debug!(
                request_id,
                "starting task to process python async generator stream"
            );

            let mut count = 0;
            while let Some(item) = py_stream.next().await {
                count += 1;
                tracing::trace!(
                    request_id,
                    "processing the {}th item from python async generator",
                    count
                );

                let outcome = process_item::<Resp>(item).await;
                let done = outcome.is_err();
                let response = outcome.unwrap_or_else(|err| {
                    let msg = match err {
                        ResponseProcessingError::DeserializeError(detail) => {
                            // Ask the Python async generator to stop producing.
                            // This has no effect on the Python side yet, because the
                            // context is not passed into the generator.
                            // todo: add task-local context to the python async generator
                            ctx.stop_generating();
                            format!("critical error: invalid response object from python async generator; application-logic-mismatch: {}", detail)
                        }
                        ResponseProcessingError::PythonException(detail) => {
                            format!("a python exception was caught while processing the async generator: {}", detail)
                        }
                        ResponseProcessingError::OffloadError(detail) => {
                            format!("critical error: failed to offload the python async generator to a new thread: {}", detail)
                        }
                    };
                    Annotated::from_error(msg)
                });

                if tx.send(response).await.is_err() {
                    tracing::trace!(
                        request_id,
                        "error forwarding annotated response to channel; channel is closed"
                    );
                    break;
                }

                if done {
                    tracing::debug!(
                        request_id,
                        "early termination of python async generator stream task"
                    );
                    break;
                }
            }

            tracing::debug!(
                request_id,
                "finished processing python async generator stream"
            );
        });

        let responses = ReceiverStream::new(rx);
        Ok(ResponseStream::new(Box::pin(responses), context.context()))
    }
}
/// Converts one item yielded by the Python async generator into an
/// `Annotated<Resp>` data response.
///
/// A Python error item is printed (preceded by a blank line on stdout) and
/// reported as `PythonException`. Otherwise the object is deserialized on a
/// blocking task while holding the GIL: a join failure becomes `OffloadError`
/// and a deserialization failure becomes `DeserializeError`.
async fn process_item<Resp>(
    item: Result<Py<PyAny>, PyErr>,
) -> Result<Annotated<Resp>, ResponseProcessingError>
where
    Resp: Data + for<'de> Deserialize<'de>,
{
    let py_obj = match item {
        Ok(obj) => obj,
        Err(py_err) => {
            println!();
            Python::with_gil(|py| py_err.display(py));
            return Err(ResponseProcessingError::PythonException(
                py_err.to_string(),
            ));
        }
    };

    let joined = tokio::task::spawn_blocking(move || {
        Python::with_gil(|py| depythonize::<Resp>(&py_obj.into_bound(py)))
    })
    .await;

    let data = match joined {
        Ok(Ok(value)) => value,
        Ok(Err(de_err)) => {
            return Err(ResponseProcessingError::DeserializeError(
                de_err.to_string(),
            ))
        }
        Err(join_err) => {
            return Err(ResponseProcessingError::OffloadError(
                join_err.to_string(),
            ))
        }
    };

    Ok(Annotated::from_data(data))
}
/// On Mac embedded Python interpreters do not pick up the virtual env.
///
/// Appends `{venv}/lib/python{major}.{minor}/site-packages` to `sys.path`, using
/// the major/minor version reported by the running interpreter.
///
/// # Errors
///
/// Fails if `sys` cannot be imported, `sys.path` cannot be read, or the `append`
/// call raises.
#[cfg(target_os = "macos")]
fn fix_venv(venv: String, py: Python<'_>) -> anyhow::Result<()> {
    let version = py.version_info();
    let site_packages = format!(
        "{venv}/lib/python{}.{}/site-packages",
        version.major, version.minor
    );
    let sys_module: PyObject = py.import("sys")?.into();
    // TODO: This should go _before_ the site-packages
    sys_module
        .getattr(py, "path")?
        .call_method1(py, "append", (site_packages,))?;
    Ok(())
}
/// Variant of `fix_venv` compiled on every target other than macOS: it ignores
/// both arguments and always returns `Ok(())`.
#[cfg(not(target_os = "macos"))]
fn fix_venv(_venv: String, _py: Python<'_>) -> anyhow::Result<()> {
Ok(())
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment