Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
change
sglang
Commits
a65ca739
Unverified
Commit
a65ca739
authored
Oct 08, 2025
by
Chang Su
Committed by
GitHub
Oct 08, 2025
Browse files
[router][grpc] Cleanup debug logs in grpc_server and grpc_router (#11340)
parent
677aa0e2
Changes
5
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
31 additions
and
102 deletions
+31
-102
python/sglang/srt/entrypoints/grpc_request_manager.py
python/sglang/srt/entrypoints/grpc_request_manager.py
+0
-15
python/sglang/srt/entrypoints/grpc_server.py
python/sglang/srt/entrypoints/grpc_server.py
+27
-19
sgl-router/src/reasoning_parser/parsers/base.rs
sgl-router/src/reasoning_parser/parsers/base.rs
+0
-28
sgl-router/src/routers/grpc/pipeline.rs
sgl-router/src/routers/grpc/pipeline.rs
+0
-31
sgl-router/src/routers/grpc/utils.rs
sgl-router/src/routers/grpc/utils.rs
+4
-9
No files found.
python/sglang/srt/entrypoints/grpc_request_manager.py
View file @
a65ca739
...
@@ -397,9 +397,7 @@ class GrpcRequestManager:
...
@@ -397,9 +397,7 @@ class GrpcRequestManager:
# Wait for result in background
# Wait for result in background
async
def
wait_for_result
():
async
def
wait_for_result
():
try
:
try
:
# Wait for completion
await
state
.
event
.
wait
()
await
state
.
event
.
wait
()
# Get result from queue
result
=
await
state
.
out_queue
.
get
()
result
=
await
state
.
out_queue
.
get
()
future
.
set_result
(
result
)
future
.
set_result
(
result
)
except
Exception
as
e
:
except
Exception
as
e
:
...
@@ -437,19 +435,6 @@ class GrpcRequestManager:
...
@@ -437,19 +435,6 @@ class GrpcRequestManager:
return
True
return
True
async
def
pause_generation
(
self
):
"""Pause generation processing."""
async
with
self
.
is_pause_cond
:
self
.
is_pause
=
True
logger
.
info
(
"Generation paused"
)
async
def
resume_generation
(
self
):
"""Resume generation processing."""
async
with
self
.
is_pause_cond
:
self
.
is_pause
=
False
self
.
is_pause_cond
.
notify_all
()
logger
.
info
(
"Generation resumed"
)
async
def
handle_loop
(
self
):
async
def
handle_loop
(
self
):
"""
"""
Main event loop - processes outputs from scheduler.
Main event loop - processes outputs from scheduler.
...
...
python/sglang/srt/entrypoints/grpc_server.py
View file @
a65ca739
...
@@ -189,7 +189,7 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
...
@@ -189,7 +189,7 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
# Start the request manager's event loop using auto_create_handle_loop
# Start the request manager's event loop using auto_create_handle_loop
self
.
request_manager
.
auto_create_handle_loop
()
self
.
request_manager
.
auto_create_handle_loop
()
logger
.
info
(
"
Standalone
gRPC scheduler service initialized"
)
logger
.
info
(
"gRPC scheduler service
r
initialized"
)
async
def
Generate
(
async
def
Generate
(
self
,
self
,
...
@@ -197,7 +197,7 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
...
@@ -197,7 +197,7 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
context
:
grpc
.
aio
.
ServicerContext
,
context
:
grpc
.
aio
.
ServicerContext
,
)
->
AsyncIterator
[
sglang_scheduler_pb2
.
GenerateResponse
]:
)
->
AsyncIterator
[
sglang_scheduler_pb2
.
GenerateResponse
]:
"""Handle generation requests with streaming responses."""
"""Handle generation requests with streaming responses."""
logger
.
info
(
f
"G
eneration request:
{
request
.
request_id
}
"
)
logger
.
debug
(
f
"Receive g
eneration request:
{
request
.
request_id
}
"
)
try
:
try
:
# Convert gRPC request to internal format
# Convert gRPC request to internal format
...
@@ -249,7 +249,10 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
...
@@ -249,7 +249,10 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
yield
self
.
_create_chunk_response
(
request
.
request_id
,
output
)
yield
self
.
_create_chunk_response
(
request
.
request_id
,
output
)
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Generate failed:
{
e
}
\n
{
get_exception_traceback
()
}
"
)
logger
.
error
(
f
"Generate failed for request
{
request
.
request_id
}
:
{
e
}
\n
"
f
"
{
get_exception_traceback
()
}
"
)
yield
sglang_scheduler_pb2
.
GenerateResponse
(
yield
sglang_scheduler_pb2
.
GenerateResponse
(
request_id
=
request
.
request_id
,
request_id
=
request
.
request_id
,
error
=
sglang_scheduler_pb2
.
GenerateError
(
error
=
sglang_scheduler_pb2
.
GenerateError
(
...
@@ -262,10 +265,10 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
...
@@ -262,10 +265,10 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
async
def
Embed
(
async
def
Embed
(
self
,
self
,
request
:
sglang_scheduler_pb2
.
EmbedRequest
,
request
:
sglang_scheduler_pb2
.
EmbedRequest
,
context
:
grpc
.
aio
.
ServicerContext
,
_
context
:
grpc
.
aio
.
ServicerContext
,
)
->
sglang_scheduler_pb2
.
EmbedResponse
:
)
->
sglang_scheduler_pb2
.
EmbedResponse
:
"""Handle embedding requests."""
"""Handle embedding requests."""
logger
.
info
(
f
"E
mbedding request:
{
request
.
request_id
}
"
)
logger
.
debug
(
f
"Receive e
mbedding request:
{
request
.
request_id
}
"
)
try
:
try
:
# Convert request
# Convert request
...
@@ -292,7 +295,10 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
...
@@ -292,7 +295,10 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
)
)
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Embed failed:
{
e
}
\n
{
get_exception_traceback
()
}
"
)
logger
.
error
(
f
"Embed failed for request
{
request
.
request_id
}
:
{
e
}
\n
"
f
"
{
get_exception_traceback
()
}
"
)
return
sglang_scheduler_pb2
.
EmbedResponse
(
return
sglang_scheduler_pb2
.
EmbedResponse
(
request_id
=
request
.
request_id
,
request_id
=
request
.
request_id
,
error
=
sglang_scheduler_pb2
.
EmbedError
(
error
=
sglang_scheduler_pb2
.
EmbedError
(
...
@@ -344,7 +350,7 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
...
@@ -344,7 +350,7 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
health_request
.
bootstrap_host
=
FAKE_BOOTSTRAP_HOST
health_request
.
bootstrap_host
=
FAKE_BOOTSTRAP_HOST
health_request
.
bootstrap_room
=
0
health_request
.
bootstrap_room
=
0
logger
.
info
(
f
"Sending
health check request
to request manager...
"
)
logger
.
debug
(
f
"Receive
health check request
:
{
rid
}
"
)
# Submit and wait for response
# Submit and wait for response
output_generator
=
self
.
request_manager
.
generate_request
(
output_generator
=
self
.
request_manager
.
generate_request
(
...
@@ -375,7 +381,7 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
...
@@ -375,7 +381,7 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
)
)
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Health check failed:
{
e
}
"
)
logger
.
error
(
f
"Health check failed:
{
e
}
\n
{
get_exception_traceback
()
}
"
)
return
sglang_scheduler_pb2
.
HealthCheckResponse
(
return
sglang_scheduler_pb2
.
HealthCheckResponse
(
healthy
=
False
,
message
=
f
"Health check error:
{
str
(
e
)
}
"
healthy
=
False
,
message
=
f
"Health check error:
{
str
(
e
)
}
"
)
)
...
@@ -383,10 +389,10 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
...
@@ -383,10 +389,10 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
async
def
Abort
(
async
def
Abort
(
self
,
self
,
request
:
sglang_scheduler_pb2
.
AbortRequest
,
request
:
sglang_scheduler_pb2
.
AbortRequest
,
context
:
grpc
.
aio
.
ServicerContext
,
_
context
:
grpc
.
aio
.
ServicerContext
,
)
->
sglang_scheduler_pb2
.
AbortResponse
:
)
->
sglang_scheduler_pb2
.
AbortResponse
:
"""Abort an ongoing request."""
"""Abort an ongoing request."""
logger
.
info
(
f
"A
bort
ing
request:
{
request
.
request_id
}
"
)
logger
.
debug
(
f
"Receive a
bort request:
{
request
.
request_id
}
"
)
try
:
try
:
success
=
await
self
.
request_manager
.
abort_request
(
request
.
request_id
)
success
=
await
self
.
request_manager
.
abort_request
(
request
.
request_id
)
...
@@ -396,7 +402,10 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
...
@@ -396,7 +402,10 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
message
=
f
"Request
{
request
.
request_id
}
{
'aborted'
if
success
else
'not found'
}
"
,
message
=
f
"Request
{
request
.
request_id
}
{
'aborted'
if
success
else
'not found'
}
"
,
)
)
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Abort failed:
{
e
}
"
)
logger
.
error
(
f
"Abort failed for request
{
request
.
request_id
}
:
{
e
}
\n
"
f
"
{
get_exception_traceback
()
}
"
)
return
sglang_scheduler_pb2
.
AbortResponse
(
return
sglang_scheduler_pb2
.
AbortResponse
(
success
=
False
,
success
=
False
,
message
=
str
(
e
),
message
=
str
(
e
),
...
@@ -404,11 +413,11 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
...
@@ -404,11 +413,11 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
async
def
GetModelInfo
(
async
def
GetModelInfo
(
self
,
self
,
request
:
sglang_scheduler_pb2
.
GetModelInfoRequest
,
_
request
:
sglang_scheduler_pb2
.
GetModelInfoRequest
,
context
:
grpc
.
aio
.
ServicerContext
,
_
context
:
grpc
.
aio
.
ServicerContext
,
)
->
sglang_scheduler_pb2
.
GetModelInfoResponse
:
)
->
sglang_scheduler_pb2
.
GetModelInfoResponse
:
"""Get model information."""
"""Get model information."""
logger
.
info
(
"M
odel info request
received
"
)
logger
.
debug
(
"Receive m
odel info request"
)
is_generation
=
self
.
scheduler_info
.
get
(
"is_generation"
)
is_generation
=
self
.
scheduler_info
.
get
(
"is_generation"
)
if
is_generation
is
None
:
if
is_generation
is
None
:
...
@@ -435,11 +444,11 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
...
@@ -435,11 +444,11 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
async
def
GetServerInfo
(
async
def
GetServerInfo
(
self
,
self
,
request
:
sglang_scheduler_pb2
.
GetServerInfoRequest
,
_
request
:
sglang_scheduler_pb2
.
GetServerInfoRequest
,
context
:
grpc
.
aio
.
ServicerContext
,
_
context
:
grpc
.
aio
.
ServicerContext
,
)
->
sglang_scheduler_pb2
.
GetServerInfoResponse
:
)
->
sglang_scheduler_pb2
.
GetServerInfoResponse
:
"""Get server information."""
"""Get server information."""
logger
.
info
(
"S
erver info request
received
"
)
logger
.
debug
(
"Receive s
erver info request"
)
server_args_dict
=
dataclasses
.
asdict
(
self
.
server_args
)
server_args_dict
=
dataclasses
.
asdict
(
self
.
server_args
)
server_args_struct
=
Struct
()
server_args_struct
=
Struct
()
...
@@ -861,9 +870,8 @@ async def serve_grpc(
...
@@ -861,9 +870,8 @@ async def serve_grpc(
listen_addr
=
f
"
{
server_args
.
host
}
:
{
server_args
.
port
}
"
listen_addr
=
f
"
{
server_args
.
host
}
:
{
server_args
.
port
}
"
server
.
add_insecure_port
(
listen_addr
)
server
.
add_insecure_port
(
listen_addr
)
logger
.
info
(
f
"Starting standalone gRPC server on
{
listen_addr
}
"
)
await
server
.
start
()
await
server
.
start
()
logger
.
info
(
f
"gRPC server listening on
{
listen_addr
}
"
)
# Handle shutdown signals
# Handle shutdown signals
loop
=
asyncio
.
get_running_loop
()
loop
=
asyncio
.
get_running_loop
()
...
...
sgl-router/src/reasoning_parser/parsers/base.rs
View file @
a65ca739
...
@@ -2,7 +2,6 @@
...
@@ -2,7 +2,6 @@
// for detecting and extracting reasoning blocks from text.
// for detecting and extracting reasoning blocks from text.
use
crate
::
reasoning_parser
::
traits
::{
ParseError
,
ParserConfig
,
ParserResult
,
ReasoningParser
};
use
crate
::
reasoning_parser
::
traits
::{
ParseError
,
ParserConfig
,
ParserResult
,
ReasoningParser
};
use
tracing
as
log
;
/// Base reasoning parser implementation.
/// Base reasoning parser implementation.
///
///
...
@@ -46,18 +45,14 @@ impl BaseReasoningParser {
...
@@ -46,18 +45,14 @@ impl BaseReasoningParser {
impl
ReasoningParser
for
BaseReasoningParser
{
impl
ReasoningParser
for
BaseReasoningParser
{
fn
detect_and_parse_reasoning
(
&
mut
self
,
text
:
&
str
)
->
Result
<
ParserResult
,
ParseError
>
{
fn
detect_and_parse_reasoning
(
&
mut
self
,
text
:
&
str
)
->
Result
<
ParserResult
,
ParseError
>
{
log
::
debug!
(
"detect_and_parse_reasoning called with text: {:?}"
,
text
);
// Check input size against buffer limit
// Check input size against buffer limit
if
text
.len
()
>
self
.config.max_buffer_size
{
if
text
.len
()
>
self
.config.max_buffer_size
{
return
Err
(
ParseError
::
BufferOverflow
(
text
.len
()));
return
Err
(
ParseError
::
BufferOverflow
(
text
.len
()));
}
}
let
in_reasoning
=
self
.in_reasoning
||
text
.contains
(
&
self
.config.think_start_token
);
let
in_reasoning
=
self
.in_reasoning
||
text
.contains
(
&
self
.config.think_start_token
);
log
::
debug!
(
"in_reasoning: {}"
,
in_reasoning
);
if
!
in_reasoning
{
if
!
in_reasoning
{
log
::
debug!
(
"No reasoning detected, returning normal text."
);
return
Ok
(
ParserResult
::
normal
(
text
.to_string
()));
return
Ok
(
ParserResult
::
normal
(
text
.to_string
()));
}
}
...
@@ -66,15 +61,8 @@ impl ReasoningParser for BaseReasoningParser {
...
@@ -66,15 +61,8 @@ impl ReasoningParser for BaseReasoningParser {
.replace
(
&
self
.config.think_start_token
,
""
)
.replace
(
&
self
.config.think_start_token
,
""
)
.trim
()
.trim
()
.to_string
();
.to_string
();
log
::
debug!
(
"Processed text after removing think_start_token: {:?}"
,
processed_text
);
if
!
processed_text
.contains
(
&
self
.config.think_end_token
)
{
if
!
processed_text
.contains
(
&
self
.config.think_end_token
)
{
log
::
debug!
(
"Reasoning truncated, think_end_token not found. Returning reasoning text."
);
// Assume reasoning was truncated before end token
// Assume reasoning was truncated before end token
return
Ok
(
ParserResult
::
reasoning
(
processed_text
));
return
Ok
(
ParserResult
::
reasoning
(
processed_text
));
}
}
...
@@ -89,9 +77,6 @@ impl ReasoningParser for BaseReasoningParser {
...
@@ -89,9 +77,6 @@ impl ReasoningParser for BaseReasoningParser {
.map
(|
s
|
s
.trim
()
.to_string
())
.map
(|
s
|
s
.trim
()
.to_string
())
.unwrap_or_default
();
.unwrap_or_default
();
log
::
debug!
(
"Extracted reasoning_text: {:?}"
,
reasoning_text
);
log
::
debug!
(
"Extracted normal_text: {:?}"
,
normal_text
);
Ok
(
ParserResult
::
new
(
normal_text
,
reasoning_text
))
Ok
(
ParserResult
::
new
(
normal_text
,
reasoning_text
))
}
}
...
@@ -108,19 +93,6 @@ impl ReasoningParser for BaseReasoningParser {
...
@@ -108,19 +93,6 @@ impl ReasoningParser for BaseReasoningParser {
self
.buffer
.push_str
(
text
);
self
.buffer
.push_str
(
text
);
let
mut
current_text
=
self
.buffer
.clone
();
let
mut
current_text
=
self
.buffer
.clone
();
log
::
debug!
(
"parse_reasoning_streaming_incremental called with text: {:?}"
,
text
);
log
::
debug!
(
"current buffer: {:?}"
,
self
.buffer
);
log
::
debug!
(
"current_text: {:?}"
,
current_text
);
log
::
debug!
(
"in_reasoning: {}, stripped_think_start: {}, stream_reasoning: {}"
,
self
.in_reasoning
,
self
.stripped_think_start
,
self
.config.stream_reasoning
);
// If the current text is a prefix of a token, keep buffering
// If the current text is a prefix of a token, keep buffering
if
self
.is_partial_token
(
&
current_text
)
{
if
self
.is_partial_token
(
&
current_text
)
{
return
Ok
(
ParserResult
::
default
());
return
Ok
(
ParserResult
::
default
());
...
...
sgl-router/src/routers/grpc/pipeline.rs
View file @
a65ca739
...
@@ -56,8 +56,6 @@ pub struct PreparationStage;
...
@@ -56,8 +56,6 @@ pub struct PreparationStage;
#[async_trait]
#[async_trait]
impl
PipelineStage
for
PreparationStage
{
impl
PipelineStage
for
PreparationStage
{
async
fn
execute
(
&
self
,
ctx
:
&
mut
RequestContext
)
->
Result
<
Option
<
Response
>
,
Response
>
{
async
fn
execute
(
&
self
,
ctx
:
&
mut
RequestContext
)
->
Result
<
Option
<
Response
>
,
Response
>
{
debug!
(
"Stage {}: Processing request"
,
self
.name
());
// Clone Arc before match to avoid borrow checker issues
// Clone Arc before match to avoid borrow checker issues
// (matching borrows ctx, but prepare_* methods need mutable borrow)
// (matching borrows ctx, but prepare_* methods need mutable borrow)
// Arc clone is cheap (8 bytes) - avoids full request clone (15KB-200KB)
// Arc clone is cheap (8 bytes) - avoids full request clone (15KB-200KB)
...
@@ -109,7 +107,6 @@ impl PreparationStage {
...
@@ -109,7 +107,6 @@ impl PreparationStage {
};
};
let
token_ids
=
encoding
.token_ids
()
.to_vec
();
let
token_ids
=
encoding
.token_ids
()
.to_vec
();
debug!
(
"Tokenized {} tokens from input"
,
token_ids
.len
());
// Step 4: Build tool constraints if needed
// Step 4: Build tool constraints if needed
let
tool_call_constraint
=
body_ref
.tools
.as_ref
()
.and_then
(|
tools
|
{
let
tool_call_constraint
=
body_ref
.tools
.as_ref
()
.and_then
(|
tools
|
{
...
@@ -157,8 +154,6 @@ impl PreparationStage {
...
@@ -157,8 +154,6 @@ impl PreparationStage {
}
}
};
};
debug!
(
"Resolved input with {} tokens"
,
token_ids
.len
());
// Create stop sequence decoder for generate requests
// Create stop sequence decoder for generate requests
let
params
=
request
.sampling_params
.as_ref
();
let
params
=
request
.sampling_params
.as_ref
();
let
stop_decoder
=
utils
::
create_stop_decoder
(
let
stop_decoder
=
utils
::
create_stop_decoder
(
...
@@ -259,8 +254,6 @@ impl WorkerSelectionStage {
...
@@ -259,8 +254,6 @@ impl WorkerSelectionStage {
#[async_trait]
#[async_trait]
impl
PipelineStage
for
WorkerSelectionStage
{
impl
PipelineStage
for
WorkerSelectionStage
{
async
fn
execute
(
&
self
,
ctx
:
&
mut
RequestContext
)
->
Result
<
Option
<
Response
>
,
Response
>
{
async
fn
execute
(
&
self
,
ctx
:
&
mut
RequestContext
)
->
Result
<
Option
<
Response
>
,
Response
>
{
debug!
(
"Stage {}: Selecting workers"
,
self
.name
());
let
prep
=
ctx
let
prep
=
ctx
.state
.state
.preparation
.preparation
...
@@ -414,8 +407,6 @@ pub struct ClientAcquisitionStage;
...
@@ -414,8 +407,6 @@ pub struct ClientAcquisitionStage;
#[async_trait]
#[async_trait]
impl
PipelineStage
for
ClientAcquisitionStage
{
impl
PipelineStage
for
ClientAcquisitionStage
{
async
fn
execute
(
&
self
,
ctx
:
&
mut
RequestContext
)
->
Result
<
Option
<
Response
>
,
Response
>
{
async
fn
execute
(
&
self
,
ctx
:
&
mut
RequestContext
)
->
Result
<
Option
<
Response
>
,
Response
>
{
debug!
(
"Stage {}: Acquiring gRPC clients"
,
self
.name
());
let
workers
=
ctx
let
workers
=
ctx
.state
.state
.workers
.workers
...
@@ -464,8 +455,6 @@ impl RequestBuildingStage {
...
@@ -464,8 +455,6 @@ impl RequestBuildingStage {
#[async_trait]
#[async_trait]
impl
PipelineStage
for
RequestBuildingStage
{
impl
PipelineStage
for
RequestBuildingStage
{
async
fn
execute
(
&
self
,
ctx
:
&
mut
RequestContext
)
->
Result
<
Option
<
Response
>
,
Response
>
{
async
fn
execute
(
&
self
,
ctx
:
&
mut
RequestContext
)
->
Result
<
Option
<
Response
>
,
Response
>
{
debug!
(
"Stage {}: Building proto request"
,
self
.name
());
let
prep
=
ctx
let
prep
=
ctx
.state
.state
.preparation
.preparation
...
@@ -578,8 +567,6 @@ pub struct DispatchMetadataStage;
...
@@ -578,8 +567,6 @@ pub struct DispatchMetadataStage;
#[async_trait]
#[async_trait]
impl
PipelineStage
for
DispatchMetadataStage
{
impl
PipelineStage
for
DispatchMetadataStage
{
async
fn
execute
(
&
self
,
ctx
:
&
mut
RequestContext
)
->
Result
<
Option
<
Response
>
,
Response
>
{
async
fn
execute
(
&
self
,
ctx
:
&
mut
RequestContext
)
->
Result
<
Option
<
Response
>
,
Response
>
{
debug!
(
"Stage {}: Preparing dispatch metadata"
,
self
.name
());
let
proto_request
=
ctx
let
proto_request
=
ctx
.state
.state
.proto_request
.proto_request
...
@@ -656,8 +643,6 @@ impl RequestExecutionStage {
...
@@ -656,8 +643,6 @@ impl RequestExecutionStage {
#[async_trait]
#[async_trait]
impl
PipelineStage
for
RequestExecutionStage
{
impl
PipelineStage
for
RequestExecutionStage
{
async
fn
execute
(
&
self
,
ctx
:
&
mut
RequestContext
)
->
Result
<
Option
<
Response
>
,
Response
>
{
async
fn
execute
(
&
self
,
ctx
:
&
mut
RequestContext
)
->
Result
<
Option
<
Response
>
,
Response
>
{
debug!
(
"Stage {}: Executing gRPC request"
,
self
.name
());
let
proto_request
=
ctx
let
proto_request
=
ctx
.state
.state
.proto_request
.proto_request
...
@@ -713,8 +698,6 @@ impl RequestExecutionStage {
...
@@ -713,8 +698,6 @@ impl RequestExecutionStage {
.dual_mut
()
.dual_mut
()
.ok_or_else
(||
utils
::
internal_error_static
(
"Expected dual clients but got single"
))
?
;
.ok_or_else
(||
utils
::
internal_error_static
(
"Expected dual clients but got single"
))
?
;
debug!
(
"Sending concurrent requests to prefill and decode workers"
);
let
prefill_request
=
proto_request
.clone
();
let
prefill_request
=
proto_request
.clone
();
let
decode_request
=
proto_request
;
let
decode_request
=
proto_request
;
...
@@ -780,8 +763,6 @@ impl ResponseProcessingStage {
...
@@ -780,8 +763,6 @@ impl ResponseProcessingStage {
#[async_trait]
#[async_trait]
impl
PipelineStage
for
ResponseProcessingStage
{
impl
PipelineStage
for
ResponseProcessingStage
{
async
fn
execute
(
&
self
,
ctx
:
&
mut
RequestContext
)
->
Result
<
Option
<
Response
>
,
Response
>
{
async
fn
execute
(
&
self
,
ctx
:
&
mut
RequestContext
)
->
Result
<
Option
<
Response
>
,
Response
>
{
debug!
(
"Stage {}: Processing response"
,
self
.name
());
// Delegate to request-type specific processing
// Delegate to request-type specific processing
match
&
ctx
.input.request_type
{
match
&
ctx
.input.request_type
{
RequestType
::
Chat
(
_
)
=>
return
self
.process_chat_response
(
ctx
)
.await
,
RequestType
::
Chat
(
_
)
=>
return
self
.process_chat_response
(
ctx
)
.await
,
...
@@ -1199,15 +1180,9 @@ impl ChatCompletionPipeline {
...
@@ -1199,15 +1180,9 @@ impl ChatCompletionPipeline {
// Execute each stage in sequence
// Execute each stage in sequence
for
(
idx
,
stage
)
in
self
.stages
.iter
()
.enumerate
()
{
for
(
idx
,
stage
)
in
self
.stages
.iter
()
.enumerate
()
{
debug!
(
"Executing stage {}: {}"
,
idx
+
1
,
stage
.name
());
match
stage
.execute
(
&
mut
ctx
)
.await
{
match
stage
.execute
(
&
mut
ctx
)
.await
{
Ok
(
Some
(
response
))
=>
{
Ok
(
Some
(
response
))
=>
{
// Stage completed successfully with a response (e.g., streaming)
// Stage completed successfully with a response (e.g., streaming)
debug!
(
"Stage {} ({}) completed with response"
,
idx
+
1
,
stage
.name
()
);
return
response
;
return
response
;
}
}
Ok
(
None
)
=>
{
Ok
(
None
)
=>
{
...
@@ -1249,15 +1224,9 @@ impl ChatCompletionPipeline {
...
@@ -1249,15 +1224,9 @@ impl ChatCompletionPipeline {
// Execute each stage in sequence
// Execute each stage in sequence
for
(
idx
,
stage
)
in
self
.stages
.iter
()
.enumerate
()
{
for
(
idx
,
stage
)
in
self
.stages
.iter
()
.enumerate
()
{
debug!
(
"Executing stage {}: {}"
,
idx
+
1
,
stage
.name
());
match
stage
.execute
(
&
mut
ctx
)
.await
{
match
stage
.execute
(
&
mut
ctx
)
.await
{
Ok
(
Some
(
response
))
=>
{
Ok
(
Some
(
response
))
=>
{
// Stage completed successfully with a response (e.g., streaming)
// Stage completed successfully with a response (e.g., streaming)
debug!
(
"Stage {} ({}) completed with response"
,
idx
+
1
,
stage
.name
()
);
return
response
;
return
response
;
}
}
Ok
(
None
)
=>
{
Ok
(
None
)
=>
{
...
...
sgl-router/src/routers/grpc/utils.rs
View file @
a65ca739
...
@@ -21,7 +21,7 @@ use serde_json::{json, Map, Value};
...
@@ -21,7 +21,7 @@ use serde_json::{json, Map, Value};
use
std
::
collections
::
HashMap
;
use
std
::
collections
::
HashMap
;
use
std
::
sync
::
Arc
;
use
std
::
sync
::
Arc
;
use
tonic
::
codec
::
Streaming
;
use
tonic
::
codec
::
Streaming
;
use
tracing
::{
debug
,
error
,
warn
};
use
tracing
::{
error
,
warn
};
use
uuid
::
Uuid
;
use
uuid
::
Uuid
;
/// Get gRPC client from worker, returning appropriate error response on failure
/// Get gRPC client from worker, returning appropriate error response on failure
...
@@ -602,10 +602,6 @@ pub async fn collect_stream_responses(
...
@@ -602,10 +602,6 @@ pub async fn collect_stream_responses(
Ok
(
gen_response
)
=>
{
Ok
(
gen_response
)
=>
{
match
gen_response
.response
{
match
gen_response
.response
{
Some
(
Complete
(
complete
))
=>
{
Some
(
Complete
(
complete
))
=>
{
debug!
(
"{} completed: prompt_tokens={}, completion_tokens={}, finish_reason={}"
,
worker_name
,
complete
.prompt_tokens
,
complete
.completion_tokens
,
complete
.finish_reason
);
all_responses
.push
(
complete
);
all_responses
.push
(
complete
);
}
}
Some
(
Error
(
err
))
=>
{
Some
(
Error
(
err
))
=>
{
...
@@ -615,11 +611,11 @@ pub async fn collect_stream_responses(
...
@@ -615,11 +611,11 @@ pub async fn collect_stream_responses(
worker_name
,
err
.message
worker_name
,
err
.message
)));
)));
}
}
Some
(
Chunk
(
chunk
))
=>
{
Some
(
Chunk
(
_
chunk
))
=>
{
debug!
(
"{} chunk: {} tokens"
,
worker_name
,
chunk
.token_ids
.len
());
// Streaming chunk - no action needed
}
}
None
=>
{
None
=>
{
debug!
(
"{}: empty response"
,
worker_name
);
// Empty response - no action needed
}
}
}
}
}
}
...
@@ -633,7 +629,6 @@ pub async fn collect_stream_responses(
...
@@ -633,7 +629,6 @@ pub async fn collect_stream_responses(
}
}
}
}
debug!
(
"{} stream closed"
,
worker_name
);
Ok
(
all_responses
)
Ok
(
all_responses
)
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment