Unverified Commit a1080b72 authored by Chang Su, committed by GitHub

[router] Fix all unused_qualifications (#11341)

parent a65ca739
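
Every change in this commit is the same mechanical fix: Rust's `unused_qualifications` lint flags a path that repeats a prefix already brought into scope by a `use` item, and the cure is to call through the imported name. Below is a minimal, self-contained sketch of the pattern, illustrative only and not router code (it assumes a binary crate with tokio's full feature set):

    // Opting into the lint at the crate root; it is allow-by-default in rustc.
    #![warn(unused_qualifications)]

    use std::time::Duration;
    use tokio::time;

    #[tokio::main]
    async fn main() {
        // Writing `tokio::time::sleep(...)` here would repeat the `tokio::`
        // prefix that `use tokio::time;` above already provides, which is
        // exactly what `unused_qualifications` reports. Calling through the
        // imported module keeps the lint quiet.
        time::sleep(Duration::from_secs(1)).await;
        println!("slept without the redundant qualification");
    }

The same reasoning applies to every hunk below: `futures::future::join_all` becomes `future::join_all`, `serde_json::Value` becomes `Value`, `axum::body::Body` becomes `Body`, and so on.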
@@ -804,7 +804,7 @@ impl WorkerFactory {
                 }
             }
-            tokio::time::sleep(Duration::from_secs(1)).await;
+            time::sleep(Duration::from_secs(1)).await;
         }
     }
 }
@@ -900,7 +900,7 @@ pub fn start_health_checker(
     let shutdown_clone = shutdown.clone();
     let handle = tokio::spawn(async move {
-        let mut interval = tokio::time::interval(Duration::from_secs(check_interval_secs));
+        let mut interval = time::interval(Duration::from_secs(check_interval_secs));
         // Counter for periodic load reset (every 10 health check cycles)
         let mut check_count = 0u64;
@@ -1272,7 +1272,7 @@ mod tests {
             let worker_clone = Arc::clone(&worker);
             let handle = tokio::spawn(async move {
                 worker_clone.set_healthy(i % 2 == 0);
-                tokio::time::sleep(Duration::from_micros(10)).await;
+                time::sleep(Duration::from_micros(10)).await;
             });
             handles.push(handle);
         }
...
@@ -1180,7 +1180,7 @@ impl WorkerManager {
            });
        }
-        let results = futures::future::join_all(tasks).await;
+        let results = future::join_all(tasks).await;
        let mut successful = Vec::new();
        let mut failed = Vec::new();
@@ -1321,7 +1321,7 @@ impl WorkerManager {
            });
        }
-        let loads = futures::future::join_all(tasks).await;
+        let loads = future::join_all(tasks).await;
        let successful = loads.iter().filter(|l| l.load >= 0).count();
        let failed = loads.iter().filter(|l| l.load < 0).count();
...
@@ -388,7 +388,7 @@ impl WorkerRegistry {
        }
        // Get all workers from registry
-        let workers: Vec<Arc<dyn crate::core::Worker>> = workers_ref
+        let workers: Vec<Arc<dyn Worker>> = workers_ref
            .iter()
            .map(|entry| entry.value().clone())
            .collect();
...
@@ -51,10 +51,10 @@ pub struct StoredResponse {
    pub output: String,
    /// Tool calls made by the model (if any)
-    pub tool_calls: Vec<serde_json::Value>,
+    pub tool_calls: Vec<Value>,
    /// Custom metadata
-    pub metadata: HashMap<String, serde_json::Value>,
+    pub metadata: HashMap<String, Value>,
    /// When this response was created
    pub created_at: chrono::DateTime<chrono::Utc>,
@@ -95,7 +95,7 @@ pub struct ResponseChain {
    pub responses: Vec<StoredResponse>,
    /// Metadata about the chain
-    pub metadata: HashMap<String, serde_json::Value>,
+    pub metadata: HashMap<String, Value>,
 }
 impl Default for ResponseChain {
...
@@ -236,7 +236,7 @@ impl Default for ResponseLogger {
 }
 impl<B> OnResponse<B> for ResponseLogger {
-    fn on_response(self, response: &Response<B>, latency: std::time::Duration, span: &Span) {
+    fn on_response(self, response: &Response<B>, latency: Duration, span: &Span) {
        let status = response.status();
        // Record these in the span for structured logging/observability tools
@@ -345,10 +345,10 @@ pub struct QueuedRequest {
 /// Queue metrics for monitoring
 #[derive(Debug, Default)]
 pub struct QueueMetrics {
-    pub total_queued: std::sync::atomic::AtomicU64,
-    pub current_queued: std::sync::atomic::AtomicU64,
-    pub total_timeout: std::sync::atomic::AtomicU64,
-    pub total_rejected: std::sync::atomic::AtomicU64,
+    pub total_queued: AtomicU64,
+    pub current_queued: AtomicU64,
+    pub total_timeout: AtomicU64,
+    pub total_rejected: AtomicU64,
 }
 /// Queue processor that handles queued requests
@@ -447,7 +447,7 @@ impl ConcurrencyLimiter {
 /// Middleware function for concurrency limiting with optional queuing
 pub async fn concurrency_limit_middleware(
    State(app_state): State<Arc<AppState>>,
-    request: Request<axum::body::Body>,
+    request: Request<Body>,
    next: Next,
 ) -> Response {
    // Static counter for embeddings queue size
...
@@ -827,8 +827,7 @@ impl StreamingProcessor {
            // Store latest output logprobs (cumulative from proto, convert to SGLang format)
            if let Some(ref output_logprobs) = chunk.output_logprobs {
-                let converted =
-                    super::utils::convert_generate_output_logprobs(output_logprobs);
+                let converted = utils::convert_generate_output_logprobs(output_logprobs);
                accumulated_output_logprobs.insert(index, Some(converted));
            }
...
@@ -522,7 +522,7 @@ pub fn parse_json_schema_response(
    match serde_json::from_str::<Value>(processed_text) {
        Ok(params) => {
            let tool_call = ToolCall {
-                id: format!("call_{}", uuid::Uuid::new_v4()),
+                id: format!("call_{}", Uuid::new_v4()),
                tool_type: "function".to_string(),
                function: FunctionCallResponse {
                    name: function.name.clone(),
@@ -553,7 +553,7 @@ pub fn parse_json_schema_response(
            let parameters = obj.get("parameters")?;
            Some(ToolCall {
-                id: format!("call_{}_{}", i, uuid::Uuid::new_v4()),
+                id: format!("call_{}_{}", i, Uuid::new_v4()),
                tool_type: "function".to_string(),
                function: FunctionCallResponse {
                    name,
...
@@ -2016,7 +2016,7 @@ impl OpenAIRouter {
    ///
    /// Returns borrowed strings when possible to avoid allocations in hot paths.
    /// Only allocates when multiple data lines need to be joined.
-    fn parse_sse_block(block: &str) -> (Option<&str>, std::borrow::Cow<'_, str>) {
+    fn parse_sse_block(block: &str) -> (Option<&str>, Cow<'_, str>) {
        let mut event_name: Option<&str> = None;
        let mut data_lines: Vec<&str> = Vec::new();
@@ -2029,9 +2029,9 @@ impl OpenAIRouter {
        }
        let data = if data_lines.len() == 1 {
-            std::borrow::Cow::Borrowed(data_lines[0])
+            Cow::Borrowed(data_lines[0])
        } else {
-            std::borrow::Cow::Owned(data_lines.join("\n"))
+            Cow::Owned(data_lines.join("\n"))
        };
        (event_name, data)
@@ -2714,7 +2714,7 @@ impl OpenAIRouter {
            }
            ResponseInput::Items(items) => {
                // Items are already structured ResponseInputOutputItem, convert to JSON
-                if let Ok(items_value) = serde_json::to_value(items) {
+                if let Ok(items_value) = to_value(items) {
                    if let Some(items_arr) = items_value.as_array() {
                        input_array.extend_from_slice(items_arr);
                    }
@@ -2773,7 +2773,7 @@ impl OpenAIRouter {
            .unwrap_or("{}");
        // Check if output contains error by parsing JSON
-        let is_error = serde_json::from_str::<serde_json::Value>(output_str)
+        let is_error = serde_json::from_str::<Value>(output_str)
            .map(|v| v.get("error").is_some())
            .unwrap_or(false);
@@ -3189,7 +3189,7 @@ impl super::super::RouterTrait for OpenAIRouter {
        let content_type = res.headers().get(CONTENT_TYPE).cloned();
        match res.bytes().await {
            Ok(body) => {
-                let mut response = Response::new(axum::body::Body::from(body));
+                let mut response = Response::new(Body::from(body));
                *response.status_mut() = status;
                if let Some(ct) = content_type {
                    response.headers_mut().insert(CONTENT_TYPE, ct);
@@ -3316,7 +3316,7 @@ impl super::super::RouterTrait for OpenAIRouter {
        match resp.bytes().await {
            Ok(body) => {
                self.circuit_breaker.record_success();
-                let mut response = Response::new(axum::body::Body::from(body));
+                let mut response = Response::new(Body::from(body));
                *response.status_mut() = status;
                if let Some(ct) = content_type {
                    response.headers_mut().insert(CONTENT_TYPE, ct);
...
@@ -88,7 +88,7 @@ impl PDRouter {
        match res.bytes().await {
            Ok(body) => {
-                let mut response = Response::new(axum::body::Body::from(body));
+                let mut response = Response::new(Body::from(body));
                *response.status_mut() = StatusCode::OK;
                *response.headers_mut() = response_headers;
                response
@@ -201,7 +201,7 @@ impl PDRouter {
            }
            obj.insert(
                "bootstrap_host".to_string(),
-                Value::Array(hosts.into_iter().map(serde_json::Value::from).collect()),
+                Value::Array(hosts.into_iter().map(Value::from).collect()),
            );
            obj.insert(
                "bootstrap_port".to_string(),
@@ -209,7 +209,7 @@ impl PDRouter {
                ports
                    .into_iter()
                    .map(|p| match p {
-                        Some(v) => serde_json::Value::from(v),
+                        Some(v) => Value::from(v),
                        None => Value::Null,
                    })
                    .collect(),
@@ -217,23 +217,23 @@ impl PDRouter {
            );
            obj.insert(
                "bootstrap_room".to_string(),
-                Value::Array(rooms.into_iter().map(serde_json::Value::from).collect()),
+                Value::Array(rooms.into_iter().map(Value::from).collect()),
            );
        } else {
            obj.insert(
                "bootstrap_host".to_string(),
-                serde_json::Value::from(prefill_worker.bootstrap_host()),
+                Value::from(prefill_worker.bootstrap_host()),
            );
            obj.insert(
                "bootstrap_port".to_string(),
                match prefill_worker.bootstrap_port() {
-                    Some(v) => serde_json::Value::from(v),
+                    Some(v) => Value::from(v),
                    None => Value::Null,
                },
            );
            obj.insert(
                "bootstrap_room".to_string(),
-                serde_json::Value::from(super::pd_types::generate_room_id()),
+                Value::from(super::pd_types::generate_room_id()),
            );
        }
        Ok(original)
@@ -508,8 +508,7 @@ impl PDRouter {
                    match res.bytes().await {
                        Ok(decode_body) => {
-                            let mut response =
-                                Response::new(axum::body::Body::from(decode_body));
+                            let mut response = Response::new(Body::from(decode_body));
                            *response.status_mut() = status;
                            *response.headers_mut() = response_headers;
                            response
@@ -1365,7 +1364,7 @@ mod tests {
        assert_eq!(decode_ref.load(), 0);
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
-        let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
+        let stream = UnboundedReceiverStream::new(rx);
        let _response = router.create_streaming_response(
            stream.map(Ok),
...
@@ -95,7 +95,7 @@ impl Router {
        match res.bytes().await {
            Ok(body) => {
-                let mut response = Response::new(axum::body::Body::from(body));
+                let mut response = Response::new(Body::from(body));
                *response.status_mut() = status;
                *response.headers_mut() = response_headers;
                response
@@ -315,7 +315,7 @@ impl Router {
        let response_headers = header_utils::preserve_response_headers(res.headers());
        match res.bytes().await {
            Ok(body) => {
-                let mut response = Response::new(axum::body::Body::from(body));
+                let mut response = Response::new(Body::from(body));
                *response.status_mut() = status;
                *response.headers_mut() = response_headers;
                if status.is_success() {
@@ -496,7 +496,7 @@ impl Router {
        let response = match res.bytes().await {
            Ok(body) => {
-                let mut response = Response::new(axum::body::Body::from(body));
+                let mut response = Response::new(Body::from(body));
                *response.status_mut() = status;
                *response.headers_mut() = response_headers;
                response
...
@@ -42,7 +42,7 @@ pub struct Tokenizer(Arc<dyn traits::Tokenizer>);
 impl Tokenizer {
    /// Create a tokenizer from a file path
    pub fn from_file(file_path: &str) -> Result<Tokenizer> {
-        Ok(Tokenizer(factory::create_tokenizer_from_file(file_path)?))
+        Ok(Tokenizer(create_tokenizer_from_file(file_path)?))
    }
    /// Create a tokenizer from a file path with an optional chat template
@@ -50,7 +50,7 @@ impl Tokenizer {
        file_path: &str,
        chat_template_path: Option<&str>,
    ) -> Result<Tokenizer> {
-        Ok(Tokenizer(factory::create_tokenizer_with_chat_template(
+        Ok(Tokenizer(create_tokenizer_with_chat_template(
            file_path,
            chat_template_path,
        )?))
...
@@ -124,7 +124,7 @@ impl ToolParser for KimiK2Parser {
            // Parse function ID
            if let Some((func_name, _index)) = self.parse_function_id(function_id) {
                // Try to parse JSON arguments
-                match serde_json::from_str::<serde_json::Value>(function_args) {
+                match serde_json::from_str::<Value>(function_args) {
                    Ok(_) => {
                        tools.push(ToolCall {
                            function: FunctionCall {
...
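
All of the serde_json edits above reduce to one rule: keep the crate path where no import exists, drop it where one does. A self-contained sketch of that rule, again illustrative rather than router code (it assumes only a serde_json dependency and the lint enabled as in the sketch near the top):

    #![warn(unused_qualifications)]

    use serde_json::Value;
    use std::collections::HashMap;

    fn main() {
        // Mirrors the router change: `from_str` stays reached through the crate
        // path because it is not imported, while the turbofish uses the imported
        // `Value`; spelling it `serde_json::Value` would trip the lint.
        let parsed = serde_json::from_str::<Value>(r#"{"error":"boom"}"#).unwrap();
        let is_error = parsed.get("error").is_some();

        // Same shape as the struct fields above: `HashMap<String, Value>`
        // instead of `HashMap<String, serde_json::Value>`.
        let mut metadata: HashMap<String, Value> = HashMap::new();
        metadata.insert("is_error".to_string(), Value::Bool(is_error));
        println!("{metadata:?}");
    }

That the touched modules import `Value` but not `from_str` is inferred from the diff itself, since the fully qualified `serde_json::from_str` calls were left untouched.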