# LightX2V Server

## Overview

The LightX2V server is a distributed video generation service built with FastAPI. It processes image-to-video tasks on GPUs using a multi-process architecture, and combines a task queue with distributed inference to support high-throughput video generation workloads.

## Architecture

### System Architecture

```mermaid
flowchart TB
    Client[Client] -->|Send API Request| Router[FastAPI Router]

    subgraph API Layer
        Router --> TaskRoutes[Task APIs]
        Router --> FileRoutes[File APIs]
        Router --> ServiceRoutes[Service Status APIs]

        TaskRoutes --> CreateTask["POST /v1/tasks/ - Create Task"]
        TaskRoutes --> CreateTaskForm["POST /v1/tasks/form - Form Create"]
        TaskRoutes --> ListTasks["GET /v1/tasks/ - List Tasks"]
        TaskRoutes --> GetTaskStatus["GET /v1/tasks/id/status - Get Status"]
        TaskRoutes --> GetTaskResult["GET /v1/tasks/id/result - Get Result"]
        TaskRoutes --> StopTask["DELETE /v1/tasks/id - Stop Task"]

        FileRoutes --> DownloadFile["GET /v1/files/download/path - Download File"]

        ServiceRoutes --> GetServiceStatus["GET /v1/service/status - Service Status"]
        ServiceRoutes --> GetServiceMetadata["GET /v1/service/metadata - Metadata"]
    end

    subgraph Task Management
        TaskManager[Task Manager]
        TaskQueue[Task Queue]
        TaskStatus[Task Status]
        TaskResult[Task Result]

        CreateTask --> TaskManager
        CreateTaskForm --> TaskManager
        TaskManager --> TaskQueue
        TaskManager --> TaskStatus
        TaskManager --> TaskResult
    end

    subgraph File Service
        FileService[File Service]
        DownloadImage[Download Image]
        DownloadAudio[Download Audio]
        SaveFile[Save File]
        GetOutputPath[Get Output Path]

        FileService --> DownloadImage
        FileService --> DownloadAudio
        FileService --> SaveFile
        FileService --> GetOutputPath
    end

    subgraph Processing Thread
        ProcessingThread[Processing Thread]
        NextTask[Get Next Task]
        ProcessTask[Process Single Task]

        ProcessingThread --> NextTask
        ProcessingThread --> ProcessTask
    end

    subgraph Video Generation Service
        VideoService[Video Service]
        GenerateVideo[Generate Video]

        VideoService --> GenerateVideo
    end

    subgraph Distributed Inference Service
        InferenceService[Distributed Inference Service]
        SubmitTask[Submit Task]
        Worker[Inference Worker Node]
        ProcessRequest[Process Request]
        RunPipeline[Run Inference Pipeline]

        InferenceService --> SubmitTask
        SubmitTask --> Worker
        Worker --> ProcessRequest
        ProcessRequest --> RunPipeline
    end

    %% ====== Connect Modules ======
    TaskQueue --> ProcessingThread
    ProcessTask --> VideoService
    GenerateVideo --> InferenceService
    GetTaskResult --> FileService
    DownloadFile --> FileService
    VideoService --> FileService
    InferenceService --> TaskManager
    TaskManager --> TaskStatus
```

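For orientation, the sketch below exercises the task endpoints from the diagram with plain `requests`: create a task, poll its status, then download the result. The base URL and the JSON field names in the request body are assumptions for illustration, not the server's actual schema; consult the running server's OpenAPI docs for the real request fields.

```python
"""Client sketch for the task API (endpoint paths taken from the diagram above).

Assumptions: the server listens on localhost:8000; the request fields
"prompt" and "image_url" are hypothetical placeholders.
"""
import time

import requests

BASE = "http://localhost:8000/v1"

# POST /v1/tasks/ - create a task
resp = requests.post(f"{BASE}/tasks/", json={
    "prompt": "a red fox running through snow",   # hypothetical field
    "image_url": "https://example.com/fox.png",   # hypothetical field
})
resp.raise_for_status()
task_id = resp.json()["task_id"]

# GET /v1/tasks/{task_id}/status - poll until the task finishes
while True:
    status = requests.get(f"{BASE}/tasks/{task_id}/status").json()
    if status["status"].lower() in ("completed", "failed", "cancelled"):
        break
    time.sleep(5)

# GET /v1/tasks/{task_id}/result - stream the generated video to disk
if status["status"].lower() == "completed":
    with requests.get(f"{BASE}/tasks/{task_id}/result", stream=True) as r:
        r.raise_for_status()
        with open("output.mp4", "wb") as f:
            for chunk in r.iter_content(chunk_size=1 << 20):
                f.write(chunk)
```
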
## Task Processing Flow

```mermaid
sequenceDiagram
    participant C as Client
    participant API as API Server
    participant TM as TaskManager
    participant PT as Processing Thread
    participant VS as VideoService
    participant FS as FileService
    participant DIS as DistributedInferenceService
    participant TIW0 as TorchrunInferenceWorker<br/>(Rank 0)
    participant TIW1 as TorchrunInferenceWorker<br/>(Rank 1..N)

    C->>API: POST /v1/tasks<br/>(Create Task)
    API->>TM: create_task()
    TM->>TM: Generate task_id
    TM->>TM: Add to queue<br/>(status: PENDING)
    API->>PT: ensure_processing_thread()
    API-->>C: TaskResponse<br/>(task_id, status: pending)

    Note over PT: Processing Loop
    PT->>TM: get_next_pending_task()
    TM-->>PT: task_id
    PT->>TM: acquire_processing_lock()
    PT->>TM: start_task()<br/>(status: PROCESSING)

    PT->>VS: generate_video_with_stop_event()

    alt Image is URL
        VS->>FS: download_image()
        FS->>FS: HTTP download<br/>with retry
        FS-->>VS: image_path
    else Image is Base64
        VS->>FS: save_base64_image()
        FS-->>VS: image_path
    else Image is local path
        VS->>VS: use existing path
    end

    alt Audio is URL
        VS->>FS: download_audio()
        FS->>FS: HTTP download<br/>with retry
        FS-->>VS: audio_path
    else Audio is Base64
        VS->>FS: save_base64_audio()
        FS-->>VS: audio_path
    else Audio is local path
        VS->>VS: use existing path
    end

    VS->>DIS: submit_task_async(task_data)
    DIS->>TIW0: process_request(task_data)

    Note over TIW0,TIW1: Torchrun-based Distributed Processing
    TIW0->>TIW0: Check if processing
    TIW0->>TIW0: Set processing = True

    alt Multi-GPU Mode (world_size > 1)
        TIW0->>TIW1: broadcast_task_data()<br/>(via DistributedManager)
        Note over TIW1: worker_loop() listens for broadcasts
        TIW1->>TIW1: Receive task_data
    end

    par Parallel Inference across all ranks
        TIW0->>TIW0: runner.set_inputs(task_data)
        TIW0->>TIW0: runner.run_pipeline()
    and
        Note over TIW1: If world_size > 1
        TIW1->>TIW1: runner.set_inputs(task_data)
        TIW1->>TIW1: runner.run_pipeline()
    end

    Note over TIW0,TIW1: Synchronization
    alt Multi-GPU Mode
        TIW0->>TIW1: barrier() for sync
        TIW1->>TIW0: barrier() response
    end

    TIW0->>TIW0: Set processing = False
    TIW0->>DIS: Return result (only rank 0)
    TIW1->>TIW1: Return None (non-rank 0)

    DIS-->>VS: TaskResponse
    VS-->>PT: TaskResponse
    PT->>TM: complete_task()<br/>(status: COMPLETED)
    PT->>TM: release_processing_lock()

    Note over C: Client Polling
    C->>API: GET /v1/tasks/{task_id}/status
    API->>TM: get_task_status()
    TM-->>API: status info
    API-->>C: Task Status

    C->>API: GET /v1/tasks/{task_id}/result
    API->>TM: get_task_status()
    API->>FS: stream_file_response()
    FS-->>API: Video Stream
    API-->>C: Video File
```

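The processing thread that drives this flow can be pictured as a small loop over the `TaskManager` calls named in the diagram. The sketch below is a simplified illustration of that loop, not the server's actual code; the method signatures, error handling, and stop-event plumbing are assumptions.

```python
# Simplified sketch of the processing thread's loop, using the call names
# from the diagram above. Signatures and error handling are assumptions.
import threading
import time


def processing_loop(task_manager, video_service, stop_event: threading.Event):
    while not stop_event.is_set():
        task_id = task_manager.get_next_pending_task()
        if task_id is None:
            time.sleep(0.5)  # queue empty: idle briefly
            continue
        if not task_manager.acquire_processing_lock():
            continue  # another task is already being processed
        try:
            task_manager.start_task(task_id)  # -> PROCESSING
            result = video_service.generate_video_with_stop_event(task_id, stop_event)
            task_manager.complete_task(task_id, result)  # -> COMPLETED
        except Exception as exc:
            task_manager.fail_task(task_id, str(exc))  # -> FAILED
        finally:
            task_manager.release_processing_lock()
```
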
## Task States

```mermaid
stateDiagram-v2
    [*] --> PENDING: create_task()
    PENDING --> PROCESSING: start_task()
    PROCESSING --> COMPLETED: complete_task()
    PROCESSING --> FAILED: fail_task()
    PENDING --> CANCELLED: cancel_task()
    PROCESSING --> CANCELLED: cancel_task()
    COMPLETED --> [*]
    FAILED --> [*]
    CANCELLED --> [*]
```

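In code, these states map naturally onto an enum plus a transition table. The sketch below mirrors the diagram; the names are illustrative and may differ from the server's own status enum.

```python
# Sketch of the task states and the transitions allowed by the diagram above.
from enum import Enum


class TaskStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


# One entry per edge in the state diagram.
ALLOWED_TRANSITIONS = {
    TaskStatus.PENDING: {TaskStatus.PROCESSING, TaskStatus.CANCELLED},
    TaskStatus.PROCESSING: {TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED},
    TaskStatus.COMPLETED: set(),
    TaskStatus.FAILED: set(),
    TaskStatus.CANCELLED: set(),
}


def can_transition(src: TaskStatus, dst: TaskStatus) -> bool:
    return dst in ALLOWED_TRANSITIONS[src]
```
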
## Configuration

### Environment Variables

See `lightx2v/server/config.py` for the supported environment variables.

### Command Line Arguments

Single-GPU example:

```bash
python -m lightx2v.server.main \
    --model_path /path/to/model \
    --model_cls wan2.1_distill \
    --task i2v \
    --host 0.0.0.0 \
    --port 8000 \
    --config_json /path/to/xxx_config.json
```

Multi-GPU example (two inference processes per node):

```bash
python -m lightx2v.server.main \
    --model_path /path/to/model \
    --model_cls wan2.1_distill \
    --task i2v \
    --host 0.0.0.0 \
    --port 8000 \
    --config_json /path/to/xxx_dist_config.json \
    --nproc_per_node 2
```

## Key Features

### 1. Distributed Processing

- **Multi-process architecture** for GPU parallelization
- **Master-worker pattern** with rank 0 as coordinator
- **PyTorch distributed** backend (NCCL for GPU, Gloo for CPU)
- **Automatic GPU allocation** across processes
- **Task broadcasting** with chunked pickle serialization (sketched below)

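The chunked-pickle broadcast can be illustrated with plain `torch.distributed` primitives: rank 0 pickles the task data, broadcasts its byte length, then broadcasts the payload in fixed-size uint8 chunks. This is a sketch of the pattern (shown with CPU tensors, as with the Gloo backend), not the actual `DistributedManager` implementation.

```python
# Sketch of broadcasting a task dict from rank 0 to all workers as chunked
# pickled bytes over torch.distributed. Illustrative only.
import pickle

import torch
import torch.distributed as dist

CHUNK_SIZE = 1 << 20  # 1 MiB per broadcast round


def broadcast_task_data(task_data=None, src: int = 0):
    """Rank `src` broadcasts `task_data`; other ranks return the received copy.

    Assumes the default process group is already initialized (e.g. by torchrun)
    and uses CPU tensors; with NCCL the tensors would need to live on the GPU.
    """
    rank = dist.get_rank()

    # Everyone learns the pickled payload length first.
    payload = pickle.dumps(task_data) if rank == src else b""
    length = torch.tensor([len(payload)], dtype=torch.long)
    dist.broadcast(length, src=src)
    total = int(length.item())

    # Then the payload travels in fixed-size uint8 chunks.
    received = bytearray()
    for offset in range(0, total, CHUNK_SIZE):
        size = min(CHUNK_SIZE, total - offset)
        if rank == src:
            chunk = torch.frombuffer(bytearray(payload[offset:offset + size]), dtype=torch.uint8)
        else:
            chunk = torch.empty(size, dtype=torch.uint8)
        dist.broadcast(chunk, src=src)
        if rank != src:
            received.extend(chunk.numpy().tobytes())

    return task_data if rank == src else pickle.loads(bytes(received))
```
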
### 2. Task Queue Management

- **Thread-safe** task queue with locks (sketched below)
- **Sequential processing** with a single processing thread
- **Configurable queue limits** with overflow protection
- **Task prioritization** (FIFO)
- **Automatic cleanup** of old completed tasks
- **Cancellation support** for pending and running tasks

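A thread-safe FIFO queue with a size cap and cancellation can look like the minimal sketch below. It illustrates the properties listed above; it is not the server's actual `TaskManager`.

```python
# Minimal sketch of a lock-guarded FIFO task queue with a size limit and
# cancellation. Illustrative only.
import threading
import uuid
from collections import OrderedDict


class TinyTaskQueue:
    """FIFO task queue guarded by a lock, with overflow protection."""

    def __init__(self, max_size: int = 100):
        self._lock = threading.Lock()
        self._tasks: "OrderedDict[str, dict]" = OrderedDict()  # insertion order = FIFO
        self._max_size = max_size

    def create_task(self, params: dict) -> str:
        with self._lock:
            pending = sum(1 for t in self._tasks.values() if t["status"] == "pending")
            if pending >= self._max_size:
                raise RuntimeError("queue full")  # overflow protection
            task_id = uuid.uuid4().hex
            self._tasks[task_id] = {"status": "pending", "params": params}
            return task_id

    def get_next_pending_task(self):
        with self._lock:
            for task_id, task in self._tasks.items():  # oldest first
                if task["status"] == "pending":
                    return task_id
            return None

    def cancel_task(self, task_id: str) -> bool:
        with self._lock:
            task = self._tasks.get(task_id)
            if task and task["status"] in ("pending", "processing"):
                task["status"] = "cancelled"
                return True
            return False
```
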
### 3. File Management

- **Multiple input formats**: URL, base64, file upload
- **HTTP downloads** with exponential backoff retry (sketched below)
- **Streaming responses** for large video files
- **Cache management** with automatic cleanup
- **File validation** and format detection

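The retry behavior for remote image/audio inputs follows the familiar exponential-backoff pattern. The sketch below shows the idea; the retry counts, timeouts, and chunk sizes are illustrative and the server's `FileService` may differ.

```python
# Sketch of an HTTP download with exponential backoff retry.
import time

import requests


def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> str:
    delay = 1.0
    for attempt in range(1, max_retries + 1):
        try:
            with requests.get(url, stream=True, timeout=30) as resp:
                resp.raise_for_status()
                with open(dest_path, "wb") as f:
                    for chunk in resp.iter_content(chunk_size=1 << 20):
                        f.write(chunk)
            return dest_path
        except requests.RequestException:
            if attempt == max_retries:
                raise
            time.sleep(delay)
            delay *= 2  # back off: 1s, 2s, 4s, ...
```
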
## Performance Considerations

1. **Single Task Processing**: Tasks are processed sequentially to manage GPU memory effectively
2. **Multi-GPU Support**: Inference is distributed across available GPUs for parallelization
3. **Connection Pooling**: HTTP connections are reused to reduce overhead
4. **Streaming Responses**: Large files are streamed to avoid memory issues (sketched below)
5. **Queue Management**: Automatic task cleanup prevents memory leaks
6. **Process Isolation**: Distributed workers run in separate processes for stability

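Streaming keeps server memory flat regardless of video size: the response yields the file in chunks instead of loading it whole. A minimal FastAPI sketch follows; the route, output directory, and helper names are illustrative and not the server's actual `stream_file_response`.

```python
# Minimal sketch of streaming a large video file in chunks with FastAPI.
from pathlib import Path

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse

app = FastAPI()

OUTPUT_DIR = Path("/tmp/lightx2v_outputs")  # hypothetical output directory


def iter_file(path: Path, chunk_size: int = 1 << 20):
    with path.open("rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk


@app.get("/v1/files/download/{name}")
def download_file(name: str):
    path = OUTPUT_DIR / name
    if not path.is_file():
        raise HTTPException(status_code=404, detail="file not found")
    return StreamingResponse(iter_file(path), media_type="video/mp4")
```
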
## Monitoring and Debugging

### Logging

The server uses `loguru` for structured logging. Logs include:

- Request/response details
- Task lifecycle events
- Worker process status
- Error traces with context

### Health Checks

- `/v1/service/status` - Overall service health
- `/v1/tasks/queue/status` - Queue status and processing state
- Process monitoring via system tools (`htop`, `nvidia-smi`)

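A minimal liveness probe against the two status endpoints, assuming the server is reachable on localhost:8000 (the response payload shapes are not documented here, so they are simply printed):

```python
# Quick liveness probe for the status endpoints listed above.
import requests

BASE = "http://localhost:8000"

for path in ("/v1/service/status", "/v1/tasks/queue/status"):
    resp = requests.get(f"{BASE}{path}", timeout=5)
    print(path, resp.status_code, resp.json())
```
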
### Common Issues

1. **GPU Out of Memory**: Reduce `nproc_per_node` or adjust the model batch size
2. **Task Timeout**: Increase `LIGHTX2V_TASK_TIMEOUT` for longer videos
3. **Queue Full**: Increase `LIGHTX2V_MAX_QUEUE_SIZE` or add rate limiting

## Security Considerations

1. **Input Validation**: All inputs are validated with Pydantic schemas (an illustrative schema is sketched below)

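For illustration, a request model in that style might look like the sketch below. The field names are hypothetical; the server's actual schemas live in the server package and will differ.

```python
# Illustrative Pydantic request model; field names are hypothetical.
from typing import Optional

from pydantic import BaseModel, Field, HttpUrl


class CreateTaskRequest(BaseModel):
    prompt: str = Field(..., min_length=1, max_length=2000)
    image_url: Optional[HttpUrl] = None   # remote image input
    image_base64: Optional[str] = None    # inline base64 image input
    seed: int = Field(42, ge=0)
```
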
## License

See the main project LICENSE file for licensing information.