Unverified Commit cd4773fb authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

feat: ForwardPassMetrics dynamo event plane integration (#7250)


Signed-off-by: default avatarhongkuanz <hongkuanz@nvidia.com>
parent 197f6595
......@@ -4,14 +4,33 @@
"""
ForwardPassMetrics schema for per-iteration scheduler telemetry.
Published over ZMQ PUB by InstrumentedScheduler, consumed by the
planner or any ZMQ SUB listener.
Uses msgspec.Struct for zero-copy serialization (same approach as KV cache events).
We do not use prometheus for forward pass metrics because:
1. Metric scrapper for pull based prometheus metrics is async with engine.
Metrics can be easily lost/repeated.
2. Push based prometheus uses HTTP and might not scale as well as ZMQ.
3. Existing KV event infra can be reused for forward pass metrics.
Uses msgspec.Struct for zero-copy serialization (same approach as
vLLM's KV cache events).
Data flow (two-layer relay, same architecture as KV events)::
TODO: hook to our rust infra for discovery
TODO: add metrics for Trtllm/SGLang
EngineCore child process:
InstrumentedScheduler -> _FpmPublisherThread -> ZMQ PUB (localhost)
Dynamo parent process:
FpmEventRelay (ZMQ SUB) -> EventPublisher -> Event Plane (NATS/ZMQ)
Consumer (planner, etc.):
FpmEventSubscriber (auto-discovered) -> decode() -> ForwardPassMetrics
The raw ZMQ hop is needed because the scheduler runs in a forked child
process without access to the Dynamo runtime. The FpmEventRelay bridge
in the parent process handles event plane transport and discovery
registration automatically.
See ``dynamo.common.recv_forward_pass_metrics`` for a standalone
consumer example.
TODO: add metrics for TrtLLM/SGLang
TODO: planner consuming these metrics instead of frontend/router metrics
"""
......@@ -20,6 +39,42 @@ from __future__ import annotations
import msgspec
class WelfordAccumulator:
"""Welford's online algorithm for count / sum / population-variance.
Numerically stable single-pass computation -- avoids catastrophic
cancellation that sum-of-squares can suffer with large values.
Usage::
acc = WelfordAccumulator()
for v in values:
acc.add(v)
print(acc.n, acc.s, acc.variance())
"""
__slots__ = ("n", "s", "_mean", "_m2")
def __init__(self) -> None:
self.n = 0
self.s = 0
self._mean = 0.0
self._m2 = 0.0
def add(self, v: int) -> None:
self.n += 1
self.s += v
delta = v - self._mean
self._mean += delta / self.n
delta2 = v - self._mean
self._m2 += delta * delta2
def variance(self) -> float:
if self.n == 0:
return 0.0
return self._m2 / self.n
class ScheduledRequestMetrics(
msgspec.Struct,
frozen=True,
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Receive ForwardPassMetrics via the Dynamo event plane.
Auto-discovers engine publishers through the discovery plane (K8s CRD /
etcd / file) and prints each metric message as JSON.
Usage:
python -m dynamo.common.recv_forward_pass_metrics \\
--namespace dynamo --component backend --endpoint generate \\
[--discovery-backend etcd] [--request-plane nats]
"""
import argparse
import asyncio
import json
import os
import sys
import msgspec
from dynamo.common.forward_pass_metrics import decode
from dynamo.runtime import DistributedRuntime
def main() -> None:
parser = argparse.ArgumentParser(
description="Receive ForwardPassMetrics from the Dynamo event plane"
)
parser.add_argument(
"--namespace", default="dynamo", help="Dynamo namespace (default: dynamo)"
)
parser.add_argument(
"--component", default="backend", help="Dynamo component (default: backend)"
)
parser.add_argument(
"--endpoint", default="generate", help="Dynamo endpoint (default: generate)"
)
parser.add_argument(
"--discovery-backend",
default=os.environ.get("DYN_DISCOVERY_BACKEND", "etcd"),
help="Discovery backend (default: etcd)",
)
parser.add_argument(
"--request-plane",
default=os.environ.get("DYN_REQUEST_PLANE", "nats"),
help="Request plane (default: nats)",
)
args = parser.parse_args()
asyncio.run(run(args))
async def run(args: argparse.Namespace) -> None:
from dynamo.llm import FpmEventSubscriber
loop = asyncio.get_running_loop()
event_plane = os.environ.get("DYN_EVENT_PLANE", "nats")
enable_nats = args.request_plane == "nats" or event_plane == "nats"
runtime = DistributedRuntime(
loop, args.discovery_backend, args.request_plane, enable_nats
)
endpoint = runtime.endpoint(f"{args.namespace}.{args.component}.{args.endpoint}")
subscriber = FpmEventSubscriber(endpoint)
json_encoder = msgspec.json.Encoder()
print(
f"Subscribed to forward-pass-metrics via event plane "
f"(namespace={args.namespace}, component={args.component}) "
f"Ctrl+C to stop",
file=sys.stderr,
)
seq = 0
try:
while True:
data = await asyncio.to_thread(subscriber.recv)
if data is None:
print("Stream closed.", file=sys.stderr)
break
metrics = decode(data)
pretty = json.loads(json_encoder.encode(metrics))
print(f"[seq={seq}] {json.dumps(pretty, indent=2)}", flush=True)
seq += 1
except KeyboardInterrupt:
print("\nStopped.", file=sys.stderr)
finally:
subscriber.shutdown()
if __name__ == "__main__":
main()
......@@ -259,7 +259,7 @@ def update_engine_config_with_dynamo(
f"(use_kv_events={dynamo_config.use_kv_events})"
)
if envs.is_set("DYN_VLLM_FORWARDPASS_METRIC_PORT"):
if envs.is_set("DYN_FORWARDPASS_METRIC_PORT"):
existing_cls = getattr(engine_config, "scheduler_cls", None)
if existing_cls is None:
defaults[
......@@ -267,11 +267,11 @@ def update_engine_config_with_dynamo(
] = "dynamo.vllm.instrumented_scheduler.InstrumentedScheduler"
logger.info(
"Forward pass metrics enabled: scheduler_cls set to InstrumentedScheduler "
f"(port={envs.DYN_VLLM_FORWARDPASS_METRIC_PORT})"
f"(port={envs.DYN_FORWARDPASS_METRIC_PORT})"
)
else:
logger.warning(
f"DYN_VLLM_FORWARDPASS_METRIC_PORT is set but scheduler_cls "
f"DYN_FORWARDPASS_METRIC_PORT is set but scheduler_cls "
f"is already '{existing_cls}'. InstrumentedScheduler will NOT "
f"be injected. To use forward pass metrics, either remove "
f"--scheduler-cls or subclass InstrumentedScheduler."
......
......@@ -20,7 +20,7 @@ REGISTERED_PORT_MAX = 49151
if TYPE_CHECKING:
DYN_VLLM_KV_EVENT_PORT: int = 20080
DYN_VLLM_FORWARDPASS_METRIC_PORT: int = 20380
DYN_FORWARDPASS_METRIC_PORT: int = 20380
def _resolve_port(env_var: str, default_port: int) -> int:
......@@ -62,8 +62,8 @@ environment_variables: dict[str, Callable[[], Any]] = {
# Port used for KV events publishing to the frontend
# Note: This env variable is ignored if explicitly using --kv-events-config ''
"DYN_VLLM_KV_EVENT_PORT": lambda: _resolve_port("DYN_VLLM_KV_EVENT_PORT", 20080),
"DYN_VLLM_FORWARDPASS_METRIC_PORT": lambda: _resolve_port(
"DYN_VLLM_FORWARDPASS_METRIC_PORT", 20380
"DYN_FORWARDPASS_METRIC_PORT": lambda: _resolve_port(
"DYN_FORWARDPASS_METRIC_PORT", 20380
),
}
......
......@@ -310,6 +310,7 @@ class BaseWorkerHandler(ABC):
self.engine_client = engine
self.default_sampling_params = default_sampling_params
self.kv_publishers: list[KvEventPublisher] | None = None
self.fpm_relays: list | None = None
self.generate_endpoint = generate_endpoint
self.config = config
self.engine_monitor = VllmEngineMonitor(runtime, engine, shutdown_event)
......
......@@ -34,6 +34,7 @@ from dynamo.common.forward_pass_metrics import (
ForwardPassMetrics,
QueuedRequestMetrics,
ScheduledRequestMetrics,
WelfordAccumulator,
encode,
)
......@@ -47,36 +48,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
DEFAULT_FPM_PORT = 20380
ENV_FPM_PORT = "DYN_VLLM_FORWARDPASS_METRIC_PORT"
class _Accum:
"""Welford's online algorithm for count / sum / population-variance.
Numerically stable single-pass computation -- avoids catastrophic
cancellation that sum-of-squares can suffer with large values.
"""
__slots__ = ("n", "s", "_mean", "_m2")
def __init__(self) -> None:
self.n = 0
self.s = 0
self._mean = 0.0
self._m2 = 0.0
def add(self, v: int) -> None:
self.n += 1
self.s += v
delta = v - self._mean
self._mean += delta / self.n
delta2 = v - self._mean
self._m2 += delta * delta2
def variance(self) -> float:
if self.n == 0:
return 0.0
return self._m2 / self.n
ENV_FPM_PORT = "DYN_FORWARDPASS_METRIC_PORT"
# ---------------------------------------------------------------------------
......@@ -256,7 +228,7 @@ class InstrumentedScheduler(Scheduler):
return result
# ------------------------------------------------------------------
# Metric extraction (single-pass with _Accum, no lists)
# Metric extraction (single-pass with WelfordAccumulator, no lists)
# ------------------------------------------------------------------
def _extract_metrics(
......@@ -280,9 +252,9 @@ class InstrumentedScheduler(Scheduler):
num_prefill = 0
sum_prefill_tokens = 0
prefill_lengths = _Accum()
prefill_lengths = WelfordAccumulator()
sum_prefill_kv_tokens = 0
decode_kv = _Accum()
decode_kv = WelfordAccumulator()
for req in new_reqs:
num_prefill += 1
......@@ -313,8 +285,8 @@ class InstrumentedScheduler(Scheduler):
def _compute_queued(self) -> QueuedRequestMetrics:
"""Single-pass aggregation over self.waiting -- no intermediate list."""
prefill = _Accum()
decode_kv = _Accum()
prefill = WelfordAccumulator()
decode_kv = WelfordAccumulator()
for request in self.waiting:
if request.status == RequestStatus.PREEMPTED:
......
......@@ -41,6 +41,7 @@ from dynamo.runtime import DistributedRuntime, Endpoint
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.vllm.worker_factory import WorkerFactory
from . import envs
from .args import Config, _uses_dynamo_connector, parse_args
from .constants import DisaggregationMode
from .handlers import DecodeWorkerHandler, PrefillWorkerHandler, get_dp_range_for_worker
......@@ -369,6 +370,51 @@ def setup_kv_event_publisher(
return kv_publishers if kv_publishers else None
def setup_fpm_relay(
generate_endpoint: Endpoint,
vllm_config: VllmConfig,
) -> Optional[list]:
"""
Set up forward pass metrics relays for the event plane.
Creates one FpmEventRelay per dp_rank. Each relay subscribes to the
local raw ZMQ PUB from InstrumentedScheduler (in the EngineCore child
process) and re-publishes to the Dynamo event plane with automatic
discovery registration.
Returns:
List of FpmEventRelay instances, or None if FPM is not enabled.
"""
if not envs.is_set("DYN_FORWARDPASS_METRIC_PORT"):
return None
try:
from dynamo.llm import FpmEventRelay
except ImportError:
logger.warning(
"FpmEventRelay not available (Rust bindings not built with FPM support). "
"Forward pass metrics will not be relayed to the event plane."
)
return None
dp_start, dp_size = get_dp_range_for_worker(vllm_config)
relays = []
for dp_rank in range(dp_start, dp_start + dp_size):
base_port = envs.DYN_FORWARDPASS_METRIC_PORT
zmq_endpoint = f"tcp://127.0.0.1:{base_port + dp_rank}"
relay = FpmEventRelay(
endpoint=generate_endpoint,
zmq_endpoint=zmq_endpoint,
)
relays.append(relay)
logger.info(f"FPM relay for dp_rank={dp_rank} subscribing to {zmq_endpoint}")
return relays if relays else None
def setup_vllm_engine(
config: Config,
stat_logger: Optional[StatLoggerFactory] = None,
......@@ -618,6 +664,7 @@ async def init_prefill(
)
# Use pre-created engine if provided (checkpoint mode), otherwise create new
fpm_worker_id = str(generate_endpoint.connection_id())
if snapshot_engine is not None:
(
engine_client,
......@@ -626,6 +673,11 @@ async def init_prefill(
prometheus_temp_dir,
_component_gauges,
) = snapshot_engine
# TODO: The scheduler in the child process still has worker_id=""
# because the engine was forked before the runtime existed.
# Propagating the new ID to the child requires shared memory or
# a restart of the EngineCore process.
vllm_config.additional_config["fpm_worker_id"] = fpm_worker_id
else:
(
engine_client,
......@@ -633,9 +685,7 @@ async def init_prefill(
default_sampling_params,
prometheus_temp_dir,
_component_gauges,
) = setup_vllm_engine(
config, fpm_worker_id=str(generate_endpoint.connection_id())
)
) = setup_vllm_engine(config, fpm_worker_id=fpm_worker_id)
handler = PrefillWorkerHandler(
runtime,
......@@ -675,6 +725,13 @@ async def init_prefill(
if kv_publishers:
handler.kv_publishers = kv_publishers
# Set up forward pass metrics relay (child ZMQ -> event plane).
# In checkpoint mode the engine was created before the runtime, so
# ForwardPassMetrics.worker_id will be empty (relay still works).
fpm_relays = setup_fpm_relay(generate_endpoint, vllm_config)
if fpm_relays:
handler.fpm_relays = fpm_relays
setup_metrics_collection(config, generate_endpoint, logger)
# Register sleep/wake_up engine routes
......@@ -790,6 +847,7 @@ async def init(
)
# Use pre-created engine if provided (checkpoint mode), otherwise create new
fpm_worker_id = str(generate_endpoint.connection_id())
if snapshot_engine is not None:
(
engine_client,
......@@ -798,6 +856,7 @@ async def init(
prometheus_temp_dir,
component_gauges,
) = snapshot_engine
vllm_config.additional_config["fpm_worker_id"] = fpm_worker_id
# Factory is created after unpack so component_gauges is available
factory = StatLoggerFactory(
endpoint=generate_endpoint,
......@@ -816,9 +875,7 @@ async def init(
default_sampling_params,
prometheus_temp_dir,
component_gauges,
) = setup_vllm_engine(
config, factory, fpm_worker_id=str(generate_endpoint.connection_id())
)
) = setup_vllm_engine(config, factory, fpm_worker_id=fpm_worker_id)
# TODO Hack to get data, move this to registering in TBD
factory.set_num_gpu_blocks_all(vllm_config.cache_config.num_gpu_blocks)
......@@ -862,6 +919,13 @@ async def init(
if kv_publishers:
handler.kv_publishers = kv_publishers
# Set up forward pass metrics relay (child ZMQ -> event plane).
# In checkpoint mode the engine was created before the runtime, so
# ForwardPassMetrics.worker_id will be empty (relay still works).
fpm_relays = setup_fpm_relay(generate_endpoint, vllm_config)
if fpm_relays:
handler.fpm_relays = fpm_relays
setup_metrics_collection(config, generate_endpoint, logger)
# Register sleep/wake_up engine routes
......
......@@ -166,6 +166,8 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<llm::kv::OverlapScores>()?;
m.add_class::<llm::kv::KvEventPublisher>()?;
m.add_class::<llm::kv::RadixTree>()?;
m.add_class::<llm::fpm::FpmEventRelay>()?;
m.add_class::<llm::fpm::FpmEventSubscriber>()?;
m.add_class::<llm::lora::LoRADownloader>()?;
m.add_class::<http::HttpService>()?;
m.add_class::<http::HttpAsyncEngine>()?;
......
......@@ -24,6 +24,7 @@
use super::*;
pub mod entrypoint;
pub mod fpm;
pub mod kv;
pub mod local_model;
pub mod lora;
......
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Python bindings for Forward Pass Metrics (FPM = ForwardPassMetrics) event plane integration.
//!
//! - `FpmEventRelay`: thin wrapper around `dynamo_llm::fpm_publisher::FpmEventRelay`
//! - `FpmEventSubscriber`: wraps `EventSubscriber::for_component` for the consumer side
use std::sync::Arc;
use pyo3::prelude::*;
use tokio_util::sync::CancellationToken;
use super::*;
use crate::Endpoint;
use crate::to_pyerr;
use dynamo_runtime::traits::DistributedRuntimeProvider;
use dynamo_runtime::transports::event_plane::EventSubscriber;
const FPM_TOPIC: &str = "forward-pass-metrics";
// ---------------------------------------------------------------------------
// Relay: raw ZMQ (child process) -> event plane
// ---------------------------------------------------------------------------
/// Relay that bridges ForwardPassMetrics from a local raw ZMQ PUB socket
/// (InstrumentedScheduler in EngineCore child process) to the Dynamo event
/// plane with automatic discovery registration.
#[pyclass]
pub(crate) struct FpmEventRelay {
inner: llm_rs::fpm_publisher::FpmEventRelay,
}
#[pymethods]
impl FpmEventRelay {
/// Create a relay that bridges raw ZMQ to the event plane.
///
/// Args:
/// endpoint: Dynamo component endpoint (provides runtime + discovery).
/// zmq_endpoint: Local ZMQ PUB address to subscribe to
/// (e.g., "tcp://127.0.0.1:20380").
#[new]
#[pyo3(signature = (endpoint, zmq_endpoint))]
fn new(endpoint: Endpoint, zmq_endpoint: String) -> PyResult<Self> {
let component = endpoint.inner.component().clone();
let inner =
llm_rs::fpm_publisher::FpmEventRelay::new(component, zmq_endpoint).map_err(to_pyerr)?;
Ok(Self { inner })
}
/// Shut down the relay task.
fn shutdown(&self) {
self.inner.shutdown();
}
}
// ---------------------------------------------------------------------------
// Subscriber: event plane -> consumer
// ---------------------------------------------------------------------------
/// Subscriber for ForwardPassMetrics from the event plane.
///
/// Auto-discovers engine publishers via the discovery plane (K8s CRD / etcd / file).
/// Returns raw msgspec-serialized bytes that Python decodes with
/// `forward_pass_metrics.decode()`.
#[pyclass]
pub(crate) struct FpmEventSubscriber {
rx: Arc<std::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>>>,
cancel: CancellationToken,
}
#[pymethods]
impl FpmEventSubscriber {
/// Create a subscriber that auto-discovers FPM publishers.
///
/// Args:
/// endpoint: Dynamo component endpoint (provides runtime + discovery).
#[new]
#[pyo3(signature = (endpoint,))]
fn new(endpoint: Endpoint) -> PyResult<Self> {
let component = endpoint.inner.component().clone();
let rt = component.drt().runtime().secondary();
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
rt.spawn(async move {
let mut subscriber = match EventSubscriber::for_component(&component, FPM_TOPIC).await {
Ok(s) => s,
Err(e) => {
tracing::error!("FPM subscriber: failed to create: {e}");
return;
}
};
tracing::info!("FPM subscriber: listening for forward-pass-metrics events");
loop {
tokio::select! {
biased;
_ = cancel_clone.cancelled() => {
tracing::info!("FPM subscriber: shutting down");
break;
}
event = subscriber.next() => {
match event {
Some(Ok(envelope)) => {
if tx.send(envelope.payload.to_vec()).is_err() {
tracing::info!("FPM subscriber: receiver dropped, exiting");
break;
}
}
Some(Err(e)) => {
tracing::warn!("FPM subscriber: event error: {e}");
}
None => {
tracing::info!("FPM subscriber: stream ended");
break;
}
}
}
}
}
});
Ok(Self {
rx: Arc::new(std::sync::Mutex::new(rx)),
cancel,
})
}
/// Blocking receive of next message bytes. Releases the GIL while waiting.
///
/// Returns the raw msgspec payload, or None if the stream is closed.
fn recv(&self, py: Python) -> PyResult<Option<Vec<u8>>> {
let rx = self.rx.clone();
py.allow_threads(move || {
let mut guard = rx
.lock()
.map_err(|e| to_pyerr(format!("lock poisoned: {e}")))?;
Ok(guard.blocking_recv())
})
}
/// Shut down the subscriber.
fn shutdown(&self) {
self.cancel.cancel();
}
}
impl Drop for FpmEventSubscriber {
fn drop(&mut self) {
self.cancel.cancel();
}
}
......@@ -779,6 +779,64 @@ class KvEventPublisher:
"""
...
class FpmEventRelay:
"""
Relay that bridges ForwardPassMetrics from a local raw ZMQ PUB socket
(InstrumentedScheduler in EngineCore child process) to the Dynamo event
plane with automatic discovery registration.
"""
def __init__(
self,
endpoint: Endpoint,
zmq_endpoint: str,
) -> None:
"""
Create a relay.
Args:
endpoint: Dynamo component endpoint (provides runtime + discovery).
zmq_endpoint: Local ZMQ PUB address to subscribe to
(e.g., "tcp://127.0.0.1:20380").
"""
...
def shutdown(self) -> None:
"""Shut down the relay task."""
...
class FpmEventSubscriber:
"""
Subscriber for ForwardPassMetrics from the Dynamo event plane.
Auto-discovers engine publishers via the discovery plane.
"""
def __init__(self, endpoint: Endpoint) -> None:
"""
Create a subscriber that auto-discovers FPM publishers.
Args:
endpoint: Dynamo component endpoint (provides runtime + discovery).
"""
...
def recv(self) -> Optional[bytes]:
"""
Blocking receive of the next message (raw msgspec bytes).
Releases the GIL while waiting.
Returns:
Raw msgspec payload, or None if the stream is closed.
"""
...
def shutdown(self) -> None:
"""Shut down the subscriber."""
...
class HttpService:
"""
A HTTP service for dynamo applications.
......
......@@ -7,6 +7,8 @@ import logging
from dynamo._core import EngineType
from dynamo._core import EntrypointArgs as EntrypointArgs
from dynamo._core import FpmEventRelay as FpmEventRelay
from dynamo._core import FpmEventSubscriber as FpmEventSubscriber
from dynamo._core import HttpAsyncEngine as HttpAsyncEngine
from dynamo._core import HttpService as HttpService
from dynamo._core import KserveGrpcService as KserveGrpcService
......
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Forward Pass Metrics (FPM = ForwardPassMetrics) relay.
//!
//! Subscribes to the raw ZMQ PUB from `InstrumentedScheduler` (running in
//! a vLLM EngineCore child process) and re-publishes the payloads to the
//! Dynamo event plane with automatic discovery registration.
//!
//! This follows the same two-layer architecture as
//! [`crate::kv_router::publisher::KvEventPublisher`], but is much simpler:
//! no event transformation, no batching, no local indexer — just raw byte relay.
use std::time::Duration;
use anyhow::Result;
use tokio_util::sync::CancellationToken;
use zeromq::{Socket, SocketRecv, SubSocket};
use dynamo_runtime::component::Component;
use dynamo_runtime::traits::DistributedRuntimeProvider;
use dynamo_runtime::transports::event_plane::EventPublisher;
const FPM_TOPIC: &str = "forward-pass-metrics";
const MAX_CONSECUTIVE_ERRORS: u32 = 10;
/// A relay that bridges ForwardPassMetrics from a local raw ZMQ PUB socket
/// to the Dynamo event plane.
pub struct FpmEventRelay {
cancel: CancellationToken,
}
impl FpmEventRelay {
/// Create and start a new relay.
///
/// - `component`: Dynamo component (provides runtime + discovery scope).
/// - `zmq_endpoint`: Local ZMQ PUB address to subscribe to
/// (e.g., `tcp://127.0.0.1:20380`).
pub fn new(component: Component, zmq_endpoint: String) -> Result<Self> {
let rt = component.drt().runtime().secondary();
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
let publisher =
rt.block_on(async { EventPublisher::for_component(&component, FPM_TOPIC).await })?;
rt.spawn(async move {
Self::relay_loop(zmq_endpoint, publisher, cancel_clone).await;
});
Ok(Self { cancel })
}
/// Shut down the relay task.
pub fn shutdown(&self) {
self.cancel.cancel();
}
async fn relay_loop(
zmq_endpoint: String,
publisher: EventPublisher,
cancel: CancellationToken,
) {
let mut socket = SubSocket::new();
if let Err(e) = socket.subscribe("").await {
tracing::error!("FPM relay: failed to subscribe on ZMQ socket: {e}");
return;
}
if let Err(e) = socket.connect(&zmq_endpoint).await {
tracing::error!("FPM relay: failed to connect ZMQ SUB to {zmq_endpoint}: {e}");
return;
}
tracing::info!("FPM relay: connected to {zmq_endpoint}");
let mut consecutive_errors: u32 = 0;
loop {
tokio::select! {
biased;
_ = cancel.cancelled() => {
tracing::info!("FPM relay: shutting down");
break;
}
result = socket.recv() => {
match result {
Ok(msg) => {
consecutive_errors = 0;
// ZMQ multipart: [topic, seq, payload]
let mut frames: Vec<Vec<u8>> = msg
.into_vec()
.into_iter()
.map(|f| f.to_vec())
.collect();
if frames.len() == 3 {
let payload = frames.swap_remove(2);
if let Err(e) = publisher.publish_bytes(payload).await {
tracing::warn!("FPM relay: event plane publish failed: {e}");
}
} else {
tracing::warn!(
"FPM relay: unexpected ZMQ frame count: expected 3, got {}",
frames.len()
);
}
}
Err(e) => {
consecutive_errors += 1;
tracing::warn!(
"FPM relay: ZMQ recv error ({consecutive_errors}/{MAX_CONSECUTIVE_ERRORS}): {e}"
);
if consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
tracing::error!("FPM relay: too many consecutive errors, exiting");
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
}
}
}
}
impl Drop for FpmEventRelay {
fn drop(&mut self) {
self.cancel.cancel();
}
}
......@@ -16,6 +16,7 @@ pub mod discovery;
pub mod endpoint_type;
pub mod engines;
pub mod entrypoint;
pub mod fpm_publisher;
pub mod grpc;
pub mod http;
pub mod hub;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment