Unverified Commit fdcc8d5b authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

test: add durable consumer count test to CI using py-nats (#3669)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 333ee983
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
# For IFEval dataset loading in kvbm tests # For IFEval dataset loading in kvbm tests
datasets datasets
# For NATS object store verification in router tests
nats-py
psutil>=5.0.0 psutil>=5.0.0
pyright pyright
pytest pytest
......
...@@ -318,6 +318,9 @@ To manage stream growth, when the message count exceeds `--router-snapshot-thres ...@@ -318,6 +318,9 @@ To manage stream growth, when the message count exceeds `--router-snapshot-thres
Instead of launching the KV Router via command line, you can create a `KvPushRouter` object directly in Python. This allows per-request routing configuration overrides. Instead of launching the KV Router via command line, you can create a `KvPushRouter` object directly in Python. This allows per-request routing configuration overrides.
>[!Warning]
> **Multiple Routers in Same Process**: If you need to run multiple `KvPushRouter` instances for fault tolerance or load distribution, you must launch them in **separate processes** (e.g., using `python -m dynamo.frontend` with different ports). Creating multiple `KvPushRouter` objects in the same Python process is not supported - they share the same cancellation token from the component's primary lease, so dropping one router will cancel all routers in that process. For in-process routing, use a single `KvPushRouter` instance.
### Methods ### Methods
The `KvPushRouter` provides the following methods: The `KvPushRouter` provides the following methods:
......
...@@ -7,10 +7,10 @@ import logging ...@@ -7,10 +7,10 @@ import logging
import os import os
import random import random
import string import string
import subprocess
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
import aiohttp import aiohttp
import nats
import pytest import pytest
from dynamo._core import DistributedRuntime, KvPushRouter, KvRouterConfig from dynamo._core import DistributedRuntime, KvPushRouter, KvRouterConfig
...@@ -553,8 +553,85 @@ def test_mocker_two_kv_router(request, runtime_services, predownload_tokenizers) ...@@ -553,8 +553,85 @@ def test_mocker_two_kv_router(request, runtime_services, predownload_tokenizers)
f"Successfully completed {NUM_REQUESTS} requests across {len(router_ports)} routers" f"Successfully completed {NUM_REQUESTS} requests across {len(router_ports)} routers"
) )
# Verify durable consumers lifecycle
async def verify_consumer_lifecycle():
logger.info("Verifying durable consumers lifecycle")
# Construct the stream name
component_subject = f"namespace.{mockers.namespace}.component.mocker"
slugified = component_subject.lower().replace(".", "-").replace("_", "-")
stream_name = f"{slugified}-kv-events"
logger.info(f"Checking consumers for stream: {stream_name}")
# Connect to NATS and list consumers
nc = await nats.connect("nats://localhost:4222")
try:
js = nc.jetstream()
# List consumers - should have 2 (one for each router process)
consumer_infos = await js.consumers_info(stream_name)
consumer_names = [info.name for info in consumer_infos]
logger.info(f"Found {len(consumer_names)} consumers: {consumer_names}")
assert (
len(consumer_names) == 2
), f"Expected 2 durable consumers (one per router), found {len(consumer_names)}: {consumer_names}"
logger.info("✓ Verified 2 durable consumers exist (one per router)")
# Kill the first router process
logger.info(f"Killing first router on port {router_ports[0]}")
kv_routers[0].__exit__(None, None, None)
# Wait for cleanup to happen (consumer deletion is triggered by etcd watch)
await asyncio.sleep(1)
# Verify only 1 consumer remains
consumer_infos = await js.consumers_info(stream_name)
consumer_names = [info.name for info in consumer_infos]
logger.info(
f"After killing router1, found {len(consumer_names)} consumers: {consumer_names}"
)
assert (
len(consumer_names) == 1
), f"Expected 1 durable consumer after killing router1, found {len(consumer_names)}: {consumer_names}"
logger.info(
"✓ Verified 1 durable consumer remains after killing first router"
)
# Kill the second router process
logger.info(f"Killing second router on port {router_ports[1]}")
kv_routers[1].__exit__(None, None, None)
# Wait for cleanup to happen
await asyncio.sleep(1)
# Verify no consumers remain
consumer_infos = await js.consumers_info(stream_name)
consumer_names = [info.name for info in consumer_infos]
logger.info(
f"After killing router2, found {len(consumer_names)} consumers: {consumer_names}"
)
assert (
len(consumer_names) == 0
), f"Expected 0 durable consumers after killing both routers, found {len(consumer_names)}: {consumer_names}"
logger.info(
"✓ Verified 0 durable consumers remain after killing both routers"
)
finally:
await nc.close()
# Run consumer lifecycle verification
asyncio.run(verify_consumer_lifecycle())
# Clear the kv_routers list since we've already cleaned them up
kv_routers = []
finally: finally:
# Clean up routers # Clean up any remaining routers (in case of error before consumer verification)
for kv_router in kv_routers: for kv_router in kv_routers:
kv_router.__exit__(None, None, None) kv_router.__exit__(None, None, None)
...@@ -991,45 +1068,26 @@ def test_indexers_sync(request, runtime_services, predownload_tokenizers): ...@@ -991,45 +1068,26 @@ def test_indexers_sync(request, runtime_services, predownload_tokenizers):
logger.info(f"Verifying NATS object store bucket exists: {expected_bucket}") logger.info(f"Verifying NATS object store bucket exists: {expected_bucket}")
snapshot_verified = False snapshot_verified = False
try: try:
# List objects in the bucket # Connect to NATS and check object store
result = subprocess.run( nc = await nats.connect("nats://localhost:4222")
[ try:
"nats", js = nc.jetstream()
"object", obj_store = await js.object_store(expected_bucket)
"ls",
expected_bucket,
"--server",
"nats://localhost:4222",
],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0: # Try to get the expected file
logger.info( try:
f"Successfully listed bucket contents:\n{result.stdout}" result = await obj_store.get(expected_file)
)
# Check if the expected file exists
if expected_file in result.stdout:
logger.info( logger.info(
f"✓ Snapshot file '{expected_file}' found in bucket '{expected_bucket}'" f"✓ Snapshot file '{expected_file}' found in bucket '{expected_bucket}' "
f"(size: {len(result.data) if result.data else 0} bytes)"
) )
snapshot_verified = True snapshot_verified = True
else: except Exception as e:
logger.error( logger.error(
f"Snapshot file '{expected_file}' not found in bucket '{expected_bucket}'" f"Snapshot file '{expected_file}' not found in bucket '{expected_bucket}': {e}"
) )
logger.error(f"Bucket contents:\n{result.stdout}") finally:
else: await nc.close()
logger.error(f"Failed to list bucket: {result.stderr}")
except subprocess.TimeoutExpired:
logger.error("Timeout checking NATS object store bucket")
except FileNotFoundError:
logger.warning(
"nats CLI not found in PATH, skipping bucket verification (test will continue)"
)
snapshot_verified = True # Don't fail if nats CLI not installed
except Exception as e: except Exception as e:
logger.error(f"Error checking NATS object store: {e}") logger.error(f"Error checking NATS object store: {e}")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment