# LightX2V Server
## Overview
The LightX2V server is a distributed video generation service built with FastAPI. It processes image-to-video (i2v) tasks on a multi-process, GPU-backed architecture, pairing a thread-safe task queue with distributed inference for high-throughput video generation workloads.
## Architecture
### System Architecture
```mermaid
graph TB
    subgraph "Client Layer"
        Client[HTTP Client]
    end
    subgraph "API Layer"
        FastAPI[FastAPI Application]
        ApiServer[ApiServer]
        Router1["Tasks Router<br/>/v1/tasks"]
        Router2["Files Router<br/>/v1/files"]
        Router3["Service Router<br/>/v1/service"]
    end
    subgraph "Service Layer"
        TaskManager["TaskManager<br/>Thread-safe Task Queue"]
        FileService["FileService<br/>File I/O & Downloads"]
        VideoService[VideoGenerationService]
    end
    subgraph "Processing Layer"
        Thread["Processing Thread<br/>Sequential Task Loop"]
    end
    subgraph "Distributed Inference Layer"
        DistService[DistributedInferenceService]
        SharedData[("Shared Data<br/>mp.Manager.dict")]
        TaskEvent["Task Event<br/>mp.Manager.Event"]
        ResultEvent["Result Event<br/>mp.Manager.Event"]
        subgraph "Worker Processes"
            W0["Worker 0<br/>Master/Rank 0"]
            W1["Worker 1<br/>Rank 1"]
            WN["Worker N<br/>Rank N"]
        end
    end
    subgraph "Resource Management"
        GPUManager["GPUManager<br/>GPU Detection & Allocation"]
        DistManager["DistributedManager<br/>PyTorch Distributed"]
        Config["ServerConfig<br/>Configuration"]
    end
    Client -->|HTTP Request| FastAPI
    FastAPI --> ApiServer
    ApiServer --> Router1
    ApiServer --> Router2
    ApiServer --> Router3
    Router1 -->|Create/Manage Tasks| TaskManager
    Router1 -->|Process Tasks| Thread
    Router2 -->|File Operations| FileService
    Router3 -->|Service Status| TaskManager
    Thread -->|Get Pending Tasks| TaskManager
    Thread -->|Generate Video| VideoService
    VideoService -->|Download Images| FileService
    VideoService -->|Submit Task| DistService
    DistService -->|Update| SharedData
    DistService -->|Signal| TaskEvent
    TaskEvent -->|Notify| W0
    W0 -->|Broadcast| W1
    W0 -->|Broadcast| WN
    W0 -->|Update Result| SharedData
    W0 -->|Signal| ResultEvent
    ResultEvent -->|Notify| DistService
    W0 -.->|Uses| GPUManager
    W1 -.->|Uses| GPUManager
    WN -.->|Uses| GPUManager
    W0 -.->|Setup| DistManager
    W1 -.->|Setup| DistManager
    WN -.->|Setup| DistManager
    DistService -.->|Reads| Config
    ApiServer -.->|Reads| Config
```
### Components
#### Core Components
| Component | File | Description |
|-----------|------|-------------|
| **ServerManager** | `main.py` | Orchestrates server lifecycle, startup/shutdown sequences |
| **ApiServer** | `api.py` | FastAPI application manager with route registration |
| **TaskManager** | `task_manager.py` | Thread-safe task queue and lifecycle management |
| **FileService** | `service.py` | File I/O, HTTP downloads with retry logic |
| **VideoGenerationService** | `service.py` | Video generation workflow orchestration |
| **DistributedInferenceService** | `service.py` | Multi-process inference management |
| **GPUManager** | `gpu_manager.py` | GPU detection, allocation, and memory management |
| **DistributedManager** | `distributed_utils.py` | PyTorch distributed communication setup |
| **ServerConfig** | `config.py` | Centralized configuration with environment variable support |
## Task Processing Flow
```mermaid
sequenceDiagram
    participant C as Client
    participant API as API Server
    participant TM as TaskManager
    participant PT as Processing Thread
    participant VS as VideoService
    participant FS as FileService
    participant DIS as Distributed<br/>Inference Service
    participant W0 as Worker 0<br/>(Master)
    participant W1 as Worker 1..N

    C->>API: POST /v1/tasks<br/>(Create Task)
    API->>TM: create_task()
    TM->>TM: Generate task_id
    TM->>TM: Add to queue<br/>(status: PENDING)
    API->>PT: ensure_processing_thread()
    API-->>C: TaskResponse<br/>(task_id, status: pending)

    Note over PT: Processing Loop
    PT->>TM: get_next_pending_task()
    TM-->>PT: task_id
    PT->>TM: acquire_processing_lock()
    PT->>TM: start_task()<br/>(status: PROCESSING)
    PT->>VS: generate_video_with_stop_event()

    alt Image is URL
        VS->>FS: download_image()
        FS->>FS: HTTP download<br/>with retry
        FS-->>VS: image_path
    else Image is Base64
        VS->>FS: save_base64_image()
        FS-->>VS: image_path
    else Image is Upload
        VS->>FS: validate_file()
        FS-->>VS: image_path
    end

    VS->>DIS: submit_task(task_data)
    DIS->>DIS: shared_data["current_task"] = task_data
    DIS->>DIS: task_event.set()

    Note over W0,W1: Distributed Processing
    W0->>W0: task_event.wait()
    W0->>W0: Get task from shared_data
    W0->>W1: broadcast_task_data()
    par Parallel Inference
        W0->>W0: run_pipeline()
    and
        W1->>W1: run_pipeline()
    end
    W0->>W0: barrier() for sync
    W0->>W0: shared_data["result"] = result
    W0->>DIS: result_event.set()
    DIS->>DIS: result_event.wait()
    DIS->>VS: return result
    VS-->>PT: TaskResponse
    PT->>TM: complete_task()<br/>(status: COMPLETED)
    PT->>TM: release_processing_lock()

    Note over C: Client Polling
    C->>API: GET /v1/tasks/{task_id}/status
    API->>TM: get_task_status()
    TM-->>API: status info
    API-->>C: Task Status
    C->>API: GET /v1/tasks/{task_id}/result
    API->>TM: get_task_status()
    API->>FS: stream_file_response()
    FS-->>API: Video Stream
    API-->>C: Video File
```
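The submit/result handshake between `DistributedInferenceService` and the rank-0 worker can be reproduced with plain `multiprocessing.Manager` primitives. The sketch below is a minimal illustration of that pattern under the assumption that only one task is in flight at a time; the function names are hypothetical, not the server's actual API:
```python
import multiprocessing as mp

def submit_and_wait(shared, task_event, result_event, task_data, timeout=600.0):
    """Service side: publish the task, wake the workers, block on the result."""
    shared["current_task"] = task_data
    task_event.set()
    if not result_event.wait(timeout=timeout):  # mirrors LIGHTX2V_TASK_TIMEOUT
        raise TimeoutError("inference did not finish in time")
    result_event.clear()
    return shared["result"]

def rank0_loop(shared, task_event, result_event):
    """Worker side (rank 0): wait for a task, run it, publish the result."""
    while True:
        task_event.wait()
        task_event.clear()
        task = shared["current_task"]
        # ... broadcast the task to ranks 1..N and run the pipeline here ...
        shared["result"] = {"task_id": task["task_id"], "status": "completed"}
        result_event.set()

if __name__ == "__main__":
    manager = mp.Manager()
    shared = manager.dict()
    task_event, result_event = manager.Event(), manager.Event()
    worker = mp.Process(target=rank0_loop,
                        args=(shared, task_event, result_event), daemon=True)
    worker.start()
    print(submit_and_wait(shared, task_event, result_event, {"task_id": "demo"}))
```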
## Task States
```mermaid
stateDiagram-v2
[*] --> PENDING: create_task()
PENDING --> PROCESSING: start_task()
PROCESSING --> COMPLETED: complete_task()
PROCESSING --> FAILED: fail_task()
PENDING --> CANCELLED: cancel_task()
PROCESSING --> CANCELLED: cancel_task()
COMPLETED --> [*]
FAILED --> [*]
CANCELLED --> [*]
```
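These transitions can be encoded as an enum plus a guard table. The following is a minimal illustrative sketch of the state machine above, not the server's internal representation:
```python
from enum import Enum

class TaskStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

# Legal transitions, taken directly from the state diagram above.
ALLOWED = {
    TaskStatus.PENDING: {TaskStatus.PROCESSING, TaskStatus.CANCELLED},
    TaskStatus.PROCESSING: {TaskStatus.COMPLETED, TaskStatus.FAILED,
                            TaskStatus.CANCELLED},
}

def transition(current: TaskStatus, new: TaskStatus) -> TaskStatus:
    """Reject anything the diagram does not permit (terminal states included)."""
    if new not in ALLOWED.get(current, set()):
        raise ValueError(f"illegal transition: {current} -> {new}")
    return new
```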
## API Endpoints
See the interactive OpenAPI documentation at `{base_url}/docs` for full request and response schemas.
### Task Management
- POST `/v1/tasks/` - create a task from a JSON body
- POST `/v1/tasks/form` - create a task from multipart form data (file upload)
- GET `/v1/tasks/{task_id}/status` - poll task status
- GET `/v1/tasks/{task_id}/result` - download the generated video
- DELETE `/v1/tasks/{task_id}` - cancel a task
- DELETE `/v1/tasks/all/running` - cancel all running tasks
- GET `/v1/tasks/` - list tasks
- GET `/v1/tasks/queue/status` - queue status and processing state
## Configuration
### Environment Variables
| Variable | Description | Default |
|----------|-------------|---------|
| `LIGHTX2V_HOST` | Server host address | `0.0.0.0` |
| `LIGHTX2V_PORT` | Server port | `8000` |
| `LIGHTX2V_MAX_QUEUE_SIZE` | Maximum task queue size | `100` |
| `LIGHTX2V_CACHE_DIR` | File cache directory | `/tmp/lightx2v_cache` |
| `LIGHTX2V_TASK_TIMEOUT` | Task processing timeout (seconds) | `600` |
| `LIGHTX2V_HTTP_TIMEOUT` | HTTP download timeout (seconds) | `30` |
| `LIGHTX2V_HTTP_MAX_RETRIES` | HTTP download max retries | `3` |
| `LIGHTX2V_MAX_UPLOAD_SIZE` | Maximum upload file size | `100 MB` |
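All variables are optional overrides. The sketch below shows one plausible way a `ServerConfig`-style class could read them; it is illustrative only, and the real `config.py` may differ:
```python
import os
from dataclasses import dataclass, field

def _env(name: str, default: str) -> str:
    # Fall back to the documented default when the variable is unset.
    return os.environ.get(name, default)

@dataclass
class ServerSettings:
    """Hypothetical mirror of the documented environment variables."""
    host: str = field(default_factory=lambda: _env("LIGHTX2V_HOST", "0.0.0.0"))
    port: int = field(default_factory=lambda: int(_env("LIGHTX2V_PORT", "8000")))
    max_queue_size: int = field(
        default_factory=lambda: int(_env("LIGHTX2V_MAX_QUEUE_SIZE", "100")))
    cache_dir: str = field(
        default_factory=lambda: _env("LIGHTX2V_CACHE_DIR", "/tmp/lightx2v_cache"))
    task_timeout: float = field(
        default_factory=lambda: float(_env("LIGHTX2V_TASK_TIMEOUT", "600")))
```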
### Command Line Arguments
Single-GPU launch (one inference process):
```bash
python -m lightx2v.server.main \
--model_path /path/to/model \
--model_cls wan2.1_distill \
--task i2v \
--host 0.0.0.0 \
--port 8000 \
--config_json /path/to/xxx_config.json
```
Multi-GPU launch (two worker processes via `--nproc_per_node 2`, with a distributed config):
```bash
python -m lightx2v.server.main \
--model_path /path/to/model \
--model_cls wan2.1_distill \
--task i2v \
--host 0.0.0.0 \
--port 8000 \
--config_json /path/to/xxx_dist_config.json \
--nproc_per_node 2
```
## Key Features
### 1. Distributed Processing
- **Multi-process architecture** for GPU parallelization
- **Master-worker pattern** with rank 0 as coordinator
- **PyTorch distributed** backend (NCCL for GPU, Gloo for CPU); backend selection is sketched after this list
- **Automatic GPU allocation** across processes
- **Task broadcasting** with chunked pickle serialization
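The backend choice follows the standard PyTorch pattern. The sketch below is a minimal illustration of that pattern, not the actual `DistributedManager` implementation; names such as `init_process_group_for_rank` are hypothetical:
```python
import os
import torch
import torch.distributed as dist

def init_process_group_for_rank(rank: int, world_size: int) -> None:
    """Hypothetical helper: initialize PyTorch distributed for one worker.

    Mirrors the documented behavior: NCCL when GPUs are available,
    Gloo otherwise. MASTER_ADDR/MASTER_PORT are assumed to be set by
    the parent process before the workers are spawned.
    """
    backend = "nccl" if torch.cuda.is_available() else "gloo"
    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29500")
    dist.init_process_group(backend=backend, rank=rank, world_size=world_size)
    if backend == "nccl":
        # Pin each worker to its own GPU before any collective runs.
        torch.cuda.set_device(rank % torch.cuda.device_count())
```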
### 2. Task Queue Management
- **Thread-safe** task queue with locks (a minimal sketch follows this list)
- **Sequential processing** with single processing thread
- **Configurable queue limits** with overflow protection
- **Task prioritization** (FIFO)
- **Automatic cleanup** of old completed tasks
- **Cancellation support** for pending and running tasks
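As referenced above, the queueing behavior reduces to a lock-guarded FIFO with overflow protection. A minimal illustrative sketch, not the actual `TaskManager`:
```python
import threading
import uuid
from collections import OrderedDict
from typing import Optional

class MinimalTaskQueue:
    """Illustrative only: a thread-safe FIFO queue with a size limit,
    loosely modeled on the documented TaskManager behavior."""

    def __init__(self, max_size: int = 100):
        self._lock = threading.Lock()
        self._tasks: "OrderedDict[str, dict]" = OrderedDict()
        self._max_size = max_size

    def create_task(self, params: dict) -> str:
        with self._lock:
            if len(self._tasks) >= self._max_size:
                raise RuntimeError("queue full")  # overflow protection
            task_id = uuid.uuid4().hex
            self._tasks[task_id] = {"status": "pending", "params": params}
            return task_id

    def next_pending(self) -> Optional[str]:
        with self._lock:
            for task_id, task in self._tasks.items():  # FIFO order
                if task["status"] == "pending":
                    task["status"] = "processing"
                    return task_id
            return None
```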
### 3. File Management
- **Multiple input formats**: URL, base64, file upload
- **HTTP downloads** with exponential backoff retry (sketched after this list)
- **Streaming responses** for large video files
- **Cache management** with automatic cleanup
- **File validation** and format detection
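The retry behavior referenced above follows the usual exponential-backoff pattern. A minimal sketch using `httpx`; the real `FileService` may differ, and the defaults here simply mirror `LIGHTX2V_HTTP_MAX_RETRIES` and `LIGHTX2V_HTTP_TIMEOUT`:
```python
import time
import httpx

def download_with_retry(url: str, dest: str,
                        max_retries: int = 3, timeout: float = 30.0) -> str:
    """Download url to dest, retrying with exponential backoff on failure."""
    for attempt in range(max_retries):
        try:
            with httpx.Client(timeout=timeout) as client:
                resp = client.get(url)
                resp.raise_for_status()
            with open(dest, "wb") as f:
                f.write(resp.content)
            return dest
        except httpx.HTTPError:
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # backoff: 1s, 2s, 4s, ...
    return dest
```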
### 4. Resilient Architecture
- **Graceful shutdown** with signal handling (sketched after this list)
- **Process failure recovery** mechanisms
- **Connection pooling** for HTTP clients
- **Timeout protection** at multiple levels
- **Comprehensive error handling** throughout
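Graceful shutdown in this style of server typically hinges on converting signals into an event the processing loop checks between tasks. An illustrative sketch with hypothetical names, not the actual `ServerManager` code:
```python
import signal
import threading

stop_event = threading.Event()

def _handle_shutdown(signum, frame):
    # Ask the processing loop to finish the current task, then exit cleanly.
    stop_event.set()

# Register handlers for Ctrl-C and service-manager termination.
signal.signal(signal.SIGINT, _handle_shutdown)
signal.signal(signal.SIGTERM, _handle_shutdown)
```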
### 5. Resource Management
- **GPU memory management** with cache clearing (sketched after this list)
- **Process lifecycle management**
- **Connection pooling** for efficiency
- **Memory-efficient** streaming for large files
- **Automatic resource cleanup** on shutdown
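Cache clearing usually combines Python garbage collection with releasing PyTorch's cached CUDA blocks. A minimal illustrative sketch; the actual `GPUManager` may do more per-device work:
```python
import gc
import torch

def release_gpu_memory() -> None:
    """Drop dangling Python references, then return cached CUDA blocks
    to the driver so the next task starts from a clean allocator state."""
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
```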
## Performance Considerations
1. **Single Task Processing**: Tasks are processed sequentially to manage GPU memory effectively
2. **Multi-GPU Support**: Distributes inference across available GPUs for parallelization
3. **Connection Pooling**: Reuses HTTP connections to reduce overhead
4. **Streaming Responses**: Large files are streamed to avoid memory issues (see the sketch after this list)
5. **Queue Management**: Automatic task cleanup prevents memory leaks
6. **Process Isolation**: Distributed workers run in separate processes for stability
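Point 4 corresponds to FastAPI's standard streaming pattern: yield the file in chunks rather than loading it whole. An illustrative sketch; the actual `stream_file_response` helper may differ:
```python
from pathlib import Path
from fastapi.responses import StreamingResponse

def stream_video(path: Path) -> StreamingResponse:
    """Serve a video file in 1 MB chunks so it never sits fully in memory."""
    def iter_chunks(chunk_size: int = 1024 * 1024):
        with path.open("rb") as f:
            while chunk := f.read(chunk_size):
                yield chunk
    return StreamingResponse(iter_chunks(), media_type="video/mp4")
```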
## Usage Examples
### Client Usage
```python
import httpx
import base64

# Create a task with a URL image
response = httpx.post(
    "http://localhost:8000/v1/tasks/",
    json={
        "prompt": "A cat playing piano",
        "image_path": "https://example.com/image.jpg",
        "use_prompt_enhancer": True,
        "seed": 42,
    },
)
task_id = response.json()["task_id"]

# Create a task with a base64 image
with open("image.png", "rb") as f:
    image_base64 = base64.b64encode(f.read()).decode()
response = httpx.post(
    "http://localhost:8000/v1/tasks/",
    json={
        "prompt": "A dog dancing",
        "image_path": f"data:image/png;base64,{image_base64}",
    },
)

# Check task status
status = httpx.get(f"http://localhost:8000/v1/tasks/{task_id}/status")
print(status.json())

# Download the result once completed
if status.json()["status"] == "completed":
    video = httpx.get(f"http://localhost:8000/v1/tasks/{task_id}/result")
    with open("output.mp4", "wb") as f:
        f.write(video.content)
```
## Monitoring and Debugging
### Logging
The server uses `loguru` for structured logging. Logs include:
- Request/response details
- Task lifecycle events
- Worker process status
- Error traces with context
### Health Checks
- `/v1/service/status` - Overall service health
- `/v1/tasks/queue/status` - Queue status and processing state
- Process monitoring via system tools (htop, nvidia-smi)
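For example, both endpoints can be probed from a short monitoring script (illustrative; the response fields depend on the server version):
```python
import httpx

BASE = "http://localhost:8000"

# Liveness probes against the two health endpoints.
service = httpx.get(f"{BASE}/v1/service/status", timeout=5.0)
queue = httpx.get(f"{BASE}/v1/tasks/queue/status", timeout=5.0)
print(service.status_code, service.json())
print(queue.status_code, queue.json())
```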
### Common Issues
1. **GPU Out of Memory**: Reduce `nproc_per_node` or adjust model batch size
2. **Task Timeout**: Increase `LIGHTX2V_TASK_TIMEOUT` for longer videos
3. **Queue Full**: Increase `LIGHTX2V_MAX_QUEUE_SIZE` or add rate limiting
4. **Port Conflicts**: Change `LIGHTX2V_PORT` or `MASTER_PORT` range
## Security Considerations
1. **Input Validation**: All inputs validated with Pydantic schemas
2. **File Access**: Restricted to cache directory
3. **Resource Limits**: Configurable queue and file size limits
4. **Process Isolation**: Worker processes run with limited permissions
5. **HTTP Security**: Support for proxy and authentication headers
## License
See the main project LICENSE file for licensing information.