#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
KVBM (KV Block Manager) integration tests for vLLM.

These tests validate core KVBM functionality:
1. Offload/Onboard: Request offloads to CPU, cache reset, re-request triggers onboarding
2. Eviction: GPU cache fills, blocks evicted, later retrieved without corruption
3. Determinism: Responses remain identical across offload/onboard/eviction cycles
"""

import pytest
import requests

17
from .common import llm_server_kvbm  # noqa: F401
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from .common import DeterminismTester, assert_deterministic, fetch_kvbm_metrics

# Test configuration
# MIN_OFFLOAD_BLOCKS is the lower bound asserted on kvbm_offload_blocks_d2h
# after the first request in test_gpu_cache_eviction.
MIN_OFFLOAD_BLOCKS = 12  # Minimum blocks expected for Qwen3-0.6B with test prompts
# MAX_TOKENS is passed as max_tokens to every tester.make_request() call in
# this module.
MAX_TOKENS = 15  # Max tokens to generate in test responses

# Shared test prompt (Aeldora story)
# test_offload_and_onboard sends only the first 400 characters; the eviction
# and determinism tests send the full text, both as-is and behind a fixed
# prefix sentence.
# NOTE(review): the final sentence ("... lost familt clue is hidden.") looks
# truncated mid-word. It is left untouched because the literal is sent to the
# model verbatim; presumably MIN_OFFLOAD_BLOCKS was tuned against this exact
# text -- confirm before editing.
AELDORA_STORY = (
    "In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, "
    "lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria "
    "was buried beneath the shifting sands of time, lost to the world for centuries. You are "
    "an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled "
    "upon an ancient map hinting at secrets that Aeloria holds a secret so profound that it has "
    "the potential to reshape the very fabric of reality. Your journey will take you through "
    "treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: "
    "Character Background: Develop a detailed background for your character. Describe their "
    "motivations for seeking out Aeloria, their skills and weaknesses, and any personal "
    "connections to the ancient city or its legends. Are they driven by a quest for knowledge, "
    "a search for lost familt clue is hidden."
)

# Test markers
# Module-level pytestmark: pytest applies every marker in this list to each
# test collected from this file, so the tests can be selected or deselected
# with -m expressions.
pytestmark = [
    pytest.mark.kvbm,
    pytest.mark.e2e,
    pytest.mark.gpu_1,
    pytest.mark.vllm,
    pytest.mark.pre_merge,
]


# Helper functions
def print_test_header(title: str) -> None:
    """Write a banner to stdout: a blank line, then *title* framed by two 70-column '=' rules."""
    rule = "=" * 70
    # One print call with a newline separator emits the same three lines as
    # three consecutive prints would.
    print(f"\n{rule}", title, rule, sep="\n")


def print_phase(phase_num: int, description: str) -> None:
    """Write a one-line phase banner to stdout, preceded by a blank line."""
    banner = f"=== Phase {phase_num}: {description} ==="
    print()  # blank separator line before the banner
    print(banner)


62
def check_kvbm_metrics(phase_name: str, metrics_port: int) -> dict[str, int]:
    """Fetch and display KVBM metrics.

    Args:
        phase_name: Name of the test phase for logging
        metrics_port: Port number for the KVBM metrics endpoint

    Returns:
        Dictionary containing KVBM metrics with keys:
        - kvbm_offload_blocks_d2h: Blocks offloaded from GPU to CPU
        - kvbm_onboard_blocks_h2d: Blocks onboarded from CPU to GPU

        A counter that is absent from the fetched metrics is reported as 0.
    """
    print(f"\n--- Checking KVBM metrics after {phase_name} ---")
    metrics = fetch_kvbm_metrics(port=metrics_port)

    # Only these two counters are surfaced to the tests; anything else returned
    # by fetch_kvbm_metrics is ignored.
    tracked = ("kvbm_offload_blocks_d2h", "kvbm_onboard_blocks_h2d")
    snapshot = {name: metrics.get(name, 0) for name in tracked}

    for name, value in snapshot.items():
        print(f"  {name}: {value}")

    return snapshot


def reset_cache(base_url: str) -> None:
    """Reset the GPU prefix cache (best effort).

    Sends a POST to ``{base_url}/reset_prefix_cache`` with a 30 second timeout.
    A failed reset is reported on stdout and otherwise ignored, so callers must
    not assume the cache was actually cleared.

    Args:
        base_url: Base URL of the server; the endpoint path is appended to it
            directly.
    """
    print("Resetting prefix cache...")
    try:
        response = requests.post(f"{base_url}/reset_prefix_cache", timeout=30)
        response.raise_for_status()
    except requests.RequestException as e:
        # Tolerate only transport/HTTP failures (connection errors, timeouts,
        # non-2xx statuses). Anything else is a programming error and should
        # surface instead of being hidden behind a warning.
        print(f"Warning: Cache reset failed: {e}")
    else:
        print("Cache reset successful")


100
101
102
103
# Model used for test_kvbm tests (smaller model for faster CI)
# The same identifier is handed to the llm_server_kvbm fixture (through
# indirect parametrization) and to DeterminismTester as model_id, so the
# server and the client refer to the same model.
KVBM_TEST_MODEL = "Qwen/Qwen3-0.6B"


104
105
106
107
108
109
# Fixtures
@pytest.fixture(scope="function")
def tester(llm_server_kvbm):  # noqa: F811
    """Create tester bound to the KVBM-enabled server.

    Args:
        llm_server_kvbm: Server fixture imported from ``.common``; its
            ``base_url`` and ``server_type`` attributes are forwarded.

    Returns:
        A ``DeterminismTester`` configured for ``KVBM_TEST_MODEL``.
    """
    return DeterminismTester(
        base_url=llm_server_kvbm.base_url,
        model_id=KVBM_TEST_MODEL,
        server_type=llm_server_kvbm.server_type,
    )


# Tests
116
@pytest.mark.parametrize("llm_server_kvbm", [{"model": KVBM_TEST_MODEL}], indirect=True)
@pytest.mark.timeout(170)  # 4x measured (~41s), rounded up
def test_offload_and_onboard(tester, llm_server_kvbm):  # noqa: F811
    """
    Test offload → cache reset → onboard cycle with determinism verification.

    Validates that:
    - Initial request triggers offload to CPU cache
    - Cache reset clears GPU cache
    - Repeated request triggers onboard from CPU to GPU
    - Responses are deterministic across the cycle
    """
    print_test_header("OFFLOAD AND ONBOARD TEST")

    # Use subset of Aeldora story for offload/onboard test
    prompt = AELDORA_STORY[:400]  # Use first ~400 chars for smaller cache footprint

    # Phase 1: Initial request triggers offload
    print_phase(1, "Initial request (expect offload to CPU)")
    print(f"Sending request: {prompt[:80]}...")

    response_1 = tester.make_request(prompt, max_tokens=MAX_TOKENS)
    print(f"Response 1: {response_1}")

    metrics_p1 = check_kvbm_metrics("Phase 1", llm_server_kvbm.metrics_port)
    assert (
        metrics_p1["kvbm_offload_blocks_d2h"] > 0
    ), "Phase 1: No blocks offloaded. KVBM may not be triggering offloads."
    # Nothing has been evicted or reset yet, so no block should have travelled
    # back from CPU to GPU. This also makes the "> 0" check in Phase 3 a true
    # before/after comparison.
    assert (
        metrics_p1["kvbm_onboard_blocks_h2d"] == 0
    ), f"Phase 1: Expected 0 onboarded blocks, got {metrics_p1['kvbm_onboard_blocks_h2d']}"
    print(f"✓ Phase 1: {metrics_p1['kvbm_offload_blocks_d2h']} blocks offloaded")

    # Phase 2: Reset GPU cache
    # reset_cache only prints a warning when the reset request fails, so a
    # Phase 3 failure may also mean the reset never took effect; check the log.
    print_phase(2, "Clean up GPU cache")
    reset_cache(llm_server_kvbm.base_url)

    # Phase 3: Repeated request triggers onboard
    print_phase(3, "Re-send same request (expect onboard from CPU)")
    print(f"Sending same request: {prompt[:80]}...")

    response_2 = tester.make_request(prompt, max_tokens=MAX_TOKENS)
    print(f"Response 2: {response_2}")

    metrics_p3 = check_kvbm_metrics("Phase 3", llm_server_kvbm.metrics_port)
    assert (
        metrics_p3["kvbm_onboard_blocks_h2d"] > 0
    ), "Phase 3: No blocks onboarded. Expected CPU→GPU transfer after cache reset."
    print(f"✓ Phase 3: {metrics_p3['kvbm_onboard_blocks_h2d']} blocks onboarded from CPU")

    # Verify determinism
    print_test_header("DETERMINISM VERIFICATION")
    assert_deterministic(
        response_1,
        response_2,
        test_name="Offload/Onboard",
        label1="Initial response",
        label2="After cache reset",
    )

    print("\n=== TEST PASSED ===")


@pytest.mark.parametrize(
    "llm_server_kvbm",
    [{"cpu_blocks": 200, "gpu_blocks": 20, "model": KVBM_TEST_MODEL}],
    indirect=True,
)
@pytest.mark.timeout(170)  # 4x measured (~42s), rounded up
def test_gpu_cache_eviction(tester, llm_server_kvbm):  # noqa: F811
    """
    Test GPU cache eviction mechanics.

    Validates that:
    - Multiple requests fill GPU cache causing eviction
    - Evicted blocks can be retrieved from CPU cache via onboarding
    - Metrics correctly reflect offload and onboard operations

    The metric values are compared between phases as running totals, so each
    phase is judged on the increase since the previous phase.
    """
    print_test_header("GPU CACHE EVICTION TEST")
    print(f"GPU blocks: {llm_server_kvbm.gpu_cache_blocks}")
    print(f"CPU blocks: {llm_server_kvbm.cpu_cache_blocks}")

    # Use full Aeldora story with variations for cache filling
    prompt_1 = AELDORA_STORY
    prompt_2 = (
        "Read the following entry from the ancient scrolls of Aeloria: " + AELDORA_STORY
    )

    # Phase 1: First request triggers offload
    print_phase(1, "Send first request")
    print(f"Prompt 1: {prompt_1[:80]}...")

    tester.make_request(prompt_1, max_tokens=MAX_TOKENS)

    metrics_p1 = check_kvbm_metrics("Phase 1", llm_server_kvbm.metrics_port)
    assert metrics_p1["kvbm_offload_blocks_d2h"] >= MIN_OFFLOAD_BLOCKS, (
        f"Phase 1: Expected >= {MIN_OFFLOAD_BLOCKS} blocks offloaded, "
        f"got {metrics_p1['kvbm_offload_blocks_d2h']}"
    )
    assert (
        metrics_p1["kvbm_onboard_blocks_h2d"] == 0
    ), f"Phase 1: Expected 0 onboarded, got {metrics_p1['kvbm_onboard_blocks_h2d']}"
    print(f"✓ Phase 1: {metrics_p1['kvbm_offload_blocks_d2h']} blocks offloaded")

    # Phase 2: Second request may evict first from GPU
    print_phase(2, "Send second request (may evict first from GPU)")
    print(f"Prompt 2: {prompt_2[:80]}...")

    tester.make_request(prompt_2, max_tokens=MAX_TOKENS)

    metrics_p2 = check_kvbm_metrics("Phase 2", llm_server_kvbm.metrics_port)
    additional_offloads = (
        metrics_p2["kvbm_offload_blocks_d2h"] - metrics_p1["kvbm_offload_blocks_d2h"]
    )
    assert additional_offloads > 0, (
        f"Phase 2: Expected additional offloads, got {metrics_p2['kvbm_offload_blocks_d2h']} "
        f"(was {metrics_p1['kvbm_offload_blocks_d2h']})"
    )
    print(f"✓ Phase 2: {additional_offloads} additional blocks offloaded")

    # Phase 3: Re-request first prompt (should onboard from CPU)
    print_phase(3, "Re-request first prompt (verify onboarding)")
    print(f"Re-sending Prompt 1: {prompt_1[:80]}...")

    tester.make_request(prompt_1, max_tokens=MAX_TOKENS)

    metrics_p3 = check_kvbm_metrics("Phase 3", llm_server_kvbm.metrics_port)
    # Compare against the Phase 2 reading rather than 0: any onboarding that
    # happened during Phase 2 must not be able to satisfy this check on its own.
    new_onboards = (
        metrics_p3["kvbm_onboard_blocks_h2d"] - metrics_p2["kvbm_onboard_blocks_h2d"]
    )
    assert (
        new_onboards > 0
    ), "Phase 3: No blocks onboarded. Expected CPU→GPU retrieval after eviction."
    print(f"✓ Phase 3: {metrics_p3['kvbm_onboard_blocks_h2d']} blocks onboarded")
    print("✓ Eviction mechanics verified: offload → eviction → onboard")

    print("\n=== TEST PASSED ===")


@pytest.mark.parametrize(
    "llm_server_kvbm",
    [{"cpu_blocks": 200, "gpu_blocks": 20, "model": KVBM_TEST_MODEL}],
    indirect=True,
)
@pytest.mark.timeout(160)  # 4x measured (~39s), rounded up
def test_onboarding_determinism(tester, llm_server_kvbm):  # noqa: F811
    """
    Test onboarding determinism under eviction scenario.

    Validates that:
    - Multiple onboarding cycles produce deterministic results
    - Responses are consistent when blocks are onboarded multiple times
    - Tests onboarded vs onboarded (not initial vs onboarded)

    The two prompts are sent alternately three times each. The first pass
    (Phases 1-2) only populates the caches; the responses from the second pass
    (Phases 3-4) are compared with those from the third pass (Phases 5-6).
    Metrics are printed after every phase for diagnosis but are not asserted on.
    """
    print_test_header("ONBOARDING DETERMINISM TEST")
    print(f"GPU blocks: {llm_server_kvbm.gpu_cache_blocks}")
    print(f"CPU blocks: {llm_server_kvbm.cpu_cache_blocks}")

    # Use full Aeldora story with variations
    prompt_1 = AELDORA_STORY
    prompt_2 = (
        "Read the following entry from the ancient scrolls of Aeloria: " + AELDORA_STORY
    )

    # Phase 1: First request triggers offload
    print_phase(1, "Send first request")
    print(f"Prompt 1: {prompt_1[:80]}...")
    tester.make_request(prompt_1, max_tokens=MAX_TOKENS)
    check_kvbm_metrics("Phase 1", llm_server_kvbm.metrics_port)

    # Phase 2: Second request (may evict first from GPU)
    print_phase(2, "Send second request (may evict first from GPU)")
    print(f"Prompt 2: {prompt_2[:80]}...")
    tester.make_request(prompt_2, max_tokens=MAX_TOKENS)
    check_kvbm_metrics("Phase 2", llm_server_kvbm.metrics_port)

    # Phase 3: Re-request prompt 1 (first onboard cycle)
    print_phase(3, "Re-request Prompt 1 (first onboard cycle)")
    print(f"Re-sending Prompt 1: {prompt_1[:80]}...")
    response_1_first_onboard = tester.make_request(prompt_1, max_tokens=MAX_TOKENS)
    print(f"Response 1 (first onboard): {response_1_first_onboard}")
    check_kvbm_metrics("Phase 3", llm_server_kvbm.metrics_port)

    # Phase 4: Re-request prompt 2 (first onboard cycle)
    print_phase(4, "Re-request Prompt 2 (first onboard cycle)")
    print(f"Re-sending Prompt 2: {prompt_2[:80]}...")
    response_2_first_onboard = tester.make_request(prompt_2, max_tokens=MAX_TOKENS)
    print(f"Response 2 (first onboard): {response_2_first_onboard}")
    check_kvbm_metrics("Phase 4", llm_server_kvbm.metrics_port)

    # Phase 5: Re-request prompt 1 (second onboard cycle)
    print_phase(5, "Re-request Prompt 1 (second onboard cycle)")
    print(f"Re-sending Prompt 1 (third time): {prompt_1[:80]}...")
    response_1_second_onboard = tester.make_request(prompt_1, max_tokens=MAX_TOKENS)
    print(f"Response 1 (second onboard): {response_1_second_onboard}")
    check_kvbm_metrics("Phase 5", llm_server_kvbm.metrics_port)

    # Phase 6: Re-request prompt 2 (second onboard cycle)
    print_phase(6, "Re-request Prompt 2 (second onboard cycle)")
    print(f"Re-sending Prompt 2 (third time): {prompt_2[:80]}...")
    response_2_second_onboard = tester.make_request(prompt_2, max_tokens=MAX_TOKENS)
    print(f"Response 2 (second onboard): {response_2_second_onboard}")
    check_kvbm_metrics("Phase 6", llm_server_kvbm.metrics_port)

    # Verify determinism between onboarded requests
    print_test_header("DETERMINISM VERIFICATION")
    print("\nComparing Prompt 1: First onboard vs Second onboard")
    assert_deterministic(
        response_1_first_onboard,
        response_1_second_onboard,
        test_name="Prompt 1 onboarding determinism",
        label1="First onboard (Phase 3)",
        label2="Second onboard (Phase 5)",
    )

    print("\nComparing Prompt 2: First onboard vs Second onboard")
    assert_deterministic(
        response_2_first_onboard,
        response_2_second_onboard,
        test_name="Prompt 2 onboarding determinism",
        label1="First onboard (Phase 4)",
        label2="Second onboard (Phase 6)",
    )

    print("\n=== TEST PASSED ===")


if __name__ == "__main__":
    # pytest.main returns the session exit code; propagate it so that running
    # this file directly reports test failures to the calling shell or CI step
    # instead of always exiting with status 0.
    raise SystemExit(pytest.main([__file__, "-v", "-s"]))