Commit aad7b6c7 authored by chenzk's avatar chenzk
Browse files

v1.0

parents
Pipeline #2416 canceled with stages
# Open Source Model Licensed under the Apache License Version 2.0
# and Other Licenses of the Third-Party Components therein:
# The below Model in this distribution may have been modified by THL A29 Limited
# ("Tencent Modifications"). All Tencent Modifications are Copyright (C) 2024 THL A29 Limited.
# Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
# The below software and/or models in this distribution may have been
# modified by THL A29 Limited ("Tencent Modifications").
# All Tencent Modifications are Copyright (C) THL A29 Limited.
# Hunyuan 3D is licensed under the TENCENT HUNYUAN NON-COMMERCIAL LICENSE AGREEMENT
# except for the third-party components listed below.
# Hunyuan 3D does not impose any additional limitations beyond what is outlined
# in the repsective licenses of these third-party components.
# Users must comply with all terms and conditions of original licenses of these third-party
# components and must ensure that the usage of the third party components adheres to
# all relevant laws and regulations.
# For avoidance of doubts, Hunyuan 3D means the large language models and
# their software and algorithms, including trained model weights, parameters (including
# optimizer states), machine-learning model code, inference-enabling code, training-enabling code,
# fine-tuning enabling code and other elements of the foregoing made publicly available
# by Tencent in accordance with TENCENT HUNYUAN COMMUNITY LICENSE AGREEMENT.
from PIL import Image
from rembg import remove, new_session
class BackgroundRemover():
def __init__(self):
self.session = new_session()
def __call__(self, image: Image.Image):
output = remove(image, session=self.session, bgcolor=[255, 255, 255, 0])
return output
# Open Source Model Licensed under the Apache License Version 2.0
# and Other Licenses of the Third-Party Components therein:
# The below Model in this distribution may have been modified by THL A29 Limited
# ("Tencent Modifications"). All Tencent Modifications are Copyright (C) 2024 THL A29 Limited.
# Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
# The below software and/or models in this distribution may have been
# modified by THL A29 Limited ("Tencent Modifications").
# All Tencent Modifications are Copyright (C) THL A29 Limited.
# Hunyuan 3D is licensed under the TENCENT HUNYUAN NON-COMMERCIAL LICENSE AGREEMENT
# except for the third-party components listed below.
# Hunyuan 3D does not impose any additional limitations beyond what is outlined
# in the repsective licenses of these third-party components.
# Users must comply with all terms and conditions of original licenses of these third-party
# components and must ensure that the usage of the third party components adheres to
# all relevant laws and regulations.
# For avoidance of doubts, Hunyuan 3D means the large language models and
# their software and algorithms, including trained model weights, parameters (including
# optimizer states), machine-learning model code, inference-enabling code, training-enabling code,
# fine-tuning enabling code and other elements of the foregoing made publicly available
# by Tencent in accordance with TENCENT HUNYUAN COMMUNITY LICENSE AGREEMENT.
from .pipelines import Hunyuan3DDiTPipeline, Hunyuan3DDiTFlowMatchingPipeline
from .postprocessors import FaceReducer, FloaterRemover, DegenerateFaceRemover
from .preprocessors import ImageProcessorV2, IMAGE_PROCESSORS, DEFAULT_IMAGEPROCESSOR
# Open Source Model Licensed under the Apache License Version 2.0
# and Other Licenses of the Third-Party Components therein:
# The below Model in this distribution may have been modified by THL A29 Limited
# ("Tencent Modifications"). All Tencent Modifications are Copyright (C) 2024 THL A29 Limited.
# Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
# The below software and/or models in this distribution may have been
# modified by THL A29 Limited ("Tencent Modifications").
# All Tencent Modifications are Copyright (C) THL A29 Limited.
# Hunyuan 3D is licensed under the TENCENT HUNYUAN NON-COMMERCIAL LICENSE AGREEMENT
# except for the third-party components listed below.
# Hunyuan 3D does not impose any additional limitations beyond what is outlined
# in the repsective licenses of these third-party components.
# Users must comply with all terms and conditions of original licenses of these third-party
# components and must ensure that the usage of the third party components adheres to
# all relevant laws and regulations.
# For avoidance of doubts, Hunyuan 3D means the large language models and
# their software and algorithms, including trained model weights, parameters (including
# optimizer states), machine-learning model code, inference-enabling code, training-enabling code,
# fine-tuning enabling code and other elements of the foregoing made publicly available
# by Tencent in accordance with TENCENT HUNYUAN COMMUNITY LICENSE AGREEMENT.
from .conditioner import DualImageEncoder, SingleImageEncoder, DinoImageEncoder, CLIPImageEncoder
from .hunyuan3ddit import Hunyuan3DDiT
from .vae import ShapeVAE
# Open Source Model Licensed under the Apache License Version 2.0
# and Other Licenses of the Third-Party Components therein:
# The below Model in this distribution may have been modified by THL A29 Limited
# ("Tencent Modifications"). All Tencent Modifications are Copyright (C) 2024 THL A29 Limited.
# Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
# The below software and/or models in this distribution may have been
# modified by THL A29 Limited ("Tencent Modifications").
# All Tencent Modifications are Copyright (C) THL A29 Limited.
# Hunyuan 3D is licensed under the TENCENT HUNYUAN NON-COMMERCIAL LICENSE AGREEMENT
# except for the third-party components listed below.
# Hunyuan 3D does not impose any additional limitations beyond what is outlined
# in the repsective licenses of these third-party components.
# Users must comply with all terms and conditions of original licenses of these third-party
# components and must ensure that the usage of the third party components adheres to
# all relevant laws and regulations.
# For avoidance of doubts, Hunyuan 3D means the large language models and
# their software and algorithms, including trained model weights, parameters (including
# optimizer states), machine-learning model code, inference-enabling code, training-enabling code,
# fine-tuning enabling code and other elements of the foregoing made publicly available
# by Tencent in accordance with TENCENT HUNYUAN COMMUNITY LICENSE AGREEMENT.
import torch
import torch.nn as nn
from torchvision import transforms
from transformers import (
CLIPVisionModelWithProjection,
CLIPVisionConfig,
Dinov2Model,
Dinov2Config,
)
class ImageEncoder(nn.Module):
def __init__(
self,
version=None,
config=None,
use_cls_token=True,
image_size=224,
**kwargs,
):
super().__init__()
if config is None:
self.model = self.MODEL_CLASS.from_pretrained(version)
else:
self.model = self.MODEL_CLASS(self.MODEL_CONFIG_CLASS.from_dict(config))
self.model.eval()
self.model.requires_grad_(False)
self.use_cls_token = use_cls_token
self.size = image_size // 14
self.num_patches = (image_size // 14) ** 2
if self.use_cls_token:
self.num_patches += 1
self.transform = transforms.Compose(
[
transforms.Resize(image_size, transforms.InterpolationMode.BILINEAR, antialias=True),
transforms.CenterCrop(image_size),
transforms.Normalize(
mean=self.mean,
std=self.std,
),
]
)
def forward(self, image, mask=None, value_range=(-1, 1)):
if value_range is not None:
low, high = value_range
image = (image - low) / (high - low)
image = image.to(self.model.device, dtype=self.model.dtype)
inputs = self.transform(image)
outputs = self.model(inputs)
last_hidden_state = outputs.last_hidden_state
if not self.use_cls_token:
last_hidden_state = last_hidden_state[:, 1:, :]
return last_hidden_state
def unconditional_embedding(self, batch_size):
device = next(self.model.parameters()).device
dtype = next(self.model.parameters()).dtype
zero = torch.zeros(
batch_size,
self.num_patches,
self.model.config.hidden_size,
device=device,
dtype=dtype,
)
return zero
class CLIPImageEncoder(ImageEncoder):
MODEL_CLASS = CLIPVisionModelWithProjection
MODEL_CONFIG_CLASS = CLIPVisionConfig
mean = [0.48145466, 0.4578275, 0.40821073]
std = [0.26862954, 0.26130258, 0.27577711]
class DinoImageEncoder(ImageEncoder):
MODEL_CLASS = Dinov2Model
MODEL_CONFIG_CLASS = Dinov2Config
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
def build_image_encoder(config):
if config['type'] == 'CLIPImageEncoder':
return CLIPImageEncoder(**config['kwargs'])
elif config['type'] == 'DinoImageEncoder':
return DinoImageEncoder(**config['kwargs'])
else:
raise ValueError(f'Unknown image encoder type: {config["type"]}')
class DualImageEncoder(nn.Module):
def __init__(
self,
main_image_encoder,
additional_image_encoder,
):
super().__init__()
self.main_image_encoder = build_image_encoder(main_image_encoder)
self.additional_image_encoder = build_image_encoder(additional_image_encoder)
def forward(self, image, mask=None):
outputs = {
'main': self.main_image_encoder(image, mask=mask),
'additional': self.additional_image_encoder(image, mask=mask),
}
return outputs
def unconditional_embedding(self, batch_size):
outputs = {
'main': self.main_image_encoder.unconditional_embedding(batch_size),
'additional': self.additional_image_encoder.unconditional_embedding(batch_size),
}
return outputs
class SingleImageEncoder(nn.Module):
def __init__(
self,
main_image_encoder,
):
super().__init__()
self.main_image_encoder = build_image_encoder(main_image_encoder)
def forward(self, image, mask=None):
outputs = {
'main': self.main_image_encoder(image, mask=mask),
}
return outputs
def unconditional_embedding(self, batch_size):
outputs = {
'main': self.main_image_encoder.unconditional_embedding(batch_size),
}
return outputs
# Open Source Model Licensed under the Apache License Version 2.0
# and Other Licenses of the Third-Party Components therein:
# The below Model in this distribution may have been modified by THL A29 Limited
# ("Tencent Modifications"). All Tencent Modifications are Copyright (C) 2024 THL A29 Limited.
# Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
# The below software and/or models in this distribution may have been
# modified by THL A29 Limited ("Tencent Modifications").
# All Tencent Modifications are Copyright (C) THL A29 Limited.
# Hunyuan 3D is licensed under the TENCENT HUNYUAN NON-COMMERCIAL LICENSE AGREEMENT
# except for the third-party components listed below.
# Hunyuan 3D does not impose any additional limitations beyond what is outlined
# in the repsective licenses of these third-party components.
# Users must comply with all terms and conditions of original licenses of these third-party
# components and must ensure that the usage of the third party components adheres to
# all relevant laws and regulations.
# For avoidance of doubts, Hunyuan 3D means the large language models and
# their software and algorithms, including trained model weights, parameters (including
# optimizer states), machine-learning model code, inference-enabling code, training-enabling code,
# fine-tuning enabling code and other elements of the foregoing made publicly available
# by Tencent in accordance with TENCENT HUNYUAN COMMUNITY LICENSE AGREEMENT.
import math
from dataclasses import dataclass
from typing import List, Tuple, Optional
import torch
from einops import rearrange
from torch import Tensor, nn
def attention(q: Tensor, k: Tensor, v: Tensor, **kwargs) -> Tensor:
x = torch.nn.functional.scaled_dot_product_attention(q, k, v)
x = rearrange(x, "B H L D -> B L (H D)")
return x
def timestep_embedding(t: Tensor, dim, max_period=10000, time_factor: float = 1000.0):
"""
Create sinusoidal timestep embeddings.
:param t: a 1-D Tensor of N indices, one per batch element.
These may be fractional.
:param dim: the dimension of the output.
:param max_period: controls the minimum frequency of the embeddings.
:return: an (N, D) Tensor of positional embeddings.
"""
t = time_factor * t
half = dim // 2
freqs = torch.exp(-math.log(max_period) * torch.arange(start=0, end=half, dtype=torch.float32) / half).to(
t.device
)
args = t[:, None].float() * freqs[None]
embedding = torch.cat([torch.cos(args), torch.sin(args)], dim=-1)
if dim % 2:
embedding = torch.cat([embedding, torch.zeros_like(embedding[:, :1])], dim=-1)
if torch.is_floating_point(t):
embedding = embedding.to(t)
return embedding
class MLPEmbedder(nn.Module):
def __init__(self, in_dim: int, hidden_dim: int):
super().__init__()
self.in_layer = nn.Linear(in_dim, hidden_dim, bias=True)
self.silu = nn.SiLU()
self.out_layer = nn.Linear(hidden_dim, hidden_dim, bias=True)
def forward(self, x: Tensor) -> Tensor:
return self.out_layer(self.silu(self.in_layer(x)))
class RMSNorm(torch.nn.Module):
def __init__(self, dim: int):
super().__init__()
self.scale = nn.Parameter(torch.ones(dim))
def forward(self, x: Tensor):
x_dtype = x.dtype
x = x.float()
rrms = torch.rsqrt(torch.mean(x ** 2, dim=-1, keepdim=True) + 1e-6)
return (x * rrms).to(dtype=x_dtype) * self.scale
class QKNorm(torch.nn.Module):
def __init__(self, dim: int):
super().__init__()
self.query_norm = RMSNorm(dim)
self.key_norm = RMSNorm(dim)
def forward(self, q: Tensor, k: Tensor, v: Tensor) -> Tuple[Tensor, Tensor]:
q = self.query_norm(q)
k = self.key_norm(k)
return q.to(v), k.to(v)
class SelfAttention(nn.Module):
def __init__(
self,
dim: int,
num_heads: int = 8,
qkv_bias: bool = False,
):
super().__init__()
self.num_heads = num_heads
head_dim = dim // num_heads
self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias)
self.norm = QKNorm(head_dim)
self.proj = nn.Linear(dim, dim)
def forward(self, x: Tensor, pe: Tensor) -> Tensor:
qkv = self.qkv(x)
q, k, v = rearrange(qkv, "B L (K H D) -> K B H L D", K=3, H=self.num_heads)
q, k = self.norm(q, k, v)
x = attention(q, k, v, pe=pe)
x = self.proj(x)
return x
@dataclass
class ModulationOut:
shift: Tensor
scale: Tensor
gate: Tensor
class Modulation(nn.Module):
def __init__(self, dim: int, double: bool):
super().__init__()
self.is_double = double
self.multiplier = 6 if double else 3
self.lin = nn.Linear(dim, self.multiplier * dim, bias=True)
def forward(self, vec: Tensor) -> Tuple[ModulationOut, Optional[ModulationOut]]:
out = self.lin(nn.functional.silu(vec))[:, None, :]
out = out.chunk(self.multiplier, dim=-1)
return (
ModulationOut(*out[:3]),
ModulationOut(*out[3:]) if self.is_double else None,
)
class DoubleStreamBlock(nn.Module):
def __init__(
self,
hidden_size: int,
num_heads: int,
mlp_ratio: float,
qkv_bias: bool = False,
):
super().__init__()
mlp_hidden_dim = int(hidden_size * mlp_ratio)
self.num_heads = num_heads
self.hidden_size = hidden_size
self.img_mod = Modulation(hidden_size, double=True)
self.img_norm1 = nn.LayerNorm(hidden_size, elementwise_affine=False, eps=1e-6)
self.img_attn = SelfAttention(dim=hidden_size, num_heads=num_heads, qkv_bias=qkv_bias)
self.img_norm2 = nn.LayerNorm(hidden_size, elementwise_affine=False, eps=1e-6)
self.img_mlp = nn.Sequential(
nn.Linear(hidden_size, mlp_hidden_dim, bias=True),
nn.GELU(approximate="tanh"),
nn.Linear(mlp_hidden_dim, hidden_size, bias=True),
)
self.txt_mod = Modulation(hidden_size, double=True)
self.txt_norm1 = nn.LayerNorm(hidden_size, elementwise_affine=False, eps=1e-6)
self.txt_attn = SelfAttention(dim=hidden_size, num_heads=num_heads, qkv_bias=qkv_bias)
self.txt_norm2 = nn.LayerNorm(hidden_size, elementwise_affine=False, eps=1e-6)
self.txt_mlp = nn.Sequential(
nn.Linear(hidden_size, mlp_hidden_dim, bias=True),
nn.GELU(approximate="tanh"),
nn.Linear(mlp_hidden_dim, hidden_size, bias=True),
)
def forward(self, img: Tensor, txt: Tensor, vec: Tensor, pe: Tensor) -> Tuple[Tensor, Tensor]:
img_mod1, img_mod2 = self.img_mod(vec)
txt_mod1, txt_mod2 = self.txt_mod(vec)
img_modulated = self.img_norm1(img)
img_modulated = (1 + img_mod1.scale) * img_modulated + img_mod1.shift
img_qkv = self.img_attn.qkv(img_modulated)
img_q, img_k, img_v = rearrange(img_qkv, "B L (K H D) -> K B H L D", K=3, H=self.num_heads)
img_q, img_k = self.img_attn.norm(img_q, img_k, img_v)
txt_modulated = self.txt_norm1(txt)
txt_modulated = (1 + txt_mod1.scale) * txt_modulated + txt_mod1.shift
txt_qkv = self.txt_attn.qkv(txt_modulated)
txt_q, txt_k, txt_v = rearrange(txt_qkv, "B L (K H D) -> K B H L D", K=3, H=self.num_heads)
txt_q, txt_k = self.txt_attn.norm(txt_q, txt_k, txt_v)
q = torch.cat((txt_q, img_q), dim=2)
k = torch.cat((txt_k, img_k), dim=2)
v = torch.cat((txt_v, img_v), dim=2)
attn = attention(q, k, v, pe=pe)
txt_attn, img_attn = attn[:, : txt.shape[1]], attn[:, txt.shape[1]:]
img = img + img_mod1.gate * self.img_attn.proj(img_attn)
img = img + img_mod2.gate * self.img_mlp((1 + img_mod2.scale) * self.img_norm2(img) + img_mod2.shift)
txt = txt + txt_mod1.gate * self.txt_attn.proj(txt_attn)
txt = txt + txt_mod2.gate * self.txt_mlp((1 + txt_mod2.scale) * self.txt_norm2(txt) + txt_mod2.shift)
return img, txt
class SingleStreamBlock(nn.Module):
"""
A DiT block with parallel linear layers as described in
https://arxiv.org/abs/2302.05442 and adapted modulation interface.
"""
def __init__(
self,
hidden_size: int,
num_heads: int,
mlp_ratio: float = 4.0,
qk_scale: Optional[float] = None,
):
super().__init__()
self.hidden_dim = hidden_size
self.num_heads = num_heads
head_dim = hidden_size // num_heads
self.scale = qk_scale or head_dim ** -0.5
self.mlp_hidden_dim = int(hidden_size * mlp_ratio)
# qkv and mlp_in
self.linear1 = nn.Linear(hidden_size, hidden_size * 3 + self.mlp_hidden_dim)
# proj and mlp_out
self.linear2 = nn.Linear(hidden_size + self.mlp_hidden_dim, hidden_size)
self.norm = QKNorm(head_dim)
self.hidden_size = hidden_size
self.pre_norm = nn.LayerNorm(hidden_size, elementwise_affine=False, eps=1e-6)
self.mlp_act = nn.GELU(approximate="tanh")
self.modulation = Modulation(hidden_size, double=False)
def forward(self, x: Tensor, vec: Tensor, pe: Tensor) -> Tensor:
mod, _ = self.modulation(vec)
x_mod = (1 + mod.scale) * self.pre_norm(x) + mod.shift
qkv, mlp = torch.split(self.linear1(x_mod), [3 * self.hidden_size, self.mlp_hidden_dim], dim=-1)
q, k, v = rearrange(qkv, "B L (K H D) -> K B H L D", K=3, H=self.num_heads)
q, k = self.norm(q, k, v)
# compute attention
attn = attention(q, k, v, pe=pe)
# compute activation in mlp stream, cat again and run second linear layer
output = self.linear2(torch.cat((attn, self.mlp_act(mlp)), 2))
return x + mod.gate * output
class LastLayer(nn.Module):
def __init__(self, hidden_size: int, patch_size: int, out_channels: int):
super().__init__()
self.norm_final = nn.LayerNorm(hidden_size, elementwise_affine=False, eps=1e-6)
self.linear = nn.Linear(hidden_size, patch_size * patch_size * out_channels, bias=True)
self.adaLN_modulation = nn.Sequential(nn.SiLU(), nn.Linear(hidden_size, 2 * hidden_size, bias=True))
def forward(self, x: Tensor, vec: Tensor) -> Tensor:
shift, scale = self.adaLN_modulation(vec).chunk(2, dim=1)
x = (1 + scale[:, None, :]) * self.norm_final(x) + shift[:, None, :]
x = self.linear(x)
return x
class Hunyuan3DDiT(nn.Module):
def __init__(
self,
in_channels: int = 64,
context_in_dim: int = 1536,
hidden_size: int = 1024,
mlp_ratio: float = 4.0,
num_heads: int = 16,
depth: int = 16,
depth_single_blocks: int = 32,
axes_dim: List[int] = [64],
theta: int = 10_000,
qkv_bias: bool = True,
time_factor: float = 1000,
guidance_embed: bool = False,
ckpt_path: Optional[str] = None,
**kwargs,
):
super().__init__()
self.in_channels = in_channels
self.context_in_dim = context_in_dim
self.hidden_size = hidden_size
self.mlp_ratio = mlp_ratio
self.num_heads = num_heads
self.depth = depth
self.depth_single_blocks = depth_single_blocks
self.axes_dim = axes_dim
self.theta = theta
self.qkv_bias = qkv_bias
self.time_factor = time_factor
self.out_channels = self.in_channels
self.guidance_embed = guidance_embed
if hidden_size % num_heads != 0:
raise ValueError(
f"Hidden size {hidden_size} must be divisible by num_heads {num_heads}"
)
pe_dim = hidden_size // num_heads
if sum(axes_dim) != pe_dim:
raise ValueError(f"Got {axes_dim} but expected positional dim {pe_dim}")
self.hidden_size = hidden_size
self.num_heads = num_heads
self.latent_in = nn.Linear(self.in_channels, self.hidden_size, bias=True)
self.time_in = MLPEmbedder(in_dim=256, hidden_dim=self.hidden_size)
self.cond_in = nn.Linear(context_in_dim, self.hidden_size)
self.guidance_in = (
MLPEmbedder(in_dim=256, hidden_dim=self.hidden_size) if guidance_embed else nn.Identity()
)
self.double_blocks = nn.ModuleList(
[
DoubleStreamBlock(
self.hidden_size,
self.num_heads,
mlp_ratio=mlp_ratio,
qkv_bias=qkv_bias,
)
for _ in range(depth)
]
)
self.single_blocks = nn.ModuleList(
[
SingleStreamBlock(
self.hidden_size,
self.num_heads,
mlp_ratio=mlp_ratio,
)
for _ in range(depth_single_blocks)
]
)
self.final_layer = LastLayer(self.hidden_size, 1, self.out_channels)
if ckpt_path is not None:
print('restored denoiser ckpt', ckpt_path)
ckpt = torch.load(ckpt_path, map_location="cpu")
if 'state_dict' not in ckpt:
# deepspeed ckpt
state_dict = {}
for k in ckpt.keys():
new_k = k.replace('_forward_module.', '')
state_dict[new_k] = ckpt[k]
else:
state_dict = ckpt["state_dict"]
final_state_dict = {}
for k, v in state_dict.items():
if k.startswith('model.'):
final_state_dict[k.replace('model.', '')] = v
else:
final_state_dict[k] = v
missing, unexpected = self.load_state_dict(final_state_dict, strict=False)
print('unexpected keys:', unexpected)
print('missing keys:', missing)
def forward(
self,
x,
t,
contexts,
**kwargs,
) -> Tensor:
cond = contexts['main']
latent = self.latent_in(x)
vec = self.time_in(timestep_embedding(t, 256, self.time_factor).to(dtype=latent.dtype))
if self.guidance_embed:
guidance = kwargs.get('guidance', None)
if guidance is None:
raise ValueError("Didn't get guidance strength for guidance distilled model.")
vec = vec + self.guidance_in(timestep_embedding(guidance, 256, self.time_factor))
cond = self.cond_in(cond)
pe = None
for block in self.double_blocks:
latent, cond = block(img=latent, txt=cond, vec=vec, pe=pe)
latent = torch.cat((cond, latent), 1)
for block in self.single_blocks:
latent = block(latent, vec=vec, pe=pe)
latent = latent[:, cond.shape[1]:, ...]
latent = self.final_layer(latent, vec)
return latent
# Open Source Model Licensed under the Apache License Version 2.0
# and Other Licenses of the Third-Party Components therein:
# The below Model in this distribution may have been modified by THL A29 Limited
# ("Tencent Modifications"). All Tencent Modifications are Copyright (C) 2024 THL A29 Limited.
# Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
# The below software and/or models in this distribution may have been
# modified by THL A29 Limited ("Tencent Modifications").
# All Tencent Modifications are Copyright (C) THL A29 Limited.
# Hunyuan 3D is licensed under the TENCENT HUNYUAN NON-COMMERCIAL LICENSE AGREEMENT
# except for the third-party components listed below.
# Hunyuan 3D does not impose any additional limitations beyond what is outlined
# in the repsective licenses of these third-party components.
# Users must comply with all terms and conditions of original licenses of these third-party
# components and must ensure that the usage of the third party components adheres to
# all relevant laws and regulations.
# For avoidance of doubts, Hunyuan 3D means the large language models and
# their software and algorithms, including trained model weights, parameters (including
# optimizer states), machine-learning model code, inference-enabling code, training-enabling code,
# fine-tuning enabling code and other elements of the foregoing made publicly available
# by Tencent in accordance with TENCENT HUNYUAN COMMUNITY LICENSE AGREEMENT.
from typing import Tuple, List, Union, Optional
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from einops import rearrange, repeat
from skimage import measure
from tqdm import tqdm
class FourierEmbedder(nn.Module):
"""The sin/cosine positional embedding. Given an input tensor `x` of shape [n_batch, ..., c_dim], it converts
each feature dimension of `x[..., i]` into:
[
sin(x[..., i]),
sin(f_1*x[..., i]),
sin(f_2*x[..., i]),
...
sin(f_N * x[..., i]),
cos(x[..., i]),
cos(f_1*x[..., i]),
cos(f_2*x[..., i]),
...
cos(f_N * x[..., i]),
x[..., i] # only present if include_input is True.
], here f_i is the frequency.
Denote the space is [0 / num_freqs, 1 / num_freqs, 2 / num_freqs, 3 / num_freqs, ..., (num_freqs - 1) / num_freqs].
If logspace is True, then the frequency f_i is [2^(0 / num_freqs), ..., 2^(i / num_freqs), ...];
Otherwise, the frequencies are linearly spaced between [1.0, 2^(num_freqs - 1)].
Args:
num_freqs (int): the number of frequencies, default is 6;
logspace (bool): If logspace is True, then the frequency f_i is [..., 2^(i / num_freqs), ...],
otherwise, the frequencies are linearly spaced between [1.0, 2^(num_freqs - 1)];
input_dim (int): the input dimension, default is 3;
include_input (bool): include the input tensor or not, default is True.
Attributes:
frequencies (torch.Tensor): If logspace is True, then the frequency f_i is [..., 2^(i / num_freqs), ...],
otherwise, the frequencies are linearly spaced between [1.0, 2^(num_freqs - 1);
out_dim (int): the embedding size, if include_input is True, it is input_dim * (num_freqs * 2 + 1),
otherwise, it is input_dim * num_freqs * 2.
"""
def __init__(self,
num_freqs: int = 6,
logspace: bool = True,
input_dim: int = 3,
include_input: bool = True,
include_pi: bool = True) -> None:
"""The initialization"""
super().__init__()
if logspace:
frequencies = 2.0 ** torch.arange(
num_freqs,
dtype=torch.float32
)
else:
frequencies = torch.linspace(
1.0,
2.0 ** (num_freqs - 1),
num_freqs,
dtype=torch.float32
)
if include_pi:
frequencies *= torch.pi
self.register_buffer("frequencies", frequencies, persistent=False)
self.include_input = include_input
self.num_freqs = num_freqs
self.out_dim = self.get_dims(input_dim)
def get_dims(self, input_dim):
temp = 1 if self.include_input or self.num_freqs == 0 else 0
out_dim = input_dim * (self.num_freqs * 2 + temp)
return out_dim
def forward(self, x: torch.Tensor) -> torch.Tensor:
""" Forward process.
Args:
x: tensor of shape [..., dim]
Returns:
embedding: an embedding of `x` of shape [..., dim * (num_freqs * 2 + temp)]
where temp is 1 if include_input is True and 0 otherwise.
"""
if self.num_freqs > 0:
embed = (x[..., None].contiguous() * self.frequencies).view(*x.shape[:-1], -1)
if self.include_input:
return torch.cat((x, embed.sin(), embed.cos()), dim=-1)
else:
return torch.cat((embed.sin(), embed.cos()), dim=-1)
else:
return x
class DropPath(nn.Module):
"""Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).
"""
def __init__(self, drop_prob: float = 0., scale_by_keep: bool = True):
super(DropPath, self).__init__()
self.drop_prob = drop_prob
self.scale_by_keep = scale_by_keep
def forward(self, x):
"""Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).
This is the same as the DropConnect impl I created for EfficientNet, etc networks, however,
the original name is misleading as 'Drop Connect' is a different form of dropout in a separate paper...
See discussion: https://github.com/tensorflow/tpu/issues/494#issuecomment-532968956 ... I've opted for
changing the layer and argument names to 'drop path' rather than mix DropConnect as a layer name and use
'survival rate' as the argument.
"""
if self.drop_prob == 0. or not self.training:
return x
keep_prob = 1 - self.drop_prob
shape = (x.shape[0],) + (1,) * (x.ndim - 1) # work with diff dim tensors, not just 2D ConvNets
random_tensor = x.new_empty(shape).bernoulli_(keep_prob)
if keep_prob > 0.0 and self.scale_by_keep:
random_tensor.div_(keep_prob)
return x * random_tensor
def extra_repr(self):
return f'drop_prob={round(self.drop_prob, 3):0.3f}'
class MLP(nn.Module):
def __init__(
self, *,
width: int,
output_width: int = None,
drop_path_rate: float = 0.0
):
super().__init__()
self.width = width
self.c_fc = nn.Linear(width, width * 4)
self.c_proj = nn.Linear(width * 4, output_width if output_width is not None else width)
self.gelu = nn.GELU()
self.drop_path = DropPath(drop_path_rate) if drop_path_rate > 0. else nn.Identity()
def forward(self, x):
return self.drop_path(self.c_proj(self.gelu(self.c_fc(x))))
class QKVMultiheadCrossAttention(nn.Module):
def __init__(
self,
*,
heads: int,
n_data: Optional[int] = None,
width=None,
qk_norm=False,
norm_layer=nn.LayerNorm
):
super().__init__()
self.heads = heads
self.n_data = n_data
self.q_norm = norm_layer(width // heads, elementwise_affine=True, eps=1e-6) if qk_norm else nn.Identity()
self.k_norm = norm_layer(width // heads, elementwise_affine=True, eps=1e-6) if qk_norm else nn.Identity()
def forward(self, q, kv):
_, n_ctx, _ = q.shape
bs, n_data, width = kv.shape
attn_ch = width // self.heads // 2
q = q.view(bs, n_ctx, self.heads, -1)
kv = kv.view(bs, n_data, self.heads, -1)
k, v = torch.split(kv, attn_ch, dim=-1)
q = self.q_norm(q)
k = self.k_norm(k)
q, k, v = map(lambda t: rearrange(t, 'b n h d -> b h n d', h=self.heads), (q, k, v))
out = F.scaled_dot_product_attention(q, k, v).transpose(1, 2).reshape(bs, n_ctx, -1)
return out
class MultiheadCrossAttention(nn.Module):
def __init__(
self,
*,
width: int,
heads: int,
qkv_bias: bool = True,
n_data: Optional[int] = None,
data_width: Optional[int] = None,
norm_layer=nn.LayerNorm,
qk_norm: bool = False
):
super().__init__()
self.n_data = n_data
self.width = width
self.heads = heads
self.data_width = width if data_width is None else data_width
self.c_q = nn.Linear(width, width, bias=qkv_bias)
self.c_kv = nn.Linear(self.data_width, width * 2, bias=qkv_bias)
self.c_proj = nn.Linear(width, width)
self.attention = QKVMultiheadCrossAttention(
heads=heads,
n_data=n_data,
width=width,
norm_layer=norm_layer,
qk_norm=qk_norm
)
def forward(self, x, data):
x = self.c_q(x)
data = self.c_kv(data)
x = self.attention(x, data)
x = self.c_proj(x)
return x
class ResidualCrossAttentionBlock(nn.Module):
def __init__(
self,
*,
n_data: Optional[int] = None,
width: int,
heads: int,
data_width: Optional[int] = None,
qkv_bias: bool = True,
norm_layer=nn.LayerNorm,
qk_norm: bool = False
):
super().__init__()
if data_width is None:
data_width = width
self.attn = MultiheadCrossAttention(
n_data=n_data,
width=width,
heads=heads,
data_width=data_width,
qkv_bias=qkv_bias,
norm_layer=norm_layer,
qk_norm=qk_norm
)
self.ln_1 = norm_layer(width, elementwise_affine=True, eps=1e-6)
self.ln_2 = norm_layer(data_width, elementwise_affine=True, eps=1e-6)
self.ln_3 = norm_layer(width, elementwise_affine=True, eps=1e-6)
self.mlp = MLP(width=width)
def forward(self, x: torch.Tensor, data: torch.Tensor):
x = x + self.attn(self.ln_1(x), self.ln_2(data))
x = x + self.mlp(self.ln_3(x))
return x
class QKVMultiheadAttention(nn.Module):
def __init__(
self,
*,
heads: int,
n_ctx: int,
width=None,
qk_norm=False,
norm_layer=nn.LayerNorm
):
super().__init__()
self.heads = heads
self.n_ctx = n_ctx
self.q_norm = norm_layer(width // heads, elementwise_affine=True, eps=1e-6) if qk_norm else nn.Identity()
self.k_norm = norm_layer(width // heads, elementwise_affine=True, eps=1e-6) if qk_norm else nn.Identity()
def forward(self, qkv):
bs, n_ctx, width = qkv.shape
attn_ch = width // self.heads // 3
qkv = qkv.view(bs, n_ctx, self.heads, -1)
q, k, v = torch.split(qkv, attn_ch, dim=-1)
q = self.q_norm(q)
k = self.k_norm(k)
q, k, v = map(lambda t: rearrange(t, 'b n h d -> b h n d', h=self.heads), (q, k, v))
out = F.scaled_dot_product_attention(q, k, v).transpose(1, 2).reshape(bs, n_ctx, -1)
return out
class MultiheadAttention(nn.Module):
def __init__(
self,
*,
n_ctx: int,
width: int,
heads: int,
qkv_bias: bool,
norm_layer=nn.LayerNorm,
qk_norm: bool = False,
drop_path_rate: float = 0.0
):
super().__init__()
self.n_ctx = n_ctx
self.width = width
self.heads = heads
self.c_qkv = nn.Linear(width, width * 3, bias=qkv_bias)
self.c_proj = nn.Linear(width, width)
self.attention = QKVMultiheadAttention(
heads=heads,
n_ctx=n_ctx,
width=width,
norm_layer=norm_layer,
qk_norm=qk_norm
)
self.drop_path = DropPath(drop_path_rate) if drop_path_rate > 0. else nn.Identity()
def forward(self, x):
x = self.c_qkv(x)
x = self.attention(x)
x = self.drop_path(self.c_proj(x))
return x
class ResidualAttentionBlock(nn.Module):
def __init__(
self,
*,
n_ctx: int,
width: int,
heads: int,
qkv_bias: bool = True,
norm_layer=nn.LayerNorm,
qk_norm: bool = False,
drop_path_rate: float = 0.0,
):
super().__init__()
self.attn = MultiheadAttention(
n_ctx=n_ctx,
width=width,
heads=heads,
qkv_bias=qkv_bias,
norm_layer=norm_layer,
qk_norm=qk_norm,
drop_path_rate=drop_path_rate
)
self.ln_1 = norm_layer(width, elementwise_affine=True, eps=1e-6)
self.mlp = MLP(width=width, drop_path_rate=drop_path_rate)
self.ln_2 = norm_layer(width, elementwise_affine=True, eps=1e-6)
def forward(self, x: torch.Tensor):
x = x + self.attn(self.ln_1(x))
x = x + self.mlp(self.ln_2(x))
return x
class Transformer(nn.Module):
def __init__(
self,
*,
n_ctx: int,
width: int,
layers: int,
heads: int,
qkv_bias: bool = True,
norm_layer=nn.LayerNorm,
qk_norm: bool = False,
drop_path_rate: float = 0.0
):
super().__init__()
self.n_ctx = n_ctx
self.width = width
self.layers = layers
self.resblocks = nn.ModuleList(
[
ResidualAttentionBlock(
n_ctx=n_ctx,
width=width,
heads=heads,
qkv_bias=qkv_bias,
norm_layer=norm_layer,
qk_norm=qk_norm,
drop_path_rate=drop_path_rate
)
for _ in range(layers)
]
)
def forward(self, x: torch.Tensor):
for block in self.resblocks:
x = block(x)
return x
class CrossAttentionDecoder(nn.Module):
def __init__(
self,
*,
num_latents: int,
out_channels: int,
fourier_embedder: FourierEmbedder,
width: int,
heads: int,
qkv_bias: bool = True,
qk_norm: bool = False,
label_type: str = "binary"
):
super().__init__()
self.fourier_embedder = fourier_embedder
self.query_proj = nn.Linear(self.fourier_embedder.out_dim, width)
self.cross_attn_decoder = ResidualCrossAttentionBlock(
n_data=num_latents,
width=width,
heads=heads,
qkv_bias=qkv_bias,
qk_norm=qk_norm
)
self.ln_post = nn.LayerNorm(width)
self.output_proj = nn.Linear(width, out_channels)
self.label_type = label_type
def forward(self, queries: torch.FloatTensor, latents: torch.FloatTensor):
queries = self.query_proj(self.fourier_embedder(queries).to(latents.dtype))
x = self.cross_attn_decoder(queries, latents)
x = self.ln_post(x)
occ = self.output_proj(x)
return occ
def generate_dense_grid_points(bbox_min: np.ndarray,
bbox_max: np.ndarray,
octree_depth: int,
indexing: str = "ij",
octree_resolution: int = None,
):
length = bbox_max - bbox_min
num_cells = np.exp2(octree_depth)
if octree_resolution is not None:
num_cells = octree_resolution
x = np.linspace(bbox_min[0], bbox_max[0], int(num_cells) + 1, dtype=np.float32)
y = np.linspace(bbox_min[1], bbox_max[1], int(num_cells) + 1, dtype=np.float32)
z = np.linspace(bbox_min[2], bbox_max[2], int(num_cells) + 1, dtype=np.float32)
[xs, ys, zs] = np.meshgrid(x, y, z, indexing=indexing)
xyz = np.stack((xs, ys, zs), axis=-1)
xyz = xyz.reshape(-1, 3)
grid_size = [int(num_cells) + 1, int(num_cells) + 1, int(num_cells) + 1]
return xyz, grid_size, length
def center_vertices(vertices):
"""Translate the vertices so that bounding box is centered at zero."""
vert_min = vertices.min(dim=0)[0]
vert_max = vertices.max(dim=0)[0]
vert_center = 0.5 * (vert_min + vert_max)
return vertices - vert_center
class Latent2MeshOutput:
def __init__(self, mesh_v=None, mesh_f=None):
self.mesh_v = mesh_v
self.mesh_f = mesh_f
class ShapeVAE(nn.Module):
def __init__(
self,
*,
num_latents: int,
embed_dim: int,
width: int,
heads: int,
num_decoder_layers: int,
num_freqs: int = 8,
include_pi: bool = True,
qkv_bias: bool = True,
qk_norm: bool = False,
label_type: str = "binary",
drop_path_rate: float = 0.0,
scale_factor: float = 1.0,
):
super().__init__()
self.fourier_embedder = FourierEmbedder(num_freqs=num_freqs, include_pi=include_pi)
self.post_kl = nn.Linear(embed_dim, width)
self.transformer = Transformer(
n_ctx=num_latents,
width=width,
layers=num_decoder_layers,
heads=heads,
qkv_bias=qkv_bias,
qk_norm=qk_norm,
drop_path_rate=drop_path_rate
)
self.geo_decoder = CrossAttentionDecoder(
fourier_embedder=self.fourier_embedder,
out_channels=1,
num_latents=num_latents,
width=width,
heads=heads,
qkv_bias=qkv_bias,
qk_norm=qk_norm,
label_type=label_type,
)
self.scale_factor = scale_factor
self.latent_shape = (num_latents, embed_dim)
def forward(self, latents):
latents = self.post_kl(latents)
latents = self.transformer(latents)
return latents
@torch.no_grad()
def latents2mesh(
self,
latents: torch.FloatTensor,
bounds: Union[Tuple[float], List[float], float] = 1.1,
octree_depth: int = 7,
num_chunks: int = 10000,
mc_level: float = -1 / 512,
octree_resolution: int = None,
mc_algo: str = 'dmc',
):
device = latents.device
# 1. generate query points
if isinstance(bounds, float):
bounds = [-bounds, -bounds, -bounds, bounds, bounds, bounds]
bbox_min = np.array(bounds[0:3])
bbox_max = np.array(bounds[3:6])
bbox_size = bbox_max - bbox_min
xyz_samples, grid_size, length = generate_dense_grid_points(
bbox_min=bbox_min,
bbox_max=bbox_max,
octree_depth=octree_depth,
octree_resolution=octree_resolution,
indexing="ij"
)
xyz_samples = torch.FloatTensor(xyz_samples)
# 2. latents to 3d volume
batch_logits = []
batch_size = latents.shape[0]
for start in tqdm(range(0, xyz_samples.shape[0], num_chunks),
desc=f"MC Level {mc_level} Implicit Function:"):
queries = xyz_samples[start: start + num_chunks, :].to(device)
queries = queries.half()
batch_queries = repeat(queries, "p c -> b p c", b=batch_size)
logits = self.geo_decoder(batch_queries.to(latents.dtype), latents)
if mc_level == -1:
mc_level = 0
logits = torch.sigmoid(logits) * 2 - 1
print(f'Training with soft labels, inference with sigmoid and marching cubes level 0.')
batch_logits.append(logits)
grid_logits = torch.cat(batch_logits, dim=1)
grid_logits = grid_logits.view((batch_size, grid_size[0], grid_size[1], grid_size[2])).float()
# 3. extract surface
outputs = []
for i in range(batch_size):
try:
if mc_algo == 'mc':
vertices, faces, normals, _ = measure.marching_cubes(
grid_logits[i].cpu().numpy(),
mc_level,
method="lewiner"
)
vertices = vertices / grid_size * bbox_size + bbox_min
elif mc_algo == 'dmc':
if not hasattr(self, 'dmc'):
try:
from diso import DiffDMC
except:
raise ImportError("Please install diso via `pip install diso`, or set mc_algo to 'mc'")
self.dmc = DiffDMC(dtype=torch.float32).to(device)
octree_resolution = 2 ** octree_depth if octree_resolution is None else octree_resolution
sdf = -grid_logits[i] / octree_resolution
verts, faces = self.dmc(sdf, deform=None, return_quads=False, normalize=True)
verts = center_vertices(verts)
vertices = verts.detach().cpu().numpy()
faces = faces.detach().cpu().numpy()[:, ::-1]
else:
raise ValueError(f"mc_algo {mc_algo} not supported.")
outputs.append(
Latent2MeshOutput(
mesh_v=vertices.astype(np.float32),
mesh_f=np.ascontiguousarray(faces)
)
)
except ValueError:
outputs.append(None)
except RuntimeError:
outputs.append(None)
return outputs
# Open Source Model Licensed under the Apache License Version 2.0
# and Other Licenses of the Third-Party Components therein:
# The below Model in this distribution may have been modified by THL A29 Limited
# ("Tencent Modifications"). All Tencent Modifications are Copyright (C) 2024 THL A29 Limited.
# Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
# The below software and/or models in this distribution may have been
# modified by THL A29 Limited ("Tencent Modifications").
# All Tencent Modifications are Copyright (C) THL A29 Limited.
# Hunyuan 3D is licensed under the TENCENT HUNYUAN NON-COMMERCIAL LICENSE AGREEMENT
# except for the third-party components listed below.
# Hunyuan 3D does not impose any additional limitations beyond what is outlined
# in the repsective licenses of these third-party components.
# Users must comply with all terms and conditions of original licenses of these third-party
# components and must ensure that the usage of the third party components adheres to
# all relevant laws and regulations.
# For avoidance of doubts, Hunyuan 3D means the large language models and
# their software and algorithms, including trained model weights, parameters (including
# optimizer states), machine-learning model code, inference-enabling code, training-enabling code,
# fine-tuning enabling code and other elements of the foregoing made publicly available
# by Tencent in accordance with TENCENT HUNYUAN COMMUNITY LICENSE AGREEMENT.
import copy
import importlib
import inspect
import logging
import os
from typing import List, Optional, Union
import numpy as np
import torch
import trimesh
import yaml
from PIL import Image
from diffusers.utils.torch_utils import randn_tensor
from tqdm import tqdm
logger = logging.getLogger(__name__)
def retrieve_timesteps(
scheduler,
num_inference_steps: Optional[int] = None,
device: Optional[Union[str, torch.device]] = None,
timesteps: Optional[List[int]] = None,
sigmas: Optional[List[float]] = None,
**kwargs,
):
"""
Calls the scheduler's `set_timesteps` method and retrieves timesteps from the scheduler after the call. Handles
custom timesteps. Any kwargs will be supplied to `scheduler.set_timesteps`.
Args:
scheduler (`SchedulerMixin`):
The scheduler to get timesteps from.
num_inference_steps (`int`):
The number of diffusion steps used when generating samples with a pre-trained model. If used, `timesteps`
must be `None`.
device (`str` or `torch.device`, *optional*):
The device to which the timesteps should be moved to. If `None`, the timesteps are not moved.
timesteps (`List[int]`, *optional*):
Custom timesteps used to override the timestep spacing strategy of the scheduler. If `timesteps` is passed,
`num_inference_steps` and `sigmas` must be `None`.
sigmas (`List[float]`, *optional*):
Custom sigmas used to override the timestep spacing strategy of the scheduler. If `sigmas` is passed,
`num_inference_steps` and `timesteps` must be `None`.
Returns:
`Tuple[torch.Tensor, int]`: A tuple where the first element is the timestep schedule from the scheduler and the
second element is the number of inference steps.
"""
if timesteps is not None and sigmas is not None:
raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
if timesteps is not None:
accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
if not accepts_timesteps:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" timestep schedules. Please check whether you are using the correct scheduler."
)
scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
timesteps = scheduler.timesteps
num_inference_steps = len(timesteps)
elif sigmas is not None:
accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
if not accept_sigmas:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" sigmas schedules. Please check whether you are using the correct scheduler."
)
scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
timesteps = scheduler.timesteps
num_inference_steps = len(timesteps)
else:
scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
timesteps = scheduler.timesteps
return timesteps, num_inference_steps
def export_to_trimesh(mesh_output):
if isinstance(mesh_output, list):
outputs = []
for mesh in mesh_output:
if mesh is None:
outputs.append(None)
else:
mesh.mesh_f = mesh.mesh_f[:, ::-1]
mesh_output = trimesh.Trimesh(mesh.mesh_v, mesh.mesh_f)
outputs.append(mesh_output)
return outputs
else:
mesh_output.mesh_f = mesh_output.mesh_f[:, ::-1]
mesh_output = trimesh.Trimesh(mesh_output.mesh_v, mesh_output.mesh_f)
return mesh_output
def get_obj_from_str(string, reload=False):
module, cls = string.rsplit(".", 1)
if reload:
module_imp = importlib.import_module(module)
importlib.reload(module_imp)
return getattr(importlib.import_module(module, package=None), cls)
def instantiate_from_config(config, **kwargs):
if "target" not in config:
raise KeyError("Expected key `target` to instantiate.")
cls = get_obj_from_str(config["target"])
params = config.get("params", dict())
kwargs.update(params)
instance = cls(**kwargs)
return instance
class Hunyuan3DDiTPipeline:
@classmethod
def from_single_file(
cls,
ckpt_path,
config_path,
device='cuda',
dtype=torch.float16,
use_safetensors=None,
**kwargs,
):
# load config
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
# load ckpt
if use_safetensors:
ckpt_path = ckpt_path.replace('.ckpt', '.safetensors')
if not os.path.exists(ckpt_path):
raise FileNotFoundError(f"Model file {ckpt_path} not found")
logger.info(f"Loading model from {ckpt_path}")
if use_safetensors:
# parse safetensors
import safetensors.torch
safetensors_ckpt = safetensors.torch.load_file(ckpt_path, device='cpu')
ckpt = {}
for key, value in safetensors_ckpt.items():
model_name = key.split('.')[0]
new_key = key[len(model_name) + 1:]
if model_name not in ckpt:
ckpt[model_name] = {}
ckpt[model_name][new_key] = value
else:
ckpt = torch.load(ckpt_path, map_location='cpu')
# load model
model = instantiate_from_config(config['model'])
model.load_state_dict(ckpt['model'])
vae = instantiate_from_config(config['vae'])
vae.load_state_dict(ckpt['vae'])
conditioner = instantiate_from_config(config['conditioner'])
if 'conditioner' in ckpt:
conditioner.load_state_dict(ckpt['conditioner'])
image_processor = instantiate_from_config(config['image_processor'])
scheduler = instantiate_from_config(config['scheduler'])
model_kwargs = dict(
vae=vae,
model=model,
scheduler=scheduler,
conditioner=conditioner,
image_processor=image_processor,
device=device,
dtype=dtype,
)
model_kwargs.update(kwargs)
return cls(
**model_kwargs
)
@classmethod
def from_pretrained(
cls,
model_path,
device='cuda',
dtype=torch.float16,
use_safetensors=None,
variant=None,
subfolder='hunyuan3d-dit-v2-0',
**kwargs,
):
original_model_path = model_path
# try local path
base_dir = os.environ.get('HY3DGEN_MODELS', '~/.cache/hy3dgen')
# model_path = os.path.expanduser(os.path.join(base_dir, model_path, subfolder))
model_path = os.path.expanduser(os.path.join(model_path, subfolder))
print('Try to load model from local path:', model_path)
if not os.path.exists(model_path):
print('Model path not exists, try to download from huggingface')
try:
import huggingface_hub
# download from huggingface
path = huggingface_hub.snapshot_download(repo_id=original_model_path)
model_path = os.path.join(path, subfolder)
except ImportError:
logger.warning(
"You need to install HuggingFace Hub to load models from the hub."
)
raise RuntimeError(f"Model path {model_path} not found")
except Exception as e:
raise e
if not os.path.exists(model_path):
raise FileNotFoundError(f"Model path {original_model_path} not found")
extension = 'ckpt' if not use_safetensors else 'safetensors'
variant = '' if variant is None else f'.{variant}'
ckpt_name = f'model{variant}.{extension}'
config_path = os.path.join(model_path, 'config.yaml')
ckpt_path = os.path.join(model_path, ckpt_name)
return cls.from_single_file(
ckpt_path,
config_path,
device=device,
dtype=dtype,
use_safetensors=use_safetensors,
variant=variant,
**kwargs
)
def __init__(
self,
vae,
model,
scheduler,
conditioner,
image_processor,
device='cuda',
dtype=torch.float16,
**kwargs
):
self.vae = vae
self.model = model
self.scheduler = scheduler
self.conditioner = conditioner
self.image_processor = image_processor
self.to(device, dtype)
def to(self, device=None, dtype=None):
if device is not None:
self.device = torch.device(device)
self.vae.to(device)
self.model.to(device)
self.conditioner.to(device)
if dtype is not None:
self.dtype = dtype
self.vae.to(dtype=dtype)
self.model.to(dtype=dtype)
self.conditioner.to(dtype=dtype)
def encode_cond(self, image, mask, do_classifier_free_guidance, dual_guidance):
bsz = image.shape[0]
cond = self.conditioner(image=image, mask=mask)
if do_classifier_free_guidance:
un_cond = self.conditioner.unconditional_embedding(bsz)
if dual_guidance:
un_cond_drop_main = copy.deepcopy(un_cond)
un_cond_drop_main['additional'] = cond['additional']
def cat_recursive(a, b, c):
if isinstance(a, torch.Tensor):
return torch.cat([a, b, c], dim=0).to(self.dtype)
out = {}
for k in a.keys():
out[k] = cat_recursive(a[k], b[k], c[k])
return out
cond = cat_recursive(cond, un_cond_drop_main, un_cond)
else:
un_cond = self.conditioner.unconditional_embedding(bsz)
def cat_recursive(a, b):
if isinstance(a, torch.Tensor):
return torch.cat([a, b], dim=0).to(self.dtype)
out = {}
for k in a.keys():
out[k] = cat_recursive(a[k], b[k])
return out
cond = cat_recursive(cond, un_cond)
return cond
def prepare_extra_step_kwargs(self, generator, eta):
# prepare extra kwargs for the scheduler step, since not all schedulers have the same signature
# eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.
# eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502
# and should be between [0, 1]
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
extra_step_kwargs = {}
if accepts_eta:
extra_step_kwargs["eta"] = eta
# check if the scheduler accepts generator
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
if accepts_generator:
extra_step_kwargs["generator"] = generator
return extra_step_kwargs
def prepare_latents(self, batch_size, dtype, device, generator, latents=None):
shape = (batch_size, *self.vae.latent_shape)
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
latents = latents.to(device)
# scale the initial noise by the standard deviation required by the scheduler
latents = latents * getattr(self.scheduler, 'init_noise_sigma', 1.0)
return latents
def prepare_image(self, image):
if isinstance(image, str) and not os.path.exists(image):
raise FileNotFoundError(f"Couldn't find image at path {image}")
if not isinstance(image, list):
image = [image]
image_pts = []
mask_pts = []
for img in image:
image_pt, mask_pt = self.image_processor(img, return_mask=True)
image_pts.append(image_pt)
mask_pts.append(mask_pt)
image_pts = torch.cat(image_pts, dim=0).to(self.device, dtype=self.dtype)
if mask_pts[0] is not None:
mask_pts = torch.cat(mask_pts, dim=0).to(self.device, dtype=self.dtype)
else:
mask_pts = None
return image_pts, mask_pts
def get_guidance_scale_embedding(self, w, embedding_dim=512, dtype=torch.float32):
"""
See https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298
Args:
timesteps (`torch.Tensor`):
generate embedding vectors at these timesteps
embedding_dim (`int`, *optional*, defaults to 512):
dimension of the embeddings to generate
dtype:
data type of the generated embeddings
Returns:
`torch.FloatTensor`: Embedding vectors with shape `(len(timesteps), embedding_dim)`
"""
assert len(w.shape) == 1
w = w * 1000.0
half_dim = embedding_dim // 2
emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1)
emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb)
emb = w.to(dtype)[:, None] * emb[None, :]
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
if embedding_dim % 2 == 1: # zero pad
emb = torch.nn.functional.pad(emb, (0, 1))
assert emb.shape == (w.shape[0], embedding_dim)
return emb
@torch.no_grad()
def __call__(
self,
image: Union[str, List[str], Image.Image] = None,
num_inference_steps: int = 50,
timesteps: List[int] = None,
sigmas: List[float] = None,
eta: float = 0.0,
guidance_scale: float = 7.5,
dual_guidance_scale: float = 10.5,
dual_guidance: bool = True,
generator=None,
box_v=1.01,
octree_resolution=384,
mc_level=-1 / 512,
num_chunks=8000,
mc_algo='mc',
output_type: Optional[str] = "trimesh",
enable_pbar=True,
**kwargs,
) -> List[List[trimesh.Trimesh]]:
callback = kwargs.pop("callback", None)
callback_steps = kwargs.pop("callback_steps", None)
device = self.device
dtype = self.dtype
do_classifier_free_guidance = guidance_scale >= 0 and \
getattr(self.model, 'guidance_cond_proj_dim', None) is None
dual_guidance = dual_guidance_scale >= 0 and dual_guidance
image, mask = self.prepare_image(image)
cond = self.encode_cond(image=image,
mask=mask,
do_classifier_free_guidance=do_classifier_free_guidance,
dual_guidance=dual_guidance)
batch_size = image.shape[0]
t_dtype = torch.long
timesteps, num_inference_steps = retrieve_timesteps(
self.scheduler, num_inference_steps, device, timesteps, sigmas)
latents = self.prepare_latents(batch_size, dtype, device, generator)
extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)
guidance_cond = None
if getattr(self.model, 'guidance_cond_proj_dim', None) is not None:
print('Using lcm guidance scale')
guidance_scale_tensor = torch.tensor(guidance_scale - 1).repeat(batch_size)
guidance_cond = self.get_guidance_scale_embedding(
guidance_scale_tensor, embedding_dim=self.model.guidance_cond_proj_dim
).to(device=device, dtype=latents.dtype)
for i, t in enumerate(tqdm(timesteps, disable=not enable_pbar, desc="Diffusion Sampling:", leave=False)):
# expand the latents if we are doing classifier free guidance
if do_classifier_free_guidance:
latent_model_input = torch.cat([latents] * (3 if dual_guidance else 2))
else:
latent_model_input = latents
latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)
# predict the noise residual
timestep_tensor = torch.tensor([t], dtype=t_dtype, device=device)
timestep_tensor = timestep_tensor.expand(latent_model_input.shape[0])
noise_pred = self.model(latent_model_input, timestep_tensor, cond, guidance_cond=guidance_cond)
# no drop, drop clip, all drop
if do_classifier_free_guidance:
if dual_guidance:
noise_pred_clip, noise_pred_dino, noise_pred_uncond = noise_pred.chunk(3)
noise_pred = (
noise_pred_uncond
+ guidance_scale * (noise_pred_clip - noise_pred_dino)
+ dual_guidance_scale * (noise_pred_dino - noise_pred_uncond)
)
else:
noise_pred_cond, noise_pred_uncond = noise_pred.chunk(2)
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_cond - noise_pred_uncond)
# compute the previous noisy sample x_t -> x_t-1
outputs = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs)
latents = outputs.prev_sample
if callback is not None and i % callback_steps == 0:
step_idx = i // getattr(self.scheduler, "order", 1)
callback(step_idx, t, outputs)
return self._export(
latents,
output_type,
box_v, mc_level, num_chunks, octree_resolution, mc_algo,
)
def _export(self, latents, output_type, box_v, mc_level, num_chunks, octree_resolution, mc_algo):
if not output_type == "latent":
latents = 1. / self.vae.scale_factor * latents
latents = self.vae(latents)
outputs = self.vae.latents2mesh(
latents,
bounds=box_v,
mc_level=mc_level,
num_chunks=num_chunks,
octree_resolution=octree_resolution,
mc_algo=mc_algo,
)
else:
outputs = latents
if output_type == 'trimesh':
outputs = export_to_trimesh(outputs)
return outputs
class Hunyuan3DDiTFlowMatchingPipeline(Hunyuan3DDiTPipeline):
@torch.no_grad()
def __call__(
self,
image: Union[str, List[str], Image.Image] = None,
num_inference_steps: int = 50,
timesteps: List[int] = None,
sigmas: List[float] = None,
eta: float = 0.0,
guidance_scale: float = 7.5,
generator=None,
box_v=1.01,
octree_resolution=384,
mc_level=0.0,
mc_algo='mc',
num_chunks=8000,
output_type: Optional[str] = "trimesh",
enable_pbar=True,
**kwargs,
) -> List[List[trimesh.Trimesh]]:
callback = kwargs.pop("callback", None)
callback_steps = kwargs.pop("callback_steps", None)
device = self.device
dtype = self.dtype
do_classifier_free_guidance = guidance_scale >= 0 and not (
hasattr(self.model, 'guidance_embed') and
self.model.guidance_embed is True
)
image, mask = self.prepare_image(image)
cond = self.encode_cond(
image=image,
mask=mask,
do_classifier_free_guidance=do_classifier_free_guidance,
dual_guidance=False,
)
batch_size = image.shape[0]
# 5. Prepare timesteps
# NOTE: this is slightly different from common usage, we start from 0.
sigmas = np.linspace(0, 1, num_inference_steps) if sigmas is None else sigmas
timesteps, num_inference_steps = retrieve_timesteps(
self.scheduler,
num_inference_steps,
device,
sigmas=sigmas,
)
latents = self.prepare_latents(batch_size, dtype, device, generator)
guidance = None
if hasattr(self.model, 'guidance_embed') and \
self.model.guidance_embed is True:
guidance = torch.tensor([guidance_scale] * batch_size, device=device, dtype=dtype)
print(f'Using guidance embed with scale {guidance_scale}')
for i, t in enumerate(tqdm(timesteps, disable=not enable_pbar, desc="Diffusion Sampling:")):
# expand the latents if we are doing classifier free guidance
if do_classifier_free_guidance:
latent_model_input = torch.cat([latents] * 2)
else:
latent_model_input = latents
# NOTE: we assume model get timesteps ranged from 0 to 1
timestep = t.expand(latent_model_input.shape[0]).to(
latents.dtype) / self.scheduler.config.num_train_timesteps
noise_pred = self.model(latent_model_input, timestep, cond, guidance=guidance)
if do_classifier_free_guidance:
noise_pred_cond, noise_pred_uncond = noise_pred.chunk(2)
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_cond - noise_pred_uncond)
# compute the previous noisy sample x_t -> x_t-1
outputs = self.scheduler.step(noise_pred, t, latents)
latents = outputs.prev_sample
if callback is not None and i % callback_steps == 0:
step_idx = i // getattr(self.scheduler, "order", 1)
callback(step_idx, t, outputs)
return self._export(
latents,
output_type,
box_v, mc_level, num_chunks, octree_resolution, mc_algo,
)
# Open Source Model Licensed under the Apache License Version 2.0
# and Other Licenses of the Third-Party Components therein:
# The below Model in this distribution may have been modified by THL A29 Limited
# ("Tencent Modifications"). All Tencent Modifications are Copyright (C) 2024 THL A29 Limited.
# Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
# The below software and/or models in this distribution may have been
# modified by THL A29 Limited ("Tencent Modifications").
# All Tencent Modifications are Copyright (C) THL A29 Limited.
# Hunyuan 3D is licensed under the TENCENT HUNYUAN NON-COMMERCIAL LICENSE AGREEMENT
# except for the third-party components listed below.
# Hunyuan 3D does not impose any additional limitations beyond what is outlined
# in the repsective licenses of these third-party components.
# Users must comply with all terms and conditions of original licenses of these third-party
# components and must ensure that the usage of the third party components adheres to
# all relevant laws and regulations.
# For avoidance of doubts, Hunyuan 3D means the large language models and
# their software and algorithms, including trained model weights, parameters (including
# optimizer states), machine-learning model code, inference-enabling code, training-enabling code,
# fine-tuning enabling code and other elements of the foregoing made publicly available
# by Tencent in accordance with TENCENT HUNYUAN COMMUNITY LICENSE AGREEMENT.
import os
import tempfile
from typing import Union
import pymeshlab
import trimesh
from .models.vae import Latent2MeshOutput
def load_mesh(path):
if path.endswith(".glb"):
mesh = trimesh.load(path)
else:
mesh = pymeshlab.MeshSet()
mesh.load_new_mesh(path)
return mesh
def reduce_face(mesh: pymeshlab.MeshSet, max_facenum: int = 200000):
mesh.apply_filter(
"meshing_decimation_quadric_edge_collapse",
targetfacenum=max_facenum,
qualitythr=1.0,
preserveboundary=True,
boundaryweight=3,
preservenormal=True,
preservetopology=True,
autoclean=True
)
return mesh
def remove_floater(mesh: pymeshlab.MeshSet):
mesh.apply_filter("compute_selection_by_small_disconnected_components_per_face",
nbfaceratio=0.005)
mesh.apply_filter("compute_selection_transfer_face_to_vertex", inclusive=False)
mesh.apply_filter("meshing_remove_selected_vertices_and_faces")
return mesh
def pymeshlab2trimesh(mesh: pymeshlab.MeshSet):
temp_file = tempfile.NamedTemporaryFile(suffix='.ply', delete=True)
temp_file.close()
temp_file_name = temp_file.name
mesh.save_current_mesh(temp_file_name)
mesh = trimesh.load(temp_file_name)
if os.path.exists(temp_file_name):
os.remove(temp_file_name)
# 检查加载的对象类型
if isinstance(mesh, trimesh.Scene):
combined_mesh = trimesh.Trimesh()
# 如果是Scene,遍历所有的geometry并合并
for geom in mesh.geometry.values():
combined_mesh = trimesh.util.concatenate([combined_mesh, geom])
mesh = combined_mesh
return mesh
def trimesh2pymeshlab(mesh: trimesh.Trimesh):
temp_file = tempfile.NamedTemporaryFile(suffix='.ply', delete=True)
temp_file.close()
temp_file_name = temp_file.name
if isinstance(mesh, trimesh.scene.Scene):
for idx, obj in enumerate(mesh.geometry.values()):
if idx == 0:
temp_mesh = obj
else:
temp_mesh = temp_mesh + obj
mesh = temp_mesh
mesh.export(temp_file_name)
mesh = pymeshlab.MeshSet()
mesh.load_new_mesh(temp_file_name)
if os.path.exists(temp_file_name):
os.remove(temp_file_name)
return mesh
def export_mesh(input, output):
if isinstance(input, pymeshlab.MeshSet):
mesh = output
elif isinstance(input, Latent2MeshOutput):
output = Latent2MeshOutput()
output.mesh_v = output.current_mesh().vertex_matrix()
output.mesh_f = output.current_mesh().face_matrix()
mesh = output
else:
mesh = pymeshlab2trimesh(output)
return mesh
def import_mesh(mesh: Union[pymeshlab.MeshSet, trimesh.Trimesh, Latent2MeshOutput, str]) -> pymeshlab.MeshSet:
if isinstance(mesh, str):
mesh = load_mesh(mesh)
elif isinstance(mesh, Latent2MeshOutput):
mesh = pymeshlab.MeshSet()
mesh_pymeshlab = pymeshlab.Mesh(vertex_matrix=mesh.mesh_v, face_matrix=mesh.mesh_f)
mesh.add_mesh(mesh_pymeshlab, "converted_mesh")
if isinstance(mesh, (trimesh.Trimesh, trimesh.scene.Scene)):
mesh = trimesh2pymeshlab(mesh)
return mesh
class FaceReducer:
def __call__(
self,
mesh: Union[pymeshlab.MeshSet, trimesh.Trimesh, Latent2MeshOutput, str],
max_facenum: int = 150000
) -> Union[pymeshlab.MeshSet, trimesh.Trimesh]:
ms = import_mesh(mesh)
ms = reduce_face(ms, max_facenum=max_facenum)
mesh = export_mesh(mesh, ms)
return mesh
class FloaterRemover:
def __call__(
self,
mesh: Union[pymeshlab.MeshSet, trimesh.Trimesh, Latent2MeshOutput, str],
) -> Union[pymeshlab.MeshSet, trimesh.Trimesh, Latent2MeshOutput]:
ms = import_mesh(mesh)
ms = remove_floater(ms)
mesh = export_mesh(mesh, ms)
return mesh
class DegenerateFaceRemover:
def __call__(
self,
mesh: Union[pymeshlab.MeshSet, trimesh.Trimesh, Latent2MeshOutput, str],
) -> Union[pymeshlab.MeshSet, trimesh.Trimesh, Latent2MeshOutput]:
ms = import_mesh(mesh)
temp_file = tempfile.NamedTemporaryFile(suffix='.ply', delete=True)
temp_file.close()
temp_file_name = temp_file.name
ms.save_current_mesh(temp_file_name)
ms = pymeshlab.MeshSet()
ms.load_new_mesh(temp_file_name)
if os.path.exists(temp_file_name):
os.remove(temp_file_name)
mesh = export_mesh(mesh, ms)
return mesh
# Open Source Model Licensed under the Apache License Version 2.0
# and Other Licenses of the Third-Party Components therein:
# The below Model in this distribution may have been modified by THL A29 Limited
# ("Tencent Modifications"). All Tencent Modifications are Copyright (C) 2024 THL A29 Limited.
# Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
# The below software and/or models in this distribution may have been
# modified by THL A29 Limited ("Tencent Modifications").
# All Tencent Modifications are Copyright (C) THL A29 Limited.
# Hunyuan 3D is licensed under the TENCENT HUNYUAN NON-COMMERCIAL LICENSE AGREEMENT
# except for the third-party components listed below.
# Hunyuan 3D does not impose any additional limitations beyond what is outlined
# in the repsective licenses of these third-party components.
# Users must comply with all terms and conditions of original licenses of these third-party
# components and must ensure that the usage of the third party components adheres to
# all relevant laws and regulations.
# For avoidance of doubts, Hunyuan 3D means the large language models and
# their software and algorithms, including trained model weights, parameters (including
# optimizer states), machine-learning model code, inference-enabling code, training-enabling code,
# fine-tuning enabling code and other elements of the foregoing made publicly available
# by Tencent in accordance with TENCENT HUNYUAN COMMUNITY LICENSE AGREEMENT.
import cv2
import numpy as np
import torch
from PIL import Image
from einops import repeat, rearrange
def array_to_tensor(np_array):
image_pt = torch.tensor(np_array).float()
image_pt = image_pt / 255 * 2 - 1
image_pt = rearrange(image_pt, "h w c -> c h w")
image_pts = repeat(image_pt, "c h w -> b c h w", b=1)
return image_pts
class ImageProcessorV2:
def __init__(self, size=512, border_ratio=None):
self.size = size
self.border_ratio = border_ratio
@staticmethod
def recenter(image, border_ratio: float = 0.2):
""" recenter an image to leave some empty space at the image border.
Args:
image (ndarray): input image, float/uint8 [H, W, 3/4]
mask (ndarray): alpha mask, bool [H, W]
border_ratio (float, optional): border ratio, image will be resized to (1 - border_ratio). Defaults to 0.2.
Returns:
ndarray: output image, float/uint8 [H, W, 3/4]
"""
if image.shape[-1] == 4:
mask = image[..., 3]
else:
mask = np.ones_like(image[..., 0:1]) * 255
image = np.concatenate([image, mask], axis=-1)
mask = mask[..., 0]
H, W, C = image.shape
size = max(H, W)
result = np.zeros((size, size, C), dtype=np.uint8)
coords = np.nonzero(mask)
x_min, x_max = coords[0].min(), coords[0].max()
y_min, y_max = coords[1].min(), coords[1].max()
h = x_max - x_min
w = y_max - y_min
if h == 0 or w == 0:
raise ValueError('input image is empty')
desired_size = int(size * (1 - border_ratio))
scale = desired_size / max(h, w)
h2 = int(h * scale)
w2 = int(w * scale)
x2_min = (size - h2) // 2
x2_max = x2_min + h2
y2_min = (size - w2) // 2
y2_max = y2_min + w2
result[x2_min:x2_max, y2_min:y2_max] = cv2.resize(image[x_min:x_max, y_min:y_max], (w2, h2),
interpolation=cv2.INTER_AREA)
bg = np.ones((result.shape[0], result.shape[1], 3), dtype=np.uint8) * 255
# bg = np.zeros((result.shape[0], result.shape[1], 3), dtype=np.uint8) * 255
mask = result[..., 3:].astype(np.float32) / 255
result = result[..., :3] * mask + bg * (1 - mask)
mask = mask * 255
result = result.clip(0, 255).astype(np.uint8)
mask = mask.clip(0, 255).astype(np.uint8)
return result, mask
def __call__(self, image, border_ratio=0.15, to_tensor=True, return_mask=False, **kwargs):
if self.border_ratio is not None:
border_ratio = self.border_ratio
print(f"Using border_ratio from init: {border_ratio}")
if isinstance(image, str):
image = cv2.imread(image, cv2.IMREAD_UNCHANGED)
image, mask = self.recenter(image, border_ratio=border_ratio)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
elif isinstance(image, Image.Image):
image = np.asarray(image)
image, mask = self.recenter(image, border_ratio=border_ratio)
image = cv2.resize(image, (self.size, self.size), interpolation=cv2.INTER_CUBIC)
mask = cv2.resize(mask, (self.size, self.size), interpolation=cv2.INTER_NEAREST)
mask = mask[..., np.newaxis]
if to_tensor:
image = array_to_tensor(image)
mask = array_to_tensor(mask)
if return_mask:
return image, mask
return image
IMAGE_PROCESSORS = {
"v2": ImageProcessorV2,
}
DEFAULT_IMAGEPROCESSOR = 'v2'
# Copyright 2024 Stability AI, Katherine Crowson and The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union
import numpy as np
import torch
from diffusers.configuration_utils import ConfigMixin, register_to_config
from diffusers.schedulers.scheduling_utils import SchedulerMixin
from diffusers.utils import BaseOutput, logging
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
@dataclass
class FlowMatchEulerDiscreteSchedulerOutput(BaseOutput):
"""
Output class for the scheduler's `step` function output.
Args:
prev_sample (`torch.FloatTensor` of shape `(batch_size, num_channels, height, width)` for images):
Computed sample `(x_{t-1})` of previous timestep. `prev_sample` should be used as next model input in the
denoising loop.
"""
prev_sample: torch.FloatTensor
class FlowMatchEulerDiscreteScheduler(SchedulerMixin, ConfigMixin):
"""
NOTE: this is very similar to diffusers.FlowMatchEulerDiscreteScheduler. Except our timesteps are reversed
Euler scheduler.
This model inherits from [`SchedulerMixin`] and [`ConfigMixin`]. Check the superclass documentation for the generic
methods the library implements for all schedulers such as loading and saving.
Args:
num_train_timesteps (`int`, defaults to 1000):
The number of diffusion steps to train the model.
timestep_spacing (`str`, defaults to `"linspace"`):
The way the timesteps should be scaled. Refer to Table 2 of the [Common Diffusion Noise Schedules and
Sample Steps are Flawed](https://huggingface.co/papers/2305.08891) for more information.
shift (`float`, defaults to 1.0):
The shift value for the timestep schedule.
"""
_compatibles = []
order = 1
@register_to_config
def __init__(
self,
num_train_timesteps: int = 1000,
shift: float = 1.0,
use_dynamic_shifting=False,
):
timesteps = np.linspace(1, num_train_timesteps, num_train_timesteps, dtype=np.float32).copy()
timesteps = torch.from_numpy(timesteps).to(dtype=torch.float32)
sigmas = timesteps / num_train_timesteps
if not use_dynamic_shifting:
# when use_dynamic_shifting is True, we apply the timestep shifting on the fly based on the image resolution
sigmas = shift * sigmas / (1 + (shift - 1) * sigmas)
self.timesteps = sigmas * num_train_timesteps
self._step_index = None
self._begin_index = None
self.sigmas = sigmas.to("cpu") # to avoid too much CPU/GPU communication
self.sigma_min = self.sigmas[-1].item()
self.sigma_max = self.sigmas[0].item()
@property
def step_index(self):
"""
The index counter for current timestep. It will increase 1 after each scheduler step.
"""
return self._step_index
@property
def begin_index(self):
"""
The index for the first timestep. It should be set from pipeline with `set_begin_index` method.
"""
return self._begin_index
# Copied from diffusers.schedulers.scheduling_dpmsolver_multistep.DPMSolverMultistepScheduler.set_begin_index
def set_begin_index(self, begin_index: int = 0):
"""
Sets the begin index for the scheduler. This function should be run from pipeline before the inference.
Args:
begin_index (`int`):
The begin index for the scheduler.
"""
self._begin_index = begin_index
def scale_noise(
self,
sample: torch.FloatTensor,
timestep: Union[float, torch.FloatTensor],
noise: Optional[torch.FloatTensor] = None,
) -> torch.FloatTensor:
"""
Forward process in flow-matching
Args:
sample (`torch.FloatTensor`):
The input sample.
timestep (`int`, *optional*):
The current timestep in the diffusion chain.
Returns:
`torch.FloatTensor`:
A scaled input sample.
"""
# Make sure sigmas and timesteps have the same device and dtype as original_samples
sigmas = self.sigmas.to(device=sample.device, dtype=sample.dtype)
if sample.device.type == "mps" and torch.is_floating_point(timestep):
# mps does not support float64
schedule_timesteps = self.timesteps.to(sample.device, dtype=torch.float32)
timestep = timestep.to(sample.device, dtype=torch.float32)
else:
schedule_timesteps = self.timesteps.to(sample.device)
timestep = timestep.to(sample.device)
# self.begin_index is None when scheduler is used for training, or pipeline does not implement set_begin_index
if self.begin_index is None:
step_indices = [self.index_for_timestep(t, schedule_timesteps) for t in timestep]
elif self.step_index is not None:
# add_noise is called after first denoising step (for inpainting)
step_indices = [self.step_index] * timestep.shape[0]
else:
# add noise is called before first denoising step to create initial latent(img2img)
step_indices = [self.begin_index] * timestep.shape[0]
sigma = sigmas[step_indices].flatten()
while len(sigma.shape) < len(sample.shape):
sigma = sigma.unsqueeze(-1)
sample = sigma * noise + (1.0 - sigma) * sample
return sample
def _sigma_to_t(self, sigma):
return sigma * self.config.num_train_timesteps
def time_shift(self, mu: float, sigma: float, t: torch.Tensor):
return math.exp(mu) / (math.exp(mu) + (1 / t - 1) ** sigma)
def set_timesteps(
self,
num_inference_steps: int = None,
device: Union[str, torch.device] = None,
sigmas: Optional[List[float]] = None,
mu: Optional[float] = None,
):
"""
Sets the discrete timesteps used for the diffusion chain (to be run before inference).
Args:
num_inference_steps (`int`):
The number of diffusion steps used when generating samples with a pre-trained model.
device (`str` or `torch.device`, *optional*):
The device to which the timesteps should be moved to. If `None`, the timesteps are not moved.
"""
if self.config.use_dynamic_shifting and mu is None:
raise ValueError(" you have a pass a value for `mu` when `use_dynamic_shifting` is set to be `True`")
if sigmas is None:
self.num_inference_steps = num_inference_steps
timesteps = np.linspace(
self._sigma_to_t(self.sigma_max), self._sigma_to_t(self.sigma_min), num_inference_steps
)
sigmas = timesteps / self.config.num_train_timesteps
if self.config.use_dynamic_shifting:
sigmas = self.time_shift(mu, 1.0, sigmas)
else:
sigmas = self.config.shift * sigmas / (1 + (self.config.shift - 1) * sigmas)
sigmas = torch.from_numpy(sigmas).to(dtype=torch.float32, device=device)
timesteps = sigmas * self.config.num_train_timesteps
self.timesteps = timesteps.to(device=device)
self.sigmas = torch.cat([sigmas, torch.ones(1, device=sigmas.device)])
self._step_index = None
self._begin_index = None
def index_for_timestep(self, timestep, schedule_timesteps=None):
if schedule_timesteps is None:
schedule_timesteps = self.timesteps
indices = (schedule_timesteps == timestep).nonzero()
# The sigma index that is taken for the **very** first `step`
# is always the second index (or the last index if there is only 1)
# This way we can ensure we don't accidentally skip a sigma in
# case we start in the middle of the denoising schedule (e.g. for image-to-image)
pos = 1 if len(indices) > 1 else 0
return indices[pos].item()
def _init_step_index(self, timestep):
if self.begin_index is None:
if isinstance(timestep, torch.Tensor):
timestep = timestep.to(self.timesteps.device)
self._step_index = self.index_for_timestep(timestep)
else:
self._step_index = self._begin_index
def step(
self,
model_output: torch.FloatTensor,
timestep: Union[float, torch.FloatTensor],
sample: torch.FloatTensor,
s_churn: float = 0.0,
s_tmin: float = 0.0,
s_tmax: float = float("inf"),
s_noise: float = 1.0,
generator: Optional[torch.Generator] = None,
return_dict: bool = True,
) -> Union[FlowMatchEulerDiscreteSchedulerOutput, Tuple]:
"""
Predict the sample from the previous timestep by reversing the SDE. This function propagates the diffusion
process from the learned model outputs (most often the predicted noise).
Args:
model_output (`torch.FloatTensor`):
The direct output from learned diffusion model.
timestep (`float`):
The current discrete timestep in the diffusion chain.
sample (`torch.FloatTensor`):
A current instance of a sample created by the diffusion process.
s_churn (`float`):
s_tmin (`float`):
s_tmax (`float`):
s_noise (`float`, defaults to 1.0):
Scaling factor for noise added to the sample.
generator (`torch.Generator`, *optional*):
A random number generator.
return_dict (`bool`):
Whether or not to return a [`~schedulers.scheduling_euler_discrete.EulerDiscreteSchedulerOutput`] or
tuple.
Returns:
[`~schedulers.scheduling_euler_discrete.EulerDiscreteSchedulerOutput`] or `tuple`:
If return_dict is `True`, [`~schedulers.scheduling_euler_discrete.EulerDiscreteSchedulerOutput`] is
returned, otherwise a tuple is returned where the first element is the sample tensor.
"""
if (
isinstance(timestep, int)
or isinstance(timestep, torch.IntTensor)
or isinstance(timestep, torch.LongTensor)
):
raise ValueError(
(
"Passing integer indices (e.g. from `enumerate(timesteps)`) as timesteps to"
" `EulerDiscreteScheduler.step()` is not supported. Make sure to pass"
" one of the `scheduler.timesteps` as a timestep."
),
)
if self.step_index is None:
self._init_step_index(timestep)
# Upcast to avoid precision issues when computing prev_sample
sample = sample.to(torch.float32)
sigma = self.sigmas[self.step_index]
sigma_next = self.sigmas[self.step_index + 1]
prev_sample = sample + (sigma_next - sigma) * model_output
# Cast sample back to model compatible dtype
prev_sample = prev_sample.to(model_output.dtype)
# upon completion increase step index by one
self._step_index += 1
if not return_dict:
return (prev_sample,)
return FlowMatchEulerDiscreteSchedulerOutput(prev_sample=prev_sample)
def __len__(self):
return self.config.num_train_timesteps
# Open Source Model Licensed under the Apache License Version 2.0
# and Other Licenses of the Third-Party Components therein:
# The below Model in this distribution may have been modified by THL A29 Limited
# ("Tencent Modifications"). All Tencent Modifications are Copyright (C) 2024 THL A29 Limited.
# Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
# The below software and/or models in this distribution may have been
# modified by THL A29 Limited ("Tencent Modifications").
# All Tencent Modifications are Copyright (C) THL A29 Limited.
# Hunyuan 3D is licensed under the TENCENT HUNYUAN NON-COMMERCIAL LICENSE AGREEMENT
# except for the third-party components listed below.
# Hunyuan 3D does not impose any additional limitations beyond what is outlined
# in the repsective licenses of these third-party components.
# Users must comply with all terms and conditions of original licenses of these third-party
# components and must ensure that the usage of the third party components adheres to
# all relevant laws and regulations.
# For avoidance of doubts, Hunyuan 3D means the large language models and
# their software and algorithms, including trained model weights, parameters (including
# optimizer states), machine-learning model code, inference-enabling code, training-enabling code,
# fine-tuning enabling code and other elements of the foregoing made publicly available
# by Tencent in accordance with TENCENT HUNYUAN COMMUNITY LICENSE AGREEMENT.
from .pipelines import Hunyuan3DPaintPipeline, Hunyuan3DTexGenConfig
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment