threadtest.py 3.11 KB
Newer Older
xuxzh1's avatar
opt1  
xuxzh1 committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import requests
import time
import concurrent.futures

# Benchmark: measure Ollama generation throughput ("batch" = number of
# concurrent identical requests) for several concurrency levels and
# num_predict settings against a local server.
url = "http://localhost:11434/api/generate"
headers = {
    "Content-Type": "application/json"
}


def _build_payload(num_predict):
    """Return the JSON payload for one generation request.

    Original author's note on prompt sizing: "hi "*510 corresponds to
    ~512 tokens, "hi "*998 to ~1000 tokens.
    """
    return {
        "model": "deepseek-r1:671b",
        "prompt": "hi",
        "stream": False,      # wait for the complete generation in one response
        "raw": True,
        "keep_alive": "1h",   # keep the model loaded across batches
        "options": {
            "num_predict": num_predict,
            "seed": 42,       # fixed seed for reproducible generations
            "stop": []
        }
    }


def _send_request(payload):
    """POST one generation request.

    Returns (completion_tokens, elapsed_seconds, response_json).
    On a non-200 status returns the sentinel (0, 0.0, None) — the original
    returned a 2-tuple here, which crashed the result loop with IndexError
    and would also have divided by zero in the speed print.
    """
    start_time = time.time()  # per-request wall-clock start
    response = requests.post(url, headers=headers, json=payload)
    elapsed_time = time.time() - start_time

    if response.status_code != 200:
        print(f"请求失败,状态码: {response.status_code}")
        return 0, 0.0, None

    response_data = response.json()
    completion_tokens = response_data["eval_count"]
    return completion_tokens, elapsed_time, response_data


def _run_batch(concurrent_requests, num_predict):
    """Fire `concurrent_requests` identical requests in parallel and print stats."""
    payload = _build_payload(num_predict)

    total_start_time = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_requests) as executor:
        futures = [executor.submit(_send_request, payload) for _ in range(concurrent_requests)]
        results = [future.result() for future in concurrent.futures.as_completed(futures)]
    total_end_time = time.time()

    for completion_tokens, elapsed_time, response_data in results:
        # Skip failed requests (elapsed_time == 0 sentinel) to avoid ZeroDivisionError.
        if elapsed_time > 0:
            print(f"请求完成: 生成 tokens = {completion_tokens}, 耗时 = {elapsed_time:.2f} 秒, 生成速度:{completion_tokens/elapsed_time:.2f}, 响应内容:{response_data}")

    # Aggregate throughput across the whole batch (tokens / wall-clock seconds).
    total_completion_tokens = sum(result[0] for result in results)
    total_elapsed_time = total_end_time - total_start_time

    if total_elapsed_time > 0:
        overall_speed = total_completion_tokens / total_elapsed_time
        print(f"batch_size : {concurrent_requests}")
        print(f"总生成 tokens: {total_completion_tokens}")
        print(f"总耗时: {total_elapsed_time:.2f} 秒")
        print(f"整体生成速度: {overall_speed:.2f} tokens/秒")
    else:
        print("总耗时过短,无法计算生成速度")


if __name__ == "__main__":
    for concurrent_requests in (1, 2, 4):
        # NOTE(review): both entries are 128 — presumably one was meant to be a
        # different value (e.g. 512 or 1000); confirm with the author.
        for num_predict in (128, 128):
            _run_batch(concurrent_requests, num_predict)
            print("================num_predict====================")
        print("================concurrent_requests====================")