"lib/bindings/python/vscode:/vscode.git/clone" did not exist on "f0065bb41cb8c0edefe731886afa704d039b5b53"
Unverified Commit 1174c819 authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

feat: remove deprecated events api (#5778)

parent fc92fc18
...@@ -13,7 +13,7 @@ use crate::Component; ...@@ -13,7 +13,7 @@ use crate::Component;
use llm_rs::kv_router::indexer::KvIndexerInterface; use llm_rs::kv_router::indexer::KvIndexerInterface;
use llm_rs::kv_router::protocols::compute_block_hash_for_seq; use llm_rs::kv_router::protocols::compute_block_hash_for_seq;
use rs::pipeline::{AsyncEngine, SingleIn}; use rs::pipeline::{AsyncEngine, SingleIn};
use rs::traits::events::EventSubscriber; use rs::transports::event_plane::EventSubscriber;
use tracing; use tracing;
use llm_rs::kv_router::protocols::*; use llm_rs::kv_router::protocols::*;
...@@ -890,24 +890,32 @@ impl KvRecorder { ...@@ -890,24 +890,32 @@ impl KvRecorder {
.map_err(to_pyerr)?; .map_err(to_pyerr)?;
// Subscribe to KV events // Subscribe to KV events
let mut kv_events_rx = component let mut kv_events_rx = EventSubscriber::for_component(
.inner &component.inner,
.subscribe(llm_rs::kv_router::KV_EVENT_SUBJECT) llm_rs::kv_router::KV_EVENT_SUBJECT,
.await )
.map_err(to_pyerr)?; .await
.map_err(to_pyerr)?
.typed::<llm_rs::kv_router::protocols::RouterEvent>();
let event_tx = inner.event_sender(); let event_tx = inner.event_sender();
// Spawn a task to forward events to the recorder // Spawn a task to forward events to the recorder
tokio::spawn(async move { tokio::spawn(async move {
while let Some(event) = kv_events_rx.next().await { while let Some(result) = kv_events_rx.next().await {
let event: llm_rs::kv_router::protocols::RouterEvent = let event = match result {
serde_json::from_slice(&event.payload).unwrap(); Ok((_envelope, event)) => event,
Err(e) => {
tracing::warn!("KvRecorder failed to decode kv event: {:?}", e);
continue;
}
};
tracing::debug!("KvRecorder received kv event: {:?}", event); tracing::debug!("KvRecorder received kv event: {:?}", event);
if let Err(e) = event_tx.send(event).await { if let Err(e) = event_tx.send(event).await {
tracing::trace!( tracing::trace!(
"KvRecorder failed to send kv event; shutting down: {:?}", "KvRecorder failed to send kv event; shutting down: {:?}",
e e
); );
break;
} }
} }
}); });
......
...@@ -16,9 +16,7 @@ use tokio::sync::mpsc; ...@@ -16,9 +16,7 @@ use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use zeromq::{Socket, SocketRecv, SubSocket}; use zeromq::{Socket, SocketRecv, SubSocket};
use dynamo_runtime::traits::{ use dynamo_runtime::traits::DistributedRuntimeProvider;
DistributedRuntimeProvider, events::EventPublisher as EventPublisherTrait,
};
use dynamo_runtime::transports::event_plane::EventPublisher; use dynamo_runtime::transports::event_plane::EventPublisher;
use dynamo_runtime::{ use dynamo_runtime::{
component::{Component, Namespace}, component::{Component, Namespace},
...@@ -298,7 +296,7 @@ impl EventSink for EventPublisher { ...@@ -298,7 +296,7 @@ impl EventSink for EventPublisher {
#[async_trait] #[async_trait]
impl EventSink for NatsQueue { impl EventSink for NatsQueue {
async fn publish_event(&self, event: &RouterEvent) -> Result<()> { async fn publish_event(&self, event: &RouterEvent) -> Result<()> {
self.publish(KV_EVENT_SUBJECT, event).await NatsQueue::publish_event(self, KV_EVENT_SUBJECT, event).await
} }
} }
...@@ -384,9 +382,9 @@ async fn start_event_processor_jetstream( ...@@ -384,9 +382,9 @@ async fn start_event_processor_jetstream(
} }
} }
// Then publish to event plane for global distribution // Then publish to NATS JetStream for global distribution
if let Err(e) = publisher.publish_event(&router_event).await { if let Err(e) = publisher.publish_event(KV_EVENT_SUBJECT, &router_event).await {
tracing::error!("Failed to publish event to event plane: {}", e); tracing::error!("Failed to publish event to NATS JetStream: {}", e);
} }
} }
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use anyhow::{Context, Result}; // Note: The EventPublisher and EventSubscriber trait impls have been removed.
use async_trait::async_trait; // Use dynamo_runtime::transports::event_plane::{EventPublisher, EventSubscriber} instead:
use futures::stream::StreamExt; // - EventPublisher::for_component(&component, topic)
use futures::{Stream, TryStreamExt}; // - EventSubscriber::for_component(&component, topic)
use serde::{Deserialize, Serialize};
use crate::component::Component;
use crate::traits::DistributedRuntimeProvider;
use crate::traits::events::{EventPublisher, EventSubscriber};
#[async_trait]
impl EventPublisher for Component {
fn subject(&self) -> String {
format!("namespace.{}.component.{}", self.namespace.name, self.name)
}
async fn publish(
&self,
event_name: impl AsRef<str> + Send + Sync,
event: &(impl Serialize + Send + Sync),
) -> Result<()> {
let bytes = serde_json::to_vec(event)?;
self.publish_bytes(event_name, bytes).await
}
async fn publish_bytes(
&self,
event_name: impl AsRef<str> + Send + Sync,
bytes: Vec<u8>,
) -> Result<()> {
let subject = format!("{}.{}", self.subject(), event_name.as_ref());
self.drt()
.kv_router_nats_publish(subject, bytes.into())
.await?;
Ok(())
}
}
#[async_trait]
impl EventSubscriber for Component {
async fn subscribe(
&self,
event_name: impl AsRef<str> + Send + Sync,
) -> Result<async_nats::Subscriber> {
let subject = format!("{}.{}", self.subject(), event_name.as_ref());
Ok(self.drt().kv_router_nats_subscribe(subject).await?)
}
async fn subscribe_with_type<T: for<'de> Deserialize<'de> + Send + 'static>(
&self,
event_name: impl AsRef<str> + Send + Sync,
) -> Result<impl Stream<Item = Result<T>> + Send> {
let subscriber = self.subscribe(event_name).await?;
// Transform the subscriber into a stream of deserialized events
let stream = subscriber.map(move |msg| {
serde_json::from_slice::<T>(&msg.payload)
.with_context(|| format!("Failed to deserialize event payload: {:?}", msg.payload))
});
Ok(stream)
}
}
#[cfg(feature = "integration")]
#[cfg(test)]
mod tests {
use crate::{DistributedRuntime, Runtime};
use super::*;
// todo - make a distributed runtime fixture
// todo - two options - fully mocked or integration test
#[tokio::test]
async fn test_publish_and_subscribe() {
let rt = Runtime::from_current().unwrap();
let dtr = DistributedRuntime::from_settings(rt.clone()).await.unwrap();
let ns = dtr.namespace("test_component".to_string()).unwrap();
let cp = ns.component("test_component".to_string()).unwrap();
// Create a subscriber on the component
let mut subscriber = cp.subscribe("test_event").await.unwrap();
// Publish a message from the component
cp.publish("test_event", &"test_message".to_string())
.await
.unwrap();
// Receive the message
if let Some(msg) = subscriber.next().await {
let received = String::from_utf8(msg.payload.to_vec()).unwrap();
assert_eq!(received, "\"test_message\"");
}
rt.shutdown();
}
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use anyhow::Context;
use anyhow::Result;
use async_trait::async_trait;
use futures::stream::StreamExt;
use futures::{Stream, TryStreamExt};
use serde::Deserialize;
use serde::Serialize;
use crate::component::Namespace; use crate::component::Namespace;
use crate::metrics::{MetricsHierarchy, MetricsRegistry}; use crate::metrics::{MetricsHierarchy, MetricsRegistry};
use crate::traits::DistributedRuntimeProvider;
use crate::traits::events::{EventPublisher, EventSubscriber};
#[async_trait]
impl EventPublisher for Namespace {
fn subject(&self) -> String {
format!("namespace.{}", self.name)
}
async fn publish(
&self,
event_name: impl AsRef<str> + Send + Sync,
event: &(impl Serialize + Send + Sync),
) -> Result<()> {
let bytes = serde_json::to_vec(event)?;
self.publish_bytes(event_name, bytes).await
}
async fn publish_bytes(
&self,
event_name: impl AsRef<str> + Send + Sync,
bytes: Vec<u8>,
) -> Result<()> {
let subject = format!("{}.{}", self.subject(), event_name.as_ref());
self.drt()
.kv_router_nats_publish(subject, bytes.into())
.await?;
Ok(())
}
}
#[async_trait]
impl EventSubscriber for Namespace {
async fn subscribe(
&self,
event_name: impl AsRef<str> + Send + Sync,
) -> Result<async_nats::Subscriber> {
let subject = format!("{}.{}", self.subject(), event_name.as_ref());
Ok(self.drt().kv_router_nats_subscribe(subject).await?)
}
async fn subscribe_with_type<T: for<'de> Deserialize<'de> + Send + 'static>(
&self,
event_name: impl AsRef<str> + Send + Sync,
) -> Result<impl Stream<Item = Result<T>> + Send> {
let subscriber = self.subscribe(event_name).await?;
// Transform the subscriber into a stream of deserialized events
let stream = subscriber.map(move |msg| {
serde_json::from_slice::<T>(&msg.payload)
.with_context(|| format!("Failed to deserialize event payload: {:?}", msg.payload))
});
Ok(stream)
}
}
impl MetricsHierarchy for Namespace { impl MetricsHierarchy for Namespace {
fn basename(&self) -> String { fn basename(&self) -> String {
...@@ -95,47 +31,3 @@ impl MetricsHierarchy for Namespace { ...@@ -95,47 +31,3 @@ impl MetricsHierarchy for Namespace {
&self.metrics_registry &self.metrics_registry
} }
} }
#[cfg(feature = "integration")]
#[cfg(test)]
mod tests {
use crate::{DistributedRuntime, Runtime};
use super::*;
// todo - make a distributed runtime fixture
// todo - two options - fully mocked or integration test
#[tokio::test]
async fn test_publish() {
let rt = Runtime::from_current().unwrap();
let dtr = DistributedRuntime::from_settings(rt.clone()).await.unwrap();
let ns = dtr.namespace("test_namespace_publish".to_string()).unwrap();
ns.publish("test_event", &"test".to_string()).await.unwrap();
rt.shutdown();
}
#[tokio::test]
async fn test_subscribe() {
let rt = Runtime::from_current().unwrap();
let dtr = DistributedRuntime::from_settings(rt.clone()).await.unwrap();
let ns = dtr
.namespace("test_namespace_subscribe".to_string())
.unwrap();
// Create a subscriber
let mut subscriber = ns.subscribe("test_event").await.unwrap();
// Publish a message
ns.publish("test_event", &"test_message".to_string())
.await
.unwrap();
// Receive the message
if let Some(msg) = subscriber.next().await {
let received = String::from_utf8(msg.payload.to_vec()).unwrap();
assert_eq!(received, "\"test_message\"");
}
rt.shutdown();
}
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
pub mod events;
use super::{DistributedRuntime, Runtime}; use super::{DistributedRuntime, Runtime};
/// A trait for objects that proivde access to the [Runtime] /// A trait for objects that proivde access to the [Runtime]
pub trait RuntimeProvider { pub trait RuntimeProvider {
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::fmt::Debug;
use anyhow::Result;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
// #[async_trait]
// pub trait Publisher: Debug + Clone + Send + Sync {
// async fn publish(&self, event: &(impl Serialize + Send + Sync)) -> Result<()>;
// }
/// An [EventPublisher] is an object that can publish events.
///
/// Each implementation of [EventPublisher] will define the root subject.
#[async_trait]
pub trait EventPublisher {
/// The base subject used for this implementation of the [EventPublisher].
fn subject(&self) -> String;
/// Publish a single event to the event plane. The `event_name` will be `.` concatenated with the
/// base subject provided by the implementation.
async fn publish(
&self,
event_name: impl AsRef<str> + Send + Sync,
event: &(impl Serialize + Send + Sync),
) -> Result<()>;
/// Publish a single event as bytes to the event plane. The `event_name` will be `.` concatenated with the
/// base subject provided by the implementation.
async fn publish_bytes(
&self,
event_name: impl AsRef<str> + Send + Sync,
bytes: Vec<u8>,
) -> Result<()>;
// /// Create a new publisher for the given event name. The `event_name` will be `.` concatenated with the
// /// base subject provided by the implementation.
// fn publisher(&self, event_name: impl AsRef<str>) -> impl Publisher;
// /// Create a new publisher for the given event name. The `event_name` will be `.` concatenated with the
// fn publisher(&self, event_name: impl AsRef<str>) -> Result<Publisher>;
// fn publisher_bytes(&self, event_name: impl AsRef<str>) -> &PublisherBytes;
}
/// An [EventSubscriber] is an object that can subscribe to events.
///
/// This trait provides methods to subscribe to events published on specific subjects.
#[async_trait]
pub trait EventSubscriber {
/// Subscribe to events with the given event name.
/// The `event_name` will be `.` concatenated with the base subject provided by the implementation.
/// Returns a subscriber that can be used to receive events.
async fn subscribe(
&self,
event_name: impl AsRef<str> + Send + Sync,
) -> Result<async_nats::Subscriber>;
/// Subscribe to events with the given event name and deserialize them to the specified type.
/// This is a convenience method that combines subscribe and deserialization.
async fn subscribe_with_type<T: for<'de> Deserialize<'de> + Send + 'static>(
&self,
event_name: impl AsRef<str> + Send + Sync,
) -> Result<impl futures::Stream<Item = Result<T>> + Send>;
}
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
//! Note: `NATS_AUTH_USERNAME` and `NATS_AUTH_PASSWORD` must be used together. //! Note: `NATS_AUTH_USERNAME` and `NATS_AUTH_PASSWORD` must be used together.
use crate::metrics::MetricsHierarchy; use crate::metrics::MetricsHierarchy;
use crate::protocols::EndpointId; use crate::protocols::EndpointId;
use crate::traits::events::EventPublisher;
use anyhow::Result; use anyhow::Result;
use async_nats::connection::State; use async_nats::connection::State;
...@@ -840,27 +839,26 @@ impl NatsQueue { ...@@ -840,27 +839,26 @@ impl NatsQueue {
} }
} }
#[async_trait] impl NatsQueue {
impl EventPublisher for NatsQueue { pub fn event_subject(&self) -> String {
fn subject(&self) -> String {
self.stream_name.clone() self.stream_name.clone()
} }
async fn publish( pub async fn publish_event(
&self, &self,
event_name: impl AsRef<str> + Send + Sync, event_name: impl AsRef<str> + Send + Sync,
event: &(impl Serialize + Send + Sync), event: &(impl Serialize + Send + Sync),
) -> Result<()> { ) -> Result<()> {
let bytes = serde_json::to_vec(event)?; let bytes = serde_json::to_vec(event)?;
self.publish_bytes(event_name, bytes).await self.publish_event_bytes(event_name, bytes).await
} }
async fn publish_bytes( pub async fn publish_event_bytes(
&self, &self,
event_name: impl AsRef<str> + Send + Sync, event_name: impl AsRef<str> + Send + Sync,
bytes: Vec<u8>, bytes: Vec<u8>,
) -> Result<()> { ) -> Result<()> {
let subject = format!("{}.{}", self.subject(), event_name.as_ref()); let subject = format!("{}.{}", self.event_subject(), event_name.as_ref());
// Note: enqueue_task requires &mut self, but EventPublisher requires &self // Note: enqueue_task requires &mut self, but EventPublisher requires &self
// We need to ensure the client is connected and use it directly // We need to ensure the client is connected and use it directly
...@@ -1036,10 +1034,10 @@ mod tests { ...@@ -1036,10 +1034,10 @@ mod tests {
"message4".to_string(), "message4".to_string(),
]; ];
// Using the EventPublisher trait to publish messages // Publish messages using NatsQueue
for (idx, msg) in message_strings.iter().enumerate() { for (idx, msg) in message_strings.iter().enumerate() {
queue1 queue1
.publish("queue", msg) .publish_event("queue", msg)
.await .await
.unwrap_or_else(|_| panic!("Failed to publish message {}", idx + 1)); .unwrap_or_else(|_| panic!("Failed to publish message {}", idx + 1));
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment