import argparse
import torch
import torch.distributed as dist
from torch.distributed.device_mesh import init_device_mesh
import json

from lightx2v.utils.envs import *
from lightx2v.utils.utils import seed_all
from lightx2v.utils.profiler import ProfilingContext
from lightx2v.utils.set_config import set_config
from lightx2v.utils.registry_factory import RUNNER_REGISTER

# The runner imports below are kept for their side effect of registering each
# runner class in RUNNER_REGISTER, even though they are not referenced directly.
from lightx2v.models.runners.hunyuan.hunyuan_runner import HunyuanRunner
from lightx2v.models.runners.wan.wan_runner import WanRunner, Wan22MoeRunner
from lightx2v.models.runners.wan.wan_distill_runner import WanDistillRunner
from lightx2v.models.runners.wan.wan_causvid_runner import WanCausVidRunner
from lightx2v.models.runners.wan.wan_audio_runner import WanAudioRunner, Wan22MoeAudioRunner
from lightx2v.models.runners.wan.wan_skyreels_v2_df_runner import WanSkyreelsV2DFRunner
from lightx2v.models.runners.graph_runner import GraphRunner
from lightx2v.models.runners.cogvideox.cogvidex_runner import CogvideoxRunner
from lightx2v.common.ops import *
from loguru import logger


def init_runner(config):
    seed_all(config.seed)

    if config.parallel:
        # Initialize the NCCL process group and build a 2D device mesh split into
        # CFG-parallel and sequence-parallel dimensions.
        if not dist.is_initialized():
            dist.init_process_group(backend="nccl")
        cfg_p_size = config.parallel.get("cfg_p_size", 1)
        seq_p_size = config.parallel.get("seq_p_size", 1)
        assert cfg_p_size * seq_p_size == dist.get_world_size(), f"cfg_p_size * seq_p_size ({cfg_p_size} * {seq_p_size}) must equal world_size ({dist.get_world_size()})"
        config["device_mesh"] = init_device_mesh("cuda", (cfg_p_size, seq_p_size), mesh_dim_names=("cfg_p", "seq_p"))

    if CHECK_ENABLE_GRAPH_MODE():
        # Wrap the registered runner in a GraphRunner when graph mode is enabled.
        default_runner = RUNNER_REGISTER[config.model_cls](config)
        runner = GraphRunner(default_runner)
        runner.runner.init_modules()
    else:
        runner = RUNNER_REGISTER[config.model_cls](config)
        runner.init_modules()
    return runner


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--model_cls",
        type=str,
        required=True,
        choices=["wan2.1", "hunyuan", "wan2.1_distill", "wan2.1_causvid", "wan2.1_skyreels_v2_df", "cogvideox", "wan2.1_audio", "wan2.2_moe", "wan2.2_moe_audio", "wan2.2"],
        default="wan2.1",
    )
    parser.add_argument("--task", type=str, choices=["t2v", "i2v"], default="t2v")
    parser.add_argument("--model_path", type=str, required=True)
    parser.add_argument("--config_json", type=str, required=True)
    parser.add_argument("--use_prompt_enhancer", action="store_true")
    parser.add_argument("--prompt", type=str, default="", help="The input prompt for text-to-video generation")
    parser.add_argument("--negative_prompt", type=str, default="")
    parser.add_argument("--image_path", type=str, default="", help="The path to the input image file for the image-to-video (i2v) task")
    parser.add_argument("--audio_path", type=str, default="", help="The path to the input audio file for the audio-to-video (a2v) task")
    parser.add_argument("--save_video_path", type=str, default="./output_lightx2v.mp4", help="The path where the generated video file will be saved")
    args = parser.parse_args()
    logger.info(f"args: {args}")

    with ProfilingContext("Total Cost"):
        config = set_config(args)
        logger.info(f"config:\n{json.dumps(config, ensure_ascii=False, indent=4)}")
        runner = init_runner(config)
        runner.run_pipeline()

    # Clean up the distributed process group, if one was created.
    if dist.is_initialized():
        dist.destroy_process_group()
        logger.info("Distributed process group cleaned up")


if __name__ == "__main__":
    main()
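
# Example invocation (a minimal sketch: the script name and all paths below are
# placeholders, not files shipped with the repo; only the flags themselves come
# from the argument parser above):
#
#   python infer.py \
#       --model_cls wan2.1 \
#       --task t2v \
#       --model_path /path/to/wan2.1_checkpoint \
#       --config_json /path/to/wan_t2v_config.json \
#       --prompt "A cat surfing on a wave at sunset" \
#       --save_video_path ./output_lightx2v.mp4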