sink.rs 3.58 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
5
use async_nats::jetstream;
use async_trait::async_trait;
6
7
8
9
10
use std::sync::Arc;
use tokio::sync::broadcast;

use super::{bus, handle::AuditRecord};

11
#[async_trait]
12
13
pub trait AuditSink: Send + Sync {
    fn name(&self) -> &'static str;
14
    async fn emit(&self, rec: &AuditRecord);
15
16
17
}

pub struct StderrSink;
18
#[async_trait]
19
20
21
22
impl AuditSink for StderrSink {
    fn name(&self) -> &'static str {
        "stderr"
    }
23
    async fn emit(&self, rec: &AuditRecord) {
24
25
26
27
28
29
30
31
32
        match serde_json::to_string(rec) {
            Ok(js) => {
                tracing::info!(target="dynamo_llm::audit", log_type="audit", record=%js, "audit")
            }
            Err(e) => tracing::warn!("audit: serialize failed: {e}"),
        }
    }
}

33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/// Sink that publishes audit records through a NATS JetStream context.
pub struct NatsSink {
    /// JetStream context used for publishing.
    js: jetstream::Context,
    /// Subject every record is published to.
    subject: String,
}

impl NatsSink {
    /// Creates a sink that publishes via `nats_client`'s JetStream context.
    ///
    /// The subject is read from the `DYN_AUDIT_NATS_SUBJECT` environment
    /// variable; if that variable cannot be read, `dynamo.audit.v1` is used.
    pub fn new(nats_client: &dynamo_runtime::transports::nats::Client) -> Self {
        let subject = match std::env::var("DYN_AUDIT_NATS_SUBJECT") {
            Ok(value) => value,
            Err(_) => "dynamo.audit.v1".to_string(),
        };
        let js = nats_client.jetstream().clone();
        Self { js, subject }
    }
}

#[async_trait]
impl AuditSink for NatsSink {
    /// Returns the identifier `"nats"`.
    fn name(&self) -> &'static str {
        "nats"
    }

    /// Serializes `rec` to JSON bytes and publishes them on the configured
    /// subject. Serialization and publish failures are logged as warnings;
    /// the record is not retried.
    async fn emit(&self, rec: &AuditRecord) {
        let payload = match serde_json::to_vec(rec) {
            Ok(payload) => payload,
            Err(e) => {
                tracing::warn!("nats: serialize failed: {e}");
                return;
            }
        };

        // NOTE(review): only the error side of the publish result is inspected;
        // whatever a successful publish returns (presumably a JetStream ack
        // handle) is discarded — confirm fire-and-forget is intended.
        let outcome = self.js.publish(self.subject.clone(), payload.into()).await;
        if let Err(e) = outcome {
            tracing::warn!("nats: publish failed: {e}");
        }
    }
}

fn parse_sinks_from_env(
    nats_client: Option<&dynamo_runtime::transports::nats::Client>,
) -> Vec<Arc<dyn AuditSink>> {
70
71
72
73
74
    let cfg = std::env::var("DYN_AUDIT_SINKS").unwrap_or_else(|_| "stderr".into());
    let mut out: Vec<Arc<dyn AuditSink>> = Vec::new();
    for name in cfg.split(',').map(|s| s.trim().to_lowercase()) {
        match name.as_str() {
            "stderr" | "" => out.push(Arc::new(StderrSink)),
75
76
77
78
79
80
81
82
83
            "nats" => {
                if let Some(client) = nats_client {
                    out.push(Arc::new(NatsSink::new(client)));
                } else {
                    tracing::warn!(
                        "NATS sink requested but no DistributedRuntime NATS client available; skipping"
                    );
                }
            }
84
85
86
87
88
89
90
91
            // "pg"   => out.push(Arc::new(PostgresSink::from_env())),
            other => tracing::warn!(%other, "audit: unknown sink ignored"),
        }
    }
    out
}

/// spawn one worker per sink; each subscribes to the bus (off hot path)
92
93
94
pub fn spawn_workers_from_env(drt: Option<&dynamo_runtime::DistributedRuntime>) {
    let nats_client = drt.and_then(|d| d.nats_client());
    let sinks = parse_sinks_from_env(nats_client);
95
96
97
98
99
100
    for sink in sinks {
        let name = sink.name();
        let mut rx: broadcast::Receiver<Arc<AuditRecord>> = bus::subscribe();
        tokio::spawn(async move {
            loop {
                match rx.recv().await {
101
                    Ok(rec) => sink.emit(&rec).await,
102
103
104
105
106
107
108
109
110
111
112
113
                    Err(broadcast::error::RecvError::Lagged(n)) => tracing::warn!(
                        sink = name,
                        dropped = n,
                        "audit bus lagged; dropped records"
                    ),
                    Err(broadcast::error::RecvError::Closed) => break,
                }
            }
        });
    }
    tracing::info!("Audit sinks ready.");
}