# LightX2V Server

## Overview

The LightX2V server is a distributed video/image generation service built with FastAPI. It processes image-to-video and text-to-image tasks in a multi-process architecture with GPU support, combining a thread-safe task queue with torchrun-based distributed inference for high-throughput generation workloads.

## Directory Structure

```
server/
├── __init__.py
├── __main__.py              # Entry point
├── main.py                  # Server startup
├── config.py                # Configuration
├── task_manager.py          # Task management
├── schema.py                # Data models (VideoTaskRequest, ImageTaskRequest)
├── api/
│   ├── __init__.py
│   ├── router.py            # Main router aggregation
│   ├── deps.py              # Dependency injection container
│   ├── server.py            # ApiServer class
│   ├── files.py             # /v1/files/*
│   ├── service_routes.py    # /v1/service/*
│   └── tasks/
│       ├── __init__.py
│       ├── common.py        # Common task operations
│       ├── video.py         # POST /v1/tasks/video
│       └── image.py         # POST /v1/tasks/image
├── services/
│   ├── __init__.py
│   ├── file_service.py      # File service (unified download)
│   ├── distributed_utils.py # Distributed manager
│   ├── inference/
│   │   ├── __init__.py
│   │   ├── worker.py        # TorchrunInferenceWorker
│   │   └── service.py       # DistributedInferenceService
│   └── generation/
│       ├── __init__.py
│       ├── base.py          # Base generation service
│       ├── video.py         # VideoGenerationService
│       └── image.py         # ImageGenerationService
├── media/
│   ├── __init__.py
│   ├── base.py              # MediaHandler base class
│   ├── image.py             # ImageHandler
│   └── audio.py             # AudioHandler
└── metrics/                 # Prometheus metrics
```

## Architecture

### System Architecture

```mermaid
flowchart TB
    Client[Client] -->|Send API Request| Router[FastAPI Router]

    subgraph API Layer
        Router --> TaskRoutes[Task APIs]
        Router --> FileRoutes[File APIs]
        Router --> ServiceRoutes[Service Status APIs]

        TaskRoutes --> CreateVideoTask["POST /v1/tasks/video - Create Video Task"]
        TaskRoutes --> CreateImageTask["POST /v1/tasks/image - Create Image Task"]
        TaskRoutes --> CreateVideoTaskForm["POST /v1/tasks/video/form - Form Create Video"]
        TaskRoutes --> CreateImageTaskForm["POST /v1/tasks/image/form - Form Create Image"]
        TaskRoutes --> ListTasks["GET /v1/tasks/ - List Tasks"]
        TaskRoutes --> GetTaskStatus["GET /v1/tasks/{id}/status - Get Status"]
        TaskRoutes --> GetTaskResult["GET /v1/tasks/{id}/result - Get Result"]
        TaskRoutes --> StopTask["DELETE /v1/tasks/{id} - Stop Task"]

        FileRoutes --> DownloadFile["GET /v1/files/download/{path} - Download File"]

        ServiceRoutes --> GetServiceStatus["GET /v1/service/status - Service Status"]
        ServiceRoutes --> GetServiceMetadata["GET /v1/service/metadata - Metadata"]
    end

    subgraph Task Management
        TaskManager[Task Manager]
        TaskQueue[Task Queue]
        TaskStatus[Task Status]
        TaskResult[Task Result]

        CreateVideoTask --> TaskManager
        CreateImageTask --> TaskManager
        TaskManager --> TaskQueue
        TaskManager --> TaskStatus
        TaskManager --> TaskResult
    end

    subgraph File Service
        FileService[File Service]
        DownloadMedia[Download Media]
        SaveFile[Save File]
        GetOutputPath[Get Output Path]

        FileService --> DownloadMedia
        FileService --> SaveFile
        FileService --> GetOutputPath
    end

    subgraph Media Handlers
        MediaHandler[MediaHandler Base]
        ImageHandler[ImageHandler]
        AudioHandler[AudioHandler]

        MediaHandler --> ImageHandler
        MediaHandler --> AudioHandler
    end

    subgraph Processing Thread
        ProcessingThread[Processing Thread]
        NextTask[Get Next Task]
        ProcessTask[Process Single Task]

        ProcessingThread --> NextTask
        ProcessingThread --> ProcessTask
    end

    subgraph Generation Services
        VideoService[VideoGenerationService]
        ImageService[ImageGenerationService]
        BaseService[BaseGenerationService]

        BaseService --> VideoService
        BaseService --> ImageService
    end

    subgraph Distributed Inference Service
        InferenceService[DistributedInferenceService]
        SubmitTask[Submit Task]
        Worker[TorchrunInferenceWorker]
        ProcessRequest[Process Request]
        RunPipeline[Run Inference Pipeline]

        InferenceService --> SubmitTask
        SubmitTask --> Worker
        Worker --> ProcessRequest
        ProcessRequest --> RunPipeline
    end

    TaskQueue --> ProcessingThread
    ProcessTask --> VideoService
    ProcessTask --> ImageService
    VideoService --> InferenceService
    ImageService --> InferenceService
    GetTaskResult --> FileService
    DownloadFile --> FileService
    VideoService --> FileService
    ImageService --> FileService
    FileService --> MediaHandler
```
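
At the center of this diagram is a single processing thread that drains the queue sequentially. A minimal sketch of that loop is shown below; the method names (`get_next_pending_task()`, `acquire_processing_lock()`, `start_task()`, `generate_with_stop_event()`, `complete_task()`, `fail_task()`) come from the diagrams in this README, while the loop structure, the `task.type` attribute, and the error handling are simplifying assumptions, not the actual `task_manager.py` code.

```python
import time

# Illustrative sketch of the sequential processing loop (not the actual implementation).
def processing_loop(task_manager, video_service, image_service):
    while True:
        task_id = task_manager.get_next_pending_task()
        if task_id is None:
            time.sleep(0.5)                                   # nothing queued; idle briefly
            continue

        task_manager.acquire_processing_lock()                # only one task runs at a time
        try:
            task = task_manager.start_task(task_id)           # status: PENDING -> PROCESSING
            service = video_service if task.type == "video" else image_service
            result = service.generate_with_stop_event(task)   # resolves inputs, then runs inference
            task_manager.complete_task(task_id, result)       # status -> COMPLETED
        except Exception as exc:
            task_manager.fail_task(task_id, str(exc))         # status -> FAILED
        finally:
            task_manager.release_processing_lock()
```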

## Task Processing Flow

```mermaid
sequenceDiagram
    participant C as Client
    participant API as API Server
    participant TM as TaskManager
    participant PT as Processing Thread
    participant GS as GenerationService<br/>(Video/Image)
    participant FS as FileService
    participant DIS as DistributedInferenceService
    participant TIW0 as TorchrunInferenceWorker<br/>(Rank 0)
    participant TIW1 as TorchrunInferenceWorker<br/>(Rank 1..N)

    C->>API: POST /v1/tasks/video<br/>or /v1/tasks/image
    API->>TM: create_task()
    TM->>TM: Generate task_id
    TM->>TM: Add to queue<br/>(status: PENDING)
    API->>PT: ensure_processing_thread()
    API-->>C: TaskResponse<br/>(task_id, status: pending)

    Note over PT: Processing Loop
    PT->>TM: get_next_pending_task()
    TM-->>PT: task_id

    PT->>TM: acquire_processing_lock()
    PT->>TM: start_task()<br/>(status: PROCESSING)

    PT->>PT: Select service by task type
    PT->>GS: generate_with_stop_event()

    alt Image is URL
        GS->>FS: download_media(url, "image")
        FS->>FS: HTTP download<br/>with retry
        FS-->>GS: image_path
    else Image is Base64
        GS->>GS: save_base64_image()
        GS-->>GS: image_path
    else Image is local path
        GS->>GS: use existing path
    end

    alt Audio is URL (Video only)
        GS->>FS: download_media(url, "audio")
        FS->>FS: HTTP download<br/>with retry
        FS-->>GS: audio_path
    else Audio is Base64
        GS->>GS: save_base64_audio()
        GS-->>GS: audio_path
    end

    GS->>DIS: submit_task_async(task_data)
    DIS->>TIW0: process_request(task_data)

    Note over TIW0,TIW1: Torchrun-based Distributed Processing
    TIW0->>TIW0: Check if processing
    TIW0->>TIW0: Set processing = True

    alt Multi-GPU Mode (world_size > 1)
        TIW0->>TIW1: broadcast_task_data()<br/>(via DistributedManager)
        Note over TIW1: worker_loop() listens for broadcasts
        TIW1->>TIW1: Receive task_data
    end

    par Parallel Inference across all ranks
        TIW0->>TIW0: runner.set_inputs(task_data)
        TIW0->>TIW0: runner.run_pipeline()
    and
        Note over TIW1: If world_size > 1
        TIW1->>TIW1: runner.set_inputs(task_data)
        TIW1->>TIW1: runner.run_pipeline()
    end

    Note over TIW0,TIW1: Synchronization
    alt Multi-GPU Mode
        TIW0->>TIW1: barrier() for sync
        TIW1->>TIW0: barrier() response
    end

    TIW0->>TIW0: Set processing = False
    TIW0->>DIS: Return result (only rank 0)
    TIW1->>TIW1: Return None (non-rank 0)

    DIS-->>GS: TaskResponse
    GS-->>PT: TaskResponse

    PT->>TM: complete_task()<br/>(status: COMPLETED)
    PT->>TM: release_processing_lock()

    Note over C: Client Polling
    C->>API: GET /v1/tasks/{task_id}/status
    API->>TM: get_task_status()
    TM-->>API: status info
    API-->>C: Task Status

    C->>API: GET /v1/tasks/{task_id}/result
    API->>TM: get_task_status()
    API->>FS: stream_file_response()
    FS-->>API: Video/Image Stream
    API-->>C: Output File
```
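
From the client's side, the sequence above reduces to three HTTP calls: create the task, poll its status, and download the result. The snippet below is a minimal sketch using `requests`; the payload keys follow the request models documented later in this README, while the response fields (`task_id`, `status`) and the status values are assumptions based on the diagram and may differ from the actual schema.

```python
import time
import requests

BASE = "http://localhost:8000"   # host/port from the server launch command

# 1. Create a video generation task
resp = requests.post(f"{BASE}/v1/tasks/video", json={
    "prompt": "a cat surfing a wave",
    "image_path": "https://example.com/cat.jpg",   # URL, base64, or local path
    "target_video_length": 81,
})
task_id = resp.json()["task_id"]                   # assumed response field

# 2. Poll until the task reaches a terminal state
while True:
    status = requests.get(f"{BASE}/v1/tasks/{task_id}/status").json()
    if status["status"] in ("completed", "failed", "cancelled"):   # assumed values
        break
    time.sleep(2)

# 3. Stream the output file to disk
if status["status"] == "completed":
    with requests.get(f"{BASE}/v1/tasks/{task_id}/result", stream=True) as r:
        r.raise_for_status()
        with open("output.mp4", "wb") as f:
            for chunk in r.iter_content(chunk_size=1 << 20):
                f.write(chunk)
```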

## Task States

```mermaid
stateDiagram-v2
    [*] --> PENDING: create_task()
    PENDING --> PROCESSING: start_task()
    PROCESSING --> COMPLETED: complete_task()
    PROCESSING --> FAILED: fail_task()
    PENDING --> CANCELLED: cancel_task()
    PROCESSING --> CANCELLED: cancel_task()
    COMPLETED --> [*]
    FAILED --> [*]
    CANCELLED --> [*]
```
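
A minimal sketch of how these states and transitions could be modeled is shown below. The state names mirror the diagram; the enum values and the transition table are illustrative rather than the actual `task_manager.py` implementation.

```python
from enum import Enum

class TaskStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

# Allowed transitions, mirroring the state diagram above.
ALLOWED_TRANSITIONS = {
    TaskStatus.PENDING:    {TaskStatus.PROCESSING, TaskStatus.CANCELLED},
    TaskStatus.PROCESSING: {TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED},
    TaskStatus.COMPLETED:  set(),
    TaskStatus.FAILED:     set(),
    TaskStatus.CANCELLED:  set(),
}

def can_transition(current: TaskStatus, new: TaskStatus) -> bool:
    """Return True if the state change is permitted by the diagram above."""
    return new in ALLOWED_TRANSITIONS[current]
```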

## API Endpoints

### Task APIs

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/v1/tasks/video` | POST | Create video generation task |
| `/v1/tasks/video/form` | POST | Create video task with form data |
| `/v1/tasks/image` | POST | Create image generation task |
| `/v1/tasks/image/form` | POST | Create image task with form data |
| `/v1/tasks` | GET | List all tasks |
| `/v1/tasks/queue/status` | GET | Get queue status |
| `/v1/tasks/{task_id}/status` | GET | Get task status |
| `/v1/tasks/{task_id}/result` | GET | Get task result (stream) |
| `/v1/tasks/{task_id}` | DELETE | Cancel task |
| `/v1/tasks/all/running` | DELETE | Cancel all running tasks |
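
The listing, queue, and cancellation endpoints can be exercised directly with `requests`, as in the hedged sketch below; the exact response shapes are not shown because they depend on the server schema.

```python
import requests

BASE = "http://localhost:8000"
task_id = "replace-with-a-real-task-id"

print(requests.get(f"{BASE}/v1/tasks").json())               # list all tasks
print(requests.get(f"{BASE}/v1/tasks/queue/status").json())  # queue depth and processing state

requests.delete(f"{BASE}/v1/tasks/{task_id}")        # cancel one task
# requests.delete(f"{BASE}/v1/tasks/all/running")    # or cancel every running task
```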

### File APIs

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/v1/files/download/{path}` | GET | Download output file |

### Service APIs

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/v1/service/status` | GET | Get service status |
| `/v1/service/metadata` | GET | Get service metadata |

## Request Models

### VideoTaskRequest

```python
class VideoTaskRequest(BaseTaskRequest):
    num_fragments: int = 1
    target_video_length: int = 81
    audio_path: str = ""
    video_duration: int = 5
    talk_objects: Optional[list[TalkObject]] = None
```

### ImageTaskRequest

```python
class ImageTaskRequest(BaseTaskRequest):
    aspect_ratio: str = "16:9"
```

### BaseTaskRequest (Common Fields)

```python
class BaseTaskRequest(BaseModel):
    task_id: str  # auto-generated
    prompt: str = ""
    use_prompt_enhancer: bool = False
    negative_prompt: str = ""
    image_path: str = ""  # URL, base64, or local path
    save_result_path: str = ""
    infer_steps: int = 5
    seed: int  # auto-generated
```
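
Because `image_path` accepts a URL, a base64 string, or a local path, a client can send an image inline by encoding it before building the request. A minimal sketch follows; whether the server expects raw base64 or a data-URI prefix should be checked against the implementation.

```python
import base64
import requests

with open("input.jpg", "rb") as f:
    image_b64 = base64.b64encode(f.read()).decode("ascii")

payload = {
    "prompt": "a cat surfing a wave",
    "image_path": image_b64,      # inline base64 instead of a URL or local path
    "infer_steps": 5,
}
requests.post("http://localhost:8000/v1/tasks/video", json=payload)
```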

## Configuration

### Environment Variables

See `lightx2v/server/config.py` for the full list of supported environment variables (for example, `LIGHTX2V_TASK_TIMEOUT` and `LIGHTX2V_MAX_QUEUE_SIZE`, referenced under Common Issues below).

### Command Line Arguments

```bash
# Single GPU
python -m lightx2v.server \
    --model_path /path/to/model \
    --model_cls wan2.1_distill \
    --task i2v \
    --host 0.0.0.0 \
    --port 8000 \
    --config_json /path/to/xxx_config.json
```

```bash
# Multi-GPU with torchrun
torchrun --nproc_per_node=2 -m lightx2v.server \
    --model_path /path/to/model \
    --model_cls wan2.1_distill \
    --task i2v \
    --host 0.0.0.0 \
    --port 8000 \
    --config_json /path/to/xxx_dist_config.json
```

## Key Features

### 1. Distributed Processing

- **Multi-process architecture** for GPU parallelization
- **Master-worker pattern** with rank 0 as coordinator
- **PyTorch distributed** backend (NCCL for GPU, Gloo for CPU)
- **Automatic GPU allocation** across processes
- **Task broadcasting** with chunked pickle serialization (see the sketch after this list)
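
A rough sketch of the rank-0/worker interaction described above and in the sequence diagram: rank 0 broadcasts the task data, every rank runs the same pipeline, all ranks synchronize at a barrier, and only rank 0 returns a result. `broadcast_task_data()`, `worker_loop()`, `set_inputs()`, and `run_pipeline()` come from the diagrams; `receive_task_data()` and the overall control flow are illustrative assumptions, not the actual implementation.

```python
import torch.distributed as dist

def handle_request(runner, dist_manager, task_data, rank, world_size):
    """Rank 0 entry point: broadcast, run, synchronize, return (illustrative sketch)."""
    if world_size > 1:
        dist_manager.broadcast_task_data(task_data)   # chunked pickle broadcast to ranks 1..N
    runner.set_inputs(task_data)
    runner.run_pipeline()
    if world_size > 1:
        dist.barrier()                                # wait until every rank has finished
    return task_data.get("save_result_path") if rank == 0 else None

def worker_loop(runner, dist_manager):
    """Ranks 1..N: block on broadcasts and run the same pipeline in lockstep."""
    while True:
        task_data = dist_manager.receive_task_data()  # hypothetical receive side of the broadcast
        runner.set_inputs(task_data)
        runner.run_pipeline()
        dist.barrier()
```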

### 2. Task Queue Management

- **Thread-safe** task queue with locks
- **Sequential processing** with single processing thread
- **Configurable queue limits** with overflow protection
- **Task prioritization** (FIFO)
- **Automatic cleanup** of old completed tasks
- **Cancellation support** for pending and running tasks

### 3. File Management

- **Multiple input formats**: URL, base64, file upload
- **HTTP downloads** with exponential backoff retry (see the sketch after this list)
- **Streaming responses** for large video files
- **Cache management** with automatic cleanup
- **File validation** and format detection
- **Unified media handling** via MediaHandler pattern
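
The download-with-retry behavior can be pictured as a small exponential-backoff loop. The sketch below is illustrative rather than the actual `file_service.py` code; the retry count, timeout, and base delay are placeholder values.

```python
import time
import requests

def download_with_retry(url: str, dest_path: str, max_retries: int = 3, base_delay: float = 1.0) -> str:
    """Download a media file, retrying with exponential backoff on failure (illustrative sketch)."""
    for attempt in range(max_retries):
        try:
            with requests.get(url, stream=True, timeout=30) as resp:
                resp.raise_for_status()
                with open(dest_path, "wb") as f:
                    for chunk in resp.iter_content(chunk_size=1 << 20):
                        f.write(chunk)
            return dest_path
        except requests.RequestException:
            if attempt == max_retries - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)   # 1s, 2s, 4s, ...
```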

### 4. Separate Video/Image Endpoints

- **Dedicated endpoints** for video and image generation
- **Type-specific request models** (VideoTaskRequest, ImageTaskRequest)
- **Automatic service routing** based on task type
- **Backward compatible** with legacy `/v1/tasks` endpoint

## Performance Considerations

1. **Single Task Processing**: Tasks are processed sequentially to manage GPU memory effectively
2. **Multi-GPU Support**: Distributes inference across available GPUs for parallelization
3. **Connection Pooling**: Reuses HTTP connections to reduce overhead
4. **Streaming Responses**: Large files are streamed to avoid memory issues (see the sketch below)
5. **Queue Management**: Automatic task cleanup prevents memory leaks
6. **Process Isolation**: Distributed workers run in separate processes for stability
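
Point 4 corresponds to serving results in chunks instead of reading whole videos into memory. Below is a minimal sketch of such a streaming helper, assuming FastAPI's `StreamingResponse`; the actual `stream_file_response()` implementation may differ.

```python
from pathlib import Path
from fastapi.responses import StreamingResponse

def stream_file_response(path: str, chunk_size: int = 1 << 20) -> StreamingResponse:
    """Stream a large output file in fixed-size chunks (illustrative sketch)."""
    def iter_chunks():
        with Path(path).open("rb") as f:
            while chunk := f.read(chunk_size):
                yield chunk
    return StreamingResponse(iter_chunks(), media_type="video/mp4")
```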

## Monitoring and Debugging

### Logging

The server uses `loguru` for structured logging. Logs include:

- Request/response details
- Task lifecycle events
- Worker process status
- Error traces with context
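
A minimal example of context-rich logging with `loguru`; the exact sinks, formats, and bound fields configured by the server may differ.

```python
from loguru import logger

# Bind task-level context so every log line for a task carries its id.
task_log = logger.bind(task_id="abc123")
task_log.info("Task created (status=pending)")
task_log.info("Task started on rank 0")

try:
    raise RuntimeError("example failure")
except RuntimeError:
    task_log.exception("Task failed")   # records the error trace with the bound context
```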

### Health Checks

- `/v1/service/status` - Overall service health
- `/v1/tasks/queue/status` - Queue status and processing state
- Process monitoring via system tools (htop, nvidia-smi)

### Common Issues

1. **GPU Out of Memory**: Reduce `nproc_per_node` or adjust model batch size
2. **Task Timeout**: Increase `LIGHTX2V_TASK_TIMEOUT` for longer videos
3. **Queue Full**: Increase `LIGHTX2V_MAX_QUEUE_SIZE` or add rate limiting

## Security Considerations

1. **Input Validation**: All inputs validated with Pydantic schemas
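
For example, a payload with a wrong field type is rejected at the API boundary before it ever reaches the queue. The sketch below assumes the request models are importable from `lightx2v.server.schema` (per the directory structure above) and that `target_video_length` is coerced by Pydantic; FastAPI surfaces such failures as a 422 response.

```python
from pydantic import ValidationError
from lightx2v.server.schema import VideoTaskRequest   # assumed import path

try:
    VideoTaskRequest(prompt="a cat", target_video_length="not-a-number")
except ValidationError as exc:
    print(exc)   # the API layer returns this as a 422 validation error
```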

## License

See the main project LICENSE file for licensing information.