import torch
import os
from PIL import Image
from predict_i2v import try_setup_pipeline, get_control_embeddings
from ruyi.data.bucket_sampler import ASPECT_RATIO_512, get_closest_ratio
from ruyi.utils.lora_utils import merge_lora, unmerge_lora
from ruyi.utils.utils import get_image_to_video_latent, save_videos_grid
from diffusers import (EulerDiscreteScheduler, EulerAncestralDiscreteScheduler,
                       DPMSolverMultistepScheduler, PNDMScheduler, DDIMScheduler)
import gradio as gr
from omegaconf import OmegaConf

# Model settings
config_path = "config/default.yaml"
model_name = "Ruyi-Mini-7B"
model_path = f"models/{model_name}"  # (Down)load the model into this path

# LoRA settings
lora_path = None
lora_weight = 1.0

# GPU memory settings
low_gpu_memory_mode = True  # Low GPU memory mode
gpu_offload_steps = 5  # Choose from [0, 10, 7, 5, 1]; values later in the list need less GPU memory but take longer

# Random seed
seed = 42  # The Answer to the Ultimate Question of Life, The Universe, and Everything

output_video_path = "outputs/example_01.mp4"

# Other settings
weight_dtype = torch.bfloat16
device = torch.device("cuda")

# Load config
config = OmegaConf.load(config_path)

# Check for update
repo_id = f"IamCreateAI/{model_name}"

# Init model
pipeline = try_setup_pipeline(model_path, weight_dtype, config)
if pipeline is None:
    message = (f"[Load Model Failed] "
               f"Please download the Ruyi model from the Hugging Face repo '{repo_id}' "
               f"and put it into '{model_path}'.")
    raise FileNotFoundError(message)

# Setup GPU memory mode
if low_gpu_memory_mode:
    pipeline.enable_sequential_cpu_offload()
else:
    pipeline.enable_model_cpu_offload()

# Set hidden states offload steps
pipeline.transformer.hidden_cache_size = gpu_offload_steps


def run_inference(start_image_path=None, end_image_path=None,
                  video_length=-1, base_resolution=-1,
                  cfg=-1, steps=-1, scheduler_name=None,
                  motion=None, camera_direction=None, aspect_ratio=None,
                  progress=gr.Progress(track_tqdm=True)):
    global pipeline

    # Load images
    start_img = [Image.open(start_image_path).convert("RGB")]
    end_img = [Image.open(end_image_path).convert("RGB")] if end_image_path is not None else None

    # Prepare LoRA config
    loras = {
        'models': [lora_path] if lora_path is not None else [],
        'weights': [lora_weight] if lora_path is not None else [],
    }

    video_size = None

    # Compute the most suitable height and width
    if video_size is None:
        # Scale the 512-based aspect ratio buckets to the requested base resolution
        aspect_ratio_sample_size = {key: [x / 512 * base_resolution for x in ASPECT_RATIO_512[key]]
                                    for key in ASPECT_RATIO_512.keys()}
        original_width, original_height = start_img[0].size if isinstance(start_img, list) else Image.open(start_img).size
        closest_size, closest_ratio = get_closest_ratio(original_height, original_width, ratios=aspect_ratio_sample_size)
        height, width = [int(x / 16) * 16 for x in closest_size]
    else:
        height, width = video_size

    # Load sampler
    if scheduler_name == "DPM++":
        noise_scheduler = DPMSolverMultistepScheduler.from_pretrained(model_path, subfolder='scheduler')
    elif scheduler_name == "Euler":
        noise_scheduler = EulerDiscreteScheduler.from_pretrained(model_path, subfolder='scheduler')
    elif scheduler_name == "Euler A":
        noise_scheduler = EulerAncestralDiscreteScheduler.from_pretrained(model_path, subfolder='scheduler')
    elif scheduler_name == "PNDM":
        noise_scheduler = PNDMScheduler.from_pretrained(model_path, subfolder='scheduler')
    elif scheduler_name == "DDIM":
        noise_scheduler = DDIMScheduler.from_pretrained(model_path, subfolder='scheduler')
    else:
        raise ValueError(f"Unknown scheduler: {scheduler_name}")
    pipeline.scheduler = noise_scheduler

    # Set random seed
    generator = torch.Generator(device).manual_seed(seed)

    # Load control embeddings
    embeddings = get_control_embeddings(pipeline, aspect_ratio, motion, camera_direction)

    with torch.no_grad():
        # Round video_length down to a multiple of the VAE mini-batch encoder size
        video_length = int(
            video_length // pipeline.vae.mini_batch_encoder * pipeline.vae.mini_batch_encoder) if video_length != 1 else 1
        input_video, input_video_mask, clip_image = get_image_to_video_latent(
            start_img, end_img, video_length=video_length, sample_size=(height, width))

        # Merge LoRA weights into the pipeline before sampling
        for _lora_path, _lora_weight in zip(loras.get("models", []), loras.get("weights", [])):
            pipeline = merge_lora(pipeline, _lora_path, _lora_weight)

        sample = pipeline(
            prompt_embeds=embeddings["positive_embeds"],
            prompt_attention_mask=embeddings["positive_attention_mask"],
            prompt_embeds_2=embeddings["positive_embeds_2"],
            prompt_attention_mask_2=embeddings["positive_attention_mask_2"],
            negative_prompt_embeds=embeddings["negative_embeds"],
            negative_prompt_attention_mask=embeddings["negative_attention_mask"],
            negative_prompt_embeds_2=embeddings["negative_embeds_2"],
            negative_prompt_attention_mask_2=embeddings["negative_attention_mask_2"],

            video_length=video_length,
            height=height,
            width=width,
            generator=generator,
            guidance_scale=cfg,
            num_inference_steps=steps,

            video=input_video,
            mask_video=input_video_mask,
            clip_image=clip_image,
        ).videos

        # Unmerge LoRA weights to restore the original pipeline
        for _lora_path, _lora_weight in zip(loras.get("models", []), loras.get("weights", [])):
            pipeline = unmerge_lora(pipeline, _lora_path, _lora_weight)

    # Save the video
    output_folder = os.path.dirname(output_video_path)
    if output_folder != '':
        os.makedirs(output_folder, exist_ok=True)
    save_videos_grid(sample, output_video_path, fps=24)

    return output_video_path


with gr.Blocks() as demo:
    gr.HTML(
        """