mod common;

use actix_web::{http::StatusCode, rt::System, test as actix_test, web, App};
use bytes::Bytes;
use common::mock_worker::{HealthStatus, MockWorker, MockWorkerConfig, WorkerType};
use reqwest::Client;
use serde_json::json;
use sglang_router_rs::config::{PolicyConfig, RouterConfig, RoutingMode};
use sglang_router_rs::server::{
    add_worker, generate, list_workers, v1_chat_completions, v1_completions, AppState,
};
use std::time::Instant;

/// Test context for streaming tests
struct StreamingTestContext {
    workers: Vec<MockWorker>,
    app_state: web::Data<AppState>,
}

impl StreamingTestContext {
    async fn new(worker_configs: Vec<MockWorkerConfig>) -> Self {
        let mut workers = Vec::new();
        let mut worker_urls = Vec::new();

        // Start mock workers
        for config in worker_configs {
            let mut worker = MockWorker::new(config);
            let url = worker.start().await.unwrap();
            worker_urls.push(url);
            workers.push(worker);
        }

        // Give workers time to start
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

        // Create router config with empty worker URLs initially;
        // workers are added via the /add_worker endpoint below.
        let config = RouterConfig {
            mode: RoutingMode::Regular {
                worker_urls: vec![],
            },
            policy: PolicyConfig::Random,
            host: "127.0.0.1".to_string(),
            port: 3003,
            max_payload_size: 256 * 1024 * 1024,
            request_timeout_secs: 600,
            worker_startup_timeout_secs: 1,
            worker_startup_check_interval_secs: 1,
            dp_aware: false,
            api_key: None,
            discovery: None,
            metrics: None,
            log_dir: None,
            log_level: None,
            request_id_headers: None,
        };

        let client = Client::builder()
            .timeout(std::time::Duration::from_secs(config.request_timeout_secs))
            .build()
            .unwrap();

        let app_state = AppState::new(config, client).unwrap();
        let app_state = web::Data::new(app_state);

        // Add workers via the HTTP API
        let app = actix_test::init_service(
            App::new().app_data(app_state.clone()).service(add_worker),
        )
        .await;

        for url in &worker_urls {
            let req = actix_test::TestRequest::post()
                .uri(&format!("/add_worker?url={}", url))
                .to_request();
            let resp = actix_test::call_service(&app, req).await;
            assert!(resp.status().is_success());
        }

        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        Self { workers, app_state }
    }

    async fn create_app(
        &self,
    ) -> impl actix_web::dev::Service<
        actix_http::Request,
        Response = actix_web::dev::ServiceResponse,
        Error = actix_web::Error,
    > {
        actix_test::init_service(
            App::new()
                .app_data(self.app_state.clone())
                .service(generate)
                .service(v1_chat_completions)
                .service(v1_completions)
                .service(list_workers),
        )
        .await
    }

    async fn shutdown(mut self) {
        for worker in &mut self.workers {
            worker.stop().await;
        }
    }
}

/// Parse SSE (Server-Sent Events) from a response body
async fn parse_sse_stream(body: Bytes) -> Vec<serde_json::Value> {
    let text = String::from_utf8_lossy(&body);
    let mut events = Vec::new();

    for line in text.lines() {
        if line.starts_with("data: ") {
            let data = &line[6..];
            if data == "[DONE]" {
                continue;
            }
            if let Ok(json) = serde_json::from_str::<serde_json::Value>(data) {
                events.push(json);
            }
        }
    }

    events
}
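// A minimal sanity check for `parse_sse_stream`, using a hand-written SSE body:
// two `data:` JSON events followed by the `[DONE]` sentinel. The payload shape
// below is illustrative only and is not assumed to match the mock worker's
// exact chunk format.
#[cfg(test)]
mod sse_parsing_tests {
    use super::*;

    #[test]
    fn test_parse_sse_stream_skips_done_marker() {
        System::new().block_on(async {
            let body = Bytes::from(
                "data: {\"text\":\"Hello\"}\ndata: {\"text\":\" world\"}\ndata: [DONE]\n",
            );
            let events = parse_sse_stream(body).await;

            // The [DONE] sentinel is skipped; only the JSON events are returned.
            assert_eq!(events.len(), 2);
            assert_eq!(events[0]["text"], "Hello");
            assert_eq!(events[1]["text"], " world");
        });
    }
}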
#[cfg(test)]
mod basic_streaming_tests {
    use super::*;

    #[test]
    fn test_router_uses_mock_workers() {
        System::new().block_on(async {
            let ctx = StreamingTestContext::new(vec![MockWorkerConfig {
                port: 19000,
                worker_type: WorkerType::Regular,
                health_status: HealthStatus::Healthy,
                response_delay_ms: 0,
                fail_rate: 0.0,
            }])
            .await;

            let app = ctx.create_app().await;

            // Verify workers are registered with the router
            let req = actix_test::TestRequest::get()
                .uri("/list_workers")
                .to_request();
            let resp = actix_test::call_service(&app, req).await;
            assert_eq!(resp.status(), StatusCode::OK);

            let body: serde_json::Value = actix_test::read_body_json(resp).await;
            let urls = body["urls"].as_array().unwrap();
            assert_eq!(urls.len(), 1);
            assert!(urls[0].as_str().unwrap().contains("19000"));

            ctx.shutdown().await;
        });
    }

    #[test]
    fn test_generate_streaming() {
        System::new().block_on(async {
            let ctx = StreamingTestContext::new(vec![MockWorkerConfig {
                port: 19001,
                worker_type: WorkerType::Regular,
                health_status: HealthStatus::Healthy,
                response_delay_ms: 0,
                fail_rate: 0.0,
            }])
            .await;

            let app = ctx.create_app().await;

            let payload = json!({
                "text": "Hello, streaming world!",
                "stream": true,
                "max_new_tokens": 50
            });

            let req = actix_test::TestRequest::post()
                .uri("/generate")
                .set_json(&payload)
                .to_request();
            let resp = actix_test::call_service(&app, req).await;
            assert_eq!(resp.status(), StatusCode::OK);

            // Check content type
            let content_type = resp.headers().get("content-type").unwrap();
            assert_eq!(content_type, "text/event-stream");

            // Read streaming body
            let body = actix_test::read_body(resp).await;
            let events = parse_sse_stream(body).await;

            // Verify we got multiple chunks
            assert!(events.len() > 1);

            // Verify first chunk has text
            assert!(events[0].get("text").is_some());

            // Verify last chunk has finish_reason in meta_info
            let last_event = events.last().unwrap();
            assert!(last_event.get("meta_info").is_some());
            let meta_info = &last_event["meta_info"];
            assert!(meta_info.get("finish_reason").is_some());

            ctx.shutdown().await;
        });
    }

    #[test]
    fn test_chat_completion_streaming() {
        System::new().block_on(async {
            let ctx = StreamingTestContext::new(vec![MockWorkerConfig {
                port: 19002,
                worker_type: WorkerType::Regular,
                health_status: HealthStatus::Healthy,
                response_delay_ms: 0,
                fail_rate: 0.0,
            }])
            .await;

            let app = ctx.create_app().await;

            let payload = json!({
                "model": "test-model",
                "messages": [
                    {"role": "user", "content": "Hello, streaming!"}
                ],
                "stream": true
            });

            let req = actix_test::TestRequest::post()
                .uri("/v1/chat/completions")
                .set_json(&payload)
                .to_request();
            let resp = actix_test::call_service(&app, req).await;
            assert_eq!(resp.status(), StatusCode::OK);
            assert_eq!(
                resp.headers().get("content-type").unwrap(),
                "text/event-stream"
            );

            let body = actix_test::read_body(resp).await;
            let events = parse_sse_stream(body).await;

            // Verify we got streaming events.
            // Note: the mock worker doesn't emit the full OpenAI chunk format,
            // so just verify that chunks arrived.
            assert!(!events.is_empty(), "Should have received streaming events");

            ctx.shutdown().await;
        });
    }

    #[test]
    fn test_completion_streaming() {
        System::new().block_on(async {
            let ctx = StreamingTestContext::new(vec![MockWorkerConfig {
                port: 19003,
                worker_type: WorkerType::Regular,
                health_status: HealthStatus::Healthy,
                response_delay_ms: 0,
                fail_rate: 0.0,
            }])
            .await;

            let app = ctx.create_app().await;

            let payload = json!({
                "model": "test-model",
                "prompt": "Once upon a time",
                "stream": true,
                "max_tokens": 30
            });

            let req = actix_test::TestRequest::post()
                .uri("/v1/completions")
                .set_json(&payload)
                .to_request();
            let resp = actix_test::call_service(&app, req).await;
            assert_eq!(resp.status(), StatusCode::OK);
            assert_eq!(
                resp.headers().get("content-type").unwrap(),
                "text/event-stream"
            );

            let _body = actix_test::read_body(resp).await;

            ctx.shutdown().await;
        });
    }
}
#[cfg(test)]
mod streaming_performance_tests {
    use super::*;

    #[test]
    fn test_streaming_first_token_latency() {
        System::new().block_on(async {
            let ctx = StreamingTestContext::new(vec![MockWorkerConfig {
                port: 19010,
                worker_type: WorkerType::Regular,
                health_status: HealthStatus::Healthy,
                response_delay_ms: 10, // Small delay to simulate processing
                fail_rate: 0.0,
            }])
            .await;

            let app = ctx.create_app().await;

            let payload = json!({
                "text": "Measure latency",
                "stream": true
            });

            let req = actix_test::TestRequest::post()
                .uri("/generate")
                .set_json(&payload)
                .to_request();

            let start = Instant::now();
            let resp = actix_test::call_service(&app, req).await;
            assert_eq!(resp.status(), StatusCode::OK);

            // Note: the actix_test framework doesn't provide easy access to streaming chunks.
            // The ideal solution would be to:
            // 1. Start the router as a real HTTP server
            // 2. Use reqwest::Client to make streaming requests
            // 3. Measure time to first chunk properly
            // (a sketch of this approach follows this test)
            //
            // For now, we verify that streaming responses work correctly,
            // but cannot accurately measure TTFT with actix_test.
            let body = actix_test::read_body(resp).await;
            let total_time = start.elapsed();

            // Verify we got streaming data
            let events = parse_sse_stream(body).await;
            assert!(!events.is_empty(), "Should receive streaming events");

            // With a mock worker delay of 10ms, total time should still be reasonable
            assert!(
                total_time.as_millis() < 1000,
                "Total response took {}ms",
                total_time.as_millis()
            );

            ctx.shutdown().await;
        });
    }
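    // A minimal sketch of the "ideal" TTFT measurement described above: hit a
    // router that is already running as a real HTTP server and time the arrival
    // of the first response chunk with reqwest. The router URL is a placeholder,
    // the router is not started by this harness, and the request body is sent as
    // a raw JSON string to avoid extra reqwest feature assumptions; the test is
    // marked #[ignore] because it is illustrative rather than part of the suite.
    #[test]
    #[ignore]
    fn test_first_token_latency_against_real_server_sketch() {
        System::new().block_on(async {
            // Placeholder: point this at an externally started router instance.
            let router_url = "http://127.0.0.1:3003";

            let client = Client::new();
            let payload = json!({
                "text": "Measure latency",
                "stream": true
            });

            let start = Instant::now();
            let mut resp = client
                .post(format!("{}/generate", router_url))
                .header("content-type", "application/json")
                .body(payload.to_string())
                .send()
                .await
                .expect("router must be reachable for this sketch");
            assert!(resp.status().is_success());

            // Time to first token: elapsed time until the first body chunk arrives.
            if let Some(first_chunk) = resp.chunk().await.expect("failed to read chunk") {
                let ttft = start.elapsed();
                assert!(!first_chunk.is_empty());
                println!("TTFT: {}ms", ttft.as_millis());
            }
        });
    }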
    #[test]
    fn test_concurrent_streaming_requests() {
        System::new().block_on(async {
            // Test basic concurrent streaming functionality
            let ctx = StreamingTestContext::new(vec![
                MockWorkerConfig {
                    port: 19050,
                    worker_type: WorkerType::Regular,
                    health_status: HealthStatus::Healthy,
                    response_delay_ms: 0,
                    fail_rate: 0.0,
                },
                MockWorkerConfig {
                    port: 19051,
                    worker_type: WorkerType::Regular,
                    health_status: HealthStatus::Healthy,
                    response_delay_ms: 0,
                    fail_rate: 0.0,
                },
            ])
            .await;

            let app = ctx.create_app().await;

            // Send a moderate number of concurrent requests for unit testing
            use futures::future::join_all;

            let mut futures = Vec::new();
            for i in 0..20 {
                let app_ref = &app;
                let future = async move {
                    let payload = json!({
                        "text": format!("Concurrent request {}", i),
                        "stream": true,
                        "max_new_tokens": 5
                    });

                    let req = actix_test::TestRequest::post()
                        .uri("/generate")
                        .set_json(&payload)
                        .to_request();
                    let resp = actix_test::call_service(app_ref, req).await;
                    resp.status() == StatusCode::OK
                };
                futures.push(future);
            }

            let results = join_all(futures).await;
            let successful = results.iter().filter(|&&r| r).count();

            // All requests should succeed in a unit test environment
            assert_eq!(
                successful, 20,
                "Expected all 20 requests to succeed, got {}",
                successful
            );

            ctx.shutdown().await;
        });
    }

    // Note: Extreme load testing has been moved to benches/streaming_load_test.rs
    // Run with: cargo run --release --bin streaming_load_test 10000 10
    // Or: cargo bench streaming_load_test
}

#[cfg(test)]
mod streaming_error_tests {
    use super::*;

    #[test]
    fn test_streaming_with_worker_failure() {
        System::new().block_on(async {
            let ctx = StreamingTestContext::new(vec![MockWorkerConfig {
                port: 19020,
                worker_type: WorkerType::Regular,
                health_status: HealthStatus::Healthy,
                response_delay_ms: 0,
                fail_rate: 1.0, // Always fail
            }])
            .await;

            let app = ctx.create_app().await;

            let payload = json!({
                "text": "This should fail",
                "stream": true
            });

            let req = actix_test::TestRequest::post()
                .uri("/generate")
                .set_json(&payload)
                .to_request();
            let resp = actix_test::call_service(&app, req).await;
            assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);

            ctx.shutdown().await;
        });
    }

    #[test]
    fn test_streaming_with_invalid_payload() {
        System::new().block_on(async {
            let ctx = StreamingTestContext::new(vec![MockWorkerConfig {
                port: 19021,
                worker_type: WorkerType::Regular,
                health_status: HealthStatus::Healthy,
                response_delay_ms: 0,
                fail_rate: 0.0,
            }])
            .await;

            let app = ctx.create_app().await;

            let payload = json!({
                // Missing required fields
                "stream": true
            });

            let req = actix_test::TestRequest::post()
                .uri("/generate")
                .set_json(&payload)
                .to_request();
            let resp = actix_test::call_service(&app, req).await;

            // TODO: Router should validate the payload and reject requests with missing content fields.
            // Currently, the router accepts requests with no prompt/text/input_ids, which is a bug.
            // This should return StatusCode::BAD_REQUEST once proper validation is implemented.
            assert_eq!(resp.status(), StatusCode::OK);

            ctx.shutdown().await;
        });
    }
}

#[cfg(test)]
mod streaming_content_tests {
    use super::*;

    #[test]
    fn test_unicode_streaming() {
        System::new().block_on(async {
            let ctx = StreamingTestContext::new(vec![MockWorkerConfig {
                port: 19030,
                worker_type: WorkerType::Regular,
                health_status: HealthStatus::Healthy,
                response_delay_ms: 0,
                fail_rate: 0.0,
            }])
            .await;

            let app = ctx.create_app().await;

            let payload = json!({
                "text": "Test Unicode: 你好世界 🌍 émojis",
                "stream": true
            });

            let req = actix_test::TestRequest::post()
                .uri("/generate")
                .set_json(&payload)
                .to_request();
            let resp = actix_test::call_service(&app, req).await;
            assert_eq!(resp.status(), StatusCode::OK);

            let body = actix_test::read_body(resp).await;
            let events = parse_sse_stream(body).await;

            // Verify events were parsed correctly (Unicode didn't break parsing)
            assert!(!events.is_empty());

            ctx.shutdown().await;
        });
    }

    #[test]
    fn test_incremental_text_building() {
        System::new().block_on(async {
            let ctx = StreamingTestContext::new(vec![MockWorkerConfig {
                port: 19031,
                worker_type: WorkerType::Regular,
                health_status: HealthStatus::Healthy,
                response_delay_ms: 0,
                fail_rate: 0.0,
            }])
            .await;

            let app = ctx.create_app().await;

            let payload = json!({
                "text": "Build text incrementally",
                "stream": true
            });

            let req = actix_test::TestRequest::post()
                .uri("/generate")
                .set_json(&payload)
                .to_request();
            let resp = actix_test::call_service(&app, req).await;
            assert_eq!(resp.status(), StatusCode::OK);

            let body = actix_test::read_body(resp).await;
            let events = parse_sse_stream(body).await;

            // Build complete text from chunks
            let mut complete_text = String::new();
            for event in &events {
                if let Some(text) = event.get("text").and_then(|t| t.as_str()) {
                    complete_text.push_str(text);
                }
            }

            // Verify we got some text
            assert!(!complete_text.is_empty());

            ctx.shutdown().await;
        });
    }
}