runner_step.py 3.64 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

from __future__ import annotations

from typing import Any, Optional

import numpy as np

import vllm.envs as envs

from vllm.v1.kv_compression.runner_prepare import prepare_kv_compression_for_step


def maybe_prepare_kv_compression_for_runner_step(
    *,
    runner: Any,
    num_reqs: int,
    total_num_scheduled_tokens: int,
    num_scheduled_tokens: np.ndarray,  # [B] int32
    cu_num_tokens: np.ndarray,  # [B] int64/int32
    req_indices: np.ndarray,  # [T] int64
    arange: np.ndarray,  # [T] int64
) -> Optional[np.ndarray]:
    """Fill the CPU-side KV compression metadata for the upcoming step.

    When ``envs.VLLM_ENABLE_KV_COMPRESSION`` is off, this only clears
    ``runner.kv_compression_needs_compaction`` and returns ``None``.

    Otherwise it takes the leading slices of the runner's ``*_np`` buffers:

    * the first ``total_num_scheduled_tokens`` rows of the per-token buffers;
    * the first ``num_reqs`` rows of the per-request buffers.

    It passes those slices, together with the scheduling arrays and
    input-batch counters, to ``prepare_kv_compression_for_step``. It then
    records the flags that helper returns on ``runner``.

    Returns:
        The slice of ``runner.kv_positions_np`` covering the scheduled
        tokens (the same object handed to the helper), or ``None`` when KV
        compression is disabled.
    """
    if not envs.VLLM_ENABLE_KV_COMPRESSION:
        runner.kv_compression_needs_compaction = False
        return None

    token_rows = slice(None, total_num_scheduled_tokens)
    request_rows = slice(None, num_reqs)

    # Leading slices of the runner's persistent buffers. NOTE(review):
    # presumably NumPy arrays, so these are views that the helper fills in
    # place -- confirm against prepare_kv_compression_for_step.
    positions_view = runner.kv_positions_np[token_rows]
    must_keep_view = runner.kv_compression_must_keep_np[token_rows]
    budget_view = runner.kv_compression_topk_budget_np[request_rows]
    prompt_end_view = runner.kv_compression_prompt_end_np[request_rows]
    prompt_lens_view = runner.kv_compression_prompt_lens_np[request_rows]
    topk_keep_view = runner.kv_compression_prompt_topk_keep_np[request_rows]

    batch = runner.input_batch
    compaction_flag, topk_keep_max = prepare_kv_compression_for_step(
        num_reqs=num_reqs,
        total_num_scheduled_tokens=total_num_scheduled_tokens,
        num_scheduled_tokens=num_scheduled_tokens,
        cu_num_tokens=cu_num_tokens,
        req_indices=req_indices,
        arange=arange,
        num_computed_tokens_cpu=batch.num_computed_tokens_cpu,
        num_prompt_tokens=batch.num_prompt_tokens,
        num_kv_tokens_cpu=batch.num_kv_tokens_cpu,
        kv_positions_np=positions_view,
        must_keep_np=must_keep_view,
        topk_budget_np=budget_view,
        prompt_end_np=prompt_end_view,
        prompt_lens_np=prompt_lens_view,
        prompt_topk_keep_np=topk_keep_view,
        chunked_prefill_enabled=runner.scheduler_config.enable_chunked_prefill,
    )

    runner.kv_compression_needs_compaction = bool(compaction_flag)
    # The cached maximum is only refreshed when the helper reports one; a
    # None result leaves whatever value the runner already holds.
    if topk_keep_max is not None:
        runner.kv_compression_prompt_topk_keep_max = int(topk_keep_max)
    return positions_view


def maybe_copy_kv_compression_step_tensors_to_gpu(
    *,
    runner: Any,
    num_reqs: int,
    total_num_scheduled_tokens: int,
    non_blocking: bool = True,
) -> None:
    """Copy this step's KV compression buffers from the ``*_cpu`` tensors to GPU.

    Does nothing when ``envs.VLLM_ENABLE_KV_COMPRESSION`` is off. Otherwise
    exactly one of two groups is copied:

    * With chunked prefill enabled, the first ``num_reqs`` rows of
      ``prompt_end``, ``prompt_lens`` and ``prompt_topk_keep`` are copied.
    * Without chunked prefill, if ``runner.kv_compression_needs_compaction``
      is set, the first ``total_num_scheduled_tokens`` rows of ``must_keep``
      and the first ``num_reqs`` rows of ``topk_budget`` are copied.

    Args:
        runner: Object owning both the device tensors and their ``*_cpu``
            sources.
        num_reqs: Number of per-request rows to copy.
        total_num_scheduled_tokens: Number of per-token rows to copy.
        non_blocking: Forwarded unchanged to every ``copy_`` call.
    """
    if not envs.VLLM_ENABLE_KV_COMPRESSION:
        return

    def _stage(dst: Any, src: Any, rows: int) -> None:
        # Copy the leading `rows` entries of `src` into the same prefix of `dst`.
        dst[:rows].copy_(src[:rows], non_blocking=non_blocking)

    if runner.scheduler_config.enable_chunked_prefill:
        # NOTE(review): must_keep / topk_budget are never staged on this path,
        # even when needs_compaction is set -- confirm the chunked-prefill
        # consumer does not read those device buffers.
        _stage(
            runner.kv_compression_prompt_end,
            runner.kv_compression_prompt_end_cpu,
            num_reqs,
        )
        _stage(
            runner.kv_compression_prompt_lens,
            runner.kv_compression_prompt_lens_cpu,
            num_reqs,
        )
        _stage(
            runner.kv_compression_prompt_topk_keep,
            runner.kv_compression_prompt_topk_keep_cpu,
            num_reqs,
        )
    elif runner.kv_compression_needs_compaction:
        _stage(
            runner.kv_compression_must_keep,
            runner.kv_compression_must_keep_cpu,
            total_num_scheduled_tokens,
        )
        _stage(
            runner.kv_compression_topk_budget,
            runner.kv_compression_topk_budget_cpu,
            num_reqs,
        )