test_prometheus.py 15.7 KB
Newer Older
1
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16
import logging
17
import math
18
from unittest.mock import MagicMock, patch
19
20
21

import pytest

22
from dynamo import prometheus_names
23
from dynamo.planner.monitoring.traffic_metrics import (
24
25
26
27
28
    FrontendMetric,
    FrontendMetricContainer,
    PrometheusAPIClient,
)

29
30
31
32
33
pytestmark = [
    pytest.mark.gpu_0,
    pytest.mark.pre_merge,
    pytest.mark.unit,
    pytest.mark.planner,
34
    pytest.mark.vllm,
35
36
]

37
38

@pytest.fixture
39
40
def mock_prometheus_result():
    """Fixture providing mock prometheus result data for testing"""
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
    return [
        {
            "metric": {
                "container": "main",
                "dynamo_namespace": "different_namespace",
                "model": "different_model",
                "namespace": "dynamo-system",
            },
            "value": [1758857776.071, 10.5],
        },
        {
            "metric": {
                "container": "main",
                "dynamo_namespace": "target_namespace",
                "model": "target_model",
                "namespace": "dynamo-system",
            },
            "value": [1758857776.071, 42.7],
        },
        {
            "metric": {
                "container": "worker",
                "dynamo_namespace": "target_namespace",
                "model": "target_model",
                "namespace": "dynamo-system",
            },
            "value": [1758857776.071, 35.5],
        },
        {
            "metric": {
                "container": "sidecar",
                "dynamo_namespace": "target_namespace",
                "model": "target_model",
                "namespace": "dynamo-system",
            },
            "value": [30.0, 15.5],
        },
    ]


def test_frontend_metric_container_with_nan_value():
    test_data = {
        "metric": {
            "container": "main",
            "dynamo_namespace": "vllm-disagg-planner",
            "endpoint": "http",
            "instance": "10.244.2.163:8000",
            "job": "dynamo-system/dynamo-frontend",
            "model": "qwen/qwen3-0.6b",
            "namespace": "dynamo-system",
            "pod": "vllm-disagg-planner-frontend-865f84c49-6q7s5",
        },
        "value": [1758857776.071, "NaN"],
    }

    container = FrontendMetricContainer.model_validate(test_data)
    assert container.metric.container == "main"
    assert container.metric.dynamo_namespace == "vllm-disagg-planner"
    assert container.metric.endpoint == "http"
    assert container.metric.instance == "10.244.2.163:8000"
    assert container.metric.job == "dynamo-system/dynamo-frontend"
    assert container.metric.model == "qwen/qwen3-0.6b"
    assert container.metric.namespace == "dynamo-system"
    assert container.metric.pod == "vllm-disagg-planner-frontend-865f84c49-6q7s5"
    assert container.value[0] == 1758857776.071
    assert math.isnan(
        container.value[1]
    )  # becomes special float value that can't be asserted to itself

    test_data["value"][1] = 42.5  # type: ignore[index]
    container = FrontendMetricContainer.model_validate(test_data)
    assert container.value[1] == 42.5


def test_frontend_metric_with_partial_data():
    """Test FrontendMetric with partial data (optional fields)"""
    test_data = {
        "container": "main",
        "model": "qwen/qwen3-0.6b",
        "namespace": "dynamo-system",
    }

    metric = FrontendMetric.model_validate(test_data)

    # Assert provided fields
    assert metric.container == "main"
    assert metric.model == "qwen/qwen3-0.6b"
    assert metric.namespace == "dynamo-system"

    # Assert optional fields are None
    assert metric.dynamo_namespace is None
    assert metric.endpoint is None
    assert metric.instance is None
    assert metric.job is None
    assert metric.pod is None


def test_get_average_metric_none_result():
    """Test _get_average_metric when prometheus returns None"""
140
141
    # TODO: Replace hardcoded port with allocate_port() from tests.utils.port_utils
    #       for xdist-safe parallel execution.
142
143
144
145
146
147
    client = PrometheusAPIClient("http://localhost:9090", "test_namespace")

    with patch.object(client.prom, "custom_query") as mock_query:
        mock_query.return_value = None

        result = client._get_average_metric(
148
            full_metric_name="test_metric",
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
            interval="60s",
            operation_name="test operation",
            model_name="test_model",
        )

        assert result == 0


def test_get_average_metric_empty_result():
    """Test _get_average_metric when prometheus returns empty list"""
    client = PrometheusAPIClient("http://localhost:9090", "test_namespace")

    with patch.object(client.prom, "custom_query") as mock_query:
        mock_query.return_value = []

        result = client._get_average_metric(
165
            full_metric_name="test_metric",
166
167
168
169
170
171
172
173
            interval="60s",
            operation_name="test operation",
            model_name="test_model",
        )

        assert result == 0


174
def test_get_average_metric_no_matching_containers(mock_prometheus_result):
175
176
177
178
179
    """Test _get_average_metric with valid containers but no matches"""
    client = PrometheusAPIClient("http://localhost:9090", "test_namespace")

    with patch.object(client.prom, "custom_query") as mock_query:
        # Use only the first container which doesn't match target criteria
180
        mock_query.return_value = [mock_prometheus_result[0]]
181
182

        result = client._get_average_metric(
183
            full_metric_name="test_metric",
184
185
186
187
188
189
190
191
            interval="60s",
            operation_name="test operation",
            model_name="target_model",
        )

        assert result == 0


192
def test_get_average_metric_one_matching_container(mock_prometheus_result):
193
194
195
196
197
    """Test _get_average_metric with one matching container"""
    client = PrometheusAPIClient("http://localhost:9090", "target_namespace")

    with patch.object(client.prom, "custom_query") as mock_query:
        # Use first two containers - one doesn't match, one does
198
        mock_query.return_value = mock_prometheus_result[:2]
199
200

        result = client._get_average_metric(
201
            full_metric_name="test_metric",
202
203
204
205
206
207
208
209
210
211
212
213
            interval="60s",
            operation_name="test operation",
            model_name="target_model",
        )

        assert result == 42.7


def test_get_average_metric_with_validation_error():
    """Test _get_average_metric with one valid container and one that fails validation"""
    client = PrometheusAPIClient("http://localhost:9090", "target_namespace")

214
    mock_result = [
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
        {
            "metric": {
                "container": "main",
                "dynamo_namespace": "target_namespace",
                "model": "target_model",
                "namespace": "dynamo-system",
            },
            "value": [1758857776.071, 25.5],
        },
        {
            # Invalid structure - missing required fields that will cause validation error
            "invalid_structure": "bad_data",
            "value": "not_a_tuple",
        },
    ]

    with patch.object(client.prom, "custom_query") as mock_query:
232
        mock_query.return_value = mock_result
233
234

        result = client._get_average_metric(
235
            full_metric_name="test_metric",
236
237
238
239
240
241
242
243
            interval="60s",
            operation_name="test operation",
            model_name="target_model",
        )

        assert result == 25.5


244
def test_get_average_metric_multiple_matching_containers(mock_prometheus_result):
245
246
247
248
249
    """Test _get_average_metric with multiple matching containers returns average"""
    client = PrometheusAPIClient("http://localhost:9090", "target_namespace")

    with patch.object(client.prom, "custom_query") as mock_query:
        # Use containers 1, 2, 3 which all match target criteria
250
        mock_query.return_value = mock_prometheus_result[1:]
251
252

        result = client._get_average_metric(
253
            full_metric_name="test_metric",
254
255
256
257
258
            interval="60s",
            operation_name="test operation",
            model_name="target_model",
        )

259
        # Average of 42.7, 35.5, and 15.5 (using value[1] from each container)
260
261
        expected = (42.7 + 35.5 + 15.5) / 3
        assert result == expected
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406


# ---------------------------------------------------------------------------
# Router metrics source tests
# ---------------------------------------------------------------------------


@pytest.fixture
def router_client():
    """PrometheusAPIClient configured with metrics_source='router'."""
    # TODO: Replace hardcoded port with allocate_port() from tests.utils.port_utils
    #       for xdist-safe parallel execution.
    client = PrometheusAPIClient(
        "http://localhost:9090", "test-fe-namespace", metrics_source="router"
    )
    client.prom = MagicMock()
    client.prom.custom_query.return_value = [{"value": [0, "42.0"]}]
    return client


class TestPrometheusAPIClientRouterSource:
    """Tests for PrometheusAPIClient when metrics_source='router'."""

    def test_get_avg_inter_token_latency_dispatches_to_router_histogram(
        self, router_client
    ):
        """get_avg_inter_token_latency with router source queries dynamo_component_router_* metric."""
        result = router_client.get_avg_inter_token_latency("60s", "mymodel")
        assert result == 42.0

        call_args = str(router_client.prom.custom_query.call_args)
        expected_metric = f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.INTER_TOKEN_LATENCY_SECONDS}"
        assert expected_metric in call_args

    def test_get_avg_time_to_first_token_dispatches_to_router_histogram(
        self, router_client
    ):
        """get_avg_time_to_first_token with router source queries dynamo_component_router_* metric."""
        result = router_client.get_avg_time_to_first_token("60s", "mymodel")
        assert result == 42.0

        call_args = str(router_client.prom.custom_query.call_args)
        expected_metric = f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.TIME_TO_FIRST_TOKEN_SECONDS}"
        assert expected_metric in call_args

    def test_get_avg_input_sequence_tokens_dispatches_to_router_histogram(
        self, router_client
    ):
        """get_avg_input_sequence_tokens with router source queries dynamo_component_router_* metric."""
        result = router_client.get_avg_input_sequence_tokens("60s", "mymodel")
        assert result == 42.0

        call_args = str(router_client.prom.custom_query.call_args)
        expected_metric = f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.INPUT_SEQUENCE_TOKENS}"
        assert expected_metric in call_args

    def test_get_avg_output_sequence_tokens_dispatches_to_router_histogram(
        self, router_client
    ):
        """get_avg_output_sequence_tokens with router source queries dynamo_component_router_* metric."""
        result = router_client.get_avg_output_sequence_tokens("60s", "mymodel")
        assert result == 42.0

        call_args = str(router_client.prom.custom_query.call_args)
        expected_metric = f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.OUTPUT_SEQUENCE_TOKENS}"
        assert expected_metric in call_args

    def test_get_avg_request_count_uses_router_requests_total(self, router_client):
        """get_avg_request_count with router source queries dynamo_component_router_requests_total."""
        result = router_client.get_avg_request_count("60s", "mymodel")
        assert result == 42.0

        call_args = str(router_client.prom.custom_query.call_args)
        expected_metric = f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.REQUESTS_TOTAL}"
        assert expected_metric in call_args

    def test_dynamo_namespace_filter_in_router_histogram_query(self, router_client):
        """Router histogram query must filter by dynamo_namespace so each pool planner
        only reads its own LocalRouter's metrics, not the cluster-wide aggregate.
        dynamo_component_router_* metrics use MetricsHierarchy which injects dynamo_namespace
        with underscores. DYN_NAMESPACE dashes are normalized to underscores for the PromQL filter.
        """
        router_client.get_avg_inter_token_latency("60s", "mymodel")
        call_args = str(router_client.prom.custom_query.call_args)
        assert "dynamo_namespace" in call_args, (
            "dynamo_namespace filter missing from router histogram query — "
            "without it, all pool planners read the same cluster-wide aggregate"
        )
        # MetricsHierarchy injects underscores; DYN_NAMESPACE dashes are normalized
        assert "test_fe_namespace" in call_args

    def test_dynamo_namespace_filter_in_router_request_count_query(self, router_client):
        """Router request count query must filter by dynamo_namespace.
        dynamo_component_router_* get dynamo_namespace from MetricsHierarchy (underscores).
        """
        router_client.get_avg_request_count("60s", "mymodel")
        call_args = str(router_client.prom.custom_query.call_args)
        assert "dynamo_namespace" in call_args, (
            "dynamo_namespace filter missing from router request count query — "
            "without it, all pool planners read the same cluster-wide aggregate"
        )
        # MetricsHierarchy injects underscores; DYN_NAMESPACE dashes are normalized
        assert "test_fe_namespace" in call_args

    def test_router_histogram_returns_zero_on_empty_result(self, router_client):
        """_get_router_average_histogram returns 0 when Prometheus has no data."""
        router_client.prom.custom_query.return_value = []
        result = router_client.get_avg_inter_token_latency("60s", "mymodel")
        assert result == 0

    def test_router_request_count_returns_zero_on_empty_result(self, router_client):
        """get_avg_request_count (router) returns 0 when Prometheus has no data."""
        router_client.prom.custom_query.return_value = []
        result = router_client.get_avg_request_count("60s", "mymodel")
        assert result == 0

    def test_router_histogram_returns_zero_on_nan(self, router_client):
        """_get_router_average_histogram returns 0 when value is NaN."""
        router_client.prom.custom_query.return_value = [{"value": [0, "NaN"]}]
        result = router_client.get_avg_inter_token_latency("60s", "mymodel")
        assert result == 0

    def test_warn_if_router_not_scraped_logs_warning_when_absent(
        self, router_client, caplog
    ):
        """warn_if_router_not_scraped logs a warning when absent() returns a result."""
        router_client.prom.custom_query.return_value = [{"value": [0, "1"]}]
        with caplog.at_level(logging.WARNING):
            router_client.warn_if_router_not_scraped()
        assert any(
            "No 'dynamo_component_router_requests_total'" in r.message
            for r in caplog.records
        )

    def test_warn_if_router_not_scraped_silent_when_present(
        self, router_client, caplog
    ):
        """warn_if_router_not_scraped is silent when the metric exists (absent() returns empty)."""
        router_client.prom.custom_query.return_value = []
        with caplog.at_level(logging.WARNING):
            router_client.warn_if_router_not_scraped()
        assert not any(
            "dynamo_component_router_requests_total" in r.message
            for r in caplog.records
        )