# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import asyncio import signal import uvloop from dynamo.runtime import DistributedRuntime, dynamo_worker class RequestHandler: """ Request handler for the generate endpoint """ async def generate(self, request): print(f"Received request: {request}") for char in request: await asyncio.sleep(1) yield char @dynamo_worker(static=False) async def worker(runtime: DistributedRuntime): print( f"Primary lease ID: {runtime.etcd_client().primary_lease_id()}/{runtime.etcd_client().primary_lease_id():#x}" ) # Set up signal handler for graceful shutdown loop = asyncio.get_running_loop() def signal_handler(): # Schedule the shutdown coroutine instead of calling it directly asyncio.create_task(graceful_shutdown(runtime)) for sig in (signal.SIGTERM, signal.SIGINT): loop.add_signal_handler(sig, signal_handler) print("Signal handlers registered for graceful shutdown") await init(runtime, "dynamo") async def graceful_shutdown(runtime: DistributedRuntime): print("Received shutdown signal, shutting down DistributedRuntime") runtime.shutdown() print("DistributedRuntime shutdown complete") async def init(runtime: DistributedRuntime, ns: str): """ Instantiate a `backend` component and serve the `generate` endpoint A `Component` can serve multiple endpoints """ component = runtime.namespace(ns).component("backend") await component.create_service() endpoint = component.endpoint("generate") print("Started server instance") # the server will gracefully shutdown (i.e., keep opened TCP streams finishes) # after the lease is revoked await endpoint.serve_endpoint(RequestHandler().generate) if __name__ == "__main__": uvloop.install() asyncio.run(worker())