from typing import Optional
from typing import Tuple
from typing import Union

import torch
import torch.nn as nn

from transformers.cache_utils import Cache
from transformers.utils import logging

from liger_kernel.transformers.model.loss_utils import LigerForCausalLMLoss
from liger_kernel.transformers.model.loss_utils import unpack_cross_entropy_result
from liger_kernel.transformers.model.output_classes import LigerCausalLMOutputWithPast
from liger_kernel.transformers.model.output_classes import LigerGemma3CausalLMOutputWithPast

logger = logging.get_logger(__name__)


def causal_forward(
    self,
    input_ids: torch.LongTensor = None,
    attention_mask: Optional[torch.Tensor] = None,
    position_ids: Optional[torch.LongTensor] = None,
    past_key_values: Optional[Cache] = None,
    inputs_embeds: Optional[torch.FloatTensor] = None,
    labels: Optional[torch.LongTensor] = None,
    use_cache: Optional[bool] = None,
    output_attentions: Optional[bool] = None,
    output_hidden_states: Optional[bool] = None,
    return_dict: Optional[bool] = None,
    cache_position: Optional[torch.LongTensor] = None,
    logits_to_keep: Union[int, torch.Tensor] = 0,
    skip_logits: Optional[bool] = None,
    **loss_kwargs,
) -> Union[Tuple, LigerCausalLMOutputWithPast]:
    r"""
    labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
        Labels for computing the masked language modeling loss. Indices should either be in
        `[0, ..., config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100`
        are ignored (masked); the loss is only computed for the tokens with labels in
        `[0, ..., config.vocab_size]`.
    logits_to_keep (`int` or `torch.Tensor`, *optional*):
        If an `int`, compute logits for the last `logits_to_keep` tokens. If `0`, calculate logits for all
        `input_ids` (special case). Only the last token's logits are needed for generation, and calculating
        them only for that token can save memory, which becomes quite significant for long sequences or a
        large vocabulary size.
        If a `torch.Tensor`, it must be 1D, corresponding to the indices to keep in the sequence length
        dimension. This is useful when using packed tensor format (single dimension for batch and sequence
        length).

    Returns:

    Example:

    ```python
    >>> from transformers import AutoTokenizer, Gemma3ForCausalLM

    >>> model = Gemma3ForCausalLM.from_pretrained("google/gemma-3-1b-it")
    >>> tokenizer = AutoTokenizer.from_pretrained("google/gemma-3-1b-it")

    >>> prompt = "What is your favorite condiment?"
    >>> inputs = tokenizer(prompt, return_tensors="pt")

    >>> # Generate
    >>> generate_ids = model.generate(inputs.input_ids, max_length=30)
    >>> tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
    "What is your favorite condiment?"
    ```"""
    if self.training and self.config._attn_implementation != "eager":
        logger.warning_once(
            "It is strongly recommended to train Gemma3 models with the `eager` attention implementation "
            f"instead of `{self.config._attn_implementation}`. Use `eager` with "
            "`AutoModelForCausalLM.from_pretrained('<path-to-checkpoint>', attn_implementation='eager')`."
        )
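    # Illustrative note (added commentary, not from the upstream docstring): with
    # hidden states of shape (batch, seq_len, hidden) and logits_to_keep=1, only the
    # final position is projected through lm_head, so decoding avoids materializing
    # a full (batch, seq_len, vocab_size) logits tensor; logits_to_keep=0 keeps
    # every position.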
    output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
    output_hidden_states = (
        output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
    )
    return_dict = return_dict if return_dict is not None else self.config.use_return_dict

    # decoder outputs consist of (dec_features, layer_state, dec_hidden, dec_attn)
    outputs = self.model(
        input_ids=input_ids,
        attention_mask=attention_mask,
        position_ids=position_ids,
        past_key_values=past_key_values,
        inputs_embeds=inputs_embeds,
        use_cache=use_cache,
        output_attentions=output_attentions,
        output_hidden_states=output_hidden_states,
        return_dict=return_dict,
        cache_position=cache_position,
        **loss_kwargs,
    )

    hidden_states = outputs[0]
    # Only compute necessary logits, and do not upcast them to float if we are not computing the loss
    slice_indices = slice(-logits_to_keep, None) if isinstance(logits_to_keep, int) else logits_to_keep
    kept_hidden_states = hidden_states[:, slice_indices, :]

    shift_labels = loss_kwargs.pop("shift_labels", None)
    loss = None
    logits = None
    token_accuracy = None
    predicted_tokens = None

    if skip_logits is None:
        skip_logits = self.training and (labels is not None or shift_labels is not None)

    # Compute loss
    if skip_logits:
        result = LigerForCausalLMLoss(
            hidden_states=kept_hidden_states,
            lm_head_weight=self.lm_head.weight,
            labels=labels,
            shift_labels=shift_labels,
            hidden_size=self.config.hidden_size,
            final_logit_softcapping=self.config.final_logit_softcapping,
            **loss_kwargs,
        )
        loss, _, token_accuracy, predicted_tokens = unpack_cross_entropy_result(result)
    else:
        logits = self.lm_head(kept_hidden_states)
        if self.config.final_logit_softcapping is not None:
            # Soft-cap the logits: logits = softcap * tanh(logits / softcap)
            logits = logits / self.config.final_logit_softcapping
            logits = torch.tanh(logits)
            logits = logits * self.config.final_logit_softcapping

        if labels is not None or shift_labels is not None:
            loss = self.loss_function(
                logits=logits,
                labels=labels,
                shift_labels=shift_labels,
                vocab_size=self.vocab_size,
                **loss_kwargs,
            )

    if not return_dict:
        output_tuple = (logits,) + outputs[1:]
        output_tuple = (loss,) + output_tuple if loss is not None else output_tuple
        output_tuple = output_tuple + (token_accuracy,) if token_accuracy is not None else output_tuple
        output_tuple = output_tuple + (predicted_tokens,) if predicted_tokens is not None else output_tuple
        return output_tuple

    # Return custom output class with token_accuracy field
    return LigerCausalLMOutputWithPast(
        loss=loss,
        logits=logits,
        past_key_values=outputs.past_key_values,
        hidden_states=outputs.hidden_states,
        attentions=outputs.attentions,
        token_accuracy=token_accuracy,
        predicted_tokens=predicted_tokens,
    )
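
# Usage sketch (illustrative; the patching entry point is an assumption, not part of
# this module): liger_kernel typically installs `causal_forward` by monkey-patching
# the Hugging Face class, e.g.:
#
#   from transformers.models.gemma3 import modeling_gemma3
#   modeling_gemma3.Gemma3ForCausalLM.forward = causal_forward
#
# Once patched, training calls that pass `labels` take the fused
# linear-cross-entropy path above and never materialize full logits.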


def multimodal_forward(
    self,
    input_ids: torch.LongTensor = None,
    pixel_values: torch.FloatTensor = None,
    attention_mask: Optional[torch.Tensor] = None,
    position_ids: Optional[torch.LongTensor] = None,
    past_key_values: Optional[Union[list[torch.FloatTensor], Cache]] = None,
    token_type_ids: Optional[torch.LongTensor] = None,
    cache_position: Optional[torch.LongTensor] = None,
    inputs_embeds: Optional[torch.FloatTensor] = None,
    labels: Optional[torch.LongTensor] = None,
    use_cache: Optional[bool] = None,
    output_attentions: Optional[bool] = None,
    output_hidden_states: Optional[bool] = None,
    return_dict: Optional[bool] = None,
    logits_to_keep: Union[int, torch.Tensor] = 0,
    skip_logits: Optional[bool] = None,
    **lm_kwargs,
) -> Union[tuple, LigerGemma3CausalLMOutputWithPast]:
    r"""
    labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
        Labels for computing the masked language modeling loss. Indices should either be in
        `[0, ..., config.text_config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices
        set to `-100` are ignored (masked); the loss is only computed for the tokens with labels in
        `[0, ..., config.text_config.vocab_size]`.

    Example:

    ```python
    >>> from PIL import Image
    >>> import requests
    >>> from transformers import AutoProcessor, Gemma3ForConditionalGeneration

    >>> model = Gemma3ForConditionalGeneration.from_pretrained("google/gemma-3-4b-it")
    >>> processor = AutoProcessor.from_pretrained("google/gemma-3-4b-it")

    >>> messages = [
    ...     {
    ...         "role": "system",
    ...         "content": [
    ...             {"type": "text", "text": "You are a helpful assistant."}
    ...         ]
    ...     },
    ...     {
    ...         "role": "user", "content": [
    ...             {"type": "image", "url": "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/pipeline-cat-chonk.jpeg"},
    ...             {"type": "text", "text": "Where is the cat standing?"},
    ...         ]
    ...     },
    ... ]

    >>> inputs = processor.apply_chat_template(
    ...     messages,
    ...     tokenize=True,
    ...     return_dict=True,
    ...     return_tensors="pt",
    ...     add_generation_prompt=True
    ... )
    >>> # Generate
    >>> generate_ids = model.generate(**inputs)
    >>> processor.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
    "user\nYou are a helpful assistant.\n\n\n\n\n\nWhere is the cat standing?\nmodel\nBased on the image, the cat is standing in a snowy area, likely outdoors. It appears to"
    ```
    """
    output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
    output_hidden_states = (
        output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
    )
    return_dict = return_dict if return_dict is not None else self.config.use_return_dict

    outputs = self.model(
        input_ids=input_ids,
        pixel_values=pixel_values,
        token_type_ids=token_type_ids,
        attention_mask=attention_mask,
        position_ids=position_ids,
        past_key_values=past_key_values,
        inputs_embeds=inputs_embeds,
        use_cache=use_cache,
        labels=labels,
        output_attentions=output_attentions,
        output_hidden_states=output_hidden_states,
        return_dict=return_dict,
        cache_position=cache_position,
        **lm_kwargs,
    )

    shift_labels = lm_kwargs.pop("shift_labels", None)
    hidden_states = outputs[0]
    slice_indices = slice(-logits_to_keep, None) if isinstance(logits_to_keep, int) else logits_to_keep
    kept_hidden_states = hidden_states[:, slice_indices, :]

    loss = None
    logits = None
    token_accuracy = None
    predicted_tokens = None

    if skip_logits and labels is None:
        raise ValueError("skip_logits is True, but labels is None")

    if skip_logits is None:
        skip_logits = self.training and (labels is not None)

    if skip_logits:
        shift_hidden_states = kept_hidden_states[..., :-1, :]
        shift_labels = labels[..., 1:]

        hidden_device = shift_hidden_states.device
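        # Added note: this branch shifts and masks the hidden states rather than
        # logits. The fused linear cross-entropy path below projects through
        # lm_head inside the kernel, so a (num_tokens, vocab_size) logits tensor
        # is never materialized.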
        if attention_mask is not None:
            # we use the input attention mask to shift the hidden_states and labels, because it is 2D.
            # we also crop the attn mask in case it is longer, which happens in PrefixTuning with peft
            shift_attention_mask = attention_mask[:, -shift_hidden_states.shape[1] :].to(hidden_device)
            shift_hidden_states = shift_hidden_states[shift_attention_mask.to(hidden_device) != 0].contiguous()
            shift_labels = shift_labels[shift_attention_mask.to(shift_labels.device) != 0].contiguous()
        else:
            shift_hidden_states = shift_hidden_states.contiguous()
            shift_labels = shift_labels.contiguous()

        # Flatten hidden state
        shift_hidden_states = shift_hidden_states.view(-1, self.config.text_config.hidden_size)
        shift_labels = shift_labels.view(-1).to(hidden_device)

        # `shift_labels` is already shifted and flattened at this point, so it is
        # passed both as `labels` and via the explicit `shift_labels` argument.
        result = LigerForCausalLMLoss(
            hidden_states=shift_hidden_states,
            lm_head_weight=self.lm_head.weight,
            labels=shift_labels,
            hidden_size=self.config.text_config.hidden_size,
            shift_labels=shift_labels,
            final_logit_softcapping=getattr(self.config.text_config, "final_logit_softcapping", None),
            **lm_kwargs,
        )
        loss, _, token_accuracy, predicted_tokens = unpack_cross_entropy_result(result)
    else:
        logits = self.lm_head(kept_hidden_states)
        if labels is not None:
            # Upcast to float if we need to compute the loss to avoid potential precision issues
            logits = logits.float()
            shift_logits = logits[..., :-1, :]
            shift_labels = labels[..., 1:]
            if attention_mask is not None:
                # we use the input attention mask to shift the logits and labels, because it is 2D.
                # we also crop the attn mask in case it is longer, which happens in PrefixTuning with peft
                shift_attention_mask = attention_mask[:, -shift_logits.shape[1] :].to(logits.device)
                shift_logits = shift_logits[shift_attention_mask.to(logits.device) != 0].contiguous()
                shift_labels = shift_labels[shift_attention_mask.to(shift_labels.device) != 0].contiguous()
            else:
                shift_logits = shift_logits.contiguous()
                shift_labels = shift_labels.contiguous()
            # Flatten the tokens
            loss_fct = nn.CrossEntropyLoss()
            flat_logits = shift_logits.view(-1, self.config.text_config.vocab_size)
            flat_labels = shift_labels.view(-1).to(shift_logits.device)
            loss = loss_fct(flat_logits, flat_labels)
        elif shift_labels is not None:
            # Upcast to float if we need to compute the loss to avoid potential precision issues
            logits = logits.float()
            shift_logits = logits[..., :-1, :]
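            # Added note: this branch mirrors the `labels` handling above, except
            # the labels were pre-shifted by the caller (via `shift_labels` in
            # lm_kwargs), so only the logits need the one-position shift here.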
            if attention_mask is not None:
                # we use the input attention mask to shift the logits and labels, because it is 2D.
                # we also crop the attn mask in case it is longer, which happens in PrefixTuning with peft
                shift_attention_mask = attention_mask[:, -shift_logits.shape[1] :].to(logits.device)
                shift_logits = shift_logits[shift_attention_mask.to(logits.device) != 0].contiguous()
                shift_labels = shift_labels[shift_attention_mask.to(shift_labels.device) != 0].contiguous()
            else:
                shift_logits = shift_logits.contiguous()
                shift_labels = shift_labels.contiguous()
            # Flatten the tokens
            loss_fct = nn.CrossEntropyLoss()
            flat_logits = shift_logits.view(-1, self.config.text_config.vocab_size)
            flat_labels = shift_labels.view(-1).to(shift_logits.device)
            loss = loss_fct(flat_logits, flat_labels)

    if not return_dict:
        output = (logits,) + outputs[1:]
        output = (loss,) + output if loss is not None else output
        output = output + (token_accuracy,) if token_accuracy is not None else output
        output = output + (predicted_tokens,) if predicted_tokens is not None else output
        return output

    return LigerGemma3CausalLMOutputWithPast(
        loss=loss,
        logits=logits,
        past_key_values=outputs.past_key_values,
        hidden_states=outputs.hidden_states,
        attentions=outputs.attentions,
        image_hidden_states=outputs.image_hidden_states,
        token_accuracy=token_accuracy,
        predicted_tokens=predicted_tokens,
    )
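

# Usage sketch (illustrative; the exact patching mechanism is an assumption, not
# defined in this module): `multimodal_forward` is meant to replace the forward of
# the conditional-generation class the same way as `causal_forward`, e.g.:
#
#   from transformers.models.gemma3 import modeling_gemma3
#   modeling_gemma3.Gemma3ForConditionalGeneration.forward = multimodal_forward
#
# The Liger output classes extend the upstream ones with `token_accuracy` and
# `predicted_tokens`, which are only populated on the fused (skip_logits) path.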