import os

import torch
from PIL import Image
from diffusers import (EulerDiscreteScheduler,
                       EulerAncestralDiscreteScheduler,
                       DPMSolverMultistepScheduler,
                       PNDMScheduler,
                       DDIMScheduler)
from omegaconf import OmegaConf
from transformers import CLIPImageProcessor, CLIPVisionModelWithProjection
from safetensors.torch import load_file as load_safetensors
from huggingface_hub import snapshot_download

from ruyi.data.bucket_sampler import ASPECT_RATIO_512, get_closest_ratio
from ruyi.models.autoencoder_magvit import AutoencoderKLMagvit
from ruyi.models.transformer3d import HunyuanTransformer3DModel
from ruyi.pipeline.pipeline_ruyi_inpaint import RuyiInpaintPipeline
from ruyi.utils.lora_utils import merge_lora, unmerge_lora
from ruyi.utils.utils import get_image_to_video_latent, save_videos_grid

# Input and output
start_image_path = "assets/girl_01.jpg"
end_image_path = "assets/girl_02.jpg"  # Can be None for start-image-to-video
output_video_path = "outputs/example_01.mp4"

# Video settings
video_length = 120       # The max video length is 120 frames (24 frames per second)
base_resolution = 640    # The generated video has roughly base_resolution x base_resolution pixels; values in [384, 896] typically produce good video quality
video_size = None        # Overrides base_resolution when set. Format: [height, width], e.g., [384, 672]

# Control settings
aspect_ratio = "16:9"        # Choose from ["16:9", "9:16"]; note that this is only a hint
motion = "auto"              # Motion control, choose from ["1", "2", "3", "4", "auto"]
camera_direction = "auto"    # Camera control, choose from ["static", "left", "right", "up", "down", "auto"]

# Sampler settings
steps = 25
cfg = 7.0
scheduler_name = "DDIM"  # Choose from ["Euler", "Euler A", "DPM++", "PNDM", "DDIM"]

# GPU memory settings
low_gpu_memory_mode = False  # Use sequential CPU offload (lower GPU memory usage, slower)
gpu_offload_steps = 5        # Choose from [0, 10, 7, 5, 1]; later values in the list require less GPU memory but more time

# Random seed
seed = 42  # The Answer to the Ultimate Question of Life, The Universe, and Everything

# Model settings
config_path = "config/default.yaml"
model_name = "Ruyi-Mini-7B"
model_type = "Inpaint"
model_path = f"models/{model_name}"  # (Down)load the model in this path
auto_download = False  # Automatically download the model if the pipeline creation fails
auto_update = False    # If auto_download is enabled, check for updates and update the model if necessary

# LoRA settings
lora_path = None
lora_weight = 1.0

# Other settings
weight_dtype = torch.bfloat16
device = torch.device("cuda")


def get_control_embeddings(pipeline, aspect_ratio, motion, camera_direction):
    # Default keys
    p_default_key = "p.default"
    n_default_key = "n.default"

    # Build the positive key from the control settings
    if motion == "auto":
        motion = "0"
    p_key = f"p.{aspect_ratio.replace(':', 'x')}movie{motion}{camera_direction}"
    embeddings = pipeline.embeddings

    # Get embeddings, falling back to the defaults when the requested key is missing
    positive_embeds = embeddings.get(f"{p_key}.emb1", embeddings[f"{p_default_key}.emb1"])
    positive_attention_mask = embeddings.get(f"{p_key}.mask1", embeddings[f"{p_default_key}.mask1"])
    positive_embeds_2 = embeddings.get(f"{p_key}.emb2", embeddings[f"{p_default_key}.emb2"])
    positive_attention_mask_2 = embeddings.get(f"{p_key}.mask2", embeddings[f"{p_default_key}.mask2"])

    negative_embeds = embeddings[f"{n_default_key}.emb1"]
    negative_attention_mask = embeddings[f"{n_default_key}.mask1"]
    negative_embeds_2 = embeddings[f"{n_default_key}.emb2"]
    negative_attention_mask_2 = embeddings[f"{n_default_key}.mask2"]

    return {
        "positive_embeds": positive_embeds,
        "positive_attention_mask": positive_attention_mask,
        "positive_embeds_2": positive_embeds_2,
        "positive_attention_mask_2": positive_attention_mask_2,
        "negative_embeds": negative_embeds,
        "negative_attention_mask": negative_attention_mask,
        "negative_embeds_2": negative_embeds_2,
        "negative_attention_mask_2": negative_attention_mask_2,
    }
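
# For example, with aspect_ratio="16:9", motion="auto" (mapped to "0"), and
# camera_direction="auto", the positive key above resolves to "p.16x9movie0auto";
# any key missing from embeddings.safetensors falls back to the "p.default" entries,
# while the negative embeddings always come from "n.default".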


def try_setup_pipeline(model_path, weight_dtype, config):
    try:
        # Get VAE
        vae = AutoencoderKLMagvit.from_pretrained(
            model_path, subfolder="vae"
        ).to(weight_dtype)
        print("Vae loaded ...")

        # Get Transformer
        transformer_additional_kwargs = OmegaConf.to_container(config['transformer_additional_kwargs'])
        transformer = HunyuanTransformer3DModel.from_pretrained_2d(
            model_path, subfolder="transformer",
            transformer_additional_kwargs=transformer_additional_kwargs
        ).to(weight_dtype)
        print("Transformer loaded ...")

        # Load CLIP
        clip_image_encoder = CLIPVisionModelWithProjection.from_pretrained(
            model_path, subfolder="image_encoder"
        ).to(weight_dtype)
        clip_image_processor = CLIPImageProcessor.from_pretrained(
            model_path, subfolder="image_encoder"
        )

        # Load sampler and create pipeline
        Chosen_Scheduler = DDIMScheduler
        scheduler = Chosen_Scheduler.from_pretrained(
            model_path, subfolder="scheduler"
        )
        pipeline = RuyiInpaintPipeline.from_pretrained(
            model_path,
            vae=vae,
            transformer=transformer,
            scheduler=scheduler,
            torch_dtype=weight_dtype,
            clip_image_encoder=clip_image_encoder,
            clip_image_processor=clip_image_processor,
        )

        # Load embeddings
        embeddings = load_safetensors(os.path.join(model_path, "embeddings.safetensors"))
        pipeline.embeddings = embeddings
        print("Pipeline loaded ...")

        return pipeline
    except Exception as e:
        print("[Ruyi] Setup pipeline failed:", e)
        return None


# Load config
config = OmegaConf.load(config_path)

# Load images
start_img = [Image.open(start_image_path).convert("RGB")]
end_img = [Image.open(end_image_path).convert("RGB")] if end_image_path is not None else None

# Check for update
repo_id = f"IamCreateAI/{model_name}"
if auto_download and auto_update:
    print(f"Checking for {model_name} updates ...")

    # Download the model
    snapshot_download(repo_id=repo_id, local_dir=model_path)

# Init model
pipeline = try_setup_pipeline(model_path, weight_dtype, config)
if pipeline is None and auto_download:
    print(f"Downloading {model_name} ...")

    # Download the model
    snapshot_download(repo_id=repo_id, local_dir=model_path)

    pipeline = try_setup_pipeline(model_path, weight_dtype, config)

if pipeline is None:
    message = ("[Load Model Failed] "
               f"Please download the Ruyi model from the Hugging Face repo '{repo_id}' "
               f"and put it into '{model_path}'.")
    if not auto_download:
        message += "\nOr just set auto_download to 'True'."
    raise FileNotFoundError(message)
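
# Note on the GPU memory modes below: enable_sequential_cpu_offload() offloads at the
# submodule level (lowest GPU memory usage, slowest), while enable_model_cpu_offload()
# moves whole model components between CPU and GPU (faster, but needs more memory).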

# Setup GPU memory mode
if low_gpu_memory_mode:
    pipeline.enable_sequential_cpu_offload()
else:
    pipeline.enable_model_cpu_offload()

# Prepare LoRA config
loras = {
    'models': [lora_path] if lora_path is not None else [],
    'weights': [lora_weight] if lora_path is not None else [],
}

# Compute the most suitable height and width
if video_size is None:
    aspect_ratio_sample_size = {key: [x / 512 * base_resolution for x in ASPECT_RATIO_512[key]] for key in ASPECT_RATIO_512.keys()}
    original_width, original_height = start_img[0].size if type(start_img) is list else Image.open(start_img).size
    closest_size, closest_ratio = get_closest_ratio(original_height, original_width, ratios=aspect_ratio_sample_size)
    height, width = [int(x / 16) * 16 for x in closest_size]
else:
    height, width = video_size

# Set hidden states offload steps
pipeline.transformer.hidden_cache_size = gpu_offload_steps

# Load sampler
if scheduler_name == "DPM++":
    noise_scheduler = DPMSolverMultistepScheduler.from_pretrained(model_path, subfolder='scheduler')
elif scheduler_name == "Euler":
    noise_scheduler = EulerDiscreteScheduler.from_pretrained(model_path, subfolder='scheduler')
elif scheduler_name == "Euler A":
    noise_scheduler = EulerAncestralDiscreteScheduler.from_pretrained(model_path, subfolder='scheduler')
elif scheduler_name == "PNDM":
    noise_scheduler = PNDMScheduler.from_pretrained(model_path, subfolder='scheduler')
elif scheduler_name == "DDIM":
    noise_scheduler = DDIMScheduler.from_pretrained(model_path, subfolder='scheduler')
pipeline.scheduler = noise_scheduler

# Set random seed
generator = torch.Generator(device).manual_seed(seed)

# Load control embeddings
embeddings = get_control_embeddings(pipeline, aspect_ratio, motion, camera_direction)

with torch.no_grad():
    # Round the video length down to a multiple of the VAE mini-batch encoder size
    video_length = int(video_length // pipeline.vae.mini_batch_encoder * pipeline.vae.mini_batch_encoder) if video_length != 1 else 1
    input_video, input_video_mask, clip_image = get_image_to_video_latent(start_img, end_img, video_length=video_length, sample_size=(height, width))

    # Merge LoRA weights (if any) into the pipeline before sampling
    for _lora_path, _lora_weight in zip(loras.get("models", []), loras.get("weights", [])):
        pipeline = merge_lora(pipeline, _lora_path, _lora_weight)

    sample = pipeline(
        prompt_embeds = embeddings["positive_embeds"],
        prompt_attention_mask = embeddings["positive_attention_mask"],
        prompt_embeds_2 = embeddings["positive_embeds_2"],
        prompt_attention_mask_2 = embeddings["positive_attention_mask_2"],

        negative_prompt_embeds = embeddings["negative_embeds"],
        negative_prompt_attention_mask = embeddings["negative_attention_mask"],
        negative_prompt_embeds_2 = embeddings["negative_embeds_2"],
        negative_prompt_attention_mask_2 = embeddings["negative_attention_mask_2"],

        video_length = video_length,
        height = height,
        width = width,
        generator = generator,
        guidance_scale = cfg,
        num_inference_steps = steps,

        video = input_video,
        mask_video = input_video_mask,
        clip_image = clip_image,
    ).videos

    # Unmerge the LoRA weights to restore the original pipeline after sampling
    for _lora_path, _lora_weight in zip(loras.get("models", []), loras.get("weights", [])):
        pipeline = unmerge_lora(pipeline, _lora_path, _lora_weight)

# Save the video
output_folder = os.path.dirname(output_video_path)
if output_folder != '':
    os.makedirs(output_folder, exist_ok=True)
save_videos_grid(sample, output_video_path, fps=24)
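
# To run this example (assuming the script is saved as predict_i2v.py in the repository
# root, with the Ruyi model present in model_path or auto_download enabled):
#   python predict_i2v.py
# The resulting clip is written to output_video_path ("outputs/example_01.mp4") at 24 fps.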