Unverified Commit 4d86f347 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

chore: make Router non-reliant on etcd (#4244)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent 7d604dd3
......@@ -3,18 +3,16 @@
//! Background processes for the KV Router including event consumption and snapshot uploads.
use std::{collections::HashSet, time::Duration};
use std::{collections::HashSet, sync::Arc, time::Duration};
use anyhow::Result;
use dynamo_runtime::{
component::Component,
discovery::DiscoveryQuery,
prelude::*,
storage::key_value_store::WatchEvent,
traits::events::EventPublisher,
transports::{
etcd::{Client as EtcdClient, WatchEvent},
nats::{NatsQueue, Slug},
},
transports::nats::{NatsQueue, Slug},
};
use futures::StreamExt;
use rand::Rng;
......@@ -249,12 +247,6 @@ pub async fn start_kv_router_background(
.build()?;
let nats_client = client_options.connect().await?;
// Get etcd client (needed for both snapshots and router watching)
let etcd_client = component
.drt()
.etcd_client()
.ok_or_else(|| anyhow::anyhow!("etcd client not available"))?;
// Create bucket name for snapshots/state
let bucket_name = Slug::slugify(&format!("{}-{RADIX_STATE_BUCKET}", component.subject()))
.to_string()
......@@ -273,13 +265,12 @@ pub async fn start_kv_router_background(
}
// Cleanup orphaned consumers on startup
cleanup_orphaned_consumers(&mut nats_queue, &etcd_client, &component, &consumer_uuid).await;
cleanup_orphaned_consumers(&mut nats_queue, &component, &consumer_uuid).await;
// Watch for router deletions to clean up orphaned consumers
let (_prefix_str, mut router_replicas_rx) = etcd_client
.kv_get_and_watch_prefix(&format!("{}/", KV_ROUTERS_ROOT_PATH))
.await?
.dissolve();
let store = component.drt().store();
let (_watch_handle, mut router_replicas_rx) =
Arc::new(store.clone()).watch(KV_ROUTERS_ROOT_PATH, None, cancellation_token.clone());
// Get the generate endpoint and watch for instance deletions
let generate_endpoint = component.endpoint("generate");
......@@ -424,7 +415,7 @@ pub async fn start_kv_router_background(
continue;
};
let key = String::from_utf8_lossy(kv.key());
let key = kv.as_ref();
tracing::info!("Detected router replica deletion: {key}");
// Only process deletions for routers on the same component
......@@ -466,10 +457,9 @@ pub async fn start_kv_router_background(
Ok(())
}
/// Cleanup orphaned NATS consumers that no longer have corresponding etcd router entries
/// Cleanup orphaned NATS consumers that no longer have corresponding router entries
async fn cleanup_orphaned_consumers(
nats_queue: &mut NatsQueue,
etcd_client: &EtcdClient,
component: &Component,
consumer_uuid: &str,
) {
......@@ -477,18 +467,32 @@ async fn cleanup_orphaned_consumers(
return;
};
let router_prefix = format!("{}/{}/", KV_ROUTERS_ROOT_PATH, component.path());
let Ok(router_entries) = etcd_client.kv_get_prefix(&router_prefix).await else {
// Get active routers from store
let store = component.drt().store();
let Ok(Some(router_bucket)) = store.get_bucket(KV_ROUTERS_ROOT_PATH).await else {
tracing::debug!("No router bucket found, skipping cleanup");
return;
};
let Ok(entries) = router_bucket.entries().await else {
return;
};
let active_uuids: HashSet<String> = router_entries
// Filter to only routers for this component
// Note: keys differ between storage backends:
// - FileStore: "namespace/component/uuid" (relative to bucket)
// - EtcdStore: "v1/kv_routers/namespace/component/uuid" (full path)
// Use contains() to handle both cases
let component_path = component.path();
let active_uuids: HashSet<String> = entries
.iter()
.filter_map(|kv| {
String::from_utf8_lossy(kv.key())
.split('/')
.next_back()
.map(str::to_string)
.filter_map(|(key, _)| {
// Check if key contains this component's path
if !key.contains(&component_path) {
return None;
}
// Extract the last part (should be the UUID)
key.split('/').next_back().map(str::to_string)
})
.collect();
......
......@@ -301,12 +301,21 @@ impl KeyValueBucket for Directory {
continue;
}
let key = match entry.path().strip_prefix(&self.root) {
// Canonicalize paths to handle symlinks (e.g., /var -> /private/var on macOS)
let canonical_entry_path = match entry.path().canonicalize() {
Ok(p) => p,
Err(err) => {
tracing::warn!(error = %err, path = %entry.path().display(), "Failed to canonicalize path. Using original path.");
entry.path()
}
};
let key = match canonical_entry_path.strip_prefix(&self.root) {
Ok(p) => p.to_string_lossy().to_string().replace("_", "/"),
Err(err) => {
tracing::error!(
error = %err,
path = %entry.path().display(),
path = %canonical_entry_path.display(),
root = %self.root.display(),
"FileStore path not in root. Should be impossible. Skipping entry."
);
......
......@@ -7,6 +7,7 @@ import logging
import os
import random
import string
import tempfile
from typing import Any, Dict, Optional
import aiohttp
......@@ -21,6 +22,26 @@ pytestmark = pytest.mark.pre_merge
logger = logging.getLogger(__name__)
@pytest.fixture
def file_storage_backend():
    """Point DYN_FILE_KV at a throwaway directory for the duration of a test.

    Yields the path of a fresh temporary directory after exporting it through
    the DYN_FILE_KV environment variable. Once the test is done, the variable
    is put back to its previous value (or removed if it was not set before).
    """
    previous = os.environ.get("DYN_FILE_KV")
    with tempfile.TemporaryDirectory() as tmpdir:
        os.environ["DYN_FILE_KV"] = tmpdir
        logger.info(f"Set up file storage backend in: {tmpdir}")
        yield tmpdir
        # Teardown: undo the environment change made above.
        if previous is None:
            os.environ.pop("DYN_FILE_KV", None)
        else:
            os.environ["DYN_FILE_KV"] = previous
MODEL_NAME = ROUTER_MODEL_NAME
NUM_MOCKERS = 2
BLOCK_SIZE = 16
......@@ -56,6 +77,7 @@ class MockerProcess:
request,
mocker_args: Optional[Dict[str, Any]] = None,
num_mockers: int = 1,
store_backend: str = "etcd",
):
# Generate a unique namespace suffix shared by all mockers
namespace_suffix = generate_random_suffix()
......@@ -78,6 +100,8 @@ class MockerProcess:
MODEL_NAME,
"--endpoint",
self.endpoint,
"--store-kv",
store_backend,
]
# Add individual CLI arguments from mocker_args
......@@ -142,7 +166,7 @@ class MockerProcess:
class KVRouterProcess(ManagedProcess):
"""Manages the KV router process using dynamo.frontend"""
def __init__(self, request, frontend_port: int):
def __init__(self, request, frontend_port: int, store_backend: str = "etcd"):
command = [
"python",
"-m",
......@@ -153,6 +177,8 @@ class KVRouterProcess(ManagedProcess):
"kv",
"--http-port",
str(frontend_port),
"--store-kv",
store_backend,
]
super().__init__(
......@@ -206,29 +232,63 @@ async def send_request_with_retry(url: str, payload: dict, max_retries: int = 8)
return False
def get_runtime(store_backend="etcd"):
    """Create a DistributedRuntime instance for testing.

    Uses the event loop that is already running when called from async code;
    otherwise creates a new loop and installs it as the current one.

    Args:
        store_backend: Storage backend to use ("etcd" or "file"). Defaults to "etcd".

    Returns:
        A DistributedRuntime constructed with the selected loop and backend.
    """
    try:
        # Try to get running loop (works in async context)
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running loop, create a new one (sync context)
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return DistributedRuntime(loop, store_backend)
async def check_nats_consumers(namespace: str, expected_count: Optional[int] = None):
    """Check NATS consumers for the KV events stream.

    Args:
        namespace: The namespace to check consumers for.
        expected_count: Optional expected number of consumers. If provided,
            the consumer count is asserted to equal it.

    Returns:
        List of consumer names.

    Raises:
        AssertionError: If expected_count is given and the number of consumers
            found on the stream differs from it.
    """
    # Stream name is the slugified component subject plus a "-kv-events" suffix.
    component_subject = f"namespace.{namespace}.component.mocker"
    slugified = component_subject.lower().replace(".", "-").replace("_", "-")
    stream_name = f"{slugified}-kv-events"

    logger.info(f"Checking consumers for stream: {stream_name}")

    nc = await nats.connect("nats://localhost:4222")
    try:
        js = nc.jetstream()
        consumer_infos = await js.consumers_info(stream_name)
        consumer_names = [info.name for info in consumer_infos]
        logger.info(f"Found {len(consumer_names)} consumers: {consumer_names}")

        # Log detailed consumer info
        for info in consumer_infos:
            logger.info(
                f"Consumer {info.name}: "
                f"num_pending={info.num_pending}, "
                f"num_ack_pending={info.num_ack_pending}, "
                f"ack_floor={info.ack_floor}, "
                f"delivered={info.delivered}"
            )

        if expected_count is not None:
            assert (
                len(consumer_names) == expected_count
            ), f"Expected {expected_count} durable consumers, found {len(consumer_names)}: {consumer_names}"
            logger.info(f"✓ Verified {expected_count} durable consumers exist")

        return consumer_names
    finally:
        # Always release the NATS connection, including when the assertion fails.
        await nc.close()
async def send_inflight_requests(urls: list, payload: dict, num_requests: int):
......@@ -503,14 +563,24 @@ def test_mocker_kv_router(request, runtime_services, predownload_tokenizers):
@pytest.mark.pre_merge
@pytest.mark.model(MODEL_NAME)
def test_mocker_two_kv_router(request, runtime_services, predownload_tokenizers):
@pytest.mark.parametrize("store_backend", ["etcd", "file"])
def test_mocker_two_kv_router(
request,
runtime_services,
predownload_tokenizers,
file_storage_backend,
store_backend,
):
"""
Test with two KV routers and multiple mocker engine instances.
Alternates requests between the two routers to test load distribution.
Tests with both etcd and file storage backends.
"""
# runtime_services starts etcd and nats
logger.info("Starting mocker two KV router test")
logger.info(
f"Starting mocker two KV router test with {store_backend} storage backend"
)
# Create mocker args dictionary
mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
......@@ -523,14 +593,17 @@ def test_mocker_two_kv_router(request, runtime_services, predownload_tokenizers)
for port in router_ports:
logger.info(f"Starting KV router frontend on port {port}")
kv_router = KVRouterProcess(request, port)
kv_router = KVRouterProcess(request, port, store_backend)
kv_router.__enter__()
kv_routers.append(kv_router)
# Start mocker instances with the new CLI interface
logger.info(f"Starting {NUM_MOCKERS} mocker instances")
mockers = MockerProcess(
request, mocker_args=mocker_args, num_mockers=NUM_MOCKERS
request,
mocker_args=mocker_args,
num_mockers=NUM_MOCKERS,
store_backend=store_backend,
)
logger.info(f"All mockers using endpoint: {mockers.endpoint}")
mockers.__enter__()
......@@ -557,72 +630,34 @@ def test_mocker_two_kv_router(request, runtime_services, predownload_tokenizers)
async def verify_consumer_lifecycle():
    """Check that durable consumers disappear as their routers are stopped.

    Expects two consumers while both routers run, one after the first router
    is stopped, and none after the second is stopped. Each router is stopped
    exactly once via its context-manager exit.
    """
    logger.info("Verifying durable consumers lifecycle")

    # Check initial consumer count - should have 2 (one for each router process)
    await check_nats_consumers(mockers.namespace, expected_count=2)

    # Kill the first router process
    logger.info(f"Killing first router on port {router_ports[0]}")
    kv_routers[0].__exit__(None, None, None)

    # Wait for cleanup to happen
    await asyncio.sleep(1)

    # Verify only 1 consumer remains
    await check_nats_consumers(mockers.namespace, expected_count=1)
    logger.info(
        "✓ Verified 1 durable consumer remains after killing first router"
    )

    # Kill the second router process
    logger.info(f"Killing second router on port {router_ports[1]}")
    kv_routers[1].__exit__(None, None, None)

    # Wait for cleanup to happen
    await asyncio.sleep(1)

    # Verify no consumers remain
    await check_nats_consumers(mockers.namespace, expected_count=0)
    logger.info(
        "✓ Verified 0 durable consumers remain after killing both routers"
    )
# Run consumer lifecycle verification
asyncio.run(verify_consumer_lifecycle())
......@@ -942,14 +977,22 @@ def test_kv_push_router_bindings(request, runtime_services, predownload_tokenize
@pytest.mark.pre_merge
@pytest.mark.model(MODEL_NAME)
def test_indexers_sync(request, runtime_services, predownload_tokenizers):
@pytest.mark.parametrize("store_backend", ["etcd", "file"])
def test_indexers_sync(
request,
runtime_services,
predownload_tokenizers,
file_storage_backend,
store_backend,
):
"""
Test that two KV routers have synchronized indexer states after processing requests.
This test verifies that both routers converge to the same internal state.
Tests with both etcd and file storage backends.
"""
# runtime_services starts etcd and nats
logger.info("Starting indexers sync test")
logger.info(f"Starting indexers sync test with {store_backend} storage backend")
# Create mocker args dictionary
mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
......@@ -958,7 +1001,10 @@ def test_indexers_sync(request, runtime_services, predownload_tokenizers):
# Start mocker instances with the new CLI interface
logger.info(f"Starting {NUM_MOCKERS} mocker instances")
mockers = MockerProcess(
request, mocker_args=mocker_args, num_mockers=NUM_MOCKERS
request,
mocker_args=mocker_args,
num_mockers=NUM_MOCKERS,
store_backend=store_backend,
)
logger.info(f"All mockers using endpoint: {mockers.endpoint}")
# Initialize mockers
......@@ -966,12 +1012,20 @@ def test_indexers_sync(request, runtime_services, predownload_tokenizers):
# Use async to manage the test flow
async def test_sync():
# Get runtime and create endpoint
runtime = get_runtime()
# Use the namespace from the mockers
namespace = runtime.namespace(mockers.namespace)
component = namespace.component("mocker")
endpoint = component.endpoint("generate")
# Create SEPARATE runtimes for each router to ensure independence
# This is especially important for file storage backend where connection_id
# would otherwise be shared between routers
runtime1 = get_runtime(store_backend)
runtime2 = get_runtime(store_backend)
# Use the namespace from the mockers for both runtimes
namespace1 = runtime1.namespace(mockers.namespace)
component1 = namespace1.component("mocker")
endpoint1 = component1.endpoint("generate")
namespace2 = runtime2.namespace(mockers.namespace)
component2 = namespace2.component("mocker")
endpoint2 = component2.endpoint("generate")
# Create KvRouterConfig with lower snapshot threshold for testing
kv_router_config = KvRouterConfig(router_snapshot_threshold=20)
......@@ -1015,13 +1069,13 @@ def test_indexers_sync(request, runtime_services, predownload_tokenizers):
# Launch first router
logger.info("Creating first KV router")
kv_push_router1 = KvPushRouter(
endpoint=endpoint,
endpoint=endpoint1,
block_size=BLOCK_SIZE,
kv_router_config=kv_router_config,
)
# Wait for mockers to be ready
await wait_for_mockers_ready(endpoint, kv_push_router1)
await wait_for_mockers_ready(endpoint1, kv_push_router1)
# Send 25 requests to first router
logger.info("Sending 25 requests to first router")
......@@ -1039,7 +1093,7 @@ def test_indexers_sync(request, runtime_services, predownload_tokenizers):
# Launch second router - will automatically sync with the first router's state
logger.info("Creating second KV router")
kv_push_router2 = KvPushRouter(
endpoint=endpoint,
endpoint=endpoint2,
block_size=BLOCK_SIZE,
kv_router_config=kv_router_config,
)
......@@ -1056,6 +1110,9 @@ def test_indexers_sync(request, runtime_services, predownload_tokenizers):
logger.info("Waiting for final synchronization")
await asyncio.sleep(1)
# Check NATS consumers to verify both routers have separate consumers
await check_nats_consumers(mockers.namespace, expected_count=2)
# Verify NATS object store bucket was created with snapshot
# Mirror the Rust bucket naming logic from subscriber.rs:
# component.subject() -> "namespace.{ns}.component.{comp}"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment