# Download Pretrained Models
All models are stored in `HunyuanVideo-I2V/ckpts` by default, and the file structure is as follows:
```shell
HunyuanVideo-I2V
├──ckpts
│ ├──README.md
│ ├──hunyuan-video-i2v-720p
│ │ ├──transformers
│ │ │ ├──mp_rank_00_model_states.pt
│ │ ├──vae
│ │ ├──lora
│ │ │ ├──embrace_kohaya_weights.safetensors
│ │ │ ├──hair_growth_kohaya_weights.safetensors
│ ├──text_encoder_i2v
│ ├──text_encoder_2
├──...
```
## Download HunyuanVideo-I2V model
To download the HunyuanVideo-I2V model, first install the huggingface-cli. (Detailed instructions are available [here](https://huggingface.co/docs/huggingface_hub/guides/cli).)
```shell
python -m pip install "huggingface_hub[cli]"
```
Then download the model using the following commands:
```shell
# Switch to the directory named 'HunyuanVideo-I2V'
cd HunyuanVideo-I2V
# Use the huggingface-cli tool to download the HunyuanVideo-I2V model into the HunyuanVideo-I2V/ckpts directory.
# The download time may vary from 10 minutes to 1 hour depending on network conditions.
huggingface-cli download tencent/HunyuanVideo-I2V --local-dir ./ckpts
```
<details>
<summary>💡 Tips for using huggingface-cli (network issues)</summary>
##### 1. Using HF-Mirror
If you encounter slow download speeds (for example, from mainland China), you can use a mirror endpoint to speed up the download. For example:
```shell
HF_ENDPOINT=https://hf-mirror.com huggingface-cli download tencent/HunyuanVideo-I2V --local-dir ./ckpts
```
##### 2. Resume Download
`huggingface-cli` supports resuming downloads. If the download is interrupted, simply rerun the download
command to continue where it left off.
Note: If an error like `No such file or directory: 'ckpts/.huggingface/.gitignore.lock'` occurs during the
download, you can ignore it and rerun the download command.
</details>
---
## Download Text Encoder
HunyuanVideo-I2V uses an MLLM model and a CLIP model as text encoders.
1. MLLM model (text_encoder_i2v folder)
HunyuanVideo-I2V supports different MLLMs (including HunyuanMLLM and open-source MLLM models). At this stage, we have not yet released HunyuanMLLM, so we recommend that community users use [llava-llama-3-8b](https://huggingface.co/xtuner/llava-llama-3-8b-v1_1-transformers) provided by [Xtuner](https://huggingface.co/xtuner); it can be downloaded with the following command.
Note that unlike [HunyuanVideo](https://github.com/Tencent/HunyuanVideo/tree/main), which uses only the language-model part of `llava-llama-3-8b-v1_1-transformers`, HunyuanVideo-I2V needs the full model to encode both prompts and images. Therefore, download the complete model as-is; no preprocessing is needed.
```shell
cd HunyuanVideo-I2V/ckpts
huggingface-cli download xtuner/llava-llama-3-8b-v1_1-transformers --local-dir ./text_encoder_i2v
```
2. CLIP model (text_encoder_2 folder)
We use [CLIP](https://huggingface.co/openai/clip-vit-large-patch14) provided by [OpenAI](https://openai.com) as the second text encoder. Community users can download it with the following command:
```shell
cd HunyuanVideo-I2V/ckpts
huggingface-cli download openai/clip-vit-large-patch14 --local-dir ./text_encoder_2
```
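As a quick, optional sanity check (a minimal sketch, assuming the `transformers` Python package is installed and you run it from the `HunyuanVideo-I2V` directory), you can load the downloaded CLIP text encoder and run a dummy prompt through it:
```python
# Verify the CLIP text encoder in ckpts/text_encoder_2 loads and encodes text.
from transformers import CLIPTextModel, CLIPTokenizer

tokenizer = CLIPTokenizer.from_pretrained("ckpts/text_encoder_2")
text_encoder = CLIPTextModel.from_pretrained("ckpts/text_encoder_2")

tokens = tokenizer("a quick sanity check", return_tensors="pt")
hidden = text_encoder(**tokens).last_hidden_state
print(hidden.shape)  # torch.Size([1, seq_len, 768]); 768 matches --text-states-dim-2
```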
{
"Name": [
"HunyuanVideo-I2V"
]
}
import argparse
from .constants import *
import re
from .modules.models import HUNYUAN_VIDEO_CONFIG
def parse_args(mode="eval", namespace=None):
parser = argparse.ArgumentParser(description="HunyuanVideo inference/lora training script")
parser = add_network_args(parser)
parser = add_extra_models_args(parser)
parser = add_denoise_schedule_args(parser)
parser = add_i2v_args(parser)
parser = add_lora_args(parser)
parser = add_inference_args(parser)
parser = add_parallel_args(parser)
if mode == "train":
parser = add_training_args(parser)
parser = add_optimizer_args(parser)
parser = add_deepspeed_args(parser)
parser = add_data_args(parser)
parser = add_train_denoise_schedule_args(parser)
args = parser.parse_args(namespace=namespace)
args = sanity_check_args(args)
return args
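# Illustrative note (not part of the original file): a typical i2v inference invocation
# exercising the argument groups above might look like the following (script name and
# values are examples only; every flag used here is defined in this module):
#
#   python3 sample_image2video.py \
#       --i2v-mode \
#       --i2v-image-path ./assets/demo/i2v/imgs/0.png \
#       --i2v-resolution 720p \
#       --prompt "A man with short gray hair plays a red electric guitar." \
#       --video-length 129 --infer-steps 50 \
#       --flow-reverse \
#       --save-path ./results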
def add_train_denoise_schedule_args(parser: argparse.ArgumentParser):
group = parser.add_argument_group(title="Denoise schedule")
group.add_argument("--flow-path-type", type=str, default="linear", choices=FLOW_PATH_TYPE,
help="Path type for flow matching schedulers.")
group.add_argument("--flow-predict-type", type=str, default="velocity", choices=FLOW_PREDICT_TYPE,
help="Prediction type for flow matching schedulers.")
group.add_argument("--flow-loss-weight", type=str, default=None, choices=FLOW_LOSS_WEIGHT,
help="Loss weight type for flow matching schedulers.")
group.add_argument("--flow-train-eps", type=float, default=None,
help="Small epsilon for avoiding instability during training.")
group.add_argument("--flow-sample-eps", type=float, default=None,
help="Small epsilon for avoiding instability during sampling.")
group.add_argument("--flow-snr-type", type=str, default="lognorm", choices=FLOW_SNR_TYPE,
help="Type of SNR to use for flow matching schedulers.")
return parser
def add_deepspeed_args(parser: argparse.ArgumentParser):
group = parser.add_argument_group(title="DeepSpeed")
group.add_argument("--local_rank", type=int, default=-1, help="Local rank for distributed training.")
group.add_argument("--zero-stage", type=int, default=0, choices=[0, 1, 2, 3],
help="DeepSpeed ZeRO stage. 0: disabled, 1: partition optimizer states, "
"2: partition optimizer states and gradients, 3: partition optimizer states, gradients, and parameters.")
return parser
def add_data_args(parser: argparse.ArgumentParser):
group = parser.add_argument_group(title="Data")
group.add_argument("--data-type", type=str, default="image", choices=DATA_TYPE, help="Type of the dataset.")
group.add_argument("--data-jsons-path", type=str, default=None, help="Dataset path for training.")
group.add_argument("--sample-n-frames", type=int, default=65,
help="How many frames to sample from a video. If using the 3D VAE, the number should be 4n+1.")
group.add_argument("--sample-stride", type=int, default=1,
help="How many frames to skip when sampling from a video.")
group.add_argument("--num-workers", type=int, default=4, help="Number of workers for data loading.")
group.add_argument("--prefetch-factor", type=int, default=2, help="Prefetch factor for data loading.")
group.add_argument("--same-data-batch", action="store_true", help="Use same data type for all rank in a batch for training.")
group.add_argument("--uncond-p", type=float, default=0.1,
help="Probability of randomly dropping video description.")
group.add_argument("--sematic-cond-drop-p", type=float, default=0.1,
help="Probability of randomly dropping the semantic image condition.")
return parser
def add_training_args(parser: argparse.ArgumentParser):
group = parser.add_argument_group(title="Training")
group.add_argument("--task-flag", type=str, required=True,
help="Task flag for training/inference. It is used to determine the experiment directory.")
group.add_argument("--output-dir", type=str, required=True, help="Directory to save logs and models")
group.add_argument("--sample-dir", type=str, default=None, required=False, help="Directory to save samples")
group.add_argument("--micro-batch-size", type=int, default=1, nargs='*',
help="Batch size per model instance (local batch size).")
group.add_argument("--video-micro-batch-size", type=int, default=None, nargs='*',
help="Batch size per model instance for video data (local batch size).")
group.add_argument("--global-batch-size", type=int, default=None, nargs='*',
help="Global batch size (across all model instances). "
"global-batch-size = micro-batch-size * world-size * gradient-accumulation-steps")
group.add_argument("--gradient-accumulation-steps", type=int, default=1,
help="Number of steps to accumulate gradients over before performing an update.")
group.add_argument("--global-seed", type=int, default=42, help="Global seed for reproducibility.")
group.add_argument("--resume", type=str, default=None,
help="Path to the checkpoint to resume training. It can be an experiment index to resume from "
"the latest checkpoint in the output directory.")
group.add_argument("--init-from", type=str, default=None,
help="Path to the initial checkpoint to load for training.")
group.add_argument("--training-parts", type=str, default=None, help="Training a subset of the model parameters.")
group.add_argument("--init-save", action="store_true", help="Save the initial model before training.")
group.set_defaults(final_save=True)
group.add_argument("--final-save", action="store_true", help="Save the final model after training.")
group.add_argument("--no-final-save", dest="final_save", action="store_false", help="Do not save the final model.")
group.add_argument("--epochs", type=int, default=100000, help="Number of epochs to train.")
group.add_argument("--max-training-steps", type=int, default=10_000_000, help="Maximum number of training steps.")
group.add_argument("--ckpt-every", type=int, default=5000, help="Save checkpoint every N steps.")
group.add_argument("--rope-theta-rescale-factor", type=float, default=1.0, nargs='+',
help="RoPE theta rescale factor.")
group.add_argument("--rope-interpolation-factor", type=float, default=1.0, nargs='+',
help="Rope interpolation factor.")
group.add_argument("--log-every", type=int, default=10, help="Log every N update steps.")
group.add_argument("--tensorboard", action="store_true", help="Enable TensorBoard logging.")
group.add_argument("--profile", action="store_true", help="Enable PyTorch profiler.")
return parser
def add_optimizer_args(parser: argparse.ArgumentParser):
group = parser.add_argument_group(title="Optimizer")
# Learning rate
group.add_argument("--lr", type=float, default=1e-4,
help="Basic learning rate, varies depending on learning rate schedule and warmup.")
group.add_argument("--warmup-min-lr", type=float, default=1e-6, help="Minimum learning rate for warmup.")
group.add_argument("--warmup-num-steps", type=int, default=0, help="Number of warmup steps for learning rate.")
# Optimizer
group.add_argument("--adam-beta1", type=float, default=0.9,
help="[AdamW] First coefficient for computing running averages of gradient.")
group.add_argument("--adam-beta2", type=float, default=0.999,
help="[AdamW] Second coefficient for computing running averages of gradient square.")
group.add_argument("--adam-eps", type=float, default=1e-8,
help="[AdamW] Term added to the denominator to improve numerical stability.")
group.add_argument("--weight-decay", type=float, default=0,
help="Weight decay coefficient for L2 regularization.")
return parser
def add_train_args(parser: argparse.ArgumentParser):
group = parser.add_argument_group(title="HunyuanVideo train args")
return parser
def add_network_args(parser: argparse.ArgumentParser):
group = parser.add_argument_group(title="HunyuanVideo network args")
# Main model
group.add_argument(
"--model",
type=str,
choices=list(HUNYUAN_VIDEO_CONFIG.keys()),
default="HYVideo-T/2-cfgdistill",
)
group.add_argument(
"--latent-channels",
type=int,
default=16,
help="Number of latent channels of DiT. If None, it will be determined by `vae`. If provided, "
"it still needs to match the latent channels of the VAE model.",
)
group.add_argument(
"--precision",
type=str,
default="bf16",
choices=PRECISIONS,
help="Precision mode. Options: fp32, fp16, bf16. Applied to the backbone model and optimizer.",
)
# RoPE
group.add_argument(
"--rope-theta", type=int, default=256, help="Theta used in RoPE."
)
group.add_argument("--gradient-checkpoint", action="store_true",
help="Enable gradient checkpointing to reduce memory usage.")
group.add_argument("--gradient-checkpoint-layers", type=int, default=-1,
help="Number of layers to checkpoint. -1 for all layers. `n` for the first n layers.")
return parser
def add_extra_models_args(parser: argparse.ArgumentParser):
group = parser.add_argument_group(
title="Extra models args, including vae, text encoders and tokenizers"
)
# - VAE
group.add_argument(
"--vae",
type=str,
default="884-16c-hy",
choices=list(VAE_PATH),
help="Name of the VAE model.",
)
group.add_argument(
"--vae-precision",
type=str,
default="fp16",
choices=PRECISIONS,
help="Precision mode for the VAE model.",
)
group.add_argument(
"--vae-tiling",
action="store_true",
help="Enable tiling for the VAE model to save GPU memory.",
)
group.set_defaults(vae_tiling=True)
group.add_argument(
"--text-encoder",
type=str,
default="llm-i2v",
choices=list(TEXT_ENCODER_PATH),
help="Name of the text encoder model.",
)
group.add_argument(
"--text-encoder-precision",
type=str,
default="fp16",
choices=PRECISIONS,
help="Precision mode for the text encoder model.",
)
group.add_argument(
"--text-states-dim",
type=int,
default=4096,
help="Dimension of the text encoder hidden states.",
)
group.add_argument(
"--text-len", type=int, default=256, help="Maximum length of the text input."
)
group.add_argument(
"--tokenizer",
type=str,
default="llm-i2v",
choices=list(TOKENIZER_PATH),
help="Name of the tokenizer model.",
)
group.add_argument(
"--prompt-template",
type=str,
default="dit-llm-encode-i2v",
choices=PROMPT_TEMPLATE,
help="Image prompt template for the decoder-only text encoder model.",
)
group.add_argument(
"--prompt-template-video",
type=str,
default="dit-llm-encode-video-i2v",
choices=PROMPT_TEMPLATE,
help="Video prompt template for the decoder-only text encoder model.",
)
group.add_argument(
"--hidden-state-skip-layer",
type=int,
default=2,
help="Skip layer for hidden states.",
)
group.add_argument(
"--apply-final-norm",
action="store_true",
help="Apply final normalization to the used text encoder hidden states.",
)
# - CLIP
group.add_argument(
"--text-encoder-2",
type=str,
default="clipL",
choices=list(TEXT_ENCODER_PATH),
help="Name of the second text encoder model.",
)
group.add_argument(
"--text-encoder-precision-2",
type=str,
default="fp16",
choices=PRECISIONS,
help="Precision mode for the second text encoder model.",
)
group.add_argument(
"--text-states-dim-2",
type=int,
default=768,
help="Dimension of the second text encoder hidden states.",
)
group.add_argument(
"--tokenizer-2",
type=str,
default="clipL",
choices=list(TOKENIZER_PATH),
help="Name of the second tokenizer model.",
)
group.add_argument(
"--text-len-2",
type=int,
default=77,
help="Maximum length of the second text input.",
)
return parser
def add_denoise_schedule_args(parser: argparse.ArgumentParser):
group = parser.add_argument_group(title="Denoise schedule args")
group.add_argument(
"--denoise-type",
type=str,
default="flow",
help="Denoise type for noised inputs.",
)
# Flow Matching
group.add_argument(
"--flow-shift",
type=float,
default=17.0,
help="Shift factor for flow matching schedulers.",
)
group.add_argument(
"--flow-reverse",
action="store_true",
help="If reverse, learning/sampling from t=1 -> t=0.",
)
group.add_argument(
"--flow-solver",
type=str,
default="euler",
help="Solver for flow matching.",
)
group.add_argument(
"--use-linear-quadratic-schedule",
action="store_true",
help="Use linear quadratic schedule for flow matching. "
"Following MovieGen (https://ai.meta.com/static-resource/movie-gen-research-paper)",
)
group.add_argument(
"--linear-schedule-end",
type=int,
default=25,
help="End step for linear quadratic schedule for flow matching.",
)
return parser
def add_inference_args(parser: argparse.ArgumentParser):
group = parser.add_argument_group(title="Inference args")
# ======================== Model loads ========================
group.add_argument(
"--model-base",
type=str,
default="ckpts",
help="Root path of all the models, including t2v models and extra models.",
)
group.add_argument(
"--dit-weight",
type=str,
default="ckpts/hunyuan-video-t2v-720p/transformers/mp_rank_00_model_states.pt",
help="Path to the HunyuanVideo model. If None, search the model in the args.model_root. "
"1. If it is a file, load the model directly. "
"2. If it is a directory, search the model in the directory. Support two types of models: "
"1) named `pytorch_model_*.pt`; "
"2) named `*_model_states.pt`, where * can be `mp_rank_00`.",
)
group.add_argument(
"--i2v-dit-weight",
type=str,
default="ckpts/hunyuan-video-i2v-720p/transformers/mp_rank_00_model_states.pt",
help="Path to the HunyuanVideo model. If None, search the model in the args.model_root. "
"1. If it is a file, load the model directly. "
"2. If it is a directory, search the model in the directory. Support two types of models: "
"1) named `pytorch_model_*.pt`; "
"2) named `*_model_states.pt`, where * can be `mp_rank_00`.",
)
group.add_argument(
"--model-resolution",
type=str,
default="540p",
choices=["540p", "720p"],
help="Resolution of the base model to load (540p or 720p).",
)
group.add_argument(
"--load-key",
type=str,
default="module",
help="Key to load the model states. 'module' for the main model, 'ema' for the EMA model.",
)
group.add_argument(
"--use-cpu-offload",
action="store_true",
help="Use CPU offload for the model load.",
)
# ======================== Inference general setting ========================
group.add_argument(
"--batch-size",
type=int,
default=1,
help="Batch size for inference and evaluation.",
)
group.add_argument(
"--infer-steps",
type=int,
default=50,
help="Number of denoising steps for inference.",
)
group.add_argument(
"--disable-autocast",
action="store_true",
help="Disable autocast for denoising loop and vae decoding in pipeline sampling.",
)
group.add_argument(
"--save-path",
type=str,
default="./results",
help="Path to save the generated samples.",
)
group.add_argument(
"--save-path-suffix",
type=str,
default="",
help="Suffix for the directory of saved samples.",
)
group.add_argument(
"--name-suffix",
type=str,
default="",
help="Suffix for the names of saved samples.",
)
group.add_argument(
"--num-videos",
type=int,
default=1,
help="Number of videos to generate for each prompt.",
)
# ---sample size---
group.add_argument(
"--video-size",
type=int,
nargs="+",
default=(720, 1280),
help="Video size for sampling. If a single value is provided, it will be used for both height "
"and width. If two values are provided, they will be used for height and width "
"respectively.",
)
group.add_argument(
"--video-length",
type=int,
default=129,
help="How many frames to sample for a video. If using the 3D VAE, the number should be 4n+1",
)
# --- prompt ---
group.add_argument(
"--prompt",
type=str,
default=None,
help="Prompt for sampling during evaluation.",
)
group.add_argument(
"--seed-type",
type=str,
default="auto",
choices=["file", "random", "fixed", "auto"],
help="Seed type for evaluation. If file, use the seed from the CSV file. If random, generate a "
"random seed. If fixed, use the fixed seed given by `--seed`. If auto, `csv` will use the "
"seed column if available, otherwise use the fixed `seed` value. `prompt` will use the "
"fixed `seed` value.",
)
group.add_argument("--seed", type=int, default=None, help="Seed for evaluation.")
# Classifier-Free Guidance
group.add_argument(
"--neg-prompt", type=str, default=None, help="Negative prompt for sampling."
)
group.add_argument(
"--cfg-scale", type=float, default=1.0, help="Classifier free guidance scale."
)
group.add_argument(
"--embedded-cfg-scale",
type=float,
default=None,
help="Embedded classifier free guidance scale.",
)
group.add_argument(
"--use-fp8",
action="store_true",
help="Enable fp8 inference for acceleration."
)
group.add_argument(
"--reproduce",
action="store_true",
help="Enable reproducibility by setting random seeds and deterministic algorithms.",
)
return parser
def add_i2v_args(parser: argparse.ArgumentParser):
group = parser.add_argument_group(title="I2V args")
group.add_argument(
"--i2v-mode",
action="store_true",
help="Whether to enable i2v (image-to-video) mode."
)
group.add_argument(
"--i2v-resolution",
type=str,
default="720p",
choices=["720p", "540p", "360p"],
help="Resolution for i2v inference."
)
group.add_argument(
"--i2v-image-path",
type=str,
default="./assets/demo/i2v/imgs/0.png",
help="Image path for i2v inference."
)
group.add_argument(
"--i2v-condition-type",
type=str,
default="token_replace",
choices=["token_replace", "latent_concat"],
help="Condition type for i2v model."
)
group.add_argument(
"--i2v-stability", action="store_true", help="Whether to use i2v stability mode."
)
return parser
def add_lora_args(parser: argparse.ArgumentParser):
group = parser.add_argument_group(title="lora args")
group.add_argument(
"--use-lora", action="store_true", help="Whether to enable LoRA mode."
)
group.add_argument(
"--lora-path", type=str, default="", help="Weight path for lora model."
)
group.add_argument(
"--lora-scale", type=float, default=1.0, help="Fusion scale for lora model."
)
group.add_argument(
"--lora-rank", type=int, default=64, help="Rank for lora model."
)
return parser
def add_parallel_args(parser: argparse.ArgumentParser):
group = parser.add_argument_group(title="Parallel args")
# ======================== Model loads ========================
group.add_argument(
"--ulysses-degree",
type=int,
default=1,
help="Ulysses degree for xdit parallel args.",
)
group.add_argument(
"--ring-degree",
type=int,
default=1,
help="Ring degree for xdit parallel args.",
)
group.add_argument(
"--xdit-adaptive-size",
action="store_true",
help="Ensure the generated video has no black padding.")
return parser
def sanity_check_args(args):
# VAE channels
vae_pattern = r"\d{2,3}-\d{1,2}c-\w+"
if not re.match(vae_pattern, args.vae):
raise ValueError(
f"Invalid VAE model: {args.vae}. Must be in the format of '{vae_pattern}'."
)
vae_channels = int(args.vae.split("-")[1][:-1])
if args.latent_channels is None:
args.latent_channels = vae_channels
if vae_channels != args.latent_channels:
raise ValueError(
f"Latent channels ({args.latent_channels}) must match the VAE channels ({vae_channels})."
)
return args
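# Illustrative example (not part of the original file): the VAE name encodes its
# compression configuration and latent channel count; sanity_check_args() reads the
# channels from the "<N>c" segment, e.g. "884-16c-hy" -> 16.
if __name__ == "__main__":
    _vae_name = "884-16c-hy"
    print(int(_vae_name.split("-")[1][:-1]))  # 16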
import os
import torch
__all__ = [
"C_SCALE",
"PROMPT_TEMPLATE",
"MODEL_BASE",
"PRECISIONS",
"NORMALIZATION_TYPE",
"ACTIVATION_TYPE",
"VAE_PATH",
"TEXT_ENCODER_PATH",
"TOKENIZER_PATH",
"TEXT_PROJECTION",
"DATA_TYPE",
"NEGATIVE_PROMPT",
"NEGATIVE_PROMPT_I2V",
"FLOW_PATH_TYPE",
"FLOW_PREDICT_TYPE",
"FLOW_LOSS_WEIGHT",
"FLOW_SNR_TYPE",
"FLOW_SOLVER",
]
PRECISION_TO_TYPE = {
'fp32': torch.float32,
'fp16': torch.float16,
'bf16': torch.bfloat16,
}
# =================== Constant Values =====================
# Computation scale factor: 1 PFLOP = 1_000_000_000_000_000 FLOPs. Values are logged to TensorBoard
# in PetaFLOPs to avoid overflow when logging raw FLOP counts.
C_SCALE = 1_000_000_000_000_000
# When using decoder-only models, we must provide a prompt template to instruct the text encoder
# on how to generate the text.
# --------------------------------------------------------------------
PROMPT_TEMPLATE_ENCODE = (
"<|start_header_id|>system<|end_header_id|>\n\nDescribe the image by detailing the color, shape, size, texture, "
"quantity, text, spatial relationships of the objects and background:<|eot_id|>"
"<|start_header_id|>user<|end_header_id|>\n\n{}<|eot_id|>"
)
PROMPT_TEMPLATE_ENCODE_VIDEO = (
"<|start_header_id|>system<|end_header_id|>\n\nDescribe the video by detailing the following aspects: "
"1. The main content and theme of the video."
"2. The color, shape, size, texture, quantity, text, and spatial relationships of the objects."
"3. Actions, events, behaviors temporal relationships, physical movement changes of the objects."
"4. background environment, light, style and atmosphere."
"5. camera angles, movements, and transitions used in the video:<|eot_id|>"
"<|start_header_id|>user<|end_header_id|>\n\n{}<|eot_id|>"
)
PROMPT_TEMPLATE_ENCODE_I2V = (
"<|start_header_id|>system<|end_header_id|>\n\n<image>\nDescribe the image by detailing the color, shape, size, texture, "
"quantity, text, spatial relationships of the objects and background:<|eot_id|>"
"<|start_header_id|>user<|end_header_id|>\n\n{}<|eot_id|>"
"<|start_header_id|>assistant<|end_header_id|>\n\n"
)
PROMPT_TEMPLATE_ENCODE_VIDEO_I2V = (
"<|start_header_id|>system<|end_header_id|>\n\n<image>\nDescribe the video by detailing the following aspects according to the reference image: "
"1. The main content and theme of the video."
"2. The color, shape, size, texture, quantity, text, and spatial relationships of the objects."
"3. Actions, events, behaviors temporal relationships, physical movement changes of the objects."
"4. background environment, light, style and atmosphere."
"5. camera angles, movements, and transitions used in the video:<|eot_id|>\n\n"
"<|start_header_id|>user<|end_header_id|>\n\n{}<|eot_id|>"
"<|start_header_id|>assistant<|end_header_id|>\n\n"
)
NEGATIVE_PROMPT = "Aerial view, aerial view, overexposed, low quality, deformation, a poor composition, bad hands, bad teeth, bad eyes, bad limbs, distortion"
NEGATIVE_PROMPT_I2V = "deformation, a poor composition and deformed video, bad teeth, bad eyes, bad limbs"
PROMPT_TEMPLATE = {
"dit-llm-encode": {
"template": PROMPT_TEMPLATE_ENCODE,
"crop_start": 36,
},
"dit-llm-encode-video": {
"template": PROMPT_TEMPLATE_ENCODE_VIDEO,
"crop_start": 95,
},
"dit-llm-encode-i2v": {
"template": PROMPT_TEMPLATE_ENCODE_I2V,
"crop_start": 36,
"image_emb_start": 5,
"image_emb_end": 581,
"image_emb_len": 576,
"double_return_token_id": 271
},
"dit-llm-encode-video-i2v": {
"template": PROMPT_TEMPLATE_ENCODE_VIDEO_I2V,
"crop_start": 103,
"image_emb_start": 5,
"image_emb_end": 581,
"image_emb_len": 576,
"double_return_token_id": 271
},
}
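# Illustrative note (not part of the original file): each entry's "template" wraps the
# user prompt via str.format(), e.g.
#   PROMPT_TEMPLATE["dit-llm-encode-i2v"]["template"].format("A cat walks on the grass.")
# and "crop_start" is the number of leading template tokens stripped from the text-encoder
# output so that only the prompt (and, for i2v, image) tokens remain.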
# ======================= Model ======================
PRECISIONS = {"fp32", "fp16", "bf16"}
NORMALIZATION_TYPE = {"layer", "rms"}
ACTIVATION_TYPE = {"relu", "silu", "gelu", "gelu_tanh"}
# =================== Model Path =====================
MODEL_BASE = os.getenv("MODEL_BASE", "./ckpts")
# =================== Data =======================
DATA_TYPE = {"image", "video", "image_video"}
# 3D VAE
VAE_PATH = {"884-16c-hy": f"{MODEL_BASE}/hunyuan-video-i2v-720p/vae"}
# Text Encoder
TEXT_ENCODER_PATH = {
"clipL": f"{MODEL_BASE}/text_encoder_2",
"llm": f"{MODEL_BASE}/text_encoder",
"llm-i2v": f"{MODEL_BASE}/text_encoder_i2v",
}
# Tokenizer
TOKENIZER_PATH = {
"clipL": f"{MODEL_BASE}/text_encoder_2",
"llm": f"{MODEL_BASE}/text_encoder",
"llm-i2v": f"{MODEL_BASE}/text_encoder_i2v",
}
TEXT_PROJECTION = {
"linear", # Default, an nn.Linear() layer
"single_refiner", # Single TokenRefiner. Refer to LI-DiT
}
# Flow Matching path type
FLOW_PATH_TYPE = {
"linear", # Linear trajectory between noise and data
"gvp", # Generalized variance-preserving SDE
"vp", # Variance-preserving SDE
}
# Flow Matching predict type
FLOW_PREDICT_TYPE = {
"velocity", # Predict velocity
"score", # Predict score
"noise", # Predict noise
}
# Flow Matching loss weight
FLOW_LOSS_WEIGHT = {
"velocity", # Weight loss by velocity
"likelihood", # Weight loss by likelihood
}
# Flow Matching SNR type
FLOW_SNR_TYPE = {
"lognorm", # Log-normal SNR
"uniform", # Uniform SNR
}
# Flow Matching solvers
FLOW_SOLVER = {
"euler", # Euler solver
}
import random
import os
import io
import torch
import numpy as np
import json
import traceback
import time
import pyarrow as pa
from torch.utils.data import Dataset
class VideoDataset(Dataset):
def __init__(self,
data_jsons_path: str,
sample_n_frames: int = 129,
sample_stride: int = 1,
text_encoder=None,
text_encoder_2=None,
uncond_p=0.0,
args=None,
logger=None,
) -> None:
"""Video dataset backed by per-clip JSON metadata and cached VAE latents.
Args:
data_jsons_path (str): input data json path
sample_n_frames (int, optional): training video length. Defaults to 129.
sample_stride (int, optional): video frame sample stride. Defaults to 1 (no frame skipping).
text_encoder (_type_, optional): text encoder used for tokenization. Defaults to None.
text_encoder_2 (_type_, optional): second text encoder used for tokenization. Defaults to None.
uncond_p (float, optional): probability of dropping the text condition. Defaults to 0.0.
args (_type_, optional): args. Defaults to None.
logger (_type_, optional): logger. Defaults to None.
"""
self.args = args
self.sample_n_frames = sample_n_frames
self.sample_stride = sample_stride
self.text_encoder = text_encoder
self.text_encoder_2 = text_encoder_2
self.uncond_p = uncond_p
if logger is None:
from loguru import logger
self.logger = logger
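# Expected per-file JSON schema (inferred from the fields read below): each JSON provides
# "video_id" (str), "prompt" (str), "npy_save_path" (path to a cached latent .npy whose
# leading batch dim is squeezed in get_batch), and "latent_shape" (a 5-element list such as
# [1, C, T, H, W], from which the latent height and width are taken).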
s_time = time.time()
json_files = os.listdir(data_jsons_path)
video_id_list = []
latent_shape_list = []
prompt_list = []
npy_save_path_list = []
height_list = []
width_list = []
for json_file in json_files:
with open(f"{data_jsons_path}/{json_file}", 'r', encoding='utf-8-sig') as file:
data = json.load(file)
video_id = data.get('video_id')
latent_shape = data.get('latent_shape')
prompt = data.get('prompt')
npy_save_path = data.get('npy_save_path')
video_id_list.append(video_id)
latent_shape_list.append(latent_shape)
prompt_list.append(prompt)
npy_save_path_list.append(npy_save_path)
height_list.append(latent_shape[3])
width_list.append(latent_shape[4])
schema = pa.schema([
('video_id', pa.string()),
('latent_shape', pa.list_(pa.int64())),
('prompt', pa.string()),
('npy_save_path', pa.string()),
('height', pa.int64()),
('width', pa.int64()),
])
video_id_array = pa.array(video_id_list, type=pa.string())
latent_shape_array = pa.array(latent_shape_list, type=pa.list_(pa.int64()))
prompt_array = pa.array(prompt_list, type=pa.string())
npy_save_path_array = pa.array(npy_save_path_list, type=pa.string())
height_array = pa.array(height_list, type=pa.int64())
width_array = pa.array(width_list, type=pa.int64())
record_batch = pa.RecordBatch.from_arrays([video_id_array, latent_shape_array, prompt_array,
npy_save_path_array, height_array, width_array], schema=schema)
self.table = pa.Table.from_batches([record_batch])
logger.info(f"load {data_jsons_path} \t cost {time.time() - s_time:.2f} s \t total length {len(self.table)}")
def __len__(self):
return len(self.table)
def get_data_info(self, index):
latent_shape = self.table['latent_shape'][index].as_py()
assert isinstance(latent_shape, list), "latent_shape must be list"
num_frames = latent_shape[-3]
height = latent_shape[-2]
width = latent_shape[-1]
num_frames = (num_frames - 1) * 4 + 1  # undo the 3D VAE's 4x temporal compression: n latent frames -> 4(n-1)+1 video frames
return {'height': height,
'width': width,
'num_frames': num_frames}
@staticmethod
def get_text_tokens(text_encoder, description):
text_inputs = text_encoder.text2tokens(description, data_type='video')
text_ids = text_inputs["input_ids"].squeeze(0)
text_mask = text_inputs["attention_mask"].squeeze(0)
return text_ids, text_mask
def get_batch(self, idx):
videoid = self.table['video_id'][idx].as_py()
prompt = self.table['prompt'][idx].as_py()
pixel_values = torch.tensor(0)
if random.random() < self.uncond_p:
prompt = ''
text_ids, text_mask = self.get_text_tokens(self.text_encoder, prompt)
sample_n_frames = self.sample_n_frames
cache_path = self.table['npy_save_path'][idx].as_py()
latents = torch.from_numpy(np.load(cache_path)).squeeze(0)
sample_n_latent = (sample_n_frames - 1) // 4 + 1
start_idx = 0
latents = latents[:, start_idx:start_idx + sample_n_latent, ...]
if latents.shape[1] < sample_n_latent:
raise Exception(
f' videoid: {videoid} has wrong cache data for temporal buckets of shape {latents.shape}, expected length: {sample_n_latent}')
data_info = self.get_data_info(idx)
num_frames, height, width = data_info['num_frames'], data_info['height'], data_info['width']
kwargs = {
"text": prompt,
"index": idx,
"type": 'video',
'bucket': [num_frames, height, width],
"videoid": videoid
}
if self.text_encoder_2 is None:
return (
pixel_values,
latents,
text_ids.clone(),
text_mask.clone(),
{k: torch.as_tensor(v) if not isinstance(v, str) else v for k, v in kwargs.items()},
)
else:
text_ids_2, text_mask_2 = self.get_text_tokens(self.text_encoder_2, prompt)
return (
pixel_values,
latents,
text_ids.clone(),
text_mask.clone(),
text_ids_2.clone(),
text_mask_2.clone(),
{k: torch.as_tensor(v) if not isinstance(v, str) else v for k, v in kwargs.items()},
)
def __getitem__(self, idx):
try_times = 100
for i in range(try_times):
try:
return self.get_batch(idx)
except Exception as e:
self.logger.warning(
f"Error details: {str(e)}-{self.table['video_id'][idx]}-{traceback.format_exc()}\n")
idx = np.random.randint(len(self))
raise RuntimeError('Too many bad data.')
if __name__ == "__main__":
data_jsons_path = "test_path"
dataset = VideoDataset(args=None,
data_jsons_path=data_jsons_path)
print(dataset.__getitem__(0))
from .pipelines import HunyuanVideoPipeline
from .schedulers import FlowMatchDiscreteScheduler
from .flow.transport import *
def create_transport(
*,
path_type,
prediction,
loss_weight=None,
train_eps=None,
sample_eps=None,
snr_type="uniform",
shift=1.0,
video_shift=None,
reverse=False,
):
if prediction == "noise":
model_type = ModelType.NOISE
elif prediction == "score":
model_type = ModelType.SCORE
else:
model_type = ModelType.VELOCITY
if loss_weight == "velocity":
loss_type = WeightType.VELOCITY
elif loss_weight == "likelihood":
loss_type = WeightType.LIKELIHOOD
else:
loss_type = WeightType.NONE
if snr_type == "lognorm":
snr_type = SNRType.LOGNORM
elif snr_type == "uniform":
snr_type = SNRType.UNIFORM
else:
raise ValueError(f"Invalid snr type {snr_type}")
if video_shift is None:
video_shift = shift
path_choice = {
"linear": PathType.LINEAR,
"gvp": PathType.GVP,
"vp": PathType.VP,
}
path_type = path_choice[path_type.lower()]
if path_type in [PathType.VP]:
train_eps = 1e-5 if train_eps is None else train_eps
sample_eps = 1e-3 if sample_eps is None else sample_eps
elif path_type in [PathType.GVP, PathType.LINEAR] and model_type != ModelType.VELOCITY:
train_eps = 1e-3 if train_eps is None else train_eps
sample_eps = 1e-3 if sample_eps is None else sample_eps
else: # velocity & [GVP, LINEAR] is stable everywhere
train_eps = 0
sample_eps = 0
# create flow state
state = Transport(
model_type=model_type,
path_type=path_type,
loss_type=loss_type,
train_eps=train_eps,
sample_eps=sample_eps,
snr_type=snr_type,
shift=shift,
video_shift=video_shift,
reverse=reverse,
)
return state
def load_denoiser(args):
if args.denoise_type == "flow":
denoiser = create_transport(path_type=args.flow_path_type,
prediction=args.flow_predict_type,
loss_weight=args.flow_loss_weight,
train_eps=args.flow_train_eps,
sample_eps=args.flow_sample_eps,
snr_type=args.flow_snr_type,
shift=args.flow_shift,
video_shift=args.flow_shift,
reverse=args.flow_reverse,
)
else:
raise ValueError(f"Unknown denoise type: {args.denoise_type}")
return denoiser
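# Illustrative usage (not part of the original file): load_denoiser() only reads the
# denoise/flow fields of the parsed args, so a minimal namespace is enough to build a
# Transport for experimentation. Field names mirror the flags in hyvideo/config.py;
# the values below are examples only.
if __name__ == "__main__":
    from argparse import Namespace

    _args = Namespace(
        denoise_type="flow",
        flow_path_type="linear",
        flow_predict_type="velocity",
        flow_loss_weight=None,
        flow_train_eps=None,
        flow_sample_eps=None,
        flow_snr_type="lognorm",
        flow_shift=17.0,
        flow_reverse=True,
    )
    print(load_denoiser(_args))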
from .transport import ModelType, PathType, Sampler, SNRType, Transport, WeightType
def create_transport(
path_type="linear",
prediction="velocity",
loss_weight=None,
train_eps=None,
sample_eps=None,
snr_type="uniform",
):
"""function for creating Transport object
**Note**: model prediction defaults to velocity
Args:
- path_type: type of path to use; default to linear
- learn_score: set model prediction to score
- learn_noise: set model prediction to noise
- velocity_weighted: weight loss by velocity weight
- likelihood_weighted: weight loss by likelihood weight
- train_eps: small epsilon for avoiding instability during training
- sample_eps: small epsilon for avoiding instability during sampling
"""
if prediction == "noise":
model_type = ModelType.NOISE
elif prediction == "score":
model_type = ModelType.SCORE
else:
model_type = ModelType.VELOCITY
if loss_weight == "velocity":
loss_type = WeightType.VELOCITY
elif loss_weight == "likelihood":
loss_type = WeightType.LIKELIHOOD
else:
loss_type = WeightType.NONE
if snr_type == "lognorm":
snr_type = SNRType.LOGNORM
elif snr_type == "uniform":
snr_type = SNRType.UNIFORM
else:
raise ValueError(f"Invalid snr type {snr_type}")
path_choice = {
"linear": PathType.LINEAR,
"gvp": PathType.GVP,
"vp": PathType.VP,
}
path_type = path_choice[path_type.lower()]
if path_type in [PathType.VP]:
train_eps = 1e-5 if train_eps is None else train_eps
sample_eps = 1e-3 if sample_eps is None else sample_eps
elif path_type in [PathType.GVP, PathType.LINEAR] and model_type != ModelType.VELOCITY:
train_eps = 1e-3 if train_eps is None else train_eps
sample_eps = 1e-3 if sample_eps is None else sample_eps
else: # velocity & [GVP, LINEAR] is stable everywhere
train_eps = 0
sample_eps = 0
# create flow state
state = Transport(
model_type=model_type,
path_type=path_type,
loss_type=loss_type,
train_eps=train_eps,
sample_eps=sample_eps,
snr_type=snr_type,
)
return state
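# Illustrative usage (not part of the original file): a velocity-prediction transport on
# the linear path with lognorm timestep sampling; loss_weight=None selects the plain
# (unweighted) loss.
if __name__ == "__main__":
    _transport = create_transport(
        path_type="linear",
        prediction="velocity",
        loss_weight=None,
        snr_type="lognorm",
    )
    print(type(_transport).__name__)  # Transport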
import torch as th
class sde:
"""SDE solver class"""
def __init__(
self,
drift,
diffusion,
*,
t0,
t1,
num_steps,
sampler_type,
):
assert t0 < t1, "SDE sampler has to be in forward time"
self.num_timesteps = num_steps
self.t = th.linspace(t0, t1, num_steps)
self.dt = self.t[1] - self.t[0]
self.drift = drift
self.diffusion = diffusion
self.sampler_type = sampler_type
def __Euler_Maruyama_step(self, x, mean_x, t, model, **model_kwargs):
w_cur = th.randn(x.size()).to(x)
t = th.ones(x.size(0)).to(x) * t
dw = w_cur * th.sqrt(self.dt)
drift = self.drift(x, t, model, **model_kwargs)
diffusion = self.diffusion(x, t)
mean_x = x + drift * self.dt
x = mean_x + th.sqrt(2 * diffusion) * dw
return x, mean_x
def __Heun_step(self, x, _, t, model, **model_kwargs):
w_cur = th.randn(x.size()).to(x)
dw = w_cur * th.sqrt(self.dt)
t_cur = th.ones(x.size(0)).to(x) * t
diffusion = self.diffusion(x, t_cur)
xhat = x + th.sqrt(2 * diffusion) * dw
K1 = self.drift(xhat, t_cur, model, **model_kwargs)
xp = xhat + self.dt * K1
K2 = self.drift(xp, t_cur + self.dt, model, **model_kwargs)
return (
xhat + 0.5 * self.dt * (K1 + K2),
xhat,
) # at last time point we do not perform the heun step
def __forward_fn(self):
"""TODO: generalize here by adding all private functions ending with steps to it"""
sampler_dict = {
"Euler": self.__Euler_Maruyama_step,
"Heun": self.__Heun_step,
}
try:
sampler = sampler_dict[self.sampler_type]
except KeyError:
raise NotImplementedError("Sampler type not implemented.")
return sampler
def sample(self, init, model, **model_kwargs):
"""forward loop of sde"""
x = init
mean_x = init
samples = []
sampler = self.__forward_fn()
for ti in self.t[:-1]:
with th.no_grad():
x, mean_x = sampler(x, mean_x, ti, model, **model_kwargs)
samples.append(x)
return samples
class ode:
"""ODE solver class"""
def __init__(
self,
drift,
*,
t0,
t1,
sampler_type,
num_steps,
atol,
rtol,
time_shifting_factor=None,
):
assert t0 < t1, "ODE sampler has to be in forward time"
self.drift = drift
self.t = th.linspace(t0, t1, num_steps)
if time_shifting_factor:
self.t = self.t / (self.t + time_shifting_factor - time_shifting_factor * self.t)
self.atol = atol
self.rtol = rtol
self.sampler_type = sampler_type
def sample(self, x, model, **model_kwargs):
from torchdiffeq import odeint
device = x[0].device if isinstance(x, tuple) else x.device
def _fn(t, x):
t = th.ones(x[0].size(0)).to(device) * t if isinstance(x, tuple) else th.ones(x.size(0)).to(device) * t
model_output = self.drift(x, t, model, **model_kwargs)
return model_output
t = self.t.to(device)
atol = [self.atol] * len(x) if isinstance(x, tuple) else [self.atol]
rtol = [self.rtol] * len(x) if isinstance(x, tuple) else [self.rtol]
samples = odeint(_fn, x, t, method=self.sampler_type, atol=atol, rtol=rtol)
return samples
def sample_with_step_fn(self, x, step_fn):
from torchdiffeq import odeint
device = x[0].device if isinstance(x, tuple) else x.device
t = self.t.to(device)
atol = [self.atol] * len(x) if isinstance(x, tuple) else [self.atol]
rtol = [self.rtol] * len(x) if isinstance(x, tuple) else [self.rtol]
samples = odeint(step_fn, x, t, method=self.sampler_type, atol=atol, rtol=rtol)
return samples
import numpy as np
import torch as th
def expand_t_like_x(t, x):
"""Function to reshape time t to broadcastable dimension of x
Args:
t: [batch_dim,], time vector
x: [batch_dim,...], data point
"""
dims = [1] * len(x[0].size())
t = t.view(t.size(0), *dims)
return t
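# Illustrative note (not part of the original file): for x of shape [B, C, T, H, W] and
# t of shape [B], expand_t_like_x returns t viewed as [B, 1, 1, 1, 1] so it broadcasts
# against x in the path coefficients computed below.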
class ICPlan:
"""Linear Coupling Plan"""
def __init__(self, sigma=0.0, reverse=False):
self.sigma = sigma
self.reverse = reverse
def compute_alpha_t(self, t):
"""Compute the data coefficient along the path"""
if self.reverse:
return 1 - t, -1
else:
return t, 1
def compute_sigma_t(self, t):
"""Compute the noise coefficient along the path"""
if self.reverse:
return t, 1
else:
return 1 - t, -1
def compute_d_alpha_alpha_ratio_t(self, t):
"""Compute the ratio between d_alpha and alpha"""
return 1 / t
def compute_drift(self, x, t):
"""We always output sde according to score parametrization;"""
t = expand_t_like_x(t, x)
alpha_ratio = self.compute_d_alpha_alpha_ratio_t(t)
sigma_t, d_sigma_t = self.compute_sigma_t(t)
drift = alpha_ratio * x
diffusion = alpha_ratio * (sigma_t**2) - sigma_t * d_sigma_t
return -drift, diffusion
def compute_diffusion(self, x, t, form="constant", norm=1.0):
"""Compute the diffusion term of the SDE
Args:
x: [batch_dim, ...], data point
t: [batch_dim,], time vector
form: str, form of the diffusion term
norm: float, norm of the diffusion term
"""
t = expand_t_like_x(t, x)
choices = {
"constant": norm,
"SBDM": norm * self.compute_drift(x, t)[1],
"sigma": norm * self.compute_sigma_t(t)[0],
"linear": norm * (1 - t),
"decreasing": 0.25 * (norm * th.cos(np.pi * t) + 1) ** 2,
"increasing-decreasing": norm * th.sin(np.pi * t) ** 2,
}
try:
diffusion = choices[form]
except KeyError:
raise NotImplementedError(f"Diffusion form {form} not implemented")
return diffusion
def get_score_from_velocity(self, velocity, x, t):
"""Wrapper function: transform velocity prediction model to score
Args:
velocity: [batch_dim, ...] shaped tensor; velocity model output
x: [batch_dim, ...] shaped tensor; x_t data point
t: [batch_dim,] time tensor
"""
t = expand_t_like_x(t, x)
alpha_t, d_alpha_t = self.compute_alpha_t(t)
sigma_t, d_sigma_t = self.compute_sigma_t(t)
mean = x
reverse_alpha_ratio = alpha_t / d_alpha_t
var = sigma_t**2 - reverse_alpha_ratio * d_sigma_t * sigma_t
score = (reverse_alpha_ratio * velocity - mean) / var
return score
def get_noise_from_velocity(self, velocity, x, t):
"""Wrapper function: transform velocity prediction model to denoiser
Args:
velocity: [batch_dim, ...] shaped tensor; velocity model output
x: [batch_dim, ...] shaped tensor; x_t data point
t: [batch_dim,] time tensor
"""
t = expand_t_like_x(t, x)
alpha_t, d_alpha_t = self.compute_alpha_t(t)
sigma_t, d_sigma_t = self.compute_sigma_t(t)
mean = x
reverse_alpha_ratio = alpha_t / d_alpha_t
var = reverse_alpha_ratio * d_sigma_t - sigma_t
noise = (reverse_alpha_ratio * velocity - mean) / var
return noise
def get_velocity_from_score(self, score, x, t):
"""Wrapper function: transform score prediction model to velocity
Args:
score: [batch_dim, ...] shaped tensor; score model output
x: [batch_dim, ...] shaped tensor; x_t data point
t: [batch_dim,] time tensor
"""
t = expand_t_like_x(t, x)
drift, var = self.compute_drift(x, t)
velocity = var * score - drift
return velocity
def compute_mu_t(self, t, x0, x1):
"""Compute the mean of time-dependent density p_t"""
t = expand_t_like_x(t, x1)
alpha_t, _ = self.compute_alpha_t(t)
sigma_t, _ = self.compute_sigma_t(t)
if isinstance(x1, (list, tuple)):
return [alpha_t[i] * x1[i] + sigma_t[i] * x0[i] for i in range(len(x1))]
else:
return alpha_t * x1 + sigma_t * x0
def compute_xt(self, t, x0, x1):
"""Sample xt from time-dependent density p_t; rng is required"""
xt = self.compute_mu_t(t, x0, x1)
return xt
def compute_ut(self, t, x0, x1, xt):
"""Compute the vector field corresponding to p_t"""
t = expand_t_like_x(t, x1)
_, d_alpha_t = self.compute_alpha_t(t)
_, d_sigma_t = self.compute_sigma_t(t)
if isinstance(x1, (list, tuple)):
return [d_alpha_t * x1[i] + d_sigma_t * x0[i] for i in range(len(x1))]
else:
return d_alpha_t * x1 + d_sigma_t * x0
def plan(self, t, x0, x1):
xt = self.compute_xt(t, x0, x1)
ut = self.compute_ut(t, x0, x1, xt)
return t, xt, ut
class VPCPlan(ICPlan):
"""class for VP path flow matching"""
def __init__(self, sigma_min=0.1, sigma_max=20.0, reverse=False):
self.sigma_min = sigma_min
self.sigma_max = sigma_max
self.log_mean_coeff = (
lambda t: -0.25 * ((1 - t) ** 2) * (self.sigma_max - self.sigma_min) - 0.5 * (1 - t) * self.sigma_min
)
self.d_log_mean_coeff = lambda t: 0.5 * (1 - t) * (self.sigma_max - self.sigma_min) + 0.5 * self.sigma_min
self.reverse = reverse
if self.reverse:
raise NotImplementedError("Reverse VPCPlan is not implemented")
def compute_alpha_t(self, t):
"""Compute coefficient of x1"""
alpha_t = self.log_mean_coeff(t)
alpha_t = th.exp(alpha_t)
d_alpha_t = alpha_t * self.d_log_mean_coeff(t)
return alpha_t, d_alpha_t
def compute_sigma_t(self, t):
"""Compute coefficient of x0"""
p_sigma_t = 2 * self.log_mean_coeff(t)
sigma_t = th.sqrt(1 - th.exp(p_sigma_t))
d_sigma_t = th.exp(p_sigma_t) * (2 * self.d_log_mean_coeff(t)) / (-2 * sigma_t)
return sigma_t, d_sigma_t
def compute_d_alpha_alpha_ratio_t(self, t):
"""Special-purpose function for numerically stable computation of d_alpha_t / alpha_t"""
return self.d_log_mean_coeff(t)
def compute_drift(self, x, t):
"""Compute the drift term of the SDE"""
t = expand_t_like_x(t, x)
beta_t = self.sigma_min + (1 - t) * (self.sigma_max - self.sigma_min)
return -0.5 * beta_t * x, beta_t / 2
class GVPCPlan(ICPlan):
def __init__(self, sigma=0.0, reverse=False):
super().__init__(sigma)
if self.reverse:
raise NotImplementedError("Reverse GVPCPlan is not implemented")
def compute_alpha_t(self, t):
"""Compute coefficient of x1"""
alpha_t = th.sin(t * np.pi / 2)
d_alpha_t = np.pi / 2 * th.cos(t * np.pi / 2)
return alpha_t, d_alpha_t
def compute_sigma_t(self, t):
"""Compute coefficient of x0"""
sigma_t = th.cos(t * np.pi / 2)
d_sigma_t = -np.pi / 2 * th.sin(t * np.pi / 2)
return sigma_t, d_sigma_t
def compute_d_alpha_alpha_ratio_t(self, t):
"""Special-purpose function for numerically stable computation of d_alpha_t / alpha_t"""
return np.pi / (2 * th.tan(t * np.pi / 2))
import enum
import math
from typing import Callable
import copy
import numpy as np
import torch as th
from . import path
from .integrators import ode, sde
from .utils import mean_flat
from hyvideo.constants import PRECISION_TO_TYPE
__all__ = ["ModelType", "PathType", "WeightType", "Transport", "Sampler", "SNRType"]
class ModelType(enum.Enum):
"""
Which type of output the model predicts.
"""
NOISE = enum.auto() # the model predicts epsilon
SCORE = enum.auto() # the model predicts \nabla \log p(x)
VELOCITY = enum.auto() # the model predicts v(x)
class PathType(enum.Enum):
"""
Which type of path to use.
"""
LINEAR = enum.auto()
GVP = enum.auto()
VP = enum.auto()
class WeightType(enum.Enum):
"""
Which type of weighting to use.
"""
NONE = enum.auto()
VELOCITY = enum.auto()
LIKELIHOOD = enum.auto()
class SNRType(enum.Enum):
UNIFORM = enum.auto()
LOGNORM = enum.auto()
def get_lin_function(
x1: float = 256, y1: float = 0.5, x2: float = 4096, y2: float = 1.15
) -> Callable[[float], float]:
m = (y2 - y1) / (x2 - x1)
b = y1 - m * x1
return lambda x: m * x + b
def time_shift(mu: float, sigma: float, t: th.Tensor):
return math.exp(mu) / (math.exp(mu) + (1 / t - 1) ** sigma)
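# Illustrative note (not part of the original file): get_lin_function() linearly maps a
# token count to a shift parameter mu (256 tokens -> 0.5, 4096 tokens -> 1.15), and
# time_shift(mu, sigma, t) warps uniformly sampled timesteps t according to that mu
# (the timestep shift of http://arxiv.org/abs/2403.03206 referenced below), e.g.
#   shifted_t = time_shift(get_lin_function()(1024), 1.0, t)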
class Transport:
def __init__(self, *, model_type, path_type, loss_type, train_eps, sample_eps, snr_type,
training_timesteps=1000, reverse_time_schedule=False, shift=1.0, video_shift=None, reverse=False,
):
path_options = {
PathType.LINEAR: path.ICPlan,
PathType.GVP: path.GVPCPlan,
PathType.VP: path.VPCPlan,
}
self.loss_type = loss_type
self.model_type = model_type
self.path_sampler = path_options[path_type](reverse=reverse)
self.train_eps = train_eps
self.sample_eps = sample_eps
self.snr_type = snr_type
# timestep shift: http://arxiv.org/abs/2403.03206
self.shift = shift # flow matching shift factor, =sqrt(m/n)
if video_shift is None: video_shift = shift # if video shift is not given, set it to be the same as flow shift
self.video_shift = video_shift
self.reverse = reverse
self.training_timesteps = training_timesteps
self.reverse_time_schedule = reverse_time_schedule
def prior_logp(self, z):
"""
Standard multivariate normal prior
Assume z is batched
"""
shape = th.tensor(z.size())
N = th.prod(shape[1:])
_fn = lambda x: -N / 2.0 * np.log(2 * np.pi) - th.sum(x**2) / 2.0
return th.vmap(_fn)(z)
def check_interval(
self,
train_eps,
sample_eps,
*,
diffusion_form="SBDM",
sde=False,
reverse=False,
eval=False,
last_step_size=0.0,
):
t0 = 0
t1 = 1
eps = train_eps if not eval else sample_eps
if type(self.path_sampler) in [path.VPCPlan]:
t1 = 1 - eps if (not sde or last_step_size == 0) else 1 - last_step_size
elif (type(self.path_sampler) in [path.ICPlan, path.GVPCPlan]) and (
self.model_type != ModelType.VELOCITY or sde
): # avoid numerical issue by taking a first semi-implicit step
t0 = eps if (diffusion_form == "SBDM" and sde) or self.model_type != ModelType.VELOCITY else 0
t1 = 1 - eps if (not sde or last_step_size == 0) else 1 - last_step_size
if reverse:
t0, t1 = 1 - t0, 1 - t1
return t0, t1
def sample(self, x1, n_tokens=None):
"""Sampling x0 & t based on shape of x1 (if needed)
Args:
x1 - data point; [batch, *dim]
"""
if isinstance(x1, (list, tuple)):
x0 = [th.randn_like(img_start) for img_start in x1]
else:
x0 = th.randn_like(x1)
t0, t1 = self.check_interval(self.train_eps, self.sample_eps)
if self.snr_type == SNRType.UNIFORM:
t = th.rand((len(x1),)) * (t1 - t0) + t0
elif self.snr_type == SNRType.LOGNORM:
u = th.normal(mean=0.0, std=1.0, size=(len(x1),))
t = 1 / (1 + th.exp(-u)) * (t1 - t0) + t0
else:
raise ValueError(f"Unknown snr type: {self.snr_type}")
if self.shift != 1.:
if self.reverse:
# xt = (1 - t) * x1 + t * x0
t = (self.shift * t) / (1 + (self.shift - 1) * t)
else:
# xt = t * x1 + (1 - t) * x0
t = t / (self.shift - (self.shift - 1) * t)
t = t.to(x1[0])
return t, x0, x1
def get_model_t(self, t):
if self.reverse_time_schedule:
return (1 - t) * self.training_timesteps
else:
return t * self.training_timesteps
def training_losses(self, model, x1, model_kwargs=None, timestep=None, n_tokens=None,
i2v_mode=False, cond_latents=None, args=None):
self.shift = self.video_shift
if model_kwargs is None:
model_kwargs = {}
t, x0, x1 = self.sample(x1, n_tokens)
if timestep is not None:
t = th.ones_like(t) * timestep
t, xt, ut = self.path_sampler.plan(t, x0, x1)
input_t = self.get_model_t(t)
if i2v_mode and args.i2v_condition_type == "latent_concat":
if cond_latents is not None:
x1_concat = cond_latents.repeat(1,1,x1.shape[2],1,1)
x1_concat[:, :, 1:, :, :] = 0.0
else:
x1_concat = x1.cpu().clone().to(device=x1.device)
x1_concat[:, :, 1:, :, :] = 0.0
mask_concat = th.ones(x1.shape[0], 1, x1.shape[2], x1.shape[3], x1.shape[4]).to(device=x1.device)
mask_concat[:, :, 1:, ...] = 0.0
xt = th.concat([xt, x1_concat, mask_concat], dim=1)
elif i2v_mode and args.i2v_condition_type == "token_replace":
xt = th.concat([cond_latents, xt[:, :, 1:, :, :]], dim=2)
guidance_expand = (
th.tensor(
[args.embedded_cfg_scale] * x1.shape[0],
dtype=th.float32,
device=x1.device,
).to(PRECISION_TO_TYPE[args.precision])
* 1000.0
if args.embedded_cfg_scale is not None
else None
)
model_kwargs["guidance"] = guidance_expand
model_output = model(xt, input_t, **model_kwargs)['x']
if i2v_mode and args.i2v_condition_type == "token_replace":
assert self.model_type == ModelType.VELOCITY, f"self.model_type: {self.model_type} must be ModelType.VELOCITY"
model_output = model_output[:, :, 1:, :, :]
ut = ut[:, :, 1:, :, :]
if not i2v_mode:
assert model_output.size() == xt.size(), f"Output shape from model does not match input shape: " \
f"{model_output.size()} != {xt.size()}"
terms = {}
if self.model_type == ModelType.VELOCITY:
terms["loss"] = mean_flat(((model_output - ut) ** 2))
else:
_, drift_var = self.path_sampler.compute_drift(xt, t)
sigma_t, _ = self.path_sampler.compute_sigma_t(path.expand_t_like_x(t, xt))
if self.loss_type in [WeightType.VELOCITY]:
weight = (drift_var / sigma_t) ** 2
elif self.loss_type in [WeightType.LIKELIHOOD]:
weight = drift_var / (sigma_t ** 2)
elif self.loss_type in [WeightType.NONE]:
weight = 1
else:
raise NotImplementedError()
if self.model_type == ModelType.NOISE:
terms['loss'] = mean_flat(weight * ((model_output - x0) ** 2))
else:
terms['loss'] = mean_flat(weight * ((model_output * sigma_t + x0) ** 2))
return model_output, terms
def get_drift(self):
"""member function for obtaining the drift of the probability flow ODE"""
def score_ode(x, t, model, **model_kwargs):
drift_mean, drift_var = self.path_sampler.compute_drift(x, t)
model_output = model(x, t, **model_kwargs)
return -drift_mean + drift_var * model_output # by change of variable
def noise_ode(x, t, model, **model_kwargs):
drift_mean, drift_var = self.path_sampler.compute_drift(x, t)
sigma_t, _ = self.path_sampler.compute_sigma_t(path.expand_t_like_x(t, x))
model_output = model(x, t, **model_kwargs)
score = model_output / -sigma_t
return -drift_mean + drift_var * score
def velocity_ode(x, t, model, **model_kwargs):
model_output = model(x, t, **model_kwargs)
return model_output
if self.model_type == ModelType.NOISE:
drift_fn = noise_ode
elif self.model_type == ModelType.SCORE:
drift_fn = score_ode
else:
drift_fn = velocity_ode
def body_fn(x, t, model, **model_kwargs):
model_output = drift_fn(x, t, model, **model_kwargs)
assert model_output.shape == x.shape, "Output shape from ODE solver must match input shape"
return model_output
return body_fn
def get_score(
self,
):
"""member function for obtaining score of
x_t = alpha_t * x + sigma_t * eps"""
if self.model_type == ModelType.NOISE:
score_fn = (
lambda x, t, model, **kwargs: model(x, t, **kwargs)
/ -self.path_sampler.compute_sigma_t(path.expand_t_like_x(t, x))[0]
)
elif self.model_type == ModelType.SCORE:
score_fn = lambda x, t, model, **kwargs: model(x, t, **kwargs)
elif self.model_type == ModelType.VELOCITY:
score_fn = lambda x, t, model, **kwargs: self.path_sampler.get_score_from_velocity(
model(x, t, **kwargs), x, t
)
else:
raise NotImplementedError()
return score_fn
class Sampler:
"""Sampler class for the transport model"""
def __init__(
self,
transport,
):
"""Constructor for a general sampler; supporting different sampling methods
Args:
- transport: a transport object specifying model prediction & interpolant type
"""
self.transport = transport
self.drift = self.transport.get_drift()
self.score = self.transport.get_score()
def __get_sde_diffusion_and_drift(
self,
*,
diffusion_form="SBDM",
diffusion_norm=1.0,
):
def diffusion_fn(x, t):
diffusion = self.transport.path_sampler.compute_diffusion(x, t, form=diffusion_form, norm=diffusion_norm)
return diffusion
sde_drift = lambda x, t, model, **kwargs: self.drift(x, t, model, **kwargs) + diffusion_fn(x, t) * self.score(
x, t, model, **kwargs
)
sde_diffusion = diffusion_fn
return sde_drift, sde_diffusion
def __get_last_step(
self,
sde_drift,
*,
last_step,
last_step_size,
):
"""Get the last step function of the SDE solver"""
if last_step is None:
last_step_fn = lambda x, t, model, **model_kwargs: x
elif last_step == "Mean":
last_step_fn = (
lambda x, t, model, **model_kwargs: x + sde_drift(x, t, model, **model_kwargs) * last_step_size
)
elif last_step == "Tweedie":
alpha = self.transport.path_sampler.compute_alpha_t # simple aliasing; the original name was too long
sigma = self.transport.path_sampler.compute_sigma_t
last_step_fn = lambda x, t, model, **model_kwargs: x / alpha(t)[0][0] + (sigma(t)[0][0] ** 2) / alpha(t)[0][
0
] * self.score(x, t, model, **model_kwargs)
elif last_step == "Euler":
last_step_fn = (
lambda x, t, model, **model_kwargs: x + self.drift(x, t, model, **model_kwargs) * last_step_size
)
else:
raise NotImplementedError()
return last_step_fn
def sample_sde(
self,
*,
sampling_method="Euler",
diffusion_form="SBDM",
diffusion_norm=1.0,
last_step="Mean",
last_step_size=0.04,
num_steps=250,
):
"""returns a sampling function with given SDE settings
Args:
- sampling_method: type of sampler used in solving the SDE; default to be Euler-Maruyama
- diffusion_form: function form of diffusion coefficient; default to be matching SBDM
- diffusion_norm: function magnitude of diffusion coefficient; default to 1
- last_step: type of the last step; default to identity
- last_step_size: size of the last step; default to match the stride of 250 steps over [0,1]
- num_steps: total integration step of SDE
"""
if last_step is None:
last_step_size = 0.0
sde_drift, sde_diffusion = self.__get_sde_diffusion_and_drift(
diffusion_form=diffusion_form,
diffusion_norm=diffusion_norm,
)
t0, t1 = self.transport.check_interval(
self.transport.train_eps,
self.transport.sample_eps,
diffusion_form=diffusion_form,
sde=True,
eval=True,
reverse=False,
last_step_size=last_step_size,
)
_sde = sde(
sde_drift,
sde_diffusion,
t0=t0,
t1=t1,
num_steps=num_steps,
sampler_type=sampling_method,
)
last_step_fn = self.__get_last_step(sde_drift, last_step=last_step, last_step_size=last_step_size)
def _sample(init, model, **model_kwargs):
xs = _sde.sample(init, model, **model_kwargs)
ts = th.ones(init.size(0), device=init.device) * t1
x = last_step_fn(xs[-1], ts, model, **model_kwargs)
xs.append(x)
assert len(xs) == num_steps, "Number of samples does not match the number of steps"
return xs
return _sample
def sample_ode(
self,
*,
sampling_method="dopri5",
num_steps=50,
atol=1e-6,
rtol=1e-3,
reverse=False,
time_shifting_factor=None,
):
"""returns a sampling function with given ODE settings
Args:
- sampling_method: type of sampler used in solving the ODE; default to be Dopri5
- num_steps:
- fixed solver (Euler, Heun): the actual number of integration steps performed
- adaptive solver (Dopri5): the number of datapoints saved during integration; produced by interpolation
- atol: absolute error tolerance for the solver
- rtol: relative error tolerance for the solver
- reverse: whether solving the ODE in reverse (data to noise); default to False
"""
if reverse:
drift = lambda x, t, model, **kwargs: self.drift(x, th.ones_like(t) * (1 - t), model, **kwargs)
else:
drift = self.drift
t0, t1 = self.transport.check_interval(
self.transport.train_eps,
self.transport.sample_eps,
sde=False,
eval=True,
reverse=reverse,
last_step_size=0.0,
)
_ode = ode(
drift=drift,
t0=t0,
t1=t1,
sampler_type=sampling_method,
num_steps=num_steps,
atol=atol,
rtol=rtol,
time_shifting_factor=time_shifting_factor,
)
self.ode = _ode
return _ode.sample
def sample_ode_likelihood(
self,
*,
sampling_method="dopri5",
num_steps=50,
atol=1e-6,
rtol=1e-3,
):
"""returns a sampling function for calculating likelihood with given ODE settings
Args:
- sampling_method: type of sampler used in solving the ODE; default to be Dopri5
- num_steps:
- fixed solver (Euler, Heun): the actual number of integration steps performed
- adaptive solver (Dopri5): the number of datapoints saved during integration; produced by interpolation
- atol: absolute error tolerance for the solver
- rtol: relative error tolerance for the solver
"""
def _likelihood_drift(x, t, model, **model_kwargs):
x, _ = x
eps = th.randint(2, x.size(), dtype=th.float, device=x.device) * 2 - 1
t = th.ones_like(t) * (1 - t)
with th.enable_grad():
x.requires_grad = True
grad = th.autograd.grad(th.sum(self.drift(x, t, model, **model_kwargs) * eps), x)[0]
logp_grad = th.sum(grad * eps, dim=tuple(range(1, len(x.size()))))
drift = self.drift(x, t, model, **model_kwargs)
return (-drift, logp_grad)
t0, t1 = self.transport.check_interval(
self.transport.train_eps,
self.transport.sample_eps,
sde=False,
eval=True,
reverse=False,
last_step_size=0.0,
)
_ode = ode(
drift=_likelihood_drift,
t0=t0,
t1=t1,
sampler_type=sampling_method,
num_steps=num_steps,
atol=atol,
rtol=rtol,
)
def _sample_fn(x, model, **model_kwargs):
init_logp = th.zeros(x.size(0)).to(x)
input = (x, init_logp)
drift, delta_logp = _ode.sample(input, model, **model_kwargs)
drift, delta_logp = drift[-1], delta_logp[-1]
prior_logp = self.transport.prior_logp(drift)
logp = prior_logp - delta_logp
return logp, drift
return _sample_fn
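# Usage sketch (illustrative only): `transport` is assumed to be the transport
# object built elsewhere in this package, and `model(x, t, **kwargs)` is assumed
# to return the predicted velocity field; both names are placeholders, not a fixed API.
#
#   sampler = Sampler(transport)
#   ode_sample_fn = sampler.sample_ode(sampling_method="euler", num_steps=50)
#   trajectory = ode_sample_fn(noise, model)   # sequence of states from noise to data
#   sde_sample_fn = sampler.sample_sde(last_step="Mean", num_steps=250)
#   trajectory = sde_sample_fn(noise, model)   # final entry includes the last-step correction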
import torch as th
class EasyDict:
def __init__(self, sub_dict):
for k, v in sub_dict.items():
setattr(self, k, v)
def __getitem__(self, key):
return getattr(self, key)
def mean_flat(x):
"""
Take the mean over all non-batch dimensions.
"""
return th.mean(x, dim=list(range(1, len(x.size()))))
def log_state(state):
result = []
sorted_state = dict(sorted(state.items()))
for key, value in sorted_state.items():
# Check if the value is an instance of a class
if "<object" in str(value) or "object at" in str(value):
result.append(f"{key}: [{value.__class__.__name__}]")
else:
result.append(f"{key}: {value}")
return "\n".join(result)
from .pipeline_hunyuan_video import HunyuanVideoPipeline
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
#
# Modified from diffusers==0.29.2
#
# ==============================================================================
import inspect
from typing import Any, Callable, Dict, List, Optional, Union, Tuple
import torch
import torch.distributed as dist
import numpy as np
from dataclasses import dataclass
from packaging import version
from diffusers.callbacks import MultiPipelineCallbacks, PipelineCallback
from diffusers.configuration_utils import FrozenDict
from diffusers.image_processor import VaeImageProcessor
from diffusers.loaders import LoraLoaderMixin, TextualInversionLoaderMixin
from diffusers.models import AutoencoderKL
from diffusers.models.lora import adjust_lora_scale_text_encoder
from diffusers.schedulers import KarrasDiffusionSchedulers
from diffusers.utils import (
USE_PEFT_BACKEND,
deprecate,
logging,
replace_example_docstring,
scale_lora_layers,
unscale_lora_layers,
)
from diffusers.utils.torch_utils import randn_tensor
from diffusers.pipelines.pipeline_utils import DiffusionPipeline
from diffusers.utils import BaseOutput
from ...constants import PRECISION_TO_TYPE
from ...vae.autoencoder_kl_causal_3d import AutoencoderKLCausal3D
from ...text_encoder import TextEncoder
from ...modules import HYVideoDiffusionTransformer
from ...utils.data_utils import black_image
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
EXAMPLE_DOC_STRING = """"""
def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0):
"""
Rescale `noise_cfg` according to `guidance_rescale`. Based on findings of [Common Diffusion Noise Schedules and
Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf). See Section 3.4
"""
std_text = noise_pred_text.std(
dim=list(range(1, noise_pred_text.ndim)), keepdim=True
)
std_cfg = noise_cfg.std(dim=list(range(1, noise_cfg.ndim)), keepdim=True)
# rescale the results from guidance (fixes overexposure)
noise_pred_rescaled = noise_cfg * (std_text / std_cfg)
# mix with the original results from guidance by factor guidance_rescale to avoid "plain looking" images
noise_cfg = (
guidance_rescale * noise_pred_rescaled + (1 - guidance_rescale) * noise_cfg
)
return noise_cfg
def retrieve_timesteps(
scheduler,
num_inference_steps: Optional[int] = None,
device: Optional[Union[str, torch.device]] = None,
timesteps: Optional[List[int]] = None,
sigmas: Optional[List[float]] = None,
**kwargs,
):
"""
Calls the scheduler's `set_timesteps` method and retrieves timesteps from the scheduler after the call. Handles
custom timesteps. Any kwargs will be supplied to `scheduler.set_timesteps`.
Args:
scheduler (`SchedulerMixin`):
The scheduler to get timesteps from.
num_inference_steps (`int`):
The number of diffusion steps used when generating samples with a pre-trained model. If used, `timesteps`
must be `None`.
device (`str` or `torch.device`, *optional*):
The device to which the timesteps should be moved. If `None`, the timesteps are not moved.
timesteps (`List[int]`, *optional*):
Custom timesteps used to override the timestep spacing strategy of the scheduler. If `timesteps` is passed,
`num_inference_steps` and `sigmas` must be `None`.
sigmas (`List[float]`, *optional*):
Custom sigmas used to override the timestep spacing strategy of the scheduler. If `sigmas` is passed,
`num_inference_steps` and `timesteps` must be `None`.
Returns:
`Tuple[torch.Tensor, int]`: A tuple where the first element is the timestep schedule from the scheduler and the
second element is the number of inference steps.
"""
if timesteps is not None and sigmas is not None:
raise ValueError(
"Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values"
)
if timesteps is not None:
accepts_timesteps = "timesteps" in set(
inspect.signature(scheduler.set_timesteps).parameters.keys()
)
if not accepts_timesteps:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" timestep schedules. Please check whether you are using the correct scheduler."
)
scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
timesteps = scheduler.timesteps
num_inference_steps = len(timesteps)
elif sigmas is not None:
accept_sigmas = "sigmas" in set(
inspect.signature(scheduler.set_timesteps).parameters.keys()
)
if not accept_sigmas:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" sigmas schedules. Please check whether you are using the correct scheduler."
)
scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
timesteps = scheduler.timesteps
num_inference_steps = len(timesteps)
else:
scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
timesteps = scheduler.timesteps
return timesteps, num_inference_steps
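# Usage sketch (illustrative): pass either a step count, explicit `timesteps`, or
# explicit `sigmas` -- at most one custom schedule at a time. `scheduler` is a
# placeholder for any diffusers-style scheduler whose `set_timesteps` accepts the
# chosen keyword.
#
#   timesteps, n_steps = retrieve_timesteps(scheduler, num_inference_steps=50, device="cuda")
#   timesteps, n_steps = retrieve_timesteps(scheduler, sigmas=[1.0, 0.75, 0.5, 0.25, 0.0], device="cuda")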
@dataclass
class HunyuanVideoPipelineOutput(BaseOutput):
videos: Union[torch.Tensor, np.ndarray]
class HunyuanVideoPipeline(DiffusionPipeline):
r"""
Pipeline for text- and image-to-video generation using HunyuanVideo.
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods
implemented for all pipelines (downloading, saving, running on a particular device, etc.).
Args:
vae ([`AutoencoderKL`]):
Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations.
text_encoder ([`TextEncoder`]):
Frozen text-encoder.
text_encoder_2 ([`TextEncoder`]):
Frozen text-encoder_2.
transformer ([`HYVideoDiffusionTransformer`]):
A `HYVideoDiffusionTransformer` to denoise the encoded video latents.
scheduler ([`SchedulerMixin`]):
A scheduler to be used in combination with `unet` to denoise the encoded image latents.
"""
model_cpu_offload_seq = "text_encoder->text_encoder_2->transformer->vae"
_optional_components = ["text_encoder_2"]
_exclude_from_cpu_offload = ["transformer"]
_callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds"]
def __init__(
self,
vae: AutoencoderKL,
text_encoder: TextEncoder,
transformer: HYVideoDiffusionTransformer,
scheduler: KarrasDiffusionSchedulers,
text_encoder_2: Optional[TextEncoder] = None,
progress_bar_config: Dict[str, Any] = None,
args=None,
):
super().__init__()
# ==========================================================================================
if progress_bar_config is None:
progress_bar_config = {}
if not hasattr(self, "_progress_bar_config"):
self._progress_bar_config = {}
self._progress_bar_config.update(progress_bar_config)
self.args = args
# ==========================================================================================
if (
hasattr(scheduler.config, "steps_offset")
and scheduler.config.steps_offset != 1
):
deprecation_message = (
f"The configuration file of this scheduler: {scheduler} is outdated. `steps_offset`"
f" should be set to 1 instead of {scheduler.config.steps_offset}. Please make sure "
"to update the config accordingly as leaving `steps_offset` might led to incorrect results"
" in future versions. If you have downloaded this checkpoint from the Hugging Face Hub,"
" it would be very nice if you could open a Pull request for the `scheduler/scheduler_config.json`"
" file"
)
deprecate(
"steps_offset!=1", "1.0.0", deprecation_message, standard_warn=False
)
new_config = dict(scheduler.config)
new_config["steps_offset"] = 1
scheduler._internal_dict = FrozenDict(new_config)
if (
hasattr(scheduler.config, "clip_sample")
and scheduler.config.clip_sample is True
):
deprecation_message = (
f"The configuration file of this scheduler: {scheduler} has not set the configuration `clip_sample`."
" `clip_sample` should be set to False in the configuration file. Please make sure to update the"
" config accordingly as not setting `clip_sample` in the config might lead to incorrect results in"
" future versions. If you have downloaded this checkpoint from the Hugging Face Hub, it would be very"
" nice if you could open a Pull request for the `scheduler/scheduler_config.json` file"
)
deprecate(
"clip_sample not set", "1.0.0", deprecation_message, standard_warn=False
)
new_config = dict(scheduler.config)
new_config["clip_sample"] = False
scheduler._internal_dict = FrozenDict(new_config)
self.register_modules(
vae=vae,
text_encoder=text_encoder,
transformer=transformer,
scheduler=scheduler,
text_encoder_2=text_encoder_2,
)
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
def encode_prompt(
self,
prompt,
device,
num_videos_per_prompt,
do_classifier_free_guidance,
negative_prompt=None,
prompt_embeds: Optional[torch.Tensor] = None,
attention_mask: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
negative_attention_mask: Optional[torch.Tensor] = None,
lora_scale: Optional[float] = None,
clip_skip: Optional[int] = None,
text_encoder: Optional[TextEncoder] = None,
data_type: Optional[str] = "image",
semantic_images=None
):
r"""
Encodes the prompt into text encoder hidden states.
Args:
prompt (`str` or `List[str]`, *optional*):
prompt to be encoded
device: (`torch.device`):
torch device
num_videos_per_prompt (`int`):
number of videos that should be generated per prompt
do_classifier_free_guidance (`bool`):
whether to use classifier free guidance or not
negative_prompt (`str` or `List[str]`, *optional*):
The prompt or prompts not to guide the video generation. If not defined, one has to pass
`negative_prompt_embeds` instead. Ignored when not using guidance (i.e., ignored if `guidance_scale` is
less than `1`).
prompt_embeds (`torch.Tensor`, *optional*):
Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If not
provided, text embeddings will be generated from `prompt` input argument.
attention_mask (`torch.Tensor`, *optional*):
negative_prompt_embeds (`torch.Tensor`, *optional*):
Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt
weighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` input
argument.
negative_attention_mask (`torch.Tensor`, *optional*):
lora_scale (`float`, *optional*):
A LoRA scale that will be applied to all LoRA layers of the text encoder if LoRA layers are loaded.
clip_skip (`int`, *optional*):
Number of layers to be skipped from CLIP while computing the prompt embeddings. A value of 1 means that
the output of the pre-final layer will be used for computing the prompt embeddings.
text_encoder (TextEncoder, *optional*):
data_type (`str`, *optional*):
"""
if text_encoder is None:
text_encoder = self.text_encoder
# set lora scale so that monkey patched LoRA
# function of text encoder can correctly access it
if lora_scale is not None and isinstance(self, LoraLoaderMixin):
self._lora_scale = lora_scale
# dynamically adjust the LoRA scale
if not USE_PEFT_BACKEND:
adjust_lora_scale_text_encoder(text_encoder.model, lora_scale)
else:
scale_lora_layers(text_encoder.model, lora_scale)
if prompt is not None and isinstance(prompt, str):
batch_size = 1
elif prompt is not None and isinstance(prompt, list):
batch_size = len(prompt)
else:
batch_size = prompt_embeds.shape[0]
if prompt_embeds is None:
# textual inversion: process multi-vector tokens if necessary
if isinstance(self, TextualInversionLoaderMixin):
prompt = self.maybe_convert_prompt(prompt, text_encoder.tokenizer)
text_inputs = text_encoder.text2tokens(prompt, data_type=data_type)
if clip_skip is None:
prompt_outputs = text_encoder.encode(
text_inputs, data_type=data_type, semantic_images=semantic_images, device=device
)
prompt_embeds = prompt_outputs.hidden_state
else:
prompt_outputs = text_encoder.encode(
text_inputs,
output_hidden_states=True,
data_type=data_type,
semantic_images=semantic_images,
device=device,
)
# Access the `hidden_states` first, that contains a tuple of
# all the hidden states from the encoder layers. Then index into
# the tuple to access the hidden states from the desired layer.
prompt_embeds = prompt_outputs.hidden_states_list[-(clip_skip + 1)]
# We also need to apply the final LayerNorm here to not mess with the
# representations. The `last_hidden_states` that we typically use for
# obtaining the final prompt representations passes through the LayerNorm
# layer.
prompt_embeds = text_encoder.model.text_model.final_layer_norm(
prompt_embeds
)
attention_mask = prompt_outputs.attention_mask
if attention_mask is not None:
attention_mask = attention_mask.to(device)
bs_embed, seq_len = attention_mask.shape
attention_mask = attention_mask.repeat(1, num_videos_per_prompt)
attention_mask = attention_mask.view(
bs_embed * num_videos_per_prompt, seq_len
)
if text_encoder is not None:
prompt_embeds_dtype = text_encoder.dtype
elif self.transformer is not None:
prompt_embeds_dtype = self.transformer.dtype
else:
prompt_embeds_dtype = prompt_embeds.dtype
prompt_embeds = prompt_embeds.to(dtype=prompt_embeds_dtype, device=device)
if prompt_embeds.ndim == 2:
bs_embed, _ = prompt_embeds.shape
# duplicate text embeddings for each generation per prompt, using mps friendly method
prompt_embeds = prompt_embeds.repeat(1, num_videos_per_prompt)
prompt_embeds = prompt_embeds.view(bs_embed * num_videos_per_prompt, -1)
else:
bs_embed, seq_len, _ = prompt_embeds.shape
# duplicate text embeddings for each generation per prompt, using mps friendly method
prompt_embeds = prompt_embeds.repeat(1, num_videos_per_prompt, 1)
prompt_embeds = prompt_embeds.view(
bs_embed * num_videos_per_prompt, seq_len, -1
)
# get unconditional embeddings for classifier free guidance
if do_classifier_free_guidance and negative_prompt_embeds is None:
uncond_tokens: List[str]
if negative_prompt is None:
uncond_tokens = [""] * batch_size
elif prompt is not None and type(prompt) is not type(negative_prompt):
raise TypeError(
f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !="
f" {type(prompt)}."
)
elif isinstance(negative_prompt, str):
uncond_tokens = [negative_prompt]
elif batch_size != len(negative_prompt):
raise ValueError(
f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:"
f" {prompt} has batch size {batch_size}. Please make sure that passed `negative_prompt` matches"
" the batch size of `prompt`."
)
else:
uncond_tokens = negative_prompt
# textual inversion: process multi-vector tokens if necessary
if isinstance(self, TextualInversionLoaderMixin):
uncond_tokens = self.maybe_convert_prompt(
uncond_tokens, text_encoder.tokenizer
)
# max_length = prompt_embeds.shape[1]
uncond_input = text_encoder.text2tokens(uncond_tokens, data_type=data_type)
if semantic_images is not None:
uncond_image = [black_image(img.size[0], img.size[1]) for img in semantic_images]
else:
uncond_image = None
negative_prompt_outputs = text_encoder.encode(
uncond_input, data_type=data_type, semantic_images=uncond_image, device=device
)
negative_prompt_embeds = negative_prompt_outputs.hidden_state
negative_attention_mask = negative_prompt_outputs.attention_mask
if negative_attention_mask is not None:
negative_attention_mask = negative_attention_mask.to(device)
_, seq_len = negative_attention_mask.shape
negative_attention_mask = negative_attention_mask.repeat(
1, num_videos_per_prompt
)
negative_attention_mask = negative_attention_mask.view(
batch_size * num_videos_per_prompt, seq_len
)
if do_classifier_free_guidance:
# duplicate unconditional embeddings for each generation per prompt, using mps friendly method
seq_len = negative_prompt_embeds.shape[1]
negative_prompt_embeds = negative_prompt_embeds.to(
dtype=prompt_embeds_dtype, device=device
)
if negative_prompt_embeds.ndim == 2:
negative_prompt_embeds = negative_prompt_embeds.repeat(
1, num_videos_per_prompt
)
negative_prompt_embeds = negative_prompt_embeds.view(
batch_size * num_videos_per_prompt, -1
)
else:
negative_prompt_embeds = negative_prompt_embeds.repeat(
1, num_videos_per_prompt, 1
)
negative_prompt_embeds = negative_prompt_embeds.view(
batch_size * num_videos_per_prompt, seq_len, -1
)
if text_encoder is not None:
if isinstance(self, LoraLoaderMixin) and USE_PEFT_BACKEND:
# Retrieve the original scale by scaling back the LoRA layers
unscale_lora_layers(text_encoder.model, lora_scale)
return (
prompt_embeds,
negative_prompt_embeds,
attention_mask,
negative_attention_mask,
)
def decode_latents(self, latents, enable_tiling=True):
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
latents = 1 / self.vae.config.scaling_factor * latents
if enable_tiling:
self.vae.enable_tiling()
image = self.vae.decode(latents, return_dict=False)[0]
else:
image = self.vae.decode(latents, return_dict=False)[0]
image = (image / 2 + 0.5).clamp(0, 1)
# we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16
if image.ndim == 4:
image = image.cpu().permute(0, 2, 3, 1).float()
else:
image = image.cpu().float()
return image
def prepare_extra_func_kwargs(self, func, kwargs):
# prepare extra kwargs for the scheduler step, since not all schedulers have the same signature
# eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.
# eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502
# and should be between [0, 1]
extra_step_kwargs = {}
for k, v in kwargs.items():
accepts = k in set(inspect.signature(func).parameters.keys())
if accepts:
extra_step_kwargs[k] = v
return extra_step_kwargs
def check_inputs(
self,
prompt,
height,
width,
video_length,
callback_steps,
negative_prompt=None,
prompt_embeds=None,
negative_prompt_embeds=None,
callback_on_step_end_tensor_inputs=None,
vae_ver="88-4c-sd",
):
if height % 8 != 0 or width % 8 != 0:
raise ValueError(
f"`height` and `width` have to be divisible by 8 but are {height} and {width}."
)
if video_length is not None:
if "884" in vae_ver:
if video_length != 1 and (video_length - 1) % 4 != 0:
raise ValueError(
f"`video_length` has to be 1 or a multiple of 4 but is {video_length}."
)
elif "888" in vae_ver:
if video_length != 1 and (video_length - 1) % 8 != 0:
raise ValueError(
f"`video_length` has to be 1 or a multiple of 8 but is {video_length}."
)
if callback_steps is not None and (
not isinstance(callback_steps, int) or callback_steps <= 0
):
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
if callback_on_step_end_tensor_inputs is not None and not all(
k in self._callback_tensor_inputs
for k in callback_on_step_end_tensor_inputs
):
raise ValueError(
f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
)
if prompt is not None and prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
elif prompt is None and prompt_embeds is None:
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
elif prompt is not None and (
not isinstance(prompt, str) and not isinstance(prompt, list)
):
raise ValueError(
f"`prompt` has to be of type `str` or `list` but is {type(prompt)}"
)
if negative_prompt is not None and negative_prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
if prompt_embeds is not None and negative_prompt_embeds is not None:
if prompt_embeds.shape != negative_prompt_embeds.shape:
raise ValueError(
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
def prepare_latents(
self,
batch_size,
num_channels_latents,
height,
width,
video_length,
dtype,
device,
generator,
latents=None,
img_latents=None,
i2v_mode=False,
i2v_condition_type=None,
i2v_stability=True,
):
if i2v_mode and i2v_condition_type == "latent_concat":
num_channels_latents = (num_channels_latents - 1) // 2
shape = (
batch_size,
num_channels_latents,
video_length,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
if i2v_mode and i2v_stability:
if img_latents.shape[2] == 1:
img_latents = img_latents.repeat(1, 1, video_length, 1, 1)
x0 = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
x1 = img_latents
t = torch.tensor([0.999]).to(device=device)
latents = x0 * t + x1 * (1 - t)
latents = latents.to(dtype=dtype)
if latents is None:
latents = randn_tensor(
shape, generator=generator, device=device, dtype=dtype
)
else:
latents = latents.to(device)
# Check existence to make it compatible with FlowMatchEulerDiscreteScheduler
if hasattr(self.scheduler, "init_noise_sigma"):
# scale the initial noise by the standard deviation required by the scheduler
latents = latents * self.scheduler.init_noise_sigma
return latents
# Copied from diffusers.pipelines.latent_consistency_models.pipeline_latent_consistency_text2img.LatentConsistencyModelPipeline.get_guidance_scale_embedding
def get_guidance_scale_embedding(
self,
w: torch.Tensor,
embedding_dim: int = 512,
dtype: torch.dtype = torch.float32,
) -> torch.Tensor:
"""
See https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298
Args:
w (`torch.Tensor`):
Generate embedding vectors with a specified guidance scale to subsequently enrich timestep embeddings.
embedding_dim (`int`, *optional*, defaults to 512):
Dimension of the embeddings to generate.
dtype (`torch.dtype`, *optional*, defaults to `torch.float32`):
Data type of the generated embeddings.
Returns:
`torch.Tensor`: Embedding vectors with shape `(len(w), embedding_dim)`.
"""
assert len(w.shape) == 1
w = w * 1000.0
half_dim = embedding_dim // 2
emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1)
emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb)
emb = w.to(dtype)[:, None] * emb[None, :]
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
if embedding_dim % 2 == 1: # zero pad
emb = torch.nn.functional.pad(emb, (0, 1))
assert emb.shape == (w.shape[0], embedding_dim)
return emb
@property
def guidance_scale(self):
return self._guidance_scale
@property
def guidance_rescale(self):
return self._guidance_rescale
@property
def clip_skip(self):
return self._clip_skip
# here `guidance_scale` is defined analog to the guidance weight `w` of equation (2)
# of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`
# corresponds to doing no classifier free guidance.
@property
def do_classifier_free_guidance(self):
# return self._guidance_scale > 1 and self.transformer.config.time_cond_proj_dim is None
return self._guidance_scale > 1
@property
def cross_attention_kwargs(self):
return self._cross_attention_kwargs
@property
def num_timesteps(self):
return self._num_timesteps
@property
def interrupt(self):
return self._interrupt
@torch.no_grad()
@replace_example_docstring(EXAMPLE_DOC_STRING)
def __call__(
self,
prompt: Union[str, List[str]],
height: int,
width: int,
video_length: int,
data_type: str = "video",
num_inference_steps: int = 50,
timesteps: List[int] = None,
sigmas: List[float] = None,
guidance_scale: float = 7.5,
negative_prompt: Optional[Union[str, List[str]]] = None,
num_videos_per_prompt: Optional[int] = 1,
eta: float = 0.0,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
latents: Optional[torch.Tensor] = None,
prompt_embeds: Optional[torch.Tensor] = None,
attention_mask: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
negative_attention_mask: Optional[torch.Tensor] = None,
output_type: Optional[str] = "pil",
return_dict: bool = True,
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
guidance_rescale: float = 0.0,
clip_skip: Optional[int] = None,
callback_on_step_end: Optional[
Union[
Callable[[int, int, Dict], None],
PipelineCallback,
MultiPipelineCallbacks,
]
] = None,
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
freqs_cis: Tuple[torch.Tensor, torch.Tensor] = None,
vae_ver: str = "88-4c-sd",
enable_tiling: bool = False,
n_tokens: Optional[int] = None,
embedded_guidance_scale: Optional[float] = None,
i2v_mode: bool = False,
i2v_condition_type: str = None,
i2v_stability: bool = True,
img_latents: Optional[torch.Tensor] = None,
semantic_images=None,
**kwargs,
):
r"""
The call function to the pipeline for generation.
Args:
prompt (`str` or `List[str]`):
The prompt or prompts to guide image generation. If not defined, you need to pass `prompt_embeds`.
height (`int`):
The height in pixels of the generated image.
width (`int`):
The width in pixels of the generated image.
video_length (`int`):
The number of frames in the generated video.
num_inference_steps (`int`, *optional*, defaults to 50):
The number of denoising steps. More denoising steps usually lead to a higher quality image at the
expense of slower inference.
timesteps (`List[int]`, *optional*):
Custom timesteps to use for the denoising process with schedulers which support a `timesteps` argument
in their `set_timesteps` method. If not defined, the default behavior when `num_inference_steps` is
passed will be used. Must be in descending order.
sigmas (`List[float]`, *optional*):
Custom sigmas to use for the denoising process with schedulers which support a `sigmas` argument in
their `set_timesteps` method. If not defined, the default behavior when `num_inference_steps` is passed
will be used.
guidance_scale (`float`, *optional*, defaults to 7.5):
A higher guidance scale value encourages the model to generate images closely linked to the text
`prompt` at the expense of lower image quality. Guidance scale is enabled when `guidance_scale > 1`.
negative_prompt (`str` or `List[str]`, *optional*):
The prompt or prompts to guide what to not include in image generation. If not defined, you need to
pass `negative_prompt_embeds` instead. Ignored when not using guidance (`guidance_scale < 1`).
num_videos_per_prompt (`int`, *optional*, defaults to 1):
The number of images to generate per prompt.
eta (`float`, *optional*, defaults to 0.0):
Corresponds to parameter eta (η) from the [DDIM](https://arxiv.org/abs/2010.02502) paper. Only applies
to the [`~schedulers.DDIMScheduler`], and is ignored in other schedulers.
generator (`torch.Generator` or `List[torch.Generator]`, *optional*):
A [`torch.Generator`](https://pytorch.org/docs/stable/generated/torch.Generator.html) to make
generation deterministic.
latents (`torch.Tensor`, *optional*):
Pre-generated noisy latents sampled from a Gaussian distribution, to be used as inputs for image
generation. Can be used to tweak the same generation with different prompts. If not provided, a latents
tensor is generated by sampling using the supplied random `generator`.
prompt_embeds (`torch.Tensor`, *optional*):
Pre-generated text embeddings. Can be used to easily tweak text inputs (prompt weighting). If not
provided, text embeddings are generated from the `prompt` input argument.
negative_prompt_embeds (`torch.Tensor`, *optional*):
Pre-generated negative text embeddings. Can be used to easily tweak text inputs (prompt weighting). If
not provided, `negative_prompt_embeds` are generated from the `negative_prompt` input argument.
output_type (`str`, *optional*, defaults to `"pil"`):
The output format of the generated image. Choose between `PIL.Image` or `np.array`.
return_dict (`bool`, *optional*, defaults to `True`):
Whether or not to return a [`HunyuanVideoPipelineOutput`] instead of a
plain tuple.
cross_attention_kwargs (`dict`, *optional*):
A kwargs dictionary that if specified is passed along to the [`AttentionProcessor`] as defined in
[`self.processor`](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/attention_processor.py).
guidance_rescale (`float`, *optional*, defaults to 0.0):
Guidance rescale factor from [Common Diffusion Noise Schedules and Sample Steps are
Flawed](https://arxiv.org/pdf/2305.08891.pdf). Guidance rescale factor should fix overexposure when
using zero terminal SNR.
clip_skip (`int`, *optional*):
Number of layers to be skipped from CLIP while computing the prompt embeddings. A value of 1 means that
the output of the pre-final layer will be used for computing the prompt embeddings.
callback_on_step_end (`Callable`, `PipelineCallback`, `MultiPipelineCallbacks`, *optional*):
A function or a subclass of `PipelineCallback` or `MultiPipelineCallbacks` that is called at the end of
each denoising step during inference, with the following arguments: `callback_on_step_end(self:
DiffusionPipeline, step: int, timestep: int, callback_kwargs: Dict)`. `callback_kwargs` will include a
list of all tensors as specified by `callback_on_step_end_tensor_inputs`.
callback_on_step_end_tensor_inputs (`List`, *optional*):
The list of tensor inputs for the `callback_on_step_end` function. The tensors specified in the list
will be passed as `callback_kwargs` argument. You will only be able to include variables listed in the
`._callback_tensor_inputs` attribute of your pipeline class.
Examples:
Returns:
[`~HunyuanVideoPipelineOutput`] or `torch.Tensor`:
If `return_dict` is `True`, a [`HunyuanVideoPipelineOutput`] containing the generated videos is returned;
otherwise, the generated video tensor is returned directly.
"""
callback = kwargs.pop("callback", None)
callback_steps = kwargs.pop("callback_steps", None)
if callback is not None:
deprecate(
"callback",
"1.0.0",
"Passing `callback` as an input argument to `__call__` is deprecated, consider using `callback_on_step_end`",
)
if callback_steps is not None:
deprecate(
"callback_steps",
"1.0.0",
"Passing `callback_steps` as an input argument to `__call__` is deprecated, consider using `callback_on_step_end`",
)
if isinstance(callback_on_step_end, (PipelineCallback, MultiPipelineCallbacks)):
callback_on_step_end_tensor_inputs = callback_on_step_end.tensor_inputs
# 0. Default height and width to unet
# height = height or self.transformer.config.sample_size * self.vae_scale_factor
# width = width or self.transformer.config.sample_size * self.vae_scale_factor
# to deal with lora scaling and other possible forward hooks
# 1. Check inputs. Raise error if not correct
self.check_inputs(
prompt,
height,
width,
video_length,
callback_steps,
negative_prompt,
prompt_embeds,
negative_prompt_embeds,
callback_on_step_end_tensor_inputs,
vae_ver=vae_ver,
)
self._guidance_scale = guidance_scale
self._guidance_rescale = guidance_rescale
self._clip_skip = clip_skip
self._cross_attention_kwargs = cross_attention_kwargs
self._interrupt = False
# 2. Define call parameters
if prompt is not None and isinstance(prompt, str):
batch_size = 1
elif prompt is not None and isinstance(prompt, list):
batch_size = len(prompt)
else:
batch_size = prompt_embeds.shape[0]
device = torch.device(f"cuda:{dist.get_rank()}") if dist.is_initialized() else self._execution_device
# 3. Encode input prompt
lora_scale = (
self.cross_attention_kwargs.get("scale", None)
if self.cross_attention_kwargs is not None
else None
)
(
prompt_embeds,
negative_prompt_embeds,
prompt_mask,
negative_prompt_mask,
) = self.encode_prompt(
prompt,
device,
num_videos_per_prompt,
self.do_classifier_free_guidance,
negative_prompt,
prompt_embeds=prompt_embeds,
attention_mask=attention_mask,
negative_prompt_embeds=negative_prompt_embeds,
negative_attention_mask=negative_attention_mask,
lora_scale=lora_scale,
clip_skip=self.clip_skip,
data_type=data_type,
semantic_images=semantic_images
)
if self.text_encoder_2 is not None:
(
prompt_embeds_2,
negative_prompt_embeds_2,
prompt_mask_2,
negative_prompt_mask_2,
) = self.encode_prompt(
prompt,
device,
num_videos_per_prompt,
self.do_classifier_free_guidance,
negative_prompt,
prompt_embeds=None,
attention_mask=None,
negative_prompt_embeds=None,
negative_attention_mask=None,
lora_scale=lora_scale,
clip_skip=self.clip_skip,
text_encoder=self.text_encoder_2,
data_type=data_type,
)
else:
prompt_embeds_2 = None
negative_prompt_embeds_2 = None
prompt_mask_2 = None
negative_prompt_mask_2 = None
# For classifier free guidance, we need to do two forward passes.
# Here we concatenate the unconditional and text embeddings into a single batch
# to avoid doing two forward passes
if self.do_classifier_free_guidance:
prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])
if prompt_mask is not None:
prompt_mask = torch.cat([negative_prompt_mask, prompt_mask])
if prompt_embeds_2 is not None:
prompt_embeds_2 = torch.cat([negative_prompt_embeds_2, prompt_embeds_2])
if prompt_mask_2 is not None:
prompt_mask_2 = torch.cat([negative_prompt_mask_2, prompt_mask_2])
# 4. Prepare timesteps
extra_set_timesteps_kwargs = self.prepare_extra_func_kwargs(
self.scheduler.set_timesteps, {"n_tokens": n_tokens}
)
timesteps, num_inference_steps = retrieve_timesteps(
self.scheduler,
num_inference_steps,
device,
timesteps,
sigmas,
**extra_set_timesteps_kwargs,
)
if "884" in vae_ver:
video_length = (video_length - 1) // 4 + 1
elif "888" in vae_ver:
video_length = (video_length - 1) // 8 + 1
else:
video_length = video_length
# 5. Prepare latent variables
num_channels_latents = self.transformer.config.in_channels
latents = self.prepare_latents(
batch_size * num_videos_per_prompt,
num_channels_latents,
height,
width,
video_length,
prompt_embeds.dtype,
device,
generator,
latents,
img_latents=img_latents,
i2v_mode=i2v_mode,
i2v_condition_type=i2v_condition_type,
i2v_stability=i2v_stability
)
if i2v_mode and i2v_condition_type == "latent_concat":
if img_latents.shape[2] == 1:
img_latents_concat = img_latents.repeat(1, 1, video_length, 1, 1)
else:
img_latents_concat = img_latents
img_latents_concat[:, :, 1:, ...] = 0
i2v_mask = torch.zeros(video_length)
i2v_mask[0] = 1
mask_concat = torch.ones(img_latents_concat.shape[0], 1, img_latents_concat.shape[2], img_latents_concat.shape[3],
img_latents_concat.shape[4]).to(device=img_latents.device)
mask_concat[:, :, 1:, ...] = 0
# 6. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
extra_step_kwargs = self.prepare_extra_func_kwargs(
self.scheduler.step,
{"generator": generator, "eta": eta},
)
target_dtype = PRECISION_TO_TYPE[self.args.precision]
autocast_enabled = (
target_dtype != torch.float32
) and not self.args.disable_autocast
vae_dtype = PRECISION_TO_TYPE[self.args.vae_precision]
vae_autocast_enabled = (
vae_dtype != torch.float32
) and not self.args.disable_autocast
# 7. Denoising loop
num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
self._num_timesteps = len(timesteps)
# if is_progress_bar:
with self.progress_bar(total=num_inference_steps) as progress_bar:
for i, t in enumerate(timesteps):
if self.interrupt:
continue
if i2v_mode and i2v_condition_type == "token_replace":
latents = torch.concat([img_latents, latents[:, :, 1:, :, :]], dim=2)
# expand the latents if we are doing classifier free guidance
if i2v_mode and i2v_condition_type == "latent_concat":
latent_model_input = torch.concat([latents, img_latents_concat, mask_concat], dim=1)
else:
latent_model_input = latents
latent_model_input = (
torch.cat([latent_model_input] * 2)
if self.do_classifier_free_guidance
else latent_model_input
)
latent_model_input = self.scheduler.scale_model_input(
latent_model_input, t
)
t_expand = t.repeat(latent_model_input.shape[0])
guidance_expand = (
torch.tensor(
[embedded_guidance_scale] * latent_model_input.shape[0],
dtype=torch.float32,
device=device,
).to(target_dtype)
* 1000.0
if embedded_guidance_scale is not None
else None
)
# predict the noise residual
with torch.autocast(
device_type="cuda", dtype=target_dtype, enabled=autocast_enabled
):
noise_pred = self.transformer( # For an input image (129, 192, 336) (1, 256, 256)
latent_model_input, # [2, 16, 33, 24, 42]
t_expand, # [2]
text_states=prompt_embeds, # [2, 256, 4096]
text_mask=prompt_mask, # [2, 256]
text_states_2=prompt_embeds_2, # [2, 768]
freqs_cos=freqs_cis[0], # [seqlen, head_dim]
freqs_sin=freqs_cis[1], # [seqlen, head_dim]
guidance=guidance_expand,
return_dict=True,
)[
"x"
]
# perform guidance
if self.do_classifier_free_guidance:
noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
noise_pred = noise_pred_uncond + self.guidance_scale * (
noise_pred_text - noise_pred_uncond
)
if self.do_classifier_free_guidance and self.guidance_rescale > 0.0:
# Based on 3.4. in https://arxiv.org/pdf/2305.08891.pdf
noise_pred = rescale_noise_cfg(
noise_pred,
noise_pred_text,
guidance_rescale=self.guidance_rescale,
)
# compute the previous noisy sample x_t -> x_t-1
if i2v_mode and i2v_condition_type == "token_replace":
latents = self.scheduler.step(
noise_pred[:, :, 1:, :, :], t, latents[:, :, 1:, :, :], **extra_step_kwargs, return_dict=False
)[0]
latents = torch.concat(
[img_latents, latents], dim=2
)
else:
latents = self.scheduler.step(
noise_pred, t, latents, **extra_step_kwargs, return_dict=False
)[0]
if callback_on_step_end is not None:
callback_kwargs = {}
for k in callback_on_step_end_tensor_inputs:
callback_kwargs[k] = locals()[k]
callback_outputs = callback_on_step_end(self, i, t, callback_kwargs)
latents = callback_outputs.pop("latents", latents)
prompt_embeds = callback_outputs.pop("prompt_embeds", prompt_embeds)
negative_prompt_embeds = callback_outputs.pop(
"negative_prompt_embeds", negative_prompt_embeds
)
# call the callback, if provided
if i == len(timesteps) - 1 or (
(i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0
):
if progress_bar is not None:
progress_bar.update()
if callback is not None and i % callback_steps == 0:
step_idx = i // getattr(self.scheduler, "order", 1)
callback(step_idx, t, latents)
if not output_type == "latent":
expand_temporal_dim = False
if len(latents.shape) == 4:
if isinstance(self.vae, AutoencoderKLCausal3D):
latents = latents.unsqueeze(2)
expand_temporal_dim = True
elif len(latents.shape) == 5:
pass
else:
raise ValueError(
f"Only support latents with shape (b, c, h, w) or (b, c, f, h, w), but got {latents.shape}."
)
if (
hasattr(self.vae.config, "shift_factor")
and self.vae.config.shift_factor
):
latents = (
latents / self.vae.config.scaling_factor
+ self.vae.config.shift_factor
)
else:
latents = latents / self.vae.config.scaling_factor
with torch.autocast(
device_type="cuda", dtype=vae_dtype, enabled=vae_autocast_enabled
):
if enable_tiling:
self.vae.enable_tiling()
image = self.vae.decode(
latents, return_dict=False, generator=generator
)[0]
else:
image = self.vae.decode(
latents, return_dict=False, generator=generator
)[0]
if expand_temporal_dim or image.shape[2] == 1:
image = image.squeeze(2)
else:
image = latents
image = (image / 2 + 0.5).clamp(0, 1)
# we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16
image = image.cpu().float()
if i2v_mode and i2v_condition_type == "latent_concat":
image = image[:, :, 4:, :, :]
# Offload all models
self.maybe_free_model_hooks()
if not return_dict:
return image
return HunyuanVideoPipelineOutput(videos=image)
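# Usage sketch (illustrative only): constructing the individual components is
# project-specific, and every name below is a placeholder rather than a fixed API.
#
#   pipe = HunyuanVideoPipeline(vae=vae, text_encoder=text_encoder, transformer=transformer,
#                               scheduler=scheduler, text_encoder_2=text_encoder_2, args=args)
#   out = pipe(prompt="a cat walking on the grass", height=720, width=1280, video_length=129,
#              num_inference_steps=50, freqs_cis=(freqs_cos, freqs_sin),
#              i2v_mode=True, i2v_condition_type="latent_concat",
#              img_latents=img_latents, semantic_images=[reference_image])
#   videos = out.videos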
from .scheduling_flow_match_discrete import FlowMatchDiscreteScheduler
# Copyright 2024 Stability AI, Katherine Crowson and The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
#
# Modified from diffusers==0.29.2
#
# ==============================================================================
from dataclasses import dataclass
from typing import Optional, Tuple, Union
import numpy as np
import torch
from diffusers.configuration_utils import ConfigMixin, register_to_config
from diffusers.utils import BaseOutput, logging
from diffusers.schedulers.scheduling_utils import SchedulerMixin
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
@dataclass
class FlowMatchDiscreteSchedulerOutput(BaseOutput):
"""
Output class for the scheduler's `step` function output.
Args:
prev_sample (`torch.FloatTensor` of shape `(batch_size, num_channels, height, width)` for images):
Computed sample `(x_{t-1})` of previous timestep. `prev_sample` should be used as next model input in the
denoising loop.
"""
prev_sample: torch.FloatTensor
class FlowMatchDiscreteScheduler(SchedulerMixin, ConfigMixin):
"""
Euler scheduler for discrete flow matching.
This model inherits from [`SchedulerMixin`] and [`ConfigMixin`]. Check the superclass documentation for the generic
methods the library implements for all schedulers such as loading and saving.
Args:
num_train_timesteps (`int`, defaults to 1000):
The number of diffusion steps to train the model.
timestep_spacing (`str`, defaults to `"linspace"`):
The way the timesteps should be scaled. Refer to Table 2 of the [Common Diffusion Noise Schedules and
Sample Steps are Flawed](https://huggingface.co/papers/2305.08891) for more information.
shift (`float`, defaults to 1.0):
The shift value for the timestep schedule.
reverse (`bool`, defaults to `True`):
Whether to reverse the timestep schedule.
"""
_compatibles = []
order = 1
@register_to_config
def __init__(
self,
num_train_timesteps: int = 1000,
shift: float = 1.0,
reverse: bool = True,
solver: str = "euler",
n_tokens: Optional[int] = None,
):
sigmas = torch.linspace(1, 0, num_train_timesteps + 1)
if not reverse:
sigmas = sigmas.flip(0)
self.sigmas = sigmas
# the value fed to model
self.timesteps = (sigmas[:-1] * num_train_timesteps).to(dtype=torch.float32)
self._step_index = None
self._begin_index = None
self.supported_solver = ["euler"]
if solver not in self.supported_solver:
raise ValueError(
f"Solver {solver} not supported. Supported solvers: {self.supported_solver}"
)
@property
def step_index(self):
"""
The index counter for current timestep. It will increase 1 after each scheduler step.
"""
return self._step_index
@property
def begin_index(self):
"""
The index for the first timestep. It should be set from pipeline with `set_begin_index` method.
"""
return self._begin_index
# Copied from diffusers.schedulers.scheduling_dpmsolver_multistep.DPMSolverMultistepScheduler.set_begin_index
def set_begin_index(self, begin_index: int = 0):
"""
Sets the begin index for the scheduler. This function should be run from pipeline before the inference.
Args:
begin_index (`int`):
The begin index for the scheduler.
"""
self._begin_index = begin_index
def _sigma_to_t(self, sigma):
return sigma * self.config.num_train_timesteps
def set_timesteps(
self,
num_inference_steps: int,
device: Union[str, torch.device] = None,
n_tokens: int = None,
):
"""
Sets the discrete timesteps used for the diffusion chain (to be run before inference).
Args:
num_inference_steps (`int`):
The number of diffusion steps used when generating samples with a pre-trained model.
device (`str` or `torch.device`, *optional*):
The device to which the timesteps should be moved. If `None`, the timesteps are not moved.
n_tokens (`int`, *optional*):
Number of tokens in the input sequence.
"""
self.num_inference_steps = num_inference_steps
sigmas = torch.linspace(1, 0, num_inference_steps + 1)
sigmas = self.sd3_time_shift(sigmas)
if not self.config.reverse:
sigmas = 1 - sigmas
self.sigmas = sigmas
self.timesteps = (sigmas[:-1] * self.config.num_train_timesteps).to(
dtype=torch.float32, device=device
)
# Reset step index
self._step_index = None
def index_for_timestep(self, timestep, schedule_timesteps=None):
if schedule_timesteps is None:
schedule_timesteps = self.timesteps
indices = (schedule_timesteps == timestep).nonzero()
# The sigma index that is taken for the **very** first `step`
# is always the second index (or the last index if there is only 1)
# This way we can ensure we don't accidentally skip a sigma in
# case we start in the middle of the denoising schedule (e.g. for image-to-image)
pos = 1 if len(indices) > 1 else 0
return indices[pos].item()
def _init_step_index(self, timestep):
if self.begin_index is None:
if isinstance(timestep, torch.Tensor):
timestep = timestep.to(self.timesteps.device)
self._step_index = self.index_for_timestep(timestep)
else:
self._step_index = self._begin_index
def scale_model_input(
self, sample: torch.Tensor, timestep: Optional[int] = None
) -> torch.Tensor:
return sample
def sd3_time_shift(self, t: torch.Tensor):
return (self.config.shift * t) / (1 + (self.config.shift - 1) * t)
def step(
self,
model_output: torch.FloatTensor,
timestep: Union[float, torch.FloatTensor],
sample: torch.FloatTensor,
return_dict: bool = True,
) -> Union[FlowMatchDiscreteSchedulerOutput, Tuple]:
"""
Predict the sample from the previous timestep by reversing the SDE. This function propagates the diffusion
process from the learned model outputs (most often the predicted noise).
Args:
model_output (`torch.FloatTensor`):
The direct output from learned diffusion model.
timestep (`float`):
The current discrete timestep in the diffusion chain.
sample (`torch.FloatTensor`):
A current instance of a sample created by the diffusion process.
return_dict (`bool`):
Whether or not to return a [`FlowMatchDiscreteSchedulerOutput`] or a tuple.
Returns:
[`FlowMatchDiscreteSchedulerOutput`] or `tuple`:
If return_dict is `True`, a [`FlowMatchDiscreteSchedulerOutput`] is
returned, otherwise a tuple is returned where the first element is the sample tensor.
"""
if (
isinstance(timestep, int)
or isinstance(timestep, torch.IntTensor)
or isinstance(timestep, torch.LongTensor)
):
raise ValueError(
(
"Passing integer indices (e.g. from `enumerate(timesteps)`) as timesteps to"
" `EulerDiscreteScheduler.step()` is not supported. Make sure to pass"
" one of the `scheduler.timesteps` as a timestep."
),
)
if self.step_index is None:
self._init_step_index(timestep)
# Upcast to avoid precision issues when computing prev_sample
sample = sample.to(torch.float32)
dt = self.sigmas[self.step_index + 1] - self.sigmas[self.step_index]
if self.config.solver == "euler":
prev_sample = sample + model_output.to(torch.float32) * dt
else:
raise ValueError(
f"Solver {self.config.solver} not supported. Supported solvers: {self.supported_solver}"
)
# upon completion increase step index by one
self._step_index += 1
if not return_dict:
return (prev_sample,)
return FlowMatchDiscreteSchedulerOutput(prev_sample=prev_sample)
def __len__(self):
return self.config.num_train_timesteps
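# Usage sketch (illustrative): a plain Euler flow-matching sampling loop.
# `model` and `sample` are placeholders for a denoiser returning the predicted
# velocity and the current latent state.
#
#   scheduler = FlowMatchDiscreteScheduler(shift=1.0, reverse=True, solver="euler")
#   scheduler.set_timesteps(num_inference_steps=50, device="cuda")
#   for t in scheduler.timesteps:
#       model_output = model(sample, t)
#       sample = scheduler.step(model_output, t, sample).prev_sample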
import argparse
from pathlib import Path
def get_tensorboard_config(output_dir: str, job_name: str):
tensorboard_config = {
"enabled": True,
"output_path": output_dir,
"job_name": job_name
}
return tensorboard_config
def get_deepspeed_config(args: argparse.Namespace,
micro_batch_size: int,
global_batch_size: int,
output_dir: str = None,
job_name: str = None,
):
config = {
"train_batch_size": global_batch_size,
"train_micro_batch_size_per_gpu": micro_batch_size,
"gradient_accumulation_steps": args.gradient_accumulation_steps,
"steps_per_print": args.log_every,
"optimizer": {
"type": "AdamW",
"params": {
"lr": args.lr,
"betas": [
args.adam_beta1,
args.adam_beta2
],
"eps": args.adam_eps,
"weight_decay": args.weight_decay
}
},
"gradient_clipping": 1.0,
"prescale_gradients": True,
"fp16": {
"enabled": args.precision == 'fp16',
"fp16_master_weights_and_grads": False,
"loss_scale": 0,
"loss_scale_window": 500,
"hysteresis": 2,
"min_loss_scale": 1,
"initial_scale_power": 15
},
"bf16": {
"enabled": args.precision == 'bf16'
},
"wall_clock_breakdown": False,
"zero_optimization": {
"stage": args.zero_stage,
"reduce_scatter": False,
"reduce_bucket_size": 1e9,
},
}
if args.tensorboard:
config["tensorboard"] = get_tensorboard_config(output_dir, job_name)
return config
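# Minimal self-test sketch: the namespace below only illustrates which attributes
# get_deepspeed_config reads; the values are made up and are not the project's defaults.
if __name__ == "__main__":
    demo_args = argparse.Namespace(
        gradient_accumulation_steps=1,
        log_every=10,
        lr=1e-4,
        adam_beta1=0.9,
        adam_beta2=0.999,
        adam_eps=1e-8,
        weight_decay=1e-2,
        precision="bf16",
        zero_stage=2,
        tensorboard=False,
    )
    demo_config = get_deepspeed_config(demo_args, micro_batch_size=1, global_batch_size=8)
    print(demo_config["optimizer"]["params"], demo_config["bf16"]["enabled"])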
[中文阅读 (Chinese README)](./README_zh.md)
# HunyuanVideo Latent Feature Extraction Tool
This project provides an efficient tool for extracting latent features from videos, preparing them for subsequent video generation and processing tasks.
## Features
- Support for various video formats and resolutions
- Multi-GPU parallel processing for improved efficiency
- Support for multiple aspect ratios
- High-performance VAE model for feature extraction
- Automatic skipping of already processed videos, supporting resume functionality
## Usage
### 1. Configuration File
#### Input Dataset Format
The input video metadata file (`meta_file.list`) is a plain-text list of JSON file paths, one per line; each JSON file describes a single video with the fields shown below.
The format of `meta_file.list` (e.g., `./assets/demo/i2v_lora/train_dataset/meta_file.list`) is as follows:
```
/path/to/0.json
/path/to/1.json
/path/to/2.json
...
```
The format of `/path/to/0.json` (e.g., `./assets/demo/i2v_lora/train_dataset/meta_data.json`) is as follows:
```json
{
"video_path": "/path/to/video.mp4",
"raw_caption": {
"long caption": "Detailed description text of the video"
}
}
```
Configure parameters in `hyvideo/hyvae_extract/vae.yaml`:
```yaml
vae_path: "./ckpts/hunyuan-video-i2v-720p/vae" # VAE model path
video_url_files: "/path/to/meta_file.list" # Video metadata file list
output_base_dir: "/path/to/output/directory" # Output directory
sample_n_frames: 129 # Number of frames to sample
target_size: # Target size
- bucket_size
- bucket_size
enable_multi_aspect_ratio: True # Enable multiple aspect ratios
use_stride: True # Use stride sampling
```
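To sanity-check the configuration before launching extraction, you can load it directly (a small sketch; it assumes PyYAML is installed and the fields above have been filled in):

```python
import yaml

with open("hyvideo/hyvae_extract/vae.yaml", "r", encoding="utf-8") as f:
    cfg = yaml.safe_load(f)

print("VAE path:", cfg["vae_path"])
print("meta file list:", cfg["video_url_files"])
print("frames per clip:", cfg["sample_n_frames"], "| target size:", cfg["target_size"])
```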
#### Bucket Size Reference
The `target_size` parameter defines the resolution bucket size. Here are the recommended values for different quality levels:
| Quality | Bucket Size | Typical Resolution |
|---------|-------------|-------------------|
| 720p | 960 | 1280×720 or similar |
| 540p | 720 | 960×540 or similar |
| 360p | 480 | 640×360 or similar |
When `enable_multi_aspect_ratio` is set to `True`, the system will use these bucket sizes as a base to generate multiple aspect ratio buckets. For optimal performance, choose a bucket size that balances quality and memory usage based on your hardware capabilities.
### 2. Run Extraction
```bash
# Set environment variables
export HOST_GPU_NUM=8 # Set the number of GPUs to use
# Run extraction script
cd HunyuanVideo-I2V
bash hyvideo/hyvae_extract/start.sh
```
### 3. Single GPU Run
```bash
cd HunyuanVideo-I2V
export PYTHONPATH=${PYTHONPATH}:`pwd`
export HOST_GPU_NUM=1
CUDA_VISIBLE_DEVICES=0 python3 -u hyvideo/hyvae_extract/run.py --local_rank 0 --config 'hyvideo/hyvae_extract/vae.yaml'
```
## Output Files
The program generates the following files in the specified output directory:
1. `{video_id}.npy` - Latent feature array of the video
2. `json_path/{video_id}.json` - JSON file containing video metadata, including:
- video_id: Video ID
- latent_shape: Shape of the latent features
- video_path: Original video path
- prompt: Video description/prompt
- npy_save_path: Path where the latent features are saved
```
output_base_dir/
├── {video_id_1}.npy # Latent feature array for video 1
├── {video_id_2}.npy # Latent feature array for video 2
├── {video_id_3}.npy # Latent feature array for video 3
│ ...
├── {video_id_n}.npy # Latent feature array for video n
└── json_path/ # Directory containing metadata JSON files
    ├── {video_id_1}.json # Metadata for video 1
    ├── {video_id_2}.json # Metadata for video 2
    ├── {video_id_3}.json # Metadata for video 3
    │   ...
    └── {video_id_n}.json # Metadata for video n
```
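Once extraction finishes, the latents and their metadata can be read back as shown in this minimal sketch (`output_base_dir` and `video_id` are placeholders that must match your configuration):

```python
import json
from pathlib import Path

import numpy as np

output_base_dir = Path("/path/to/output/directory")  # same as output_base_dir in vae.yaml
video_id = "example_video"                           # placeholder video id

latent = np.load(output_base_dir / f"{video_id}.npy")
meta = json.loads((output_base_dir / "json_path" / f"{video_id}.json").read_text(encoding="utf-8"))

print("latent shape:", latent.shape)
print("recorded latent_shape:", meta["latent_shape"])
print("source video:", meta["video_path"])
print("prompt:", meta["prompt"])
```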
## Advanced Configuration
### Multiple Aspect Ratio Processing
When `enable_multi_aspect_ratio` is set to `True`, the system selects the target size closest to the original aspect ratio of the video, rather than forcing it to be cropped to a fixed size. This is useful for maintaining the integrity of the video content.
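Conceptually, the bucket choice comes down to picking the candidate whose aspect ratio is closest to that of the source video. The sketch below only illustrates this idea; the candidate list is an assumption, not the tool's exact bucket table.

```python
def closest_bucket(width, height, buckets):
    """Return the (w, h) bucket whose aspect ratio best matches the input frame."""
    src_ratio = width / height
    return min(buckets, key=lambda wh: abs(wh[0] / wh[1] - src_ratio))

# Illustrative 720p-class candidates derived from a 960 base bucket (assumed values).
candidate_buckets = [(960, 960), (1280, 720), (720, 1280), (1088, 832), (832, 1088)]
print(closest_bucket(1920, 1080, candidate_buckets))  # -> (1280, 720)
```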
### Stride Sampling
When `use_stride` is set to `True`, the system automatically adjusts the sampling stride based on the video's frame rate:
- When frame rate >= 50fps, stride is 2
- When frame rate < 50fps, stride is 1
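A tiny sketch of the rule above applied to frame selection; `sample_n_frames` mirrors the config field, while the helper itself is purely illustrative:

```python
def sample_frame_indices(total_frames, fps, sample_n_frames=129):
    """Pick frame indices with stride 2 for >=50fps sources and stride 1 otherwise."""
    stride = 2 if fps >= 50 else 1
    return list(range(0, total_frames, stride))[:sample_n_frames]

print(len(sample_frame_indices(total_frames=600, fps=60)))  # 129 frames sampled at stride 2
print(len(sample_frame_indices(total_frames=200, fps=24)))  # 129 frames sampled at stride 1
```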