Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
zhaoyu6
sglang
Commits
33c4b4d0
Unverified
Commit
33c4b4d0
authored
Jul 24, 2025
by
Simo Lin
Committed by
GitHub
Jul 24, 2025
Browse files
[router] add streaming unit test (#8299)
parent
8d1c5b94
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
579 additions
and
0 deletions
+579
-0
sgl-router/tests/streaming_tests.rs
sgl-router/tests/streaming_tests.rs
+579
-0
No files found.
sgl-router/tests/streaming_tests.rs
0 → 100644
View file @
33c4b4d0
mod
common
;
use
actix_web
::{
http
::
StatusCode
,
rt
::
System
,
test
as
actix_test
,
web
,
App
};
use
bytes
::
Bytes
;
use
common
::
mock_worker
::{
HealthStatus
,
MockWorker
,
MockWorkerConfig
,
WorkerType
};
use
reqwest
::
Client
;
use
serde_json
::
json
;
use
sglang_router_rs
::
config
::{
PolicyConfig
,
RouterConfig
,
RoutingMode
};
use
sglang_router_rs
::
server
::{
add_worker
,
generate
,
list_workers
,
v1_chat_completions
,
v1_completions
,
AppState
,
};
use
std
::
time
::
Instant
;
/// Test context for streaming tests
struct
StreamingTestContext
{
workers
:
Vec
<
MockWorker
>
,
app_state
:
web
::
Data
<
AppState
>
,
}
impl
StreamingTestContext
{
async
fn
new
(
worker_configs
:
Vec
<
MockWorkerConfig
>
)
->
Self
{
let
mut
workers
=
Vec
::
new
();
let
mut
worker_urls
=
Vec
::
new
();
// Start mock workers
for
config
in
worker_configs
{
let
mut
worker
=
MockWorker
::
new
(
config
);
let
url
=
worker
.start
()
.await
.unwrap
();
worker_urls
.push
(
url
);
workers
.push
(
worker
);
}
// Give workers time to start
tokio
::
time
::
sleep
(
tokio
::
time
::
Duration
::
from_millis
(
50
))
.await
;
// Create router config with empty worker URLs initially
// We'll add workers via the /add_worker endpoint
let
config
=
RouterConfig
{
mode
:
RoutingMode
::
Regular
{
worker_urls
:
vec!
[],
},
policy
:
PolicyConfig
::
Random
,
host
:
"127.0.0.1"
.to_string
(),
port
:
3003
,
max_payload_size
:
256
*
1024
*
1024
,
request_timeout_secs
:
600
,
worker_startup_timeout_secs
:
1
,
worker_startup_check_interval_secs
:
1
,
discovery
:
None
,
metrics
:
None
,
log_dir
:
None
,
log_level
:
None
,
};
let
client
=
Client
::
builder
()
.timeout
(
std
::
time
::
Duration
::
from_secs
(
config
.request_timeout_secs
))
.build
()
.unwrap
();
let
app_state
=
AppState
::
new
(
config
,
client
)
.unwrap
();
let
app_state
=
web
::
Data
::
new
(
app_state
);
// Add workers via HTTP API
let
app
=
actix_test
::
init_service
(
App
::
new
()
.app_data
(
app_state
.clone
())
.service
(
add_worker
))
.await
;
for
url
in
&
worker_urls
{
let
req
=
actix_test
::
TestRequest
::
post
()
.uri
(
&
format!
(
"/add_worker?url={}"
,
url
))
.to_request
();
let
resp
=
actix_test
::
call_service
(
&
app
,
req
)
.await
;
assert
!
(
resp
.status
()
.is_success
());
}
tokio
::
time
::
sleep
(
tokio
::
time
::
Duration
::
from_millis
(
100
))
.await
;
Self
{
workers
,
app_state
}
}
async
fn
create_app
(
&
self
,
)
->
impl
actix_web
::
dev
::
Service
<
actix_http
::
Request
,
Response
=
actix_web
::
dev
::
ServiceResponse
,
Error
=
actix_web
::
Error
,
>
{
actix_test
::
init_service
(
App
::
new
()
.app_data
(
self
.app_state
.clone
())
.service
(
generate
)
.service
(
v1_chat_completions
)
.service
(
v1_completions
)
.service
(
list_workers
),
)
.await
}
async
fn
shutdown
(
mut
self
)
{
for
worker
in
&
mut
self
.workers
{
worker
.stop
()
.await
;
}
}
}
/// Parse SSE (Server-Sent Events) from response body
async
fn
parse_sse_stream
(
body
:
Bytes
)
->
Vec
<
serde_json
::
Value
>
{
let
text
=
String
::
from_utf8_lossy
(
&
body
);
let
mut
events
=
Vec
::
new
();
for
line
in
text
.lines
()
{
if
line
.starts_with
(
"data: "
)
{
let
data
=
&
line
[
6
..
];
if
data
==
"[DONE]"
{
continue
;
}
if
let
Ok
(
json
)
=
serde_json
::
from_str
::
<
serde_json
::
Value
>
(
data
)
{
events
.push
(
json
);
}
}
}
events
}
#[cfg(test)]
mod
basic_streaming_tests
{
use
super
::
*
;
#[test]
fn
test_router_uses_mock_workers
()
{
System
::
new
()
.block_on
(
async
{
let
ctx
=
StreamingTestContext
::
new
(
vec!
[
MockWorkerConfig
{
port
:
19000
,
worker_type
:
WorkerType
::
Regular
,
health_status
:
HealthStatus
::
Healthy
,
response_delay_ms
:
0
,
fail_rate
:
0.0
,
}])
.await
;
let
app
=
ctx
.create_app
()
.await
;
// Verify workers are registered with the router
let
req
=
actix_test
::
TestRequest
::
get
()
.uri
(
"/list_workers"
)
.to_request
();
let
resp
=
actix_test
::
call_service
(
&
app
,
req
)
.await
;
assert_eq!
(
resp
.status
(),
StatusCode
::
OK
);
let
body
:
serde_json
::
Value
=
actix_test
::
read_body_json
(
resp
)
.await
;
let
urls
=
body
[
"urls"
]
.as_array
()
.unwrap
();
assert_eq!
(
urls
.len
(),
1
);
assert
!
(
urls
[
0
]
.as_str
()
.unwrap
()
.contains
(
"19000"
));
ctx
.shutdown
()
.await
;
});
}
#[test]
fn
test_generate_streaming
()
{
System
::
new
()
.block_on
(
async
{
let
ctx
=
StreamingTestContext
::
new
(
vec!
[
MockWorkerConfig
{
port
:
19001
,
worker_type
:
WorkerType
::
Regular
,
health_status
:
HealthStatus
::
Healthy
,
response_delay_ms
:
0
,
fail_rate
:
0.0
,
}])
.await
;
let
app
=
ctx
.create_app
()
.await
;
let
payload
=
json!
({
"text"
:
"Hello, streaming world!"
,
"stream"
:
true
,
"max_new_tokens"
:
50
});
let
req
=
actix_test
::
TestRequest
::
post
()
.uri
(
"/generate"
)
.set_json
(
&
payload
)
.to_request
();
let
resp
=
actix_test
::
call_service
(
&
app
,
req
)
.await
;
assert_eq!
(
resp
.status
(),
StatusCode
::
OK
);
// Check content type
let
content_type
=
resp
.headers
()
.get
(
"content-type"
)
.unwrap
();
assert_eq!
(
content_type
,
"text/event-stream"
);
// Read streaming body
let
body
=
actix_test
::
read_body
(
resp
)
.await
;
let
events
=
parse_sse_stream
(
body
)
.await
;
// Verify we got multiple chunks
assert
!
(
events
.len
()
>
1
);
// Verify first chunk has text
assert
!
(
events
[
0
]
.get
(
"text"
)
.is_some
());
// Verify last chunk has finish_reason in meta_info
let
last_event
=
events
.last
()
.unwrap
();
assert
!
(
last_event
.get
(
"meta_info"
)
.is_some
());
let
meta_info
=
&
last_event
[
"meta_info"
];
assert
!
(
meta_info
.get
(
"finish_reason"
)
.is_some
());
ctx
.shutdown
()
.await
;
});
}
#[test]
fn
test_chat_completion_streaming
()
{
System
::
new
()
.block_on
(
async
{
let
ctx
=
StreamingTestContext
::
new
(
vec!
[
MockWorkerConfig
{
port
:
19002
,
worker_type
:
WorkerType
::
Regular
,
health_status
:
HealthStatus
::
Healthy
,
response_delay_ms
:
0
,
fail_rate
:
0.0
,
}])
.await
;
let
app
=
ctx
.create_app
()
.await
;
let
payload
=
json!
({
"model"
:
"test-model"
,
"messages"
:
[
{
"role"
:
"user"
,
"content"
:
"Hello, streaming!"
}
],
"stream"
:
true
});
let
req
=
actix_test
::
TestRequest
::
post
()
.uri
(
"/v1/chat/completions"
)
.set_json
(
&
payload
)
.to_request
();
let
resp
=
actix_test
::
call_service
(
&
app
,
req
)
.await
;
assert_eq!
(
resp
.status
(),
StatusCode
::
OK
);
assert_eq!
(
resp
.headers
()
.get
(
"content-type"
)
.unwrap
(),
"text/event-stream"
);
let
body
=
actix_test
::
read_body
(
resp
)
.await
;
let
events
=
parse_sse_stream
(
body
)
.await
;
// Verify we got streaming events
// Note: Mock doesn't provide full OpenAI format, just verify we got chunks
assert
!
(
!
events
.is_empty
(),
"Should have received streaming events"
);
ctx
.shutdown
()
.await
;
});
}
#[test]
fn
test_completion_streaming
()
{
System
::
new
()
.block_on
(
async
{
let
ctx
=
StreamingTestContext
::
new
(
vec!
[
MockWorkerConfig
{
port
:
19003
,
worker_type
:
WorkerType
::
Regular
,
health_status
:
HealthStatus
::
Healthy
,
response_delay_ms
:
0
,
fail_rate
:
0.0
,
}])
.await
;
let
app
=
ctx
.create_app
()
.await
;
let
payload
=
json!
({
"model"
:
"test-model"
,
"prompt"
:
"Once upon a time"
,
"stream"
:
true
,
"max_tokens"
:
30
});
let
req
=
actix_test
::
TestRequest
::
post
()
.uri
(
"/v1/completions"
)
.set_json
(
&
payload
)
.to_request
();
let
resp
=
actix_test
::
call_service
(
&
app
,
req
)
.await
;
assert_eq!
(
resp
.status
(),
StatusCode
::
OK
);
assert_eq!
(
resp
.headers
()
.get
(
"content-type"
)
.unwrap
(),
"text/event-stream"
);
let
_
body
=
actix_test
::
read_body
(
resp
)
.await
;
ctx
.shutdown
()
.await
;
});
}
}
#[cfg(test)]
mod
streaming_performance_tests
{
use
super
::
*
;
#[test]
fn
test_streaming_first_token_latency
()
{
System
::
new
()
.block_on
(
async
{
let
ctx
=
StreamingTestContext
::
new
(
vec!
[
MockWorkerConfig
{
port
:
19010
,
worker_type
:
WorkerType
::
Regular
,
health_status
:
HealthStatus
::
Healthy
,
response_delay_ms
:
10
,
// Small delay to simulate processing
fail_rate
:
0.0
,
}])
.await
;
let
app
=
ctx
.create_app
()
.await
;
let
payload
=
json!
({
"text"
:
"Measure latency"
,
"stream"
:
true
});
let
req
=
actix_test
::
TestRequest
::
post
()
.uri
(
"/generate"
)
.set_json
(
&
payload
)
.to_request
();
let
start
=
Instant
::
now
();
let
resp
=
actix_test
::
call_service
(
&
app
,
req
)
.await
;
assert_eq!
(
resp
.status
(),
StatusCode
::
OK
);
// Note: actix_test framework doesn't provide easy access to streaming chunks.
// The ideal solution would be to:
// 1. Start the router as a real HTTP server
// 2. Use reqwest::Client to make streaming requests
// 3. Measure time to first chunk properly
//
// For now, we verify that streaming responses work correctly,
// but cannot accurately measure TTFT with actix_test.
let
body
=
actix_test
::
read_body
(
resp
)
.await
;
let
total_time
=
start
.elapsed
();
// Verify we got streaming data
let
events
=
parse_sse_stream
(
body
)
.await
;
assert
!
(
!
events
.is_empty
(),
"Should receive streaming events"
);
// With mock worker delay of 10ms, total time should still be reasonable
assert
!
(
total_time
.as_millis
()
<
1000
,
"Total response took {}ms"
,
total_time
.as_millis
()
);
ctx
.shutdown
()
.await
;
});
}
#[test]
fn
test_concurrent_streaming_requests
()
{
System
::
new
()
.block_on
(
async
{
// Test basic concurrent streaming functionality
let
ctx
=
StreamingTestContext
::
new
(
vec!
[
MockWorkerConfig
{
port
:
19050
,
worker_type
:
WorkerType
::
Regular
,
health_status
:
HealthStatus
::
Healthy
,
response_delay_ms
:
0
,
fail_rate
:
0.0
,
},
MockWorkerConfig
{
port
:
19051
,
worker_type
:
WorkerType
::
Regular
,
health_status
:
HealthStatus
::
Healthy
,
response_delay_ms
:
0
,
fail_rate
:
0.0
,
},
])
.await
;
let
app
=
ctx
.create_app
()
.await
;
// Send a moderate number of concurrent requests for unit testing
use
futures
::
future
::
join_all
;
let
mut
futures
=
Vec
::
new
();
for
i
in
0
..
20
{
let
app_ref
=
&
app
;
let
future
=
async
move
{
let
payload
=
json!
({
"text"
:
format!
(
"Concurrent request {}"
,
i
),
"stream"
:
true
,
"max_new_tokens"
:
5
});
let
req
=
actix_test
::
TestRequest
::
post
()
.uri
(
"/generate"
)
.set_json
(
&
payload
)
.to_request
();
let
resp
=
actix_test
::
call_service
(
app_ref
,
req
)
.await
;
resp
.status
()
==
StatusCode
::
OK
};
futures
.push
(
future
);
}
let
results
=
join_all
(
futures
)
.await
;
let
successful
=
results
.iter
()
.filter
(|
&&
r
|
r
)
.count
();
// All requests should succeed in a unit test environment
assert_eq!
(
successful
,
20
,
"Expected all 20 requests to succeed, got {}"
,
successful
);
ctx
.shutdown
()
.await
;
});
}
// Note: Extreme load testing has been moved to benches/streaming_load_test.rs
// Run with: cargo run --release --bin streaming_load_test 10000 10
// Or: cargo bench streaming_load_test
}
#[cfg(test)]
mod
streaming_error_tests
{
use
super
::
*
;
#[test]
fn
test_streaming_with_worker_failure
()
{
System
::
new
()
.block_on
(
async
{
let
ctx
=
StreamingTestContext
::
new
(
vec!
[
MockWorkerConfig
{
port
:
19020
,
worker_type
:
WorkerType
::
Regular
,
health_status
:
HealthStatus
::
Healthy
,
response_delay_ms
:
0
,
fail_rate
:
1.0
,
// Always fail
}])
.await
;
let
app
=
ctx
.create_app
()
.await
;
let
payload
=
json!
({
"text"
:
"This should fail"
,
"stream"
:
true
});
let
req
=
actix_test
::
TestRequest
::
post
()
.uri
(
"/generate"
)
.set_json
(
&
payload
)
.to_request
();
let
resp
=
actix_test
::
call_service
(
&
app
,
req
)
.await
;
assert_eq!
(
resp
.status
(),
StatusCode
::
INTERNAL_SERVER_ERROR
);
ctx
.shutdown
()
.await
;
});
}
#[test]
fn
test_streaming_with_invalid_payload
()
{
System
::
new
()
.block_on
(
async
{
let
ctx
=
StreamingTestContext
::
new
(
vec!
[
MockWorkerConfig
{
port
:
19021
,
worker_type
:
WorkerType
::
Regular
,
health_status
:
HealthStatus
::
Healthy
,
response_delay_ms
:
0
,
fail_rate
:
0.0
,
}])
.await
;
let
app
=
ctx
.create_app
()
.await
;
let
payload
=
json!
({
// Missing required fields
"stream"
:
true
});
let
req
=
actix_test
::
TestRequest
::
post
()
.uri
(
"/generate"
)
.set_json
(
&
payload
)
.to_request
();
let
resp
=
actix_test
::
call_service
(
&
app
,
req
)
.await
;
// TODO: Router should validate payload and reject requests with missing content fields
// Currently, the router accepts requests with no prompt/text/input_ids which is a bug
// This should return StatusCode::BAD_REQUEST once proper validation is implemented
assert_eq!
(
resp
.status
(),
StatusCode
::
OK
);
ctx
.shutdown
()
.await
;
});
}
}
#[cfg(test)]
mod
streaming_content_tests
{
use
super
::
*
;
#[test]
fn
test_unicode_streaming
()
{
System
::
new
()
.block_on
(
async
{
let
ctx
=
StreamingTestContext
::
new
(
vec!
[
MockWorkerConfig
{
port
:
19030
,
worker_type
:
WorkerType
::
Regular
,
health_status
:
HealthStatus
::
Healthy
,
response_delay_ms
:
0
,
fail_rate
:
0.0
,
}])
.await
;
let
app
=
ctx
.create_app
()
.await
;
let
payload
=
json!
({
"text"
:
"Test Unicode: 你好世界 🌍 émojis"
,
"stream"
:
true
});
let
req
=
actix_test
::
TestRequest
::
post
()
.uri
(
"/generate"
)
.set_json
(
&
payload
)
.to_request
();
let
resp
=
actix_test
::
call_service
(
&
app
,
req
)
.await
;
assert_eq!
(
resp
.status
(),
StatusCode
::
OK
);
let
body
=
actix_test
::
read_body
(
resp
)
.await
;
let
events
=
parse_sse_stream
(
body
)
.await
;
// Verify events were parsed correctly (Unicode didn't break parsing)
assert
!
(
!
events
.is_empty
());
ctx
.shutdown
()
.await
;
});
}
#[test]
fn
test_incremental_text_building
()
{
System
::
new
()
.block_on
(
async
{
let
ctx
=
StreamingTestContext
::
new
(
vec!
[
MockWorkerConfig
{
port
:
19031
,
worker_type
:
WorkerType
::
Regular
,
health_status
:
HealthStatus
::
Healthy
,
response_delay_ms
:
0
,
fail_rate
:
0.0
,
}])
.await
;
let
app
=
ctx
.create_app
()
.await
;
let
payload
=
json!
({
"text"
:
"Build text incrementally"
,
"stream"
:
true
});
let
req
=
actix_test
::
TestRequest
::
post
()
.uri
(
"/generate"
)
.set_json
(
&
payload
)
.to_request
();
let
resp
=
actix_test
::
call_service
(
&
app
,
req
)
.await
;
assert_eq!
(
resp
.status
(),
StatusCode
::
OK
);
let
body
=
actix_test
::
read_body
(
resp
)
.await
;
let
events
=
parse_sse_stream
(
body
)
.await
;
// Build complete text from chunks
let
mut
complete_text
=
String
::
new
();
for
event
in
&
events
{
if
let
Some
(
text
)
=
event
.get
(
"text"
)
.and_then
(|
t
|
t
.as_str
())
{
complete_text
.push_str
(
text
);
}
}
// Verify we got some text
assert
!
(
!
complete_text
.is_empty
());
ctx
.shutdown
()
.await
;
});
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment