Unverified Commit 3a3f5bf2 authored by Tzu-Ling Kan's avatar Tzu-Ling Kan Committed by GitHub
Browse files

feat: Add a "model" label to Component metrics (#2389)

parent d0a63635
......@@ -132,7 +132,12 @@ async def init_prefill(runtime: DistributedRuntime, config: Config):
"""
Instantiate and serve
"""
component = runtime.namespace(config.namespace).component(config.component)
component = (
runtime.namespace(config.namespace)
.component(config.component)
.add_labels([("model", config.model)])
)
await component.create_service()
generate_endpoint = component.endpoint(config.endpoint)
......@@ -165,7 +170,11 @@ async def init(runtime: DistributedRuntime, config: Config):
Instantiate and serve
"""
component = runtime.namespace(config.namespace).component(config.component)
component = (
runtime.namespace(config.namespace)
.component(config.component)
.add_labels([("model", config.model)])
)
await component.create_service()
generate_endpoint = component.endpoint(config.endpoint)
......
......@@ -145,6 +145,7 @@ impl MetricsMode {
pub struct LLMWorkerLoadCapacityConfig {
pub component_name: String,
pub endpoint_name: String,
pub model_name: Option<String>,
}
/// Metrics collector for exposing metrics to prometheus/grafana
......
......@@ -31,6 +31,7 @@ use dynamo_llm::kv_router::scheduler::KVHitRateEvent;
use dynamo_llm::kv_router::KV_HIT_RATE_SUBJECT;
use dynamo_runtime::{
error, logging,
metrics::MetricsRegistry,
traits::events::{EventPublisher, EventSubscriber},
utils::{Duration, Instant},
DistributedRuntime, ErrorContext, Result, Runtime, Worker,
......@@ -60,6 +61,10 @@ struct Args {
#[arg(long)]
endpoint: String,
/// Model name for the target component (optional)
#[arg(long)]
model_name: Option<String>,
/// Polling interval in seconds for scraping dynamo endpoint stats (minimum 1 second)
#[arg(long, default_value = "1")]
poll_interval: u64,
......@@ -109,6 +114,7 @@ fn get_config(args: &Args) -> Result<LLMWorkerLoadCapacityConfig> {
Ok(LLMWorkerLoadCapacityConfig {
component_name: args.component.clone(),
endpoint_name: args.endpoint.clone(),
model_name: args.model_name.clone(),
})
}
......@@ -131,7 +137,14 @@ async fn app(runtime: Runtime) -> Result<()> {
.await
.context("Unable to create unique instance of Count; possibly one already exists")?;
let target_component = namespace.component(&config.component_name)?;
let target_component = {
let c = namespace.component(&config.component_name)?;
if let Some(ref model) = config.model_name {
c.add_labels(&[("model", model.as_str())])?
} else {
c
}
};
let target_endpoint = target_component.endpoint(&config.endpoint_name);
let service_path = target_endpoint.path();
......
......@@ -485,6 +485,21 @@ impl Component {
Ok(())
})
}
/// Add constant labels to this component (for metrics). Returns a new Component with labels.
/// labels: list of (key, value) tuples.
fn add_labels(&self, labels: Vec<(String, String)>) -> PyResult<Component> {
use rs::metrics::MetricsRegistry as _;
let pairs: Vec<(&str, &str)> = labels
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect();
let inner = self.inner.clone().add_labels(&pairs).map_err(to_pyerr)?;
Ok(Component {
inner,
event_loop: self.event_loop.clone(),
})
}
}
#[pymethods]
......
......@@ -7,6 +7,7 @@ use anyhow::Context as _;
use tokio::sync::{mpsc::Receiver, Notify};
use dynamo_runtime::{
metrics::MetricsRegistry,
pipeline::{
network::egress::push_router::PushRouter, ManyOut, Operator, RouterMode, SegmentSource,
ServiceBackend, SingleIn, Source,
......@@ -169,7 +170,8 @@ impl ModelWatcher {
let component = self
.drt
.namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)?;
.component(&endpoint_id.component)
.and_then(|c| c.add_labels(&[("model", &model_entry.name)]))?;
let client = component.endpoint(&endpoint_id.name).client().await?;
let Some(etcd_client) = self.drt.etcd_client() else {
......
......@@ -22,10 +22,12 @@ use crate::{
Annotated,
},
};
use dynamo_runtime::{
component::Client,
distributed::DistributedConfig,
engine::{AsyncEngineStream, Data},
metrics::MetricsRegistry,
pipeline::{
Context, ManyOut, Operator, PushRouter, RouterMode, SegmentSource, ServiceBackend,
ServiceEngine, ServiceFrontend, SingleIn, Source,
......@@ -109,7 +111,9 @@ pub async fn prepare_engine(
let endpoint_id = local_model.endpoint_id();
let component = distributed_runtime
.namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)?;
.component(&endpoint_id.component)
.and_then(|c| c.add_labels(&[("model", card.slug().to_string().as_str())]))?;
let client = component.endpoint(&endpoint_id.name).client().await?;
let kv_chooser = if router_mode == RouterMode::KV {
......
......@@ -15,7 +15,9 @@ use crate::{
Annotated,
},
};
use dynamo_runtime::engine::AsyncEngineStream;
use dynamo_runtime::metrics::MetricsRegistry;
use dynamo_runtime::pipeline::{
network::Ingress, Context, ManyOut, Operator, SegmentSource, ServiceBackend, SingleIn, Source,
};
......@@ -31,9 +33,25 @@ pub async fn run(
let cancel_token = distributed_runtime.primary_token().clone();
let endpoint_id: EndpointId = path.parse()?;
let model_name = match &engine_config {
EngineConfig::StaticFull { model, .. } | EngineConfig::StaticCore { model, .. } => {
Some(model.service_name().to_string())
}
EngineConfig::StaticRemote(model) | EngineConfig::Dynamic(model) => {
Some(model.service_name().to_string())
}
};
let component = distributed_runtime
.namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)?;
.component(&endpoint_id.component)
.and_then(|c| {
if let Some(ref name) = model_name {
c.add_labels(&[("model", name.as_str())])
} else {
Ok(c)
}
})?;
let endpoint = component
.service_builder()
.create()
......
......@@ -14,8 +14,8 @@
// limitations under the License.
use dynamo_runtime::{
logging, pipeline::PushRouter, protocols::annotated::Annotated, stream::StreamExt,
DistributedRuntime, Result, Runtime, Worker,
logging, metrics::MetricsRegistry, pipeline::PushRouter, protocols::annotated::Annotated,
stream::StreamExt, DistributedRuntime, Result, Runtime, Worker,
};
use hello_world::DEFAULT_NAMESPACE;
......@@ -31,6 +31,7 @@ async fn app(runtime: Runtime) -> Result<()> {
let client = distributed
.namespace(DEFAULT_NAMESPACE)?
.component("backend")?
.add_labels(&[("model", "hello_world_model")])?
.endpoint("generate")
.client()
.await?;
......
......@@ -15,6 +15,7 @@
use dynamo_runtime::{
logging,
metrics::MetricsRegistry,
pipeline::{
async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
ResponseStream, SingleIn,
......@@ -69,6 +70,7 @@ async fn backend(runtime: DistributedRuntime) -> Result<()> {
runtime
.namespace(DEFAULT_NAMESPACE)?
.component("backend")?
.add_labels(&[("model", "hello_world_model")])?
.service_builder()
.create()
.await?
......
......@@ -17,8 +17,8 @@ use futures::StreamExt;
use service_metrics::DEFAULT_NAMESPACE;
use dynamo_runtime::{
logging, pipeline::PushRouter, protocols::annotated::Annotated, utils::Duration,
DistributedRuntime, Result, Runtime, Worker,
logging, metrics::MetricsRegistry, pipeline::PushRouter, protocols::annotated::Annotated,
utils::Duration, DistributedRuntime, Result, Runtime, Worker,
};
fn main() -> Result<()> {
......@@ -31,7 +31,9 @@ async fn app(runtime: Runtime) -> Result<()> {
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
let namespace = distributed.namespace(DEFAULT_NAMESPACE)?;
let component = namespace.component("backend")?;
let component = namespace
.component("backend")?
.add_labels(&[("model", "service_metrics_model")])?;
let client = component.endpoint("generate").client().await?;
......
......@@ -17,6 +17,7 @@ use service_metrics::{MyStats, DEFAULT_NAMESPACE};
use dynamo_runtime::{
logging,
metrics::MetricsRegistry,
pipeline::{
async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
ResponseStream, SingleIn,
......@@ -71,6 +72,7 @@ async fn backend(runtime: DistributedRuntime) -> Result<()> {
runtime
.namespace(DEFAULT_NAMESPACE)?
.component("backend")?
.add_labels(&[("model", "service_metrics_model")])?
.service_builder()
.create()
.await?
......
......@@ -16,6 +16,7 @@ use std::sync::Arc;
pub const DEFAULT_NAMESPACE: &str = "dyn_example_namespace";
pub const DEFAULT_COMPONENT: &str = "dyn_example_component";
pub const DEFAULT_ENDPOINT: &str = "dyn_example_endpoint";
pub const DEFAULT_MODEL_NAME: &str = "dyn_example_model";
/// Stats structure returned by the endpoint's stats handler
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
......@@ -90,6 +91,7 @@ pub async fn backend(drt: DistributedRuntime, endpoint_name: Option<&str>) -> Re
let endpoint = drt
.namespace(DEFAULT_NAMESPACE)?
.component(DEFAULT_COMPONENT)?
.add_labels(&[("model", DEFAULT_MODEL_NAME)])?
.service_builder()
.create()
.await?
......
......@@ -125,6 +125,10 @@ pub struct Component {
#[validate(custom(function = "validate_allowed_chars"))]
name: String,
/// Additional labels for metrics
#[builder(default = "Vec::new()")]
labels: Vec<(String, String)>,
// todo - restrict the namespace to a-z0-9-_A-Z
/// Namespace
#[builder(setter(into))]
......@@ -183,6 +187,16 @@ impl MetricsRegistry for Component {
]
.concat()
}
fn stored_labels(&self) -> Vec<(&str, &str)> {
let mut all_labels = self.namespace.stored_labels();
all_labels.extend(self.labels.iter().map(|(k, v)| (k.as_str(), v.as_str())));
all_labels
}
fn labels_mut(&mut self) -> &mut Vec<(String, String)> {
&mut self.labels
}
}
impl Component {
......@@ -220,6 +234,7 @@ impl Component {
component: self.clone(),
name: endpoint.into(),
is_static: self.is_static,
labels: Vec::new(),
}
}
......@@ -285,6 +300,9 @@ pub struct Endpoint {
name: String,
is_static: bool,
/// Additional labels for metrics
labels: Vec<(String, String)>,
}
impl Hash for Endpoint {
......@@ -329,6 +347,16 @@ impl MetricsRegistry for Endpoint {
]
.concat()
}
fn stored_labels(&self) -> Vec<(&str, &str)> {
let mut all_labels = self.component.stored_labels();
all_labels.extend(self.labels.iter().map(|(k, v)| (k.as_str(), v.as_str())));
all_labels
}
fn labels_mut(&mut self) -> &mut Vec<(String, String)> {
&mut self.labels
}
}
impl Endpoint {
......@@ -447,6 +475,10 @@ pub struct Namespace {
#[builder(default = "None")]
parent: Option<Arc<Namespace>>,
/// Additional labels for metrics
#[builder(default = "Vec::new()")]
labels: Vec<(String, String)>,
}
impl DistributedRuntimeProvider for Namespace {
......
......@@ -86,6 +86,18 @@ impl MetricsRegistry for Namespace {
fn parent_hierarchy(&self) -> Vec<String> {
vec![self.drt().basename()]
}
fn stored_labels(&self) -> Vec<(&str, &str)> {
// Convert Vec<(String, String)> to Vec<(&str, &str)>
self.labels
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect()
}
fn labels_mut(&mut self) -> &mut Vec<(String, String)> {
&mut self.labels
}
}
#[cfg(feature = "integration")]
......
......@@ -40,6 +40,18 @@ impl MetricsRegistry for DistributedRuntime {
fn parent_hierarchy(&self) -> Vec<String> {
vec![] // drt is the root, so no parent hierarchy
}
fn stored_labels(&self) -> Vec<(&str, &str)> {
// Convert Vec<(String, String)> to Vec<(&str, &str)>
self.labels
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect()
}
fn labels_mut(&mut self) -> &mut Vec<(String, String)> {
&mut self.labels
}
}
impl DistributedRuntime {
......@@ -90,6 +102,7 @@ impl DistributedRuntime {
prometheus::Registry,
>::new())),
system_health,
labels: Vec::new(),
};
// Start system status server if enabled
......
......@@ -178,4 +178,7 @@ pub struct DistributedRuntime {
// This map associates metric prefixes with their corresponding Prometheus registries.
prometheus_registries_by_prefix: Arc<std::sync::Mutex<HashMap<String, prometheus::Registry>>>,
// Additional labels for metrics
labels: Vec<(String, String)>,
}
......@@ -65,6 +65,21 @@ fn lint_prometheus_name(name: &str) -> anyhow::Result<String> {
Ok(sanitized)
}
/// Validate that a label slice has no duplicate keys.
/// Returns Ok(()) when all keys are unique; otherwise returns an error naming the duplicate key.
fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
let mut seen_keys = std::collections::HashSet::new();
for (key, _) in labels {
if !seen_keys.insert(*key) {
return Err(anyhow::anyhow!(
"Duplicate label key '{}' found in labels",
key
));
}
}
Ok(())
}
/// Trait that defines common behavior for Prometheus metric types
pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
/// Create a new metric with the given options
......@@ -196,7 +211,16 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
const_labels: Option<&[&str]>,
) -> anyhow::Result<T> {
// Validate that user-provided labels don't have duplicate keys
let mut seen_keys = std::collections::HashSet::new();
validate_no_duplicate_label_keys(labels)?;
// Validate that user-provided labels don't conflict with stored labels
for (key, _) in registry.stored_labels() {
if labels.iter().any(|(k, _)| *k == key) {
return Err(anyhow::anyhow!(
"Label key '{}' already exists in registry.",
key
));
}
}
let basename = registry.basename();
let parent_hierarchy = registry.parent_hierarchy();
......@@ -206,16 +230,7 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
let metric_name = build_metric_name(metric_name);
// Validate that user-provided labels don't have duplicate keys
for (key, _) in labels {
if !seen_keys.insert(*key) {
return Err(anyhow::anyhow!(
"Duplicate label key '{}' found in labels",
key
));
}
}
// Build updated_labels: auto-labels first, then user labels
// Build updated_labels: auto-labels first, then `labels` + stored labels
let mut updated_labels: Vec<(String, String)> = Vec::new();
if USE_AUTO_LABELS {
......@@ -266,6 +281,13 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
.iter()
.map(|(k, v)| ((*k).to_string(), (*v).to_string())),
);
// Add stored labels (safe because overlaps were rejected above)
updated_labels.extend(
registry
.stored_labels()
.into_iter()
.map(|(k, v)| (k.to_string(), v.to_string())),
);
// Handle different metric types
let prometheus_metric = if std::any::TypeId::of::<T>()
......@@ -388,6 +410,47 @@ pub trait MetricsRegistry: Send + Sync + crate::traits::DistributedRuntimeProvid
// Get the name of this registry (without any prefix)
fn basename(&self) -> String;
/// Get any stored labels for this registry
fn stored_labels(&self) -> Vec<(&str, &str)> {
Vec::new()
}
/// Get mutable access to the labels storage - implementors must provide this
fn labels_mut(&mut self) -> &mut Vec<(String, String)>;
/// Add labels to this registry and return a new instance with the labels.
/// This allows for method chaining like: runtime.namespace(...).add_labels(...)?
/// Fails if:
/// - Provided `labels` contains duplicate keys, or
/// - Any provided key already exists in the registry's stored labels.
fn add_labels(mut self, labels: &[(&str, &str)]) -> anyhow::Result<Self>
where
Self: Sized,
{
validate_no_duplicate_label_keys(labels)?;
// 2) Validate no overlap with existing stored labels
let existing: std::collections::HashSet<&str> =
self.stored_labels().into_iter().map(|(k, _)| k).collect();
if let Some(conflict) = labels
.iter()
.map(|(k, _)| *k)
.find(|k| existing.contains(k))
{
return Err(anyhow::anyhow!(
"Label key '{}' already exists in registry; refusing to overwrite",
conflict
));
}
// 3) Safe to append
let labels_storage = self.labels_mut();
for (key, value) in labels {
labels_storage.push((key.to_string(), value.to_string()));
}
Ok(self)
}
/// Retrieve the complete hierarchy and basename for this registry. Currently, the prefix for drt is an empty string,
/// so we must account for the leading underscore. The existing code remains unchanged to accommodate any future
/// scenarios where drt's prefix might be assigned a value.
......@@ -850,6 +913,33 @@ mod test_simple_metricsregistry_trait {
use prometheus::Counter;
use std::sync::Arc;
#[test]
fn test_component_prometheus_output_contains_custom_label() {
// Arrange: DRT → namespace → component with a custom label
let drt = create_test_drt();
let namespace = drt.namespace("testnamespace").unwrap();
let component = namespace
.component("testcomponent")
.unwrap()
.add_labels(&[("service", "api")])
.unwrap();
// Act: create a simple gauge and render Prometheus text
let gauge = component
.create_gauge("with_label", "Gauge with custom label", &[])
.unwrap();
gauge.set(1.0);
let output = component.prometheus_metrics_fmt().unwrap();
// Assert: custom label is present (don’t rely on label ordering)
assert!(
output.contains("dynamo_component_with_label{") && output.contains(r#"service="api""#),
"Expected custom label service=\"api\" in Prometheus output:\n{}",
output
);
}
#[test]
fn test_factory_methods_via_registry_trait() {
// Setup real DRT and registry using the test-friendly constructor
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment