Unverified Commit 615580d8 authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

feat: Base metrics: add generic ingress handler metrics (#2090)


Co-authored-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent e2a514b2
...@@ -19,6 +19,8 @@ graph TD ...@@ -19,6 +19,8 @@ graph TD
PROMETHEUS -->|:9401/metrics| DCGM_EXPORTER[dcgm-exporter :9401] PROMETHEUS -->|:9401/metrics| DCGM_EXPORTER[dcgm-exporter :9401]
PROMETHEUS -->|:7777/metrics| NATS_PROM_EXP PROMETHEUS -->|:7777/metrics| NATS_PROM_EXP
PROMETHEUS -->|:8080/metrics| DYNAMOFE[Dynamo HTTP FE :8080] PROMETHEUS -->|:8080/metrics| DYNAMOFE[Dynamo HTTP FE :8080]
PROMETHEUS -->|:8081/metrics| DYNAMOBACKEND[Dynamo backend :8081]
DYNAMOFE --> DYNAMOBACKEND
GRAFANA -->|:9090/query API| PROMETHEUS GRAFANA -->|:9090/query API| PROMETHEUS
end end
``` ```
...@@ -34,12 +36,14 @@ As of Q2 2025, Dynamo HTTP Frontend metrics are exposed when you build container ...@@ -34,12 +36,14 @@ As of Q2 2025, Dynamo HTTP Frontend metrics are exposed when you build container
2. Start Dynamo dependencies. Assume you're at the root dynamo path: 2. Start Dynamo dependencies. Assume you're at the root dynamo path:
```bash ```bash
docker compose -f deploy/docker-compose.yml up -d # Minimum components for Dynamo: etcd/nats/dcgm-exporter # Start the basic services (etcd & natsd), along with Prometheus and Grafana
# or docker compose -f deploy/docker-compose.yml --profile metrics up -d
docker compose -f deploy/docker-compose.yml --profile metrics up -d # In addition to the above, start Prometheus & Grafana
# Minimum components for Dynamo: etcd/nats/dcgm-exporter
docker compose -f deploy/docker-compose.yml up -d
``` ```
To target specific GPU(s), export the variable below before running Docker Compose: Optional: To target specific GPU(s), export the variable below before running Docker Compose
```bash ```bash
export CUDA_VISIBLE_DEVICES=0,2 export CUDA_VISIBLE_DEVICES=0,2
``` ```
...@@ -63,9 +67,15 @@ As of Q2 2025, Dynamo HTTP Frontend metrics are exposed when you build container ...@@ -63,9 +67,15 @@ As of Q2 2025, Dynamo HTTP Frontend metrics are exposed when you build container
### Prometheus ### Prometheus
The Prometheus configuration is defined in [prometheus.yml](./prometheus.yml). It is configured to scrape metrics from the metrics aggregation service endpoint. The Prometheus configuration is specified in [prometheus.yml](./prometheus.yml). This file is set up to collect metrics from the metrics aggregation service endpoint.
Please be aware that you might need to modify the target settings to align with your specific host configuration and network environment.
After making changes to prometheus.yml, it is necessary to reload the configuration using the command below. Simply sending a kill -HUP signal will not suffice due to the caching of the volume that contains the prometheus.yml file.
Note: You may need to adjust the target based on your host configuration and network setup. ```
docker compose -f deploy/docker-compose.yml up prometheus -d --force-recreate
```
### Grafana ### Grafana
...@@ -82,11 +92,13 @@ The following configuration files should be present in this directory: ...@@ -82,11 +92,13 @@ The following configuration files should be present in this directory:
- [grafana-datasources.yml](./grafana-datasources.yml): Contains Grafana datasource configuration - [grafana-datasources.yml](./grafana-datasources.yml): Contains Grafana datasource configuration
- [grafana_dashboards/grafana-dashboard-providers.yml](./grafana_dashboards/grafana-dashboard-providers.yml): Contains Grafana dashboard provider configuration - [grafana_dashboards/grafana-dashboard-providers.yml](./grafana_dashboards/grafana-dashboard-providers.yml): Contains Grafana dashboard provider configuration
- [grafana_dashboards/grafana-dynamo-dashboard.json](./grafana_dashboards/grafana-dynamo-dashboard.json): A general Dynamo Dashboard for both SW and HW metrics. - [grafana_dashboards/grafana-dynamo-dashboard.json](./grafana_dashboards/grafana-dynamo-dashboard.json): A general Dynamo Dashboard for both SW and HW metrics.
- [grafana_dashboards/grafana-llm-metrics.json](./grafana_dashboards/grafana-llm-metrics.json): Contains Grafana dashboard configuration for LLM specific metrics.
- [grafana_dashboards/grafana-dcgm-metrics.json](./grafana_dashboards/grafana-dcgm-metrics.json): Contains Grafana dashboard configuration for DCGM GPU metrics - [grafana_dashboards/grafana-dcgm-metrics.json](./grafana_dashboards/grafana-dcgm-metrics.json): Contains Grafana dashboard configuration for DCGM GPU metrics
- [grafana_dashboards/grafana-llm-metrics.json](./grafana_dashboards/grafana-llm-metrics.json): This file, which is being phased out, contains the Grafana dashboard configuration for LLM-specific metrics. It requires an additional `metrics` component to operate concurrently. A new version is under development.
## Running the example `metrics` component ## Running the example `metrics` component
IMPORTANT: This section is being phased out, and some metrics may not function as expected. A new solution is under development.
When you run the example [components/metrics](../../components/metrics/README.md) component, it exposes a Prometheus /metrics endpoint with the followings (defined in [../../components/metrics/src/lib.rs](../../components/metrics/src/lib.rs)): When you run the example [components/metrics](../../components/metrics/README.md) component, it exposes a Prometheus /metrics endpoint with the followings (defined in [../../components/metrics/src/lib.rs](../../components/metrics/src/lib.rs)):
- `llm_requests_active_slots`: Number of currently active request slots per worker - `llm_requests_active_slots`: Number of currently active request slots per worker
- `llm_requests_total_slots`: Total available request slots per worker - `llm_requests_total_slots`: Total available request slots per worker
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
"editable": true, "editable": true,
"fiscalYearStartMonth": 0, "fiscalYearStartMonth": 0,
"graphTooltip": 0, "graphTooltip": 0,
"id": 1, "id": 4,
"links": [], "links": [],
"panels": [ "panels": [
{ {
...@@ -112,7 +112,7 @@ ...@@ -112,7 +112,7 @@
"refId": "A" "refId": "A"
} }
], ],
"title": "Requests / Sec", "title": "Frontend Requests / Sec",
"type": "timeseries" "type": "timeseries"
}, },
{ {
...@@ -205,7 +205,7 @@ ...@@ -205,7 +205,7 @@
"refId": "A" "refId": "A"
} }
], ],
"title": "Avg Time to First Token", "title": "Frontend Avg Time to First Token",
"type": "timeseries" "type": "timeseries"
}, },
{ {
...@@ -298,7 +298,7 @@ ...@@ -298,7 +298,7 @@
"refId": "A" "refId": "A"
} }
], ],
"title": "Avg Inter-Token Latency", "title": "Frontend Avg Inter-Token Latency",
"type": "timeseries" "type": "timeseries"
}, },
{ {
...@@ -391,7 +391,7 @@ ...@@ -391,7 +391,7 @@
"refId": "A" "refId": "A"
} }
], ],
"title": "Avg Request Duration", "title": "Frontend Avg Request Duration",
"type": "timeseries" "type": "timeseries"
}, },
{ {
...@@ -497,7 +497,7 @@ ...@@ -497,7 +497,7 @@
"refId": "B" "refId": "B"
} }
], ],
"title": "Avg Input/Output Sequence Length", "title": "Frontend Avg Input/Output Sequence Length",
"type": "timeseries" "type": "timeseries"
}, },
{ {
...@@ -611,17 +611,406 @@ ...@@ -611,17 +611,406 @@
], ],
"title": "DCGM GPU Utilization", "title": "DCGM GPU Utilization",
"type": "timeseries" "type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "P1809F7CD0C75ACF3"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 6,
"x": 0,
"y": 16
},
"id": 19,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "12.0.1",
"targets": [
{
"editorMode": "code",
"expr": "rate(dynamo_response_bytes_total{endpoint=\"generate\"}[1m])",
"legendFormat": "Response bytes",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "P1809F7CD0C75ACF3"
},
"editorMode": "code",
"expr": "rate(dynamo_request_bytes_total{endpoint=\"generate\"}[1m])",
"hide": false,
"instant": false,
"legendFormat": "Request bytes",
"range": true,
"refId": "B"
}
],
"title": "dynamo.vllm bytes / sec",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "P1809F7CD0C75ACF3"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 6,
"x": 6,
"y": 16
},
"id": 18,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "12.0.1",
"targets": [
{
"editorMode": "code",
"expr": "rate(dynamo_requests_total{endpoint=\"generate\"}[1m])",
"legendFormat": "__auto",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "P1809F7CD0C75ACF3"
},
"editorMode": "code",
"expr": "",
"hide": false,
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "B"
}
],
"title": "dynamo.vllm requests / sec",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "P1809F7CD0C75ACF3"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 6,
"x": 12,
"y": 16
},
"id": 20,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "12.0.1",
"targets": [
{
"editorMode": "code",
"expr": "dynamo_request_duration_seconds_sum / dynamo_request_duration_seconds_count",
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "dynamo.vllm Avg Request Duration (seconds)",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "P1809F7CD0C75ACF3"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 6,
"x": 18,
"y": 16
},
"id": 21,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "12.0.1",
"targets": [
{
"editorMode": "code",
"expr": "rate(dynamo_errors_total{endpoint=\"generate\"}[1m])",
"legendFormat": "{{error_type}}",
"range": true,
"refId": "A"
}
],
"title": "dynamo.vllm Avg Errors / sec",
"type": "timeseries"
} }
], ],
"preload": false, "preload": false,
"refresh": "", "refresh": "",
"schemaVersion": 41, "schemaVersion": 41,
"tags": [ "tags": [],
"Dynamo",
"DCGM",
"etcd",
"NATS"
],
"templating": { "templating": {
"list": [] "list": []
}, },
...@@ -632,6 +1021,6 @@ ...@@ -632,6 +1021,6 @@
"timepicker": {}, "timepicker": {},
"timezone": "browser", "timezone": "browser",
"title": "Dynamo Dashboard", "title": "Dynamo Dashboard",
"uid": "a7d3733f-f8e7-423a-ab4b-b18e3d7d0357", "uid": "97ae8df9-138a-4f7a-9b0f-635b77d818fe",
"version": 5 "version": 1
} }
\ No newline at end of file
...@@ -34,11 +34,18 @@ scrape_configs: ...@@ -34,11 +34,18 @@ scrape_configs:
- targets: ['dcgm-exporter:9401'] # on the "monitoring" network - targets: ['dcgm-exporter:9401'] # on the "monitoring" network
# This is a demo service that needs to be launched manually. See components/metrics/README.md # This is a demo service that needs to be launched manually. See components/metrics/README.md
# Note that you may need to disable the firewall on your host. On Ubuntu: sudo ufw allow 8000/tcp # Note that you may need to disable the firewall on your host. On Ubuntu: sudo ufw allow 8080/tcp
- job_name: 'dynamo-backend' # You can also force the port, if the default is different: python -m dynamo.frontend --http-port 8080
- job_name: 'dynamo-frontend'
scrape_interval: 10s scrape_interval: 10s
static_configs: static_configs:
- targets: ['host.docker.internal:8000'] # on the "monitoring" network - targets: ['host.docker.internal:8080'] # on the "monitoring" network
# Launch via: DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8081 dynamo.<backend> ...
- job_name: 'dynamo-backend'
scrape_interval: 6s
static_configs:
- targets: ['host.docker.internal:8081']
# This is another demo aggregator that needs to be launched manually. See components/metrics/README.md # This is another demo aggregator that needs to be launched manually. See components/metrics/README.md
# Note that you may need to disable the firewall on your host. On Ubuntu: sudo ufw allow 9091/tcp # Note that you may need to disable the firewall on your host. On Ubuntu: sudo ufw allow 9091/tcp
......
<!--
SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# Multimodal Deployment Examples
This directory provides example workflows and reference implementations for deploying a multimodal model using Dynamo.
## Use the Latest Release
We recommend using the latest stable release of dynamo to avoid breaking changes:
[![GitHub Release](https://img.shields.io/github/v/release/ai-dynamo/dynamo)](https://github.com/ai-dynamo/dynamo/releases/latest)
You can find the latest release [here](https://github.com/ai-dynamo/dynamo/releases/latest) and check out the corresponding branch with:
```bash
git checkout $(git describe --tags $(git rev-list --tags --max-count=1))
```
## Multimodal Aggregated Serving
### Components
- workers: For aggregated serving, we have two workers, [encode_worker](components/encode_worker.py) for encoding and [decode_worker](components/decode_worker.py) for prefilling and decoding.
- processor: Tokenizes the prompt and passes it to the decode worker.
- frontend: HTTP endpoint to handle incoming requests.
### Graph
In this graph, we have two workers, [encode_worker](components/encode_worker.py) and [decode_worker](components/decode_worker.py).
The encode worker is responsible for encoding the image and passing the embeddings to the decode worker via a combination of NATS and RDMA.
The work complete event is sent via NATS, while the embeddings tensor is transferred via RDMA through the NIXL interface.
Its decode worker then prefills and decodes the prompt, just like the [LLM aggregated serving](../llm/README.md) example.
By separating the encode from the prefill and decode stages, we can have a more flexible deployment and scale the
encode worker independently from the prefill and decode workers if needed.
This figure shows the flow of the graph:
```mermaid
flowchart LR
HTTP --> processor
processor --> HTTP
processor --> decode_worker
decode_worker --> processor
decode_worker --image_url--> encode_worker
encode_worker --embeddings--> decode_worker
```
```bash
cd $DYNAMO_HOME/examples/multimodal
# Serve a LLaVA 1.5 7B model:
dynamo serve graphs.agg:Frontend -f ./configs/agg-llava.yaml
# Serve a Qwen2.5-VL model:
# dynamo serve graphs.agg:Frontend -f ./configs/agg-qwen.yaml
# Serve a Phi3V model:
# dynamo serve graphs.agg:Frontend -f ./configs/agg-phi3v.yaml
```
### Client
In another terminal:
```bash
curl http://localhost:8080/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "llava-hf/llava-1.5-7b-hf",
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": "What is in this image?"
},
{
"type": "image_url",
"image_url": {
"url": "http://images.cocodataset.org/test2017/000000155781.jpg"
}
}
]
}
],
"max_tokens": 300,
"temperature": 0.0,
"stream": false
}'
```
If serving the example Qwen model, replace `"llava-hf/llava-1.5-7b-hf"` in the `"model"` field with `"Qwen/Qwen2.5-VL-7B-Instruct"`. If serving the example Phi3V model, replace `"llava-hf/llava-1.5-7b-hf"` in the `"model"` field with `"microsoft/Phi-3.5-vision-instruct"`.
You should see a response similar to this:
```json
{"id": "c37b946e-9e58-4d54-88c8-2dbd92c47b0c", "object": "chat.completion", "created": 1747725277, "model": "llava-hf/llava-1.5-7b-hf", "choices": [{"index": 0, "message": {"role": "assistant", "content": " In the image, there is a city bus parked on a street, with a street sign nearby on the right side. The bus appears to be stopped out of service. The setting is in a foggy city, giving it a slightly moody atmosphere."}, "finish_reason": "stop"}]}
```
## Multimodal Disaggregated Serving
### Components
- workers: For disaggregated serving, we have three workers, [encode_worker](components/encode_worker.py) for encoding, [decode_worker](components/decode_worker.py) for decoding, and [prefill_worker](components/prefill_worker.py) for prefilling.
- processor: Tokenizes the prompt and passes it to the decode worker.
- frontend: HTTP endpoint to handle incoming requests.
### Graph
In this graph, we have three workers, [encode_worker](components/encode_worker.py), [decode_worker](components/decode_worker.py), and [prefill_worker](components/prefill_worker.py).
For the Llava model, embeddings are only required during the prefill stage. As such, the encode worker is connected directly to the prefill worker.
The encode worker is responsible for encoding the image and passing the embeddings to the prefill worker via a combination of NATS and RDMA.
Its work complete event is sent via NATS, while the embeddings tensor is transferred via RDMA through the NIXL interface.
The prefill worker performs the prefilling step and forwards the KV cache to the decode worker for decoding.
For more details on the roles of the prefill and decode workers, refer to the [LLM disaggregated serving](../llm/README.md) example.
This figure shows the flow of the graph:
```mermaid
flowchart LR
HTTP --> processor
processor --> HTTP
processor --> decode_worker
decode_worker --> processor
decode_worker --> prefill_worker
prefill_worker --> decode_worker
prefill_worker --image_url--> encode_worker
encode_worker --embeddings--> prefill_worker
```
```bash
cd $DYNAMO_HOME/examples/multimodal
dynamo serve graphs.disagg:Frontend -f configs/disagg.yaml
```
### Client
In another terminal:
```bash
curl http://localhost:8080/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "llava-hf/llava-1.5-7b-hf",
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": "What is in this image?"
},
{
"type": "image_url",
"image_url": {
"url": "http://images.cocodataset.org/test2017/000000155781.jpg"
}
}
]
}
],
"max_tokens": 300,
"temperature": 0.0,
"stream": false
}'
```
You should see a response similar to this:
```json
{"id": "c1774d61-3299-4aa3-bea1-a0af6c055ba8", "object": "chat.completion", "created": 1747725645, "model": "llava-hf/llava-1.5-7b-hf", "choices": [{"index": 0, "message": {"role": "assistant", "content": " This image shows a passenger bus traveling down the road near power lines and trees. The bus displays a sign that says \"OUT OF SERVICE\" on its front."}, "finish_reason": "stop"}]}
```
***Note***: disaggregation is currently only confirmed to work with LLaVA. Qwen VL and PhiV are not confirmed to be supported.
## Deployment with Dynamo Operator
These multimodal examples can be deployed to a Kubernetes cluster using [Dynamo Cloud](../../docs/guides/dynamo_deploy/dynamo_cloud.md) and the Dynamo CLI.
### Prerequisites
You must have first followed the instructions in [deploy/cloud/helm/README.md](../../deploy/cloud/helm/README.md) to install Dynamo Cloud on your Kubernetes cluster.
**Note**: The `KUBE_NS` variable in the following steps must match the Kubernetes namespace where you installed Dynamo Cloud. You must also expose the `dynamo-store` service externally. This will be the endpoint the CLI uses to interface with Dynamo Cloud.
### Deployment Steps
For detailed deployment instructions, please refer to the [Operator Deployment Guide](../../docs/guides/dynamo_deploy/operator_deployment.md). The following are the specific commands for the multimodal examples:
```bash
# Set your project root directory
export PROJECT_ROOT=$(pwd)
# Configure environment variables (see operator_deployment.md for details)
export KUBE_NS=dynamo-cloud
export DYNAMO_CLOUD=http://localhost:8080 # If using port-forward
# OR
# export DYNAMO_CLOUD=https://dynamo-cloud.nvidia.com # If using Ingress/VirtualService
# Build the Dynamo base image (see operator_deployment.md for details)
export DYNAMO_IMAGE=<your-registry>/<your-image-name>:<your-tag>
# TODO: Apply Dynamo graph deployment for the example
```
**Note**: To avoid rate limiting from unauthenticated requests to HuggingFace (HF), you can provide your `HF_TOKEN` as a secret in your deployment. See the [operator deployment guide](../../docs/guides/dynamo_deploy/operator_deployment.md#referencing-secrets-in-your-deployment) for instructions on referencing secrets like `HF_TOKEN` in your deployment configuration.
**Note**: Optionally add `--Planner.no-operation=false` at the end of the deployment command to enable the planner component to take scaling actions on your deployment.
### Testing the Deployment
Once the deployment is complete, you can test it. If you have ingress available for your deployment, you can directly call the url returned
in `dynamo deployment get ${DEPLOYMENT_NAME}` and skip the steps to find and forward the frontend pod.
```bash
# Find your frontend pod
export FRONTEND_POD=$(kubectl get pods -n ${KUBE_NS} | grep "${DEPLOYMENT_NAME}-frontend" | sort -k1 | tail -n1 | awk '{print $1}')
# Forward the pod's port to localhost
kubectl port-forward pod/$FRONTEND_POD 8080:8080 -n ${KUBE_NS}
# Test the API endpoint
curl localhost:8080/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "llava-hf/llava-1.5-7b-hf",
"messages": [
{
"role": "user",
"content": [
{ "type": "text", "text": "What is in this image?" },
{ "type": "image_url", "image_url": { "url": "http://images.cocodataset.org/test2017/000000155781.jpg" } }
]
}
],
"max_tokens": 300,
"temperature": 0.0,
"stream": false
}'
```
If serving the example Qwen model, replace `"llava-hf/llava-1.5-7b-hf"` in the `"model"` field with `"Qwen/Qwen2.5-VL-7B-Instruct"`. If serving the example Phi3V model, replace `"llava-hf/llava-1.5-7b-hf"` in the `"model"` field with `"microsoft/Phi-3.5-vision-instruct"`.
For more details on managing deployments, testing, and troubleshooting, please refer to the [Operator Deployment Guide](../../docs/guides/dynamo_deploy/operator_deployment.md).
## Multimodal Aggregated Video Serving
This example demonstrates deploying an aggregated multimodal model that can process video inputs.
### Components
- workers: For video serving, we have two workers, [video_encode_worker](components/video_encode_worker.py) for decoding video into frames, and [video_decode_worker](components/video_decode_worker.py) for prefilling and decoding.
- processor: Tokenizes the prompt and passes it to the decode worker.
- frontend: HTTP endpoint to handle incoming requests.
### Graph
In this graph, we have two workers, `video_encode_worker` and `video_decode_worker`.
The `video_encode_worker` is responsible for decoding the video into a series of frames. Unlike the image pipeline which generates embeddings, this pipeline passes the raw frames directly to the `video_decode_worker`. This transfer is done efficiently using RDMA.
The `video_decode_worker` then receives these frames, and performs prefill and decode steps with the model. Separating the video processing from the language model inference allows for flexible scaling.
This figure shows the flow of the graph:
```mermaid
flowchart LR
HTTP --> processor
processor --> HTTP
processor --> video_decode_worker
video_decode_worker --> processor
video_decode_worker --video_url--> video_encode_worker
video_encode_worker --frames--> video_decode_worker
```
```bash
cd $DYNAMO_HOME/examples/multimodal
# Serve a LLaVA-NeXT-Video-7B model:
dynamo serve graphs.agg_video:Frontend -f ./configs/agg_video.yaml
```
### Client
In another terminal:
```bash
curl -X 'POST' 'http://localhost:8080/v1/chat/completions' -H 'Content-Type: application/json' -d '{
"model": "llava-hf/LLaVA-NeXT-Video-7B-hf",
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": "Describe the video in detail"
},
{
"type": "video_url",
"video_url": {
"url": "https://storage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"
}
}
]
}
],
"max_tokens": 300,
"stream": false
}' | jq
```
You should see a response describing the video's content similar to
```json
{
"id": "b5714626-5889-4bb7-8c51-f3bca65b4683",
"object": "chat.completion",
"created": 1749772533,
"model": "llava-hf/LLaVA-NeXT-Video-7B-hf",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": " Sure! The video features a group of anthropomorphic animals who appear human-like. They're out in a meadow, which is a large, open area covered in grasses, and have given human qualities like speaking and a desire to go on adventures. The animals are seen play-fighting with each other clearly seen glancing at the camera when they sense it, blinking, and Roman the second can be directly heard by the camera reciting the line, \"When the challenge becomes insane, the behavior becomes erratic.\" A white rabbit is the first in shot and he winks the left eye and flips the right ear before shaking with the mouse and squirrel friends on a blurry rock ledge under the sky. At some point, the rabbit turns towards the camera and starts playing with the thing, and there's a distant mountain in the background. Furthermore, a little animal from a tree in the background flies with two rocks, and it's joined by the rest of the group of friends. That outro is an elder turtle in the Ramden musical style saturated with a horn-like thing pattern."
},
"finish_reason": "stop"
}
]
}
```
## Multimodal Disaggregated Video Serving
This example demonstrates deploying a disaggregated multimodal model that can process video inputs.
### Dependency
Video example relies on `av` package for video preprocessing inside the encode_worker.
Please install `av` inside the dynamo container to enable video example.
`pip install av`
### Components
- workers: For disaggregated video serving, we have three workers, [video_encode_worker](components/video_encode_worker.py) for decoding video into frames, [video_decode_worker](components/video_decode_worker.py) for decoding, and [video_prefill_worker](components/video_prefill_worker.py) for prefilling.
- processor: Tokenizes the prompt and passes it to the decode worker.
- frontend: HTTP endpoint to handle incoming requests.
### Graph
In this graph, we have three workers, `video_encode_worker`, `video_decode_worker`, and `video_prefill_worker`.
For the LLaVA-NeXT-Video-7B model, frames are only required during the prefill stage. As such, the `video_encode_worker` is connected directly to the `video_prefill_worker`.
The `video_encode_worker` is responsible for decoding the video into a series of frames and passing them to the `video_prefill_worker` via RDMA.
The `video_prefill_worker` performs the prefilling step and forwards the KV cache to the `video_decode_worker` for decoding.
This figure shows the flow of the graph:
```mermaid
flowchart LR
HTTP --> processor
processor --> HTTP
processor --> video_decode_worker
video_decode_worker --> processor
video_decode_worker --> video_prefill_worker
video_prefill_worker --> video_decode_worker
video_prefill_worker --video_url--> video_encode_worker
video_encode_worker --frames--> video_prefill_worker
```
```bash
cd $DYNAMO_HOME/examples/multimodal
# Serve a LLaVA-NeXT-Video-7B model:
dynamo serve graphs.disagg_video:Frontend -f ./configs/disagg_video.yaml
```
### Client
In another terminal:
```bash
curl -X 'POST' 'http://localhost:8080/v1/chat/completions' -H 'Content-Type: application/json' -d '{
"model": "llava-hf/LLaVA-NeXT-Video-7B-hf",
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": "Describe the video in detail"
},
{
"type": "video_url",
"video_url": {
"url": "https://storage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"
}
}
]
}
],
"max_tokens": 300,
"stream": false
}' | jq
```
You should see a response describing the video's content similar to
```json
{
"id": "d1d641b1-4daf-48d3-9d06-6a60743b5a42",
"object": "chat.completion",
"created": 1749775300,
"model": "llava-hf/LLaVA-NeXT-Video-7B-hf",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": " The video features two animals in a lush, green outdoor environment. On the ground, there is a white rabbit with big brown eyes, a playful expression, and two antlers. The rabbit is accompanied by a uniquely colored bird with orange pupils, possibly a squirrel or a hamster, sitting on its head. These two animals seem to have embarked on an unlikely journey, flying together in the sky. The backdrop showcases rolling green hills and trees under the pleasant weather. The sky is clear, indicating a beautiful day. The colors and contrast suggest the landscape is during spring or summer, signifying the rabbit and bird could also be engaging in outdoor activities during those seasons. Overall, it's a charming scene depicting an unlikely yet harmonious pair, enjoying a surprise adventure in nature."
},
"finish_reason": "stop"
}
]
}
```
## Deploying Multimodal Examples on Kubernetes
This guide will help you quickly deploy and clean up the multimodal example services in Kubernetes.
### Prerequisites
- **Dynamo Cloud** is already deployed in your target Kubernetes namespace.
- You have `kubectl` access to your cluster and the correct namespace set in `$NAMESPACE`.
### Create a secret with huggingface token
```bash
export HF_TOKEN="huggingfacehub token with read permission to models"
kubectl create secret generic hf-token-secret --from-literal=HF_TOKEN=$HF_TOKEN -n $KUBE_NS || true
```
---
Choose the example you want to deploy or delete. The YAML files are located in `examples/multimodal/deploy/k8s/`.
### Deploy the Multimodal Example
```bash
kubectl apply -f examples/multimodal/deploy/k8s/<Example yaml file> -n $NAMESPACE
```
### Uninstall the Multimodal Example
```bash
kubectl delete -f examples/multimodal/deploy/k8s/<Example yaml file> -n $NAMESPACE
```
### Using a different dynamo container
To customize the container image used in your deployment, you will need to update the manifest before applying it.
You can use [`yq`](https://github.com/mikefarah/yq?tab=readme-ov-file#install), a portable command-line YAML processor.
Please follow the [installation instructions](https://github.com/mikefarah/yq?tab=readme-ov-file#install) for your platform if you do not already have `yq` installed. After installing `yq`, you can generate and apply your manifest as follows:
```bash
export DYNAMO_IMAGE=my-registry/my-image:tag
yq '.spec.services.[].extraPodSpec.mainContainer.image = env(DYNAMO_IMAGE)' $EXAMPLE_FILE > my_example_manifest.yaml
# install the dynamo example
kubectl apply -f my_example_manifest.yaml -n $NAMESPACE
# uninstall the dynamo example
kubectl delete -f my_example_manifest.yaml -n $NAMESPACE
```
\ No newline at end of file
...@@ -995,8 +995,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" ...@@ -995,8 +995,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"js-sys",
"libc", "libc",
"wasi 0.11.0+wasi-snapshot-preview1", "wasi 0.11.0+wasi-snapshot-preview1",
"wasm-bindgen",
] ]
[[package]] [[package]]
...@@ -1006,9 +1008,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" ...@@ -1006,9 +1008,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0" checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"js-sys",
"libc", "libc",
"r-efi", "r-efi",
"wasi 0.14.2+wasi-0.2.4", "wasi 0.14.2+wasi-0.2.4",
"wasm-bindgen",
] ]
[[package]] [[package]]
...@@ -1134,6 +1138,23 @@ dependencies = [ ...@@ -1134,6 +1138,23 @@ dependencies = [
"want", "want",
] ]
[[package]]
name = "hyper-rustls"
version = "0.27.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58"
dependencies = [
"http",
"hyper",
"hyper-util",
"rustls",
"rustls-pki-types",
"tokio",
"tokio-rustls",
"tower-service",
"webpki-roots 1.0.2",
]
[[package]] [[package]]
name = "hyper-timeout" name = "hyper-timeout"
version = "0.5.2" version = "0.5.2"
...@@ -1149,17 +1170,21 @@ dependencies = [ ...@@ -1149,17 +1170,21 @@ dependencies = [
[[package]] [[package]]
name = "hyper-util" name = "hyper-util"
version = "0.1.11" version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "497bbc33a26fdd4af9ed9c70d63f61cf56a938375fbb32df34db9b1cd6d643f2" checksum = "8d9b05277c7e8da2c93a568989bb6207bef0112e8d17df7a6eda4a3cf143bc5e"
dependencies = [ dependencies = [
"base64",
"bytes", "bytes",
"futures-channel", "futures-channel",
"futures-core",
"futures-util", "futures-util",
"http", "http",
"http-body", "http-body",
"hyper", "hyper",
"ipnet",
"libc", "libc",
"percent-encoding",
"pin-project-lite", "pin-project-lite",
"socket2", "socket2",
"tokio", "tokio",
...@@ -1371,6 +1396,22 @@ dependencies = [ ...@@ -1371,6 +1396,22 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "ipnet"
version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130"
[[package]]
name = "iri-string"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbc5ebe9c3a1a7a5127f920a418f7585e9e758e911d0466ed004f393b0e380b2"
dependencies = [
"memchr",
"serde",
]
[[package]] [[package]]
name = "itertools" name = "itertools"
version = "0.14.0" version = "0.14.0"
...@@ -1478,6 +1519,12 @@ version = "0.4.27" ...@@ -1478,6 +1519,12 @@ version = "0.4.27"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
[[package]]
name = "lru-slab"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
[[package]] [[package]]
name = "matchers" name = "matchers"
version = "0.1.0" version = "0.1.0"
...@@ -1993,6 +2040,61 @@ dependencies = [ ...@@ -1993,6 +2040,61 @@ dependencies = [
"thiserror 1.0.69", "thiserror 1.0.69",
] ]
[[package]]
name = "quinn"
version = "0.11.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "626214629cda6781b6dc1d316ba307189c85ba657213ce642d9c77670f8202c8"
dependencies = [
"bytes",
"cfg_aliases",
"pin-project-lite",
"quinn-proto",
"quinn-udp",
"rustc-hash",
"rustls",
"socket2",
"thiserror 2.0.12",
"tokio",
"tracing",
"web-time",
]
[[package]]
name = "quinn-proto"
version = "0.11.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49df843a9161c85bb8aae55f101bc0bac8bcafd637a620d9122fd7e0b2f7422e"
dependencies = [
"bytes",
"getrandom 0.3.2",
"lru-slab",
"rand 0.9.1",
"ring",
"rustc-hash",
"rustls",
"rustls-pki-types",
"slab",
"thiserror 2.0.12",
"tinyvec",
"tracing",
"web-time",
]
[[package]]
name = "quinn-udp"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcebb1209ee276352ef14ff8732e24cc2b02bbac986cd74a4c81bcb2f9881970"
dependencies = [
"cfg_aliases",
"libc",
"once_cell",
"socket2",
"tracing",
"windows-sys 0.59.0",
]
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.40" version = "1.0.40"
...@@ -2140,6 +2242,47 @@ version = "0.8.5" ...@@ -2140,6 +2242,47 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "reqwest"
version = "0.12.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbc931937e6ca3a06e3b6c0aa7841849b160a90351d6ab467a8b9b9959767531"
dependencies = [
"base64",
"bytes",
"futures-core",
"futures-util",
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-rustls",
"hyper-util",
"js-sys",
"log",
"percent-encoding",
"pin-project-lite",
"quinn",
"rustls",
"rustls-pki-types",
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tokio-rustls",
"tokio-util",
"tower 0.5.2",
"tower-http",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots 1.0.2",
]
[[package]] [[package]]
name = "ring" name = "ring"
version = "0.17.14" version = "0.17.14"
...@@ -2160,6 +2303,12 @@ version = "0.1.24" ...@@ -2160,6 +2303,12 @@ version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "rustc-hash"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]] [[package]]
name = "rustc_version" name = "rustc_version"
version = "0.4.1" version = "0.4.1"
...@@ -2224,6 +2373,9 @@ name = "rustls-pki-types" ...@@ -2224,6 +2373,9 @@ name = "rustls-pki-types"
version = "1.11.0" version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c"
dependencies = [
"web-time",
]
[[package]] [[package]]
name = "rustls-webpki" name = "rustls-webpki"
...@@ -2542,6 +2694,9 @@ name = "sync_wrapper" ...@@ -2542,6 +2694,9 @@ name = "sync_wrapper"
version = "1.0.2" version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263"
dependencies = [
"futures-core",
]
[[package]] [[package]]
name = "synstructure" name = "synstructure"
...@@ -2574,9 +2729,12 @@ dependencies = [ ...@@ -2574,9 +2729,12 @@ dependencies = [
"dynamo-runtime", "dynamo-runtime",
"futures", "futures",
"prometheus", "prometheus",
"rand 0.9.1",
"reqwest",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-test",
] ]
[[package]] [[package]]
...@@ -2691,6 +2849,21 @@ dependencies = [ ...@@ -2691,6 +2849,21 @@ dependencies = [
"zerovec", "zerovec",
] ]
[[package]]
name = "tinyvec"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09b3661f17e86524eccd4371ab0429194e0d7c008abb45f7a7495b1719463c71"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.44.2" version = "1.44.2"
...@@ -2741,6 +2914,19 @@ dependencies = [ ...@@ -2741,6 +2914,19 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tokio-test"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7"
dependencies = [
"async-stream",
"bytes",
"futures-core",
"tokio",
"tokio-stream",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.15" version = "0.7.15"
...@@ -2772,7 +2958,7 @@ dependencies = [ ...@@ -2772,7 +2958,7 @@ dependencies = [
"tokio", "tokio",
"tokio-rustls", "tokio-rustls",
"tokio-util", "tokio-util",
"webpki-roots", "webpki-roots 0.26.8",
] ]
[[package]] [[package]]
...@@ -2898,6 +3084,24 @@ dependencies = [ ...@@ -2898,6 +3084,24 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "tower-http"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2"
dependencies = [
"bitflags 2.9.0",
"bytes",
"futures-util",
"http",
"http-body",
"iri-string",
"pin-project-lite",
"tower 0.5.2",
"tower-layer",
"tower-service",
]
[[package]] [[package]]
name = "tower-layer" name = "tower-layer"
version = "0.3.3" version = "0.3.3"
...@@ -3174,6 +3378,19 @@ dependencies = [ ...@@ -3174,6 +3378,19 @@ dependencies = [
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "555d470ec0bc3bb57890405e5d4322cc9ea83cebb085523ced7be4144dac1e61"
dependencies = [
"cfg-if 1.0.0",
"js-sys",
"once_cell",
"wasm-bindgen",
"web-sys",
]
[[package]] [[package]]
name = "wasm-bindgen-macro" name = "wasm-bindgen-macro"
version = "0.2.100" version = "0.2.100"
...@@ -3206,6 +3423,39 @@ dependencies = [ ...@@ -3206,6 +3423,39 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "wasm-streams"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
dependencies = [
"futures-util",
"js-sys",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "web-sys"
version = "0.3.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "web-time"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]] [[package]]
name = "webpki-roots" name = "webpki-roots"
version = "0.26.8" version = "0.26.8"
...@@ -3215,6 +3465,15 @@ dependencies = [ ...@@ -3215,6 +3465,15 @@ dependencies = [
"rustls-pki-types", "rustls-pki-types",
] ]
[[package]]
name = "webpki-roots"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e8983c3ab33d6fb807cfcdad2491c4ea8cbc8ed839181c7dfd9c67c83e261b2"
dependencies = [
"rustls-pki-types",
]
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.2.8" version = "0.2.8"
......
...@@ -33,4 +33,4 @@ repository = "https://github.com/ai-dynamo/dynamo.git" ...@@ -33,4 +33,4 @@ repository = "https://github.com/ai-dynamo/dynamo.git"
[workspace.dependencies] [workspace.dependencies]
# local or crates.io # local or crates.io
dynamo-runtime = { path = "../" } dynamo-runtime = { path = "../" }
prometheus = { workspace = true } prometheus = { version = "0.14" }
...@@ -22,6 +22,10 @@ license.workspace = true ...@@ -22,6 +22,10 @@ license.workspace = true
homepage.workspace = true homepage.workspace = true
repository.workspace = true repository.workspace = true
[features]
default = []
integration = [] # Integration tests that require NATS
[dependencies] [dependencies]
dynamo-runtime = { workspace = true } dynamo-runtime = { workspace = true }
...@@ -31,3 +35,13 @@ serde = { version = "1", features = ["derive"] } ...@@ -31,3 +35,13 @@ serde = { version = "1", features = ["derive"] }
serde_json = { version = "1" } serde_json = { version = "1" }
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
prometheus = { version = "0.14" } prometheus = { version = "0.14" }
[dev-dependencies]
rand = { version = "0.9.0" }
reqwest = { version = "0.12.22", default-features = false, features = ["json", "stream", "rustls-tls"] }
tokio-test = "0.4.4"
[[test]]
name = "integration_test"
path = "tests/integration_test.rs"
required-features = ["integration"]
# System Metrics Example # Generic Profiling for Work Handlers
Demonstrates custom metrics and monitoring in Dynamo Runtime using Prometheus. This example demonstrates how to add automatic Prometheus metrics profiling to any work handler without modifying the handler code itself.
## Overview ## Overview
- Automatic hierarchical labeling: Runtime automatically adds `namespace``component``endpoint` labels The `WorkHandlerMetrics` system provides automatic profiling capabilities that are applied to all work handlers automatically. It automatically tracks:
- Uses existing Prometheus implementations
- HTTP metrics endpoint automatically added
## Quick Start - **Request Count**: Total number of requests processed
- **Request Duration**: Time spent processing each request
- **Request/Response Bytes**: Total bytes received and sent
- **Error Count**: Total number of errors encountered
### Build Additionally, the example demonstrates how to add custom metrics with data bytes tracking in `MySystemStatsMetrics`.
```bash
cd lib/runtime/examples/system_metrics ## How It Works
cargo build
**Automatic Metrics**: All work handlers automatically get profiling metrics without any code changes.
**Custom Metrics**: If you want to add custom metrics IN ADDITION to the automatic ones, you can use the `add_metrics` method:
```rust
use dynamo_runtime::pipeline::network::Ingress;
// Automatic profiling - no code changes needed!
let ingress = Ingress::for_engine(my_handler)?;
// Optional: Add custom metrics IN ADDITION to automatic ones
ingress.add_metrics(&endpoint)?;
``` ```
### Run Server The endpoint automatically provides proper labeling (namespace, component, endpoint) for all metrics.
```bash
export DYN_LOG=1 DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8081 ## Available Methods
cargo run --bin system_server
The `Ingress` struct provides methods for metrics:
- **Automatic**: All handlers get profiling metrics automatically
- `Ingress::add_metrics(&endpoint)` - Add custom metrics IN ADDITION to automatic ones (optional)
## Metrics Generated
### Automatic Metrics (No Code Changes Required)
The following Prometheus metrics are automatically created for all work handlers:
### Counters
- `requests_total` - Total requests processed
- `request_bytes_total` - Total bytes received in requests
- `response_bytes_total` - Total bytes sent in responses
- `errors_total` - Total errors encountered (with error_type labels)
### Error Types
The `errors_total` metric includes the following error types:
- `deserialization` - Errors parsing request messages
- `invalid_message` - Unexpected message format
- `response_stream` - Errors creating response streams
- `generate` - Errors in request processing
- `publish_response` - Errors publishing response data
- `publish_final` - Errors publishing final response
### Histograms
- `request_duration_seconds` - Request processing time
### Gauges
- `concurrent_requests` - Number of requests currently being processed
### Custom Metrics (Optional)
- `my_custom_bytes_processed_total` - Total data bytes processed by system handler (example)
### Labels
All metrics automatically include these labels from the endpoint:
- `namespace` - The namespace name
- `component` - The component name
- `endpoint` - The endpoint name
## Example Metrics Output
When the system is running, you'll see metrics from the /metrics HTTP path like this:
```prometheus
# HELP concurrent_requests Number of requests currently being processed by work handler
# TYPE concurrent_requests gauge
concurrent_requests{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace"} 0
# HELP my_custom_bytes_processed_total Example of a custom metric. Total number of data bytes processed by system handler
# TYPE my_custom_bytes_processed_total counter
my_custom_bytes_processed_total{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace"} 42
# HELP request_bytes_total Total number of bytes received in requests by work handler
# TYPE request_bytes_total counter
request_bytes_total{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace"} 1098
# HELP request_duration_seconds Time spent processing requests by work handler
# TYPE request_duration_seconds histogram
request_duration_seconds_bucket{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace",le="0.005"} 3
request_duration_seconds_bucket{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace",le="0.01"} 3
request_duration_seconds_bucket{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace",le="0.025"} 3
request_duration_seconds_bucket{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace",le="0.05"} 3
request_duration_seconds_bucket{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace",le="0.1"} 3
request_duration_seconds_bucket{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace",le="0.25"} 3
request_duration_seconds_bucket{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace",le="0.5"} 3
request_duration_seconds_bucket{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace",le="1"} 3
request_duration_seconds_bucket{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace",le="2.5"} 3
request_duration_seconds_bucket{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace",le="5"} 3
request_duration_seconds_bucket{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace",le="10"} 3
request_duration_seconds_bucket{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace",le="+Inf"} 3
request_duration_seconds_sum{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace"} 0.00048793700000000003
request_duration_seconds_count{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace"} 3
# HELP requests_total Total number of requests processed by work handler
# TYPE requests_total counter
requests_total{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace"} 3
# HELP response_bytes_total Total number of bytes sent in responses by work handler
# TYPE response_bytes_total counter
response_bytes_total{component="dyn_example_component",endpoint="dyn_example_endpoint9881",namespace="dyn_example_namespace"} 1917
# HELP uptime_seconds Total uptime of the DistributedRuntime in seconds
# TYPE uptime_seconds gauge
uptime_seconds{namespace="http_server"} 1.8226759879999999
``` ```
### Run Client ## Examples
```bash
cargo run --bin system_client ### Example 1: Simple Handler with Automatic Profiling
```rust
struct SimpleHandler;
#[async_trait]
impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for SimpleHandler {
async fn generate(&self, input: SingleIn<String>) -> Result<ManyOut<Annotated<String>>> {
// Your business logic here
// No need to add any metrics code!
Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
}
}
// Automatic profiling - no additional code needed!
let ingress = Ingress::for_engine(SimpleHandler::new())?;
``` ```
Note: Running the client will increment `service_requests_total`. ### Example 2: Custom Handler with Data Bytes Tracking
### View Metrics ```rust
```bash struct RequestHandler {
curl http://localhost:8081/metrics metrics: Option<Arc<MySystemStatsMetrics>>,
}
#[async_trait]
impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for RequestHandler {
async fn generate(&self, input: SingleIn<String>) -> Result<ManyOut<Annotated<String>>> {
let (data, ctx) = input.into_parts();
// Track data bytes processed (custom metric)
if let Some(metrics) = &self.metrics {
metrics.data_bytes_processed.inc_by(data.len() as u64);
}
// Your business logic here...
Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
}
}
// Create custom metrics and handler
let system_metrics = MySystemStatsMetrics::from_endpoint(&endpoint)?;
let handler = RequestHandler::with_metrics(system_metrics);
let ingress = Ingress::for_engine(handler)?;
// Add custom metrics IN ADDITION to automatic ones
// You'll get both: automatic metrics (requests_total, request_duration_seconds, etc.)
// AND custom metrics (my_custom_bytes_processed_total)
ingress.add_metrics(&endpoint)?;
``` ```
Example output: ## Benefits
1. **Zero Code Changes**: Existing handlers automatically get profiling metrics
2. **Simple API**: Just create an Ingress and you get metrics automatically
3. **Optional Custom Metrics**: Add custom metrics when needed
4. **Automatic Profiling**: Request count, duration, and error tracking out of the box
5. **Automatic Labeling**: Endpoint provides proper namespace/component/endpoint labels
6. **Performance**: Minimal overhead, metrics are only recorded when provided
## Running the Example
**Important**: You must set the `DYN_SYSTEM_PORT` environment variable to specify which port the HTTP server will run on.
```bash
# Run the system metrics example
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8081 cargo run --bin system_server
``` ```
# HELP service_request_duration_seconds Time spent processing requests The server will start an HTTP server on the specified port (8081 in this example) that exposes the Prometheus metrics endpoint at `/metrics`.
# TYPE service_request_duration_seconds histogram
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.005"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.01"} 2 To Run an actual LLM frontend + server (aggregated example), launch both of them. By default, the frontend listens to port 8080.
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.025"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.05"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.1"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.25"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.5"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="1"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="2.5"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="5"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="10"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="+Inf"} 2
service_request_duration_seconds_sum{component="component",endpoint="endpoint",namespace="system",service="backend"} 0.000022239000000000002
service_request_duration_seconds_count{component="component",endpoint="endpoint",namespace="system",service="backend"} 2
# HELP service_requests_total Total number of requests processed
# TYPE service_requests_total counter
service_requests_total{component="component",endpoint="endpoint",namespace="system",service="backend"} 2
# HELP uptime_seconds Total uptime of the DistributedRuntime in seconds
# TYPE uptime_seconds gauge
uptime_seconds{namespace="http_server"} 725.997013676
``` ```
python -m dynamo.frontend &
## Configuration DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8081 python -m dynamo.vllm --model Qwen/Qwen3-0.6B --enforce-eager --no-enable-prefix-caching &
```
Then make curl requests to the frontend (see the [main README](../../../../README.md))
| Variable | Description | Default | ## Querying Metrics
|----------|-------------|---------|
| `DYN_LOG` | Enable logging | `0` |
| `DYN_SYSTEM_ENABLED` | Enable system metrics | `false` |
| `DYN_SYSTEM_PORT` | HTTP server port | `8081` |
## Metrics Once running, you can query the metrics:
- `service_requests_total`: Request counter ```bash
- `service_request_duration_seconds`: Request duration histogram # Get all work handler metrics
- `uptime_seconds`: Server uptime gauge curl http://localhost:8081/metrics | grep -E "(requests_total|request_bytes_total|response_bytes_total|errors_total|request_duration_seconds|concurrent_requests)"
This provides automatic context and grouping for all metrics without manual configuration. # Get request count for specific endpoint
curl http://localhost:8081/metrics | grep 'requests_total{endpoint="dyn_example_endpoint"}'
## Troubleshooting # Get request duration histogram
curl http://localhost:8081/metrics | grep 'request_duration_seconds'
- **Port in use**: Change `DYN_SYSTEM_PORT` # Get custom system metrics
- **Connection refused**: Ensure server is running first curl http://localhost:8081/metrics | grep 'my_custom_bytes_processed_total'
- **No metrics**: Verify `DYN_SYSTEM_ENABLED=true` ```
\ No newline at end of file \ No newline at end of file
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
// limitations under the License. // limitations under the License.
use futures::StreamExt; use futures::StreamExt;
use system_metrics::DEFAULT_NAMESPACE; use system_metrics::{DEFAULT_COMPONENT, DEFAULT_ENDPOINT, DEFAULT_NAMESPACE};
use dynamo_runtime::{ use dynamo_runtime::{
logging, pipeline::PushRouter, protocols::annotated::Annotated, utils::Duration, logging, pipeline::PushRouter, protocols::annotated::Annotated, utils::Duration,
...@@ -31,9 +31,9 @@ async fn app(runtime: Runtime) -> Result<()> { ...@@ -31,9 +31,9 @@ async fn app(runtime: Runtime) -> Result<()> {
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?; let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
let namespace = distributed.namespace(DEFAULT_NAMESPACE)?; let namespace = distributed.namespace(DEFAULT_NAMESPACE)?;
let component = namespace.component("component")?; let component = namespace.component(DEFAULT_COMPONENT)?;
let client = component.endpoint("endpoint").client().await?; let client = component.endpoint(DEFAULT_ENDPOINT).client().await?;
client.wait_for_instances().await?; client.wait_for_instances().await?;
let router = let router =
......
...@@ -13,50 +13,8 @@ ...@@ -13,50 +13,8 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use system_metrics::{MyStats, DEFAULT_NAMESPACE}; use dynamo_runtime::{logging, DistributedRuntime, Result, Runtime, Worker};
use system_metrics::backend;
use dynamo_runtime::{
logging,
metrics::MetricsRegistry,
pipeline::{
async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
ResponseStream, SingleIn,
},
protocols::annotated::Annotated,
stream, DistributedRuntime, Result, Runtime, Worker,
};
use prometheus::{Counter, Histogram};
use std::sync::Arc;
/// Service metrics struct using the metric classes from metrics.rs
pub struct MySystemStatsMetrics {
pub request_counter: Arc<Counter>,
pub request_duration: Arc<Histogram>,
}
impl MySystemStatsMetrics {
/// Create a new ServiceMetrics instance using the metric backend
pub fn new<R: MetricsRegistry>(
metrics_registry: Arc<R>,
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let request_counter = metrics_registry.create_counter(
"service_requests_total",
"Total number of requests processed",
&[("service", "backend")],
)?;
let request_duration = metrics_registry.create_histogram(
"service_request_duration_seconds",
"Time spent processing requests",
&[("service", "backend")],
None,
)?;
Ok(Self {
request_counter,
request_duration,
})
}
}
fn main() -> Result<()> { fn main() -> Result<()> {
logging::init(); logging::init();
...@@ -66,74 +24,5 @@ fn main() -> Result<()> { ...@@ -66,74 +24,5 @@ fn main() -> Result<()> {
async fn app(runtime: Runtime) -> Result<()> { async fn app(runtime: Runtime) -> Result<()> {
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?; let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
backend(distributed).await backend(distributed, None).await
}
struct RequestHandler {
metrics: Arc<MySystemStatsMetrics>,
}
impl RequestHandler {
fn new(metrics: Arc<MySystemStatsMetrics>) -> Arc<Self> {
Arc::new(Self { metrics })
}
}
#[async_trait]
impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for RequestHandler {
async fn generate(&self, input: SingleIn<String>) -> Result<ManyOut<Annotated<String>>> {
let start_time = std::time::Instant::now();
// Record request start
self.metrics.request_counter.inc();
let (data, ctx) = input.into_parts();
let chars = data
.chars()
.map(|c| Annotated::from_data(c.to_string()))
.collect::<Vec<_>>();
let stream = stream::iter(chars);
// Record request duration
let duration = start_time.elapsed();
self.metrics
.request_duration
.observe(duration.as_secs_f64());
Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
}
}
async fn backend(drt: DistributedRuntime) -> Result<()> {
let endpoint = drt
.namespace(DEFAULT_NAMESPACE)?
.component("component")?
.service_builder()
.create()
.await?
.endpoint("endpoint");
// make the ingress discoverable via a component service
// we must first create a service, then we can attach one more more endpoints
// attach an ingress to an engine, with the RequestHandler using the metrics struct
let endpoint_metrics = Arc::new(
MySystemStatsMetrics::new(Arc::new(endpoint.clone()))
.map_err(|e| Error::msg(e.to_string()))?,
);
let ingress = Ingress::for_engine(RequestHandler::new(endpoint_metrics.clone()))?;
endpoint
.endpoint_builder()
.stats_handler(|_stats| {
println!("Stats handler called with stats: {:?}", _stats);
let stats = MyStats { val: 10 };
serde_json::to_value(stats).unwrap()
})
.handler(ingress)
.start()
.await?;
Ok(())
} }
...@@ -13,12 +13,120 @@ ...@@ -13,12 +13,120 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use serde::{Deserialize, Serialize}; use dynamo_runtime::{
metrics::MetricsRegistry,
pipeline::{
async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
ResponseStream, SingleIn,
},
protocols::annotated::Annotated,
stream, DistributedRuntime, Result,
};
use prometheus::IntCounter;
use std::sync::Arc;
pub const DEFAULT_NAMESPACE: &str = "system"; pub const DEFAULT_NAMESPACE: &str = "dyn_example_namespace";
pub const DEFAULT_COMPONENT: &str = "dyn_example_component";
pub const DEFAULT_ENDPOINT: &str = "dyn_example_endpoint";
#[derive(Serialize, Deserialize)] /// Stats structure returned by the endpoint's stats handler
// Dummy Stats object to demonstrate how to attach a custom stats handler #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct MyStats { pub struct MyStats {
pub val: u32, // Example value for demonstration purposes
pub val: i32,
}
/// Custom metrics for system stats with data bytes tracking
#[derive(Clone, Debug)]
pub struct MySystemStatsMetrics {
pub data_bytes_processed: Arc<IntCounter>,
}
impl MySystemStatsMetrics {
pub fn from_endpoint(
endpoint: &dynamo_runtime::component::Endpoint,
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let data_bytes_processed = endpoint.create_intcounter(
"my_custom_bytes_processed_total",
"Example of a custom metric. Total number of data bytes processed by system handler",
&[],
)?;
Ok(Self {
data_bytes_processed,
})
}
}
#[derive(Clone)]
pub struct RequestHandler {
metrics: Option<Arc<MySystemStatsMetrics>>,
}
impl RequestHandler {
pub fn new() -> Arc<Self> {
Arc::new(Self { metrics: None })
}
pub fn with_metrics(metrics: MySystemStatsMetrics) -> Arc<Self> {
Arc::new(Self {
metrics: Some(Arc::new(metrics)),
})
}
}
#[async_trait]
impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for RequestHandler {
async fn generate(&self, input: SingleIn<String>) -> Result<ManyOut<Annotated<String>>> {
let (data, ctx) = input.into_parts();
// Track data bytes processed if metrics are available
if let Some(metrics) = &self.metrics {
metrics.data_bytes_processed.inc_by(data.len() as u64);
}
let chars = data
.chars()
.map(|c| Annotated::from_data(c.to_string()))
.collect::<Vec<_>>();
let stream = stream::iter(chars);
Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
}
}
/// Backend function that sets up the system server with metrics and ingress handler
/// This function can be reused by integration tests to ensure they use the exact same setup
pub async fn backend(drt: DistributedRuntime, endpoint_name: Option<&str>) -> Result<()> {
let endpoint_name = endpoint_name.unwrap_or(DEFAULT_ENDPOINT);
let endpoint = drt
.namespace(DEFAULT_NAMESPACE)?
.component(DEFAULT_COMPONENT)?
.service_builder()
.create()
.await?
.endpoint(endpoint_name);
// Create custom metrics for system stats
let system_metrics =
MySystemStatsMetrics::from_endpoint(&endpoint).expect("Failed to create system metrics");
// Use the factory pattern - single line factory call with metrics
let ingress = Ingress::for_engine(RequestHandler::with_metrics(system_metrics))?;
endpoint
.endpoint_builder()
.stats_handler(|_stats| {
println!("Stats handler called with stats: {:?}", _stats);
// TODO(keivenc): return a real stats object
let stats = MyStats { val: 10 };
serde_json::to_value(stats).unwrap()
})
.handler(ingress)
.start()
.await?;
Ok(())
} }
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![cfg(feature = "integration")]
use dynamo_runtime::{
pipeline::PushRouter, protocols::annotated::Annotated, DistributedRuntime, Result, Runtime,
};
use futures::StreamExt;
use rand::Rng;
use reqwest;
use std::env;
use system_metrics::{backend, DEFAULT_COMPONENT, DEFAULT_ENDPOINT, DEFAULT_NAMESPACE};
use tokio::time::{sleep, Duration};
#[tokio::test]
async fn test_backend_with_metrics() -> Result<()> {
// Set environment variables for dynamic port allocation
env::set_var("DYN_SYSTEM_ENABLED", "true");
env::set_var("DYN_SYSTEM_PORT", "0");
// Generate a random endpoint name to avoid collisions
let random_suffix = rand::rng().random_range(1000..9999);
let test_endpoint = format!("{}{}", DEFAULT_ENDPOINT, random_suffix);
// Initialize logging
dynamo_runtime::logging::init();
// Create a runtime and distributed runtime for the backend
let runtime = Runtime::from_current()?;
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
// Get the HTTP server info to find the actual port
let http_server_info = distributed.http_server_info();
let metrics_port = match http_server_info {
Some(info) => {
println!("HTTP server running on: {}", info.address());
info.port()
}
None => {
panic!("HTTP server not started - check DYN_SYSTEM_ENABLED environment variable");
}
};
// Start the backend in a separate task with custom endpoint
let test_endpoint_clone = test_endpoint.clone();
let backend_handle =
tokio::spawn(async move { backend(distributed, Some(&test_endpoint_clone)).await });
// Give the backend some time to start up
sleep(Duration::from_millis(1000)).await;
// Create a client runtime to connect to the backend
let client_runtime = Runtime::from_current()?;
let client_distributed = DistributedRuntime::from_settings(client_runtime.clone()).await?;
// Connect to the backend similar to system_client.rs
let namespace = client_distributed.namespace(DEFAULT_NAMESPACE)?;
let component = namespace.component(DEFAULT_COMPONENT)?;
let client = component.endpoint(&test_endpoint).client().await?;
// Wait for backend instances to be available
client.wait_for_instances().await?;
// Create a router and send some requests to generate metrics
let router =
PushRouter::<String, Annotated<String>>::from_client(client, Default::default()).await?;
// Send a few test requests to generate metrics
for i in 0..3 {
let test_message = format!("test message {}", i);
let mut stream = router.random(test_message.clone().into()).await?;
// Process the response stream
while let Some(resp) = stream.next().await {
println!("Response {}: {:?}", i, resp);
}
// Small delay between requests
sleep(Duration::from_millis(100)).await;
}
// Give some time for metrics to be updated
sleep(Duration::from_millis(500)).await;
// Now fetch the HTTP metrics endpoint using the dynamic port
let metrics_url = format!("http://localhost:{}/metrics", metrics_port);
println!("Fetching metrics from: {}", metrics_url);
// Make HTTP request to get metrics
let client = reqwest::Client::new();
let response = client.get(&metrics_url).send().await;
match response {
Ok(response) => {
if response.status().is_success() {
let metrics_content = response
.text()
.await
.unwrap_or_else(|_| "Failed to read response body".to_string());
println!("=== METRICS CONTENT ===");
println!("{}", metrics_content);
println!("=== END METRICS CONTENT ===");
// Parse and verify ingress metrics are greater than 0 (except concurrent_requests)
verify_ingress_metrics_greater_than_0(&metrics_content);
println!("Successfully retrieved and verified metrics!");
} else {
println!("HTTP request failed with status: {}", response.status());
panic!("Failed to get metrics: HTTP {}", response.status());
}
}
Err(e) => {
println!("Failed to connect to metrics endpoint: {}", e);
panic!("Failed to connect to metrics endpoint: {}", e);
}
}
// Shutdown the runtime
client_runtime.shutdown();
// Cancel the backend task
backend_handle.abort();
Ok(())
}
fn verify_ingress_metrics_greater_than_0(metrics_content: &str) {
// Define the work handler metrics we want to verify (excluding concurrent_requests which can be 0)
let metrics_to_verify = [
"my_custom_bytes_processed_total",
"requests_total",
"request_bytes_total",
"response_bytes_total",
"request_duration_seconds_count",
"request_duration_seconds_sum",
];
for metric_name in &metrics_to_verify {
let line = metrics_content
.lines()
.find(|l| l.contains(metric_name) && !l.contains("#"))
.unwrap_or_else(|| panic!("{} metric not found", metric_name));
let value = extract_metric_value(line);
assert!(
value > 0.0,
"{} should be greater than 0, got: {}",
metric_name,
value
);
println!("{}: {}", metric_name, value);
}
println!("All work handler metrics verified successfully!");
}
fn extract_metric_value(line: &str) -> f64 {
// Extract the numeric value from a Prometheus metric line
// Format: metric_name{labels} value
line.split_whitespace()
.last()
.expect("Metric line should have a value")
.parse::<f64>()
.expect("Metric value should be a valid number")
}
...@@ -69,6 +69,9 @@ impl EndpointConfigBuilder { ...@@ -69,6 +69,9 @@ impl EndpointConfigBuilder {
// acquire the registry lock // acquire the registry lock
let registry = endpoint.drt().component_registry.inner.lock().await; let registry = endpoint.drt().component_registry.inner.lock().await;
// Add metrics to the handler. The endpoint provides additional information to the handler.
handler.add_metrics(&endpoint)?;
// get the group // get the group
let group = registry let group = registry
.services .services
......
...@@ -24,6 +24,7 @@ use crate::{ ...@@ -24,6 +24,7 @@ use crate::{
}; };
use super::{error, Arc, DistributedRuntime, OnceCell, Result, Runtime, SystemHealth, Weak, OK}; use super::{error, Arc, DistributedRuntime, OnceCell, Result, Runtime, SystemHealth, Weak, OK};
use std::sync::OnceLock;
use derive_getters::Dissolve; use derive_getters::Dissolve;
use figment::error; use figment::error;
...@@ -97,6 +98,7 @@ impl DistributedRuntime { ...@@ -97,6 +98,7 @@ impl DistributedRuntime {
etcd_client, etcd_client,
nats_client, nats_client,
tcp_server: Arc::new(OnceCell::new()), tcp_server: Arc::new(OnceCell::new()),
http_server: Arc::new(OnceLock::new()),
component_registry: component::Registry::new(), component_registry: component::Registry::new(),
is_static, is_static,
instance_sources: Arc::new(Mutex::new(HashMap::new())), instance_sources: Arc::new(Mutex::new(HashMap::new())),
...@@ -121,8 +123,18 @@ impl DistributedRuntime { ...@@ -121,8 +123,18 @@ impl DistributedRuntime {
) )
.await .await
{ {
Ok((addr, _)) => { Ok((addr, handle)) => {
tracing::info!("HTTP server started successfully on {}", addr); tracing::info!("HTTP server started successfully on {}", addr);
// Store HTTP server information
let http_server_info =
crate::http_server::HttpServerInfo::new(addr, Some(handle));
// Initialize the http_server field
distributed_runtime
.http_server
.set(Arc::new(http_server_info))
.expect("HTTP server info should only be set once");
} }
Err(e) => { Err(e) => {
tracing::error!("HTTP server startup failed: {}", e); tracing::error!("HTTP server startup failed: {}", e);
...@@ -210,6 +222,11 @@ impl DistributedRuntime { ...@@ -210,6 +222,11 @@ impl DistributedRuntime {
self.nats_client.clone() self.nats_client.clone()
} }
/// Get HTTP server information if available
pub fn http_server_info(&self) -> Option<Arc<crate::http_server::HttpServerInfo>> {
self.http_server.get().cloned()
}
// todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers // todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
pub fn etcd_client(&self) -> Option<etcd::Client> { pub fn etcd_client(&self) -> Option<etcd::Client> {
self.etcd_client.clone() self.etcd_client.clone()
......
...@@ -22,10 +22,47 @@ use std::collections::HashMap; ...@@ -22,10 +22,47 @@ use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::sync::OnceLock; use std::sync::OnceLock;
use std::time::Instant; use std::time::Instant;
use tokio::net::TcpListener; use tokio::{net::TcpListener, task::JoinHandle};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing; use tracing;
/// HTTP server information containing socket address and handle
#[derive(Debug)]
pub struct HttpServerInfo {
pub socket_addr: std::net::SocketAddr,
pub handle: Option<Arc<JoinHandle<()>>>,
}
impl HttpServerInfo {
pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
Self {
socket_addr,
handle: handle.map(Arc::new),
}
}
pub fn address(&self) -> String {
self.socket_addr.to_string()
}
pub fn hostname(&self) -> String {
self.socket_addr.ip().to_string()
}
pub fn port(&self) -> u16 {
self.socket_addr.port()
}
}
impl Clone for HttpServerInfo {
fn clone(&self) -> Self {
Self {
socket_addr: self.socket_addr,
handle: self.handle.clone(),
}
}
}
pub struct HttpMetricsRegistry { pub struct HttpMetricsRegistry {
pub drt: Arc<crate::DistributedRuntime>, pub drt: Arc<crate::DistributedRuntime>,
} }
...@@ -58,8 +95,10 @@ impl HttpServerState { ...@@ -58,8 +95,10 @@ impl HttpServerState {
/// Create new HTTP server state with the provided metrics registry /// Create new HTTP server state with the provided metrics registry
pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> { pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> {
let http_metrics_registry = Arc::new(HttpMetricsRegistry { drt: drt.clone() }); let http_metrics_registry = Arc::new(HttpMetricsRegistry { drt: drt.clone() });
// Note: This metric is created at the DRT level (no namespace), so we manually add "dynamo_" prefix
// to maintain consistency with the project's metric naming convention
let uptime_gauge = http_metrics_registry.as_ref().create_gauge( let uptime_gauge = http_metrics_registry.as_ref().create_gauge(
"uptime_seconds", "dynamo_uptime_seconds",
"Total uptime of the DistributedRuntime in seconds", "Total uptime of the DistributedRuntime in seconds",
&[], &[],
)?; )?;
...@@ -293,9 +332,9 @@ mod tests { ...@@ -293,9 +332,9 @@ mod tests {
println!("Full metrics response:\n{}", response); println!("Full metrics response:\n{}", response);
let expected = "\ let expected = "\
# HELP uptime_seconds Total uptime of the DistributedRuntime in seconds # HELP dynamo_uptime_seconds Total uptime of the DistributedRuntime in seconds
# TYPE uptime_seconds gauge # TYPE dynamo_uptime_seconds gauge
uptime_seconds{namespace=\"http_server\"} 42 dynamo_uptime_seconds{namespace=\"http_server\"} 42
"; ";
assert_eq!(response, expected); assert_eq!(response, expected);
} }
...@@ -405,6 +444,9 @@ uptime_seconds{namespace=\"http_server\"} 42 ...@@ -405,6 +444,9 @@ uptime_seconds{namespace=\"http_server\"} 42
#[tokio::test] #[tokio::test]
async fn test_spawn_http_server_endpoints() { async fn test_spawn_http_server_endpoints() {
// use reqwest for HTTP requests // use reqwest for HTTP requests
temp_env::async_with_vars(
[("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready"))],
async {
let cancel_token = CancellationToken::new(); let cancel_token = CancellationToken::new();
let drt = create_test_drt_async().await; let drt = create_test_drt_async().await;
let (addr, server_handle) = let (addr, server_handle) =
...@@ -416,8 +458,8 @@ uptime_seconds{namespace=\"http_server\"} 42 ...@@ -416,8 +458,8 @@ uptime_seconds{namespace=\"http_server\"} 42
println!("[test] Server should be up, starting requests..."); println!("[test] Server should be up, starting requests...");
let client = reqwest::Client::new(); let client = reqwest::Client::new();
for (path, expect_200, expect_body) in [ for (path, expect_200, expect_body) in [
("/health", true, "OK"), ("/health", true, "ready"),
("/live", true, "OK"), ("/live", true, "ready"),
("/someRandomPathNotFoundHere", false, "Route not found"), ("/someRandomPathNotFoundHere", false, "Route not found"),
] { ] {
println!("[test] Sending request to {}", path); println!("[test] Sending request to {}", path);
...@@ -452,6 +494,9 @@ uptime_seconds{namespace=\"http_server\"} 42 ...@@ -452,6 +494,9 @@ uptime_seconds{namespace=\"http_server\"} 42
} }
} }
} }
},
)
.await;
} }
#[cfg(feature = "integration")] #[cfg(feature = "integration")]
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
sync::{Arc, Weak}, sync::{Arc, OnceLock, Weak},
}; };
use tokio::sync::Mutex; use tokio::sync::Mutex;
...@@ -37,6 +37,7 @@ pub mod component; ...@@ -37,6 +37,7 @@ pub mod component;
pub mod discovery; pub mod discovery;
pub mod engine; pub mod engine;
pub mod http_server; pub mod http_server;
pub use http_server::HttpServerInfo;
pub mod logging; pub mod logging;
pub mod metrics; pub mod metrics;
pub mod pipeline; pub mod pipeline;
...@@ -150,6 +151,7 @@ pub struct DistributedRuntime { ...@@ -150,6 +151,7 @@ pub struct DistributedRuntime {
etcd_client: Option<transports::etcd::Client>, etcd_client: Option<transports::etcd::Client>,
nats_client: transports::nats::Client, nats_client: transports::nats::Client,
tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>, tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
http_server: Arc<OnceLock<Arc<http_server::HttpServerInfo>>>,
// local registry for components // local registry for components
// the registry allows us to use share runtime resources across instances of the same component object. // the registry allows us to use share runtime resources across instances of the same component object.
......
...@@ -18,34 +18,53 @@ ...@@ -18,34 +18,53 @@
//! This module provides registry classes for Prometheus metrics //! This module provides registry classes for Prometheus metrics
//! that auto populates the labels with the namespace-component-endpoint hierarchy. //! that auto populates the labels with the namespace-component-endpoint hierarchy.
use once_cell::sync::Lazy;
use regex::Regex;
use std::any::Any; use std::any::Any;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
// This constant determines whether metric names should include the full hierarchy as a prefix.
// If set to true, a hierarchy like ["", "mynamespace", "mycomponent", "myendpoint"]
// results in a metric name of "mynamespace_mycomponent_myendpoint__myendpoint".
// If false, the metric name will be just "myendpoint".
// This setting is applied *universally* to ensure consistent naming conventions.
pub const USE_PREFIXED_METRIC_NAMES: bool = false;
// If set to true, then metrics will be labeled with the namespace, component, and endpoint. // If set to true, then metrics will be labeled with the namespace, component, and endpoint.
pub const USE_AUTO_LABELS: bool = true; pub const USE_AUTO_LABELS: bool = true;
// Prometheus imports // Prometheus imports
use prometheus::Encoder; use prometheus::Encoder;
fn build_metric_name(prefix: &str, metric_name: &str) -> String { fn build_metric_name(namespace: &str, metric_name: &str) -> String {
if !USE_PREFIXED_METRIC_NAMES { if !namespace.is_empty() {
return metric_name.to_string(); format!("{}_{}", namespace, metric_name)
} else {
metric_name.to_string()
} }
}
if prefix.is_empty() { /// Lints a metric name component by stripping off invalid characters and validating Prometheus naming pattern
metric_name.to_string() /// Prometheus doesn't provide a built-in function to validate metric names, but the specification requires
} else { /// names to follow the pattern [a-zA-Z_:][a-zA-Z0-9_:]*. This function implements that validation.
// Double underscore to separate between prefix and actual metric name /// Returns error if sanitized name doesn't follow the required pattern.
format!("{}__{}", prefix, metric_name) fn lint_prometheus_name(name: &str) -> anyhow::Result<String> {
if name.is_empty() {
return Ok("".to_string());
}
static INVALID_CHARS_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new(r"[^a-zA-Z0-9_:]").unwrap());
static PROMETHEUS_NAME_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new(r"^[a-zA-Z_:][a-zA-Z0-9_:]*$").unwrap());
// Remove all invalid characters (everything except alphanumeric, colons, and underscores)
let sanitized = INVALID_CHARS_PATTERN.replace_all(name, "").to_string();
// Check if the sanitized name follows Prometheus naming pattern
if !sanitized.is_empty() && !PROMETHEUS_NAME_PATTERN.is_match(&sanitized) {
return Err(anyhow::anyhow!(
"Sanitized name '{}' does not follow Prometheus naming pattern [a-zA-Z_:][a-zA-Z0-9_:]*",
sanitized
));
} }
Ok(sanitized)
} }
/// Trait that defines common behavior for Prometheus metric types /// Trait that defines common behavior for Prometheus metric types
...@@ -120,6 +139,21 @@ impl PrometheusMetric for prometheus::IntGaugeVec { ...@@ -120,6 +139,21 @@ impl PrometheusMetric for prometheus::IntGaugeVec {
} }
} }
impl PrometheusMetric for prometheus::IntCounterVec {
fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
Err(prometheus::Error::Msg(
"IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(),
))
}
fn with_opts_and_label_names(
opts: prometheus::Opts,
label_names: &[&str],
) -> Result<Self, prometheus::Error> {
prometheus::IntCounterVec::new(opts, label_names)
}
}
// Implement the trait for Histogram // Implement the trait for Histogram
impl PrometheusMetric for prometheus::Histogram { impl PrometheusMetric for prometheus::Histogram {
fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> { fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
...@@ -167,9 +201,24 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>( ...@@ -167,9 +201,24 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
let mut seen_keys = std::collections::HashSet::new(); let mut seen_keys = std::collections::HashSet::new();
let basename = registry.basename(); let basename = registry.basename();
let metric_name = build_metric_name(&registry.prefix(), metric_name);
let parent_hierarchy = registry.parent_hierarchy(); let parent_hierarchy = registry.parent_hierarchy();
// Build hierarchy: parent_hierarchy + [basename]
let hierarchy = [parent_hierarchy.clone(), vec![basename.clone()]].concat();
let namespace = if hierarchy.len() >= 2 {
let potential_namespace = &hierarchy[1];
if !potential_namespace.is_empty() {
lint_prometheus_name(potential_namespace)?
} else {
"".to_string()
}
} else {
"".to_string()
};
let metric_name = build_metric_name(&namespace, metric_name);
// Validate that user-provided labels don't have duplicate keys // Validate that user-provided labels don't have duplicate keys
for (key, _) in labels { for (key, _) in labels {
if !seen_keys.insert(*key) { if !seen_keys.insert(*key) {
...@@ -179,8 +228,6 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>( ...@@ -179,8 +228,6 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
)); ));
} }
} }
let hierarchy = [parent_hierarchy, vec![basename]].concat();
// Build updated_labels: auto-labels first, then user labels // Build updated_labels: auto-labels first, then user labels
let mut updated_labels: Vec<(String, String)> = Vec::new(); let mut updated_labels: Vec<(String, String)> = Vec::new();
...@@ -195,23 +242,26 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>( ...@@ -195,23 +242,26 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
} }
} }
// Add auto-generated labels // Add auto-generated labels with sanitized values
if hierarchy.len() > 1 {
let namespace = &hierarchy[1];
if !namespace.is_empty() { if !namespace.is_empty() {
updated_labels.push(("namespace".to_string(), namespace.clone())); updated_labels.push(("namespace".to_string(), namespace.to_string()));
}
} }
if hierarchy.len() > 2 { if hierarchy.len() > 2 {
let component = &hierarchy[2]; let component = &hierarchy[2];
if !component.is_empty() { if !component.is_empty() {
updated_labels.push(("component".to_string(), component.clone())); let valid_component = lint_prometheus_name(component)?;
if !valid_component.is_empty() {
updated_labels.push(("component".to_string(), valid_component));
}
} }
} }
if hierarchy.len() > 3 { if hierarchy.len() > 3 {
let endpoint = &hierarchy[3]; let endpoint = &hierarchy[3];
if !endpoint.is_empty() { if !endpoint.is_empty() {
updated_labels.push(("endpoint".to_string(), endpoint.clone())); let valid_endpoint = lint_prometheus_name(endpoint)?;
if !valid_endpoint.is_empty() {
updated_labels.push(("endpoint".to_string(), valid_endpoint));
}
} }
} }
} }
...@@ -267,6 +317,21 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>( ...@@ -267,6 +317,21 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
let label_names = const_labels let label_names = const_labels
.ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?; .ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
T::with_opts_and_label_names(opts, label_names)? T::with_opts_and_label_names(opts, label_names)?
} else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
// Special handling for IntCounterVec with label names
// const_labels parameter is required for IntCounterVec
if buckets.is_some() {
return Err(anyhow::anyhow!(
"buckets parameter is not valid for IntCounterVec"
));
}
let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
for (key, value) in &updated_labels {
opts = opts.const_label(key.clone(), value.clone());
}
let label_names = const_labels
.ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
T::with_opts_and_label_names(opts, label_names)?
} else { } else {
// Standard handling for Counter, IntCounter, Gauge, IntGauge // Standard handling for Counter, IntCounter, Gauge, IntGauge
// buckets and const_labels parameters are not valid for these types // buckets and const_labels parameters are not valid for these types
...@@ -288,7 +353,7 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>( ...@@ -288,7 +353,7 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
}; };
// Iterate over the DRT's registry and register this metric across all hierarchical levels. // Iterate over the DRT's registry and register this metric across all hierarchical levels.
// The prefixed_hierarchy is structured as: ["", "mynamespace", "mynamespace_mycomponent", "mynamespace_mycomponent_myendpoint"] // The prefixed_hierarchy is structured as: ["", "testnamespace", "testnamespace_testcomponent", "testnamespace_testcomponent_testendpoint"]
// This prefixing is essential to differentiate between the names of children and grandchildren. // This prefixing is essential to differentiate between the names of children and grandchildren.
let mut prometheus_registry = registry let mut prometheus_registry = registry
.drt() .drt()
...@@ -298,10 +363,10 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>( ...@@ -298,10 +363,10 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
// Build prefixed hierarchy and register metrics in a single loop // Build prefixed hierarchy and register metrics in a single loop
// current_prefix accumulates the hierarchical path as we iterate through hierarchy // current_prefix accumulates the hierarchical path as we iterate through hierarchy
// For example, if hierarchy = ["", "mynamespace", "mycomponent"], then: // For example, if hierarchy = ["", "testnamespace", "testcomponent"], then:
// - Iteration 1: current_prefix = "" (empty string from DRT) // - Iteration 1: current_prefix = "" (empty string from DRT)
// - Iteration 2: current_prefix = "mynamespace" // - Iteration 2: current_prefix = "testnamespace"
// - Iteration 3: current_prefix = "mynamespace_mycomponent" // - Iteration 3: current_prefix = "testnamespace_testcomponent"
let mut current_prefix = String::new(); let mut current_prefix = String::new();
for name in &hierarchy { for name in &hierarchy {
if !current_prefix.is_empty() && !name.is_empty() { if !current_prefix.is_empty() && !name.is_empty() {
...@@ -344,17 +409,19 @@ pub trait MetricsRegistry: Send + Sync + crate::traits::DistributedRuntimeProvid ...@@ -344,17 +409,19 @@ pub trait MetricsRegistry: Send + Sync + crate::traits::DistributedRuntimeProvid
// TODO: Add support for additional Prometheus metric types: // TODO: Add support for additional Prometheus metric types:
// - Counter: ✅ IMPLEMENTED - create_counter() // - Counter: ✅ IMPLEMENTED - create_counter()
// - CounterVec: ✅ IMPLEMENTED - create_countervec() // - CounterVec: ✅ IMPLEMENTED - create_countervec()
// - IntCounter: ✅ IMPLEMENTED - create_intcounter()
// - Gauge: ✅ IMPLEMENTED - create_gauge() // - Gauge: ✅ IMPLEMENTED - create_gauge()
// - IntGauge/IntGaugeVec: ✅ IMPLEMENTED - create_intgauge() and create_intgaugevec() // - GaugeHistogram: create_gauge_histogram() - for gauge histograms
// - Histogram: ✅ IMPLEMENTED - create_histogram() // - Histogram: ✅ IMPLEMENTED - create_histogram()
// - Summary: create_summary() - for quantiles and sum/count metrics
// - HistogramVec with custom buckets: create_histogram_with_buckets() // - HistogramVec with custom buckets: create_histogram_with_buckets()
// - SummaryVec: create_summary_vec() - for labeled summaries
// - Untyped: create_untyped() - for untyped metrics
// - Info: create_info() - for info metrics with labels // - Info: create_info() - for info metrics with labels
// - IntCounter: ✅ IMPLEMENTED - create_intcounter()
// - IntCounterVec: ✅ IMPLEMENTED - create_intcountervec()
// - IntGauge: ✅ IMPLEMENTED - create_intgauge()
// - IntGaugeVec: ✅ IMPLEMENTED - create_intgaugevec()
// - Stateset: create_stateset() - for state-based metrics // - Stateset: create_stateset() - for state-based metrics
// - GaugeHistogram: create_gauge_histogram() - for gauge histograms // - Summary: create_summary() - for quantiles and sum/count metrics
// - SummaryVec: create_summary_vec() - for labeled summaries
// - Untyped: create_untyped() - for untyped metrics
/// Create a Counter metric /// Create a Counter metric
fn create_counter( fn create_counter(
...@@ -366,23 +433,31 @@ pub trait MetricsRegistry: Send + Sync + crate::traits::DistributedRuntimeProvid ...@@ -366,23 +433,31 @@ pub trait MetricsRegistry: Send + Sync + crate::traits::DistributedRuntimeProvid
create_metric(self, name, description, labels, None, None) create_metric(self, name, description, labels, None, None)
} }
/// Create a Gauge metric /// Create a CounterVec metric with label names (for dynamic labels)
fn create_gauge( fn create_countervec(
&self, &self,
name: &str, name: &str,
description: &str, description: &str,
labels: &[(&str, &str)], const_labels: &[&str],
) -> anyhow::Result<Arc<prometheus::Gauge>> { const_label_values: &[(&str, &str)],
create_metric(self, name, description, labels, None, None) ) -> anyhow::Result<Arc<prometheus::CounterVec>> {
create_metric(
self,
name,
description,
const_label_values,
None,
Some(const_labels),
)
} }
/// Create an IntCounter metric /// Create a Gauge metric
fn create_intcounter( fn create_gauge(
&self, &self,
name: &str, name: &str,
description: &str, description: &str,
labels: &[(&str, &str)], labels: &[(&str, &str)],
) -> anyhow::Result<Arc<prometheus::IntCounter>> { ) -> anyhow::Result<Arc<prometheus::Gauge>> {
create_metric(self, name, description, labels, None, None) create_metric(self, name, description, labels, None, None)
} }
...@@ -397,14 +472,24 @@ pub trait MetricsRegistry: Send + Sync + crate::traits::DistributedRuntimeProvid ...@@ -397,14 +472,24 @@ pub trait MetricsRegistry: Send + Sync + crate::traits::DistributedRuntimeProvid
create_metric(self, name, description, labels, buckets, None) create_metric(self, name, description, labels, buckets, None)
} }
/// Create a CounterVec metric with label names (for dynamic labels) /// Create an IntCounter metric
fn create_countervec( fn create_intcounter(
&self,
name: &str,
description: &str,
labels: &[(&str, &str)],
) -> anyhow::Result<Arc<prometheus::IntCounter>> {
create_metric(self, name, description, labels, None, None)
}
/// Create an IntCounterVec metric with label names (for dynamic labels)
fn create_intcountervec(
&self, &self,
name: &str, name: &str,
description: &str, description: &str,
const_labels: &[&str], const_labels: &[&str],
const_label_values: &[(&str, &str)], const_label_values: &[(&str, &str)],
) -> anyhow::Result<Arc<prometheus::CounterVec>> { ) -> anyhow::Result<Arc<prometheus::IntCounterVec>> {
create_metric( create_metric(
self, self,
name, name,
...@@ -469,13 +554,97 @@ pub fn create_test_drt() -> crate::DistributedRuntime { ...@@ -469,13 +554,97 @@ pub fn create_test_drt() -> crate::DistributedRuntime {
}) })
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_build_metric_name_with_prefix() {
// Test that build_metric_name correctly prepends the namespace
let result = build_metric_name("", "requests");
assert_eq!(result, "requests");
let result = build_metric_name("dynamo", "requests");
assert_eq!(result, "dynamo_requests");
}
#[test]
fn test_lint_prometheus_name() {
// Test that valid components remain unchanged
assert_eq!(
lint_prometheus_name("testnamespace").unwrap(),
"testnamespace"
);
assert_eq!(
lint_prometheus_name("test_namespace").unwrap(),
"test_namespace"
);
assert_eq!(lint_prometheus_name("test123").unwrap(), "test123");
assert_eq!(
lint_prometheus_name("test:namespace").unwrap(),
"test:namespace"
);
assert_eq!(
lint_prometheus_name("_testnamespace").unwrap(),
"_testnamespace"
);
assert_eq!(
lint_prometheus_name("testnamespace_123").unwrap(),
"testnamespace_123"
);
// Test that invalid characters are stripped
assert_eq!(lint_prometheus_name("").unwrap(), ""); // Empty
assert_eq!(
lint_prometheus_name("test namespace").unwrap(),
"testnamespace"
); // Space removed
assert_eq!(
lint_prometheus_name("test.namespace").unwrap(),
"testnamespace"
); // Dot removed
assert_eq!(
lint_prometheus_name("test@namespace").unwrap(),
"testnamespace"
); // @ removed
assert_eq!(
lint_prometheus_name("test#namespace").unwrap(),
"testnamespace"
); // # removed
assert_eq!(
lint_prometheus_name("test$namespace").unwrap(),
"testnamespace"
); // $ removed
assert_eq!(
lint_prometheus_name("test!@#$%^&*()namespace").unwrap(),
"testnamespace"
); // Multiple special chars removed
assert_eq!(
lint_prometheus_name("testnamespace_123!").unwrap(),
"testnamespace_123"
); // Trailing special char removed
// Test that hyphens are stripped (not allowed in Prometheus names)
assert_eq!(
lint_prometheus_name("test-namespace").unwrap(),
"testnamespace"
); // Hyphen removed
assert_eq!(
lint_prometheus_name("test-namespace_123").unwrap(),
"testnamespace_123"
); // Hyphen removed
// Test validation errors for invalid patterns
assert!(lint_prometheus_name("123test").is_err()); // Starts with digit
assert!(lint_prometheus_name("").is_ok()); // Empty is allowed
}
}
#[cfg(feature = "integration")] #[cfg(feature = "integration")]
#[cfg(test)] #[cfg(test)]
mod test_prefixes { mod test_prefixes {
use super::create_test_drt; use super::create_test_drt;
use super::*; use super::*;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
#[test] #[test]
fn test_hierarchical_prefixes_and_parent_hierarchies() { fn test_hierarchical_prefixes_and_parent_hierarchies() {
...@@ -484,20 +653,17 @@ mod test_prefixes { ...@@ -484,20 +653,17 @@ mod test_prefixes {
// Create a distributed runtime for testing // Create a distributed runtime for testing
let drt = create_test_drt(); let drt = create_test_drt();
// Generate random namespace name // Use a simple constant namespace name
let mut hasher = DefaultHasher::new(); let namespace_name = "testnamespace";
"test_namespace".hash(&mut hasher);
let random_num = hasher.finish();
let namespace_name = format!("mynamespace{}", random_num);
// Create namespace // Create namespace
let namespace = drt.namespace(&namespace_name).unwrap(); let namespace = drt.namespace(namespace_name).unwrap();
// Create component // Create component
let component = namespace.component("mycomponent").unwrap(); let component = namespace.component("testcomponent").unwrap();
// Create endpoint // Create endpoint
let endpoint = component.endpoint("myendpoint"); let endpoint = component.endpoint("testendpoint");
// Test DistributedRuntime hierarchy // Test DistributedRuntime hierarchy
println!("\n=== DistributedRuntime ==="); println!("\n=== DistributedRuntime ===");
...@@ -543,8 +709,8 @@ mod test_prefixes { ...@@ -543,8 +709,8 @@ mod test_prefixes {
assert_eq!( assert_eq!(
component.basename(), component.basename(),
"mycomponent", "testcomponent",
"Component basename should be 'mycomponent'" "Component basename should be 'testcomponent'"
); );
assert_eq!( assert_eq!(
component.parent_hierarchy(), component.parent_hierarchy(),
...@@ -553,8 +719,8 @@ mod test_prefixes { ...@@ -553,8 +719,8 @@ mod test_prefixes {
); );
assert_eq!( assert_eq!(
component.prefix(), component.prefix(),
format!("{}_mycomponent", namespace), format!("{}_testcomponent", namespace),
"Component prefix should be 'namespace_mycomponent'" "Component prefix should be 'namespace_testcomponent'"
); );
// Test Endpoint hierarchy // Test Endpoint hierarchy
...@@ -565,18 +731,18 @@ mod test_prefixes { ...@@ -565,18 +731,18 @@ mod test_prefixes {
assert_eq!( assert_eq!(
endpoint.basename(), endpoint.basename(),
"myendpoint", "testendpoint",
"Endpoint basename should be 'myendpoint'" "Endpoint basename should be 'testendpoint'"
); );
assert_eq!( assert_eq!(
endpoint.parent_hierarchy(), endpoint.parent_hierarchy(),
vec!["", &namespace_name, "mycomponent"], vec!["", &namespace_name, "testcomponent"],
"Endpoint parent hierarchy should contain the generated namespace name" "Endpoint parent hierarchy should contain the generated namespace name"
); );
assert_eq!( assert_eq!(
endpoint.prefix(), endpoint.prefix(),
format!("{}_mycomponent_myendpoint", namespace), format!("{}_testcomponent_testendpoint", namespace),
"Endpoint prefix should be 'namespace_mycomponent_myendpoint'" "Endpoint prefix should be 'namespace_testcomponent_testendpoint'"
); );
// Test hierarchy relationships // Test hierarchy relationships
...@@ -626,6 +792,46 @@ mod test_prefixes { ...@@ -626,6 +792,46 @@ mod test_prefixes {
println!("Component prefix: '{}'", component.prefix()); println!("Component prefix: '{}'", component.prefix());
println!("Endpoint prefix: '{}'", endpoint.prefix()); println!("Endpoint prefix: '{}'", endpoint.prefix());
println!("All hierarchy assertions passed!"); println!("All hierarchy assertions passed!");
// Test invalid namespace behavior
println!("\n=== Testing Invalid Namespace Behavior ===");
// Create a namespace with invalid name (contains hyphen)
let invalid_namespace = drt.namespace("test-namespace").unwrap();
// Debug: Let's see what the hierarchy looks like
println!(
"Invalid namespace basename: '{}'",
invalid_namespace.basename()
);
println!(
"Invalid namespace parent_hierarchy: {:?}",
invalid_namespace.parent_hierarchy()
);
println!("Invalid namespace prefix: '{}'", invalid_namespace.prefix());
// Try to create a metric - this should fail because the namespace name will be used in the metric name
let result = invalid_namespace.create_counter("test_counter", "A test counter", &[]);
println!("Result with invalid namespace 'test-namespace':");
println!("{:?}", result);
// The result should be an error from Prometheus
assert!(
result.is_err(),
"Creating metric with invalid namespace should fail"
);
// For comparison, show a valid namespace works
let valid_namespace = drt.namespace("test_namespace").unwrap();
let valid_result = valid_namespace.create_counter("test_counter", "A test counter", &[]);
println!("Result with valid namespace 'test_namespace':");
println!("{:?}", valid_result);
assert!(
valid_result.is_ok(),
"Creating metric with valid namespace should succeed"
);
println!("✓ Invalid namespace behavior verified!");
} }
} }
...@@ -635,8 +841,6 @@ mod test_simple_metricsregistry_trait { ...@@ -635,8 +841,6 @@ mod test_simple_metricsregistry_trait {
use super::create_test_drt; use super::create_test_drt;
use super::*; use super::*;
use prometheus::Counter; use prometheus::Counter;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::Arc; use std::sync::Arc;
#[test] #[test]
...@@ -644,19 +848,16 @@ mod test_simple_metricsregistry_trait { ...@@ -644,19 +848,16 @@ mod test_simple_metricsregistry_trait {
// Setup real DRT and registry using the test-friendly constructor // Setup real DRT and registry using the test-friendly constructor
let drt = create_test_drt(); let drt = create_test_drt();
// Generate random namespace name // Use a simple constant namespace name
let mut hasher = DefaultHasher::new(); let namespace_name = "testnamespace";
"test_factory_namespace".hash(&mut hasher);
let random_num = hasher.finish();
let namespace_name = format!("mynamespace{}", random_num);
let namespace = drt.namespace(&namespace_name).unwrap(); let namespace = drt.namespace(namespace_name).unwrap();
let component = namespace.component("mycomponent").unwrap(); let component = namespace.component("testcomponent").unwrap();
let endpoint = component.endpoint("myendpoint"); let endpoint = component.endpoint("testendpoint");
// Test Counter creation // Test Counter creation
let counter = endpoint let counter = endpoint
.create_counter("mycounter", "A test counter", &[]) .create_counter("testcounter", "A test counter", &[])
.unwrap(); .unwrap();
counter.inc_by(123.456789); counter.inc_by(123.456789);
let epsilon = 0.01; let epsilon = 0.01;
...@@ -667,11 +868,10 @@ mod test_simple_metricsregistry_trait { ...@@ -667,11 +868,10 @@ mod test_simple_metricsregistry_trait {
println!("{}", endpoint_output); println!("{}", endpoint_output);
let expected_endpoint_output = format!( let expected_endpoint_output = format!(
r#"# HELP mycounter A test counter r#"# HELP testnamespace_testcounter A test counter
# TYPE mycounter counter # TYPE testnamespace_testcounter counter
mycounter{{component="mycomponent",endpoint="myendpoint",namespace="{}"}} 123.456789 testnamespace_testcounter{{component="testcomponent",endpoint="testendpoint",namespace="testnamespace"}} 123.456789
"#, "#
namespace_name
); );
assert_eq!( assert_eq!(
...@@ -685,7 +885,7 @@ mycounter{{component="mycomponent",endpoint="myendpoint",namespace="{}"}} 123.45 ...@@ -685,7 +885,7 @@ mycounter{{component="mycomponent",endpoint="myendpoint",namespace="{}"}} 123.45
// Test Gauge creation // Test Gauge creation
let gauge = component let gauge = component
.create_gauge("mygauge", "A test gauge", &[]) .create_gauge("testgauge", "A test gauge", &[])
.unwrap(); .unwrap();
gauge.set(50000.0); gauge.set(50000.0);
assert_eq!(gauge.get(), 50000.0); assert_eq!(gauge.get(), 50000.0);
...@@ -696,14 +896,13 @@ mycounter{{component="mycomponent",endpoint="myendpoint",namespace="{}"}} 123.45 ...@@ -696,14 +896,13 @@ mycounter{{component="mycomponent",endpoint="myendpoint",namespace="{}"}} 123.45
println!("{}", component_output); println!("{}", component_output);
let expected_component_output = format!( let expected_component_output = format!(
r#"# HELP mycounter A test counter r#"# HELP testnamespace_testcounter A test counter
# TYPE mycounter counter # TYPE testnamespace_testcounter counter
mycounter{{component="mycomponent",endpoint="myendpoint",namespace="{}"}} 123.456789 testnamespace_testcounter{{component="testcomponent",endpoint="testendpoint",namespace="testnamespace"}} 123.456789
# HELP mygauge A test gauge # HELP testnamespace_testgauge A test gauge
# TYPE mygauge gauge # TYPE testnamespace_testgauge gauge
mygauge{{component="mycomponent",namespace="{}"}} 50000 testnamespace_testgauge{{component="testcomponent",namespace="testnamespace"}} 50000
"#, "#
namespace_name, namespace_name
); );
assert_eq!( assert_eq!(
...@@ -716,7 +915,7 @@ mygauge{{component="mycomponent",namespace="{}"}} 50000 ...@@ -716,7 +915,7 @@ mygauge{{component="mycomponent",namespace="{}"}} 50000
); );
let intcounter = namespace let intcounter = namespace
.create_intcounter("myintcounter", "A test int counter", &[]) .create_intcounter("testintcounter", "A test int counter", &[])
.unwrap(); .unwrap();
intcounter.inc_by(12345); intcounter.inc_by(12345);
assert_eq!(intcounter.get(), 12345); assert_eq!(intcounter.get(), 12345);
...@@ -727,17 +926,16 @@ mygauge{{component="mycomponent",namespace="{}"}} 50000 ...@@ -727,17 +926,16 @@ mygauge{{component="mycomponent",namespace="{}"}} 50000
println!("{}", namespace_output); println!("{}", namespace_output);
let expected_namespace_output = format!( let expected_namespace_output = format!(
r#"# HELP mycounter A test counter r#"# HELP testintcounter A test int counter
# TYPE mycounter counter # TYPE testintcounter counter
mycounter{{component="mycomponent",endpoint="myendpoint",namespace="{}"}} 123.456789 testintcounter{{namespace="testnamespace"}} 12345
# HELP mygauge A test gauge # HELP testnamespace_testcounter A test counter
# TYPE mygauge gauge # TYPE testnamespace_testcounter counter
mygauge{{component="mycomponent",namespace="{}"}} 50000 testnamespace_testcounter{{component="testcomponent",endpoint="testendpoint",namespace="testnamespace"}} 123.456789
# HELP myintcounter A test int counter # HELP testnamespace_testgauge A test gauge
# TYPE myintcounter counter # TYPE testnamespace_testgauge gauge
myintcounter{{namespace="{}"}} 12345 testnamespace_testgauge{{component="testcomponent",namespace="testnamespace"}} 50000
"#, "#
namespace_name, namespace_name, namespace_name
); );
assert_eq!( assert_eq!(
...@@ -753,7 +951,7 @@ myintcounter{{namespace="{}"}} 12345 ...@@ -753,7 +951,7 @@ myintcounter{{namespace="{}"}} 12345
// lack labels since the DistributedRuntime is unnamed. // lack labels since the DistributedRuntime is unnamed.
let histogram = drt let histogram = drt
.create_histogram( .create_histogram(
"myhistogram", "testhistogram",
"A test histogram", "A test histogram",
&[], &[],
Some(vec![1.0, 2.5, 5.0, 10.0]), Some(vec![1.0, 2.5, 5.0, 10.0]),
...@@ -766,7 +964,7 @@ myintcounter{{namespace="{}"}} 12345 ...@@ -766,7 +964,7 @@ myintcounter{{namespace="{}"}} 12345
// Test CounterVec creation // Test CounterVec creation
let countervec = drt let countervec = drt
.create_countervec( .create_countervec(
"mycountervec", "testcountervec",
"A test counter vector", "A test counter vector",
&["method", "status"], &["method", "status"],
&[("service", "api")], &[("service", "api")],
...@@ -777,7 +975,7 @@ myintcounter{{namespace="{}"}} 12345 ...@@ -777,7 +975,7 @@ myintcounter{{namespace="{}"}} 12345
// Test IntGauge creation // Test IntGauge creation
let intgauge = drt let intgauge = drt
.create_intgauge("myintgauge", "A test int gauge", &[]) .create_intgauge("testintgauge", "A test int gauge", &[])
.unwrap(); .unwrap();
intgauge.set(42); intgauge.set(42);
assert_eq!(intgauge.get(), 42); assert_eq!(intgauge.get(), 42);
...@@ -785,7 +983,7 @@ myintcounter{{namespace="{}"}} 12345 ...@@ -785,7 +983,7 @@ myintcounter{{namespace="{}"}} 12345
// Test IntGaugeVec creation // Test IntGaugeVec creation
let intgaugevec = drt let intgaugevec = drt
.create_intgaugevec( .create_intgaugevec(
"myintgaugevec", "testintgaugevec",
"A test int gauge vector", "A test int gauge vector",
&["instance", "status"], &["instance", "status"],
&[("service", "api")], &[("service", "api")],
...@@ -804,37 +1002,36 @@ myintcounter{{namespace="{}"}} 12345 ...@@ -804,37 +1002,36 @@ myintcounter{{namespace="{}"}} 12345
println!("{}", drt_output); println!("{}", drt_output);
let expected_drt_output = format!( let expected_drt_output = format!(
r#"# HELP mycounter A test counter r#"# HELP testcountervec A test counter vector
# TYPE mycounter counter # TYPE testcountervec counter
mycounter{{component="mycomponent",endpoint="myendpoint",namespace="{}"}} 123.456789 testcountervec{{method="GET",service="api",status="200"}} 10
# HELP mycountervec A test counter vector testcountervec{{method="POST",service="api",status="201"}} 5
# TYPE mycountervec counter # HELP testhistogram A test histogram
mycountervec{{method="GET",service="api",status="200"}} 10 # TYPE testhistogram histogram
mycountervec{{method="POST",service="api",status="201"}} 5 testhistogram_bucket{{le="1"}} 0
# HELP mygauge A test gauge testhistogram_bucket{{le="2.5"}} 2
# TYPE mygauge gauge testhistogram_bucket{{le="5"}} 3
mygauge{{component="mycomponent",namespace="{}"}} 50000 testhistogram_bucket{{le="10"}} 3
# HELP myhistogram A test histogram testhistogram_bucket{{le="+Inf"}} 3
# TYPE myhistogram histogram testhistogram_sum 7.5
myhistogram_bucket{{le="1"}} 0 testhistogram_count 3
myhistogram_bucket{{le="2.5"}} 2 # HELP testintcounter A test int counter
myhistogram_bucket{{le="5"}} 3 # TYPE testintcounter counter
myhistogram_bucket{{le="10"}} 3 testintcounter{{namespace="testnamespace"}} 12345
myhistogram_bucket{{le="+Inf"}} 3 # HELP testintgauge A test int gauge
myhistogram_sum 7.5 # TYPE testintgauge gauge
myhistogram_count 3 testintgauge 42
# HELP myintcounter A test int counter # HELP testintgaugevec A test int gauge vector
# TYPE myintcounter counter # TYPE testintgaugevec gauge
myintcounter{{namespace="{}"}} 12345 testintgaugevec{{instance="server1",service="api",status="active"}} 10
# HELP myintgauge A test int gauge testintgaugevec{{instance="server2",service="api",status="inactive"}} 0
# TYPE myintgauge gauge # HELP testnamespace_testcounter A test counter
myintgauge 42 # TYPE testnamespace_testcounter counter
# HELP myintgaugevec A test int gauge vector testnamespace_testcounter{{component="testcomponent",endpoint="testendpoint",namespace="testnamespace"}} 123.456789
# TYPE myintgaugevec gauge # HELP testnamespace_testgauge A test gauge
myintgaugevec{{instance="server1",service="api",status="active"}} 10 # TYPE testnamespace_testgauge gauge
myintgaugevec{{instance="server2",service="api",status="inactive"}} 0 testnamespace_testgauge{{component="testcomponent",namespace="testnamespace"}} 50000
"#, "#
namespace_name, namespace_name, namespace_name
); );
assert_eq!( assert_eq!(
......
...@@ -36,6 +36,11 @@ use super::{ ...@@ -36,6 +36,11 @@ use super::{
context, AsyncTransportEngine, Context, Data, Error, ManyOut, PipelineError, PipelineIO, context, AsyncTransportEngine, Context, Data, Error, ManyOut, PipelineError, PipelineIO,
SegmentSource, ServiceBackend, ServiceEngine, SingleIn, Source, SegmentSource, ServiceBackend, ServiceEngine, SingleIn, Source,
}; };
use ingress::push_handler::WorkHandlerMetrics;
// Add Prometheus metrics types
use crate::metrics::MetricsRegistry;
use prometheus::{CounterVec, Histogram, IntCounter, IntCounterVec, IntGauge};
pub trait Codable: PipelineIO + Serialize + for<'de> Deserialize<'de> {} pub trait Codable: PipelineIO + Serialize + for<'de> Deserialize<'de> {}
impl<T: PipelineIO + Serialize + for<'de> Deserialize<'de>> Codable for T {} impl<T: PipelineIO + Serialize + for<'de> Deserialize<'de>> Codable for T {}
...@@ -278,12 +283,14 @@ struct RequestControlMessage { ...@@ -278,12 +283,14 @@ struct RequestControlMessage {
pub struct Ingress<Req: PipelineIO, Resp: PipelineIO> { pub struct Ingress<Req: PipelineIO, Resp: PipelineIO> {
segment: OnceLock<Arc<SegmentSource<Req, Resp>>>, segment: OnceLock<Arc<SegmentSource<Req, Resp>>>,
metrics: OnceLock<Arc<WorkHandlerMetrics>>,
} }
impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> { impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> {
pub fn new() -> Arc<Self> { pub fn new() -> Arc<Self> {
Arc::new(Self { Arc::new(Self {
segment: OnceLock::new(), segment: OnceLock::new(),
metrics: OnceLock::new(),
}) })
} }
...@@ -293,6 +300,15 @@ impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> { ...@@ -293,6 +300,15 @@ impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> {
.map_err(|_| anyhow::anyhow!("Segment already set")) .map_err(|_| anyhow::anyhow!("Segment already set"))
} }
pub fn add_metrics(&self, endpoint: &crate::component::Endpoint) -> Result<()> {
let metrics = WorkHandlerMetrics::from_endpoint(endpoint)
.map_err(|e| anyhow::anyhow!("Failed to create work handler metrics: {}", e))?;
self.metrics
.set(Arc::new(metrics))
.map_err(|_| anyhow::anyhow!("Metrics already set"))
}
pub fn link(segment: Arc<SegmentSource<Req, Resp>>) -> Result<Arc<Self>> { pub fn link(segment: Arc<SegmentSource<Req, Resp>>) -> Result<Arc<Self>> {
let ingress = Ingress::new(); let ingress = Ingress::new();
ingress.attach(segment)?; ingress.attach(segment)?;
...@@ -317,11 +333,19 @@ impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> { ...@@ -317,11 +333,19 @@ impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> {
Ok(ingress) Ok(ingress)
} }
/// Helper method to access metrics if available
fn metrics(&self) -> Option<&Arc<WorkHandlerMetrics>> {
self.metrics.get()
}
} }
#[async_trait] #[async_trait]
pub trait PushWorkHandler: Send + Sync { pub trait PushWorkHandler: Send + Sync {
async fn handle_payload(&self, payload: Bytes) -> Result<(), PipelineError>; async fn handle_payload(&self, payload: Bytes) -> Result<(), PipelineError>;
/// Add metrics to the handler
fn add_metrics(&self, endpoint: &crate::component::Endpoint) -> Result<()>;
} }
/* /*
......
...@@ -14,7 +14,92 @@ ...@@ -14,7 +14,92 @@
// limitations under the License. // limitations under the License.
use super::*; use super::*;
use prometheus::{Histogram, IntCounter, IntCounterVec, IntGauge};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc;
/// Metrics configuration for profiling work handlers
#[derive(Clone, Debug)]
pub struct WorkHandlerMetrics {
pub request_counter: Arc<IntCounter>,
pub request_duration: Arc<Histogram>,
pub concurrent_requests: Arc<IntGauge>,
pub request_bytes: Arc<IntCounter>,
pub response_bytes: Arc<IntCounter>,
pub error_counter: Arc<IntCounterVec>,
}
impl WorkHandlerMetrics {
pub fn new(
request_counter: Arc<IntCounter>,
request_duration: Arc<Histogram>,
concurrent_requests: Arc<IntGauge>,
request_bytes: Arc<IntCounter>,
response_bytes: Arc<IntCounter>,
error_counter: Arc<IntCounterVec>,
) -> Self {
Self {
request_counter,
request_duration,
concurrent_requests,
request_bytes,
response_bytes,
error_counter,
}
}
/// Create WorkHandlerMetrics from an endpoint using its built-in labeling
pub fn from_endpoint(
endpoint: &crate::component::Endpoint,
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let request_counter = endpoint.create_intcounter(
"requests_total",
"Total number of requests processed by work handler",
&[],
)?;
let request_duration = endpoint.create_histogram(
"request_duration_seconds",
"Time spent processing requests by work handler",
&[],
None,
)?;
let concurrent_requests = endpoint.create_intgauge(
"concurrent_requests",
"Number of requests currently being processed by work handler",
&[],
)?;
let request_bytes = endpoint.create_intcounter(
"request_bytes_total",
"Total number of bytes received in requests by work handler",
&[],
)?;
let response_bytes = endpoint.create_intcounter(
"response_bytes_total",
"Total number of bytes sent in responses by work handler",
&[],
)?;
let error_counter = endpoint.create_intcountervec(
"errors_total",
"Total number of errors in work handler processing",
&["error_type"],
&[],
)?;
Ok(Self::new(
request_counter,
request_duration,
concurrent_requests,
request_bytes,
response_bytes,
error_counter,
))
}
}
#[async_trait] #[async_trait]
impl<T: Data, U: Data> PushWorkHandler for Ingress<SingleIn<T>, ManyOut<U>> impl<T: Data, U: Data> PushWorkHandler for Ingress<SingleIn<T>, ManyOut<U>>
...@@ -22,7 +107,21 @@ where ...@@ -22,7 +107,21 @@ where
T: Data + for<'de> Deserialize<'de> + std::fmt::Debug, T: Data + for<'de> Deserialize<'de> + std::fmt::Debug,
U: Data + Serialize + std::fmt::Debug, U: Data + Serialize + std::fmt::Debug,
{ {
fn add_metrics(&self, endpoint: &crate::component::Endpoint) -> Result<()> {
// Call the Ingress-specific add_metrics implementation
use crate::pipeline::network::Ingress;
Ingress::add_metrics(self, endpoint)
}
async fn handle_payload(&self, payload: Bytes) -> Result<(), PipelineError> { async fn handle_payload(&self, payload: Bytes) -> Result<(), PipelineError> {
let start_time = std::time::Instant::now();
if let Some(m) = self.metrics() {
m.request_counter.inc();
m.concurrent_requests.inc();
m.request_bytes.inc_by(payload.len() as u64);
}
// decode the control message and the request // decode the control message and the request
let msg = TwoPartCodec::default() let msg = TwoPartCodec::default()
.decode_message(payload)? .decode_message(payload)?
...@@ -41,6 +140,11 @@ where ...@@ -41,6 +140,11 @@ where
Ok(cm) => cm, Ok(cm) => cm,
Err(err) => { Err(err) => {
let json_str = String::from_utf8_lossy(&header); let json_str = String::from_utf8_lossy(&header);
if let Some(m) = self.metrics() {
m.error_counter
.with_label_values(&["deserialization"])
.inc();
}
return Err(PipelineError::DeserializationError( return Err(PipelineError::DeserializationError(
format!("Failed deserializing to RequestControlMessage. err={err}, json_str={json_str}"), format!("Failed deserializing to RequestControlMessage. err={err}, json_str={json_str}"),
)); ));
...@@ -50,6 +154,11 @@ where ...@@ -50,6 +154,11 @@ where
(control_msg, request) (control_msg, request)
} }
_ => { _ => {
if let Some(m) = self.metrics() {
m.error_counter
.with_label_values(&["invalid_message"])
.inc();
}
return Err(PipelineError::Generic(String::from("Unexpected message from work queue; unable extract a TwoPartMessage with a header and data"))); return Err(PipelineError::Generic(String::from("Unexpected message from work queue; unable extract a TwoPartMessage with a header and data")));
} }
}; };
...@@ -68,6 +177,11 @@ where ...@@ -68,6 +177,11 @@ where
) )
.await .await
.map_err(|e| { .map_err(|e| {
if let Some(m) = self.metrics() {
m.error_counter
.with_label_values(&["response_stream"])
.inc();
}
PipelineError::Generic(format!("Failed to create response stream: {:?}", e,)) PipelineError::Generic(format!("Failed to create response stream: {:?}", e,))
})?; })?;
...@@ -78,7 +192,12 @@ where ...@@ -78,7 +192,12 @@ where
.expect("segment not set") .expect("segment not set")
.generate(request) .generate(request)
.await .await
.map_err(PipelineError::GenerateError); .map_err(|e| {
if let Some(m) = self.metrics() {
m.error_counter.with_label_values(&["generate"]).inc();
}
PipelineError::GenerateError(e)
});
// the prolouge is sent to the client to indicate that the stream is ready to receive data // the prolouge is sent to the client to indicate that the stream is ready to receive data
// or if the generate call failed, the error is sent to the client // or if the generate call failed, the error is sent to the client
...@@ -107,10 +226,18 @@ where ...@@ -107,10 +226,18 @@ where
}; };
let resp_bytes = serde_json::to_vec(&resp_wrapper) let resp_bytes = serde_json::to_vec(&resp_wrapper)
.expect("fatal error: invalid response object - this should never happen"); .expect("fatal error: invalid response object - this should never happen");
if let Some(m) = self.metrics() {
m.response_bytes.inc_by(resp_bytes.len() as u64);
}
if (publisher.send(resp_bytes.into()).await).is_err() { if (publisher.send(resp_bytes.into()).await).is_err() {
tracing::error!("Failed to publish response for stream {}", context.id()); tracing::error!("Failed to publish response for stream {}", context.id());
context.stop_generating(); context.stop_generating();
send_complete_final = false; send_complete_final = false;
if let Some(m) = self.metrics() {
m.error_counter
.with_label_values(&["publish_response"])
.inc();
}
break; break;
} }
} }
...@@ -121,13 +248,25 @@ where ...@@ -121,13 +248,25 @@ where
}; };
let resp_bytes = serde_json::to_vec(&resp_wrapper) let resp_bytes = serde_json::to_vec(&resp_wrapper)
.expect("fatal error: invalid response object - this should never happen"); .expect("fatal error: invalid response object - this should never happen");
if let Some(m) = self.metrics() {
m.response_bytes.inc_by(resp_bytes.len() as u64);
}
if (publisher.send(resp_bytes.into()).await).is_err() { if (publisher.send(resp_bytes.into()).await).is_err() {
tracing::error!( tracing::error!(
"Failed to publish complete final for stream {}", "Failed to publish complete final for stream {}",
context.id() context.id()
); );
if let Some(m) = self.metrics() {
m.error_counter.with_label_values(&["publish_final"]).inc();
} }
} }
}
if let Some(m) = self.metrics() {
let duration = start_time.elapsed();
m.request_duration.observe(duration.as_secs_f64());
m.concurrent_requests.dec();
}
Ok(()) Ok(())
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment