publisher.py 4.5 KB
Newer Older
1
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
Alec's avatar
Alec committed
2
3
# SPDX-License-Identifier: Apache-2.0

4
5
import asyncio
import logging
6
from typing import Optional
Alec's avatar
Alec committed
7

8
from prometheus_client import CollectorRegistry
Alec's avatar
Alec committed
9
10
11
12
from vllm.config import VllmConfig
from vllm.v1.metrics.loggers import StatLoggerBase
from vllm.v1.metrics.stats import IterationStats, SchedulerStats

13
from dynamo.common.utils.prometheus import LLMBackendMetrics
14
from dynamo.llm import WorkerMetricsPublisher
15
from dynamo.runtime import Endpoint
Alec's avatar
Alec committed
16

17
18
19
20
# Dedicated Prometheus registry for dynamo_component metrics.
# Keeping these metrics in their own CollectorRegistry isolates them and
# allows them to be exposed via their own callback.
DYNAMO_COMPONENT_REGISTRY = CollectorRegistry()

Alec's avatar
Alec committed
21
22
23
24

class DynamoStatLoggerPublisher(StatLoggerBase):
    """Stat logger publisher. Wrapper for the WorkerMetricsPublisher to match the StatLoggerBase interface.

    Every ``record`` call forwards the scheduler's KV-cache usage to the
    wrapped ``WorkerMetricsPublisher`` and mirrors it onto the
    ``LLMBackendMetrics`` gauges, labelled with this instance's ``dp_rank``.
    """

    def __init__(
        self,
        endpoint: Endpoint,
        dp_rank: int = 0,
        component_gauges: Optional[LLMBackendMetrics] = None,
    ) -> None:
        """Set up the publisher and schedule creation of its endpoint.

        Must be called while an asyncio event loop is running, because the
        endpoint creation is scheduled with ``asyncio.create_task``.

        Args:
            endpoint: Endpoint handed to ``WorkerMetricsPublisher.create_endpoint``.
            dp_rank: Rank passed to every publish call and used (as a string)
                as the gauge label.
            component_gauges: Gauges to update; a new ``LLMBackendMetrics()``
                is created when none is supplied.
        """
        self.inner = WorkerMetricsPublisher()
        self._endpoint = endpoint
        self.dp_rank = dp_rank
        self.component_gauges = component_gauges or LLMBackendMetrics()
        # Starts at 1 and stays there until set_num_gpu_block() is called.
        self.num_gpu_block = 1

        # Schedule async endpoint creation. The task is stored on the instance
        # so a reference to it is retained.
        self._endpoint_task = asyncio.create_task(self._create_endpoint())

    async def _create_endpoint(self) -> None:
        """Create the NATS endpoint asynchronously.

        Raises:
            Exception: Whatever ``create_endpoint`` raises, re-raised after
                the failure has been logged with its traceback.
        """
        try:
            await self.inner.create_endpoint(self._endpoint)
            logging.debug("vLLM metrics publisher endpoint created")
        except Exception:
            logging.exception("Failed to create vLLM metrics publisher endpoint")
            raise

    # TODO: Remove this and pass as metadata through shared storage
    def set_num_gpu_block(self, num_blocks: int) -> None:
        """Store the block count used to scale KV-cache usage in ``record``."""
        self.num_gpu_block = num_blocks

    def record(
        self,
        scheduler_stats: SchedulerStats,
        iteration_stats: Optional[IterationStats],
        engine_idx: int = 0,
        *args: object,
        **kwargs: object,
    ) -> None:
        """Publish the current KV-cache usage and update the gauges.

        Args:
            scheduler_stats: Only ``kv_cache_usage`` is read from it.
            iteration_stats: Accepted for interface compatibility; unused.
            engine_idx: Accepted for interface compatibility; unused.
            *args: Ignored.
            **kwargs: Ignored.
        """
        # int() truncates toward zero.
        # NOTE(review): confirm truncation (rather than rounding) is intended.
        active_decode_blocks = int(self.num_gpu_block * scheduler_stats.kv_cache_usage)
        self.inner.publish(self.dp_rank, active_decode_blocks)

        dp_rank_str = str(self.dp_rank)
        self.component_gauges.set_total_blocks(dp_rank_str, self.num_gpu_block)

        # Set GPU cache usage percentage directly from scheduler_stats
        # Note: vLLM's scheduler_stats.kv_cache_usage returns very small values
        # (e.g., 0.0000834 for ~0.08% usage), which Prometheus outputs in scientific
        # notation (8.34e-05). This is the correct value and will be properly parsed.
        self.component_gauges.set_gpu_cache_usage(
            dp_rank_str, scheduler_stats.kv_cache_usage
        )

    def init_publish(self) -> None:
        """Publish zero active blocks and zero the gauges for this rank."""
        self.inner.publish(self.dp_rank, 0)
        dp_rank_str = str(self.dp_rank)
        # NOTE(review): the total-blocks gauge is set to 0 here, not to
        # self.num_gpu_block; it is next updated by record().
        self.component_gauges.set_total_blocks(dp_rank_str, 0)
        self.component_gauges.set_gpu_cache_usage(dp_rank_str, 0.0)

    def log_engine_initialized(self) -> None:
        """No-op."""
        pass


class StatLoggerFactory:
    """Factory for creating stat logger publishers. Required by vLLM.

    Instances are callable with ``(vllm_config, dp_rank)``; the call delegates
    to ``create_stat_logger``. Only the most recently created logger is kept
    in ``created_logger``.
    """

    def __init__(
        self,
        endpoint: Endpoint,
        component_gauges: Optional[LLMBackendMetrics] = None,
    ) -> None:
        """Store what is needed to build loggers later.

        Args:
            endpoint: Endpoint passed on to every created logger.
            component_gauges: Gauges passed on to every created logger. May be
                left as ``None`` here, but must be assigned before
                ``create_stat_logger`` is called.
        """
        self.endpoint = endpoint
        self.component_gauges = component_gauges
        self.created_logger: Optional[DynamoStatLoggerPublisher] = None

    def create_stat_logger(self, dp_rank: int) -> StatLoggerBase:
        """Build a ``DynamoStatLoggerPublisher`` for ``dp_rank`` and remember it.

        Raises:
            AssertionError: If ``component_gauges`` has not been set.
        """
        # component_gauges must be set by setup_vllm_engine() before vLLM
        # calls create_stat_logger() during engine initialization.
        # Raised explicitly (instead of an `assert` statement) so the check
        # is still enforced when Python runs with -O.
        if self.component_gauges is None:
            raise AssertionError(
                "component_gauges must be set before creating stat loggers"
            )
        logger = DynamoStatLoggerPublisher(
            endpoint=self.endpoint,
            dp_rank=dp_rank,
            component_gauges=self.component_gauges,
        )
        # Overwrites any previously created logger.
        self.created_logger = logger

        return logger

    def __call__(self, vllm_config: VllmConfig, dp_rank: int) -> StatLoggerBase:
        """Create a stat logger for ``dp_rank``; ``vllm_config`` is unused."""
        return self.create_stat_logger(dp_rank=dp_rank)

    # TODO Remove once we publish metadata to shared storage
    def set_num_gpu_blocks_all(self, num_blocks: int) -> None:
        """Forward ``num_blocks`` to the tracked logger, if one exists.

        NOTE(review): despite the name, only ``created_logger`` (the most
        recently created logger) is updated -- confirm whether loggers created
        for other dp ranks also need this value.
        """
        if self.created_logger is not None:
            self.created_logger.set_num_gpu_block(num_blocks)

    def init_publish(self) -> None:
        """Trigger the initial publish on the tracked logger, if one exists."""
        if self.created_logger is not None:
            self.created_logger.init_publish()