sink.rs 3.49 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
use anyhow::Context as _;
5
6
use async_nats::jetstream;
use async_trait::async_trait;
7
use dynamo_runtime::transports::nats;
8
9
10
11
12
use std::sync::Arc;
use tokio::sync::broadcast;

use super::{bus, handle::AuditRecord};

13
#[async_trait]
14
15
pub trait AuditSink: Send + Sync {
    fn name(&self) -> &'static str;
16
    async fn emit(&self, rec: &AuditRecord);
17
18
19
}

/// Sink that serializes each audit record to JSON and emits it as a `tracing`
/// info event. Where that event is finally written depends on the installed
/// `tracing` subscriber.
pub struct StderrSink;
20
#[async_trait]
21
22
23
24
impl AuditSink for StderrSink {
    fn name(&self) -> &'static str {
        "stderr"
    }
25
    async fn emit(&self, rec: &AuditRecord) {
26
27
28
29
30
31
32
33
34
        match serde_json::to_string(rec) {
            Ok(js) => {
                tracing::info!(target="dynamo_llm::audit", log_type="audit", record=%js, "audit")
            }
            Err(e) => tracing::warn!("audit: serialize failed: {e}"),
        }
    }
}

35
36
37
38
39
40
/// Sink that publishes each audit record through a JetStream context.
pub struct NatsSink {
    /// JetStream context used to publish records.
    js: jetstream::Context,
    /// Subject every record is published to.
    subject: String,
}

impl NatsSink {
41
    pub fn new(nats_client: dynamo_runtime::transports::nats::Client) -> Self {
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
        let subject = std::env::var("DYN_AUDIT_NATS_SUBJECT")
            .unwrap_or_else(|_| "dynamo.audit.v1".to_string());
        Self {
            js: nats_client.jetstream().clone(),
            subject,
        }
    }
}

#[async_trait]
impl AuditSink for NatsSink {
    /// Identifier of this sink: always `"nats"`.
    fn name(&self) -> &'static str {
        "nats"
    }

    /// Serializes `rec` to JSON bytes and publishes them to the configured
    /// subject.
    ///
    /// Serialization and publish failures are logged as warnings; the record
    /// is dropped in either case.
    async fn emit(&self, rec: &AuditRecord) {
        let payload = match serde_json::to_vec(rec) {
            Ok(encoded) => encoded,
            Err(e) => {
                tracing::warn!("nats: serialize failed: {e}");
                return;
            }
        };

        let outcome = self.js.publish(self.subject.clone(), payload.into()).await;
        if let Err(e) = outcome {
            tracing::warn!("nats: publish failed: {e}");
        }
    }
}

69
async fn parse_sinks_from_env() -> anyhow::Result<Vec<Arc<dyn AuditSink>>> {
70
71
72
73
74
    let cfg = std::env::var("DYN_AUDIT_SINKS").unwrap_or_else(|_| "stderr".into());
    let mut out: Vec<Arc<dyn AuditSink>> = Vec::new();
    for name in cfg.split(',').map(|s| s.trim().to_lowercase()) {
        match name.as_str() {
            "stderr" | "" => out.push(Arc::new(StderrSink)),
75
            "nats" => {
76
77
78
79
80
                let nats_client = nats::ClientOptions::default()
                    .connect()
                    .await
                    .context("Attempting to connect NATS sink from env var DYN_AUDIT_SINKS")?;
                out.push(Arc::new(NatsSink::new(nats_client)));
81
            }
82
83
84
85
            // "pg"   => out.push(Arc::new(PostgresSink::from_env())),
            other => tracing::warn!(%other, "audit: unknown sink ignored"),
        }
    }
86
    Ok(out)
87
88
89
}

/// spawn one worker per sink; each subscribes to the bus (off hot path)
90
91
pub async fn spawn_workers_from_env() -> anyhow::Result<()> {
    let sinks = parse_sinks_from_env().await?;
92
93
    for sink in sinks {
        let name = sink.name();
94
        let mut rx: broadcast::Receiver<AuditRecord> = bus::subscribe();
95
96
97
        tokio::spawn(async move {
            loop {
                match rx.recv().await {
98
                    Ok(rec) => sink.emit(&rec).await,
99
100
101
102
103
104
105
106
107
108
109
                    Err(broadcast::error::RecvError::Lagged(n)) => tracing::warn!(
                        sink = name,
                        dropped = n,
                        "audit bus lagged; dropped records"
                    ),
                    Err(broadcast::error::RecvError::Closed) => break,
                }
            }
        });
    }
    tracing::info!("Audit sinks ready.");
110
    Ok(())
111
}