Commit a289695c authored by mohammedabdulwahhab, committed by GitHub

fix: consolidate dyn_discovery_backend and dyn_kv_store (#6167)


Signed-off-by: mohammedabdulwahhab <furkhan324@berkeley.edu>
parent 0c83585a
@@ -42,11 +42,11 @@ The hierarchy and naming may change over time, and this document might not refle
 The `DistributedRuntime` supports two service discovery backends, configured via `DYN_DISCOVERY_BACKEND`:
-- **KV Store Discovery** (`DYN_DISCOVERY_BACKEND=kv_store`): Uses etcd for service discovery. **This is the global default** for all deployments unless explicitly overridden.
+- **KV Store Discovery** (`DYN_DISCOVERY_BACKEND=etcd`): Uses etcd for service discovery. **This is the default** for all deployments unless explicitly overridden. Other KV store backends (`file`, `mem`) are also available.
 - **Kubernetes Discovery** (`DYN_DISCOVERY_BACKEND=kubernetes`): Uses native Kubernetes resources (DynamoWorkerMetadata CRD, EndpointSlices) for service discovery. **Must be explicitly set.** The Dynamo operator automatically sets this environment variable for Kubernetes deployments. **No etcd required.**
-> **Note:** There is no automatic detection of the deployment environment. The runtime always defaults to `kv_store`. For Kubernetes deployments, the operator injects `DYN_DISCOVERY_BACKEND=kubernetes` into pod environments.
+> **Note:** There is no automatic detection of the deployment environment. The runtime defaults to `etcd`. For Kubernetes deployments, the operator injects `DYN_DISCOVERY_BACKEND=kubernetes` into pod environments.
 When using Kubernetes discovery, the KV store backend automatically switches to in-memory storage since etcd is not needed.
......
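The selection rules in the documentation hunk above can be sketched in a few lines of Python. This is an illustrative sketch, not Dynamo's implementation: `resolve_discovery_backend`, `needs_etcd`, and `VALID_BACKENDS` are hypothetical names. Only the variable name `DYN_DISCOVERY_BACKEND`, the four values, and the `etcd` default are taken from this change, and the real parser may accept forms this sketch rejects.

```python
import os

# Values named in the updated docs; treating them as a closed set is an assumption.
VALID_BACKENDS = ("kubernetes", "etcd", "file", "mem")


def resolve_discovery_backend(environ=None):
    """Return the discovery backend name, defaulting to "etcd" when unset."""
    env = os.environ if environ is None else environ
    value = env.get("DYN_DISCOVERY_BACKEND", "etcd")
    if value not in VALID_BACKENDS:
        raise ValueError(
            f"Unknown DYN_DISCOVERY_BACKEND value: {value!r}. "
            f"Valid options: {', '.join(VALID_BACKENDS)}"
        )
    return value


def needs_etcd(backend):
    """Only the etcd backend requires a running etcd server."""
    return backend == "etcd"
```

Passing a plain dict instead of `os.environ` keeps the function easy to exercise without touching the process environment.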
@@ -99,13 +99,13 @@ Start the frontend, then start a worker for your chosen backend.
 <Tip>
 To run in a single terminal (useful in containers), append `> logfile.log 2>&1 &`
-to run processes in background. Example: `python3 -m dynamo.frontend --store-kv file > dynamo.frontend.log 2>&1 &`
+to run processes in background. Example: `python3 -m dynamo.frontend --discovery-backend file > dynamo.frontend.log 2>&1 &`
 </Tip>
 ```bash
 # Start the OpenAI compatible frontend (default port is 8000)
-# --store-kv file avoids needing etcd (frontend and workers must share a disk)
-python3 -m dynamo.frontend --store-kv file
+# --discovery-backend file avoids needing etcd (frontend and workers must share a disk)
+python3 -m dynamo.frontend --discovery-backend file
 ```
 In another terminal (or same terminal if using background mode), start a worker:
@@ -113,19 +113,19 @@ In another terminal (or same terminal if using background mode), start a worker:
 **SGLang**
 ```bash
-python3 -m dynamo.sglang --model-path Qwen/Qwen3-0.6B --store-kv file
+python3 -m dynamo.sglang --model-path Qwen/Qwen3-0.6B --discovery-backend file
 ```
 **TensorRT-LLM**
 ```bash
-python3 -m dynamo.trtllm --model-path Qwen/Qwen3-0.6B --store-kv file
+python3 -m dynamo.trtllm --model-path Qwen/Qwen3-0.6B --discovery-backend file
 ```
 **vLLM**
 ```bash
-python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --store-kv file \
+python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --discovery-backend file \
 --kv-events-config '{"enable_kv_cache_events": false}'
 ```
......
@@ -122,17 +122,17 @@ Options:
 --model-repository <path> Path to model repository
 --backend-directory <path> Path to Triton backends
 --log-verbose <level> Triton log verbosity 0-6 (default: 1)
---store-kv <backend> KV store backend: file, etcd, mem (default: file)
+--discovery-backend <backend> Discovery backend: kubernetes, etcd, file, mem (default: file)
 ```
 ### Environment Variables
 | Variable | Description | Default |
 |----------|-------------|---------|
-| `DYN_STORE_KV` | KV store backend: `file`, `etcd`, or `mem` | `file` |
+| `DYN_DISCOVERY_BACKEND` | Discovery backend: `kubernetes`, `etcd`, `file`, or `mem` | `file` |
 | `DYN_LOG` | Log level (debug, info, warn, error) | `info` |
 | `DYN_HTTP_PORT` | Frontend HTTP port | `8000` |
-| `ETCD_ENDPOINTS` | etcd connection URL (only when `--store-kv etcd`) | `http://localhost:2379` |
+| `ETCD_ENDPOINTS` | etcd connection URL (only when `--discovery-backend etcd`) | `http://localhost:2379` |
 | `NATS_SERVER` | NATS connection URL (only for distributed mode) | `nats://localhost:4222` |
 ## Adding Your Own Models
......
@@ -25,7 +25,7 @@ MODEL_NAME="identity"
 MODEL_REPO="${TRITON_DIR}/model_repo"
 BACKEND_DIR="${TRITON_DIR}/backends"
 LOG_VERBOSE=1
-STORE_KV="${DYN_STORE_KV:-file}" # Default to file-based KV (no etcd required)
+DISCOVERY_BACKEND="${DYN_DISCOVERY_BACKEND:-file}" # Default to file-based discovery (no etcd required)
 # Parse command line arguments
 EXTRA_ARGS=()
@@ -47,8 +47,8 @@ while [[ $# -gt 0 ]]; do
             LOG_VERBOSE="$2"
             shift 2
             ;;
-        --store-kv)
-            STORE_KV="$2"
+        --discovery-backend)
+            DISCOVERY_BACKEND="$2"
             shift 2
             ;;
         -h|--help)
@@ -61,11 +61,11 @@ while [[ $# -gt 0 ]]; do
             echo " --model-repository <path> Path to model repository (default: $MODEL_REPO)"
             echo " --backend-directory <path> Path to Triton backends (default: $BACKEND_DIR)"
             echo " --log-verbose <level> Triton log verbosity 0-6 (default: $LOG_VERBOSE)"
-            echo " --store-kv <backend> KV store backend: file, etcd, mem (default: $STORE_KV)"
+            echo " --discovery-backend <backend> Discovery backend: kubernetes, etcd, file, mem (default: $DISCOVERY_BACKEND)"
             echo " -h, --help Show this help message"
             echo ""
             echo "Environment variables:"
-            echo " DYN_STORE_KV KV store backend (default: file)"
+            echo " DYN_DISCOVERY_BACKEND Discovery backend (default: file)"
             echo " DYN_HTTP_PORT Frontend HTTP port (default: 8000)"
             echo " DYN_SYSTEM_PORT Worker metrics port (default: 8081)"
             echo ""
@@ -99,19 +99,19 @@ echo "Model name: $MODEL_NAME"
 echo "Model repository: $MODEL_REPO"
 echo "Backend directory: $BACKEND_DIR"
 echo "Log verbose: $LOG_VERBOSE"
-echo "KV store: $STORE_KV"
+echo "Discovery: $DISCOVERY_BACKEND"
 echo ""
 # Set library path for Triton
 export LD_LIBRARY_PATH="${TRITON_DIR}/lib:${BACKEND_DIR}:${LD_LIBRARY_PATH:-}"
-# Export KV store setting for worker (read by @dynamo_worker decorator)
-export DYN_STORE_KV="$STORE_KV"
+# Export discovery backend setting for worker (read by @dynamo_worker decorator)
+export DYN_DISCOVERY_BACKEND="$DISCOVERY_BACKEND"
 # Run frontend in background
 # --kserve-grpc-server enables the KServe gRPC endpoint for tensor models
 echo "Starting Dynamo frontend..."
-python3 -m dynamo.frontend --kserve-grpc-server --store-kv "$STORE_KV" &
+python3 -m dynamo.frontend --kserve-grpc-server --discovery-backend "$DISCOVERY_BACKEND" &
 FRONTEND_PID=$!
 # Give frontend time to start
......
@@ -99,7 +99,7 @@ async def triton_worker(runtime: DistributedRuntime, args: argparse.Namespace):
     )
     logger.info(f"Environment: NATS_SERVER={os.environ.get('NATS_SERVER', 'NOT SET')}")
     logger.info(
-        f"Environment: DYN_STORE_KV={os.environ.get('DYN_STORE_KV', 'NOT SET')}"
+        f"Environment: DYN_DISCOVERY_BACKEND={os.environ.get('DYN_DISCOVERY_BACKEND', 'NOT SET')}"
     )
     component = runtime.namespace("triton").component("tritonserver")
......
@@ -57,17 +57,17 @@ Dynamo must be installed. No external services are required for local developmen
 First, start the backend service:
 ```bash
 cd examples/custom_backend/hello_world
-DYN_STORE_KV=file python hello_world.py
+DYN_DISCOVERY_BACKEND=file python hello_world.py
 ```
 Second, in a separate terminal, run the client:
 ```bash
 cd examples/custom_backend/hello_world
-DYN_STORE_KV=file python client.py
+DYN_DISCOVERY_BACKEND=file python client.py
 ```
-> **Note**: Setting `DYN_STORE_KV=file` uses file-based storage instead of etcd.
-> Both the backend and client must use the same KV backend to discover each other.
+> **Note**: Setting `DYN_DISCOVERY_BACKEND=file` uses file-based discovery instead of etcd.
+> Both the backend and client must use the same discovery backend to discover each other.
 The client will connect to the backend service and print the streaming results.
......
@@ -2,7 +2,7 @@
 // SPDX-License-Identifier: Apache-2.0
 use dynamo_llm::local_model::LocalModel;
-use dynamo_runtime::distributed::{DistributedConfig, RequestPlaneMode};
+use dynamo_runtime::distributed::{DiscoveryBackend, DistributedConfig, RequestPlaneMode};
 use dynamo_runtime::storage::kv;
 use futures::StreamExt;
 use once_cell::sync::OnceCell;
@@ -559,14 +559,20 @@ enum ModelInput {
 #[pymethods]
 impl DistributedRuntime {
     #[new]
-    #[pyo3(signature = (event_loop, store_kv, request_plane, enable_nats=None))]
+    #[pyo3(signature = (event_loop, discovery_backend, request_plane, enable_nats=None))]
     fn new(
         event_loop: PyObject,
-        store_kv: String,
+        discovery_backend: String,
         request_plane: String,
         enable_nats: Option<bool>,
     ) -> PyResult<Self> {
-        let selected_kv_store: kv::Selector = store_kv.parse().map_err(to_pyerr)?;
+        let discovery_backend_config = match discovery_backend.as_str() {
+            "kubernetes" => DiscoveryBackend::Kubernetes,
+            other => {
+                let selector: kv::Selector = other.parse().map_err(to_pyerr)?;
+                DiscoveryBackend::KvStore(selector)
+            }
+        };
         let request_plane: RequestPlaneMode = request_plane.parse().map_err(to_pyerr)?;
         // Try to get existing runtime first, create new Worker only if needed
@@ -608,7 +614,7 @@ impl DistributedRuntime {
         let enable_nats = enable_nats.unwrap_or(true); // Default to true
         let runtime_config = DistributedConfig {
-            store_backend: selected_kv_store,
+            discovery_backend: discovery_backend_config,
             nats_config: if request_plane.is_nats() || enable_nats {
                 Some(dynamo_runtime::transports::nats::ClientOptions::default())
             } else {
......
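The constructor hunk above turns one string into a two-variant value: `kubernetes` carries no payload, while every other accepted name is wrapped as a KV store selector. A rough Python analogue of that shape, using dataclasses as a tagged union, might look like the following. The class names mirror the Rust enum variants, but `parse_discovery_backend` and `KV_STORES` are hypothetical, and the real `kv::Selector` parser may accept other spellings.

```python
from dataclasses import dataclass
from typing import Union

# Assumed set of KV store selector names, taken from the help text in this change.
KV_STORES = ("etcd", "file", "mem")


@dataclass(frozen=True)
class Kubernetes:
    """Discovery through the Kubernetes API; carries no KV store."""


@dataclass(frozen=True)
class KvStore:
    """Discovery through a KV store; `selector` names which one."""

    selector: str


DiscoveryBackend = Union[Kubernetes, KvStore]


def parse_discovery_backend(value: str) -> DiscoveryBackend:
    """Map a backend name to one of the two variants, rejecting unknown names."""
    if value == "kubernetes":
        return Kubernetes()
    if value in KV_STORES:
        return KvStore(selector=value)
    raise ValueError(f"unknown discovery backend: {value!r}")
```

Callers can then branch with `isinstance` instead of comparing strings, which is roughly what the Rust `match` on `DiscoveryBackend` gives at compile time.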
@@ -41,7 +41,7 @@ class DistributedRuntime:
     def __new__(
         cls,
         event_loop: Any,
-        store_kv: str,
+        discovery_backend: str,
         request_plane: str,
         enable_nats: Optional[bool] = None,
     ) -> "DistributedRuntime":
@@ -50,7 +50,7 @@ class DistributedRuntime:
         Args:
             event_loop: The asyncio event loop
-            store_kv: Key-value store backend ("etcd", "file", or "mem")
+            discovery_backend: Discovery backend ("kubernetes", "etcd", "file", or "mem")
             request_plane: Request plane transport ("tcp", "http", or "nats")
             enable_nats: Whether to enable NATS for KV events. Defaults to True.
                 If request_plane is "nats", NATS is always enabled.
......
@@ -34,8 +34,10 @@ def dynamo_worker(enable_nats: bool = True):
         async def wrapper(*args, **kwargs):
             loop = asyncio.get_running_loop()
             request_plane = os.environ.get("DYN_REQUEST_PLANE", "tcp")
-            store_kv = os.environ.get("DYN_STORE_KV", "etcd")
-            runtime = DistributedRuntime(loop, store_kv, request_plane, enable_nats)
+            discovery_backend = os.environ.get("DYN_DISCOVERY_BACKEND", "etcd")
+            runtime = DistributedRuntime(
+                loop, discovery_backend, request_plane, enable_nats
+            )
             await func(runtime, *args, **kwargs)
......
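The decorator hunk above reads two environment variables and injects a runtime as the first argument of the wrapped coroutine. The following self-contained sketch reproduces that pattern without Dynamo installed. `StubRuntime`, `worker`, and `report` are hypothetical stand-ins, and unlike the wrapper in the diff this version returns the coroutine's result so the effect is observable.

```python
import asyncio
import functools
import os


class StubRuntime:
    """Stand-in for DistributedRuntime; it only records its arguments."""

    def __init__(self, loop, discovery_backend, request_plane, enable_nats=None):
        self.loop = loop
        self.discovery_backend = discovery_backend
        self.request_plane = request_plane
        self.enable_nats = enable_nats


def worker(enable_nats=True):
    """Decorator that builds a runtime from the environment and injects it."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            loop = asyncio.get_running_loop()
            # Same variable names and defaults as the wrapper in the diff.
            request_plane = os.environ.get("DYN_REQUEST_PLANE", "tcp")
            discovery_backend = os.environ.get("DYN_DISCOVERY_BACKEND", "etcd")
            runtime = StubRuntime(loop, discovery_backend, request_plane, enable_nats)
            return await func(runtime, *args, **kwargs)

        return wrapper

    return decorator


@worker()
async def report(runtime):
    """Return the settings the injected runtime was built with."""
    return runtime.discovery_backend, runtime.request_plane
```

Because the environment is read inside `wrapper`, the backend is chosen when the worker starts running, not when the module is imported.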
@@ -403,12 +403,12 @@ def temp_file_store():
 @pytest.fixture
-def store_kv(request):
+def discovery_backend(request):
     """
-    KV store for runtime. Defaults to "file".
-    To iterate over multiple stores in a test:
-        @pytest.mark.parametrize("store_kv", ["file", "etcd"], indirect=True)
+    Discovery backend for runtime. Defaults to "file".
+    To iterate over multiple backends in a test:
+        @pytest.mark.parametrize("discovery_backend", ["file", "etcd"], indirect=True)
         async def test_example(runtime):
             ...
     """
@@ -429,7 +429,7 @@ def request_plane(request):
 @pytest.fixture(scope="function", autouse=False)
-async def runtime(request, store_kv, request_plane):
+async def runtime(request, discovery_backend, request_plane):
     """
     Create a DistributedRuntime for testing.
@@ -440,11 +440,11 @@ async def runtime(request, store_kv, request_plane):
     Without @pytest.mark.forked in isolated mode, you will get "Worker already initialized"
     errors when multiple tests try to create runtimes in the same process.
-    The store_kv and request_plane can be customized by overriding their fixtures
+    The discovery_backend and request_plane can be customized by overriding their fixtures
     or using @pytest.mark.parametrize with indirect=True:
         @pytest.mark.forked
-        @pytest.mark.parametrize("store_kv", ["etcd"], indirect=True)
+        @pytest.mark.parametrize("discovery_backend", ["etcd"], indirect=True)
         async def test_with_etcd(runtime):
             ...
     """
@@ -469,6 +469,6 @@ This is required because DistributedRuntime is a process-level singleton.
         )
     loop = asyncio.get_running_loop()
-    runtime = DistributedRuntime(loop, store_kv, request_plane)
+    runtime = DistributedRuntime(loop, discovery_backend, request_plane)
     yield runtime
     runtime.shutdown()
@@ -56,8 +56,9 @@ pub async fn run(
             ref model,
             ref chat_engine_factory,
         } => {
-            // This allows the /health endpoint to query store for active instances
-            http_service_builder = http_service_builder.store(distributed_runtime.store().clone());
+            // Pass the discovery client so the /health endpoint can query active instances
+            http_service_builder =
+                http_service_builder.discovery(Some(distributed_runtime.discovery()));
             let http_service = http_service_builder.build()?;
             let router_config = model.router_config();
......
@@ -24,9 +24,8 @@ use anyhow::Result;
 use axum_server::tls_rustls::RustlsConfig;
 use derive_builder::Builder;
 use dynamo_runtime::config::environment_names::llm as env_llm;
-use dynamo_runtime::discovery::{Discovery, KVStoreDiscovery};
+use dynamo_runtime::discovery::Discovery;
 use dynamo_runtime::logging::make_request_span;
-use dynamo_runtime::storage::kv;
 use std::net::SocketAddr;
 use tokio::task::JoinHandle;
 use tokio_util::sync::CancellationToken;
@@ -36,7 +35,6 @@ use tower_http::trace::TraceLayer;
 pub struct State {
     metrics: Arc<Metrics>,
     manager: Arc<ModelManager>,
-    store: kv::Manager,
     discovery_client: Arc<dyn Discovery>,
     flags: StateFlags,
     cancel_token: CancellationToken,
@@ -91,21 +89,12 @@ impl StateFlags {
 impl State {
     pub fn new(
         manager: Arc<ModelManager>,
-        store: kv::Manager,
+        discovery_client: Arc<dyn Discovery>,
         cancel_token: CancellationToken,
     ) -> Self {
-        // Initialize discovery backed by KV store
-        // Create a cancellation token for the discovery's watch streams
-        let discovery_client = {
-            let discovery_cancel_token = cancel_token.child_token();
-            Arc::new(KVStoreDiscovery::new(store.clone(), discovery_cancel_token))
-                as Arc<dyn Discovery>
-        };
         Self {
             manager,
             metrics: Arc::new(Metrics::default()),
-            store,
             discovery_client,
             flags: StateFlags {
                 chat_endpoints_enabled: AtomicBool::new(false),
@@ -132,10 +121,6 @@ impl State {
         self.manager.clone()
     }
-    pub fn store(&self) -> &kv::Manager {
-        &self.store
-    }
     pub fn discovery(&self) -> Arc<dyn Discovery> {
         self.discovery_client.clone()
     }
@@ -205,8 +190,8 @@ pub struct HttpServiceConfig {
     #[builder(default = "None")]
     request_template: Option<RequestTemplate>,
-    #[builder(default)]
-    store: kv::Manager,
+    #[builder(default = "None")]
+    discovery: Option<Arc<dyn Discovery>>,
 }
 impl HttpService {
@@ -368,7 +353,20 @@ impl HttpServiceConfigBuilder {
         let model_manager = Arc::new(ModelManager::new());
         // Create a temporary cancel token for building - will be replaced in spawn/run
         let temp_cancel_token = CancellationToken::new();
-        let state = Arc::new(State::new(model_manager, config.store, temp_cancel_token));
+        // Use the provided discovery client, or fall back to a no-op memory-backed one
+        // (for in-process modes that don't need discovery)
+        let discovery_client = config.discovery.unwrap_or_else(|| {
+            use dynamo_runtime::discovery::KVStoreDiscovery;
+            Arc::new(KVStoreDiscovery::new(
+                dynamo_runtime::storage::kv::Manager::memory(),
+                temp_cancel_token.child_token(),
+            )) as Arc<dyn Discovery>
+        });
+        let state = Arc::new(State::new(
+            model_manager,
+            discovery_client,
+            temp_cancel_token,
+        ));
         state
             .flags
             .set(&EndpointType::Chat, config.enable_chat_endpoints);
......
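The builder hunk above makes the discovery client optional: a caller may inject a shared client, and otherwise the service falls back to a private in-memory one. A minimal Python sketch of that fallback is shown below. All class names here are hypothetical stand-ins and do not correspond to Dynamo's Python API.

```python
class InMemoryDiscovery:
    """Hypothetical process-local discovery: registrations live in a dict."""

    def __init__(self):
        self._instances = {}

    def register(self, name, address):
        self._instances[name] = address

    def list(self):
        return dict(self._instances)


class HttpService:
    """Holds whichever discovery client the builder decided on."""

    def __init__(self, discovery):
        self.discovery = discovery


class HttpServiceBuilder:
    def __init__(self):
        self._discovery = None  # None means the caller did not supply a client

    def discovery(self, client):
        """Inject a discovery client shared with the rest of the runtime."""
        self._discovery = client
        return self

    def build(self):
        # Fall back to a private in-memory client for in-process use.
        client = self._discovery if self._discovery is not None else InMemoryDiscovery()
        return HttpService(client)
```

With injection, the HTTP service and the runtime observe the same registrations. With the fallback, the service sees only what is registered on its own private client.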
@@ -591,6 +591,10 @@ impl Discovery for KVStoreDiscovery {
         };
         Ok(Box::pin(stream))
     }
+    fn shutdown(&self) {
+        self.store.shutdown();
+    }
 }
 #[cfg(test)]
......
@@ -707,4 +707,9 @@ pub trait Discovery: Send + Sync {
         query: DiscoveryQuery,
         cancel_token: Option<CancellationToken>,
     ) -> Result<DiscoveryStream>;
+    /// Clean up resources held by this discovery backend.
+    /// For KV store backends, this deletes owned registrations immediately rather than
+    /// waiting for TTL expiry. Default is a no-op for backends that don't need cleanup.
+    fn shutdown(&self) {}
 }
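The trait hunk above adds `shutdown` with a default no-op body, so only backends that own external state need to override it. The same idea in Python, as a sketch with hypothetical classes (`KvDiscovery` and `StaticDiscovery` are illustrations, not Dynamo types):

```python
from abc import ABC, abstractmethod


class Discovery(ABC):
    @abstractmethod
    def list(self):
        """Return the currently registered instance names."""

    def shutdown(self):
        """Default: nothing to clean up."""


class KvDiscovery(Discovery):
    def __init__(self):
        # Illustrative registrations this process owns in the KV store.
        self._owned = {"worker-1", "worker-2"}

    def list(self):
        return sorted(self._owned)

    def shutdown(self):
        # Delete owned registrations immediately instead of waiting for TTL expiry.
        self._owned.clear()


class StaticDiscovery(Discovery):
    """Backend with no owned state; it inherits the no-op shutdown."""

    def list(self):
        return ["static-worker"]
```

A caller can invoke `shutdown()` on any backend without checking its type, which is what lets the runtime in this change call `self.discovery_client.shutdown()` unconditionally.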
@@ -5,7 +5,7 @@ use crate::component::{Component, Instance};
 use crate::pipeline::PipelineError;
 use crate::pipeline::network::manager::NetworkManager;
 use crate::service::{ServiceClient, ServiceSet};
-use crate::storage::kv::{self, Store as _};
+use crate::storage::kv;
 use crate::{
     component::{self, ComponentBuilder, Endpoint, Namespace},
     discovery::Discovery,
@@ -44,7 +44,6 @@ pub struct DistributedRuntime {
     runtime: Runtime,
     nats_client: Option<transports::nats::Client>,
-    store: kv::Manager,
     network_manager: Arc<NetworkManager>,
     tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
     system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
@@ -101,21 +100,7 @@ impl std::fmt::Debug for DistributedRuntime {
 impl DistributedRuntime {
     pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
-        let (selected_kv_store, nats_config, request_plane) = config.dissolve();
-        let runtime_clone = runtime.clone();
-        let store = match selected_kv_store {
-            kv::Selector::Etcd(etcd_config) => {
-                let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
-                    // The returned error doesn't show because of a dropped runtime error, so
-                    // log it first.
-                    tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?;
-                kv::Manager::etcd(etcd_client)
-            }
-            kv::Selector::File(root) => kv::Manager::file(runtime.primary_token(), root),
-            kv::Selector::Memory => kv::Manager::memory(),
-        };
+        let (discovery_backend, nats_config, request_plane) = config.dissolve();
         let nats_client = match nats_config {
             Some(nc) => Some(nc.connect().await?),
@@ -143,11 +128,8 @@ impl DistributedRuntime {
         )));
         // Initialize discovery client based on backend configuration
-        let discovery_backend =
-            std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "kv_store".to_string());
-        let (discovery_client, discovery_metadata) = match discovery_backend.as_str() {
-            "kubernetes" => {
+        let (discovery_client, discovery_metadata) = match discovery_backend {
+            DiscoveryBackend::Kubernetes => {
                 tracing::info!("Initializing Kubernetes discovery backend");
                 let metadata = Arc::new(tokio::sync::RwLock::new(
                     crate::discovery::DiscoveryMetadata::new(),
@@ -162,14 +144,22 @@ impl DistributedRuntime {
                 )?;
                 (Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
             }
-            _ => {
+            DiscoveryBackend::KvStore(kv_selector) => {
                 tracing::info!("Initializing KV store discovery backend");
+                let runtime_clone = runtime.clone();
+                let store = match kv_selector {
+                    kv::Selector::Etcd(etcd_config) => {
+                        let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
+                            tracing::error!(%err, "Could not connect to etcd. Pass `--discovery-backend ..` to use a different backend or start etcd."))?;
+                        kv::Manager::etcd(etcd_client)
+                    }
+                    kv::Selector::File(root) => kv::Manager::file(runtime.primary_token(), root),
+                    kv::Selector::Memory => kv::Manager::memory(),
+                };
                 use crate::discovery::KVStoreDiscovery;
                 (
-                    Arc::new(KVStoreDiscovery::new(
-                        store.clone(),
-                        runtime.primary_token(),
-                    )) as Arc<dyn Discovery>,
+                    Arc::new(KVStoreDiscovery::new(store, runtime.primary_token()))
+                        as Arc<dyn Discovery>,
                     None,
                 )
             }
@@ -187,7 +177,6 @@ impl DistributedRuntime {
         let distributed_runtime = Self {
             runtime,
-            store,
             network_manager: Arc::new(network_manager),
             nats_client,
             tcp_server: Arc::new(OnceCell::new()),
@@ -322,7 +311,7 @@ impl DistributedRuntime {
     pub fn shutdown(&self) {
         self.runtime.shutdown();
-        self.store.shutdown();
+        self.discovery_client.shutdown();
     }
     /// Create a [`Namespace`]
@@ -372,12 +361,6 @@ impl DistributedRuntime {
         self.system_status_server.get().cloned()
     }
-    /// An interface to store things outside of the process. Usually backed by something like etcd.
-    /// Currently does key-value, but will grow to include whatever we need to store.
-    pub fn store(&self) -> &kv::Manager {
-        &self.store
-    }
     /// How the frontend should talk to the backend.
     pub fn request_plane(&self) -> RequestPlaneMode {
         self.request_plane
@@ -525,9 +508,18 @@ impl DistributedRuntime {
     }
 }
+/// Selects which discovery backend to use and, for KV store backends, which KV store.
+#[derive(Clone, Debug)]
+pub enum DiscoveryBackend {
+    /// Use Kubernetes API for service discovery (no KV store needed)
+    Kubernetes,
+    /// Use a KV store (etcd, file, or memory) for service discovery
+    KvStore(kv::Selector),
+}
 #[derive(Dissolve)]
 pub struct DistributedConfig {
-    pub store_backend: kv::Selector,
+    pub discovery_backend: DiscoveryBackend,
     pub nats_config: Option<nats::ClientOptions>,
     pub request_plane: RequestPlaneMode,
 }
@@ -545,20 +537,29 @@ impl DistributedConfig {
         let nats_enabled = request_plane.is_nats()
             || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok();
-        // Check discovery backend to determine the appropriate KV store backend -
-        // kubernetes discovery, or etcd.
-        let discovery_backend =
-            std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "kv_store".to_string());
-        let store_backend = if discovery_backend == "kubernetes" {
-            tracing::info!("Using Kubernetes discovery backend");
-            kv::Selector::Memory
-        } else {
-            kv::Selector::Etcd(Box::default())
+        // DYN_DISCOVERY_BACKEND selects the discovery mechanism
+        // Valid values: "kubernetes", "etcd" (default), "file", "mem"
+        let backend_str =
+            std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "etcd".to_string());
+        let discovery_backend = match backend_str.as_str() {
+            "kubernetes" => {
+                tracing::info!("Using Kubernetes discovery backend");
+                DiscoveryBackend::Kubernetes
+            }
+            other => {
+                let selector: kv::Selector = other.parse().unwrap_or_else(|_| {
+                    panic!(
+                        "Unknown DYN_DISCOVERY_BACKEND value: '{other}'. \
+                         Valid options: kubernetes, etcd, file, mem"
+                    )
+                });
+                DiscoveryBackend::KvStore(selector)
+            }
         };
         DistributedConfig {
-            store_backend,
+            discovery_backend,
             nats_config: if nats_enabled {
                 Some(nats::ClientOptions::default())
             } else {
...@@ -577,7 +578,7 @@ impl DistributedConfig { ...@@ -577,7 +578,7 @@ impl DistributedConfig {
let nats_enabled = request_plane.is_nats() let nats_enabled = request_plane.is_nats()
|| std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok(); || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok();
DistributedConfig { DistributedConfig {
store_backend: kv::Selector::Etcd(Box::new(etcd_config)), discovery_backend: DiscoveryBackend::KvStore(kv::Selector::Etcd(Box::new(etcd_config))),
nats_config: if nats_enabled { nats_config: if nats_enabled {
Some(nats::ClientOptions::default()) Some(nats::ClientOptions::default())
} else { } else {
...@@ -591,7 +592,7 @@ impl DistributedConfig { ...@@ -591,7 +592,7 @@ impl DistributedConfig {
/// same process. /// same process.
pub fn process_local() -> DistributedConfig { pub fn process_local() -> DistributedConfig {
DistributedConfig { DistributedConfig {
store_backend: kv::Selector::Memory, discovery_backend: DiscoveryBackend::KvStore(kv::Selector::Memory),
nats_config: None, nats_config: None,
// This won't be used in process local, so we likely need a "none" option to // This won't be used in process local, so we likely need a "none" option to
// communicate that and avoid opening the ports. // communicate that and avoid opening the ports.
...@@ -671,11 +672,13 @@ pub mod distributed_test_utils { ...@@ -671,11 +672,13 @@ pub mod distributed_test_utils {
/// Note: Settings are read from environment variables inside DistributedRuntime::from_settings /// Note: Settings are read from environment variables inside DistributedRuntime::from_settings
#[cfg(feature = "integration")] #[cfg(feature = "integration")]
pub async fn create_test_drt_async() -> super::DistributedRuntime { pub async fn create_test_drt_async() -> super::DistributedRuntime {
use crate::{storage::kv, transports::nats}; use crate::transports::nats;
let rt = crate::Runtime::from_current().unwrap(); let rt = crate::Runtime::from_current().unwrap();
let config = super::DistributedConfig { let config = super::DistributedConfig {
store_backend: kv::Selector::Memory, discovery_backend: super::DiscoveryBackend::KvStore(
crate::storage::kv::Selector::Memory,
),
nats_config: Some(nats::ClientOptions::default()), nats_config: Some(nats::ClientOptions::default()),
request_plane: crate::distributed::RequestPlaneMode::default(), request_plane: crate::distributed::RequestPlaneMode::default(),
}; };
...@@ -691,11 +694,13 @@ pub mod distributed_test_utils { ...@@ -691,11 +694,13 @@ pub mod distributed_test_utils {
pub async fn create_test_shared_drt_async( pub async fn create_test_shared_drt_async(
store_path: &std::path::Path, store_path: &std::path::Path,
) -> super::DistributedRuntime { ) -> super::DistributedRuntime {
use crate::{storage::kv, transports::nats}; use crate::transports::nats;
let rt = crate::Runtime::from_current().unwrap(); let rt = crate::Runtime::from_current().unwrap();
let config = super::DistributedConfig { let config = super::DistributedConfig {
store_backend: kv::Selector::File(store_path.to_path_buf()), discovery_backend: super::DiscoveryBackend::KvStore(
crate::storage::kv::Selector::File(store_path.to_path_buf()),
),
nats_config: Some(nats::ClientOptions::default()), nats_config: Some(nats::ClientOptions::default()),
request_plane: crate::distributed::RequestPlaneMode::default(), request_plane: crate::distributed::RequestPlaneMode::default(),
}; };
......
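Reviewer note: the new `DYN_DISCOVERY_BACKEND` handling in `DistributedConfig::from_settings` can be read as a two-step resolution: `kubernetes` is matched first, and any other value is parsed as a KV store selector. The following is a minimal, self-contained sketch of that flow, not the code from this commit. `Selector`, its `FromStr` implementation, the placeholder file path, and `resolve_backend` are stand-ins invented for illustration; the real `kv::Selector` carries etcd client options and its parser is not shown in this diff. The sketch also returns a `Result` where the commit panics, so that the error path can be exercised.

```rust
use std::path::PathBuf;
use std::str::FromStr;

/// Stand-in for `kv::Selector` (assumption: the real type holds etcd options).
#[derive(Debug, PartialEq)]
enum Selector {
    Etcd,
    File(PathBuf),
    Memory,
}

impl FromStr for Selector {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "etcd" => Ok(Selector::Etcd),
            // Hypothetical placeholder path, used only in this sketch.
            "file" => Ok(Selector::File(PathBuf::from("/tmp/discovery-sketch"))),
            "mem" => Ok(Selector::Memory),
            other => Err(format!(
                "Unknown DYN_DISCOVERY_BACKEND value: '{other}'. \
                 Valid options: kubernetes, etcd, file, mem"
            )),
        }
    }
}

/// Mirrors the shape of the enum introduced in the diff.
#[derive(Debug, PartialEq)]
enum DiscoveryBackend {
    Kubernetes,
    KvStore(Selector),
}

/// Resolve the raw environment value; `None` models an unset variable,
/// which falls back to "etcd" as in the diff.
fn resolve_backend(raw: Option<&str>) -> Result<DiscoveryBackend, String> {
    match raw.unwrap_or("etcd") {
        "kubernetes" => Ok(DiscoveryBackend::Kubernetes),
        // Every other value must parse as a KV store selector.
        other => other.parse::<Selector>().map(DiscoveryBackend::KvStore),
    }
}
```

A caller would pass `std::env::var("DYN_DISCOVERY_BACKEND").ok().as_deref()` to `resolve_backend`. In this sketch the former default value `kv_store` is rejected; whether the real `kv::Selector` parser accepts it is not visible in the diff, so deployments that set `DYN_DISCOVERY_BACKEND=kv_store` explicitly may be worth checking.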
@@ -253,7 +253,8 @@ fn make_key(bucket_name: &str, key: &Key) -> String {
 #[cfg(test)]
 mod concurrent_create_tests {
     use super::*;
-    use crate::{DistributedRuntime, Runtime, distributed::DistributedConfig};
+    use crate::Runtime;
+    use crate::transports::etcd as etcd_transport;
     use std::sync::Arc;
     use tokio::sync::Barrier;
@@ -261,17 +262,20 @@ mod concurrent_create_tests {
     fn test_concurrent_etcd_create_race_condition() {
         let rt = Runtime::from_settings().unwrap();
         let rt_clone = rt.clone();
-        let config = DistributedConfig::from_settings();
         rt_clone.primary().block_on(async move {
-            let drt = DistributedRuntime::new(rt, config).await.unwrap();
-            test_concurrent_create(drt).await.unwrap();
+            let etcd_client =
+                etcd_transport::Client::new(etcd_transport::ClientOptions::default(), rt)
+                    .await
+                    .unwrap();
+            let storage = crate::storage::kv::Manager::etcd(etcd_client);
+            test_concurrent_create(&storage).await.unwrap();
         });
     }
-    async fn test_concurrent_create(drt: DistributedRuntime) -> Result<(), StoreError> {
-        let storage = drt.store();
+    async fn test_concurrent_create(
+        storage: &crate::storage::kv::Manager,
+    ) -> Result<(), StoreError> {
         // Create a bucket for testing
         let bucket = Arc::new(tokio::sync::Mutex::new(
             storage
......
@@ -594,12 +594,12 @@ class SharedNatsServer(SharedManagedProcess):
 @pytest.fixture
-def store_kv(request):
+def discovery_backend(request):
     """
-    KV store for runtime. Defaults to "etcd".
-    To iterate over multiple stores in a test:
-    @pytest.mark.parametrize("store_kv", ["file", "etcd"], indirect=True)
+    Discovery backend for runtime. Defaults to "etcd".
+    To iterate over multiple backends in a test:
+    @pytest.mark.parametrize("discovery_backend", ["file", "etcd"], indirect=True)
     def test_example(runtime_services):
         ...
     """
@@ -641,24 +641,24 @@ def durable_kv_events(request):
 @pytest.fixture()
-def runtime_services(request, store_kv, request_plane):
+def runtime_services(request, discovery_backend, request_plane):
     """
-    Start runtime services (NATS and/or etcd) based on store_kv and request_plane.
-    - If store_kv != "etcd", etcd is not started (returns None)
+    Start runtime services (NATS and/or etcd) based on discovery_backend and request_plane.
+    - If discovery_backend != "etcd", etcd is not started (returns None)
     - If request_plane != "nats", NATS is not started (returns None)
     Returns a tuple of (nats_process, etcd_process) where each has a .port attribute.
     """
     # Port cleanup is now handled in NatsServer and EtcdServer __exit__ methods
-    if request_plane == "nats" and store_kv == "etcd":
+    if request_plane == "nats" and discovery_backend == "etcd":
         with NatsServer(request) as nats_process:
             with EtcdServer(request) as etcd_process:
                 yield nats_process, etcd_process
     elif request_plane == "nats":
         with NatsServer(request) as nats_process:
             yield nats_process, None
-    elif store_kv == "etcd":
+    elif discovery_backend == "etcd":
         with EtcdServer(request) as etcd_process:
             yield None, etcd_process
     else:
@@ -666,7 +666,9 @@ def runtime_services(request, store_kv, request_plane):
 @pytest.fixture()
-def runtime_services_dynamic_ports(request, store_kv, request_plane, durable_kv_events):
+def runtime_services_dynamic_ports(
+    request, discovery_backend, request_plane, durable_kv_events
+):
     """Provide NATS and Etcd servers with truly dynamic ports per test.
     This fixture actually allocates dynamic ports by passing port=0 to the servers.
@@ -678,7 +680,7 @@ def runtime_services_dynamic_ports(request, store_kv, request_plane, durable_kv_
     - Each pytest-xdist worker runs tests in a separate process, so env vars do not
       leak across workers.
-    - If store_kv != "etcd", etcd is not started (returns None)
+    - If discovery_backend != "etcd", etcd is not started (returns None)
     - NATS is always started when etcd is used, because KV events require NATS
       regardless of the request_plane (tcp/nats only affects request transport)
     - NATS Core mode (no JetStream) is the default; JetStream is enabled when durable_kv_events=True
@@ -690,7 +692,7 @@ def runtime_services_dynamic_ports(request, store_kv, request_plane, durable_kv_
     # Port cleanup is now handled in NatsServer and EtcdServer __exit__ methods
     # Always start NATS when etcd is used - KV events require NATS regardless of request_plane
     # When durable_kv_events=False (default), disable JetStream for faster startup
-    if store_kv == "etcd":
+    if discovery_backend == "etcd":
         with NatsServer(
             request, port=0, disable_jetstream=not durable_kv_events
         ) as nats_process:
......
@@ -77,7 +77,7 @@ class VllmPromptEmbedsWorkerProcess(ManagedProcess):
             "none",
             "--max-model-len",
             "4096",
-            "--store-kv",
+            "--discovery-backend",
             "file",
             "--request-plane",
             "tcp",
@@ -152,7 +152,7 @@ def start_services(
         request,
         frontend_port=frontend_port,
         terminate_all_matching_process_names=False,
-        extra_args=["--store-kv", "file", "--request-plane", "tcp"],
+        extra_args=["--discovery-backend", "file", "--request-plane", "tcp"],
     ):
         logger.info("Frontend started for prompt embeds tests")
         with VllmPromptEmbedsWorkerProcess(
......
@@ -62,7 +62,7 @@ class KVRouterProcess(ManagedProcess):
             "kv",
             "--http-port",
             str(frontend_port),
-            "--store-kv",
+            "--discovery-backend",
             store_backend,
             "--namespace",
             namespace,
......
@@ -117,7 +117,7 @@ def _build_mocker_command(
         MODEL_NAME,
         "--endpoint",
         endpoint,
-        "--store-kv",
+        "--discovery-backend",
         store_backend,
         "--num-workers",
         str(num_workers),
......