publisher.py 3.92 KB
Newer Older
1
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16
17
import asyncio
import logging
18
from typing import List, Optional, Tuple
19
20
21
22
23

from vllm.config import VllmConfig
from vllm.v1.metrics.loggers import StatLoggerBase
from vllm.v1.metrics.stats import IterationStats, SchedulerStats

24
from dynamo.llm import WorkerMetricsPublisher
25
from dynamo.runtime import Endpoint
26
27
28
29
30
31
32
33
34
35
36


class NullStatLogger(StatLoggerBase):
    def __init__(self):
        pass

    def record(
        self,
        scheduler_stats: Optional[SchedulerStats],
        iteration_stats: Optional[IterationStats],
        engine_idx: int = 0,
37
38
        *args,
        **kwargs,
39
40
41
42
43
44
45
46
47
48
    ):
        pass

    def log_engine_initialized(self):
        pass


class DynamoStatLoggerPublisher(StatLoggerBase):
    """Stat logger publisher. Wrapper for the WorkerMetricsPublisher to match the StatLoggerBase interface."""

49
50
    def __init__(
        self,
51
        endpoint: Endpoint,
52
53
        dp_rank: int,
    ) -> None:
54
        self.inner = WorkerMetricsPublisher()
55
        self._endpoint = endpoint
56
57
        self.dp_rank = dp_rank
        self.num_gpu_block = 1
58
59
60
61
62
63
        # Schedule async endpoint creation
        self._endpoint_task = asyncio.create_task(self._create_endpoint())

    async def _create_endpoint(self) -> None:
        """Create the NATS endpoint asynchronously."""
        try:
64
            await self.inner.create_endpoint(self._endpoint)
65
66
67
68
            logging.debug("Multimodal metrics publisher endpoint created")
        except Exception:
            logging.exception("Failed to create multimodal metrics publisher endpoint")
            raise
69
70
71
72
73
74
75
76
77
78

    # TODO: Remove this and pass as metadata through etcd
    def set_num_gpu_block(self, num_blocks):
        self.num_gpu_block = num_blocks

    def record(
        self,
        scheduler_stats: SchedulerStats,
        iteration_stats: Optional[IterationStats],
        engine_idx: int = 0,
79
80
        *args,
        **kwargs,
81
    ):
82
83
        active_decode_blocks = int(self.num_gpu_block * scheduler_stats.kv_cache_usage)
        self.inner.publish(self.dp_rank, active_decode_blocks)
84
85

    def init_publish(self):
86
        self.inner.publish(self.dp_rank, 0)
87
88
89
90
91
92
93
94

    def log_engine_initialized(self) -> None:
        pass


class StatLoggerFactory:
    """Factory for creating stat logger publishers. Required by vLLM."""

95
96
    def __init__(
        self,
97
        endpoint: Endpoint,
98
99
100
        dp_rank: int = 0,
        metrics_labels: Optional[List[Tuple[str, str]]] = None,
    ) -> None:
101
        self.endpoint = endpoint
102
103
        self.created_logger: Optional[DynamoStatLoggerPublisher] = None
        self.dp_rank = dp_rank
104
        self.metrics_labels = metrics_labels or []
105
106
107
108

    def create_stat_logger(self, dp_rank: int) -> StatLoggerBase:
        if self.dp_rank != dp_rank:
            return NullStatLogger()
109
        logger = DynamoStatLoggerPublisher(self.endpoint, dp_rank)
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
        self.created_logger = logger

        return logger

    def __call__(self, vllm_config: VllmConfig, dp_rank: int) -> StatLoggerBase:
        return self.create_stat_logger(dp_rank=dp_rank)

    # TODO Remove once we publish metadata to etcd
    def set_num_gpu_blocks_all(self, num_blocks):
        if self.created_logger:
            self.created_logger.set_num_gpu_block(num_blocks)

    def init_publish(self):
        if self.created_logger:
            self.created_logger.init_publish()