"vscode:/vscode.git/clone" did not exist on "44e8600a8dfde42dd47243d2bb212d7d43185242"
Unverified Commit 77841e7b authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

test: remove redundant mocker e2e test in rust (#4720)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent dab2de67
...@@ -818,8 +818,8 @@ impl WorkerMetricsPublisher { ...@@ -818,8 +818,8 @@ impl WorkerMetricsPublisher {
tokio::spawn(async move { tokio::spawn(async move {
let mut rx = nats_rx; let mut rx = nats_rx;
let mut last_kv_active_blocks: Option<u64> = None; let mut last_kv_active_blocks: Option<u64> = Some(0);
let mut last_num_requests_waiting: Option<u64> = None; let mut last_num_requests_waiting: Option<u64> = Some(0);
let mut pending_publish: Option<Arc<ForwardPassMetrics>> = None; let mut pending_publish: Option<Arc<ForwardPassMetrics>> = None;
let mut publish_timer = let mut publish_timer =
Box::pin(tokio::time::sleep(tokio::time::Duration::from_secs(0))); Box::pin(tokio::time::sleep(tokio::time::Duration::from_secs(0)));
......
...@@ -433,213 +433,3 @@ pub async fn make_mocker_engine( ...@@ -433,213 +433,3 @@ pub async fn make_mocker_engine(
Ok(Arc::new(annotated_engine)) Ok(Arc::new(annotated_engine))
} }
#[cfg(test)]
mod integration_tests {
use super::*;
use crate::kv_router::KV_EVENT_SUBJECT;
use crate::kv_router::indexer::RouterEvent;
use crate::protocols::common::{OutputOptions, SamplingOptions, StopConditions};
use dynamo_runtime::{
DistributedRuntime, Worker,
pipeline::Context,
pipeline::{PushRouter, network::Ingress},
traits::events::EventSubscriber,
};
use futures::StreamExt;
use tokio::time::timeout;
#[tokio::test]
#[ignore] // Run with: cargo test -- --ignored
async fn test_mock_vllm_engine_full_integration() -> Result<()> {
const DP_SIZE: u32 = 2;
const TOKENS_PER_REQUEST: usize = 20;
const BLOCK_SIZE: usize = 2;
// Create runtime and distributed runtime
let worker = Worker::from_settings()?;
let runtime = worker.runtime();
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
tracing::info!("✓ Runtime and distributed runtime created");
// Create component for MockVllmEngine (needed for publishers)
let test_component = distributed.namespace("test")?.component(MOCKER_COMPONENT)?;
tracing::info!("✓ Test component created");
// Create MockVllmEngine WITH component (enables publishers)
let args = MockEngineArgs::builder()
.speedup_ratio(10.0)
.dp_size(DP_SIZE)
.block_size(BLOCK_SIZE)
.build()
.unwrap();
let engine = MockVllmEngine::new(args);
engine.start(test_component.clone()).await?;
tokio::time::sleep(Duration::from_millis(500)).await;
let engine = Arc::new(engine);
tracing::info!("✓ MockVllmEngine created with DP_SIZE: {DP_SIZE}");
// Set up KV events subscriber
let mut kv_events_subscriber = test_component.subscribe(KV_EVENT_SUBJECT).await?;
tracing::info!("✓ KV events subscriber created");
// Wrap with Ingress and register with component/endpoint
let ingress = Ingress::for_engine(engine)?;
tracing::info!("✓ Ingress wrapper created");
// Start the server in background
let server_handle = tokio::spawn({
let test_component = test_component.clone();
async move {
if let Err(e) = test_component
.endpoint("generate")
.endpoint_builder()
.handler(ingress)
.start()
.await
{
eprintln!("❌ Generate endpoint failed: {e}");
}
}
});
tracing::info!("✓ Server started in background");
// Give server time to start
tokio::time::sleep(Duration::from_millis(500)).await;
tracing::info!("✓ Server startup delay completed");
// Print all registered instances from etcd
match test_component.list_instances().await {
Ok(instances) => {
tracing::info!("📋 Found {} registered instances:", instances.len());
for instance in instances {
tracing::info!(
" • {}/{}/{} (ID: {})",
instance.namespace,
instance.component,
instance.endpoint,
instance.instance_id
);
}
}
Err(e) => {
tracing::error!("❌ Failed to list instances: {e}");
}
}
// Create client
let client = distributed
.namespace("test")?
.component(MOCKER_COMPONENT)?
.endpoint("generate")
.client()
.await?;
tracing::info!("✓ Client created");
let router = PushRouter::from_client(client, Default::default()).await?;
tracing::info!("✓ Router created");
// Create test requests for both DP workers
let create_request = |tokens: Vec<TokenIdType>, dp_rank: u32| {
PreprocessedRequest::builder()
.model("mock".to_string())
.token_ids(tokens)
.stop_conditions(StopConditions {
max_tokens: Some(TOKENS_PER_REQUEST as u32),
..Default::default()
})
.sampling_options(SamplingOptions::default())
.output_options(OutputOptions::default())
.eos_token_ids(vec![])
.annotations(vec![format!("dp_rank:{dp_rank}")])
.build()
.unwrap()
};
let requests = vec![
create_request(vec![1, 2, 3, 4, 5], 0),
create_request(vec![1, 2, 3, 4, 5], 0),
create_request(vec![1, 2, 3, 4, 5], 1),
create_request(vec![1, 2, 3, 4, 5], 1),
];
tracing::info!(
"✓ Test requests created ({} requests total)",
requests.len()
);
// Test each request
for (i, request) in requests.into_iter().enumerate() {
tracing::info!("Testing request {}", i + 1);
let response_stream = router.generate(Context::new(request)).await?;
let responses: Vec<LLMEngineOutput> = response_stream.collect().await;
// Should have at least one response
assert!(
!responses.is_empty(),
"Request {} should produce at least one response",
i + 1
);
// Count total tokens generated (excluding final message)
let mut total_tokens = 0;
let mut has_finish_reason = false;
for response in &responses {
total_tokens += response.token_ids.len();
if response.finish_reason.is_some() {
has_finish_reason = true;
}
}
// Should have a finish reason in the last response
assert!(
has_finish_reason,
"Request {} should have a finish reason",
i + 1
);
// Verify we got approximately the expected number of tokens
assert!(
total_tokens <= TOKENS_PER_REQUEST + 1, // +1 for potential final empty response
"Request {} generated {} tokens, expected at most {}",
i + 1,
total_tokens,
TOKENS_PER_REQUEST + 1
);
tracing::info!(
"✓ Request {} completed successfully with {} tokens",
i + 1,
total_tokens
);
}
tracing::info!("🎉 All requests completed successfully!");
// Try to receive at least one KV event with 100ms timeout
tracing::info!("Waiting for KV event with 100ms timeout...");
let msg = timeout(Duration::from_millis(100), kv_events_subscriber.next())
.await
.map_err(|_| Error::msg("Timeout waiting for KV event"))?
.ok_or_else(|| Error::msg("KV events stream ended unexpectedly"))?;
match serde_json::from_slice::<RouterEvent>(&msg.payload) {
Ok(event) => {
tracing::info!("✓ Received KV event: {event:?}");
}
Err(e) => {
return Err(Error::msg(format!("Failed to deserialize KV event: {e}")));
}
}
tracing::info!("🎉 Event verification completed!");
// Cleanup
distributed.shutdown();
server_handle.await?;
Ok(())
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment