# Copyright 2025 Qwen-Image Team and The HuggingFace Team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from typing import List, Tuple import torch from ...configuration_utils import FrozenDict from ...guiders import ClassifierFreeGuidance from ...models import QwenImageControlNetModel, QwenImageTransformer2DModel from ...schedulers import FlowMatchEulerDiscreteScheduler from ...utils import logging from ..modular_pipeline import BlockState, LoopSequentialPipelineBlocks, ModularPipelineBlocks, PipelineState from ..modular_pipeline_utils import ComponentSpec, InputParam, OutputParam from .modular_pipeline import QwenImageModularPipeline logger = logging.get_logger(__name__) class QwenImageLoopBeforeDenoiser(ModularPipelineBlocks): model_name = "qwenimage" @property def description(self) -> str: return ( "step within the denoising loop that prepares the latent input for the denoiser. " "This block should be used to compose the `sub_blocks` attribute of a `LoopSequentialPipelineBlocks` " "object (e.g. `QwenImageDenoiseLoopWrapper`)" ) @property def inputs(self) -> List[InputParam]: return [ InputParam( "latents", required=True, type_hint=torch.Tensor, description="The initial latents to use for the denoising process. Can be generated in prepare_latent step.", ), ] @torch.no_grad() def __call__(self, components: QwenImageModularPipeline, block_state: BlockState, i: int, t: torch.Tensor): # one timestep block_state.timestep = t.expand(block_state.latents.shape[0]).to(block_state.latents.dtype) block_state.latent_model_input = block_state.latents return components, block_state class QwenImageEditLoopBeforeDenoiser(ModularPipelineBlocks): model_name = "qwenimage" @property def description(self) -> str: return ( "step within the denoising loop that prepares the latent input for the denoiser. " "This block should be used to compose the `sub_blocks` attribute of a `LoopSequentialPipelineBlocks` " "object (e.g. `QwenImageDenoiseLoopWrapper`)" ) @property def inputs(self) -> List[InputParam]: return [ InputParam( "latents", required=True, type_hint=torch.Tensor, description="The initial latents to use for the denoising process. Can be generated in prepare_latent step.", ), InputParam( "image_latents", required=True, type_hint=torch.Tensor, description="The initial image latents to use for the denoising process. Can be encoded in vae_encoder step and packed in prepare_image_latents step.", ), ] @torch.no_grad() def __call__(self, components: QwenImageModularPipeline, block_state: BlockState, i: int, t: torch.Tensor): # one timestep block_state.latent_model_input = torch.cat([block_state.latents, block_state.image_latents], dim=1) block_state.timestep = t.expand(block_state.latents.shape[0]).to(block_state.latents.dtype) return components, block_state class QwenImageLoopBeforeDenoiserControlNet(ModularPipelineBlocks): model_name = "qwenimage" @property def expected_components(self) -> List[ComponentSpec]: return [ ComponentSpec( "guider", ClassifierFreeGuidance, config=FrozenDict({"guidance_scale": 4.0}), default_creation_method="from_config", ), ComponentSpec("controlnet", QwenImageControlNetModel), ] @property def description(self) -> str: return ( "step within the denoising loop that runs the controlnet before the denoiser. " "This block should be used to compose the `sub_blocks` attribute of a `LoopSequentialPipelineBlocks` " "object (e.g. `QwenImageDenoiseLoopWrapper`)" ) @property def inputs(self) -> List[InputParam]: return [ InputParam( "control_image_latents", required=True, type_hint=torch.Tensor, description="The control image to use for the denoising process. Can be generated in prepare_controlnet_inputs step.", ), InputParam( "controlnet_conditioning_scale", type_hint=float, description="The controlnet conditioning scale value to use for the denoising process. Can be generated in prepare_controlnet_inputs step.", ), InputParam( "controlnet_keep", required=True, type_hint=List[float], description="The controlnet keep values to use for the denoising process. Can be generated in prepare_controlnet_inputs step.", ), InputParam( "num_inference_steps", required=True, type_hint=int, description="The number of inference steps to use for the denoising process. Can be generated in set_timesteps step.", ), InputParam( kwargs_type="denoiser_input_fields", description=( "All conditional model inputs for the denoiser. " "It should contain prompt_embeds/negative_prompt_embeds, txt_seq_lens/negative_txt_seq_lens." ), ), ] @torch.no_grad() def __call__(self, components: QwenImageModularPipeline, block_state: BlockState, i: int, t: int): # cond_scale for the timestep (controlnet input) if isinstance(block_state.controlnet_keep[i], list): block_state.cond_scale = [ c * s for c, s in zip(block_state.controlnet_conditioning_scale, block_state.controlnet_keep[i]) ] else: controlnet_cond_scale = block_state.controlnet_conditioning_scale if isinstance(controlnet_cond_scale, list): controlnet_cond_scale = controlnet_cond_scale[0] block_state.cond_scale = controlnet_cond_scale * block_state.controlnet_keep[i] # run controlnet for the guidance batch controlnet_block_samples = components.controlnet( hidden_states=block_state.latent_model_input, controlnet_cond=block_state.control_image_latents, conditioning_scale=block_state.cond_scale, timestep=block_state.timestep / 1000, img_shapes=block_state.img_shapes, encoder_hidden_states=block_state.prompt_embeds, encoder_hidden_states_mask=block_state.prompt_embeds_mask, txt_seq_lens=block_state.txt_seq_lens, return_dict=False, ) block_state.additional_cond_kwargs["controlnet_block_samples"] = controlnet_block_samples return components, block_state class QwenImageLoopDenoiser(ModularPipelineBlocks): model_name = "qwenimage" @property def description(self) -> str: return ( "step within the denoising loop that denoise the latent input for the denoiser. " "This block should be used to compose the `sub_blocks` attribute of a `LoopSequentialPipelineBlocks` " "object (e.g. `QwenImageDenoiseLoopWrapper`)" ) @property def expected_components(self) -> List[ComponentSpec]: return [ ComponentSpec( "guider", ClassifierFreeGuidance, config=FrozenDict({"guidance_scale": 4.0}), default_creation_method="from_config", ), ComponentSpec("transformer", QwenImageTransformer2DModel), ] @property def inputs(self) -> List[InputParam]: return [ InputParam("attention_kwargs"), InputParam( "latents", required=True, type_hint=torch.Tensor, description="The latents to use for the denoising process. Can be generated in prepare_latents step.", ), InputParam( "num_inference_steps", required=True, type_hint=int, description="The number of inference steps to use for the denoising process. Can be generated in set_timesteps step.", ), InputParam( kwargs_type="denoiser_input_fields", description="conditional model inputs for the denoiser: e.g. prompt_embeds, negative_prompt_embeds, etc.", ), InputParam( "img_shapes", required=True, type_hint=List[Tuple[int, int]], description="The shape of the image latents for RoPE calculation. Can be generated in prepare_additional_inputs step.", ), ] @torch.no_grad() def __call__(self, components: QwenImageModularPipeline, block_state: BlockState, i: int, t: torch.Tensor): guider_input_fields = { "encoder_hidden_states": ("prompt_embeds", "negative_prompt_embeds"), "encoder_hidden_states_mask": ("prompt_embeds_mask", "negative_prompt_embeds_mask"), "txt_seq_lens": ("txt_seq_lens", "negative_txt_seq_lens"), } components.guider.set_state(step=i, num_inference_steps=block_state.num_inference_steps, timestep=t) guider_state = components.guider.prepare_inputs(block_state, guider_input_fields) for guider_state_batch in guider_state: components.guider.prepare_models(components.transformer) cond_kwargs = guider_state_batch.as_dict() cond_kwargs = {k: v for k, v in cond_kwargs.items() if k in guider_input_fields} # YiYi TODO: add cache context guider_state_batch.noise_pred = components.transformer( hidden_states=block_state.latent_model_input, timestep=block_state.timestep / 1000, img_shapes=block_state.img_shapes, attention_kwargs=block_state.attention_kwargs, return_dict=False, **cond_kwargs, **block_state.additional_cond_kwargs, )[0] components.guider.cleanup_models(components.transformer) guider_output = components.guider(guider_state) # apply guidance rescale pred_cond_norm = torch.norm(guider_output.pred_cond, dim=-1, keepdim=True) pred_norm = torch.norm(guider_output.pred, dim=-1, keepdim=True) block_state.noise_pred = guider_output.pred * (pred_cond_norm / pred_norm) return components, block_state class QwenImageEditLoopDenoiser(ModularPipelineBlocks): model_name = "qwenimage" @property def description(self) -> str: return ( "step within the denoising loop that denoise the latent input for the denoiser. " "This block should be used to compose the `sub_blocks` attribute of a `LoopSequentialPipelineBlocks` " "object (e.g. `QwenImageDenoiseLoopWrapper`)" ) @property def expected_components(self) -> List[ComponentSpec]: return [ ComponentSpec( "guider", ClassifierFreeGuidance, config=FrozenDict({"guidance_scale": 4.0}), default_creation_method="from_config", ), ComponentSpec("transformer", QwenImageTransformer2DModel), ] @property def inputs(self) -> List[InputParam]: return [ InputParam("attention_kwargs"), InputParam( "latents", required=True, type_hint=torch.Tensor, description="The latents to use for the denoising process. Can be generated in prepare_latents step.", ), InputParam( "num_inference_steps", required=True, type_hint=int, description="The number of inference steps to use for the denoising process. Can be generated in set_timesteps step.", ), InputParam( kwargs_type="denoiser_input_fields", description="conditional model inputs for the denoiser: e.g. prompt_embeds, negative_prompt_embeds, etc.", ), InputParam( "img_shapes", required=True, type_hint=List[Tuple[int, int]], description="The shape of the image latents for RoPE calculation. Can be generated in prepare_additional_inputs step.", ), ] @torch.no_grad() def __call__(self, components: QwenImageModularPipeline, block_state: BlockState, i: int, t: torch.Tensor): guider_input_fields = { "encoder_hidden_states": ("prompt_embeds", "negative_prompt_embeds"), "encoder_hidden_states_mask": ("prompt_embeds_mask", "negative_prompt_embeds_mask"), "txt_seq_lens": ("txt_seq_lens", "negative_txt_seq_lens"), } components.guider.set_state(step=i, num_inference_steps=block_state.num_inference_steps, timestep=t) guider_state = components.guider.prepare_inputs(block_state, guider_input_fields) for guider_state_batch in guider_state: components.guider.prepare_models(components.transformer) cond_kwargs = guider_state_batch.as_dict() cond_kwargs = {k: v for k, v in cond_kwargs.items() if k in guider_input_fields} # YiYi TODO: add cache context guider_state_batch.noise_pred = components.transformer( hidden_states=block_state.latent_model_input, timestep=block_state.timestep / 1000, img_shapes=block_state.img_shapes, attention_kwargs=block_state.attention_kwargs, return_dict=False, **cond_kwargs, **block_state.additional_cond_kwargs, )[0] components.guider.cleanup_models(components.transformer) guider_output = components.guider(guider_state) pred = guider_output.pred[:, : block_state.latents.size(1)] pred_cond = guider_output.pred_cond[:, : block_state.latents.size(1)] # apply guidance rescale pred_cond_norm = torch.norm(pred_cond, dim=-1, keepdim=True) pred_norm = torch.norm(pred, dim=-1, keepdim=True) block_state.noise_pred = pred * (pred_cond_norm / pred_norm) return components, block_state class QwenImageLoopAfterDenoiser(ModularPipelineBlocks): model_name = "qwenimage" @property def description(self) -> str: return ( "step within the denoising loop that updates the latents. " "This block should be used to compose the `sub_blocks` attribute of a `LoopSequentialPipelineBlocks` " "object (e.g. `QwenImageDenoiseLoopWrapper`)" ) @property def expected_components(self) -> List[ComponentSpec]: return [ ComponentSpec("scheduler", FlowMatchEulerDiscreteScheduler), ] @property def intermediate_outputs(self) -> List[OutputParam]: return [ OutputParam("latents", type_hint=torch.Tensor, description="The denoised latents."), ] @torch.no_grad() def __call__(self, components: QwenImageModularPipeline, block_state: BlockState, i: int, t: torch.Tensor): latents_dtype = block_state.latents.dtype block_state.latents = components.scheduler.step( block_state.noise_pred, t, block_state.latents, return_dict=False, )[0] if block_state.latents.dtype != latents_dtype: if torch.backends.mps.is_available(): # some platforms (eg. apple mps) misbehave due to a pytorch bug: https://github.com/pytorch/pytorch/pull/99272 block_state.latents = block_state.latents.to(latents_dtype) return components, block_state class QwenImageLoopAfterDenoiserInpaint(ModularPipelineBlocks): model_name = "qwenimage" @property def description(self) -> str: return ( "step within the denoising loop that updates the latents using mask and image_latents for inpainting. " "This block should be used to compose the `sub_blocks` attribute of a `LoopSequentialPipelineBlocks` " "object (e.g. `QwenImageDenoiseLoopWrapper`)" ) @property def inputs(self) -> List[InputParam]: return [ InputParam( "mask", required=True, type_hint=torch.Tensor, description="The mask to use for the inpainting process. Can be generated in inpaint prepare latents step.", ), InputParam( "image_latents", required=True, type_hint=torch.Tensor, description="The image latents to use for the inpainting process. Can be generated in inpaint prepare latents step.", ), InputParam( "initial_noise", required=True, type_hint=torch.Tensor, description="The initial noise to use for the inpainting process. Can be generated in inpaint prepare latents step.", ), InputParam( "timesteps", required=True, type_hint=torch.Tensor, description="The timesteps to use for the denoising process. Can be generated in set_timesteps step.", ), ] @torch.no_grad() def __call__(self, components: QwenImageModularPipeline, block_state: BlockState, i: int, t: torch.Tensor): block_state.init_latents_proper = block_state.image_latents if i < len(block_state.timesteps) - 1: block_state.noise_timestep = block_state.timesteps[i + 1] block_state.init_latents_proper = components.scheduler.scale_noise( block_state.init_latents_proper, torch.tensor([block_state.noise_timestep]), block_state.initial_noise ) block_state.latents = ( 1 - block_state.mask ) * block_state.init_latents_proper + block_state.mask * block_state.latents return components, block_state class QwenImageDenoiseLoopWrapper(LoopSequentialPipelineBlocks): model_name = "qwenimage" @property def description(self) -> str: return ( "Pipeline block that iteratively denoise the latents over `timesteps`. " "The specific steps with each iteration can be customized with `sub_blocks` attributes" ) @property def loop_expected_components(self) -> List[ComponentSpec]: return [ ComponentSpec("scheduler", FlowMatchEulerDiscreteScheduler), ] @property def loop_inputs(self) -> List[InputParam]: return [ InputParam( "timesteps", required=True, type_hint=torch.Tensor, description="The timesteps to use for the denoising process. Can be generated in set_timesteps step.", ), InputParam( "num_inference_steps", required=True, type_hint=int, description="The number of inference steps to use for the denoising process. Can be generated in set_timesteps step.", ), ] @torch.no_grad() def __call__(self, components: QwenImageModularPipeline, state: PipelineState) -> PipelineState: block_state = self.get_block_state(state) block_state.num_warmup_steps = max( len(block_state.timesteps) - block_state.num_inference_steps * components.scheduler.order, 0 ) block_state.additional_cond_kwargs = {} with self.progress_bar(total=block_state.num_inference_steps) as progress_bar: for i, t in enumerate(block_state.timesteps): components, block_state = self.loop_step(components, block_state, i=i, t=t) if i == len(block_state.timesteps) - 1 or ( (i + 1) > block_state.num_warmup_steps and (i + 1) % components.scheduler.order == 0 ): progress_bar.update() self.set_block_state(state, block_state) return components, state # composing the denoising loops class QwenImageDenoiseStep(QwenImageDenoiseLoopWrapper): block_classes = [ QwenImageLoopBeforeDenoiser, QwenImageLoopDenoiser, QwenImageLoopAfterDenoiser, ] block_names = ["before_denoiser", "denoiser", "after_denoiser"] @property def description(self) -> str: return ( "Denoise step that iteratively denoise the latents. \n" "Its loop logic is defined in `QwenImageDenoiseLoopWrapper.__call__` method \n" "At each iteration, it runs blocks defined in `sub_blocks` sequencially:\n" " - `QwenImageLoopBeforeDenoiser`\n" " - `QwenImageLoopDenoiser`\n" " - `QwenImageLoopAfterDenoiser`\n" "This block supports text2image and image2image tasks for QwenImage." ) # composing the inpainting denoising loops class QwenImageInpaintDenoiseStep(QwenImageDenoiseLoopWrapper): block_classes = [ QwenImageLoopBeforeDenoiser, QwenImageLoopDenoiser, QwenImageLoopAfterDenoiser, QwenImageLoopAfterDenoiserInpaint, ] block_names = ["before_denoiser", "denoiser", "after_denoiser", "after_denoiser_inpaint"] @property def description(self) -> str: return ( "Denoise step that iteratively denoise the latents. \n" "Its loop logic is defined in `QwenImageDenoiseLoopWrapper.__call__` method \n" "At each iteration, it runs blocks defined in `sub_blocks` sequencially:\n" " - `QwenImageLoopBeforeDenoiser`\n" " - `QwenImageLoopDenoiser`\n" " - `QwenImageLoopAfterDenoiser`\n" " - `QwenImageLoopAfterDenoiserInpaint`\n" "This block supports inpainting tasks for QwenImage." ) # composing the controlnet denoising loops class QwenImageControlNetDenoiseStep(QwenImageDenoiseLoopWrapper): block_classes = [ QwenImageLoopBeforeDenoiser, QwenImageLoopBeforeDenoiserControlNet, QwenImageLoopDenoiser, QwenImageLoopAfterDenoiser, ] block_names = ["before_denoiser", "before_denoiser_controlnet", "denoiser", "after_denoiser"] @property def description(self) -> str: return ( "Denoise step that iteratively denoise the latents. \n" "Its loop logic is defined in `QwenImageDenoiseLoopWrapper.__call__` method \n" "At each iteration, it runs blocks defined in `sub_blocks` sequencially:\n" " - `QwenImageLoopBeforeDenoiser`\n" " - `QwenImageLoopBeforeDenoiserControlNet`\n" " - `QwenImageLoopDenoiser`\n" " - `QwenImageLoopAfterDenoiser`\n" "This block supports text2img/img2img tasks with controlnet for QwenImage." ) # composing the controlnet denoising loops class QwenImageInpaintControlNetDenoiseStep(QwenImageDenoiseLoopWrapper): block_classes = [ QwenImageLoopBeforeDenoiser, QwenImageLoopBeforeDenoiserControlNet, QwenImageLoopDenoiser, QwenImageLoopAfterDenoiser, QwenImageLoopAfterDenoiserInpaint, ] block_names = [ "before_denoiser", "before_denoiser_controlnet", "denoiser", "after_denoiser", "after_denoiser_inpaint", ] @property def description(self) -> str: return ( "Denoise step that iteratively denoise the latents. \n" "Its loop logic is defined in `QwenImageDenoiseLoopWrapper.__call__` method \n" "At each iteration, it runs blocks defined in `sub_blocks` sequencially:\n" " - `QwenImageLoopBeforeDenoiser`\n" " - `QwenImageLoopBeforeDenoiserControlNet`\n" " - `QwenImageLoopDenoiser`\n" " - `QwenImageLoopAfterDenoiser`\n" " - `QwenImageLoopAfterDenoiserInpaint`\n" "This block supports inpainting tasks with controlnet for QwenImage." ) # composing the denoising loops class QwenImageEditDenoiseStep(QwenImageDenoiseLoopWrapper): block_classes = [ QwenImageEditLoopBeforeDenoiser, QwenImageEditLoopDenoiser, QwenImageLoopAfterDenoiser, ] block_names = ["before_denoiser", "denoiser", "after_denoiser"] @property def description(self) -> str: return ( "Denoise step that iteratively denoise the latents. \n" "Its loop logic is defined in `QwenImageDenoiseLoopWrapper.__call__` method \n" "At each iteration, it runs blocks defined in `sub_blocks` sequencially:\n" " - `QwenImageEditLoopBeforeDenoiser`\n" " - `QwenImageEditLoopDenoiser`\n" " - `QwenImageLoopAfterDenoiser`\n" "This block supports QwenImage Edit." ) class QwenImageEditInpaintDenoiseStep(QwenImageDenoiseLoopWrapper): block_classes = [ QwenImageEditLoopBeforeDenoiser, QwenImageEditLoopDenoiser, QwenImageLoopAfterDenoiser, QwenImageLoopAfterDenoiserInpaint, ] block_names = ["before_denoiser", "denoiser", "after_denoiser", "after_denoiser_inpaint"] @property def description(self) -> str: return ( "Denoise step that iteratively denoise the latents. \n" "Its loop logic is defined in `QwenImageDenoiseLoopWrapper.__call__` method \n" "At each iteration, it runs blocks defined in `sub_blocks` sequencially:\n" " - `QwenImageEditLoopBeforeDenoiser`\n" " - `QwenImageEditLoopDenoiser`\n" " - `QwenImageLoopAfterDenoiser`\n" " - `QwenImageLoopAfterDenoiserInpaint`\n" "This block supports inpainting tasks for QwenImage Edit." )