"components/backends/sglang/vscode:/vscode.git/clone" did not exist on "e31c8790bba25aa75e6e2d1e0e2923043891fbfb"
server_with_loop.py 4.49 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Example demonstrating metrics updates via background loop instead of callback.

This shows an alternative approach where:
1. Metrics are created and registered with an endpoint
2. A background thread continuously updates metrics in a loop
3. No callback is used - metrics are updated directly by the thread
4. The metrics are automatically served via the /metrics endpoint

Usage:
    DYN_SYSTEM_PORT=8081 ./server_with_loop.py

    # In another terminal, query the metrics:
    curl http://localhost:8081/metrics
"""

import asyncio
import threading
import time

import uvloop

27
28
from dynamo.prometheus_metrics import Gauge, IntCounter, IntGauge, IntGaugeVec
from dynamo.runtime import Component, DistributedRuntime, Endpoint, dynamo_worker
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68


def metrics_updater_thread(
    request_total_slots: IntGauge,
    gpu_cache_usage_perc: Gauge,
    worker_active_requests: IntGaugeVec,
    update_count: IntCounter,
):
    """Refresh the example metrics in an endless loop, pausing 2 s per pass.

    Intended to be used as the ``target`` of a ``threading.Thread``. The
    function has no exit condition, so it only stops when the hosting thread
    (or process) goes away or one of the metric calls raises.

    Args:
        request_total_slots: Gauge set to ``1024 + iteration`` on every pass.
        gpu_cache_usage_perc: Gauge set to ``0.01 + iteration * 0.01`` on
            every pass.
        worker_active_requests: Labelled gauge; one sample is written for
            each of the two example workers on every pass.
        update_count: Counter incremented once per pass; its current value is
            read back and used as the iteration number for all other updates.
    """
    print("[python] Metrics updater thread started")

    # (worker_id label, base value, modulus applied to the iteration number)
    worker_profiles = (
        ("worker_1", 5, 10),
        ("worker_2", 3, 5),
    )
    pause_seconds = 2

    while True:
        # The counter doubles as the loop's iteration index.
        update_count.inc()
        iteration = update_count.get()

        # Scalar metrics grow linearly with the iteration number.
        request_total_slots.set(1024 + iteration)
        gpu_cache_usage_perc.set(0.01 + (iteration * 0.01))

        # Labelled metric: cycle each worker's value through a small range.
        for worker_id, base, modulus in worker_profiles:
            labels = {"worker_id": worker_id, "model": "llama-3"}
            worker_active_requests.set(base + (iteration % modulus), labels)

        print(f"[python] Updated metrics in loop (iteration #{iteration})")

        time.sleep(pause_seconds)


@dynamo_worker()
async def worker(runtime: DistributedRuntime) -> None:
    """Worker entry point: pass the supplied runtime straight to ``init``.

    Args:
        runtime: The distributed runtime used to build the component,
            endpoint and metrics. The script calls ``worker()`` with no
            arguments, so this is presumably injected by
            ``@dynamo_worker()`` — confirm against the decorator's docs.
    """
    await init(runtime)


async def init(runtime: DistributedRuntime):
    """Create the example metrics, start the updater thread, then wait forever.

    Steps, in order:
      1. Build component ``cp557`` in namespace ``ns557``, create its service,
         and obtain endpoint ``ep557``.
      2. Create four metrics through ``endpoint.metrics`` (an int gauge, a
         float gauge, a labelled int gauge and an int counter).
      3. Seed the gauges with initial values.
      4. Start ``metrics_updater_thread`` in a daemon thread so the values
         keep changing without any callback.
      5. Block on an ``asyncio.Event`` that is never set, keeping the process
         (and therefore the metrics endpoint) alive.

    Args:
        runtime: Distributed runtime used to create the namespace, component
            and endpoint.

    Returns:
        Never returns under normal operation; the final await only ends if
        the task is cancelled.
    """
    # Create component and endpoint
    component: Component = runtime.namespace("ns557").component("cp557")
    await component.create_service()

    endpoint: Endpoint = component.endpoint("ep557")

    # Create metrics using the endpoint's metrics property
    print("[python] Creating metrics...")

    request_total_slots: IntGauge = endpoint.metrics.create_intgauge(
        "request_total_slots", "Total request slots available"
    )
    gpu_cache_usage_perc: Gauge = endpoint.metrics.create_gauge(
        "gpu_cache_usage_percent", "GPU cache usage percentage"
    )

    worker_active_requests: IntGaugeVec = endpoint.metrics.create_intgaugevec(
        "worker_active_requests",
        "Active requests per worker",
        ["worker_id", "model"],
    )

    update_count: IntCounter = endpoint.metrics.create_intcounter(
        "update_count",
        "Number of times metrics were updated",
        [("update_method", "background_thread")],
    )

    print(f"[python] Created IntGauge: {request_total_slots.name()}")
    print(f"[python] Created Gauge: {gpu_cache_usage_perc.name()}")
    print(f"[python] Created IntGaugeVec: {worker_active_requests.name()}")
    print(f"[python] Created IntCounter: {update_count.name()}")
    print("[python] Metrics automatically registered with endpoint!")

    # Set initial values
    print("[python] Setting initial metric values...")
    request_total_slots.set(1024)
    gpu_cache_usage_perc.set(0.00)
    worker_active_requests.set(5, {"worker_id": "worker_1", "model": "llama-3"})
    worker_active_requests.set(3, {"worker_id": "worker_2", "model": "llama-3"})

    # Start background thread to update metrics
    print("[python] Starting background thread to update metrics...")
    updater = threading.Thread(
        target=metrics_updater_thread,
        args=(
            request_total_slots,
            gpu_cache_usage_perc,
            worker_active_requests,
            update_count,
        ),
        # Daemon thread: the updater loops forever, so it must not keep the
        # interpreter alive once the main thread exits.
        daemon=True,
    )
    updater.start()

    print("[python] ✅ Metrics are now registered and served via /metrics endpoint")
    print("[python]    Metrics are being updated every 2 seconds by background thread")
    print(
        "[python]    Check the system status server port to see them in Prometheus format"
    )

    # Note: This example does not call serve_endpoint() to keep it simple.
    # In a real service, you would call: await endpoint.serve_endpoint(handler, ...)
    # Keep running so metrics endpoint stays up: the event is never set, so
    # this await only finishes if the task is cancelled.
    await asyncio.Event().wait()


if __name__ == "__main__":
    uvloop.install()
    asyncio.run(worker())