"vllm/vscode:/vscode.git/clone" did not exist on "ad517f954382ef4e207ab4a6fd9aa12cbe7a389f"
Unverified Commit 6ba534a6 authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

fix(runtime): stop unconditional NATS connection with --discovery-backend file (#8398)

parent 04ea38c3
......@@ -54,12 +54,19 @@ python3 -m dynamo.vllm --event-plane zmq --model Qwen/Qwen3-0.6B
| Variable | Description | Default |
|----------|-------------|---------|
| `DYN_EVENT_PLANE` | Transport: `nats` or `zmq` | `nats` |
| `DYN_EVENT_PLANE` | Transport: `nats` or `zmq` | Context-dependent (see below) |
| `NATS_SERVER` | NATS server URL (NATS transport only) | `nats://localhost:4222` |
When `DYN_EVENT_PLANE` is not set, the default is chosen based on the discovery backend:
- `--discovery-backend file` or `mem` (local backends): defaults to **zmq** — no external services required.
- `--discovery-backend etcd` or `kubernetes` (distributed backends): defaults to **nats**.
Set `DYN_EVENT_PLANE` explicitly to override this automatic selection.
## NATS Transport
When using NATS (`DYN_EVENT_PLANE=nats` or unset):
When using NATS (`DYN_EVENT_PLANE=nats`, or unset with a distributed backend):
- Requires a running NATS server. Set `NATS_SERVER` if it is not on `localhost:4222`.
- Events are published to NATS subjects scoped by namespace and component.
......
......@@ -104,8 +104,8 @@ Dynamo components discover each other through a shared backend. Two options are
| Backend | When to Use | Setup |
|---|---|---|
| **File** | Single machine, local development | No setup -- pass `--discovery-backend file` to all components |
| **etcd** | Multi-node, production | Requires a running etcd instance (default if no flag is specified) |
| **File** | Single machine, local development | No setup -- pass `--discovery-backend file` to all components. The event plane automatically defaults to ZMQ (no NATS required). |
| **etcd** | Multi-node, production | Requires a running etcd instance (default if no flag is specified). The event plane defaults to NATS. |
This guide uses `--discovery-backend file`. For etcd setup, see [Service Discovery](../kubernetes/service-discovery.md).
......
......@@ -86,7 +86,8 @@ Start the frontend, then start a worker for your chosen backend.
```bash
# Start the OpenAI compatible frontend (default port is 8000)
# --discovery-backend file avoids needing etcd (frontend and workers must share a disk)
# --discovery-backend file avoids needing etcd. Frontend and workers must share a disk.
# The event plane automatically defaults to ZMQ (no NATS required) with this backend.
python3 -m dynamo.frontend --discovery-backend file
```
......
......@@ -644,14 +644,14 @@ impl DistributedRuntime {
});
}
let event_plane_is_nats =
std::env::var(config::environment_names::event_plane::DYN_EVENT_PLANE)
.map(|v| v.eq_ignore_ascii_case("nats"))
.unwrap_or(true);
let event_transport_kind = discovery_backend_config.resolve_event_transport_kind();
let nats_enabled = request_plane.is_nats()
|| std::env::var(config::environment_names::nats::NATS_SERVER).is_ok()
|| event_plane_is_nats;
|| matches!(
event_transport_kind,
dynamo_runtime::discovery::EventTransportKind::Nats
);
let runtime_config = DistributedConfig {
discovery_backend: discovery_backend_config,
......@@ -661,6 +661,7 @@ impl DistributedRuntime {
None
},
request_plane,
event_transport_kind,
};
let inner = runtime
.secondary()
......
......@@ -365,7 +365,13 @@ pub mod tcp_response_stream {
/// Event Plane transport environment variables
pub mod event_plane {
/// Event transport selection: "zmq" or "nats". Default: "nats"
/// Event transport selection: "zmq" or "nats".
///
/// When unset the default depends on the discovery backend:
/// - `file` / `mem` backends: defaults to `zmq` (no external services required).
/// - `etcd` / `kubernetes` backends: defaults to `nats`.
///
/// Set this explicitly to override the context-aware default.
pub const DYN_EVENT_PLANE: &str = "DYN_EVENT_PLANE";
/// Event plane codec selection: "json" or "msgpack".
......
......@@ -39,8 +39,15 @@ pub enum EventTransportKind {
impl EventTransportKind {
/// Parse from environment variable `DYN_EVENT_PLANE`.
/// Returns `Nats` if not set or empty.
/// Returns error for invalid values.
///
/// Returns `Nats` if the variable is not set or is empty, which is the correct
/// default for distributed deployments (etcd/kubernetes backends). For local-only
/// workflows (`--discovery-backend file` or `mem`) this context-unaware default
/// may be incorrect — prefer [`DistributedRuntime::default_event_transport_kind`]
/// when you have access to a runtime, as it derives the correct default from the
/// configured discovery backend.
///
/// Returns an error for unrecognised values.
pub fn from_env() -> Result<Self> {
match std::env::var(crate::config::environment_names::event_plane::DYN_EVENT_PLANE)
.as_deref()
......@@ -54,7 +61,12 @@ impl EventTransportKind {
}
}
/// Parse from environment variable, defaulting to Nats on error.
/// Parse from environment variable, defaulting to NATS when the variable is unset.
///
/// This default is suitable for distributed deployments. For local-only workflows
/// prefer [`DistributedRuntime::default_event_transport_kind`], which automatically
/// selects ZMQ when running with a `file` or `mem` discovery backend.
///
/// Logs a warning if an invalid value is encountered.
pub fn from_env_or_default() -> Self {
Self::from_env().unwrap_or_else(|e| {
......
......@@ -80,6 +80,10 @@ pub struct DistributedRuntime {
// Registry for /engine/* route callbacks
engine_routes: crate::engine_routes::EngineRouteRegistry,
// Resolved event transport kind — set once at construction time from
// DYN_EVENT_PLANE + discovery backend; returned by default_event_transport_kind().
event_transport_kind: crate::discovery::EventTransportKind,
}
impl MetricsHierarchy for DistributedRuntime {
......@@ -108,7 +112,8 @@ impl std::fmt::Debug for DistributedRuntime {
impl DistributedRuntime {
pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
let (discovery_backend, nats_config, request_plane) = config.dissolve();
let (discovery_backend, nats_config, request_plane, event_transport_kind) =
config.dissolve();
let nats_client = match nats_config {
Some(nc) => Some(nc.connect().await?),
......@@ -200,6 +205,7 @@ impl DistributedRuntime {
request_plane,
local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry::new(),
engine_routes: crate::engine_routes::EngineRouteRegistry::new(),
event_transport_kind,
};
// Initialize the uptime gauge in SystemHealth
......@@ -415,6 +421,19 @@ impl DistributedRuntime {
self.request_plane
}
/// Returns the event transport kind this runtime was configured with.
///
/// The value is resolved once at construction time by `DiscoveryBackend::resolve_event_transport_kind`:
/// if `DYN_EVENT_PLANE` is set explicitly that value wins; otherwise the discovery
/// backend drives the default (ZMQ for `file`/`mem`, NATS for `etcd`/`kubernetes`).
///
/// Use this instead of [`EventTransportKind::from_env_or_default`] wherever you have
/// access to a `DistributedRuntime`, so that local-only workflows work without
/// setting `DYN_EVENT_PLANE` explicitly.
pub fn default_event_transport_kind(&self) -> crate::discovery::EventTransportKind {
self.event_transport_kind
}
pub fn child_token(&self) -> CancellationToken {
self.runtime.child_token()
}
......@@ -570,37 +589,78 @@ pub enum DiscoveryBackend {
KvStore(kv::Selector),
}
impl DiscoveryBackend {
/// Returns true if this backend requires no external services (file or in-memory).
///
/// Local backends do not need etcd, NATS, or any other infrastructure daemon.
/// This is used to drive smart defaults: for example, the event plane defaults to
/// ZMQ (not NATS) when a local backend is in use and `DYN_EVENT_PLANE` is not set.
pub fn is_local(&self) -> bool {
matches!(
self,
DiscoveryBackend::KvStore(kv::Selector::File(_))
| DiscoveryBackend::KvStore(kv::Selector::Memory)
)
}
/// Resolve the event transport kind for this backend.
///
/// This is the single authoritative mapping of `(DYN_EVENT_PLANE, backend)` →
/// `EventTransportKind`. When `DYN_EVENT_PLANE` is unset or empty the backend
/// drives the default: local backends (`file`/`mem`) → ZMQ, distributed backends
/// (`etcd`/`kubernetes`) → NATS.
///
/// Call this once at startup and store the result; do not call it repeatedly.
pub fn resolve_event_transport_kind(&self) -> crate::discovery::EventTransportKind {
use crate::config::environment_names::event_plane::DYN_EVENT_PLANE;
use crate::discovery::EventTransportKind;
match std::env::var(DYN_EVENT_PLANE).as_deref() {
Ok("nats") => EventTransportKind::Nats,
Ok("zmq") => EventTransportKind::Zmq,
// Unset or empty: derive from backend type.
Ok("") | Err(_) => {
if self.is_local() {
EventTransportKind::Zmq
} else {
EventTransportKind::Nats
}
}
Ok(other) => {
let default_kind = if self.is_local() {
EventTransportKind::Zmq
} else {
EventTransportKind::Nats
};
tracing::warn!(
"Invalid DYN_EVENT_PLANE value '{}'. Valid values: 'nats', 'zmq'. \
Defaulting to {:?}.",
other,
default_kind
);
default_kind
}
}
}
}
#[derive(Dissolve)]
pub struct DistributedConfig {
pub discovery_backend: DiscoveryBackend,
pub nats_config: Option<nats::ClientOptions>,
pub request_plane: RequestPlaneMode,
/// Resolved event transport kind — computed once at config time from
/// `DYN_EVENT_PLANE` and the discovery backend, then stored on the runtime
/// so callers always get the same answer regardless of which other services
/// happen to be reachable.
pub event_transport_kind: crate::discovery::EventTransportKind,
}
impl DistributedConfig {
pub fn from_settings() -> DistributedConfig {
let request_plane = RequestPlaneMode::from_env();
// NATS is used for more than just NATS request-plane RPC:
// - KV router events (JetStream or NATS core + local indexer)
// - inter-router replica sync (NATS core)
//
// Historically we only connected to NATS when the request plane was NATS, which made
// `DYN_REQUEST_PLANE=tcp|http` incompatible with KV routing modes that rely on NATS.
// Enable the NATS client when any of these hold:
// 1. Request plane is NATS
// 2. NATS_SERVER is explicitly configured
// 3. Event plane is NATS (the default)
let event_plane_is_nats =
std::env::var(crate::config::environment_names::event_plane::DYN_EVENT_PLANE)
.map(|v| v.eq_ignore_ascii_case("nats"))
.unwrap_or(true);
let nats_enabled = request_plane.is_nats()
|| std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok()
|| event_plane_is_nats;
// DYN_DISCOVERY_BACKEND selects the discovery mechanism
// Valid values: "kubernetes", "etcd" (default), "file", "mem"
// Determine the discovery backend first — we need it to compute the NATS default below.
// Valid values for DYN_DISCOVERY_BACKEND: "kubernetes", "etcd" (default), "file", "mem"
let backend_str =
std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "etcd".to_string());
......@@ -620,6 +680,26 @@ impl DistributedConfig {
}
};
// Resolve event transport kind once — the single source of truth used both to
// decide whether to open a NATS connection and to answer
// `DistributedRuntime::default_event_transport_kind()` later.
let event_transport_kind = discovery_backend.resolve_event_transport_kind();
// NATS is used for more than just NATS request-plane RPC:
// - KV router events (JetStream or NATS core + local indexer)
// - inter-router replica sync (NATS core)
//
// Enable the NATS client when any of these hold:
// 1. Request plane is NATS
// 2. NATS_SERVER is explicitly configured by the user
// 3. The resolved event transport kind is NATS
let nats_enabled = request_plane.is_nats()
|| std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok()
|| matches!(
event_transport_kind,
crate::discovery::EventTransportKind::Nats
);
DistributedConfig {
discovery_backend,
nats_config: if nats_enabled {
......@@ -628,6 +708,7 @@ impl DistributedConfig {
None
},
request_plane,
event_transport_kind,
}
}
......@@ -637,21 +718,24 @@ impl DistributedConfig {
..Default::default()
};
let request_plane = RequestPlaneMode::from_env();
let event_plane_is_nats =
std::env::var(crate::config::environment_names::event_plane::DYN_EVENT_PLANE)
.map(|v| v.eq_ignore_ascii_case("nats"))
.unwrap_or(true);
let discovery_backend =
DiscoveryBackend::KvStore(kv::Selector::Etcd(Box::new(etcd_config)));
let event_transport_kind = discovery_backend.resolve_event_transport_kind();
let nats_enabled = request_plane.is_nats()
|| std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok()
|| event_plane_is_nats;
|| matches!(
event_transport_kind,
crate::discovery::EventTransportKind::Nats
);
DistributedConfig {
discovery_backend: DiscoveryBackend::KvStore(kv::Selector::Etcd(Box::new(etcd_config))),
discovery_backend,
nats_config: if nats_enabled {
Some(nats::ClientOptions::default())
} else {
None
},
request_plane,
event_transport_kind,
}
}
......@@ -664,6 +748,7 @@ impl DistributedConfig {
// This won't be used in process local, so we likely need a "none" option to
// communicate that and avoid opening the ports.
request_plane: RequestPlaneMode::Tcp,
event_transport_kind: crate::discovery::EventTransportKind::Zmq,
}
}
}
......@@ -743,6 +828,7 @@ pub mod distributed_test_utils {
),
nats_config: Some(nats::ClientOptions::default()),
request_plane: crate::distributed::RequestPlaneMode::default(),
event_transport_kind: crate::discovery::EventTransportKind::Nats,
};
super::DistributedRuntime::new(rt, config).await.unwrap()
}
......@@ -765,6 +851,7 @@ pub mod distributed_test_utils {
),
nats_config: Some(nats::ClientOptions::default()),
request_plane: crate::distributed::RequestPlaneMode::default(),
event_transport_kind: crate::discovery::EventTransportKind::Nats,
};
super::DistributedRuntime::new(rt, config).await.unwrap()
}
......
......@@ -292,9 +292,15 @@ pub struct EventPublisher {
impl EventPublisher {
/// Create a publisher for a component-scoped topic.
///
/// The event transport is chosen automatically: if `DYN_EVENT_PLANE` is set that
/// value is used; otherwise the runtime's default is used (ZMQ for local backends
/// such as `file`/`mem`, NATS for distributed backends such as `etcd`/`kubernetes`).
/// Use [`for_component_with_transport`](Self::for_component_with_transport) to
/// override explicitly.
pub async fn for_component(comp: &Component, topic: impl Into<String>) -> Result<Self> {
Self::for_component_with_transport(comp, topic, EventTransportKind::from_env_or_default())
.await
let transport_kind = comp.drt().default_event_transport_kind();
Self::for_component_with_transport(comp, topic, transport_kind).await
}
/// Create a publisher with explicit transport.
......@@ -312,9 +318,15 @@ impl EventPublisher {
}
/// Create a publisher for a namespace-scoped topic.
///
/// The event transport is chosen automatically: if `DYN_EVENT_PLANE` is set that
/// value is used; otherwise the runtime's default is used (ZMQ for local backends
/// such as `file`/`mem`, NATS for distributed backends such as `etcd`/`kubernetes`).
/// Use [`for_namespace_with_transport`](Self::for_namespace_with_transport) to
/// override explicitly.
pub async fn for_namespace(ns: &Namespace, topic: impl Into<String>) -> Result<Self> {
Self::for_namespace_with_transport(ns, topic, EventTransportKind::from_env_or_default())
.await
let transport_kind = ns.drt().default_event_transport_kind();
Self::for_namespace_with_transport(ns, topic, transport_kind).await
}
/// Create a namespace publisher with explicit transport.
......@@ -567,9 +579,15 @@ pub struct EventSubscriber {
impl EventSubscriber {
/// Create a subscriber for a component-scoped topic.
///
/// The event transport is chosen automatically: if `DYN_EVENT_PLANE` is set that
/// value is used; otherwise the runtime's default is used (ZMQ for local backends
/// such as `file`/`mem`, NATS for distributed backends such as `etcd`/`kubernetes`).
/// Use [`for_component_with_transport`](Self::for_component_with_transport) to
/// override explicitly.
pub async fn for_component(comp: &Component, topic: impl Into<String>) -> Result<Self> {
Self::for_component_with_transport(comp, topic, EventTransportKind::from_env_or_default())
.await
let transport_kind = comp.drt().default_event_transport_kind();
Self::for_component_with_transport(comp, topic, transport_kind).await
}
/// Create a subscriber with explicit transport.
......@@ -587,9 +605,15 @@ impl EventSubscriber {
}
/// Create a subscriber for a namespace-scoped topic.
///
/// The event transport is chosen automatically: if `DYN_EVENT_PLANE` is set that
/// value is used; otherwise the runtime's default is used (ZMQ for local backends
/// such as `file`/`mem`, NATS for distributed backends such as `etcd`/`kubernetes`).
/// Use [`for_namespace_with_transport`](Self::for_namespace_with_transport) to
/// override explicitly.
pub async fn for_namespace(ns: &Namespace, topic: impl Into<String>) -> Result<Self> {
Self::for_namespace_with_transport(ns, topic, EventTransportKind::from_env_or_default())
.await
let transport_kind = ns.drt().default_event_transport_kind();
Self::for_namespace_with_transport(ns, topic, transport_kind).await
}
/// Create a namespace subscriber with explicit transport.
......
......@@ -8,7 +8,7 @@ These tests validate behavior that cannot be covered by Rust unit tests:
- Streaming responses with embeddings
- Python-side tensor decoding errors
- Usage statistics from worker (the v2.0.4 bug fix)
- Large payload handling through NATS
- Large payload handling through the local request path
- Concurrent request handling
Validation tests (base64, size limits, empty prompt) are covered by Rust unit tests
......@@ -52,7 +52,8 @@ pytestmark = [
class VllmPromptEmbedsWorkerProcess(ManagedProcess):
"""Vllm Worker process configured for prompt embeddings testing.
Uses file-based KV store and TCP request plane (no NATS/etcd required).
Uses file-based KV store and TCP request plane. No NATS or etcd required:
the file backend automatically defaults the event plane to ZMQ.
"""
def __init__(
......@@ -135,8 +136,9 @@ def start_services(
) -> Generator[ServicePorts, None, None]:
"""Start frontend and vllm worker processes for prompt embeds testing.
Uses file-based KV store and TCP request plane (no NATS/etcd needed).
This makes tests simpler and faster by avoiding external dependencies.
Uses file-based KV store and TCP request plane. No NATS or etcd needed:
the file backend automatically defaults the event plane to ZMQ, avoiding
all external service dependencies and keeping tests simpler and faster.
The `file_storage_backend` fixture sets up a temporary directory and
configures DYN_FILE_KV environment variable.
......@@ -268,14 +270,14 @@ class TestPromptEmbedsE2E:
response.usage.prompt_tokens + response.usage.completion_tokens
), "total_tokens should equal prompt_tokens + completion_tokens"
def test_large_embeddings_through_nats(self, dynamo_client):
def test_large_embeddings_through_local_request_path(self, dynamo_client):
"""
Test large embeddings are handled correctly through NATS.
Test large embeddings are handled correctly through the local request path.
This validates the NATS max_payload configuration (15MB) handles
large embedding payloads. Rust unit tests can't test this E2E path.
This validates the E2E frontend-to-worker path handles large embedding
payloads. Rust unit tests can't test this E2E path.
"""
# Create ~7MB embeddings (well under 10MB limit, but large enough to stress NATS)
# Create ~7MB embeddings (well under 10MB limit, but large enough to stress the path)
large_shape = (1700, 1024) # ~6.6MB of float32 data
large_embeds = torch.randn(large_shape, dtype=torch.float32)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment