Commit 9f8e3b37 authored by GoatWu's avatar GoatWu
Browse files

update post multi-task server and client

parent 99d12b98
...@@ -82,7 +82,7 @@ def start_server(config: ServerConfig) -> Optional[tuple[subprocess.Popen, str]] ...@@ -82,7 +82,7 @@ def start_server(config: ServerConfig) -> Optional[tuple[subprocess.Popen, str]]
# Wait for server to start, up to 600 seconds # Wait for server to start, up to 600 seconds
node_ip = get_node_ip() node_ip = get_node_ip()
service_url = f"http://{node_ip}:{config.port}/v1/local/video/generate/service_status" service_url = f"http://{node_ip}:{config.port}/v1/service/status"
# Check once per second, up to 600 times # Check once per second, up to 600 times
for _ in range(600): for _ in range(600):
...@@ -149,14 +149,14 @@ def main(): ...@@ -149,14 +149,14 @@ def main():
logger.error(f"Error occurred while starting server: {e}") logger.error(f"Error occurred while starting server: {e}")
# Print all server URLs # Print all server URLs
print("\nAll server URLs:") logger.info("\nAll server URLs:")
for url in urls: for url in urls:
print(url) logger.info(url)
# Print node information # Print node information
node_ip = get_node_ip() node_ip = get_node_ip()
print(f"\nCurrent node IP: {node_ip}") logger.info(f"\nCurrent node IP: {node_ip}")
print(f"Number of servers started: {len(urls)}") logger.info(f"Number of servers started: {len(urls)}")
try: try:
# Wait for all processes # Wait for all processes
......
...@@ -3,132 +3,121 @@ from loguru import logger ...@@ -3,132 +3,121 @@ from loguru import logger
import random import random
import string import string
import time import time
import threading
from datetime import datetime from datetime import datetime
from tqdm import tqdm
# same as lightx2v/utils/generate_task_id.py def send_and_monitor_task(url, message, task_index, complete_bar, complete_lock):
# from lightx2v.utils.generate_task_id import generate_task_id """Send task to server and monitor until completion"""
def generate_task_id():
"""
Generate a random task ID in the format XXXX-XXXX-XXXX-XXXX-XXXX.
Features:
1. Does not modify the global random state.
2. Each X is an uppercase letter or digit (0-9).
3. Combines time factors to ensure high randomness.
For example: N1PQ-PRM5-N1BN-Z3S1-BGBJ
"""
# Save the current random state (does not affect external randomness)
original_state = random.getstate()
try: try:
# Define character set (uppercase letters + digits) # Step 1: Send task and get task_id
characters = string.ascii_uppercase + string.digits response = requests.post(f"{url}/v1/tasks/", json=message)
response_data = response.json()
# Create an independent random instance task_id = response_data.get("task_id")
local_random = random.Random(time.perf_counter_ns())
if not task_id:
# Generate 5 groups of 4-character random strings logger.error(f"No task_id received from {url}")
groups = [] return False
for _ in range(5):
# Mix new time factor for each group # Step 2: Monitor task status until completion
time_mix = int(datetime.now().timestamp()) while True:
local_random.seed(time_mix + local_random.getstate()[1][0] + time.perf_counter_ns()) try:
status_response = requests.get(f"{url}/v1/tasks/{task_id}/status")
groups.append("".join(local_random.choices(characters, k=4))) status_data = status_response.json()
task_status = status_data.get("status")
return "-".join(groups)
if task_status == "completed":
finally: # Update completion bar safely
# Restore the original random state if complete_bar and complete_lock:
random.setstate(original_state) with complete_lock:
complete_bar.update(1)
return True
def post_all_tasks(urls, messages): elif task_status == "failed":
msg_num = len(messages) logger.error(f"Task {task_index + 1} (task_id: {task_id}) failed")
msg_index = 0 if complete_bar and complete_lock:
with complete_lock:
complete_bar.update(1) # Still update progress even if failed
return False
else:
# Task still running, wait and check again
time.sleep(0.5)
except Exception as e:
logger.error(f"Failed to check status for task_id {task_id}: {e}")
time.sleep(0.5)
except Exception as e:
logger.error(f"Failed to send task to {url}: {e}")
return False
def get_available_urls(urls):
"""Check which URLs are available and return the list"""
available_urls = [] available_urls = []
for url in urls: for url in urls:
try: try:
_ = requests.get(f"{url}/v1/service/status").json() _ = requests.get(f"{url}/v1/service/status").json()
available_urls.append(url)
except Exception as e: except Exception as e:
continue continue
available_urls.append(url)
if not available_urls: if not available_urls:
logger.error("No available urls.") logger.error("No available urls.")
return return None
logger.info(f"available_urls: {available_urls}") logger.info(f"available_urls: {available_urls}")
return available_urls
def find_idle_server(available_urls):
"""Find an idle server from available URLs"""
while True: while True:
for url in available_urls: for url in available_urls:
response = requests.get(f"{url}/v1/service/status").json() try:
if response["service_status"] == "idle": response = requests.get(f"{url}/v1/service/status").json()
logger.info(f"{url} service is idle, start task...") if response["service_status"] == "idle":
response = requests.post(f"{url}/v1/tasks/", json=messages[msg_index]) return url
logger.info(f"response: {response.json()}") except Exception as e:
msg_index += 1 continue
if msg_index == msg_num: time.sleep(3)
logger.info("All tasks have been sent.")
return
time.sleep(5) def process_tasks_async(messages, available_urls, show_progress=True):
"""Process a list of tasks asynchronously across multiple servers"""
if not available_urls:
if __name__ == "__main__": logger.error("No available servers to process tasks.")
urls = ["http://localhost:8000", "http://localhost:8001"] return False
messages = [ active_threads = []
{
"task_id": generate_task_id(), # task_id also can be string you like, such as "test_task_001" logger.info(f"Sending {len(messages)} tasks to available servers...")
"task_id_must_unique": True, # If True, the task_id must be unique, otherwise, it will raise an error. Default is False.
"prompt": "A cat walks on the grass, realistic style.", # Create completion progress bar
"negative_prompt": "镜头晃动,色调艳丽,过曝,静态,细节模糊不清,字幕,风格,作品,画作,画面,静止,整体发灰,最差质量,低质量,JPEG压缩残留,丑陋的,残缺的,多余的手指,画得不好的手部,画得不好的脸部,畸形的,毁容的,形态畸形的肢体,手指融合,静止不动的画面,杂乱的背景,三条腿,背景人很多,倒着走", if show_progress:
"image_path": "", complete_bar = tqdm(total=len(messages), desc="Completing tasks")
"save_video_path": "./output_lightx2v_wan_t2v_t01.mp4", # It is best to set it to an absolute path. complete_lock = threading.Lock() # Thread-safe updates to completion bar
},
{ for idx, message in enumerate(messages):
"task_id": generate_task_id(), # task_id also can be string you like, such as "test_task_001" # Find an idle server
"task_id_must_unique": True, # If True, the task_id must be unique, otherwise, it will raise an error. Default is False. server_url = find_idle_server(available_urls)
"prompt": "A person is riding a bike. Realistic, Natural lighting, Casual.",
"negative_prompt": "镜头晃动,色调艳丽,过曝,静态,细节模糊不清,字幕,风格,作品,画作,画面,静止,整体发灰,最差质量,低质量,JPEG压缩残留,丑陋的,残缺的,多余的手指,画得不好的手部,画得不好的脸部,畸形的,毁容的,形态畸形的肢体,手指融合,静止不动的画面,杂乱的背景,三条腿,背景人很多,倒着走", # Create and start thread for sending and monitoring task
"image_path": "", thread = threading.Thread(target=send_and_monitor_task, args=(server_url, message, idx, complete_bar if show_progress else None, complete_lock if show_progress else None))
"save_video_path": "./output_lightx2v_wan_t2v_t02.mp4", # It is best to set it to an absolute path. thread.daemon = False
}, thread.start()
{ active_threads.append(thread)
"task_id": generate_task_id(), # task_id also can be string you like, such as "test_task_001"
"task_id_must_unique": True, # If True, the task_id must be unique, otherwise, it will raise an error. Default is False. # Small delay to let thread start
"prompt": "A car turns a corner. Realistic, Natural lighting, Casual.", time.sleep(0.5)
"negative_prompt": "镜头晃动,色调艳丽,过曝,静态,细节模糊不清,字幕,风格,作品,画作,画面,静止,整体发灰,最差质量,低质量,JPEG压缩残留,丑陋的,残缺的,多余的手指,画得不好的手部,画得不好的脸部,畸形的,毁容的,形态畸形的肢体,手指融合,静止不动的画面,杂乱的背景,三条腿,背景人很多,倒着走",
"image_path": "", # Wait for all threads to complete
"save_video_path": "./output_lightx2v_wan_t2v_t03.mp4", # It is best to set it to an absolute path. for thread in active_threads:
}, thread.join()
{
"task_id": generate_task_id(), # task_id also can be string you like, such as "test_task_001" # Close completion bar
"task_id_must_unique": True, # If True, the task_id must be unique, otherwise, it will raise an error. Default is False. if show_progress:
"prompt": "An astronaut is flying in space, Van Gogh style. Dark, Mysterious.", complete_bar.close()
"negative_prompt": "镜头晃动,色调艳丽,过曝,静态,细节模糊不清,字幕,风格,作品,画作,画面,静止,整体发灰,最差质量,低质量,JPEG压缩残留,丑陋的,残缺的,多余的手指,画得不好的手部,画得不好的脸部,畸形的,毁容的,形态畸形的肢体,手指融合,静止不动的画面,杂乱的背景,三条腿,背景人很多,倒着走",
"image_path": "", logger.info("All tasks processing completed!")
"save_video_path": "./output_lightx2v_wan_t2v_t04.mp4", # It is best to set it to an absolute path. return True
},
{
"task_id": generate_task_id(), # task_id also can be string you like, such as "test_task_001"
"task_id_must_unique": True, # If True, the task_id must be unique, otherwise, it will raise an error. Default is False.
"prompt": "A beautiful coastal beach in spring, waves gently lapping on the sand, the camera movement is Zoom In. Realistic, Natural lighting, Peaceful.",
"negative_prompt": "镜头晃动,色调艳丽,过曝,静态,细节模糊不清,字幕,风格,作品,画作,画面,静止,整体发灰,最差质量,低质量,JPEG压缩残留,丑陋的,残缺的,多余的手指,画得不好的手部,画得不好的脸部,畸形的,毁容的,形态畸形的肢体,手指融合,静止不动的画面,杂乱的背景,三条腿,背景人很多,倒着走",
"image_path": "",
"save_video_path": "./output_lightx2v_wan_t2v_t05.mp4", # It is best to set it to an absolute path.
},
{
"task_id": generate_task_id(), # task_id also can be string you like, such as "test_task_001"
"task_id_must_unique": True, # If True, the task_id must be unique, otherwise, it will raise an error. Default is False.
"prompt": "Two anthropomorphic cats in comfy boxing gear and bright gloves fight intensely on a spotlighted stage.",
"negative_prompt": "镜头晃动,色调艳丽,过曝,静态,细节模糊不清,字幕,风格,作品,画作,画面,静止,整体发灰,最差质量,低质量,JPEG压缩残留,丑陋的,残缺的,多余的手指,画得不好的手部,画得不好的脸部,畸形的,毁容的,形态畸形的肢体,手指融合,静止不动的画面,杂乱的背景,三条腿,背景人很多,倒着走",
"image_path": "",
"save_video_path": "./output_lightx2v_wan_t2v_t06.mp4", # It is best to set it to an absolute path.
},
]
logger.info(f"urls: {urls}")
logger.info(f"message: {messages}")
post_all_tasks(urls, messages)
from loguru import logger
from post_multi_servers import get_available_urls, process_tasks_async
if __name__ == "__main__":
urls = [f"http://localhost:{port}" for port in range(8000, 8008)]
img_prompts = {
"assets/inputs/imgs/img_0.jpg": "Summer beach vacation style, a white cat wearing sunglasses sits on a surfboard. The fluffy-furred feline gazes directly at the camera with a relaxed expression. Blurred beach scenery forms the background featuring crystal-clear waters, distant green hills, and a blue sky dotted with white clouds. The cat assumes a naturally relaxed posture, as if savoring the sea breeze and warm sunlight. A close-up shot highlights the feline's intricate details and the refreshing atmosphere of the seaside.",
"assets/inputs/imgs/img_2.jpg": "A close-up cinematic view of a person cooking in a warm,sunlit kitchen, using a wooden spatula to stir-fry a colorful mix of freshvegetables—carrots, broccoli, and bell peppers—in a black frying pan on amodern induction stove. The scene captures the glistening texture of thevegetables, steam gently rising, and subtle reflections on the stove surface.In the background, soft-focus jars, fruits, and a window with natural daylightcreate a cozy atmosphere. The hand motions are smooth and rhythmic, with a realisticsense of motion blur and lighting.",
}
negative_prompt = "镜头晃动,色调艳丽,过曝,静态,细节模糊不清,字幕,风格,作品,画作,画面,静止,整体发灰,最差质量,低质量,JPEG压缩残留,丑陋的,残缺的,多余的手指,画得不好的手部,画得不好的脸部,畸形的,毁容的,形态畸形的肢体,手指融合,静止不动的画面,杂乱的背景,三条腿,背景人很多,倒着走"
messages = []
for i, (image_path, prompt) in enumerate(img_prompts.items()):
messages.append({"prompt": prompt, "negative_prompt": negative_prompt, "image_path": image_path, "save_video_path": f"./output_lightx2v_wan_i2v_{i + 1}.mp4"})
logger.info(f"urls: {urls}")
# Get available servers
available_urls = get_available_urls(urls)
if not available_urls:
exit(1)
# Process tasks asynchronously
success = process_tasks_async(messages, available_urls, show_progress=True)
if success:
logger.info("All tasks completed successfully!")
else:
logger.error("Some tasks failed.")
exit(1)
from loguru import logger
from post_multi_servers import get_available_urls, process_tasks_async
if __name__ == "__main__":
urls = ["http://localhost:8000", "http://localhost:8001"]
prompts = [
"A cat walks on the grass, realistic style.",
"A person is riding a bike. Realistic, Natural lighting, Casual.",
"A car turns a corner. Realistic, Natural lighting, Casual.",
"An astronaut is flying in space, Van Gogh style. Dark, Mysterious.",
"A beautiful coastal beach in spring, waves gently lapping on the sand, the camera movement is Zoom In. Realistic, Natural lighting, Peaceful.",
"Two anthropomorphic cats in comfy boxing gear and bright gloves fight intensely on a spotlighted stage.",
]
negative_prompt = "镜头晃动,色调艳丽,过曝,静态,细节模糊不清,字幕,风格,作品,画作,画面,静止,整体发灰,最差质量,低质量,JPEG压缩残留,丑陋的,残缺的,多余的手指,画得不好的手部,画得不好的脸部,畸形的,毁容的,形态畸形的肢体,手指融合,静止不动的画面,杂乱的背景,三条腿,背景人很多,倒着走"
messages = []
for i, prompt in enumerate(prompts):
messages.append({"prompt": prompt, "negative_prompt": negative_prompt, "image_path": "", "save_video_path": f"./output_lightx2v_wan_t2v_{i + 1}.mp4"})
logger.info(f"urls: {urls}")
# Get available servers
available_urls = get_available_urls(urls)
if not available_urls:
exit(1)
# Process tasks asynchronously
success = process_tasks_async(messages, available_urls, show_progress=True)
if success:
logger.info("All tasks completed successfully!")
else:
logger.error("Some tasks failed.")
exit(1)
from tqdm import tqdm
import argparse import argparse
import glob import glob
import os import os
import requests from loguru import logger
import time from post_multi_servers import get_available_urls, process_tasks_async
def post_i2v(image_path, output_path): def create_i2v_messages(img_files, output_path):
url = "http://localhost:8000" """Create messages for image-to-video tasks"""
messages = []
negative_prompt = "镜头晃动,色调艳丽,过曝,静态,细节模糊不清,字幕,风格,作品,画作,画面,静止,整体发灰,最差质量,低质量,JPEG压缩残留,丑陋的,残缺的,多余的手指,画得不好的手部,画得不好的脸部,畸形的,毁容的,形态畸形的肢体,手指融合,静止不动的画面,杂乱的背景,三条腿,背景人很多,倒着走"
file_name = os.path.basename(image_path) for img_path in img_files:
prompt = os.path.splitext(file_name)[0] file_name = os.path.basename(img_path)
save_video_path = os.path.join(output_path, f"{prompt}.mp4") prompt = os.path.splitext(file_name)[0]
save_video_path = os.path.join(output_path, f"{prompt}.mp4")
message = { message = {
"prompt": prompt, "prompt": prompt,
"negative_prompt": "镜头晃动,色调艳丽,过曝,静态,细节模糊不清,字幕,风格,作品,画作,画面,静止,整体发灰,最差质量,低质量,JPEG压缩残留,丑陋的,残缺的,多余的手指,画得不好的手部,画得不好的脸部,畸形的,毁容的,形态畸形的肢体,手指融合,静止不动的画面,杂乱的背景,三条腿,背景人很多,倒着走", "negative_prompt": negative_prompt,
"image_path": image_path, "image_path": img_path,
"save_video_path": save_video_path, "save_video_path": save_video_path,
} }
messages.append(message)
while True: return messages
response = requests.get(f"{url}/v1/service/status").json()
if response["service_status"] == "idle":
response = requests.post(f"{url}/v1/tasks/", json=message)
return
time.sleep(3)
if __name__ == "__main__": if __name__ == "__main__":
...@@ -34,11 +32,35 @@ if __name__ == "__main__": ...@@ -34,11 +32,35 @@ if __name__ == "__main__":
parser.add_argument("--output_path", type=str, default="./vbench_i2v", help="output video path.") parser.add_argument("--output_path", type=str, default="./vbench_i2v", help="output video path.")
args = parser.parse_args() args = parser.parse_args()
# Create server URLs
urls = [f"http://localhost:{port}" for port in range(8000, 8008)]
# Get available servers
available_urls = get_available_urls(urls)
if not available_urls:
exit(1)
# Find image files
if os.path.exists(args.data_path): if os.path.exists(args.data_path):
img_files = glob.glob(os.path.join(args.data_path, "*.jpg")) img_files = glob.glob(os.path.join(args.data_path, "*.jpg"))
print(f"Found {len(img_files)} image files.") logger.info(f"Found {len(img_files)} image files.")
if not img_files:
logger.error("No image files found.")
exit(1)
# Create messages for all images
messages = create_i2v_messages(img_files, args.output_path)
logger.info(f"Created {len(messages)} tasks.")
# Process tasks asynchronously
success = process_tasks_async(messages, available_urls, show_progress=True)
with tqdm(total=len(img_files)) as progress_bar: if success:
for idx, img_path in enumerate(img_files): logger.info("All image-to-video tasks completed successfully!")
post_i2v(img_path, args.output_path) else:
progress_bar.update() logger.error("Some tasks failed.")
exit(1)
else:
logger.error(f"Data path does not exist: {args.data_path}")
exit(1)
...@@ -16,13 +16,13 @@ if [ -z "${num_gpus}" ]; then ...@@ -16,13 +16,13 @@ if [ -z "${num_gpus}" ]; then
fi fi
# Check required parameters # Check required parameters
if [ -z "$lightx2v_path" ]; then if [ -z "${lightx2v_path}" ]; then
echo "Error: lightx2v_path not set" echo "Error: lightx2v_path is not set. Please set this variable first."
exit 1 exit 1
fi fi
if [ -z "$model_path" ]; then if [ -z "${model_path}" ]; then
echo "Error: model_path not set" echo "Error: model_path is not set. Please set this variable first."
exit 1 exit 1
fi fi
...@@ -37,7 +37,7 @@ export DTYPE=BF16 ...@@ -37,7 +37,7 @@ export DTYPE=BF16
python -m lightx2v.api_multi_servers \ python -m lightx2v.api_multi_servers \
--num_gpus $num_gpus \ --num_gpus $num_gpus \
--start_port 8000 \ --start_port 8000 \
--model_cls wan2.1 \ --model_cls wan2.1_distill \
--task t2v \ --task i2v \
--model_path $model_path \ --model_path $model_path \
--config_json ${lightx2v_path}/configs/wan/wan_t2v.json --config_json ${lightx2v_path}/configs/distill/wan_i2v_distill_4step_cfg.json
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment