Unverified Commit 50af3435 authored by atchernych's avatar atchernych Committed by GitHub
Browse files

test: GAIE integration Unit and e2e tests (#7257)


Signed-off-by: default avatarAnna Tchernych <atchernych@nvidia.com>
parent 0f505846
......@@ -305,7 +305,7 @@ use port-forward to expose the gateway to the host
```bash
# in first terminal
kubectl port-forward svc/inference-gateway 8000:80 -n ${NAMESPACE} # for NAMESPACE put wherever you installed the gateway i.e. kgateway-system or my-model8
kubectl port-forward svc/inference-gateway 8000:80 -n ${NAMESPACE} # for NAMESPACE put wherever you installed the gateway i.e. kgateway-system or my-model
# in second terminal where you want to send inference requests
GATEWAY_URL=http://localhost:8000
......@@ -317,8 +317,9 @@ a. Query models:
```bash
# in the second terminal where you GATEWAY_URL is set
curl $GATEWAY_URL/v1/models | jq .
# or if you added the host name to http route:
curl -H "Host: llama3-70b-disagg.example.com" $GATEWAY_URL/v1/models | jq .
```
Sample output:
......
......@@ -301,6 +301,22 @@ mod tests {
) -> (
Arc<SchedulerQueue<NoopSequencePublisher, SimpleWorkerConfig>>,
Arc<ActiveSequencesMultiWorker<NoopSequencePublisher>>,
) {
let (queue, slots, _tx) =
make_queue_with_sender(num_workers, block_size, isl, threshold_frac);
(queue, slots)
}
#[allow(clippy::type_complexity)]
fn make_queue_with_sender(
num_workers: usize,
block_size: u32,
isl: usize,
threshold_frac: Option<f64>,
) -> (
Arc<SchedulerQueue<NoopSequencePublisher, SimpleWorkerConfig>>,
Arc<ActiveSequencesMultiWorker<NoopSequencePublisher>>,
watch::Sender<HashMap<u64, SimpleWorkerConfig>>,
) {
let dp_range: HashMap<u64, (u32, u32)> =
(0..num_workers as u64).map(|id| (id, (0, 1))).collect();
......@@ -324,7 +340,6 @@ mod tests {
);
}
let (cfg_tx, cfg_rx) = watch::channel(configs);
std::mem::forget(cfg_tx);
let selector = Box::new(DefaultWorkerSelector::new(None, "test"));
let queue = Arc::new(SchedulerQueue::new(
......@@ -336,7 +351,7 @@ mod tests {
FcfsPolicy,
));
(queue, slots)
(queue, slots, cfg_tx)
}
fn make_request(
......@@ -515,4 +530,188 @@ mod tests {
"expected NoEndpoints, got {resp:?}"
);
}
/// Simulates the EPP path: router starts with zero workers (skip_initial_worker_wait),
/// then register_workers lazily injects workers before routing.
#[tokio::test(flavor = "multi_thread")]
async fn test_register_workers_lazy_epp_path() {
let block_size = 16;
let isl = 512;
// Start with zero workers (mimics skip_initial_worker_wait=true)
let (queue, slots, cfg_tx) = make_queue_with_sender(0, block_size, isl, None);
// Routing with no workers must fail
let (req_fail, rx_fail) = make_request("before-register", isl);
queue.enqueue(req_fail).await;
let resp = rx_fail.await.expect("oneshot dropped");
assert!(
matches!(
resp,
Err(crate::scheduling::types::KvSchedulerError::NoEndpoints)
),
"expected NoEndpoints before register_workers, got {resp:?}"
);
// Lazily register two workers in the slot tracker (EPP supplies pod list)
let mut dp_range = std::collections::HashMap::new();
dp_range.insert(100_u64, (0_u32, 1_u32));
dp_range.insert(200_u64, (0_u32, 1_u32));
slots.register_external_workers(&dp_range);
// Also update the config watch so the selector can see these workers
let mut configs = HashMap::new();
for &id in &[100_u64, 200_u64] {
configs.insert(
id,
SimpleWorkerConfig {
max_num_batched_tokens: Some(isl as u64),
..Default::default()
},
);
}
cfg_tx.send(configs).unwrap();
// Routing after registration must succeed and pick one of the registered workers
let (req_ok, rx_ok) = make_request("after-register", isl);
queue.enqueue(req_ok).await;
let resp = rx_ok
.await
.expect("oneshot dropped")
.expect("scheduling failed");
assert!(
resp.best_worker.worker_id == 100 || resp.best_worker.worker_id == 200,
"expected worker 100 or 200, got {}",
resp.best_worker.worker_id
);
// Clean up
slots
.mark_prefill_completed(&"after-register".to_string())
.await
.unwrap();
slots.free(&"after-register".to_string()).await.unwrap();
}
/// Register_workers is additive: calling with a new set does NOT remove old workers.
#[tokio::test(flavor = "multi_thread")]
async fn test_register_workers_additive() {
let block_size = 16;
let isl = 256;
let (queue, slots, cfg_tx) = make_queue_with_sender(0, block_size, isl, None);
// Register worker 10 in slots and config
let mut dp1 = std::collections::HashMap::new();
dp1.insert(10_u64, (0_u32, 1_u32));
slots.register_external_workers(&dp1);
let mut configs = HashMap::new();
configs.insert(
10_u64,
SimpleWorkerConfig {
max_num_batched_tokens: Some(isl as u64),
..Default::default()
},
);
cfg_tx.send(configs.clone()).unwrap();
// Register worker 20 (worker 10 must NOT be evicted)
let mut dp2 = std::collections::HashMap::new();
dp2.insert(20_u64, (0_u32, 1_u32));
slots.register_external_workers(&dp2);
configs.insert(
20_u64,
SimpleWorkerConfig {
max_num_batched_tokens: Some(isl as u64),
..Default::default()
},
);
cfg_tx.send(configs).unwrap();
// Send enough requests to statistically prove both workers are available
let mut seen = std::collections::HashSet::new();
for i in 0..20 {
let req_id = format!("add-{i}");
let (req, rx) = make_request(&req_id, isl);
queue.enqueue(req).await;
let resp = rx
.await
.expect("oneshot dropped")
.expect("scheduling failed");
seen.insert(resp.best_worker.worker_id);
slots.mark_prefill_completed(&req_id).await.unwrap();
slots.free(&req_id).await.unwrap();
}
assert!(
seen.contains(&10) && seen.contains(&20),
"both workers should be reachable after additive registration, saw: {seen:?}"
);
}
/// Requests with allowed_worker_ids should only route to the specified subset.
#[tokio::test(flavor = "multi_thread")]
async fn test_allowed_worker_ids_filter() {
let block_size = 16;
let isl = 256;
let (queue, slots, cfg_tx) = make_queue_with_sender(0, block_size, isl, None);
// Register three workers
let mut dp = std::collections::HashMap::new();
dp.insert(1_u64, (0_u32, 1_u32));
dp.insert(2_u64, (0_u32, 1_u32));
dp.insert(3_u64, (0_u32, 1_u32));
slots.register_external_workers(&dp);
let mut configs = HashMap::new();
for &id in &[1_u64, 2_u64, 3_u64] {
configs.insert(
id,
SimpleWorkerConfig {
max_num_batched_tokens: Some(isl as u64),
..Default::default()
},
);
}
cfg_tx.send(configs).unwrap();
// Send a request with allowed_worker_ids = {2} only
let mut allowed = std::collections::HashSet::new();
allowed.insert(2_u64);
let (tx, rx) = tokio::sync::oneshot::channel();
let req = SchedulingRequest {
maybe_request_id: Some("filter-0".to_string()),
token_seq: None,
isl_tokens: isl,
overlaps: OverlapScores::default(),
decode_blocks: HashMap::new(),
prefill_tokens: HashMap::new(),
router_config_override: None,
update_states: true,
lora_name: None,
priority_jump: 0.0,
expected_output_tokens: None,
allowed_worker_ids: Some(allowed),
resp_tx: Some(tx),
};
queue.enqueue(req).await;
let resp = rx
.await
.expect("oneshot dropped")
.expect("scheduling failed");
assert_eq!(
resp.best_worker.worker_id, 2,
"request must be routed to allowed worker 2, got {}",
resp.best_worker.worker_id
);
slots
.mark_prefill_completed(&"filter-0".to_string())
.await
.unwrap();
slots.free(&"filter-0".to_string()).await.unwrap();
}
}
......@@ -24,13 +24,20 @@ from tests.router.helper import (
wait_for_indexer_workers_active,
wait_for_workers_ready,
)
from tests.router.router_process import FrontendRouterProcess, KVRouterProcess
from tests.router.router_process import (
DirectRouterProcess,
FrontendRouterProcess,
KVRouterProcess,
)
if TYPE_CHECKING:
from tests.conftest import NatsServer
logger = logging.getLogger(__name__)
NUM_REQUESTS = 100
BLOCK_SIZE = 16
########################################################
# Test templates
......@@ -2012,3 +2019,155 @@ def _test_busy_threshold_endpoint(
logger.info("All busy_threshold endpoint tests passed!")
asyncio.run(test_busy_threshold_api())
def _test_disagg_direct_mode(
prefill_workers,
decode_workers,
request,
frontend_port: int,
test_payload: dict,
request_plane: str = "nats",
):
"""E2E test for disaggregated Direct routing mode (simulating GAIE EPP).
In Direct mode, the router does not select workers itself.
Worker IDs must be provided via x-worker-instance-id and x-prefill-instance-id
HTTP headers. The test verifies:
1. Requests with explicit worker ID headers succeed and return a valid response.
2. Requests without headers fail (Direct mode rejects unaddressed requests).
Args:
prefill_workers: Prefill mocker workers (already started).
decode_workers: Decode mocker workers (already started).
request: Pytest request fixture.
frontend_port: Port for the Direct-mode frontend HTTP server.
test_payload: Base test payload for /v1/chat/completions.
request_plane: Transport for request plane ("nats" or "tcp").
"""
with DirectRouterProcess(
request,
frontend_port,
decode_workers.namespace,
enforce_disagg=True,
request_plane=request_plane,
):
frontend_url = f"http://localhost:{frontend_port}"
chat_url = f"{frontend_url}/v1/chat/completions"
logger.info("Waiting for models to appear in Direct-mode frontend...")
async def wait_for_models():
models_url = f"{frontend_url}/v1/models"
for _ in range(120):
try:
async with aiohttp.ClientSession() as session:
async with session.get(models_url) as response:
if response.status == 200:
data = await response.json()
models = data.get("data", [])
if models:
logger.info(
f"Models registered: {[m.get('id') for m in models]}"
)
return
except Exception as e:
logger.debug(f"Error checking models endpoint: {e}")
await asyncio.sleep(1)
raise TimeoutError("Timeout waiting for models in Direct-mode frontend")
asyncio.run(wait_for_models())
# Phase 2: Discover worker IDs via the runtime
runtime = get_runtime(request_plane=request_plane)
prefill_endpoint = runtime.endpoint(
f"{decode_workers.namespace}.prefill.generate"
)
decode_endpoint = runtime.endpoint(
f"{decode_workers.namespace}.backend.generate"
)
async def discover_workers():
prefill_client = await prefill_endpoint.client()
decode_client = await decode_endpoint.client()
for _ in range(60):
p_ids = prefill_client.instance_ids()
d_ids = decode_client.instance_ids()
if p_ids and d_ids:
return p_ids, d_ids
await asyncio.sleep(0.5)
raise TimeoutError(
f"Timeout discovering workers: prefill={p_ids}, decode={d_ids}"
)
prefill_ids, decode_ids = asyncio.run(discover_workers())
logger.info(f"Discovered prefill workers: {prefill_ids}")
logger.info(f"Discovered decode workers: {decode_ids}")
target_prefill = prefill_ids[0]
target_decode = decode_ids[0]
async def run_direct_mode_tests():
# Test 1: Request WITH correct headers should succeed.
# In direct mode the router is a passthrough — it does not have a
# KvRouter and does not record worker IDs on the RequestTracker, so
# the response's nvext will not contain worker_id info. We only
# verify that the request is routed successfully (HTTP 200) and
# produces a valid chat completion response.
payload = {
**test_payload,
"stream": False,
}
headers = {
"x-worker-instance-id": str(target_decode),
"x-prefill-instance-id": str(target_prefill),
}
async with aiohttp.ClientSession() as session:
# Retry a few times to allow the pipeline to warm up
for attempt in range(10):
async with session.post(
chat_url, json=payload, headers=headers
) as response:
if response.status == 200:
data = await response.json()
logger.info(
f"Direct-mode response (attempt {attempt + 1}): "
f"status=200, model={data.get('model')}"
)
assert (
"choices" in data
), "Expected 'choices' in response data"
assert (
len(data["choices"]) > 0
), "Expected at least one choice in response"
break
else:
logger.info(
f"Direct-mode attempt {attempt + 1} returned "
f"status {response.status}, retrying..."
)
await asyncio.sleep(2)
else:
raise AssertionError(
"Direct-mode request with headers never returned 200"
)
# Test 2: Request WITHOUT headers should fail (Direct mode
# rejects requests that have no worker ID)
logger.info(
"Sending request without headers (should fail in Direct mode)..."
)
no_header_payload = {**test_payload, "stream": False}
async with session.post(chat_url, json=no_header_payload) as response:
assert response.status != 200, (
f"Expected non-200 status without routing headers in Direct mode, "
f"got {response.status}. Direct mode must reject unaddressed requests."
)
logger.info(
f"Correctly rejected headerless request: status={response.status}"
)
asyncio.run(run_direct_mode_tests())
logger.info("Direct-mode disagg E2E test passed")
......@@ -89,6 +89,60 @@ class FrontendRouterProcess(ManagedProcess):
super().__exit__(exc_type, exc_val, exc_tb)
class DirectRouterProcess(ManagedProcess):
"""Manages a process in Direct routing mode for EPP-style disagg tests.
In Direct mode, the router does not select workers itself — worker IDs
must be supplied via x-worker-instance-id and x-prefill-instance-id headers.
"""
def __init__(
self,
request,
frontend_port: int,
namespace: str,
enforce_disagg: bool = True,
request_plane: str = "nats",
):
command = [
"python3",
"-m",
"dynamo.frontend",
"--router-mode",
"direct",
"--http-port",
str(frontend_port),
"--namespace",
namespace,
]
if enforce_disagg:
command.append("--enforce-disagg")
env = os.environ.copy()
env["DYN_REQUEST_PLANE"] = request_plane
super().__init__(
command=command,
env=env,
timeout=60,
display_output=True,
health_check_ports=[frontend_port],
health_check_urls=[
(f"http://localhost:{frontend_port}/v1/models", self._check_ready)
],
log_dir=request.node.name,
terminate_all_matching_process_names=False,
)
self.port = frontend_port
def _check_ready(self, response):
return response.status_code == 200
def __exit__(self, exc_type, exc_val, exc_tb):
super().__exit__(exc_type, exc_val, exc_tb)
# Backward-compatible alias so existing callers that import KVRouterProcess
# continue to work without changes.
KVRouterProcess = FrontendRouterProcess
......@@ -23,6 +23,7 @@ import pytest
from tests.router.common import (
_test_busy_threshold_endpoint,
_test_disagg_direct_mode,
_test_python_router_bindings,
_test_router_basic,
_test_router_decisions,
......@@ -1218,3 +1219,61 @@ def test_busy_threshold_endpoint(
test_payload=TEST_PAYLOAD,
request_plane=request_plane,
)
@pytest.mark.timeout(180)
def test_disagg_direct_mode_epp_headers(
request,
runtime_services_dynamic_ports,
predownload_tokenizers,
):
"""E2E: disaggregated serving with Direct routing mode (simulating GAIE EPP).
This test verifies the EPP-driven routing path used in the GAIE deploy recipe:
- Frontend runs with --router-mode direct (no autonomous worker selection)
- Worker IDs are supplied via x-worker-instance-id / x-prefill-instance-id headers
Validates:
1. Requests with explicit headers succeed and report correct worker IDs
2. Requests without headers are rejected (Direct mode enforces header routing)
"""
logger.info("Starting disaggregated Direct-mode EPP headers E2E test")
namespace_suffix = generate_random_suffix()
shared_namespace = f"test-namespace-{namespace_suffix}"
mocker_args = {
"speedup_ratio": SPEEDUP_RATIO,
"block_size": BLOCK_SIZE,
}
with DisaggMockerProcess(
request,
namespace=shared_namespace,
worker_type="prefill",
mocker_args=mocker_args,
num_mockers=2,
request_plane="nats",
) as prefill_workers:
logger.info(f"Prefill workers using endpoint: {prefill_workers.endpoint}")
with DisaggMockerProcess(
request,
namespace=shared_namespace,
worker_type="decode",
mocker_args=mocker_args,
num_mockers=2,
request_plane="nats",
) as decode_workers:
logger.info(f"Decode workers using endpoint: {decode_workers.endpoint}")
frontend_port = get_unique_ports(request, num_ports=1)[0]
_test_disagg_direct_mode(
prefill_workers=prefill_workers,
decode_workers=decode_workers,
request=request,
frontend_port=frontend_port,
test_payload=TEST_PAYLOAD,
request_plane="nats",
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment