server.py 10.4 KB
Newer Older
Olivier Dehaene's avatar
Init  
Olivier Dehaene committed
1
import asyncio
Olivier Dehaene's avatar
Olivier Dehaene committed
2
import os
3
import torch
4
import time
5
import signal
Olivier Dehaene's avatar
Olivier Dehaene committed
6

Olivier Dehaene's avatar
Init  
Olivier Dehaene committed
7
from grpc import aio
8
from loguru import logger
Olivier Dehaene's avatar
Init  
Olivier Dehaene committed
9
10
11

from grpc_reflection.v1alpha import reflection
from pathlib import Path
12
from typing import List, Optional
Olivier Dehaene's avatar
Init  
Olivier Dehaene committed
13

14
15
16
from text_generation_server.cache import Cache
from text_generation_server.interceptor import ExceptionInterceptor
from text_generation_server.models import Model, get_model
17
18
19
20
21
22
23
24
25
26
27
28
29

# Batch classes for vision-language models are optional: importing them can
# fail on CPU / non-flash installs, in which case no batch type is treated as
# a VLM batch.
try:
    from text_generation_server.models.pali_gemma import PaliGemmaBatch
    from text_generation_server.models.vlm_causal_lm import VlmCausalLMBatch
    from text_generation_server.models.idefics_causal_lm import IdeficsCausalLMBatch
except (ImportError, NotImplementedError):
    # These imports can fail on CPU/Non flash.
    VLM_BATCH_TYPES = set()
else:
    VLM_BATCH_TYPES = {PaliGemmaBatch, VlmCausalLMBatch, IdeficsCausalLMBatch}

30
31
from text_generation_server.pb import generate_pb2_grpc, generate_pb2
from text_generation_server.tracing import UDSOpenTelemetryAioServerInterceptor
drbh's avatar
drbh committed
32
33
34
35
from text_generation_server.models.globals import set_model_id, set_adapter_to_index
from text_generation_server.utils.adapter import (
    AdapterParameters,
)
Olivier Dehaene's avatar
Init  
Olivier Dehaene committed
36

OlivierDehaene's avatar
OlivierDehaene committed
37

38
39
40
41
42
43
44
45
46
47
48
49
class SignalHandler:
    """Flip a flag when SIGINT or SIGTERM is received.

    Instantiating the class installs ``exit_gracefully`` as the handler for
    both signals. Callers poll ``KEEP_PROCESSING`` on the instance; it reads
    ``True`` until one of the signals has been delivered.
    """

    # Class-level default; shadowed by an instance attribute once a signal
    # has been handled.
    KEEP_PROCESSING = True

    def __init__(self):
        for handled_signal in (signal.SIGINT, signal.SIGTERM):
            signal.signal(handled_signal, self.exit_gracefully)

    def exit_gracefully(self, signum, frame):
        """Signal callback: report the signal number and request shutdown."""
        print(f"Exiting gracefully: Signal {signum}")
        self.KEEP_PROCESSING = False


Olivier Dehaene's avatar
Olivier Dehaene committed
50
class TextGenerationService(generate_pb2_grpc.TextGenerationServiceServicer):
    """gRPC servicer that drives a loaded model.

    Batches produced by ``Prefill``/``Decode`` are kept in ``cache`` between
    calls so that later ``Decode``/``FilterBatch`` requests can refer to them
    by id.
    """

    def __init__(
        self,
        model: Model,
        cache: Cache,
        quantize: Optional[str],
        server_urls: List[str],
    ):
        """Store the collaborators and, on CUDA, enable inference mode.

        Args:
            model: The model used to build batches and generate tokens.
            cache: Storage for batches that outlive a single request.
            quantize: Quantization scheme name, consulted during ``Warmup``.
            server_urls: URLs returned verbatim by ``ServiceDiscovery``.
        """
        self.cache = cache
        self.model = model
        self.quantize = quantize
        self.server_urls = server_urls
        # For some reason, inference_mode does not work well with GLOO which we use on CPU
        if model.device.type == "cuda":
            # Force inference mode for the lifetime of TextGenerationService
            self._inference_mode_raii_guard = torch._C._InferenceMode(True)

    def _batch_from_pb(self, batch_pb):
        """Build a model batch from its protobuf form.

        Shared by ``Warmup`` and ``Prefill``. Batch types listed in
        ``VLM_BATCH_TYPES`` additionally receive the model's processor and
        config through ``from_pb_processor``.
        """
        batch_type = self.model.batch_type
        if batch_type in VLM_BATCH_TYPES:
            # Hack, i would rather use kwargs in the `from_pb` call
            return batch_type.from_pb_processor(
                batch_pb,
                self.model.tokenizer,
                self.model.processor,
                self.model.model.config,
                self.model.dtype,
                self.model.device,
            )
        return batch_type.from_pb(
            batch_pb, self.model.tokenizer, self.model.dtype, self.model.device
        )

    async def Info(self, request, context):
        """Return the model's ``info`` object."""
        return self.model.info

    async def Health(self, request, context):
        """Return an empty health response.

        On CUDA devices a small tensor is first moved to the GPU, presumably
        so that an unusable device makes the call fail.
        """
        if self.model.device.type == "cuda":
            torch.zeros((2, 2)).cuda()
        return generate_pb2.HealthResponse()

    async def ServiceDiscovery(self, request, context):
        """Return the URLs of all servers this service was configured with."""
        return generate_pb2.ServiceDiscoveryResponse(urls=self.server_urls)

    async def ClearCache(self, request, context):
        """Delete one cached batch if ``request.id`` is set, else clear the cache."""
        if request.HasField("id"):
            self.cache.delete(request.id)
        else:
            self.cache.clear()
        return generate_pb2.ClearCacheResponse()

    async def FilterBatch(self, request, context):
        """Restrict a cached batch to ``request.request_ids`` and re-cache it.

        Raises:
            ValueError: If ``request.batch_id`` is not in the cache.
        """
        batch = self.cache.pop(request.batch_id)
        if batch is None:
            raise ValueError(f"Batch ID {request.batch_id} not found in cache.")
        filtered_batch = batch.filter(request.request_ids)
        self.cache.set(filtered_batch)

        return generate_pb2.FilterBatchResponse(batch=filtered_batch.to_pb())

    async def Warmup(self, request, context):
        """Run the model's warmup on ``request.batch``.

        Returns:
            A ``WarmupResponse`` carrying the value returned by
            ``model.warmup`` as ``max_supported_total_tokens``.
        """
        if self.quantize in {"exl2", "gptq"}:
            try:
                # When using GPTQ, Exllama kernels need some global kernels
                # For which we have the final shapes only after the model has loaded
                # This will allocate those buffers.
                from text_generation_server.layers.gptq import (
                    create_exllama_buffers,
                    set_device,
                )

                set_device(self.model.device)
                create_exllama_buffers(request.max_prefill_tokens)
            except ImportError:
                # Best effort: skip buffer allocation when the GPTQ layers
                # cannot be imported.
                pass

        batch = self._batch_from_pb(request.batch)
        max_supported_total_tokens = self.model.warmup(batch)

        return generate_pb2.WarmupResponse(
            max_supported_total_tokens=max_supported_total_tokens
        )

    async def Prefill(self, request, context):
        """Generate the first token(s) for a new batch and cache the result.

        Returns:
            A ``PrefillResponse`` with the generations, the follow-up batch
            (``None`` when there is none) and timings in nanoseconds.
        """
        start = time.time_ns()
        batch = self._batch_from_pb(request.batch)

        generations, next_batch, timings = self.model.generate_token(batch)
        self.cache.set(next_batch)

        return generate_pb2.PrefillResponse(
            generations=[generation.to_pb() for generation in generations],
            batch=next_batch.to_pb() if next_batch else None,
            forward_ns=timings[0],
            decode_ns=timings[1],
            total_ns=time.time_ns() - start,
        )

    async def Decode(self, request, context):
        """Generate the next token(s) for one or more cached batches.

        Several batches are concatenated into one before generation; the time
        spent concatenating is reported as ``concat_ns`` (``None`` when only
        one batch was given).

        Raises:
            ValueError: If no batch is provided, or if a batch id is not in
                the cache.
        """
        start = time.time_ns()
        if len(request.batches) == 0:
            raise ValueError("Must provide at least one batch")

        # Every id either yields a cached batch or raises, so `batches` is
        # guaranteed to be non-empty after this loop.
        batches = []
        for batch_pb in request.batches:
            batch = self.cache.pop(batch_pb.id)
            if batch is None:
                raise ValueError(f"Batch ID {batch_pb.id} not found in cache.")
            batches.append(batch)

        if len(batches) > 1:
            start_concat = time.time_ns()
            batch = self.model.batch_type.concatenate(batches)
            concat_ns = time.time_ns() - start_concat
        else:
            batch = batches[0]
            concat_ns = None

        generations, next_batch, timings = self.model.generate_token(batch)
        self.cache.set(next_batch)

        return generate_pb2.DecodeResponse(
            generations=[generation.to_pb() for generation in generations],
            batch=next_batch.to_pb() if next_batch else None,
            concat_ns=concat_ns,
            forward_ns=timings[0],
            decode_ns=timings[1],
            total_ns=time.time_ns() - start,
        )

Olivier Dehaene's avatar
Init  
Olivier Dehaene committed
195

Olivier Dehaene's avatar
v0.1.0  
Olivier Dehaene committed
196
def serve(
    model_id: str,
    lora_adapter_ids: Optional[List[str]],
    revision: Optional[str],
    sharded: bool,
    quantize: Optional[str],
    speculate: Optional[int],
    dtype: Optional[str],
    trust_remote_code: bool,
    uds_path: Path,
    max_input_tokens: int,
):
    """Load the model and serve it over gRPC on a unix socket until signalled.

    The call blocks: it runs an asyncio event loop that starts the server and
    then polls a ``SignalHandler`` every 0.5 s, returning once SIGINT or
    SIGTERM has been received.

    Args:
        model_id: Model identifier; recorded globally via ``set_model_id``
            and forwarded to ``get_model``.
        lora_adapter_ids: Adapters to load after the model. ``None`` and an
            empty list both mean "no adapters".
        revision, quantize, speculate, dtype, trust_remote_code,
        max_input_tokens: Forwarded unchanged to ``get_model``.
        sharded: When true, one socket URL is derived per rank from the
            ``WORLD_SIZE`` environment variable and this process binds the
            one selected by ``RANK``.
        uds_path: Prefix of the unix socket path; the rank is appended.
    """

    async def serve_inner(
        model_id: str,
        lora_adapter_ids: Optional[List[str]],
        revision: Optional[str],
        sharded: bool = False,
        quantize: Optional[str] = None,
        speculate: Optional[int] = None,
        dtype: Optional[str] = None,
        trust_remote_code: bool = False,
    ):
        # `uds_path` and `max_input_tokens` are read from the enclosing scope.
        unix_socket_template = "unix://{}-{}"
        adapter_to_index = {}
        if sharded:
            server_urls = [
                unix_socket_template.format(uds_path, rank)
                for rank in range(int(os.environ["WORLD_SIZE"]))
            ]
            local_url = server_urls[int(os.environ["RANK"])]
        else:
            local_url = unix_socket_template.format(uds_path, 0)
            server_urls = [local_url]

        try:
            model = get_model(
                model_id,
                lora_adapter_ids,
                revision,
                sharded,
                quantize,
                speculate,
                dtype,
                trust_remote_code,
                max_input_tokens,
            )

            # `lora_adapter_ids` is Optional: treat None like an empty list
            # instead of failing on len(None). Adapter indices start at 1.
            for adapter_index, adapter_id in enumerate(lora_adapter_ids or [], start=1):
                # TODO: improve non merged adapter loading and long term
                # improve adapter loading as a whole
                adapter_parameters = AdapterParameters(
                    adapter_ids=[adapter_id],
                    weights=None,  #  will be set to 1
                    merge_strategy=0,
                    density=1.0,
                    majority_sign_method=0,
                )
                adapter_to_index[adapter_id] = adapter_index
                model.load_adapter(
                    adapter_parameters,
                    None,  # adapter_source
                    adapter_index,
                    None,  # api_token
                    False,  # dynamic
                )

        except Exception:
            logger.exception("Error when initializing model")
            raise

        set_adapter_to_index(adapter_to_index)
        server = aio.server(
            interceptors=[
                ExceptionInterceptor(),
                UDSOpenTelemetryAioServerInterceptor(),
            ],
            options=[
                # Set the maximum possible message length: i32::MAX
                ("grpc.max_receive_message_length", (1 << 31) - 1)
            ],
        )
        generate_pb2_grpc.add_TextGenerationServiceServicer_to_server(
            TextGenerationService(model, Cache(), quantize, server_urls), server
        )
        SERVICE_NAMES = (
            generate_pb2.DESCRIPTOR.services_by_name["TextGenerationService"].full_name,
            reflection.SERVICE_NAME,
        )
        reflection.enable_server_reflection(SERVICE_NAMES, server)
        server.add_insecure_port(local_url)

        await server.start()

        logger.info("Server started at {}".format(local_url))
        # Keep the event loop alive until SIGINT/SIGTERM flips the flag.
        signal_handler = SignalHandler()
        while signal_handler.KEEP_PROCESSING:
            await asyncio.sleep(0.5)

    set_model_id(model_id)
    asyncio.run(
        serve_inner(
            model_id,
            lora_adapter_ids,
            revision,
            sharded,
            quantize,
            speculate,
            dtype,
            trust_remote_code,
        )
    )