import random
import string
import threading
import time
from datetime import datetime

import requests
from loguru import logger
from tqdm import tqdm


def send_and_monitor_task(url, message, task_index, complete_bar, complete_lock, timeout=30, poll_interval=0.5):
    """Send one task to a server and poll its status until it finishes.

    Args:
        url: Base URL of the server. The task is POSTed to ``{url}/v1/tasks/``
            and polled at ``{url}/v1/tasks/{task_id}/status``.
        message: JSON-serializable payload sent as the request body.
        task_index: Zero-based index of the task; used only in log messages.
        complete_bar: Progress bar exposing ``update(n)``, or ``None`` to
            disable progress reporting.
        complete_lock: Lock guarding ``complete_bar``, or ``None``.
        timeout: Seconds to wait for each individual HTTP request.
        poll_interval: Seconds to sleep between two status checks.

    Returns:
        ``True`` if the server reported the status ``"completed"``;
        ``False`` if it reported ``"failed"``, returned no ``task_id``, or the
        submission request raised.
    """

    def _mark_done():
        # Tick the bar exactly once per task that reached a terminal outcome,
        # so the bar can reach its total even when some tasks never start.
        # Compare against None instead of relying on the bar's truthiness.
        if complete_bar is not None and complete_lock is not None:
            with complete_lock:
                complete_bar.update(1)

    # Step 1: Send task and get task_id
    try:
        response = requests.post(f"{url}/v1/tasks/", json=message, timeout=timeout)
        task_id = response.json().get("task_id")
    except Exception as e:
        logger.error(f"Failed to send task to {url}: {e}")
        _mark_done()
        return False

    if not task_id:
        logger.error(f"No task_id received from {url}")
        _mark_done()
        return False

    # Step 2: Monitor task status until completion
    while True:
        try:
            status_response = requests.get(f"{url}/v1/tasks/{task_id}/status", timeout=timeout)
            task_status = status_response.json().get("status")
        except Exception as e:
            # A failed status check is treated as transient: log it and retry.
            logger.error(f"Failed to check status for task_id {task_id}: {e}")
            time.sleep(poll_interval)
            continue

        if task_status == "completed":
            _mark_done()
            return True

        if task_status == "failed":
            logger.error(f"Task {task_index + 1} (task_id: {task_id}) failed")
            _mark_done()  # Still update progress even if failed
            return False

        # Task still running, wait and check again
        time.sleep(poll_interval)


def get_available_urls(urls, timeout=10):
    """Return the servers in ``urls`` whose status endpoint answers.

    Each URL is probed once with ``GET {url}/v1/service/status``. A server
    counts as available when the request succeeds and the body parses as
    JSON; the content of the payload is not inspected.

    Args:
        urls: Iterable of server base URLs to probe.
        timeout: Seconds to wait for each probe before giving up on that URL.

    Returns:
        A list of the reachable URLs in input order, or ``None`` when no URL
        responded.
    """
    available_urls = []
    for url in urls:
        try:
            requests.get(f"{url}/v1/service/status", timeout=timeout).json()
        except Exception as e:
            # Best-effort probe: an unreachable server is skipped, not fatal.
            logger.warning(f"Server {url} is not available: {e}")
            continue
        available_urls.append(url)

    if not available_urls:
        logger.error("No available urls.")
        return None

    logger.info(f"available_urls: {available_urls}")
    return available_urls


def find_idle_server(available_urls, poll_interval=3, timeout=10):
    """Block until one of the servers reports itself idle and return its URL.

    The servers are probed in order with ``GET {url}/v1/service/status``; the
    first one whose JSON response has ``"service_status" == "idle"`` wins.
    When no server is idle, the whole list is probed again after
    ``poll_interval`` seconds, indefinitely.

    Args:
        available_urls: Non-empty collection of server base URLs.
        poll_interval: Seconds to sleep between two passes over the servers.
        timeout: Seconds to wait for each status request.

    Returns:
        The base URL of the first server found idle.

    Raises:
        ValueError: If ``available_urls`` is empty, since no server could
            ever be returned and the call would otherwise never finish.
    """
    if not available_urls:
        raise ValueError("available_urls must contain at least one server URL")

    # URLs whose probe failure has already been logged during this call;
    # keeps a dead server from flooding the log on every polling pass.
    reported = set()

    while True:
        for url in available_urls:
            try:
                status = requests.get(f"{url}/v1/service/status", timeout=timeout).json()
                if status["service_status"] == "idle":
                    return url
            except Exception as e:
                # Best-effort probe: skip this server for the current pass.
                if url not in reported:
                    reported.add(url)
                    logger.warning(f"Could not query status of {url}: {e}")
        time.sleep(poll_interval)


def process_tasks_async(messages, available_urls, show_progress=True):
    """Dispatch tasks to idle servers, one monitoring thread per task.

    For each message, the function waits for an idle server, then starts a
    thread running ``send_and_monitor_task`` against it. After all tasks are
    dispatched it joins every thread.

    Args:
        messages: Sequence of task payloads; one task is sent per element.
        available_urls: Server base URLs that tasks may be sent to.
        show_progress: When true, display a tqdm bar counting finished tasks.

    Returns:
        ``False`` if ``available_urls`` is empty; otherwise ``True`` once all
        threads have finished. The outcome of individual tasks is not
        reflected in the return value.
    """
    if not available_urls:
        logger.error("No available servers to process tasks.")
        return False

    logger.info(f"Sending {len(messages)} tasks to available servers...")

    # Always bind both names so the worker arguments need no conditionals.
    complete_bar = None
    complete_lock = None
    if show_progress:
        complete_bar = tqdm(total=len(messages), desc="Completing tasks")
        complete_lock = threading.Lock()  # Thread-safe updates to completion bar

    active_threads = []
    try:
        for idx, message in enumerate(messages):
            # Blocks until some server reports itself idle.
            server_url = find_idle_server(available_urls)

            thread = threading.Thread(
                target=send_and_monitor_task,
                args=(server_url, message, idx, complete_bar, complete_lock),
                daemon=False,
            )
            thread.start()
            active_threads.append(thread)

            # Small delay to let the thread start; presumably this also gives
            # the server time to stop reporting idle before the next probe
            # (TODO confirm against the server implementation).
            time.sleep(0.5)

        # Wait for all threads to complete
        for thread in active_threads:
            thread.join()
    finally:
        # Close the bar even if dispatching is interrupted part-way.
        if complete_bar is not None:
            complete_bar.close()

    logger.info("All tasks processing completed!")
    return True