Unverified commit 6ce9dbe8 authored by Chaitanya Sri Krishna Lolla, committed by GitHub

[ROCm] Enable Fused MLA Triton kernel for DeepSeekV3 (#3237)


Co-authored-by: HAI <hixiao@gmail.com>
parent 3758d209
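This change adds a ROCm-oriented Triton decode kernel (rocm_mla_decode_rope.py, new file below) that fuses the rotary embedding into the grouped MLA flash-decoding stage, routes DeepseekV2AttentionMLA decode calls to it on HIP devices behind an environment variable, and adds a unit test comparing it against the unfused grouped decode kernel. A minimal sketch of opting in, based on the gating added in deepseek_v2.py below (the surrounding launch flow is assumed, not shown here):

import os

# Assumption for illustration: setting this before the engine is created is enough
# for DeepseekV2AttentionMLA.forward to take the fused path on ROCm decode batches.
os.environ["SGLANG_ROCM_FUSED_DECODE_MLA"] = "1"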
# Copyright 2023-2024 SGLang Team
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
Memory-efficient attention for decoding.
It supports page size = 1.
"""
# Adapted from
# https://github.com/ModelTC/lightllm/blob/96353e868a840db4d103138caf15ed9dbea8c186/lightllm/models/deepseek2/triton_kernel/gqa_flash_decoding_stage1.py
# https://github.com/ModelTC/lightllm/blob/96353e868a840db4d103138caf15ed9dbea8c186/lightllm/models/deepseek2/triton_kernel/gqa_flash_decoding_stage2.py
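# High-level flow (summary of this module): decoding attention uses a split-K
# ("flash decoding") scheme. The stage-1 kernel below processes the KV cache in
# NUM_KV_SPLITS chunks per (batch, head-group) program and writes, for each
# split, a partial softmax-weighted accumulator over the kv_lora_rank latent
# dimensions plus a log-sum-exp value. Stage 2, _decode_softmax_reducev_fwd
# (imported from decode_attention), merges the splits into the final output.
# When USE_ROPE is set, stage 1 also applies rotary embeddings to q_pe and to
# the last token's k_pe so the caller can skip a separate RoPE pass in decode.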
import torch
import triton
import triton.language as tl
from sglang.srt.layers.attention.triton_ops.decode_attention import (
_decode_softmax_reducev_fwd,
)
from sglang.srt.layers.rotary_embedding import DeepseekScalingRotaryEmbedding
def is_hip():
return triton.runtime.driver.active.get_current_target().backend == "hip"
is_hip_ = is_hip()
@triton.jit
def tanh(x):
# Tanh is just a scaled sigmoid
return 2 * tl.sigmoid(2 * x) - 1
@triton.jit
def _fwd_grouped_kernel_stage1_rope(
Q, # Holds [Q_NOPE; Q_PE], b x h x (d+r)
K_Buffer, # Holds [KV; K_PE], b*s x (c+r)
V_buffer, # Holds [KV], b*s x (c)
cos_sin_cache, # max_seq_len x (rotary_dim * 2)
positions, # sequence positions
sm_scale,
kv_indptr,
kv_indices,
Att_Out, # b x h x NUM_KV_SPLITS x (kv_lora_rank + 1)
k_pe_t_out,
stride_qb,
stride_qh,
stride_buf_kbs,
stride_buf_vbs,
stride_mid_ob,
stride_mid_oh,
stride_mid_os,
stride_kpe_tokens_out_b,
stride_cos_sin_cache_s,
stride_positions_b,
rotary_dim: tl.constexpr,
kv_lora_rank: tl.constexpr,
qk_rope_head_dim: tl.constexpr,
kv_group_num: tl.constexpr,
q_head_num: tl.constexpr,
BLOCK_C: tl.constexpr,
BLOCK_R: tl.constexpr,
BLOCK_N: tl.constexpr,
BLOCK_H: tl.constexpr,
NUM_KV_SPLITS: tl.constexpr,
logit_cap: tl.constexpr,
USE_ROPE: tl.constexpr,
IS_NEOX_STYLE: tl.constexpr,
):
cur_batch = tl.program_id(0)
cur_head_id = tl.program_id(1)
split_kv_id = tl.program_id(2)
if BLOCK_H < kv_group_num:
VALID_BLOCK_H: tl.constexpr = BLOCK_H
else:
VALID_BLOCK_H: tl.constexpr = kv_group_num
cur_head = cur_head_id * VALID_BLOCK_H + tl.arange(0, BLOCK_H)
mask_h = cur_head < (cur_head_id + 1) * VALID_BLOCK_H
mask_h = mask_h & (cur_head < q_head_num)
offs_c = tl.arange(0, BLOCK_C)
offs_qk_r = tl.arange(kv_lora_rank, kv_lora_rank + BLOCK_R) # to get the k_pe
off_q_pe = (
cur_batch * stride_qb + cur_head[:, None] * stride_qh + offs_qk_r[None, :]
)
offs_q = cur_batch * stride_qb + cur_head[:, None] * stride_qh + offs_c[None, :]
mask_c = offs_c < kv_lora_rank
mask_qk_r = offs_qk_r < (kv_lora_rank + qk_rope_head_dim)
cur_batch_kv_start_idx = tl.load(kv_indptr + cur_batch)
cur_batch_seq_len = tl.load(kv_indptr + cur_batch + 1) - cur_batch_kv_start_idx
q = tl.load(Q + offs_q, mask=(mask_h[:, None]) & (mask_c[None, :]), other=0.0)
q_pe = tl.load(
Q + off_q_pe, mask=(mask_h[:, None]) & (mask_qk_r[None, :]), other=0.0
)
kv_len_per_split = tl.cdiv(cur_batch_seq_len, NUM_KV_SPLITS)
split_kv_start = kv_len_per_split * split_kv_id
split_kv_end = tl.minimum(split_kv_start + kv_len_per_split, cur_batch_seq_len)
# apply rotary embedding for q_pe, and k_pe (last token per batch of K_PE)
LAST_SPLIT = split_kv_end == cur_batch_seq_len
k_pe_last_token = tl.zeros([BLOCK_R], dtype=q.dtype)
if USE_ROPE:
if IS_NEOX_STYLE:
# [BLOCK_ROTARY // 2, BLOCK_ROTARY // 2 + 1, BLOCK_ROTARY // 2 + 2, ..., 0, 1, 2, ..., BLOCK_ROTARY // 2 - 1, pass:]
offs_qk_rot_r = kv_lora_rank + (
(tl.arange(0, BLOCK_R) + (rotary_dim // 2)) % rotary_dim
)
# Which elements to flip
mask_rotate = tl.arange(0, BLOCK_R) < (rotary_dim // 2)
# [0 , 1, 2, ..., rotary_dim // 2 - 1, 0 , 1, 2, ..., rotary_dim // 2 - 1]
offs_rotary = tl.arange(0, BLOCK_R) % (rotary_dim // 2)
else:
# [1, 0, 3, 2, 5, 4, ..., BLOCK_R, BLOCK_R - 1]
offs_qk_rot_r = (
kv_lora_rank
+ (((tl.arange(0, BLOCK_R) + 1) % 2) * 2)
- 1
+ tl.arange(0, BLOCK_R)
)
mask_rotate = tl.arange(0, BLOCK_R) % 2 < 1
# [0, 0, 1, 1, ..., rotary_dim // 2 - 1, rotary_dim // 2 - 1]
offs_rotary = tl.arange(0, BLOCK_R) // 2
if qk_rope_head_dim > rotary_dim:
offs_qk_rot_r = tl.where(
tl.arange(0, BLOCK_R) < rotary_dim, offs_qk_rot_r, tl.arange(0, BLOCK_R)
)
offs_rotary = tl.where(
tl.arange(0, BLOCK_R) < rotary_dim, offs_rotary, tl.arange(0, BLOCK_R)
)
mask_rotary = tl.arange(0, BLOCK_R) < rotary_dim
pos = tl.load(positions + cur_batch * stride_positions_b)
cos = tl.load(
cos_sin_cache + pos * stride_cos_sin_cache_s + offs_rotary,
mask=mask_rotary,
other=1.0,
)
sin = tl.load(
cos_sin_cache
+ pos * stride_cos_sin_cache_s
+ offs_rotary
+ rotary_dim // 2,
            mask=mask_rotary,
other=0.0,
)
off_q_pe_rot = (
cur_batch * stride_qb
+ cur_head[:, None] * stride_qh
+ offs_qk_rot_r[None, :]
)
mask_qk_rot_r = offs_qk_rot_r < (kv_lora_rank + qk_rope_head_dim)
# 0, 2, 4,.... 1, 3, 5...
q_pe_rot = tl.load(
Q + off_q_pe_rot,
mask=(mask_h[:, None]) & (mask_qk_rot_r[None, :]),
other=0.0,
)
q_pe_rot = tl.where(mask_rotate[None, :], -q_pe_rot, q_pe_rot)
q_pe = q_pe * cos + q_pe_rot * sin
# we only apply to the last token in the K_PE
if LAST_SPLIT:
# debug assert
if (cur_batch == 0 and cur_head == 0) and split_kv_id < NUM_KV_SPLITS - 1:
tl.device_assert(False, "Only last split should compute k_pe")
kv_loc = tl.load(
kv_indices + cur_batch_kv_start_idx + cur_batch_seq_len - 1
)
offs_buf_k_pe_last_token = kv_loc * stride_buf_kbs + offs_qk_r
offs_buf_k_pe_rot_last_token = kv_loc * stride_buf_kbs + offs_qk_rot_r
k_pe_last_token = tl.load(K_Buffer + offs_buf_k_pe_last_token)
k_pe_rot_last_token = tl.load(K_Buffer + offs_buf_k_pe_rot_last_token)
k_pe_rot_last_token = tl.where(
mask_rotate, -k_pe_rot_last_token, k_pe_rot_last_token
)
k_pe_last_token = k_pe_last_token * cos + k_pe_rot_last_token * sin
e_max = tl.zeros([BLOCK_H], dtype=tl.float32) - float("inf")
e_sum = tl.zeros([BLOCK_H], dtype=tl.float32)
acc = tl.zeros([BLOCK_H, BLOCK_C], dtype=tl.float32)
if split_kv_end > split_kv_start:
for start_n in range(split_kv_start, split_kv_end, BLOCK_N):
offs_n = start_n + tl.arange(0, BLOCK_N)
kv_loc = tl.load(
kv_indices + cur_batch_kv_start_idx + offs_n,
mask=offs_n < split_kv_end,
other=0,
)
offs_buf_kv = kv_loc[None, :] * stride_buf_kbs + offs_c[:, None]
offs_buf_k_pe = kv_loc[None, :] * stride_buf_kbs + offs_qk_r[:, None]
k_pe = tl.load(
K_Buffer + offs_buf_k_pe,
mask=(offs_n[None, :] < split_kv_end) & (mask_qk_r[:, None]),
other=0.0,
) # positional embedding part of keys
if USE_ROPE and start_n >= cur_batch_seq_len - BLOCK_N:
k_pe = tl.where(
offs_n[None, :] != (split_kv_end - 1),
k_pe,
k_pe_last_token[:, None],
)
# (16, 64) x (64, 32)
# dot product of rope parts
qk = tl.dot(q_pe, k_pe.to(q_pe.dtype))
kv = tl.load(
K_Buffer + offs_buf_kv,
mask=(offs_n[None, :] < split_kv_end) & (mask_c[:, None]),
other=0.0,
) # the shared latent tensor for keys and values
# (16, 512) x (512, 32)
# dot product of nope parts
qk += tl.dot(q, kv)
qk *= sm_scale
if logit_cap > 0:
qk = logit_cap * tanh(qk / logit_cap)
qk = tl.where(
mask_h[:, None] & (offs_n[None, :] < split_kv_end), qk, float("-inf")
)
offs_buf_v = kv_loc[:, None] * stride_buf_vbs + offs_c[None, :]
v = tl.load(
V_buffer + offs_buf_v,
mask=(offs_n[:, None] < split_kv_end) & (mask_c[None, :]),
other=0.0,
)
n_e_max = tl.maximum(tl.max(qk, 1), e_max)
re_scale = tl.exp(e_max - n_e_max)
p = tl.exp(qk - n_e_max[:, None])
acc *= re_scale[:, None]
# (16, 32) x (32, 512)
acc += tl.dot(p.to(v.dtype), v)
e_sum = e_sum * re_scale + tl.sum(p, 1)
e_max = n_e_max
offs_mid_o = (
cur_batch * stride_mid_ob
+ cur_head[:, None] * stride_mid_oh
+ split_kv_id * stride_mid_os
+ offs_c[None, :]
)
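    # The stores below write this split's partial results: Att_Out[..., :kv_lora_rank]
    # holds acc / e_sum (the softmax-weighted partial output in latent space) and
    # Att_Out[..., kv_lora_rank] holds the log-sum-exp e_max + log(e_sum) consumed by
    # the stage-2 reduction. When RoPE is fused, the last split also writes the
    # rotated k_pe of the final token to k_pe_t_out.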
if USE_ROPE:
if LAST_SPLIT:
k_pe_last_token_ptrs = (
k_pe_t_out
+ cur_batch * stride_kpe_tokens_out_b
+ tl.arange(0, BLOCK_R)
)
tl.store(k_pe_last_token_ptrs, k_pe_last_token, mask=mask_qk_r)
tl.store(
Att_Out + offs_mid_o,
acc / e_sum[:, None],
mask=(mask_h[:, None]) & (mask_c[None, :]),
)
offs_mid_o_1 = (
cur_batch * stride_mid_ob
+ cur_head * stride_mid_oh
+ split_kv_id * stride_mid_os
+ kv_lora_rank
)
tl.store(
Att_Out + offs_mid_o_1,
e_max + tl.log(e_sum),
mask=mask_h,
)
# TODO rope offset
def _decode_grouped_att_m_fwd_rope(
q,
k_buffer,
v_buffer,
att_out,
k_pe_tokens_out,
kv_lora_rank, # c
cos_sin_cache,
positions,
rotary_dim,
kv_indptr,
kv_indices,
num_kv_splits,
sm_scale,
logit_cap,
use_rope,
is_neox_style=True,
):
if use_rope:
assert (
k_pe_tokens_out is not None
), "We must output the k_pe tokens with rope applied if rope fusion enabled."
BLOCK = 32
# # [TODO] work around shmem limit on MI3xx
# if is_hip_ and kv_lora_rank >= 576:
# BLOCK = 16
qk_rope_head_dim = k_buffer.shape[-1] - kv_lora_rank
batch, head_num = kv_indptr.shape[0] - 1, q.shape[1]
kv_group_num = q.shape[1] // k_buffer.shape[1]
BLOCK_C = triton.next_power_of_2(kv_lora_rank)
BLOCK_R = triton.next_power_of_2(qk_rope_head_dim)
BLOCK_H = 16
NUM_KV_SPLITS = num_kv_splits
grid = (
batch,
triton.cdiv(head_num, min(BLOCK_H, kv_group_num)),
NUM_KV_SPLITS,
)
extra_kargs = {}
num_stages = 2
if is_hip_:
# https://rocm.docs.amd.com/en/docs-6.2.0/how-to/llm-fine-tuning-optimization/optimizing-triton-kernel.html
# https://github.com/triton-lang/triton/blob/main/third_party/amd/backend/compiler.py
extra_kargs = {"waves_per_eu": 1, "matrix_instr_nonkdim": 16, "kpack": 2}
num_stages = 1
_fwd_grouped_kernel_stage1_rope[grid](
q,
k_buffer,
v_buffer,
cos_sin_cache,
positions,
sm_scale,
kv_indptr,
kv_indices,
att_out,
k_pe_tokens_out,
q.stride(0),
q.stride(1),
k_buffer.stride(0),
v_buffer.stride(0),
att_out.stride(0),
att_out.stride(1),
att_out.stride(2),
k_pe_tokens_out.stride(0) if use_rope else 0,
cos_sin_cache.stride(0) if use_rope else 0,
positions.stride(0) if use_rope else 0,
rotary_dim,
kv_lora_rank,
qk_rope_head_dim,
kv_group_num=kv_group_num,
q_head_num=head_num,
BLOCK_C=BLOCK_C,
BLOCK_R=BLOCK_R,
BLOCK_N=BLOCK,
BLOCK_H=BLOCK_H,
NUM_KV_SPLITS=NUM_KV_SPLITS,
logit_cap=logit_cap,
USE_ROPE=use_rope,
IS_NEOX_STYLE=is_neox_style,
num_warps=4,
num_stages=num_stages,
**extra_kargs
)
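# Public entry point. Shapes follow the pointer comments in the stage-1 kernel:
# q packs [q_nope; q_pe] per head, k_buffer holds the latent cache [kv; k_pe]
# per cached token, v_buffer is the kv_lora_rank-wide latent slice of that cache,
# attn_logits provides [batch, heads, num_kv_splits, kv_lora_rank + 1] scratch for
# the partial results, and o receives the [batch, heads, kv_lora_rank] output.
# k_pe_tokens receives the rotated k_pe of each batch's last token when use_rope
# is enabled.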
def decode_attention_fwd_grouped_rope(
q,
k_buffer,
v_buffer,
o,
kv_indptr,
kv_indices,
k_pe_tokens,
kv_lora_rank,
rotary_dim,
cos_sin_cache,
positions,
attn_logits,
num_kv_splits,
sm_scale,
logit_cap=0.0,
use_rope=False,
is_neox_style=False,
):
_decode_grouped_att_m_fwd_rope(
q,
k_buffer,
v_buffer,
attn_logits,
k_pe_tokens,
kv_lora_rank,
cos_sin_cache,
positions,
rotary_dim,
kv_indptr,
kv_indices,
num_kv_splits,
sm_scale,
logit_cap,
use_rope,
is_neox_style,
)
_decode_softmax_reducev_fwd(attn_logits, q, o, v_buffer, kv_indptr, num_kv_splits)
@@ -16,6 +16,7 @@
# https://github.com/vllm-project/vllm/blob/fb6af8bc086328ca6659e72d11ffd4309ce4de22/vllm/model_executor/models/deepseek_v2.py
"""Inference-only DeepseekV2 model."""
import os
from typing import Any, Dict, Iterable, Optional, Tuple
import torch
@@ -31,6 +32,9 @@ from sglang.srt.distributed import (
    tensor_model_parallel_all_reduce,
)
from sglang.srt.layers.activation import SiluAndMul
from sglang.srt.layers.attention.triton_ops.rocm_mla_decode_rope import (
decode_attention_fwd_grouped_rope,
)
from sglang.srt.layers.layernorm import RMSNorm
from sglang.srt.layers.linear import (
    ColumnParallelLinear,
@@ -533,7 +537,18 @@ class DeepseekV2AttentionMLA(nn.Module):
        if no_absorb():
            return self.forward_normal(positions, hidden_states, forward_batch)
        else:
            if is_hip_:
                if (
                    os.getenv("SGLANG_ROCM_FUSED_DECODE_MLA") == "1"
                    and forward_batch.forward_mode.is_decode()
                ):
                    return self.forward_absorb_fused_mla_rope(
                        positions, hidden_states, forward_batch
                    )
                else:
                    return self.forward_absorb(positions, hidden_states, forward_batch)
            else:
                return self.forward_absorb(positions, hidden_states, forward_batch)
    def forward_normal(
        self,
@@ -652,6 +667,149 @@ class DeepseekV2AttentionMLA(nn.Module):
        return output
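    # Overview of the fused decode path below: (1) project hidden_states to q and
    # multiply q_nope by the absorbed key projection w_kc (bmm, with fp8 variants);
    # (2) build the latent cache entry (k_input / v_input) and write it into the
    # KV pool; (3) call decode_attention_fwd_grouped_rope, which can also apply
    # RoPE to q_pe and the current token's k_pe inside the kernel; (4) if RoPE was
    # fused, write the rotated k_pe back to the KV pool; (5) multiply the latent
    # attention output by w_vc and run o_proj.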
def forward_absorb_fused_mla_rope(
self,
positions: torch.Tensor,
hidden_states: torch.Tensor,
forward_batch: ForwardBatch,
) -> torch.Tensor:
enable_rope_fusion = (
os.getenv("SGLANG_FUSED_MLA_ENABLE_ROPE_FUSION", "1") == "1"
)
q_len = hidden_states.shape[0]
q_input = hidden_states.new_empty(
q_len, self.num_local_heads, self.kv_lora_rank + self.qk_rope_head_dim
)
if self.q_lora_rank is not None:
q = self.q_a_proj(hidden_states)[0]
q = self.q_a_layernorm(q)
q = self.q_b_proj(q)[0].view(-1, self.num_local_heads, self.qk_head_dim)
else:
q = self.q_proj(hidden_states)[0].view(
-1, self.num_local_heads, self.qk_head_dim
)
q_nope, q_pe = q.split([self.qk_nope_head_dim, self.qk_rope_head_dim], dim=-1)
if self.w_kc.dtype == torch.float8_e4m3fnuz:
# TODO(kernel): add bmm_fp8 for torch.float8_e4m3fnuz
q_nope_out = torch.bmm(
q_nope.to(torch.bfloat16).transpose(0, 1),
self.w_kc.to(torch.bfloat16) * self.w_scale,
)
elif self.w_kc.dtype == torch.float8_e4m3fn:
q_nope_val, q_nope_scale = input_to_float8(
q_nope.transpose(0, 1), torch.float8_e4m3fn
)
q_nope_out = bmm_fp8(
q_nope_val, self.w_kc, q_nope_scale, self.w_scale, torch.bfloat16
)
else:
q_nope_out = torch.bmm(q_nope.transpose(0, 1), self.w_kc)
q_input[..., : self.kv_lora_rank] = q_nope_out.transpose(0, 1)
latent_cache = self.kv_a_proj_with_mqa(hidden_states)[0]
v_input = latent_cache[..., : self.kv_lora_rank]
v_input = self.kv_a_layernorm(v_input.contiguous()).unsqueeze(1)
k_input = latent_cache.unsqueeze(1)
k_input[..., : self.kv_lora_rank] = v_input
if not enable_rope_fusion:
k_pe = k_input[..., self.kv_lora_rank :]
q_pe, k_pe = self.rotary_emb(positions, q_pe, k_pe)
q_input[..., self.kv_lora_rank :] = q_pe
k_input[..., self.kv_lora_rank :] = k_pe
k_pe_output = None
else:
k_pe_output = torch.empty_like(k_input[..., self.kv_lora_rank :])
q_input[..., self.kv_lora_rank :] = q_pe
        # attn_output = self.attn_mqa(q_input, k_input, v_input, forward_batch)
        # The attn_mqa call above is replaced by the fused Triton decode kernel below;
        # whether RoPE is applied inside the kernel is controlled by use_rope=enable_rope_fusion.
attn_output = torch.empty(
(q_len, self.num_local_heads, self.kv_lora_rank),
dtype=q.dtype,
device=q.device,
)
attn_logits, _, kv_indptr, kv_indices, _, _, _ = (
forward_batch.attn_backend.forward_metadata
)
cos_sin_cache = self.rotary_emb.cos_sin_cache
num_kv_split = forward_batch.attn_backend.num_kv_splits
sm_scale = self.attn_mqa.scaling
if attn_logits is None:
attn_logits = torch.empty(
(
forward_batch.batch_size,
self.num_local_heads,
num_kv_split,
self.kv_lora_rank + 1,
),
dtype=torch.float32,
device=q.device,
)
# save current latent cache.
forward_batch.token_to_kv_pool.set_kv_buffer(
self.attn_mqa, forward_batch.out_cache_loc, k_input, None
)
key_cache_buf = forward_batch.token_to_kv_pool.get_key_buffer(
self.attn_mqa.layer_id
)
val_cache_buf = key_cache_buf[..., : self.kv_lora_rank]
decode_attention_fwd_grouped_rope(
q_input,
key_cache_buf,
val_cache_buf,
attn_output,
kv_indptr,
kv_indices,
k_pe_output,
self.kv_lora_rank,
self.rotary_emb.rotary_dim,
cos_sin_cache,
positions,
attn_logits,
num_kv_split,
sm_scale,
logit_cap=self.attn_mqa.logit_cap,
use_rope=enable_rope_fusion,
is_neox_style=self.rotary_emb.is_neox_style,
)
if enable_rope_fusion:
k_input[..., self.kv_lora_rank :] = k_pe_output
forward_batch.token_to_kv_pool.set_kv_buffer(
self.attn_mqa, forward_batch.out_cache_loc, k_input, None
)
attn_output = attn_output.view(-1, self.num_local_heads, self.kv_lora_rank)
if self.w_vc.dtype == torch.float8_e4m3fnuz:
# TODO(kernel): add bmm_fp8 for torch.float8_e4m3fnuz
attn_bmm_output = torch.bmm(
attn_output.to(torch.bfloat16).transpose(0, 1),
self.w_vc.to(torch.bfloat16) * self.w_scale,
)
elif self.w_vc.dtype == torch.float8_e4m3fn:
attn_output_val, attn_output_scale = input_to_float8(
attn_output.transpose(0, 1), torch.float8_e4m3fn
)
attn_bmm_output = bmm_fp8(
attn_output_val,
self.w_vc,
attn_output_scale,
self.w_scale,
torch.bfloat16,
)
else:
attn_bmm_output = torch.bmm(attn_output.transpose(0, 1), self.w_vc)
attn_output = attn_bmm_output.transpose(0, 1).flatten(1, 2)
output, _ = self.o_proj(attn_output)
return output
def all_gather(
    input_tensor: torch.Tensor, forward_batch: ForwardBatch, rank, world_size, group
import random
import unittest
import torch
from sglang.srt.layers.attention.triton_ops.decode_attention import (
decode_attention_fwd_grouped,
)
from sglang.srt.layers.attention.triton_ops.rocm_mla_decode_rope import (
decode_attention_fwd_grouped_rope,
)
from sglang.srt.layers.rotary_embedding import DeepseekScalingRotaryEmbedding
class TestTritonAttentionMLA(unittest.TestCase):
def _set_all_seeds(self, seed):
"""Set all random seeds for reproducibility."""
random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
def setUp(self):
# Set seeds before each test method
self._set_all_seeds(42)
def preprocess_kv_cache(self, kv_cache, kv_lora_rank):
latent_cache = kv_cache
v_input = latent_cache[..., :kv_lora_rank]
v_input = v_input.contiguous().unsqueeze(1)
k_input = latent_cache.unsqueeze(1)
k_input[..., :kv_lora_rank] = v_input
return k_input, v_input
def input_helper(
self,
B,
H,
S,
kv_lora_rank,
rotary_dim,
qk_rope_head_dim,
num_kv_splits,
dtype,
device,
rope_base=10,
rope_max_seq_len=16384,
rope_scaling=1.0,
is_neox_style=False,
):
q = torch.randn(
B, H, kv_lora_rank + qk_rope_head_dim, device=device, dtype=dtype
)
kv_cache = torch.randn(
B * S, kv_lora_rank + qk_rope_head_dim, dtype=dtype, device=device
)
kv_indptr = torch.arange(B + 1, device=device) * S
kv_indices = torch.arange(B * S, device=device)
attn_logits = torch.empty(
B, H, num_kv_splits, kv_lora_rank + 1, dtype=dtype, device=device
)
rotary_emb = DeepseekScalingRotaryEmbedding(
qk_rope_head_dim,
rotary_dim,
rope_max_seq_len,
rope_base,
is_neox_style,
rope_scaling,
q.dtype,
device="cpu",
).cuda()
positions = torch.tensor([S], device=device).unsqueeze(0).repeat(B, 1)
return kv_indptr, kv_indices, q, kv_cache, attn_logits, rotary_emb, positions
def ref_compute_full_fwd(
self,
q,
k_input,
v_input,
kv_lora_rank,
kv_indptr,
kv_indices,
num_kv_splits,
sm_scale,
logit_cap,
rotary_emb,
positions,
use_rope,
device="cuda",
):
B, H = q.shape[0], q.shape[1]
S = kv_indptr[1].item()
qk_rope_head_dim = k_input.shape[-1] - kv_lora_rank
q_input = torch.empty(B, H, kv_lora_rank + qk_rope_head_dim, dtype=q.dtype).to(
device
)
q_nope_out, q_pe = q.split([kv_lora_rank, qk_rope_head_dim], dim=-1)
k_pe_t = k_input.view(B, 1, S, -1)[:, :, -1:, kv_lora_rank:]
if use_rope:
q_pe, k_pe_t = rotary_emb(positions, q_pe.unsqueeze(2), k_pe_t)
q_pe = q_pe.squeeze()
k_input.view(B, 1, S, -1)[:, :, -1:, kv_lora_rank:] = k_pe_t
q_input[..., :kv_lora_rank] = q_nope_out
q_input[..., kv_lora_rank:] = q_pe
B, H = q_input.shape[0], q_input.shape[1]
kv_lora_rank = v_input.shape[-1]
device = q_input.device
attn_logits = torch.empty(
B, H, num_kv_splits, kv_lora_rank + 1, dtype=q_input.dtype, device=device
)
o = torch.empty(B, H, kv_lora_rank, dtype=q_input.dtype, device=device)
decode_attention_fwd_grouped(
q_input,
k_input,
v_input,
o,
kv_indptr,
kv_indices,
attn_logits,
num_kv_splits,
sm_scale,
logit_cap,
)
return attn_logits, o, k_pe_t.squeeze()
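    # The reference above applies RoPE eagerly with DeepseekScalingRotaryEmbedding
    # and then runs the unfused decode_attention_fwd_grouped kernel; the test below
    # compares its logits, output, and rotated last-token k_pe against the fused
    # decode_attention_fwd_grouped_rope kernel.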
def _test_rocm_fused_mla_kernel(
self,
B,
H,
S,
kv_lora_rank,
qk_rope_head_dim,
rotary_dim,
dtype,
use_rope,
is_neox_style,
num_kv_splits=2,
sm_scale=1.0,
logit_cap=0.0,
device="cuda",
):
kv_indptr, kv_indices, q, kv_cache, attn_logits, rotary_emb, positions = (
self.input_helper(
B,
H,
S,
kv_lora_rank,
rotary_dim,
qk_rope_head_dim,
num_kv_splits,
dtype,
device=device,
is_neox_style=is_neox_style,
)
)
k_input, v_input = self.preprocess_kv_cache(kv_cache, kv_lora_rank)
k_pe_tokens = torch.empty(
B, qk_rope_head_dim, dtype=kv_cache.dtype, device=device
)
tri_o = torch.empty(B, H, kv_lora_rank, dtype=kv_cache.dtype, device=device)
decode_attention_fwd_grouped_rope(
q,
k_input,
v_input,
tri_o,
kv_indptr,
kv_indices,
k_pe_tokens if use_rope else None,
kv_lora_rank,
rotary_dim if use_rope else None,
rotary_emb.cos_sin_cache if use_rope else None,
positions if use_rope else None,
attn_logits,
num_kv_splits,
sm_scale,
logit_cap,
use_rope,
is_neox_style,
)
tri_logits = attn_logits
# reference
ref_logits, ref_o, ref_k_pe_tokens = self.ref_compute_full_fwd(
q,
k_input,
v_input,
kv_lora_rank,
kv_indptr,
kv_indices,
num_kv_splits,
sm_scale,
logit_cap,
rotary_emb,
positions,
use_rope,
device="cuda",
)
if use_rope:
torch.testing.assert_close(
ref_k_pe_tokens, k_pe_tokens.squeeze(), atol=1e-2, rtol=1e-2
)
torch.testing.assert_close(ref_logits, tri_logits, atol=1e-2, rtol=1e-2)
torch.testing.assert_close(ref_o, tri_o, atol=1e-2, rtol=1e-2)
def test_grouped_rocm_fused_mla(self):
configs = [
(1, 128, 2048, 512, 64, 64),
(1, 128, 2048, 512, 128, 64),
(1, 128, 2048, 512, 127, 64),
(1, 128, 2050, 512, 127, 64),
(1, 128, 2050, 512, 128, 64),
(8, 128, 2048, 512, 64, 64),
(8, 128, 2048, 512, 128, 64),
(8, 128, 2048, 512, 127, 64),
(8, 128, 2050, 512, 127, 64),
(8, 128, 2050, 512, 128, 64),
]
dtypes = [torch.bfloat16, torch.float32]
use_rope_list = [True, False]
is_neox_style_list = [True, False]
for B, H, S, kv_lora_rank, qk_rope_head_dim, rotary_dim in configs:
for dtype in dtypes:
for use_rope in use_rope_list:
for is_neox_style in is_neox_style_list:
self._test_rocm_fused_mla_kernel(
B,
H,
S,
kv_lora_rank,
qk_rope_head_dim,
rotary_dim,
dtype,
use_rope,
is_neox_style,
)
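# Running this file directly executes every configuration above via unittest.main();
# tensors are allocated on device 'cuda', so a ROCm (or CUDA) GPU with Triton is required.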
if __name__ == "__main__":
unittest.main()