worker.py 18.2 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
FastVideo Worker for Dynamo (non-streaming)

Registers a VideoGenerator as a Dynamo backend endpoint compatible with the
/v1/videos frontend endpoint.  The endpoint generates a full video
clip from the request parameters and returns it as a single response containing
the complete MP4 file base64-encoded in data[0].b64_json.

Generation parameters (size, fps, num_frames, etc.) are taken from the
request body's nvext field, so the same worker instance can serve requests
with different resolutions and quality settings without restarting.

One request at a time (asyncio.Lock — VideoGenerator is not re-entrant).

Usage:
19
20
  python worker.py [--model MODEL] [--num-gpus N] [--enable-optimizations]
                   [--attention-backend ATTENTION_BACKEND]
21
22
23
24
25

Options:
  --model          HuggingFace model path
                   (default: FastVideo/LTX2-Distilled-Diffusers)
  --num-gpus       Number of GPUs (default: 1)
26
27
28
29
  --enable-optimizations
                   Enable FP4 quantization (if available) and torch.compile
  --attention-backend
                   Attention backend (default: TORCH_SDPA)
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

Request format (sent to /v1/videos):
  prompt:   text description of the desired video
  model:    HuggingFace model path (must match what the worker registered)
  size:     "WxH" string, e.g. "1920x1088" (default: "1920x1088")
  seconds:  clip duration when nvext.num_frames is not set (default: 5)
  nvext:
    fps:                frames per second (default: 24)
    num_frames:         total frames; overrides fps * seconds when set (default: 121)
    num_inference_steps diffusion steps (default: 5)
    guidance_scale:     CFG scale (default: 1.0)
    seed:               RNG seed (default: 10)
    negative_prompt:    text to avoid (optional)
"""

import argparse
import asyncio
import base64
import logging
import os
import tempfile
import time
import uuid

54
import torch
55
56
57
import uvloop
from fastvideo import VideoGenerator
from fastvideo.configs.pipelines.base import PipelineConfig
58
from fastvideo.platforms.interface import AttentionBackendEnum
59
60
61
62
63
64
65
66
from pydantic import BaseModel, Field

from dynamo.llm import ModelInput, ModelType, register_llm  # type: ignore[attr-defined]
from dynamo.runtime import DistributedRuntime, dynamo_endpoint

logger = logging.getLogger(__name__)

DEFAULT_MODEL = "FastVideo/LTX2-Distilled-Diffusers"
67
68
69
70
71
72
73
74
DEFAULT_ATTENTION_BACKEND = "TORCH_SDPA"
# FastVideo exposes NO_ATTENTION in the enum, but it is not a selectable
# inference backend for this worker's FASTVIDEO_ATTENTION_BACKEND override.
ATTENTION_BACKEND_CHOICES = tuple(
    backend_name
    for backend_name in AttentionBackendEnum.__members__
    if backend_name != "NO_ATTENTION"
)
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149

# ── Request / Response models ─────────────────────────────────────────────────


def _get_worker_namespace() -> str:
    """
    Resolve Dynamo namespace for endpoint registration.

    Kubernetes operator injects DYN_NAMESPACE (and optionally a rollout suffix).
    Compose/local runs keep using the historical "dynamo" default.
    """
    namespace = os.environ.get("DYN_NAMESPACE", "dynamo")
    suffix = os.environ.get("DYN_NAMESPACE_WORKER_SUFFIX")
    if suffix:
        namespace = f"{namespace}-{suffix}"
    return namespace


class NvExtVideoCreateRequest(BaseModel):
    fps: int = Field(default=24, description="Frames per second")
    num_frames: int | None = Field(
        default=121, description="Total frames; overrides fps * seconds"
    )
    num_inference_steps: int = Field(default=5, description="Diffusion inference steps")
    guidance_scale: float = Field(
        default=1.0, description="Classifier-free guidance scale"
    )
    seed: int | None = Field(default=10, description="RNG seed for reproducibility")
    negative_prompt: str | None = Field(
        default=None, description="Text to avoid in generation"
    )


class VideoCreateRequest(BaseModel):
    prompt: str = Field(description="Text description of the desired video")
    model: str = Field(description="HuggingFace model path")
    size: str = Field(default="1920x1088", description="Frame dimensions as 'WxH'")
    seconds: int = Field(
        default=5, description="Clip duration; used when nvext.num_frames is unset"
    )
    user: str | None = Field(default=None)
    nvext: NvExtVideoCreateRequest = Field(default_factory=NvExtVideoCreateRequest)


class VideoData(BaseModel):
    b64_json: str | None = Field(default=None, description="Base64-encoded MP4 video")
    mime_type: str = Field(default="video/mp4")


class VideoCreateResponse(BaseModel):
    id: str
    object: str = "video"
    created: int
    model: str
    status: str = "complete"
    data: list[VideoData]


# ── Backend ───────────────────────────────────────────────────────────────────


def _coerce_optional_float(value: object) -> float | None:
    """Best-effort conversion for optional numeric metrics from backend results."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


class FastVideoBackend:
    def __init__(self, args: argparse.Namespace) -> None:
        self.model_name: str = args.model
        self.num_gpus: int = args.num_gpus
150
151
        self.enable_optimizations: bool = args.enable_optimizations
        self.attention_backend: str = args.attention_backend
152
153
154
155
156

        # One request at a time — VideoGenerator is not re-entrant
        self._generate_lock = asyncio.Lock()
        self.generator: VideoGenerator | None = None

157
        os.environ["FASTVIDEO_ATTENTION_BACKEND"] = self.attention_backend
158
159
160
161
162
163
164
165
166
        os.environ["FASTVIDEO_STAGE_LOGGING"] = "1"
        os.environ["FASTVIDEO_ENABLE_RMSNORM_FP4_PREQUANT"] = "0"

    async def initialize_model(self) -> None:
        logger.info("Loading VideoGenerator model=%s", self.model_name)
        loop = asyncio.get_running_loop()

        def _load():
            pipeline_config = PipelineConfig.from_pretrained(self.model_name)
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
            optimization_kwargs = {}
            if self.enable_optimizations:
                major, minor = torch.cuda.get_device_capability()
                if major < 10:
                    logger.warning(
                        "FP4 quantization is only supported on NVIDIA Blackwell GPUs (compute capability 10.0+). Detected compute capability: %d.%d. Continuing without FP4 optimizations.",
                        major,
                        minor,
                    )
                else:
                    logger.info(
                        "Using FP4 quantization for VideoGenerator model=%s",
                        self.model_name,
                    )
                    try:
                        from fastvideo.layers.quantization.fp4_config import FP4Config
                    except ImportError as exc:
                        raise RuntimeError(
                            "FastVideo optimizations require "
                            "fastvideo.layers.quantization.fp4_config, but this "
                            "FastVideo build does not provide it. Re-run "
                            "worker.py without --enable-optimizations or install a "
                            "FastVideo version that includes fp4_config."
                        ) from exc
                    pipeline_config.dit_config.quant_config = FP4Config()

                optimization_kwargs = {
                    "ltx2_refine_enabled": True,
                    "ltx2_refine_lora_path": "",  # disable refine lora for distilled model
                    "ltx2_refine_num_inference_steps": 2,
                    "ltx2_refine_guidance_scale": 1.0,
                    "ltx2_refine_add_noise": True,
                    "enable_torch_compile": True,
                    "enable_torch_compile_text_encoder": True,
                    "torch_compile_kwargs": {
                        "backend": "inductor",
                        "fullgraph": True,
                        "mode": "max-autotune-no-cudagraphs",
                    },
                    "dit_cpu_offload": False,
                    "vae_cpu_offload": False,
                    "text_encoder_cpu_offload": False,
                    "ltx2_vae_tiling": False,
                }
211
212
213
214
215

            return VideoGenerator.from_pretrained(
                self.model_name,
                num_gpus=self.num_gpus,
                pipeline_config=pipeline_config,
216
                **optimization_kwargs,
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
            )

        self.generator = await loop.run_in_executor(None, _load)
        logger.info("VideoGenerator ready")

    # ── Helpers ───────────────────────────────────────────────────────────────

    def _generate_mp4(
        self,
        prompt: str,
        video_id: str,
        width: int,
        height: int,
        num_frames: int,
        fps: int,
        num_inference_steps: int,
        guidance_scale: float,
        seed: int | None,
        negative_prompt: str | None,
    ) -> bytes:
        """Generate a video clip and return it as MP4 bytes."""
        assert self.generator is not None

        with tempfile.TemporaryDirectory() as tmpdir:
            output_path = os.path.join(tmpdir, "output.mp4")
            kwargs: dict = dict(
                save_video=True,
                return_frames=False,
                output_path=output_path,
                height=height,
                width=width,
                num_frames=num_frames,
                fps=fps,
                num_inference_steps=num_inference_steps,
                guidance_scale=guidance_scale,
            )
            if seed is not None:
                kwargs["seed"] = seed
            if negative_prompt is not None:
                kwargs["negative_prompt"] = negative_prompt

            result = self.generator.generate_video(prompt=prompt, **kwargs)
            result_dict = result if isinstance(result, dict) else {}
            generation_time = _coerce_optional_float(result_dict.get("generation_time"))
            e2e_latency = _coerce_optional_float(result_dict.get("e2e_latency"))
            logger.info("[%s] MP4 written to %s", video_id, output_path)
            if generation_time is not None:
                logger.info(
                    "[%s] Generation time: %.2f seconds", video_id, generation_time
                )
            else:
                logger.info("[%s] Generation time: unavailable", video_id)

            if e2e_latency is not None:
                logger.info("[%s] E2E latency: %.2f seconds", video_id, e2e_latency)
            else:
                logger.info("[%s] E2E latency: unavailable", video_id)

            time_start = time.perf_counter()
            with open(output_path, "rb") as f:
                data = f.read()
            time_end = time.perf_counter()
            logger.info(
                "[%s] File read time: %.2f seconds", video_id, time_end - time_start
            )

            return data

    # ── Dynamo endpoint ───────────────────────────────────────────────────────

    @dynamo_endpoint(VideoCreateRequest, VideoCreateResponse)
    async def create_video(self, request: VideoCreateRequest):
        """
        Non-streaming endpoint.

        Generates one video clip using the parameters from the request's nvext
        field, then yields a single VideoCreateResponse with data[0].b64_json
        containing the complete MP4 file encoded in base64.
        """
        if self.generator is None:
            raise RuntimeError("Generator is not initialized")

        nvext = request.nvext
        try:
            width_str, height_str = request.size.lower().split("x", 1)
            width, height = int(width_str), int(height_str)
        except (ValueError, TypeError) as exc:
            raise ValueError(
                f"Invalid size format '{request.size}', expected 'WxH'"
            ) from exc

        if width <= 0 or height <= 0:
            raise ValueError(
                f"Invalid size '{request.size}', width and height must be positive"
            )

        num_frames = (
            nvext.num_frames
            if nvext.num_frames is not None
            else nvext.fps * request.seconds
        )
        if num_frames <= 0:
            raise ValueError("num_frames must be positive")

        fps = nvext.fps
        if fps <= 0:
            raise ValueError("fps must be positive")

        video_id = f"video_{uuid.uuid4().hex}"
        created_ts = int(time.time())

        logger.info(
            "[%s] create_video: prompt='%s...' size=%s frames=%d steps=%d",
            video_id,
            request.prompt[:60],
            request.size,
            num_frames,
            nvext.num_inference_steps,
        )
        logger.info(
            "[%s] Waiting for generate lock (locked=%s)",
            video_id,
            self._generate_lock.locked(),
        )
        async with self._generate_lock:
            t = time.perf_counter()
            logger.info(
                "[%s] Generating video (%dx%d, %d frames, %d steps) ...",
                video_id,
                width,
                height,
                num_frames,
                nvext.num_inference_steps,
            )
            try:
                mp4_bytes = await asyncio.to_thread(
                    self._generate_mp4,
                    prompt=request.prompt,
                    video_id=video_id,
                    width=width,
                    height=height,
                    num_frames=num_frames,
                    fps=fps,
                    num_inference_steps=nvext.num_inference_steps,
                    guidance_scale=nvext.guidance_scale,
                    seed=nvext.seed,
                    negative_prompt=nvext.negative_prompt,
                )
            except Exception as exc:
                logger.exception("[%s] Generation failed", video_id)
                raise RuntimeError(
                    f"Video generation failed for request {video_id}"
                ) from exc

            elapsed = time.perf_counter() - t
            logger.info(
                "[%s] Generation done in %.1fs — encoding %.2f MB MP4",
                video_id,
                elapsed,
                len(mp4_bytes) / 1_048_576,
            )

            yield VideoCreateResponse(
                id=video_id,
                created=created_ts,
                model=request.model,
                data=[VideoData(b64_json=base64.b64encode(mp4_bytes).decode())],
            ).model_dump()
        logger.info("[%s] Generation request finished", video_id)


# ── Dynamo wiring ─────────────────────────────────────────────────────────────


async def _register_model(endpoint, model_name: str) -> None:
    try:
        await register_llm(
            ModelInput.Text,  # type: ignore[attr-defined]
            ModelType.Videos,
            endpoint,
            model_name,
            model_name,
        )
        logger.info("Successfully registered model: %s", model_name)
    except Exception as e:
        logger.error("Failed to register model: %s", e, exc_info=True)
        raise RuntimeError("Model registration failed") from e


async def backend_worker(runtime: DistributedRuntime, args: argparse.Namespace) -> None:
    namespace_name = _get_worker_namespace()
    component_name = "backend"
    endpoint_name = "generate"

    endpoint = runtime.endpoint(f"{namespace_name}.{component_name}.{endpoint_name}")
    logger.info(
        "Serving endpoint %s/%s/%s", namespace_name, component_name, endpoint_name
    )

    backend = FastVideoBackend(args)
    await backend.initialize_model()

    await asyncio.gather(
        endpoint.serve_endpoint(backend.create_video),  # type: ignore[arg-type]
        _register_model(endpoint, backend.model_name),
    )


def _parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="FastVideo Worker for Dynamo (non-streaming)"
    )
    parser.add_argument(
        "--model",
        default=DEFAULT_MODEL,
        help=f"HuggingFace model path (default: {DEFAULT_MODEL})",
    )
    parser.add_argument(
        "--num-gpus",
        type=int,
        default=1,
        dest="num_gpus",
        help="Number of GPUs (default: 1)",
    )
    parser.add_argument(
442
        "--enable-optimizations",
443
        action="store_true",
444
445
446
447
448
449
450
451
452
453
454
455
456
        dest="enable_optimizations",
        help="Enable FP4 quantization (if available) and torch.compile",
    )
    parser.add_argument(
        "--attention-backend",
        choices=ATTENTION_BACKEND_CHOICES,
        default=DEFAULT_ATTENTION_BACKEND,
        dest="attention_backend",
        help=(
            "Attention backend to set via FASTVIDEO_ATTENTION_BACKEND "
            f"(choices: {', '.join(ATTENTION_BACKEND_CHOICES)}; "
            f"default: {DEFAULT_ATTENTION_BACKEND})"
        ),
457
458
459
460
461
462
463
464
465
466
467
468
469
470
    )
    return parser.parse_args()


async def main(args: argparse.Namespace) -> None:
    loop = asyncio.get_running_loop()
    # Use Kubernetes discovery in-cluster and file discovery for local compose by default.
    discovery_backend = os.environ.get("DYN_DISCOVERY_BACKEND")
    if not discovery_backend:
        discovery_backend = (
            "kubernetes" if os.environ.get("KUBERNETES_SERVICE_HOST") else "file"
        )
    logger.info("Using discovery backend: %s", discovery_backend)
    logger.info("Resolved worker namespace: %s", _get_worker_namespace())
471
    runtime = DistributedRuntime(loop, discovery_backend, "tcp")
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
    await backend_worker(runtime, args)


if __name__ == "__main__":
    _args = _parse_args()
    logging.basicConfig(
        level=(
            logging.DEBUG
            if os.environ.get("FASTVIDEO_LOG_LEVEL") == "DEBUG"
            else logging.INFO
        ),
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        force=True,
    )
    uvloop.install()
    asyncio.run(main(_args))