Unverified Commit 2fcd56ea authored by Simo Lin's avatar Simo Lin Committed by GitHub
Browse files

[router] add get server info and get model info in grpc server (#11303)

parent 0958a397
...@@ -5,6 +5,7 @@ Uses GrpcRequestManager for orchestration without tokenization. ...@@ -5,6 +5,7 @@ Uses GrpcRequestManager for orchestration without tokenization.
import argparse import argparse
import asyncio import asyncio
import dataclasses
import logging import logging
import multiprocessing as mp import multiprocessing as mp
import os import os
...@@ -15,8 +16,11 @@ from typing import AsyncIterator, Dict, Optional, Tuple ...@@ -15,8 +16,11 @@ from typing import AsyncIterator, Dict, Optional, Tuple
import grpc import grpc
from google.protobuf.json_format import MessageToDict from google.protobuf.json_format import MessageToDict
from google.protobuf.struct_pb2 import Struct
from google.protobuf.timestamp_pb2 import Timestamp
from grpc_reflection.v1alpha import reflection from grpc_reflection.v1alpha import reflection
import sglang
from sglang.srt.disaggregation.utils import FAKE_BOOTSTRAP_HOST, DisaggregationMode from sglang.srt.disaggregation.utils import FAKE_BOOTSTRAP_HOST, DisaggregationMode
from sglang.srt.entrypoints.grpc_request_manager import GrpcRequestManager from sglang.srt.entrypoints.grpc_request_manager import GrpcRequestManager
from sglang.srt.grpc import sglang_scheduler_pb2, sglang_scheduler_pb2_grpc from sglang.srt.grpc import sglang_scheduler_pb2, sglang_scheduler_pb2_grpc
...@@ -173,11 +177,13 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer) ...@@ -173,11 +177,13 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
request_manager: GrpcRequestManager, request_manager: GrpcRequestManager,
server_args: ServerArgs, server_args: ServerArgs,
model_info: Dict, model_info: Dict,
scheduler_info: Dict,
): ):
"""Initialize the standalone gRPC service.""" """Initialize the standalone gRPC service."""
self.request_manager = request_manager self.request_manager = request_manager
self.server_args = server_args self.server_args = server_args
self.model_info = model_info self.model_info = model_info
self.scheduler_info = scheduler_info
self.start_time = time.time() self.start_time = time.time()
# Start the request manager's event loop using auto_create_handle_loop # Start the request manager's event loop using auto_create_handle_loop
...@@ -396,6 +402,89 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer) ...@@ -396,6 +402,89 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer)
message=str(e), message=str(e),
) )
async def GetModelInfo(
    self,
    request: sglang_scheduler_pb2.GetModelInfoRequest,
    context: grpc.aio.ServicerContext,
) -> sglang_scheduler_pb2.GetModelInfoResponse:
    """Return static model metadata (paths, vocab/context limits, token ids)."""
    logger.info("Model info request received")

    # Prefer the scheduler's own generation flag; fall back to server args
    # (embedding servers are not generation servers) when it is absent.
    generation_flag = self.scheduler_info.get("is_generation")
    if generation_flag is None:
        generation_flag = not self.server_args.is_embedding

    args = self.server_args
    info = self.model_info
    return sglang_scheduler_pb2.GetModelInfoResponse(
        model_path=args.model_path,
        tokenizer_path=args.tokenizer_path or "",
        is_generation=generation_flag,
        preferred_sampling_params=args.preferred_sampling_params or "",
        weight_version=args.weight_version or "",
        served_model_name=args.served_model_name,
        max_context_length=info["max_context_length"],
        vocab_size=info["vocab_size"],
        supports_vision=info["supports_vision"],
        model_type=info["model_type"],
        eos_token_ids=info["eos_token_ids"],
        pad_token_id=info["pad_token_id"],
        bos_token_id=info["bos_token_id"],
        max_req_input_len=info["max_req_input_len"],
    )
async def GetServerInfo(
    self,
    request: sglang_scheduler_pb2.GetServerInfoRequest,
    context: grpc.aio.ServicerContext,
) -> sglang_scheduler_pb2.GetServerInfoResponse:
    """Return server configuration, scheduler info, and runtime state.

    server_args and scheduler_info are exposed as protobuf Structs;
    values that are not JSON-compatible are stringified so the Struct
    update cannot fail on exotic config values.
    """
    logger.info("Server info request received")

    def make_serializable(obj):
        # Struct only accepts JSON-compatible values (null/bool/number/
        # string/list/dict with string keys); coerce everything else.
        if obj is None:
            return None
        elif isinstance(obj, (str, int, float, bool)):
            return obj
        elif isinstance(obj, (list, tuple, set)):
            return [make_serializable(item) for item in obj]
        elif isinstance(obj, dict):
            # Struct rejects non-string keys, so stringify them too.
            return {str(k): make_serializable(v) for k, v in obj.items()}
        else:
            return str(obj)

    # Convert server args (a dataclass) to Struct
    server_args_struct = Struct()
    server_args_struct.update(make_serializable(dataclasses.asdict(self.server_args)))

    # Convert scheduler_info to Struct.  Sanitize it the same way as
    # server_args: it comes from scheduler initialization and may carry
    # non-JSON values (tuples, numpy scalars, ...) that Struct rejects.
    scheduler_info_struct = Struct()
    scheduler_info_struct.update(make_serializable(self.scheduler_info))

    # Get runtime state from request manager
    manager_state = self.request_manager.get_server_info()

    # Calculate uptime and the servicer start timestamp
    uptime = time.time() - self.start_time
    start_timestamp = Timestamp()
    start_timestamp.FromSeconds(int(self.start_time))

    return sglang_scheduler_pb2.GetServerInfoResponse(
        server_args=server_args_struct,
        scheduler_info=scheduler_info_struct,
        active_requests=manager_state["active_requests"],
        is_paused=manager_state["paused"],
        last_receive_timestamp=manager_state["last_receive_time"],
        uptime_seconds=uptime,
        sglang_version=sglang.__version__,
        server_type="grpc",
        start_time=start_timestamp,
    )
# Helper methods for request/response conversion # Helper methods for request/response conversion
def _convert_generate_request( def _convert_generate_request(
...@@ -756,6 +845,7 @@ async def serve_grpc( ...@@ -756,6 +845,7 @@ async def serve_grpc(
request_manager=request_manager, request_manager=request_manager,
server_args=server_args, server_args=server_args,
model_info=model_info, model_info=model_info,
scheduler_info=scheduler_info,
) )
sglang_scheduler_pb2_grpc.add_SglangSchedulerServicer_to_server(servicer, server) sglang_scheduler_pb2_grpc.add_SglangSchedulerServicer_to_server(servicer, server)
......
...@@ -20,6 +20,12 @@ service SglangScheduler { ...@@ -20,6 +20,12 @@ service SglangScheduler {
// Abort a running request // Abort a running request
rpc Abort(AbortRequest) returns (AbortResponse); rpc Abort(AbortRequest) returns (AbortResponse);
// Get model information
rpc GetModelInfo(GetModelInfoRequest) returns (GetModelInfoResponse);
// Get server information
rpc GetServerInfo(GetServerInfoRequest) returns (GetServerInfoResponse);
} }
// ===================== // =====================
...@@ -401,3 +407,56 @@ message SetInternalStateResponse { ...@@ -401,3 +407,56 @@ message SetInternalStateResponse {
bool success = 1; bool success = 1;
string message = 2; string message = 2;
} }
// =====================
// Model and Server Info
// =====================
// Get model information (static model metadata; populated from the
// servicer's ServerArgs and model_info dict)
message GetModelInfoRequest {}

message GetModelInfoResponse {
  string model_path = 1;                    // from ServerArgs.model_path
  string tokenizer_path = 2;                // empty string when unset
  bool is_generation = 3;                   // generation vs. embedding server
  string preferred_sampling_params = 4; // JSON string or empty
  string weight_version = 5;                // empty string when unset
  string served_model_name = 6;
  int32 max_context_length = 7;
  int32 vocab_size = 8;
  bool supports_vision = 9;
  string model_type = 10;
  repeated int32 eos_token_ids = 11;        // may contain several EOS ids
  int32 pad_token_id = 12;
  int32 bos_token_id = 13;
  int32 max_req_input_len = 14;
}
// Get server information
message GetServerInfoRequest {}

message GetServerInfoResponse {
  // Server configuration (as structured data; non-JSON values stringified)
  google.protobuf.Struct server_args = 1;

  // Scheduler metrics (from scheduler initialization)
  google.protobuf.Struct scheduler_info = 2;

  // Runtime state
  int32 active_requests = 3;          // in-flight requests in the request manager
  bool is_paused = 4;
  double last_receive_timestamp = 5;  // presumably unix seconds — verify against manager
  double uptime_seconds = 6;          // seconds since the gRPC servicer started

  // Version info
  string sglang_version = 7;

  // Server metadata
  string server_type = 8; // "grpc"
  google.protobuf.Timestamp start_time = 9; // servicer start time (second precision)

  // Note: internal_states not provided in gRPC mode
  // Scheduler-side metrics (memory usage, throughput) require
  // bidirectional communicator infrastructure not available in gRPC.
  // Use HTTP /get_server_info if scheduler internal state is needed.
}
...@@ -428,3 +428,65 @@ class SetInternalStateResponse(_message.Message): ...@@ -428,3 +428,65 @@ class SetInternalStateResponse(_message.Message):
success: bool success: bool
message: str message: str
def __init__(self, success: bool = ..., message: _Optional[str] = ...) -> None: ... def __init__(self, success: bool = ..., message: _Optional[str] = ...) -> None: ...
# Generated protobuf stub: empty request message for the GetModelInfo RPC.
class GetModelInfoRequest(_message.Message):
    __slots__ = ()
    def __init__(self) -> None: ...
# Generated protobuf stub: model metadata returned by the GetModelInfo RPC.
# Mirrors GetModelInfoResponse in sglang_scheduler.proto (fields 1-14).
class GetModelInfoResponse(_message.Message):
    __slots__ = ("model_path", "tokenizer_path", "is_generation", "preferred_sampling_params", "weight_version", "served_model_name", "max_context_length", "vocab_size", "supports_vision", "model_type", "eos_token_ids", "pad_token_id", "bos_token_id", "max_req_input_len")
    MODEL_PATH_FIELD_NUMBER: _ClassVar[int]
    TOKENIZER_PATH_FIELD_NUMBER: _ClassVar[int]
    IS_GENERATION_FIELD_NUMBER: _ClassVar[int]
    PREFERRED_SAMPLING_PARAMS_FIELD_NUMBER: _ClassVar[int]
    WEIGHT_VERSION_FIELD_NUMBER: _ClassVar[int]
    SERVED_MODEL_NAME_FIELD_NUMBER: _ClassVar[int]
    MAX_CONTEXT_LENGTH_FIELD_NUMBER: _ClassVar[int]
    VOCAB_SIZE_FIELD_NUMBER: _ClassVar[int]
    SUPPORTS_VISION_FIELD_NUMBER: _ClassVar[int]
    MODEL_TYPE_FIELD_NUMBER: _ClassVar[int]
    EOS_TOKEN_IDS_FIELD_NUMBER: _ClassVar[int]
    PAD_TOKEN_ID_FIELD_NUMBER: _ClassVar[int]
    BOS_TOKEN_ID_FIELD_NUMBER: _ClassVar[int]
    MAX_REQ_INPUT_LEN_FIELD_NUMBER: _ClassVar[int]
    model_path: str
    tokenizer_path: str
    is_generation: bool
    preferred_sampling_params: str
    weight_version: str
    served_model_name: str
    max_context_length: int
    vocab_size: int
    supports_vision: bool
    model_type: str
    eos_token_ids: _containers.RepeatedScalarFieldContainer[int]
    pad_token_id: int
    bos_token_id: int
    max_req_input_len: int
    def __init__(self, model_path: _Optional[str] = ..., tokenizer_path: _Optional[str] = ..., is_generation: bool = ..., preferred_sampling_params: _Optional[str] = ..., weight_version: _Optional[str] = ..., served_model_name: _Optional[str] = ..., max_context_length: _Optional[int] = ..., vocab_size: _Optional[int] = ..., supports_vision: bool = ..., model_type: _Optional[str] = ..., eos_token_ids: _Optional[_Iterable[int]] = ..., pad_token_id: _Optional[int] = ..., bos_token_id: _Optional[int] = ..., max_req_input_len: _Optional[int] = ...) -> None: ...
# Generated protobuf stub: empty request message for the GetServerInfo RPC.
class GetServerInfoRequest(_message.Message):
    __slots__ = ()
    def __init__(self) -> None: ...
# Generated protobuf stub: server configuration and runtime state returned
# by the GetServerInfo RPC. Mirrors GetServerInfoResponse in the proto.
class GetServerInfoResponse(_message.Message):
    __slots__ = ("server_args", "scheduler_info", "active_requests", "is_paused", "last_receive_timestamp", "uptime_seconds", "sglang_version", "server_type", "start_time")
    SERVER_ARGS_FIELD_NUMBER: _ClassVar[int]
    SCHEDULER_INFO_FIELD_NUMBER: _ClassVar[int]
    ACTIVE_REQUESTS_FIELD_NUMBER: _ClassVar[int]
    IS_PAUSED_FIELD_NUMBER: _ClassVar[int]
    LAST_RECEIVE_TIMESTAMP_FIELD_NUMBER: _ClassVar[int]
    UPTIME_SECONDS_FIELD_NUMBER: _ClassVar[int]
    SGLANG_VERSION_FIELD_NUMBER: _ClassVar[int]
    SERVER_TYPE_FIELD_NUMBER: _ClassVar[int]
    START_TIME_FIELD_NUMBER: _ClassVar[int]
    server_args: _struct_pb2.Struct
    scheduler_info: _struct_pb2.Struct
    active_requests: int
    is_paused: bool
    last_receive_timestamp: float
    uptime_seconds: float
    sglang_version: str
    server_type: str
    start_time: _timestamp_pb2.Timestamp
    def __init__(self, server_args: _Optional[_Union[_struct_pb2.Struct, _Mapping]] = ..., scheduler_info: _Optional[_Union[_struct_pb2.Struct, _Mapping]] = ..., active_requests: _Optional[int] = ..., is_paused: bool = ..., last_receive_timestamp: _Optional[float] = ..., uptime_seconds: _Optional[float] = ..., sglang_version: _Optional[str] = ..., server_type: _Optional[str] = ..., start_time: _Optional[_Union[datetime.datetime, _timestamp_pb2.Timestamp, _Mapping]] = ...) -> None: ...
...@@ -59,6 +59,16 @@ class SglangSchedulerStub(object): ...@@ -59,6 +59,16 @@ class SglangSchedulerStub(object):
request_serializer=sglang__scheduler__pb2.AbortRequest.SerializeToString, request_serializer=sglang__scheduler__pb2.AbortRequest.SerializeToString,
response_deserializer=sglang__scheduler__pb2.AbortResponse.FromString, response_deserializer=sglang__scheduler__pb2.AbortResponse.FromString,
_registered_method=True) _registered_method=True)
self.GetModelInfo = channel.unary_unary(
'/sglang.grpc.scheduler.SglangScheduler/GetModelInfo',
request_serializer=sglang__scheduler__pb2.GetModelInfoRequest.SerializeToString,
response_deserializer=sglang__scheduler__pb2.GetModelInfoResponse.FromString,
_registered_method=True)
self.GetServerInfo = channel.unary_unary(
'/sglang.grpc.scheduler.SglangScheduler/GetServerInfo',
request_serializer=sglang__scheduler__pb2.GetServerInfoRequest.SerializeToString,
response_deserializer=sglang__scheduler__pb2.GetServerInfoResponse.FromString,
_registered_method=True)
class SglangSchedulerServicer(object): class SglangSchedulerServicer(object):
...@@ -94,6 +104,20 @@ class SglangSchedulerServicer(object): ...@@ -94,6 +104,20 @@ class SglangSchedulerServicer(object):
context.set_details('Method not implemented!') context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!') raise NotImplementedError('Method not implemented!')
def GetModelInfo(self, request, context):
    """Get model information
    """
    # Generated default handler: reports UNIMPLEMENTED until a concrete
    # servicer subclass overrides this method.
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')
def GetServerInfo(self, request, context):
    """Get server information
    """
    # Generated default handler: reports UNIMPLEMENTED until a concrete
    # servicer subclass overrides this method.
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')
def add_SglangSchedulerServicer_to_server(servicer, server): def add_SglangSchedulerServicer_to_server(servicer, server):
rpc_method_handlers = { rpc_method_handlers = {
...@@ -117,6 +141,16 @@ def add_SglangSchedulerServicer_to_server(servicer, server): ...@@ -117,6 +141,16 @@ def add_SglangSchedulerServicer_to_server(servicer, server):
request_deserializer=sglang__scheduler__pb2.AbortRequest.FromString, request_deserializer=sglang__scheduler__pb2.AbortRequest.FromString,
response_serializer=sglang__scheduler__pb2.AbortResponse.SerializeToString, response_serializer=sglang__scheduler__pb2.AbortResponse.SerializeToString,
), ),
'GetModelInfo': grpc.unary_unary_rpc_method_handler(
servicer.GetModelInfo,
request_deserializer=sglang__scheduler__pb2.GetModelInfoRequest.FromString,
response_serializer=sglang__scheduler__pb2.GetModelInfoResponse.SerializeToString,
),
'GetServerInfo': grpc.unary_unary_rpc_method_handler(
servicer.GetServerInfo,
request_deserializer=sglang__scheduler__pb2.GetServerInfoRequest.FromString,
response_serializer=sglang__scheduler__pb2.GetServerInfoResponse.SerializeToString,
),
} }
generic_handler = grpc.method_handlers_generic_handler( generic_handler = grpc.method_handlers_generic_handler(
'sglang.grpc.scheduler.SglangScheduler', rpc_method_handlers) 'sglang.grpc.scheduler.SglangScheduler', rpc_method_handlers)
...@@ -237,3 +271,57 @@ class SglangScheduler(object): ...@@ -237,3 +271,57 @@ class SglangScheduler(object):
timeout, timeout,
metadata, metadata,
_registered_method=True) _registered_method=True)
@staticmethod
def GetModelInfo(request,
        target,
        options=(),
        channel_credentials=None,
        call_credentials=None,
        insecure=False,
        compression=None,
        wait_for_ready=None,
        timeout=None,
        metadata=None):
    # Generated convenience wrapper: performs a one-shot unary-unary
    # GetModelInfo call via the experimental API (no persistent stub needed).
    return grpc.experimental.unary_unary(
        request,
        target,
        '/sglang.grpc.scheduler.SglangScheduler/GetModelInfo',
        sglang__scheduler__pb2.GetModelInfoRequest.SerializeToString,
        sglang__scheduler__pb2.GetModelInfoResponse.FromString,
        options,
        channel_credentials,
        insecure,
        call_credentials,
        compression,
        wait_for_ready,
        timeout,
        metadata,
        _registered_method=True)
@staticmethod
def GetServerInfo(request,
        target,
        options=(),
        channel_credentials=None,
        call_credentials=None,
        insecure=False,
        compression=None,
        wait_for_ready=None,
        timeout=None,
        metadata=None):
    # Generated convenience wrapper: performs a one-shot unary-unary
    # GetServerInfo call via the experimental API (no persistent stub needed).
    return grpc.experimental.unary_unary(
        request,
        target,
        '/sglang.grpc.scheduler.SglangScheduler/GetServerInfo',
        sglang__scheduler__pb2.GetServerInfoRequest.SerializeToString,
        sglang__scheduler__pb2.GetServerInfoResponse.FromString,
        options,
        channel_credentials,
        insecure,
        call_credentials,
        compression,
        wait_for_ready,
        timeout,
        metadata,
        _registered_method=True)
...@@ -97,6 +97,30 @@ impl SglangSchedulerClient { ...@@ -97,6 +97,30 @@ impl SglangSchedulerClient {
Ok(()) Ok(())
} }
/// Fetch static model metadata from the scheduler.
pub async fn get_model_info(
    &mut self,
) -> Result<proto::GetModelInfoResponse, Box<dyn std::error::Error + Send + Sync>> {
    debug!("Requesting model info");
    let reply = self
        .client
        .get_model_info(Request::new(proto::GetModelInfoRequest {}))
        .await?;
    debug!("Model info response received");
    Ok(reply.into_inner())
}
/// Fetch server configuration and runtime state from the scheduler.
pub async fn get_server_info(
    &mut self,
) -> Result<proto::GetServerInfoResponse, Box<dyn std::error::Error + Send + Sync>> {
    debug!("Requesting server info");
    let reply = self
        .client
        .get_server_info(Request::new(proto::GetServerInfoRequest {}))
        .await?;
    debug!("Server info response received");
    Ok(reply.into_inner())
}
/// Build a single SGLang GenerateRequest from OpenAI ChatCompletionRequest /// Build a single SGLang GenerateRequest from OpenAI ChatCompletionRequest
pub fn build_generate_request( pub fn build_generate_request(
&self, &self,
......
...@@ -20,6 +20,12 @@ service SglangScheduler { ...@@ -20,6 +20,12 @@ service SglangScheduler {
// Abort a running request // Abort a running request
rpc Abort(AbortRequest) returns (AbortResponse); rpc Abort(AbortRequest) returns (AbortResponse);
// Get model information
rpc GetModelInfo(GetModelInfoRequest) returns (GetModelInfoResponse);
// Get server information
rpc GetServerInfo(GetServerInfoRequest) returns (GetServerInfoResponse);
} }
// ===================== // =====================
...@@ -401,3 +407,56 @@ message SetInternalStateResponse { ...@@ -401,3 +407,56 @@ message SetInternalStateResponse {
bool success = 1; bool success = 1;
string message = 2; string message = 2;
} }
// =====================
// Model and Server Info
// =====================
// Get model information (static model metadata; populated from the
// servicer's ServerArgs and model_info dict)
message GetModelInfoRequest {}

message GetModelInfoResponse {
  string model_path = 1;                    // from ServerArgs.model_path
  string tokenizer_path = 2;                // empty string when unset
  bool is_generation = 3;                   // generation vs. embedding server
  string preferred_sampling_params = 4; // JSON string or empty
  string weight_version = 5;                // empty string when unset
  string served_model_name = 6;
  int32 max_context_length = 7;
  int32 vocab_size = 8;
  bool supports_vision = 9;
  string model_type = 10;
  repeated int32 eos_token_ids = 11;        // may contain several EOS ids
  int32 pad_token_id = 12;
  int32 bos_token_id = 13;
  int32 max_req_input_len = 14;
}
// Get server information
message GetServerInfoRequest {}

message GetServerInfoResponse {
  // Server configuration (as structured data; non-JSON values stringified)
  google.protobuf.Struct server_args = 1;

  // Scheduler metrics (from scheduler initialization)
  google.protobuf.Struct scheduler_info = 2;

  // Runtime state
  int32 active_requests = 3;          // in-flight requests in the request manager
  bool is_paused = 4;
  double last_receive_timestamp = 5;  // presumably unix seconds — verify against manager
  double uptime_seconds = 6;          // seconds since the gRPC servicer started

  // Version info
  string sglang_version = 7;

  // Server metadata
  string server_type = 8; // "grpc"
  google.protobuf.Timestamp start_time = 9; // servicer start time (second precision)

  // Note: internal_states not provided in gRPC mode
  // Scheduler-side metrics (memory usage, throughput) require
  // bidirectional communicator infrastructure not available in gRPC.
  // Use HTTP /get_server_info if scheduler internal state is needed.
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment