"lib/ssh:/git@developer.sourcefind.cn:2222/OpenDAS/dynamo.git" did not exist on "6f7f6b12df9e892987cee48926609efb6faab169"
Unverified Commit db79f324 authored by Jacky's avatar Jacky Committed by GitHub
Browse files

fix: Request Rejection busy signal passage (#7615)


Signed-off-by: default avatarJacky <18255193+kthui@users.noreply.github.com>
parent b6317af6
...@@ -502,8 +502,18 @@ impl ModelWatcher { ...@@ -502,8 +502,18 @@ impl ModelWatcher {
// monitor (1-to-1) since each monitor is scoped to this WorkerSet's Client/namespace. // monitor (1-to-1) since each monitor is scoped to this WorkerSet's Client/namespace.
// The monitor tracks Prometheus metrics (active_decode_blocks, active_prefill_tokens, // The monitor tracks Prometheus metrics (active_decode_blocks, active_prefill_tokens,
// worker TTFT/ITL cleanup). The thresholds control busy detection behavior only. // worker TTFT/ITL cleanup). The thresholds control busy detection behavior only.
//
// IMPORTANT: When KV routing is active, the monitor must use the KvRouter's Client
// so that busy-state updates (via update_free_instances) are visible to the
// PushRouter, which also uses the KvRouter's Client (see common.rs:258-263).
// Using a different Client instance would cause the PushRouter to never see
// busy workers, since each Client::new() creates independent ArcSwap state.
let monitor_client = kv_chooser
.as_ref()
.map(|chooser| chooser.client().clone())
.unwrap_or_else(|| client.clone());
let worker_monitor = Some(KvWorkerMonitor::new( let worker_monitor = Some(KvWorkerMonitor::new(
client.clone(), monitor_client,
self.router_config.load_threshold_config.clone(), self.router_config.load_threshold_config.clone(),
)); ));
......
...@@ -677,7 +677,7 @@ def _test_router_overload_503( ...@@ -677,7 +677,7 @@ def _test_router_overload_503(
) )
# Wait briefly to ensure requests are in-flight # Wait briefly to ensure requests are in-flight
await asyncio.sleep(0.2) await asyncio.sleep(0.8)
# Now send one more request that should get 503 # Now send one more request that should get 503
logger.info("Sending additional request that should receive 503...") logger.info("Sending additional request that should receive 503...")
...@@ -687,10 +687,10 @@ def _test_router_overload_503( ...@@ -687,10 +687,10 @@ def _test_router_overload_503(
if status_code == 503: if status_code == 503:
body = await response.json() body = await response.json()
logger.info(f"Got expected 503 response: {body}") logger.info(f"Got expected 503 response: {body}")
assert "Service temporarily unavailable" in body.get( error_msg = body.get("message", "")
"error", "" assert (
) or "All workers are busy" in body.get( "Service temporarily unavailable" in error_msg
"error", "" or "All workers are busy" in error_msg
), f"Expected service overload error message, got: {body}" ), f"Expected service overload error message, got: {body}"
return True return True
else: else:
......
...@@ -762,7 +762,6 @@ def test_mocker_two_kv_router( ...@@ -762,7 +762,6 @@ def test_mocker_two_kv_router(
) )
@pytest.mark.skip(reason="Flaky, temporarily disabled")
@pytest.mark.parametrize( @pytest.mark.parametrize(
"durable_kv_events", [False], ids=["nondurable"], indirect=True "durable_kv_events", [False], ids=["nondurable"], indirect=True
) # Use NATS Core (local indexer) ) # Use NATS Core (local indexer)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment