Unverified Commit f849e1a6 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

fix(kv-router): migrate raw zmq paths to libzmq and refresh lockfiles (#7871)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent bfc59cd2
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::time::Duration;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use zeromq::{Socket, SubSocket};
use anyhow::Result;
use futures::SinkExt;
use tmq::{
Context, Multipart, SocketBuilder,
publish::{Publish, publish},
router::{Router, router},
subscribe::{Subscribe, subscribe},
};
use tokio::sync::Mutex;
const INITIAL_SETUP_BACKOFF_MS: u64 = 10;
const MAX_SETUP_BACKOFF_MS: u64 = 5000;
const MAX_SETUP_BACKOFF_EXPONENT: u32 = 8;
/// Owned form of a multipart message: one `Vec<u8>` per frame.
pub(crate) type MultipartMessage = Vec<Vec<u8>>;
/// Publish socket shared behind an `Arc` and an async (`tokio`) `Mutex`.
pub(crate) type SharedPubSocket = Arc<Mutex<Publish>>;
/// Alias for the `tmq` subscribe socket type used by this module.
pub(crate) type SubSocket = Subscribe;
fn calculate_setup_backoff_ms(consecutive_errors: u32) -> u64 {
std::cmp::min(
INITIAL_SETUP_BACKOFF_MS * 2_u64.pow(consecutive_errors.min(MAX_SETUP_BACKOFF_EXPONENT)),
MAX_SETUP_BACKOFF_MS,
)
/// Receive timeout in milliseconds, applied via `set_rcvtimeo`.
const ZMQ_RCVTIMEOUT_MS: i32 = 100;
/// Send timeout in milliseconds, applied via `set_sndtimeo`.
const ZMQ_SNDTIMEOUT_MS: i32 = 0;
/// Reconnect interval in milliseconds, applied via `set_reconnect_ivl`.
const ZMQ_RECONNECT_IVL_MS: i32 = 100;
/// Upper bound for the reconnect interval in milliseconds, applied via `set_reconnect_ivl_max`.
const ZMQ_RECONNECT_IVL_MAX_MS: i32 = 5000;
/// Value passed to `set_tcp_keepalive`.
// NOTE(review): per libzmq's ZMQ_TCP_KEEPALIVE option, 1 should mean "enabled" — confirm.
const ZMQ_TCP_KEEPALIVE: i32 = 1;
/// Linger period in milliseconds, applied via `set_linger`.
const ZMQ_LINGER_MS: i32 = 0;
/// Applies the socket options that the receive, send and bidirectional
/// builder helpers all start from.
///
/// Options are set in this order: linger (`ZMQ_LINGER_MS`), reconnect interval
/// (`ZMQ_RECONNECT_IVL_MS`), maximum reconnect interval
/// (`ZMQ_RECONNECT_IVL_MAX_MS`) and TCP keepalive (`ZMQ_TCP_KEEPALIVE`).
/// The configured builder is returned so callers can keep chaining.
fn configure_common_builder<T: tmq::FromZmqSocket<T>>(
    builder: SocketBuilder<T>,
) -> SocketBuilder<T> {
    let builder = builder.set_linger(ZMQ_LINGER_MS);
    let builder = builder.set_reconnect_ivl(ZMQ_RECONNECT_IVL_MS);
    let builder = builder.set_reconnect_ivl_max(ZMQ_RECONNECT_IVL_MAX_MS);
    builder.set_tcp_keepalive(ZMQ_TCP_KEEPALIVE)
}
pub(crate) async fn connect_sub_socket_with_retry(
zmq_endpoint: &str,
zmq_topic: Option<&str>,
cancellation_token: &CancellationToken,
log_prefix: &str,
) -> Option<SubSocket> {
let mut consecutive_errors = 0u32;
let topic = zmq_topic.unwrap_or("");
/// Builder options for sockets that receive: the common options followed by
/// a receive timeout of `ZMQ_RCVTIMEOUT_MS`.
fn configure_receive_builder<T: tmq::FromZmqSocket<T>>(
    builder: SocketBuilder<T>,
) -> SocketBuilder<T> {
    let with_common = configure_common_builder(builder);
    with_common.set_rcvtimeo(ZMQ_RCVTIMEOUT_MS)
}
loop {
if cancellation_token.is_cancelled() {
tracing::debug!("{log_prefix}: cancelled before connecting to {zmq_endpoint}");
return None;
}
/// Builder options for sockets that both receive and send: everything
/// `configure_receive_builder` sets, followed by a send timeout of
/// `ZMQ_SNDTIMEOUT_MS`.
fn configure_bidirectional_builder<T: tmq::FromZmqSocket<T>>(
    builder: SocketBuilder<T>,
) -> SocketBuilder<T> {
    let with_receive = configure_receive_builder(builder);
    with_receive.set_sndtimeo(ZMQ_SNDTIMEOUT_MS)
}
let mut socket = SubSocket::new();
/// Builder options for send-only sockets: the common options followed by a
/// send timeout of `ZMQ_SNDTIMEOUT_MS`.
fn configure_send_builder<T: tmq::FromZmqSocket<T>>(
    builder: SocketBuilder<T>,
) -> SocketBuilder<T> {
    let with_common = configure_common_builder(builder);
    with_common.set_sndtimeo(ZMQ_SNDTIMEOUT_MS)
}
match socket.subscribe(topic).await {
Ok(()) => {}
Err(e) => {
consecutive_errors += 1;
let backoff_ms = calculate_setup_backoff_ms(consecutive_errors);
tracing::warn!(
error=%e,
consecutive_errors=%consecutive_errors,
backoff_ms=%backoff_ms,
"{log_prefix}: failed to subscribe on ZMQ socket during setup, retrying"
);
tokio::select! {
biased;
_ = cancellation_token.cancelled() => return None,
_ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {}
}
continue;
}
}
/// Creates a subscribe socket connected to `endpoint`.
///
/// The socket is configured with `configure_receive_builder` and subscribed
/// to `topic`; a `None` topic subscribes with the empty prefix `""`.
///
/// # Errors
///
/// Propagates any error returned while connecting or subscribing.
pub(crate) async fn connect_sub_socket(endpoint: &str, topic: Option<&str>) -> Result<SubSocket> {
    let ctx = Context::new();
    let builder = configure_receive_builder(subscribe(&ctx));
    let connected = builder.connect(endpoint)?;
    let prefix = topic.unwrap_or("").as_bytes();
    Ok(connected.subscribe(prefix)?)
}
/// Creates a publish socket bound to `endpoint`.
///
/// The socket is configured with `configure_send_builder` and returned
/// wrapped as a [`SharedPubSocket`] (`Arc<Mutex<Publish>>`).
///
/// # Errors
///
/// Propagates any error returned while binding.
pub(crate) async fn bind_pub_socket(endpoint: &str) -> Result<SharedPubSocket> {
    let ctx = Context::new();
    let builder = configure_send_builder(publish(&ctx));
    let publisher = builder.bind(endpoint)?;
    let shared = Arc::new(Mutex::new(publisher));
    Ok(shared)
}
/// Creates a router socket bound to `endpoint`, configured with
/// `configure_bidirectional_builder`.
///
/// # Errors
///
/// Propagates any error returned while binding.
pub(crate) async fn bind_router_socket(endpoint: &str) -> Result<Router> {
    let ctx = Context::new();
    let builder = configure_bidirectional_builder(router(&ctx));
    Ok(builder.bind(endpoint)?)
}
pub(crate) fn multipart_message(multipart: Multipart) -> MultipartMessage {
multipart.into_iter().map(|frame| frame.to_vec()).collect()
}
/// Sends `frames` as one multipart message on a mutex-protected socket.
///
/// The mutex is acquired first and stays held until the send has completed.
///
/// # Errors
///
/// Propagates the `tmq::TmqError` reported by the sink, converted by `?`.
pub(crate) async fn send_multipart<S>(
    socket: &Arc<Mutex<S>>,
    frames: MultipartMessage,
) -> Result<()>
where
    S: futures::Sink<Multipart, Error = tmq::TmqError> + Unpin,
{
    let mut guard = socket.lock().await;
    let message = Multipart::from(frames);
    guard.send(message).await?;
    Ok(())
}
match socket.connect(zmq_endpoint).await {
Ok(()) => return Some(socket),
Err(e) => {
consecutive_errors += 1;
let backoff_ms = calculate_setup_backoff_ms(consecutive_errors);
tracing::warn!(
error=%e,
consecutive_errors=%consecutive_errors,
backoff_ms=%backoff_ms,
"{log_prefix}: failed to connect ZMQ SUB during setup, retrying"
);
tokio::select! {
biased;
_ = cancellation_token.cancelled() => return None,
_ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {}
}
}
}
}
/// Sends `frames` as one multipart message on a socket the caller already
/// holds exclusively (no locking involved).
///
/// # Errors
///
/// Propagates the `tmq::TmqError` reported by the sink, converted by `?`.
pub(crate) async fn send_multipart_direct<S>(socket: &mut S, frames: MultipartMessage) -> Result<()>
where
    S: futures::Sink<Multipart, Error = tmq::TmqError> + Unpin,
{
    let message = Multipart::from(frames);
    Ok(socket.send(message).await?)
}
......@@ -29,8 +29,7 @@ anyhow = { workspace = true }
async-nats = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
async_zmq = { workspace = true }
zmq = { workspace = true }
tmq = { workspace = true }
lru = { version = "0.12" }
axum = { workspace = true }
blake3 = { workspace = true }
......
......@@ -8,7 +8,7 @@ version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
dependencies = [
"cfg-if 1.0.4",
"cfg-if",
"getrandom 0.3.4",
"once_cell",
"version_check",
......@@ -47,9 +47,9 @@ checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c"
[[package]]
name = "arc-swap"
version = "1.8.2"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9f3647c145568cec02c42054e07bdf9a5a698e15b466fb2341bfc393cd24aa5"
checksum = "a07d1f37ff60921c83bdfc7407723bdefe89b44b98a9b772f225c8f9d67141a6"
dependencies = [
"rustversion",
]
......@@ -154,20 +154,6 @@ dependencies = [
"syn",
]
[[package]]
name = "async_zmq"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "499c7104563d51146553fb0963f00210d8825833789e0ed270dd96aeeff6ac93"
dependencies = [
"futures",
"mio 0.6.23",
"once_cell",
"slab",
"thiserror 1.0.69",
"zmq",
]
[[package]]
name = "atomic"
version = "0.6.1"
......@@ -191,9 +177,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]]
name = "aws-lc-rs"
version = "1.16.1"
version = "1.16.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94bffc006df10ac2a68c83692d734a465f8ee6c5b384d8545a636f81d858f4bf"
checksum = "a054912289d18629dc78375ba2c3726a3afe3ff71b4edba9dedfca0e3446d1fc"
dependencies = [
"aws-lc-sys",
"zeroize",
......@@ -201,9 +187,9 @@ dependencies = [
[[package]]
name = "aws-lc-sys"
version = "0.38.0"
version = "0.39.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4321e568ed89bb5a7d291a7f37997c2c0df89809d7b6d12062c81ddb54aa782e"
checksum = "83a25cf98105baa966497416dbd42565ce3a8cf8dbfd59803ec9ad46f3126399"
dependencies = [
"cc",
"cmake",
......@@ -322,16 +308,16 @@ checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af"
[[package]]
name = "blake3"
version = "1.8.3"
version = "1.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2468ef7d57b3fb7e16b576e8377cdbde2320c60e1491e961d11da40fc4f02a2d"
checksum = "4d2d5991425dfd0785aed03aedcf0b321d61975c9b5b3689c774a2610ae0b51e"
dependencies = [
"arrayref",
"arrayvec",
"cc",
"cfg-if 1.0.4",
"cfg-if",
"constant_time_eq",
"cpufeatures",
"cpufeatures 0.3.0",
]
[[package]]
......@@ -372,9 +358,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.2.57"
version = "1.2.59"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423"
checksum = "b7a4d3ec6524d28a329fc53654bbadc9bdd7b0431f5d65f1a56ffb28a1ee5283"
dependencies = [
"find-msvc-tools",
"jobserver",
......@@ -392,12 +378,6 @@ dependencies = [
"target-lexicon",
]
[[package]]
name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "cfg-if"
version = "1.0.4"
......@@ -424,9 +404,9 @@ dependencies = [
[[package]]
name = "cmake"
version = "0.1.57"
version = "0.1.58"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75443c44cd6b379beb8c5b45d85d0773baf31cce901fe7bb252f4eff3008ef7d"
checksum = "c0f78a02292a74a88ac736019ab962ece0bc380e3f977bf72e376c5d78ff0678"
dependencies = [
"cc",
]
......@@ -487,6 +467,15 @@ dependencies = [
"libc",
]
[[package]]
name = "cpufeatures"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201"
dependencies = [
"libc",
]
[[package]]
name = "crossbeam"
version = "0.8.4"
......@@ -559,8 +548,8 @@ version = "4.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be"
dependencies = [
"cfg-if 1.0.4",
"cpufeatures",
"cfg-if",
"cpufeatures 0.2.17",
"curve25519-dalek-derive",
"digest",
"fiat-crypto",
......@@ -655,7 +644,7 @@ version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if 1.0.4",
"cfg-if",
"crossbeam-utils",
"hashbrown 0.14.5",
"lock_api",
......@@ -765,9 +754,9 @@ dependencies = [
[[package]]
name = "dircpy"
version = "0.3.19"
version = "0.3.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a88521b0517f5f9d51d11925d8ab4523497dcf947073fa3231a311b63941131c"
checksum = "ebcbec2b9a580ddee352ac38523d2ecd4dcaad53532957034394556909e27f4b"
dependencies = [
"jwalk",
"log",
......@@ -814,7 +803,6 @@ dependencies = [
"async-once-cell",
"async-stream",
"async-trait",
"async_zmq",
"axum",
"bincode",
"blake3",
......@@ -856,6 +844,7 @@ dependencies = [
"serde_json",
"socket2 0.5.10",
"thiserror 2.0.18",
"tmq",
"tokio",
"tokio-rayon",
"tokio-stream",
......@@ -868,7 +857,6 @@ dependencies = [
"uuid",
"validator",
"xxhash-rust",
"zmq",
]
[[package]]
......@@ -1024,7 +1012,7 @@ version = "0.2.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f98844151eee8917efc50bd9e8318cb963ae8b297431495d3f758616ea5c57db"
dependencies = [
"cfg-if 1.0.4",
"cfg-if",
"libc",
"libredox",
]
......@@ -1077,22 +1065,6 @@ dependencies = [
"libc",
]
[[package]]
name = "fuchsia-zircon"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
dependencies = [
"bitflags 1.3.2",
"fuchsia-zircon-sys",
]
[[package]]
name = "fuchsia-zircon-sys"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
[[package]]
name = "futures"
version = "0.3.32"
......@@ -1197,7 +1169,7 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0"
dependencies = [
"cfg-if 1.0.4",
"cfg-if",
"js-sys",
"libc",
"wasi",
......@@ -1210,7 +1182,7 @@ version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd"
dependencies = [
"cfg-if 1.0.4",
"cfg-if",
"js-sys",
"libc",
"r-efi 5.3.0",
......@@ -1224,7 +1196,7 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555"
dependencies = [
"cfg-if 1.0.4",
"cfg-if",
"libc",
"r-efi 6.0.0",
"wasip2",
......@@ -1326,7 +1298,7 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "617aaa3557aef3810a6369d0a99fac8a080891b68bd9f9812a1eeda0c0730cbd"
dependencies = [
"cfg-if 1.0.4",
"cfg-if",
"libc",
"windows-link",
]
......@@ -1384,9 +1356,9 @@ checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
[[package]]
name = "hyper"
version = "1.8.1"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11"
checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca"
dependencies = [
"atomic-waker",
"bytes",
......@@ -1399,7 +1371,6 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"pin-utils",
"smallvec",
"tokio",
"want",
......@@ -1486,12 +1457,13 @@ dependencies = [
[[package]]
name = "icu_collections"
version = "2.1.1"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43"
checksum = "2984d1cd16c883d7935b9e07e44071dca8d917fd52ecc02c04d5fa0b5a3f191c"
dependencies = [
"displaydoc",
"potential_utf",
"utf8_iter",
"yoke",
"zerofrom",
"zerovec",
......@@ -1499,9 +1471,9 @@ dependencies = [
[[package]]
name = "icu_locale_core"
version = "2.1.1"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6"
checksum = "92219b62b3e2b4d88ac5119f8904c10f8f61bf7e95b640d25ba3075e6cac2c29"
dependencies = [
"displaydoc",
"litemap",
......@@ -1512,9 +1484,9 @@ dependencies = [
[[package]]
name = "icu_normalizer"
version = "2.1.1"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599"
checksum = "c56e5ee99d6e3d33bd91c5d85458b6005a22140021cc324cea84dd0e72cff3b4"
dependencies = [
"icu_collections",
"icu_normalizer_data",
......@@ -1526,15 +1498,15 @@ dependencies = [
[[package]]
name = "icu_normalizer_data"
version = "2.1.1"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a"
checksum = "da3be0ae77ea334f4da67c12f149704f19f81d1adf7c51cf482943e84a2bad38"
[[package]]
name = "icu_properties"
version = "2.1.2"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec"
checksum = "bee3b67d0ea5c2cca5003417989af8996f8604e34fb9ddf96208a033901e70de"
dependencies = [
"icu_collections",
"icu_locale_core",
......@@ -1546,15 +1518,15 @@ dependencies = [
[[package]]
name = "icu_properties_data"
version = "2.1.2"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af"
checksum = "8e2bbb201e0c04f7b4b3e14382af113e17ba4f63e2c9d2ee626b720cbce54a14"
[[package]]
name = "icu_provider"
version = "2.1.1"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614"
checksum = "139c4cf31c8b5f33d7e199446eff9c1e02decfc2f0eec2c8d71f65befa45b421"
dependencies = [
"displaydoc",
"icu_locale_core",
......@@ -1600,9 +1572,9 @@ dependencies = [
[[package]]
name = "indexmap"
version = "2.13.0"
version = "2.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017"
checksum = "45a8a2b9cb3e0b0c1803dbb0758ffac5de2f425b23c28f518faabd9d805342ff"
dependencies = [
"equivalent",
"hashbrown 0.16.1",
......@@ -1636,15 +1608,6 @@ dependencies = [
"libc",
]
[[package]]
name = "iovec"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
dependencies = [
"libc",
]
[[package]]
name = "ipnet"
version = "2.12.0"
......@@ -1653,9 +1616,9 @@ checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2"
[[package]]
name = "iri-string"
version = "0.7.10"
version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c91338f0783edbd6195decb37bae672fd3b165faffb89bf7b9e6942f8b1a731a"
checksum = "25e659a4bb38e810ebc252e53b5814ff908a8c58c2a9ce2fae1bbec24cbf4e20"
dependencies = [
"memchr",
"serde",
......@@ -1672,9 +1635,9 @@ dependencies = [
[[package]]
name = "itoa"
version = "1.0.17"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2"
checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682"
[[package]]
name = "jobserver"
......@@ -1688,10 +1651,12 @@ dependencies = [
[[package]]
name = "js-sys"
version = "0.3.91"
version = "0.3.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b49715b7073f385ba4bc528e5747d02e66cb39c6146efb66b781f131f0fb399c"
checksum = "2e04e2ef80ce82e13552136fabeef8a5ed1f985a96805761cbb9a2c34e7664d9"
dependencies = [
"cfg-if",
"futures-util",
"once_cell",
"wasm-bindgen",
]
......@@ -1753,16 +1718,6 @@ dependencies = [
"serde_json",
]
[[package]]
name = "kernel32-sys"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
dependencies = [
"winapi 0.2.8",
"winapi-build",
]
[[package]]
name = "kqueue"
version = "1.1.1"
......@@ -1906,15 +1861,15 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "libc"
version = "0.2.183"
version = "0.2.184"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d"
checksum = "48f5d2a454e16a5ea0f4ced81bd44e4cfc7bd3a507b61887c99fd3538b28e4af"
[[package]]
name = "libredox"
version = "0.1.14"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1744e39d1d6a9948f4f388969627434e31128196de472883b39f148769bfe30a"
checksum = "7ddbf48fd451246b1f8c2610bd3b4ac0cc6e149d89832867093ab69a17194f08"
dependencies = [
"bitflags 2.11.0",
"libc",
......@@ -1930,15 +1885,15 @@ checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53"
[[package]]
name = "litemap"
version = "0.8.1"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77"
checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0"
[[package]]
name = "local-ip-address"
version = "0.6.10"
version = "0.6.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79ef8c257c92ade496781a32a581d43e3d512cf8ce714ecf04ea80f93ed0ff4a"
checksum = "d4a59a0cb1c7f84471ad5cd38d768c2a29390d17f1ff2827cdf49bc53e8ac70b"
dependencies = [
"libc",
"neli",
......@@ -2012,25 +1967,6 @@ dependencies = [
"unicase",
]
[[package]]
name = "mio"
version = "0.6.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4"
dependencies = [
"cfg-if 0.1.10",
"fuchsia-zircon",
"fuchsia-zircon-sys",
"iovec",
"kernel32-sys",
"libc",
"log",
"miow",
"net2",
"slab",
"winapi 0.2.8",
]
[[package]]
name = "mio"
version = "0.8.11"
......@@ -2045,27 +1981,15 @@ dependencies = [
[[package]]
name = "mio"
version = "1.1.1"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc"
checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1"
dependencies = [
"libc",
"wasi",
"windows-sys 0.61.2",
]
[[package]]
name = "miow"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d"
dependencies = [
"kernel32-sys",
"net2",
"winapi 0.2.8",
"ws2_32-sys",
]
[[package]]
name = "multimap"
version = "0.10.1"
......@@ -2101,17 +2025,6 @@ dependencies = [
"syn",
]
[[package]]
name = "net2"
version = "0.2.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b13b648036a2339d06de780866fbdfda0dde886de7b3af2ddeba8b14f4ee34ac"
dependencies = [
"cfg-if 0.1.10",
"libc",
"winapi 0.3.9",
]
[[package]]
name = "nkeys"
version = "0.4.5"
......@@ -2165,9 +2078,9 @@ dependencies = [
[[package]]
name = "num-conv"
version = "0.2.0"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050"
checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967"
[[package]]
name = "num-traits"
......@@ -2246,9 +2159,9 @@ dependencies = [
[[package]]
name = "opentelemetry-otlp"
version = "0.31.0"
version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf"
checksum = "1f69cd6acbb9af919df949cd1ec9e5e7fdc2ef15d234b6b795aaa525cc02f71f"
dependencies = [
"http",
"opentelemetry",
......@@ -2324,7 +2237,7 @@ version = "0.9.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1"
dependencies = [
"cfg-if 1.0.4",
"cfg-if",
"libc",
"redox_syscall 0.5.18",
"smallvec",
......@@ -2459,12 +2372,6 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkcs8"
version = "0.10.2"
......@@ -2495,9 +2402,9 @@ checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49"
[[package]]
name = "potential_utf"
version = "0.1.4"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b73949432f5e2a09657003c25bca5e19a0e9c84f8058ca374f49e0ebe605af77"
checksum = "0103b1cef7ec0cf76490e969665504990193874ea05c85ff9bab8b911d0a0564"
dependencies = [
"zerovec",
]
......@@ -2577,7 +2484,7 @@ version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a"
dependencies = [
"cfg-if 1.0.4",
"cfg-if",
"fnv",
"lazy_static",
"memchr",
......@@ -2661,9 +2568,9 @@ dependencies = [
[[package]]
name = "pulldown-cmark"
version = "0.13.1"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83c41efbf8f90ac44de7f3a868f0867851d261b56291732d0cbf7cceaaeb55a6"
checksum = "7c3a14896dfa883796f1cb410461aef38810ea05f2b2c33c5aded3649095fdad"
dependencies = [
"bitflags 2.11.0",
"memchr",
......@@ -2951,7 +2858,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
dependencies = [
"cc",
"cfg-if 1.0.4",
"cfg-if",
"getrandom 0.2.17",
"libc",
"untrusted",
......@@ -2979,9 +2886,9 @@ dependencies = [
[[package]]
name = "rustc-hash"
version = "2.1.1"
version = "2.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe"
[[package]]
name = "rustc_version"
......@@ -3016,7 +2923,7 @@ dependencies = [
"once_cell",
"ring",
"rustls-pki-types",
"rustls-webpki 0.103.9",
"rustls-webpki 0.103.10",
"subtle",
"zeroize",
]
......@@ -3077,9 +2984,9 @@ dependencies = [
[[package]]
name = "rustls-webpki"
version = "0.103.9"
version = "0.103.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7df23109aa6c1567d1c575b9952556388da57401e4ace1d15f79eedad0d8f53"
checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef"
dependencies = [
"aws-lc-rs",
"ring",
......@@ -3195,9 +3102,9 @@ dependencies = [
[[package]]
name = "semver"
version = "1.0.27"
version = "1.0.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2"
checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd"
[[package]]
name = "serde"
......@@ -3343,8 +3250,8 @@ version = "0.10.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
dependencies = [
"cfg-if 1.0.4",
"cpufeatures",
"cfg-if",
"cpufeatures 0.2.17",
"digest",
]
......@@ -3577,7 +3484,7 @@ version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185"
dependencies = [
"cfg-if 1.0.4",
"cfg-if",
]
[[package]]
......@@ -3615,9 +3522,9 @@ dependencies = [
[[package]]
name = "tinystr"
version = "0.8.2"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42d3e9c45c09de15d06dd8acf5f4e0e399e85927b7f00711024eb7ae10fa4869"
checksum = "c8323304221c2a851516f22236c5722a72eaa19749016521d6dff0824447d96d"
dependencies = [
"displaydoc",
"zerovec",
......@@ -3638,6 +3545,19 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tmq"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3f41ac3a42f65436eed7e1afe80dbe8a982dcac2ea4581bf61bc2d3dcfb19a1"
dependencies = [
"futures",
"log",
"thiserror 1.0.69",
"tokio",
"zmq",
]
[[package]]
name = "tokio"
version = "1.48.0"
......@@ -3646,7 +3566,7 @@ checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408"
dependencies = [
"bytes",
"libc",
"mio 1.1.1",
"mio 1.2.0",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
......@@ -4075,9 +3995,9 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "uuid"
version = "1.22.0"
version = "1.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a68d3c8f01c0cfa54a75291d83601161799e4a89a39e0929f4b0354d88757a37"
checksum = "5ac8b6f42ead25368cf5b098aeb3dc8a1a2c05a3eee8a9a1a68c640edbfc79d9"
dependencies = [
"getrandom 0.4.2",
"js-sys",
......@@ -4178,11 +4098,11 @@ dependencies = [
[[package]]
name = "wasm-bindgen"
version = "0.2.114"
version = "0.2.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6532f9a5c1ece3798cb1c2cfdba640b9b3ba884f5db45973a6f442510a87d38e"
checksum = "0551fc1bb415591e3372d0bc4780db7e587d84e2a7e79da121051c5c4b89d0b0"
dependencies = [
"cfg-if 1.0.4",
"cfg-if",
"once_cell",
"rustversion",
"wasm-bindgen-macro",
......@@ -4191,23 +4111,19 @@ dependencies = [
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.64"
version = "0.4.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9c5522b3a28661442748e09d40924dfb9ca614b21c00d3fd135720e48b67db8"
checksum = "03623de6905b7206edd0a75f69f747f134b7f0a2323392d664448bf2d3c5d87e"
dependencies = [
"cfg-if 1.0.4",
"futures-util",
"js-sys",
"once_cell",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.114"
version = "0.2.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18a2d50fcf105fb33bb15f00e7a77b772945a2ee45dcf454961fd843e74c18e6"
checksum = "7fbdf9a35adf44786aecd5ff89b4563a90325f9da0923236f6104e603c7e86be"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
......@@ -4215,9 +4131,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.114"
version = "0.2.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03ce4caeaac547cdf713d280eda22a730824dd11e6b8c3ca9e42247b25c631e3"
checksum = "dca9693ef2bab6d4e6707234500350d8dad079eb508dca05530c85dc3a529ff2"
dependencies = [
"bumpalo",
"proc-macro2",
......@@ -4228,9 +4144,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.114"
version = "0.2.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75a326b8c223ee17883a4251907455a2431acc2791c98c26279376490c378c16"
checksum = "39129a682a6d2d841b6c429d0c51e5cb0ed1a03829d8b3d1e69a011e62cb3d3b"
dependencies = [
"unicode-ident",
]
......@@ -4284,9 +4200,9 @@ dependencies = [
[[package]]
name = "web-sys"
version = "0.3.91"
version = "0.3.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "854ba17bb104abfb26ba36da9729addc7ce7f06f5c0f90f3c391f8461cca21f9"
checksum = "cd70027e39b12f0849461e08ffc50b9cd7688d942c1c8e3c7b22273236b4dd0a"
dependencies = [
"js-sys",
"wasm-bindgen",
......@@ -4320,34 +4236,6 @@ dependencies = [
"rustls-pki-types",
]
[[package]]
name = "winapi"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-build"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.11"
......@@ -4357,12 +4245,6 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.62.2"
......@@ -4743,19 +4625,9 @@ dependencies = [
[[package]]
name = "writeable"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9"
[[package]]
name = "ws2_32-sys"
version = "0.2.1"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e"
dependencies = [
"winapi 0.2.8",
"winapi-build",
]
checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4"
[[package]]
name = "xxhash-rust"
......@@ -4771,9 +4643,9 @@ checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049"
[[package]]
name = "yoke"
version = "0.8.1"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72d6e5c6afb84d73944e5cedb052c4680d5657337201555f9f2a16b7406d4954"
checksum = "abe8c5fda708d9ca3df187cae8bfb9ceda00dd96231bed36e445a1a48e66f9ca"
dependencies = [
"stable_deref_trait",
"yoke-derive",
......@@ -4782,9 +4654,9 @@ dependencies = [
[[package]]
name = "yoke-derive"
version = "0.8.1"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d"
checksum = "de844c262c8848816172cef550288e7dc6c7b7814b4ee56b3e1553f275f1858e"
dependencies = [
"proc-macro2",
"quote",
......@@ -4794,18 +4666,18 @@ dependencies = [
[[package]]
name = "zerocopy"
version = "0.8.42"
version = "0.8.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2578b716f8a7a858b7f02d5bd870c14bf4ddbbcf3a4c05414ba6503640505e3"
checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.42"
version = "0.8.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f"
checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4"
dependencies = [
"proc-macro2",
"quote",
......@@ -4814,18 +4686,18 @@ dependencies = [
[[package]]
name = "zerofrom"
version = "0.1.6"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5"
checksum = "69faa1f2a1ea75661980b013019ed6687ed0e83d069bc1114e2cc74c6c04c4df"
dependencies = [
"zerofrom-derive",
]
[[package]]
name = "zerofrom-derive"
version = "0.1.6"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502"
checksum = "11532158c46691caf0f2593ea8358fed6bbf68a0315e80aae9bd41fbade684a1"
dependencies = [
"proc-macro2",
"quote",
......@@ -4851,9 +4723,9 @@ dependencies = [
[[package]]
name = "zerotrie"
version = "0.2.3"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a59c17a5562d507e4b54960e8569ebee33bee890c70aa3fe7b97e85a9fd7851"
checksum = "0f9152d31db0792fa83f70fb2f83148effb5c1f5b8c7686c3459e361d9bc20bf"
dependencies = [
"displaydoc",
"yoke",
......@@ -4862,9 +4734,9 @@ dependencies = [
[[package]]
name = "zerovec"
version = "0.11.5"
version = "0.11.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c28719294829477f525be0186d13efa9a3c602f7ec202ca9e353d310fb9a002"
checksum = "90f911cbc359ab6af17377d242225f4d75119aec87ea711a880987b18cd7b239"
dependencies = [
"yoke",
"zerofrom",
......@@ -4873,9 +4745,9 @@ dependencies = [
[[package]]
name = "zerovec-derive"
version = "0.11.2"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3"
checksum = "625dc425cab0dca6dc3c3319506e6593dcb08a9f387ea3b284dbd52a92c40555"
dependencies = [
"proc-macro2",
"quote",
......
......@@ -14,17 +14,25 @@
//! - Frame 2: sequence (8 bytes, u64 big-endian) - for fast deduplication
//! - Frame 3: Binary frame (5-byte header + EventEnvelope payload)
use anyhow::Result;
use anyhow::{Result, anyhow};
use async_stream::stream;
use async_trait::async_trait;
use bytes::Bytes;
use std::sync::{Arc, Mutex};
use futures::{SinkExt, StreamExt};
use std::sync::Arc;
use tmq::{
AsZmqSocket, Context, Multipart, SocketBuilder,
publish::{Publish, publish},
subscribe::{Subscribe, subscribe},
};
use tokio::sync::{Mutex, broadcast};
/// High Water Mark (HWM) for ZMQ sockets.
/// This controls the maximum number of messages that can be queued.
/// Default ZMQ HWM is 1000, which limits scalability.
const ZMQ_SNDHWM: i32 = 100_000; // Send buffer: 100K messages
const ZMQ_RCVHWM: i32 = 100_000; // Receive buffer: 100K messages
const ZMQ_SNDTIMEOUT_MS: i32 = 0; // Send timeout: fail fast under pressure
const ZMQ_RCVTIMEOUT_MS: i32 = 100; // Receive timeout: 100ms (avoids blocking forever)
use super::codec::MsgpackCodec;
......@@ -32,20 +40,31 @@ use super::frame::Frame;
use super::transport::{EventTransportRx, EventTransportTx, WireStream};
use crate::discovery::EventTransportKind;
/// Parts of a received ZMQ multipart message.
struct ZmqMessage {
#[allow(dead_code)]
topic: Vec<u8>,
publisher_id: u64,
sequence: u64,
data: Vec<u8>,
/// Applies the publisher-side socket options to a `tmq` socket builder.
///
/// Sets the send high-water mark to `ZMQ_SNDHWM` and the send timeout to
/// `ZMQ_SNDTIMEOUT_MS`, then returns the builder so the caller can finish it
/// with `bind` or `connect`.
fn configure_publish_builder<T>(builder: SocketBuilder<T>) -> SocketBuilder<T>
where
    T: tmq::FromZmqSocket<T>,
{
    let with_send_hwm = builder.set_sndhwm(ZMQ_SNDHWM);
    with_send_hwm.set_sndtimeo(ZMQ_SNDTIMEOUT_MS)
}
/// Applies the subscriber-side socket options to a `tmq` socket builder.
///
/// Sets the receive high-water mark to `ZMQ_RCVHWM` and the receive timeout
/// to `ZMQ_RCVTIMEOUT_MS`, then returns the builder so the caller can finish
/// it with `connect` and `subscribe`.
fn configure_subscribe_builder<T>(builder: SocketBuilder<T>) -> SocketBuilder<T>
where
    T: tmq::FromZmqSocket<T>,
{
    let with_recv_hwm = builder.set_rcvhwm(ZMQ_RCVHWM);
    with_recv_hwm.set_rcvtimeo(ZMQ_RCVTIMEOUT_MS)
}
/// Copies every frame of a received `Multipart` into an owned byte vector,
/// keeping the frames in the order they were received.
fn multipart_message(multipart: Multipart) -> Vec<Vec<u8>> {
    let mut owned_frames = Vec::new();
    owned_frames.extend(multipart.into_iter().map(|part| part.to_vec()));
    owned_frames
}
/// ZMQ PUB transport for publishing events.
///
/// Wraps a ZMQ PUB socket configured with a raised send HWM for better scalability.
pub struct ZmqPubTransport {
socket: Arc<Mutex<zmq::Socket>>,
socket: Arc<Mutex<Publish>>,
topic: String,
}
......@@ -57,38 +76,19 @@ impl ZmqPubTransport {
///
/// Returns the transport and the actual bound endpoint.
pub async fn bind(endpoint: &str, topic: &str) -> Result<(Self, String)> {
// Parse the endpoint to check if we need to find an available port
let actual_endpoint = if endpoint.ends_with(":0") {
// Find an available port using TcpListener
let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await?;
let actual_addr = listener.local_addr()?;
let port = actual_addr.port();
drop(listener); // Close listener so ZMQ can bind to the port
drop(listener);
format!("tcp://0.0.0.0:{}", port)
format!("tcp://0.0.0.0:{port}")
} else {
endpoint.to_string()
};
// Create raw ZMQ socket with HWM configuration
let endpoint_for_closure = actual_endpoint.clone();
let socket = tokio::task::spawn_blocking(move || -> Result<zmq::Socket> {
let ctx = zmq::Context::new();
let socket = ctx.socket(zmq::PUB)?;
// Configure High Water Mark for better scalability
socket.set_sndhwm(ZMQ_SNDHWM)?;
// Set send timeout to 0 (non-blocking)
socket.set_sndtimeo(0)?;
// Bind to endpoint
socket.bind(&endpoint_for_closure)?;
Ok(socket)
})
.await
.map_err(|e| anyhow::anyhow!("Task join error: {}", e))??;
let ctx = Context::new();
let socket = configure_publish_builder(publish(&ctx)).bind(&actual_endpoint)?;
tracing::info!(
endpoint = %actual_endpoint,
......@@ -112,26 +112,8 @@ impl ZmqPubTransport {
/// Connect to single broker XSUB endpoint (broker mode)
pub async fn connect(xsub_endpoint: &str, topic: &str) -> Result<Self> {
let endpoint_owned = xsub_endpoint.to_string();
let topic_owned = topic.to_string();
let socket = tokio::task::spawn_blocking(move || -> Result<zmq::Socket> {
let ctx = zmq::Context::new();
let socket = ctx.socket(zmq::PUB)?;
// Configure High Water Mark for better scalability
socket.set_sndhwm(ZMQ_SNDHWM)?;
// Set send timeout to 0 (non-blocking)
socket.set_sndtimeo(0)?;
// Connect (not bind) to broker's XSUB
socket.connect(&endpoint_owned)?;
Ok(socket)
})
.await
.map_err(|e| anyhow::anyhow!("Task join error: {}", e))??;
let ctx = Context::new();
let socket = configure_publish_builder(publish(&ctx)).connect(xsub_endpoint)?;
tracing::info!(
endpoint = %xsub_endpoint,
......@@ -142,39 +124,24 @@ impl ZmqPubTransport {
Ok(Self {
socket: Arc::new(Mutex::new(socket)),
topic: topic_owned,
topic: topic.to_string(),
})
}
/// Connect to multiple broker XSUB endpoints (HA mode)
pub async fn connect_multiple(xsub_endpoints: &[String], topic: &str) -> Result<Self> {
if xsub_endpoints.is_empty() {
let mut endpoints = xsub_endpoints.iter();
let Some(first_endpoint) = endpoints.next() else {
anyhow::bail!("Cannot connect to zero endpoints");
}
let endpoints_owned = xsub_endpoints.to_vec();
let topic_owned = topic.to_string();
let socket = tokio::task::spawn_blocking(move || -> Result<zmq::Socket> {
let ctx = zmq::Context::new();
let socket = ctx.socket(zmq::PUB)?;
// Configure High Water Mark for better scalability
socket.set_sndhwm(ZMQ_SNDHWM)?;
// Set send timeout to 0 (non-blocking)
socket.set_sndtimeo(0)?;
};
// Connect to all XSUB endpoints (ZMQ handles load balancing)
for endpoint in &endpoints_owned {
socket.connect(endpoint)?;
tracing::debug!(endpoint = %endpoint, "ZMQ PUB connected to broker XSUB");
}
let ctx = Context::new();
let socket = configure_publish_builder(publish(&ctx)).connect(first_endpoint)?;
Ok(socket)
})
.await
.map_err(|e| anyhow::anyhow!("Task join error: {}", e))??;
for endpoint in endpoints {
socket.get_socket().connect(endpoint)?;
tracing::debug!(endpoint = %endpoint, "ZMQ PUB connected to broker XSUB");
}
tracing::info!(
num_endpoints = xsub_endpoints.len(),
......@@ -185,7 +152,7 @@ impl ZmqPubTransport {
Ok(Self {
socket: Arc::new(Mutex::new(socket)),
topic: topic_owned,
topic: topic.to_string(),
})
}
}
......@@ -193,35 +160,22 @@ impl ZmqPubTransport {
#[async_trait]
impl EventTransportTx for ZmqPubTransport {
async fn publish(&self, _subject: &str, envelope_bytes: Bytes) -> Result<()> {
// Decode envelope to extract publisher_id and sequence for fast deduplication
let codec = MsgpackCodec;
let envelope = codec.decode_envelope(&envelope_bytes)?;
// Create binary frame
let frame = Frame::new(envelope_bytes);
let frame_bytes = frame.encode();
// Prepare multipart message: [topic, publisher_id, sequence, frame_bytes]
let topic_bytes = self.topic.as_bytes().to_vec();
let publisher_id_bytes = envelope.publisher_id.to_be_bytes().to_vec();
let sequence_bytes = envelope.sequence.to_be_bytes().to_vec();
let frame_vec = frame_bytes.to_vec();
let socket = Arc::clone(&self.socket);
tokio::task::spawn_blocking(move || -> Result<()> {
let socket = socket.lock().unwrap();
// Send topic frame (for ZMQ subscription filtering)
socket.send(&topic_bytes, zmq::SNDMORE)?;
// Send publisher_id (for fast deduplication)
socket.send(&publisher_id_bytes, zmq::SNDMORE)?;
// Send sequence (for fast deduplication)
socket.send(&sequence_bytes, zmq::SNDMORE)?;
// Send data frame (complete envelope)
socket.send(&frame_vec, 0)?;
Ok(())
})
.await
.map_err(|e| anyhow::anyhow!("Task join error: {}", e))??;
let frames = vec![
self.topic.as_bytes().to_vec(),
envelope.publisher_id.to_be_bytes().to_vec(),
envelope.sequence.to_be_bytes().to_vec(),
frame.encode().to_vec(),
];
self.socket
.lock()
.await
.send(Multipart::from(frames))
.await?;
Ok(())
}
......@@ -233,40 +187,19 @@ impl EventTransportTx for ZmqPubTransport {
/// ZMQ SUB transport for subscribing to events.
///
/// Uses a background socket pump to avoid holding the socket lock across stream lifetimes.
/// Multiple subscribers can receive events concurrently via broadcast channel.
/// Uses a background async reader to fan out frames to multiple local subscribers.
pub struct ZmqSubTransport {
socket: Arc<Mutex<zmq::Socket>>,
broadcast_tx: tokio::sync::broadcast::Sender<Bytes>,
broadcast_tx: broadcast::Sender<Bytes>,
_socket_pump_handle: tokio::task::JoinHandle<()>,
}
impl ZmqSubTransport {
/// Create a new ZMQ subscriber by connecting to a single endpoint.
pub async fn connect(endpoint: &str, topic: &str) -> Result<Self> {
let endpoint_owned = endpoint.to_string();
let topic_owned = topic.to_string();
let socket = tokio::task::spawn_blocking(move || -> Result<zmq::Socket> {
let ctx = zmq::Context::new();
let socket = ctx.socket(zmq::SUB)?;
// Configure High Water Mark for better scalability
socket.set_rcvhwm(ZMQ_RCVHWM)?;
// Set receive timeout to avoid blocking forever (fixes test hangs)
socket.set_rcvtimeo(ZMQ_RCVTIMEOUT_MS)?;
// Connect to endpoint
socket.connect(&endpoint_owned)?;
// Subscribe to topic
socket.set_subscribe(topic_owned.as_bytes())?;
Ok(socket)
})
.await
.map_err(|e| anyhow::anyhow!("Task join error: {}", e))??;
let ctx = Context::new();
let socket = configure_subscribe_builder(subscribe(&ctx))
.connect(endpoint)?
.subscribe(topic.as_bytes())?;
tracing::info!(
endpoint = %endpoint,
......@@ -275,16 +208,10 @@ impl ZmqSubTransport {
"ZMQ SUB transport connected with configured HWM"
);
let socket = Arc::new(Mutex::new(socket));
// Create broadcast channel for multiple subscribers
let (broadcast_tx, _) = tokio::sync::broadcast::channel(1024);
// Start background socket pump
let pump_handle = Self::start_socket_pump(Arc::clone(&socket), broadcast_tx.clone());
let (broadcast_tx, _) = broadcast::channel(1024);
let pump_handle = Self::start_socket_pump(socket, broadcast_tx.clone());
Ok(Self {
socket,
broadcast_tx,
_socket_pump_handle: pump_handle,
})
......@@ -296,43 +223,26 @@ impl ZmqSubTransport {
}
/// Connect to multiple broker XPUB endpoints (HA mode).
///
/// Thin alias: forwards `xpub_endpoints` and `topic` unchanged to
/// [`Self::connect_multiple`] and returns its result, including any error.
pub async fn connect_broker_multiple(xpub_endpoints: &[String], topic: &str) -> Result<Self> {
Self::connect_multiple(xpub_endpoints, topic).await
}
/// Create a new ZMQ subscriber by connecting to multiple endpoints (fan-in).
pub async fn connect_multiple(endpoints: &[String], topic: &str) -> Result<Self> {
if endpoints.is_empty() {
let mut endpoints_iter = endpoints.iter();
let Some(first_endpoint) = endpoints_iter.next() else {
anyhow::bail!("Cannot connect to zero endpoints");
}
let endpoints_owned = endpoints.to_vec();
let topic_owned = topic.to_string();
let socket = tokio::task::spawn_blocking(move || -> Result<zmq::Socket> {
let ctx = zmq::Context::new();
let socket = ctx.socket(zmq::SUB)?;
// Configure High Water Mark for better scalability
socket.set_rcvhwm(ZMQ_RCVHWM)?;
// Set receive timeout to avoid blocking forever (fixes test hangs)
socket.set_rcvtimeo(ZMQ_RCVTIMEOUT_MS)?;
// Connect to all endpoints
for endpoint in &endpoints_owned {
socket.connect(endpoint)?;
tracing::debug!(endpoint = %endpoint, "ZMQ SUB connected to endpoint");
}
};
// Subscribe to topic
socket.set_subscribe(topic_owned.as_bytes())?;
let ctx = Context::new();
let socket = configure_subscribe_builder(subscribe(&ctx))
.connect(first_endpoint)?
.subscribe(topic.as_bytes())?;
Ok(socket)
})
.await
.map_err(|e| anyhow::anyhow!("Task join error: {}", e))??;
for endpoint in endpoints_iter {
socket.get_socket().connect(endpoint)?;
tracing::debug!(endpoint = %endpoint, "ZMQ SUB connected to endpoint");
}
tracing::info!(
num_endpoints = endpoints.len(),
......@@ -341,115 +251,76 @@ impl ZmqSubTransport {
"ZMQ SUB transport connected to multiple endpoints with configured HWM"
);
let socket = Arc::new(Mutex::new(socket));
// Create broadcast channel for multiple subscribers
let (broadcast_tx, _) = tokio::sync::broadcast::channel(1024);
// Start background socket pump
let pump_handle = Self::start_socket_pump(Arc::clone(&socket), broadcast_tx.clone());
let (broadcast_tx, _) = broadcast::channel(1024);
let pump_handle = Self::start_socket_pump(socket, broadcast_tx.clone());
Ok(Self {
socket,
broadcast_tx,
_socket_pump_handle: pump_handle,
})
}
/// Background task that reads from socket and broadcasts to all subscribers.
///
/// This task holds the socket lock only briefly during each recv operation,
/// allowing multiple subscribers to receive concurrently via broadcast channel.
/// Uses finite timeout to avoid blocking forever (fixes test hangs from ZMQ "slow joiner" problem).
fn start_socket_pump(
socket: Arc<Mutex<zmq::Socket>>,
broadcast_tx: tokio::sync::broadcast::Sender<Bytes>,
mut socket: Subscribe,
broadcast_tx: broadcast::Sender<Bytes>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
loop {
// Receive multipart message in blocking task: [topic, publisher_id, sequence, frame_bytes]
let socket_clone = Arc::clone(&socket);
let result = tokio::task::spawn_blocking(move || -> Result<Option<ZmqMessage>> {
let socket = socket_clone.lock().unwrap();
// Receive topic frame (may timeout with EAGAIN)
let topic = match socket.recv_bytes(0) {
Ok(data) => data,
Err(zmq::Error::EAGAIN) => return Ok(None), // Timeout, retry
Err(e) => return Err(e.into()),
};
// Receive publisher_id frame (8 bytes, u64 big-endian)
let publisher_id_bytes = socket.recv_bytes(0)?;
if publisher_id_bytes.len() != 8 {
anyhow::bail!(
"Invalid publisher_id frame: expected 8 bytes, got {}",
publisher_id_bytes.len()
);
}
let publisher_id = u64::from_be_bytes(publisher_id_bytes.try_into().unwrap());
// Receive sequence frame (8 bytes, u64 big-endian)
let sequence_bytes = socket.recv_bytes(0)?;
if sequence_bytes.len() != 8 {
anyhow::bail!(
"Invalid sequence frame: expected 8 bytes, got {}",
sequence_bytes.len()
);
}
let sequence = u64::from_be_bytes(sequence_bytes.try_into().unwrap());
// Receive data frame
let data = socket.recv_bytes(0)?;
Ok(Some(ZmqMessage {
topic,
publisher_id,
sequence,
data,
}))
})
.await;
match result {
Ok(Ok(Some(ZmqMessage {
publisher_id,
sequence,
data: frame_bytes,
..
}))) => {
// Log dedup metadata for debugging
tracing::trace!(
publisher_id = publisher_id,
sequence = sequence,
"Socket pump received ZMQ message"
);
// Parse binary frame
let frame_bytes = Bytes::from(frame_bytes);
match Frame::decode(frame_bytes) {
Ok(frame) => {
// Broadcast payload to all subscribers
// Ignore send errors (no receivers or lagging receivers)
let _ = broadcast_tx.send(frame.payload);
}
Err(e) => {
tracing::warn!(error = %e, "Failed to decode ZMQ frame in socket pump");
continue;
}
}
}
Ok(Ok(None)) => {
// Timeout (EAGAIN), continue polling
continue;
}
Ok(Err(e)) => {
tracing::error!(error = %e, "ZMQ receive error in socket pump");
let Some(result) = socket.next().await else {
tracing::info!("ZMQ socket stream ended");
break;
};
let frames = match result {
Ok(frames) => multipart_message(frames),
Err(error) => {
tracing::error!(error = %error, "ZMQ receive error in socket pump");
break;
}
Err(e) => {
tracing::error!(error = %e, "Task join error in socket pump");
break;
};
if frames.len() != 4 {
tracing::warn!(
frame_count = frames.len(),
"Unexpected multipart frame count in socket pump"
);
continue;
}
let publisher_id_bytes = &frames[1];
if publisher_id_bytes.len() != 8 {
tracing::warn!(
actual = publisher_id_bytes.len(),
"Invalid publisher_id frame in socket pump"
);
continue;
}
let publisher_id =
u64::from_be_bytes(publisher_id_bytes.as_slice().try_into().unwrap());
let sequence_bytes = &frames[2];
if sequence_bytes.len() != 8 {
tracing::warn!(
actual = sequence_bytes.len(),
"Invalid sequence frame in socket pump"
);
continue;
}
let sequence = u64::from_be_bytes(sequence_bytes.as_slice().try_into().unwrap());
tracing::trace!(
publisher_id = publisher_id,
sequence = sequence,
"Socket pump received ZMQ message"
);
let frame_bytes = Bytes::from(frames[3].clone());
match Frame::decode(frame_bytes) {
Ok(frame) => {
let _ = broadcast_tx.send(frame.payload);
}
Err(error) => {
tracing::warn!(error = %error, "Failed to decode ZMQ frame in socket pump");
}
}
}
......@@ -462,22 +333,14 @@ impl ZmqSubTransport {
#[async_trait]
impl EventTransportRx for ZmqSubTransport {
async fn subscribe(&self, _subject: &str) -> Result<WireStream> {
// Subscribe to broadcast channel (does not hold socket lock)
let mut receiver = self.broadcast_tx.subscribe();
let stream = stream! {
loop {
match receiver.recv().await {
Ok(payload) => {
yield Ok(payload);
}
Ok(payload) => yield Ok(payload),
Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
tracing::warn!(
skipped = skipped,
"Subscriber lagged behind, skipped messages"
);
// Continue receiving, don't break the stream
continue;
tracing::warn!(skipped = skipped, "Subscriber lagged behind, skipped messages");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
tracing::info!("Broadcast channel closed");
......@@ -504,7 +367,7 @@ mod tests {
#[tokio::test]
async fn test_zmq_pubsub_basic() {
let port = 25555;
let endpoint = format!("tcp://127.0.0.1:{}", port);
let endpoint = format!("tcp://127.0.0.1:{port}");
let topic = "test-topic";
let (publisher, _actual_endpoint) = ZmqPubTransport::bind(&endpoint, topic)
......@@ -517,7 +380,6 @@ mod tests {
.await
.expect("Failed to create subscriber");
use futures::StreamExt;
let mut stream = subscriber
.subscribe(topic)
.await
......@@ -551,14 +413,13 @@ mod tests {
#[tokio::test]
async fn test_zmq_multiple_messages() {
let port = 25556;
let endpoint = format!("tcp://127.0.0.1:{}", port);
let endpoint = format!("tcp://127.0.0.1:{port}");
let topic = "multi-test";
let (publisher, _) = ZmqPubTransport::bind(&endpoint, topic).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
let subscriber = ZmqSubTransport::connect(&endpoint, topic).await.unwrap();
use futures::StreamExt;
let mut stream = subscriber.subscribe(topic).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
......@@ -570,7 +431,7 @@ mod tests {
sequence: i,
published_at: 1700000000000 + i,
topic: topic.to_string(),
payload: Bytes::from(format!("message {}", i)),
payload: Bytes::from(format!("message {i}")),
};
let bytes = codec.encode_envelope(&envelope).unwrap();
......@@ -579,7 +440,7 @@ mod tests {
for i in 0..5 {
let result = timeout(Duration::from_secs(2), stream.next()).await;
assert!(result.is_ok(), "Timeout on message {}", i);
assert!(result.is_ok(), "Timeout on message {i}");
let received = result.unwrap().unwrap().unwrap();
let decoded = codec.decode_envelope(&received).unwrap();
......
......@@ -5,8 +5,8 @@
//!
//! This module provides a ZMQ transport for the [crate::DistributedRuntime].
//!
//! Currently, the [Server] consists of a [async_zmq::Router] and the [Client] leverages
//! a [async_zmq::Dealer].
//! Currently, the [Server] consists of a [tmq::router::Router] and the [Client] leverages
//! a [tmq::dealer::Dealer].
//!
//! The distributed service pattern we will use is based on the Harmony pattern described in
//! [Chapter 8: A Framework for Distributed Computing](https://zguide.zeromq.org/docs/chapter8/#True-Peer-Connectivity-Harmony-Pattern).
......@@ -16,19 +16,21 @@
//! equivalent of a connection pool per upstream service at the cost of needing an extra internal
//! routing step per service endpoint.
use anyhow::{Result, anyhow};
use async_zmq::{Context, Dealer, Router, Sink, SinkExt, StreamExt};
use anyhow::{Context, Result, anyhow};
use bytes::Bytes;
use derive_getters::Dissolve;
use futures::TryStreamExt;
use futures::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, os::fd::FromRawFd, sync::Arc, time::Duration, vec::IntoIter};
use std::{collections::HashMap, sync::Arc};
use tmq::{AsZmqSocket, Context as TmqContext, dealer, router};
use tokio::{
sync::{Mutex, mpsc},
task::{JoinError, JoinHandle},
task::JoinHandle,
};
use tokio_util::sync::CancellationToken;
pub type MultipartMessage = Vec<Vec<u8>>;
// Core message types
#[derive(Debug, Clone, Serialize, Deserialize)]
enum ControlMessage {
......@@ -89,8 +91,8 @@ pub struct Server {
}
impl Server {
/// Create a new [Server] which is a [async_zmq::Router] with the given [async_zmq::Context] and address to bind
/// the ZMQ [async_zmq::Router] socket.
/// Create a new [Server] which is a [tmq::router::Router] with the given [tmq::Context]
/// and address to bind the ZMQ router socket.
///
/// If the event loop processing the router fails with an error, the signal is propagated through the [CancellationToken]
/// by issuing a [CancellationToken::cancel].
......@@ -99,12 +101,12 @@ impl Server {
///
/// The [ServerExecutionHandle] is the handle for background task executing the [Server].
pub async fn new(
context: &Context,
context: &TmqContext,
address: &str,
cancel_token: CancellationToken,
) -> Result<(Self, ServerExecutionHandle)> {
let router = async_zmq::router(address)?.with_context(context).bind()?;
let fd = router.as_raw_socket().get_fd()?;
let router = router(context).bind(address)?;
let fd = router.get_socket().get_fd()?;
let state = Arc::new(Mutex::new(RouterState::new()));
// can cancel the router's event loop
......@@ -142,7 +144,7 @@ impl Server {
// pub async fn register_stream(&)
async fn run(
router: Router<IntoIter<Vec<u8>>, Vec<u8>>,
router: tmq::router::Router,
state: Arc<Mutex<RouterState>>,
token: CancellationToken,
) -> Result<()> {
......@@ -298,19 +300,17 @@ impl ServerExecutionHandle {
// Client implementation
pub struct Client {
dealer: Dealer<IntoIter<Vec<u8>>, Vec<u8>>,
dealer: tmq::dealer::Dealer,
}
impl Client {
fn new(context: &Context, address: &str) -> Result<Self> {
let dealer = async_zmq::dealer(address)?
.with_context(context)
.connect()?;
fn new(context: &TmqContext, address: &str) -> Result<Self> {
let dealer = dealer(context).connect(address)?;
Ok(Self { dealer })
}
fn dealer(&mut self) -> &mut Dealer<IntoIter<Vec<u8>>, Vec<u8>> {
fn dealer(&mut self) -> &mut tmq::dealer::Dealer {
&mut self.dealer
}
......@@ -356,7 +356,7 @@ mod tests {
#[tokio::test]
async fn test_basic_communication() -> Result<()> {
let context = Context::new();
let context = TmqContext::new();
let address = "tcp://127.0.0.1:1337";
let token = CancellationToken::new();
......@@ -373,7 +373,10 @@ mod tests {
client
.dealer()
.send(vec![id.as_bytes().to_vec(), id.as_bytes().to_vec()].into())
.send(tmq::Multipart::from(vec![
id.as_bytes().to_vec(),
id.as_bytes().to_vec(),
]))
.await?;
let receive_result = rx.recv().await;
......@@ -384,7 +387,7 @@ mod tests {
let received_str = String::from_utf8_lossy(&received).to_string();
assert_eq!(received_str, "test-request");
client.dealer().close().await?;
drop(client);
handle.cancel();
handle.join().await?;
......
......@@ -871,14 +871,14 @@ def _test_router_indexers_sync(
router_event_threads=router_event_threads,
)
# If standalone indexer mode, launch mockers one-by-one and register.
# If standalone indexer mode, launch workers one-by-one and register.
# We need to create a temporary endpoint just to discover worker IDs.
if standalone_indexer_url:
tmp_runtime = get_runtime(store_backend, request_plane)
tmp_endpoint = tmp_runtime.endpoint(
f"{engine_workers.namespace}.{engine_workers.component_name}.generate"
)
await engine_workers.launch_mockers_with_indexer(tmp_endpoint)
await engine_workers.launch_workers_with_indexer(tmp_endpoint)
async def send_requests_to_router(router, num_requests, router_name, endpoint):
# Now send the actual requests
......@@ -1511,10 +1511,10 @@ def _test_router_decisions(
# Create KvRouterConfig with lower snapshot threshold for testing
# Use async to manage the test flow
async def test_sync():
# If standalone indexer mode, launch mockers one-by-one and register.
# If standalone indexer mode, launch workers one-by-one and register.
# Must happen before KvRouter creation since KvRouter blocks until workers appear.
if standalone_indexer_url:
await engine_workers.launch_mockers_with_indexer(endpoint)
await engine_workers.launch_workers_with_indexer(endpoint)
# Workers register one instance per process (not per dp_rank)
expected_num_instances = engine_workers.num_workers
......
......@@ -242,8 +242,10 @@ def run_indexers_sync_test(
block_size: int,
model_name: str,
num_workers: int,
extra_process_kwargs: dict[str, Any] | None = None,
):
nats_process, _etcd_process = runtime_services_dynamic_ports
process_kwargs = extra_process_kwargs or {}
with engine_process_cls(
request,
......@@ -253,6 +255,7 @@ def run_indexers_sync_test(
store_backend=store_backend,
durable_kv_events=durable_kv_events,
**{engine_args_name: engine_args},
**process_kwargs,
) as engine_workers:
_test_router_indexers_sync(
engine_workers=engine_workers,
......@@ -264,4 +267,13 @@ def run_indexers_sync_test(
test_nats_interruption=not durable_kv_events,
nats_server=nats_process if not durable_kv_events else None,
durable_kv_events=durable_kv_events,
standalone_indexer_url=getattr(
engine_workers, "standalone_indexer_url", None
),
standalone_indexer_b_url=getattr(
engine_workers, "standalone_indexer_b_url", None
),
test_zmq_replay=bool(
getattr(engine_workers, "standalone_indexer_url", None)
),
)
......@@ -203,7 +203,7 @@ class MockerProcess:
"""Manages mocker engine instances with shared tokio runtime via --num-workers.
When standalone_indexer=True, launches mockers one-by-one (each as --num-workers 1)
and runs a standalone HTTP KV indexer binary alongside them. Call launch_mockers_with_indexer()
and runs a standalone HTTP KV indexer binary alongside them. Call launch_workers_with_indexer()
in async context to start mockers and register their ZMQ ports with the indexer.
"""
......@@ -282,7 +282,7 @@ class MockerProcess:
self._standalone_indexer_port = indexer_ports[0]
self._standalone_indexer_b_port = indexer_ports[1]
request.addfinalizer(lambda: deallocate_ports(indexer_ports))
# Don't build a single mocker command — we'll launch per-mocker in launch_mockers_with_indexer
# Don't build a single mocker command — we'll launch per-worker in launch_workers_with_indexer
self._process = None
else:
command = _build_mocker_command(
......@@ -347,14 +347,14 @@ class MockerProcess:
f"Starting standalone indexer on port {self._standalone_indexer_port}"
)
self._indexer_process.__enter__()
# Don't start mocker processes yet — launch_mockers_with_indexer will do it
# Don't start mocker processes yet — launch_workers_with_indexer will do it
else:
logger.info(f"Starting mocker process with {self.num_workers} worker(s)")
self._process.__enter__()
return self
async def launch_mockers_with_indexer(self, endpoint):
"""Launch mockers one-by-one and register each with the standalone indexer.
async def launch_workers_with_indexer(self, endpoint):
"""Launch workers one-by-one and register each with the standalone indexer.
For each mocker:
1. Launch a mocker process with --num-workers 1
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import asyncio
# Timing notes (measured locally):
# - GPU-1 subset (`-m "gpu_1 and not gpu_2"`): 130.43s total for 3 tests.
# These tests load a real model and can be slow/flaky when GPU resources are contended,
......@@ -10,6 +12,7 @@ import logging
import os
from typing import Any, Dict, Optional
import aiohttp
import pytest
from tests.router.e2e_harness import (
......@@ -18,7 +21,11 @@ from tests.router.e2e_harness import (
run_indexers_sync_test,
run_router_decisions_test,
)
from tests.router.helper import generate_random_suffix
from tests.router.helper import (
generate_random_suffix,
get_kv_indexer_command,
wait_for_indexer_workers_active,
)
from tests.utils.constants import DefaultPort
from tests.utils.managed_process import ManagedProcess
from tests.utils.port_utils import allocate_ports, deallocate_ports
......@@ -74,6 +81,8 @@ class VLLMProcess(ManagedEngineProcessMixin):
request_plane: str = "tcp",
store_backend: str = "etcd",
durable_kv_events: bool = False,
standalone_indexer: bool = False,
zmq_replay: bool = False,
):
"""Initialize vLLM workers with dynamo integration.
......@@ -100,16 +109,41 @@ class VLLMProcess(ManagedEngineProcessMixin):
self.num_workers = num_workers
self.data_parallel_size = data_parallel_size
self.worker_processes = []
self.worker_id_to_zmq_ports: dict[int, dict[int, str]] = {}
self._worker_id_to_replay_ports: dict[int, dict[int, str]] = {}
self.store_backend = store_backend
self._request = request
self._request_plane = request_plane
self._standalone_indexer = standalone_indexer
self._zmq_replay = zmq_replay
self._standalone_indexer_port: Optional[int] = None
self._standalone_indexer_b_port: Optional[int] = None
self._indexer_process: Optional[ManagedProcess] = None
self._indexer_b_process: Optional[ManagedProcess] = None
# Dynamically allocate unique system, KV event, and NIXL side-channel
# ports (one of each per worker) to avoid conflicts in parallel test runs.
self._system_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
self._kv_event_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
self._nixl_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
self._replay_ports = (
allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
if standalone_indexer and zmq_replay
else []
)
self._indexer_ports = (
allocate_ports(2, DefaultPort.SYSTEM1.value) if standalone_indexer else []
)
if standalone_indexer:
self._standalone_indexer_port = self._indexer_ports[0]
self._standalone_indexer_b_port = self._indexer_ports[1]
request.addfinalizer(
lambda: deallocate_ports(
self._system_ports + self._kv_event_ports + self._nixl_ports
self._system_ports
+ self._kv_event_ports
+ self._nixl_ports
+ self._replay_ports
+ self._indexer_ports
)
)
......@@ -123,6 +157,7 @@ class VLLMProcess(ManagedEngineProcessMixin):
enforce_eager = vllm_args.get("enforce_eager", False)
self.model_name = model
self.block_size = vllm_args.get("block_size", BLOCK_SIZE)
# Create vLLM worker processes
# Matches test.sh behavior:
......@@ -195,17 +230,22 @@ class VLLMProcess(ManagedEngineProcessMixin):
system_port = self._system_ports[worker_idx]
kv_event_port = self._kv_event_ports[worker_idx]
nixl_port = self._nixl_ports[worker_idx]
replay_port = (
self._replay_ports[worker_idx]
if worker_idx < len(self._replay_ports)
else None
)
# Pass KV events config explicitly via CLI
kv_events_cfg = json.dumps(
{
"publisher": "zmq",
"topic": "kv-events",
"endpoint": f"tcp://*:{kv_event_port}",
"enable_kv_cache_events": True,
}
)
command.extend(["--kv-events-config", kv_events_cfg])
kv_events_cfg: Dict[str, Any] = {
"publisher": "zmq",
"topic": "kv-events",
"endpoint": f"tcp://*:{kv_event_port}",
"enable_kv_cache_events": True,
}
if replay_port is not None:
kv_events_cfg["replay_endpoint"] = f"tcp://*:{replay_port}"
command.extend(["--kv-events-config", json.dumps(kv_events_cfg)])
env = os.environ.copy() # Copy parent environment
env_vars = {
......@@ -248,6 +288,178 @@ class VLLMProcess(ManagedEngineProcessMixin):
f"with endpoint: {self.endpoint}"
)
@property
def standalone_indexer_url(self) -> Optional[str]:
    """Base HTTP URL of the primary standalone indexer.

    Returns:
        ``http://localhost:<port>`` built from ``_standalone_indexer_port``,
        or ``None`` when no port has been assigned.
    """
    port = self._standalone_indexer_port
    return None if port is None else f"http://localhost:{port}"
@property
def standalone_indexer_b_url(self) -> Optional[str]:
    """Base HTTP URL of the second standalone indexer ("indexer B").

    Returns:
        ``http://localhost:<port>`` built from ``_standalone_indexer_b_port``,
        or ``None`` when no port has been assigned.
    """
    port = self._standalone_indexer_b_port
    return None if port is None else f"http://localhost:{port}"
def __enter__(self):
    """Enter the context, starting only the standalone indexer when enabled.

    Without ``standalone_indexer`` this defers entirely to the parent
    ``__enter__``. With it, only the indexer process is started here; the
    worker processes are entered later by ``launch_workers_with_indexer``.

    Returns:
        ``self`` in standalone-indexer mode, otherwise whatever the parent
        ``__enter__`` returns.
    """
    if self._standalone_indexer:
        port = self._standalone_indexer_port

        indexer_cmd = list(get_kv_indexer_command())
        indexer_cmd += ["--block-size", str(self.block_size)]
        indexer_cmd += ["--port", str(port)]

        self._indexer_process = ManagedProcess(
            command=indexer_cmd,
            timeout=120,
            display_output=True,
            health_check_ports=[port],
            health_check_urls=[],
            log_dir=self._request.node.name,
            terminate_all_matching_process_names=False,
            display_name="dynamo-kv-indexer",
        )
        logger.info("Starting standalone indexer on port %s", port)
        self._indexer_process.__enter__()
        return self

    return super().__enter__()
def __exit__(self, exc_type, exc_val, exc_tb):
    """Tear down workers and standalone indexers, even if one teardown fails.

    Without ``standalone_indexer`` this defers to the parent ``__exit__``.
    In standalone-indexer mode every worker process is exited first, then
    indexer B (if one was started), then the primary indexer.

    A failure while exiting one process no longer prevents the remaining
    processes from being exited; previously the first exception aborted the
    teardown and left the later workers and the indexers running. After all
    processes have been attempted, the first exception encountered is
    re-raised.

    Args:
        exc_type, exc_val, exc_tb: Standard context-manager exception info,
            forwarded unchanged to every child ``__exit__``.

    Returns:
        ``None``, so an in-flight exception is never suppressed.
    """
    if not self._standalone_indexer:
        super().__exit__(exc_type, exc_val, exc_tb)
        return

    # Teardown order: workers, then indexer B, then the primary indexer.
    targets = list(self.worker_processes)
    for attr in ("_indexer_b_process", "_indexer_process"):
        process = getattr(self, attr)
        if process is not None:
            targets.append(process)
            # Clear the handle up front so a repeated __exit__ cannot
            # tear the same indexer down twice.
            setattr(self, attr, None)

    first_error = None
    for process in targets:
        try:
            process.__exit__(exc_type, exc_val, exc_tb)
        except Exception as error:
            # Keep going: leaking the remaining processes is worse than
            # delaying the report of this failure.
            if first_error is None:
                first_error = error

    if first_error is not None:
        raise first_error
async def launch_workers_with_indexer(self, endpoint):
"""Start the worker processes and register each one with the standalone indexer.

For each entry in ``self.worker_processes`` this enters the process, polls
``client.instance_ids()`` for an id not seen before, and POSTs that id together
with the worker's ZMQ endpoint (plus a replay endpoint when one exists for that
index) to ``<standalone_indexer_url>/register``. It also awaits
``wait_for_indexer_workers_active`` with the recorded worker mapping.

NOTE(review): presumably the final wait and summary log run once, after every
worker has been registered -- confirm against the properly indented file.

Args:
endpoint: Object whose awaited ``client()`` returns a client exposing
``instance_ids()``.

Raises:
RuntimeError: If ``standalone_indexer`` is not enabled, if no new instance id
shows up within the polling window, or if the register call returns a status
other than 201.
"""
if not self._standalone_indexer:
raise RuntimeError(
"launch_workers_with_indexer requires standalone_indexer=True"
)
client = await endpoint.client()
# Instance ids already attributed to a worker started by this method.
known_ids: set[int] = set()
register_url = f"{self.standalone_indexer_url}/register"
async with aiohttp.ClientSession() as session:
for worker_idx, process in enumerate(self.worker_processes):
process.__enter__()
new_worker_id = None
# Poll up to 120 times, sleeping 0.5 s after each miss, for an id that is
# not yet in known_ids.
for _ in range(120):
ids = set(client.instance_ids())
new = ids - known_ids
if new:
# set.pop() takes an arbitrary element if several new ids appeared at once.
new_worker_id = new.pop()
known_ids.add(new_worker_id)
break
await asyncio.sleep(0.5)
if new_worker_id is None:
raise RuntimeError(
f"Timed out waiting for vLLM worker {worker_idx} to register "
f"(known_ids={known_ids})"
)
# Endpoints use 127.0.0.1 with the port taken from the per-worker port lists.
zmq_endpoint = f"tcp://127.0.0.1:{self._kv_event_ports[worker_idx]}"
# None when self._replay_ports has no entry for this worker index.
replay_endpoint = (
f"tcp://127.0.0.1:{self._replay_ports[worker_idx]}"
if worker_idx < len(self._replay_ports)
else None
)
# dp_rank is always sent as 0 here.
payload = {
"instance_id": new_worker_id,
"endpoint": zmq_endpoint,
"dp_rank": 0,
"model_name": self.model_name,
"block_size": self.block_size,
}
if replay_endpoint is not None:
payload["replay_endpoint"] = replay_endpoint
async with session.post(register_url, json=payload) as resp:
# Any status other than 201 is treated as a failed registration.
if resp.status != 201:
body = await resp.text()
raise RuntimeError(
f"Failed to register vLLM instance {new_worker_id}: "
f"{resp.status} {body}"
)
# Record the endpoints under key 0, the same dp_rank sent in the payload.
self.worker_id_to_zmq_ports[new_worker_id] = {0: zmq_endpoint}
if replay_endpoint is not None:
self._worker_id_to_replay_ports[new_worker_id] = {
0: replay_endpoint
}
logger.info(
"vLLM worker %s: worker_id=%s, zmq_endpoint=%s, replay_endpoint=%s",
worker_idx,
new_worker_id,
zmq_endpoint,
replay_endpoint,
)
# Hand the indexer URL and the recorded worker mapping to the wait helper.
await wait_for_indexer_workers_active(
self.standalone_indexer_url, self.worker_id_to_zmq_ports
)
logger.info(
"All %s vLLM workers launched and registered with indexer",
self.num_workers,
)
def launch_indexer(self):
    """Start standalone indexer B, peered with the first standalone indexer.

    Indexer B is launched on ``_standalone_indexer_b_port`` with ``--peers``
    pointing at ``http://localhost:<_standalone_indexer_port>`` and
    ``--workers`` listing every ``worker_id:dp_rank=endpoint`` entry recorded
    in ``worker_id_to_zmq_ports``.

    Raises:
        RuntimeError: If ``standalone_indexer`` is disabled or the B port is
            ``None``, or if no workers have been recorded yet.
    """
    standalone_ready = (
        self._standalone_indexer and self._standalone_indexer_b_port is not None
    )
    if not standalone_ready:
        raise RuntimeError("launch_indexer requires standalone_indexer=True")

    registered = self.worker_id_to_zmq_ports
    if not registered:
        raise RuntimeError("launch_indexer requires workers to be registered first")

    # One "<worker_id>:<dp_rank>=<endpoint>" entry per recorded (worker, rank) pair.
    workers_arg = ",".join(
        f"{worker_id}:{dp_rank}={zmq_endpoint}"
        for worker_id, zmq_addresses in registered.items()
        for dp_rank, zmq_endpoint in zmq_addresses.items()
    )

    indexer_b_cmd = list(get_kv_indexer_command())
    indexer_b_cmd += ["--block-size", str(self.block_size)]
    indexer_b_cmd += ["--port", str(self._standalone_indexer_b_port)]
    indexer_b_cmd += ["--peers", f"http://localhost:{self._standalone_indexer_port}"]
    indexer_b_cmd += ["--workers", workers_arg]
    indexer_b_cmd += ["--model-name", self.model_name]

    # Stored on self before entering so __exit__ can see it even if startup fails.
    self._indexer_b_process = ManagedProcess(
        display_name="dynamo-kv-indexer-b",
        command=indexer_b_cmd,
        health_check_ports=[self._standalone_indexer_b_port],
        health_check_urls=[],
        log_dir=self._request.node.name,
        timeout=120,
        display_output=True,
        terminate_all_matching_process_names=False,
    )
    logger.info(
        "Starting standalone indexer B on port %s with peer http://localhost:%s",
        self._standalone_indexer_b_port,
        self._standalone_indexer_port,
    )
    self._indexer_b_process.__enter__()
# Human-readable labels describing this process type.
# NOTE(review): presumably read by ManagedEngineProcessMixin when composing
# log/status messages -- confirm against the mixin before relying on this.
process_name = "vLLM worker"
cleanup_name = "vLLM worker resources"
init_delay_reason = "initialize NIXL before starting next worker"
......@@ -394,4 +606,5 @@ def test_vllm_indexers_sync(
block_size=BLOCK_SIZE,
model_name=MODEL_NAME,
num_workers=2,
extra_process_kwargs={"standalone_indexer": True, "zmq_replay": True},
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment