Unverified Commit ebde0f66 authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

feat: event plane apis and nats implementation (#5624)

parent e6ad5bc5
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
//! These are the low-level building blocks for the distributed system. //! These are the low-level building blocks for the distributed system.
pub mod etcd; pub mod etcd;
pub mod event_plane;
pub mod nats; pub mod nats;
pub mod tcp; pub mod tcp;
mod utils; mod utils;
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Event plane codec for serializing/deserializing envelopes and payloads.
use anyhow::Result;
use bytes::Bytes;
use serde::{Serialize, de::DeserializeOwned};
use super::EventEnvelope;
#[derive(Debug, Clone, Copy, Default)]
pub struct MsgpackCodec;
impl MsgpackCodec {
pub fn encode_envelope(&self, envelope: &EventEnvelope) -> Result<Bytes> {
Ok(Bytes::from(rmp_serde::to_vec_named(envelope)?))
}
pub fn decode_envelope(&self, bytes: &Bytes) -> Result<EventEnvelope> {
Ok(rmp_serde::from_slice(bytes)?)
}
pub fn encode_payload<T: Serialize>(&self, payload: &T) -> Result<Bytes> {
Ok(Bytes::from(rmp_serde::to_vec_named(payload)?))
}
pub fn decode_payload<T: DeserializeOwned>(&self, bytes: &Bytes) -> Result<T> {
Ok(rmp_serde::from_slice(bytes)?)
}
pub fn name(&self) -> &'static str {
"msgpack"
}
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(Debug, Clone, PartialEq, Serialize, serde::Deserialize)]
struct TestEvent {
worker_id: u64,
message: String,
}
#[test]
fn test_msgpack_codec_envelope_roundtrip() {
let codec = MsgpackCodec;
let envelope = EventEnvelope {
publisher_id: 12345,
sequence: 42,
published_at: 1700000000000,
topic: "test-topic".to_string(),
payload: Bytes::from("test payload"),
};
let encoded = codec.encode_envelope(&envelope).unwrap();
let decoded = codec.decode_envelope(&encoded).unwrap();
assert_eq!(decoded.publisher_id, envelope.publisher_id);
assert_eq!(decoded.sequence, envelope.sequence);
assert_eq!(decoded.published_at, envelope.published_at);
assert_eq!(decoded.topic, envelope.topic);
assert_eq!(decoded.payload, envelope.payload);
}
#[test]
fn test_msgpack_codec_payload_roundtrip() {
let codec = MsgpackCodec;
let event = TestEvent {
worker_id: 123,
message: "hello world".to_string(),
};
let encoded = codec.encode_payload(&event).unwrap();
let decoded: TestEvent = codec.decode_payload(&encoded).unwrap();
assert_eq!(decoded, event);
}
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Binary frame format for event transport
//!
//! - **Fixed 5-byte header**
//! - **Versioned**: Protocol evolution support
//! - **Payload length**: Enables proper frame boundary detection
use bytes::{Buf, BufMut, Bytes, BytesMut};
use thiserror::Error;
/// Frame protocol version
pub const FRAME_VERSION: u8 = 1;
/// Fixed header size in bytes
pub const FRAME_HEADER_SIZE: usize = 5;
/// Frame encoding/decoding errors
#[derive(Debug, Error)]
pub enum FrameError {
#[error("Incomplete frame header: expected {FRAME_HEADER_SIZE} bytes, got {0} bytes")]
IncompleteHeader(usize),
#[error("Incomplete frame payload: expected {expected} bytes, got {available} bytes")]
IncompletePayload { expected: usize, available: usize },
#[error("Unsupported protocol version: {0} (expected {FRAME_VERSION})")]
UnsupportedVersion(u8),
#[error("Frame too large: {0} bytes exceeds maximum")]
FrameTooLarge(usize),
}
/// Frame header (5 bytes fixed)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FrameHeader {
/// Protocol version (currently 1)
pub version: u8,
/// Payload length in bytes
pub payload_len: u32,
}
impl FrameHeader {
/// Encode header to bytes
pub fn encode(&self, buf: &mut BytesMut) {
buf.put_u8(self.version);
buf.put_u32(self.payload_len);
}
/// Decode header from bytes
pub fn decode(buf: &mut impl Buf) -> Result<Self, FrameError> {
if buf.remaining() < FRAME_HEADER_SIZE {
return Err(FrameError::IncompleteHeader(buf.remaining()));
}
let version = buf.get_u8();
if version != FRAME_VERSION {
return Err(FrameError::UnsupportedVersion(version));
}
let payload_len = buf.get_u32();
Ok(FrameHeader {
version,
payload_len,
})
}
/// Get total frame size (header + payload)
pub fn frame_size(&self) -> usize {
FRAME_HEADER_SIZE + self.payload_len as usize
}
}
/// Complete frame (header + payload)
#[derive(Debug, Clone)]
pub struct Frame {
pub header: FrameHeader,
pub payload: Bytes,
}
impl Frame {
pub fn new(payload: Bytes) -> Self {
Self {
header: FrameHeader {
version: FRAME_VERSION,
payload_len: payload.len() as u32,
},
payload,
}
}
/// Encode frame to wire format
pub fn encode(&self) -> Bytes {
let mut buf = BytesMut::with_capacity(self.header.frame_size());
self.header.encode(&mut buf);
buf.put(self.payload.clone());
buf.freeze()
}
/// Decode frame from wire format
pub fn decode(mut buf: impl Buf) -> Result<Self, FrameError> {
let header = FrameHeader::decode(&mut buf)?;
let payload_len = header.payload_len as usize;
if buf.remaining() < payload_len {
return Err(FrameError::IncompletePayload {
expected: payload_len,
available: buf.remaining(),
});
}
let payload = buf.copy_to_bytes(payload_len);
Ok(Frame { header, payload })
}
pub fn size(&self) -> usize {
self.header.frame_size()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_frame_header_encode_decode() {
let header = FrameHeader {
version: FRAME_VERSION,
payload_len: 1024,
};
let mut buf = BytesMut::new();
header.encode(&mut buf);
assert_eq!(buf.len(), FRAME_HEADER_SIZE);
let decoded = FrameHeader::decode(&mut buf).unwrap();
assert_eq!(decoded.version, header.version);
assert_eq!(decoded.payload_len, header.payload_len);
}
#[test]
fn test_frame_encode_decode_roundtrip() {
let payload = Bytes::from("hello world");
let frame = Frame::new(payload.clone());
let encoded = frame.encode();
let decoded = Frame::decode(encoded).unwrap();
assert_eq!(decoded.header.version, FRAME_VERSION);
assert_eq!(decoded.payload, payload);
}
#[test]
fn test_frame_error_incomplete_header() {
let buf = Bytes::from(vec![1, 2, 3]); // Only 3 bytes
let result = Frame::decode(buf);
assert!(matches!(result, Err(FrameError::IncompleteHeader(3))));
}
#[test]
fn test_frame_error_incomplete_payload() {
let mut buf = BytesMut::new();
let header = FrameHeader {
version: FRAME_VERSION,
payload_len: 1000, // Claims 1000 bytes
};
header.encode(&mut buf);
buf.put_slice(b"short"); // Only 5 bytes provided
let result = Frame::decode(buf.freeze());
assert!(matches!(
result,
Err(FrameError::IncompletePayload {
expected: 1000,
available: 5
})
));
}
#[test]
fn test_frame_error_unsupported_version() {
let mut buf = BytesMut::new();
buf.put_u8(99); // Invalid version
buf.put_u32(0); // payload_len
let result = FrameHeader::decode(&mut buf);
assert!(matches!(result, Err(FrameError::UnsupportedVersion(99))));
}
#[test]
fn test_zero_length_payload() {
let payload = Bytes::new();
let frame = Frame::new(payload.clone());
let encoded = frame.encode();
assert_eq!(encoded.len(), FRAME_HEADER_SIZE);
let decoded = Frame::decode(encoded).unwrap();
assert_eq!(decoded.payload.len(), 0);
}
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Event Plane: Transport-agnostic pub/sub communication layer.
mod codec;
mod frame;
mod nats_transport;
mod traits;
mod transport;
pub use codec::MsgpackCodec;
pub use frame::{Frame, FrameError, FrameHeader};
pub use nats_transport::NatsTransport;
pub use traits::{EventEnvelope, EventStream, TypedEventStream};
pub use transport::{EventTransportRx, EventTransportTx, WireStream};
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! NATS transport implementation for the event plane.
use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use super::transport::{EventTransportRx, EventTransportTx, WireStream};
use crate::DistributedRuntime;
use crate::discovery::EventTransportKind;
pub struct NatsTransport {
drt: DistributedRuntime,
}
impl NatsTransport {
pub fn new(drt: DistributedRuntime) -> Self {
Self { drt }
}
}
#[async_trait]
impl EventTransportTx for NatsTransport {
async fn publish(&self, subject: &str, envelope_bytes: Bytes) -> Result<()> {
self.drt
.kv_router_nats_publish(subject.to_string(), envelope_bytes)
.await
}
fn kind(&self) -> EventTransportKind {
EventTransportKind::Nats
}
}
#[async_trait]
impl EventTransportRx for NatsTransport {
async fn subscribe(&self, subject: &str) -> Result<WireStream> {
let subscriber = self
.drt
.kv_router_nats_subscribe(subject.to_string())
.await?;
let stream = subscriber.map(|msg| Ok(msg.payload));
Ok(Box::pin(stream))
}
fn kind(&self) -> EventTransportKind {
EventTransportKind::Nats
}
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Event plane types for transport-agnostic pub/sub.
use anyhow::Result;
use bytes::Bytes;
use futures::Stream;
use serde::{Deserialize, Serialize};
use std::pin::Pin;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EventEnvelope {
/// Unique identifier of the publisher (typically discovery instance_id)
pub publisher_id: u64,
/// Monotonically increasing sequence number per publisher
pub sequence: u64,
/// Unix timestamp in milliseconds when the event was published
pub published_at: u64,
/// The topic this event was published to
pub topic: String,
/// The serialized event payload
#[serde(with = "bytes_serde")]
pub payload: Bytes,
}
/// Serde helper for Bytes serialization with MessagePack
mod bytes_serde {
use bytes::Bytes;
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S>(bytes: &Bytes, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_bytes(bytes)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Bytes, D::Error>
where
D: Deserializer<'de>,
{
let bytes: Vec<u8> = Deserialize::deserialize(deserializer)?;
Ok(Bytes::from(bytes))
}
}
/// A stream of event envelopes from a subscription.
pub type EventStream = Pin<Box<dyn Stream<Item = Result<EventEnvelope>> + Send>>;
/// A stream of typed events with their envelopes.
pub type TypedEventStream<T> = Pin<Box<dyn Stream<Item = Result<(EventEnvelope, T)>> + Send>>;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_event_envelope_msgpack_serialization() {
let envelope = EventEnvelope {
publisher_id: 12345,
sequence: 1,
published_at: 1700000000000,
topic: "test-topic".to_string(),
payload: Bytes::from("test payload"),
};
let msgpack = rmp_serde::to_vec(&envelope).unwrap();
let deserialized: EventEnvelope = rmp_serde::from_slice(&msgpack).unwrap();
assert_eq!(deserialized.publisher_id, 12345);
assert_eq!(deserialized.sequence, 1);
assert_eq!(deserialized.published_at, 1700000000000);
assert_eq!(deserialized.topic, "test-topic");
assert_eq!(deserialized.payload, Bytes::from("test payload"));
}
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Transport trait abstractions for the event plane.
//!
//! These traits define the low-level interface for different transport backends
//! (NATS, ZMQ) to implement. The event plane uses these traits to provide a
//! unified pub/sub API.
use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use futures::Stream;
use std::pin::Pin;
use crate::discovery::EventTransportKind;
/// Stream of raw wire bytes from a subscription.
pub type WireStream = Pin<Box<dyn Stream<Item = Result<Bytes>> + Send>>;
/// Transport-agnostic interface for publishing events.
#[async_trait]
pub trait EventTransportTx: Send + Sync {
/// Publish raw envelope bytes to a subject/topic.
async fn publish(&self, subject: &str, envelope_bytes: Bytes) -> Result<()>;
fn kind(&self) -> EventTransportKind;
}
/// Transport-agnostic interface for subscribing to events.
#[async_trait]
pub trait EventTransportRx: Send + Sync {
/// Subscribe to a subject/topic and return a stream of raw envelope bytes.
async fn subscribe(&self, subject: &str) -> Result<WireStream>;
fn kind(&self) -> EventTransportKind;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment