Unverified Commit 43991e76 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

fix(dynamo-run): For internal comms use a random endpoint instead of hard coded (#1335)

To talk to the vllm/sglang/trtllm engine we previously hardcoded an endpoint. The user never sees it so it doesn't matter which one.

However if you try to run _two_ instances of Dynamo on one machine they will conflict.

Use a UUID as the component name to resolve that.

Part of the solution for:
https://github.com/ai-dynamo/dynamo/issues/1073
parent aba3ab03
...@@ -1682,6 +1682,7 @@ dependencies = [ ...@@ -1682,6 +1682,7 @@ dependencies = [
"tokio-util", "tokio-util",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"uuid 1.16.0",
] ]
[[package]] [[package]]
......
...@@ -43,6 +43,7 @@ tokio = { workspace = true } ...@@ -43,6 +43,7 @@ tokio = { workspace = true }
tokio-util = { workspace = true } tokio-util = { workspace = true }
tracing = { workspace = true } tracing = { workspace = true }
tracing-subscriber = { workspace = true } tracing-subscriber = { workspace = true }
uuid = { workspace = true }
async-openai = { version = "0.27.2" } async-openai = { version = "0.27.2" }
clap = { version = "4.5", features = ["derive", "env"] } clap = { version = "4.5", features = ["derive", "env"] }
......
...@@ -6,6 +6,8 @@ use std::{io::Read, sync::Arc, time::Duration}; ...@@ -6,6 +6,8 @@ use std::{io::Read, sync::Arc, time::Duration};
use anyhow::Context; use anyhow::Context;
use dynamo_llm::{backend::ExecutionContext, engines::StreamingEngine, local_model::LocalModel}; use dynamo_llm::{backend::ExecutionContext, engines::StreamingEngine, local_model::LocalModel};
use dynamo_runtime::protocols::Endpoint as EndpointId;
use dynamo_runtime::slug::Slug;
use dynamo_runtime::{CancellationToken, DistributedRuntime}; use dynamo_runtime::{CancellationToken, DistributedRuntime};
mod flags; mod flags;
...@@ -18,9 +20,6 @@ mod subprocess; ...@@ -18,9 +20,6 @@ mod subprocess;
const CHILD_STOP_TIMEOUT: Duration = Duration::from_secs(2); const CHILD_STOP_TIMEOUT: Duration = Duration::from_secs(2);
/// Where we will attach the vllm/sglang subprocess. Invisible to users.
pub const INTERNAL_ENDPOINT: &str = "dyn://dynamo.internal.worker";
/// Default size of a KV cache block. Override with --kv-cache-block-size /// Default size of a KV cache block. Override with --kv-cache-block-size
const DEFAULT_KV_CACHE_BLOCK_SIZE: usize = 16; const DEFAULT_KV_CACHE_BLOCK_SIZE: usize = 16;
...@@ -171,7 +170,7 @@ pub async fn run( ...@@ -171,7 +170,7 @@ pub async fn run(
// If not, then the endpoint isn't exposed so we invent an internal one. // If not, then the endpoint isn't exposed so we invent an internal one.
let endpoint = match &in_opt { let endpoint = match &in_opt {
Input::Endpoint(path) => path.parse()?, Input::Endpoint(path) => path.parse()?,
_ => INTERNAL_ENDPOINT.parse()?, _ => internal_endpoint("sglang"),
}; };
let multi_node_conf = dynamo_llm::engines::MultiNodeConfig { let multi_node_conf = dynamo_llm::engines::MultiNodeConfig {
...@@ -214,7 +213,7 @@ pub async fn run( ...@@ -214,7 +213,7 @@ pub async fn run(
// If not, then the endpoint isn't exposed so we invent an internal one. // If not, then the endpoint isn't exposed so we invent an internal one.
let endpoint = match &in_opt { let endpoint = match &in_opt {
Input::Endpoint(path) => path.parse()?, Input::Endpoint(path) => path.parse()?,
_ => INTERNAL_ENDPOINT.parse()?, _ => internal_endpoint("vllm"),
}; };
let (py_script, child) = match subprocess::start( let (py_script, child) = match subprocess::start(
...@@ -248,7 +247,7 @@ pub async fn run( ...@@ -248,7 +247,7 @@ pub async fn run(
// If not, then the endpoint isn't exposed so we invent an internal one. // If not, then the endpoint isn't exposed so we invent an internal one.
let endpoint = match &in_opt { let endpoint = match &in_opt {
Input::Endpoint(path) => path.parse()?, Input::Endpoint(path) => path.parse()?,
_ => INTERNAL_ENDPOINT.parse()?, _ => internal_endpoint("trtllm"),
}; };
let (py_script, child) = match subprocess::start( let (py_script, child) = match subprocess::start(
...@@ -403,19 +402,39 @@ fn print_cuda(_output: &Output) {} ...@@ -403,19 +402,39 @@ fn print_cuda(_output: &Output) {}
fn gguf_default() -> Output { fn gguf_default() -> Output {
#[cfg(feature = "llamacpp")] #[cfg(feature = "llamacpp")]
return Output::LlamaCpp; {
Output::LlamaCpp
}
#[cfg(all(feature = "mistralrs", not(feature = "llamacpp")))] #[cfg(all(feature = "mistralrs", not(feature = "llamacpp")))]
return Output::MistralRs; {
Output::MistralRs
}
#[cfg(not(any(feature = "mistralrs", feature = "llamacpp")))] #[cfg(not(any(feature = "mistralrs", feature = "llamacpp")))]
return Output::EchoFull; {
Output::EchoFull
}
} }
fn safetensors_default() -> Output { fn safetensors_default() -> Output {
#[cfg(feature = "mistralrs")] #[cfg(feature = "mistralrs")]
return Output::MistralRs; {
Output::MistralRs
}
#[cfg(not(feature = "mistralrs"))] #[cfg(not(feature = "mistralrs"))]
return Output::EchoFull; {
Output::EchoFull
}
}
/// A random endpoint to use for internal communication
/// We can't hard code because we may be running several on the same machine (GPUs 0-3 and 4-7)
fn internal_endpoint(engine: &str) -> EndpointId {
EndpointId {
namespace: Slug::slugify(&uuid::Uuid::new_v4().to_string()).to_string(),
component: engine.to_string(),
name: "generate".to_string(),
}
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment