main.rs 10 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

16
//! Metrics is a metrics aggregator designed to operate within a namespace and collect
//! metrics from all workers.
//!
//! Metrics currently collects:
//!
//! - LLM Worker Load:Capacity
//!   - These metrics are scraped via the LLM NATS Service API's stats request
//!   - Request Slots: [Active, Total]
//!   - KV Cache Blocks: [Active, Total]
//! - KV Hit Rate:
//!   - These metrics are collected from KV hit rate events published by the KV router
//!   - ISL Blocks: Cumulative count of total blocks in all KV hit rate events
//!   - Overlap Blocks: Cumulative count of blocks that were already in the KV cache
29
use clap::Parser;
Neelay Shah's avatar
Neelay Shah committed
30
31
32
use dynamo_llm::kv_router::scheduler::KVHitRateEvent;
use dynamo_llm::kv_router::KV_HIT_RATE_SUBJECT;
use dynamo_runtime::{
Ryan Olson's avatar
Ryan Olson committed
33
    error, logging,
34
    traits::events::{EventPublisher, EventSubscriber},
Ryan Olson's avatar
Ryan Olson committed
35
36
37
    utils::{Duration, Instant},
    DistributedRuntime, ErrorContext, Result, Runtime, Worker,
};
38
39
use futures::stream::StreamExt;
use std::sync::Arc;
Ryan Olson's avatar
Ryan Olson committed
40

41
// Import from our library
42
use metrics::{
43
    collect_endpoints, extract_metrics, postprocess_metrics, LLMWorkerLoadCapacityConfig,
44
    MetricsMode, PrometheusMetricsCollector,
45
46
};

47
/// CLI arguments for the metrics application
48
49
50
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
51
52
53
54
    /// Namespace to operate in and subscribe to events on
    #[arg(long, env = "DYN_NAMESPACE", default_value = "dynamo")]
    namespace: String,

55
56
57
58
59
60
61
62
    /// Component to scrape metrics from
    #[arg(long)]
    component: String,

    /// Endpoint to scrape metrics from
    #[arg(long)]
    endpoint: String,

63
    /// Polling interval in seconds for scraping dynamo endpoint stats (minimum 1 second)
64
    #[arg(long, default_value = "1")]
65
    poll_interval: u64,
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89

    /// Host for serving or pushing prometheus metrics (default: 0.0.0.0)
    #[arg(
        long,
        default_value = "0.0.0.0",
        help_heading = "Prometheus Metrics Config"
    )]
    host: String,

    /// Port to run the Prometheus metrics server on (default: 9091)
    #[arg(
        long,
        default_value = "9091",
        help_heading = "Prometheus Metrics Config"
    )]
    port: u16,

    /// Push metrics to an external Prometheus Pushgateway instead of hosting them in-process
    #[arg(long, help_heading = "Prometheus Metrics Config")]
    push: bool,

    /// Push interval in seconds, when using push mode (minimum 1 second, default: 2)
    #[arg(long, default_value = "2", help_heading = "Prometheus Metrics Config")]
    push_interval: u64,
90
}
Ryan Olson's avatar
Ryan Olson committed
91

92
93
94
95
fn get_config(args: &Args) -> Result<LLMWorkerLoadCapacityConfig> {
    if args.component.is_empty() {
        return Err(error!("Component name cannot be empty"));
    }
Ryan Olson's avatar
Ryan Olson committed
96

97
98
    if args.endpoint.is_empty() {
        return Err(error!("Endpoint name cannot be empty"));
Ryan Olson's avatar
Ryan Olson committed
99
100
    }

101
102
    if args.poll_interval < 1 {
        return Err(error!("Polling interval must be at least 1 second"));
Ryan Olson's avatar
Ryan Olson committed
103
104
    }

105
106
107
108
    if args.push && args.push_interval < 1 {
        return Err(error!("Push interval must be at least 1 second"));
    }

Ryan Olson's avatar
Ryan Olson committed
109
    Ok(LLMWorkerLoadCapacityConfig {
110
111
        component_name: args.component.clone(),
        endpoint_name: args.endpoint.clone(),
Ryan Olson's avatar
Ryan Olson committed
112
113
114
    })
}

115
116
async fn app(runtime: Runtime) -> Result<()> {
    let args = Args::parse();
117
    let config = get_config(&args)?;
118
    tracing::debug!("Config: {config:?}");
Ryan Olson's avatar
Ryan Olson committed
119
120
121

    let drt = DistributedRuntime::from_settings(runtime.clone()).await?;

122
    let namespace = drt.namespace(args.namespace)?;
Ryan Olson's avatar
Ryan Olson committed
123
124
    let component = namespace.component("count")?;

125
    // Create unique instance of Count
Ryan Olson's avatar
Ryan Olson committed
126
    let key = format!("{}/instance", component.etcd_path());
127
    tracing::debug!("Creating unique instance of Count at {key}");
Ryan Olson's avatar
Ryan Olson committed
128
    drt.etcd_client()
129
        .expect("Unreachable because of DistributedRuntime::from_settings above")
Ryan Olson's avatar
Ryan Olson committed
130
131
132
        .kv_create(
            key,
            serde_json::to_vec_pretty(&config)?,
133
            Some(drt.primary_lease().unwrap().id()),
Ryan Olson's avatar
Ryan Olson committed
134
135
136
137
        )
        .await
        .context("Unable to create unique instance of Count; possibly one already exists")?;

138
139
    let target_component = namespace.component(&config.component_name)?;
    let target_endpoint = target_component.endpoint(&config.endpoint_name);
Ryan Olson's avatar
Ryan Olson committed
140

141
    let service_path = target_endpoint.path();
Ryan Olson's avatar
Ryan Olson committed
142
    let service_subject = target_endpoint.subject();
143
    tracing::info!("Scraping endpoint {service_path} for stats");
Ryan Olson's avatar
Ryan Olson committed
144

145
146
    // Safety: DistributedRuntime::from_settings ensures this is Some
    let token = drt.primary_lease().unwrap().child_token();
147
    let event_name = format!("l2c.{}.{}", config.component_name, config.endpoint_name);
Ryan Olson's avatar
Ryan Olson committed
148

149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
    // Initialize Prometheus metrics with the selected mode
    let metrics_collector = PrometheusMetricsCollector::new()?;
    let metrics_collector = Arc::new(tokio::sync::Mutex::new(metrics_collector));

    // Start metrics collection in the selected mode
    let metrics_mode = if args.push {
        MetricsMode::Push {
            host: args.host,
            port: args.port,
            job: "dynamo_push_metrics".to_string(),
            interval: args.push_interval,
        }
    } else {
        MetricsMode::Pull {
            host: args.host,
            port: args.port,
        }
    };

    metrics_collector.lock().await.start(metrics_mode)?;
169

170
    // TODO: Consider removing event subscription until metrics are more standardized
171
    // Subscribe to KV hit rate events
172
    let kv_hit_rate_subject = KV_HIT_RATE_SUBJECT;
173
    tracing::debug!("Subscribing to KV hit rate events on subject: {kv_hit_rate_subject}");
174

175
    // Clone fields for the event subscription task
176
177
    let config_clone = config.clone();
    let namespace_clone = namespace.clone();
178
    let metrics_collector_clone = metrics_collector.clone();
179
180
181
182
183

    // Spawn a task to handle KV hit rate events
    tokio::spawn(async move {
        match namespace_clone.subscribe(kv_hit_rate_subject).await {
            Ok(mut subscriber) => {
184
                tracing::debug!("Successfully subscribed to KV hit rate events");
185
186
187
188
189
190
191

                while let Some(msg) = subscriber.next().await {
                    match serde_json::from_slice::<KVHitRateEvent>(&msg.payload) {
                        Ok(event) => {
                            // TODO: Lower to debug
                            let cache_hit_pct =
                                (event.overlap_blocks as f64 / event.isl_blocks as f64) * 100.0;
192
                            tracing::debug!(
193
194
195
196
197
198
199
200
                                "Received KV hit rate event: worker_id={}, isl_blocks={}, overlap_blocks={}, cache_hit_pct={:.2}%",
                                event.worker_id,
                                event.isl_blocks,
                                event.overlap_blocks,
                                cache_hit_pct
                            );

                            // Update metrics with the event data
201
                            let mut metrics = metrics_collector_clone.lock().await;
202
203
204
205
206
207
208
209
                            metrics.update_kv_hit_rate(
                                &config_clone,
                                event.worker_id,
                                event.isl_blocks,
                                event.overlap_blocks,
                            );
                        }
                        Err(e) => {
210
                            tracing::warn!("Failed to deserialize KV hit rate event: {e}");
211
212
213
214
215
216
217
218
219
220
221
                        }
                    }
                }

                tracing::warn!("KV hit rate event subscription stream ended");
            }
            Err(e) => {
                tracing::error!("Failed to subscribe to KV hit rate events: {:?}", e);
            }
        }
    });
Ryan Olson's avatar
Ryan Olson committed
222
223

    loop {
224
        let next = Instant::now() + Duration::from_secs(args.poll_interval);
Ryan Olson's avatar
Ryan Olson committed
225

226
227
228
229
        // Collect and process metrics
        let scrape_timeout = Duration::from_secs(1);
        let endpoints =
            collect_endpoints(&target_component, &service_subject, scrape_timeout).await?;
230
231
232
233
234
        if endpoints.is_empty() {
            tracing::warn!("No endpoints found matching {service_path}");
            continue;
        }

235
236
        let metrics = extract_metrics(&endpoints);
        let processed = postprocess_metrics(&metrics, &endpoints);
237
        if processed.endpoints.is_empty() {
238
            tracing::warn!("No metrics found matching {service_path}");
239
240
241
        } else {
            tracing::info!("Aggregated metrics: {processed:?}");
        }
Ryan Olson's avatar
Ryan Olson committed
242

243
        // Update Prometheus metrics
244
        metrics_collector.lock().await.update(&config, &processed);
245

246
247
248
        // TODO: Enable KV Routers to subscribe to metrics events published here
        // for a single view of the aggregated metrics, as opposed to the current
        // approach where each KV Router computes and published its own metrics.
249
        // Publish metrics event
Ryan Olson's avatar
Ryan Olson committed
250
251
        namespace.publish(&event_name, &processed).await?;

252
        // Wait until cancelled or the next tick
Ryan Olson's avatar
Ryan Olson committed
253
254
        match tokio::time::timeout_at(next, token.cancelled()).await {
            Ok(_) => break,
255
            Err(_) => continue,
Ryan Olson's avatar
Ryan Olson committed
256
257
258
259
260
261
        }
    }

    Ok(())
}

262
263
264
265
fn main() -> Result<()> {
    logging::init();
    let worker = Worker::from_settings()?;
    worker.execute(app)
Ryan Olson's avatar
Ryan Olson committed
266
}
267
268
269
270
271
272
273
274

#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Verifies that the namespace is taken from the `DYN_NAMESPACE` environment variable
    /// when `--namespace` is not passed on the command line.
    #[test]
    fn test_namespace_from_env() {
        // The environment is process-global and shared by every test in this binary, so
        // remember the previous value and put it back once parsing is done.
        let previous = env::var_os("DYN_NAMESPACE");
        env::set_var("DYN_NAMESPACE", "test-namespace");

        let args = Args::parse_from(["count", "--component", "comp", "--endpoint", "end"]);

        // Restore before asserting so a failed assertion cannot skip the cleanup.
        match previous {
            Some(value) => env::set_var("DYN_NAMESPACE", value),
            None => env::remove_var("DYN_NAMESPACE"),
        }

        assert_eq!(args.namespace, "test-namespace");
    }
}