"ssh:/git@developer.sourcefind.cn:2222/OpenDAS/dynamo.git" did not exist on "dd6c399565fe203898e14f1d92c87be35f07f24f"
Unverified Commit 5aa2f53f authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

feat(kv-router): add Prometheus metrics to standalone indexer (#7339)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent c8f7ce90
...@@ -72,9 +72,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" ...@@ -72,9 +72,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
[[package]] [[package]]
name = "anstream" name = "anstream"
version = "0.6.21" version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d"
dependencies = [ dependencies = [
"anstyle", "anstyle",
"anstyle-parse", "anstyle-parse",
...@@ -93,9 +93,9 @@ checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" ...@@ -93,9 +93,9 @@ checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78"
[[package]] [[package]]
name = "anstyle-parse" name = "anstyle-parse"
version = "0.2.7" version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e"
dependencies = [ dependencies = [
"utf8parse", "utf8parse",
] ]
...@@ -871,7 +871,7 @@ version = "0.27.0" ...@@ -871,7 +871,7 @@ version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fce8dd7fcfcbf3a0a87d8f515194b49d6135acab73e18bd380d1d93bb1a15eb" checksum = "3fce8dd7fcfcbf3a0a87d8f515194b49d6135acab73e18bd380d1d93bb1a15eb"
dependencies = [ dependencies = [
"clap 4.5.60", "clap 4.6.0",
"heck 0.4.1", "heck 0.4.1",
"indexmap 2.13.0", "indexmap 2.13.0",
"log", "log",
...@@ -998,9 +998,9 @@ dependencies = [ ...@@ -998,9 +998,9 @@ dependencies = [
[[package]] [[package]]
name = "clap" name = "clap"
version = "4.5.60" version = "4.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2797f34da339ce31042b27d23607e051786132987f595b02ba4f6a6dffb7030a" checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351"
dependencies = [ dependencies = [
"clap_builder", "clap_builder",
"clap_derive", "clap_derive",
...@@ -1008,9 +1008,9 @@ dependencies = [ ...@@ -1008,9 +1008,9 @@ dependencies = [
[[package]] [[package]]
name = "clap_builder" name = "clap_builder"
version = "4.5.60" version = "4.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24a241312cea5059b13574bb9b3861cabf758b879c15190b37b6d6fd63ab6876" checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f"
dependencies = [ dependencies = [
"anstream", "anstream",
"anstyle", "anstyle",
...@@ -1020,9 +1020,9 @@ dependencies = [ ...@@ -1020,9 +1020,9 @@ dependencies = [
[[package]] [[package]]
name = "clap_derive" name = "clap_derive"
version = "4.5.55" version = "4.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5" checksum = "1110bd8a634a1ab8cb04345d8d878267d57c3cf1b38d91b71af6686408bbca6a"
dependencies = [ dependencies = [
"heck 0.5.0", "heck 0.5.0",
"proc-macro2", "proc-macro2",
...@@ -1032,9 +1032,9 @@ dependencies = [ ...@@ -1032,9 +1032,9 @@ dependencies = [
[[package]] [[package]]
name = "clap_lex" name = "clap_lex"
version = "1.0.0" version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831" checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9"
[[package]] [[package]]
name = "cmake" name = "cmake"
...@@ -1092,9 +1092,9 @@ dependencies = [ ...@@ -1092,9 +1092,9 @@ dependencies = [
[[package]] [[package]]
name = "config" name = "config"
version = "0.15.19" version = "0.15.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b30fa8254caad766fc03cb0ccae691e14bf3bd72bfff27f72802ce729551b3d6" checksum = "4fe5feec195269515c4722937cd7ffcfe7b4205d18d2e6577b7223ecb159ab00"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"convert_case", "convert_case",
...@@ -1105,7 +1105,7 @@ dependencies = [ ...@@ -1105,7 +1105,7 @@ dependencies = [
"serde-untagged", "serde-untagged",
"serde_core", "serde_core",
"serde_json", "serde_json",
"toml 0.9.12+spec-1.1.0", "toml 1.0.6+spec-1.1.0",
"winnow", "winnow",
"yaml-rust2", "yaml-rust2",
] ]
...@@ -1304,7 +1304,7 @@ dependencies = [ ...@@ -1304,7 +1304,7 @@ dependencies = [
"anes", "anes",
"cast", "cast",
"ciborium", "ciborium",
"clap 4.5.60", "clap 4.6.0",
"criterion-plot 0.5.0", "criterion-plot 0.5.0",
"futures", "futures",
"is-terminal", "is-terminal",
...@@ -1847,7 +1847,7 @@ version = "1.0.0" ...@@ -1847,7 +1847,7 @@ version = "1.0.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
"clap 4.5.60", "clap 4.6.0",
"dynamo-kv-router", "dynamo-kv-router",
"dynamo-mocker", "dynamo-mocker",
"dynamo-tokens", "dynamo-tokens",
...@@ -1888,7 +1888,7 @@ dependencies = [ ...@@ -1888,7 +1888,7 @@ dependencies = [
"async-trait", "async-trait",
"axum 0.8.4", "axum 0.8.4",
"bytes", "bytes",
"clap 4.5.60", "clap 4.6.0",
"dashmap 6.1.0", "dashmap 6.1.0",
"derive-getters", "derive-getters",
"derive_builder", "derive_builder",
...@@ -1941,7 +1941,7 @@ dependencies = [ ...@@ -1941,7 +1941,7 @@ dependencies = [
"bytemuck", "bytemuck",
"bytes", "bytes",
"chrono", "chrono",
"clap 4.5.60", "clap 4.6.0",
"criterion 0.3.6", "criterion 0.3.6",
"cudarc", "cudarc",
"dashmap 5.5.3", "dashmap 5.5.3",
...@@ -3709,7 +3709,7 @@ dependencies = [ ...@@ -3709,7 +3709,7 @@ dependencies = [
"anyhow", "anyhow",
"base64 0.21.7", "base64 0.21.7",
"bytecount", "bytecount",
"clap 4.5.60", "clap 4.6.0",
"fancy-regex 0.11.0", "fancy-regex 0.11.0",
"fraction", "fraction",
"getrandom 0.2.17", "getrandom 0.2.17",
...@@ -3902,7 +3902,7 @@ dependencies = [ ...@@ -3902,7 +3902,7 @@ dependencies = [
name = "kvbm-kernels" name = "kvbm-kernels"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"clap 4.5.60", "clap 4.6.0",
"cudarc", "cudarc",
"half 2.7.1", "half 2.7.1",
"ndarray 0.17.2", "ndarray 0.17.2",
...@@ -4446,7 +4446,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" ...@@ -4446,7 +4446,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9929c605e135347ccda5e0e7ff0627217f23189915a5c3971265ce490a14abf" checksum = "a9929c605e135347ccda5e0e7ff0627217f23189915a5c3971265ce490a14abf"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"clap 4.5.60", "clap 4.6.0",
"colored", "colored",
"futures", "futures",
"modelexpress-common", "modelexpress-common",
...@@ -4470,7 +4470,7 @@ dependencies = [ ...@@ -4470,7 +4470,7 @@ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
"chrono", "chrono",
"clap 4.5.60", "clap 4.6.0",
"config", "config",
"hf-hub", "hf-hub",
"jiff", "jiff",
...@@ -5080,9 +5080,9 @@ dependencies = [ ...@@ -5080,9 +5080,9 @@ dependencies = [
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.21.3" version = "1.21.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50"
[[package]] [[package]]
name = "once_cell_polyfill" name = "once_cell_polyfill"
...@@ -5133,7 +5133,7 @@ dependencies = [ ...@@ -5133,7 +5133,7 @@ dependencies = [
"anyhow", "anyhow",
"base64 0.22.1", "base64 0.22.1",
"bstr", "bstr",
"clap 4.5.60", "clap 4.6.0",
"fancy-regex 0.13.0", "fancy-regex 0.13.0",
"futures", "futures",
"image", "image",
...@@ -7893,13 +7893,13 @@ dependencies = [ ...@@ -7893,13 +7893,13 @@ dependencies = [
[[package]] [[package]]
name = "toml" name = "toml"
version = "0.9.12+spec-1.1.0" version = "1.0.6+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf92845e79fc2e2def6a5d828f0801e29a2f8acc037becc5ab08595c7d5e9863" checksum = "399b1124a3c9e16766831c6bba21e50192572cdd98706ea114f9502509686ffc"
dependencies = [ dependencies = [
"serde_core", "serde_core",
"serde_spanned 1.0.4", "serde_spanned 1.0.4",
"toml_datetime 0.7.5+spec-1.1.0", "toml_datetime 1.0.0+spec-1.1.0",
"toml_parser", "toml_parser",
"winnow", "winnow",
] ]
...@@ -7913,15 +7913,6 @@ dependencies = [ ...@@ -7913,15 +7913,6 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "toml_datetime"
version = "0.7.5+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347"
dependencies = [
"serde_core",
]
[[package]] [[package]]
name = "toml_datetime" name = "toml_datetime"
version = "1.0.0+spec-1.1.0" version = "1.0.0+spec-1.1.0"
...@@ -9687,9 +9678,9 @@ dependencies = [ ...@@ -9687,9 +9678,9 @@ dependencies = [
[[package]] [[package]]
name = "zune-jpeg" name = "zune-jpeg"
version = "0.5.12" version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "410e9ecef634c709e3831c2cfdb8d9c32164fae1c67496d5b68fff728eec37fe" checksum = "ec5f41c76397b7da451efd19915684f727d7e1d516384ca6bd0ec43ec94de23c"
dependencies = [ dependencies = [
"zune-core", "zune-core",
] ]
...@@ -99,6 +99,30 @@ dynamo-kv-indexer --port 8090 [--threads 4] [--block-size 16 --model-name my-mod ...@@ -99,6 +99,30 @@ dynamo-kv-indexer --port 8090 [--threads 4] [--block-size 16 --model-name my-mod
## HTTP API ## HTTP API
### `GET /health` — Liveness check
Returns `200 OK` unconditionally.
```bash
curl http://localhost:8090/health
```
### `GET /metrics` — Prometheus metrics
Returns metrics in Prometheus text exposition format. The endpoint is available when the binary is built with the `metrics` feature, which the `standalone-indexer` feature enables automatically.
```bash
curl http://localhost:8090/metrics
```
| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `dynamo_kvindexer_request_duration_seconds` | Histogram | `endpoint` | HTTP request latency |
| `dynamo_kvindexer_requests_total` | Counter | `endpoint`, `method` | Total HTTP requests |
| `dynamo_kvindexer_errors_total` | Counter | `endpoint`, `status_class` | HTTP error responses (4xx/5xx) |
| `dynamo_kvindexer_models` | Gauge | — | Number of active model+tenant indexers |
| `dynamo_kvindexer_workers` | Gauge | — | Number of registered worker instances |
### `POST /register` — Register an endpoint ### `POST /register` — Register an endpoint
Register a ZMQ endpoint for an instance. Each call creates or reuses the indexer for the given `(model_name, tenant_id)` pair. Register a ZMQ endpoint for an instance. Each call creates or reuses the indexer for the given `(model_name, tenant_id)` pair.
...@@ -307,7 +331,7 @@ graph TD ...@@ -307,7 +331,7 @@ graph TD
REG[Worker Registry] REG[Worker Registry]
ZMQ[ZMQ SUB Listeners] ZMQ[ZMQ SUB Listeners]
IDX["Indexer Map<br/>(model, tenant) → Radix Tree"] IDX["Indexer Map<br/>(model, tenant) → Radix Tree"]
HTTP[HTTP API<br/>/query /dump /register] HTTP[HTTP API<br/>/query /dump /register /metrics /health]
end end
CLIENT[External Client] CLIENT[External Client]
......
...@@ -132,6 +132,13 @@ class frontend_service: ...@@ -132,6 +132,13 @@ class frontend_service:
OPERATION_LABEL = "operation" OPERATION_LABEL = "operation"
class kv_publisher:
"""KV Publisher metrics"""
# Total number of raw events dropped by engines before reaching publisher (detected via event_id gaps)
ENGINES_DROPPED_EVENTS_TOTAL = "kv_publisher_engines_dropped_events_total"
class kvbm: class kvbm:
"""KVBM""" """KVBM"""
...@@ -167,6 +174,21 @@ class kvbm: ...@@ -167,6 +174,21 @@ class kvbm:
OBJECT_WRITE_FAILURES = "object_write_failures" OBJECT_WRITE_FAILURES = "object_write_failures"
class kvindexer:
"""Standalone KV indexer HTTP service metrics"""
# HTTP request latency
REQUEST_DURATION_SECONDS = "request_duration_seconds"
# Total HTTP requests
REQUESTS_TOTAL = "requests_total"
# HTTP error responses (4xx/5xx)
ERRORS_TOTAL = "errors_total"
# Number of active model+tenant indexers
MODELS = "models"
# Number of registered worker instances
WORKERS = "workers"
class kvrouter: class kvrouter:
# Number of KV cache events applied to the index (including status) # Number of KV cache events applied to the index (including status)
KV_CACHE_EVENTS_APPLIED = "kv_cache_events_applied" KV_CACHE_EVENTS_APPLIED = "kv_cache_events_applied"
...@@ -225,6 +247,8 @@ class name_prefix: ...@@ -225,6 +247,8 @@ class name_prefix:
ROUTER = "dynamo_router" ROUTER = "dynamo_router"
# Prefix for tokio runtime metrics # Prefix for tokio runtime metrics
TOKIO = "dynamo_tokio" TOKIO = "dynamo_tokio"
# Prefix for standalone KV indexer metrics
KVINDEXER = "dynamo_kvindexer"
class router: class router:
...@@ -265,25 +289,6 @@ class routing_overhead: ...@@ -265,25 +289,6 @@ class routing_overhead:
TOTAL_MS = "overhead_total_ms" TOTAL_MS = "overhead_total_ms"
class trtllm_additional:
"""Additional TRT-LLM worker metrics beyond what the engine natively provides."""
# Total number of aborted/cancelled requests
NUM_ABORTED_REQUESTS_TOTAL = "trtllm_num_aborted_requests_total"
# Total number of requests containing image content
REQUEST_TYPE_IMAGE_TOTAL = "trtllm_request_type_image_total"
# Total number of requests using guided/structured decoding
REQUEST_TYPE_STRUCTURED_OUTPUT_TOTAL = "trtllm_request_type_structured_output_total"
# Total number of successful KV cache transfers
KV_TRANSFER_SUCCESS_TOTAL = "trtllm_kv_transfer_success_total"
# KV cache transfer latency per request in seconds
KV_TRANSFER_LATENCY_SECONDS = "trtllm_kv_transfer_latency_seconds"
# KV cache transfer size per request in bytes
KV_TRANSFER_BYTES = "trtllm_kv_transfer_bytes"
# KV cache transfer speed per request in GB/s
KV_TRANSFER_SPEED_GB_S = "trtllm_kv_transfer_speed_gb_s"
class task_tracker: class task_tracker:
"""Task tracker Prometheus metric name suffixes""" """Task tracker Prometheus metric name suffixes"""
...@@ -318,6 +323,25 @@ class tokio_perf: ...@@ -318,6 +323,25 @@ class tokio_perf:
ALIVE_TASKS = "alive_tasks" ALIVE_TASKS = "alive_tasks"
class trtllm_additional:
"""Additional TRT-LLM worker metrics beyond what the engine natively provides."""
# Total number of aborted/cancelled requests
NUM_ABORTED_REQUESTS_TOTAL = "trtllm_num_aborted_requests_total"
# Total number of requests containing image content
REQUEST_TYPE_IMAGE_TOTAL = "trtllm_request_type_image_total"
# Total number of requests using guided/structured decoding
REQUEST_TYPE_STRUCTURED_OUTPUT_TOTAL = "trtllm_request_type_structured_output_total"
# Total number of successful KV cache transfers
KV_TRANSFER_SUCCESS_TOTAL = "trtllm_kv_transfer_success_total"
# KV cache transfer latency per request in seconds
KV_TRANSFER_LATENCY_SECONDS = "trtllm_kv_transfer_latency_seconds"
# KV cache transfer size per request in bytes
KV_TRANSFER_BYTES = "trtllm_kv_transfer_bytes"
# KV cache transfer speed per request in GB/s
KV_TRANSFER_SPEED_GB_S = "trtllm_kv_transfer_speed_gb_s"
class work_handler: class work_handler:
"""Work handler Prometheus metric names""" """Work handler Prometheus metric names"""
......
...@@ -12,7 +12,7 @@ repository.workspace = true ...@@ -12,7 +12,7 @@ repository.workspace = true
[features] [features]
default = [] default = []
metrics = [] metrics = ["dep:dynamo-runtime"]
bench = ["dep:clap", "dep:indicatif", "dep:serde_json", "dep:plotters"] bench = ["dep:clap", "dep:indicatif", "dep:serde_json", "dep:plotters"]
standalone-indexer = ["metrics", "dep:axum", "dep:bytes", "dep:zeromq", "dep:serde_json", "dep:reqwest"] standalone-indexer = ["metrics", "dep:axum", "dep:bytes", "dep:zeromq", "dep:serde_json", "dep:reqwest"]
indexer-bin = ["standalone-indexer", "dep:clap", "dep:tracing-subscriber"] indexer-bin = ["standalone-indexer", "dep:clap", "dep:tracing-subscriber"]
...@@ -20,7 +20,7 @@ test-endpoints = ["indexer-bin"] ...@@ -20,7 +20,7 @@ test-endpoints = ["indexer-bin"]
[dependencies] [dependencies]
# repo # repo
dynamo-runtime = { workspace = true } dynamo-runtime = { workspace = true, optional = true }
dynamo-tokens = { workspace = true } dynamo-tokens = { workspace = true }
# workspace # workspace
......
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
#[cfg(feature = "metrics")]
use std::sync::LazyLock;
#[cfg(feature = "metrics")]
use std::time::Instant;
#[cfg(feature = "metrics")]
use axum::{extract::MatchedPath, http::Request, middleware::Next, response::Response};
#[cfg(feature = "metrics")]
use dynamo_runtime::metrics::prometheus_names::{kvindexer, name_prefix};
#[cfg(feature = "metrics")]
use prometheus::{
HistogramVec, IntCounterVec, IntGauge, Opts, exponential_buckets, histogram_opts,
};
/// Prometheus collectors exported by the standalone KV indexer HTTP service.
///
/// Only compiled when the `metrics` feature is enabled.
#[cfg(feature = "metrics")]
pub struct StandaloneIndexerMetrics {
/// HTTP request latency in seconds, labelled by `endpoint`.
pub request_duration: HistogramVec,
/// Total HTTP requests, labelled by `endpoint` and `method`.
pub requests_total: IntCounterVec,
/// HTTP error responses (status >= 400), labelled by `endpoint` and `status_class`.
pub errors_total: IntCounterVec,
/// Number of active model+tenant indexers.
pub models: IntGauge,
/// Number of registered worker instances.
pub workers: IntGauge,
}
/// Process-wide collector set, constructed lazily on first access.
///
/// Every metric name is `name_prefix::KVINDEXER` joined by an underscore with the
/// matching `kvindexer::*` suffix. Construction panics if any definition is
/// rejected by the `prometheus` crate.
#[cfg(feature = "metrics")]
static METRICS: LazyLock<StandaloneIndexerMetrics> = LazyLock::new(|| {
    // Qualifies a metric suffix with the shared indexer prefix.
    let full_name = |suffix: &str| format!("{}_{}", name_prefix::KVINDEXER, suffix);

    // 20 exponential buckets starting at 0.0001 s and doubling at each step.
    let latency_buckets = exponential_buckets(0.0001, 2.0, 20).expect("valid bucket params");
    let request_duration = HistogramVec::new(
        histogram_opts!(
            full_name(kvindexer::REQUEST_DURATION_SECONDS),
            "HTTP request latency",
            latency_buckets
        ),
        &["endpoint"],
    )
    .expect("valid histogram");

    let requests_total = IntCounterVec::new(
        Opts::new(full_name(kvindexer::REQUESTS_TOTAL), "Total HTTP requests"),
        &["endpoint", "method"],
    )
    .expect("valid counter");

    let errors_total = IntCounterVec::new(
        Opts::new(
            full_name(kvindexer::ERRORS_TOTAL),
            "HTTP error responses (4xx/5xx)",
        ),
        &["endpoint", "status_class"],
    )
    .expect("valid counter");

    let models = IntGauge::new(
        full_name(kvindexer::MODELS),
        "Number of active model+tenant indexers",
    )
    .expect("valid gauge");

    let workers = IntGauge::new(
        full_name(kvindexer::WORKERS),
        "Number of registered worker instances",
    )
    .expect("valid gauge");

    StandaloneIndexerMetrics {
        request_duration,
        requests_total,
        errors_total,
        models,
        workers,
    }
});
/// Registers every standalone-indexer collector with `registry`.
///
/// Collectors are registered in the order request duration, request count,
/// error count, models, workers. The first error reported by
/// `Registry::register` is returned and the remaining collectors are skipped.
#[cfg(feature = "metrics")]
pub fn register(registry: &prometheus::Registry) -> Result<(), prometheus::Error> {
    let metrics = &*METRICS;
    // Each clone is a handle onto the global collector held in `METRICS`.
    let collectors: [Box<dyn prometheus::core::Collector>; 5] = [
        Box::new(metrics.request_duration.clone()),
        Box::new(metrics.requests_total.clone()),
        Box::new(metrics.errors_total.clone()),
        Box::new(metrics.models.clone()),
        Box::new(metrics.workers.clone()),
    ];
    collectors
        .into_iter()
        .try_for_each(|collector| registry.register(collector))
}
/// Axum middleware that records per-request metrics for the indexer HTTP API.
///
/// After the inner handler completes, the middleware:
/// - increments `requests_total` for the `(endpoint, method)` pair,
/// - observes the elapsed wall-clock time (in seconds) in `request_duration`,
/// - increments `errors_total` with `status_class` `"4xx"` or `"5xx"` when the
///   response status is 400 or above.
///
/// The `endpoint` label is the route template from [`MatchedPath`], or
/// `"unknown"` when the request carries no matched path. The response is
/// returned unmodified.
#[cfg(feature = "metrics")]
pub async fn metrics_middleware(req: Request<axum::body::Body>, next: Next) -> Response {
    // Both values must outlive `req`, which `next.run` consumes. Clone the
    // `MatchedPath` / `Method` handles rather than copying them into new `String`s.
    let matched = req.extensions().get::<MatchedPath>().cloned();
    let method = req.method().clone();

    let started = Instant::now();
    let response = next.run(req).await;
    let elapsed = started.elapsed().as_secs_f64();

    let endpoint = matched.as_ref().map_or("unknown", MatchedPath::as_str);
    let metrics = &*METRICS;
    metrics
        .requests_total
        .with_label_values(&[endpoint, method.as_str()])
        .inc();
    metrics
        .request_duration
        .with_label_values(&[endpoint])
        .observe(elapsed);

    // Everything from 500 upwards is reported as "5xx", matching the documented classes.
    let status_class = match response.status().as_u16() {
        400..=499 => Some("4xx"),
        500.. => Some("5xx"),
        _ => None,
    };
    if let Some(class) = status_class {
        metrics
            .errors_total
            .with_label_values(&[endpoint, class])
            .inc();
    }

    response
}
/// Adds one to the `models` gauge.
///
/// Compiles to a no-op when the `metrics` feature is disabled, so callers do
/// not need their own feature gates.
pub fn inc_models() {
    #[cfg(feature = "metrics")]
    METRICS.models.inc();
}
/// Adds one to the `workers` gauge.
///
/// Compiles to a no-op when the `metrics` feature is disabled, so callers do
/// not need their own feature gates.
pub fn inc_workers() {
    #[cfg(feature = "metrics")]
    METRICS.workers.inc();
}
/// Subtracts one from the `workers` gauge.
///
/// Compiles to a no-op when the `metrics` feature is disabled, so callers do
/// not need their own feature gates.
pub fn dec_workers() {
    #[cfg(feature = "metrics")]
    METRICS.workers.dec();
}
#[cfg(all(test, feature = "metrics"))]
mod tests {
    use super::*;
    use prometheus::Encoder;

    /// Registers the collectors on a fresh registry, exercises the gauge helpers
    /// and checks that every metric shows up in the text exposition output.
    #[test]
    fn register_and_encode() {
        let registry = prometheus::Registry::new();
        register(&registry).expect("registration should succeed");

        // A `*Vec` collector with no labelled child yields an empty metric family,
        // which `Registry::gather` prunes. Touch one label set per vec so the name
        // assertions below do not depend on other tests having run first.
        METRICS
            .request_duration
            .with_label_values(&["/health"])
            .observe(0.001);
        METRICS
            .requests_total
            .with_label_values(&["/health", "GET"])
            .inc();
        METRICS
            .errors_total
            .with_label_values(&["/health", "5xx"])
            .inc();

        inc_models();
        inc_workers();
        inc_workers();
        dec_workers();

        let encoder = prometheus::TextEncoder::new();
        let mut buf = Vec::new();
        encoder.encode(&registry.gather(), &mut buf).unwrap();
        let output = String::from_utf8(buf).unwrap();

        assert!(output.contains("dynamo_kvindexer_request_duration_seconds"));
        assert!(output.contains("dynamo_kvindexer_requests_total"));
        assert!(output.contains("dynamo_kvindexer_errors_total"));
        // NOTE(review): `METRICS` is process-global, so these exact gauge values
        // assume no other test in the same binary changes the gauges — confirm.
        assert!(output.contains("dynamo_kvindexer_models 1"));
        assert!(output.contains("dynamo_kvindexer_workers 1"));
    }
}
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
pub mod indexer; pub mod indexer;
pub mod listener; pub mod listener;
pub mod metrics;
pub mod recovery; pub mod recovery;
pub mod registry; pub mod registry;
pub mod server; pub mod server;
...@@ -97,7 +98,18 @@ pub async fn run_server(config: IndexerConfig) -> anyhow::Result<()> { ...@@ -97,7 +98,18 @@ pub async fn run_server(config: IndexerConfig) -> anyhow::Result<()> {
registry.signal_ready(); registry.signal_ready();
let state = Arc::new(AppState { registry }); #[cfg(feature = "metrics")]
let prom_registry = {
let r = prometheus::Registry::new();
metrics::register(&r).expect("failed to register indexer metrics");
r
};
let state = Arc::new(AppState {
registry,
#[cfg(feature = "metrics")]
prom_registry,
});
let app = create_router(state); let app = create_router(state);
let listener = TcpListener::bind(("0.0.0.0", config.port)).await?; let listener = TcpListener::bind(("0.0.0.0", config.port)).await?;
......
...@@ -113,6 +113,7 @@ impl WorkerRegistry { ...@@ -113,6 +113,7 @@ impl WorkerRegistry {
block_size, block_size,
"Creating new indexer" "Creating new indexer"
); );
super::metrics::inc_models();
IndexerEntry { IndexerEntry {
indexer: create_indexer(block_size, self.num_threads), indexer: create_indexer(block_size, self.num_threads),
block_size, block_size,
...@@ -135,14 +136,14 @@ impl WorkerRegistry { ...@@ -135,14 +136,14 @@ impl WorkerRegistry {
// Check for duplicate and insert replay endpoint while holding the lock briefly. // Check for duplicate and insert replay endpoint while holding the lock briefly.
{ {
let mut entry = self let mut entry = self.workers.entry(instance_id).or_insert_with(|| {
.workers super::metrics::inc_workers();
.entry(instance_id) WorkerEntry {
.or_insert_with(|| WorkerEntry {
endpoints: HashMap::new(), endpoints: HashMap::new(),
replay_endpoints: HashMap::new(), replay_endpoints: HashMap::new(),
cancels: HashMap::new(), cancels: HashMap::new(),
}); }
});
if entry.endpoints.contains_key(&dp_rank) { if entry.endpoints.contains_key(&dp_rank) {
bail!("instance {instance_id} dp_rank {dp_rank} already registered"); bail!("instance {instance_id} dp_rank {dp_rank} already registered");
...@@ -211,6 +212,8 @@ impl WorkerRegistry { ...@@ -211,6 +212,8 @@ impl WorkerRegistry {
.remove(&instance_id) .remove(&instance_id)
.ok_or_else(|| anyhow::anyhow!("instance {instance_id} not found"))?; .ok_or_else(|| anyhow::anyhow!("instance {instance_id} not found"))?;
super::metrics::dec_workers();
for cancel in entry.cancels.values() { for cancel in entry.cancels.values() {
cancel.cancel(); cancel.cancel();
} }
...@@ -288,6 +291,8 @@ impl WorkerRegistry { ...@@ -288,6 +291,8 @@ impl WorkerRegistry {
.remove(&instance_id) .remove(&instance_id)
.ok_or_else(|| anyhow::anyhow!("instance {instance_id} not found"))?; .ok_or_else(|| anyhow::anyhow!("instance {instance_id} not found"))?;
super::metrics::dec_workers();
for cancel in entry.cancels.values() { for cancel in entry.cancels.values() {
cancel.cancel(); cancel.cancel();
} }
......
...@@ -9,6 +9,8 @@ use axum::http::StatusCode; ...@@ -9,6 +9,8 @@ use axum::http::StatusCode;
use axum::response::IntoResponse; use axum::response::IntoResponse;
use axum::routing::{get, post}; use axum::routing::{get, post};
use axum::{Json, Router}; use axum::{Json, Router};
#[cfg(feature = "metrics")]
use prometheus::Encoder;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::protocols::{LocalBlockHash, WorkerId, compute_block_hash_for_seq}; use crate::protocols::{LocalBlockHash, WorkerId, compute_block_hash_for_seq};
...@@ -17,6 +19,8 @@ use super::registry::{IndexerKey, WorkerRegistry}; ...@@ -17,6 +19,8 @@ use super::registry::{IndexerKey, WorkerRegistry};
pub struct AppState { pub struct AppState {
pub registry: WorkerRegistry, pub registry: WorkerRegistry,
#[cfg(feature = "metrics")]
pub prom_registry: prometheus::Registry,
} }
fn default_tenant() -> String { fn default_tenant() -> String {
...@@ -363,6 +367,27 @@ async fn dump_events(State(state): State<Arc<AppState>>) -> impl IntoResponse { ...@@ -363,6 +367,27 @@ async fn dump_events(State(state): State<Arc<AppState>>) -> impl IntoResponse {
(StatusCode::OK, Json(serde_json::json!(result))) (StatusCode::OK, Json(serde_json::json!(result)))
} }
/// Liveness probe handler for `GET /health`.
///
/// Always answers `200 OK`; it takes no state and performs no checks.
async fn handle_health() -> StatusCode {
StatusCode::OK
}
/// Handler for `GET /metrics`: serves every metric gathered from
/// `AppState::prom_registry` in the Prometheus text exposition format.
///
/// On success the response is `200 OK` with the `prometheus::TEXT_FORMAT`
/// content type. If encoding fails the handler answers
/// `500 Internal Server Error` with the error text in the body instead of
/// panicking inside the request task.
#[cfg(feature = "metrics")]
async fn handle_metrics(State(state): State<Arc<AppState>>) -> impl IntoResponse {
    let families = state.prom_registry.gather();
    let mut buf = Vec::new();
    match prometheus::TextEncoder::new().encode(&families, &mut buf) {
        Ok(()) => (
            StatusCode::OK,
            [(
                axum::http::header::CONTENT_TYPE,
                prometheus::TEXT_FORMAT.to_string(),
            )],
            buf,
        )
            .into_response(),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("failed to encode metrics: {err}"),
        )
            .into_response(),
    }
}
pub fn create_router(state: Arc<AppState>) -> Router { pub fn create_router(state: Arc<AppState>) -> Router {
let router = Router::new() let router = Router::new()
.route("/register", post(register)) .route("/register", post(register))
...@@ -373,12 +398,27 @@ pub fn create_router(state: Arc<AppState>) -> Router { ...@@ -373,12 +398,27 @@ pub fn create_router(state: Arc<AppState>) -> Router {
.route("/dump", get(dump_events)) .route("/dump", get(dump_events))
.route("/register_peer", post(register_peer)) .route("/register_peer", post(register_peer))
.route("/deregister_peer", post(deregister_peer)) .route("/deregister_peer", post(deregister_peer))
.route("/peers", get(list_peers)); .route("/peers", get(list_peers))
.route("/health", get(handle_health));
#[cfg(feature = "test-endpoints")] #[cfg(feature = "test-endpoints")]
let router = router let router = router
.route("/test/pause_listener", post(test_pause_listener)) .route("/test/pause_listener", post(test_pause_listener))
.route("/test/resume_listener", post(test_resume_listener)); .route("/test/resume_listener", post(test_resume_listener));
router.with_state(state) let router = router.with_state(state.clone());
#[cfg(feature = "metrics")]
let router = {
let metrics_route = Router::new()
.route("/metrics", get(handle_metrics))
.with_state(state);
router
.layer(axum::middleware::from_fn(
super::metrics::metrics_middleware,
))
.merge(metrics_route)
};
router
} }
...@@ -74,6 +74,9 @@ pub mod name_prefix { ...@@ -74,6 +74,9 @@ pub mod name_prefix {
/// Prefix for tokio runtime metrics /// Prefix for tokio runtime metrics
pub const TOKIO: &str = "dynamo_tokio"; pub const TOKIO: &str = "dynamo_tokio";
/// Prefix for standalone KV indexer metrics
pub const KVINDEXER: &str = "dynamo_kvindexer";
} }
/// Automatically inserted Prometheus label names used across the metrics system /// Automatically inserted Prometheus label names used across the metrics system
...@@ -519,6 +522,24 @@ pub mod tokio_perf { ...@@ -519,6 +522,24 @@ pub mod tokio_perf {
pub const ALIVE_TASKS: &str = "alive_tasks"; pub const ALIVE_TASKS: &str = "alive_tasks";
} }
/// Standalone KV indexer HTTP service metrics
///
/// Each constant is a metric-name suffix. The exported name is
/// `name_prefix::KVINDEXER` joined to the suffix with an underscore, for
/// example `dynamo_kvindexer_requests_total`.
pub mod kvindexer {
/// HTTP request latency (histogram, labelled by `endpoint`)
pub const REQUEST_DURATION_SECONDS: &str = "request_duration_seconds";
/// Total HTTP requests (counter, labelled by `endpoint` and `method`)
pub const REQUESTS_TOTAL: &str = "requests_total";
/// HTTP error responses (4xx/5xx) (counter, labelled by `endpoint` and `status_class`)
pub const ERRORS_TOTAL: &str = "errors_total";
/// Number of active model+tenant indexers (gauge)
pub const MODELS: &str = "models";
/// Number of registered worker instances (gauge)
pub const WORKERS: &str = "workers";
}
// KvRouter (including KvIndexer) Prometheus metric names // KvRouter (including KvIndexer) Prometheus metric names
pub mod kvrouter { pub mod kvrouter {
/// Number of KV cache events applied to the index (including status) /// Number of KV cache events applied to the index (including status)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment