# gpt_vllm.py
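"""Throughput/latency benchmark for the IndexTTS-2 GPT model served via vLLM's AsyncLLM.

Simulated users issue repeated generation requests at several concurrency
levels; TTFT, end-to-end latency, generated-token counts, and aggregate
throughput are printed as Markdown tables.
"""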
import asyncio
import random
import time
import uuid
import torch
import os
from typing import List, Dict, Any
import numpy as np

def set_seed(seed=42):
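    """Seed Python, NumPy, and PyTorch (CPU and all CUDA devices) for reproducible runs."""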
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

    # # The settings below may slightly hurt performance but guarantee full reproducibility
    # torch.backends.cudnn.deterministic = True
    # torch.backends.cudnn.benchmark = False

set_seed(42)

import sys
root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, root_dir)

import patch_vllm  # ⚠️ Monkey Patch, do not delete this line

from indextts.gpt.index_tts_gpt2_vllm_v1 import PLACEHOLDER_TOKEN, PLACEHOLDER_TOKEN_ID

from vllm import SamplingParams, TokensPrompt
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.v1.engine.async_llm import AsyncLLM

model_dir = os.path.join(root_dir, "checkpoints/IndexTTS-2-vLLM")

vllm_dir = os.path.join(model_dir, "gpt")

async def run_single_inference(llm: AsyncLLM, inputs_embeds: torch.Tensor):
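    """Issue a single generation request and time it.

    Returns a dict with TTFT ("prefill_time"), decode time, end-to-end latency,
    the number of generated tokens, and decode speed in tokens/s.
    """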
    request_id = uuid.uuid4().hex

    sampling_params = SamplingParams(
        temperature=1.0,
        top_p=0.8,
        top_k=30,
        repetition_penalty=10.0,
        max_tokens=random.randint(400, 1200),  # roughly 8-24 s of audio (1818 appears to be the model maximum)
        ignore_eos=True,  # force generation to run for the full max_tokens
        stop_token_ids=[],  # the stop token (8193) is deliberately unused while benchmarking
    )
    
    # The actual prompt is the precomputed input embeddings, passed as multimodal
    # data; a single placeholder token stands in for them (resolved by patch_vllm).
    multi_modal_data = {"audio": {"audio_embeds": [inputs_embeds.squeeze(0).cpu()]}}
    fake_inputs = PLACEHOLDER_TOKEN * 1
    tokens_prompt = TokensPrompt(prompt=fake_inputs, multi_modal_data=multi_modal_data)

    start_time = time.time()
    output_generator = llm.generate(tokens_prompt, sampling_params, request_id=request_id)
    
    first_token_time = None
    ttft = 0.0
    final_output = None

    async for output in output_generator:
        # The first yielded result marks the time to first token (end of prefill).
        if first_token_time is None:
            first_token_time = time.time()
            ttft = first_token_time - start_time
        final_output = output

    end_time = time.time()

    latency = end_time - start_time
    decode_time = end_time - first_token_time
    
    # The trailing two tokens are excluded from the generated-token count.
    generated_tokens = final_output.outputs[0].token_ids[:-2]
    num_generated_tokens = len(generated_tokens)

    decode_speed = num_generated_tokens / decode_time if decode_time > 0 else 0.0

    return {
        "prefill_time": ttft,
        "decode_time": decode_time,
        "latency": latency,
        "num_generated_tokens": num_generated_tokens,
        "decode_speed_tokens_per_sec": decode_speed,
    }

async def run_user_simulation(llm: AsyncLLM, inputs_embeds: torch.Tensor, num_runs: int = 10) -> List[Dict[str, Any]]:
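    """Simulate one user issuing num_runs sequential requests; collect per-request metrics."""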
    user_results = []
    for _ in range(num_runs):
        result = await run_single_inference(llm, inputs_embeds)
        user_results.append(result)
    return user_results

async def benchmark(llm_engine, concurrency_levels, runs_per_user):
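    """Run the benchmark at each concurrency level and print Markdown result tables.

    At each level, `concurrency` simulated users each issue `runs_per_user`
    sequential requests; TTFT, end-to-end latency, and generated-token counts
    are summarized as min/max/mean/P50/P95/P99.
    """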
    hidden_dim = 1280        # GPT input embedding width
    conds_len = 34           # conditioning (speaker prompt) length
    max_text_tokens = 200    # simulated text prompt length

    # Warm-up request so kernels and caches are initialized before timing
    fake_inputs_embeds = torch.randn(
        1, 
        conds_len + max_text_tokens,
        hidden_dim, 
        dtype=torch.float16,
        device="cpu"
    )
    await run_single_inference(llm_engine, fake_inputs_embeds) 

    for concurrency in concurrency_levels:
        # Prepare an independent input tensor for each concurrent user
        user_inputs = [torch.randn(
            1, 
            conds_len + max_text_tokens,
            hidden_dim, 
            dtype=torch.float16, 
            device="cpu"
        ) for _ in range(concurrency)]
        
        # One task per user; each user issues runs_per_user back-to-back requests
        tasks = [
            run_user_simulation(llm_engine, user_inputs[c_idx], runs_per_user) 
            for c_idx in range(concurrency)
        ]
        
        benchmark_start_time = time.time()
        results = await asyncio.gather(*tasks)
        benchmark_total_time = time.time() - benchmark_start_time

        # --- Aggregate and compute statistics ---
        all_ttfts_ms = []
        all_latencies_ms = []
        all_num_generated_tokens = []
        total_requests = 0
        total_generated_tokens = 0

        for user_results in results:
            for res in user_results:
                total_requests += 1
                total_generated_tokens += res["num_generated_tokens"]
                all_ttfts_ms.append(res["prefill_time"] * 1000)
                all_latencies_ms.append(res["latency"] * 1000)
                all_num_generated_tokens.append(res["num_generated_tokens"])

        # Convert to numpy arrays for statistics
        ttfts_np = np.array(all_ttfts_ms)
        latencies_np = np.array(all_latencies_ms)
        tokens_np = np.array(all_num_generated_tokens)
        
        # Total throughput = total tokens generated over the whole benchmark wall time
        total_throughput = total_generated_tokens / benchmark_total_time

        print(f"## Concurrency Level: {concurrency}")
        print(f"\n*   **Total Requests:** {total_requests}")
        print(f"*   **Total Time:** {benchmark_total_time:.2f} s")
        print(f"*   **Total Throughput:** {total_throughput:.2f} tokens/s\n")

        print("| Metric                 | Min      | Max       | Mean      | P50       | P95       | P99       |")
        print("|------------------------|----------|-----------|-----------|-----------|-----------|-----------|")
        
        if total_requests > 0:
            # TTFT
            print(f"| TTFT (ms)              | {np.min(ttfts_np):<8.2f} | {np.max(ttfts_np):<9.2f} | {np.mean(ttfts_np):<9.2f} | {np.percentile(ttfts_np, 50):<9.2f} | {np.percentile(ttfts_np, 95):<9.2f} | {np.percentile(ttfts_np, 99):<9.2f} |")
    
            # Latency
            print(f"| Latency (ms)           | {np.min(latencies_np):<8.2f} | {np.max(latencies_np):<9.2f} | {np.mean(latencies_np):<9.2f} | {np.percentile(latencies_np, 50):<9.2f} | {np.percentile(latencies_np, 95):<9.2f} | {np.percentile(latencies_np, 99):<9.2f} |")
    
            # Num Generated Tokens
            print(f"| Num Generated Tokens   | {np.min(tokens_np):<8.0f} | {np.max(tokens_np):<9.0f} | {np.mean(tokens_np):<9.2f} | {np.percentile(tokens_np, 50):<9.0f} | {np.percentile(tokens_np, 95):<9.0f} | {np.percentile(tokens_np, 99):<9.0f} |")
        print("\n" + "-"*40 + "\n")


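# Entry point: build a single AsyncLLM engine and reuse it across all concurrency levels.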
if __name__ == "__main__":
    gpu_memory_utilization = 0.5  # fraction of GPU memory vLLM may reserve (0.25 also viable)
    concurrency_levels = [1, 4, 8, 16, 32, 64]  # concurrency levels to sweep
    runs_per_user = 10  # requests issued sequentially by each concurrent user

    engine_args = AsyncEngineArgs(
        model=vllm_dir,
        tensor_parallel_size=1,
        dtype="auto",
        gpu_memory_utilization=gpu_memory_utilization,
        max_num_seqs=50
    )
    llm_engine: AsyncLLM = AsyncLLM.from_engine_args(engine_args)

    asyncio.run(benchmark(llm_engine, concurrency_levels, runs_per_user))