import os

import torch
import gradio as gr
from PIL import Image
from omegaconf import OmegaConf
from diffusers import (EulerDiscreteScheduler, EulerAncestralDiscreteScheduler,
                       DPMSolverMultistepScheduler, PNDMScheduler, DDIMScheduler)

from predict_i2v import try_setup_pipeline, get_control_embeddings
from ruyi.data.bucket_sampler import ASPECT_RATIO_512, get_closest_ratio
from ruyi.utils.lora_utils import merge_lora, unmerge_lora
from ruyi.utils.utils import get_image_to_video_latent, save_videos_grid

# Model settings
config_path = "config/default.yaml"
model_name = "Ruyi-Mini-7B"
model_path = f"models/{model_name}"  # (Down)load the model into this path

# LoRA settings
lora_path = None
lora_weight = 1.0

# GPU memory settings
low_gpu_memory_mode = True  # Low GPU memory mode (sequential CPU offload)
gpu_offload_steps = 5  # Choose from [0, 10, 7, 5, 1]; later values in the list need less GPU memory but take longer

# Random seed
seed = 42  # The Answer to the Ultimate Question of Life, The Universe, and Everything

# Output path
output_video_path = "outputs/example_01.mp4"

# Other settings
weight_dtype = torch.bfloat16
device = torch.device("cuda")

# Load config
config = OmegaConf.load(config_path)

# Hugging Face repo id (used in the download hint below)
repo_id = f"IamCreateAI/{model_name}"

# Init model
pipeline = try_setup_pipeline(model_path, weight_dtype, config)
if pipeline is None:
    message = (f"[Load Model Failed] "
               f"Please download the Ruyi model from the Hugging Face repo '{repo_id}' "
               f"and put it into '{model_path}'.")
    raise FileNotFoundError(message)

# Setup GPU memory mode
if low_gpu_memory_mode:
    pipeline.enable_sequential_cpu_offload()
else:
    pipeline.enable_model_cpu_offload()

# Set hidden states offload steps
pipeline.transformer.hidden_cache_size = gpu_offload_steps


def run_inference(start_image_path=None, end_image_path=None, video_length=-1, base_resolution=-1,
                  cfg=-1, steps=-1, scheduler_name=None, motion=None, camera_direction=None,
                  aspect_ratio=None, progress=gr.Progress(track_tqdm=True)):
    global pipeline

    # Load images
    start_img = [Image.open(start_image_path).convert("RGB")]
    end_img = [Image.open(end_image_path).convert("RGB")] if end_image_path is not None else None

    # Prepare LoRA config
    loras = {
        'models': [lora_path] if lora_path is not None else [],
        'weights': [lora_weight] if lora_path is not None else [],
    }

    video_size = None

    # Compute the most suitable height and width
    if video_size is None:
        aspect_ratio_sample_size = {key: [x / 512 * base_resolution for x in ASPECT_RATIO_512[key]]
                                    for key in ASPECT_RATIO_512.keys()}
        original_width, original_height = start_img[0].size if type(start_img) is list else Image.open(start_img).size
        closest_size, closest_ratio = get_closest_ratio(original_height, original_width, ratios=aspect_ratio_sample_size)
        height, width = [int(x / 16) * 16 for x in closest_size]
    else:
        height, width = video_size

    # Load the selected sampler
    if scheduler_name == "DPM++":
        noise_scheduler = DPMSolverMultistepScheduler.from_pretrained(model_path, subfolder='scheduler')
    elif scheduler_name == "Euler":
        noise_scheduler = EulerDiscreteScheduler.from_pretrained(model_path, subfolder='scheduler')
    elif scheduler_name == "Euler A":
        noise_scheduler = EulerAncestralDiscreteScheduler.from_pretrained(model_path, subfolder='scheduler')
    elif scheduler_name == "PNDM":
        noise_scheduler = PNDMScheduler.from_pretrained(model_path, subfolder='scheduler')
    elif scheduler_name == "DDIM":
        noise_scheduler = DDIMScheduler.from_pretrained(model_path, subfolder='scheduler')
    else:
        raise ValueError(f"Unknown scheduler: {scheduler_name}")
    pipeline.scheduler = noise_scheduler

    # Set random seed
    generator = torch.Generator(device).manual_seed(seed)

    # Load control embeddings
    embeddings = get_control_embeddings(pipeline, aspect_ratio, motion, camera_direction)

    with torch.no_grad():
        # Round the frame count down to a multiple of the VAE mini-batch encoder size
        video_length = int(video_length // pipeline.vae.mini_batch_encoder * pipeline.vae.mini_batch_encoder) if video_length != 1 else 1
        input_video, input_video_mask, clip_image = get_image_to_video_latent(
            start_img, end_img, video_length=video_length, sample_size=(height, width))

        # Merge LoRA weights (if any) into the pipeline
        for _lora_path, _lora_weight in zip(loras.get("models", []), loras.get("weights", [])):
            pipeline = merge_lora(pipeline, _lora_path, _lora_weight)

        sample = pipeline(
            prompt_embeds=embeddings["positive_embeds"],
            prompt_attention_mask=embeddings["positive_attention_mask"],
            prompt_embeds_2=embeddings["positive_embeds_2"],
            prompt_attention_mask_2=embeddings["positive_attention_mask_2"],
            negative_prompt_embeds=embeddings["negative_embeds"],
            negative_prompt_attention_mask=embeddings["negative_attention_mask"],
            negative_prompt_embeds_2=embeddings["negative_embeds_2"],
            negative_prompt_attention_mask_2=embeddings["negative_attention_mask_2"],
            video_length=video_length,
            height=height,
            width=width,
            generator=generator,
            guidance_scale=cfg,
            num_inference_steps=steps,
            video=input_video,
            mask_video=input_video_mask,
            clip_image=clip_image,
        ).videos

        # Unmerge LoRA weights so the pipeline stays clean for the next run
        for _lora_path, _lora_weight in zip(loras.get("models", []), loras.get("weights", [])):
            pipeline = unmerge_lora(pipeline, _lora_path, _lora_weight)

    # Save the video
    output_folder = os.path.dirname(output_video_path)
    if output_folder != '':
        os.makedirs(output_folder, exist_ok=True)
    save_videos_grid(sample, output_video_path, fps=24)

    return output_video_path


with gr.Blocks() as demo:
    gr.HTML(
        """

        <h1 style="text-align: center;">📸 Ruyi-Mini-7B</h1>

""" ) with gr.Row(): with gr.Column(scale=1): with gr.Accordion("Video Settings", open=True): video_length = gr.Slider( minimum=24, maximum=120, step=1, value=120, label="Video Length(frames)", ) base_resolution = gr.Slider( minimum=384, maximum=640, step=1, value=640, label="Base Resolution", ) aspect_ratio = gr.Radio( choices=["16:9", "9:16"], label="Aspect Ratio", value="16:9", interactive=True, ) with gr.Accordion("Control settings", open=True): motion = gr.Radio( choices=["1", "2", "3", "4", "auto"], label="motion", value="auto", interactive=True, ) camera_direction = gr.Radio( choices=["static", "left", "right", "up", "down", "auto"], label="Camera Direction", value="static", interactive=True, ) with gr.Accordion("Advanced Sampling Settings", open=False): steps = gr.Slider( value=25, label="Steps", minimum=1, maximum=50, step=1, ) cfg_scale = gr.Slider( value=7.0, label="Classifier-Free Guidance Scale", minimum=1, maximum=10, step=0.1, ) scheduler_name = gr.Radio( choices=["Euler", "Euler A", "DPM++", "PNDM", "DDIM"], label="Scheduler", value="DDIM", interactive=True, ) with gr.Column(scale=1): with gr.Accordion("Input Image(s)", open=True): num_images_slider = gr.Slider( minimum=1, maximum=2, step=1, value=1, label="Number of Input Image(s)", ) condition_image_1 = gr.Image(label="Input Image 1", type="filepath") condition_image_2 = gr.Image(label="Input Image 2", type="filepath", visible=False) condition_image_3 = gr.Image(label="Input Image 3", type="filepath", visible=False) condition_image_4 = gr.Image(label="Input Image 4", type="filepath", visible=False) with gr.Column(scale=1): with gr.Accordion("Output Video", open=True): output_video = gr.Video(label="Output Video") run_btn = gr.Button("Generate") # Update visibility of condition images based on the slider def update_visible_images(num_images): return [ gr.update(visible=num_images >= 2), gr.update(visible=num_images >= 3), gr.update(visible=num_images >= 4), ] # Trigger visibility update when the slider value changes num_images_slider.change( fn=update_visible_images, inputs=num_images_slider, outputs=[condition_image_2, condition_image_3, condition_image_4], ) run_btn.click( fn=run_inference, inputs=[ condition_image_1, condition_image_2, video_length, base_resolution, cfg_scale, steps, scheduler_name, motion, camera_direction, aspect_ratio ], outputs=output_video, ) demo.launch(share=True, server_name="0.0.0.0")