# LightX2V Server
## Overview
The LightX2V server is a distributed video/image generation service built with FastAPI. It processes image-to-video and text-to-image tasks using a multi-process architecture with GPU support, pairing a thread-safe task queue with distributed inference to sustain high-throughput generation workloads.
## Directory Structure
```
server/
├── __init__.py
├── __main__.py              # Entry point
├── main.py                  # Server startup
├── config.py                # Configuration
├── task_manager.py          # Task management
├── schema.py                # Data models (VideoTaskRequest, ImageTaskRequest)
├── api/
│   ├── __init__.py
│   ├── router.py            # Main router aggregation
│   ├── deps.py              # Dependency injection container
│   ├── server.py            # ApiServer class
│   ├── files.py             # /v1/files/*
│   ├── service_routes.py    # /v1/service/*
│   └── tasks/
│       ├── __init__.py
│       ├── common.py        # Common task operations
│       ├── video.py         # POST /v1/tasks/video
│       └── image.py         # POST /v1/tasks/image
├── services/
│   ├── __init__.py
│   ├── file_service.py      # File service (unified download)
│   ├── distributed_utils.py # Distributed manager
│   ├── inference/
│   │   ├── __init__.py
│   │   ├── worker.py        # TorchrunInferenceWorker
│   │   └── service.py       # DistributedInferenceService
│   └── generation/
│       ├── __init__.py
│       ├── base.py          # Base generation service
│       ├── video.py         # VideoGenerationService
│       └── image.py         # ImageGenerationService
├── media/
│   ├── __init__.py
│   ├── base.py              # MediaHandler base class
│   ├── image.py             # ImageHandler
│   └── audio.py             # AudioHandler
└── metrics/                 # Prometheus metrics
```
## Architecture
### System Architecture
```mermaid
flowchart TB
    Client[Client] -->|Send API Request| Router[FastAPI Router]

    subgraph API Layer
        Router --> TaskRoutes[Task APIs]
        Router --> FileRoutes[File APIs]
        Router --> ServiceRoutes[Service Status APIs]
        TaskRoutes --> CreateVideoTask["POST /v1/tasks/video - Create Video Task"]
        TaskRoutes --> CreateImageTask["POST /v1/tasks/image - Create Image Task"]
        TaskRoutes --> CreateVideoTaskForm["POST /v1/tasks/video/form - Form Create Video"]
        TaskRoutes --> CreateImageTaskForm["POST /v1/tasks/image/form - Form Create Image"]
        TaskRoutes --> ListTasks["GET /v1/tasks/ - List Tasks"]
        TaskRoutes --> GetTaskStatus["GET /v1/tasks/{id}/status - Get Status"]
        TaskRoutes --> GetTaskResult["GET /v1/tasks/{id}/result - Get Result"]
        TaskRoutes --> StopTask["DELETE /v1/tasks/{id} - Stop Task"]
        FileRoutes --> DownloadFile["GET /v1/files/download/{path} - Download File"]
        ServiceRoutes --> GetServiceStatus["GET /v1/service/status - Service Status"]
        ServiceRoutes --> GetServiceMetadata["GET /v1/service/metadata - Metadata"]
    end

    subgraph Task Management
        TaskManager[Task Manager]
        TaskQueue[Task Queue]
        TaskStatus[Task Status]
        TaskResult[Task Result]
        CreateVideoTask --> TaskManager
        CreateImageTask --> TaskManager
        TaskManager --> TaskQueue
        TaskManager --> TaskStatus
        TaskManager --> TaskResult
    end

    subgraph File Service
        FileService[File Service]
        DownloadMedia[Download Media]
        SaveFile[Save File]
        GetOutputPath[Get Output Path]
        FileService --> DownloadMedia
        FileService --> SaveFile
        FileService --> GetOutputPath
    end

    subgraph Media Handlers
        MediaHandler[MediaHandler Base]
        ImageHandler[ImageHandler]
        AudioHandler[AudioHandler]
        MediaHandler --> ImageHandler
        MediaHandler --> AudioHandler
    end

    subgraph Processing Thread
        ProcessingThread[Processing Thread]
        NextTask[Get Next Task]
        ProcessTask[Process Single Task]
        ProcessingThread --> NextTask
        ProcessingThread --> ProcessTask
    end

    subgraph Generation Services
        VideoService[VideoGenerationService]
        ImageService[ImageGenerationService]
        BaseService[BaseGenerationService]
        BaseService --> VideoService
        BaseService --> ImageService
    end

    subgraph Distributed Inference Service
        InferenceService[DistributedInferenceService]
        SubmitTask[Submit Task]
        Worker[TorchrunInferenceWorker]
        ProcessRequest[Process Request]
        RunPipeline[Run Inference Pipeline]
        InferenceService --> SubmitTask
        SubmitTask --> Worker
        Worker --> ProcessRequest
        ProcessRequest --> RunPipeline
    end

    TaskQueue --> ProcessingThread
    ProcessTask --> VideoService
    ProcessTask --> ImageService
    VideoService --> InferenceService
    ImageService --> InferenceService
    GetTaskResult --> FileService
    DownloadFile --> FileService
    VideoService --> FileService
    ImageService --> FileService
    FileService --> MediaHandler
```
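The API layer in this diagram is assembled from the sub-routers under `api/`. A minimal sketch of that aggregation pattern, assuming standard FastAPI wiring (the variable names are illustrative, not the actual identifiers in `api/router.py`):

```python
from fastapi import APIRouter, FastAPI

# Stub sub-routers standing in for api/tasks/, api/files.py, and
# api/service_routes.py; the names here are illustrative.
tasks_router = APIRouter()
files_router = APIRouter()
service_router = APIRouter()

v1_router = APIRouter(prefix="/v1")
v1_router.include_router(tasks_router, prefix="/tasks")
v1_router.include_router(files_router, prefix="/files")
v1_router.include_router(service_router, prefix="/service")

app = FastAPI()
app.include_router(v1_router)
```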
## Task Processing Flow
```mermaid
sequenceDiagram
    participant C as Client
    participant API as API Server
    participant TM as TaskManager
    participant PT as Processing Thread
    participant GS as GenerationService (Video/Image)
    participant FS as FileService
    participant DIS as DistributedInferenceService
    participant TIW0 as TorchrunInferenceWorker (Rank 0)
    participant TIW1 as TorchrunInferenceWorker (Rank 1..N)

    C->>API: POST /v1/tasks/video or /v1/tasks/image
    API->>TM: create_task()
    TM->>TM: Generate task_id
    TM->>TM: Add to queue (status: PENDING)
    API->>PT: ensure_processing_thread()
    API-->>C: TaskResponse (task_id, status: pending)

    Note over PT: Processing Loop
    PT->>TM: get_next_pending_task()
    TM-->>PT: task_id
    PT->>TM: acquire_processing_lock()
    PT->>TM: start_task() (status: PROCESSING)
    PT->>PT: Select service by task type
    PT->>GS: generate_with_stop_event()

    alt Image is URL
        GS->>FS: download_media(url, "image")
        FS->>FS: HTTP download with retry
        FS-->>GS: image_path
    else Image is Base64
        GS->>GS: save_base64_image()
        GS-->>GS: image_path
    else Image is local path
        GS->>GS: use existing path
    end

    alt Audio is URL (Video only)
        GS->>FS: download_media(url, "audio")
        FS->>FS: HTTP download with retry
        FS-->>GS: audio_path
    else Audio is Base64
        GS->>GS: save_base64_audio()
        GS-->>GS: audio_path
    end

    GS->>DIS: submit_task_async(task_data)
    DIS->>TIW0: process_request(task_data)
    Note over TIW0,TIW1: Torchrun-based Distributed Processing
    TIW0->>TIW0: Check if processing
    TIW0->>TIW0: Set processing = True

    alt Multi-GPU Mode (world_size > 1)
        TIW0->>TIW1: broadcast_task_data() (via DistributedManager)
        Note over TIW1: worker_loop() listens for broadcasts
        TIW1->>TIW1: Receive task_data
    end

    par Parallel Inference across all ranks
        TIW0->>TIW0: runner.set_inputs(task_data)
        TIW0->>TIW0: runner.run_pipeline()
    and
        Note over TIW1: If world_size > 1
        TIW1->>TIW1: runner.set_inputs(task_data)
        TIW1->>TIW1: runner.run_pipeline()
    end

    Note over TIW0,TIW1: Synchronization
    alt Multi-GPU Mode
        TIW0->>TIW1: barrier() for sync
        TIW1->>TIW0: barrier() response
    end

    TIW0->>TIW0: Set processing = False
    TIW0->>DIS: Return result (only rank 0)
    TIW1->>TIW1: Return None (non-rank 0)
    DIS-->>GS: TaskResponse
    GS-->>PT: TaskResponse
    PT->>TM: complete_task() (status: COMPLETED)
    PT->>TM: release_processing_lock()

    Note over C: Client Polling
    C->>API: GET /v1/tasks/{task_id}/status
    API->>TM: get_task_status()
    TM-->>API: status info
    API-->>C: Task Status
    C->>API: GET /v1/tasks/{task_id}/result
    API->>TM: get_task_status()
    API->>FS: stream_file_response()
    FS-->>API: Video/Image Stream
    API-->>C: Output File
```
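The processing thread at the center of this diagram reduces to a simple sequential loop. A sketch using the method names from the diagram (the signatures and the `task.type` attribute are assumptions, not the actual `TaskManager` API):

```python
import time

def processing_loop(task_manager, services):
    """Sequential processing loop mirroring the diagram above (sketch only)."""
    while True:
        task_id = task_manager.get_next_pending_task()
        if task_id is None:
            time.sleep(0.5)  # idle poll; the interval is an illustrative choice
            continue
        if not task_manager.acquire_processing_lock():
            continue
        try:
            task = task_manager.start_task(task_id)        # PENDING -> PROCESSING
            service = services[task.type]                  # video or image service
            result = service.generate_with_stop_event(task)
            task_manager.complete_task(task_id, result)    # -> COMPLETED
        except Exception as exc:
            task_manager.fail_task(task_id, str(exc))      # -> FAILED
        finally:
            task_manager.release_processing_lock()
```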
## Task States
```mermaid
stateDiagram-v2
    [*] --> PENDING: create_task()
    PENDING --> PROCESSING: start_task()
    PROCESSING --> COMPLETED: complete_task()
    PROCESSING --> FAILED: fail_task()
    PENDING --> CANCELLED: cancel_task()
    PROCESSING --> CANCELLED: cancel_task()
    COMPLETED --> [*]
    FAILED --> [*]
    CANCELLED --> [*]
```
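A minimal encoding of these states and the transitions the diagram allows (the lowercase string values are an assumption about the wire format, based on the `status: pending` response shown earlier):

```python
from enum import Enum

class TaskStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

# Transitions permitted by the state diagram; terminal states have no successors.
ALLOWED_TRANSITIONS = {
    TaskStatus.PENDING: {TaskStatus.PROCESSING, TaskStatus.CANCELLED},
    TaskStatus.PROCESSING: {TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED},
}

def can_transition(src: TaskStatus, dst: TaskStatus) -> bool:
    return dst in ALLOWED_TRANSITIONS.get(src, set())
```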
## API Endpoints
### Task APIs
| Endpoint | Method | Description |
|----------|--------|-------------|
| `/v1/tasks/video` | POST | Create video generation task |
| `/v1/tasks/video/form` | POST | Create video task with form data |
| `/v1/tasks/image` | POST | Create image generation task |
| `/v1/tasks/image/form` | POST | Create image task with form data |
| `/v1/tasks` | GET | List all tasks |
| `/v1/tasks/queue/status` | GET | Get queue status |
| `/v1/tasks/{task_id}/status` | GET | Get task status |
| `/v1/tasks/{task_id}/result` | GET | Get task result (stream) |
| `/v1/tasks/{task_id}` | DELETE | Cancel task |
| `/v1/tasks/all/running` | DELETE | Cancel all running tasks |
### File APIs
| Endpoint | Method | Description |
|----------|--------|-------------|
| `/v1/files/download/{path}` | GET | Download output file |
### Service APIs
| Endpoint | Method | Description |
|----------|--------|-------------|
| `/v1/service/status` | GET | Get service status |
| `/v1/service/metadata` | GET | Get service metadata |
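A typical end-to-end client interaction is create, poll, download. A sketch using `requests` (the response field names and status strings are assumptions based on the diagrams above):

```python
import time
import requests

BASE = "http://localhost:8000"

# 1. Create a video task; the fields come from VideoTaskRequest below.
task = requests.post(f"{BASE}/v1/tasks/video", json={
    "prompt": "a cat surfing a wave at sunset",
    "image_path": "https://example.com/cat.png",
}).json()
task_id = task["task_id"]

# 2. Poll the status endpoint until the task reaches a terminal state.
while True:
    status = requests.get(f"{BASE}/v1/tasks/{task_id}/status").json()
    if status["status"] in ("completed", "failed", "cancelled"):
        break
    time.sleep(2)

# 3. Stream the result to disk.
if status["status"] == "completed":
    with requests.get(f"{BASE}/v1/tasks/{task_id}/result", stream=True) as resp:
        resp.raise_for_status()
        with open("output.mp4", "wb") as f:
            for chunk in resp.iter_content(chunk_size=1 << 20):
                f.write(chunk)
```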
## Request Models
### VideoTaskRequest
```python
class VideoTaskRequest(BaseTaskRequest):
    num_fragments: int = 1
    target_video_length: int = 81
    audio_path: str = ""
    video_duration: int = 5
    talk_objects: Optional[list[TalkObject]] = None
```
### ImageTaskRequest
```python
class ImageTaskRequest(BaseTaskRequest):
    aspect_ratio: str = "16:9"
```
### BaseTaskRequest (Common Fields)
```python
class BaseTaskRequest(BaseModel):
    task_id: str                # auto-generated
    prompt: str = ""
    use_prompt_enhancer: bool = False
    negative_prompt: str = ""
    image_path: str = ""        # URL, base64, or local path
    save_result_path: str = ""
    infer_steps: int = 5
    seed: int                   # auto-generated
```
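Since these are Pydantic models, a request body can be built and validated client-side before posting. A small usage sketch, assuming the import path matches the directory layout and that `task_id` and `seed` are filled in by default factories as the comments above indicate:

```python
from lightx2v.server.schema import VideoTaskRequest  # assumed import path

req = VideoTaskRequest(
    prompt="a cat surfing a wave at sunset",
    image_path="https://example.com/cat.png",
    target_video_length=81,
    video_duration=5,
)
# JSON body suitable for POST /v1/tasks/video
print(req.model_dump_json())
```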
## Configuration
### Environment Variables
See `lightx2v/server/config.py` for the supported environment variables.
### Command Line Arguments
```bash
# Single GPU
python -m lightx2v.server \
    --model_path /path/to/model \
    --model_cls wan2.1_distill \
    --task i2v \
    --host 0.0.0.0 \
    --port 8000 \
    --config_json /path/to/xxx_config.json
```
```bash
# Multi-GPU with torchrun
torchrun --nproc_per_node=2 -m lightx2v.server \
    --model_path /path/to/model \
    --model_cls wan2.1_distill \
    --task i2v \
    --host 0.0.0.0 \
    --port 8000 \
    --config_json /path/to/xxx_dist_config.json
```
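Under `torchrun`, each spawned process discovers its role through standard environment variables, and the server's distributed setup (in `services/distributed_utils.py`) presumably builds on the same mechanism. A minimal sketch of that common initialization pattern, not the server's actual code:

```python
import os
import torch
import torch.distributed as dist

# torchrun sets these for every process it spawns.
rank = int(os.environ.get("RANK", "0"))
world_size = int(os.environ.get("WORLD_SIZE", "1"))
local_rank = int(os.environ.get("LOCAL_RANK", "0"))

if world_size > 1:
    # NCCL for GPU workers, Gloo as the CPU fallback (see Key Features below).
    backend = "nccl" if torch.cuda.is_available() else "gloo"
    dist.init_process_group(backend=backend)
    if torch.cuda.is_available():
        torch.cuda.set_device(local_rank)
```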
## Key Features
### 1. Distributed Processing
- **Multi-process architecture** for GPU parallelization
- **Master-worker pattern** with rank 0 as coordinator
- **PyTorch distributed** backend (NCCL for GPU, Gloo for CPU)
- **Automatic GPU allocation** across processes
- **Task broadcasting** with chunked pickle serialization (sketched below)
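A hedged sketch of what chunked pickle broadcasting can look like with `torch.distributed` (the chunk size and function shape are assumptions; with the NCCL backend the buffers would need to live on the GPU, or travel over a separate Gloo group):

```python
import pickle
import torch
import torch.distributed as dist

CHUNK_BYTES = 1 << 20  # 1 MiB; the real chunk size is unknown

def broadcast_task_data(task_data=None, src=0):
    """Rank `src` pickles and broadcasts task_data; other ranks return it."""
    rank = dist.get_rank()
    if rank == src:
        payload = pickle.dumps(task_data)
        size = torch.tensor([len(payload)], dtype=torch.long)
    else:
        size = torch.zeros(1, dtype=torch.long)
    dist.broadcast(size, src=src)  # step 1: announce the payload length

    buf = torch.empty(int(size.item()), dtype=torch.uint8)
    if rank == src:
        buf.copy_(torch.frombuffer(bytearray(payload), dtype=torch.uint8))
    for start in range(0, buf.numel(), CHUNK_BYTES):  # step 2: fixed-size chunks
        dist.broadcast(buf[start:start + CHUNK_BYTES], src=src)
    return task_data if rank == src else pickle.loads(buf.numpy().tobytes())
```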
### 2. Task Queue Management
- **Thread-safe** task queue with locks (see the sketch after this list)
- **Sequential processing** with single processing thread
- **Configurable queue limits** with overflow protection
- **Task prioritization** (FIFO)
- **Automatic cleanup** of old completed tasks
- **Cancellation support** for pending and running tasks
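A minimal sketch of a lock-guarded FIFO with an overflow limit, illustrating the queue behaviour listed above (the class and method names are illustrative, not the `TaskManager` API):

```python
import threading
from collections import deque
from typing import Optional

class BoundedTaskQueue:
    def __init__(self, max_size: int = 100):  # the limit value is illustrative
        self._lock = threading.Lock()
        self._queue: deque = deque()
        self._max_size = max_size

    def put(self, task_id: str) -> bool:
        with self._lock:
            if len(self._queue) >= self._max_size:
                return False  # overflow protection: caller rejects the request
            self._queue.append(task_id)  # FIFO ordering
            return True

    def get_next(self) -> Optional[str]:
        with self._lock:
            return self._queue.popleft() if self._queue else None
```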
### 3. File Management
- **Multiple input formats**: URL, base64, file upload
- **HTTP downloads** with exponential backoff retry (sketched after this list)
- **Streaming responses** for large video files
- **Cache management** with automatic cleanup
- **File validation** and format detection
- **Unified media handling** via MediaHandler pattern
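The retry behaviour reduces to a standard exponential backoff loop. A sketch of the pattern, not the `FileService` implementation (the timeout and retry count are assumptions):

```python
import time
import requests

def download_with_backoff(url: str, dest: str, retries: int = 4) -> str:
    """Stream a remote file to disk, backing off 1s, 2s, 4s, ... on failure."""
    for attempt in range(retries):
        try:
            with requests.get(url, stream=True, timeout=30) as resp:
                resp.raise_for_status()
                with open(dest, "wb") as f:
                    for chunk in resp.iter_content(chunk_size=1 << 20):
                        f.write(chunk)
            return dest
        except requests.RequestException:
            if attempt == retries - 1:
                raise
            time.sleep(2 ** attempt)  # exponential backoff before retrying
```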
### 4. Separate Video/Image Endpoints
- **Dedicated endpoints** for video and image generation
- **Type-specific request models** (VideoTaskRequest, ImageTaskRequest)
- **Automatic service routing** based on task type
- **Backward compatible** with legacy `/v1/tasks` endpoint
## Performance Considerations
1. **Single Task Processing**: Tasks are processed sequentially to manage GPU memory effectively
2. **Multi-GPU Support**: Distributes inference across available GPUs for parallelization
3. **Connection Pooling**: Reuses HTTP connections to reduce overhead
4. **Streaming Responses**: Large files are streamed to avoid memory issues (see the sketch below)
5. **Queue Management**: Automatic task cleanup prevents memory leaks
6. **Process Isolation**: Distributed workers run in separate processes for stability
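Point 4 is the standard FastAPI streaming pattern. A sketch of how an output file can be chunk-streamed (the helper name mirrors `stream_file_response` from the sequence diagram; the body is an assumption):

```python
from pathlib import Path
from fastapi.responses import StreamingResponse

def stream_file_response(path: Path, chunk_size: int = 1 << 20) -> StreamingResponse:
    """Yield the file in chunks so a large video never sits in memory at once."""
    def iterfile():
        with path.open("rb") as f:
            while chunk := f.read(chunk_size):
                yield chunk

    media_type = "video/mp4" if path.suffix == ".mp4" else "application/octet-stream"
    return StreamingResponse(iterfile(), media_type=media_type)
```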
## Monitoring and Debugging
### Logging
The server uses `loguru` for structured logging. Logs include:
- Request/response details
- Task lifecycle events
- Worker process status
- Error traces with context
### Health Checks
- `/v1/service/status` - Overall service health
- `/v1/tasks/queue/status` - Queue status and processing state
- Process monitoring via system tools (htop, nvidia-smi)
### Common Issues
1. **GPU Out of Memory**: Reduce `nproc_per_node` or adjust model batch size
2. **Task Timeout**: Increase `LIGHTX2V_TASK_TIMEOUT` for longer videos
3. **Queue Full**: Increase `LIGHTX2V_MAX_QUEUE_SIZE` or add rate limiting
## Security Considerations
1. **Input Validation**: All request inputs are validated against Pydantic schemas before processing
## License
See the main project LICENSE file for licensing information.