"vscode:/vscode.git/clone" did not exist on "edcecdd0a1b8c41adcb5610a52e33325b1b0aa50"
Unverified Commit 4ea42f7c authored by Keyang Ru's avatar Keyang Ru Committed by GitHub
Browse files

[router] Refactor data connector architecture with unified storage modules (#12096)

parent ea13cb14
......@@ -7,13 +7,11 @@ use reqwest::Client;
use tracing::info;
use crate::{
config::{HistoryBackend, RouterConfig},
config::RouterConfig,
core::{workflow::WorkflowEngine, ConnectionMode, JobQueue, LoadMonitor, WorkerRegistry},
data_connector::{
MemoryConversationItemStorage, MemoryConversationStorage, MemoryResponseStorage,
NoOpConversationStorage, NoOpResponseStorage, OracleConversationItemStorage,
OracleConversationStorage, OracleResponseStorage, SharedConversationItemStorage,
SharedConversationStorage, SharedResponseStorage,
create_storage, SharedConversationItemStorage, SharedConversationStorage,
SharedResponseStorage,
},
middleware::TokenBucket,
policies::PolicyRegistry,
......@@ -254,12 +252,7 @@ impl AppContextBuilder {
.maybe_tool_parser_factory(&router_config)
.with_worker_registry()
.with_policy_registry(&router_config)
.with_response_storage(&router_config)
.await?
.with_conversation_storage(&router_config)
.await?
.with_conversation_item_storage(&router_config)
.await?
.with_storage(&router_config)?
.with_load_monitor(&router_config)
.with_worker_job_queue()
.with_workflow_engine()
......@@ -420,75 +413,15 @@ impl AppContextBuilder {
self
}
/// Create response storage based on history_backend config
async fn with_response_storage(mut self, config: &RouterConfig) -> Result<Self, String> {
self.response_storage = Some(match config.history_backend {
HistoryBackend::Memory => {
info!("Initializing response storage: Memory");
Arc::new(MemoryResponseStorage::new())
}
HistoryBackend::None => {
info!("Initializing response storage: None (no persistence)");
Arc::new(NoOpResponseStorage::new())
}
HistoryBackend::Oracle => {
let oracle_cfg = config.oracle.clone().ok_or_else(|| {
"oracle configuration is required when history_backend=oracle".to_string()
})?;
info!(
"Initializing response storage: Oracle ATP (pool: {}-{})",
oracle_cfg.pool_min, oracle_cfg.pool_max
);
Arc::new(OracleResponseStorage::new(oracle_cfg).map_err(|err| {
format!("failed to initialize Oracle response storage: {err}")
})?)
}
});
Ok(self)
}
/// Create all storage backends using the factory function
fn with_storage(mut self, config: &RouterConfig) -> Result<Self, String> {
let (response_storage, conversation_storage, conversation_item_storage) =
create_storage(config)?;
/// Create conversation storage based on history_backend config
async fn with_conversation_storage(mut self, config: &RouterConfig) -> Result<Self, String> {
self.conversation_storage = Some(match config.history_backend {
HistoryBackend::Memory => Arc::new(MemoryConversationStorage::new()),
HistoryBackend::None => Arc::new(NoOpConversationStorage::new()),
HistoryBackend::Oracle => {
let oracle_cfg = config.oracle.clone().ok_or_else(|| {
"oracle configuration is required when history_backend=oracle".to_string()
})?;
info!("Initializing conversation storage: Oracle ATP");
Arc::new(OracleConversationStorage::new(oracle_cfg).map_err(|err| {
format!("failed to initialize Oracle conversation storage: {err}")
})?)
}
});
Ok(self)
}
self.response_storage = Some(response_storage);
self.conversation_storage = Some(conversation_storage);
self.conversation_item_storage = Some(conversation_item_storage);
/// Create conversation item storage based on history_backend config
async fn with_conversation_item_storage(
mut self,
config: &RouterConfig,
) -> Result<Self, String> {
self.conversation_item_storage = Some(match config.history_backend {
HistoryBackend::Oracle => {
let oracle_cfg = config.oracle.clone().ok_or_else(|| {
"oracle configuration is required when history_backend=oracle".to_string()
})?;
info!("Initializing conversation item storage: Oracle ATP");
Arc::new(OracleConversationItemStorage::new(oracle_cfg).map_err(|e| {
format!("failed to initialize Oracle conversation item storage: {e}")
})?)
}
HistoryBackend::Memory => {
info!("Initializing conversation item storage: Memory");
Arc::new(MemoryConversationItemStorage::new())
}
HistoryBackend::None => {
info!("Initializing conversation item storage: Memory (no NoOp implementation available)");
Arc::new(MemoryConversationItemStorage::new())
}
});
Ok(self)
}
......
use std::{
collections::{BTreeMap, HashMap},
sync::RwLock,
};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use super::{
conversation_items::{
make_item_id, ConversationItem, ConversationItemId, ConversationItemStorage, ListParams,
Result, SortOrder,
},
conversations::ConversationId,
};
#[derive(Default)]
pub struct MemoryConversationItemStorage {
items: RwLock<HashMap<ConversationItemId, ConversationItem>>, // item_id -> item
#[allow(clippy::type_complexity)]
links: RwLock<HashMap<ConversationId, BTreeMap<(i64, String), ConversationItemId>>>,
// Per-conversation reverse index for fast after cursor lookup: item_id_str -> (ts, item_id_str)
#[allow(clippy::type_complexity)]
rev_index: RwLock<HashMap<ConversationId, HashMap<String, (i64, String)>>>,
}
impl MemoryConversationItemStorage {
pub fn new() -> Self {
Self::default()
}
}
#[async_trait]
impl ConversationItemStorage for MemoryConversationItemStorage {
async fn create_item(
&self,
new_item: super::conversation_items::NewConversationItem,
) -> Result<ConversationItem> {
let id = new_item
.id
.clone()
.unwrap_or_else(|| make_item_id(&new_item.item_type));
let created_at = Utc::now();
let item = ConversationItem {
id: id.clone(),
response_id: new_item.response_id,
item_type: new_item.item_type,
role: new_item.role,
content: new_item.content,
status: new_item.status,
created_at,
};
let mut items = self.items.write().unwrap();
items.insert(id.clone(), item.clone());
Ok(item)
}
async fn link_item(
&self,
conversation_id: &ConversationId,
item_id: &ConversationItemId,
added_at: DateTime<Utc>,
) -> Result<()> {
{
let mut links = self.links.write().unwrap();
let entry = links.entry(conversation_id.clone()).or_default();
entry.insert((added_at.timestamp(), item_id.0.clone()), item_id.clone());
}
{
let mut rev = self.rev_index.write().unwrap();
let entry = rev.entry(conversation_id.clone()).or_default();
entry.insert(item_id.0.clone(), (added_at.timestamp(), item_id.0.clone()));
}
Ok(())
}
async fn list_items(
&self,
conversation_id: &ConversationId,
params: ListParams,
) -> Result<Vec<ConversationItem>> {
let links_guard = self.links.read().unwrap();
let map = match links_guard.get(conversation_id) {
Some(m) => m,
None => return Ok(Vec::new()),
};
let mut results: Vec<ConversationItem> = Vec::new();
let after_key: Option<(i64, String)> = if let Some(after_id) = &params.after {
// O(1) lookup via reverse index for this conversation
if let Some(conv_idx) = self.rev_index.read().unwrap().get(conversation_id) {
conv_idx.get(after_id).cloned()
} else {
None
}
} else {
None
};
let take = params.limit;
let items_guard = self.items.read().unwrap();
use std::ops::Bound::{Excluded, Unbounded};
// Helper to push item if it exists and stop when reaching the limit
let mut push_item = |key: &ConversationItemId| -> bool {
if let Some(it) = items_guard.get(key) {
results.push(it.clone());
if results.len() == take {
return true;
}
}
false
};
match (params.order, after_key) {
(SortOrder::Desc, Some(k)) => {
for ((_ts, _id), item_key) in map.range(..k).rev() {
if push_item(item_key) {
break;
}
}
}
(SortOrder::Desc, None) => {
for ((_ts, _id), item_key) in map.iter().rev() {
if push_item(item_key) {
break;
}
}
}
(SortOrder::Asc, Some(k)) => {
for ((_ts, _id), item_key) in map.range((Excluded(k), Unbounded)) {
if push_item(item_key) {
break;
}
}
}
(SortOrder::Asc, None) => {
for ((_ts, _id), item_key) in map.iter() {
if push_item(item_key) {
break;
}
}
}
}
Ok(results)
}
async fn get_item(&self, item_id: &ConversationItemId) -> Result<Option<ConversationItem>> {
let items = self.items.read().unwrap();
Ok(items.get(item_id).cloned())
}
async fn is_item_linked(
&self,
conversation_id: &ConversationId,
item_id: &ConversationItemId,
) -> Result<bool> {
let rev = self.rev_index.read().unwrap();
if let Some(conv_idx) = rev.get(conversation_id) {
Ok(conv_idx.contains_key(&item_id.0))
} else {
Ok(false)
}
}
async fn delete_item(
&self,
conversation_id: &ConversationId,
item_id: &ConversationItemId,
) -> Result<()> {
// Get the key from rev_index and remove the entry at the same time
let key_to_remove = {
let mut rev = self.rev_index.write().unwrap();
if let Some(conv_idx) = rev.get_mut(conversation_id) {
conv_idx.remove(&item_id.0)
} else {
None
}
};
// If the item was in rev_index, remove it from links as well
if let Some(key) = key_to_remove {
let mut links = self.links.write().unwrap();
if let Some(conv_links) = links.get_mut(conversation_id) {
conv_links.remove(&key);
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use chrono::{TimeZone, Utc};
use super::*;
fn make_item(
item_type: &str,
role: Option<&str>,
content: serde_json::Value,
) -> super::super::conversation_items::NewConversationItem {
super::super::conversation_items::NewConversationItem {
id: None,
response_id: None,
item_type: item_type.to_string(),
role: role.map(|r| r.to_string()),
content,
status: Some("completed".to_string()),
}
}
#[tokio::test]
async fn test_list_ordering_and_cursors() {
let store = MemoryConversationItemStorage::new();
let conv: ConversationId = "conv_test".into();
// Create 3 items and link them at controlled timestamps
let i1 = store
.create_item(make_item("message", Some("user"), serde_json::json!([])))
.await
.unwrap();
let i2 = store
.create_item(make_item(
"message",
Some("assistant"),
serde_json::json!([]),
))
.await
.unwrap();
let i3 = store
.create_item(make_item("reasoning", None, serde_json::json!([])))
.await
.unwrap();
let t1 = Utc.timestamp_opt(1_700_000_001, 0).single().unwrap();
let t2 = Utc.timestamp_opt(1_700_000_002, 0).single().unwrap();
let t3 = Utc.timestamp_opt(1_700_000_003, 0).single().unwrap();
store.link_item(&conv, &i1.id, t1).await.unwrap();
store.link_item(&conv, &i2.id, t2).await.unwrap();
store.link_item(&conv, &i3.id, t3).await.unwrap();
// Desc order, no cursor
let desc = store
.list_items(
&conv,
ListParams {
limit: 2,
order: SortOrder::Desc,
after: None,
},
)
.await
.unwrap();
assert!(desc.len() >= 2);
assert_eq!(desc[0].id, i3.id);
assert_eq!(desc[1].id, i2.id);
// Desc with cursor = i2 -> expect i1 next
let desc_after = store
.list_items(
&conv,
ListParams {
limit: 2,
order: SortOrder::Desc,
after: Some(i2.id.0.clone()),
},
)
.await
.unwrap();
assert!(!desc_after.is_empty());
assert_eq!(desc_after[0].id, i1.id);
// Asc order, no cursor
let asc = store
.list_items(
&conv,
ListParams {
limit: 2,
order: SortOrder::Asc,
after: None,
},
)
.await
.unwrap();
assert!(asc.len() >= 2);
assert_eq!(asc[0].id, i1.id);
assert_eq!(asc[1].id, i2.id);
// Asc with cursor = i2 -> expect i3 next
let asc_after = store
.list_items(
&conv,
ListParams {
limit: 2,
order: SortOrder::Asc,
after: Some(i2.id.0.clone()),
},
)
.await
.unwrap();
assert!(!asc_after.is_empty());
assert_eq!(asc_after[0].id, i3.id);
}
}
use std::{path::Path, sync::Arc, time::Duration};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use deadpool::managed::{Manager, Metrics, Pool, PoolError, RecycleError, RecycleResult};
use oracle::{sql_type::ToSql, Connection};
use serde_json::Value;
use crate::{
config::OracleConfig,
data_connector::{
conversation_items::{
make_item_id, ConversationItem, ConversationItemId, ConversationItemStorage,
ConversationItemStorageError, ListParams, Result as ItemResult, SortOrder,
},
conversations::ConversationId,
},
};
#[derive(Clone)]
pub struct OracleConversationItemStorage {
pool: Pool<ConversationItemOracleConnectionManager>,
}
impl OracleConversationItemStorage {
pub fn new(config: OracleConfig) -> ItemResult<Self> {
configure_oracle_client(&config)?;
initialize_schema(&config)?;
let config = Arc::new(config);
let manager = ConversationItemOracleConnectionManager::new(config.clone());
let mut builder = Pool::builder(manager)
.max_size(config.pool_max)
.runtime(deadpool::Runtime::Tokio1);
if config.pool_timeout_secs > 0 {
builder = builder.wait_timeout(Some(Duration::from_secs(config.pool_timeout_secs)));
}
let pool = builder.build().map_err(|err| {
ConversationItemStorageError::StorageError(format!(
"failed to build Oracle pool for conversation items: {err}"
))
})?;
Ok(Self { pool })
}
async fn with_connection<F, T>(&self, func: F) -> ItemResult<T>
where
F: FnOnce(&Connection) -> ItemResult<T> + Send + 'static,
T: Send + 'static,
{
let connection = self.pool.get().await.map_err(map_pool_error)?;
tokio::task::spawn_blocking(move || {
let result = func(&connection);
drop(connection);
result
})
.await
.map_err(|err| {
ConversationItemStorageError::StorageError(format!(
"failed to execute Oracle conversation item task: {err}"
))
})?
}
// reserved for future use when parsing JSON columns directly into Value
// fn parse_json(raw: Option<String>) -> ItemResult<Value> { ... }
}
#[async_trait]
impl ConversationItemStorage for OracleConversationItemStorage {
async fn create_item(
&self,
item: crate::data_connector::conversation_items::NewConversationItem,
) -> ItemResult<ConversationItem> {
let id = item
.id
.clone()
.unwrap_or_else(|| make_item_id(&item.item_type));
let created_at = Utc::now();
let content_json = serde_json::to_string(&item.content)?;
// Build the return value up-front; move inexpensive clones as needed for SQL
let conversation_item = ConversationItem {
id: id.clone(),
response_id: item.response_id.clone(),
item_type: item.item_type.clone(),
role: item.role.clone(),
content: item.content,
status: item.status.clone(),
created_at,
};
// Prepare values for SQL insertion
let id_str = conversation_item.id.0.clone();
let response_id = conversation_item.response_id.clone();
let item_type = conversation_item.item_type.clone();
let role = conversation_item.role.clone();
let status = conversation_item.status.clone();
self.with_connection(move |conn| {
conn.execute(
"INSERT INTO conversation_items (id, response_id, item_type, role, content, status, created_at) \
VALUES (:1, :2, :3, :4, :5, :6, :7)",
&[&id_str, &response_id, &item_type, &role, &content_json, &status, &created_at],
)
.map_err(map_oracle_error)?;
Ok(())
})
.await?;
Ok(conversation_item)
}
async fn link_item(
&self,
conversation_id: &ConversationId,
item_id: &ConversationItemId,
added_at: DateTime<Utc>,
) -> ItemResult<()> {
let cid = conversation_id.0.clone();
let iid = item_id.0.clone();
self.with_connection(move |conn| {
conn.execute(
"INSERT INTO conversation_item_links (conversation_id, item_id, added_at) VALUES (:1, :2, :3)",
&[&cid, &iid, &added_at],
)
.map_err(map_oracle_error)?;
Ok(())
})
.await
}
async fn list_items(
&self,
conversation_id: &ConversationId,
params: ListParams,
) -> ItemResult<Vec<ConversationItem>> {
let cid = conversation_id.0.clone();
let limit: i64 = params.limit as i64;
let order_desc = matches!(params.order, SortOrder::Desc);
let after_id = params.after.clone();
// Resolve the added_at of the after cursor if provided
let after_key: Option<(DateTime<Utc>, String)> = if let Some(ref aid) = after_id {
self.with_connection({
let cid = cid.clone();
let aid = aid.clone();
move |conn| {
let mut stmt = conn
.statement(
"SELECT added_at FROM conversation_item_links WHERE conversation_id = :1 AND item_id = :2",
)
.build()
.map_err(map_oracle_error)?;
let mut rows = stmt.query(&[&cid, &aid]).map_err(map_oracle_error)?;
if let Some(row_res) = rows.next() {
let row = row_res.map_err(map_oracle_error)?;
let ts: DateTime<Utc> = row.get(0).map_err(map_oracle_error)?;
Ok(Some((ts, aid)))
} else {
Ok(None)
}
}
})
.await?
} else {
None
};
// Build the main list query
let rows: Vec<(String, Option<String>, String, Option<String>, Option<String>, Option<String>, DateTime<Utc>)> =
self.with_connection({
let cid = cid.clone();
move |conn| {
let mut sql = String::from(
"SELECT i.id, i.response_id, i.item_type, i.role, i.content, i.status, i.created_at \
FROM conversation_item_links l \
JOIN conversation_items i ON i.id = l.item_id \
WHERE l.conversation_id = :cid",
);
// Cursor predicate
if let Some((_ts, _iid)) = &after_key {
if order_desc {
sql.push_str(" AND (l.added_at < :ats OR (l.added_at = :ats AND l.item_id < :iid))");
} else {
sql.push_str(" AND (l.added_at > :ats OR (l.added_at = :ats AND l.item_id > :iid))");
}
}
// Order and limit
if order_desc {
sql.push_str(" ORDER BY l.added_at DESC, l.item_id DESC");
} else {
sql.push_str(" ORDER BY l.added_at ASC, l.item_id ASC");
}
sql.push_str(" FETCH NEXT :limit ROWS ONLY");
// Build params and perform a named SELECT query
let mut params_vec: Vec<(&str, &dyn ToSql)> = vec![("cid", &cid)];
if let Some((ts, iid)) = &after_key {
params_vec.push(("ats", ts));
params_vec.push(("iid", iid));
}
params_vec.push(("limit", &limit));
let rows_iter = conn.query_named(&sql, &params_vec).map_err(map_oracle_error)?;
let mut out = Vec::new();
for row_res in rows_iter {
let row = row_res.map_err(map_oracle_error)?;
let id: String = row.get(0).map_err(map_oracle_error)?;
let resp_id: Option<String> = row.get(1).map_err(map_oracle_error)?;
let item_type: String = row.get(2).map_err(map_oracle_error)?;
let role: Option<String> = row.get(3).map_err(map_oracle_error)?;
let content_raw: Option<String> = row.get(4).map_err(map_oracle_error)?;
let status: Option<String> = row.get(5).map_err(map_oracle_error)?;
let created_at: DateTime<Utc> = row.get(6).map_err(map_oracle_error)?;
out.push((id, resp_id, item_type, role, content_raw, status, created_at));
}
Ok(out)
}
})
.await?;
// Map rows to ConversationItem; propagate JSON parse errors instead of swallowing
rows.into_iter()
.map(
|(id, resp_id, item_type, role, content_raw, status, created_at)| {
let content = match content_raw {
Some(s) => {
serde_json::from_str(&s).map_err(ConversationItemStorageError::from)?
}
None => Value::Null,
};
Ok(ConversationItem {
id: ConversationItemId(id),
response_id: resp_id,
item_type,
role,
content,
status,
created_at,
})
},
)
.collect()
}
async fn get_item(&self, item_id: &ConversationItemId) -> ItemResult<Option<ConversationItem>> {
let iid = item_id.0.clone();
self.with_connection(move |conn| {
let mut stmt = conn
.statement(
"SELECT id, response_id, item_type, role, content, status, created_at \
FROM conversation_items WHERE id = :1",
)
.build()
.map_err(map_oracle_error)?;
let mut rows = stmt.query(&[&iid]).map_err(map_oracle_error)?;
if let Some(row_res) = rows.next() {
let row = row_res.map_err(map_oracle_error)?;
let id: String = row.get(0).map_err(map_oracle_error)?;
let response_id: Option<String> = row.get(1).map_err(map_oracle_error)?;
let item_type: String = row.get(2).map_err(map_oracle_error)?;
let role: Option<String> = row.get(3).map_err(map_oracle_error)?;
let content_raw: Option<String> = row.get(4).map_err(map_oracle_error)?;
let status: Option<String> = row.get(5).map_err(map_oracle_error)?;
let created_at: DateTime<Utc> = row.get(6).map_err(map_oracle_error)?;
let content = match content_raw {
Some(s) => serde_json::from_str(&s)?,
None => Value::Null,
};
Ok(Some(ConversationItem {
id: ConversationItemId(id),
response_id,
item_type,
role,
content,
status,
created_at,
}))
} else {
Ok(None)
}
})
.await
}
async fn is_item_linked(
&self,
conversation_id: &ConversationId,
item_id: &ConversationItemId,
) -> ItemResult<bool> {
let cid = conversation_id.0.clone();
let iid = item_id.0.clone();
self.with_connection(move |conn| {
let count: i64 = conn
.query_row_as(
"SELECT COUNT(*) FROM conversation_item_links WHERE conversation_id = :1 AND item_id = :2",
&[&cid, &iid],
)
.map_err(map_oracle_error)?;
Ok(count > 0)
})
.await
}
async fn delete_item(
&self,
conversation_id: &ConversationId,
item_id: &ConversationItemId,
) -> ItemResult<()> {
let cid = conversation_id.0.clone();
let iid = item_id.0.clone();
self.with_connection(move |conn| {
// Delete ONLY the link (do not delete the item itself)
conn.execute(
"DELETE FROM conversation_item_links WHERE conversation_id = :1 AND item_id = :2",
&[&cid, &iid],
)
.map_err(map_oracle_error)?;
Ok(())
})
.await
}
}
#[derive(Clone)]
struct ConversationItemOracleConnectionManager {
params: Arc<OracleConnectParams>,
}
#[derive(Clone)]
struct OracleConnectParams {
username: String,
password: String,
connect_descriptor: String,
}
impl ConversationItemOracleConnectionManager {
fn new(config: Arc<OracleConfig>) -> Self {
let params = OracleConnectParams {
username: config.username.clone(),
password: config.password.clone(),
connect_descriptor: config.connect_descriptor.clone(),
};
Self {
params: Arc::new(params),
}
}
}
impl std::fmt::Debug for ConversationItemOracleConnectionManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ConversationItemOracleConnectionManager")
.field("username", &self.params.username)
.field("connect_descriptor", &self.params.connect_descriptor)
.finish()
}
}
#[async_trait]
impl Manager for ConversationItemOracleConnectionManager {
type Type = Connection;
type Error = oracle::Error;
fn create(
&self,
) -> impl std::future::Future<Output = Result<Connection, oracle::Error>> + Send {
let params = self.params.clone();
async move {
let mut conn = Connection::connect(
&params.username,
&params.password,
&params.connect_descriptor,
)?;
conn.set_autocommit(true);
Ok(conn)
}
}
#[allow(clippy::manual_async_fn)]
fn recycle(
&self,
conn: &mut Connection,
_: &Metrics,
) -> impl std::future::Future<Output = RecycleResult<Self::Error>> + Send {
async move { conn.ping().map_err(RecycleError::Backend) }
}
}
fn configure_oracle_client(config: &OracleConfig) -> ItemResult<()> {
if let Some(wallet_path) = &config.wallet_path {
let wallet_path = Path::new(wallet_path);
if !wallet_path.is_dir() {
return Err(ConversationItemStorageError::StorageError(format!(
"Oracle wallet/config path '{}' is not a directory",
wallet_path.display()
)));
}
if !wallet_path.join("tnsnames.ora").exists() && !wallet_path.join("sqlnet.ora").exists() {
return Err(ConversationItemStorageError::StorageError(format!(
"Oracle wallet/config path '{}' is missing tnsnames.ora or sqlnet.ora",
wallet_path.display()
)));
}
std::env::set_var("TNS_ADMIN", wallet_path);
}
Ok(())
}
fn initialize_schema(config: &OracleConfig) -> ItemResult<()> {
let conn = Connection::connect(
&config.username,
&config.password,
&config.connect_descriptor,
)
.map_err(map_oracle_error)?;
let exists_items: i64 = conn
.query_row_as(
"SELECT COUNT(*) FROM user_tables WHERE table_name = 'CONVERSATION_ITEMS'",
&[],
)
.map_err(map_oracle_error)?;
if exists_items == 0 {
conn.execute(
"CREATE TABLE conversation_items (
id VARCHAR2(64) PRIMARY KEY,
response_id VARCHAR2(64),
item_type VARCHAR2(32) NOT NULL,
role VARCHAR2(32),
content CLOB,
status VARCHAR2(32),
created_at TIMESTAMP WITH TIME ZONE
)",
&[],
)
.map_err(map_oracle_error)?;
}
let exists_links: i64 = conn
.query_row_as(
"SELECT COUNT(*) FROM user_tables WHERE table_name = 'CONVERSATION_ITEM_LINKS'",
&[],
)
.map_err(map_oracle_error)?;
if exists_links == 0 {
conn.execute(
"CREATE TABLE conversation_item_links (
conversation_id VARCHAR2(64) NOT NULL,
item_id VARCHAR2(64) NOT NULL,
added_at TIMESTAMP WITH TIME ZONE,
CONSTRAINT pk_conv_item_link PRIMARY KEY (conversation_id, item_id)
)",
&[],
)
.map_err(map_oracle_error)?;
conn.execute(
"CREATE INDEX conv_item_links_conv_idx ON conversation_item_links (conversation_id, added_at)",
&[],
)
.map_err(map_oracle_error)?;
}
Ok(())
}
fn map_pool_error(err: PoolError<oracle::Error>) -> ConversationItemStorageError {
match err {
PoolError::Backend(e) => map_oracle_error(e),
other => ConversationItemStorageError::StorageError(format!(
"failed to obtain Oracle conversation item connection: {other}"
)),
}
}
fn map_oracle_error(err: oracle::Error) -> ConversationItemStorageError {
if let Some(db_err) = err.db_error() {
ConversationItemStorageError::StorageError(format!(
"Oracle error (code {}): {}",
db_err.code(),
db_err.message()
))
} else {
ConversationItemStorageError::StorageError(err.to_string())
}
}
use std::{
fmt::{Display, Formatter},
sync::Arc,
};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use rand::RngCore;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use super::conversations::ConversationId;
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
pub struct ConversationItemId(pub String);
impl Display for ConversationItemId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
impl From<String> for ConversationItemId {
fn from(value: String) -> Self {
Self(value)
}
}
impl From<&str> for ConversationItemId {
fn from(value: &str) -> Self {
Self(value.to_string())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConversationItem {
pub id: ConversationItemId,
pub response_id: Option<String>,
pub item_type: String,
pub role: Option<String>,
pub content: Value,
pub status: Option<String>,
pub created_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewConversationItem {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<ConversationItemId>,
pub response_id: Option<String>,
pub item_type: String,
pub role: Option<String>,
pub content: Value,
pub status: Option<String>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum SortOrder {
Asc,
Desc,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListParams {
pub limit: usize,
pub order: SortOrder,
pub after: Option<String>, // item_id cursor
}
pub type Result<T> = std::result::Result<T, ConversationItemStorageError>;
#[derive(Debug, thiserror::Error)]
pub enum ConversationItemStorageError {
#[error("Not found: {0}")]
NotFound(String),
#[error("Storage error: {0}")]
StorageError(String),
#[error("Serialization error: {0}")]
SerializationError(#[from] serde_json::Error),
}
#[async_trait]
pub trait ConversationItemStorage: Send + Sync + 'static {
async fn create_item(&self, item: NewConversationItem) -> Result<ConversationItem>;
async fn link_item(
&self,
conversation_id: &ConversationId,
item_id: &ConversationItemId,
added_at: DateTime<Utc>,
) -> Result<()>;
async fn list_items(
&self,
conversation_id: &ConversationId,
params: ListParams,
) -> Result<Vec<ConversationItem>>;
/// Get a single item by ID
async fn get_item(&self, item_id: &ConversationItemId) -> Result<Option<ConversationItem>>;
/// Check if an item is linked to a conversation
async fn is_item_linked(
&self,
conversation_id: &ConversationId,
item_id: &ConversationItemId,
) -> Result<bool>;
/// Delete an item link from a conversation (does not delete the item itself)
async fn delete_item(
&self,
conversation_id: &ConversationId,
item_id: &ConversationItemId,
) -> Result<()>;
}
pub type SharedConversationItemStorage = Arc<dyn ConversationItemStorage>;
/// Helper to build id prefix based on item_type
pub fn make_item_id(item_type: &str) -> ConversationItemId {
// Generate exactly 50 hex characters (25 bytes) for the part after the underscore
let mut rng = rand::rng();
let mut bytes = [0u8; 25];
rng.fill_bytes(&mut bytes);
let hex_string: String = bytes.iter().map(|b| format!("{:02x}", b)).collect();
let prefix: String = match item_type {
"message" => "msg".to_string(),
"reasoning" => "rs".to_string(),
"mcp_call" => "mcp".to_string(),
"mcp_list_tools" => "mcpl".to_string(),
"function_tool_call" => "ftc".to_string(),
other => {
// Fallback: first 3 letters of type or "itm"
let mut p = other.chars().take(3).collect::<String>();
if p.is_empty() {
p = "itm".to_string();
}
p
}
};
ConversationItemId(format!("{}_{}", prefix, hex_string))
}
use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use parking_lot::RwLock;
use super::conversations::{
Conversation, ConversationId, ConversationMetadata, ConversationStorage, NewConversation,
Result,
};
/// In-memory conversation storage used for development and tests
#[derive(Default, Clone)]
pub struct MemoryConversationStorage {
inner: Arc<RwLock<HashMap<ConversationId, Conversation>>>,
}
impl MemoryConversationStorage {
pub fn new() -> Self {
Self {
inner: Arc::new(RwLock::new(HashMap::new())),
}
}
}
#[async_trait]
impl ConversationStorage for MemoryConversationStorage {
async fn create_conversation(&self, input: NewConversation) -> Result<Conversation> {
let conversation = Conversation::new(input);
self.inner
.write()
.insert(conversation.id.clone(), conversation.clone());
Ok(conversation)
}
async fn get_conversation(&self, id: &ConversationId) -> Result<Option<Conversation>> {
Ok(self.inner.read().get(id).cloned())
}
async fn update_conversation(
&self,
id: &ConversationId,
metadata: Option<ConversationMetadata>,
) -> Result<Option<Conversation>> {
let mut store = self.inner.write();
if let Some(entry) = store.get_mut(id) {
entry.metadata = metadata;
return Ok(Some(entry.clone()));
}
Ok(None)
}
async fn delete_conversation(&self, id: &ConversationId) -> Result<bool> {
let removed = self.inner.write().remove(id).is_some();
Ok(removed)
}
}
use async_trait::async_trait;
use super::conversations::{
Conversation, ConversationId, ConversationMetadata, ConversationStorage, Result,
};
/// No-op implementation that synthesizes conversation responses without persistence
#[derive(Default, Debug, Clone)]
pub struct NoOpConversationStorage;
impl NoOpConversationStorage {
pub fn new() -> Self {
Self
}
}
#[async_trait]
impl ConversationStorage for NoOpConversationStorage {
async fn create_conversation(
&self,
input: super::conversations::NewConversation,
) -> Result<Conversation> {
Ok(Conversation::new(input))
}
async fn get_conversation(&self, _id: &ConversationId) -> Result<Option<Conversation>> {
Ok(None)
}
async fn update_conversation(
&self,
_id: &ConversationId,
_metadata: Option<ConversationMetadata>,
) -> Result<Option<Conversation>> {
Ok(None)
}
async fn delete_conversation(&self, _id: &ConversationId) -> Result<bool> {
Ok(false)
}
}
use std::{path::Path, sync::Arc, time::Duration};
use async_trait::async_trait;
use chrono::Utc;
use deadpool::managed::{Manager, Metrics, Pool, PoolError, RecycleError, RecycleResult};
use oracle::{sql_type::OracleType, Connection};
use serde_json::Value;
use crate::{
config::OracleConfig,
data_connector::conversations::{
Conversation, ConversationId, ConversationMetadata, ConversationStorage,
ConversationStorageError, NewConversation, Result,
},
};
#[derive(Clone)]
pub struct OracleConversationStorage {
pool: Pool<ConversationOracleConnectionManager>,
}
impl OracleConversationStorage {
pub fn new(config: OracleConfig) -> Result<Self> {
configure_oracle_client(&config)?;
initialize_schema(&config)?;
let config = Arc::new(config);
let manager = ConversationOracleConnectionManager::new(config.clone());
let mut builder = Pool::builder(manager)
.max_size(config.pool_max)
.runtime(deadpool::Runtime::Tokio1);
if config.pool_timeout_secs > 0 {
builder = builder.wait_timeout(Some(Duration::from_secs(config.pool_timeout_secs)));
}
let pool = builder.build().map_err(|err| {
ConversationStorageError::StorageError(format!(
"failed to build Oracle pool for conversations: {err}"
))
})?;
Ok(Self { pool })
}
async fn with_connection<F, T>(&self, func: F) -> Result<T>
where
F: FnOnce(&Connection) -> Result<T> + Send + 'static,
T: Send + 'static,
{
let connection = self.pool.get().await.map_err(map_pool_error)?;
tokio::task::spawn_blocking(move || {
let result = func(&connection);
drop(connection);
result
})
.await
.map_err(|err| {
ConversationStorageError::StorageError(format!(
"failed to execute Oracle conversation task: {err}"
))
})?
}
fn parse_metadata(raw: Option<String>) -> Result<Option<ConversationMetadata>> {
match raw {
Some(json) if !json.is_empty() => {
let value: Value = serde_json::from_str(&json)?;
match value {
Value::Object(map) => Ok(Some(map)),
Value::Null => Ok(None),
other => Err(ConversationStorageError::StorageError(format!(
"conversation metadata expected object, got {other}"
))),
}
}
_ => Ok(None),
}
}
}
#[async_trait]
impl ConversationStorage for OracleConversationStorage {
async fn create_conversation(&self, input: NewConversation) -> Result<Conversation> {
let conversation = Conversation::new(input);
let id_str = conversation.id.0.clone();
let created_at = conversation.created_at;
let metadata_json = conversation
.metadata
.as_ref()
.map(serde_json::to_string)
.transpose()?;
self.with_connection(move |conn| {
conn.execute(
"INSERT INTO conversations (id, created_at, metadata) VALUES (:1, :2, :3)",
&[&id_str, &created_at, &metadata_json],
)
.map(|_| ())
.map_err(map_oracle_error)
})
.await?;
Ok(conversation)
}
async fn get_conversation(&self, id: &ConversationId) -> Result<Option<Conversation>> {
let lookup = id.0.clone();
self.with_connection(move |conn| {
let mut stmt = conn
.statement("SELECT id, created_at, metadata FROM conversations WHERE id = :1")
.build()
.map_err(map_oracle_error)?;
let mut rows = stmt.query(&[&lookup]).map_err(map_oracle_error)?;
if let Some(row_res) = rows.next() {
let row = row_res.map_err(map_oracle_error)?;
let id: String = row.get(0).map_err(map_oracle_error)?;
let created_at: chrono::DateTime<Utc> = row.get(1).map_err(map_oracle_error)?;
let metadata_raw: Option<String> = row.get(2).map_err(map_oracle_error)?;
let metadata = Self::parse_metadata(metadata_raw)?;
Ok(Some(Conversation::with_parts(
ConversationId(id),
created_at,
metadata,
)))
} else {
Ok(None)
}
})
.await
}
async fn update_conversation(
&self,
id: &ConversationId,
metadata: Option<ConversationMetadata>,
) -> Result<Option<Conversation>> {
let id_str = id.0.clone();
let metadata_json = metadata.as_ref().map(serde_json::to_string).transpose()?;
let conversation_id = id.clone();
self.with_connection(move |conn| {
let mut stmt = conn
.statement(
"UPDATE conversations \
SET metadata = :1 \
WHERE id = :2 \
RETURNING created_at INTO :3",
)
.build()
.map_err(map_oracle_error)?;
stmt.bind(3, &OracleType::TimestampTZ(6))
.map_err(map_oracle_error)?;
stmt.execute(&[&metadata_json, &id_str])
.map_err(map_oracle_error)?;
if stmt.row_count().map_err(map_oracle_error)? == 0 {
return Ok(None);
}
let mut created_at: Vec<chrono::DateTime<Utc>> =
stmt.returned_values(3).map_err(map_oracle_error)?;
let created_at = created_at.pop().ok_or_else(|| {
ConversationStorageError::StorageError(
"Oracle update did not return created_at".to_string(),
)
})?;
Ok(Some(Conversation::with_parts(
conversation_id,
created_at,
metadata,
)))
})
.await
}
async fn delete_conversation(&self, id: &ConversationId) -> Result<bool> {
let id_str = id.0.clone();
let res = self
.with_connection(move |conn| {
conn.execute("DELETE FROM conversations WHERE id = :1", &[&id_str])
.map_err(map_oracle_error)
})
.await?;
Ok(res.row_count().map_err(map_oracle_error)? > 0)
}
}
#[derive(Clone)]
struct ConversationOracleConnectionManager {
params: Arc<OracleConnectParams>,
}
impl ConversationOracleConnectionManager {
fn new(config: Arc<OracleConfig>) -> Self {
let params = OracleConnectParams {
username: config.username.clone(),
password: config.password.clone(),
connect_descriptor: config.connect_descriptor.clone(),
};
Self {
params: Arc::new(params),
}
}
}
impl std::fmt::Debug for ConversationOracleConnectionManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ConversationOracleConnectionManager")
.field("username", &self.params.username)
.field("connect_descriptor", &self.params.connect_descriptor)
.finish()
}
}
#[derive(Clone)]
struct OracleConnectParams {
username: String,
password: String,
connect_descriptor: String,
}
impl std::fmt::Debug for OracleConnectParams {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OracleConnectParams")
.field("username", &self.username)
.field("connect_descriptor", &self.connect_descriptor)
.finish()
}
}
#[async_trait]
impl Manager for ConversationOracleConnectionManager {
type Type = Connection;
type Error = oracle::Error;
fn create(
&self,
) -> impl std::future::Future<Output = std::result::Result<Connection, oracle::Error>> + Send
{
let params = self.params.clone();
async move {
let mut conn = Connection::connect(
&params.username,
&params.password,
&params.connect_descriptor,
)?;
conn.set_autocommit(true);
Ok(conn)
}
}
#[allow(clippy::manual_async_fn)]
fn recycle(
&self,
conn: &mut Connection,
_: &Metrics,
) -> impl std::future::Future<Output = RecycleResult<Self::Error>> + Send {
async move { conn.ping().map_err(RecycleError::Backend) }
}
}
fn configure_oracle_client(config: &OracleConfig) -> Result<()> {
if let Some(wallet_path) = &config.wallet_path {
let wallet_path = Path::new(wallet_path);
if !wallet_path.is_dir() {
return Err(ConversationStorageError::StorageError(format!(
"Oracle wallet/config path '{}' is not a directory",
wallet_path.display()
)));
}
if !wallet_path.join("tnsnames.ora").exists() && !wallet_path.join("sqlnet.ora").exists() {
return Err(ConversationStorageError::StorageError(format!(
"Oracle wallet/config path '{}' is missing tnsnames.ora or sqlnet.ora",
wallet_path.display()
)));
}
std::env::set_var("TNS_ADMIN", wallet_path);
}
Ok(())
}
fn initialize_schema(config: &OracleConfig) -> Result<()> {
let conn = Connection::connect(
&config.username,
&config.password,
&config.connect_descriptor,
)
.map_err(map_oracle_error)?;
let exists: i64 = conn
.query_row_as(
"SELECT COUNT(*) FROM user_tables WHERE table_name = 'CONVERSATIONS'",
&[],
)
.map_err(map_oracle_error)?;
if exists == 0 {
conn.execute(
"CREATE TABLE conversations (
id VARCHAR2(64) PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE,
metadata CLOB
)",
&[],
)
.map_err(map_oracle_error)?;
}
Ok(())
}
fn map_pool_error(err: PoolError<oracle::Error>) -> ConversationStorageError {
match err {
PoolError::Backend(e) => map_oracle_error(e),
other => ConversationStorageError::StorageError(format!(
"failed to obtain Oracle conversation connection: {other}"
)),
}
}
fn map_oracle_error(err: oracle::Error) -> ConversationStorageError {
if let Some(db_err) = err.db_error() {
ConversationStorageError::StorageError(format!(
"Oracle error (code {}): {}",
db_err.code(),
db_err.message()
))
} else {
ConversationStorageError::StorageError(err.to_string())
}
}
use std::{
fmt::{Display, Formatter},
sync::Arc,
};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use rand::RngCore;
use serde::{Deserialize, Serialize};
use serde_json::{Map as JsonMap, Value};
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
pub struct ConversationId(pub String);
impl ConversationId {
pub fn new() -> Self {
let mut rng = rand::rng();
let mut bytes = [0u8; 24];
rng.fill_bytes(&mut bytes);
let hex_string: String = bytes.iter().map(|b| format!("{:02x}", b)).collect();
Self(format!("conv_{}", hex_string))
}
}
impl Default for ConversationId {
fn default() -> Self {
Self::new()
}
}
impl From<String> for ConversationId {
fn from(value: String) -> Self {
Self(value)
}
}
impl From<&str> for ConversationId {
fn from(value: &str) -> Self {
Self(value.to_string())
}
}
impl Display for ConversationId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
/// Metadata payload persisted with a conversation
pub type ConversationMetadata = JsonMap<String, Value>;
/// Input payload for creating a conversation
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct NewConversation {
/// Optional conversation ID (if None, a random ID will be generated)
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<ConversationId>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metadata: Option<ConversationMetadata>,
}
/// Stored conversation data structure
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Conversation {
pub id: ConversationId,
pub created_at: DateTime<Utc>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metadata: Option<ConversationMetadata>,
}
impl Conversation {
pub fn new(new_conversation: NewConversation) -> Self {
Self {
id: new_conversation.id.unwrap_or_default(),
created_at: Utc::now(),
metadata: new_conversation.metadata,
}
}
pub fn with_parts(
id: ConversationId,
created_at: DateTime<Utc>,
metadata: Option<ConversationMetadata>,
) -> Self {
Self {
id,
created_at,
metadata,
}
}
}
/// Result alias for conversation storage operations
pub type Result<T> = std::result::Result<T, ConversationStorageError>;
/// Error type for conversation storage operations
#[derive(Debug, thiserror::Error)]
pub enum ConversationStorageError {
#[error("Conversation not found: {0}")]
ConversationNotFound(String),
#[error("Storage error: {0}")]
StorageError(String),
#[error("Serialization error: {0}")]
SerializationError(#[from] serde_json::Error),
}
/// Trait describing the CRUD interface for conversation storage backends
#[async_trait]
pub trait ConversationStorage: Send + Sync + 'static {
async fn create_conversation(&self, input: NewConversation) -> Result<Conversation>;
async fn get_conversation(&self, id: &ConversationId) -> Result<Option<Conversation>>;
async fn update_conversation(
&self,
id: &ConversationId,
metadata: Option<ConversationMetadata>,
) -> Result<Option<Conversation>>;
async fn delete_conversation(&self, id: &ConversationId) -> Result<bool>;
}
/// Shared pointer alias for conversation storage
pub type SharedConversationStorage = Arc<dyn ConversationStorage>;
use std::{collections::HashMap, sync::Arc};
// core.rs
//
// Core types for the data connector module.
// Contains all traits, data types, error types, and IDs for all storage backends.
//
// Structure:
// 1. Conversation types + trait
// 2. ConversationItem types + trait
// 3. Response types + trait
use std::{
collections::HashMap,
fmt::{Display, Formatter},
sync::Arc,
};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use rand::RngCore;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_json::{Map as JsonMap, Value};
// ============================================================================
// PART 1: Conversation Storage
// ============================================================================
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
pub struct ConversationId(pub String);
impl ConversationId {
pub fn new() -> Self {
let mut rng = rand::rng();
let mut bytes = [0u8; 25];
rng.fill_bytes(&mut bytes);
let hex_string: String = bytes.iter().map(|b| format!("{:02x}", b)).collect();
Self(format!("conv_{}", hex_string))
}
}
impl Default for ConversationId {
fn default() -> Self {
Self::new()
}
}
impl From<String> for ConversationId {
fn from(value: String) -> Self {
Self(value)
}
}
impl From<&str> for ConversationId {
fn from(value: &str) -> Self {
Self(value.to_string())
}
}
impl Display for ConversationId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
/// Metadata payload persisted with a conversation
pub type ConversationMetadata = JsonMap<String, Value>;
/// Input payload for creating a conversation
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct NewConversation {
/// Optional conversation ID (if None, a random ID will be generated)
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<ConversationId>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metadata: Option<ConversationMetadata>,
}
/// Stored conversation data structure
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Conversation {
pub id: ConversationId,
pub created_at: DateTime<Utc>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metadata: Option<ConversationMetadata>,
}
impl Conversation {
pub fn new(new_conversation: NewConversation) -> Self {
Self {
id: new_conversation.id.unwrap_or_default(),
created_at: Utc::now(),
metadata: new_conversation.metadata,
}
}
pub fn with_parts(
id: ConversationId,
created_at: DateTime<Utc>,
metadata: Option<ConversationMetadata>,
) -> Self {
Self {
id,
created_at,
metadata,
}
}
}
/// Result alias for conversation storage operations
pub type ConversationResult<T> = Result<T, ConversationStorageError>;
/// Error type for conversation storage operations
#[derive(Debug, thiserror::Error)]
pub enum ConversationStorageError {
#[error("Conversation not found: {0}")]
ConversationNotFound(String),
#[error("Storage error: {0}")]
StorageError(String),
#[error("Serialization error: {0}")]
SerializationError(#[from] serde_json::Error),
}
/// Trait describing the CRUD interface for conversation storage backends
#[async_trait]
pub trait ConversationStorage: Send + Sync + 'static {
async fn create_conversation(&self, input: NewConversation)
-> ConversationResult<Conversation>;
async fn get_conversation(
&self,
id: &ConversationId,
) -> ConversationResult<Option<Conversation>>;
async fn update_conversation(
&self,
id: &ConversationId,
metadata: Option<ConversationMetadata>,
) -> ConversationResult<Option<Conversation>>;
async fn delete_conversation(&self, id: &ConversationId) -> ConversationResult<bool>;
}
/// Shared pointer alias for conversation storage
pub type SharedConversationStorage = Arc<dyn ConversationStorage>;
// ============================================================================
// PART 2: ConversationItem Storage
// ============================================================================
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
pub struct ConversationItemId(pub String);
impl Display for ConversationItemId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
impl From<String> for ConversationItemId {
fn from(value: String) -> Self {
Self(value)
}
}
impl From<&str> for ConversationItemId {
fn from(value: &str) -> Self {
Self(value.to_string())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConversationItem {
pub id: ConversationItemId,
pub response_id: Option<String>,
pub item_type: String,
pub role: Option<String>,
pub content: Value,
pub status: Option<String>,
pub created_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewConversationItem {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<ConversationItemId>,
pub response_id: Option<String>,
pub item_type: String,
pub role: Option<String>,
pub content: Value,
pub status: Option<String>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum SortOrder {
Asc,
Desc,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListParams {
pub limit: usize,
pub order: SortOrder,
pub after: Option<String>, // item_id cursor
}
pub type ConversationItemResult<T> = Result<T, ConversationItemStorageError>;
#[derive(Debug, thiserror::Error)]
pub enum ConversationItemStorageError {
#[error("Not found: {0}")]
NotFound(String),
#[error("Storage error: {0}")]
StorageError(String),
#[error("Serialization error: {0}")]
SerializationError(#[from] serde_json::Error),
}
#[async_trait]
pub trait ConversationItemStorage: Send + Sync + 'static {
async fn create_item(
&self,
item: NewConversationItem,
) -> ConversationItemResult<ConversationItem>;
async fn link_item(
&self,
conversation_id: &ConversationId,
item_id: &ConversationItemId,
added_at: DateTime<Utc>,
) -> ConversationItemResult<()>;
async fn list_items(
&self,
conversation_id: &ConversationId,
params: ListParams,
) -> ConversationItemResult<Vec<ConversationItem>>;
/// Get a single item by ID
async fn get_item(
&self,
item_id: &ConversationItemId,
) -> ConversationItemResult<Option<ConversationItem>>;
/// Check if an item is linked to a conversation
async fn is_item_linked(
&self,
conversation_id: &ConversationId,
item_id: &ConversationItemId,
) -> ConversationItemResult<bool>;
/// Delete an item link from a conversation (does not delete the item itself)
async fn delete_item(
&self,
conversation_id: &ConversationId,
item_id: &ConversationItemId,
) -> ConversationItemResult<()>;
}
pub type SharedConversationItemStorage = Arc<dyn ConversationItemStorage>;
/// Helper to build id prefix based on item_type
pub fn make_item_id(item_type: &str) -> ConversationItemId {
// Generate exactly 50 hex characters (25 bytes) for the part after the underscore
let mut rng = rand::rng();
let mut bytes = [0u8; 25];
rng.fill_bytes(&mut bytes);
let hex_string: String = bytes.iter().map(|b| format!("{:02x}", b)).collect();
let prefix: String = match item_type {
"message" => "msg".to_string(),
"reasoning" => "rs".to_string(),
"mcp_call" => "mcp".to_string(),
"mcp_list_tools" => "mcpl".to_string(),
"function_tool_call" => "ftc".to_string(),
other => {
// Fallback: first 3 letters of type or "itm"
let mut p = other.chars().take(3).collect::<String>();
if p.is_empty() {
p = "itm".to_string();
}
p
}
};
ConversationItemId(format!("{}_{}", prefix, hex_string))
}
// ============================================================================
// PART 3: Response Storage
// ============================================================================
/// Response identifier
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
......@@ -57,7 +344,7 @@ pub struct StoredResponse {
pub metadata: HashMap<String, Value>,
/// When this response was created
pub created_at: chrono::DateTime<chrono::Utc>,
pub created_at: DateTime<Utc>,
/// User identifier (optional)
pub user: Option<String>,
......@@ -84,7 +371,7 @@ impl StoredResponse {
output: Value::Array(vec![]),
tool_calls: Vec::new(),
metadata: HashMap::new(),
created_at: chrono::Utc::now(),
created_at: Utc::now(),
user: None,
model: None,
conversation_id: None,
......@@ -159,19 +446,22 @@ pub enum ResponseStorageError {
SerializationError(#[from] serde_json::Error),
}
pub type Result<T> = std::result::Result<T, ResponseStorageError>;
pub type ResponseResult<T> = Result<T, ResponseStorageError>;
/// Trait for response storage
#[async_trait]
pub trait ResponseStorage: Send + Sync {
/// Store a new response
async fn store_response(&self, response: StoredResponse) -> Result<ResponseId>;
async fn store_response(&self, response: StoredResponse) -> ResponseResult<ResponseId>;
/// Get a response by ID
async fn get_response(&self, response_id: &ResponseId) -> Result<Option<StoredResponse>>;
async fn get_response(
&self,
response_id: &ResponseId,
) -> ResponseResult<Option<StoredResponse>>;
/// Delete a response
async fn delete_response(&self, response_id: &ResponseId) -> Result<()>;
async fn delete_response(&self, response_id: &ResponseId) -> ResponseResult<()>;
/// Get the chain of responses leading to a given response
/// Returns responses in chronological order (oldest first)
......@@ -179,17 +469,17 @@ pub trait ResponseStorage: Send + Sync {
&self,
response_id: &ResponseId,
max_depth: Option<usize>,
) -> Result<ResponseChain>;
) -> ResponseResult<ResponseChain>;
/// List recent responses for a user
async fn list_user_responses(
&self,
user: &str,
limit: Option<usize>,
) -> Result<Vec<StoredResponse>>;
) -> ResponseResult<Vec<StoredResponse>>;
/// Delete all responses for a user
async fn delete_user_responses(&self, user: &str) -> Result<usize>;
async fn delete_user_responses(&self, user: &str) -> ResponseResult<usize>;
}
/// Type alias for shared storage
......
// factory.rs
//
// Factory function to create storage backends based on configuration.
// This centralizes storage initialization logic and fixes the bug where
// conversation_item_storage was missing/incorrect in server.rs.
use std::sync::Arc;
use tracing::info;
use super::{
core::{SharedConversationItemStorage, SharedConversationStorage, SharedResponseStorage},
memory::{MemoryConversationItemStorage, MemoryConversationStorage, MemoryResponseStorage},
noop::{NoOpConversationItemStorage, NoOpConversationStorage, NoOpResponseStorage},
oracle::{OracleConversationItemStorage, OracleConversationStorage, OracleResponseStorage},
};
use crate::config::{HistoryBackend, OracleConfig, RouterConfig};
/// Create all three storage backends based on router configuration.
///
/// # Arguments
/// * `config` - Router configuration containing history_backend and oracle settings
///
/// # Returns
/// Tuple of (response_storage, conversation_storage, conversation_item_storage)
///
/// # Errors
/// Returns error string if Oracle configuration is missing or initialization fails
pub fn create_storage(
config: &RouterConfig,
) -> Result<
(
SharedResponseStorage,
SharedConversationStorage,
SharedConversationItemStorage,
),
String,
> {
match config.history_backend {
HistoryBackend::Memory => {
info!("Initializing data connector: Memory");
Ok((
Arc::new(MemoryResponseStorage::new()),
Arc::new(MemoryConversationStorage::new()),
Arc::new(MemoryConversationItemStorage::new()),
))
}
HistoryBackend::None => {
info!("Initializing data connector: None (no persistence)");
Ok((
Arc::new(NoOpResponseStorage::new()),
Arc::new(NoOpConversationStorage::new()),
Arc::new(NoOpConversationItemStorage::new()),
))
}
HistoryBackend::Oracle => {
let oracle_cfg = config
.oracle
.clone()
.ok_or("oracle configuration is required when history_backend=oracle")?;
info!(
"Initializing data connector: Oracle ATP (pool: {}-{})",
oracle_cfg.pool_min, oracle_cfg.pool_max
);
let storages = create_oracle_storage(&oracle_cfg)?;
info!("Data connector initialized successfully: Oracle ATP");
Ok(storages)
}
}
}
/// Create Oracle storage backends
fn create_oracle_storage(
oracle_cfg: &OracleConfig,
) -> Result<
(
SharedResponseStorage,
SharedConversationStorage,
SharedConversationItemStorage,
),
String,
> {
let response_storage = OracleResponseStorage::new(oracle_cfg.clone())
.map_err(|err| format!("failed to initialize Oracle response storage: {err}"))?;
let conversation_storage = OracleConversationStorage::new(oracle_cfg.clone())
.map_err(|err| format!("failed to initialize Oracle conversation storage: {err}"))?;
let conversation_item_storage = OracleConversationItemStorage::new(oracle_cfg.clone())
.map_err(|err| format!("failed to initialize Oracle conversation item storage: {err}"))?;
Ok((
Arc::new(response_storage),
Arc::new(conversation_storage),
Arc::new(conversation_item_storage),
))
}
// Data connector module for response storage and conversation storage
pub mod conversation_item_memory_store;
pub mod conversation_item_oracle_store;
pub mod conversation_items;
pub mod conversation_memory_store;
pub mod conversation_noop_store;
pub mod conversation_oracle_store;
pub mod conversations;
pub mod response_memory_store;
pub mod response_noop_store;
pub mod response_oracle_store;
pub mod responses;
//
// Simplified module structure:
// - core.rs: All traits, data types, and errors
// - memory.rs: All in-memory storage implementations
// - noop.rs: All no-op storage implementations
// - oracle.rs: All Oracle ATP storage implementations
// - factory.rs: Storage creation function
pub use conversation_item_memory_store::MemoryConversationItemStorage;
pub use conversation_item_oracle_store::OracleConversationItemStorage;
pub use conversation_items::{
ConversationItem, ConversationItemId, ConversationItemStorage, ConversationItemStorageError,
ListParams as ConversationItemsListParams, NewConversationItem,
Result as ConversationItemsResult, SharedConversationItemStorage,
SortOrder as ConversationItemsSortOrder,
};
pub use conversation_memory_store::MemoryConversationStorage;
pub use conversation_noop_store::NoOpConversationStorage;
pub use conversation_oracle_store::OracleConversationStorage;
pub use conversations::{
Conversation, ConversationId, ConversationMetadata, ConversationStorage,
ConversationStorageError, NewConversation, Result as ConversationResult,
SharedConversationStorage,
};
pub use response_memory_store::MemoryResponseStorage;
pub use response_noop_store::NoOpResponseStorage;
pub use response_oracle_store::OracleResponseStorage;
pub use responses::{
ResponseChain, ResponseId, ResponseStorage, ResponseStorageError, SharedResponseStorage,
StoredResponse,
};
mod core;
mod factory;
mod memory;
mod noop;
mod oracle;
// Re-export all core types
pub use core::*;
// Re-export factory function
pub use factory::create_storage;
// Re-export all storage implementations
pub use memory::*;
pub use noop::*;
pub use oracle::*;
//! NoOp storage implementations
//!
//! These implementations do nothing - useful for when persistence is disabled.
//!
//! Structure:
//! 1. NoOpConversationStorage
//! 2. NoOpConversationItemStorage
//! 3. NoOpResponseStorage
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use super::core::*;
// ============================================================================
// PART 1: NoOpConversationStorage
// ============================================================================
/// No-op implementation that synthesizes conversation responses without persistence
#[derive(Default, Debug, Clone)]
pub struct NoOpConversationStorage;
impl NoOpConversationStorage {
pub fn new() -> Self {
Self
}
}
#[async_trait]
impl ConversationStorage for NoOpConversationStorage {
async fn create_conversation(
&self,
input: NewConversation,
) -> ConversationResult<Conversation> {
Ok(Conversation::new(input))
}
async fn get_conversation(
&self,
_id: &ConversationId,
) -> ConversationResult<Option<Conversation>> {
Ok(None)
}
async fn update_conversation(
&self,
_id: &ConversationId,
_metadata: Option<ConversationMetadata>,
) -> ConversationResult<Option<Conversation>> {
Ok(None)
}
async fn delete_conversation(&self, _id: &ConversationId) -> ConversationResult<bool> {
Ok(false)
}
}
// ============================================================================
// PART 2: NoOpConversationItemStorage
// ============================================================================
/// No-op conversation item storage (does nothing)
#[derive(Clone, Copy, Default)]
pub struct NoOpConversationItemStorage;
impl NoOpConversationItemStorage {
pub fn new() -> Self {
Self
}
}
#[async_trait]
impl ConversationItemStorage for NoOpConversationItemStorage {
async fn create_item(
&self,
item: NewConversationItem,
) -> ConversationItemResult<ConversationItem> {
let id = item
.id
.clone()
.unwrap_or_else(|| make_item_id(&item.item_type));
Ok(ConversationItem {
id,
response_id: item.response_id,
item_type: item.item_type,
role: item.role,
content: item.content,
status: item.status,
created_at: Utc::now(),
})
}
async fn link_item(
&self,
_conversation_id: &ConversationId,
_item_id: &ConversationItemId,
_added_at: DateTime<Utc>,
) -> ConversationItemResult<()> {
Ok(())
}
async fn list_items(
&self,
_conversation_id: &ConversationId,
_params: ListParams,
) -> ConversationItemResult<Vec<ConversationItem>> {
Ok(Vec::new())
}
async fn get_item(
&self,
_item_id: &ConversationItemId,
) -> ConversationItemResult<Option<ConversationItem>> {
Ok(None)
}
async fn is_item_linked(
&self,
_conversation_id: &ConversationId,
_item_id: &ConversationItemId,
) -> ConversationItemResult<bool> {
Ok(false)
}
async fn delete_item(
&self,
_conversation_id: &ConversationId,
_item_id: &ConversationItemId,
) -> ConversationItemResult<()> {
Ok(())
}
}
// ============================================================================
// PART 3: NoOpResponseStorage
// ============================================================================
/// No-op implementation of response storage (does nothing)
pub struct NoOpResponseStorage;
impl NoOpResponseStorage {
pub fn new() -> Self {
Self
}
}
impl Default for NoOpResponseStorage {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl ResponseStorage for NoOpResponseStorage {
async fn store_response(&self, response: StoredResponse) -> ResponseResult<ResponseId> {
Ok(response.id)
}
async fn get_response(
&self,
_response_id: &ResponseId,
) -> ResponseResult<Option<StoredResponse>> {
Ok(None)
}
async fn delete_response(&self, _response_id: &ResponseId) -> ResponseResult<()> {
Ok(())
}
async fn get_response_chain(
&self,
_response_id: &ResponseId,
_max_depth: Option<usize>,
) -> ResponseResult<ResponseChain> {
Ok(ResponseChain::new())
}
async fn list_user_responses(
&self,
_user: &str,
_limit: Option<usize>,
) -> ResponseResult<Vec<StoredResponse>> {
Ok(Vec::new())
}
async fn delete_user_responses(&self, _user: &str) -> ResponseResult<usize> {
Ok(0)
}
}
This diff is collapsed.
use async_trait::async_trait;
use super::responses::{ResponseChain, ResponseId, ResponseStorage, Result, StoredResponse};
/// No-op implementation of response storage (does nothing)
pub struct NoOpResponseStorage;
impl NoOpResponseStorage {
pub fn new() -> Self {
Self
}
}
impl Default for NoOpResponseStorage {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl ResponseStorage for NoOpResponseStorage {
async fn store_response(&self, response: StoredResponse) -> Result<ResponseId> {
Ok(response.id)
}
async fn get_response(&self, _response_id: &ResponseId) -> Result<Option<StoredResponse>> {
Ok(None)
}
async fn delete_response(&self, _response_id: &ResponseId) -> Result<()> {
Ok(())
}
async fn get_response_chain(
&self,
_response_id: &ResponseId,
_max_depth: Option<usize>,
) -> Result<ResponseChain> {
Ok(ResponseChain::new())
}
async fn list_user_responses(
&self,
_user: &str,
_limit: Option<usize>,
) -> Result<Vec<StoredResponse>> {
Ok(Vec::new())
}
async fn delete_user_responses(&self, _user: &str) -> Result<usize> {
Ok(0)
}
}
use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
use async_trait::async_trait;
use deadpool::managed::{Manager, Metrics, Pool, PoolError, RecycleError, RecycleResult};
use oracle::{Connection, Row};
use serde_json::Value;
use crate::{
config::OracleConfig,
data_connector::responses::{
ResponseChain, ResponseId, ResponseStorage, ResponseStorageError, Result as StorageResult,
StoredResponse,
},
};
const SELECT_BASE: &str = "SELECT id, previous_response_id, input, instructions, output, \
tool_calls, metadata, created_at, user_id, model, conversation_id, raw_response FROM responses";
#[derive(Clone)]
pub struct OracleResponseStorage {
pool: Pool<OracleConnectionManager>,
}
impl OracleResponseStorage {
pub fn new(config: OracleConfig) -> StorageResult<Self> {
let config = Arc::new(config);
configure_oracle_client(&config)?;
initialize_schema(&config)?;
let manager = OracleConnectionManager::new(config.clone());
let mut builder = Pool::builder(manager)
.max_size(config.pool_max)
.runtime(deadpool::Runtime::Tokio1);
if config.pool_timeout_secs > 0 {
builder = builder.wait_timeout(Some(Duration::from_secs(config.pool_timeout_secs)));
}
let pool = builder.build().map_err(|err| {
ResponseStorageError::StorageError(format!(
"failed to build Oracle connection pool: {err}"
))
})?;
Ok(Self { pool })
}
async fn with_connection<F, T>(&self, func: F) -> StorageResult<T>
where
F: FnOnce(&Connection) -> StorageResult<T> + Send + 'static,
T: Send + 'static,
{
let connection = self.pool.get().await.map_err(map_pool_error)?;
tokio::task::spawn_blocking(move || {
let result = func(&connection);
drop(connection);
result
})
.await
.map_err(|err| {
ResponseStorageError::StorageError(format!(
"failed to execute Oracle query task: {err}"
))
})?
}
fn build_response_from_row(row: &Row) -> StorageResult<StoredResponse> {
let id: String = row
.get(0)
.map_err(|err| map_oracle_error(err).into_storage_error("fetch id"))?;
let previous: Option<String> = row.get(1).map_err(|err| {
map_oracle_error(err).into_storage_error("fetch previous_response_id")
})?;
let input_json: Option<String> = row
.get(2)
.map_err(|err| map_oracle_error(err).into_storage_error("fetch input"))?;
let instructions: Option<String> = row
.get(3)
.map_err(|err| map_oracle_error(err).into_storage_error("fetch instructions"))?;
let output_json: Option<String> = row
.get(4)
.map_err(|err| map_oracle_error(err).into_storage_error("fetch output"))?;
let tool_calls_json: Option<String> = row
.get(5)
.map_err(|err| map_oracle_error(err).into_storage_error("fetch tool_calls"))?;
let metadata_json: Option<String> = row
.get(6)
.map_err(|err| map_oracle_error(err).into_storage_error("fetch metadata"))?;
let created_at: chrono::DateTime<chrono::Utc> = row
.get(7)
.map_err(|err| map_oracle_error(err).into_storage_error("fetch created_at"))?;
let user_id: Option<String> = row
.get(8)
.map_err(|err| map_oracle_error(err).into_storage_error("fetch user_id"))?;
let model: Option<String> = row
.get(9)
.map_err(|err| map_oracle_error(err).into_storage_error("fetch model"))?;
let conversation_id: Option<String> = row
.get(10)
.map_err(|err| map_oracle_error(err).into_storage_error("fetch conversation_id"))?;
let raw_response_json: Option<String> = row
.get(11)
.map_err(|err| map_oracle_error(err).into_storage_error("fetch raw_response"))?;
let previous_response_id = previous.map(ResponseId);
let tool_calls = parse_tool_calls(tool_calls_json)?;
let metadata = parse_metadata(metadata_json)?;
let raw_response = parse_raw_response(raw_response_json)?;
let input = parse_json_value(input_json)?;
let output = parse_json_value(output_json)?;
Ok(StoredResponse {
id: ResponseId(id),
previous_response_id,
input,
instructions,
output,
tool_calls,
metadata,
created_at,
user: user_id,
model,
conversation_id,
raw_response,
})
}
}
#[async_trait]
impl ResponseStorage for OracleResponseStorage {
async fn store_response(&self, response: StoredResponse) -> StorageResult<ResponseId> {
let StoredResponse {
id,
previous_response_id,
input,
instructions,
output,
tool_calls,
metadata,
created_at,
user,
model,
conversation_id,
raw_response,
} = response;
let response_id = id.clone();
let response_id_str = response_id.0.clone();
let previous_id = previous_response_id.map(|r| r.0);
let json_input = serde_json::to_string(&input)?;
let json_output = serde_json::to_string(&output)?;
let json_tool_calls = serde_json::to_string(&tool_calls)?;
let json_metadata = serde_json::to_string(&metadata)?;
let json_raw_response = serde_json::to_string(&raw_response)?;
self.with_connection(move |conn| {
conn.execute(
"INSERT INTO responses (id, previous_response_id, input, instructions, output, \
tool_calls, metadata, created_at, user_id, model, conversation_id, raw_response) \
VALUES (:1, :2, :3, :4, :5, :6, :7, :8, :9, :10, :11, :12)",
&[
&response_id_str,
&previous_id,
&json_input,
&instructions,
&json_output,
&json_tool_calls,
&json_metadata,
&created_at,
&user,
&model,
&conversation_id,
&json_raw_response,
],
)
.map(|_| ())
.map_err(map_oracle_error)
})
.await?;
Ok(response_id)
}
async fn get_response(
&self,
response_id: &ResponseId,
) -> StorageResult<Option<StoredResponse>> {
let id = response_id.0.clone();
self.with_connection(move |conn| {
let mut stmt = conn
.statement(&format!("{} WHERE id = :1", SELECT_BASE))
.build()
.map_err(map_oracle_error)?;
let mut rows = stmt.query(&[&id]).map_err(map_oracle_error)?;
match rows.next() {
Some(row) => {
let row = row.map_err(map_oracle_error)?;
OracleResponseStorage::build_response_from_row(&row).map(Some)
}
None => Ok(None),
}
})
.await
}
async fn delete_response(&self, response_id: &ResponseId) -> StorageResult<()> {
let id = response_id.0.clone();
self.with_connection(move |conn| {
conn.execute("DELETE FROM responses WHERE id = :1", &[&id])
.map(|_| ())
.map_err(map_oracle_error)
})
.await
}
async fn get_response_chain(
&self,
response_id: &ResponseId,
max_depth: Option<usize>,
) -> StorageResult<ResponseChain> {
let mut chain = ResponseChain::new();
let mut current_id = Some(response_id.clone());
let mut visited = 0usize;
while let Some(ref lookup_id) = current_id {
if let Some(limit) = max_depth {
if visited >= limit {
break;
}
}
let fetched = self.get_response(lookup_id).await?;
match fetched {
Some(response) => {
current_id = response.previous_response_id.clone();
chain.responses.push(response);
visited += 1;
}
None => break,
}
}
chain.responses.reverse();
Ok(chain)
}
async fn list_user_responses(
&self,
user: &str,
limit: Option<usize>,
) -> StorageResult<Vec<StoredResponse>> {
let user = user.to_string();
self.with_connection(move |conn| {
let sql = if let Some(limit) = limit {
format!(
"SELECT * FROM ({} WHERE user_id = :1 ORDER BY created_at DESC) WHERE ROWNUM <= {}",
SELECT_BASE, limit
)
} else {
format!("{} WHERE user_id = :1 ORDER BY created_at DESC", SELECT_BASE)
};
let mut stmt = conn.statement(&sql).build().map_err(map_oracle_error)?;
let mut rows = stmt.query(&[&user]).map_err(map_oracle_error)?;
let mut results = Vec::new();
for row in &mut rows {
let row = row.map_err(map_oracle_error)?;
results.push(OracleResponseStorage::build_response_from_row(&row)?);
}
Ok(results)
})
.await
}
async fn delete_user_responses(&self, user: &str) -> StorageResult<usize> {
let user = user.to_string();
let affected = self
.with_connection(move |conn| {
conn.execute("DELETE FROM responses WHERE user_id = :1", &[&user])
.map_err(map_oracle_error)
})
.await?;
let deleted = affected.row_count().map_err(map_oracle_error)? as usize;
Ok(deleted)
}
}
#[derive(Clone)]
struct OracleConnectionManager {
params: Arc<OracleConnectParams>,
}
impl OracleConnectionManager {
fn new(config: Arc<OracleConfig>) -> Self {
let params = OracleConnectParams {
username: config.username.clone(),
password: config.password.clone(),
connect_descriptor: config.connect_descriptor.clone(),
};
Self {
params: Arc::new(params),
}
}
}
impl std::fmt::Debug for OracleConnectionManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OracleConnectionManager")
.field("username", &self.params.username)
.field("connect_descriptor", &self.params.connect_descriptor)
.finish()
}
}
#[derive(Clone)]
struct OracleConnectParams {
username: String,
password: String,
connect_descriptor: String,
}
impl std::fmt::Debug for OracleConnectParams {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OracleConnectParams")
.field("username", &self.username)
.field("connect_descriptor", &self.connect_descriptor)
.finish()
}
}
#[async_trait]
impl Manager for OracleConnectionManager {
type Type = Connection;
type Error = oracle::Error;
fn create(
&self,
) -> impl std::future::Future<Output = Result<Connection, oracle::Error>> + Send {
let params = self.params.clone();
async move {
let mut conn = Connection::connect(
&params.username,
&params.password,
&params.connect_descriptor,
)?;
conn.set_autocommit(true);
Ok(conn)
}
}
#[allow(clippy::manual_async_fn)]
fn recycle(
&self,
conn: &mut Connection,
_: &Metrics,
) -> impl std::future::Future<Output = RecycleResult<Self::Error>> + Send {
async move { conn.ping().map_err(RecycleError::Backend) }
}
}
fn configure_oracle_client(config: &OracleConfig) -> StorageResult<()> {
if let Some(wallet_path) = &config.wallet_path {
let wallet_path = Path::new(wallet_path);
if !wallet_path.is_dir() {
return Err(ResponseStorageError::StorageError(format!(
"Oracle wallet/config path '{}' is not a directory",
wallet_path.display()
)));
}
if !wallet_path.join("tnsnames.ora").exists() && !wallet_path.join("sqlnet.ora").exists() {
return Err(ResponseStorageError::StorageError(format!(
"Oracle wallet/config path '{}' is missing tnsnames.ora or sqlnet.ora",
wallet_path.display()
)));
}
std::env::set_var("TNS_ADMIN", wallet_path);
}
Ok(())
}
fn initialize_schema(config: &OracleConfig) -> StorageResult<()> {
let conn = Connection::connect(
&config.username,
&config.password,
&config.connect_descriptor,
)
.map_err(map_oracle_error)?;
let exists: i64 = conn
.query_row_as(
"SELECT COUNT(*) FROM user_tables WHERE table_name = 'RESPONSES'",
&[],
)
.map_err(map_oracle_error)?;
if exists == 0 {
conn.execute(
"CREATE TABLE responses (
id VARCHAR2(64) PRIMARY KEY,
conversation_id VARCHAR2(64),
previous_response_id VARCHAR2(64),
input CLOB,
instructions CLOB,
output CLOB,
tool_calls CLOB,
metadata CLOB,
created_at TIMESTAMP WITH TIME ZONE,
user_id VARCHAR2(128),
model VARCHAR2(128),
raw_response CLOB
)",
&[],
)
.map_err(map_oracle_error)?;
}
create_index_if_missing(
&conn,
"RESPONSES_PREV_IDX",
"CREATE INDEX responses_prev_idx ON responses(previous_response_id)",
)?;
create_index_if_missing(
&conn,
"RESPONSES_USER_IDX",
"CREATE INDEX responses_user_idx ON responses(user_id)",
)?;
Ok(())
}
fn create_index_if_missing(conn: &Connection, index_name: &str, ddl: &str) -> StorageResult<()> {
let count: i64 = conn
.query_row_as(
"SELECT COUNT(*) FROM user_indexes WHERE table_name = 'RESPONSES' AND index_name = :1",
&[&index_name],
)
.map_err(map_oracle_error)?;
if count == 0 {
if let Err(err) = conn.execute(ddl, &[]) {
if err.db_error().map(|db| db.code()) != Some(1408) {
return Err(map_oracle_error(err));
}
}
}
Ok(())
}
fn parse_tool_calls(raw: Option<String>) -> StorageResult<Vec<Value>> {
match raw {
Some(s) if !s.is_empty() => {
serde_json::from_str(&s).map_err(ResponseStorageError::SerializationError)
}
_ => Ok(Vec::new()),
}
}
fn parse_metadata(raw: Option<String>) -> StorageResult<HashMap<String, Value>> {
match raw {
Some(s) if !s.is_empty() => {
serde_json::from_str(&s).map_err(ResponseStorageError::SerializationError)
}
_ => Ok(HashMap::new()),
}
}
fn parse_raw_response(raw: Option<String>) -> StorageResult<Value> {
match raw {
Some(s) if !s.is_empty() => {
serde_json::from_str(&s).map_err(ResponseStorageError::SerializationError)
}
_ => Ok(Value::Null),
}
}
fn parse_json_value(raw: Option<String>) -> StorageResult<Value> {
match raw {
Some(s) if !s.is_empty() => {
serde_json::from_str(&s).map_err(ResponseStorageError::SerializationError)
}
_ => Ok(Value::Array(vec![])),
}
}
fn map_pool_error(err: PoolError<oracle::Error>) -> ResponseStorageError {
match err {
PoolError::Backend(e) => map_oracle_error(e),
other => ResponseStorageError::StorageError(format!(
"failed to obtain Oracle connection: {other}"
)),
}
}
fn map_oracle_error(err: oracle::Error) -> ResponseStorageError {
if let Some(db_err) = err.db_error() {
ResponseStorageError::StorageError(format!(
"Oracle error (code {}): {}",
db_err.code(),
db_err.message()
))
} else {
ResponseStorageError::StorageError(err.to_string())
}
}
trait OracleErrorExt {
fn into_storage_error(self, context: &str) -> ResponseStorageError;
}
impl OracleErrorExt for ResponseStorageError {
fn into_storage_error(self, context: &str) -> ResponseStorageError {
ResponseStorageError::StorageError(format!("{context}: {self}"))
}
}
#[cfg(test)]
mod tests {
use serde_json::json;
use super::*;
#[test]
fn parse_tool_calls_handles_empty_input() {
assert!(parse_tool_calls(None).unwrap().is_empty());
assert!(parse_tool_calls(Some(String::new())).unwrap().is_empty());
}
#[test]
fn parse_tool_calls_round_trips() {
let payload = json!([{ "type": "test", "value": 1 }]).to_string();
let parsed = parse_tool_calls(Some(payload)).unwrap();
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0]["type"], "test");
assert_eq!(parsed[0]["value"], 1);
}
#[test]
fn parse_metadata_defaults_to_empty_map() {
assert!(parse_metadata(None).unwrap().is_empty());
}
#[test]
fn parse_metadata_round_trips() {
let payload = json!({"key": "value", "nested": {"bool": true}}).to_string();
let parsed = parse_metadata(Some(payload)).unwrap();
assert_eq!(parsed.get("key").unwrap(), "value");
assert_eq!(parsed["nested"]["bool"], true);
}
#[test]
fn parse_raw_response_handles_null() {
assert_eq!(parse_raw_response(None).unwrap(), Value::Null);
}
#[test]
fn parse_raw_response_round_trips() {
let payload = json!({"id": "abc"}).to_string();
let parsed = parse_raw_response(Some(payload)).unwrap();
assert_eq!(parsed["id"], "abc");
}
}
......@@ -1021,7 +1021,7 @@ async fn load_conversation_history(
.collect::<serde_json::Map<String, serde_json::Value>>()
});
let new_conv = crate::data_connector::conversations::NewConversation {
let new_conv = crate::data_connector::NewConversation {
id: Some(conv_id.clone()), // Use user-provided conversation ID
metadata,
};
......@@ -1033,9 +1033,9 @@ async fn load_conversation_history(
// Load conversation history
const MAX_CONVERSATION_HISTORY_ITEMS: usize = 100;
let params = crate::data_connector::conversation_items::ListParams {
let params = crate::data_connector::ListParams {
limit: MAX_CONVERSATION_HISTORY_ITEMS,
order: crate::data_connector::conversation_items::SortOrder::Asc,
order: crate::data_connector::SortOrder::Asc,
after: None,
};
......
......@@ -14,10 +14,9 @@ use tracing::{debug, info, warn};
use super::{responses::build_stored_response, utils::generate_id};
use crate::{
data_connector::{
conversation_items::{ListParams, SortOrder},
Conversation, ConversationId, ConversationItemId, ConversationItemStorage,
ConversationStorage, NewConversation, NewConversationItem, ResponseId, ResponseStorage,
SharedConversationItemStorage, SharedConversationStorage,
ConversationStorage, ListParams, NewConversation, NewConversationItem, ResponseId,
ResponseStorage, SharedConversationItemStorage, SharedConversationStorage, SortOrder,
},
protocols::responses::{ResponseInput, ResponsesRequest},
};
......@@ -889,7 +888,7 @@ fn parse_item_from_value(
/// Convert ConversationItem to JSON response format
/// Extracts fields from content for special types (mcp_call, mcp_list_tools, etc.)
fn item_to_json(item: &crate::data_connector::conversation_items::ConversationItem) -> Value {
fn item_to_json(item: &crate::data_connector::ConversationItem) -> Value {
let mut obj = serde_json::Map::new();
obj.insert("id".to_string(), json!(item.id.0));
obj.insert("type".to_string(), json!(item.item_type));
......
......@@ -39,9 +39,8 @@ use crate::{
config::CircuitBreakerConfig,
core::{CircuitBreaker, CircuitBreakerConfig as CoreCircuitBreakerConfig},
data_connector::{
conversation_items::{ListParams, SortOrder},
ConversationId, ResponseId, SharedConversationItemStorage, SharedConversationStorage,
SharedResponseStorage,
ConversationId, ListParams, ResponseId, SharedConversationItemStorage,
SharedConversationStorage, SharedResponseStorage, SortOrder,
},
protocols::{
chat::ChatCompletionRequest,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment