Unverified Commit 105436c3 authored by Tzu-Ling Kan's avatar Tzu-Ling Kan Committed by GitHub
Browse files

fix: guard inflight_requests and request_duration from early returns. (#2576)

parent 174389e6
...@@ -18,6 +18,7 @@ use crate::protocols::maybe_error::MaybeError; ...@@ -18,6 +18,7 @@ use crate::protocols::maybe_error::MaybeError;
use prometheus::{Histogram, IntCounter, IntCounterVec, IntGauge}; use prometheus::{Histogram, IntCounter, IntCounterVec, IntGauge};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use tracing::info_span; use tracing::info_span;
use tracing::Instrument; use tracing::Instrument;
...@@ -106,6 +107,20 @@ impl WorkHandlerMetrics { ...@@ -106,6 +107,20 @@ impl WorkHandlerMetrics {
} }
} }
// RAII guard to ensure inflight gauge is decremented and request duration is observed on all code paths.
struct RequestMetricsGuard {
inflight_requests: prometheus::IntGauge,
request_duration: prometheus::Histogram,
start_time: Instant,
}
impl Drop for RequestMetricsGuard {
fn drop(&mut self) {
self.inflight_requests.dec();
self.request_duration
.observe(self.start_time.elapsed().as_secs_f64());
}
}
#[async_trait] #[async_trait]
impl<T: Data, U: Data> PushWorkHandler for Ingress<SingleIn<T>, ManyOut<U>> impl<T: Data, U: Data> PushWorkHandler for Ingress<SingleIn<T>, ManyOut<U>>
where where
...@@ -125,11 +140,17 @@ where ...@@ -125,11 +140,17 @@ where
async fn handle_payload(&self, payload: Bytes) -> Result<(), PipelineError> { async fn handle_payload(&self, payload: Bytes) -> Result<(), PipelineError> {
let start_time = std::time::Instant::now(); let start_time = std::time::Instant::now();
if let Some(m) = self.metrics() { // Increment inflight and ensure it's decremented on all exits via RAII guard
let _inflight_guard = self.metrics().map(|m| {
m.request_counter.inc(); m.request_counter.inc();
m.inflight_requests.inc(); m.inflight_requests.inc();
m.request_bytes.inc_by(payload.len() as u64); m.request_bytes.inc_by(payload.len() as u64);
} RequestMetricsGuard {
inflight_requests: m.inflight_requests.clone(),
request_duration: m.request_duration.clone(),
start_time,
}
});
// decode the control message and the request // decode the control message and the request
let msg = TwoPartCodec::default() let msg = TwoPartCodec::default()
...@@ -292,11 +313,8 @@ where ...@@ -292,11 +313,8 @@ where
} }
} }
if let Some(m) = self.metrics() { // Ensure the metrics guard is not dropped until the end of the function.
let duration = start_time.elapsed(); drop(_inflight_guard);
m.request_duration.observe(duration.as_secs_f64());
m.inflight_requests.dec();
}
Ok(()) Ok(())
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment