import asyncio
import os
import torch

from grpc import aio
from loguru import logger

from grpc_reflection.v1alpha import reflection
from pathlib import Path
from typing import List, Optional

from text_generation_server.cache import Cache
from text_generation_server.interceptor import ExceptionInterceptor
from text_generation_server.models import Model, get_model
from text_generation_server.pb import generate_pb2_grpc, generate_pb2
from text_generation_server.tracing import UDSOpenTelemetryAioServerInterceptor


class TextGenerationService(generate_pb2_grpc.TextGenerationServiceServicer):
    """gRPC servicer that runs prefill/decode steps on a model.

    Batches returned by the model between calls are kept in ``cache`` and
    looked up again by id when a ``Decode`` request arrives.
    """

    def __init__(self, model: Model, cache: Cache, server_urls: List[str]):
        """Store the collaborators and, on CUDA, enable inference mode.

        Args:
            model: Model used to build batches and generate tokens.
            cache: Store for batches that are carried over between calls.
            server_urls: URLs returned verbatim by ``ServiceDiscovery``.
        """
        self.cache = cache
        self.model = model
        self.server_urls = server_urls
        # For some reason, inference_mode does not work well with GLOO which we use on CPU
        if model.device.type == "cuda":
            # Force inference mode for the lifetime of TextGenerationService
            self._inference_mode_raii_guard = torch._C._InferenceMode(True)

    async def Info(self, request, context):
        """Return the model's ``info`` attribute."""
        return self.model.info

    async def ServiceDiscovery(self, request, context):
        """Return the list of server URLs this service was created with."""
        return generate_pb2.ServiceDiscoveryResponse(urls=self.server_urls)

    async def ClearCache(self, request, context):
        """Drop one cached batch (when ``request.id`` is set) or all of them.

        Also asks torch to release its cached CUDA memory when CUDA is
        available.
        """
        if request.HasField("id"):
            self.cache.delete(request.id)
        else:
            self.cache.clear()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        return generate_pb2.ClearCacheResponse()

    async def Prefill(self, request, context):
        """Build a batch from ``request.batch`` and run one generation step.

        The batch returned by the model is handed to the cache and also
        serialized into the response (``None`` when it is falsy).
        """
        batch = self.model.batch_type.from_pb(
            request.batch, self.model.tokenizer, self.model.device
        )

        generations, next_batch = self.model.generate_token(batch)
        self.cache.set(next_batch)

        return generate_pb2.PrefillResponse(
            generations=[generation.to_pb() for generation in generations],
            batch=next_batch.to_pb() if next_batch else None,
        )

    async def Decode(self, request, context):
        """Run one generation step on previously cached batches.

        Each requested batch is popped from the cache and filtered down to
        the requests listed in the message; batches for which ``filter``
        returns ``None`` are skipped. Remaining batches are concatenated
        when there is more than one.

        Raises:
            ValueError: If the request carries no batches, if a batch id is
                not present in the cache, or if no batch is left after
                filtering.
        """
        if len(request.batches) == 0:
            raise ValueError("Must provide at least one batch")

        batches = []
        for batch_pb in request.batches:
            batch = self.cache.pop(batch_pb.id)
            if batch is None:
                raise ValueError(f"Batch ID {batch_pb.id} not found in cache.")
            batch = batch.filter(batch_pb.requests)
            if batch is not None:
                batches.append(batch)

        if len(batches) == 0:
            raise ValueError("All batches are empty")

        if len(batches) > 1:
            batch = self.model.batch_type.concatenate(batches)
        else:
            batch = batches[0]

        generations, next_batch = self.model.generate_token(batch)
        self.cache.set(next_batch)

        return generate_pb2.DecodeResponse(
            generations=[generation.to_pb() for generation in generations],
            batch=next_batch.to_pb() if next_batch else None,
        )

def serve(
    model_id: str,
    revision: Optional[str],
    sharded: bool,
    quantize: bool,
    uds_path: Path,
):
    """Load the model and serve it over gRPC on a unix domain socket.

    Blocks until the server terminates.

    Args:
        model_id: Identifier passed to ``get_model``.
        revision: Optional revision passed to ``get_model``.
        sharded: When true, one URL per rank is derived from the
            ``WORLD_SIZE`` environment variable and this process binds to
            the one selected by ``RANK``.
        quantize: Passed through to ``get_model``.
        uds_path: Prefix of the socket address; the final URL is
            ``unix://<uds_path>-<rank>``.
    """

    async def serve_inner(
        model_id: str,
        revision: Optional[str],
        sharded: bool = False,
        quantize: bool = False,
    ):
        # `uds_path` is taken from the enclosing scope.
        unix_socket_template = "unix://{}-{}"
        if sharded:
            server_urls = [
                unix_socket_template.format(uds_path, rank)
                for rank in range(int(os.environ["WORLD_SIZE"]))
            ]
            local_url = server_urls[int(os.environ["RANK"])]
        else:
            local_url = unix_socket_template.format(uds_path, 0)
            server_urls = [local_url]

        # Log the failure with its traceback, then let it propagate.
        try:
            model = get_model(model_id, revision, sharded, quantize)
        except Exception:
            logger.exception("Error when initializing model")
            raise

        server = aio.server(
            interceptors=[
                ExceptionInterceptor(),
                UDSOpenTelemetryAioServerInterceptor(),
            ]
        )
        generate_pb2_grpc.add_TextGenerationServiceServicer_to_server(
            TextGenerationService(model, Cache(), server_urls), server
        )
        # Expose both the generation service and the reflection service
        # through gRPC server reflection.
        service_names = (
            generate_pb2.DESCRIPTOR.services_by_name["TextGenerationService"].full_name,
            reflection.SERVICE_NAME,
        )
        reflection.enable_server_reflection(service_names, server)
        server.add_insecure_port(local_url)

        await server.start()

        logger.info("Server started at {}".format(local_url))

        # NOTE(review): on Python 3.11+ asyncio.run() turns SIGINT into a
        # cancellation of the main task, so this branch may not be reached
        # there -- confirm against the supported Python versions.
        try:
            await server.wait_for_termination()
        except KeyboardInterrupt:
            logger.info("Signal received. Shutting down")
            await server.stop(0)

    asyncio.run(serve_inner(model_id, revision, sharded, quantize))