Unverified Commit c36794d9 authored by Ryan McCormick's avatar Ryan McCormick Committed by GitHub
Browse files

fix: Account for Metrics.decode() changes (#619)

- Replace custom StatsWithData object with library-defined Metrics object
- Update parsing logic to account for change to Metrics.decode()
parent 6edf46c4
......@@ -147,24 +147,6 @@ pub struct LLMWorkerLoadCapacityConfig {
pub endpoint_name: String,
}
// TODO: This is _really_ close to the async_nats::service::Stats object,
// but it's missing a few fields like "name", so use a temporary struct
// for easy deserialization. Ideally, this type already exists or can
// be exposed in the library somewhere.
/// Stats structure returned from NATS service API
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatsWithData {
// Standard NATS Service API fields
pub average_processing_time: f64,
pub last_error: String,
pub num_errors: u64,
pub num_requests: u64,
pub processing_time: u64,
pub queue_group: String,
// Field containing custom stats handler data
pub data: serde_json::Value,
}
/// Metrics collector for exposing metrics to prometheus/grafana
pub struct PrometheusMetricsCollector {
metrics: PrometheusMetrics,
......@@ -584,28 +566,24 @@ pub async fn collect_endpoints(
pub fn extract_metrics(endpoints: &[EndpointInfo]) -> Vec<ForwardPassMetrics> {
let endpoint_data = endpoints.iter().map(|e| e.data.clone()).collect::<Vec<_>>();
// Extract StatsWithData objects from endpoint services
let stats: Vec<StatsWithData> = endpoint_data
// Extract ForwardPassMetrics objects from endpoint services
let metrics: Vec<ForwardPassMetrics> = endpoint_data
.iter()
.filter_map(|e| {
let metrics_data = e.as_ref()?;
metrics_data.clone().decode::<StatsWithData>().ok()
})
.collect();
tracing::debug!("Stats: {stats:?}");
// Extract ForwardPassMetrics nested within Stats object
let metrics: Vec<ForwardPassMetrics> = stats
.iter()
.filter_map(
|s| match serde_json::from_value::<ForwardPassMetrics>(s.data.clone()) {
Ok(metrics) => Some(metrics),
match metrics_data.clone().decode::<ForwardPassMetrics>() {
Ok(stats) => Some(stats),
Err(err) => {
tracing::warn!("Error decoding metrics: {err}");
tracing::error!(
"Failed to decode ForwardPassMetrics data: {}. Raw data: {:?}",
err,
metrics_data
);
None
}
},
)
}
})
.collect();
tracing::debug!("Metrics: {metrics:?}");
......
......@@ -227,10 +227,15 @@ async fn app(runtime: Runtime) -> Result<()> {
let scrape_timeout = Duration::from_secs(1);
let endpoints =
collect_endpoints(&target_component, &service_subject, scrape_timeout).await?;
if endpoints.is_empty() {
tracing::warn!("No endpoints found matching {service_path}");
continue;
}
let metrics = extract_metrics(&endpoints);
let processed = postprocess_metrics(&metrics, &endpoints);
if processed.endpoints.is_empty() {
tracing::warn!("No endpoints found matching {service_path}");
tracing::warn!("No metrics found matching {service_path}");
} else {
tracing::info!("Aggregated metrics: {processed:?}");
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment