layer.py 21.6 KB
Newer Older
1
# SPDX-License-Identifier: Apache-2.0
2
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3
"""Attention layer."""
4
from typing import List, Optional
5
6
7

import torch
import torch.nn as nn
8
import torch.nn.functional as F
9

10
import vllm.envs as envs
11
from vllm.attention import AttentionType
12
from vllm.attention.backends.abstract import AttentionBackend
13
from vllm.attention.selector import backend_name_to_enum, get_attn_backend
14
from vllm.attention.utils.kv_sharing_utils import validate_kv_sharing_target
15
from vllm.config import CacheConfig, get_current_vllm_config
16
17
18
from vllm.distributed.kv_transfer import (get_kv_transfer_group,
                                          has_kv_transfer_group,
                                          is_v1_kv_transfer_group)
19
from vllm.forward_context import ForwardContext, get_forward_context
20
from vllm.logger import init_logger
21
from vllm.model_executor.layers.attention_layer_base import AttentionLayerBase
22
from vllm.model_executor.layers.linear import UnquantizedLinearMethod
23
24
from vllm.model_executor.layers.quantization.base_config import (
    QuantizationConfig)
25
from vllm.model_executor.layers.quantization.kv_cache import BaseKVCacheMethod
26
from vllm.platforms import _Backend, current_platform
27
from vllm.utils import direct_register_custom_op
28

29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
logger = init_logger(__name__)
USE_XFORMERS_OPS = None


def check_xformers_availability():
    global USE_XFORMERS_OPS
    if USE_XFORMERS_OPS is not None:
        return USE_XFORMERS_OPS

    if current_platform.is_cuda() and current_platform.has_device_capability(
            100):
        # Xformers FA is not compatible with B200
        USE_XFORMERS_OPS = False
    else:
        try:
            from importlib.util import find_spec

            find_spec("xformers.ops")
            USE_XFORMERS_OPS = True
        except ImportError:
            USE_XFORMERS_OPS = False

    # the warning only needs to be shown once
    if not USE_XFORMERS_OPS:
        logger.warning("Xformers is not available, falling back.")

    return USE_XFORMERS_OPS

57

58
class Attention(nn.Module, AttentionLayerBase):
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
    """Attention layer.

    This class takes query, key, and value tensors as input. The input tensors
    can either contain prompt tokens or generation tokens.
    The class does the following:

    1. Store the input key and value tensors in the KV cache.
    2. Perform (multi-head/multi-query/grouped-query) attention.
    3. Return the output tensor.
    """

    def __init__(
        self,
        num_heads: int,
        head_size: int,
        scale: float,
        num_kv_heads: Optional[int] = None,
        alibi_slopes: Optional[List[float]] = None,
77
        cache_config: Optional[CacheConfig] = None,
78
        quant_config: Optional[QuantizationConfig] = None,
79
        logits_soft_cap: Optional[float] = None,
80
        per_layer_sliding_window: Optional[int] = None,
81
        use_mla: bool = False,
82
        prefix: str = "",
83
        attn_type: str = AttentionType.DECODER,
84
        kv_sharing_target_layer_name: Optional[str] = None,
85
        attn_backend: Optional[type[AttentionBackend]] = None,
86
        **extra_impl_args,
87
    ) -> None:
88
89
90
91
        """
        The KV cache is stored inside this class and is accessed via
        `self.kv_cache`.
        """
92
        super().__init__()
93
94
95
96
97
98
99
100
101
        if per_layer_sliding_window is not None:
            # per-layer sliding window
            sliding_window = per_layer_sliding_window
        elif cache_config is not None:
            # model-level sliding window
            sliding_window = cache_config.sliding_window
        else:
            sliding_window = None

102
103
104
        if cache_config is not None:
            kv_cache_dtype = cache_config.cache_dtype
            block_size = cache_config.block_size
105
            is_attention_free = cache_config.is_attention_free
106
            calculate_kv_scales = cache_config.calculate_kv_scales
107
108
109
        else:
            kv_cache_dtype = "auto"
            block_size = 16
110
            is_attention_free = False
111
            calculate_kv_scales = False
112
113
        if num_kv_heads is None:
            num_kv_heads = num_heads
114
115
116
        assert num_heads % num_kv_heads == 0, \
            f"num_heads ({num_heads}) is not " \
            f"divisible by num_kv_heads ({num_kv_heads})"
117

118
        # The default k/v_scale is set to 1.0. This is ignored
119
120
        # when kv-cache is not fp8, and should be used with
        # kv-cache in fp8_e5m2. For kv-cache in fp8_e4m3, we
121
        # expect the pre-quantized k/v_scale to be loaded along
122
123
        # with the model weights.
        self.kv_cache_dtype = kv_cache_dtype
124
125
126
        self.calculate_kv_scales = calculate_kv_scales
        self._k_scale = torch.tensor(1.0, dtype=torch.float32)
        self._v_scale = torch.tensor(1.0, dtype=torch.float32)
127
128
129
        # FlashAttn doesn't support quantizing the kv-cache only
        # but requires q to be quantized as well.
        self._q_scale = torch.tensor(1.0, dtype=torch.float32)
130
        self._prob_scale = torch.tensor(1.0, dtype=torch.float32)
131

132
133
134
135
        # We also keep q/k/v_scale on host (cpu) memory for attention
        # backends that require the scales to be on host instead of on device.
        # e.g. Flashinfer
        self._q_scale_float = 1.0
136
137
138
        self._k_scale_float = 1.0
        self._v_scale_float = 1.0

139
140
141
142
        # The output scale on host memory. This should be the input scale of
        # the quant op after this attention layer.
        self._o_scale_float: Optional[float] = None

143
        self.use_mla = use_mla
144
145
146
147
        self.num_heads = num_heads
        self.head_size = head_size
        self.num_kv_heads = num_kv_heads
        self.sliding_window = sliding_window
148
        self.has_sink = extra_impl_args.get("sinks") is not None
149

150
        quant_method = quant_config.get_quant_method(
151
            self, prefix=prefix) if quant_config else None
152
153
        if quant_method is not None and not isinstance(
                quant_method, UnquantizedLinearMethod):
154
            assert isinstance(quant_method, BaseKVCacheMethod)
155
156
            # TODO (mgoin): kv cache dtype should be specified in the FP8
            # checkpoint config and become the "auto" behavior
157
158
159
160
161
162
163
164
165
            if self.kv_cache_dtype == "fp8_e5m2":
                raise ValueError("fp8_e5m2 kv-cache is not supported with "
                                 "fp8 checkpoints.")
            # If quantization is enabled, we make "k_scale" and "v_scale"
            # parameters so that it can be loaded from the model checkpoint.
            # The k/v_scale will then be converted back to native float32
            # values after weight loading.
            self.quant_method = quant_method
            self.quant_method.create_weights(self)
166

167
168
169
        # During model initialization, the default dtype is set as the model
        # weight and activation dtype.
        dtype = torch.get_default_dtype()
170
171
172
173
174
175
        if attn_backend is None:
            self.attn_backend = get_attn_backend(head_size,
                                                 dtype,
                                                 kv_cache_dtype,
                                                 block_size,
                                                 is_attention_free,
176
177
                                                 use_mla=use_mla,
                                                 has_sink=self.has_sink)
178
179
180
181
        else:
            self.attn_backend = attn_backend

        impl_cls = self.attn_backend.get_impl_cls()
182
        self.impl = impl_cls(num_heads, head_size, scale, num_kv_heads,
183
                             alibi_slopes, sliding_window, kv_cache_dtype,
184
                             logits_soft_cap, attn_type,
185
                             kv_sharing_target_layer_name, **extra_impl_args)
186
        self.backend = backend_name_to_enum(self.attn_backend.get_name())
187
        self.dtype = dtype
188

189
190
191
192
        # For cuda-alike (CUDA and ROCM) and cpu platforms, we control how
        # torch.compile works by registering the attention as one giant
        # opaque custom op. For other platforms, we directly call them
        # and let torch.compile handle them.
193
194
195
        self.use_direct_call = not current_platform.is_cuda_alike(
        ) and not current_platform.is_cpu()

196
        self.use_output = self.attn_backend.accept_output_buffer
197
198
199
200
201
        compilation_config = get_current_vllm_config().compilation_config
        if prefix in compilation_config.static_forward_context:
            raise ValueError(f"Duplicate layer name: {prefix}")
        compilation_config.static_forward_context[prefix] = self
        self.layer_name = prefix
202
        self.attn_type = attn_type
203
204
205
206
207
208
209
210
211

        if kv_sharing_target_layer_name is not None:
            validate_kv_sharing_target(
                prefix,
                kv_sharing_target_layer_name,
                compilation_config.static_forward_context,
            )
        self.kv_sharing_target_layer_name = kv_sharing_target_layer_name

212
213
214
215
216
217
218
        # use a placeholder kv cache tensor during init, which will be replaced
        # by bind_kv_cache
        # this variable will not be accessed if use_direct_call is True
        self.kv_cache = [
            torch.tensor([]) for _ in range(get_current_vllm_config(
            ).parallel_config.pipeline_parallel_size)
        ]
219

220
        self.q_range = torch.tensor(envs.Q_SCALE_CONSTANT, dtype=torch.float32)
221
222
223
        self.k_range = torch.tensor(envs.K_SCALE_CONSTANT, dtype=torch.float32)
        self.v_range = torch.tensor(envs.V_SCALE_CONSTANT, dtype=torch.float32)

224
225
226
227
228
    def forward(
        self,
        query: torch.Tensor,
        key: torch.Tensor,
        value: torch.Tensor,
229
230
231
232
        # For some alternate attention backends like MLA the attention output
        # shape does not match the query shape, so we optionally let the model
        # definition specify the output tensor shape.
        output_shape: Optional[torch.Size] = None,
233
    ) -> torch.Tensor:
234
235
236
237
238
239
240
241
242
        """
        The KV cache is stored inside this class and is accessed via
        `self.kv_cache`.

        Attention metadata (`attn_metadata`) is set using a context manager in
        the model runner's `execute_model` method. It is accessed via forward
        context using
        `vllm.forward_context.get_forward_context().attn_metadata`.
        """
Chen Zhang's avatar
Chen Zhang committed
243
        if self.calculate_kv_scales:
244
245
            attn_metadata = get_forward_context().attn_metadata
            if attn_metadata.enable_kv_scales_calculation:
246
                self.calc_kv_scales(query, key, value)
247
        if self.use_output:
248
249
            output_shape = (output_shape
                            if output_shape is not None else query.shape)
250
            output = torch.zeros(output_shape,
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
                                 dtype=query.dtype,
                                 device=query.device)
            hidden_size = output_shape[-1]
            # We skip reshaping query, key and value tensors for the MLA
            # backend since these tensors have different semantics and are
            # processed differently.
            if not self.use_mla:
                # Reshape the query, key, and value tensors.
                # NOTE(woosuk): We do this outside the custom op to minimize the
                # CPU overheads from the non-CUDA-graph regions.
                query = query.view(-1, self.num_heads, self.head_size)
                output = output.view(-1, self.num_heads, self.head_size)
                if key is not None:
                    key = key.view(-1, self.num_kv_heads, self.head_size)
                if value is not None:
                    value = value.view(-1, self.num_kv_heads, self.head_size)
267
            if self.use_direct_call:
Chen Zhang's avatar
Chen Zhang committed
268
                forward_context: ForwardContext = get_forward_context()
269
                attn_metadata = forward_context.attn_metadata
270
271
                if isinstance(attn_metadata, dict):
                    attn_metadata = attn_metadata[self.layer_name]
Chen Zhang's avatar
Chen Zhang committed
272
273
274
275
276
277
                self_kv_cache = self.kv_cache[forward_context.virtual_engine]
                self.impl.forward(self,
                                  query,
                                  key,
                                  value,
                                  self_kv_cache,
278
                                  attn_metadata,
Chen Zhang's avatar
Chen Zhang committed
279
                                  output=output)
280
281
282
            else:
                torch.ops.vllm.unified_attention_with_output(
                    query, key, value, output, self.layer_name)
283
            return output.view(-1, hidden_size)
284
        else:
285
            if self.use_direct_call:
Chen Zhang's avatar
Chen Zhang committed
286
                forward_context = get_forward_context()
287
                attn_metadata = forward_context.attn_metadata
288
289
                if isinstance(attn_metadata, dict):
                    attn_metadata = attn_metadata[self.layer_name]
Chen Zhang's avatar
Chen Zhang committed
290
291
                self_kv_cache = self.kv_cache[forward_context.virtual_engine]
                return self.impl.forward(self, query, key, value,
292
                                         self_kv_cache, attn_metadata)
293
294
295
            else:
                return torch.ops.vllm.unified_attention(
                    query, key, value, self.layer_name)
296

297
298
    def calc_kv_scales(self, query, key, value):
        self._q_scale.copy_(torch.abs(query).max() / self.q_range)
299
300
        self._k_scale.copy_(torch.abs(key).max() / self.k_range)
        self._v_scale.copy_(torch.abs(value).max() / self.v_range)
301
        self._q_scale_float = self._q_scale.item()
302
303
304
305
306
        self._k_scale_float = self._k_scale.item()
        self._v_scale_float = self._v_scale.item()
        # We only calculate the scales once
        self.calculate_kv_scales = False

307
308
309
310
311
    def extra_repr(self) -> str:
        s = f"head_size={self.impl.head_size}"  # type: ignore
        s += f", num_heads={self.impl.num_heads}"  # type: ignore
        s += f", num_kv_heads={self.impl.num_kv_heads}"  # type: ignore
        s += f", scale={self.impl.scale}"  # type: ignore
312
        s += f", backend={self.impl.__class__.__name__}"
313
        return s
314

315
    def process_weights_after_loading(self, act_dtype: torch.dtype):
316
        if hasattr(self.impl, "process_weights_after_loading"):
317
            self.impl.process_weights_after_loading(act_dtype)
318

319
320
321
322
323
324
325
326
327
        # FlashInfer requires attention sinks to be float32
        if (self.backend == _Backend.FLASHINFER_VLLM_V1
                and hasattr(self.impl, 'sinks')):
            from vllm.v1.attention.backends.flashinfer import FlashInferImpl
            assert isinstance(self.impl, FlashInferImpl)
            if (self.impl.sinks is not None
                    and self.impl.sinks.dtype != torch.float32):
                self.impl.sinks = self.impl.sinks.to(torch.float32)

328
329
330
    def get_attn_backend(self) -> type[AttentionBackend]:
        return self.attn_backend

331

332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
class MultiHeadAttention(nn.Module):
    """Multi-headed attention without any cache, used for ViT."""

    def __init__(
        self,
        num_heads: int,
        head_size: int,
        scale: float,
        num_kv_heads: Optional[int] = None,
    ):
        super().__init__()
        self.num_heads = num_heads
        self.head_size = head_size
        self.scale = scale
        self.num_kv_heads = num_heads if num_kv_heads is None else num_kv_heads

348
349
350
        assert self.num_heads % self.num_kv_heads == 0, \
            f"num_heads ({self.num_heads}) is not " \
            f"divisible by num_kv_heads ({self.num_kv_heads})"
351
352
        self.num_queries_per_kv = self.num_heads // self.num_kv_heads

353
354
355
356
357
358
        dtype = torch.get_default_dtype()
        attn_backend = get_attn_backend(head_size,
                                        dtype,
                                        kv_cache_dtype=None,
                                        block_size=16,
                                        is_attention_free=False)
359
        backend = backend_name_to_enum(attn_backend.get_name())
360
361
362
363
        if current_platform.is_rocm():
            # currently, only torch_sdpa is supported on rocm
            self.attn_backend = _Backend.TORCH_SDPA
        else:
364
365
            if backend in (_Backend.FLASH_ATTN, _Backend.FLASH_ATTN_VLLM_V1,
                           _Backend.FLEX_ATTENTION):
366
                backend = _Backend.XFORMERS
367

368
369
370
            self.attn_backend = backend if backend in {
                _Backend.TORCH_SDPA, _Backend.XFORMERS, _Backend.PALLAS_VLLM_V1
            } else _Backend.TORCH_SDPA
371

372
373
374
375
        if (self.attn_backend == _Backend.XFORMERS
                and not check_xformers_availability()):
            self.attn_backend = _Backend.TORCH_SDPA

376
377
378
379
380
381
382
    def forward(
        self,
        query: torch.Tensor,
        key: torch.Tensor,
        value: torch.Tensor,
    ) -> torch.Tensor:
        """Input shape: batch_size x seq_len x hidden_size"""
383
        # TODO(Isotr0py): Use existing backend implementations and support FA3
384
385
386
387
388
389
390
        bsz, q_len, _ = query.size()
        kv_len = key.size(1)

        query = query.view(bsz, q_len, self.num_heads, self.head_size)
        key = key.view(bsz, kv_len, self.num_kv_heads, self.head_size)
        value = value.view(bsz, kv_len, self.num_kv_heads, self.head_size)

391
392
393
394
395
        if (num_repeat := self.num_queries_per_kv) > 1:
            # Handle MQA and GQA
            key = torch.repeat_interleave(key, num_repeat, dim=2)
            value = torch.repeat_interleave(value, num_repeat, dim=2)

396
        if self.attn_backend == _Backend.XFORMERS:
397
398
399
400
401
402
403
404
405
406
407
408
409
410
            from xformers import ops as xops

            out = xops.memory_efficient_attention_forward(query,
                                                          key,
                                                          value,
                                                          scale=self.scale)
        elif self.attn_backend == _Backend.TORCH_SDPA:
            query, key, value = (x.transpose(1, 2)
                                 for x in (query, key, value))
            out = F.scaled_dot_product_attention(query,
                                                 key,
                                                 value,
                                                 scale=self.scale)
            out = out.transpose(1, 2)
411
412
413
414
415
416
417
        elif self.attn_backend == _Backend.PALLAS_VLLM_V1:
            query, key, value = (x.transpose(1, 2)
                                 for x in (query, key, value))
            from torch_xla.experimental.custom_kernel import flash_attention
            out = flash_attention(query, key, value, sm_scale=self.scale)
            out = out.transpose(1, 2)

418
        return out.reshape(bsz, q_len, -1)
419
420


421
422
423
424
425
426
427
428
429
430
def wait_for_kv_layer_from_connector(layer_name: str):
    if not has_kv_transfer_group() or not is_v1_kv_transfer_group():
        return

    connector = get_kv_transfer_group()

    forward_context: ForwardContext = get_forward_context()
    attn_metadata = forward_context.attn_metadata
    if attn_metadata is None:
        return
431
    assert isinstance(attn_metadata, dict)
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
    connector.wait_for_layer_load(layer_name)


def maybe_save_kv_layer_to_connector(
    layer_name: str,
    kv_cache_layer: List[torch.Tensor],
):
    if not has_kv_transfer_group() or not is_v1_kv_transfer_group():
        return

    connector = get_kv_transfer_group()

    forward_context: ForwardContext = get_forward_context()
    attn_metadata = forward_context.attn_metadata
    if attn_metadata is None:
        return
448
449
450
    assert isinstance(attn_metadata, dict)
    connector.save_kv_layer(layer_name, kv_cache_layer,
                            attn_metadata[layer_name])
451
452


453
454
455
456
457
458
def unified_attention(
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    layer_name: str,
) -> torch.Tensor:
459
460
    wait_for_kv_layer_from_connector(layer_name)

461
    forward_context: ForwardContext = get_forward_context()
462
    attn_metadata = forward_context.attn_metadata
463
464
    if isinstance(attn_metadata, dict):
        attn_metadata = attn_metadata[layer_name]
465
    self = forward_context.no_compile_layers[layer_name]
466
    kv_cache = self.kv_cache[forward_context.virtual_engine]
467
468
469
470
471
    output = self.impl.forward(self, query, key, value, kv_cache,
                               attn_metadata)

    maybe_save_kv_layer_to_connector(layer_name, kv_cache)
    return output
472
473
474
475
476
477
478
479
480
481
482
483
484
485


def unified_attention_fake(
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    layer_name: str,
) -> torch.Tensor:
    return torch.empty_like(query).contiguous()


direct_register_custom_op(
    op_name="unified_attention",
    op_func=unified_attention,
486
    mutates_args=[],
487
488
489
    fake_impl=unified_attention_fake,
    dispatch_key=current_platform.dispatch_key,
)
490
491
492
493
494
495
496
497


def unified_attention_with_output(
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    output: torch.Tensor,
    layer_name: str,
498
    output_scale: Optional[torch.Tensor] = None,
499
    output_block_scale: Optional[torch.Tensor] = None,
500
) -> None:
501
    wait_for_kv_layer_from_connector(layer_name)
502
    forward_context: ForwardContext = get_forward_context()
503
    attn_metadata = forward_context.attn_metadata
504
505
    if isinstance(attn_metadata, dict):
        attn_metadata = attn_metadata[layer_name]
506
    self = forward_context.no_compile_layers[layer_name]
507
    kv_cache = self.kv_cache[forward_context.virtual_engine]
508
509
    self.impl.forward(self,
                      query,
510
511
512
513
                      key,
                      value,
                      kv_cache,
                      attn_metadata,
514
                      output=output,
515
516
                      output_scale=output_scale,
                      output_block_scale=output_block_scale)
517

518
519
    maybe_save_kv_layer_to_connector(layer_name, kv_cache)

520
521
522
523
524
525
526

def unified_attention_with_output_fake(
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    output: torch.Tensor,
    layer_name: str,
527
    output_scale: Optional[torch.Tensor] = None,
528
    output_block_scale: Optional[torch.Tensor] = None,
529
530
531
532
533
534
535
) -> None:
    return


direct_register_custom_op(
    op_name="unified_attention_with_output",
    op_func=unified_attention_with_output,
536
    mutates_args=["output", "output_block_scale"],
537
538
539
    fake_impl=unified_attention_with_output_fake,
    dispatch_key=current_platform.dispatch_key,
)