"lib/bindings/python/vscode:/vscode.git/clone" did not exist on "16a280584cbf0929bf55064fcd5b17d281d862dd"
Unverified Commit cabdaebd authored by Thomas Montfort's avatar Thomas Montfort Committed by GitHub
Browse files

feat(metrics): auto-inject worker_id label into metrics (#8089)


Signed-off-by: default avatartmontfort <tmontfort@nvidia.com>
Signed-off-by: default avatarThomas Montfort <tmontfort@nvidia.com>
Co-authored-by: default avatarClaude Opus 4.6 (1M context) <noreply@anthropic.com>
parent 45364b5f
......@@ -65,12 +65,12 @@ def register_engine_metrics_callback(
This registers a callback that is invoked when /metrics is scraped, passing through
engine-specific metrics alongside Dynamo runtime metrics.
Automatically injects dynamo_namespace, dynamo_component, dynamo_endpoint, model,
and model_name labels when namespace_name and component_name are provided.
Automatically injects dynamo_namespace, dynamo_component, dynamo_endpoint, worker_id,
model, and model_name labels when namespace_name and component_name are provided.
Label Precedence (highest to lowest):
1. Existing labels from source metrics - never changed, never overwritten
2. Auto-injected labels (dynamo_*, model*) - added by Dynamo automatically
2. Auto-injected labels (dynamo_*, worker_id, model*) - added by Dynamo automatically
3. Custom labels (inject_custom_labels) - user-provided, lowest precedence
If inject_custom_labels contains keys that conflict with auto-injected labels,
......@@ -84,8 +84,8 @@ def register_engine_metrics_callback(
inject_custom_labels: Optional dict of custom labels to inject (e.g. {"lora_adapter": "my-lora"}).
Injected at collection time without modifying source metrics.
Reserved labels (le, quantile) will raise ValueError.
Auto-labels (dynamo_namespace, dynamo_component, dynamo_endpoint, model,
model_name) are added automatically and should not be in inject_custom_labels.
Auto-labels (dynamo_namespace, dynamo_component, dynamo_endpoint, worker_id,
model, model_name) are added automatically and should not be in inject_custom_labels.
namespace_name: Explicit namespace name for auto-labels (from config.namespace)
component_name: Explicit component name for auto-labels (from config.component)
endpoint_name: Explicit endpoint name for auto-labels (from config.endpoint, defaults to "generate")
......@@ -136,6 +136,18 @@ def register_engine_metrics_callback(
labels.ENDPOINT: endpoint_name_final, # "dynamo_endpoint"
}
# Add worker_id label from connection_id (discovery instance ID).
# This provides a stable per-worker identity label so metrics from different
# workers serving the same endpoint can be distinguished without relying on
# Kubernetes labels. Mirrors Rust auto-label injection in create_metric().
try:
conn_id = endpoint.connection_id()
auto_labels[labels.WORKER_ID] = format(conn_id, "x")
except Exception as e:
logging.debug(
f"Could not obtain connection_id for worker_id label injection: {e}"
)
# Add model labels if model_name is provided
if model_name:
auto_labels[labels.MODEL] = model_name # "model" (OpenAI standard)
......@@ -155,7 +167,7 @@ def register_engine_metrics_callback(
# Merge labels with correct precedence:
# 1. Existing labels (from source metrics) - never overwritten
# 2. Auto-labels (dynamo_*, model*) - injected by Dynamo
# 2. Auto-labels (dynamo_*, worker_id, model*) - injected by Dynamo
# 3. Custom labels (inject_custom_labels) - user-provided, lowest precedence
# Put custom labels first, then overwrite with auto-labels (higher precedence)
final_inject_labels = {**final_inject_labels, **auto_labels}
......
......@@ -227,6 +227,10 @@ impl MetricsHierarchy for Component {
fn get_metrics_registry(&self) -> &MetricsRegistry {
&self.metrics_registry
}
fn connection_id(&self) -> Option<u64> {
Some(self.drt.connection_id())
}
}
impl Component {
......@@ -390,6 +394,10 @@ impl MetricsHierarchy for Endpoint {
fn get_metrics_registry(&self) -> &MetricsRegistry {
&self.metrics_registry
}
fn connection_id(&self) -> Option<u64> {
Some(self.component.drt().connection_id())
}
}
impl Endpoint {
......
......@@ -30,4 +30,9 @@ impl MetricsHierarchy for Namespace {
fn get_metrics_registry(&self) -> &MetricsRegistry {
&self.metrics_registry
}
fn connection_id(&self) -> Option<u64> {
use crate::traits::DistributedRuntimeProvider;
Some(self.drt().connection_id())
}
}
......@@ -94,6 +94,10 @@ impl MetricsHierarchy for DistributedRuntime {
fn get_metrics_registry(&self) -> &MetricsRegistry {
&self.metrics_registry
}
fn connection_id(&self) -> Option<u64> {
Some(self.discovery_client.instance_id())
}
}
impl std::fmt::Debug for DistributedRuntime {
......
......@@ -232,7 +232,11 @@ pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
// Validate that user-provided labels don't conflict with auto-generated labels
for (key, _) in labels {
if *key == labels::NAMESPACE || *key == labels::COMPONENT || *key == labels::ENDPOINT {
if *key == labels::NAMESPACE
|| *key == labels::COMPONENT
|| *key == labels::ENDPOINT
|| *key == labels::WORKER_ID
{
return Err(anyhow::anyhow!(
"Label '{}' is automatically added by auto-label injection and cannot be manually set",
key
......@@ -240,6 +244,24 @@ pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
}
}
// Also validate that vector label names (const_labels) don't collide with auto-injected
// const labels. A variable label named "worker_id" would conflict with the auto-injected
// worker_id const label, causing a prometheus registration error or ambiguous output.
if let Some(label_names) = const_labels {
for name in label_names.iter() {
if *name == labels::NAMESPACE
|| *name == labels::COMPONENT
|| *name == labels::ENDPOINT
|| *name == labels::WORKER_ID
{
return Err(anyhow::anyhow!(
"Variable label name '{}' conflicts with auto-injected const label and cannot be used",
name
));
}
}
}
// Add auto-generated labels with sanitized values
// Hierarchy: [drt, namespace, component, endpoint]
if hierarchy_names.len() > 1 {
......@@ -270,6 +292,13 @@ pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
}
}
// Auto-inject worker_id label from the hierarchy's connection_id (discovery instance ID).
// This provides a stable per-worker identity label so metrics from different workers
// serving the same endpoint can be distinguished without relying on Kubernetes labels.
if let Some(conn_id) = hierarchy.connection_id() {
updated_labels.push((labels::WORKER_ID.to_string(), format!("{:x}", conn_id)));
}
// Add user labels
updated_labels.extend(
labels
......@@ -568,6 +597,15 @@ pub trait MetricsHierarchy: Send + Sync {
// Provided methods - have default implementations
// ========================================================================
/// Get the connection ID (discovery instance ID) for this hierarchy level.
///
/// Returns `Some(id)` when the hierarchy has access to the DistributedRuntime
/// (e.g. Namespace, Component, Endpoint). Used by `create_metric()` to auto-inject
/// the `worker_id` label. Returns `None` by default.
fn connection_id(&self) -> Option<u64> {
None
}
/// Access the metrics interface for this hierarchy
/// This is a provided method that works for any type implementing MetricsHierarchy
fn metrics(&self) -> Metrics<&Self>
......@@ -591,6 +629,10 @@ impl<T: MetricsHierarchy + ?Sized> MetricsHierarchy for &T {
fn get_metrics_registry(&self) -> &MetricsRegistry {
(**self).get_metrics_registry()
}
fn connection_id(&self) -> Option<u64> {
(**self).connection_id()
}
}
/// Type alias for runtime callback functions to reduce complexity
......@@ -997,6 +1039,32 @@ mod test_helpers {
Some((name, labels, value))
}
/// Injects a `worker_id` label into Prometheus metric data lines.
/// Prometheus places const labels (like worker_id) before special labels
/// (like histogram `le`), so for histogram bucket lines we insert before
/// `,le=`. For all other metric lines, we insert before the closing `}`.
/// Comment lines and lines without labels are left unchanged.
pub fn inject_worker_id(expected: &str, wid: &str) -> String {
let wid_label = format!(",worker_id=\"{}\"", wid);
expected
.lines()
.map(|line| {
if line.starts_with('#') || line.trim().is_empty() || !line.contains('{') {
line.to_string()
} else if let Some(le_pos) = line.find(",le=") {
// Histogram bucket lines: worker_id is a const label, `le` is special,
// so worker_id sorts before `le` in Prometheus output.
let mut s = line.to_string();
s.insert_str(le_pos, &wid_label);
s
} else {
line.replacen("}", &format!("{}}}", wid_label), 1)
}
})
.collect::<Vec<_>>()
.join("\n")
}
}
#[cfg(test)]
......@@ -1372,9 +1440,17 @@ mod test_metricsregistry_prometheus_fmt_outputs {
println!("Endpoint output:");
println!("{}", endpoint_output_raw);
let expected_endpoint_output = r#"# HELP dynamo_component_testcounter A test counter
// worker_id is runtime-generated (etcd lease ID), so we grab it from the DRT
// and inject it into expected strings via the inject_worker_id helper.
let wid = format!("{:x}", drt.connection_id());
use super::test_helpers::inject_worker_id;
let expected_endpoint_output = inject_worker_id(
r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#.to_string();
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#,
&wid,
);
assert_eq!(
endpoint_output_raw.trim_end_matches('\n'),
......@@ -1400,12 +1476,15 @@ dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",
println!("Component output:");
println!("{}", component_output_raw);
let expected_component_output = r#"# HELP dynamo_component_testcounter A test counter
let expected_component_output = inject_worker_id(
r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#.to_string();
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#,
&wid,
);
assert_eq!(
component_output_raw.trim_end_matches('\n'),
......@@ -1430,7 +1509,8 @@ dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"}
println!("Namespace output:");
println!("{}", namespace_output_raw);
let expected_namespace_output = r#"# HELP dynamo_component_testcounter A test counter
let expected_namespace_output = inject_worker_id(
r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testgauge A test gauge
......@@ -1438,7 +1518,9 @@ dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
# HELP dynamo_component_testintcounter A test int counter
# TYPE dynamo_component_testintcounter counter
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#,
&wid,
);
assert_eq!(
namespace_output_raw.trim_end_matches('\n'),
......@@ -1505,7 +1587,8 @@ dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
// The uptime_seconds value is dynamic (depends on elapsed wall-clock time),
// so we check all other lines exactly and validate uptime separately.
let expected_drt_output_without_uptime = r#"# HELP dynamo_component_testcounter A test counter
let expected_drt_output_without_uptime = inject_worker_id(
r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testcountervec A test counter vector
......@@ -1540,16 +1623,18 @@ dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
# HELP dynamo_component_testintgaugevec A test int gauge vector
# TYPE dynamo_component_testintgaugevec gauge
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0"#;
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0"#,
&wid,
);
// Split actual output into non-uptime lines and validate the uptime value line.
// The uptime metric now carries a worker_id label, so we match on the metric name
// prefix and extract the value as the last whitespace-delimited token.
let mut non_uptime_lines = Vec::new();
let mut saw_uptime_value = false;
for line in drt_output_raw.trim_end_matches('\n').lines() {
if line.starts_with("dynamo_component_uptime_seconds ") {
let val_str = line
.strip_prefix("dynamo_component_uptime_seconds ")
.unwrap();
if line.starts_with("dynamo_component_uptime_seconds") && !line.starts_with('#') {
let val_str = line.split_whitespace().last().unwrap();
val_str.parse::<f64>().expect("uptime should be a float");
saw_uptime_value = true;
} else if line.starts_with("# HELP dynamo_component_uptime_seconds")
......@@ -1580,11 +1665,13 @@ dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",ser
// Wait briefly so the uptime gauge is clearly positive on the next scrape.
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
let drt_output_after = drt.metrics().prometheus_expfmt().unwrap();
let uptime_after: f64 = drt_output_after
let uptime_line = drt_output_after
.lines()
.find(|l| l.starts_with("dynamo_component_uptime_seconds "))
.expect("uptime_seconds metric should be present after sleep")
.strip_prefix("dynamo_component_uptime_seconds ")
.find(|l| l.starts_with("dynamo_component_uptime_seconds") && !l.starts_with('#'))
.expect("uptime_seconds metric should be present after sleep");
let uptime_after: f64 = uptime_line
.split_whitespace()
.last()
.unwrap()
.parse()
.expect("uptime should be a float");
......@@ -1649,10 +1736,16 @@ dynamo_component_errors_total 5"#;
// Get merged Prometheus output from component level
let output = component.metrics().prometheus_expfmt().unwrap();
let expected_output = r#"# HELP dynamo_component_requests_total Total requests
let wid = format!("{:x}", drt.connection_id());
use super::test_helpers::inject_worker_id;
let expected_output = inject_worker_id(
r#"# HELP dynamo_component_requests_total Total requests
# TYPE dynamo_component_requests_total counter
dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep1",dynamo_namespace="ns_test"} 100
dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep2",dynamo_namespace="ns_test"} 200"#;
dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep2",dynamo_namespace="ns_test"} 200"#,
&wid,
);
assert_eq!(
output.trim_end_matches('\n'),
......@@ -1695,9 +1788,15 @@ dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep
// Get merged output - duplicates should be deduplicated
let output = component.metrics().prometheus_expfmt().unwrap();
let expected_output = r#"# HELP dynamo_component_dup_metric Duplicate metric test
let wid = format!("{:x}", drt.connection_id());
use super::test_helpers::inject_worker_id;
let expected_output = inject_worker_id(
r#"# HELP dynamo_component_dup_metric Duplicate metric test
# TYPE dynamo_component_dup_metric counter
dynamo_component_dup_metric{dynamo_component="comp_dup",dynamo_endpoint="ep_same",dynamo_namespace="ns_dup"} 50"#;
dynamo_component_dup_metric{dynamo_component="comp_dup",dynamo_endpoint="ep_same",dynamo_namespace="ns_dup"} 50"#,
&wid,
);
assert_eq!(
output.trim_end_matches('\n'),
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment