test_pause_advanced.py 5.81 KB
Newer Older
wangkx1's avatar
wangkx1 committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/env python3
"""
高级测试暂停功能的脚本:
1. 创建多个下载任务
2. 暂停当前任务
3. 验证下一个任务自动开始
"""
import sys
import os
import time
import threading
import requests

# 添加当前目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# 导入模块
try:
    from model_download_manager import (
        create_download_task, 
        pause_download, 
        get_downloads_data, 
        global_state,
        download_worker,
        start_workers
    )
    import threading
except ImportError as e:
    print(f"导入失败: {e}")
    sys.exit(1)

def print_task_status():
    """打印所有下载任务的状态"""
    with global_state.operation_lock:
        print("当前任务状态:")
        for task_id, task in global_state.state["download_tasks"].items():
            print(f"  任务 {task_id}: {task['status']} - {task['model_id']} - {task['message']}")
        print(f"活动下载: {list(global_state.active_downloads.keys())}")
        print(f"下载进程: {list(global_state.download_processes.keys())}")
        print("-" * 50)

def test_pause_with_next_task():
    print("=== 高级测试:暂停并自动开始下一个任务 ===")
    
    # 清除现有任务
    with global_state.operation_lock:
        global_state.state["download_tasks"].clear()
        global_state.active_downloads.clear()
        global_state.download_processes.clear()
        global_state.download_queue.queue.clear()
        
    # 确保工作线程在运行
    if not hasattr(global_state, '_workers_started'):
        start_workers()
        global_state._workers_started = True
    
    # 创建多个下载任务
    test_models = [
        "Qwen/Qwen2.5-1.5B-Instruct",
        "Qwen/Qwen2.5-0.5B-Instruct",
        "Qwen/Qwen2.5-0.5B-Chat"
    ]
    
    print(f"1. 创建 {len(test_models)} 个下载任务")
    task_ids = []
    for i, model in enumerate(test_models):
        task_id = create_download_task(model)
        task_ids.append(task_id)
        print(f"   任务 {i+1}: {task_id} - {model}")
    
    print_task_status()
    
    # 等待第一个任务开始下载
    print("2. 等待第一个任务开始下载...")
    first_task_id = task_ids[0]
    for i in range(15):
        time.sleep(1)
        print_task_status()
        with global_state.operation_lock:
            task = global_state.state["download_tasks"].get(first_task_id)
            if task and task.get("status") == "downloading":
                print(f"   第一个任务 {first_task_id} 已开始下载")
                break
        print(f"   等待 {i+1}/15 秒...")
    else:
        print("   第一个任务未能开始下载,测试失败")
        return False
    
    # 暂停第一个任务
    print(f"3. 暂停第一个任务: {first_task_id}")
    result = pause_download(first_task_id)
    print(f"   暂停结果: {result}")
    
    print_task_status()
    
    # 等待下一个任务开始下载
    print("4. 等待下一个任务自动开始...")
    for i in range(15):
        time.sleep(1)
        print_task_status()
        
        # 检查是否有任务在下载
        with global_state.operation_lock:
            downloading_tasks = [
                task_id for task_id, task in global_state.state["download_tasks"].items() 
                if task.get("status") == "downloading"
            ]
            
            if downloading_tasks and downloading_tasks[0] != first_task_id:
                print(f"   下一个任务 {downloading_tasks[0]} 已自动开始下载")
                break
        print(f"   等待 {i+1}/15 秒...")
    else:
        print("   下一个任务未能自动开始,测试失败")
        return False
    
    # 检查第一个任务是否已暂停
    with global_state.operation_lock:
        first_task = global_state.state["download_tasks"].get(first_task_id)
        if first_task.get("status") != "paused":
            print("   第一个任务未正确暂停,测试失败")
            return False
        
        # 检查是否有其他任务在下载
        downloading_tasks = [
            task_id for task_id, task in global_state.state["download_tasks"].items() 
            if task.get("status") == "downloading"
        ]
        if not downloading_tasks or downloading_tasks[0] == first_task_id:
            print("   没有其他任务在下载,测试失败")
            return False
    
    print("5. 测试成功:暂停功能正常,下一个任务自动开始")
    print_task_status()
    
    # 清理测试任务
    print("6. 清理测试任务...")
    with global_state.operation_lock:
        for task_id in task_ids:
            if task_id in global_state.state["download_tasks"]:
                # 终止可能正在运行的进程
                if task_id in global_state.download_processes:
                    try:
                        process = global_state.download_processes[task_id]
                        process.terminate()
                        process.join(timeout=2)
                        if process.is_alive():
                            process.kill()
                    except Exception as e:
                        print(f"   清理进程 {task_id} 时出错: {e}")
                
                # 删除任务
                del global_state.state["download_tasks"][task_id]
                
                # 从active_downloads中移除
                if task_id in global_state.active_downloads:
                    del global_state.active_downloads[task_id]
                
                # 从download_processes中移除
                if task_id in global_state.download_processes:
                    del global_state.download_processes[task_id]
    
    print("测试完成")
    return True

if __name__ == "__main__":
    success = test_pause_with_next_task()
    sys.exit(0 if success else 1)