"benchmarks/vscode:/vscode.git/clone" did not exist on "bea92eae727b26fbff0b8059c06d0b11d256315a"
namespace.rs 4.06 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
use anyhow::Context;
Ryan Olson's avatar
Ryan Olson committed
5
use async_trait::async_trait;
6
7
use futures::stream::StreamExt;
use futures::{Stream, TryStreamExt};
Ryan Olson's avatar
Ryan Olson committed
8
9

use super::*;
10
use crate::metrics::MetricsRegistry;
11
use crate::traits::events::{EventPublisher, EventSubscriber};
Ryan Olson's avatar
Ryan Olson committed
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

#[async_trait]
impl EventPublisher for Namespace {
    /// Base subject for this namespace: the literal prefix `namespace.` followed by
    /// the namespace's name.
    fn subject(&self) -> String {
        let mut subject = String::from("namespace.");
        subject.push_str(&self.name);
        subject
    }

    /// Serializes `event` as JSON and publishes the resulting bytes under `event_name`.
    ///
    /// # Errors
    /// Fails if `serde_json` cannot serialize `event`, or with whatever error
    /// `publish_bytes` reports.
    async fn publish(
        &self,
        event_name: impl AsRef<str> + Send + Sync,
        event: &(impl Serialize + Send + Sync),
    ) -> Result<()> {
        let payload = serde_json::to_vec(event)?;
        self.publish_bytes(event_name, payload).await
    }

    /// Publishes a raw payload to `<self.subject()>.<event_name>`.
    ///
    /// The message is sent through the client reached via
    /// `self.drt().nats_client().client()`.
    ///
    /// # Errors
    /// Propagates any error returned by the client's `publish` call.
    async fn publish_bytes(
        &self,
        event_name: impl AsRef<str> + Send + Sync,
        bytes: Vec<u8>,
    ) -> Result<()> {
        // Build the full subject up front so no borrow of `event_name` outlives this block.
        let subject = {
            let prefix = self.subject();
            let suffix: &str = event_name.as_ref();
            format!("{prefix}.{suffix}")
        };

        self.drt()
            .nats_client()
            .client()
            .publish(subject, bytes.into())
            .await?;
        Ok(())
    }
}

43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#[async_trait]
impl EventSubscriber for Namespace {
    async fn subscribe(
        &self,
        event_name: impl AsRef<str> + Send + Sync,
    ) -> Result<async_nats::Subscriber> {
        let subject = format!("{}.{}", self.subject(), event_name.as_ref());
        Ok(self.drt().nats_client().client().subscribe(subject).await?)
    }

    async fn subscribe_with_type<T: for<'de> Deserialize<'de> + Send + 'static>(
        &self,
        event_name: impl AsRef<str> + Send + Sync,
    ) -> Result<impl Stream<Item = Result<T>> + Send> {
        let subscriber = self.subscribe(event_name).await?;

        // Transform the subscriber into a stream of deserialized events
        let stream = subscriber.map(move |msg| {
            serde_json::from_slice::<T>(&msg.payload)
62
                .with_context(|| format!("Failed to deserialize event payload: {:?}", msg.payload))
63
64
65
66
67
68
        });

        Ok(stream)
    }
}

69
70
71
72
73
74
impl MetricsRegistry for Namespace {
    /// Returns a copy of this namespace's own name.
    fn basename(&self) -> String {
        self.name.clone()
    }

    /// Returns the names that sit above this namespace, ordered root -> leaf.
    ///
    /// The result always starts with an empty string (the slot for the DRT), followed
    /// by the non-empty basenames of every ancestor namespace, outermost ancestor
    /// first and the direct parent last. This namespace's own name is not included.
    fn parent_hierarchy(&self) -> Vec<String> {
        // Following `parent` links visits the direct parent first, so this list is
        // in leaf -> root order. Ancestors with an empty basename are skipped.
        let mut ancestors: Vec<String> =
            std::iter::successors(self.parent.as_deref(), |ns| ns.parent.as_deref())
                .map(|ns| ns.basename())
                .filter(|name| !name.is_empty())
                .collect();

        // Flip to root -> leaf before appending.
        ancestors.reverse();

        // Build as: [ "" (DRT), non-empty parent basenames from root -> leaf ]
        let mut names = Vec::with_capacity(ancestors.len() + 1);
        names.push(String::new());
        names.extend(ancestors);
        names
    }
}

91
#[cfg(feature = "integration")]
#[cfg(test)]
mod tests {
    //! Tests for publishing and subscribing on a `Namespace`.
    //!
    //! Only compiled when the `integration` feature is enabled.
    //! NOTE(review): these presumably need a reachable NATS server, since they build
    //! a `DistributedRuntime` from settings — confirm against the runtime setup.

    use super::*;

    // todo - make a distributed runtime fixture
    // todo - two options - fully mocked or integration test

    /// Publishing a JSON-serializable event on a namespace completes without error.
    #[tokio::test]
    async fn test_publish() {
        let rt = Runtime::from_current().unwrap();
        let dtr = DistributedRuntime::from_settings(rt.clone()).await.unwrap();
        let ns = dtr.namespace("test_namespace_publish".to_string()).unwrap();
        ns.publish("test_event", &"test".to_string()).await.unwrap();
        rt.shutdown();
    }

    /// An event published after subscribing is delivered with its JSON-encoded payload.
    #[tokio::test]
    async fn test_subscribe() {
        let rt = Runtime::from_current().unwrap();
        let dtr = DistributedRuntime::from_settings(rt.clone()).await.unwrap();
        let ns = dtr
            .namespace("test_namespace_subscribe".to_string())
            .unwrap();

        // Create a subscriber
        let mut subscriber = ns.subscribe("test_event").await.unwrap();

        // Publish a message
        ns.publish("test_event", &"test_message".to_string())
            .await
            .unwrap();

        // Receive the message. A stream that ends without yielding anything must fail
        // the test rather than silently skip the assertion below.
        let msg = subscriber
            .next()
            .await
            .expect("subscription ended before the published event was received");
        let received = String::from_utf8(msg.payload.to_vec()).unwrap();
        // The payload is the JSON encoding of the string, so it keeps its quotes.
        assert_eq!(received, "\"test_message\"");

        rt.shutdown();
    }
}