#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Mock NIM Frontend - Polls the mock NIM backend for metrics

This script demonstrates how to poll a custom backend for metrics using
the Dynamo runtime in static mode (no etcd required, uses NATS only).
"""
import asyncio
import json
import signal

import uvloop

from dynamo.runtime import DistributedRuntime


def _print_metrics_payload(data):
    """Print one response payload, pretty-printing it when it is a JSON string."""
    if not isinstance(data, str):
        print(data)
        return
    try:
        parsed = json.loads(data)
    except json.JSONDecodeError:
        # Not JSON after all: show the raw text unchanged.
        print(data)
    else:
        print(json.dumps(parsed, indent=2))


async def _collect_payloads(client):
    """Send one empty request via ``client.generate`` and return the truthy payloads."""
    # The original note here read: "In static mode, use client.static() or
    # client.generate()". Only generate() is used by this script.
    response_stream = await client.generate("")
    payloads = []
    async for response in response_stream:
        # Call data() once per response; falsy payloads are skipped.
        payload = response.data()
        if payload:
            payloads.append(payload)
    return payloads


async def poll_custom_backend_metrics(
    runtime, namespace_component_endpoint, interval_secs
):
    """Poll a custom backend metrics endpoint forever and print what it returns.

    Args:
        runtime: Object exposing ``namespace(name).component(name).endpoint(name)``
            whose endpoint provides an awaitable ``client()``.
        namespace_component_endpoint: Target in the form
            ``"namespace.component.endpoint"``; all three segments must be
            non-empty.
        interval_secs: Seconds to sleep before every poll. After a failed poll
            one additional sleep of the same length is inserted as a back-off.

    Returns:
        None. The coroutine returns early (after printing an error) when the
        endpoint string is malformed or client setup fails, and returns after
        printing "Polling cancelled" when it is cancelled inside the polling
        loop. A cancellation during client setup propagates as
        ``asyncio.CancelledError``.
    """
    # Local import, matching the function-scope imports this script already uses.
    import traceback

    print(
        f"Starting custom backend metrics polling: endpoint={namespace_component_endpoint}, interval={interval_secs}s"
    )

    # Parse endpoint string (namespace.component.endpoint). Empty segments such
    # as "a..c" are rejected here instead of failing later during setup.
    parts = namespace_component_endpoint.split(".")
    if len(parts) != 3 or not all(parts):
        print(f"ERROR: Invalid endpoint format: {namespace_component_endpoint}")
        return

    namespace, component_name, endpoint_name = parts
    print(f"Polling {namespace}/{component_name}/{endpoint_name}")

    try:
        # Resolve the endpoint and obtain a client for it.
        ns = runtime.namespace(namespace)
        component = ns.component(component_name)
        endpoint = component.endpoint(endpoint_name)
        client = await endpoint.client()
        print("Client created for static endpoint")
    except Exception as e:
        print(f"ERROR during polling setup: {e}")
        traceback.print_exc()
        return

    print(f"Starting polling loop (every {interval_secs}s)...")
    separator = "=" * 60
    loop = asyncio.get_running_loop()
    try:
        while True:
            await asyncio.sleep(interval_secs)
            try:
                print(f"\n{separator}")
                print(f"Polling tick at {loop.time():.2f}")

                payloads = await _collect_payloads(client)

                print(f"Received {len(payloads)} responses")
                for idx, data in enumerate(payloads, start=1):
                    print(f"\nResponse #{idx}:")
                    _print_metrics_payload(data)
                print(f"{separator}\n")
            except asyncio.CancelledError:
                # Never treat cancellation as a polling failure.
                raise
            except Exception as e:
                print(f"ERROR polling backend: {e}")
                traceback.print_exc()
                # Back off for one extra interval before the next attempt.
                await asyncio.sleep(interval_secs)
    except asyncio.CancelledError:
        # Covers cancellation at any await in the loop, including the
        # back-off sleep above.
        print("Polling cancelled")


async def graceful_shutdown(runtime):
    """Announce the shutdown on stdout, then call ``runtime.shutdown()``.

    Args:
        runtime: Object exposing a zero-argument ``shutdown()`` method.
    """
    print("\nShutting down...")
    stop_runtime = runtime.shutdown
    stop_runtime()


async def async_main():
    """Parse CLI options, create the runtime, and poll until stopped.

    Command-line options:
        --custom-backend-metrics-endpoint: ``namespace.component.endpoint``
            string to poll (default ``nim.backend.runtime_stats``).
        --polling-interval: seconds between polls (default 3.0).

    SIGTERM and SIGINT schedule ``graceful_shutdown(runtime)``; once that has
    finished, the polling task is cancelled so this coroutine can return.
    Modelled on frontend/main.py, per the original comments.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Mock NIM Frontend - Poll backend for metrics"
    )
    parser.add_argument(
        "--custom-backend-metrics-endpoint",
        type=str,
        default="nim.backend.runtime_stats",
        help="Custom backend metrics endpoint in format 'namespace.component.endpoint' (default: 'nim.backend.runtime_stats')",
    )
    parser.add_argument(
        "--polling-interval",
        type=float,
        default=3.0,
        help="Polling interval in seconds (default: 3.0)",
    )
    args = parser.parse_args()

    # Get the event loop
    loop = asyncio.get_running_loop()

    # Create DistributedRuntime - similar to frontend/main.py.
    # NOTE(review): the positional "file" / "nats" arguments presumably select
    # the discovery and transport backends - confirm against the
    # DistributedRuntime signature.
    runtime = DistributedRuntime(loop, "file", "nats")  # type: ignore[call-arg]

    # Run polling as its own task so a shutdown signal can cancel it. The task
    # does not start executing until this coroutine first awaits, so the
    # start-up banner below is still printed first.
    polling_task = asyncio.create_task(
        poll_custom_backend_metrics(
            runtime, args.custom_backend_metrics_endpoint, args.polling_interval
        )
    )

    # The event loop keeps only weak references to tasks; hold strong ones so a
    # pending shutdown task cannot be garbage-collected before it runs.
    shutdown_tasks = set()

    def _on_shutdown_done(done_task):
        shutdown_tasks.discard(done_task)
        # Retrieve the exception so it is reported here rather than as an
        # "exception was never retrieved" warning.
        if not done_task.cancelled() and done_task.exception() is not None:
            print(f"ERROR during shutdown: {done_task.exception()}")
        # Stop the otherwise endless polling loop; a no-op if it already ended.
        polling_task.cancel()

    # Setup signal handlers for graceful shutdown
    def signal_handler():
        task = asyncio.create_task(graceful_shutdown(runtime))
        shutdown_tasks.add(task)
        task.add_done_callback(_on_shutdown_done)

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, signal_handler)

    print("Mock NIM Frontend starting...")
    print(f"Target endpoint: {args.custom_backend_metrics_endpoint}")
    print(f"Polling interval: {args.polling_interval}s")
    print("Static mode: No etcd required, using NATS only\n")

    try:
        # Poll until the task finishes or is cancelled.
        await polling_task
    except asyncio.exceptions.CancelledError:
        pass


def main():
    """Script entry point: drive ``async_main()`` to completion with ``uvloop.run``.

    Mirrors frontend/main.py, per the original docstring.
    """
    run_loop = uvloop.run
    run_loop(async_main())


# Start the frontend only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()