# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import math from unittest.mock import MagicMock, patch import pytest from dynamo import prometheus_names from dynamo.planner.utils.prometheus import ( FrontendMetric, FrontendMetricContainer, PrometheusAPIClient, ) pytestmark = [ pytest.mark.gpu_0, pytest.mark.pre_merge, pytest.mark.unit, pytest.mark.planner, pytest.mark.vllm, ] @pytest.fixture def mock_prometheus_result(): """Fixture providing mock prometheus result data for testing""" return [ { "metric": { "container": "main", "dynamo_namespace": "different_namespace", "model": "different_model", "namespace": "dynamo-system", }, "value": [1758857776.071, 10.5], }, { "metric": { "container": "main", "dynamo_namespace": "target_namespace", "model": "target_model", "namespace": "dynamo-system", }, "value": [1758857776.071, 42.7], }, { "metric": { "container": "worker", "dynamo_namespace": "target_namespace", "model": "target_model", "namespace": "dynamo-system", }, "value": [1758857776.071, 35.5], }, { "metric": { "container": "sidecar", "dynamo_namespace": "target_namespace", "model": "target_model", "namespace": "dynamo-system", }, "value": [30.0, 15.5], }, ] def test_frontend_metric_container_with_nan_value(): test_data = { "metric": { "container": "main", "dynamo_namespace": "vllm-disagg-planner", "endpoint": "http", "instance": "10.244.2.163:8000", "job": "dynamo-system/dynamo-frontend", "model": "qwen/qwen3-0.6b", "namespace": "dynamo-system", "pod": "vllm-disagg-planner-frontend-865f84c49-6q7s5", }, "value": [1758857776.071, "NaN"], } container = FrontendMetricContainer.model_validate(test_data) assert container.metric.container == "main" assert container.metric.dynamo_namespace == "vllm-disagg-planner" assert container.metric.endpoint == "http" assert container.metric.instance == "10.244.2.163:8000" assert container.metric.job == "dynamo-system/dynamo-frontend" assert container.metric.model == "qwen/qwen3-0.6b" assert container.metric.namespace == "dynamo-system" assert container.metric.pod == "vllm-disagg-planner-frontend-865f84c49-6q7s5" assert container.value[0] == 1758857776.071 assert math.isnan( container.value[1] ) # becomes special float value that can't be asserted to itself test_data["value"][1] = 42.5 # type: ignore[index] container = FrontendMetricContainer.model_validate(test_data) assert container.value[1] == 42.5 def test_frontend_metric_with_partial_data(): """Test FrontendMetric with partial data (optional fields)""" test_data = { "container": "main", "model": "qwen/qwen3-0.6b", "namespace": "dynamo-system", } metric = FrontendMetric.model_validate(test_data) # Assert provided fields assert metric.container == "main" assert metric.model == "qwen/qwen3-0.6b" assert metric.namespace == "dynamo-system" # Assert optional fields are None assert metric.dynamo_namespace is None assert metric.endpoint is None assert metric.instance is None assert metric.job is None assert metric.pod is None def test_get_average_metric_none_result(): """Test _get_average_metric when prometheus returns None""" # TODO: Replace hardcoded port with allocate_port() from tests.utils.port_utils # for xdist-safe parallel execution. client = PrometheusAPIClient("http://localhost:9090", "test_namespace") with patch.object(client.prom, "custom_query") as mock_query: mock_query.return_value = None result = client._get_average_metric( full_metric_name="test_metric", interval="60s", operation_name="test operation", model_name="test_model", ) assert result == 0 def test_get_average_metric_empty_result(): """Test _get_average_metric when prometheus returns empty list""" client = PrometheusAPIClient("http://localhost:9090", "test_namespace") with patch.object(client.prom, "custom_query") as mock_query: mock_query.return_value = [] result = client._get_average_metric( full_metric_name="test_metric", interval="60s", operation_name="test operation", model_name="test_model", ) assert result == 0 def test_get_average_metric_no_matching_containers(mock_prometheus_result): """Test _get_average_metric with valid containers but no matches""" client = PrometheusAPIClient("http://localhost:9090", "test_namespace") with patch.object(client.prom, "custom_query") as mock_query: # Use only the first container which doesn't match target criteria mock_query.return_value = [mock_prometheus_result[0]] result = client._get_average_metric( full_metric_name="test_metric", interval="60s", operation_name="test operation", model_name="target_model", ) assert result == 0 def test_get_average_metric_one_matching_container(mock_prometheus_result): """Test _get_average_metric with one matching container""" client = PrometheusAPIClient("http://localhost:9090", "target_namespace") with patch.object(client.prom, "custom_query") as mock_query: # Use first two containers - one doesn't match, one does mock_query.return_value = mock_prometheus_result[:2] result = client._get_average_metric( full_metric_name="test_metric", interval="60s", operation_name="test operation", model_name="target_model", ) assert result == 42.7 def test_get_average_metric_with_validation_error(): """Test _get_average_metric with one valid container and one that fails validation""" client = PrometheusAPIClient("http://localhost:9090", "target_namespace") mock_result = [ { "metric": { "container": "main", "dynamo_namespace": "target_namespace", "model": "target_model", "namespace": "dynamo-system", }, "value": [1758857776.071, 25.5], }, { # Invalid structure - missing required fields that will cause validation error "invalid_structure": "bad_data", "value": "not_a_tuple", }, ] with patch.object(client.prom, "custom_query") as mock_query: mock_query.return_value = mock_result result = client._get_average_metric( full_metric_name="test_metric", interval="60s", operation_name="test operation", model_name="target_model", ) assert result == 25.5 def test_get_average_metric_multiple_matching_containers(mock_prometheus_result): """Test _get_average_metric with multiple matching containers returns average""" client = PrometheusAPIClient("http://localhost:9090", "target_namespace") with patch.object(client.prom, "custom_query") as mock_query: # Use containers 1, 2, 3 which all match target criteria mock_query.return_value = mock_prometheus_result[1:] result = client._get_average_metric( full_metric_name="test_metric", interval="60s", operation_name="test operation", model_name="target_model", ) # Average of 42.7, 35.5, and 15.5 (using value[1] from each container) expected = (42.7 + 35.5 + 15.5) / 3 assert result == expected # --------------------------------------------------------------------------- # Router metrics source tests # --------------------------------------------------------------------------- @pytest.fixture def router_client(): """PrometheusAPIClient configured with metrics_source='router'.""" # TODO: Replace hardcoded port with allocate_port() from tests.utils.port_utils # for xdist-safe parallel execution. client = PrometheusAPIClient( "http://localhost:9090", "test-fe-namespace", metrics_source="router" ) client.prom = MagicMock() client.prom.custom_query.return_value = [{"value": [0, "42.0"]}] return client class TestPrometheusAPIClientRouterSource: """Tests for PrometheusAPIClient when metrics_source='router'.""" def test_get_avg_inter_token_latency_dispatches_to_router_histogram( self, router_client ): """get_avg_inter_token_latency with router source queries dynamo_component_router_* metric.""" result = router_client.get_avg_inter_token_latency("60s", "mymodel") assert result == 42.0 call_args = str(router_client.prom.custom_query.call_args) expected_metric = f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.INTER_TOKEN_LATENCY_SECONDS}" assert expected_metric in call_args def test_get_avg_time_to_first_token_dispatches_to_router_histogram( self, router_client ): """get_avg_time_to_first_token with router source queries dynamo_component_router_* metric.""" result = router_client.get_avg_time_to_first_token("60s", "mymodel") assert result == 42.0 call_args = str(router_client.prom.custom_query.call_args) expected_metric = f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.TIME_TO_FIRST_TOKEN_SECONDS}" assert expected_metric in call_args def test_get_avg_input_sequence_tokens_dispatches_to_router_histogram( self, router_client ): """get_avg_input_sequence_tokens with router source queries dynamo_component_router_* metric.""" result = router_client.get_avg_input_sequence_tokens("60s", "mymodel") assert result == 42.0 call_args = str(router_client.prom.custom_query.call_args) expected_metric = f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.INPUT_SEQUENCE_TOKENS}" assert expected_metric in call_args def test_get_avg_output_sequence_tokens_dispatches_to_router_histogram( self, router_client ): """get_avg_output_sequence_tokens with router source queries dynamo_component_router_* metric.""" result = router_client.get_avg_output_sequence_tokens("60s", "mymodel") assert result == 42.0 call_args = str(router_client.prom.custom_query.call_args) expected_metric = f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.OUTPUT_SEQUENCE_TOKENS}" assert expected_metric in call_args def test_get_avg_request_count_uses_router_requests_total(self, router_client): """get_avg_request_count with router source queries dynamo_component_router_requests_total.""" result = router_client.get_avg_request_count("60s", "mymodel") assert result == 42.0 call_args = str(router_client.prom.custom_query.call_args) expected_metric = f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.REQUESTS_TOTAL}" assert expected_metric in call_args def test_dynamo_namespace_filter_in_router_histogram_query(self, router_client): """Router histogram query must filter by dynamo_namespace so each pool planner only reads its own LocalRouter's metrics, not the cluster-wide aggregate. dynamo_component_router_* metrics use MetricsHierarchy which injects dynamo_namespace with underscores. DYN_NAMESPACE dashes are normalized to underscores for the PromQL filter. """ router_client.get_avg_inter_token_latency("60s", "mymodel") call_args = str(router_client.prom.custom_query.call_args) assert "dynamo_namespace" in call_args, ( "dynamo_namespace filter missing from router histogram query — " "without it, all pool planners read the same cluster-wide aggregate" ) # MetricsHierarchy injects underscores; DYN_NAMESPACE dashes are normalized assert "test_fe_namespace" in call_args def test_dynamo_namespace_filter_in_router_request_count_query(self, router_client): """Router request count query must filter by dynamo_namespace. dynamo_component_router_* get dynamo_namespace from MetricsHierarchy (underscores). """ router_client.get_avg_request_count("60s", "mymodel") call_args = str(router_client.prom.custom_query.call_args) assert "dynamo_namespace" in call_args, ( "dynamo_namespace filter missing from router request count query — " "without it, all pool planners read the same cluster-wide aggregate" ) # MetricsHierarchy injects underscores; DYN_NAMESPACE dashes are normalized assert "test_fe_namespace" in call_args def test_router_histogram_returns_zero_on_empty_result(self, router_client): """_get_router_average_histogram returns 0 when Prometheus has no data.""" router_client.prom.custom_query.return_value = [] result = router_client.get_avg_inter_token_latency("60s", "mymodel") assert result == 0 def test_router_request_count_returns_zero_on_empty_result(self, router_client): """get_avg_request_count (router) returns 0 when Prometheus has no data.""" router_client.prom.custom_query.return_value = [] result = router_client.get_avg_request_count("60s", "mymodel") assert result == 0 def test_router_histogram_returns_zero_on_nan(self, router_client): """_get_router_average_histogram returns 0 when value is NaN.""" router_client.prom.custom_query.return_value = [{"value": [0, "NaN"]}] result = router_client.get_avg_inter_token_latency("60s", "mymodel") assert result == 0 def test_warn_if_router_not_scraped_logs_warning_when_absent( self, router_client, caplog ): """warn_if_router_not_scraped logs a warning when absent() returns a result.""" router_client.prom.custom_query.return_value = [{"value": [0, "1"]}] with caplog.at_level(logging.WARNING): router_client.warn_if_router_not_scraped() assert any( "No 'dynamo_component_router_requests_total'" in r.message for r in caplog.records ) def test_warn_if_router_not_scraped_silent_when_present( self, router_client, caplog ): """warn_if_router_not_scraped is silent when the metric exists (absent() returns empty).""" router_client.prom.custom_query.return_value = [] with caplog.at_level(logging.WARNING): router_client.warn_if_router_not_scraped() assert not any( "dynamo_component_router_requests_total" in r.message for r in caplog.records )