import argparse
import math
import random

import torch
import triton

from flash_mla import flash_mla_with_kvcache_fp8, get_mla_decoding_metadata_dense_fp8

torch.set_printoptions(precision=4, profile="default", sci_mode=False)


def scaled_dot_product_attention(query, key, value, h_q, h_kv, is_causal=False, k_scale=1.0):
    """Reference attention computed in float32, also returning the log-sum-exp.

    As called from ``test_flash_mla`` the inputs are laid out heads-first:
    ``query`` is ``(h_q, s_q, d)``, ``key`` is ``(h_kv, s_k, d)`` and ``value``
    is ``(h_kv, s_k, dv)``.

    Args:
        query: Query tensor; upcast to float32.
        key: Key tensor; upcast to float32 and multiplied by ``k_scale``.
        value: Value tensor; upcast to float32 and multiplied by ``k_scale``
            (the caller derives V as a slice of K, so both share one scale).
        h_q: Number of query heads.
        h_kv: Number of key/value heads; each is repeated ``h_q // h_kv``
            times along dim 0 to line up with the query heads.
        is_causal: When true, query row ``i`` may only attend to key columns
            ``j <= i + (s_k - s_q)`` (mask aligned to the bottom-right corner).
        k_scale: Scalar de-quantisation factor applied to ``key`` and ``value``.

    Returns:
        A tuple ``(output, lse)`` where ``output`` is ``softmax(scores) @ value``
        and ``lse`` is ``logsumexp(scores, dim=-1)``.
    """
    query = query.float()
    key = key.float() * k_scale
    value = value.float() * k_scale
    key = key.repeat_interleave(h_q // h_kv, dim=0)
    value = value.repeat_interleave(h_q // h_kv, dim=0)

    attn_weight = query @ key.transpose(-2, -1) / math.sqrt(query.size(-1))
    if is_causal:
        s_q = query.shape[-2]
        s_k = key.shape[-2]
        # Build the mask on the same device as the scores so the function does
        # not depend on the process-wide default device.
        attn_bias = torch.zeros(s_q, s_k, dtype=query.dtype, device=query.device)
        temp_mask = torch.ones(
            s_q, s_k, dtype=torch.bool, device=query.device
        ).tril(diagonal=s_k - s_q)
        attn_bias.masked_fill_(temp_mask.logical_not(), float("-inf"))
        attn_weight += attn_bias

    lse = attn_weight.logsumexp(dim=-1)
    attn_weight = torch.softmax(attn_weight, dim=-1, dtype=torch.float32)
    return attn_weight @ value, lse


def cal_diff(x: torch.Tensor, y: torch.Tensor, name: str, use_fp8: bool = False) -> None:
    """Print error metrics between ``x`` and ``y`` and assert they are close.

    The metrics are computed in float64:

    * ``cos_diff = 1 - 2 * sum(x * y) / max(sum(x * x + y * y), 1e-12)``
    * ``RMSE`` - root mean squared error
    * ``amax_diff`` - largest absolute element-wise difference

    Args:
        x: First tensor; its dtype selects the tolerance in the non-FP8 case.
        y: Second tensor.
        name: Label used as the prefix of the printed line.
        use_fp8: When true, use the looser FP8 tolerance (``1e-3``).

    Raises:
        AssertionError: If ``cos_diff`` is not below the tolerance
            (``1e-3`` for FP8, ``1e-4`` when ``x`` is bfloat16, else ``1e-5``).
    """
    torch_dtype = x.dtype
    x, y = x.double(), y.double()
    RMSE = ((x - y) * (x - y)).mean().sqrt().item()
    cos_diff = 1 - 2 * (x * y).sum().item() / max((x * x + y * y).sum().item(), 1e-12)
    amax_diff = (x - y).abs().max().item()
    print(f"{name}: {cos_diff=}, {RMSE=}, {amax_diff=}")
    if use_fp8:
        assert cos_diff < 1e-3
    else:
        assert cos_diff < (1e-4 if torch_dtype == torch.bfloat16 else 1e-5)


@torch.inference_mode()
def test_flash_mla(b, s_q, mean_sk, h_q, h_kv, d, dv, causal, varlen, is_prof=False, torch_dtype=torch.float16):
    """Check the FlashMLA decoding kernel against the PyTorch reference.

    Random Q and a paged K cache are generated (V is the first ``dv`` channels
    of K), the kernel and the reference are both run, and their outputs and
    log-sum-exp values are compared with ``cal_diff``. Unless ``is_prof`` is
    set, the kernel is then benchmarked with ``triton.testing.do_bench``.

    Args:
        b: Batch size.
        s_q: Number of query tokens per sequence.
        mean_sk: KV length of every sequence, or the mean of the sampled
            lengths when ``varlen`` is true.
        h_q: Number of query heads.
        h_kv: Number of key/value heads.
        d: Head dimension of Q and K.
        dv: Head dimension of V (the leading ``dv`` channels of K).
        causal: Whether causal masking is applied.
        varlen: When true, each KV length is drawn from a normal distribution
            with mean ``mean_sk`` and standard deviation ``mean_sk / 2``,
            clamped below by ``s_q``.
        is_prof: When true, skip the benchmark and return right after the
            correctness checks.
        torch_dtype: Dtype under test; ``torch.float8_e4m3fn`` enables the FP8
            path, where Q and K are cast to FP8 and unit descale factors are
            passed to the kernel.

    Raises:
        AssertionError: Propagated from ``cal_diff`` when the kernel output or
            its LSE deviates from the reference by more than the tolerance.
    """
    print(
        f"{b=}, {s_q=}, {mean_sk=}, {h_q=}, {h_kv=}, {d=}, {dv=}, {causal=}, {varlen=}, {torch_dtype=}"
    )
    use_fp8 = torch_dtype == torch.float8_e4m3fn

    cache_seqlens = torch.full((b,), mean_sk, dtype=torch.int32)
    if varlen:
        for i in range(b):
            cache_seqlens[i] = max(random.normalvariate(mean_sk, mean_sk / 2), s_q)
    total_seqlens = cache_seqlens.sum().item()
    mean_seqlens = cache_seqlens.float().mean().int().item()
    max_seqlen = cache_seqlens.max().item()
    # Every sequence gets the same number of cache rows, rounded up to 256.
    max_seqlen_pad = triton.cdiv(max_seqlen, 256) * 256
    print(f"{total_seqlens=}, {mean_seqlens=}, {max_seqlen=}, {max_seqlen_pad=}")

    q = torch.randn(b, s_q, h_q, d)

    block_size = 64
    block_table = torch.arange(
        b * max_seqlen_pad // block_size, dtype=torch.int32
    ).view(b, max_seqlen_pad // block_size)
    blocked_k = torch.randn(block_table.numel(), block_size, h_kv, d)

    # Poison the padding beyond each sequence's real length with NaN so any
    # out-of-range read by the kernel shows up in the comparison.
    k_per_seq = blocked_k.view(b, max_seqlen_pad, h_kv, d)
    for i in range(b):
        k_per_seq[i, cache_seqlens[i].item():] = float("nan")
    blocked_v = blocked_k[..., :dv]

    tile_scheduler_metadata, num_splits = get_mla_decoding_metadata_dense_fp8(
        cache_seqlens, s_q * h_q // h_kv, h_kv, h_q
    )

    # Dtype the inputs were generated in; the reference de-quantises back to it.
    init_dtype = q.dtype

    descale_q = None
    descale_k = None
    if use_fp8:
        fp8_dtype = torch.float8_e4m3fn
        # Unit scales: the FP8 tensors are plain casts of the original values.
        descale_q = torch.ones((1), dtype=torch.float32)
        descale_k = torch.ones((1), dtype=torch.float32)
        q = q.to(fp8_dtype)
        blocked_k = blocked_k.to(fp8_dtype)
        blocked_v = blocked_k[..., :dv]

    def flash_mla():
        return flash_mla_with_kvcache_fp8(
            q,
            blocked_k,
            block_table,
            cache_seqlens,
            dv,
            tile_scheduler_metadata,
            num_splits,
            causal=causal,
            descale_q=descale_q,
            descale_k=descale_k,
        )

    def ref_mla():
        # For FP8 the reference consumes the de-quantised values, so it sees
        # exactly the inputs the kernel does.
        q_ = (q.to(torch.float) * descale_q).to(init_dtype) if use_fp8 else q
        blocked_k_ = (blocked_k.to(torch.float) * descale_k).to(init_dtype) if use_fp8 else blocked_k
        blocked_v_ = (blocked_v.to(torch.float) * descale_k).to(init_dtype) if use_fp8 else blocked_v
        out = torch.empty(b, s_q, h_q, dv, dtype=torch.float32)
        lse = torch.empty(b, h_q, s_q, dtype=torch.float32)
        for i in range(b):
            begin = i * max_seqlen_pad
            end = begin + cache_seqlens[i]
            O, LSE = scaled_dot_product_attention(
                q_[i].transpose(0, 1),
                blocked_k_.view(-1, h_kv, d)[begin:end].transpose(0, 1),
                blocked_v_.view(-1, h_kv, dv)[begin:end].transpose(0, 1),
                h_q=h_q,
                h_kv=h_kv,
                is_causal=causal,
            )
            out[i] = O.transpose(0, 1)
            lse[i] = LSE
        return out, lse

    out_flash, lse_flash = flash_mla()
    out_torch, lse_torch = ref_mla()
    cal_diff(out_flash, out_torch, "out", use_fp8)
    cal_diff(lse_flash, lse_torch, "lse")

    if is_prof:
        return

    t = triton.testing.do_bench(flash_mla)
    FLOPS = s_q * total_seqlens * h_q * (d + dv) * 2
    # K cache and Q are counted at the tested dtype's width, the output at the
    # width of the dtype the inputs were generated in.
    total_bytes = (total_seqlens * h_kv * d + b * s_q * h_q * d) * (
        torch.finfo(torch_dtype).bits // 8
    ) + (b * s_q * h_q * dv) * (torch.finfo(init_dtype).bits // 8)
    print(
        f"{t:.3f} ms, {FLOPS / 10 ** 9 / t:.0f} TFLOPS, {total_bytes / 10 ** 6 / t:.0f} GB/s"
    )


def main(torch_dtype, is_prof=False):
    """Configure the CUDA device and RNG seeds, then run the test sweeps.

    Args:
        torch_dtype: Dtype under test. For ``torch.float8_e4m3fn`` the default
            torch dtype is set to bfloat16 (inputs are generated in bfloat16
            and cast to FP8 inside ``test_flash_mla``); otherwise the default
            dtype is ``torch_dtype`` itself.
        is_prof: Currently unused; each sweep below passes its own explicit
            ``is_prof`` value to ``test_flash_mla``.
    """
    device = torch.device("cuda:0")
    init_dtype = torch.bfloat16 if torch_dtype == torch.float8_e4m3fn else torch_dtype
    torch.set_default_dtype(init_dtype)
    torch.set_default_device(device)
    torch.cuda.set_device(device)
    torch.manual_seed(0)
    random.seed(0)

    # Disabled sweep, kept for reference:
    # h_kv = 1
    # d, dv = 576, 512
    # causal = True
    # for b in [128]:
    #     for s in [4096, 8192]:
    #         for h_q in [16, 32, 64, 128]:  # TP = 8, 4, 2, 1
    #             for s_q in [1, 2]:  # MTP = 1, 2
    #                 for varlen in [False, True]:
    #                     test_flash_mla(b, s_q, s, h_q, h_kv, d, dv, causal, varlen)
    #
    # Argument order: b, s_q, s, h_q, h_kv, d, dv, causal, varlen
    # test_flash_mla( 1, 1, 64, 16, 1, 576, 512, True, False, is_prof=is_prof)
    # test_flash_mla_fp8( 1, 1, 1000, 1, 1, 576, 512, True, False, is_prof=is_prof)
    # test_flash_mla_fp8( 1, 1, 4096, 8, 1, 576, 512, True, False, is_prof=is_prof)
    # test_flash_mla_fp8(32, 1, 4096, 16, 1, 576, 512, False, False, is_prof=is_prof)

    h_kv = 1
    d, dv = 576, 512
    causal = True

    # for b in [40, 80]:
    #     for s in [3500, 4000, 8192, 16384]:
    #         for h_q in [16]:
    #             for s_q in [1]:  # MTP = 1, 2
    #                 for varlen in [False]:
    #                     test_flash_mla(b, s_q, s, h_q, h_kv, d, dv, causal, varlen,False,torch_dtype)

    # Stress test: correctness only (is_prof=True skips the benchmark).
    for b in [3, 6, 9, 12, 15, 18, 21, 24, 40, 41, 79, 80]:
        for s in [111, 112, 123, 1234, 432, 4325, 4000, 8192, 12345, 45321]:
            for h_q in [32, 16, 64, 128]:
                for s_q in [1, 2, 3]:  # MTP = 1, 2
                    for varlen in [False, True]:
                        test_flash_mla(b, s_q, s, h_q, h_kv, d, dv, causal, varlen, True, torch_dtype)

    # Benchmark sweep: correctness check followed by timing (is_prof=False).
    for b in [3, 6, 9, 12, 15, 18, 21, 24, 32, 64, 128, 256]:
        for s in [4000]:
            for h_q in [32, 16, 64, 128]:
                for s_q in [1]:  # MTP = 1, 2
                    for varlen in [False]:
                        test_flash_mla(b, s_q, s, h_q, h_kv, d, dv, causal, varlen, False, torch_dtype)

    # for b in [1]:
    #     for s in [128]:
    #         for h_q in [128]:
    #             for s_q in [2]:  # MTP = 1, 2
    #                 for varlen in [False]:
    #                     test_flash_mla(b, s_q, s, h_q, h_kv, d, dv, causal, varlen,False,torch_dtype)

    # for b in [1, 32]:
    #     for s in [200, 1002, 2002, 1024, 2000, 4000, 32768, 65536]:
    #         for h_q in [4, 16, 32, 64]:
    #             for s_q in [1, 2]:  # MTP = 1, 2
    #                 for varlen in [False]:
    #                     test_flash_mla(b, s_q, s, h_q, h_kv, d, dv, causal, varlen,False,torch_dtype)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--dtype",
        type=str,
        choices=["bf16", "fp16", "e4m3"],
        # The script previously ran FP8 whenever --dtype was omitted (and,
        # by mistake, also when "bf16" was requested). Keep FP8 as the
        # no-argument behaviour, but state it honestly as the default.
        default="e4m3",
        help="Data type to use for testing (bf16/fp16/e4m3)",
    )
    parser.add_argument('--prof', default=False, action='store_true', help='prof or not')
    args = parser.parse_args()

    # Explicit mapping so that every accepted choice selects its own dtype.
    dtype_by_name = {
        "bf16": torch.bfloat16,
        "fp16": torch.float16,
        "e4m3": torch.float8_e4m3fn,
    }
    main(dtype_by_name[args.dtype], args.prof)