import json import os.path as osp import time from diffusers import DiffusionPipeline import migraphx_diffusers from migraphx_diffusers import AutoTimer import torch def parse_args(): date_str = time.strftime("%Y%m%d-%H%M%S", time.localtime()) from argparse import ArgumentParser parser = ArgumentParser(description="SDXL inference with migraphx backend") #=========================== mdoel load and compile ======================== parser.add_argument( "-m", "--model-dir", type=str, required=True, help="Path to local model directory.", ) parser.add_argument( "--force-compile", action="store_true", default=False, help="Ignore existing .mxr files and override them", ) parser.add_argument( "--img-size", type=int, default=None, help="output image size", ) parser.add_argument( "--num-images-per-prompt", type=int, default=1, help="The number of images to generate per prompt." ) # -------------------------------------------------------------------------- # =============================== generation =============================== parser.add_argument( "-t", "--num-inference-steps", type=int, default=50, help="Number of iteration steps", ) parser.add_argument( "--out-csv-file", type=str, default=f"./perf-{date_str}.csv", help="Prefix of path for saving results", ) # -------------------------------------------------------------------------- # =============================== time count =============================== parser.add_argument( "--num-warmup-loops", type=int, default=1, help="warmup loops", ) parser.add_argument( "--num-count-loops", type=int, default=100, help="time count loops", ) # -------------------------------------------------------------------------- args = parser.parse_args() return args def get_name_and_migraphx_config(model_dir): model_index_json = osp.join(model_dir, "model_index.json") with open(model_index_json, "r") as f: pipe_cfg = json.load(f) if pipe_cfg["_class_name"] == "StableDiffusionXLPipeline": return 'sdxl', migraphx_diffusers.DEFAULT_ARGS['sdxl'] elif pipe_cfg["_class_name"] == "StableDiffusionPipeline": return 'sd2.1', migraphx_diffusers.DEFAULT_ARGS['sd2.1'] else: raise NotImplementedError( f"{pipe_cfg['_class_name']} has not been adapted yet") def main(): args = parse_args() pipe_name, migraphx_config = get_name_and_migraphx_config(args.model_dir) assert pipe_name in ['sdxl', 'sd2.1'], "Only support SDXL or SD2.1!" if args.img_size is not None: migraphx_config['common_args']['img_size'] = args.img_size migraphx_config['common_args'].update(dict( batch=args.num_images_per_prompt, force_compile=args.force_compile, )) pipe = DiffusionPipeline.from_pretrained( args.model_dir, torch_dtype=torch.float16, migraphx_config=migraphx_config ) pipe.to("cuda") t = AutoTimer() t.add_targets([ (pipe, "end2end"), (pipe.text_encoder, "text_encoder"), (pipe.unet, "unet"), (pipe.vae.decode, "vae_decoder") ]) if hasattr(pipe, "text_encoder_2"): t.add_target(pipe.text_encoder_2, key="text_encoder_2") for i in range(args.num_warmup_loops + args.num_count_loops): if i == args.num_warmup_loops: t.start_work() pipe(prompt="the ocean in dream", negative_prompt=None, num_inference_steps=args.num_inference_steps) table = t.summary(batchsize=migraphx_config['common_args']['batch']) t.clear() with open(args.out_csv_file, 'w') as f: f.write(table.get_csv_string()) if __name__ == "__main__": main()