Unverified Commit c5642a7a authored by Simo Lin's avatar Simo Lin Committed by GitHub
Browse files

[router] use mcp struct from sdk and clean up code across codebase (#12249)

parent 691c8534
......@@ -67,7 +67,7 @@ minijinja = { version = "2.0", features = ["unstable_machinery", "json", "builti
minijinja-contrib = { version = "2.0", features = ["pycompat"] }
rustls = { version = "0.23", default-features = false, features = ["ring", "std"] }
hf-hub = { version = "0.4.3", features = ["tokio"] }
rmcp = { version = "0.6.3", features = ["client", "server",
rmcp = { version = "0.8.3", features = ["client", "server",
"transport-child-process",
"transport-sse-client-reqwest",
"transport-streamable-http-client-reqwest",
......
use std::collections::HashMap;
// Re-export rmcp types for convenient access
pub use rmcp::model::{Prompt, RawResource, Tool};
use serde::{Deserialize, Serialize};
// ============================================================================
// MCP Data Structures
// ============================================================================
/// Information about an available tool
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolInfo {
pub name: String,
pub description: String,
pub server: String,
pub parameters: Option<serde_json::Value>,
}
/// Information about an available prompt
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromptInfo {
pub name: String,
pub description: Option<String>,
pub server: String,
pub arguments: Option<Vec<serde_json::Value>>,
}
/// Information about an available resource
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceInfo {
pub uri: String,
pub name: String,
pub description: Option<String>,
pub mime_type: Option<String>,
pub server: String,
}
// ============================================================================
// Configuration Structures
// ============================================================================
......
......@@ -8,13 +8,13 @@ use std::time::{Duration, Instant};
use dashmap::DashMap;
use crate::mcp::config::{PromptInfo, ResourceInfo, ToolInfo};
use crate::mcp::config::{Prompt, RawResource, Tool};
/// Cached tool with metadata
#[derive(Clone)]
pub struct CachedTool {
pub server_name: String,
pub tool: ToolInfo,
pub tool: Tool,
pub cached_at: Instant,
}
......@@ -22,7 +22,7 @@ pub struct CachedTool {
#[derive(Clone)]
pub struct CachedPrompt {
pub server_name: String,
pub prompt: PromptInfo,
pub prompt: Prompt,
pub cached_at: Instant,
}
......@@ -30,7 +30,7 @@ pub struct CachedPrompt {
#[derive(Clone)]
pub struct CachedResource {
pub server_name: String,
pub resource: ResourceInfo,
pub resource: RawResource,
pub cached_at: Instant,
}
......@@ -74,7 +74,7 @@ impl ToolInventory {
/// Get a tool if it exists and is fresh (within TTL)
///
/// Returns None if the tool doesn't exist or has expired.
pub fn get_tool(&self, tool_name: &str) -> Option<(String, ToolInfo)> {
pub fn get_tool(&self, tool_name: &str) -> Option<(String, Tool)> {
self.tools.get(tool_name).and_then(|entry| {
let cached = entry.value();
......@@ -94,7 +94,7 @@ impl ToolInventory {
}
/// Insert or update a tool
pub fn insert_tool(&self, tool_name: String, server_name: String, tool: ToolInfo) {
pub fn insert_tool(&self, tool_name: String, server_name: String, tool: Tool) {
self.tools.insert(
tool_name,
CachedTool {
......@@ -106,7 +106,7 @@ impl ToolInventory {
}
/// Get all tools (fresh only)
pub fn list_tools(&self) -> Vec<(String, String, ToolInfo)> {
pub fn list_tools(&self) -> Vec<(String, String, Tool)> {
let now = Instant::now();
self.tools
.iter()
......@@ -130,7 +130,7 @@ impl ToolInventory {
// ============================================================================
/// Get a prompt if it exists and is fresh (within TTL)
pub fn get_prompt(&self, prompt_name: &str) -> Option<(String, PromptInfo)> {
pub fn get_prompt(&self, prompt_name: &str) -> Option<(String, Prompt)> {
self.prompts.get(prompt_name).and_then(|entry| {
let cached = entry.value();
......@@ -149,7 +149,7 @@ impl ToolInventory {
}
/// Insert or update a prompt
pub fn insert_prompt(&self, prompt_name: String, server_name: String, prompt: PromptInfo) {
pub fn insert_prompt(&self, prompt_name: String, server_name: String, prompt: Prompt) {
self.prompts.insert(
prompt_name,
CachedPrompt {
......@@ -161,7 +161,7 @@ impl ToolInventory {
}
/// Get all prompts (fresh only)
pub fn list_prompts(&self) -> Vec<(String, String, PromptInfo)> {
pub fn list_prompts(&self) -> Vec<(String, String, Prompt)> {
let now = Instant::now();
self.prompts
.iter()
......@@ -185,7 +185,7 @@ impl ToolInventory {
// ============================================================================
/// Get a resource if it exists and is fresh (within TTL)
pub fn get_resource(&self, resource_uri: &str) -> Option<(String, ResourceInfo)> {
pub fn get_resource(&self, resource_uri: &str) -> Option<(String, RawResource)> {
self.resources.get(resource_uri).and_then(|entry| {
let cached = entry.value();
......@@ -208,7 +208,7 @@ impl ToolInventory {
&self,
resource_uri: String,
server_name: String,
resource: ResourceInfo,
resource: RawResource,
) {
self.resources.insert(
resource_uri,
......@@ -221,7 +221,7 @@ impl ToolInventory {
}
/// Get all resources (fresh only)
pub fn list_resources(&self) -> Vec<(String, String, ResourceInfo)> {
pub fn list_resources(&self) -> Vec<(String, String, RawResource)> {
let now = Instant::now();
self.resources
.iter()
......@@ -316,38 +316,55 @@ impl ToolInventory {
#[cfg(test)]
mod tests {
use super::*;
use crate::mcp::config::{Prompt, RawResource, Tool};
// Helper to create a test tool
fn create_test_tool(name: &str) -> ToolInfo {
ToolInfo {
name: name.to_string(),
description: format!("Test tool: {}", name),
server: "test_server".to_string(),
parameters: Some(serde_json::json!({
"type": "object",
"properties": {}
})),
fn create_test_tool(name: &str) -> Tool {
use std::{borrow::Cow, sync::Arc};
let schema_obj = serde_json::json!({
"type": "object",
"properties": {}
});
let schema_map = if let serde_json::Value::Object(m) = schema_obj {
m
} else {
serde_json::Map::new()
};
Tool {
name: Cow::Owned(name.to_string()),
title: None,
description: Some(Cow::Owned(format!("Test tool: {}", name))),
input_schema: Arc::new(schema_map),
output_schema: None,
annotations: None,
icons: None,
}
}
// Helper to create a test prompt
fn create_test_prompt(name: &str) -> PromptInfo {
PromptInfo {
fn create_test_prompt(name: &str) -> Prompt {
Prompt {
name: name.to_string(),
title: None,
description: Some(format!("Test prompt: {}", name)),
server: "test_server".to_string(),
arguments: None,
icons: None,
}
}
// Helper to create a test resource
fn create_test_resource(uri: &str) -> ResourceInfo {
ResourceInfo {
fn create_test_resource(uri: &str) -> RawResource {
RawResource {
uri: uri.to_string(),
name: uri.to_string(),
title: None,
description: Some(format!("Test resource: {}", uri)),
mime_type: Some("text/plain".to_string()),
server: "test_server".to_string(),
size: None,
icons: None,
}
}
......
......@@ -30,10 +30,7 @@ use serde_json::Map;
use tracing::{debug, error, info, warn};
use crate::mcp::{
config::{
McpConfig, McpProxyConfig, McpServerConfig, McpTransport, PromptInfo, ResourceInfo,
ToolInfo,
},
config::{McpConfig, McpProxyConfig, McpServerConfig, McpTransport, Prompt, RawResource, Tool},
connection_pool::McpConnectionPool,
error::{McpError, McpResult},
inventory::ToolInventory,
......@@ -215,7 +212,7 @@ impl McpManager {
// ========================================================================
/// List all available tools from all servers
pub fn list_tools(&self) -> Vec<ToolInfo> {
pub fn list_tools(&self) -> Vec<Tool> {
self.inventory
.list_tools()
.into_iter()
......@@ -239,10 +236,10 @@ impl McpManager {
.ok_or_else(|| McpError::ToolNotFound(tool_name.to_string()))?;
// Convert args with type coercion based on schema
let tool_schema = tool_info.parameters.as_ref();
let tool_schema = Some(serde_json::Value::Object((*tool_info.input_schema).clone()));
let args_map = args
.into()
.into_map(tool_schema)
.into_map(tool_schema.as_ref())
.map_err(McpError::InvalidArguments)?;
// Get client for that server
......@@ -264,7 +261,7 @@ impl McpManager {
}
/// Get a tool by name
pub fn get_tool(&self, tool_name: &str) -> Option<ToolInfo> {
pub fn get_tool(&self, tool_name: &str) -> Option<Tool> {
self.inventory
.get_tool(tool_name)
.map(|(_server_name, tool_info)| tool_info)
......@@ -305,7 +302,7 @@ impl McpManager {
}
/// List all available prompts
pub fn list_prompts(&self) -> Vec<PromptInfo> {
pub fn list_prompts(&self) -> Vec<Prompt> {
self.inventory
.list_prompts()
.into_iter()
......@@ -343,7 +340,7 @@ impl McpManager {
}
/// List all available resources
pub fn list_resources(&self) -> Vec<ResourceInfo> {
pub fn list_resources(&self) -> Vec<RawResource> {
self.inventory
.list_resources()
.into_iter()
......@@ -410,12 +407,12 @@ impl McpManager {
}
/// Get prompt info by name
pub fn get_prompt_info(&self, name: &str) -> Option<PromptInfo> {
pub fn get_prompt_info(&self, name: &str) -> Option<Prompt> {
self.inventory.get_prompt(name).map(|(_server, info)| info)
}
/// Get resource info by URI
pub fn get_resource_info(&self, uri: &str) -> Option<ResourceInfo> {
pub fn get_resource_info(&self, uri: &str) -> Option<RawResource> {
self.inventory.get_resource(uri).map(|(_server, info)| info)
}
......@@ -536,13 +533,7 @@ impl McpManager {
Ok(ts) => {
info!("Discovered {} tools from '{}'", ts.len(), server_name);
for t in ts {
let tool_info = ToolInfo {
name: t.name.to_string(),
description: t.description.as_deref().unwrap_or_default().to_string(),
server: server_name.to_string(),
parameters: Some(serde_json::Value::Object((*t.input_schema).clone())),
};
inventory.insert_tool(t.name.to_string(), server_name.to_string(), tool_info);
inventory.insert_tool(t.name.to_string(), server_name.to_string(), t);
}
}
Err(e) => warn!("Failed to list tools from '{}': {}", server_name, e),
......@@ -553,15 +544,7 @@ impl McpManager {
Ok(ps) => {
info!("Discovered {} prompts from '{}'", ps.len(), server_name);
for p in ps {
let prompt_info = PromptInfo {
name: p.name.clone(),
description: p.description.clone(),
server: server_name.to_string(),
arguments: p.arguments.clone().map(|args| {
args.into_iter().map(|arg| serde_json::json!(arg)).collect()
}),
};
inventory.insert_prompt(p.name.clone(), server_name.to_string(), prompt_info);
inventory.insert_prompt(p.name.clone(), server_name.to_string(), p);
}
}
Err(e) => debug!("No prompts or failed to list on '{}': {}", server_name, e),
......@@ -572,18 +555,7 @@ impl McpManager {
Ok(rs) => {
info!("Discovered {} resources from '{}'", rs.len(), server_name);
for r in rs {
let resource_info = ResourceInfo {
uri: r.uri.clone(),
name: r.name.clone(),
description: r.description.clone(),
mime_type: r.mime_type.clone(),
server: server_name.to_string(),
};
inventory.insert_resource(
r.uri.clone(),
server_name.to_string(),
resource_info,
);
inventory.insert_resource(r.uri.clone(), server_name.to_string(), r.raw);
}
}
Err(e) => debug!("No resources or failed to list on '{}': {}", server_name, e),
......@@ -600,17 +572,8 @@ impl McpManager {
Ok(ts) => {
info!("Discovered {} tools from '{}'", ts.len(), server_name);
for t in ts {
let tool_info = ToolInfo {
name: t.name.to_string(),
description: t.description.as_deref().unwrap_or_default().to_string(),
server: server_name.to_string(),
parameters: Some(serde_json::Value::Object((*t.input_schema).clone())),
};
self.inventory.insert_tool(
t.name.to_string(),
server_name.to_string(),
tool_info,
);
self.inventory
.insert_tool(t.name.to_string(), server_name.to_string(), t);
}
}
Err(e) => warn!("Failed to list tools from '{}': {}", server_name, e),
......@@ -621,19 +584,8 @@ impl McpManager {
Ok(ps) => {
info!("Discovered {} prompts from '{}'", ps.len(), server_name);
for p in ps {
let prompt_info = PromptInfo {
name: p.name.clone(),
description: p.description.clone(),
server: server_name.to_string(),
arguments: p.arguments.clone().map(|args| {
args.into_iter().map(|arg| serde_json::json!(arg)).collect()
}),
};
self.inventory.insert_prompt(
p.name.clone(),
server_name.to_string(),
prompt_info,
);
self.inventory
.insert_prompt(p.name.clone(), server_name.to_string(), p);
}
}
Err(e) => debug!("No prompts or failed to list on '{}': {}", server_name, e),
......@@ -644,18 +596,8 @@ impl McpManager {
Ok(rs) => {
info!("Discovered {} resources from '{}'", rs.len(), server_name);
for r in rs {
let resource_info = ResourceInfo {
uri: r.uri.clone(),
name: r.name.clone(),
description: r.description.clone(),
mime_type: r.mime_type.clone(),
server: server_name.to_string(),
};
self.inventory.insert_resource(
r.uri.clone(),
server_name.to_string(),
resource_info,
);
self.inventory
.insert_resource(r.uri.clone(), server_name.to_string(), r.raw);
}
}
Err(e) => debug!("No resources or failed to list on '{}': {}", server_name, e),
......
......@@ -19,7 +19,7 @@ pub mod tool_args;
// Re-export the main types for convenience
pub use config::{
InventoryConfig, McpConfig, McpPoolConfig, McpProxyConfig, McpServerConfig, McpTransport,
PromptInfo, ResourceInfo, ToolInfo, WarmupServer,
Prompt, RawResource, Tool, WarmupServer,
};
pub use connection_pool::{CachedConnection, McpConnectionPool, PoolStats};
pub use error::{McpError, McpResult};
......
......@@ -94,7 +94,7 @@ impl OAuthHelper {
.map_err(|e| McpError::Auth(format!("Failed to initialize OAuth: {}", e)))?;
oauth_state
.start_authorization(scopes, &self.redirect_uri)
.start_authorization(scopes, &self.redirect_uri, None)
.await
.map_err(|e| McpError::Auth(format!("Failed to start authorization: {}", e)))?;
......@@ -111,7 +111,7 @@ impl OAuthHelper {
// Exchange code for token
oauth_state
.handle_callback(&auth_code)
.handle_callback(&auth_code, "")
.await
.map_err(|e| McpError::Auth(format!("Failed to handle OAuth callback: {}", e)))?;
......
......@@ -254,19 +254,15 @@ impl ResponseStreamEventEmitter {
pub(super) fn emit_mcp_list_tools_completed(
&mut self,
output_index: usize,
tools: &[crate::mcp::ToolInfo],
tools: &[crate::mcp::Tool],
) -> serde_json::Value {
let tool_items: Vec<_> = tools
.iter()
.map(|t| {
json!({
"name": t.name,
"description": t.description,
"input_schema": t.parameters.clone().unwrap_or_else(|| json!({
"type": "object",
"properties": {},
"required": []
}))
"name": &t.name,
"description": &t.description,
"input_schema": t.input_schema.clone()
})
})
.collect();
......
......@@ -160,19 +160,16 @@ fn build_mcp_list_tools_item(
let tools = mcp.list_tools();
let tools_info: Vec<McpToolInfo> = tools
.iter()
.map(|t| McpToolInfo {
name: t.name.clone(),
description: Some(t.description.clone()),
input_schema: t.parameters.clone().unwrap_or_else(|| {
json!({
"type": "object",
"properties": {},
"additionalProperties": false
})
}),
annotations: Some(json!({
"read_only": false
})),
.map(|t| {
use serde_json::Value;
McpToolInfo {
name: t.name.to_string(),
description: t.description.as_ref().map(|d| d.to_string()),
input_schema: Value::Object((*t.input_schema).clone()),
annotations: Some(json!({
"read_only": false
})),
}
})
.collect();
......@@ -593,14 +590,11 @@ async fn execute_tool_loop_streaming_internal(
let tool_items: Vec<_> = mcp_tools
.iter()
.map(|t| {
use serde_json::Value;
json!({
"name": t.name,
"description": t.description,
"input_schema": t.parameters.clone().unwrap_or_else(|| json!({
"type": "object",
"properties": {},
"required": []
}))
"input_schema": Value::Object((*t.input_schema).clone())
})
})
.collect();
......@@ -925,21 +919,16 @@ async fn execute_tool_loop_streaming_internal(
}
/// Convert MCP tools to Chat API tool format
fn convert_mcp_tools_to_chat_tools(mcp_tools: &[crate::mcp::ToolInfo]) -> Vec<Tool> {
fn convert_mcp_tools_to_chat_tools(mcp_tools: &[crate::mcp::Tool]) -> Vec<Tool> {
use serde_json::Value;
mcp_tools
.iter()
.map(|tool_info| Tool {
tool_type: "function".to_string(),
function: crate::protocols::common::Function {
name: tool_info.name.clone(),
description: Some(tool_info.description.clone()),
parameters: tool_info.parameters.clone().unwrap_or_else(|| {
json!({
"type": "object",
"properties": {},
"required": []
})
}),
name: tool_info.name.to_string(),
description: tool_info.description.as_ref().map(|d| d.to_string()),
parameters: Value::Object((*tool_info.input_schema).clone()),
strict: None,
},
})
......
......@@ -295,11 +295,7 @@ pub(super) fn prepare_mcp_payload_for_streaming(
let mut tools_json = Vec::new();
let tools = active_mcp.list_tools();
for t in tools {
let parameters = t.parameters.clone().unwrap_or(serde_json::json!({
"type": "object",
"properties": {},
"additionalProperties": false
}));
let parameters = Value::Object((*t.input_schema).clone());
let tool = serde_json::json!({
"type": event_types::ITEM_TYPE_FUNCTION,
"name": t.name,
......@@ -864,11 +860,7 @@ pub(super) fn build_mcp_list_tools_item(mcp: &Arc<mcp::McpManager>, server_label
json!({
"name": t.name,
"description": t.description,
"input_schema": t.parameters.clone().unwrap_or_else(|| json!({
"type": "object",
"properties": {},
"additionalProperties": false
})),
"input_schema": Value::Object((*t.input_schema).clone()),
"annotations": {
"read_only": false
}
......
......@@ -373,13 +373,18 @@ async fn test_tool_info_structure() {
let tools = manager.list_tools();
let brave_search = tools
.iter()
.find(|t| t.name == "brave_web_search")
.find(|t| t.name.as_ref() == "brave_web_search")
.expect("Should have brave_web_search tool");
assert_eq!(brave_search.name, "brave_web_search");
assert!(brave_search.description.contains("Mock web search"));
assert_eq!(brave_search.server, "mock_server");
assert!(brave_search.parameters.is_some());
assert_eq!(brave_search.name.as_ref(), "brave_web_search");
assert!(brave_search
.description
.as_ref()
.map(|d| d.contains("Mock web search"))
.unwrap_or(false));
// Note: server information is now maintained separately in the inventory,
// not in the Tool type itself
assert!(!brave_search.input_schema.is_empty());
}
// SSE Parsing Tests (simplified since we don't expose parse_sse_event)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment