Unverified Commit a124b517 authored by Chang Su's avatar Chang Su Committed by GitHub
Browse files

[router] Remove SharedXxxStorage type aliases to make Arc explicit (#12171)

parent d05a968b
......@@ -10,8 +10,7 @@ use crate::{
config::RouterConfig,
core::{workflow::WorkflowEngine, ConnectionMode, JobQueue, LoadMonitor, WorkerRegistry},
data_connector::{
create_storage, SharedConversationItemStorage, SharedConversationStorage,
SharedResponseStorage,
create_storage, ConversationItemStorage, ConversationStorage, ResponseStorage,
},
mcp::McpManager,
middleware::TokenBucket,
......@@ -49,9 +48,9 @@ pub struct AppContext {
pub worker_registry: Arc<WorkerRegistry>,
pub policy_registry: Arc<PolicyRegistry>,
pub router_manager: Option<Arc<RouterManager>>,
pub response_storage: SharedResponseStorage,
pub conversation_storage: SharedConversationStorage,
pub conversation_item_storage: SharedConversationItemStorage,
pub response_storage: Arc<dyn ResponseStorage>,
pub conversation_storage: Arc<dyn ConversationStorage>,
pub conversation_item_storage: Arc<dyn ConversationItemStorage>,
pub load_monitor: Option<Arc<LoadMonitor>>,
pub configured_reasoning_parser: Option<String>,
pub configured_tool_parser: Option<String>,
......@@ -70,9 +69,9 @@ pub struct AppContextBuilder {
worker_registry: Option<Arc<WorkerRegistry>>,
policy_registry: Option<Arc<PolicyRegistry>>,
router_manager: Option<Arc<RouterManager>>,
response_storage: Option<SharedResponseStorage>,
conversation_storage: Option<SharedConversationStorage>,
conversation_item_storage: Option<SharedConversationItemStorage>,
response_storage: Option<Arc<dyn ResponseStorage>>,
conversation_storage: Option<Arc<dyn ConversationStorage>>,
conversation_item_storage: Option<Arc<dyn ConversationItemStorage>>,
load_monitor: Option<Arc<LoadMonitor>>,
worker_job_queue: Option<Arc<OnceLock<Arc<JobQueue>>>>,
workflow_engine: Option<Arc<OnceLock<Arc<WorkflowEngine>>>>,
......@@ -167,19 +166,22 @@ impl AppContextBuilder {
self
}
pub fn response_storage(mut self, response_storage: SharedResponseStorage) -> Self {
pub fn response_storage(mut self, response_storage: Arc<dyn ResponseStorage>) -> Self {
self.response_storage = Some(response_storage);
self
}
pub fn conversation_storage(mut self, conversation_storage: SharedConversationStorage) -> Self {
pub fn conversation_storage(
mut self,
conversation_storage: Arc<dyn ConversationStorage>,
) -> Self {
self.conversation_storage = Some(conversation_storage);
self
}
pub fn conversation_item_storage(
mut self,
conversation_item_storage: SharedConversationItemStorage,
conversation_item_storage: Arc<dyn ConversationItemStorage>,
) -> Self {
self.conversation_item_storage = Some(conversation_item_storage);
self
......
......@@ -11,7 +11,6 @@
use std::{
collections::HashMap,
fmt::{Display, Formatter},
sync::Arc,
};
use async_trait::async_trait;
......@@ -141,9 +140,6 @@ pub trait ConversationStorage: Send + Sync + 'static {
async fn delete_conversation(&self, id: &ConversationId) -> ConversationResult<bool>;
}
/// Shared pointer alias for conversation storage
pub type SharedConversationStorage = Arc<dyn ConversationStorage>;
// ============================================================================
// PART 2: ConversationItem Storage
// ============================================================================
......@@ -259,8 +255,6 @@ pub trait ConversationItemStorage: Send + Sync + 'static {
) -> ConversationItemResult<()>;
}
pub type SharedConversationItemStorage = Arc<dyn ConversationItemStorage>;
/// Helper to build id prefix based on item_type
pub fn make_item_id(item_type: &str) -> ConversationItemId {
// Generate exactly 50 hex characters (25 bytes) for the part after the underscore
......@@ -482,9 +476,6 @@ pub trait ResponseStorage: Send + Sync {
async fn delete_user_responses(&self, user: &str) -> ResponseResult<usize>;
}
/// Type alias for shared storage
pub type SharedResponseStorage = Arc<dyn ResponseStorage>;
impl Default for StoredResponse {
fn default() -> Self {
Self::new(None)
......
......@@ -9,13 +9,21 @@ use std::sync::Arc;
use tracing::info;
use super::{
core::{SharedConversationItemStorage, SharedConversationStorage, SharedResponseStorage},
core::{ConversationItemStorage, ConversationStorage, ResponseStorage},
memory::{MemoryConversationItemStorage, MemoryConversationStorage, MemoryResponseStorage},
noop::{NoOpConversationItemStorage, NoOpConversationStorage, NoOpResponseStorage},
oracle::{OracleConversationItemStorage, OracleConversationStorage, OracleResponseStorage},
};
use crate::config::{HistoryBackend, OracleConfig, RouterConfig};
/// Type alias for the storage tuple returned by factory functions.
/// This avoids clippy::type_complexity warnings while keeping Arc explicit.
pub type StorageTuple = (
Arc<dyn ResponseStorage>,
Arc<dyn ConversationStorage>,
Arc<dyn ConversationItemStorage>,
);
/// Create all three storage backends based on router configuration.
///
/// # Arguments
......@@ -26,16 +34,7 @@ use crate::config::{HistoryBackend, OracleConfig, RouterConfig};
///
/// # Errors
/// Returns error string if Oracle configuration is missing or initialization fails
pub fn create_storage(
config: &RouterConfig,
) -> Result<
(
SharedResponseStorage,
SharedConversationStorage,
SharedConversationItemStorage,
),
String,
> {
pub fn create_storage(config: &RouterConfig) -> Result<StorageTuple, String> {
match config.history_backend {
HistoryBackend::Memory => {
info!("Initializing data connector: Memory");
......@@ -73,16 +72,7 @@ pub fn create_storage(
}
/// Create Oracle storage backends
fn create_oracle_storage(
oracle_cfg: &OracleConfig,
) -> Result<
(
SharedResponseStorage,
SharedConversationStorage,
SharedConversationItemStorage,
),
String,
> {
fn create_oracle_storage(oracle_cfg: &OracleConfig) -> Result<StorageTuple, String> {
let response_storage = OracleResponseStorage::new(oracle_cfg.clone())
.map_err(|err| format!("failed to initialize Oracle response storage: {err}"))?;
......
......@@ -13,12 +13,11 @@ mod memory;
mod noop;
mod oracle;
// Re-export all core types
pub use core::*;
pub use core::{
Conversation, ConversationId, ConversationItem, ConversationItemId, ConversationItemStorage,
ConversationStorage, ListParams, NewConversation, NewConversationItem, ResponseId,
ResponseStorage, SortOrder, StoredResponse,
};
// Re-export factory function
pub use factory::create_storage;
// Re-export all storage implementations
pub use memory::*;
pub use noop::*;
pub use oracle::*;
pub use memory::{MemoryConversationItemStorage, MemoryConversationStorage, MemoryResponseStorage};
......@@ -10,9 +10,7 @@ use tokio::sync::RwLock;
use super::types::BackgroundTaskInfo;
use crate::{
core::WorkerRegistry,
data_connector::{
SharedConversationItemStorage, SharedConversationStorage, SharedResponseStorage,
},
data_connector::{ConversationItemStorage, ConversationStorage, ResponseStorage},
mcp::McpManager,
routers::grpc::{context::SharedComponents, pipeline::RequestPipeline},
};
......@@ -32,13 +30,13 @@ pub struct ResponsesContext {
pub worker_registry: Arc<WorkerRegistry>,
/// Response storage backend
pub response_storage: SharedResponseStorage,
pub response_storage: Arc<dyn ResponseStorage>,
/// Conversation storage backend
pub conversation_storage: SharedConversationStorage,
pub conversation_storage: Arc<dyn ConversationStorage>,
/// Conversation item storage backend
pub conversation_item_storage: SharedConversationItemStorage,
pub conversation_item_storage: Arc<dyn ConversationItemStorage>,
/// MCP manager for tool support
pub mcp_manager: Arc<McpManager>,
......@@ -53,9 +51,9 @@ impl ResponsesContext {
pipeline: Arc<RequestPipeline>,
components: Arc<SharedComponents>,
worker_registry: Arc<WorkerRegistry>,
response_storage: SharedResponseStorage,
conversation_storage: SharedConversationStorage,
conversation_item_storage: SharedConversationItemStorage,
response_storage: Arc<dyn ResponseStorage>,
conversation_storage: Arc<dyn ConversationStorage>,
conversation_item_storage: Arc<dyn ConversationItemStorage>,
mcp_manager: Arc<McpManager>,
) -> Self {
Self {
......
......@@ -57,8 +57,7 @@ use super::{
};
use crate::{
data_connector::{
ConversationId, ResponseId, SharedConversationItemStorage, SharedConversationStorage,
SharedResponseStorage,
ConversationId, ConversationItemStorage, ConversationStorage, ResponseId, ResponseStorage,
},
protocols::{
chat::ChatCompletionStreamResponse,
......@@ -529,9 +528,9 @@ async fn process_and_transform_sse_stream(
body: Body,
original_request: ResponsesRequest,
_chat_request: Arc<crate::protocols::chat::ChatCompletionRequest>,
response_storage: SharedResponseStorage,
conversation_storage: SharedConversationStorage,
conversation_item_storage: SharedConversationItemStorage,
response_storage: Arc<dyn ResponseStorage>,
conversation_storage: Arc<dyn ConversationStorage>,
conversation_item_storage: Arc<dyn ConversationItemStorage>,
tx: mpsc::UnboundedSender<Result<Bytes, std::io::Error>>,
) -> Result<(), String> {
// Create accumulator for final response
......
......@@ -16,7 +16,7 @@ use crate::{
data_connector::{
Conversation, ConversationId, ConversationItemId, ConversationItemStorage,
ConversationStorage, ListParams, NewConversation, NewConversationItem, ResponseId,
ResponseStorage, SharedConversationItemStorage, SharedConversationStorage, SortOrder,
ResponseStorage, SortOrder,
},
protocols::responses::{ResponseInput, ResponsesRequest},
};
......@@ -30,7 +30,7 @@ pub(crate) const MAX_METADATA_PROPERTIES: usize = 16;
/// Create a new conversation
pub(super) async fn create_conversation(
conversation_storage: &SharedConversationStorage,
conversation_storage: &Arc<dyn ConversationStorage>,
body: Value,
) -> Response {
// TODO: The validation should be done in the right place
......@@ -83,7 +83,7 @@ pub(super) async fn create_conversation(
/// Get a conversation by ID
pub(super) async fn get_conversation(
conversation_storage: &SharedConversationStorage,
conversation_storage: &Arc<dyn ConversationStorage>,
conv_id: &str,
) -> Response {
let conversation_id = ConversationId::from(conv_id);
......@@ -112,7 +112,7 @@ pub(super) async fn get_conversation(
/// Update a conversation's metadata
pub(super) async fn update_conversation(
conversation_storage: &SharedConversationStorage,
conversation_storage: &Arc<dyn ConversationStorage>,
conv_id: &str,
body: Value,
) -> Response {
......@@ -224,7 +224,7 @@ pub(super) async fn update_conversation(
/// Delete a conversation
pub(super) async fn delete_conversation(
conversation_storage: &SharedConversationStorage,
conversation_storage: &Arc<dyn ConversationStorage>,
conv_id: &str,
) -> Response {
let conversation_id = ConversationId::from(conv_id);
......@@ -280,8 +280,8 @@ pub(super) async fn delete_conversation(
/// List items in a conversation with pagination
pub(super) async fn list_conversation_items(
conversation_storage: &SharedConversationStorage,
item_storage: &SharedConversationItemStorage,
conversation_storage: &Arc<dyn ConversationStorage>,
item_storage: &Arc<dyn ConversationItemStorage>,
conv_id: &str,
query_params: HashMap<String, String>,
) -> Response {
......@@ -412,8 +412,8 @@ const IMPLEMENTED_ITEM_TYPES: &[&str] = &[
/// Create items in a conversation (bulk operation)
pub(super) async fn create_conversation_items(
conversation_storage: &SharedConversationStorage,
item_storage: &SharedConversationItemStorage,
conversation_storage: &Arc<dyn ConversationStorage>,
item_storage: &Arc<dyn ConversationItemStorage>,
conv_id: &str,
body: Value,
) -> Response {
......@@ -681,8 +681,8 @@ pub(super) async fn create_conversation_items(
/// Get a single conversation item
/// Note: `include` query parameter is accepted but not yet implemented
pub(super) async fn get_conversation_item(
conversation_storage: &SharedConversationStorage,
item_storage: &SharedConversationItemStorage,
conversation_storage: &Arc<dyn ConversationStorage>,
item_storage: &Arc<dyn ConversationItemStorage>,
conv_id: &str,
item_id: &str,
_include: Option<Vec<String>>, // Reserved for future use
......@@ -761,8 +761,8 @@ pub(super) async fn get_conversation_item(
/// Delete a conversation item
pub(super) async fn delete_conversation_item(
conversation_storage: &SharedConversationStorage,
item_storage: &SharedConversationItemStorage,
conversation_storage: &Arc<dyn ConversationStorage>,
item_storage: &Arc<dyn ConversationItemStorage>,
conv_id: &str,
item_id: &str,
) -> Response {
......
......@@ -38,8 +38,8 @@ use super::{
use crate::{
core::{CircuitBreaker, CircuitBreakerConfig as CoreCircuitBreakerConfig},
data_connector::{
ConversationId, ListParams, ResponseId, SharedConversationItemStorage,
SharedConversationStorage, SharedResponseStorage, SortOrder,
ConversationId, ConversationItemStorage, ConversationStorage, ListParams, ResponseId,
ResponseStorage, SortOrder,
},
mcp::McpManager,
protocols::{
......@@ -81,11 +81,11 @@ pub struct OpenAIRouter {
/// Health status
healthy: AtomicBool,
/// Response storage for managing conversation history
response_storage: SharedResponseStorage,
response_storage: Arc<dyn ResponseStorage>,
/// Conversation storage backend
conversation_storage: SharedConversationStorage,
conversation_storage: Arc<dyn ConversationStorage>,
/// Conversation item storage backend
conversation_item_storage: SharedConversationItemStorage,
conversation_item_storage: Arc<dyn ConversationItemStorage>,
/// MCP manager (handles both static and dynamic servers)
mcp_manager: Arc<McpManager>,
}
......
......@@ -33,9 +33,7 @@ use super::{
utils::{event_types, FunctionCallInProgress, OutputIndexMapper, StreamAction},
};
use crate::{
data_connector::{
SharedConversationItemStorage, SharedConversationStorage, SharedResponseStorage,
},
data_connector::{ConversationItemStorage, ConversationStorage, ResponseStorage},
protocols::responses::{ResponseToolType, ResponsesRequest},
routers::header_utils::{apply_request_headers, preserve_response_headers},
};
......@@ -961,9 +959,9 @@ pub(super) fn send_final_response_event(
pub(super) async fn handle_simple_streaming_passthrough(
client: &reqwest::Client,
circuit_breaker: &crate::core::CircuitBreaker,
response_storage: SharedResponseStorage,
conversation_storage: SharedConversationStorage,
conversation_item_storage: SharedConversationItemStorage,
response_storage: Arc<dyn ResponseStorage>,
conversation_storage: Arc<dyn ConversationStorage>,
conversation_item_storage: Arc<dyn ConversationItemStorage>,
url: String,
headers: Option<&HeaderMap>,
payload: Value,
......@@ -996,10 +994,10 @@ pub(super) async fn handle_simple_streaming_passthrough(
if !status.is_success() {
circuit_breaker.record_failure();
let error_body = match response.text().await {
Ok(body) => body,
Err(err) => format!("Failed to read upstream error body: {}", err),
};
let error_body = response
.text()
.await
.unwrap_or_else(|err| format!("Failed to read upstream error body: {}", err));
return (status_code, error_body).into_response();
}
......@@ -1130,9 +1128,9 @@ pub(super) async fn handle_simple_streaming_passthrough(
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_streaming_with_tool_interception(
client: &reqwest::Client,
response_storage: SharedResponseStorage,
conversation_storage: SharedConversationStorage,
conversation_item_storage: SharedConversationItemStorage,
response_storage: Arc<dyn ResponseStorage>,
conversation_storage: Arc<dyn ConversationStorage>,
conversation_item_storage: Arc<dyn ConversationItemStorage>,
url: String,
headers: Option<&HeaderMap>,
mut payload: Value,
......@@ -1492,9 +1490,9 @@ pub(super) async fn handle_streaming_response(
client: &reqwest::Client,
circuit_breaker: &crate::core::CircuitBreaker,
mcp_manager: Option<&Arc<crate::mcp::McpManager>>,
response_storage: SharedResponseStorage,
conversation_storage: SharedConversationStorage,
conversation_item_storage: SharedConversationItemStorage,
response_storage: Arc<dyn ResponseStorage>,
conversation_storage: Arc<dyn ConversationStorage>,
conversation_item_storage: Arc<dyn ConversationItemStorage>,
url: String,
headers: Option<&HeaderMap>,
payload: Value,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment