server.py 2.47 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16
import asyncio
17
import signal
18
19

import uvloop
Neelay Shah's avatar
Neelay Shah committed
20

Neelay Shah's avatar
Neelay Shah committed
21
from dynamo.runtime import DistributedRuntime, dynamo_worker
22
23
24
25
26
27
28
29
30
31


class RequestHandler:
    """
    Request handler for the generate endpoint
    """

    def __init__(self, delay: float = 1):
        """
        Args:
            delay: Seconds to sleep before emitting each element of a request.
                Defaults to 1, the pacing this handler has always used; pass 0
                to stream without artificial latency (e.g. in tests).
        """
        self.delay = delay

    async def generate(self, request):
        """Stream ``request`` back one element at a time.

        Iterates over ``request`` (for a string: one character per step),
        sleeping ``self.delay`` seconds before yielding each element.
        """
        print(f"Received request: {request}")
        for char in request:
            await asyncio.sleep(self.delay)
            yield char


36
@dynamo_worker(static=False)
async def worker(runtime: DistributedRuntime):
    """Log the primary lease, install shutdown signal handlers, then serve.

    Registers SIGTERM/SIGINT handlers that schedule ``graceful_shutdown`` on
    the running loop, then awaits ``init`` for the ``dynamo`` namespace.
    """
    # Fetch the lease ID once and print it in both decimal and hex form.
    lease_id = runtime.etcd_client().primary_lease_id()
    print(f"Primary lease ID: {lease_id}/{lease_id:#x}")

    # Set up signal handler for graceful shutdown
    loop = asyncio.get_running_loop()

    # The event loop keeps only weak references to tasks, so a task created
    # from the signal handler and then dropped may be garbage-collected before
    # it runs. Hold strong references here until each task finishes.
    shutdown_tasks: set[asyncio.Task] = set()

    def signal_handler():
        # Schedule the shutdown coroutine instead of calling it directly
        task = loop.create_task(graceful_shutdown(runtime))
        shutdown_tasks.add(task)
        task.add_done_callback(shutdown_tasks.discard)

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, signal_handler)

    print("Signal handlers registered for graceful shutdown")
    await init(runtime, "dynamo")
54
55


56
57
58
59
60
61
async def graceful_shutdown(runtime: DistributedRuntime):
    """Call ``runtime.shutdown()``, printing a message before and after it."""
    before_msg, after_msg = (
        "Received shutdown signal, shutting down DistributedRuntime",
        "DistributedRuntime shutdown complete",
    )
    print(before_msg)
    runtime.shutdown()
    print(after_msg)


62
63
64
65
66
67
async def init(runtime: DistributedRuntime, ns: str):
    """
    Create the `backend` component in namespace ``ns`` and serve its
    `generate` endpoint with a fresh ``RequestHandler``.
    A `Component` can serve multiple endpoints.
    """
    namespace = runtime.namespace(ns)
    backend = namespace.component("backend")
    await backend.create_service()

    generate_endpoint = backend.endpoint("generate")
    print("Started server instance")

    # Per the original author: after the lease is revoked the server shuts
    # down gracefully, i.e. it lets already-open TCP streams finish.
    handler = RequestHandler()
    await generate_endpoint.serve_endpoint(handler.generate)
76
77
78
79
80


if __name__ == "__main__":
    # Install uvloop's event loop policy first, so the asyncio.run() call
    # below creates a uvloop-backed loop for the worker.
    uvloop.install()
    main_coroutine = worker()
    asyncio.run(main_coroutine)