"docs/vscode:/vscode.git/clone" did not exist on "db14d63f0725fd9e223a72efb673fa38965a8edf"
Unverified commit abd0ba5d, authored by Yan Ru Pei, committed by GitHub
Browse files

fix(llm): retry worker-local ZMQ subscriber startup (#7788)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent 546d6a1e
......@@ -15,8 +15,9 @@ use std::time::Duration;
use anyhow::Result;
use tokio_util::sync::CancellationToken;
use zeromq::{Socket, SocketRecv, SubSocket};
use zeromq::SocketRecv;
use crate::utils::zmq::connect_sub_socket_with_retry;
use dynamo_runtime::component::Component;
use dynamo_runtime::traits::DistributedRuntimeProvider;
use dynamo_runtime::transports::event_plane::EventPublisher;
......@@ -61,15 +62,11 @@ impl FpmEventRelay {
publisher: EventPublisher,
cancel: CancellationToken,
) {
let mut socket = SubSocket::new();
if let Err(e) = socket.subscribe("").await {
tracing::error!("FPM relay: failed to subscribe on ZMQ socket: {e}");
let Some(mut socket) =
connect_sub_socket_with_retry(&zmq_endpoint, None, &cancel, "FPM relay").await
else {
return;
}
if let Err(e) = socket.connect(&zmq_endpoint).await {
tracing::error!("FPM relay: failed to connect ZMQ SUB to {zmq_endpoint}: {e}");
return;
}
};
tracing::info!("FPM relay: connected to {zmq_endpoint}");
let mut consecutive_errors: u32 = 0;
......
......@@ -8,8 +8,9 @@ use std::time::Duration;
use rmp_serde as rmps;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use zeromq::{Socket, SocketRecv, SubSocket};
use zeromq::SocketRecv;
use crate::utils::zmq::connect_sub_socket_with_retry;
use dynamo_kv_router::protocols::*;
use dynamo_kv_router::zmq_wire::*;
......@@ -41,17 +42,16 @@ pub(super) async fn start_zmq_listener(
);
let warning_count = Arc::new(AtomicU32::new(0));
let mut socket = SubSocket::new();
if let Err(e) = socket.subscribe(&zmq_topic).await {
tracing::error!("Failed to subscribe on ZMQ socket: {}", e);
return;
}
if let Err(e) = socket.connect(&zmq_endpoint).await {
tracing::error!("Failed to connect ZMQ SUB socket to {zmq_endpoint}: {e}");
let Some(mut socket) = connect_sub_socket_with_retry(
&zmq_endpoint,
Some(&zmq_topic),
&cancellation_token,
"ZMQ listener",
)
.await
else {
return;
}
};
let mut consecutive_errors = 0u32;
#[expect(unused_assignments)]
......
......@@ -3,6 +3,7 @@
pub mod lora;
pub mod prefix_matcher;
pub mod zmq;
pub use lora::lora_name_to_id;
pub use prefix_matcher::{MarkerMatcher, MatchResult};
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use zeromq::{Socket, SubSocket};
/// Base delay, in milliseconds, that the setup backoff is scaled from.
const INITIAL_SETUP_BACKOFF_MS: u64 = 10;
/// Ceiling, in milliseconds, applied to the computed setup backoff.
const MAX_SETUP_BACKOFF_MS: u64 = 5000;
/// Largest power-of-two exponent used when scaling the base delay.
const MAX_SETUP_BACKOFF_EXPONENT: u32 = 8;

/// Computes the exponential backoff, in milliseconds, for a setup retry.
///
/// The result is `INITIAL_SETUP_BACKOFF_MS * 2^n`, where `n` is
/// `consecutive_errors` clamped to `MAX_SETUP_BACKOFF_EXPONENT`, and the
/// product is then capped at `MAX_SETUP_BACKOFF_MS`.
///
/// With the current constants the largest product is `10 * 2^8 = 2560`, so the
/// exponent clamp is what bounds the delay in practice; the 5000 ms ceiling is
/// never reached.
fn calculate_setup_backoff_ms(consecutive_errors: u32) -> u64 {
    // Clamp the exponent first so the power-of-two factor stays small.
    let exponent = consecutive_errors.min(MAX_SETUP_BACKOFF_EXPONENT);
    let scaled_ms = INITIAL_SETUP_BACKOFF_MS * 2_u64.pow(exponent);
    scaled_ms.min(MAX_SETUP_BACKOFF_MS)
}
/// Creates a ZMQ SUB socket, subscribes it, and connects it to `zmq_endpoint`,
/// retrying with exponential backoff until it succeeds or is cancelled.
///
/// Every attempt starts from a fresh `SubSocket`. A failure in either the
/// subscribe step or the connect step bumps a shared failure counter, logs a
/// warning, and waits for `calculate_setup_backoff_ms(failures)` milliseconds
/// before the next attempt.
///
/// # Parameters
/// - `zmq_endpoint`: endpoint string handed to `SubSocket::connect`.
/// - `zmq_topic`: subscription prefix; `None` subscribes with the empty string.
/// - `cancellation_token`: checked before each attempt and raced against every
///   backoff sleep. The in-flight `subscribe`/`connect` awaits themselves are
///   not raced against it.
/// - `log_prefix`: text prepended to every log line emitted here.
///
/// # Returns
/// `Some(socket)` once both subscribe and connect succeed on the same socket,
/// or `None` if cancellation is observed first.
pub(crate) async fn connect_sub_socket_with_retry(
    zmq_endpoint: &str,
    zmq_topic: Option<&str>,
    cancellation_token: &CancellationToken,
    log_prefix: &str,
) -> Option<SubSocket> {
    let topic = zmq_topic.unwrap_or("");
    let mut failures = 0u32;

    while !cancellation_token.is_cancelled() {
        let mut socket = SubSocket::new();

        if let Err(subscribe_err) = socket.subscribe(topic).await {
            failures += 1;
            let backoff_ms = calculate_setup_backoff_ms(failures);
            tracing::warn!(
                error=%subscribe_err,
                consecutive_errors=%failures,
                backoff_ms=%backoff_ms,
                "{log_prefix}: failed to subscribe on ZMQ socket during setup, retrying"
            );
            if !wait_for_setup_backoff(cancellation_token, backoff_ms).await {
                return None;
            }
            continue;
        }

        let connect_err = match socket.connect(zmq_endpoint).await {
            Ok(()) => return Some(socket),
            Err(e) => e,
        };

        failures += 1;
        let backoff_ms = calculate_setup_backoff_ms(failures);
        tracing::warn!(
            error=%connect_err,
            consecutive_errors=%failures,
            backoff_ms=%backoff_ms,
            "{log_prefix}: failed to connect ZMQ SUB during setup, retrying"
        );
        if !wait_for_setup_backoff(cancellation_token, backoff_ms).await {
            return None;
        }
    }

    // Only reachable when the token was already cancelled at the top of an attempt.
    tracing::debug!("{log_prefix}: cancelled before connecting to {zmq_endpoint}");
    None
}

/// Sleeps for `backoff_ms` milliseconds unless `cancellation_token` fires first.
///
/// Returns `true` when the full delay elapsed and `false` when cancellation
/// won the race. The select is `biased`, so cancellation is polled before the
/// timer.
async fn wait_for_setup_backoff(cancellation_token: &CancellationToken, backoff_ms: u64) -> bool {
    tokio::select! {
        biased;
        _ = cancellation_token.cancelled() => false,
        _ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => true,
    }
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment