"vscode:/vscode.git/clone" did not exist on "8841d0d1a9e63bda03e36c072d9d3d0692f07be4"
Unverified Commit eb7d9261 authored by Keyang Ru's avatar Keyang Ru Committed by GitHub
Browse files

[router] conversation item API: create, retrieve and delete (#11369)

parent 44cb0607
...@@ -142,6 +142,50 @@ impl ConversationItemStorage for MemoryConversationItemStorage { ...@@ -142,6 +142,50 @@ impl ConversationItemStorage for MemoryConversationItemStorage {
Ok(results) Ok(results)
} }
async fn get_item(&self, item_id: &ConversationItemId) -> Result<Option<ConversationItem>> {
let items = self.items.read().unwrap();
Ok(items.get(item_id).cloned())
}
async fn is_item_linked(
&self,
conversation_id: &ConversationId,
item_id: &ConversationItemId,
) -> Result<bool> {
let rev = self.rev_index.read().unwrap();
if let Some(conv_idx) = rev.get(conversation_id) {
Ok(conv_idx.contains_key(&item_id.0))
} else {
Ok(false)
}
}
async fn delete_item(
&self,
conversation_id: &ConversationId,
item_id: &ConversationItemId,
) -> Result<()> {
// Get the key from rev_index and remove the entry at the same time
let key_to_remove = {
let mut rev = self.rev_index.write().unwrap();
if let Some(conv_idx) = rev.get_mut(conversation_id) {
conv_idx.remove(&item_id.0)
} else {
None
}
};
// If the item was in rev_index, remove it from links as well
if let Some(key) = key_to_remove {
let mut links = self.links.write().unwrap();
if let Some(conv_links) = links.get_mut(conversation_id) {
conv_links.remove(&key);
}
}
Ok(())
}
} }
#[cfg(test)] #[cfg(test)]
......
...@@ -243,6 +243,92 @@ impl ConversationItemStorage for OracleConversationItemStorage { ...@@ -243,6 +243,92 @@ impl ConversationItemStorage for OracleConversationItemStorage {
) )
.collect() .collect()
} }
async fn get_item(&self, item_id: &ConversationItemId) -> ItemResult<Option<ConversationItem>> {
let iid = item_id.0.clone();
self.with_connection(move |conn| {
let mut stmt = conn
.statement(
"SELECT id, response_id, item_type, role, content, status, created_at \
FROM conversation_items WHERE id = :1",
)
.build()
.map_err(map_oracle_error)?;
let mut rows = stmt.query(&[&iid]).map_err(map_oracle_error)?;
if let Some(row_res) = rows.next() {
let row = row_res.map_err(map_oracle_error)?;
let id: String = row.get(0).map_err(map_oracle_error)?;
let response_id: Option<String> = row.get(1).map_err(map_oracle_error)?;
let item_type: String = row.get(2).map_err(map_oracle_error)?;
let role: Option<String> = row.get(3).map_err(map_oracle_error)?;
let content_raw: Option<String> = row.get(4).map_err(map_oracle_error)?;
let status: Option<String> = row.get(5).map_err(map_oracle_error)?;
let created_at: DateTime<Utc> = row.get(6).map_err(map_oracle_error)?;
let content = match content_raw {
Some(s) => serde_json::from_str(&s)?,
None => Value::Null,
};
Ok(Some(ConversationItem {
id: ConversationItemId(id),
response_id,
item_type,
role,
content,
status,
created_at,
}))
} else {
Ok(None)
}
})
.await
}
async fn is_item_linked(
&self,
conversation_id: &ConversationId,
item_id: &ConversationItemId,
) -> ItemResult<bool> {
let cid = conversation_id.0.clone();
let iid = item_id.0.clone();
self.with_connection(move |conn| {
let count: i64 = conn
.query_row_as(
"SELECT COUNT(*) FROM conversation_item_links WHERE conversation_id = :1 AND item_id = :2",
&[&cid, &iid],
)
.map_err(map_oracle_error)?;
Ok(count > 0)
})
.await
}
async fn delete_item(
&self,
conversation_id: &ConversationId,
item_id: &ConversationItemId,
) -> ItemResult<()> {
let cid = conversation_id.0.clone();
let iid = item_id.0.clone();
self.with_connection(move |conn| {
// Delete ONLY the link (do not delete the item itself)
conn.execute(
"DELETE FROM conversation_item_links WHERE conversation_id = :1 AND item_id = :2",
&[&cid, &iid],
)
.map_err(map_oracle_error)?;
Ok(())
})
.await
}
} }
#[derive(Clone)] #[derive(Clone)]
......
...@@ -94,15 +94,32 @@ pub trait ConversationItemStorage: Send + Sync + 'static { ...@@ -94,15 +94,32 @@ pub trait ConversationItemStorage: Send + Sync + 'static {
conversation_id: &ConversationId, conversation_id: &ConversationId,
params: ListParams, params: ListParams,
) -> Result<Vec<ConversationItem>>; ) -> Result<Vec<ConversationItem>>;
/// Get a single item by ID
async fn get_item(&self, item_id: &ConversationItemId) -> Result<Option<ConversationItem>>;
/// Check if an item is linked to a conversation
async fn is_item_linked(
&self,
conversation_id: &ConversationId,
item_id: &ConversationItemId,
) -> Result<bool>;
/// Delete an item link from a conversation (does not delete the item itself)
async fn delete_item(
&self,
conversation_id: &ConversationId,
item_id: &ConversationItemId,
) -> Result<()>;
} }
pub type SharedConversationItemStorage = Arc<dyn ConversationItemStorage>; pub type SharedConversationItemStorage = Arc<dyn ConversationItemStorage>;
/// Helper to build id prefix based on item_type /// Helper to build id prefix based on item_type
pub fn make_item_id(item_type: &str) -> ConversationItemId { pub fn make_item_id(item_type: &str) -> ConversationItemId {
// Generate a 24-byte random hex string (48 hex chars), consistent with conversation id style // Generate exactly 50 hex characters (25 bytes) for the part after the underscore
let mut rng = rand::rng(); let mut rng = rand::rng();
let mut bytes = [0u8; 24]; let mut bytes = [0u8; 25];
rng.fill_bytes(&mut bytes); rng.fill_bytes(&mut bytes);
let hex_string: String = bytes.iter().map(|b| format!("{:02x}", b)).collect(); let hex_string: String = bytes.iter().map(|b| format!("{:02x}", b)).collect();
......
...@@ -190,6 +190,50 @@ pub trait RouterTrait: Send + Sync + Debug { ...@@ -190,6 +190,50 @@ pub trait RouterTrait: Send + Sync + Debug {
.into_response() .into_response()
} }
/// Create items in a conversation
async fn create_conversation_items(
&self,
_headers: Option<&HeaderMap>,
_conversation_id: &str,
_body: &Value,
) -> Response {
(
StatusCode::NOT_IMPLEMENTED,
"Conversation items create endpoint not implemented",
)
.into_response()
}
/// Get a single conversation item
/// The `include` parameter is accepted but not yet implemented
async fn get_conversation_item(
&self,
_headers: Option<&HeaderMap>,
_conversation_id: &str,
_item_id: &str,
_include: Option<Vec<String>>,
) -> Response {
(
StatusCode::NOT_IMPLEMENTED,
"Conversation item get endpoint not implemented",
)
.into_response()
}
/// Delete a conversation item
async fn delete_conversation_item(
&self,
_headers: Option<&HeaderMap>,
_conversation_id: &str,
_item_id: &str,
) -> Response {
(
StatusCode::NOT_IMPLEMENTED,
"Conversation item delete endpoint not implemented",
)
.into_response()
}
/// Get router type name /// Get router type name
fn router_type(&self) -> &'static str; fn router_type(&self) -> &'static str;
......
...@@ -829,7 +829,8 @@ pub(super) fn build_incomplete_response( ...@@ -829,7 +829,8 @@ pub(super) fn build_incomplete_response(
pub(super) fn generate_mcp_id(prefix: &str) -> String { pub(super) fn generate_mcp_id(prefix: &str) -> String {
use rand::RngCore; use rand::RngCore;
let mut rng = rand::rng(); let mut rng = rand::rng();
let mut bytes = [0u8; 30]; // Generate exactly 50 hex characters (25 bytes) for the part after the underscore
let mut bytes = [0u8; 25];
rng.fill_bytes(&mut bytes); rng.fill_bytes(&mut bytes);
let hex_string: String = bytes.iter().map(|b| format!("{:02x}", b)).collect(); let hex_string: String = bytes.iter().map(|b| format!("{:02x}", b)).collect();
format!("{}_{}", prefix, hex_string) format!("{}_{}", prefix, hex_string)
......
//! Response storage, patching, and extraction utilities //! Response storage, patching, and extraction utilities
use crate::data_connector::{ResponseId, SharedResponseStorage, StoredResponse}; use crate::data_connector::{ResponseId, StoredResponse};
use crate::protocols::spec::{ResponseInput, ResponseToolType, ResponsesRequest}; use crate::protocols::spec::{ResponseInput, ResponseToolType, ResponsesRequest};
use serde_json::{json, Value}; use serde_json::{json, Value};
use std::collections::HashMap; use std::collections::HashMap;
use tracing::{info, warn}; use tracing::warn;
use super::utils::event_types; use super::utils::event_types;
...@@ -12,25 +12,6 @@ use super::utils::event_types; ...@@ -12,25 +12,6 @@ use super::utils::event_types;
// Response Storage Operations // Response Storage Operations
// ============================================================================ // ============================================================================
/// Store a response internally (checks if storage is enabled)
pub(super) async fn store_response_internal(
response_storage: &SharedResponseStorage,
response_json: &Value,
original_body: &ResponsesRequest,
) -> Result<(), String> {
if !original_body.store {
return Ok(());
}
match store_response_impl(response_storage, response_json, original_body).await {
Ok(response_id) => {
info!(response_id = %response_id.0, "Stored response locally");
Ok(())
}
Err(e) => Err(e),
}
}
/// Build a StoredResponse from response JSON and original request /// Build a StoredResponse from response JSON and original request
pub(super) fn build_stored_response( pub(super) fn build_stored_response(
response_json: &Value, response_json: &Value,
...@@ -98,20 +79,6 @@ pub(super) fn build_stored_response( ...@@ -98,20 +79,6 @@ pub(super) fn build_stored_response(
stored_response stored_response
} }
/// Store response implementation (public for use across modules)
pub(super) async fn store_response_impl(
response_storage: &SharedResponseStorage,
response_json: &Value,
original_body: &ResponsesRequest,
) -> Result<ResponseId, String> {
let stored_response = build_stored_response(response_json, original_body);
response_storage
.store_response(stored_response)
.await
.map_err(|e| format!("Failed to store response: {}", e))
}
// ============================================================================ // ============================================================================
// Response JSON Patching // Response JSON Patching
// ============================================================================ // ============================================================================
......
...@@ -31,14 +31,15 @@ use tracing::{info, warn}; ...@@ -31,14 +31,15 @@ use tracing::{info, warn};
// Import from sibling modules // Import from sibling modules
use super::conversations::{ use super::conversations::{
create_conversation, delete_conversation, get_conversation, list_conversation_items, create_conversation, create_conversation_items, delete_conversation, delete_conversation_item,
persist_conversation_items, update_conversation, get_conversation, get_conversation_item, list_conversation_items, persist_conversation_items,
update_conversation,
}; };
use super::mcp::{ use super::mcp::{
execute_tool_loop, mcp_manager_from_request_tools, prepare_mcp_payload_for_streaming, execute_tool_loop, mcp_manager_from_request_tools, prepare_mcp_payload_for_streaming,
McpLoopConfig, McpLoopConfig,
}; };
use super::responses::{mask_tools_as_mcp, patch_streaming_response_json, store_response_internal}; use super::responses::{mask_tools_as_mcp, patch_streaming_response_json};
use super::streaming::handle_streaming_response; use super::streaming::handle_streaming_response;
// ============================================================================ // ============================================================================
...@@ -230,26 +231,17 @@ impl OpenAIRouter { ...@@ -230,26 +231,17 @@ impl OpenAIRouter {
original_previous_response_id.as_deref(), original_previous_response_id.as_deref(),
); );
// Persist conversation items if conversation is provided // Always persist conversation items and response (even without conversation)
if original_body.conversation.is_some() { if let Err(err) = persist_conversation_items(
if let Err(err) = persist_conversation_items( self.conversation_storage.clone(),
self.conversation_storage.clone(), self.conversation_item_storage.clone(),
self.conversation_item_storage.clone(), self.response_storage.clone(),
self.response_storage.clone(), &response_json,
&response_json, original_body,
original_body, )
) .await
.await {
{ warn!("Failed to persist conversation items: {}", err);
warn!("Failed to persist conversation items: {}", err);
}
} else {
// Store response only if no conversation (persist_conversation_items already stores it)
if let Err(err) =
store_response_internal(&self.response_storage, &response_json, original_body).await
{
warn!("Failed to store response: {}", err);
}
} }
(StatusCode::OK, Json(response_json)).into_response() (StatusCode::OK, Json(response_json)).into_response()
...@@ -906,4 +898,51 @@ impl crate::routers::RouterTrait for OpenAIRouter { ...@@ -906,4 +898,51 @@ impl crate::routers::RouterTrait for OpenAIRouter {
) )
.await .await
} }
async fn create_conversation_items(
&self,
_headers: Option<&HeaderMap>,
conversation_id: &str,
body: &Value,
) -> Response {
create_conversation_items(
&self.conversation_storage,
&self.conversation_item_storage,
conversation_id,
body.clone(),
)
.await
}
async fn get_conversation_item(
&self,
_headers: Option<&HeaderMap>,
conversation_id: &str,
item_id: &str,
include: Option<Vec<String>>,
) -> Response {
get_conversation_item(
&self.conversation_storage,
&self.conversation_item_storage,
conversation_id,
item_id,
include,
)
.await
}
async fn delete_conversation_item(
&self,
_headers: Option<&HeaderMap>,
conversation_id: &str,
item_id: &str,
) -> Response {
delete_conversation_item(
&self.conversation_storage,
&self.conversation_item_storage,
conversation_id,
item_id,
)
.await
}
} }
...@@ -32,9 +32,7 @@ use super::mcp::{ ...@@ -32,9 +32,7 @@ use super::mcp::{
mcp_manager_from_request_tools, prepare_mcp_payload_for_streaming, send_mcp_list_tools_events, mcp_manager_from_request_tools, prepare_mcp_payload_for_streaming, send_mcp_list_tools_events,
McpLoopConfig, ToolLoopState, McpLoopConfig, ToolLoopState,
}; };
use super::responses::{ use super::responses::{mask_tools_as_mcp, patch_streaming_response_json, rewrite_streaming_block};
mask_tools_as_mcp, patch_streaming_response_json, rewrite_streaming_block, store_response_impl,
};
use super::utils::{event_types, FunctionCallInProgress, OutputIndexMapper, StreamAction}; use super::utils::{event_types, FunctionCallInProgress, OutputIndexMapper, StreamAction};
// ============================================================================ // ============================================================================
...@@ -1082,26 +1080,17 @@ pub(super) async fn handle_simple_streaming_passthrough( ...@@ -1082,26 +1080,17 @@ pub(super) async fn handle_simple_streaming_passthrough(
previous_response_id.as_deref(), previous_response_id.as_deref(),
); );
if persist_needed { // Always persist conversation items and response (even without conversation)
if let Err(err) = persist_conversation_items( if let Err(err) = persist_conversation_items(
conversation_storage.clone(), conversation_storage.clone(),
conversation_item_storage.clone(), conversation_item_storage.clone(),
response_storage.clone(), response_storage.clone(),
&response_json, &response_json,
&original_request, &original_request,
) )
.await .await
{ {
warn!("Failed to persist conversation items (stream): {}", err); warn!("Failed to persist conversation items (stream): {}", err);
}
} else if should_store {
// Store response only if no conversation (persist_conversation_items already stores it)
if let Err(err) =
store_response_impl(&response_storage, &response_json, &original_request)
.await
{
warn!("Failed to store streaming response: {}", err);
}
} }
} else if let Some(error_payload) = encountered_error { } else if let Some(error_payload) = encountered_error {
warn!("Upstream streaming error payload: {}", error_payload); warn!("Upstream streaming error payload: {}", error_payload);
...@@ -1390,32 +1379,20 @@ pub(super) async fn handle_streaming_with_tool_interception( ...@@ -1390,32 +1379,20 @@ pub(super) async fn handle_streaming_with_tool_interception(
previous_response_id.as_deref(), previous_response_id.as_deref(),
); );
if persist_needed { // Always persist conversation items and response (even without conversation)
if let Err(err) = persist_conversation_items( if let Err(err) = persist_conversation_items(
conversation_storage.clone(), conversation_storage.clone(),
conversation_item_storage.clone(), conversation_item_storage.clone(),
response_storage.clone(), response_storage.clone(),
&response_json, &response_json,
&original_request, &original_request,
) )
.await .await
{ {
warn!( warn!(
"Failed to persist conversation items (stream + MCP): {}", "Failed to persist conversation items (stream + MCP): {}",
err err
); );
}
} else if should_store {
// Store response only if no conversation (persist_conversation_items already stores it)
if let Err(err) = store_response_impl(
&response_storage,
&response_json,
&original_request,
)
.await
{
warn!("Failed to store streaming response: {}", err);
}
} }
} }
......
...@@ -614,6 +614,76 @@ impl RouterTrait for RouterManager { ...@@ -614,6 +614,76 @@ impl RouterTrait for RouterManager {
.into_response() .into_response()
} }
} }
async fn create_conversation_items(
&self,
headers: Option<&HeaderMap>,
conversation_id: &str,
body: &Value,
) -> Response {
let router = self.select_router_for_request(headers, None);
if let Some(router) = router {
router
.create_conversation_items(headers, conversation_id, body)
.await
} else {
(
StatusCode::NOT_FOUND,
format!(
"No router available to create conversation items for '{}'",
conversation_id
),
)
.into_response()
}
}
async fn get_conversation_item(
&self,
headers: Option<&HeaderMap>,
conversation_id: &str,
item_id: &str,
include: Option<Vec<String>>,
) -> Response {
let router = self.select_router_for_request(headers, None);
if let Some(router) = router {
router
.get_conversation_item(headers, conversation_id, item_id, include)
.await
} else {
(
StatusCode::NOT_FOUND,
format!(
"No router available to get conversation item '{}' in '{}'",
item_id, conversation_id
),
)
.into_response()
}
}
async fn delete_conversation_item(
&self,
headers: Option<&HeaderMap>,
conversation_id: &str,
item_id: &str,
) -> Response {
let router = self.select_router_for_request(headers, None);
if let Some(router) = router {
router
.delete_conversation_item(headers, conversation_id, item_id)
.await
} else {
(
StatusCode::NOT_FOUND,
format!(
"No router available to delete conversation item '{}' in '{}'",
item_id, conversation_id
),
)
.into_response()
}
}
} }
impl std::fmt::Debug for RouterManager { impl std::fmt::Debug for RouterManager {
......
...@@ -440,6 +440,47 @@ async fn v1_conversations_list_items( ...@@ -440,6 +440,47 @@ async fn v1_conversations_list_items(
.await .await
} }
#[derive(Deserialize, Default)]
struct GetItemQuery {
/// Additional fields to include in response (not yet implemented)
include: Option<Vec<String>>,
}
async fn v1_conversations_create_items(
State(state): State<Arc<AppState>>,
Path(conversation_id): Path<String>,
headers: http::HeaderMap,
Json(body): Json<Value>,
) -> Response {
state
.router
.create_conversation_items(Some(&headers), &conversation_id, &body)
.await
}
async fn v1_conversations_get_item(
State(state): State<Arc<AppState>>,
Path((conversation_id, item_id)): Path<(String, String)>,
Query(query): Query<GetItemQuery>,
headers: http::HeaderMap,
) -> Response {
state
.router
.get_conversation_item(Some(&headers), &conversation_id, &item_id, query.include)
.await
}
async fn v1_conversations_delete_item(
State(state): State<Arc<AppState>>,
Path((conversation_id, item_id)): Path<(String, String)>,
headers: http::HeaderMap,
) -> Response {
state
.router
.delete_conversation_item(Some(&headers), &conversation_id, &item_id)
.await
}
#[derive(Deserialize)] #[derive(Deserialize)]
struct AddWorkerQuery { struct AddWorkerQuery {
url: String, url: String,
...@@ -716,7 +757,11 @@ pub fn build_app( ...@@ -716,7 +757,11 @@ pub fn build_app(
) )
.route( .route(
"/v1/conversations/{conversation_id}/items", "/v1/conversations/{conversation_id}/items",
get(v1_conversations_list_items), get(v1_conversations_list_items).post(v1_conversations_create_items),
)
.route(
"/v1/conversations/{conversation_id}/items/{item_id}",
get(v1_conversations_get_item).delete(v1_conversations_delete_item),
) )
.route_layer(axum::middleware::from_fn_with_state( .route_layer(axum::middleware::from_fn_with_state(
app_state.clone(), app_state.clone(),
......
...@@ -1333,3 +1333,519 @@ async fn test_streaming_multi_turn_with_mcp() { ...@@ -1333,3 +1333,519 @@ async fn test_streaming_multi_turn_with_mcp() {
worker.stop().await; worker.stop().await;
mcp.stop().await; mcp.stop().await;
} }
#[tokio::test]
async fn test_conversation_items_create_and_get() {
// Test creating items and getting a specific item
let router_cfg = RouterConfig {
mode: RoutingMode::OpenAI {
worker_urls: vec!["http://localhost".to_string()],
},
connection_mode: ConnectionMode::Http,
policy: PolicyConfig::Random,
host: "127.0.0.1".to_string(),
port: 0,
max_payload_size: 8 * 1024 * 1024,
request_timeout_secs: 60,
worker_startup_timeout_secs: 1,
worker_startup_check_interval_secs: 1,
dp_aware: false,
api_key: None,
discovery: None,
metrics: None,
log_dir: None,
log_level: Some("warn".to_string()),
request_id_headers: None,
max_concurrent_requests: 8,
queue_size: 0,
queue_timeout_secs: 5,
rate_limit_tokens_per_second: None,
cors_allowed_origins: vec![],
retry: RetryConfig::default(),
circuit_breaker: CircuitBreakerConfig::default(),
disable_retries: false,
disable_circuit_breaker: false,
health_check: HealthCheckConfig::default(),
enable_igw: false,
model_path: None,
tokenizer_path: None,
history_backend: sglang_router_rs::config::HistoryBackend::Memory,
oracle: None,
reasoning_parser: None,
tool_call_parser: None,
};
let ctx = AppContext::new(router_cfg, reqwest::Client::new(), 8, None).expect("ctx");
let router = RouterFactory::create_router(&Arc::new(ctx))
.await
.expect("router");
// Create conversation
let create_conv = serde_json::json!({});
let conv_resp = router.create_conversation(None, &create_conv).await;
assert_eq!(conv_resp.status(), StatusCode::OK);
let conv_bytes = axum::body::to_bytes(conv_resp.into_body(), usize::MAX)
.await
.unwrap();
let conv_json: serde_json::Value = serde_json::from_slice(&conv_bytes).unwrap();
let conv_id = conv_json["id"].as_str().unwrap();
// Create items
let create_items = serde_json::json!({
"items": [
{
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": "Hello"}]
},
{
"type": "message",
"role": "assistant",
"content": [{"type": "output_text", "text": "Hi there!"}]
}
]
});
let items_resp = router
.create_conversation_items(None, conv_id, &create_items)
.await;
assert_eq!(items_resp.status(), StatusCode::OK);
let items_bytes = axum::body::to_bytes(items_resp.into_body(), usize::MAX)
.await
.unwrap();
let items_json: serde_json::Value = serde_json::from_slice(&items_bytes).unwrap();
// Verify response structure
assert_eq!(items_json["object"], "list");
assert!(items_json["data"].is_array());
// Get first item
let item_id = items_json["data"][0]["id"].as_str().unwrap();
let get_resp = router
.get_conversation_item(None, conv_id, item_id, None)
.await;
assert_eq!(get_resp.status(), StatusCode::OK);
let get_bytes = axum::body::to_bytes(get_resp.into_body(), usize::MAX)
.await
.unwrap();
let get_json: serde_json::Value = serde_json::from_slice(&get_bytes).unwrap();
// Verify item structure
assert_eq!(get_json["id"], item_id);
assert_eq!(get_json["type"], "message");
assert_eq!(get_json["role"], "user");
}
#[tokio::test]
async fn test_conversation_items_delete() {
// Test deleting an item from a conversation
let router_cfg = RouterConfig {
mode: RoutingMode::OpenAI {
worker_urls: vec!["http://localhost".to_string()],
},
connection_mode: ConnectionMode::Http,
policy: PolicyConfig::Random,
host: "127.0.0.1".to_string(),
port: 0,
max_payload_size: 8 * 1024 * 1024,
request_timeout_secs: 60,
worker_startup_timeout_secs: 1,
worker_startup_check_interval_secs: 1,
dp_aware: false,
api_key: None,
discovery: None,
metrics: None,
log_dir: None,
log_level: Some("warn".to_string()),
request_id_headers: None,
max_concurrent_requests: 8,
queue_size: 0,
queue_timeout_secs: 5,
rate_limit_tokens_per_second: None,
cors_allowed_origins: vec![],
retry: RetryConfig::default(),
circuit_breaker: CircuitBreakerConfig::default(),
disable_retries: false,
disable_circuit_breaker: false,
health_check: HealthCheckConfig::default(),
enable_igw: false,
model_path: None,
tokenizer_path: None,
history_backend: sglang_router_rs::config::HistoryBackend::Memory,
oracle: None,
reasoning_parser: None,
tool_call_parser: None,
};
let ctx = AppContext::new(router_cfg, reqwest::Client::new(), 8, None).expect("ctx");
let router = RouterFactory::create_router(&Arc::new(ctx))
.await
.expect("router");
// Create conversation
let create_conv = serde_json::json!({});
let conv_resp = router.create_conversation(None, &create_conv).await;
let conv_bytes = axum::body::to_bytes(conv_resp.into_body(), usize::MAX)
.await
.unwrap();
let conv_json: serde_json::Value = serde_json::from_slice(&conv_bytes).unwrap();
let conv_id = conv_json["id"].as_str().unwrap();
// Create item
let create_items = serde_json::json!({
"items": [
{
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": "Test"}]
}
]
});
let items_resp = router
.create_conversation_items(None, conv_id, &create_items)
.await;
let items_bytes = axum::body::to_bytes(items_resp.into_body(), usize::MAX)
.await
.unwrap();
let items_json: serde_json::Value = serde_json::from_slice(&items_bytes).unwrap();
let item_id = items_json["data"][0]["id"].as_str().unwrap();
// List items (should have 1)
let list_resp = router
.list_conversation_items(None, conv_id, None, None, None)
.await;
let list_bytes = axum::body::to_bytes(list_resp.into_body(), usize::MAX)
.await
.unwrap();
let list_json: serde_json::Value = serde_json::from_slice(&list_bytes).unwrap();
assert_eq!(list_json["data"].as_array().unwrap().len(), 1);
// Delete item
let del_resp = router
.delete_conversation_item(None, conv_id, item_id)
.await;
assert_eq!(del_resp.status(), StatusCode::OK);
// List items again (should have 0)
let list_resp2 = router
.list_conversation_items(None, conv_id, None, None, None)
.await;
let list_bytes2 = axum::body::to_bytes(list_resp2.into_body(), usize::MAX)
.await
.unwrap();
let list_json2: serde_json::Value = serde_json::from_slice(&list_bytes2).unwrap();
assert_eq!(list_json2["data"].as_array().unwrap().len(), 0);
// Item should NOT be gettable from this conversation after deletion (link removed)
let get_resp = router
.get_conversation_item(None, conv_id, item_id, None)
.await;
assert_eq!(get_resp.status(), StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn test_conversation_items_max_limit() {
// Test that creating > 20 items returns error
let router_cfg = RouterConfig {
mode: RoutingMode::OpenAI {
worker_urls: vec!["http://localhost".to_string()],
},
connection_mode: ConnectionMode::Http,
policy: PolicyConfig::Random,
host: "127.0.0.1".to_string(),
port: 0,
max_payload_size: 8 * 1024 * 1024,
request_timeout_secs: 60,
worker_startup_timeout_secs: 1,
worker_startup_check_interval_secs: 1,
dp_aware: false,
api_key: None,
discovery: None,
metrics: None,
log_dir: None,
log_level: Some("warn".to_string()),
request_id_headers: None,
max_concurrent_requests: 8,
queue_size: 0,
queue_timeout_secs: 5,
rate_limit_tokens_per_second: None,
cors_allowed_origins: vec![],
retry: RetryConfig::default(),
circuit_breaker: CircuitBreakerConfig::default(),
disable_retries: false,
disable_circuit_breaker: false,
health_check: HealthCheckConfig::default(),
enable_igw: false,
model_path: None,
tokenizer_path: None,
history_backend: sglang_router_rs::config::HistoryBackend::Memory,
oracle: None,
reasoning_parser: None,
tool_call_parser: None,
};
let ctx = AppContext::new(router_cfg, reqwest::Client::new(), 8, None).expect("ctx");
let router = RouterFactory::create_router(&Arc::new(ctx))
.await
.expect("router");
// Create conversation
let create_conv = serde_json::json!({});
let conv_resp = router.create_conversation(None, &create_conv).await;
let conv_bytes = axum::body::to_bytes(conv_resp.into_body(), usize::MAX)
.await
.unwrap();
let conv_json: serde_json::Value = serde_json::from_slice(&conv_bytes).unwrap();
let conv_id = conv_json["id"].as_str().unwrap();
// Try to create 21 items (over limit)
let mut items = Vec::new();
for i in 0..21 {
items.push(serde_json::json!({
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": format!("Message {}", i)}]
}));
}
let create_items = serde_json::json!({"items": items});
let items_resp = router
.create_conversation_items(None, conv_id, &create_items)
.await;
assert_eq!(items_resp.status(), StatusCode::BAD_REQUEST);
let items_bytes = axum::body::to_bytes(items_resp.into_body(), usize::MAX)
.await
.unwrap();
let items_text = String::from_utf8_lossy(&items_bytes);
assert!(items_text.contains("Cannot add more than 20 items"));
}
#[tokio::test]
async fn test_conversation_items_unsupported_type() {
// Test that unsupported item types return error
let router_cfg = RouterConfig {
mode: RoutingMode::OpenAI {
worker_urls: vec!["http://localhost".to_string()],
},
connection_mode: ConnectionMode::Http,
policy: PolicyConfig::Random,
host: "127.0.0.1".to_string(),
port: 0,
max_payload_size: 8 * 1024 * 1024,
request_timeout_secs: 60,
worker_startup_timeout_secs: 1,
worker_startup_check_interval_secs: 1,
dp_aware: false,
api_key: None,
discovery: None,
metrics: None,
log_dir: None,
log_level: Some("warn".to_string()),
request_id_headers: None,
max_concurrent_requests: 8,
queue_size: 0,
queue_timeout_secs: 5,
rate_limit_tokens_per_second: None,
cors_allowed_origins: vec![],
retry: RetryConfig::default(),
circuit_breaker: CircuitBreakerConfig::default(),
disable_retries: false,
disable_circuit_breaker: false,
health_check: HealthCheckConfig::default(),
enable_igw: false,
model_path: None,
tokenizer_path: None,
history_backend: sglang_router_rs::config::HistoryBackend::Memory,
oracle: None,
reasoning_parser: None,
tool_call_parser: None,
};
let ctx = AppContext::new(router_cfg, reqwest::Client::new(), 8, None).expect("ctx");
let router = RouterFactory::create_router(&Arc::new(ctx))
.await
.expect("router");
// Create conversation
let create_conv = serde_json::json!({});
let conv_resp = router.create_conversation(None, &create_conv).await;
let conv_bytes = axum::body::to_bytes(conv_resp.into_body(), usize::MAX)
.await
.unwrap();
let conv_json: serde_json::Value = serde_json::from_slice(&conv_bytes).unwrap();
let conv_id = conv_json["id"].as_str().unwrap();
// Try to create item with completely unsupported type
let create_items = serde_json::json!({
"items": [
{
"type": "totally_invalid_type",
"content": []
}
]
});
let items_resp = router
.create_conversation_items(None, conv_id, &create_items)
.await;
assert_eq!(items_resp.status(), StatusCode::BAD_REQUEST);
let items_bytes = axum::body::to_bytes(items_resp.into_body(), usize::MAX)
.await
.unwrap();
let items_text = String::from_utf8_lossy(&items_bytes);
assert!(items_text.contains("Unsupported item type"));
}
#[tokio::test]
async fn test_conversation_items_multi_conversation_sharing() {
// Test that items can be shared across conversations via soft delete
let router_cfg = RouterConfig {
mode: RoutingMode::OpenAI {
worker_urls: vec!["http://localhost".to_string()],
},
connection_mode: ConnectionMode::Http,
policy: PolicyConfig::Random,
host: "127.0.0.1".to_string(),
port: 0,
max_payload_size: 8 * 1024 * 1024,
request_timeout_secs: 60,
worker_startup_timeout_secs: 1,
worker_startup_check_interval_secs: 1,
dp_aware: false,
api_key: None,
discovery: None,
metrics: None,
log_dir: None,
log_level: Some("warn".to_string()),
request_id_headers: None,
max_concurrent_requests: 8,
queue_size: 0,
queue_timeout_secs: 5,
rate_limit_tokens_per_second: None,
cors_allowed_origins: vec![],
retry: RetryConfig::default(),
circuit_breaker: CircuitBreakerConfig::default(),
disable_retries: false,
disable_circuit_breaker: false,
health_check: HealthCheckConfig::default(),
enable_igw: false,
model_path: None,
tokenizer_path: None,
history_backend: sglang_router_rs::config::HistoryBackend::Memory,
oracle: None,
reasoning_parser: None,
tool_call_parser: None,
};
let ctx = AppContext::new(router_cfg, reqwest::Client::new(), 8, None).expect("ctx");
let router = RouterFactory::create_router(&Arc::new(ctx))
.await
.expect("router");
// Create two conversations
let conv_a_resp = router
.create_conversation(None, &serde_json::json!({}))
.await;
let conv_a_bytes = axum::body::to_bytes(conv_a_resp.into_body(), usize::MAX)
.await
.unwrap();
let conv_a_json: serde_json::Value = serde_json::from_slice(&conv_a_bytes).unwrap();
let conv_a_id = conv_a_json["id"].as_str().unwrap();
let conv_b_resp = router
.create_conversation(None, &serde_json::json!({}))
.await;
let conv_b_bytes = axum::body::to_bytes(conv_b_resp.into_body(), usize::MAX)
.await
.unwrap();
let conv_b_json: serde_json::Value = serde_json::from_slice(&conv_b_bytes).unwrap();
let conv_b_id = conv_b_json["id"].as_str().unwrap();
// Create item in conversation A
let create_items = serde_json::json!({
"items": [
{
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": "Shared message"}]
}
]
});
let items_a_resp = router
.create_conversation_items(None, conv_a_id, &create_items)
.await;
let items_a_bytes = axum::body::to_bytes(items_a_resp.into_body(), usize::MAX)
.await
.unwrap();
let items_a_json: serde_json::Value = serde_json::from_slice(&items_a_bytes).unwrap();
let item_id = items_a_json["data"][0]["id"].as_str().unwrap();
// Reference the same item in conversation B
let reference_items = serde_json::json!({
"items": [
{
"type": "item_reference",
"id": item_id
}
]
});
let items_b_resp = router
.create_conversation_items(None, conv_b_id, &reference_items)
.await;
assert_eq!(items_b_resp.status(), StatusCode::OK);
// Verify item appears in both conversations
let list_a = router
.list_conversation_items(None, conv_a_id, None, None, None)
.await;
let list_a_bytes = axum::body::to_bytes(list_a.into_body(), usize::MAX)
.await
.unwrap();
let list_a_json: serde_json::Value = serde_json::from_slice(&list_a_bytes).unwrap();
assert_eq!(list_a_json["data"].as_array().unwrap().len(), 1);
let list_b = router
.list_conversation_items(None, conv_b_id, None, None, None)
.await;
let list_b_bytes = axum::body::to_bytes(list_b.into_body(), usize::MAX)
.await
.unwrap();
let list_b_json: serde_json::Value = serde_json::from_slice(&list_b_bytes).unwrap();
assert_eq!(list_b_json["data"].as_array().unwrap().len(), 1);
// Delete from conversation A
router
.delete_conversation_item(None, conv_a_id, item_id)
.await;
// Should be removed from A
let list_a2 = router
.list_conversation_items(None, conv_a_id, None, None, None)
.await;
let list_a2_bytes = axum::body::to_bytes(list_a2.into_body(), usize::MAX)
.await
.unwrap();
let list_a2_json: serde_json::Value = serde_json::from_slice(&list_a2_bytes).unwrap();
assert_eq!(list_a2_json["data"].as_array().unwrap().len(), 0);
// Should still exist in B (soft delete)
let list_b2 = router
.list_conversation_items(None, conv_b_id, None, None, None)
.await;
let list_b2_bytes = axum::body::to_bytes(list_b2.into_body(), usize::MAX)
.await
.unwrap();
let list_b2_json: serde_json::Value = serde_json::from_slice(&list_b2_bytes).unwrap();
assert_eq!(list_b2_json["data"].as_array().unwrap().len(), 1);
// Item should still be directly gettable
let get_resp = router
.get_conversation_item(None, conv_b_id, item_id, None)
.await;
assert_eq!(get_resp.status(), StatusCode::OK);
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment