#!/usr/bin/env python3 """ 高级测试暂停功能的脚本: 1. 创建多个下载任务 2. 暂停当前任务 3. 验证下一个任务自动开始 """ import sys import os import time import threading import requests # 添加当前目录到Python路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) # 导入模块 try: from model_download_manager import ( create_download_task, pause_download, get_downloads_data, global_state, download_worker, start_workers ) import threading except ImportError as e: print(f"导入失败: {e}") sys.exit(1) def print_task_status(): """打印所有下载任务的状态""" with global_state.operation_lock: print("当前任务状态:") for task_id, task in global_state.state["download_tasks"].items(): print(f" 任务 {task_id}: {task['status']} - {task['model_id']} - {task['message']}") print(f"活动下载: {list(global_state.active_downloads.keys())}") print(f"下载进程: {list(global_state.download_processes.keys())}") print("-" * 50) def test_pause_with_next_task(): print("=== 高级测试:暂停并自动开始下一个任务 ===") # 清除现有任务 with global_state.operation_lock: global_state.state["download_tasks"].clear() global_state.active_downloads.clear() global_state.download_processes.clear() global_state.download_queue.queue.clear() # 确保工作线程在运行 if not hasattr(global_state, '_workers_started'): start_workers() global_state._workers_started = True # 创建多个下载任务 test_models = [ "Qwen/Qwen2.5-1.5B-Instruct", "Qwen/Qwen2.5-0.5B-Instruct", "Qwen/Qwen2.5-0.5B-Chat" ] print(f"1. 创建 {len(test_models)} 个下载任务") task_ids = [] for i, model in enumerate(test_models): task_id = create_download_task(model) task_ids.append(task_id) print(f" 任务 {i+1}: {task_id} - {model}") print_task_status() # 等待第一个任务开始下载 print("2. 等待第一个任务开始下载...") first_task_id = task_ids[0] for i in range(15): time.sleep(1) print_task_status() with global_state.operation_lock: task = global_state.state["download_tasks"].get(first_task_id) if task and task.get("status") == "downloading": print(f" 第一个任务 {first_task_id} 已开始下载") break print(f" 等待 {i+1}/15 秒...") else: print(" 第一个任务未能开始下载,测试失败") return False # 暂停第一个任务 print(f"3. 暂停第一个任务: {first_task_id}") result = pause_download(first_task_id) print(f" 暂停结果: {result}") print_task_status() # 等待下一个任务开始下载 print("4. 等待下一个任务自动开始...") for i in range(15): time.sleep(1) print_task_status() # 检查是否有任务在下载 with global_state.operation_lock: downloading_tasks = [ task_id for task_id, task in global_state.state["download_tasks"].items() if task.get("status") == "downloading" ] if downloading_tasks and downloading_tasks[0] != first_task_id: print(f" 下一个任务 {downloading_tasks[0]} 已自动开始下载") break print(f" 等待 {i+1}/15 秒...") else: print(" 下一个任务未能自动开始,测试失败") return False # 检查第一个任务是否已暂停 with global_state.operation_lock: first_task = global_state.state["download_tasks"].get(first_task_id) if first_task.get("status") != "paused": print(" 第一个任务未正确暂停,测试失败") return False # 检查是否有其他任务在下载 downloading_tasks = [ task_id for task_id, task in global_state.state["download_tasks"].items() if task.get("status") == "downloading" ] if not downloading_tasks or downloading_tasks[0] == first_task_id: print(" 没有其他任务在下载,测试失败") return False print("5. 测试成功:暂停功能正常,下一个任务自动开始") print_task_status() # 清理测试任务 print("6. 清理测试任务...") with global_state.operation_lock: for task_id in task_ids: if task_id in global_state.state["download_tasks"]: # 终止可能正在运行的进程 if task_id in global_state.download_processes: try: process = global_state.download_processes[task_id] process.terminate() process.join(timeout=2) if process.is_alive(): process.kill() except Exception as e: print(f" 清理进程 {task_id} 时出错: {e}") # 删除任务 del global_state.state["download_tasks"][task_id] # 从active_downloads中移除 if task_id in global_state.active_downloads: del global_state.active_downloads[task_id] # 从download_processes中移除 if task_id in global_state.download_processes: del global_state.download_processes[task_id] print("测试完成") return True if __name__ == "__main__": success = test_pause_with_next_task() sys.exit(0 if success else 1)