# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
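"""Asynchronous request functions for benchmarking LLM serving backends.

Each ``async_request_*`` coroutine issues a single (usually streaming) request
against a backend and records per-request metrics such as TTFT (time to first
token), inter-token latencies, and total latency in a ``RequestFuncOutput``.
"""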

import io
import json
import os
import sys
import time
import traceback
from dataclasses import dataclass, field
from typing import Optional, Union

import aiohttp
import huggingface_hub.constants
from tqdm.asyncio import tqdm
from transformers import AutoTokenizer, PreTrainedTokenizer, PreTrainedTokenizerFast

# NOTE(simon): do not import vLLM here so the benchmark script
# can run without vLLM installed.

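# Client-side timeout: allow each benchmark request up to 6 hours in total.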
AIOHTTP_TIMEOUT = aiohttp.ClientTimeout(total=6 * 60 * 60)


@dataclass
class RequestFuncInput:
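    """Inputs describing a single benchmark request."""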
    prompt: str
    api_url: str
    prompt_len: int
    output_len: int
    model: str
    model_name: Optional[str] = None
    logprobs: Optional[int] = None
    extra_body: Optional[dict] = None
    multi_modal_content: Optional[dict] = None
    ignore_eos: bool = False
    language: Optional[str] = None


@dataclass
class RequestFuncOutput:
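    """Per-request metrics and generated text collected by a request function."""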
    generated_text: str = ""
    success: bool = False
    latency: float = 0.0
    output_tokens: Optional[int] = 0  # may be set to None when unknown
    ttft: float = 0.0  # Time to first token
    itl: list[float] = field(default_factory=list)  # list of inter-token latencies
    tpot: float = 0.0  # avg time per output token
    prompt_len: int = 0
    error: str = ""


async def async_request_tgi(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
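    """Benchmark one streaming request against a TGI generate_stream endpoint."""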
    api_url = request_func_input.api_url
    assert api_url.endswith("generate_stream")

    async with aiohttp.ClientSession(
        trust_env=True, timeout=AIOHTTP_TIMEOUT
    ) as session:
        params = {
            "max_new_tokens": request_func_input.output_len,
            "do_sample": True,
            "temperature": 0.01,  # TGI does not accept 0.0 temperature.
            "top_p": 0.99,  # TGI does not accept 1.0 top_p.
            "truncate": request_func_input.prompt_len,
            "ignore_eos_token": request_func_input.ignore_eos,
        }
        payload = {
            "inputs": request_func_input.prompt,
            "parameters": params,
        }
        output = RequestFuncOutput()
        output.prompt_len = request_func_input.prompt_len
        if request_func_input.ignore_eos:
            output.output_tokens = request_func_input.output_len
        else:
            output.output_tokens = None

        ttft = 0.0
        st = time.perf_counter()
        most_recent_timestamp = st
        try:
            async with session.post(url=api_url, json=payload) as response:
                if response.status == 200:
                    async for chunk_bytes in response.content:
                        chunk_bytes = chunk_bytes.strip()
                        if not chunk_bytes:
                            continue
                        chunk_bytes = chunk_bytes.decode("utf-8")

                        # NOTE: Sometimes TGI returns a ping response without
                        # any data; we should skip it.
                        if chunk_bytes.startswith(":"):
                            continue
                        chunk = chunk_bytes.removeprefix("data:")

                        data = json.loads(chunk)
                        timestamp = time.perf_counter()
                        # First token
                        if ttft == 0.0:
                            ttft = time.perf_counter() - st
                            output.ttft = ttft

                        # Decoding phase
                        else:
                            output.itl.append(timestamp - most_recent_timestamp)

                        most_recent_timestamp = timestamp

                    output.latency = most_recent_timestamp - st
                    output.success = True
                    output.generated_text = data["generated_text"]
                else:
                    output.error = response.reason or ""
                    output.success = False
        except Exception:
            output.success = False
            exc_info = sys.exc_info()
            output.error = "".join(traceback.format_exception(*exc_info))

        if pbar:
            pbar.update(1)
        return output


async def async_request_trt_llm(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
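    """Benchmark one streaming request against a TensorRT-LLM server."""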
    api_url = request_func_input.api_url
    assert api_url.endswith("generate_stream")

    async with aiohttp.ClientSession(
        trust_env=True, timeout=AIOHTTP_TIMEOUT
    ) as session:
        payload = {
            "accumulate_tokens": True,
            "text_input": request_func_input.prompt,
            "temperature": 0.0,
            "top_p": 1.0,
            "max_tokens": request_func_input.output_len,
            "stream": True,
        }
        if request_func_input.ignore_eos:
            payload["min_length"] = request_func_input.output_len
        output = RequestFuncOutput()
        output.prompt_len = request_func_input.prompt_len

        ttft = 0.0
        st = time.perf_counter()
        most_recent_timestamp = st
        try:
            async with session.post(url=api_url, json=payload) as response:
                if response.status == 200:
                    async for chunk_bytes in response.content:
                        chunk_bytes = chunk_bytes.strip()
                        if not chunk_bytes:
                            continue

                        chunk = chunk_bytes.decode("utf-8").removeprefix("data:")

                        data = json.loads(chunk)
                        output.generated_text += data["text_output"]
                        timestamp = time.perf_counter()
                        # First token
                        if ttft == 0.0:
                            ttft = timestamp - st
                            output.ttft = ttft

                        # Decoding phase
                        else:
                            output.itl.append(timestamp - most_recent_timestamp)

                        most_recent_timestamp = timestamp

                    output.latency = most_recent_timestamp - st
                    output.success = True

                else:
                    output.error = response.reason or ""
                    output.success = False
        except Exception:
            output.success = False
            exc_info = sys.exc_info()
            output.error = "".join(traceback.format_exception(*exc_info))

        if pbar:
            pbar.update(1)
        return output


async def async_request_deepspeed_mii(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
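    """Benchmark one non-streaming request against a DeepSpeed-MII server."""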
    api_url = request_func_input.api_url
    assert api_url.endswith(("completions", "profile")), (
        "OpenAI Completions API URL must end with 'completions' or 'profile'."
    )

    async with aiohttp.ClientSession(
        trust_env=True, timeout=AIOHTTP_TIMEOUT
    ) as session:
        payload = {
            "model": request_func_input.model,
            "prompt": request_func_input.prompt,
            "max_tokens": request_func_input.output_len,
            "temperature": 0.01,  # deepspeed-mii does not accept 0.0 temp.
            "top_p": 1.0,
        }
        headers = {"Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"}

        output = RequestFuncOutput()
        output.prompt_len = request_func_input.prompt_len

        # NOTE: DeepSpeed-MII doesn't support streaming as of Jan 28 2024,
        # will use 0 as placeholder.
        # See https://github.com/microsoft/DeepSpeed-MII/pull/311
        output.ttft = 0

        st = time.perf_counter()
        try:
            async with session.post(
                url=api_url, json=payload, headers=headers
            ) as response:
                if response.status == 200:
                    parsed_resp = await response.json()
                    output.latency = time.perf_counter() - st
                    if "choices" in parsed_resp:
                        output.generated_text = parsed_resp["choices"][0]["text"]
                        output.success = True
                    elif "text" in parsed_resp:
                        output.generated_text = parsed_resp["text"][0]
                        output.success = True
                    else:
                        output.error = (
                            "Unexpected response format: "
                            "neither 'choices' nor 'text' found"
                        )
                        output.success = False
                else:
                    output.error = response.reason or ""
                    output.success = False
        except Exception:
            output.success = False
            exc_info = sys.exc_info()
            output.error = "".join(traceback.format_exception(*exc_info))

        if pbar:
            pbar.update(1)
        return output


async def async_request_openai_completions(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
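    """Benchmark one streaming request against an OpenAI Completions API."""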
    api_url = request_func_input.api_url
    assert api_url.endswith(("completions", "profile")), (
        "OpenAI Completions API URL must end with 'completions' or 'profile'."
    )

    async with aiohttp.ClientSession(
        trust_env=True, timeout=AIOHTTP_TIMEOUT
    ) as session:
        payload = {
            "model": request_func_input.model_name
            if request_func_input.model_name
            else request_func_input.model,
            "prompt": request_func_input.prompt,
            "temperature": 0.0,
            "repetition_penalty": 1.0,
            "max_tokens": request_func_input.output_len,
            "logprobs": request_func_input.logprobs,
            "stream": True,
            "stream_options": {
                "include_usage": True,
            },
        }
        if request_func_input.ignore_eos:
            payload["ignore_eos"] = request_func_input.ignore_eos
        if request_func_input.extra_body:
            payload.update(request_func_input.extra_body)
        headers = {"Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"}

        output = RequestFuncOutput()
        output.prompt_len = request_func_input.prompt_len

        generated_text = ""
        st = time.perf_counter()
        most_recent_timestamp = st
        try:
            async with session.post(
                url=api_url, json=payload, headers=headers
            ) as response:
                if response.status == 200:
                    first_chunk_received = False
                    async for chunk_bytes in response.content:
                        chunk_bytes = chunk_bytes.strip()
                        if not chunk_bytes:
                            continue

                        chunk = chunk_bytes.decode("utf-8").removeprefix("data: ")
                        if chunk != "[DONE]":
                            data = json.loads(chunk)

                            # NOTE: Some completion APIs send a final usage
                            # summary chunk with no token, so check that a
                            # token was actually generated.
                            if choices := data.get("choices"):
                                # Note that text could be empty here
                                # e.g. for special tokens
                                text = choices[0].get("text")
                                timestamp = time.perf_counter()
                                # First token
                                if not first_chunk_received:
                                    first_chunk_received = True
                                    ttft = time.perf_counter() - st
                                    output.ttft = ttft

                                # Decoding phase
                                else:
                                    output.itl.append(timestamp - most_recent_timestamp)

                                most_recent_timestamp = timestamp
                                generated_text += text or ""
                            if usage := data.get("usage"):
                                output.output_tokens = usage.get("completion_tokens")
                    if first_chunk_received:
                        output.success = True
                    else:
                        output.success = False
                        output.error = (
                            "Never received a valid chunk to calculate TTFT. "
                            "This response will be marked as failed!"
                        )
                    output.generated_text = generated_text
                    output.latency = most_recent_timestamp - st
                else:
                    output.error = response.reason or ""
                    output.success = False
        except Exception:
            output.success = False
            exc_info = sys.exc_info()
            output.error = "".join(traceback.format_exception(*exc_info))

    if pbar:
        pbar.update(1)
    return output


async def async_request_openai_chat_completions(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
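    """Benchmark one streaming request against an OpenAI Chat Completions API."""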
    api_url = request_func_input.api_url
    assert api_url.endswith(("chat/completions", "profile")), (
        "OpenAI Chat Completions API URL must end with 'chat/completions' or 'profile'."
    )

    async with aiohttp.ClientSession(
        trust_env=True, timeout=AIOHTTP_TIMEOUT
    ) as session:
        content = [{"type": "text", "text": request_func_input.prompt}]
        if request_func_input.multi_modal_content:
            content.append(request_func_input.multi_modal_content)
        payload = {
            "model": request_func_input.model_name
            if request_func_input.model_name
            else request_func_input.model,
            "messages": [
                {"role": "user", "content": content},
            ],
            "temperature": 0.0,
            "max_completion_tokens": request_func_input.output_len,
            "stream": True,
            "stream_options": {
                "include_usage": True,
            },
        }
        if request_func_input.ignore_eos:
            payload["ignore_eos"] = request_func_input.ignore_eos
        if request_func_input.extra_body:
            payload.update(request_func_input.extra_body)
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
        }

        output = RequestFuncOutput()
        output.prompt_len = request_func_input.prompt_len

        generated_text = ""
        ttft = 0.0
        st = time.perf_counter()
        most_recent_timestamp = st
        try:
            async with session.post(
                url=api_url, json=payload, headers=headers
            ) as response:
                if response.status == 200:
                    async for chunk_bytes in response.content:
                        chunk_bytes = chunk_bytes.strip()
                        if not chunk_bytes:
                            continue
                        chunk_bytes = chunk_bytes.decode("utf-8")
                        # NOTE: SSE comments (often used as pings) start with a colon.
                        # These are not JSON data payload and should be skipped.
                        if chunk_bytes.startswith(":"):
                            continue

                        chunk = chunk_bytes.removeprefix("data: ")

                        if chunk != "[DONE]":
                            timestamp = time.perf_counter()
                            data = json.loads(chunk)

                            if choices := data.get("choices"):
                                content = choices[0]["delta"].get("content")
                                # First token
                                if ttft == 0.0:
                                    ttft = timestamp - st
                                    output.ttft = ttft

                                # Decoding phase
                                else:
                                    output.itl.append(timestamp - most_recent_timestamp)

                                generated_text += content or ""
                            elif usage := data.get("usage"):
                                output.output_tokens = usage.get("completion_tokens")

                            most_recent_timestamp = timestamp

                    output.generated_text = generated_text
                    output.success = True
                    output.latency = most_recent_timestamp - st
                else:
                    output.error = response.reason or ""
                    output.success = False
        except Exception:
            output.success = False
            exc_info = sys.exc_info()
            output.error = "".join(traceback.format_exception(*exc_info))

    if pbar:
        pbar.update(1)
    return output


async def async_request_openai_audio(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
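    """Benchmark one streaming request against an OpenAI-style audio endpoint."""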
    # Lazy import without PlaceholderModule to avoid vllm dep.
    import soundfile

    api_url = request_func_input.api_url
    assert api_url.endswith(("transcriptions", "translations")), (
        "OpenAI Audio API URL must end with 'transcriptions' or 'translations'."
    )

    async with aiohttp.ClientSession(
        trust_env=True, timeout=AIOHTTP_TIMEOUT
    ) as session:
        payload = {
            "model": request_func_input.model_name
            if request_func_input.model_name
            else request_func_input.model,
            "temperature": 0.0,
            "max_completion_tokens": request_func_input.output_len,
            "stream": True,
            "language": "en",
            # Flattened due to multipart/form-data
            "stream_include_usage": True,
            "stream_continuous_usage_stats": True,
        }
        if request_func_input.extra_body:
            payload.update(request_func_input.extra_body)
        headers = {
            "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
        }

        # Send audio file
        def to_bytes(y, sr):
            buffer = io.BytesIO()
            soundfile.write(buffer, y, sr, format="WAV")
            buffer.seek(0)
            return buffer

        with to_bytes(*request_func_input.multi_modal_content["audio"]) as f:
            form = aiohttp.FormData()
            form.add_field("file", f, content_type="audio/wav")
            for key, value in payload.items():
                form.add_field(key, str(value))

            output = RequestFuncOutput()
            output.prompt_len = request_func_input.prompt_len

            generated_text = ""
            ttft = 0.0
            st = time.perf_counter()
            most_recent_timestamp = st
            try:
                async with session.post(
                    url=api_url, data=form, headers=headers
                ) as response:
                    if response.status == 200:
                        async for chunk_bytes in response.content:
                            chunk_bytes = chunk_bytes.strip()
                            if not chunk_bytes:
                                continue

                            chunk = chunk_bytes.decode("utf-8").removeprefix("data: ")
                            if chunk != "[DONE]":
                                timestamp = time.perf_counter()
                                data = json.loads(chunk)

                                if choices := data.get("choices"):
                                    content = choices[0]["delta"].get("content")
                                    # First token
                                    if ttft == 0.0:
                                        ttft = timestamp - st
                                        output.ttft = ttft

                                    # Decoding phase
                                    else:
                                        output.itl.append(
                                            timestamp - most_recent_timestamp
                                        )

                                    generated_text += content or ""
                                elif usage := data.get("usage"):
                                    output.output_tokens = usage.get(
                                        "completion_tokens"
                                    )

                                most_recent_timestamp = timestamp

                        output.generated_text = generated_text
                        output.success = True
                        output.latency = most_recent_timestamp - st
                    else:
                        output.error = response.reason or ""
                        output.success = False
            except Exception:
                output.success = False
                exc_info = sys.exc_info()
                output.error = "".join(traceback.format_exception(*exc_info))

        if pbar:
            pbar.update(1)
        return output


def get_model(pretrained_model_name_or_path: str) -> str:
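    """Resolve the model to a local path, downloading via ModelScope if enabled."""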
    if os.getenv("VLLM_USE_MODELSCOPE", "False").lower() == "true":
        from modelscope import snapshot_download

        from vllm.model_executor.model_loader.weight_utils import get_lock

        # Use file lock to prevent multiple processes from
        # downloading the same model weights at the same time.
        with get_lock(pretrained_model_name_or_path):
            model_path = snapshot_download(
                model_id=pretrained_model_name_or_path,
                local_files_only=huggingface_hub.constants.HF_HUB_OFFLINE,
                ignore_file_pattern=[".*.pt", ".*.safetensors", ".*.bin"],
            )

            return model_path
    return pretrained_model_name_or_path


def get_tokenizer(
    pretrained_model_name_or_path: str,
    tokenizer_mode: str = "auto",
    trust_remote_code: bool = False,
    **kwargs,
) -> Union[PreTrainedTokenizer, PreTrainedTokenizerFast]:
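    """Load the tokenizer, supporting "slow" and "mistral" tokenizer modes."""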
    if pretrained_model_name_or_path is not None and not os.path.exists(
        pretrained_model_name_or_path
    ):
        pretrained_model_name_or_path = get_model(pretrained_model_name_or_path)
    if tokenizer_mode == "slow":
        if kwargs.get("use_fast", False):
            raise ValueError("Cannot use the fast tokenizer in slow tokenizer mode.")
        kwargs["use_fast"] = False
    if tokenizer_mode == "mistral":
        try:
            from vllm.transformers_utils.tokenizer import MistralTokenizer
        except ImportError as e:
            raise ImportError(
                "MistralTokenizer requires vllm package.\n"
                "Please install it with `pip install vllm` "
                "to use mistral tokenizer mode."
            ) from e
        return MistralTokenizer.from_pretrained(str(pretrained_model_name_or_path))
    else:
        return AutoTokenizer.from_pretrained(
            pretrained_model_name_or_path,
            trust_remote_code=trust_remote_code,
            **kwargs,
        )


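# Map each supported backend name to its request function.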
ASYNC_REQUEST_FUNCS = {
    "tgi": async_request_tgi,
    "vllm": async_request_openai_completions,
    "lmdeploy": async_request_openai_completions,
    "deepspeed-mii": async_request_deepspeed_mii,
    "openai": async_request_openai_completions,
    "openai-chat": async_request_openai_chat_completions,
    "openai-audio": async_request_openai_audio,
    "tensorrt-llm": async_request_trt_llm,
    "scalellm": async_request_openai_completions,
    "sglang": async_request_openai_completions,
    "llama.cpp": async_request_openai_completions,
}

OPENAI_COMPATIBLE_BACKENDS = [
    k
    for k, v in ASYNC_REQUEST_FUNCS.items()
    if v in (async_request_openai_completions, async_request_openai_chat_completions)
]
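

# Minimal usage sketch (illustrative only, not part of the benchmark harness).
# It assumes an OpenAI-compatible server is already running; the URL and model
# name below are hypothetical placeholders, not values taken from this file.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        request_input = RequestFuncInput(
            prompt="Hello, world!",
            api_url="http://localhost:8000/v1/completions",  # placeholder URL
            prompt_len=4,
            output_len=16,
            model="my-model",  # placeholder model name
        )
        result = await ASYNC_REQUEST_FUNCS["vllm"](request_input)
        print(
            f"success={result.success} ttft={result.ttft:.4f}s "
            f"latency={result.latency:.4f}s"
        )

    asyncio.run(_demo())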