Unverified Commit eb8cac6f authored by Keyang Ru's avatar Keyang Ru Committed by GitHub
Browse files

[router] add py binding and readme for openai router and history backend (#11453)


Co-authored-by: default avatargemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
parent 5ea96ac7
......@@ -79,6 +79,25 @@ python -m sglang_router.launch_router \
--worker-urls http://worker1:8000 http://worker2:8000
```
#### OpenAI Backend Mode
Route requests to OpenAI or OpenAI-compatible endpoints:
```bash
# Route to OpenAI API
python -m sglang_router.launch_router \
--backend openai \
--worker-urls https://api.openai.com
# Route to custom OpenAI-compatible endpoint
python -m sglang_router.launch_router \
--backend openai \
--worker-urls http://my-openai-compatible-service:8000
```
**Note**:
- OpenAI backend mode acts as a simple proxy. Load balancing is not applicable in this mode.
- This mode now requires exactly one `--worker-urls` entry.
#### Launch Router with Worker URLs in prefill-decode mode
```bash
# Note that the prefill and decode URLs must be provided in the following format:
......@@ -194,6 +213,65 @@ python -m sglang_router.launch_router \
Default headers: `x-request-id`, `x-correlation-id`, `x-trace-id`, `request-id`
### History Backend (Conversation Storage)
Store conversation and response data for tracking, debugging, or analytics.
**NOTE: This feature is currently only supported in OpenAI router mode.**
#### Available options for history storage
- **Memory** (default): In-memory storage, fast but ephemeral
- **None**: No storage, minimal overhead
- **Oracle**: Persistent storage using Oracle ATP
```bash
# Memory backend (default)
python -m sglang_router.launch_router --backend openai \
--worker-urls https://api.openai.com \
--history-backend memory
# No storage for maximum performance
python -m sglang_router.launch_router --backend openai \
--worker-urls https://api.openai.com \
--history-backend none
# Oracle ATP backend
# Install Oracle Instant Client
# https://download.oracle.com/otn_software/linux/instantclient/2390000/instantclient-basic-linux.x64-23.9.0.25.07.zip
export LD_LIBRARY_PATH=/home/ubuntu/instant-client/instantclient_23_9
# choose ONE of the following connection methods:
# Option 1: Using full connection descriptor
export ATP_DSN="(description=(address=(protocol=tcps)(port=1522)(host=adb.region.oraclecloud.com))(connect_data=(service_name=service_name)))"
# Option 2: Using TNS alias (requires wallet)
export ATP_TNS_ALIAS="sglroutertestatp_high"
export ATP_WALLET_PATH="/path/to/wallet"
# service user config
export ATP_USER="admin"
export ATP_PASSWORD="YourPassword123"
python -m sglang_router.launch_router \
--worker-urls https://api.openai.com \
--backend openai \
--history-backend oracle
```
**Oracle Configuration Parameters:**
- `--oracle-tns-alias`: TNS alias from tnsnames.ora (env: `ATP_TNS_ALIAS`)
- Requires `--oracle-wallet-path` to locate tnsnames.ora
- `--oracle-username`: Database username (env: `ATP_USER`)
- `--oracle-password`: Database password (env: `ATP_PASSWORD`)
- `--oracle-wallet-path`: Path to wallet directory (env: `ATP_WALLET_PATH`)
- Required when using TNS alias
- `--oracle-pool-min`: Minimum connections (default: 1, env: `ATP_POOL_MIN`)
- `--oracle-pool-max`: Maximum connections (default: 16, env: `ATP_POOL_MAX`)
**Note**: You must provide **either** `--oracle-tns-alias` **or** `--oracle-connect-descriptor`, but not both.
## Advanced Features
### Kubernetes Service Discovery
......@@ -407,6 +485,24 @@ curl -X POST http://localhost:8080/add_worker?url=http://worker3:8000&api_key=wo
### Command Line Arguments Reference
#### Backend Selection
- `--backend`: Backend runtime to use (default: `sglang`)
- `sglang`: SGLang workers (default)
- `openai`: OpenAI or OpenAI-compatible endpoints
#### History Backend
- `--history-backend`: Storage backend for conversations (default: `memory`)
- `memory`: In-memory storage (default)
- `none`: No storage
- `oracle`: Oracle ATP persistent storage
- `--oracle-tns-alias`: Oracle TNS alias from tnsnames.ora (env: `ATP_TNS_ALIAS`, mutually exclusive with `--oracle-connect-descriptor`)
- `--oracle-connect-descriptor`: Oracle full connection string (env: `ATP_DSN`, mutually exclusive with `--oracle-tns-alias`)
- `--oracle-username`: Oracle username (env: `ATP_USER`)
- `--oracle-password`: Oracle password (env: `ATP_PASSWORD`)
- `--oracle-wallet-path`: Oracle wallet directory (env: `ATP_WALLET_PATH`, required for TNS alias)
- `--oracle-pool-min`: Min pool connections (default: 1, env: `ATP_POOL_MIN`)
- `--oracle-pool-max`: Max pool connections (default: 16, env: `ATP_POOL_MAX`)
#### Service Discovery
- `--service-discovery`: Enable Kubernetes service discovery
- `--service-discovery-port`: Port for worker URLs (default: 8000)
......@@ -479,6 +575,8 @@ The continuous integration pipeline includes comprehensive testing, benchmarking
- **Random**: Distributes requests randomly across available workers
- **Round Robin**: Sequential distribution across workers in rotation
- **Prefill-Decode Disaggregation**: Specialized load balancing for separated prefill and decode servers
- **Multiple Backend Support**: Route to SGLang, OpenAI, or other OpenAI-compatible services
- **Conversation Storage**: Track conversations with memory, Oracle ATP, or disable for minimal overhead
- **Service Discovery**: Automatic Kubernetes worker discovery and health management
- **Monitoring**: Comprehensive Prometheus metrics and structured logging
- **Scalability**: Handles thousands of concurrent connections with efficient resource utilization
from typing import Optional
from sglang_router.router_args import RouterArgs
from sglang_router_rs import PolicyType
from sglang_router_rs import BackendType, HistoryBackendType, PolicyType, PyOracleConfig
from sglang_router_rs import Router as _Router
......@@ -18,6 +18,39 @@ def policy_from_str(policy_str: Optional[str]) -> PolicyType:
return policy_map[policy_str]
def backend_from_str(backend_str: Optional[str]) -> BackendType:
"""Convert backend string to BackendType enum."""
if isinstance(backend_str, BackendType):
return backend_str
if backend_str is None:
return BackendType.Sglang
backend_map = {"sglang": BackendType.Sglang, "openai": BackendType.Openai}
backend_lower = backend_str.lower()
if backend_lower not in backend_map:
raise ValueError(
f"Unknown backend: {backend_str}. Valid options: {', '.join(backend_map.keys())}"
)
return backend_map[backend_lower]
def history_backend_from_str(backend_str: Optional[str]) -> HistoryBackendType:
"""Convert history backend string to HistoryBackendType enum."""
if isinstance(backend_str, HistoryBackendType):
return backend_str
if backend_str is None:
return HistoryBackendType.Memory
backend_lower = backend_str.lower()
if backend_lower == "memory":
return HistoryBackendType.Memory
elif backend_lower == "none":
# Use getattr to access 'None' which is a Python keyword
return getattr(HistoryBackendType, "None")
elif backend_lower == "oracle":
return HistoryBackendType.Oracle
else:
raise ValueError(f"Unknown history backend: {backend_str}")
class Router:
"""
A high-performance router for distributing requests across worker nodes.
......@@ -119,8 +152,49 @@ class Router:
args_dict["prefill_policy"] = policy_from_str(args_dict["prefill_policy"])
args_dict["decode_policy"] = policy_from_str(args_dict["decode_policy"])
# remove mini_lb parameter
args_dict.pop("mini_lb")
# Convert backend
args_dict["backend"] = backend_from_str(args_dict.get("backend"))
# Convert history_backend to enum first
history_backend_raw = args_dict.get("history_backend", "memory")
history_backend = history_backend_from_str(history_backend_raw)
# Convert Oracle config if needed
oracle_config = None
if history_backend == HistoryBackendType.Oracle:
# Prioritize TNS alias over connect descriptor
tns_alias = args_dict.get("oracle_tns_alias")
connect_descriptor = args_dict.get("oracle_connect_descriptor")
# Use TNS alias if provided, otherwise use connect descriptor
final_descriptor = tns_alias if tns_alias else connect_descriptor
oracle_config = PyOracleConfig(
password=args_dict.get("oracle_password"),
username=args_dict.get("oracle_username"),
connect_descriptor=final_descriptor,
wallet_path=args_dict.get("oracle_wallet_path"),
pool_min=args_dict.get("oracle_pool_min", 1),
pool_max=args_dict.get("oracle_pool_max", 16),
pool_timeout_secs=args_dict.get("oracle_pool_timeout_secs", 30),
)
args_dict["oracle_config"] = oracle_config
args_dict["history_backend"] = history_backend
# Remove fields that shouldn't be passed to Rust Router constructor
fields_to_remove = [
"mini_lb",
"oracle_wallet_path",
"oracle_tns_alias",
"oracle_connect_descriptor",
"oracle_username",
"oracle_password",
"oracle_pool_min",
"oracle_pool_max",
"oracle_pool_timeout_secs",
]
for field in fields_to_remove:
args_dict.pop(field, None)
return Router(_Router(**args_dict))
......
import argparse
import dataclasses
import logging
import os
from typing import Dict, List, Optional
logger = logging.getLogger(__name__)
......@@ -88,6 +89,18 @@ class RouterArgs:
chat_template: Optional[str] = None
reasoning_parser: Optional[str] = None
tool_call_parser: Optional[str] = None
# Backend selection
backend: str = "sglang"
# History backend configuration
history_backend: str = "memory"
oracle_wallet_path: Optional[str] = None
oracle_tns_alias: Optional[str] = None
oracle_connect_descriptor: Optional[str] = None
oracle_username: Optional[str] = None
oracle_password: Optional[str] = None
oracle_pool_min: int = 1
oracle_pool_max: int = 16
oracle_pool_timeout_secs: int = 30
@staticmethod
def add_cli_args(
......@@ -466,6 +479,73 @@ class RouterArgs:
default=None,
help="Specify the parser for handling tool-call interactions",
)
# Backend selection
parser.add_argument(
f"--{prefix}backend",
type=str,
default=RouterArgs.backend,
choices=["sglang", "openai"],
help="Backend runtime to use (default: sglang)",
)
# History backend configuration
parser.add_argument(
f"--{prefix}history-backend",
type=str,
default=RouterArgs.history_backend,
choices=["memory", "none", "oracle"],
help="History storage backend for conversations and responses (default: memory)",
)
# Oracle configuration
parser.add_argument(
f"--{prefix}oracle-wallet-path",
type=str,
default=os.getenv("ATP_WALLET_PATH"),
help="Path to Oracle ATP wallet directory (env: ATP_WALLET_PATH)",
)
parser.add_argument(
f"--{prefix}oracle-tns-alias",
type=str,
default=os.getenv("ATP_TNS_ALIAS"),
help="Oracle TNS alias from tnsnames.ora (env: ATP_TNS_ALIAS).",
)
parser.add_argument(
f"--{prefix}oracle-connect-descriptor",
type=str,
default=os.getenv("ATP_DSN"),
help="Oracle connection descriptor/DSN (full connection string) (env: ATP_DSN)",
)
parser.add_argument(
f"--{prefix}oracle-username",
type=str,
default=os.getenv("ATP_USER"),
help="Oracle database username (env: ATP_USER)",
)
parser.add_argument(
f"--{prefix}oracle-password",
type=str,
default=os.getenv("ATP_PASSWORD"),
help="Oracle database password (env: ATP_PASSWORD)",
)
parser.add_argument(
f"--{prefix}oracle-pool-min",
type=int,
default=int(os.getenv("ATP_POOL_MIN", RouterArgs.oracle_pool_min)),
help="Minimum Oracle connection pool size (default: 1, env: ATP_POOL_MIN)",
)
parser.add_argument(
f"--{prefix}oracle-pool-max",
type=int,
default=int(os.getenv("ATP_POOL_MAX", RouterArgs.oracle_pool_max)),
help="Maximum Oracle connection pool size (default: 16, env: ATP_POOL_MAX)",
)
parser.add_argument(
f"--{prefix}oracle-pool-timeout-secs",
type=int,
default=int(
os.getenv("ATP_POOL_TIMEOUT_SECS", RouterArgs.oracle_pool_timeout_secs)
),
help="Oracle connection pool timeout in seconds (default: 30, env: ATP_POOL_TIMEOUT_SECS)",
)
@classmethod
def from_cli_args(
......
......@@ -29,9 +29,69 @@ impl ConfigValidator {
Self::validate_retry(&retry_cfg)?;
Self::validate_circuit_breaker(&cb_cfg)?;
if config.history_backend == HistoryBackend::Oracle && config.oracle.is_none() {
// Validate Oracle configuration if enabled
if config.history_backend == HistoryBackend::Oracle {
if config.oracle.is_none() {
return Err(ConfigError::MissingRequired {
field: "oracle".to_string(),
});
}
// Validate Oracle configuration details
if let Some(oracle) = &config.oracle {
Self::validate_oracle(oracle)?;
}
}
Ok(())
}
/// Validate Oracle configuration
fn validate_oracle(oracle: &OracleConfig) -> ConfigResult<()> {
// Validate username is not empty
if oracle.username.is_empty() {
return Err(ConfigError::MissingRequired {
field: "oracle".to_string(),
field: "oracle.username".to_string(),
});
}
// Validate password is not empty
if oracle.password.is_empty() {
return Err(ConfigError::MissingRequired {
field: "oracle.password".to_string(),
});
}
// Validate connect_descriptor is not empty
if oracle.connect_descriptor.is_empty() {
return Err(ConfigError::MissingRequired {
field: "oracle_dsn or oracle_tns_alias".to_string(),
});
}
// Validate pool_min is at least 1
if oracle.pool_min < 1 {
return Err(ConfigError::InvalidValue {
field: "oracle.pool_min".to_string(),
value: oracle.pool_min.to_string(),
reason: "Must be at least 1".to_string(),
});
}
// Validate pool_max is greater than or equal to pool_min
if oracle.pool_max < oracle.pool_min {
return Err(ConfigError::InvalidValue {
field: "oracle.pool_max".to_string(),
value: oracle.pool_max.to_string(),
reason: "Must be >= oracle.pool_min".to_string(),
});
}
// Validate pool_timeout_secs is greater than 0
if oracle.pool_timeout_secs == 0 {
return Err(ConfigError::InvalidValue {
field: "oracle.pool_timeout_secs".to_string(),
value: oracle.pool_timeout_secs.to_string(),
reason: "Must be > 0".to_string(),
});
}
......
......@@ -30,6 +30,113 @@ pub enum PolicyType {
PowerOfTwo,
}
#[pyclass(eq)]
#[derive(Clone, PartialEq, Debug)]
pub enum BackendType {
Sglang,
Openai,
}
#[pyclass(eq)]
#[derive(Clone, PartialEq, Debug)]
pub enum HistoryBackendType {
Memory,
None,
Oracle,
}
#[pyclass]
#[derive(Clone, PartialEq)]
pub struct PyOracleConfig {
#[pyo3(get, set)]
pub wallet_path: Option<String>,
#[pyo3(get, set)]
pub connect_descriptor: Option<String>,
#[pyo3(get, set)]
pub username: Option<String>,
#[pyo3(get, set)]
pub password: Option<String>,
#[pyo3(get, set)]
pub pool_min: usize,
#[pyo3(get, set)]
pub pool_max: usize,
#[pyo3(get, set)]
pub pool_timeout_secs: u64,
}
impl std::fmt::Debug for PyOracleConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PyOracleConfig")
.field("wallet_path", &self.wallet_path)
.field("connect_descriptor", &"<redacted>")
.field("username", &self.username)
.field("password", &"<redacted>")
.field("pool_min", &self.pool_min)
.field("pool_max", &self.pool_max)
.field("pool_timeout_secs", &self.pool_timeout_secs)
.finish()
}
}
#[pymethods]
impl PyOracleConfig {
#[new]
#[pyo3(signature = (
password = None,
username = None,
connect_descriptor = None,
wallet_path = None,
pool_min = 1,
pool_max = 16,
pool_timeout_secs = 30,
))]
fn new(
password: Option<String>,
username: Option<String>,
connect_descriptor: Option<String>,
wallet_path: Option<String>,
pool_min: usize,
pool_max: usize,
pool_timeout_secs: u64,
) -> PyResult<Self> {
if pool_min == 0 {
return Err(pyo3::exceptions::PyValueError::new_err(
"pool_min must be at least 1",
));
}
if pool_max < pool_min {
return Err(pyo3::exceptions::PyValueError::new_err(
"pool_max must be >= pool_min",
));
}
Ok(PyOracleConfig {
wallet_path,
connect_descriptor,
username,
password,
pool_min,
pool_max,
pool_timeout_secs,
})
}
}
impl PyOracleConfig {
fn to_config_oracle(&self) -> config::OracleConfig {
// Simple conversion - validation happens later in validate_oracle()
config::OracleConfig {
wallet_path: self.wallet_path.clone(),
connect_descriptor: self.connect_descriptor.clone().unwrap_or_default(),
username: self.username.clone().unwrap_or_default(),
password: self.password.clone().unwrap_or_default(),
pool_min: self.pool_min,
pool_max: self.pool_max,
pool_timeout_secs: self.pool_timeout_secs,
}
}
}
#[pyclass]
#[derive(Debug, Clone, PartialEq)]
struct Router {
......@@ -93,6 +200,9 @@ struct Router {
chat_template: Option<String>,
reasoning_parser: Option<String>,
tool_call_parser: Option<String>,
backend: BackendType,
history_backend: HistoryBackendType,
oracle_config: Option<PyOracleConfig>,
}
impl Router {
......@@ -132,6 +242,10 @@ impl Router {
RoutingMode::Regular {
worker_urls: vec![],
}
} else if matches!(self.backend, BackendType::Openai) {
RoutingMode::OpenAI {
worker_urls: self.worker_urls.clone(),
}
} else if self.pd_disaggregation {
RoutingMode::PrefillDecode {
prefill_urls: self.prefill_urls.clone().unwrap_or_default(),
......@@ -170,6 +284,20 @@ impl Router {
_ => None,
};
let history_backend = match self.history_backend {
HistoryBackendType::Memory => config::HistoryBackend::Memory,
HistoryBackendType::None => config::HistoryBackend::None,
HistoryBackendType::Oracle => config::HistoryBackend::Oracle,
};
let oracle = if matches!(self.history_backend, HistoryBackendType::Oracle) {
self.oracle_config
.as_ref()
.map(|cfg| cfg.to_config_oracle())
} else {
None
};
Ok(config::RouterConfig {
mode,
policy,
......@@ -218,8 +346,8 @@ impl Router {
model_path: self.model_path.clone(),
tokenizer_path: self.tokenizer_path.clone(),
chat_template: self.chat_template.clone(),
history_backend: config::HistoryBackend::Memory,
oracle: None,
history_backend,
oracle,
reasoning_parser: self.reasoning_parser.clone(),
tool_call_parser: self.tool_call_parser.clone(),
})
......@@ -289,6 +417,9 @@ impl Router {
chat_template = None,
reasoning_parser = None,
tool_call_parser = None,
backend = BackendType::Sglang,
history_backend = HistoryBackendType::Memory,
oracle_config = None,
))]
#[allow(clippy::too_many_arguments)]
fn new(
......@@ -351,6 +482,9 @@ impl Router {
chat_template: Option<String>,
reasoning_parser: Option<String>,
tool_call_parser: Option<String>,
backend: BackendType,
history_backend: HistoryBackendType,
oracle_config: Option<PyOracleConfig>,
) -> PyResult<Self> {
let mut all_urls = worker_urls.clone();
......@@ -427,6 +561,9 @@ impl Router {
chat_template,
reasoning_parser,
tool_call_parser,
backend,
history_backend,
oracle_config,
})
}
......@@ -491,6 +628,9 @@ impl Router {
#[pymodule]
fn sglang_router_rs(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PolicyType>()?;
m.add_class::<BackendType>()?;
m.add_class::<HistoryBackendType>()?;
m.add_class::<PyOracleConfig>()?;
m.add_class::<Router>()?;
Ok(())
}
......@@ -14,7 +14,7 @@ use chrono::Utc;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{info, warn};
use tracing::{debug, info, warn};
use super::responses::build_stored_response;
......@@ -958,14 +958,14 @@ async fn create_and_link_item(
.await
.map_err(|e| format!("Failed to link item: {}", e))?;
info!(
debug!(
conversation_id = %conv_id.0,
item_id = %created.id.0,
item_type = %created.item_type,
"Persisted conversation item and link"
);
} else {
info!(
debug!(
item_id = %created.id.0,
item_type = %created.item_type,
"Persisted conversation item (no conversation link)"
......
......@@ -27,7 +27,7 @@ use std::{
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{info, warn};
use tracing::warn;
// Import from sibling modules
use super::conversations::{
......@@ -197,6 +197,11 @@ impl OpenAIRouter {
Ok(r) => r,
Err(e) => {
self.circuit_breaker.record_failure();
tracing::error!(
url = %url,
error = %e,
"Failed to forward request to OpenAI"
);
return (
StatusCode::BAD_GATEWAY,
format!("Failed to forward request to OpenAI: {}", e),
......@@ -518,12 +523,6 @@ impl crate::routers::RouterTrait for OpenAIRouter {
) -> Response {
let url = format!("{}/v1/responses", self.base_url);
info!(
requested_store = body.store,
is_streaming = body.stream,
"openai_responses_request"
);
// Validate mutually exclusive params: previous_response_id and conversation
// TODO: this validation logic should move the right place, also we need a proper error message module
if body.previous_response_id.is_some() && body.conversation.is_some() {
......
......@@ -132,19 +132,30 @@ impl AppContext {
SharedResponseStorage,
SharedConversationStorage,
) = match router_config.history_backend {
HistoryBackend::Memory => (
Arc::new(MemoryResponseStorage::new()),
Arc::new(MemoryConversationStorage::new()),
),
HistoryBackend::None => (
Arc::new(NoOpResponseStorage::new()),
Arc::new(NoOpConversationStorage::new()),
),
HistoryBackend::Memory => {
info!("Initializing data connector: Memory");
(
Arc::new(MemoryResponseStorage::new()),
Arc::new(MemoryConversationStorage::new()),
)
}
HistoryBackend::None => {
info!("Initializing data connector: None (no persistence)");
(
Arc::new(NoOpResponseStorage::new()),
Arc::new(NoOpConversationStorage::new()),
)
}
HistoryBackend::Oracle => {
let oracle_cfg = router_config.oracle.clone().ok_or_else(|| {
"oracle configuration is required when history_backend=oracle".to_string()
})?;
info!(
"Initializing data connector: Oracle ATP (pool: {}-{})",
oracle_cfg.pool_min, oracle_cfg.pool_max
);
let response_storage =
OracleResponseStorage::new(oracle_cfg.clone()).map_err(|err| {
format!("failed to initialize Oracle response storage: {err}")
......@@ -155,6 +166,7 @@ impl AppContext {
format!("failed to initialize Oracle conversation storage: {err}")
})?;
info!("Data connector initialized successfully: Oracle ATP");
(Arc::new(response_storage), Arc::new(conversation_storage))
}
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment