Unverified Commit d85d6dba authored by Simo Lin's avatar Simo Lin Committed by GitHub
Browse files

[router] configure workflow retries and timeout based on routerConfig (#12252)

parent c5642a7a
......@@ -154,7 +154,7 @@ jobs:
pytest-rust-2:
if: github.event_name != 'pull_request' || contains(github.event.pull_request.labels.*.name, 'run-ci')
runs-on: 4-gpu-a10
timeout-minutes: 16
timeout-minutes: 32
steps:
- name: Checkout code
uses: actions/checkout@v4
......
......@@ -820,7 +820,29 @@ impl StepExecutor for ActivateWorkerStep {
/// Note: Actual health check timeouts and retry attempts are configured per-worker
/// via WorkerConfigRequest (populated from router config). The timeouts and retry
/// policies here serve as workflow-level bounds to prevent infinite waiting.
pub fn create_worker_registration_workflow() -> WorkflowDefinition {
///
/// # Arguments
/// * `router_config` - Router configuration containing health check settings
pub fn create_worker_registration_workflow(
router_config: &crate::config::RouterConfig,
) -> WorkflowDefinition {
// Use health check timeout from config with 30 second buffer as workflow-level upper bound
let detect_timeout = Duration::from_secs(router_config.health_check.timeout_secs + 30);
// Calculate max_attempts to match the detect_timeout
// With Linear backoff (increment 1s, max 5s):
// - Attempts 1-5: 0s, 1s, 2s, 3s, 4s = 10s total
// - Attempts 6+: 5s each
// effective_timeout = 90% of detect_timeout, leaving buffer for actual connection attempts
// max_attempts = 5 + ceil((effective_timeout - 10) / 5) when effective_timeout > 10s, otherwise 3
let timeout_secs = detect_timeout.as_secs() as f64;
let effective_timeout = timeout_secs * 0.9;
let max_attempts = if effective_timeout > 10.0 {
(5 + ((effective_timeout - 10.0) / 5.0).ceil() as u32).max(3)
} else {
3
};
WorkflowDefinition::new("worker_registration", "Worker Registration")
.add_step(
StepDefinition::new(
......@@ -829,14 +851,14 @@ pub fn create_worker_registration_workflow() -> WorkflowDefinition {
Arc::new(DetectConnectionModeStep),
)
.with_retry(RetryPolicy {
max_attempts: 100,
max_attempts,
backoff: BackoffStrategy::Linear {
increment: Duration::from_secs(1),
max: Duration::from_secs(5),
},
})
// Workflow-level timeout (upper bound); step uses config.health_check_timeout_secs
.with_timeout(Duration::from_secs(7200)) // 2 hours max
// Workflow-level timeout uses configured health check timeout + buffer
.with_timeout(detect_timeout)
.with_failure_action(FailureAction::FailWorkflow),
)
.add_step(
......
......@@ -737,14 +737,17 @@ pub async fn startup(config: ServerConfig) -> Result<(), Box<dyn std::error::Err
.subscribe(Arc::new(LoggingSubscriber))
.await;
engine.register_workflow(create_worker_registration_workflow());
engine.register_workflow(create_worker_registration_workflow(&config.router_config));
engine.register_workflow(create_worker_removal_workflow());
engine.register_workflow(create_mcp_registration_workflow());
app_context
.workflow_engine
.set(engine)
.expect("WorkflowEngine should only be initialized once");
info!("Workflow engine initialized with worker and MCP registration workflows");
info!(
"Workflow engine initialized with worker and MCP registration workflows (health check timeout: {}s)",
config.router_config.health_check.timeout_secs
);
info!(
"Initializing workers for routing mode: {:?}",
......@@ -764,6 +767,8 @@ pub async fn startup(config: ServerConfig) -> Result<(), Box<dyn std::error::Err
.await
.map_err(|e| format!("Failed to submit worker initialization job: {}", e))?;
info!("Worker initialization job submitted (will complete in background)");
if let Some(mcp_config) = &config.router_config.mcp_config {
info!("Found {} MCP server(s) in config", mcp_config.servers.len());
let mcp_job = Job::InitializeMcpServers {
......
......@@ -69,7 +69,7 @@ pub async fn create_test_context(config: RouterConfig) -> Arc<AppContext> {
let app_context = Arc::new(
AppContext::builder()
.router_config(config)
.router_config(config.clone())
.client(client)
.rate_limiter(rate_limiter)
.tokenizer(None) // tokenizer
......@@ -104,7 +104,7 @@ pub async fn create_test_context(config: RouterConfig) -> Arc<AppContext> {
create_worker_registration_workflow, create_worker_removal_workflow, WorkflowEngine,
};
let engine = Arc::new(WorkflowEngine::new());
engine.register_workflow(create_worker_registration_workflow());
engine.register_workflow(create_worker_registration_workflow(&config));
engine.register_workflow(create_worker_removal_workflow());
app_context
.workflow_engine
......@@ -180,7 +180,7 @@ pub async fn create_test_context_with_mcp_config(
let app_context = Arc::new(
AppContext::builder()
.router_config(config)
.router_config(config.clone())
.client(client)
.rate_limiter(rate_limiter)
.tokenizer(None) // tokenizer
......@@ -215,7 +215,7 @@ pub async fn create_test_context_with_mcp_config(
create_worker_registration_workflow, create_worker_removal_workflow, WorkflowEngine,
};
let engine = Arc::new(WorkflowEngine::new());
engine.register_workflow(create_worker_registration_workflow());
engine.register_workflow(create_worker_registration_workflow(&config));
engine.register_workflow(create_worker_removal_workflow());
app_context
.workflow_engine
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment