Unverified Commit ab9c9509 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

feat: register Kv router instance into etcd (#2548)

parent 0c50a233
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use dynamo_runtime::component::Component; use dynamo_runtime::component::Component;
use dynamo_runtime::prelude::DistributedRuntimeProvider;
use dynamo_runtime::slug::Slug;
use crate::discovery::ModelEntry; use crate::discovery::ModelEntry;
...@@ -212,6 +214,23 @@ impl ModelManager { ...@@ -212,6 +214,23 @@ impl ModelManager {
kv_cache_block_size: u32, kv_cache_block_size: u32,
kv_router_config: Option<KvRouterConfig>, kv_router_config: Option<KvRouterConfig>,
) -> anyhow::Result<Arc<KvRouter>> { ) -> anyhow::Result<Arc<KvRouter>> {
let etcd_client = component
.drt()
.etcd_client()
.ok_or_else(|| anyhow::anyhow!("KV routing requires etcd (dynamic mode)"))?;
let router_key = format!(
"kv_routers/{}/{}",
Slug::from_string(model_name),
uuid::Uuid::new_v4()
);
etcd_client
.kv_create(
&router_key,
serde_json::to_vec_pretty(&kv_router_config.unwrap_or_default())?,
None, // use primary lease
)
.await?;
let selector = Box::new(DefaultWorkerSelector::new(kv_router_config)); let selector = Box::new(DefaultWorkerSelector::new(kv_router_config));
let chooser = KvRouter::new( let chooser = KvRouter::new(
component.clone(), component.clone(),
......
...@@ -16,6 +16,7 @@ use dynamo_runtime::{ ...@@ -16,6 +16,7 @@ use dynamo_runtime::{
protocols::annotated::Annotated, protocols::annotated::Annotated,
}; };
use futures::stream::{self, StreamExt}; use futures::stream::{self, StreamExt};
use serde::{Deserialize, Serialize};
pub mod approx; pub mod approx;
pub mod indexer; pub mod indexer;
...@@ -73,7 +74,7 @@ pub trait WorkerSelector { ...@@ -73,7 +74,7 @@ pub trait WorkerSelector {
} }
/// KV Router configuration parameters /// KV Router configuration parameters
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct KvRouterConfig { pub struct KvRouterConfig {
pub overlap_score_weight: f64, pub overlap_score_weight: f64,
......
...@@ -11,6 +11,7 @@ from typing import Any, Dict ...@@ -11,6 +11,7 @@ from typing import Any, Dict
import aiohttp import aiohttp
import pytest import pytest
from dynamo._core import DistributedRuntime
from tests.utils.managed_process import ManagedProcess from tests.utils.managed_process import ManagedProcess
pytestmark = pytest.mark.pre_merge pytestmark = pytest.mark.pre_merge
...@@ -131,6 +132,50 @@ async def send_request_with_retry(url: str, payload: dict, max_retries: int = 4) ...@@ -131,6 +132,50 @@ async def send_request_with_retry(url: str, payload: dict, max_retries: int = 4)
return False return False
def get_runtime():
    """Return a DistributedRuntime, reusing an existing one when possible.

    ``DistributedRuntime.detached()`` is attempted first. If that call raises,
    the failure is logged and a fresh runtime is built on the event loop that
    is currently running, so this must be called from within a running loop
    for the fallback path to succeed.

    Returns:
        The detached runtime if one could be obtained, otherwise a newly
        constructed ``DistributedRuntime``.
    """
    try:
        # Preferred path: attach to a runtime that already exists in this process.
        runtime = DistributedRuntime.detached()
        logger.info("Using detached runtime (worker already initialized)")
        return runtime
    except Exception as exc:
        # Fallback path: nothing to attach to, so build one on the running loop.
        logger.info(f"Creating new runtime (detached failed: {exc})")
        return DistributedRuntime(asyncio.get_running_loop(), False)
async def check_registration_in_etcd(expected_count: int):
    """Assert that etcd holds exactly ``expected_count`` KV router entries.

    Args:
        expected_count: Number of entries expected under the ``kv_routers/``
            key prefix.

    Returns:
        The result of ``kv_get_prefix("kv_routers/")`` on the etcd client.

    Raises:
        AssertionError: If the number of entries found differs from
            ``expected_count``.
    """
    etcd_client = get_runtime().etcd_client()

    # Routers register under kv_routers/{model_name}/{uuid}, so a single
    # prefix query returns every registered router.
    registered = await etcd_client.kv_get_prefix("kv_routers/")
    num_registered = len(registered)
    logger.info(f"Found {num_registered} KV router(s) registered in etcd")

    assert (
        num_registered == expected_count
    ), f"Expected {expected_count} KV router(s) in etcd, found {num_registered}"
    return registered
async def send_inflight_requests(urls: list, payload: dict, num_requests: int): async def send_inflight_requests(urls: list, payload: dict, num_requests: int):
"""Send multiple requests concurrently, alternating between URLs if multiple provided""" """Send multiple requests concurrently, alternating between URLs if multiple provided"""
...@@ -239,6 +284,9 @@ def test_mocker_kv_router(request, runtime_services): ...@@ -239,6 +284,9 @@ def test_mocker_kv_router(request, runtime_services):
logger.info(f"Successfully completed {NUM_REQUESTS} requests") logger.info(f"Successfully completed {NUM_REQUESTS} requests")
# Check etcd registration - expect 1 KV router
asyncio.run(check_registration_in_etcd(expected_count=1))
finally: finally:
# Clean up # Clean up
if "kv_router" in locals(): if "kv_router" in locals():
...@@ -312,6 +360,9 @@ def test_mocker_two_kv_router(request, runtime_services): ...@@ -312,6 +360,9 @@ def test_mocker_two_kv_router(request, runtime_services):
f"Successfully completed {NUM_REQUESTS} requests across {len(router_ports)} routers" f"Successfully completed {NUM_REQUESTS} requests across {len(router_ports)} routers"
) )
# Check etcd registration - expect 2 KV routers
asyncio.run(check_registration_in_etcd(expected_count=2))
finally: finally:
# Clean up routers # Clean up routers
for kv_router in kv_routers: for kv_router in kv_routers:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment