# Copyright 2023-2024 SGLang Team
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Utilities for Prometheus Metrics Collection."""

import time
from dataclasses import dataclass
from typing import Dict, Union


@dataclass
class SchedulerStats:
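    """Scheduler-side stats, exported by SchedulerMetricsCollector.log_stats()."""
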
    num_running_reqs: int = 0
    num_used_tokens: int = 0
    token_usage: float = 0.0
    gen_throughput: float = 0.0
    num_queue_reqs: int = 0
    cache_hit_rate: float = 0.0
    spec_accept_length: float = 0.0


class SchedulerMetricsCollector:
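    """Exports SchedulerStats fields as Prometheus gauges, one time series per label set."""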

    def __init__(self, labels: Dict[str, str]) -> None:
        # We need to import prometheus_client after setting the env variable `PROMETHEUS_MULTIPROC_DIR`
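        # (multiprocess mode is only enabled if the variable is set at import time;
        # a launcher might do e.g. `os.environ["PROMETHEUS_MULTIPROC_DIR"] = "/tmp/prom"`
        # before spawning workers -- the path here is a hypothetical example).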
        from prometheus_client import Gauge

        self.labels = labels
        self.last_log_time = time.time()

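        # multiprocess_mode="mostrecent" makes a scrape report the value most
        # recently written by any process, rather than aggregating across processes.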
        self.num_running_reqs = Gauge(
            name="sglang:num_running_reqs",
            documentation="The number of running requests.",
            labelnames=labels.keys(),
            multiprocess_mode="mostrecent",
        )

        self.num_used_tokens = Gauge(
            name="sglang:num_used_tokens",
            documentation="The number of used tokens.",
            labelnames=labels.keys(),
            multiprocess_mode="mostrecent",
        )

        self.token_usage = Gauge(
            name="sglang:token_usage",
            documentation="The token usage.",
            labelnames=labels.keys(),
            multiprocess_mode="mostrecent",
        )

        self.gen_throughput = Gauge(
            name="sglang:gen_throughput",
            documentation="The generation throughput (token/s).",
            labelnames=labels.keys(),
            multiprocess_mode="mostrecent",
        )

        self.num_queue_reqs = Gauge(
            name="sglang:num_queue_reqs",
            documentation="The number of requests in the waiting queue.",
            labelnames=labels.keys(),
            multiprocess_mode="mostrecent",
        )

        self.cache_hit_rate = Gauge(
            name="sglang:cache_hit_rate",
            documentation="The prefix cache hit rate.",
            labelnames=labels.keys(),
            multiprocess_mode="mostrecent",
        )

        self.spec_accept_length = Gauge(
            name="sglang:spec_accept_length",
            documentation="The average acceptance length of speculative decoding.",
            labelnames=labels.keys(),
            multiprocess_mode="mostrecent",
        )

    def _log_gauge(self, gauge, data: Union[int, float]) -> None:
        # Convenience function for logging to a gauge.
        gauge.labels(**self.labels).set(data)

    def log_stats(self, stats: SchedulerStats) -> None:
        self._log_gauge(self.num_running_reqs, stats.num_running_reqs)
        self._log_gauge(self.num_used_tokens, stats.num_used_tokens)
        self._log_gauge(self.token_usage, stats.token_usage)
        self._log_gauge(self.gen_throughput, stats.gen_throughput)
        self._log_gauge(self.num_queue_reqs, stats.num_queue_reqs)
        self._log_gauge(self.cache_hit_rate, stats.cache_hit_rate)
        self._log_gauge(self.spec_accept_length, stats.spec_accept_length)
        self.last_log_time = time.time()
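
# Usage sketch (hypothetical label values; in SGLang the labels come from the
# server configuration):
#
#   collector = SchedulerMetricsCollector(labels={"model_name": "llama-3-8b"})
#   collector.log_stats(SchedulerStats(num_running_reqs=4, gen_throughput=150.0))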


class TokenizerMetricsCollector:
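    """Exports per-request token counts and latency histograms to Prometheus."""
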
    def __init__(self, labels: Dict[str, str]) -> None:
        # We need to import prometheus_client after setting the env variable `PROMETHEUS_MULTIPROC_DIR`
        from prometheus_client import Counter, Histogram

        self.labels = labels

        self.prompt_tokens_total = Counter(
            name="sglang:prompt_tokens_total",
            documentation="Number of prefill tokens processed.",
            labelnames=labels.keys(),
        )

        self.generation_tokens_total = Counter(
            name="sglang:generation_tokens_total",
            documentation="Number of generation tokens processed.",
            labelnames=labels.keys(),
        )

        self.cached_tokens_total = Counter(
            name="sglang:cached_tokens_total",
            documentation="Number of cached prompt tokens.",
            labelnames=labels.keys(),
        )

        self.num_requests_total = Counter(
            name="sglang:num_requests_total",
            documentation="Number of requests processed.",
            labelnames=labels.keys(),
        )

        self.histogram_time_to_first_token = Histogram(
            name="sglang:time_to_first_token_seconds",
            documentation="Histogram of time to first token in seconds.",
            labelnames=labels.keys(),
            buckets=[
                0.1,
                0.3,
                0.5,
                0.7,
                0.9,
                1,
                2,
                4,
                6,
                8,
                10,
                20,
                40,
                60,
                80,
                120,
                160,
            ],
        )

        self.histogram_time_per_output_token = Histogram(
            name="sglang:time_per_output_token_seconds",
            documentation="Histogram of time per output token in seconds.",
            labelnames=labels.keys(),
            buckets=[
                0.002,
                0.005,
                0.010,
                0.020,
                0.030,
                0.040,
                0.050,
                0.060,
                0.070,
                0.080,
                0.090,
                0.100,
                0.150,
                0.200,
                0.300,
                0.400,
                0.600,
                0.800,
                1.000,
                2.000,
            ],
        )

        self.histogram_inter_token_latency_seconds = Histogram(
            name="sglang:inter_token_latency_seconds",
            documentation="Histogram of inter-token latency in seconds.",
            labelnames=labels.keys(),
            buckets=[
                0.002,
                0.004,
                0.006,
                0.008,
                0.010,
                0.015,
                0.020,
                0.025,
                0.030,
                0.035,
                0.040,
                0.050,
                0.075,
                0.100,
                0.150,
                0.200,
                0.300,
                0.400,
                0.500,
                0.750,
                1.000,
                2.000,
            ],
        )

        self.histogram_e2e_request_latency = Histogram(
            name="sglang:e2e_request_latency_seconds",
            documentation="Histogram of End-to-end request latency in seconds",
            labelnames=labels.keys(),
            buckets=[
                0.1,
                0.2,
                0.4,
                0.8,
                1,
                2,
                5,
                10,
                20,
                40,
                60,
                80,
                100,
                150,
                200,
                250,
                300,
                350,
                500,
                1000,
            ],
        )

    def _log_histogram(self, histogram, data: Union[int, float]) -> None:
        histogram.labels(**self.labels).observe(data)

    def observe_one_finished_request(
        self,
        prompt_tokens: int,
        generation_tokens: int,
        cached_tokens: int,
        e2e_latency: float,
    ):
        self.prompt_tokens_total.labels(**self.labels).inc(prompt_tokens)
        self.generation_tokens_total.labels(**self.labels).inc(generation_tokens)
        self.cached_tokens_total.labels(**self.labels).inc(cached_tokens)
        self.num_requests_total.labels(**self.labels).inc(1)
        self._log_histogram(self.histogram_e2e_request_latency, e2e_latency)
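        # Approximate the time per output token from the end-to-end latency;
        # note this folds time-to-first-token into the per-token average.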
        if generation_tokens >= 1:
            self.histogram_time_per_output_token.labels(**self.labels).observe(
                e2e_latency / generation_tokens
            )

    def observe_time_to_first_token(self, value: float):
        self.histogram_time_to_first_token.labels(**self.labels).observe(value)

    def observe_inter_token_latency(self, interval: float, num_new_tokens: int):
        adjusted_interval = interval / num_new_tokens

        # A faster version of Histogram.observe() that records multiple
        # identical observations at once.
        # reference: https://github.com/prometheus/client_python/blob/v0.21.1/prometheus_client/metrics.py#L639
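        # prometheus_client stores bucket counts non-cumulatively (they are summed
        # at scrape time), so incrementing only the first matching bucket below is
        # equivalent to num_new_tokens separate observe() calls.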
        his = self.histogram_inter_token_latency_seconds.labels(**self.labels)
        his._sum.inc(interval)

        for i, bound in enumerate(his._upper_bounds):
            if adjusted_interval <= bound:
                his._buckets[i].inc(num_new_tokens)
                break
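
# Usage sketch (hypothetical values):
#
#   collector = TokenizerMetricsCollector(labels={"model_name": "llama-3-8b"})
#   collector.observe_time_to_first_token(0.42)
#   collector.observe_inter_token_latency(0.12, 8)  # 8 tokens decoded in 0.12 s
#   collector.observe_one_finished_request(
#       prompt_tokens=512, generation_tokens=128, cached_tokens=256, e2e_latency=3.1
#   )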