# Copyright (c) 2024, Tri Dao.
# Implement dropout + residual + layer_norm / rms_norm.
# Based on the Triton LayerNorm tutorial: https://triton-lang.org/main/getting-started/tutorials/05-layer-norm.html
# For the backward pass, we keep weight_grad and bias_grad in registers and accumulate.
# This is faster for dimensions up to 8k, but after that it's much slower due to register spilling.
# The models we train have hidden dim up to 8k anyway (e.g. Llama 70B), so this is fine.
import math
import torch
import torch.nn.functional as F
import triton
import triton.language as tl
from typing import Callable
from flash_attn.ops.rms_norm import RMSNorm
def custom_amp_decorator(dec: Callable, cuda_amp_deprecated: bool):
def decorator(*args, **kwargs):
if cuda_amp_deprecated:
kwargs["device_type"] = "cuda"
return dec(*args, **kwargs)
return decorator
if hasattr(torch.amp, "custom_fwd"): # type: ignore[attr-defined]
deprecated = True
from torch.amp import custom_fwd, custom_bwd # type: ignore[attr-defined]
else:
deprecated = False
from torch.cuda.amp import custom_fwd, custom_bwd
custom_fwd = custom_amp_decorator(custom_fwd, deprecated)
custom_bwd = custom_amp_decorator(custom_bwd, deprecated)
def triton_autotune_configs():
    # Return configs with a valid warp count for the current device
    configs = []
    # The maximum threads per block is architecture-dependent in theory, but in practice it is 1024 on all current devices
    max_threads_per_block = 1024
    # Default to warp size 32 if the device does not define one
    warp_size = getattr(torch.cuda.get_device_properties(torch.cuda.current_device()), "warp_size", 32)
    # Autotune over warp counts that are powers of 2 and do not exceed the threads-per-block limit
    warp_count = 1
    while warp_count * warp_size <= max_threads_per_block:
        configs.append(triton.Config({}, num_warps=warp_count))
        warp_count *= 2
    return configs
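# e.g. on a device with warp_size 32 this yields num_warps in {1, 2, 4, 8, 16, 32},
# since 32 warps x 32 lanes = 1024 threads per block.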
def layer_norm_ref(
x,
weight,
bias,
residual=None,
x1=None,
weight1=None,
bias1=None,
eps=1e-6,
dropout_p=0.0,
rowscale=None,
prenorm=False,
zero_centered_weight=False,
dropout_mask=None,
dropout_mask1=None,
upcast=False,
):
dtype = x.dtype
if upcast:
x = x.float()
weight = weight.float()
bias = bias.float() if bias is not None else None
residual = residual.float() if residual is not None else residual
x1 = x1.float() if x1 is not None else None
weight1 = weight1.float() if weight1 is not None else None
bias1 = bias1.float() if bias1 is not None else None
if zero_centered_weight:
weight = weight + 1.0
if weight1 is not None:
weight1 = weight1 + 1.0
if x1 is not None:
assert rowscale is None, "rowscale is not supported with parallel LayerNorm"
if rowscale is not None:
x = x * rowscale[..., None]
if dropout_p > 0.0:
if dropout_mask is not None:
x = x.masked_fill(~dropout_mask, 0.0) / (1.0 - dropout_p)
else:
x = F.dropout(x, p=dropout_p)
if x1 is not None:
if dropout_mask1 is not None:
x1 = x1.masked_fill(~dropout_mask1, 0.0) / (1.0 - dropout_p)
else:
x1 = F.dropout(x1, p=dropout_p)
if x1 is not None:
x = x + x1
if residual is not None:
x = (x + residual).to(x.dtype)
out = F.layer_norm(x.to(weight.dtype), x.shape[-1:], weight=weight, bias=bias, eps=eps).to(
dtype
)
if weight1 is None:
return out if not prenorm else (out, x)
else:
out1 = F.layer_norm(
x.to(weight1.dtype), x.shape[-1:], weight=weight1, bias=bias1, eps=eps
).to(dtype)
return (out, out1) if not prenorm else (out, out1, x)
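# A minimal sanity-check sketch (not part of the original module): with no
# residual/dropout/rowscale extras, layer_norm_ref reduces to F.layer_norm.
# The shapes and hidden size below are illustrative.
def _check_layer_norm_ref_matches_torch():
    x = torch.randn(4, 512)
    w, b = torch.ones(512), torch.zeros(512)
    out = layer_norm_ref(x, w, b)  # defaults: eps=1e-6, no extras
    assert torch.allclose(out, F.layer_norm(x, (512,), w, b, eps=1e-6), atol=1e-6)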
def rms_norm_ref(
x,
weight,
bias,
residual=None,
x1=None,
weight1=None,
bias1=None,
eps=1e-6,
dropout_p=0.0,
rowscale=None,
prenorm=False,
zero_centered_weight=False,
dropout_mask=None,
dropout_mask1=None,
upcast=False,
):
dtype = x.dtype
if upcast:
x = x.float()
weight = weight.float()
bias = bias.float() if bias is not None else None
residual = residual.float() if residual is not None else residual
x1 = x1.float() if x1 is not None else None
weight1 = weight1.float() if weight1 is not None else None
bias1 = bias1.float() if bias1 is not None else None
if zero_centered_weight:
weight = weight + 1.0
if weight1 is not None:
weight1 = weight1 + 1.0
if x1 is not None:
assert rowscale is None, "rowscale is not supported with parallel LayerNorm"
if rowscale is not None:
x = x * rowscale[..., None]
if dropout_p > 0.0:
if dropout_mask is not None:
x = x.masked_fill(~dropout_mask, 0.0) / (1.0 - dropout_p)
else:
x = F.dropout(x, p=dropout_p)
if x1 is not None:
if dropout_mask1 is not None:
x1 = x1.masked_fill(~dropout_mask1, 0.0) / (1.0 - dropout_p)
else:
x1 = F.dropout(x1, p=dropout_p)
if x1 is not None:
x = x + x1
if residual is not None:
x = (x + residual).to(x.dtype)
rstd = 1 / torch.sqrt((x.square()).mean(dim=-1, keepdim=True) + eps)
out = ((x * rstd * weight) + bias if bias is not None else (x * rstd * weight)).to(dtype)
if weight1 is None:
return out if not prenorm else (out, x)
else:
out1 = ((x * rstd * weight1) + bias1 if bias1 is not None else (x * rstd * weight1)).to(
dtype
)
return (out, out1) if not prenorm else (out, out1, x)
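# Hedged sketch (illustrative, not from the original file): the RMS statistic
# used above, written out for a plain forward call with no bias or extras.
def _check_rms_norm_ref_formula():
    x = torch.randn(2, 8)
    w = torch.ones(8)
    rstd = 1 / torch.sqrt(x.square().mean(dim=-1, keepdim=True) + 1e-6)
    assert torch.allclose(rms_norm_ref(x, w, None), x * rstd * w, atol=1e-6)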
@triton.autotune(
configs=triton_autotune_configs(),
key=["N", "HAS_RESIDUAL", "STORE_RESIDUAL_OUT", "IS_RMS_NORM", "HAS_BIAS"],
)
# @triton.heuristics({"HAS_BIAS": lambda args: args["B"] is not None})
# @triton.heuristics({"HAS_RESIDUAL": lambda args: args["RESIDUAL"] is not None})
@triton.heuristics({"HAS_X1": lambda args: args["X1"] is not None})
@triton.heuristics({"HAS_W1": lambda args: args["W1"] is not None})
@triton.heuristics({"HAS_B1": lambda args: args["B1"] is not None})
@triton.jit
def _layer_norm_fwd_1pass_kernel(
X, # pointer to the input
Y, # pointer to the output
W, # pointer to the weights
B, # pointer to the biases
RESIDUAL, # pointer to the residual
X1,
W1,
B1,
Y1,
RESIDUAL_OUT, # pointer to the residual
ROWSCALE,
SEEDS, # Dropout seeds for each row
DROPOUT_MASK,
Mean, # pointer to the mean
Rstd, # pointer to the 1/std
stride_x_row, # how much to increase the pointer when moving by 1 row
stride_y_row,
stride_res_row,
stride_res_out_row,
stride_x1_row,
stride_y1_row,
M, # number of rows in X
N, # number of columns in X
eps, # epsilon to avoid division by zero
dropout_p, # Dropout probability
zero_centered_weight, # If true, add 1.0 to the weight
IS_RMS_NORM: tl.constexpr,
BLOCK_N: tl.constexpr,
HAS_RESIDUAL: tl.constexpr,
STORE_RESIDUAL_OUT: tl.constexpr,
HAS_BIAS: tl.constexpr,
HAS_DROPOUT: tl.constexpr,
STORE_DROPOUT_MASK: tl.constexpr,
HAS_ROWSCALE: tl.constexpr,
HAS_X1: tl.constexpr,
HAS_W1: tl.constexpr,
HAS_B1: tl.constexpr,
):
# Map the program id to the row of X and Y it should compute.
row = tl.program_id(0)
X += row * stride_x_row
Y += row * stride_y_row
if HAS_RESIDUAL:
RESIDUAL += row * stride_res_row
if STORE_RESIDUAL_OUT:
RESIDUAL_OUT += row * stride_res_out_row
if HAS_X1:
X1 += row * stride_x1_row
if HAS_W1:
Y1 += row * stride_y1_row
# Compute mean and variance
cols = tl.arange(0, BLOCK_N)
x = tl.load(X + cols, mask=cols < N, other=0.0).to(tl.float32)
if HAS_ROWSCALE:
rowscale = tl.load(ROWSCALE + row).to(tl.float32)
x *= rowscale
if HAS_DROPOUT:
# Compute dropout mask
# 7 rounds is good enough, and reduces register pressure
keep_mask = tl.rand(tl.load(SEEDS + row).to(tl.uint32), cols, n_rounds=7) > dropout_p
x = tl.where(keep_mask, x / (1.0 - dropout_p), 0.0)
if STORE_DROPOUT_MASK:
tl.store(DROPOUT_MASK + row * N + cols, keep_mask, mask=cols < N)
if HAS_X1:
x1 = tl.load(X1 + cols, mask=cols < N, other=0.0).to(tl.float32)
if HAS_ROWSCALE:
rowscale = tl.load(ROWSCALE + M + row).to(tl.float32)
x1 *= rowscale
if HAS_DROPOUT:
# Compute dropout mask
# 7 rounds is good enough, and reduces register pressure
keep_mask = (
tl.rand(tl.load(SEEDS + M + row).to(tl.uint32), cols, n_rounds=7) > dropout_p
)
x1 = tl.where(keep_mask, x1 / (1.0 - dropout_p), 0.0)
if STORE_DROPOUT_MASK:
tl.store(DROPOUT_MASK + (M + row) * N + cols, keep_mask, mask=cols < N)
x += x1
if HAS_RESIDUAL:
residual = tl.load(RESIDUAL + cols, mask=cols < N, other=0.0).to(tl.float32)
x += residual
if STORE_RESIDUAL_OUT:
tl.store(RESIDUAL_OUT + cols, x, mask=cols < N)
if not IS_RMS_NORM:
mean = tl.sum(x, axis=0) / N
tl.store(Mean + row, mean)
xbar = tl.where(cols < N, x - mean, 0.0)
var = tl.sum(xbar * xbar, axis=0) / N
else:
xbar = tl.where(cols < N, x, 0.0)
var = tl.sum(xbar * xbar, axis=0) / N
rstd = 1 / tl.sqrt(var + eps)
tl.store(Rstd + row, rstd)
# Normalize and apply linear transformation
mask = cols < N
w = tl.load(W + cols, mask=mask).to(tl.float32)
if zero_centered_weight:
w += 1.0
if HAS_BIAS:
b = tl.load(B + cols, mask=mask).to(tl.float32)
x_hat = (x - mean) * rstd if not IS_RMS_NORM else x * rstd
y = x_hat * w + b if HAS_BIAS else x_hat * w
# Write output
tl.store(Y + cols, y, mask=mask)
if HAS_W1:
w1 = tl.load(W1 + cols, mask=mask).to(tl.float32)
if zero_centered_weight:
w1 += 1.0
if HAS_B1:
b1 = tl.load(B1 + cols, mask=mask).to(tl.float32)
y1 = x_hat * w1 + b1 if HAS_B1 else x_hat * w1
tl.store(Y1 + cols, y1, mask=mask)
def _layer_norm_fwd(
x,
weight,
bias,
eps,
residual=None,
x1=None,
weight1=None,
bias1=None,
dropout_p=0.0,
rowscale=None,
out_dtype=None,
residual_dtype=None,
zero_centered_weight=False,
is_rms_norm=False,
return_dropout_mask=False,
out=None,
residual_out=None
):
if residual is not None:
residual_dtype = residual.dtype
M, N = x.shape
assert x.stride(-1) == 1
if residual is not None:
assert residual.stride(-1) == 1
assert residual.shape == (M, N)
assert weight.shape == (N,)
assert weight.stride(-1) == 1
if bias is not None:
assert bias.stride(-1) == 1
assert bias.shape == (N,)
if x1 is not None:
assert x1.shape == x.shape
assert rowscale is None
assert x1.stride(-1) == 1
if weight1 is not None:
assert weight1.shape == (N,)
assert weight1.stride(-1) == 1
if bias1 is not None:
assert bias1.shape == (N,)
assert bias1.stride(-1) == 1
if rowscale is not None:
assert rowscale.is_contiguous()
assert rowscale.shape == (M,)
# allocate output
if out is None:
out = torch.empty_like(x, dtype=x.dtype if out_dtype is None else out_dtype)
else:
assert out.shape == x.shape
assert out.stride(-1) == 1
if weight1 is not None:
y1 = torch.empty_like(out)
assert y1.stride(-1) == 1
else:
y1 = None
if (
residual is not None
or (residual_dtype is not None and residual_dtype != x.dtype)
or dropout_p > 0.0
or rowscale is not None
or x1 is not None
):
if residual_out is None:
residual_out = torch.empty(
M, N, device=x.device, dtype=residual_dtype if residual_dtype is not None else x.dtype
)
else:
assert residual_out.shape == x.shape
assert residual_out.stride(-1) == 1
else:
residual_out = None
mean = torch.empty((M,), dtype=torch.float32, device=x.device) if not is_rms_norm else None
rstd = torch.empty((M,), dtype=torch.float32, device=x.device)
if dropout_p > 0.0:
seeds = torch.randint(
2**32, (M if x1 is None else 2 * M,), device=x.device, dtype=torch.int64
)
else:
seeds = None
if return_dropout_mask and dropout_p > 0.0:
dropout_mask = torch.empty(M if x1 is None else 2 * M, N, device=x.device, dtype=torch.bool)
else:
dropout_mask = None
# Less than 64KB per feature: enqueue fused kernel
MAX_FUSED_SIZE = 65536 // x.element_size()
BLOCK_N = min(MAX_FUSED_SIZE, triton.next_power_of_2(N))
if N > BLOCK_N:
raise RuntimeError("This layer norm doesn't support feature dim >= 64KB.")
with torch.cuda.device(x.device.index):
_layer_norm_fwd_1pass_kernel[(M,)](
x,
out,
weight,
bias,
residual,
x1,
weight1,
bias1,
y1,
residual_out,
rowscale,
seeds,
dropout_mask,
mean,
rstd,
x.stride(0),
out.stride(0),
residual.stride(0) if residual is not None else 0,
residual_out.stride(0) if residual_out is not None else 0,
x1.stride(0) if x1 is not None else 0,
y1.stride(0) if y1 is not None else 0,
M,
N,
eps,
dropout_p,
zero_centered_weight,
is_rms_norm,
BLOCK_N,
residual is not None,
residual_out is not None,
bias is not None,
dropout_p > 0.0,
dropout_mask is not None,
rowscale is not None,
)
# residual_out is None if residual is None and residual_dtype == input_dtype and dropout_p == 0.0
if dropout_mask is not None and x1 is not None:
dropout_mask, dropout_mask1 = dropout_mask.tensor_split(2, dim=0)
else:
dropout_mask1 = None
return (
out,
y1,
mean,
rstd,
residual_out if residual_out is not None else x,
seeds,
dropout_mask,
dropout_mask1,
)
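# Note (added for clarity): the tuple returned above is
# (out, y1, mean, rstd, residual_out-or-x, seeds, dropout_mask, dropout_mask1).
# mean is None for RMSNorm, seeds is None when dropout is disabled, and the
# masks are None unless return_dropout_mask is set with dropout_p > 0.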
@triton.autotune(
configs=triton_autotune_configs(),
key=["N", "HAS_DRESIDUAL", "STORE_DRESIDUAL", "IS_RMS_NORM", "HAS_BIAS", "HAS_DROPOUT"],
)
# @triton.heuristics({"HAS_BIAS": lambda args: args["B"] is not None})
# @triton.heuristics({"HAS_DRESIDUAL": lambda args: args["DRESIDUAL"] is not None})
# @triton.heuristics({"STORE_DRESIDUAL": lambda args: args["DRESIDUAL_IN"] is not None})
@triton.heuristics({"HAS_ROWSCALE": lambda args: args["ROWSCALE"] is not None})
@triton.heuristics({"HAS_DY1": lambda args: args["DY1"] is not None})
@triton.heuristics({"HAS_DX1": lambda args: args["DX1"] is not None})
@triton.heuristics({"HAS_B1": lambda args: args["DB1"] is not None})
@triton.heuristics({"RECOMPUTE_OUTPUT": lambda args: args["Y"] is not None})
@triton.jit
def _layer_norm_bwd_kernel(
X, # pointer to the input
W, # pointer to the weights
B, # pointer to the biases
Y, # pointer to the output to be recomputed
DY, # pointer to the output gradient
DX, # pointer to the input gradient
DW, # pointer to the partial sum of weights gradient
DB, # pointer to the partial sum of biases gradient
DRESIDUAL,
W1,
DY1,
DX1,
DW1,
DB1,
DRESIDUAL_IN,
ROWSCALE,
SEEDS,
Mean, # pointer to the mean
Rstd, # pointer to the 1/std
stride_x_row, # how much to increase the pointer when moving by 1 row
stride_y_row,
stride_dy_row,
stride_dx_row,
stride_dres_row,
stride_dy1_row,
stride_dx1_row,
stride_dres_in_row,
M, # number of rows in X
N, # number of columns in X
eps, # epsilon to avoid division by zero
dropout_p,
zero_centered_weight,
rows_per_program,
IS_RMS_NORM: tl.constexpr,
BLOCK_N: tl.constexpr,
HAS_DRESIDUAL: tl.constexpr,
STORE_DRESIDUAL: tl.constexpr,
HAS_BIAS: tl.constexpr,
HAS_DROPOUT: tl.constexpr,
HAS_ROWSCALE: tl.constexpr,
HAS_DY1: tl.constexpr,
HAS_DX1: tl.constexpr,
HAS_B1: tl.constexpr,
RECOMPUTE_OUTPUT: tl.constexpr,
):
# Map the program id to the elements of X, DX, and DY it should compute.
row_block_id = tl.program_id(0)
row_start = row_block_id * rows_per_program
# Do not early exit if row_start >= M, because we need to write DW and DB
cols = tl.arange(0, BLOCK_N)
mask = cols < N
X += row_start * stride_x_row
if HAS_DRESIDUAL:
DRESIDUAL += row_start * stride_dres_row
if STORE_DRESIDUAL:
DRESIDUAL_IN += row_start * stride_dres_in_row
DY += row_start * stride_dy_row
DX += row_start * stride_dx_row
if HAS_DY1:
DY1 += row_start * stride_dy1_row
if HAS_DX1:
DX1 += row_start * stride_dx1_row
if RECOMPUTE_OUTPUT:
Y += row_start * stride_y_row
w = tl.load(W + cols, mask=mask).to(tl.float32)
if zero_centered_weight:
w += 1.0
if RECOMPUTE_OUTPUT and HAS_BIAS:
b = tl.load(B + cols, mask=mask, other=0.0).to(tl.float32)
if HAS_DY1:
w1 = tl.load(W1 + cols, mask=mask).to(tl.float32)
if zero_centered_weight:
w1 += 1.0
dw = tl.zeros((BLOCK_N,), dtype=tl.float32)
if HAS_BIAS:
db = tl.zeros((BLOCK_N,), dtype=tl.float32)
if HAS_DY1:
dw1 = tl.zeros((BLOCK_N,), dtype=tl.float32)
if HAS_B1:
db1 = tl.zeros((BLOCK_N,), dtype=tl.float32)
row_end = min((row_block_id + 1) * rows_per_program, M)
for row in range(row_start, row_end):
# Load data to SRAM
x = tl.load(X + cols, mask=mask, other=0).to(tl.float32)
dy = tl.load(DY + cols, mask=mask, other=0).to(tl.float32)
if HAS_DY1:
dy1 = tl.load(DY1 + cols, mask=mask, other=0).to(tl.float32)
if not IS_RMS_NORM:
mean = tl.load(Mean + row)
rstd = tl.load(Rstd + row)
# Compute dx
xhat = (x - mean) * rstd if not IS_RMS_NORM else x * rstd
xhat = tl.where(mask, xhat, 0.0)
if RECOMPUTE_OUTPUT:
y = xhat * w + b if HAS_BIAS else xhat * w
tl.store(Y + cols, y, mask=mask)
wdy = w * dy
dw += dy * xhat
if HAS_BIAS:
db += dy
if HAS_DY1:
wdy += w1 * dy1
dw1 += dy1 * xhat
if HAS_B1:
db1 += dy1
if not IS_RMS_NORM:
c1 = tl.sum(xhat * wdy, axis=0) / N
c2 = tl.sum(wdy, axis=0) / N
dx = (wdy - (xhat * c1 + c2)) * rstd
else:
c1 = tl.sum(xhat * wdy, axis=0) / N
dx = (wdy - xhat * c1) * rstd
if HAS_DRESIDUAL:
dres = tl.load(DRESIDUAL + cols, mask=mask, other=0).to(tl.float32)
dx += dres
# Write dx
if STORE_DRESIDUAL:
tl.store(DRESIDUAL_IN + cols, dx, mask=mask)
if HAS_DX1:
if HAS_DROPOUT:
keep_mask = (
tl.rand(tl.load(SEEDS + M + row).to(tl.uint32), cols, n_rounds=7) > dropout_p
)
dx1 = tl.where(keep_mask, dx / (1.0 - dropout_p), 0.0)
else:
dx1 = dx
tl.store(DX1 + cols, dx1, mask=mask)
if HAS_DROPOUT:
keep_mask = tl.rand(tl.load(SEEDS + row).to(tl.uint32), cols, n_rounds=7) > dropout_p
dx = tl.where(keep_mask, dx / (1.0 - dropout_p), 0.0)
if HAS_ROWSCALE:
rowscale = tl.load(ROWSCALE + row).to(tl.float32)
dx *= rowscale
tl.store(DX + cols, dx, mask=mask)
X += stride_x_row
if HAS_DRESIDUAL:
DRESIDUAL += stride_dres_row
if STORE_DRESIDUAL:
DRESIDUAL_IN += stride_dres_in_row
if RECOMPUTE_OUTPUT:
Y += stride_y_row
DY += stride_dy_row
DX += stride_dx_row
if HAS_DY1:
DY1 += stride_dy1_row
if HAS_DX1:
DX1 += stride_dx1_row
tl.store(DW + row_block_id * N + cols, dw, mask=mask)
if HAS_BIAS:
tl.store(DB + row_block_id * N + cols, db, mask=mask)
if HAS_DY1:
tl.store(DW1 + row_block_id * N + cols, dw1, mask=mask)
if HAS_B1:
tl.store(DB1 + row_block_id * N + cols, db1, mask=mask)
def _layer_norm_bwd(
dy,
x,
weight,
bias,
eps,
mean,
rstd,
dresidual=None,
dy1=None,
weight1=None,
bias1=None,
seeds=None,
dropout_p=0.0,
rowscale=None,
has_residual=False,
has_x1=False,
zero_centered_weight=False,
is_rms_norm=False,
x_dtype=None,
recompute_output=False,
):
M, N = x.shape
assert x.stride(-1) == 1
assert dy.stride(-1) == 1
assert dy.shape == (M, N)
if dresidual is not None:
assert dresidual.stride(-1) == 1
assert dresidual.shape == (M, N)
assert weight.shape == (N,)
assert weight.stride(-1) == 1
if bias is not None:
assert bias.stride(-1) == 1
assert bias.shape == (N,)
if dy1 is not None:
assert weight1 is not None
assert dy1.shape == dy.shape
assert dy1.stride(-1) == 1
if weight1 is not None:
assert weight1.shape == (N,)
assert weight1.stride(-1) == 1
if bias1 is not None:
assert bias1.shape == (N,)
assert bias1.stride(-1) == 1
if seeds is not None:
assert seeds.is_contiguous()
assert seeds.shape == (M if not has_x1 else M * 2,)
if rowscale is not None:
assert rowscale.is_contiguous()
assert rowscale.shape == (M,)
# allocate output
dx = (
torch.empty_like(x)
if x_dtype is None
else torch.empty(M, N, dtype=x_dtype, device=x.device)
)
dresidual_in = (
torch.empty_like(x)
if has_residual
and (dx.dtype != x.dtype or dropout_p > 0.0 or rowscale is not None or has_x1)
else None
)
dx1 = torch.empty_like(dx) if (has_x1 and dropout_p > 0.0) else None
y = torch.empty(M, N, dtype=dy.dtype, device=dy.device) if recompute_output else None
if recompute_output:
assert weight1 is None, "recompute_output is not supported with parallel LayerNorm"
# Less than 64KB per feature: enqueue fused kernel
MAX_FUSED_SIZE = 65536 // x.element_size()
BLOCK_N = min(MAX_FUSED_SIZE, triton.next_power_of_2(N))
if N > BLOCK_N:
raise RuntimeError("This layer norm doesn't support feature dim >= 64KB.")
# Increasing the multiple (e.g. 8) will allow more thread blocks to be launched and hide the
# latency of the gmem reads/writes, but will increase the time of summing up dw / db.
sm_count = torch.cuda.get_device_properties(x.device).multi_processor_count * 8
_dw = torch.empty((sm_count, N), dtype=torch.float32, device=weight.device)
_db = (
torch.empty((sm_count, N), dtype=torch.float32, device=bias.device)
if bias is not None
else None
)
_dw1 = torch.empty_like(_dw) if weight1 is not None else None
_db1 = torch.empty_like(_db) if bias1 is not None else None
rows_per_program = math.ceil(M / sm_count)
grid = (sm_count,)
with torch.cuda.device(x.device.index):
_layer_norm_bwd_kernel[grid](
x,
weight,
bias,
y,
dy,
dx,
_dw,
_db,
dresidual,
weight1,
dy1,
dx1,
_dw1,
_db1,
dresidual_in,
rowscale,
seeds,
mean,
rstd,
x.stride(0),
0 if not recompute_output else y.stride(0),
dy.stride(0),
dx.stride(0),
dresidual.stride(0) if dresidual is not None else 0,
dy1.stride(0) if dy1 is not None else 0,
dx1.stride(0) if dx1 is not None else 0,
dresidual_in.stride(0) if dresidual_in is not None else 0,
M,
N,
eps,
dropout_p,
zero_centered_weight,
rows_per_program,
is_rms_norm,
BLOCK_N,
dresidual is not None,
dresidual_in is not None,
bias is not None,
dropout_p > 0.0,
)
dw = _dw.sum(0).to(weight.dtype)
db = _db.sum(0).to(bias.dtype) if bias is not None else None
dw1 = _dw1.sum(0).to(weight1.dtype) if weight1 is not None else None
db1 = _db1.sum(0).to(bias1.dtype) if bias1 is not None else None
# Don't need to compute dresidual_in separately in this case
if has_residual and dx.dtype == x.dtype and dropout_p == 0.0 and rowscale is None:
dresidual_in = dx
if has_x1 and dropout_p == 0.0:
dx1 = dx
return (
(dx, dw, db, dresidual_in, dx1, dw1, db1)
if not recompute_output
else (dx, dw, db, dresidual_in, dx1, dw1, db1, y)
)
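# Note (added for clarity): each program accumulates its rows' weight/bias grads
# into its own slice of the (sm_count, N) buffers _dw/_db/_dw1/_db1, and the host
# reduces them with .sum(0) above; this split reduction trades a small final sum
# for much better SM occupancy during the row-parallel pass.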
class LayerNormFn(torch.autograd.Function):
@staticmethod
def forward(
ctx,
x,
weight,
bias,
residual=None,
x1=None,
weight1=None,
bias1=None,
eps=1e-6,
dropout_p=0.0,
rowscale=None,
prenorm=False,
residual_in_fp32=False,
zero_centered_weight=False,
is_rms_norm=False,
return_dropout_mask=False,
out=None,
residual_out=None
):
x_shape_og = x.shape
# Check for zero sequence length
if x.numel() == 0:
ctx.zero_seq_length = True
# Only save minimal required tensors for backward
# ctx.save_for_backward(weight, bias, weight1, bias1)
ctx.x_shape_og = x_shape_og
ctx.weight_shape = weight.shape
ctx.weight_dtype = weight.dtype
ctx.weight_device = weight.device
ctx.has_bias = bias is not None
ctx.bias_shape = bias.shape if bias is not None else None
ctx.bias_dtype = bias.dtype if bias is not None else None
ctx.bias_device = bias.device if bias is not None else None
ctx.has_weight1 = weight1 is not None
ctx.weight1_shape = weight1.shape if weight1 is not None else None
ctx.weight1_dtype = weight1.dtype if weight1 is not None else None
ctx.weight1_device = weight1.device if weight1 is not None else None
ctx.has_bias1 = bias1 is not None
ctx.bias1_shape = bias1.shape if bias1 is not None else None
ctx.bias1_dtype = bias1.dtype if bias1 is not None else None
ctx.bias1_device = bias1.device if bias1 is not None else None
ctx.has_residual = residual is not None
ctx.has_x1 = x1 is not None
ctx.dropout_p = dropout_p
# Handle output tensors with correct dtype
y = x # Preserve input tensor properties
y1 = torch.empty_like(x) if x1 is not None else None
# Only create residual_out if prenorm is True
            residual_out = (
                torch.empty(
                    x.shape,
                    dtype=torch.float32 if residual_in_fp32 else x.dtype,
                    device=x.device,
                )
                if prenorm
                else None
            )
# Handle dropout masks
dropout_mask = None
dropout_mask1 = None
if return_dropout_mask:
dropout_mask = torch.empty_like(x, dtype=torch.uint8)
if x1 is not None:
dropout_mask1 = torch.empty_like(x, dtype=torch.uint8)
# Return based on configuration
if not return_dropout_mask:
if weight1 is None:
return y if not prenorm else (y, residual_out)
else:
return (y, y1) if not prenorm else (y, y1, residual_out)
else:
if weight1 is None:
return ((y, dropout_mask, dropout_mask1) if not prenorm
else (y, residual_out, dropout_mask, dropout_mask1))
else:
return ((y, y1, dropout_mask, dropout_mask1) if not prenorm
else (y, y1, residual_out, dropout_mask, dropout_mask1))
ctx.zero_seq_length = False
# reshape input data into 2D tensor
x = x.reshape(-1, x.shape[-1])
if x.stride(-1) != 1:
x = x.contiguous()
if residual is not None:
assert residual.shape == x_shape_og
residual = residual.reshape(-1, residual.shape[-1])
if residual.stride(-1) != 1:
residual = residual.contiguous()
if x1 is not None:
assert x1.shape == x_shape_og
assert rowscale is None, "rowscale is not supported with parallel LayerNorm"
x1 = x1.reshape(-1, x1.shape[-1])
if x1.stride(-1) != 1:
x1 = x1.contiguous()
weight = weight.contiguous()
if bias is not None:
bias = bias.contiguous()
if weight1 is not None:
weight1 = weight1.contiguous()
if bias1 is not None:
bias1 = bias1.contiguous()
if rowscale is not None:
rowscale = rowscale.reshape(-1).contiguous()
residual_dtype = (
residual.dtype
if residual is not None
else (torch.float32 if residual_in_fp32 else None)
)
if out is not None:
out = out.reshape(-1, out.shape[-1])
if residual_out is not None:
residual_out = residual_out.reshape(-1, residual_out.shape[-1])
y, y1, mean, rstd, residual_out, seeds, dropout_mask, dropout_mask1 = _layer_norm_fwd(
x,
weight,
bias,
eps,
residual,
x1,
weight1,
bias1,
dropout_p=dropout_p,
rowscale=rowscale,
residual_dtype=residual_dtype,
zero_centered_weight=zero_centered_weight,
is_rms_norm=is_rms_norm,
return_dropout_mask=return_dropout_mask,
out=out,
residual_out=residual_out
)
ctx.save_for_backward(
residual_out, weight, bias, weight1, bias1, rowscale, seeds, mean, rstd
)
ctx.x_shape_og = x_shape_og
ctx.eps = eps
ctx.dropout_p = dropout_p
ctx.is_rms_norm = is_rms_norm
ctx.has_residual = residual is not None
ctx.has_x1 = x1 is not None
ctx.prenorm = prenorm
ctx.x_dtype = x.dtype
ctx.zero_centered_weight = zero_centered_weight
y = y.reshape(x_shape_og)
y1 = y1.reshape(x_shape_og) if y1 is not None else None
residual_out = residual_out.reshape(x_shape_og) if residual_out is not None else None
dropout_mask = dropout_mask.reshape(x_shape_og) if dropout_mask is not None else None
dropout_mask1 = dropout_mask1.reshape(x_shape_og) if dropout_mask1 is not None else None
if not return_dropout_mask:
if weight1 is None:
return y if not prenorm else (y, residual_out)
else:
return (y, y1) if not prenorm else (y, y1, residual_out)
else:
if weight1 is None:
return (
(y, dropout_mask, dropout_mask1)
if not prenorm
else (y, residual_out, dropout_mask, dropout_mask1)
)
else:
return (
(y, y1, dropout_mask, dropout_mask1)
if not prenorm
else (y, y1, residual_out, dropout_mask, dropout_mask1)
)
@staticmethod
def backward(ctx, dy, *args):
if ctx.zero_seq_length:
return (
torch.zeros(ctx.x_shape_og, dtype=dy.dtype, device=dy.device),
torch.zeros(ctx.weight_shape, dtype=ctx.weight_dtype, device=ctx.weight_device),
torch.zeros(ctx.bias_shape, dtype=ctx.bias_dtype, device=ctx.bias_device) if ctx.has_bias else None,
torch.zeros(ctx.x_shape_og, dtype=dy.dtype, device=dy.device) if ctx.has_residual else None,
torch.zeros(ctx.x_shape_og, dtype=dy.dtype, device=dy.device) if ctx.has_x1 and ctx.dropout_p > 0.0 else None,
torch.zeros(ctx.weight1_shape, dtype=ctx.weight1_dtype, device=ctx.weight1_device) if ctx.has_weight1 else None,
torch.zeros(ctx.bias1_shape, dtype=ctx.bias1_dtype, device=ctx.bias1_device) if ctx.has_bias1 else None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
)
x, weight, bias, weight1, bias1, rowscale, seeds, mean, rstd = ctx.saved_tensors
dy = dy.reshape(-1, dy.shape[-1])
if dy.stride(-1) != 1:
dy = dy.contiguous()
assert dy.shape == x.shape
if weight1 is not None:
dy1, args = args[0], args[1:]
dy1 = dy1.reshape(-1, dy1.shape[-1])
if dy1.stride(-1) != 1:
dy1 = dy1.contiguous()
assert dy1.shape == x.shape
else:
dy1 = None
if ctx.prenorm:
dresidual = args[0]
dresidual = dresidual.reshape(-1, dresidual.shape[-1])
if dresidual.stride(-1) != 1:
dresidual = dresidual.contiguous()
assert dresidual.shape == x.shape
else:
dresidual = None
dx, dw, db, dresidual_in, dx1, dw1, db1 = _layer_norm_bwd(
dy,
x,
weight,
bias,
ctx.eps,
mean,
rstd,
dresidual,
dy1,
weight1,
bias1,
seeds,
ctx.dropout_p,
rowscale,
ctx.has_residual,
ctx.has_x1,
ctx.zero_centered_weight,
ctx.is_rms_norm,
x_dtype=ctx.x_dtype,
)
return (
dx.reshape(ctx.x_shape_og),
dw,
db,
dresidual_in.reshape(ctx.x_shape_og) if ctx.has_residual else None,
dx1.reshape(ctx.x_shape_og) if dx1 is not None else None,
dw1,
db1,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
)
def layer_norm_fn(
x,
weight,
bias,
residual=None,
x1=None,
weight1=None,
bias1=None,
eps=1e-6,
dropout_p=0.0,
rowscale=None,
prenorm=False,
residual_in_fp32=False,
zero_centered_weight=False,
is_rms_norm=False,
return_dropout_mask=False,
out=None,
residual_out=None
):
return LayerNormFn.apply(
x,
weight,
bias,
residual,
x1,
weight1,
bias1,
eps,
dropout_p,
rowscale,
prenorm,
residual_in_fp32,
zero_centered_weight,
is_rms_norm,
return_dropout_mask,
out,
residual_out
)
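# Hedged usage sketch (not part of the original file; requires a CUDA device and
# Triton). Shapes and dtypes are illustrative; the hidden size must satisfy the
# 64KB-per-row limit enforced in _layer_norm_fwd.
def _demo_layer_norm_fn():
    x = torch.randn(2, 16, 1024, device="cuda", dtype=torch.float16)
    weight = torch.ones(1024, device="cuda")
    bias = torch.zeros(1024, device="cuda")
    residual = torch.randn_like(x)
    # prenorm=True additionally returns the updated residual stream (x + residual)
    y, new_residual = layer_norm_fn(x, weight, bias, residual=residual, prenorm=True)
    return y, new_residual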
def rms_norm_fn(
x,
weight,
bias,
residual=None,
x1=None,
weight1=None,
bias1=None,
eps=1e-6,
dropout_p=0.0,
rowscale=None,
prenorm=False,
residual_in_fp32=False,
zero_centered_weight=False,
return_dropout_mask=False,
out=None,
residual_out=None
):
return LayerNormFn.apply(
x,
weight,
bias,
residual,
x1,
weight1,
bias1,
eps,
dropout_p,
rowscale,
prenorm,
residual_in_fp32,
zero_centered_weight,
True,
return_dropout_mask,
out,
residual_out
)
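# Note (added for clarity): rms_norm_fn is layer_norm_fn with is_rms_norm
# hard-wired to True (the positional `True` above), so the mean subtraction is
# skipped and only the RMS statistic is used.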
'''
class RMSNorm(torch.nn.Module):
def __init__(self, hidden_size, eps=1e-5, dropout_p=0.0, zero_centered_weight=False,
device=None, dtype=None):
factory_kwargs = {"device": device, "dtype": dtype}
super().__init__()
self.eps = eps
if dropout_p > 0.0:
self.drop = torch.nn.Dropout(dropout_p)
else:
self.drop = None
self.zero_centered_weight = zero_centered_weight
self.weight = torch.nn.Parameter(torch.empty(hidden_size, **factory_kwargs))
self.register_parameter("bias", None)
self.reset_parameters()
def reset_parameters(self):
if not self.zero_centered_weight:
torch.nn.init.ones_(self.weight)
else:
torch.nn.init.zeros_(self.weight)
def forward(self, x, residual=None, prenorm=False, residual_in_fp32=False):
return rms_norm_fn(
x,
self.weight,
self.bias,
residual=residual,
eps=self.eps,
dropout_p=self.drop.p if self.drop is not None and self.training else 0.0,
prenorm=prenorm,
residual_in_fp32=residual_in_fp32,
zero_centered_weight=self.zero_centered_weight,
)
'''
class LayerNormLinearFn(torch.autograd.Function):
@staticmethod
@custom_fwd
def forward(
ctx,
x,
norm_weight,
norm_bias,
linear_weight,
linear_bias,
residual=None,
eps=1e-6,
prenorm=False,
residual_in_fp32=False,
is_rms_norm=False,
):
x_shape_og = x.shape
# reshape input data into 2D tensor
x = x.reshape(-1, x.shape[-1])
if x.stride(-1) != 1:
x = x.contiguous()
if residual is not None:
assert residual.shape == x_shape_og
residual = residual.reshape(-1, residual.shape[-1])
if residual.stride(-1) != 1:
residual = residual.contiguous()
norm_weight = norm_weight.contiguous()
if norm_bias is not None:
norm_bias = norm_bias.contiguous()
residual_dtype = (
residual.dtype
if residual is not None
else (torch.float32 if residual_in_fp32 else None)
)
y, _, mean, rstd, residual_out, *rest = _layer_norm_fwd(
x,
norm_weight,
norm_bias,
eps,
residual,
out_dtype=None if not torch.is_autocast_enabled() else torch.get_autocast_dtype("cuda"),
residual_dtype=residual_dtype,
is_rms_norm=is_rms_norm,
)
y = y.reshape(x_shape_og)
dtype = torch.get_autocast_dtype("cuda") if torch.is_autocast_enabled() else y.dtype
linear_weight = linear_weight.to(dtype)
linear_bias = linear_bias.to(dtype) if linear_bias is not None else None
out = F.linear(y.to(linear_weight.dtype), linear_weight, linear_bias)
# We don't store y, will be recomputed in the backward pass to save memory
ctx.save_for_backward(residual_out, norm_weight, norm_bias, linear_weight, mean, rstd)
ctx.x_shape_og = x_shape_og
ctx.eps = eps
ctx.is_rms_norm = is_rms_norm
ctx.has_residual = residual is not None
ctx.prenorm = prenorm
ctx.x_dtype = x.dtype
ctx.linear_bias_is_none = linear_bias is None
return out if not prenorm else (out, residual_out.reshape(x_shape_og))
@staticmethod
@custom_bwd
def backward(ctx, dout, *args):
x, norm_weight, norm_bias, linear_weight, mean, rstd = ctx.saved_tensors
dout = dout.reshape(-1, dout.shape[-1])
dy = F.linear(dout, linear_weight.t())
dlinear_bias = None if ctx.linear_bias_is_none else dout.sum(0)
if dy.stride(-1) != 1:
dy = dy.contiguous()
assert dy.shape == x.shape
if ctx.prenorm:
dresidual = args[0]
dresidual = dresidual.reshape(-1, dresidual.shape[-1])
if dresidual.stride(-1) != 1:
dresidual = dresidual.contiguous()
assert dresidual.shape == x.shape
else:
dresidual = None
dx, dnorm_weight, dnorm_bias, dresidual_in, _, _, _, y = _layer_norm_bwd(
dy,
x,
norm_weight,
norm_bias,
ctx.eps,
mean,
rstd,
dresidual=dresidual,
has_residual=ctx.has_residual,
is_rms_norm=ctx.is_rms_norm,
x_dtype=ctx.x_dtype,
recompute_output=True,
)
dlinear_weight = torch.einsum("bo,bi->oi", dout, y)
return (
dx.reshape(ctx.x_shape_og),
dnorm_weight,
dnorm_bias,
dlinear_weight,
dlinear_bias,
dresidual_in.reshape(ctx.x_shape_og) if ctx.has_residual else None,
None,
None,
None,
None,
)
def layer_norm_linear_fn(
x,
norm_weight,
norm_bias,
linear_weight,
linear_bias,
residual=None,
eps=1e-6,
prenorm=False,
residual_in_fp32=False,
is_rms_norm=False,
):
return LayerNormLinearFn.apply(
x,
norm_weight,
norm_bias,
linear_weight,
linear_bias,
residual,
eps,
prenorm,
residual_in_fp32,
is_rms_norm,
)
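# Hedged usage sketch of the fused norm + linear path (illustrative shapes, not
# part of the original file; requires a CUDA device and Triton).
def _demo_layer_norm_linear_fn():
    x = torch.randn(8, 1024, device="cuda")
    norm_weight = torch.ones(1024, device="cuda")
    linear_weight = torch.randn(4096, 1024, device="cuda")  # (out_features, in_features)
    out = layer_norm_linear_fn(x, norm_weight, None, linear_weight, None)
    return out  # shape (8, 4096); y is recomputed in the backward pass to save memory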
""" Cosine Scheduler
Cosine LR schedule with warmup, cycle/restarts, noise, k-decay.
Hacked together by / Copyright 2021 Ross Wightman
"""
import logging
import math
import torch
from typing import List
from .scheduler import Scheduler
_logger = logging.getLogger(__name__)
class CosineLRScheduler(Scheduler):
"""
Cosine decay with restarts.
This is described in the paper https://arxiv.org/abs/1608.03983.
Inspiration from
https://github.com/allenai/allennlp/blob/master/allennlp/training/learning_rate_schedulers/cosine.py
k-decay option based on `k-decay: A New Method For Learning Rate Schedule` - https://arxiv.org/abs/2004.05909
"""
def __init__(
self,
optimizer: torch.optim.Optimizer,
t_initial: int,
lr_min: float = 0.,
cycle_mul: float = 1.,
cycle_decay: float = 1.,
cycle_limit: int = 1,
warmup_t=0,
warmup_lr_init=0,
warmup_prefix=False,
t_in_epochs=True,
noise_range_t=None,
noise_pct=0.67,
noise_std=1.0,
noise_seed=42,
k_decay=1.0,
initialize=True,
) -> None:
super().__init__(
optimizer,
param_group_field="lr",
t_in_epochs=t_in_epochs,
noise_range_t=noise_range_t,
noise_pct=noise_pct,
noise_std=noise_std,
noise_seed=noise_seed,
initialize=initialize,
)
assert t_initial > 0
assert lr_min >= 0
if t_initial == 1 and cycle_mul == 1 and cycle_decay == 1:
_logger.warning(
"Cosine annealing scheduler will have no effect on the learning "
"rate since t_initial = t_mul = eta_mul = 1.")
self.t_initial = t_initial
self.lr_min = lr_min
self.cycle_mul = cycle_mul
self.cycle_decay = cycle_decay
self.cycle_limit = cycle_limit
self.warmup_t = warmup_t
self.warmup_lr_init = warmup_lr_init
self.warmup_prefix = warmup_prefix
self.k_decay = k_decay
if self.warmup_t:
self.warmup_steps = [(v - warmup_lr_init) / self.warmup_t for v in self.base_values]
super().update_groups(self.warmup_lr_init)
else:
self.warmup_steps = [1 for _ in self.base_values]
        self._step_count = 0  # unused; kept for interface compatibility
def _get_lr(self, t: int) -> List[float]:
if t < self.warmup_t:
lrs = [self.warmup_lr_init + t * s for s in self.warmup_steps]
else:
if self.warmup_prefix:
t = t - self.warmup_t
if self.cycle_mul != 1:
i = math.floor(math.log(1 - t / self.t_initial * (1 - self.cycle_mul), self.cycle_mul))
t_i = self.cycle_mul ** i * self.t_initial
t_curr = t - (1 - self.cycle_mul ** i) / (1 - self.cycle_mul) * self.t_initial
else:
i = t // self.t_initial
t_i = self.t_initial
t_curr = t - (self.t_initial * i)
gamma = self.cycle_decay ** i
lr_max_values = [v * gamma for v in self.base_values]
k = self.k_decay
if i < self.cycle_limit:
lrs = [
self.lr_min + 0.5 * (lr_max - self.lr_min) * (1 + math.cos(math.pi * t_curr ** k / t_i ** k))
for lr_max in lr_max_values
]
else:
lrs = [self.lr_min for _ in self.base_values]
return lrs
def get_cycle_length(self, cycles=0):
cycles = max(1, cycles or self.cycle_limit)
if self.cycle_mul == 1.0:
return self.t_initial * cycles
else:
return int(math.floor(-self.t_initial * (self.cycle_mul ** cycles - 1) / (1 - self.cycle_mul)))
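# Hedged usage sketch (illustrative optimizer and loop, not part of the original
# file): with t_in_epochs=True, pass the epoch count explicitly at the end of
# each epoch, as the base class docstring prescribes.
def _demo_cosine_scheduler():
    opt = torch.optim.SGD([torch.nn.Parameter(torch.zeros(1))], lr=0.1)
    sched = CosineLRScheduler(opt, t_initial=100, lr_min=1e-5, warmup_t=5, warmup_lr_init=1e-6)
    for epoch in range(100):
        # ... one epoch of training ...
        sched.step(epoch + 1)  # computes the LR for the next epoch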
import abc
from abc import ABC
from typing import Any, Dict, List, Optional
import torch
class Scheduler(ABC):
""" Parameter Scheduler Base Class
A scheduler base class that can be used to schedule any optimizer parameter groups.
Unlike the builtin PyTorch schedulers, this is intended to be consistently called
* At the END of each epoch, before incrementing the epoch count, to calculate next epoch's value
* At the END of each optimizer update, after incrementing the update count, to calculate next update's value
The schedulers built on this should try to remain as stateless as possible (for simplicity).
This family of schedulers is attempting to avoid the confusion of the meaning of 'last_epoch'
and -1 values for special behaviour. All epoch and update counts must be tracked in the training
code and explicitly passed in to the schedulers on the corresponding step or step_update call.
Based on ideas from:
* https://github.com/pytorch/fairseq/tree/master/fairseq/optim/lr_scheduler
* https://github.com/allenai/allennlp/tree/master/allennlp/training/learning_rate_schedulers
"""
def __init__(
self,
optimizer: torch.optim.Optimizer,
param_group_field: str,
t_in_epochs: bool = True,
noise_range_t=None,
noise_type='normal',
noise_pct=0.67,
noise_std=1.0,
noise_seed=None,
initialize: bool = True,
) -> None:
self.optimizer = optimizer
self.param_group_field = param_group_field
self._initial_param_group_field = f"initial_{param_group_field}"
if initialize:
for i, group in enumerate(self.optimizer.param_groups):
if param_group_field not in group:
raise KeyError(f"{param_group_field} missing from param_groups[{i}]")
group.setdefault(self._initial_param_group_field, group[param_group_field])
else:
for i, group in enumerate(self.optimizer.param_groups):
if self._initial_param_group_field not in group:
raise KeyError(f"{self._initial_param_group_field} missing from param_groups[{i}]")
self.base_values = [group[self._initial_param_group_field] for group in self.optimizer.param_groups]
self.metric = None # any point to having this for all?
self.t_in_epochs = t_in_epochs
self.noise_range_t = noise_range_t
self.noise_pct = noise_pct
self.noise_type = noise_type
self.noise_std = noise_std
self.noise_seed = noise_seed if noise_seed is not None else 42
self.update_groups(self.base_values)
def state_dict(self) -> Dict[str, Any]:
return {key: value for key, value in self.__dict__.items() if key != 'optimizer'}
def load_state_dict(self, state_dict: Dict[str, Any]) -> None:
self.__dict__.update(state_dict)
def get_last_lr(self):
""" Return last computed learning rate by current scheduler.
"""
return self._last_lr
@abc.abstractmethod
def _get_lr(self, t: int) -> List[float]:
pass
def _get_values(self, t: int, on_epoch: bool = True) -> Optional[List[float]]:
return self._get_lr(t)
def step(self, epoch: int, metric: float = None) -> None:
self.metric = metric
values = self._get_values(epoch, on_epoch=True)
if values is not None:
values = self._add_noise(values, epoch)
self.update_groups(values)
# def step_update(self, num_updates: int, metric: float = None):
# self.metric = metric
# values = self._get_values(num_updates, on_epoch=False)
# if values is not None:
# values = self._add_noise(values, num_updates)
# self.update_groups(values)
def update_groups(self, values):
if not isinstance(values, (list, tuple)):
values = [values] * len(self.optimizer.param_groups)
for param_group, value in zip(self.optimizer.param_groups, values):
if 'lr_scale' in param_group:
param_group[self.param_group_field] = value * param_group['lr_scale']
else:
param_group[self.param_group_field] = value
self._last_lr = [group[self.param_group_field] for group in self.optimizer.param_groups]
def _add_noise(self, lrs, t):
if self._is_apply_noise(t):
noise = self._calculate_noise(t)
lrs = [v + v * noise for v in lrs]
return lrs
def _is_apply_noise(self, t) -> bool:
"""Return True if scheduler in noise range."""
apply_noise = False
if self.noise_range_t is not None:
if isinstance(self.noise_range_t, (list, tuple)):
apply_noise = self.noise_range_t[0] <= t < self.noise_range_t[1]
else:
apply_noise = t >= self.noise_range_t
return apply_noise
def _calculate_noise(self, t) -> float:
g = torch.Generator()
g.manual_seed(self.noise_seed + t)
if self.noise_type == 'normal':
while True:
# resample if noise out of percent limit, brute force but shouldn't spin much
noise = torch.randn(1, generator=g).item()
if abs(noise) < self.noise_pct:
return noise
else:
noise = 2 * (torch.rand(1, generator=g).item() - 0.5) * self.noise_pct
return noise
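# Hedged sketch of the subclass contract (illustrative, not part of the original
# file): a concrete scheduler only needs to implement _get_lr.
class _ConstantLRScheduler(Scheduler):
    def _get_lr(self, t: int) -> List[float]:
        return list(self.base_values)  # hold every param group at its initial value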
""" Step Scheduler
Basic step LR schedule with warmup, noise.
Hacked together by / Copyright 2020 Ross Wightman
"""
import math
import torch
from typing import List
from .scheduler import Scheduler
class StepLRScheduler(Scheduler):
    """Step LR schedule with warmup and optional noise: decays the learning rate
    by `decay_rate` every `decay_t` steps.
    """
def __init__(
self,
optimizer: torch.optim.Optimizer,
decay_t: float,
decay_rate: float = 1.,
warmup_t=0,
warmup_lr_init=0,
warmup_prefix=True,
t_in_epochs=True,
noise_range_t=None,
noise_pct=0.67,
noise_std=1.0,
noise_seed=42,
initialize=True,
) -> None:
super().__init__(
optimizer,
param_group_field="lr",
t_in_epochs=t_in_epochs,
noise_range_t=noise_range_t,
noise_pct=noise_pct,
noise_std=noise_std,
noise_seed=noise_seed,
initialize=initialize,
)
self.decay_t = decay_t
self.decay_rate = decay_rate
self.warmup_t = warmup_t
self.warmup_lr_init = warmup_lr_init
self.warmup_prefix = warmup_prefix
if self.warmup_t:
self.warmup_steps = [(v - warmup_lr_init) / self.warmup_t for v in self.base_values]
super().update_groups(self.warmup_lr_init)
else:
self.warmup_steps = [1 for _ in self.base_values]
def _get_lr(self, t: int) -> List[float]:
if t < self.warmup_t:
lrs = [self.warmup_lr_init + t * s for s in self.warmup_steps]
else:
if self.warmup_prefix:
t = t - self.warmup_t
lrs = [v * (self.decay_rate ** (t // self.decay_t)) for v in self.base_values]
return lrs
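# e.g. with decay_t=30 and decay_rate=0.1, the LR is multiplied by 0.1 at
# t = 30, 60, 90, ... (t counted after the warmup prefix is subtracted).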
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import warnings
from typing import List, Optional, Tuple, Union
import numpy as np
import PIL.Image
import torch
from diffusers.image_processor import PipelineImageInput, VaeImageProcessor, is_valid_image_imagelist
from diffusers.configuration_utils import register_to_config
class OmniGen2ImageProcessor(VaeImageProcessor):
"""
    Image processor for OmniGen2 image resize and crop.
Args:
do_resize (`bool`, *optional*, defaults to `True`):
Whether to downscale the image's (height, width) dimensions to multiples of `vae_scale_factor`. Can accept
`height` and `width` arguments from [`image_processor.VaeImageProcessor.preprocess`] method.
vae_scale_factor (`int`, *optional*, defaults to `8`):
VAE scale factor. If `do_resize` is `True`, the image is automatically resized to multiples of this factor.
resample (`str`, *optional*, defaults to `lanczos`):
Resampling filter to use when resizing the image.
do_normalize (`bool`, *optional*, defaults to `True`):
Whether to normalize the image to [-1,1].
do_binarize (`bool`, *optional*, defaults to `False`):
Whether to binarize the image to 0/1.
        do_convert_rgb (`bool`, *optional*, defaults to `False`):
            Whether to convert the images to RGB format.
        do_convert_grayscale (`bool`, *optional*, defaults to `False`):
Whether to convert the images to grayscale format.
"""
@register_to_config
def __init__(
self,
do_resize: bool = True,
vae_scale_factor: int = 16,
resample: str = "lanczos",
max_pixels: Optional[int] = None,
max_side_length: Optional[int] = None,
do_normalize: bool = True,
do_binarize: bool = False,
do_convert_grayscale: bool = False,
):
super().__init__(
do_resize=do_resize,
vae_scale_factor=vae_scale_factor,
resample=resample,
do_normalize=do_normalize,
do_binarize=do_binarize,
do_convert_grayscale=do_convert_grayscale,
)
self.max_pixels = max_pixels
self.max_side_length = max_side_length
def get_new_height_width(
self,
image: Union[PIL.Image.Image, np.ndarray, torch.Tensor],
height: Optional[int] = None,
width: Optional[int] = None,
max_pixels: Optional[int] = None,
max_side_length: Optional[int] = None,
) -> Tuple[int, int]:
r"""
Returns the height and width of the image, downscaled to the next integer multiple of `vae_scale_factor`.
Args:
image (`Union[PIL.Image.Image, np.ndarray, torch.Tensor]`):
The image input, which can be a PIL image, NumPy array, or PyTorch tensor. If it is a NumPy array, it
should have shape `[batch, height, width]` or `[batch, height, width, channels]`. If it is a PyTorch
tensor, it should have shape `[batch, channels, height, width]`.
height (`Optional[int]`, *optional*, defaults to `None`):
The height of the preprocessed image. If `None`, the height of the `image` input will be used.
width (`Optional[int]`, *optional*, defaults to `None`):
The width of the preprocessed image. If `None`, the width of the `image` input will be used.
Returns:
`Tuple[int, int]`:
A tuple containing the height and width, both resized to the nearest integer multiple of
`vae_scale_factor`.
"""
if height is None:
if isinstance(image, PIL.Image.Image):
height = image.height
elif isinstance(image, torch.Tensor):
height = image.shape[2]
else:
height = image.shape[1]
if width is None:
if isinstance(image, PIL.Image.Image):
width = image.width
elif isinstance(image, torch.Tensor):
width = image.shape[3]
else:
width = image.shape[2]
if max_side_length is None:
max_side_length = self.max_side_length
if max_pixels is None:
max_pixels = self.max_pixels
        # Guard against unset limits so neither ratio is left undefined
        ratio = 1.0
        max_side_length_ratio = 1.0
        if max_side_length is not None:
            if height > width:
                max_side_length_ratio = max_side_length / height
            else:
                max_side_length_ratio = max_side_length / width
        max_pixels_ratio = 1.0
        if max_pixels is not None:
            cur_pixels = height * width
            max_pixels_ratio = (max_pixels / cur_pixels) ** 0.5
        ratio = min(max_pixels_ratio, max_side_length_ratio, 1.0)  # do not upscale input image
        new_height = int(height * ratio) // self.config.vae_scale_factor * self.config.vae_scale_factor
        new_width = int(width * ratio) // self.config.vae_scale_factor * self.config.vae_scale_factor
return new_height, new_width
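    # Worked example (illustrative): with vae_scale_factor=16, max_side_length=1024
    # and max_pixels unset, a 1365x2048 (height x width) input gives
    # ratio = 1024 / 2048 = 0.5, so 682x1024 is floored to multiples of 16,
    # yielding (672, 1024).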
def preprocess(
self,
image: PipelineImageInput,
height: Optional[int] = None,
width: Optional[int] = None,
max_pixels: Optional[int] = None,
max_side_length: Optional[int] = None,
resize_mode: str = "default", # "default", "fill", "crop"
crops_coords: Optional[Tuple[int, int, int, int]] = None,
) -> torch.Tensor:
"""
Preprocess the image input.
Args:
image (`PipelineImageInput`):
                The image input; accepted formats are PIL images, NumPy arrays, and PyTorch tensors, as well as
                lists of these formats.
            height (`int`, *optional*):
                The height of the preprocessed image. If `None`, uses `get_default_height_width()` to get the
                default height.
            width (`int`, *optional*):
                The width of the preprocessed image. If `None`, uses `get_default_height_width()` to get the
                default width.
            resize_mode (`str`, *optional*, defaults to `default`):
                The resize mode; one of `default`, `fill`, or `crop`. If `default`, resizes the image to fit within
                the specified width and height, which may not maintain the original aspect ratio. If `fill`, resizes
                the image to fit within the specified width and height, maintains the aspect ratio, and centers the
                image within the dimensions, filling the empty space with data from the image. If `crop`, resizes the
                image to fit within the specified width and height, maintains the aspect ratio, and centers the image
                within the dimensions, cropping the excess. Note that `fill` and `crop` are only supported for PIL
                image input.
crops_coords (`List[Tuple[int, int, int, int]]`, *optional*, defaults to `None`):
The crop coordinates for each image in the batch. If `None`, will not crop the image.
Returns:
`torch.Tensor`:
The preprocessed image.
"""
supported_formats = (PIL.Image.Image, np.ndarray, torch.Tensor)
# Expand the missing dimension for 3-dimensional pytorch tensor or numpy array that represents grayscale image
if self.config.do_convert_grayscale and isinstance(image, (torch.Tensor, np.ndarray)) and image.ndim == 3:
if isinstance(image, torch.Tensor):
# if image is a pytorch tensor could have 2 possible shapes:
# 1. batch x height x width: we should insert the channel dimension at position 1
# 2. channel x height x width: we should insert batch dimension at position 0,
# however, since both channel and batch dimension has same size 1, it is same to insert at position 1
# for simplicity, we insert a dimension of size 1 at position 1 for both cases
image = image.unsqueeze(1)
else:
# if it is a numpy array, it could have 2 possible shapes:
# 1. batch x height x width: insert channel dimension on last position
# 2. height x width x channel: insert batch dimension on first position
if image.shape[-1] == 1:
image = np.expand_dims(image, axis=0)
else:
image = np.expand_dims(image, axis=-1)
if isinstance(image, list) and isinstance(image[0], np.ndarray) and image[0].ndim == 4:
warnings.warn(
"Passing `image` as a list of 4d np.ndarray is deprecated."
"Please concatenate the list along the batch dimension and pass it as a single 4d np.ndarray",
FutureWarning,
)
image = np.concatenate(image, axis=0)
if isinstance(image, list) and isinstance(image[0], torch.Tensor) and image[0].ndim == 4:
warnings.warn(
"Passing `image` as a list of 4d torch.Tensor is deprecated."
"Please concatenate the list along the batch dimension and pass it as a single 4d torch.Tensor",
FutureWarning,
)
image = torch.cat(image, axis=0)
if not is_valid_image_imagelist(image):
raise ValueError(
f"Input is in incorrect format. Currently, we only support {', '.join(str(x) for x in supported_formats)}"
)
if not isinstance(image, list):
image = [image]
if isinstance(image[0], PIL.Image.Image):
if crops_coords is not None:
image = [i.crop(crops_coords) for i in image]
if self.config.do_resize:
height, width = self.get_new_height_width(image[0], height, width, max_pixels, max_side_length)
image = [self.resize(i, height, width, resize_mode=resize_mode) for i in image]
if self.config.do_convert_rgb:
image = [self.convert_to_rgb(i) for i in image]
elif self.config.do_convert_grayscale:
image = [self.convert_to_grayscale(i) for i in image]
image = self.pil_to_numpy(image) # to np
image = self.numpy_to_pt(image) # to pt
elif isinstance(image[0], np.ndarray):
image = np.concatenate(image, axis=0) if image[0].ndim == 4 else np.stack(image, axis=0)
image = self.numpy_to_pt(image)
height, width = self.get_new_height_width(image, height, width, max_pixels, max_side_length)
if self.config.do_resize:
image = self.resize(image, height, width)
elif isinstance(image[0], torch.Tensor):
image = torch.cat(image, axis=0) if image[0].ndim == 4 else torch.stack(image, axis=0)
if self.config.do_convert_grayscale and image.ndim == 3:
image = image.unsqueeze(1)
channel = image.shape[1]
# don't need any preprocess if the image is latents
if channel == self.config.vae_latent_channels:
return image
height, width = self.get_new_height_width(image, height, width, max_pixels, max_side_length)
if self.config.do_resize:
image = self.resize(image, height, width)
# expected range [0,1], normalize to [-1,1]
do_normalize = self.config.do_normalize
if do_normalize and image.min() < 0:
warnings.warn(
"Passing `image` as torch tensor with value range in [-1,1] is deprecated. The expected value range for image tensor is [0,1] "
f"when passing as pytorch tensor or numpy Array. You passed `image` with value range [{image.min()},{image.max()}]",
FutureWarning,
)
do_normalize = False
if do_normalize:
image = self.normalize(image)
if self.config.do_binarize:
image = self.binarize(image)
return image
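# Hedged usage sketch (illustrative; the file path and sizes are hypothetical):
#   proc = OmniGen2ImageProcessor(vae_scale_factor=16, max_side_length=1024)
#   img = PIL.Image.open("input.png").convert("RGB")
#   pixels = proc.preprocess(img)  # float tensor in [-1, 1], H/W multiples of 16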
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Callable, Dict, List, Optional, Union
import torch
from huggingface_hub.utils import validate_hf_hub_args
from diffusers.utils import (
USE_PEFT_BACKEND,
is_peft_available,
is_peft_version,
is_torch_version,
is_transformers_available,
is_transformers_version,
logging,
)
from diffusers.loaders.lora_base import ( # noqa
LoraBaseMixin,
_fetch_state_dict,
)
from diffusers.loaders.lora_conversion_utils import (
_convert_non_diffusers_lumina2_lora_to_diffusers,
)
_LOW_CPU_MEM_USAGE_DEFAULT_LORA = False
if is_torch_version(">=", "1.9.0"):
if (
is_peft_available()
and is_peft_version(">=", "0.13.1")
and is_transformers_available()
and is_transformers_version(">", "4.45.2")
):
_LOW_CPU_MEM_USAGE_DEFAULT_LORA = True
logger = logging.get_logger(__name__)
TRANSFORMER_NAME = "transformer"
class OmniGen2LoraLoaderMixin(LoraBaseMixin):
r"""
Load LoRA layers into [`OmniGen2Transformer2DModel`]. Specific to [`OmniGen2Pipeline`].
"""
_lora_loadable_modules = ["transformer"]
transformer_name = TRANSFORMER_NAME
@classmethod
@validate_hf_hub_args
def lora_state_dict(
cls,
pretrained_model_name_or_path_or_dict: Union[str, Dict[str, torch.Tensor]],
**kwargs,
):
r"""
Return state dict for lora weights and the network alphas.
<Tip warning={true}>
We support loading A1111 formatted LoRA checkpoints in a limited capacity.
This function is experimental and might change in the future.
</Tip>
Parameters:
pretrained_model_name_or_path_or_dict (`str` or `os.PathLike` or `dict`):
Can be either:
- A string, the *model id* (for example `google/ddpm-celebahq-256`) of a pretrained model hosted on
the Hub.
- A path to a *directory* (for example `./my_model_directory`) containing the model weights saved
with [`ModelMixin.save_pretrained`].
- A [torch state
dict](https://pytorch.org/tutorials/beginner/saving_loading_models.html#what-is-a-state-dict).
cache_dir (`Union[str, os.PathLike]`, *optional*):
Path to a directory where a downloaded pretrained model configuration is cached if the standard cache
is not used.
force_download (`bool`, *optional*, defaults to `False`):
Whether or not to force the (re-)download of the model weights and configuration files, overriding the
cached versions if they exist.
proxies (`Dict[str, str]`, *optional*):
A dictionary of proxy servers to use by protocol or endpoint, for example, `{'http': 'foo.bar:3128',
'http://hostname': 'foo.bar:4012'}`. The proxies are used on each request.
local_files_only (`bool`, *optional*, defaults to `False`):
Whether to only load local model weights and configuration files or not. If set to `True`, the model
won't be downloaded from the Hub.
token (`str` or *bool*, *optional*):
The token to use as HTTP bearer authorization for remote files. If `True`, the token generated from
`diffusers-cli login` (stored in `~/.huggingface`) is used.
revision (`str`, *optional*, defaults to `"main"`):
The specific model version to use. It can be a branch name, a tag name, a commit id, or any identifier
allowed by Git.
subfolder (`str`, *optional*, defaults to `""`):
The subfolder location of a model file within a larger model repository on the Hub or locally.
"""
# Load the main state dict first which has the LoRA layers for either of
# transformer and text encoder or both.
cache_dir = kwargs.pop("cache_dir", None)
force_download = kwargs.pop("force_download", False)
proxies = kwargs.pop("proxies", None)
local_files_only = kwargs.pop("local_files_only", None)
token = kwargs.pop("token", None)
revision = kwargs.pop("revision", None)
subfolder = kwargs.pop("subfolder", None)
weight_name = kwargs.pop("weight_name", None)
use_safetensors = kwargs.pop("use_safetensors", None)
allow_pickle = False
if use_safetensors is None:
use_safetensors = True
allow_pickle = True
user_agent = {
"file_type": "attn_procs_weights",
"framework": "pytorch",
}
state_dict = _fetch_state_dict(
pretrained_model_name_or_path_or_dict=pretrained_model_name_or_path_or_dict,
weight_name=weight_name,
use_safetensors=use_safetensors,
local_files_only=local_files_only,
cache_dir=cache_dir,
force_download=force_download,
proxies=proxies,
token=token,
revision=revision,
subfolder=subfolder,
user_agent=user_agent,
allow_pickle=allow_pickle,
)
is_dora_scale_present = any("dora_scale" in k for k in state_dict)
if is_dora_scale_present:
            warn_msg = "It seems like you are using a DoRA checkpoint that is not compatible with Diffusers at the moment. So, we are going to filter out the keys associated with `dora_scale` from the state dict. If you think this is a mistake please open an issue https://github.com/huggingface/diffusers/issues/new."
logger.warning(warn_msg)
state_dict = {k: v for k, v in state_dict.items() if "dora_scale" not in k}
# Convert non-Diffusers checkpoints (keys prefixed with "diffusion_model.") to the Diffusers format.
non_diffusers = any(k.startswith("diffusion_model.") for k in state_dict)
if non_diffusers:
state_dict = _convert_non_diffusers_lumina2_lora_to_diffusers(state_dict)
return state_dict
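# A minimal usage sketch of `lora_state_dict` (the repo id and file name below are hypothetical):
#
#     state_dict = pipeline.lora_state_dict(
#         "some-user/lumina2-lora",  # Hub repo id, local directory, or an in-memory dict
#         weight_name="pytorch_lora_weights.safetensors",
#     )
#     # Keys come back in the Diffusers format (non-Diffusers "diffusion_model." keys are converted).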
# Copied from diffusers.loaders.lora_pipeline.CogVideoXLoraLoaderMixin.load_lora_weights
def load_lora_weights(
self, pretrained_model_name_or_path_or_dict: Union[str, Dict[str, torch.Tensor]], adapter_name=None, **kwargs
):
"""
Load LoRA weights specified in `pretrained_model_name_or_path_or_dict` into `self.transformer` and
`self.text_encoder`. All kwargs are forwarded to `self.lora_state_dict`. See
[`~loaders.StableDiffusionLoraLoaderMixin.lora_state_dict`] for more details on how the state dict is loaded.
See [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_into_transformer`] for more details on how the state
dict is loaded into `self.transformer`.
Parameters:
pretrained_model_name_or_path_or_dict (`str` or `os.PathLike` or `dict`):
See [`~loaders.StableDiffusionLoraLoaderMixin.lora_state_dict`].
adapter_name (`str`, *optional*):
Adapter name to be used for referencing the loaded adapter model. If not specified, it will use
`default_{i}` where i is the total number of adapters being loaded.
low_cpu_mem_usage (`bool`, *optional*):
Speed up model loading by only loading the pretrained LoRA weights and not initializing the random
weights.
kwargs (`dict`, *optional*):
See [`~loaders.StableDiffusionLoraLoaderMixin.lora_state_dict`].
"""
if not USE_PEFT_BACKEND:
raise ValueError("PEFT backend is required for this method.")
low_cpu_mem_usage = kwargs.pop("low_cpu_mem_usage", _LOW_CPU_MEM_USAGE_DEFAULT_LORA)
if low_cpu_mem_usage and is_peft_version("<", "0.13.0"):
raise ValueError(
"`low_cpu_mem_usage=True` is not compatible with this `peft` version. Please update it with `pip install -U peft`."
)
# if a dict is passed, copy it instead of modifying it inplace
if isinstance(pretrained_model_name_or_path_or_dict, dict):
pretrained_model_name_or_path_or_dict = pretrained_model_name_or_path_or_dict.copy()
# First, ensure that the checkpoint is a compatible one and can be successfully loaded.
state_dict = self.lora_state_dict(pretrained_model_name_or_path_or_dict, **kwargs)
is_correct_format = all("lora" in key for key in state_dict.keys())
if not is_correct_format:
raise ValueError("Invalid LoRA checkpoint.")
self.load_lora_into_transformer(
state_dict,
transformer=getattr(self, self.transformer_name) if not hasattr(self, "transformer") else self.transformer,
adapter_name=adapter_name,
_pipeline=self,
low_cpu_mem_usage=low_cpu_mem_usage,
)
@classmethod
# Copied from diffusers.loaders.lora_pipeline.SD3LoraLoaderMixin.load_lora_into_transformer with SD3Transformer2DModel->Lumina2Transformer2DModel
def load_lora_into_transformer(
cls, state_dict, transformer, adapter_name=None, _pipeline=None, low_cpu_mem_usage=False, hotswap: bool = False
):
"""
This will load the LoRA layers specified in `state_dict` into `transformer`.
Parameters:
state_dict (`dict`):
A standard state dict containing the lora layer parameters. The keys can either be indexed directly
into the transformer or prefixed with an additional `transformer`, which distinguishes them from the
text encoder lora layers.
transformer (`Lumina2Transformer2DModel`):
The Transformer model to load the LoRA layers into.
adapter_name (`str`, *optional*):
Adapter name to be used for referencing the loaded adapter model. If not specified, it will use
`default_{i}` where i is the total number of adapters being loaded.
low_cpu_mem_usage (`bool`, *optional*):
Speed up model loading by only loading the pretrained LoRA weights and not initializing the random
weights.
hotswap (`bool`, *optional*, defaults to `False`):
Whether to substitute an existing (LoRA) adapter with the newly loaded adapter
in-place. This means that, instead of loading an additional adapter, this will take the existing
adapter weights and replace them with the weights of the new adapter. This can be faster and more
memory efficient. However, the main advantage of hotswapping is that when the model is compiled with
torch.compile, loading the new adapter does not require recompilation of the model. When using
hotswapping, the passed `adapter_name` should be the name of an already loaded adapter.
If the new adapter and the old adapter have different ranks and/or LoRA alphas (i.e. scaling), you need
to call an additional method before loading the adapter:
```py
pipeline = ... # load diffusers pipeline
max_rank = ... # the highest rank among all LoRAs that you want to load
# call *before* compiling and loading the LoRA adapter
pipeline.enable_lora_hotswap(target_rank=max_rank)
pipeline.load_lora_weights(file_name)
# optionally compile the model now
```
Note that hotswapping adapters of the text encoder is not yet supported. There are some further
limitations to this technique, which are documented here:
https://huggingface.co/docs/peft/main/en/package_reference/hotswap
"""
if low_cpu_mem_usage and is_peft_version("<", "0.13.0"):
raise ValueError(
"`low_cpu_mem_usage=True` is not compatible with this `peft` version. Please update it with `pip install -U peft`."
)
# Load the layers corresponding to transformer.
logger.info(f"Loading {cls.transformer_name}.")
transformer.load_lora_adapter(
state_dict,
network_alphas=None,
adapter_name=adapter_name,
_pipeline=_pipeline,
low_cpu_mem_usage=low_cpu_mem_usage,
hotswap=hotswap,
)
@classmethod
# Copied from diffusers.loaders.lora_pipeline.CogVideoXLoraLoaderMixin.save_lora_weights
def save_lora_weights(
cls,
save_directory: Union[str, os.PathLike],
transformer_lora_layers: Dict[str, Union[torch.nn.Module, torch.Tensor]] = None,
is_main_process: bool = True,
weight_name: str = None,
save_function: Callable = None,
safe_serialization: bool = True,
):
r"""
Save the LoRA parameters corresponding to the transformer.
Arguments:
save_directory (`str` or `os.PathLike`):
Directory to save LoRA parameters to. Will be created if it doesn't exist.
transformer_lora_layers (`Dict[str, torch.nn.Module]` or `Dict[str, torch.Tensor]`):
State dict of the LoRA layers corresponding to the `transformer`.
is_main_process (`bool`, *optional*, defaults to `True`):
Whether the process calling this is the main process or not. Useful during distributed training when
you need to call this function on all processes. In this case, set `is_main_process=True` only on the
main process to avoid race conditions.
save_function (`Callable`):
The function to use to save the state dictionary. Useful during distributed training when you need to
replace `torch.save` with another method. Can be configured with the environment variable
`DIFFUSERS_SAVE_MODE`.
safe_serialization (`bool`, *optional*, defaults to `True`):
Whether to save the model using `safetensors` or the traditional PyTorch way with `pickle`.
"""
state_dict = {}
if not transformer_lora_layers:
raise ValueError("You must pass `transformer_lora_layers`.")
state_dict.update(cls.pack_weights(transformer_lora_layers, cls.transformer_name))
# Save the model
cls.write_lora_layers(
state_dict=state_dict,
save_directory=save_directory,
is_main_process=is_main_process,
weight_name=weight_name,
save_function=save_function,
safe_serialization=safe_serialization,
)
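# A minimal save sketch (assuming `lora_layers` holds the transformer's LoRA state dict):
#
#     pipeline.save_lora_weights(
#         save_directory="./my-lumina2-lora",
#         transformer_lora_layers=lora_layers,
#         safe_serialization=True,  # write a .safetensors file instead of pickle
#     )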
# Copied from diffusers.loaders.lora_pipeline.SanaLoraLoaderMixin.fuse_lora
def fuse_lora(
self,
components: List[str] = ["transformer"],
lora_scale: float = 1.0,
safe_fusing: bool = False,
adapter_names: Optional[List[str]] = None,
**kwargs,
):
r"""
Fuses the LoRA parameters into the original parameters of the corresponding blocks.
<Tip warning={true}>
This is an experimental API.
</Tip>
Args:
components (`List[str]`): List of LoRA-injectable components to fuse the LoRAs into.
lora_scale (`float`, defaults to 1.0):
Controls how much to influence the outputs with the LoRA parameters.
safe_fusing (`bool`, defaults to `False`):
Whether to check fused weights for NaN values before fusing and if values are NaN not fusing them.
adapter_names (`List[str]`, *optional*):
Adapter names to be used for fusing. If nothing is passed, all active adapters will be fused.
Example:
```py
from diffusers import DiffusionPipeline
import torch
pipeline = DiffusionPipeline.from_pretrained(
"stabilityai/stable-diffusion-xl-base-1.0", torch_dtype=torch.float16
).to("cuda")
pipeline.load_lora_weights("nerijs/pixel-art-xl", weight_name="pixel-art-xl.safetensors", adapter_name="pixel")
pipeline.fuse_lora(lora_scale=0.7)
```
"""
super().fuse_lora(
components=components,
lora_scale=lora_scale,
safe_fusing=safe_fusing,
adapter_names=adapter_names,
**kwargs,
)
# Copied from diffusers.loaders.lora_pipeline.SanaLoraLoaderMixin.unfuse_lora
def unfuse_lora(self, components: List[str] = ["transformer"], **kwargs):
r"""
Reverses the effect of
[`pipe.fuse_lora()`](https://huggingface.co/docs/diffusers/main/en/api/loaders#diffusers.loaders.LoraBaseMixin.fuse_lora).
<Tip warning={true}>
This is an experimental API.
</Tip>
Args:
components (`List[str]`): List of LoRA-injectable components to unfuse LoRA from.
unfuse_transformer (`bool`, defaults to `True`): Whether to unfuse the transformer LoRA parameters.
"""
super().unfuse_lora(components=components, **kwargs)
"""
OmniGen2 Diffusion Pipeline
Copyright 2025 BAAI, The OmniGen2 Team and The HuggingFace Team. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import inspect
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import math
from PIL import Image
import numpy as np
import torch
import torch.nn.functional as F
from transformers import Qwen2_5_VLForConditionalGeneration
from diffusers.models.autoencoders import AutoencoderKL
from ...models.transformers import OmniGen2Transformer2DModel
from ...models.transformers.repo import OmniGen2RotaryPosEmbed
from diffusers.schedulers import FlowMatchEulerDiscreteScheduler
from diffusers.utils import (
is_torch_xla_available,
logging,
)
from diffusers.utils.torch_utils import randn_tensor
from diffusers.pipelines.pipeline_utils import DiffusionPipeline
from dataclasses import dataclass
import PIL.Image
from diffusers.utils import BaseOutput
from omnigen2.pipelines.image_processor import OmniGen2ImageProcessor
from omnigen2.utils.teacache_util import TeaCacheParams
from ..lora_pipeline import OmniGen2LoraLoaderMixin
if is_torch_xla_available():
import torch_xla.core.xla_model as xm
XLA_AVAILABLE = True
else:
XLA_AVAILABLE = False
from ...cache_functions import cache_init
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
@dataclass
class FMPipelineOutput(BaseOutput):
"""
Output class for OmniGen2 pipeline.
Args:
images (Union[List[PIL.Image.Image], np.ndarray]):
List of denoised PIL images of length `batch_size` or numpy array of shape
`(batch_size, height, width, num_channels)`. Contains the generated images.
"""
images: Union[List[PIL.Image.Image], np.ndarray]
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.retrieve_timesteps
def retrieve_timesteps(
scheduler,
num_inference_steps: Optional[int] = None,
device: Optional[Union[str, torch.device]] = None,
timesteps: Optional[List[int]] = None,
**kwargs,
):
"""
Calls the scheduler's `set_timesteps` method and retrieves timesteps from the scheduler after the call. Handles
custom timesteps. Any kwargs will be supplied to `scheduler.set_timesteps`.
Args:
scheduler (`SchedulerMixin`):
The scheduler to get timesteps from.
num_inference_steps (`int`):
The number of diffusion steps used when generating samples with a pre-trained model. If used, `timesteps`
must be `None`.
device (`str` or `torch.device`, *optional*):
The device to which the timesteps should be moved. If `None`, the timesteps are not moved.
timesteps (`List[int]`, *optional*):
Custom timesteps used to override the timestep spacing strategy of the scheduler. If `timesteps` is passed,
`num_inference_steps` and `sigmas` must be `None`.
sigmas (`List[float]`, *optional*):
Custom sigmas used to override the timestep spacing strategy of the scheduler. If `sigmas` is passed,
`num_inference_steps` and `timesteps` must be `None`.
Returns:
`Tuple[torch.Tensor, int]`: A tuple where the first element is the timestep schedule from the scheduler and the
second element is the number of inference steps.
"""
if timesteps is not None:
accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
if not accepts_timesteps:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" timestep schedules. Please check whether you are using the correct scheduler."
)
scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
timesteps = scheduler.timesteps
num_inference_steps = len(timesteps)
else:
scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
timesteps = scheduler.timesteps
return timesteps, num_inference_steps
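# Usage sketch: either let the scheduler derive the schedule from a step count, or pass an
# explicit `timesteps` list; the two are mutually exclusive (values below are illustrative):
#
#     timesteps, n = retrieve_timesteps(scheduler, num_inference_steps=28, device="cuda")
#     timesteps, n = retrieve_timesteps(scheduler, timesteps=[999, 749, 499, 249], device="cuda")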
class OmniGen2Pipeline(DiffusionPipeline, OmniGen2LoraLoaderMixin):
"""
Pipeline for text-to-image generation using OmniGen2.
This pipeline implements a text-to-image generation model that uses:
- Qwen2.5-VL for text encoding
- A custom transformer architecture for image generation
- VAE for image encoding/decoding
- FlowMatchEulerDiscreteScheduler for noise scheduling
Args:
transformer (OmniGen2Transformer2DModel): The transformer model for image generation.
vae (AutoencoderKL): The VAE model for image encoding/decoding.
scheduler (FlowMatchEulerDiscreteScheduler): The scheduler for noise scheduling.
mllm (Qwen2_5_VLForConditionalGeneration): The multimodal LLM used for text encoding.
processor: The processor used to prepare text (and image) inputs for the mllm.
"""
model_cpu_offload_seq = "mllm->transformer->vae"
def __init__(
self,
transformer: OmniGen2Transformer2DModel,
vae: AutoencoderKL,
scheduler: FlowMatchEulerDiscreteScheduler,
mllm: Qwen2_5_VLForConditionalGeneration,
processor,
) -> None:
"""
Initialize the OmniGen2 pipeline.
Args:
transformer: The transformer model for image generation.
vae: The VAE model for image encoding/decoding.
scheduler: The scheduler for noise scheduling.
mllm: The multimodal LLM used for text encoding.
processor: The processor used to prepare inputs for the mllm.
"""
super().__init__()
self.register_modules(
transformer=transformer,
vae=vae,
scheduler=scheduler,
mllm=mllm,
processor=processor
)
self.vae_scale_factor = (
2 ** (len(self.vae.config.block_out_channels) - 1) if hasattr(self, "vae") and self.vae is not None else 8
)
self.image_processor = OmniGen2ImageProcessor(vae_scale_factor=self.vae_scale_factor * 2, do_resize=True)
self.default_sample_size = 128
def prepare_latents(
self,
batch_size: int,
num_channels_latents: int,
height: int,
width: int,
dtype: torch.dtype,
device: torch.device,
generator: Optional[torch.Generator],
latents: Optional[torch.FloatTensor] = None,
) -> torch.FloatTensor:
"""
Prepare the initial latents for the diffusion process.
Args:
batch_size: The number of images to generate.
num_channels_latents: The number of channels in the latent space.
height: The height of the generated image.
width: The width of the generated image.
dtype: The data type of the latents.
device: The device to place the latents on.
generator: The random number generator to use.
latents: Optional pre-computed latents to use instead of random initialization.
Returns:
torch.FloatTensor: The prepared latents tensor.
"""
height = int(height) // self.vae_scale_factor
width = int(width) // self.vae_scale_factor
shape = (batch_size, num_channels_latents, height, width)
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
latents = latents.to(device)
return latents
def encode_vae(self, img: torch.FloatTensor) -> torch.FloatTensor:
"""
Encode an image into the VAE latent space.
Args:
img: The input image tensor to encode.
Returns:
torch.FloatTensor: The encoded latent representation.
"""
z0 = self.vae.encode(img.to(dtype=self.vae.dtype)).latent_dist.sample()
if self.vae.config.shift_factor is not None:
z0 = z0 - self.vae.config.shift_factor
if self.vae.config.scaling_factor is not None:
z0 = z0 * self.vae.config.scaling_factor
z0 = z0.to(dtype=self.vae.dtype)
return z0
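# Note: `processing` applies the inverse mapping before decoding (encode: z -> (z - shift) * scale;
# decode: z -> z / scale + shift), so latents round-trip through the same normalization. E.g. with
# scaling_factor=0.3611 and shift_factor=0.1159 (illustrative SD3-style VAE values), encode_vae
# maps z0 to (z0 - 0.1159) * 0.3611 and the decode path undoes it exactly.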
def prepare_image(
self,
images: Union[List[PIL.Image.Image], PIL.Image.Image],
batch_size: int,
num_images_per_prompt: int,
max_pixels: int,
max_side_length: int,
device: torch.device,
dtype: torch.dtype,
) -> List[Optional[torch.FloatTensor]]:
"""
Prepare input images for processing by encoding them into the VAE latent space.
Args:
images: Single image or list of images to process.
batch_size: The number of prompts in the batch.
num_images_per_prompt: The number of images to generate for each prompt.
device: The device to place the encoded latents on.
dtype: The data type of the encoded latents.
Returns:
List[Optional[torch.FloatTensor]]: List of encoded latent representations for each image.
"""
if batch_size == 1:
images = [images]
latents = []
for i, img in enumerate(images):
if img is not None and len(img) > 0:
ref_latents = []
for j, img_j in enumerate(img):
img_j = self.image_processor.preprocess(img_j, max_pixels=max_pixels, max_side_length=max_side_length)
ref_latents.append(self.encode_vae(img_j.to(device=device)).squeeze(0))
else:
ref_latents = None
for _ in range(num_images_per_prompt):
latents.append(ref_latents)
return latents
def _get_qwen2_prompt_embeds(
self,
prompt: Union[str, List[str]],
device: Optional[torch.device] = None,
max_sequence_length: int = 256,
) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Get prompt embeddings from the Qwen2 text encoder.
Args:
prompt: The prompt or list of prompts to encode.
device: The device to place the embeddings on. If None, uses the pipeline's device.
max_sequence_length: Maximum sequence length for tokenization.
Returns:
Tuple[torch.Tensor, torch.Tensor]: A tuple containing:
- The prompt embeddings tensor
- The attention mask tensor
Raises:
Warning: If the input text is truncated due to sequence length limitations.
"""
device = device or self._execution_device
prompt = [prompt] if isinstance(prompt, str) else prompt
text_inputs = self.processor.tokenizer(
prompt,
padding="longest",
max_length=max_sequence_length,
truncation=True,
return_tensors="pt",
)
text_input_ids = text_inputs.input_ids.to(device)
untruncated_ids = self.processor.tokenizer(prompt, padding="longest", return_tensors="pt").input_ids.to(device)
if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):
removed_text = self.processor.tokenizer.batch_decode(untruncated_ids[:, max_sequence_length - 1 : -1])
logger.warning(
"The following part of your input was truncated because Gemma can only handle sequences up to"
f" {max_sequence_length} tokens: {removed_text}"
)
prompt_attention_mask = text_inputs.attention_mask.to(device)
prompt_embeds = self.mllm(
text_input_ids,
attention_mask=prompt_attention_mask,
output_hidden_states=True,
).hidden_states[-1]
if self.mllm is not None:
dtype = self.mllm.dtype
elif self.transformer is not None:
dtype = self.transformer.dtype
else:
dtype = None
prompt_embeds = prompt_embeds.to(dtype=dtype, device=device)
return prompt_embeds, prompt_attention_mask
def _apply_chat_template(self, prompt: str):
prompt = [
{
"role": "system",
"content": "You are a helpful assistant that generates high-quality images based on user instructions.",
},
{"role": "user", "content": prompt},
]
prompt = self.processor.tokenizer.apply_chat_template(prompt, tokenize=False, add_generation_prompt=False)
return prompt
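# For example, `_apply_chat_template("a red fox")` renders roughly (Qwen2 chat format):
#
#     <|im_start|>system
#     You are a helpful assistant that generates high-quality images based on user instructions.<|im_end|>
#     <|im_start|>user
#     a red fox<|im_end|>
#
# The exact markers depend on the tokenizer's chat template.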
def encode_prompt(
self,
prompt: Union[str, List[str]],
do_classifier_free_guidance: bool = True,
negative_prompt: Optional[Union[str, List[str]]] = None,
num_images_per_prompt: int = 1,
device: Optional[torch.device] = None,
prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
prompt_attention_mask: Optional[torch.Tensor] = None,
negative_prompt_attention_mask: Optional[torch.Tensor] = None,
max_sequence_length: int = 256,
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
r"""
Encodes the prompt into text encoder hidden states.
Args:
prompt (`str` or `List[str]`, *optional*):
prompt to be encoded
negative_prompt (`str` or `List[str]`, *optional*):
The prompt not to guide the image generation. If not defined, one has to pass `negative_prompt_embeds`
instead. Ignored when not using guidance (i.e., ignored if `text_guidance_scale` is not greater than
`1`). Defaults to the empty string "".
do_classifier_free_guidance (`bool`, *optional*, defaults to `True`):
whether to use classifier free guidance or not
num_images_per_prompt (`int`, *optional*, defaults to 1):
number of images that should be generated per prompt
device: (`torch.device`, *optional*):
torch device to place the resulting embeddings on
prompt_embeds (`torch.Tensor`, *optional*):
Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If not
provided, text embeddings will be generated from `prompt` input argument.
negative_prompt_embeds (`torch.Tensor`, *optional*):
Pre-generated negative text embeddings. If not provided, they are generated from the
`negative_prompt` input argument (the embeddings of the empty string by default).
max_sequence_length (`int`, defaults to `256`):
Maximum sequence length to use for the prompt.
"""
device = device or self._execution_device
prompt = [prompt] if isinstance(prompt, str) else prompt
prompt = [self._apply_chat_template(_prompt) for _prompt in prompt]
if prompt is not None:
batch_size = len(prompt)
else:
batch_size = prompt_embeds.shape[0]
if prompt_embeds is None:
prompt_embeds, prompt_attention_mask = self._get_qwen2_prompt_embeds(
prompt=prompt,
device=device,
max_sequence_length=max_sequence_length
)
batch_size, seq_len, _ = prompt_embeds.shape
# duplicate text embeddings and attention mask for each generation per prompt, using mps friendly method
prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)
prompt_embeds = prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)
prompt_attention_mask = prompt_attention_mask.repeat(num_images_per_prompt, 1)
prompt_attention_mask = prompt_attention_mask.view(batch_size * num_images_per_prompt, -1)
# Get negative embeddings for classifier free guidance
if do_classifier_free_guidance and negative_prompt_embeds is None:
negative_prompt = negative_prompt if negative_prompt is not None else ""
# Normalize str to list
negative_prompt = batch_size * [negative_prompt] if isinstance(negative_prompt, str) else negative_prompt
negative_prompt = [self._apply_chat_template(_negative_prompt) for _negative_prompt in negative_prompt]
if prompt is not None and type(prompt) is not type(negative_prompt):
raise TypeError(
f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !="
f" {type(prompt)}."
)
elif isinstance(negative_prompt, str):
negative_prompt = [negative_prompt]
elif batch_size != len(negative_prompt):
raise ValueError(
f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:"
f" {prompt} has batch size {batch_size}. Please make sure that passed `negative_prompt` matches"
" the batch size of `prompt`."
)
negative_prompt_embeds, negative_prompt_attention_mask = self._get_qwen2_prompt_embeds(
prompt=negative_prompt,
device=device,
max_sequence_length=max_sequence_length,
)
batch_size, seq_len, _ = negative_prompt_embeds.shape
# duplicate text embeddings and attention mask for each generation per prompt, using mps friendly method
negative_prompt_embeds = negative_prompt_embeds.repeat(1, num_images_per_prompt, 1)
negative_prompt_embeds = negative_prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)
negative_prompt_attention_mask = negative_prompt_attention_mask.repeat(num_images_per_prompt, 1)
negative_prompt_attention_mask = negative_prompt_attention_mask.view(
batch_size * num_images_per_prompt, -1
)
return prompt_embeds, prompt_attention_mask, negative_prompt_embeds, negative_prompt_attention_mask
@property
def num_timesteps(self):
return self._num_timesteps
@property
def text_guidance_scale(self):
return self._text_guidance_scale
@property
def image_guidance_scale(self):
return self._image_guidance_scale
@property
def cfg_range(self):
return self._cfg_range
@torch.no_grad()
def __call__(
self,
prompt: Optional[Union[str, List[str]]] = None,
negative_prompt: Optional[Union[str, List[str]]] = None,
prompt_embeds: Optional[torch.FloatTensor] = None,
negative_prompt_embeds: Optional[torch.FloatTensor] = None,
prompt_attention_mask: Optional[torch.LongTensor] = None,
negative_prompt_attention_mask: Optional[torch.LongTensor] = None,
max_sequence_length: Optional[int] = None,
callback_on_step_end_tensor_inputs: Optional[List[str]] = None,
input_images: Optional[List[PIL.Image.Image]] = None,
num_images_per_prompt: int = 1,
height: Optional[int] = None,
width: Optional[int] = None,
max_pixels: int = 1024 * 1024,
max_input_image_side_length: int = 1024,
align_res: bool = True,
num_inference_steps: int = 28,
text_guidance_scale: float = 4.0,
image_guidance_scale: float = 1.0,
cfg_range: Tuple[float, float] = (0.0, 1.0),
attention_kwargs: Optional[Dict[str, Any]] = None,
timesteps: List[int] = None,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
latents: Optional[torch.FloatTensor] = None,
output_type: Optional[str] = "pil",
return_dict: bool = True,
verbose: bool = False,
step_func=None,
):
height = height or self.default_sample_size * self.vae_scale_factor
width = width or self.default_sample_size * self.vae_scale_factor
self._text_guidance_scale = text_guidance_scale
self._image_guidance_scale = image_guidance_scale
self._cfg_range = cfg_range
self._attention_kwargs = attention_kwargs
# 2. Define call parameters
if prompt is not None and isinstance(prompt, str):
batch_size = 1
elif prompt is not None and isinstance(prompt, list):
batch_size = len(prompt)
else:
batch_size = prompt_embeds.shape[0]
device = self._execution_device
# 3. Encode input prompt
(
prompt_embeds,
prompt_attention_mask,
negative_prompt_embeds,
negative_prompt_attention_mask,
) = self.encode_prompt(
prompt,
self.text_guidance_scale > 1.0,
negative_prompt=negative_prompt,
num_images_per_prompt=num_images_per_prompt,
device=device,
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
prompt_attention_mask=prompt_attention_mask,
negative_prompt_attention_mask=negative_prompt_attention_mask,
max_sequence_length=max_sequence_length,
)
dtype = self.vae.dtype
# 4. Prepare reference images
ref_latents = self.prepare_image(
images=input_images,
batch_size=batch_size,
num_images_per_prompt=num_images_per_prompt,
max_pixels=max_pixels,
max_side_length=max_input_image_side_length,
device=device,
dtype=dtype,
)
if input_images is None:
input_images = []
if len(input_images) == 1 and align_res:
width, height = ref_latents[0][0].shape[-1] * self.vae_scale_factor, ref_latents[0][0].shape[-2] * self.vae_scale_factor
ori_width, ori_height = width, height
else:
ori_width, ori_height = width, height
cur_pixels = height * width
ratio = (max_pixels / cur_pixels) ** 0.5
ratio = min(ratio, 1.0)
height, width = int(height * ratio) // 16 * 16, int(width * ratio) // 16 * 16
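# Worked example of the snapping above: with height=1536, width=2048 and max_pixels=1024*1024,
# ratio = (1048576 / 3145728) ** 0.5 ≈ 0.5774, so height = int(1536 * 0.5774) // 16 * 16 = 880 and
# width = int(2048 * 0.5774) // 16 * 16 = 1168. Dimensions are only ever scaled down (ratio is
# capped at 1.0) and snapped to multiples of 16, the granularity the VAE and patchifier expect.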
if len(input_images) == 0:
self._image_guidance_scale = 1
# 5. Prepare latents.
latent_channels = self.transformer.config.in_channels
latents = self.prepare_latents(
batch_size * num_images_per_prompt,
latent_channels,
height,
width,
prompt_embeds.dtype,
device,
generator,
latents,
)
freqs_cis = OmniGen2RotaryPosEmbed.get_freqs_cis(
self.transformer.config.axes_dim_rope,
self.transformer.config.axes_lens,
theta=10000,
)
image = self.processing(
latents=latents,
ref_latents=ref_latents,
prompt_embeds=prompt_embeds,
freqs_cis=freqs_cis,
negative_prompt_embeds=negative_prompt_embeds,
prompt_attention_mask=prompt_attention_mask,
negative_prompt_attention_mask=negative_prompt_attention_mask,
num_inference_steps=num_inference_steps,
timesteps=timesteps,
device=device,
dtype=dtype,
verbose=verbose,
step_func=step_func,
)
image = F.interpolate(image, size=(ori_height, ori_width), mode='bilinear')
image = self.image_processor.postprocess(image, output_type=output_type)
# Offload all models
self.maybe_free_model_hooks()
if not return_dict:
return image
else:
return FMPipelineOutput(images=image)
def processing(
self,
latents,
ref_latents,
prompt_embeds,
freqs_cis,
negative_prompt_embeds,
prompt_attention_mask,
negative_prompt_attention_mask,
num_inference_steps,
timesteps,
device,
dtype,
verbose,
step_func=None
):
batch_size = latents.shape[0]
timesteps, num_inference_steps = retrieve_timesteps(
self.scheduler,
num_inference_steps,
device,
timesteps,
num_tokens=latents.shape[-2] * latents.shape[-1]
)
num_warmup_steps = max(len(timesteps) - num_inference_steps * self.scheduler.order, 0)
self._num_timesteps = len(timesteps)
enable_taylorseer = getattr(self, "enable_taylorseer", False)
if enable_taylorseer:
model_pred_cache_dic, model_pred_current = cache_init(self, num_inference_steps)
model_pred_ref_cache_dic, model_pred_ref_current = cache_init(self, num_inference_steps)
model_pred_uncond_cache_dic, model_pred_uncond_current = cache_init(self, num_inference_steps)
self.transformer.enable_taylorseer = True
elif self.transformer.enable_teacache:
# Use different TeaCacheParams for different conditions
teacache_params = TeaCacheParams()
teacache_params_uncond = TeaCacheParams()
teacache_params_ref = TeaCacheParams()
with self.progress_bar(total=num_inference_steps) as progress_bar:
for i, t in enumerate(timesteps):
if enable_taylorseer:
self.transformer.cache_dic = model_pred_cache_dic
self.transformer.current = model_pred_current
elif self.transformer.enable_teacache:
teacache_params.is_first_or_last_step = i == 0 or i == len(timesteps) - 1
self.transformer.teacache_params = teacache_params
model_pred = self.predict(
t=t,
latents=latents,
prompt_embeds=prompt_embeds,
freqs_cis=freqs_cis,
prompt_attention_mask=prompt_attention_mask,
ref_image_hidden_states=ref_latents,
)
text_guidance_scale = self.text_guidance_scale if self.cfg_range[0] <= i / len(timesteps) <= self.cfg_range[1] else 1.0
image_guidance_scale = self.image_guidance_scale if self.cfg_range[0] <= i / len(timesteps) <= self.cfg_range[1] else 1.0
if text_guidance_scale > 1.0 and image_guidance_scale > 1.0:
if enable_taylorseer:
self.transformer.cache_dic = model_pred_ref_cache_dic
self.transformer.current = model_pred_ref_current
elif self.transformer.enable_teacache:
teacache_params_ref.is_first_or_last_step = i == 0 or i == len(timesteps) - 1
self.transformer.teacache_params = teacache_params_ref
model_pred_ref = self.predict(
t=t,
latents=latents,
prompt_embeds=negative_prompt_embeds,
freqs_cis=freqs_cis,
prompt_attention_mask=negative_prompt_attention_mask,
ref_image_hidden_states=ref_latents,
)
if enable_taylorseer:
self.transformer.cache_dic = model_pred_uncond_cache_dic
self.transformer.current = model_pred_uncond_current
elif self.transformer.enable_teacache:
teacache_params_uncond.is_first_or_last_step = i == 0 or i == len(timesteps) - 1
self.transformer.teacache_params = teacache_params_uncond
model_pred_uncond = self.predict(
t=t,
latents=latents,
prompt_embeds=negative_prompt_embeds,
freqs_cis=freqs_cis,
prompt_attention_mask=negative_prompt_attention_mask,
ref_image_hidden_states=None,
)
model_pred = model_pred_uncond + image_guidance_scale * (model_pred_ref - model_pred_uncond) + \
text_guidance_scale * (model_pred - model_pred_ref)
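# The two guidance terms compose sequentially: starting from the unconditional prediction,
# image guidance pushes toward the reference-conditioned prediction, and text guidance then
# pushes from there toward the fully conditioned one:
#     pred = uncond + s_img * (ref - uncond) + s_txt * (cond - ref)
# With s_img = s_txt = 1 this collapses to the conditional prediction `cond`.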
elif text_guidance_scale > 1.0:
if enable_taylorseer:
self.transformer.cache_dic = model_pred_uncond_cache_dic
self.transformer.current = model_pred_uncond_current
elif self.transformer.enable_teacache:
teacache_params_uncond.is_first_or_last_step = i == 0 or i == len(timesteps) - 1
self.transformer.teacache_params = teacache_params_uncond
model_pred_uncond = self.predict(
t=t,
latents=latents,
prompt_embeds=negative_prompt_embeds,
freqs_cis=freqs_cis,
prompt_attention_mask=negative_prompt_attention_mask,
ref_image_hidden_states=None,
)
model_pred = model_pred_uncond + text_guidance_scale * (model_pred - model_pred_uncond)
latents = self.scheduler.step(model_pred, t, latents, return_dict=False)[0]
latents = latents.to(dtype=dtype)
if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
progress_bar.update()
if step_func is not None:
step_func(i, self._num_timesteps)
if enable_taylorseer:
del model_pred_cache_dic, model_pred_ref_cache_dic, model_pred_uncond_cache_dic
del model_pred_current, model_pred_ref_current, model_pred_uncond_current
latents = latents.to(dtype=dtype)
if self.vae.config.scaling_factor is not None:
latents = latents / self.vae.config.scaling_factor
if self.vae.config.shift_factor is not None:
latents = latents + self.vae.config.shift_factor
image = self.vae.decode(latents, return_dict=False)[0]
return image
def predict(
self,
t,
latents,
prompt_embeds,
freqs_cis,
prompt_attention_mask,
ref_image_hidden_states,
):
# broadcast to batch dimension in a way that's compatible with ONNX/Core ML
timestep = t.expand(latents.shape[0]).to(latents.dtype)
batch_size, num_channels_latents, height, width = latents.shape
optional_kwargs = {}
if 'ref_image_hidden_states' in set(inspect.signature(self.transformer.forward).parameters.keys()):
optional_kwargs['ref_image_hidden_states'] = ref_image_hidden_states
model_pred = self.transformer(
latents,
timestep,
prompt_embeds,
freqs_cis,
prompt_attention_mask,
**optional_kwargs
)
return model_pred
"""
OmniGen2 Diffusion Pipeline
Copyright 2025 BAAI, The OmniGen2 Team and The HuggingFace Team. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import inspect
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import math
from PIL import Image
import numpy as np
import torch
import torch.nn.functional as F
from transformers import Qwen2_5_VLForConditionalGeneration
from diffusers.models.autoencoders import AutoencoderKL
from ...models.transformers import OmniGen2Transformer2DModel
from ...models.transformers.repo import OmniGen2RotaryPosEmbed
from diffusers.schedulers import FlowMatchEulerDiscreteScheduler
from diffusers.utils import (
is_torch_xla_available,
logging,
)
from diffusers.utils.torch_utils import randn_tensor
from diffusers.pipelines.pipeline_utils import DiffusionPipeline
from dataclasses import dataclass
import PIL.Image
from diffusers.utils import BaseOutput
from omnigen2.pipelines.image_processor import OmniGen2ImageProcessor
if is_torch_xla_available():
import torch_xla.core.xla_model as xm
XLA_AVAILABLE = True
else:
XLA_AVAILABLE = False
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
@dataclass
class OmniGen2PipelineOutput(BaseOutput):
"""
Output class for OmniGen2 pipeline.
Args:
text (str):
The text generated by the multimodal LLM for this chat turn.
images (Union[List[PIL.Image.Image], np.ndarray]):
List of denoised PIL images of length `batch_size` or numpy array of shape
`(batch_size, height, width, num_channels)`. Contains the generated images.
"""
text: str
images: Union[List[PIL.Image.Image], np.ndarray]
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.retrieve_timesteps
def retrieve_timesteps(
scheduler,
num_inference_steps: Optional[int] = None,
device: Optional[Union[str, torch.device]] = None,
timesteps: Optional[List[int]] = None,
**kwargs,
):
"""
Calls the scheduler's `set_timesteps` method and retrieves timesteps from the scheduler after the call. Handles
custom timesteps. Any kwargs will be supplied to `scheduler.set_timesteps`.
Args:
scheduler (`SchedulerMixin`):
The scheduler to get timesteps from.
num_inference_steps (`int`):
The number of diffusion steps used when generating samples with a pre-trained model. If used, `timesteps`
must be `None`.
device (`str` or `torch.device`, *optional*):
The device to which the timesteps should be moved. If `None`, the timesteps are not moved.
timesteps (`List[int]`, *optional*):
Custom timesteps used to override the timestep spacing strategy of the scheduler. If `timesteps` is passed,
`num_inference_steps` and `sigmas` must be `None`.
sigmas (`List[float]`, *optional*):
Custom sigmas used to override the timestep spacing strategy of the scheduler. If `sigmas` is passed,
`num_inference_steps` and `timesteps` must be `None`.
Returns:
`Tuple[torch.Tensor, int]`: A tuple where the first element is the timestep schedule from the scheduler and the
second element is the number of inference steps.
"""
if timesteps is not None:
accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
if not accepts_timesteps:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" timestep schedules. Please check whether you are using the correct scheduler."
)
scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
timesteps = scheduler.timesteps
num_inference_steps = len(timesteps)
else:
scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
timesteps = scheduler.timesteps
return timesteps, num_inference_steps
class OmniGen2ChatPipeline(DiffusionPipeline):
"""
Pipeline for text-to-image generation using OmniGen2.
This pipeline implements a text-to-image generation model that uses:
- Qwen2.5-VL for text encoding
- A custom transformer architecture for image generation
- VAE for image encoding/decoding
- FlowMatchEulerDiscreteScheduler for noise scheduling
Args:
transformer (OmniGen2Transformer2DModel): The transformer model for image generation.
vae (AutoencoderKL): The VAE model for image encoding/decoding.
scheduler (FlowMatchEulerDiscreteScheduler): The scheduler for noise scheduling.
mllm (Qwen2_5_VLForConditionalGeneration): The multimodal LLM used for text encoding.
processor: The processor used to prepare text (and image) inputs for the mllm.
"""
model_cpu_offload_seq = "mllm->transformer->vae"
def __init__(
self,
transformer: OmniGen2Transformer2DModel,
vae: AutoencoderKL,
scheduler: FlowMatchEulerDiscreteScheduler,
mllm: Qwen2_5_VLForConditionalGeneration,
processor,
) -> None:
"""
Initialize the OmniGen2 pipeline.
Args:
transformer: The transformer model for image generation.
vae: The VAE model for image encoding/decoding.
scheduler: The scheduler for noise scheduling.
mllm: The multimodal LLM used for text encoding.
processor: The processor used to prepare inputs for the mllm.
"""
super().__init__()
self.register_modules(
transformer=transformer,
vae=vae,
scheduler=scheduler,
mllm=mllm,
processor=processor
)
self.vae_scale_factor = (
2 ** (len(self.vae.config.block_out_channels) - 1) if hasattr(self, "vae") and self.vae is not None else 8
)
self.image_processor = OmniGen2ImageProcessor(vae_scale_factor=self.vae_scale_factor * 2, do_resize=True)
self.default_sample_size = 128
def prepare_latents(
self,
batch_size: int,
num_channels_latents: int,
height: int,
width: int,
dtype: torch.dtype,
device: torch.device,
generator: Optional[torch.Generator],
latents: Optional[torch.FloatTensor] = None,
) -> torch.FloatTensor:
"""
Prepare the initial latents for the diffusion process.
Args:
batch_size: The number of images to generate.
num_channels_latents: The number of channels in the latent space.
height: The height of the generated image.
width: The width of the generated image.
dtype: The data type of the latents.
device: The device to place the latents on.
generator: The random number generator to use.
latents: Optional pre-computed latents to use instead of random initialization.
Returns:
torch.FloatTensor: The prepared latents tensor.
"""
height = int(height) // self.vae_scale_factor
width = int(width) // self.vae_scale_factor
shape = (batch_size, num_channels_latents, height, width)
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
latents = latents.to(device)
return latents
def encode_vae(self, img: torch.FloatTensor) -> torch.FloatTensor:
"""
Encode an image into the VAE latent space.
Args:
img: The input image tensor to encode.
Returns:
torch.FloatTensor: The encoded latent representation.
"""
z0 = self.vae.encode(img.to(dtype=self.vae.dtype)).latent_dist.sample()
if self.vae.config.shift_factor is not None:
z0 = z0 - self.vae.config.shift_factor
if self.vae.config.scaling_factor is not None:
z0 = z0 * self.vae.config.scaling_factor
z0 = z0.to(dtype=self.vae.dtype)
return z0
def prepare_image(
self,
images: Union[List[PIL.Image.Image], PIL.Image.Image],
batch_size: int,
num_images_per_prompt: int,
max_pixels: int,
max_side_length: int,
device: torch.device,
dtype: torch.dtype,
) -> List[Optional[torch.FloatTensor]]:
"""
Prepare input images for processing by encoding them into the VAE latent space.
Args:
images: Single image or list of images to process.
batch_size: The number of prompts in the batch.
num_images_per_prompt: The number of images to generate for each prompt.
device: The device to place the encoded latents on.
dtype: The data type of the encoded latents.
Returns:
List[Optional[torch.FloatTensor]]: List of encoded latent representations for each image.
"""
if batch_size == 1:
images = [images]
latents = []
for i, img in enumerate(images):
if img is not None and len(img) > 0:
ref_latents = []
for j, img_j in enumerate(img):
img_j = self.image_processor.preprocess(img_j, max_pixels=max_pixels, max_side_length=max_side_length)
ref_latents.append(self.encode_vae(img_j.to(device=device)).squeeze(0))
else:
ref_latents = None
for _ in range(num_images_per_prompt):
latents.append(ref_latents)
return latents
def _apply_chat_template(self, prompt: str, images: List = None):
if images is not None:
prompt = "".join(
[
f"<img{i}>: <|vision_start|><|image_pad|><|vision_end|>"
for i in range(1, len(images) + 1)
]
) + prompt
prompt = f"<|im_start|>system\nYou are a helpful assistant that generates high-quality images based on user instructions.<|im_end|>\n<|im_start|>user\n{prompt}<|im_end|>\n<|im_start|>assistant\n"
return prompt
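# For example, with two input images and prompt "make it sunny", the image placeholders render as:
#
#     <img1>: <|vision_start|><|image_pad|><|vision_end|><img2>: <|vision_start|><|image_pad|><|vision_end|>make it sunny
#
# wrapped in the system/user markers above; the processor later expands each <|image_pad|>
# into the actual vision tokens for that image.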
def _get_qwen2_prompt_embeds(
self,
prompt: Union[str, List[str]],
input_images = None,
device: Optional[torch.device] = None,
use_only_text_hidden_states: bool = True,
) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Get prompt embeddings from the Qwen2 text encoder.
Args:
prompt: The prompt or list of prompts to encode.
input_images: Optional images referenced by the prompt, preprocessed together with the text.
device: The device to place the embeddings on. If None, uses the pipeline's device.
use_only_text_hidden_states: If True, drop hidden states at image-token positions and re-pack
only the text-token hidden states (left-aligned, zero-padded).
Returns:
Tuple[torch.Tensor, torch.Tensor]: A tuple containing:
- The prompt embeddings tensor
- The attention mask tensor
"""
device = device or self._execution_device
prompt = [prompt] if isinstance(prompt, str) else prompt
inputs = self.processor(
text=prompt,
images=input_images,
videos=None,
padding=True,
return_tensors="pt",
)
inputs = inputs.to(device)
prompt_embeds = self.mllm(
**inputs,
output_hidden_states=True,
).hidden_states[-1]
text_input_ids = inputs.input_ids
text_mask = inputs.attention_mask
if use_only_text_hidden_states:
mask = text_input_ids != self.mllm.config.image_token_id
mask = mask & text_mask
mask = mask.bool()
text_l = mask.sum(dim=-1)
max_l = text_l.max()
text_batch_size = prompt_embeds.size(0)
new_prompt_embeds = torch.zeros((text_batch_size, max_l, prompt_embeds.size(-1)), device=prompt_embeds.device, dtype=prompt_embeds.dtype)
new_text_mask = torch.zeros((text_batch_size, max_l), dtype=text_mask.dtype, device=text_mask.device)
for i in range(text_batch_size):
new_prompt_embeds[i, :text_l[i]] = prompt_embeds[i, mask[i]]
new_text_mask[i, :text_l[i]] = 1
prompt_embeds = new_prompt_embeds
text_mask = new_text_mask
prompt_embeds = prompt_embeds.to(dtype=self.mllm.dtype, device=device)
return prompt_embeds, text_mask
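# Sketch of the re-packing above: for a row with input_ids [T, T, IMG, IMG, T] (T = text token,
# IMG = image_token_id) and an all-ones attention mask, `mask` keeps positions [0, 1, 4]; the
# three text hidden states are copied to the front of a fresh zero tensor of width max_l, and the
# new mask becomes [1, 1, 1, 0, ...]. Rows with fewer text tokens than max_l stay right-padded
# with zeros, so downstream attention can rely on `text_mask`.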
def encode_prompt(
self,
prompt: Union[str, List[str]],
input_images: Optional[Union[str, List[str]]] = None,
do_classifier_free_guidance: bool = True,
negative_prompt: Optional[Union[str, List[str]]] = None,
num_images_per_prompt: int = 1,
device: Optional[torch.device] = None,
prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
prompt_attention_mask: Optional[torch.Tensor] = None,
negative_prompt_attention_mask: Optional[torch.Tensor] = None,
max_sequence_length: int = 256,
use_text_encoder_penultimate_layer_feats: bool = False
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
r"""
Encodes the prompt into text encoder hidden states.
Args:
prompt (`str` or `List[str]`, *optional*):
prompt to be encoded
negative_prompt (`str` or `List[str]`, *optional*):
The prompt not to guide the image generation. If not defined, one has to pass `negative_prompt_embeds`
instead. Ignored when not using guidance (i.e., ignored if `text_guidance_scale` is not greater than
`1`). Defaults to the empty string "".
do_classifier_free_guidance (`bool`, *optional*, defaults to `True`):
whether to use classifier free guidance or not
num_images_per_prompt (`int`, *optional*, defaults to 1):
number of images that should be generated per prompt
device: (`torch.device`, *optional*):
torch device to place the resulting embeddings on
prompt_embeds (`torch.Tensor`, *optional*):
Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If not
provided, text embeddings will be generated from `prompt` input argument.
negative_prompt_embeds (`torch.Tensor`, *optional*):
Pre-generated negative text embeddings. If not provided, they are generated from the
`negative_prompt` input argument (the embeddings of the empty string by default).
max_sequence_length (`int`, defaults to `256`):
Maximum sequence length to use for the prompt.
"""
device = device or self._execution_device
prompt = [prompt] if isinstance(prompt, str) else prompt
if prompt is not None:
batch_size = len(prompt)
else:
batch_size = prompt_embeds.shape[0]
if prompt_embeds is None:
prompt_embeds, prompt_attention_mask = self._get_qwen2_prompt_embeds(
prompt=prompt,
input_images=input_images,
device=device,
)
batch_size, seq_len, _ = prompt_embeds.shape
# duplicate text embeddings and attention mask for each generation per prompt, using mps friendly method
prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)
prompt_embeds = prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)
prompt_attention_mask = prompt_attention_mask.repeat(num_images_per_prompt, 1)
prompt_attention_mask = prompt_attention_mask.view(batch_size * num_images_per_prompt, -1)
# Get negative embeddings for classifier free guidance
if do_classifier_free_guidance and negative_prompt_embeds is None:
negative_prompt = negative_prompt if negative_prompt is not None else ""
# Normalize str to list
negative_prompt = batch_size * [negative_prompt] if isinstance(negative_prompt, str) else negative_prompt
negative_prompt = [self._apply_chat_template(_negative_prompt) for _negative_prompt in negative_prompt]
if prompt is not None and type(prompt) is not type(negative_prompt):
raise TypeError(
f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !="
f" {type(prompt)}."
)
elif isinstance(negative_prompt, str):
negative_prompt = [negative_prompt]
elif batch_size != len(negative_prompt):
raise ValueError(
f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:"
f" {prompt} has batch size {batch_size}. Please make sure that passed `negative_prompt` matches"
" the batch size of `prompt`."
)
negative_prompt_embeds, negative_prompt_attention_mask = self._get_qwen2_prompt_embeds(
prompt=negative_prompt,
device=device,
)
batch_size, seq_len, _ = negative_prompt_embeds.shape
# duplicate text embeddings and attention mask for each generation per prompt, using mps friendly method
negative_prompt_embeds = negative_prompt_embeds.repeat(1, num_images_per_prompt, 1)
negative_prompt_embeds = negative_prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)
negative_prompt_attention_mask = negative_prompt_attention_mask.repeat(num_images_per_prompt, 1)
negative_prompt_attention_mask = negative_prompt_attention_mask.view(
batch_size * num_images_per_prompt, -1
)
return prompt_embeds, prompt_attention_mask, negative_prompt_embeds, negative_prompt_attention_mask
@property
def num_timesteps(self):
return self._num_timesteps
@property
def text_guidance_scale(self):
return self._text_guidance_scale
@property
def image_guidance_scale(self):
return self._image_guidance_scale
@property
def cfg_range(self):
return self._cfg_range
def prepare_inputs_for_text_generation(self, prompts, input_images, device):
if isinstance(prompts, str):
prompts = [prompts]
ori_padding_side = self.processor.tokenizer.padding_side
self.processor.tokenizer.padding_side = "left"
inputs = self.processor(
text=prompts,
images=input_images,
videos=None,
padding=True,
return_tensors="pt",
).to(device)
self.processor.tokenizer.padding_side = ori_padding_side
return inputs
def generate_text(self, prompt, input_images):
inputs = self.prepare_inputs_for_text_generation(
prompt, input_images, self.mllm.device
)
generated_ids = self.mllm.generate(
**inputs,
tokenizer=self.processor.tokenizer,
max_new_tokens=256,
stop_strings=["<|im_end|>", "<|img|>", "<|endoftext|>"],
) # stop_words=[151643, 151645, 151665]
generated_ids_trimmed = [
out_ids[len(in_ids) :]
for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
]
output_texts = self.processor.batch_decode(
generated_ids_trimmed,
skip_special_tokens=False,
clean_up_tokenization_spaces=False,
)
return output_texts
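# Usage sketch (the prompt must already be chat-templated, as done in `__call__`):
#
#     templated = self._apply_chat_template("describe this scene", input_images)
#     texts = self.generate_text(templated, input_images)
#     # texts[0] either ends the turn with plain text or begins with "<|img|>", which
#     # `__call__` interprets as a request to run the image-generation branch.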
def generate_image(
self,
prompt: Optional[Union[str, List[str]]] = None,
negative_prompt: Optional[Union[str, List[str]]] = None,
prompt_embeds: Optional[torch.FloatTensor] = None,
negative_prompt_embeds: Optional[torch.FloatTensor] = None,
prompt_attention_mask: Optional[torch.LongTensor] = None,
negative_prompt_attention_mask: Optional[torch.LongTensor] = None,
use_text_encoder_penultimate_layer_feats: bool = False,
max_sequence_length: Optional[int] = None,
callback_on_step_end_tensor_inputs: Optional[List[str]] = None,
input_images: Optional[List[PIL.Image.Image]] = None,
num_images_per_prompt: int = 1,
height: Optional[int] = None,
width: Optional[int] = None,
max_pixels: int = 1024 * 1024,
max_input_image_side_length: int = 1024,
align_res: bool = True,
num_inference_steps: int = 28,
text_guidance_scale: float = 4.0,
image_guidance_scale: float = 1.0,
cfg_range: Tuple[float, float] = (0.0, 1.0),
attention_kwargs: Optional[Dict[str, Any]] = None,
timesteps: List[int] = None,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
latents: Optional[torch.FloatTensor] = None,
output_type: Optional[str] = "pil",
return_dict: bool = True,
verbose: bool = False,
step_func=None,
):
height = height or self.default_sample_size * self.vae_scale_factor
width = width or self.default_sample_size * self.vae_scale_factor
self._text_guidance_scale = text_guidance_scale
self._image_guidance_scale = image_guidance_scale
self._cfg_range = cfg_range
self._attention_kwargs = attention_kwargs
# 2. Define call parameters
if prompt is not None and isinstance(prompt, str):
batch_size = 1
elif prompt is not None and isinstance(prompt, list):
batch_size = len(prompt)
else:
batch_size = prompt_embeds.shape[0]
device = self._execution_device
# 3. Encode input prompt
(
prompt_embeds,
prompt_attention_mask,
negative_prompt_embeds,
negative_prompt_attention_mask,
) = self.encode_prompt(
prompt,
input_images,
self.text_guidance_scale > 1.0,
negative_prompt=negative_prompt,
num_images_per_prompt=num_images_per_prompt,
device=device,
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
prompt_attention_mask=prompt_attention_mask,
negative_prompt_attention_mask=negative_prompt_attention_mask,
max_sequence_length=max_sequence_length,
use_text_encoder_penultimate_layer_feats=use_text_encoder_penultimate_layer_feats
)
dtype = self.vae.dtype
# 4. Prepare reference images
ref_latents = self.prepare_image(
images=input_images,
batch_size=batch_size,
num_images_per_prompt=num_images_per_prompt,
max_pixels=max_pixels,
max_side_length=max_input_image_side_length,
device=device,
dtype=dtype,
)
if input_images is None:
input_images = []
if len(input_images) == 1 and align_res:
width, height = ref_latents[0][0].shape[-1] * self.vae_scale_factor, ref_latents[0][0].shape[-2] * self.vae_scale_factor
ori_width, ori_height = width, height
else:
ori_width, ori_height = width, height
cur_pixels = height * width
ratio = (max_pixels / cur_pixels) ** 0.5
ratio = min(ratio, 1.0)
height, width = int(height * ratio) // 16 * 16, int(width * ratio) // 16 * 16
if len(input_images) == 0:
self._image_guidance_scale = 1
# 5. Prepare latents.
latent_channels = self.transformer.config.in_channels
latents = self.prepare_latents(
batch_size * num_images_per_prompt,
latent_channels,
height,
width,
prompt_embeds.dtype,
device,
generator,
latents,
)
freqs_cis = OmniGen2RotaryPosEmbed.get_freqs_cis(
self.transformer.config.axes_dim_rope,
self.transformer.config.axes_lens,
theta=10000,
)
image = self.processing(
latents=latents,
ref_latents=ref_latents,
prompt_embeds=prompt_embeds,
freqs_cis=freqs_cis,
negative_prompt_embeds=negative_prompt_embeds,
prompt_attention_mask=prompt_attention_mask,
negative_prompt_attention_mask=negative_prompt_attention_mask,
num_inference_steps=num_inference_steps,
timesteps=timesteps,
device=device,
dtype=dtype,
verbose=verbose,
step_func=step_func,
)
image = F.interpolate(image, size=(ori_height, ori_width), mode='bilinear')
image = self.image_processor.postprocess(image, output_type=output_type)
# Offload all models
self.maybe_free_model_hooks()
return image
@torch.no_grad()
def __call__(
self,
prompt: Optional[Union[str, List[str]]] = None,
negative_prompt: Optional[Union[str, List[str]]] = None,
prompt_embeds: Optional[torch.FloatTensor] = None,
negative_prompt_embeds: Optional[torch.FloatTensor] = None,
prompt_attention_mask: Optional[torch.LongTensor] = None,
negative_prompt_attention_mask: Optional[torch.LongTensor] = None,
use_text_encoder_penultimate_layer_feats: bool = False,
max_sequence_length: Optional[int] = None,
callback_on_step_end_tensor_inputs: Optional[List[str]] = None,
input_images: Optional[List[PIL.Image.Image]] = None,
num_images_per_prompt: int = 1,
height: Optional[int] = 1024,
width: Optional[int] = 1024,
max_pixels: Optional[int] = 1024 * 1024,
max_input_image_side_length: int = 1024,
align_res: bool = True,
num_inference_steps: int = 28,
text_guidance_scale: float = 4.0,
image_guidance_scale: float = 1.0,
cfg_range: Tuple[float, float] = (0.0, 1.0),
attention_kwargs: Optional[Dict[str, Any]] = None,
timesteps: List[int] = None,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
latents: Optional[torch.FloatTensor] = None,
output_type: Optional[str] = "pil",
return_dict: bool = True,
verbose: bool = False,
step_func=None,
):
assert isinstance(prompt, str), "prompt must be a string since chat mode only supports one prompt per turn"
prompt = self._apply_chat_template(prompt, input_images)
generated_text = self.generate_text(prompt, input_images)[0]
images = None
if generated_text.startswith("<|img|>"):
#TODO: reuse the hidden state when generate text instead of re-generating
prompt = prompt + generated_text.split("<|img|>")[0]
images = self.generate_image(
prompt=prompt,
negative_prompt=negative_prompt,
use_text_encoder_penultimate_layer_feats=use_text_encoder_penultimate_layer_feats,
max_sequence_length=max_sequence_length,
input_images=input_images,
num_images_per_prompt=num_images_per_prompt,
height=height,
width=width,
max_pixels=max_pixels,
max_input_image_side_length=max_input_image_side_length,
align_res=align_res,
num_inference_steps=num_inference_steps,
text_guidance_scale=text_guidance_scale,
image_guidance_scale=image_guidance_scale,
cfg_range=cfg_range,
timesteps=timesteps,
generator=generator,
latents=latents,
return_dict=False,
verbose=verbose,
step_func=step_func,
)
generated_text = generated_text.replace("<|im_end|>", "")
if not return_dict:
return generated_text, images
else:
return OmniGen2PipelineOutput(text=generated_text, images=images)
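# A minimal usage sketch of the chat entry point. The class name and checkpoint
# path below are illustrative placeholders, assuming the pipeline exposes a
# diffusers-style `from_pretrained`:
#
# pipe = OmniGen2ChatPipeline.from_pretrained("/path/to/omnigen2-checkpoint")
# result = pipe(prompt="Draw a cat wearing a space suit")
# print(result.text)
# if result.images:
#     result.images[0].save("output.png")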
def processing(
self,
latents,
ref_latents,
prompt_embeds,
freqs_cis,
negative_prompt_embeds,
prompt_attention_mask,
negative_prompt_attention_mask,
num_inference_steps,
timesteps,
device,
dtype,
verbose,
step_func=None
):
batch_size = latents.shape[0]
timesteps, num_inference_steps = retrieve_timesteps(
self.scheduler,
num_inference_steps,
device,
timesteps,
num_tokens=latents.shape[-2] * latents.shape[-1]
)
num_warmup_steps = max(len(timesteps) - num_inference_steps * self.scheduler.order, 0)
self._num_timesteps = len(timesteps)
with self.progress_bar(total=num_inference_steps) as progress_bar:
for i, t in enumerate(timesteps):
model_pred = self.predict(
t=t,
latents=latents,
prompt_embeds=prompt_embeds,
freqs_cis=freqs_cis,
prompt_attention_mask=prompt_attention_mask,
ref_image_hidden_states=ref_latents,
)
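# Apply classifier-free guidance only within the configured fraction of the
# schedule; outside cfg_range both scales collapse to 1.0 (plain conditional prediction).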
apply_cfg = self.cfg_range[0] <= i / len(timesteps) <= self.cfg_range[1]
text_guidance_scale = self.text_guidance_scale if apply_cfg else 1.0
image_guidance_scale = self.image_guidance_scale if apply_cfg else 1.0
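# Three-way guidance: with both text and image conditioning active, the update is
#   uncond + image_scale * (ref - uncond) + text_scale * (cond - ref),
# so the image and text contributions are scaled independently.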
if text_guidance_scale > 1.0 and image_guidance_scale > 1.0:
model_pred_ref = self.predict(
t=t,
latents=latents,
prompt_embeds=negative_prompt_embeds,
freqs_cis=freqs_cis,
prompt_attention_mask=negative_prompt_attention_mask,
ref_image_hidden_states=ref_latents,
)
# image_guidance_scale > 1.0 in this branch, so the fully unconditional
# prediction is always required.
model_pred_uncond = self.predict(
    t=t,
    latents=latents,
    prompt_embeds=negative_prompt_embeds,
    freqs_cis=freqs_cis,
    prompt_attention_mask=negative_prompt_attention_mask,
    ref_image_hidden_states=None,
)
model_pred = model_pred_uncond + image_guidance_scale * (model_pred_ref - model_pred_uncond) + \
text_guidance_scale * (model_pred - model_pred_ref)
elif text_guidance_scale > 1.0:
model_pred_uncond = self.predict(
t=t,
latents=latents,
prompt_embeds=negative_prompt_embeds,
freqs_cis=freqs_cis,
prompt_attention_mask=negative_prompt_attention_mask,
ref_image_hidden_states=None,
)
model_pred = model_pred_uncond + text_guidance_scale * (model_pred - model_pred_uncond)
latents = self.scheduler.step(model_pred, t, latents, return_dict=False)[0]
latents = latents.to(dtype=dtype)
if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
progress_bar.update()
if step_func is not None:
step_func(i, self._num_timesteps)
latents = latents.to(dtype=dtype)
if self.vae.config.scaling_factor is not None:
latents = latents / self.vae.config.scaling_factor
if self.vae.config.shift_factor is not None:
latents = latents + self.vae.config.shift_factor
image = self.vae.decode(latents, return_dict=False)[0]
return image
def predict(
self,
t,
latents,
prompt_embeds,
freqs_cis,
prompt_attention_mask,
ref_image_hidden_states,
):
# broadcast to batch dimension in a way that's compatible with ONNX/Core ML
timestep = t.expand(latents.shape[0]).to(latents.dtype)
batch_size, num_channels_latents, height, width = latents.shape
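# Pass the reference-image states only if this transformer variant's forward accepts them.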
optional_kwargs = {}
if 'ref_image_hidden_states' in inspect.signature(self.transformer.forward).parameters:
optional_kwargs['ref_image_hidden_states'] = ref_image_hidden_states
model_pred = self.transformer(
latents,
timestep,
prompt_embeds,
freqs_cis,
prompt_attention_mask,
**optional_kwargs
)
return model_pred
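# A minimal standalone sketch of the dual classifier-free guidance combination
# used in `processing` above, pulled out as a pure function so it can be checked
# on toy tensors. The function name is illustrative, not part of the pipeline's API.
def combine_dual_cfg(pred_uncond, pred_ref, pred_cond, image_scale, text_scale):
    # uncond -> ref adds image conditioning, ref -> cond adds text conditioning;
    # each delta is amplified by its own guidance scale.
    return (pred_uncond
            + image_scale * (pred_ref - pred_uncond)
            + text_scale * (pred_cond - pred_ref))

# With both scales at 1.0 the combination reduces to the plain conditional
# prediction, which is why the loop skips the extra forward passes in that case, e.g.:
# x = torch.randn(1, 4, 8, 8)
# assert torch.allclose(combine_dual_cfg(torch.zeros_like(x), x * 0.5, x, 1.0, 1.0), x)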
import torch
def get_pipeline_embeds(pipeline, prompt, negative_prompt, device):
""" Get pipeline embeds for prompts bigger than the maxlength of the pipe
:param pipeline:
:param prompt:
:param negative_prompt:
:param device:
:return:
"""
max_length = pipeline.tokenizer.model_max_length
# Tokenize the full prompt without truncation so content past model_max_length is kept;
# keep the attention mask alongside the ids so it can be sliced per window below.
text_inputs = pipeline.tokenizer(prompt, return_tensors="pt", truncation=False, padding='longest')
input_ids = text_inputs.input_ids.to(device)
input_attention_mask = text_inputs.attention_mask.to(device)
shape_max_length = input_ids.shape[-1]
if negative_prompt is not None:
    # Pad or truncate the negative prompt to the prompt's token length so the
    # two embedding sequences line up.
    negative_inputs = pipeline.tokenizer(negative_prompt, truncation=True, padding="max_length",
                                         max_length=shape_max_length, return_tensors="pt")
    negative_ids = negative_inputs.input_ids.to(device)
    negative_attention_mask = negative_inputs.attention_mask.to(device)
concat_embeds = []
neg_embeds = []
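# Encode the ids in windows of model_max_length and concatenate the per-window
# embeddings along the sequence dimension.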
for i in range(0, shape_max_length, max_length):
    if getattr(pipeline.text_encoder.config, "use_attention_mask", False):
        attention_mask = input_attention_mask[:, i: i + max_length]
    else:
        attention_mask = None
    concat_embeds.append(pipeline.text_encoder(input_ids[:, i: i + max_length],
                                               attention_mask=attention_mask)[0])
    if negative_prompt is not None:
        if getattr(pipeline.text_encoder.config, "use_attention_mask", False):
            attention_mask = negative_attention_mask[:, i: i + max_length]
        else:
            attention_mask = None
        neg_embeds.append(pipeline.text_encoder(negative_ids[:, i: i + max_length],
                                                attention_mask=attention_mask)[0])
concat_embeds = torch.cat(concat_embeds, dim=1)
if negative_prompt is not None:
neg_embeds = torch.cat(neg_embeds, dim=1)
else:
neg_embeds = None
return concat_embeds, neg_embeds
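# A minimal usage sketch, assuming a diffusers StableDiffusionPipeline-style
# object (anything exposing `tokenizer`, `text_encoder`, and accepting
# `prompt_embeds`/`negative_prompt_embeds`); the checkpoint id is an
# illustrative placeholder, not a tested configuration.
#
# from diffusers import StableDiffusionPipeline
# pipe = StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5").to("cuda")
# long_prompt = ", ".join(["a highly detailed oil painting of a red fox in a snowy forest"] * 10)
# prompt_embeds, neg_embeds = get_pipeline_embeds(pipe, long_prompt, "blurry, low quality", "cuda")
# image = pipe(prompt_embeds=prompt_embeds, negative_prompt_embeds=neg_embeds).images[0]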