# Copyright 2023-2024 SGLang Team # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== # Adapted from: # https://github.com/vllm-project/vllm/blob/fb6af8bc086328ca6659e72d11ffd4309ce4de22/vllm/model_executor/models/deepseek_v2.py """Inference-only DeepseekV2 model.""" import concurrent.futures import logging import os from enum import IntEnum, auto from typing import Any, Dict, Iterable, Optional, Tuple, Union import torch import torch.nn.functional as F from torch import nn from tqdm import tqdm from transformers import PretrainedConfig from sglang.srt.distributed import ( get_moe_expert_parallel_world_size, get_pp_group, get_tensor_model_parallel_world_size, parallel_state, tensor_model_parallel_all_reduce, ) from sglang.srt.distributed.device_communicators.pynccl_allocator import ( use_symmetric_memory, ) from sglang.srt.eplb.expert_distribution import get_global_expert_distribution_recorder from sglang.srt.eplb.expert_location import ModelConfigForExpertLocation from sglang.srt.eplb.expert_location_dispatch import ExpertLocationDispatchInfo from sglang.srt.layers.activation import SiluAndMul from sglang.srt.layers.amx_utils import PackWeightMethod from sglang.srt.layers.attention.npu_ops.mla_preprocess import ( NPUFusedMLAPreprocess, is_mla_preprocess_enabled, ) from sglang.srt.layers.communicator import ( LayerCommunicator, LayerScatterModes, enable_moe_dense_fully_dp, ) from sglang.srt.layers.dp_attention import ( get_attention_tp_rank, get_attention_tp_size, is_dp_attention_enabled, ) from sglang.srt.layers.layernorm import RMSNorm from sglang.srt.layers.linear import ( ColumnParallelLinear, MergedColumnParallelLinear, ReplicatedLinear, RowParallelLinear, ) from sglang.srt.layers.logits_processor import LogitsProcessor from sglang.srt.layers.moe import ( get_deepep_mode, get_moe_a2a_backend, should_use_flashinfer_cutlass_moe_fp4_allgather, should_use_flashinfer_trtllm_moe, ) from sglang.srt.layers.moe.ep_moe.layer import DeepEPMoE, get_moe_impl_class from sglang.srt.layers.moe.fused_moe_triton.layer import FusedMoE from sglang.srt.layers.moe.topk import TopK, TopKOutputFormat from sglang.srt.layers.quantization import deep_gemm_wrapper from sglang.srt.layers.quantization.base_config import QuantizationConfig from sglang.srt.layers.quantization.fp8_kernel import ( is_fp8_fnuz, per_tensor_quant_mla_fp8, per_token_group_quant_mla_deep_gemm_masked_fp8, ) from sglang.srt.layers.quantization.fp8_utils import ( block_quant_dequant, block_quant_to_tensor_quant, channel_quant_to_tensor_quant, normalize_e4m3fn_to_e4m3fnuz, requant_weight_ue8m0_inplace, ) from sglang.srt.layers.quantization.int8_utils import ( block_dequant as int8_block_dequant, ) from sglang.srt.layers.radix_attention import RadixAttention from sglang.srt.layers.rotary_embedding import get_rope_wrapper from sglang.srt.layers.utils import PPMissingLayer, get_layer_id from sglang.srt.layers.vocab_parallel_embedding import ( ParallelLMHead, VocabParallelEmbedding, ) from sglang.srt.managers.schedule_batch import global_server_args_dict from sglang.srt.model_executor.forward_batch_info import ForwardBatch, PPProxyTensors from sglang.srt.model_loader.weight_utils import default_weight_loader from sglang.srt.two_batch_overlap import ( MaybeTboDeepEPDispatcher, model_forward_maybe_tbo, ) from sglang.srt.utils import ( BumpAllocator, LazyValue, add_prefix, bind_or_assign, cpu_has_amx_support, get_bool_env_var, get_device_sm, get_int_env_var, is_cpu, is_cuda, is_flashinfer_available, is_gfx95_supported, is_hip, is_non_idle_and_non_empty, is_npu, is_sm100_supported, log_info_on_rank0, make_layers, use_intel_amx_backend, ) _is_hip = is_hip() _is_cuda = is_cuda() _is_npu = is_npu() _is_fp8_fnuz = is_fp8_fnuz() _use_aiter = get_bool_env_var("SGLANG_USE_AITER") and _is_hip _is_cpu_amx_available = cpu_has_amx_support() _is_cpu = is_cpu() _device_sm = get_device_sm() _is_gfx95_supported = is_gfx95_supported() _use_aiter_gfx95 = _use_aiter and _is_gfx95_supported if _use_aiter_gfx95: from sglang.srt.layers.quantization.quark.utils import quark_post_load_weights from sglang.srt.layers.quantization.rocm_mxfp4_utils import ( batched_gemm_afp4wfp4_pre_quant, fused_flatten_mxfp4_quant, fused_rms_mxfp4_quant, ) from sglang.srt.layers.rocm_linear_utils import ( aiter_dsv3_router_gemm, fused_qk_rope_cat, get_dsv3_gemm_output_zero_allocator_size, ) if _is_cuda: from sgl_kernel import ( awq_dequantize, bmm_fp8, concat_mla_k, dsv3_fused_a_gemm, dsv3_router_gemm, merge_state_v2, ) elif _is_cpu and _is_cpu_amx_available: pass elif _is_hip: from sglang.srt.layers.quantization.awq_triton import ( awq_dequantize_triton as awq_dequantize, ) else: pass if _is_hip: from sglang.srt.layers.attention.triton_ops.rocm_mla_decode_rope import ( decode_attention_fwd_grouped_rope, ) _is_flashinfer_available = is_flashinfer_available() _is_sm100_supported = is_cuda() and is_sm100_supported() logger = logging.getLogger(__name__) FORWARD_ABSORB_CORE_ATTENTION_BACKENDS = [ "fa3", "flashinfer", "cutlass_mla", "trtllm_mla", "ascend", ] def add_forward_absorb_core_attention_backend(backend_name): if backend_name not in FORWARD_ABSORB_CORE_ATTENTION_BACKENDS: FORWARD_ABSORB_CORE_ATTENTION_BACKENDS.append(backend_name) logger.info(f"Added {backend_name} to FORWARD_ABSORB_CORE_ATTENTION_BACKENDS.") class AttnForwardMethod(IntEnum): # Use multi-head attention MHA = auto() # Use absorbed multi-latent attention MLA = auto() # Use multi-head attention, but with KV cache chunked. # This method can avoid OOM when prefix lengths are long. MHA_CHUNKED_KV = auto() # Use MLA but with fused RoPE MLA_FUSED_ROPE = auto() # Use MLA with fused RoPE kernel for CPU MLA_FUSED_ROPE_CPU = auto() def _dispatch_mla_subtype(attn, forward_batch): if _is_hip: if attn.rocm_fused_decode_mla and forward_batch.forward_mode.is_decode(): return AttnForwardMethod.MLA_FUSED_ROPE else: return AttnForwardMethod.MLA else: if hasattr(attn, "fused_qkv_a_proj_with_mqa") and use_intel_amx_backend(attn): return AttnForwardMethod.MLA_FUSED_ROPE_CPU else: return AttnForwardMethod.MLA class BackendRegistry: _handlers = {} @classmethod def register(cls, backend_name, handler_func): cls._handlers[backend_name] = handler_func @classmethod def get_handler(cls, backend_name): return cls._handlers.get(backend_name, cls._handlers.get("triton")) def handle_ascend(attn, forward_batch): if ( forward_batch.forward_mode.is_extend() and not forward_batch.forward_mode.is_target_verify() and not forward_batch.forward_mode.is_draft_extend() ): return AttnForwardMethod.MHA else: return AttnForwardMethod.MLA def _get_sum_extend_prefix_lens(forward_batch): return ( sum(forward_batch.extend_prefix_lens_cpu) if forward_batch.extend_prefix_lens_cpu is not None else 0 ) def _is_extend_without_speculative(forward_batch): return ( forward_batch.forward_mode.is_extend() and not forward_batch.forward_mode.is_target_verify() and not forward_batch.forward_mode.is_draft_extend() ) def _handle_backend(attn, forward_batch, backend_name): sum_extend_prefix_lens = _get_sum_extend_prefix_lens(forward_batch) disable_ragged = ( backend_name in ["flashinfer", "flashmla"] ) and attn.flashinfer_mla_disable_ragged if ( not disable_ragged and _is_extend_without_speculative(forward_batch) and ( ( sum_extend_prefix_lens >= attn.chunked_prefix_cache_threshold and not attn.disable_chunked_prefix_cache ) or sum_extend_prefix_lens == 0 ) ): return AttnForwardMethod.MHA_CHUNKED_KV else: return _dispatch_mla_subtype(attn, forward_batch) def handle_flashinfer(attn, forward_batch): return _handle_backend(attn, forward_batch, "flashinfer") def handle_fa3(attn, forward_batch): return _handle_backend(attn, forward_batch, "fa3") def handle_flashmla(attn, forward_batch): return _handle_backend(attn, forward_batch, "flashmla") def handle_cutlass_mla(attn, forward_batch): return _handle_backend(attn, forward_batch, "cutlass_mla") def handle_fa4(attn, forward_batch): # TODO(cicirori): use FA4 MHA for DeepSeekV3 for now return AttnForwardMethod.MHA_CHUNKED_KV def handle_trtllm_mla(attn, forward_batch): sum_extend_prefix_lens = _get_sum_extend_prefix_lens(forward_batch) if _is_extend_without_speculative(forward_batch) and ( not attn.disable_chunked_prefix_cache or sum_extend_prefix_lens == 0 ): return AttnForwardMethod.MHA_CHUNKED_KV else: return _dispatch_mla_subtype(attn, forward_batch) def handle_aiter(attn, forward_batch): if _is_extend_without_speculative(forward_batch): if is_dp_attention_enabled(): if sum(forward_batch.extend_prefix_lens_cpu) == 0: return AttnForwardMethod.MHA else: return AttnForwardMethod.MLA else: return AttnForwardMethod.MHA else: return AttnForwardMethod.MLA def handle_triton(attn, forward_batch): if ( _is_extend_without_speculative(forward_batch) and sum(forward_batch.extend_prefix_lens_cpu) == 0 ): return AttnForwardMethod.MHA else: return _dispatch_mla_subtype(attn, forward_batch) class DeepseekV2MLP(nn.Module): def __init__( self, hidden_size: int, intermediate_size: int, hidden_act: str, quant_config: Optional[QuantizationConfig] = None, reduce_results: bool = True, prefix: str = "", tp_rank: Optional[int] = None, tp_size: Optional[int] = None, ) -> None: super().__init__() self.tp_size = tp_size self.gate_up_proj = MergedColumnParallelLinear( hidden_size, [intermediate_size] * 2, bias=False, quant_config=quant_config, prefix=add_prefix("gate_up_proj", prefix), tp_rank=tp_rank, tp_size=tp_size, ) self.down_proj = RowParallelLinear( intermediate_size, hidden_size, bias=False, quant_config=quant_config, reduce_results=reduce_results, prefix=add_prefix("down_proj", prefix), tp_rank=tp_rank, tp_size=tp_size, ) if hidden_act != "silu": raise ValueError( f"Unsupported activation: {hidden_act}. " "Only silu is supported for now." ) self.act_fn = SiluAndMul() def forward( self, x, forward_batch=None, should_allreduce_fusion: bool = False, use_reduce_scatter: bool = False, gemm_output_zero_allocator: BumpAllocator = None, ): if (self.tp_size == 1) and x.shape[0] == 0: return x if ( gemm_output_zero_allocator is not None and x.shape[0] <= 256 and self.gate_up_proj.weight.dtype == torch.uint8 ): y = gemm_output_zero_allocator.allocate( x.shape[0] * self.gate_up_proj.output_size_per_partition ).view(x.shape[0], self.gate_up_proj.output_size_per_partition) x = (x, None, y) gate_up, _ = self.gate_up_proj(x) x = self.act_fn(gate_up) x, _ = self.down_proj( x, skip_all_reduce=should_allreduce_fusion or use_reduce_scatter ) return x class MoEGate(nn.Module): def __init__( self, config, quant_config, prefix: str = "", is_nextn: bool = False, ): super().__init__() self.is_nextn = is_nextn self.weight = nn.Parameter( torch.empty((config.n_routed_experts, config.hidden_size)) ) if config.topk_method == "noaux_tc": correction_bias_dtype = ( torch.bfloat16 if quant_config is not None and quant_config.get_name() == "modelopt_fp4" and should_use_flashinfer_trtllm_moe() else torch.float32 ) self.e_score_correction_bias = nn.Parameter( torch.empty((config.n_routed_experts), dtype=correction_bias_dtype) ) else: self.e_score_correction_bias = None if _is_cpu and _is_cpu_amx_available: self.quant_method = PackWeightMethod(weight_names=["weight"]) def forward(self, hidden_states, gemm_output_zero_allocator: BumpAllocator = None): if use_intel_amx_backend(self): return torch.ops.sgl_kernel.weight_packed_linear( hidden_states, self.weight, None, # bias True, # is_vnni ) # NOTE: For some unknown reason, router_gemm seems degrade accept length. if ( _is_cuda and hidden_states.shape[0] <= 16 and hidden_states.shape[1] == 7168 and (self.weight.shape[0] == 256 or self.weight.shape[0] == 384) and _device_sm >= 90 ): # router gemm output float32 logits = dsv3_router_gemm( hidden_states, self.weight, out_dtype=torch.float32 ) elif _use_aiter_gfx95 and hidden_states.shape[0] <= 256: logits = aiter_dsv3_router_gemm( hidden_states, self.weight, gemm_output_zero_allocator ) else: logits = F.linear(hidden_states, self.weight, None) return logits class DeepseekV2MoE(nn.Module): def __init__( self, config: PretrainedConfig, layer_id: int, quant_config: Optional[QuantizationConfig] = None, prefix: str = "", alt_stream: Optional[torch.cuda.Stream] = None, is_nextn: bool = False, ): super().__init__() self.tp_size = get_tensor_model_parallel_world_size() self.routed_scaling_factor = config.routed_scaling_factor self.n_shared_experts = config.n_shared_experts self.num_fused_shared_experts = ( 0 if global_server_args_dict["disable_shared_experts_fusion"] else config.n_shared_experts ) self.config = config self.layer_id = layer_id self.alt_stream = alt_stream if self.tp_size > config.n_routed_experts: raise ValueError( f"Tensor parallel size {self.tp_size} is greater than " f"the number of experts {config.n_routed_experts}." ) if config.hidden_act != "silu": raise ValueError( f"Unsupported activation: {config.hidden_act}. " "Only silu is supported for now." ) self.gate = MoEGate( config=config, quant_config=quant_config, prefix=add_prefix("gate", prefix), is_nextn=is_nextn, ) self.experts = get_moe_impl_class(quant_config)( num_experts=config.n_routed_experts + self.num_fused_shared_experts + global_server_args_dict["ep_num_redundant_experts"], num_fused_shared_experts=self.num_fused_shared_experts, top_k=config.num_experts_per_tok + self.num_fused_shared_experts, hidden_size=config.hidden_size, intermediate_size=config.moe_intermediate_size, layer_id=self.layer_id, quant_config=quant_config, routed_scaling_factor=self.routed_scaling_factor, prefix=add_prefix("experts", prefix), ) self.topk = TopK( top_k=config.num_experts_per_tok + self.num_fused_shared_experts, renormalize=config.norm_topk_prob, use_grouped_topk=True, num_expert_group=config.n_group, num_fused_shared_experts=self.num_fused_shared_experts, topk_group=config.topk_group, correction_bias=self.gate.e_score_correction_bias, quant_config=quant_config, routed_scaling_factor=self.routed_scaling_factor, apply_routed_scaling_factor_on_output=self.experts.should_fuse_routed_scaling_factor_in_topk(), # Some Fp4 MoE backends require the output format to be bypassed but the MTP layers are unquantized # and requires the output format to be standard. We use quant_config to determine the output format. output_format=TopKOutputFormat.STANDARD if quant_config is None else None, ) self.shared_experts_is_int8 = False self.shared_experts_is_fp8 = False self.shared_experts_weight_block_size = None if config.n_shared_experts is not None and self.num_fused_shared_experts == 0: intermediate_size = config.moe_intermediate_size * config.n_shared_experts # disable tp for shared experts when enable deepep moe, or with fp4 allgather self.shared_experts = DeepseekV2MLP( hidden_size=config.hidden_size, intermediate_size=intermediate_size, hidden_act=config.hidden_act, quant_config=quant_config, reduce_results=False, prefix=add_prefix("shared_experts", prefix), **( dict(tp_rank=0, tp_size=1) if get_moe_a2a_backend().is_deepep() or should_use_flashinfer_cutlass_moe_fp4_allgather() else {} ), ) is_packed_weight = hasattr( self.shared_experts.gate_up_proj.quant_method, "quant_config" ) and self.shared_experts.gate_up_proj.quant_method.quant_config.get_name() in { "awq", "awq_marlin", "moe_wna16", } self.shared_experts_is_int8 = ( not is_packed_weight and self.shared_experts.gate_up_proj.weight.dtype == torch.int8 ) self.shared_experts_is_fp8 = ( not is_packed_weight and self.shared_experts.gate_up_proj.weight.dtype == torch.float8_e4m3fn ) if self.shared_experts_is_fp8: assert ( self.shared_experts.gate_up_proj.quant_method.quant_config.weight_block_size == self.shared_experts.down_proj.quant_method.quant_config.weight_block_size ) self.shared_experts_weight_block_size = ( self.shared_experts.gate_up_proj.quant_method.quant_config.weight_block_size ) self.top_k = config.num_experts_per_tok if get_moe_a2a_backend().is_deepep(): # TODO: we will support tp < ep in the future self.ep_size = get_moe_expert_parallel_world_size() self.num_experts = ( config.n_routed_experts + global_server_args_dict["ep_num_redundant_experts"] ) self.renormalize = config.norm_topk_prob self.topk_group = config.topk_group self.num_expert_group = config.n_group self.correction_bias = ( self.gate.e_score_correction_bias.data if self.gate.e_score_correction_bias is not None else None ) self.deepep_dispatcher = MaybeTboDeepEPDispatcher( group=parallel_state.get_tp_group().device_group, router_topk=self.top_k, permute_fusion=True, num_experts=self.num_experts, num_local_experts=config.n_routed_experts // self.tp_size, hidden_size=config.hidden_size, params_dtype=config.torch_dtype, deepep_mode=get_deepep_mode(), async_finish=True, return_recv_hook=True, ) self._enable_deepep_moe = get_moe_a2a_backend().is_deepep() def get_moe_weights(self): return [ x.data for name, x in self.experts.named_parameters() if name not in ["correction_bias"] ] def forward( self, hidden_states: torch.Tensor, forward_batch: Optional[ForwardBatch] = None, should_allreduce_fusion: bool = False, use_reduce_scatter: bool = False, gemm_output_zero_allocator: BumpAllocator = None, ) -> torch.Tensor: if not self._enable_deepep_moe: DUAL_STREAM_TOKEN_THRESHOLD = 1024 if ( self.alt_stream is not None and self.num_fused_shared_experts == 0 and hidden_states.shape[0] > 0 and hidden_states.shape[0] <= DUAL_STREAM_TOKEN_THRESHOLD ): return self.forward_normal_dual_stream( hidden_states, should_allreduce_fusion, use_reduce_scatter, gemm_output_zero_allocator, ) else: return self.forward_normal( hidden_states, should_allreduce_fusion, use_reduce_scatter, gemm_output_zero_allocator, ) else: return self.forward_deepep(hidden_states, forward_batch) def forward_normal_dual_stream( self, hidden_states: torch.Tensor, should_allreduce_fusion: bool = False, use_reduce_scatter: bool = False, gemm_output_zero_allocator: BumpAllocator = None, ) -> torch.Tensor: current_stream = torch.cuda.current_stream() self.alt_stream.wait_stream(current_stream) shared_output = self._forward_shared_experts( hidden_states, gemm_output_zero_allocator ) with torch.cuda.stream(self.alt_stream): # router_logits: (num_tokens, n_experts) router_logits = self.gate(hidden_states, gemm_output_zero_allocator) topk_output = self.topk(hidden_states, router_logits) final_hidden_states = self.experts(hidden_states, topk_output) if not _is_cuda: final_hidden_states *= self.routed_scaling_factor current_stream.wait_stream(self.alt_stream) with use_symmetric_memory(parallel_state.get_tp_group()) as sm: final_hidden_states_out = torch.empty_like(final_hidden_states) torch.add(final_hidden_states, shared_output, out=final_hidden_states_out) final_hidden_states = final_hidden_states_out sm.tag(final_hidden_states) if ( self.tp_size > 1 and not should_allreduce_fusion and not use_reduce_scatter and not should_use_flashinfer_cutlass_moe_fp4_allgather() ): final_hidden_states = tensor_model_parallel_all_reduce(final_hidden_states) return final_hidden_states def forward_normal( self, hidden_states: torch.Tensor, should_allreduce_fusion: bool = False, use_reduce_scatter: bool = False, gemm_output_zero_allocator: BumpAllocator = None, ) -> torch.Tensor: if hasattr(self, "shared_experts") and use_intel_amx_backend( self.shared_experts.gate_up_proj ): return self.forward_cpu(hidden_states, should_allreduce_fusion) if hidden_states.shape[0] > 0: shared_output = self._forward_shared_experts( hidden_states, gemm_output_zero_allocator ) # router_logits: (num_tokens, n_experts) router_logits = self.gate(hidden_states, gemm_output_zero_allocator) topk_output = self.topk(hidden_states, router_logits) else: shared_output = None topk_output = self.topk.empty_topk_output(hidden_states.device) final_hidden_states = self.experts(hidden_states, topk_output) if not _is_cuda and not _use_aiter: # fused in biased_grouped_topk so we can skip here final_hidden_states *= self.routed_scaling_factor if shared_output is not None: with use_symmetric_memory(parallel_state.get_tp_group()) as sm: final_hidden_states_out = torch.empty_like(final_hidden_states) torch.add(final_hidden_states, shared_output, out=final_hidden_states_out) final_hidden_states = final_hidden_states_out sm.tag(final_hidden_states) if ( self.tp_size > 1 and not should_allreduce_fusion and not use_reduce_scatter and not should_use_flashinfer_cutlass_moe_fp4_allgather() ): final_hidden_states = tensor_model_parallel_all_reduce(final_hidden_states) return final_hidden_states def forward_cpu( self, hidden_states: torch.Tensor, should_allreduce_fusion: bool = False, ) -> torch.Tensor: # router_logits: (num_tokens, n_experts) router_logits = self.gate(hidden_states) topk_output = self.topk(hidden_states, router_logits) fused_experts_out = self.experts( hidden_states=hidden_states, topk_output=topk_output ) assert use_intel_amx_backend( self.shared_experts.gate_up_proj ) == use_intel_amx_backend(self.shared_experts.down_proj) # [Note] inplace should be False in fused_experts. # If inplace is True in fused_experts (self.experts), hidden_states will be changed after fused_experts # While hidden_states is still needed in shared_expert. final_hidden_states = torch.ops.sgl_kernel.shared_expert_cpu( hidden_states, self.shared_experts.gate_up_proj.weight, self.shared_experts.down_proj.weight, fused_experts_out, self.routed_scaling_factor, True, # inplace self.shared_experts_is_int8, # use_int8_w8a8 self.shared_experts_is_fp8, # use_fp8_w8a16 ( self.shared_experts.gate_up_proj.weight_scale if self.shared_experts_is_int8 else ( self.shared_experts.gate_up_proj.weight_scale_inv if self.shared_experts_is_fp8 else None ) ), # w1_scale ( self.shared_experts.down_proj.weight_scale if self.shared_experts_is_int8 else ( self.shared_experts.down_proj.weight_scale_inv if self.shared_experts_is_fp8 else None ) ), # w2_scale ( self.shared_experts_weight_block_size if self.shared_experts_is_fp8 else None ), # block_size None, # a1_scale None, # a2_scale True, # is_vnni ) if self.tp_size > 1 and not should_allreduce_fusion: final_hidden_states = tensor_model_parallel_all_reduce(final_hidden_states) return final_hidden_states def forward_deepep( self, hidden_states: torch.Tensor, forward_batch: ForwardBatch ) -> torch.Tensor: shared_output = None if hidden_states.shape[0] > 0: # router_logits: (num_tokens, n_experts) router_logits = self.gate(hidden_states) shared_output = self._forward_shared_experts(hidden_states) topk_weights, topk_idx, _ = self.topk( hidden_states, router_logits, num_token_non_padded=forward_batch.num_token_non_padded, expert_location_dispatch_info=ExpertLocationDispatchInfo.init_new( layer_id=self.layer_id, ), ) else: topk_weights, topk_idx, _ = self.topk.empty_topk_output( hidden_states.device ) final_hidden_states = self.experts( hidden_states=hidden_states, topk_idx=topk_idx, topk_weights=topk_weights, forward_batch=forward_batch, ) if shared_output is not None: x = shared_output if self.experts.should_fuse_routed_scaling_factor_in_topk(): x.add_(final_hidden_states) else: x.add_(final_hidden_states, alpha=self.routed_scaling_factor) final_hidden_states = x else: if not self.experts.should_fuse_routed_scaling_factor_in_topk(): final_hidden_states *= self.routed_scaling_factor return final_hidden_states def _forward_shared_experts( self, hidden_states, gemm_output_zero_allocator: BumpAllocator = None ): if self.num_fused_shared_experts == 0: return self.shared_experts( hidden_states, gemm_output_zero_allocator=gemm_output_zero_allocator ) else: return None def op_gate(self, state): if is_non_idle_and_non_empty( state.forward_batch.forward_mode, state.hidden_states_mlp_input ): # router_logits: (num_tokens, n_experts) state.router_logits = self.gate(state.hidden_states_mlp_input) else: state.router_logits = None def op_shared_experts(self, state): hidden_states_mlp_input = state.pop("hidden_states_mlp_input") if (self.num_fused_shared_experts == 0) and is_non_idle_and_non_empty( state.forward_batch.forward_mode, hidden_states_mlp_input ): state.shared_output = self.shared_experts(hidden_states_mlp_input) else: state.shared_output = None def op_select_experts(self, state): router_logits = state.pop("router_logits") hidden_states = state.hidden_states_mlp_input if router_logits is not None: with get_global_expert_distribution_recorder().with_current_layer( self.layer_id ): state.topk_weights_local, state.topk_idx_local, _ = self.topk( hidden_states=hidden_states, router_logits=router_logits, num_token_non_padded=state.forward_batch.num_token_non_padded, expert_location_dispatch_info=ExpertLocationDispatchInfo.init_new( layer_id=self.layer_id, ), ) else: state.topk_idx_local = torch.full( (0, self.top_k), -1, dtype=torch.int, device=hidden_states.device ) state.topk_weights_local = torch.empty( (0, self.top_k), dtype=torch.float32, device=hidden_states.device ) def op_dispatch_a(self, state): if self.ep_size > 1: self.experts.deepep_dispatcher.dispatch_a( hidden_states=state.hidden_states_mlp_input, topk_idx=state.pop("topk_idx_local"), topk_weights=state.pop("topk_weights_local"), forward_batch=state.forward_batch, tbo_subbatch_index=state.get("tbo_subbatch_index"), ) def op_dispatch_b(self, state): if self.ep_size > 1: with get_global_expert_distribution_recorder().with_current_layer( self.layer_id ): state.dispatch_output = self.experts.deepep_dispatcher.dispatch_b( tbo_subbatch_index=state.get("tbo_subbatch_index"), ) def op_experts(self, state): state.hidden_states_experts_output = self.experts.moe_impl( dispatch_output=state.dispatch_output, ) def op_combine_a(self, state): if self.ep_size > 1: self.experts.deepep_dispatcher.combine_a( hidden_states=state.pop("hidden_states_experts_output"), topk_idx=state.dispatch_output.topk_idx, topk_weights=state.dispatch_output.topk_weights, forward_batch=state.forward_batch, tbo_subbatch_index=state.get("tbo_subbatch_index"), ) state.pop("dispatch_output") def op_combine_b(self, state): if self.ep_size > 1: state.hidden_states_after_combine = ( self.experts.deepep_dispatcher.combine_b( tbo_subbatch_index=state.get("tbo_subbatch_index"), ) ) def op_output(self, state): final_hidden_states = state.pop("hidden_states_after_combine") if (shared_output := state.pop("shared_output")) is not None: x = shared_output x.add_(final_hidden_states, alpha=self.routed_scaling_factor) final_hidden_states = x else: final_hidden_states *= self.routed_scaling_factor state.hidden_states_mlp_output = final_hidden_states def yarn_get_mscale(scale: float = 1, mscale: float = 1) -> float: import math if scale <= 1: return 1.0 return 0.1 * mscale * math.log(scale) + 1.0 class DeepseekV2AttentionMLA(nn.Module): def __init__( self, config: PretrainedConfig, hidden_size: int, num_heads: int, qk_nope_head_dim: int, qk_rope_head_dim: int, v_head_dim: int, q_lora_rank: int, kv_lora_rank: int, rope_theta: float = 10000, rope_scaling: Optional[Dict[str, Any]] = None, max_position_embeddings: int = 8192, quant_config: Optional[QuantizationConfig] = None, reduce_results: bool = True, layer_id: int = None, prefix: str = "", alt_stream: Optional[torch.cuda.Stream] = None, ) -> None: super().__init__() self.layer_id = layer_id self.hidden_size = hidden_size self.qk_nope_head_dim = qk_nope_head_dim self.qk_rope_head_dim = qk_rope_head_dim self.qk_head_dim = qk_nope_head_dim + qk_rope_head_dim self.v_head_dim = v_head_dim self.q_lora_rank = q_lora_rank self.kv_lora_rank = kv_lora_rank attn_tp_rank = get_attention_tp_rank() attn_tp_size = get_attention_tp_size() self.num_heads = num_heads assert num_heads % attn_tp_size == 0 self.num_local_heads = num_heads // attn_tp_size self.scaling = self.qk_head_dim**-0.5 self.rope_theta = rope_theta self.max_position_embeddings = max_position_embeddings # For tensor parallel attention if self.q_lora_rank is not None: self.fused_qkv_a_proj_with_mqa = ReplicatedLinear( self.hidden_size, self.q_lora_rank + self.kv_lora_rank + self.qk_rope_head_dim, bias=False, quant_config=quant_config, prefix=add_prefix("fused_qkv_a_proj_with_mqa", prefix), ) self.q_a_layernorm = RMSNorm(self.q_lora_rank, eps=config.rms_norm_eps) self.q_b_proj = ColumnParallelLinear( q_lora_rank, self.num_heads * self.qk_head_dim, bias=False, quant_config=quant_config, prefix=add_prefix("q_b_proj", prefix), tp_rank=attn_tp_rank, tp_size=attn_tp_size, ) else: self.q_proj = ColumnParallelLinear( self.hidden_size, self.num_heads * self.qk_head_dim, bias=False, quant_config=quant_config, prefix=add_prefix("q_proj", prefix), tp_rank=attn_tp_rank, tp_size=attn_tp_size, ) self.kv_a_proj_with_mqa = ReplicatedLinear( self.hidden_size, self.kv_lora_rank + self.qk_rope_head_dim, bias=False, quant_config=quant_config, prefix=add_prefix("kv_a_proj_with_mqa", prefix), ) self.kv_b_proj = ColumnParallelLinear( self.kv_lora_rank, self.num_heads * (self.qk_nope_head_dim + self.v_head_dim), bias=False, quant_config=quant_config, prefix=add_prefix("kv_b_proj", prefix), tp_rank=attn_tp_rank, tp_size=attn_tp_size, ) # O projection. self.o_proj = RowParallelLinear( self.num_heads * self.v_head_dim, self.hidden_size, bias=False, quant_config=quant_config, reduce_results=reduce_results, prefix=add_prefix("o_proj", prefix), tp_rank=attn_tp_rank, tp_size=attn_tp_size, ) self.kv_a_layernorm = RMSNorm(self.kv_lora_rank, eps=config.rms_norm_eps) if rope_scaling: rope_scaling["rope_type"] = "deepseek_yarn" self.rotary_emb = get_rope_wrapper( qk_rope_head_dim, rotary_dim=qk_rope_head_dim, max_position=max_position_embeddings, base=rope_theta, rope_scaling=rope_scaling, is_neox_style=False, device=global_server_args_dict["device"], ) if rope_scaling: mscale_all_dim = rope_scaling.get("mscale_all_dim", False) scaling_factor = rope_scaling["factor"] mscale = yarn_get_mscale(scaling_factor, float(mscale_all_dim)) self.scaling = self.scaling * mscale * mscale else: self.rotary_emb.forward = self.rotary_emb.forward_native self.attn_mqa = RadixAttention( self.num_local_heads, self.kv_lora_rank + self.qk_rope_head_dim, self.scaling, num_kv_heads=1, layer_id=layer_id, v_head_dim=self.kv_lora_rank, quant_config=quant_config, prefix=add_prefix("attn_mqa", prefix), ) self.attn_mha = RadixAttention( self.num_local_heads, self.qk_nope_head_dim + self.qk_rope_head_dim, self.scaling, num_kv_heads=self.num_local_heads, layer_id=layer_id, v_head_dim=self.v_head_dim, quant_config=quant_config, prefix=add_prefix("attn_mha", prefix), ) self.alt_stream = alt_stream self.attn_mha.kv_b_proj = None self.w_kc = None self.w_vc = None self.w_scale = 1.0 self.w_scale_k = None self.w_scale_v = None self.use_deep_gemm_bmm = False self.flashinfer_mla_disable_ragged = global_server_args_dict[ "flashinfer_mla_disable_ragged" ] self.disable_chunked_prefix_cache = global_server_args_dict[ "disable_chunked_prefix_cache" ] self.current_attention_backend = ( None # Attention backend used by current forward batch ) self.rocm_fused_decode_mla = get_bool_env_var( "SGLANG_ROCM_FUSED_DECODE_MLA", "false" ) # TODO: Design a finer way to determine the threshold self.chunked_prefix_cache_threshold = get_int_env_var( "SGL_CHUNKED_PREFIX_CACHE_THRESHOLD", 8192 ) # If we have self.fused_qkv_a_proj_with_mqa and we're running on CPU, we will choose the torch.ops.sgl_kernel.qkv_proj_with_rope_fused_weight kernel # which requires self.w_kc and self.w_vc to be packed. # If not, we will use torch.bmm and weight shouldn't be packed in this case has_fused_proj = hasattr(self, "fused_qkv_a_proj_with_mqa") if has_fused_proj and _is_cpu and _is_cpu_amx_available: self.quant_method = PackWeightMethod( weight_names=["w_kc", "w_vc"], transpose_dims=[[1, 2], [1, 2]] ) is_packed_weight = ( has_fused_proj and hasattr(self.fused_qkv_a_proj_with_mqa.quant_method, "quant_config") and self.fused_qkv_a_proj_with_mqa.quant_method.quant_config.get_name() in {"awq", "awq_marlin", "moe_wna16"} ) self.use_min_latency_fused_a_gemm = ( has_fused_proj and not is_packed_weight and self.fused_qkv_a_proj_with_mqa.weight.dtype == torch.bfloat16 and self.fused_qkv_a_proj_with_mqa.weight.shape[0] == 2112 and self.fused_qkv_a_proj_with_mqa.weight.shape[1] == 7168 and _is_cuda and _device_sm >= 90 ) self.qkv_proj_with_rope_is_int8 = ( has_fused_proj and not is_packed_weight and self.fused_qkv_a_proj_with_mqa.weight.dtype == torch.int8 ) self.qkv_proj_with_rope_is_fp8 = ( has_fused_proj and not is_packed_weight and self.fused_qkv_a_proj_with_mqa.weight.dtype == torch.float8_e4m3fn ) self.weight_block_size = None if self.qkv_proj_with_rope_is_fp8 and _is_cpu and _is_cpu_amx_available: assert getattr( self.fused_qkv_a_proj_with_mqa.quant_method, "block_quant", False ) == getattr(self.q_b_proj.quant_method, "block_quant", False) use_block_quant = getattr( self.fused_qkv_a_proj_with_mqa.quant_method, "block_quant", False ) if use_block_quant: assert ( self.fused_qkv_a_proj_with_mqa.quant_method.quant_config.weight_block_size == self.q_b_proj.quant_method.quant_config.weight_block_size ) self.weight_block_size = ( self.fused_qkv_a_proj_with_mqa.quant_method.quant_config.weight_block_size ) self.is_mla_preprocess_enabled = is_mla_preprocess_enabled() if self.is_mla_preprocess_enabled: assert ( quant_config.get_name() == "w8a8_int8" ), "MLA Preprocess only works with W8A8Int8" self.mla_preprocess = None def dispatch_attn_forward_method( self, forward_batch: ForwardBatch ) -> AttnForwardMethod: # Determine attention backend used by current forward batch if forward_batch.forward_mode.is_decode_or_idle(): attention_backend = global_server_args_dict["decode_attention_backend"] elif ( forward_batch.forward_mode.is_target_verify() or forward_batch.forward_mode.is_draft_extend() ): # Use the specified backend for speculative operations (both verify and draft extend) if global_server_args_dict["speculative_attention_mode"] == "decode": attention_backend = global_server_args_dict["decode_attention_backend"] else: # default to prefill attention_backend = global_server_args_dict["prefill_attention_backend"] else: attention_backend = global_server_args_dict["prefill_attention_backend"] self.current_attention_backend = attention_backend handler = BackendRegistry.get_handler(attention_backend) return handler(self, forward_batch) def op_prepare(self, state): state.attn_intermediate_state = self.forward_prepare( positions=state.positions, hidden_states=state.pop("hidden_states_after_comm_pre_attn"), forward_batch=state.forward_batch, zero_allocator=state.zero_allocator, ) def op_core(self, state): state.hidden_states_after_attn = self.forward_core( state.pop("attn_intermediate_state") ) def forward( self, positions: torch.Tensor, hidden_states: torch.Tensor, forward_batch: ForwardBatch, zero_allocator: BumpAllocator, ): s = self.forward_prepare( positions=positions, hidden_states=hidden_states, forward_batch=forward_batch, zero_allocator=zero_allocator, ) return self.forward_core(s) def forward_prepare( self, positions: torch.Tensor, hidden_states: torch.Tensor, forward_batch: ForwardBatch, zero_allocator: BumpAllocator, ): if self.attn_mha.kv_b_proj is None: self.attn_mha.kv_b_proj = self.kv_b_proj # when hidden_states is a tuple of tensors, the tuple will include quantized weight and scale tensor if isinstance(hidden_states, tuple): if hidden_states[0].shape[0] == 0: assert ( not self.o_proj.reduce_results ), "short-circuiting allreduce will lead to hangs" return hidden_states[0] else: if hidden_states.shape[0] == 0: assert ( not self.o_proj.reduce_results ), "short-circuiting allreduce will lead to hangs" return hidden_states, None, forward_batch, None attn_forward_method = self.dispatch_attn_forward_method(forward_batch) if attn_forward_method == AttnForwardMethod.MHA: inner_state = self.forward_normal_prepare( positions, hidden_states, forward_batch, zero_allocator ) elif attn_forward_method == AttnForwardMethod.MHA_CHUNKED_KV: inner_state = self.forward_normal_chunked_kv_prepare( positions, hidden_states, forward_batch, zero_allocator ) elif attn_forward_method == AttnForwardMethod.MLA: if not self.is_mla_preprocess_enabled: inner_state = self.forward_absorb_prepare( positions, hidden_states, forward_batch, zero_allocator ) else: # TODO(iforgetmyname): to be separated as a standalone func if self.mla_preprocess is None: self.mla_preprocess = NPUFusedMLAPreprocess( self.fused_qkv_a_proj_with_mqa, self.q_a_layernorm, self.kv_a_layernorm, self.q_b_proj, self.w_kc, self.rotary_emb, self.layer_id, self.num_local_heads, self.qk_nope_head_dim, self.qk_rope_head_dim, ) inner_state = self.mla_preprocess.forward( positions, hidden_states, forward_batch, zero_allocator ) elif attn_forward_method == AttnForwardMethod.MLA_FUSED_ROPE: inner_state = self.forward_absorb_fused_mla_rope_prepare( positions, hidden_states, forward_batch, zero_allocator ) elif attn_forward_method == AttnForwardMethod.MLA_FUSED_ROPE_CPU: inner_state = self.forward_absorb_fused_mla_rope_cpu_prepare( positions, hidden_states, forward_batch, zero_allocator ) else: raise NotImplementedError return None, attn_forward_method, forward_batch, inner_state def forward_core(self, intermediate_state): hidden_states, attn_forward_method, forward_batch, inner_state = ( intermediate_state ) if inner_state is None: return hidden_states if attn_forward_method == AttnForwardMethod.MHA: return self.forward_normal_core(*inner_state) elif attn_forward_method == AttnForwardMethod.MHA_CHUNKED_KV: return self.forward_normal_chunked_kv_core(*inner_state) elif attn_forward_method == AttnForwardMethod.MLA: return self.forward_absorb_core(*inner_state) elif attn_forward_method == AttnForwardMethod.MLA_FUSED_ROPE: return self.forward_absorb_fused_mla_rope_core(*inner_state) elif attn_forward_method == AttnForwardMethod.MLA_FUSED_ROPE_CPU: return self.forward_absorb_fused_mla_rope_cpu_core(*inner_state) else: raise NotImplementedError def forward_normal_prepare( self, positions: torch.Tensor, hidden_states: torch.Tensor, forward_batch: ForwardBatch, zero_allocator: BumpAllocator, ): if self.q_lora_rank is not None: q, latent_cache = self.fused_qkv_a_proj_with_mqa(hidden_states)[0].split( [self.q_lora_rank, self.kv_lora_rank + self.qk_rope_head_dim], dim=-1 ) q = self.q_a_layernorm(q) q = self.q_b_proj(q)[0].view(-1, self.num_local_heads, self.qk_head_dim) else: q = self.q_proj(hidden_states)[0].view( -1, self.num_local_heads, self.qk_head_dim ) latent_cache = self.kv_a_proj_with_mqa(hidden_states)[0] _, q_pe = q.split([self.qk_nope_head_dim, self.qk_rope_head_dim], dim=-1) kv_a, _ = latent_cache.split([self.kv_lora_rank, self.qk_rope_head_dim], dim=-1) latent_cache = latent_cache.unsqueeze(1) kv_a = self.kv_a_layernorm(kv_a) kv = self.kv_b_proj(kv_a)[0] kv = kv.view(-1, self.num_local_heads, self.qk_nope_head_dim + self.v_head_dim) k_nope = kv[..., : self.qk_nope_head_dim] v = kv[..., self.qk_nope_head_dim :] k_pe = latent_cache[:, :, self.kv_lora_rank :] q_pe, k_pe = self.rotary_emb(positions, q_pe, k_pe) q[..., self.qk_nope_head_dim :] = q_pe k = torch.empty_like(q) # Temporary for DeepSeek V3/R1 only, but can generalize if needed if ( _is_cuda and (self.num_local_heads == 128) and (self.qk_nope_head_dim == 128) and (self.qk_rope_head_dim == 64) ): concat_mla_k(k=k, k_nope=k_nope, k_rope=k_pe) else: k[..., : self.qk_nope_head_dim] = k_nope k[..., self.qk_nope_head_dim :] = k_pe if not _is_npu: latent_cache[:, :, : self.kv_lora_rank] = kv_a.unsqueeze(1) latent_cache[:, :, self.kv_lora_rank :] = k_pe # Save latent cache forward_batch.token_to_kv_pool.set_kv_buffer( self.attn_mha, forward_batch.out_cache_loc, latent_cache, None ) else: # To reduce a time-costing split operation forward_batch.token_to_kv_pool.set_kv_buffer( self.attn_mha, forward_batch.out_cache_loc, kv_a.unsqueeze(1), k_pe ) return q, k, v, forward_batch def forward_normal_core(self, q, k, v, forward_batch): attn_output = self.attn_mha(q, k, v, forward_batch, save_kv_cache=False) attn_output = attn_output.reshape(-1, self.num_local_heads * self.v_head_dim) output, _ = self.o_proj(attn_output) return output def _fuse_rope_for_trtllm_mla(self, forward_batch: ForwardBatch) -> bool: """ Check if we should skip rope and do fused rope+quantize for TRTLLM MLA decode in fp8_e4m3 path. """ return ( self.current_attention_backend == "trtllm_mla" and forward_batch.forward_mode.is_decode_or_idle() and forward_batch.attn_backend.data_type == torch.float8_e4m3fn ) def forward_absorb_prepare( self, positions: torch.Tensor, hidden_states: torch.Tensor, forward_batch: ForwardBatch, zero_allocator: BumpAllocator, ): from sglang.srt.model_executor.cuda_graph_runner import get_is_capture_mode if self.q_lora_rank is not None: if ( (not isinstance(hidden_states, tuple)) and hidden_states.shape[0] <= 16 and self.use_min_latency_fused_a_gemm ): fused_qkv_a_proj_out = dsv3_fused_a_gemm( hidden_states, self.fused_qkv_a_proj_with_mqa.weight.T ) else: fused_qkv_a_proj_out = self.fused_qkv_a_proj_with_mqa(hidden_states)[0] q, latent_cache = fused_qkv_a_proj_out.split( [self.q_lora_rank, self.kv_lora_rank + self.qk_rope_head_dim], dim=-1 ) k_nope = latent_cache[..., : self.kv_lora_rank] # overlap qk norm if self.alt_stream is not None and get_is_capture_mode(): current_stream = torch.cuda.current_stream() self.alt_stream.wait_stream(current_stream) q = self.q_a_layernorm(q) with torch.cuda.stream(self.alt_stream): k_nope = self.kv_a_layernorm(k_nope) current_stream.wait_stream(self.alt_stream) else: if _use_aiter_gfx95 and self.q_b_proj.weight.dtype == torch.uint8: q, k_nope = fused_rms_mxfp4_quant( q, self.q_a_layernorm.weight, self.q_a_layernorm.variance_epsilon, k_nope, self.kv_a_layernorm.weight, self.kv_a_layernorm.variance_epsilon, ) else: q = self.q_a_layernorm(q) k_nope = self.kv_a_layernorm(k_nope) k_nope = k_nope.unsqueeze(1) q = self.q_b_proj(q)[0].view(-1, self.num_local_heads, self.qk_head_dim) else: q = self.q_proj(hidden_states)[0].view( -1, self.num_local_heads, self.qk_head_dim ) latent_cache = self.kv_a_proj_with_mqa(hidden_states)[0] k_nope = latent_cache[..., : self.kv_lora_rank] k_nope = self.kv_a_layernorm(k_nope).unsqueeze(1) q_nope, q_pe = q.split([self.qk_nope_head_dim, self.qk_rope_head_dim], dim=-1) k_pe = latent_cache[..., self.kv_lora_rank :].unsqueeze(1) if self.use_deep_gemm_bmm: q_nope_val, q_nope_scale, masked_m, expected_m, aligned_m = ( per_token_group_quant_mla_deep_gemm_masked_fp8(q_nope.transpose(0, 1)) ) q_nope_out = q_nope.new_empty( (self.num_local_heads, aligned_m, self.kv_lora_rank) ) deep_gemm_wrapper.grouped_gemm_nt_f8f8bf16_masked( (q_nope_val, q_nope_scale), (self.w_kc, self.w_scale_k), q_nope_out, masked_m, expected_m, ) q_nope_out = q_nope_out[:, :expected_m, :] elif _is_hip: # TODO(haishaw): add bmm_fp8 to ROCm if _use_aiter_gfx95 and self.w_kc.dtype == torch.uint8: x = q_nope.transpose(0, 1) q_nope_out = torch.empty( x.shape[0], x.shape[1], self.w_kc.shape[2], device=x.device, dtype=torch.bfloat16, ) batched_gemm_afp4wfp4_pre_quant( x, self.w_kc.transpose(-2, -1), self.w_scale_k.transpose(-2, -1), torch.bfloat16, q_nope_out, ) else: q_nope_out = torch.bmm( q_nope.to(torch.bfloat16).transpose(0, 1), self.w_kc.to(torch.bfloat16) * self.w_scale, ) elif self.w_kc.dtype == torch.float8_e4m3fn: q_nope_val, q_nope_scale = per_tensor_quant_mla_fp8( q_nope.transpose(0, 1), zero_allocator.allocate(1), ) q_nope_out = bmm_fp8( q_nope_val, self.w_kc, q_nope_scale, self.w_scale, torch.bfloat16 ) else: q_nope_out = torch.bmm(q_nope.transpose(0, 1), self.w_kc) q_nope_out = q_nope_out.transpose(0, 1) if not self._fuse_rope_for_trtllm_mla(forward_batch) and ( not _use_aiter or not _is_gfx95_supported ): q_pe, k_pe = self.rotary_emb(positions, q_pe, k_pe) return q_pe, k_pe, q_nope_out, k_nope, forward_batch, zero_allocator, positions def forward_absorb_core( self, q_pe, k_pe, q_nope_out, k_nope, forward_batch, zero_allocator, positions ): if self.current_attention_backend in FORWARD_ABSORB_CORE_ATTENTION_BACKENDS: extra_args = {} if self._fuse_rope_for_trtllm_mla(forward_batch): extra_args = { "cos_sin_cache": self.rotary_emb.cos_sin_cache, "is_neox": self.rotary_emb.is_neox_style, } attn_output = self.attn_mqa( q_nope_out, k_nope, k_nope, forward_batch, q_rope=q_pe, k_rope=k_pe, **extra_args, ) else: if _use_aiter_gfx95: cos = self.rotary_emb.cos_cache sin = self.rotary_emb.sin_cache q, k = fused_qk_rope_cat( q_nope_out, q_pe, k_nope, k_pe, positions, cos, sin, self.rotary_emb.is_neox_style, ) else: q = torch.cat([q_nope_out, q_pe], dim=-1) k = torch.cat([k_nope, k_pe], dim=-1) attn_output = self.attn_mqa(q, k, k_nope, forward_batch) attn_output = attn_output.view(-1, self.num_local_heads, self.kv_lora_rank) if self.use_deep_gemm_bmm: attn_output_val, attn_output_scale, masked_m, expected_m, aligned_m = ( per_token_group_quant_mla_deep_gemm_masked_fp8( attn_output.transpose(0, 1) ) ) attn_bmm_output = attn_output.new_empty( (self.num_local_heads, aligned_m, self.v_head_dim) ) deep_gemm_wrapper.grouped_gemm_nt_f8f8bf16_masked( (attn_output_val, attn_output_scale), (self.w_vc, self.w_scale_v), attn_bmm_output, masked_m, expected_m, ) attn_bmm_output = ( attn_bmm_output[:, :expected_m, :].transpose(0, 1).flatten(1, 2) ) elif _is_hip: # TODO(haishaw): add bmm_fp8 to ROCm if _use_aiter_gfx95 and self.w_vc.dtype == torch.uint8: x = attn_output.transpose(0, 1) attn_bmm_output = torch.empty( x.shape[0], x.shape[1], self.w_vc.shape[2], device=x.device, dtype=torch.bfloat16, ) batched_gemm_afp4wfp4_pre_quant( x, self.w_vc.transpose(-2, -1), self.w_scale_v.transpose(-2, -1), torch.bfloat16, attn_bmm_output, ) else: attn_bmm_output = torch.bmm( attn_output.to(torch.bfloat16).transpose(0, 1), self.w_vc.to(torch.bfloat16) * self.w_scale, ) if self.o_proj.weight.dtype == torch.uint8: attn_bmm_output = attn_bmm_output.transpose(0, 1) attn_bmm_output = fused_flatten_mxfp4_quant(attn_bmm_output) else: attn_bmm_output = attn_bmm_output.transpose(0, 1).flatten(1, 2) elif self.w_vc.dtype == torch.float8_e4m3fn: attn_output_val, attn_output_scale = per_tensor_quant_mla_fp8( attn_output.transpose(0, 1), zero_allocator.allocate(1), ) attn_bmm_output = bmm_fp8( attn_output_val, self.w_vc, attn_output_scale, self.w_scale, torch.bfloat16, ) attn_bmm_output = attn_bmm_output.transpose(0, 1).flatten(1, 2) else: attn_bmm_output = torch.empty( (attn_output.shape[0], self.num_local_heads * self.v_head_dim), dtype=attn_output.dtype, device=attn_output.device, ) torch.bmm( attn_output.transpose(0, 1), self.w_vc, out=attn_bmm_output.view( -1, self.num_local_heads, self.v_head_dim ).transpose(0, 1), ) output, _ = self.o_proj(attn_bmm_output) return output def forward_absorb_fused_mla_rope_prepare( self, positions: torch.Tensor, hidden_states: torch.Tensor, forward_batch: ForwardBatch, zero_allocator: BumpAllocator, ): enable_rope_fusion = ( os.getenv("SGLANG_FUSED_MLA_ENABLE_ROPE_FUSION", "1") == "1" ) q_len = hidden_states.shape[0] q_input = hidden_states.new_empty( q_len, self.num_local_heads, self.kv_lora_rank + self.qk_rope_head_dim ) if self.q_lora_rank is not None: q, latent_cache = self.fused_qkv_a_proj_with_mqa(hidden_states)[0].split( [self.q_lora_rank, self.kv_lora_rank + self.qk_rope_head_dim], dim=-1 ) q = self.q_a_layernorm(q) q = self.q_b_proj(q)[0].view(-1, self.num_local_heads, self.qk_head_dim) else: q = self.q_proj(hidden_states)[0].view( -1, self.num_local_heads, self.qk_head_dim ) latent_cache = self.kv_a_proj_with_mqa(hidden_states)[0] q_nope, q_pe = q.split([self.qk_nope_head_dim, self.qk_rope_head_dim], dim=-1) if _is_hip: # TODO(haishaw): add bmm_fp8 to ROCm q_nope_out = torch.bmm( q_nope.to(torch.bfloat16).transpose(0, 1), self.w_kc.to(torch.bfloat16) * self.w_scale, ) elif self.w_kc.dtype == torch.float8_e4m3fn: q_nope_val, q_nope_scale = per_tensor_quant_mla_fp8( q_nope.transpose(0, 1), zero_allocator.allocate(1), dtype=torch.float8_e4m3fn, ) q_nope_out = bmm_fp8( q_nope_val, self.w_kc, q_nope_scale, self.w_scale, torch.bfloat16 ) else: q_nope_out = torch.bmm(q_nope.transpose(0, 1), self.w_kc) q_input[..., : self.kv_lora_rank] = q_nope_out.transpose(0, 1) v_input = latent_cache[..., : self.kv_lora_rank] v_input = self.kv_a_layernorm(v_input.contiguous()).unsqueeze(1) k_input = latent_cache.unsqueeze(1) k_input[..., : self.kv_lora_rank] = v_input if not enable_rope_fusion: k_pe = k_input[..., self.kv_lora_rank :] q_pe, k_pe = self.rotary_emb(positions, q_pe, k_pe) q_input[..., self.kv_lora_rank :] = q_pe k_input[..., self.kv_lora_rank :] = k_pe k_pe_output = None else: k_pe_output = torch.empty_like(k_input[..., self.kv_lora_rank :]) q_input[..., self.kv_lora_rank :] = q_pe # attn_output = self.attn_mqa(q_input, k_input, v_input, forward_batch) # Use Fused ROPE with use_rope=OFF. attn_output = torch.empty( (q_len, self.num_local_heads, self.kv_lora_rank), dtype=q.dtype, device=q.device, ) attn_logits, _, kv_indptr, kv_indices, _, _, _ = ( forward_batch.attn_backend.forward_metadata ) cos_sin_cache = self.rotary_emb.cos_sin_cache num_kv_split = forward_batch.attn_backend.num_kv_splits sm_scale = self.attn_mqa.scaling if attn_logits is None: attn_logits = torch.empty( ( forward_batch.batch_size, self.num_local_heads, num_kv_split, self.kv_lora_rank + 1, ), dtype=torch.float32, device=q.device, ) # save current latent cache. forward_batch.token_to_kv_pool.set_kv_buffer( self.attn_mqa, forward_batch.out_cache_loc, k_input, None ) key_cache_buf = forward_batch.token_to_kv_pool.get_key_buffer( self.attn_mqa.layer_id ) val_cache_buf = key_cache_buf[..., : self.kv_lora_rank] return ( q_input, key_cache_buf, val_cache_buf, attn_output, kv_indptr, kv_indices, k_pe_output, cos_sin_cache, positions, attn_logits, num_kv_split, sm_scale, enable_rope_fusion, k_input, forward_batch, zero_allocator, ) def forward_absorb_fused_mla_rope_cpu_prepare( self, positions: torch.Tensor, hidden_states: torch.Tensor, forward_batch: ForwardBatch, zero_allocator: BumpAllocator, ): assert self.q_lora_rank is not None and use_intel_amx_backend( self ), "forward_absorb_fused_mla_rope_cpu_prepare requires q_lora_rank is not None and use_intel_amx_backend" q_input, k_input, v_input = ( torch.ops.sgl_kernel.qkv_proj_with_rope_fused_weight( hidden_states, self.fused_qkv_a_proj_with_mqa.weight, self.q_b_proj.weight, self.w_kc, self.q_a_layernorm.weight, self.kv_a_layernorm.weight, positions, self.rotary_emb.cos_sin_cache, self.kv_a_layernorm.variance_epsilon, self.qkv_proj_with_rope_is_int8, self.qkv_proj_with_rope_is_fp8, ( self.fused_qkv_a_proj_with_mqa.weight_scale if self.qkv_proj_with_rope_is_int8 else ( self.fused_qkv_a_proj_with_mqa.weight_scale_inv if self.qkv_proj_with_rope_is_fp8 else None ) ), ( self.q_b_proj.weight_scale if self.qkv_proj_with_rope_is_int8 else ( self.q_b_proj.weight_scale_inv if self.qkv_proj_with_rope_is_fp8 else None ) ), True, # is_vnni self.weight_block_size, self.q_lora_rank, self.kv_lora_rank, self.qk_rope_head_dim, ) ) return (q_input, k_input, v_input, forward_batch, zero_allocator) def forward_absorb_fused_mla_rope_core( self, q_input, key_cache_buf, val_cache_buf, attn_output, kv_indptr, kv_indices, k_pe_output, cos_sin_cache, positions, attn_logits, num_kv_split, sm_scale, enable_rope_fusion, k_input, forward_batch, zero_allocator, ): decode_attention_fwd_grouped_rope( q_input, key_cache_buf, val_cache_buf, attn_output, kv_indptr, kv_indices, k_pe_output, self.kv_lora_rank, self.rotary_emb.rotary_dim, cos_sin_cache, positions, attn_logits, num_kv_split, sm_scale, logit_cap=self.attn_mqa.logit_cap, use_rope=enable_rope_fusion, is_neox_style=self.rotary_emb.is_neox_style, ) if enable_rope_fusion: k_input[..., self.kv_lora_rank :] = k_pe_output forward_batch.token_to_kv_pool.set_kv_buffer( self.attn_mqa, forward_batch.out_cache_loc, k_input, None ) attn_output = attn_output.view(-1, self.num_local_heads, self.kv_lora_rank) if _is_hip: # TODO(haishaw): add bmm_fp8 to ROCm attn_bmm_output = torch.bmm( attn_output.to(torch.bfloat16).transpose(0, 1), self.w_vc.to(torch.bfloat16) * self.w_scale, ) elif self.w_vc.dtype == torch.float8_e4m3fn: attn_output_val, attn_output_scale = per_tensor_quant_mla_fp8( attn_output.transpose(0, 1), zero_allocator.allocate(1), dtype=torch.float8_e4m3fn, ) attn_bmm_output = bmm_fp8( attn_output_val, self.w_vc, attn_output_scale, self.w_scale, torch.bfloat16, ) else: attn_bmm_output = torch.bmm(attn_output.transpose(0, 1), self.w_vc) attn_output = attn_bmm_output.transpose(0, 1).flatten(1, 2) output, _ = self.o_proj(attn_output) return output def forward_absorb_fused_mla_rope_cpu_core( self, q_input, k_input, v_input, forward_batch, zero_allocator ): assert self.q_lora_rank is not None and use_intel_amx_backend( self ), "forward_absorb_fused_mla_rope_cpu_core requires q_lora_rank is not None and use_intel_amx_backend" attn_output = self.attn_mqa(q_input, k_input, v_input, forward_batch) attn_output = attn_output.view(-1, self.num_local_heads, self.kv_lora_rank) # [Note] Align shapes of bmm inputs. # Shapes of inputs: # q_nope: [M, B, K] # original self.w_kc: [B, K, N] # current self.w_kc (which has been converted in PackWeightMethod): [B, N, K] # Shapes of inputs to sgl_kernel.cpu.bmm: # out: [B, M, N] # mat1: [B, M, K] # mat2: [B, N, K] B = self.w_vc.size(0) N = self.w_vc.size(1) M = attn_output.size(0) output = torch.empty([M, int(B * N)], dtype=attn_output.dtype) attn_bmm_output = output.view([M, B, N]).transpose_(0, 1) torch.ops.sgl_kernel.bmm_cpu( attn_bmm_output, attn_output.transpose(0, 1), self.w_vc, True, # is_vnni None, # scale ) attn_output = output output, _ = self.o_proj(attn_output) return output def _chunked_prefix_attn_mha( self, q: torch.Tensor, accum_output: torch.Tensor, accum_lse: torch.Tensor, forward_batch: ForwardBatch, ) -> torch.Tensor: assert forward_batch.num_prefix_chunks is not None for i in range(forward_batch.num_prefix_chunks): forward_batch.set_prefix_chunk_idx(i) # Fetch latent cache from memory pool with precomputed chunked kv indices latent_cache_buf = forward_batch.token_to_kv_pool.get_key_buffer( self.attn_mha.layer_id ) latent_cache = ( latent_cache_buf[forward_batch.prefix_chunk_kv_indices[i]] .contiguous() .to(q.dtype) ) kv_a_normed, k_pe = latent_cache.split( [self.kv_lora_rank, self.qk_rope_head_dim], dim=-1 ) kv_a_normed = kv_a_normed.squeeze(1).contiguous() kv = self.kv_b_proj(kv_a_normed)[0] kv = kv.view( -1, self.num_local_heads, self.qk_nope_head_dim + self.v_head_dim ) v = kv[..., self.qk_nope_head_dim :] k_nope = kv[..., : self.qk_nope_head_dim] k = torch.empty( ( k_nope.shape[0], self.num_local_heads, self.qk_nope_head_dim + self.qk_rope_head_dim, ), dtype=v.dtype, device=v.device, ) k[..., : self.qk_nope_head_dim] = k_nope k[..., self.qk_nope_head_dim :] = k_pe output, lse = self.attn_mha(q, k, v, forward_batch, save_kv_cache=False) tmp_output = torch.empty_like(accum_output) tmp_lse = torch.empty_like(accum_lse) merge_state_v2(output, lse, accum_output, accum_lse, tmp_output, tmp_lse) accum_output, accum_lse = tmp_output, tmp_lse del kv, k, v, output, lse, tmp_output, tmp_lse return accum_output def forward_normal_chunked_kv_prepare( self, positions: torch.Tensor, hidden_states: torch.Tensor, forward_batch: ForwardBatch, zero_allocator: BumpAllocator, ): # In normal mha, the k and v tensors will become overly large when the prefix length is long. # To avoid this, we split the kv cache into chunks and process them one after another. # Since mha is compute friendly, the for loop induced here will not introduce significant overhead. # The top comments in https://github.com/vllm-project/vllm/blob/main/vllm/v1/attention/backends/mla/common.py # will be helpful for understanding the purpose of this function. # First do normal mha forward to get output for extended part return self.forward_normal_prepare( positions, hidden_states, forward_batch, zero_allocator ) def forward_normal_chunked_kv_core(self, q, k, v, forward_batch): has_extend_prefix = any(forward_batch.extend_prefix_lens_cpu) # Only initialize the info once if has_extend_prefix and forward_batch.num_prefix_chunks is None: forward_batch.prepare_chunked_prefix_cache_info(q.device) if hasattr(forward_batch.attn_backend, "init_mha_chunk_metadata"): forward_batch.attn_backend.init_mha_chunk_metadata(forward_batch) forward_batch.mha_return_lse = has_extend_prefix # Do mha for extended part without prefix forward_batch.set_attn_attend_prefix_cache(False) attn_output = self.attn_mha(q, k, v, forward_batch, save_kv_cache=False) # Do mha attention with chunked prefix cache if there are any sequence with prefix if has_extend_prefix: attn_output, lse = attn_output forward_batch.set_attn_attend_prefix_cache(True) attn_output = self._chunked_prefix_attn_mha( q=q, accum_output=attn_output, accum_lse=lse, forward_batch=forward_batch, ) attn_output = attn_output.reshape(-1, self.num_local_heads * self.v_head_dim) output, _ = self.o_proj(attn_output) return output class DeepseekV2DecoderLayer(nn.Module): def __init__( self, config: PretrainedConfig, layer_id: int, quant_config: Optional[QuantizationConfig] = None, is_nextn: bool = False, prefix: str = "", alt_stream: Optional[torch.cuda.Stream] = None, ) -> None: super().__init__() self.hidden_size = config.hidden_size self.config = config rope_theta = getattr(config, "rope_theta", 10000) rope_scaling = getattr(config, "rope_scaling", None) max_position_embeddings = getattr(config, "max_position_embeddings", 8192) self.speculative_algorithm = global_server_args_dict["speculative_algorithm"] self.layer_id = layer_id self.is_nextn = is_nextn self.self_attn = DeepseekV2AttentionMLA( config=config, hidden_size=self.hidden_size, num_heads=config.num_attention_heads, qk_nope_head_dim=config.qk_nope_head_dim, qk_rope_head_dim=config.qk_rope_head_dim, v_head_dim=config.v_head_dim, q_lora_rank=( config.q_lora_rank if hasattr(config, "q_lora_rank") else None ), kv_lora_rank=config.kv_lora_rank, rope_theta=rope_theta, rope_scaling=rope_scaling, max_position_embeddings=max_position_embeddings, quant_config=quant_config, layer_id=layer_id, reduce_results=False, prefix=add_prefix("self_attn", prefix), alt_stream=alt_stream, ) self.is_layer_sparse = self._is_layer_sparse(layer_id, is_nextn=is_nextn) is_previous_layer_sparse = self._is_layer_sparse(layer_id - 1, is_nextn=False) self.layer_scatter_modes = LayerScatterModes.init_new( layer_id=layer_id, num_layers=1 if is_nextn else config.num_hidden_layers, is_layer_sparse=self.is_layer_sparse, is_previous_layer_sparse=is_previous_layer_sparse, ) if self.is_layer_sparse: self.mlp = DeepseekV2MoE( config=config, quant_config=quant_config, prefix=add_prefix("mlp", prefix), layer_id=self.layer_id, alt_stream=alt_stream, is_nextn=is_nextn, ) else: if enable_moe_dense_fully_dp(): mlp_tp_rank, mlp_tp_size = 0, 1 else: mlp_tp_rank, mlp_tp_size = None, None self.mlp = DeepseekV2MLP( hidden_size=config.hidden_size, intermediate_size=config.intermediate_size, hidden_act=config.hidden_act, quant_config=quant_config, prefix=add_prefix("mlp", prefix), tp_rank=mlp_tp_rank, tp_size=mlp_tp_size, ) self.input_layernorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps) self.post_attention_layernorm = RMSNorm( config.hidden_size, eps=config.rms_norm_eps ) self.layer_communicator = LayerCommunicator( layer_scatter_modes=self.layer_scatter_modes, input_layernorm=self.input_layernorm, post_attention_layernorm=self.post_attention_layernorm, allow_reduce_scatter=True, is_last_layer=( is_nextn or (self.layer_id == self.config.num_hidden_layers - 1) ), ) def _is_layer_sparse(self, layer_id: int, is_nextn: bool) -> bool: return is_nextn or ( self.config.n_routed_experts is not None and layer_id >= self.config.first_k_dense_replace and layer_id % self.config.moe_layer_freq == 0 ) def forward( self, positions: torch.Tensor, hidden_states: torch.Tensor, forward_batch: ForwardBatch, residual: Optional[torch.Tensor], zero_allocator: BumpAllocator, gemm_output_zero_allocator: BumpAllocator = None, ) -> torch.Tensor: quant_format = ( "mxfp4" if _is_gfx95_supported and getattr(self.self_attn, "fused_qkv_a_proj_with_mqa", None) is not None and getattr(self.self_attn.fused_qkv_a_proj_with_mqa, "weight", None) is not None and self.self_attn.fused_qkv_a_proj_with_mqa.weight.dtype == torch.uint8 else "" ) hidden_states, residual = self.layer_communicator.prepare_attn( hidden_states, residual, forward_batch, quant_format, ) hidden_states = self.self_attn( positions=positions, hidden_states=hidden_states, forward_batch=forward_batch, zero_allocator=zero_allocator, ) hidden_states, residual = self.layer_communicator.prepare_mlp( hidden_states, residual, forward_batch ) should_allreduce_fusion = ( self.layer_communicator.should_fuse_mlp_allreduce_with_next_layer( forward_batch ) ) # For DP with padding, reduce scatter can be used instead of all-reduce. use_reduce_scatter = self.layer_communicator.should_use_reduce_scatter( forward_batch ) if isinstance(self.mlp, DeepseekV2MLP): gemm_output_zero_allocator = None hidden_states = self.mlp( hidden_states, forward_batch, should_allreduce_fusion, use_reduce_scatter, gemm_output_zero_allocator, ) if should_allreduce_fusion: hidden_states._sglang_needs_allreduce_fusion = True if not should_allreduce_fusion: hidden_states, residual = self.layer_communicator.postprocess_layer( hidden_states, residual, forward_batch ) return hidden_states, residual def op_comm_prepare_attn( self, state, positions: torch.Tensor, hidden_states: torch.Tensor, forward_batch: ForwardBatch, residual: Optional[torch.Tensor], zero_allocator: BumpAllocator, tbo_subbatch_index: Optional[int] = None, ): state.hidden_states_after_comm_pre_attn, state.residual_after_input_ln = ( self.layer_communicator.prepare_attn(hidden_states, residual, forward_batch) ) state.update( dict( forward_batch=forward_batch, positions=positions, zero_allocator=zero_allocator, tbo_subbatch_index=tbo_subbatch_index, ) ) def op_comm_prepare_mlp(self, state): state.hidden_states_mlp_input, state.residual_after_comm_pre_mlp = ( self.layer_communicator.prepare_mlp( state.pop("hidden_states_after_attn"), state.pop("residual_after_input_ln"), state.forward_batch, ) ) def op_mlp(self, state): hidden_states = state.pop("hidden_states_mlp_input") if not ( enable_moe_dense_fully_dp() and (not self.is_layer_sparse) and hidden_states.shape[0] == 0 ): state.hidden_states_mlp_output = self.mlp( hidden_states, state.forward_batch ) else: state.hidden_states_mlp_output = hidden_states def op_comm_postprocess_layer(self, state): hidden_states, residual = self.layer_communicator.postprocess_layer( state.pop("hidden_states_mlp_output"), state.pop("residual_after_comm_pre_mlp"), state.forward_batch, ) output = dict( positions=state.positions, hidden_states=hidden_states, residual=residual, forward_batch=state.forward_batch, zero_allocator=state.zero_allocator, tbo_subbatch_index=state.tbo_subbatch_index, ) state.clear( expect_keys={ "positions", "forward_batch", "zero_allocator", "tbo_subbatch_index", } ) return output class DeepseekV2Model(nn.Module): fall_back_to_pt_during_load = False def __init__( self, config: PretrainedConfig, quant_config: Optional[QuantizationConfig] = None, prefix: str = "", ) -> None: super().__init__() self.padding_id = config.pad_token_id self.vocab_size = config.vocab_size self.first_k_dense_replace = config.first_k_dense_replace self.pp_group = get_pp_group() if self.pp_group.is_first_rank: self.embed_tokens = VocabParallelEmbedding( config.vocab_size, config.hidden_size, enable_tp=not is_dp_attention_enabled(), ) else: self.embed_tokens = PPMissingLayer() self.alt_stream = torch.cuda.Stream() if _is_cuda else None self.layers, self.start_layer, self.end_layer = make_layers( config.num_hidden_layers, lambda idx, prefix: DeepseekV2DecoderLayer( config=config, layer_id=idx, quant_config=quant_config, prefix=prefix, alt_stream=self.alt_stream, ), pp_rank=self.pp_group.rank_in_group, pp_size=self.pp_group.world_size, prefix=add_prefix("layers", prefix), offloader_kwargs=dict( submodule_accessor=lambda layer: ( layer.mlp.experts if isinstance(layer.mlp, DeepseekV2MoE) else layer.mlp ), whitelist_param_names_creator=lambda module: ( [ "w13_weight", "w2_weight", # only for nvfp4 *( [ "w13_blockscale_swizzled", "w2_blockscale_swizzled", ] if hasattr(module, "w13_blockscale_swizzled") else [] ), ] if isinstance(module, FusedMoE) else [] ), ), ) if self.pp_group.is_last_rank: self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps) else: self.norm = PPMissingLayer(return_tuple=True) self.gemm_output_zero_allocator_size = 0 if ( _use_aiter_gfx95 and config.n_routed_experts == 256 and self.embed_tokens.embedding_dim == 7168 ): num_moe_layers = sum( [ 1 for i in range(len(self.layers)) if isinstance(self.layers[i].mlp, DeepseekV2MoE) ] ) allocate_size = 0 for i in range(len(self.layers)): if isinstance(self.layers[i].mlp, DeepseekV2MoE): allocate_size = self.layers[ i ].mlp.shared_experts.gate_up_proj.output_size_per_partition break self.gemm_output_zero_allocator_size = ( get_dsv3_gemm_output_zero_allocator_size( config.n_routed_experts, num_moe_layers, allocate_size, self.embed_tokens.embedding_dim, ) ) def get_input_embeddings(self) -> torch.Tensor: return self.embed_tokens def forward( self, input_ids: torch.Tensor, positions: torch.Tensor, forward_batch: ForwardBatch, input_embeds: torch.Tensor = None, pp_proxy_tensors: Optional[PPProxyTensors] = None, ) -> Union[torch.Tensor, PPProxyTensors]: total_num_layers = self.end_layer - self.start_layer device = input_embeds.device if input_embeds is not None else input_ids.device zero_allocator = BumpAllocator( buffer_size=total_num_layers * 2 * (2 if forward_batch.can_run_tbo else 1), dtype=torch.float32, device=device, ) has_gemm_output_zero_allocator = hasattr( self, "gemm_output_zero_allocator_size" ) gemm_output_zero_allocator = ( BumpAllocator( buffer_size=self.gemm_output_zero_allocator_size, dtype=torch.float32, device=device, ) if has_gemm_output_zero_allocator and self.gemm_output_zero_allocator_size > 0 else None ) if self.pp_group.is_first_rank: if input_embeds is None: hidden_states = self.embed_tokens(input_ids) else: hidden_states = input_embeds residual = None else: assert pp_proxy_tensors is not None hidden_states = pp_proxy_tensors["hidden_states"] residual = pp_proxy_tensors["residual"] normal_start_layer = self.start_layer normal_end_layer = self.end_layer if forward_batch.can_run_tbo: if ( self.first_k_dense_replace > normal_start_layer and self.first_k_dense_replace < normal_end_layer ): normal_end_layer = self.first_k_dense_replace elif self.first_k_dense_replace < normal_start_layer: normal_end_layer = normal_start_layer = 0 for i in range(normal_start_layer, normal_end_layer): with get_global_expert_distribution_recorder().with_current_layer(i): layer = self.layers[i] hidden_states, residual = layer( positions, hidden_states, forward_batch, residual, zero_allocator, gemm_output_zero_allocator, ) if normal_end_layer != self.end_layer: hidden_states, residual = model_forward_maybe_tbo( layers=self.layers[normal_end_layer : self.end_layer], enable_tbo=True, positions=positions, forward_batch=forward_batch, hidden_states=hidden_states, residual=residual, input_data_scatter_mode=self.layers[ normal_end_layer - 1 ].layer_scatter_modes.layer_output_mode, zero_allocator=zero_allocator, ) if not self.pp_group.is_last_rank: return PPProxyTensors( { "hidden_states": hidden_states, "residual": residual, } ) else: if not forward_batch.forward_mode.is_idle(): if residual is None: hidden_states = self.norm(hidden_states) else: hidden_states, _ = self.norm(hidden_states, residual) return hidden_states class DeepseekV2ForCausalLM(nn.Module): # for quark model load packed_modules_mapping = {} def __init__( self, config: PretrainedConfig, quant_config: Optional[QuantizationConfig] = None, prefix: str = "", ) -> None: super().__init__() # for quark model load # Fuse q_a_proj and kv_a_proj_with_mqa along output dimension when q_lora_rank is not None self.fuse_qkv_a_proj = ( hasattr(config, "q_lora_rank") and config.q_lora_rank is not None ) if self.fuse_qkv_a_proj: self.packed_modules_mapping["fused_qkv_a_proj_with_mqa"] = [ "q_a_proj", "kv_a_proj_with_mqa", ] self.pp_group = get_pp_group() self.config = config self.tp_size = get_tensor_model_parallel_world_size() self.quant_config = quant_config self.determine_num_fused_shared_experts() self.model = DeepseekV2Model( config, quant_config, prefix=add_prefix("model", prefix) ) self.lm_head = ParallelLMHead( config.vocab_size, config.hidden_size, quant_config=quant_config, prefix=add_prefix("lm_head", prefix), use_attn_tp_group=global_server_args_dict["enable_dp_lm_head"], ) self.logits_processor = LogitsProcessor(config) self._routed_experts_weights_of_layer = LazyValue( lambda: { layer_id: layer.mlp.get_moe_weights() for layer_id, layer in enumerate(self.model.layers) if isinstance(layer.mlp, DeepseekV2MoE) } ) @property def routed_experts_weights_of_layer(self): return self._routed_experts_weights_of_layer.value def determine_num_fused_shared_experts( self, architecture: str = "DeepseekV3ForCausalLM" ): self.num_fused_shared_experts = 0 if global_server_args_dict["disable_shared_experts_fusion"]: return # Only Deepseek V3/R1 can use shared experts fusion optimization now. disable_reason = None if ( not _is_cuda or torch.cuda.get_device_capability("cuda") < (8, 0) or self.config.architectures[0] != architecture or self.config.n_routed_experts != 256 or self.config.n_shared_experts != 1 ): disable_reason = "Only Deepseek V3/R1 on NV-platform with capability >= 80 can use shared experts fusion optimization." elif get_moe_expert_parallel_world_size() > 1: disable_reason = "Deepseek V3/R1 can not use shared experts fusion optimization under expert parallelism." elif self.quant_config.get_name() == "w4afp8": disable_reason = "Deepseek V3/R1 W4AFP8 model uses different quant method for routed experts and shared experts." if disable_reason is not None: global_server_args_dict["disable_shared_experts_fusion"] = True self.num_fused_shared_experts = 0 log_info_on_rank0( logger, f"{disable_reason} Shared experts fusion optimization is disabled.", ) return self.num_fused_shared_experts = self.config.n_shared_experts def get_input_embeddings(self) -> nn.Embedding: return self.model.embed_tokens @torch.no_grad() def forward( self, input_ids: torch.Tensor, positions: torch.Tensor, forward_batch: ForwardBatch, input_embeds: torch.Tensor = None, pp_proxy_tensors: Optional[PPProxyTensors] = None, ) -> torch.Tensor: hidden_states = self.model( input_ids, positions, forward_batch, input_embeds, pp_proxy_tensors ) if self.pp_group.is_last_rank: return self.logits_processor( input_ids, hidden_states, self.lm_head, forward_batch ) else: return hidden_states @property def start_layer(self): return self.model.start_layer @property def end_layer(self): return self.model.end_layer def post_load_weights(self, is_nextn=False, weight_names=None): # Perform post-processing after loading weights if is_nextn: layer_ids = [self.config.num_hidden_layers] else: if weight_names is None: layer_ids = range(self.model.start_layer, self.model.end_layer) else: layer_ids = set() for name in weight_names: if "kv_b_proj" in name: layer_id = int(name.split(".")[2]) if layer_id < self.config.num_hidden_layers: layer_ids.add(layer_id) for layer_id in layer_ids: self_attn = ( self.model.layers[layer_id].self_attn if not is_nextn else self.model.decoder.self_attn ) if hasattr(self_attn.kv_b_proj, "qweight"): # AWQ compatible if _is_cuda or _is_hip: w = awq_dequantize( self_attn.kv_b_proj.qweight, self_attn.kv_b_proj.scales, self_attn.kv_b_proj.qzeros, ).T else: w = awq_dequantize( self_attn.kv_b_proj.qweight, self_attn.kv_b_proj.scales, self_attn.kv_b_proj.qzeros, 0, 0, 0, ).T else: w = self_attn.kv_b_proj.weight # NOTE(HandH1998): Since `bmm_fp8` only supports per-tensor scale, we have to requantize `self_attn.kv_b_proj`. # This may affect the accuracy of fp8 model. # Fix deepseek v3 blockwise bmm by using deep_gemm use_deep_gemm_bmm = False if w.dtype in ( torch.float8_e4m3fn, torch.float8_e4m3fnuz, ): if ( hasattr(self.quant_config, "weight_block_size") and self.quant_config.weight_block_size is not None ): weight_block_size = self.quant_config.weight_block_size assert hasattr(self_attn.kv_b_proj, "weight_scale_inv") if _is_fp8_fnuz: weight, weight_scale, _ = normalize_e4m3fn_to_e4m3fnuz( weight=w, weight_scale=self_attn.kv_b_proj.weight_scale_inv, input_scale=None, ) else: weight = w weight_scale = self_attn.kv_b_proj.weight_scale_inv if ( _is_cuda and weight_block_size[0] == 128 and weight_block_size[1] == 128 ): if ( deep_gemm_wrapper.ENABLE_JIT_DEEPGEMM and not deep_gemm_wrapper.DEEPGEMM_BLACKWELL and get_bool_env_var("SGL_USE_DEEPGEMM_BMM", "false") ): block_scale = weight_scale use_deep_gemm_bmm = True else: w = block_quant_dequant( weight, weight_scale, weight_block_size, torch.bfloat16, ) else: w, scale = block_quant_to_tensor_quant( weight, weight_scale, weight_block_size ) self_attn.w_scale = scale else: if _is_fp8_fnuz: weight, weight_scale, _ = normalize_e4m3fn_to_e4m3fnuz( weight=w, weight_scale=self_attn.kv_b_proj.weight_scale, input_scale=None, ) else: weight = w weight_scale = self_attn.kv_b_proj.weight_scale w, scale = channel_quant_to_tensor_quant(weight, weight_scale) self_attn.w_scale = scale if w.dtype == torch.int8: if hasattr(self.quant_config, "weight_block_size"): # block-wise int8 need it weight_block_size = self.quant_config.weight_block_size if weight_block_size is not None: assert hasattr(self_attn.kv_b_proj, "weight_scale_inv") weight = w weight_scale = self_attn.kv_b_proj.weight_scale_inv w = int8_block_dequant( weight, weight_scale, weight_block_size ).to(torch.bfloat16) else: # channel-wise int8 need it w = w.to(torch.bfloat16) * self_attn.kv_b_proj.weight_scale.to( torch.bfloat16 ) w_kc, w_vc = w.unflatten( 0, (-1, self_attn.qk_nope_head_dim + self_attn.v_head_dim) ).split([self_attn.qk_nope_head_dim, self_attn.v_head_dim], dim=1) if ( _use_aiter_gfx95 and self.quant_config is not None and self.quant_config.get_name() == "quark" ): w_kc, self_attn.w_scale_k, w_vc, self_attn.w_scale_v = ( quark_post_load_weights(self_attn, w, "mxfp4") ) if not use_deep_gemm_bmm: self_attn.w_kc = bind_or_assign( self_attn.w_kc, w_kc.transpose(1, 2).contiguous().transpose(1, 2) ) self_attn.w_vc = bind_or_assign( self_attn.w_vc, w_vc.contiguous().transpose(1, 2) ) if ( hasattr(self_attn.kv_b_proj, "weight_scale") and self_attn.w_scale is None ): self_attn.w_scale = bind_or_assign( self_attn.w_scale, self_attn.kv_b_proj.weight_scale ) if _is_hip: self_attn.w_scale *= 2.0 # TODO: remove this after adding FP8 support in bmm cpu kernel if _is_cpu and _is_cpu_amx_available and w.dtype == torch.float8_e4m3fn: self_attn.w_kc = ( self_attn.w_kc.to(torch.bfloat16) * self_attn.w_scale ) self_attn.w_vc = ( self_attn.w_vc.to(torch.bfloat16) * self_attn.w_scale ) else: num_tiles_k = self_attn.qk_nope_head_dim // weight_block_size[1] num_tiles_n = self_attn.v_head_dim // weight_block_size[0] ws_kc, ws_vc = block_scale.unflatten( 0, (-1, (num_tiles_k + num_tiles_n)) ).split([num_tiles_k, num_tiles_n], dim=1) self_attn.w_scale_k = bind_or_assign( self_attn.w_scale_k, ws_kc.transpose(1, 2).contiguous() ) self_attn.w_scale_v = bind_or_assign( self_attn.w_scale_v, ws_vc.contiguous() ) self_attn.w_kc = bind_or_assign( self_attn.w_kc, w_kc.transpose(1, 2).contiguous() ) self_attn.w_vc = bind_or_assign(self_attn.w_vc, w_vc.contiguous()) self_attn.use_deep_gemm_bmm = True if ( deep_gemm_wrapper.ENABLE_JIT_DEEPGEMM and deep_gemm_wrapper.DEEPGEMM_SCALE_UE8M0 and hasattr(self.quant_config, "weight_block_size") and self.quant_config.weight_block_size is not None ): self._weight_requant_ue8m0(is_nextn) def _weight_requant_ue8m0(self, is_nextn=False): weight_block_size = self.quant_config.weight_block_size moe_layers = list( range( self.config.first_k_dense_replace, self.config.num_hidden_layers, self.config.moe_layer_freq, ) ) num_hidden_layers = 1 if is_nextn else self.config.num_hidden_layers for layer_id in range(num_hidden_layers): if is_nextn: layer = self.model.decoder else: layer = self.model.layers[layer_id] module_list = [ layer.self_attn.kv_b_proj, layer.self_attn.o_proj, ] if self.config.q_lora_rank is not None: module_list.append(layer.self_attn.fused_qkv_a_proj_with_mqa) module_list.append(layer.self_attn.q_b_proj) else: module_list.append(layer.self_attn.kv_a_proj_with_mqa) module_list.append(layer.self_attn.q_proj) for module in module_list: requant_weight_ue8m0_inplace( module.weight, module.weight_scale_inv, weight_block_size ) if layer_id in moe_layers or is_nextn: shared_experts = getattr(layer.mlp, "shared_experts", None) if shared_experts is not None: for module in [ shared_experts.gate_up_proj, shared_experts.down_proj, ]: requant_weight_ue8m0_inplace( module.weight, module.weight_scale_inv, weight_block_size ) experts = layer.mlp.experts if isinstance(experts, DeepEPMoE): for w in [ experts.w13_weight_fp8, experts.w2_weight_fp8, ]: requant_weight_ue8m0_inplace(w[0], w[1], weight_block_size) else: mlp = layer.mlp assert isinstance(mlp, DeepseekV2MLP) for module in [ mlp.gate_up_proj, mlp.down_proj, ]: requant_weight_ue8m0_inplace( module.weight, module.weight_scale_inv, weight_block_size ) def load_weights(self, weights: Iterable[Tuple[str, torch.Tensor]], is_nextn=False): if is_nextn: if hasattr(self.config, "num_nextn_predict_layers"): num_nextn_layers = self.config.num_nextn_predict_layers assert num_nextn_layers == 1, "Only 1 nextn layer is supported" # compatible with old design nextn_layer_id = ( 0 if self.config.num_hidden_layers == 1 else self.config.num_hidden_layers ) else: raise ValueError("num_nextn_predict_layers is not in the config") stacked_params_mapping = [ # (param_name, shard_name, shard_id) ("gate_up_proj", "gate_proj", 0), ("gate_up_proj", "up_proj", 1), ] # Params for weights, fp8 weight scales, fp8 activation scales # (param_name, weight_name, expert_id, shard_id) expert_params_mapping = FusedMoE.make_expert_params_mapping( ckpt_gate_proj_name="gate_proj", ckpt_down_proj_name="down_proj", ckpt_up_proj_name="up_proj", num_experts=self.config.n_routed_experts + self.num_fused_shared_experts, ) # Params for special naming rules in mixed-precision models, for example: # model.layers.xx.mlp.experts.xx.w1.input_scale. For details, # see https://huggingface.co/Barrrrry/DeepSeek-R1-W4AFP8/blob/main. if self.quant_config and self.quant_config.get_name() == "w4afp8": expert_params_mapping += FusedMoE.make_expert_input_scale_params_mapping( num_experts=self.config.n_routed_experts ) # Fuse q_a_proj and kv_a_proj_with_mqa along output dimension when q_lora_rank is not None fuse_qkv_a_proj = hasattr(self.config, "q_lora_rank") and ( self.config.q_lora_rank is not None ) cached_a_proj = {} if fuse_qkv_a_proj else None if is_nextn: nextn_layer_prefix = f"model.layers.{nextn_layer_id}" nextn_spec_weight_names = [ "shared_head.norm", "eh_proj", "enorm", "hnorm", ] if self.num_fused_shared_experts > 0: assert self.num_fused_shared_experts == 1 log_info_on_rank0(logger, "Shared experts fusion optimization enabled.") with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] params_dict = dict(self.named_parameters()) weight_names = [] for name, loaded_weight in weights: layer_id = get_layer_id(name) if ( layer_id is not None and hasattr(self.model, "start_layer") and ( layer_id < self.model.start_layer or layer_id >= self.model.end_layer ) ): continue if self.num_fused_shared_experts > 0 and "mlp.shared_experts" in name: name = name.replace( "mlp.shared_experts", f"mlp.experts.{self.config.n_routed_experts}", ) weight_names.append(name) if not is_nextn: if hasattr(self.config, "num_nextn_predict_layers"): num_nextn_layers = self.config.num_nextn_predict_layers if num_nextn_layers > 0 and name.startswith("model.layers"): name_list = name.split(".") if ( len(name_list) >= 3 and int(name_list[2]) >= self.config.num_hidden_layers ): continue else: if not name.startswith(nextn_layer_prefix): continue # Use shared head and embed weights from target model if "shared_head.head" in name or "embed_tokens" in name: continue is_decoder = True # For nextn specific weights for weight_name in nextn_spec_weight_names: if weight_name in name: name = name.replace(nextn_layer_prefix, "model") is_decoder = False break # For decoder layer weights if is_decoder: name = name.replace(nextn_layer_prefix, "model.decoder") if "rotary_emb.inv_freq" in name: continue for param_name, weight_name, shard_id in stacked_params_mapping: # Skip non-stacked layers and experts (experts handled below). if weight_name not in name: continue # We have mlp.experts[0].gate_proj in the checkpoint. # Since we handle the experts below in expert_params_mapping, # we need to skip here BEFORE we update the name, otherwise # name will be updated to mlp.experts[0].gate_up_proj, which # will then be updated below in expert_params_mapping # for mlp.experts[0].gate_gate_up_proj, which breaks load. if ("mlp.experts." in name) and name not in params_dict: continue name = name.replace(weight_name, param_name) # Skip loading extra bias for GPTQ models. if name.endswith(".bias") and name not in params_dict: continue param = params_dict[name] weight_loader = param.weight_loader futures.append( executor.submit(weight_loader, param, loaded_weight, shard_id) ) break else: for mapping in expert_params_mapping: param_name, weight_name, expert_id, shard_id = mapping if weight_name not in name: continue name = name.replace(weight_name, param_name) param = params_dict[name] weight_loader = param.weight_loader futures.append( executor.submit( weight_loader, param, loaded_weight, name, shard_id=shard_id, expert_id=expert_id, ) ) break else: # Skip loading extra bias for GPTQ models. if name.endswith(".bias") and name not in params_dict: continue # Skip loading embed_tokens if not first rank in pipeline parallelism if ".embed_tokens." in name and not self.pp_group.is_first_rank: continue # Skip loading norm if not last rank in pipeline parallelism if ".norm." in name and not self.pp_group.is_last_rank: continue if fuse_qkv_a_proj and ( "q_a_proj" in name or "kv_a_proj_with_mqa" in name ): cached_a_proj[name] = loaded_weight q_a_proj_name = ( name if "q_a_proj" in name else name.replace("kv_a_proj_with_mqa", "q_a_proj") ) kv_a_proj_name = ( name if "kv_a_proj_with_mqa" in name else name.replace("q_a_proj", "kv_a_proj_with_mqa") ) # When both q_a_proj and kv_a_proj_with_mqa has been cached, load the fused weight to parameter if ( q_a_proj_name in cached_a_proj and kv_a_proj_name in cached_a_proj ): q_a_proj_weight = cached_a_proj[q_a_proj_name] kv_a_proj_weight = cached_a_proj[kv_a_proj_name] cat_dim = 0 if self.quant_config is not None and ( self.quant_config.get_name() == "awq" or self.quant_config.get_name() == "awq_marlin" or self.quant_config.get_name() == "moe_wna16" ): cat_dim = 1 fused_weight = torch.cat( [q_a_proj_weight, kv_a_proj_weight], dim=cat_dim ) param_name = ( name.replace( "q_a_proj", "fused_qkv_a_proj_with_mqa" ) if "q_a_proj" in name else name.replace( "kv_a_proj_with_mqa", "fused_qkv_a_proj_with_mqa", ) ) param = params_dict[param_name] weight_loader = getattr( param, "weight_loader", default_weight_loader ) futures.append( executor.submit(weight_loader, param, fused_weight) ) cached_a_proj.pop(q_a_proj_name) cached_a_proj.pop(kv_a_proj_name) else: if ( "k_scale" in name or "v_scale" in name ) and name not in params_dict: # modelopt attn kv scale is named differently for scale in ["k_scale", "v_scale"]: if scale in name: name = name.replace( f"{scale[0]}_proj", "attn_mqa" ) break if name not in params_dict: # modelopt ckpt contains not needed weights for MTP module: # model.decoder.self_attn.attn_mqa.v_scale and # model.decoder.self_attn.attn_mqa.k_scale logger.warning(f"{name} not found in params_dict.") continue param = params_dict[name] weight_loader = getattr( param, "weight_loader", default_weight_loader ) futures.append( executor.submit(weight_loader, param, loaded_weight) ) # Wait for all tasks to complete and raise any exceptions. for future in concurrent.futures.as_completed(futures): future.result() self.post_load_weights(is_nextn=is_nextn, weight_names=weight_names) def get_embed_and_head(self): return self.model.embed_tokens.weight, self.lm_head.weight def set_embed_and_head(self, embed, head): del self.model.embed_tokens.weight del self.lm_head.weight self.model.embed_tokens.weight = embed self.lm_head.weight = head torch.cuda.empty_cache() torch.cuda.synchronize() @classmethod def get_model_config_for_expert_location(cls, config): return ModelConfigForExpertLocation( num_layers=config.num_hidden_layers, num_logical_experts=config.n_routed_experts, num_groups=config.n_group, ) BackendRegistry.register("ascend", handle_ascend) BackendRegistry.register("flashinfer", handle_flashinfer) BackendRegistry.register("fa3", handle_fa3) BackendRegistry.register("flashmla", handle_flashmla) BackendRegistry.register("cutlass_mla", handle_cutlass_mla) BackendRegistry.register("fa4", handle_fa4) BackendRegistry.register("trtllm_mla", handle_trtllm_mla) BackendRegistry.register("aiter", handle_aiter) BackendRegistry.register("triton", handle_triton) class DeepseekV3ForCausalLM(DeepseekV2ForCausalLM): pass EntryClass = [DeepseekV2ForCausalLM, DeepseekV3ForCausalLM]