test_pause.py 2.16 KB
Newer Older
wangkx1's avatar
wangkx1 committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#!/usr/bin/env python3
"""
测试暂停功能的脚本
"""
import sys
import os
import time
import requests

# 添加当前目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# 导入模块
try:
    from model_download_manager import create_download_task, pause_download, get_downloads_data, global_state
except ImportError as e:
    print(f"导入失败: {e}")
    sys.exit(1)

def test_pause_functionality():
    print("=== 测试暂停下载功能 ===")
    
    # 测试模型ID
    test_model = "Qwen/Qwen2.5-1.5B-Instruct"
    
    print(f"1. 创建下载任务: {test_model}")
    task_id = create_download_task(test_model)
    print(f"   任务ID: {task_id}")
    
    # 等待任务开始下载
    print("2. 等待任务开始下载...")
    for i in range(10):
        time.sleep(1)
        with global_state.operation_lock:
            task = global_state.state["download_tasks"].get(task_id)
            if task and task.get("status") == "downloading":
                print(f"   任务已开始下载: {task['status']}")
                break
    else:
        print("   任务未能开始下载,可能队列中还有其他任务")
        return False
    
    # 暂停任务
    print(f"3. 暂停任务: {task_id}")
    result = pause_download(task_id)
    print(f"   暂停结果: {result}")
    
    # 检查任务状态
    print("4. 检查任务状态...")
    time.sleep(2)
    with global_state.operation_lock:
        task = global_state.state["download_tasks"].get(task_id)
        status = task.get("status") if task else "未知"
        print(f"   任务状态: {status}")
        
        # 检查是否有下载进程在运行
        process_running = task_id in global_state.download_processes
        print(f"   下载进程是否在运行: {'是' if process_running else '否'}")
    
    if status == "paused" and not process_running:
        print("5. 测试成功: 任务已暂停,下载进程已终止")
        return True
    else:
        print("5. 测试失败: 任务未正确暂停或进程仍在运行")
        return False

if __name__ == "__main__":
    success = test_pause_functionality()
    sys.exit(0 if success else 1)