#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
E2E test for KV Event Consolidator with Router integration.

This test validates that:
1. vLLM/TensorRT-LLM with KVBM correctly emits KV events to the consolidator
2. The consolidator correctly deduplicates events from engine and KVBM
3. The router receives and processes consolidated events without errors
"""

import concurrent.futures
16
import importlib.util
17
18
19
20
21
22
23
24
import logging
import os
import re
import time
from pathlib import Path

import pytest
import requests
25
import yaml
26

from tests.kvbm_integration.common import ApiTester, check_logs_for_patterns
28
29
from tests.utils.managed_process import ManagedProcess

30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

# Check if engines are available and build list of available engines
# Use find_spec first (fast check), then verify import works (functional check)
def _check_engine_available(module_name: str) -> bool:
    """Check if an engine module is available and importable."""
    if importlib.util.find_spec(module_name) is None:
        return False
    try:
        importlib.import_module(module_name)
        return True
    except ImportError:
        return False


HAS_VLLM = _check_engine_available("vllm")
HAS_TRTLLM = _check_engine_available("tensorrt_llm")

# Engines usable in this environment, in a fixed order (vLLM first). Used to
# parameterize the engine-specific fixtures and tests in this module.
AVAILABLE_ENGINES = [
    engine_name
    for engine_name, is_available in (("vllm", HAS_VLLM), ("trtllm", HAS_TRTLLM))
    if is_available
]

# Module-wide markers; the whole module is skipped when no engine is importable.
pytestmark = [
    pytest.mark.kvbm,
    pytest.mark.e2e,
    pytest.mark.slow,
    pytest.mark.gpu_1,
    pytest.mark.pre_merge,
    pytest.mark.skipif(not AVAILABLE_ENGINES, reason="requires vllm or trtllm"),
]

logger = logging.getLogger(name=__name__)

# Constants
# HTTP port the Dynamo frontend (with its embedded KV router) listens on.
FRONTEND_PORT: int = 8000


@pytest.fixture
def test_directory(request):
    """Yield a per-test directory, named after the test node, for logs and temp files.

    The directory is created relative to the current working directory and is
    intentionally left in place afterwards so logs remain available for debugging.
    """
    directory = Path(request.node.name)
    directory.mkdir(exist_ok=True, parents=True)
    yield directory


def create_trtllm_config(
    test_directory: Path,
    *,
    free_gpu_memory_fraction: float = 0.01,
    max_seq_len: int = 4096,
) -> Path:
    """Write a TensorRT-LLM engine config YAML that enables the KVBM connector.

    Args:
        test_directory: Directory (Path or str) the config file is written into.
        free_gpu_memory_fraction: Value for
            ``kv_cache_config.free_gpu_memory_fraction``. The 0.01 default keeps
            the GPU KV cache small so blocks get evicted quickly.
        max_seq_len: Value for the engine's ``max_seq_len``.

    Returns:
        Path to the written ``trtllm_config.yaml`` inside ``test_directory``.
    """
    config_path = Path(test_directory) / "trtllm_config.yaml"
    config = {
        "backend": "pytorch",
        "cuda_graph_config": None,
        "kv_cache_config": {
            "enable_partial_reuse": False,
            "free_gpu_memory_fraction": free_gpu_memory_fraction,
        },
        "max_seq_len": max_seq_len,
        # Route KV cache management through Dynamo's KVBM connector.
        "kv_connector_config": {
            "connector_module": "kvbm.trtllm_integration.connector",
            "connector_scheduler_class": "DynamoKVBMConnectorLeader",
            "connector_worker_class": "DynamoKVBMConnectorWorker",
        },
    }
    with open(config_path, "w", encoding="utf-8") as f:
        yaml.dump(config, f)
    return config_path


def compute_deduplication_test_params(
    g1_gpu_blocks: int,
    g2_cpu_blocks: int,
    g3_disk_blocks: int,
    block_size: int = 16,
    gpu_utilization_per_request: float = 0.6,
    offload_overflow_factor: float = 100.0,
) -> dict:
    """
    Compute max_tokens and num_requests for the deduplication test.

    Given fixed block allocations for each storage tier, compute test parameters
    that ensure the test lands in the "Goldilocks zone" where:
    1. Each request fits within GPU cache (with safety margin)
    2. Total tokens generated overflow all tiers
    3. Deduplication behavior is observable (blocks exist in multiple sources)
    4. Blocks are eventually removed from ALL sources (triggering REMOVE events)

    The key insight is that blocks are replicated across GPU, CPU, and Disk
    simultaneously. To trigger "last source" removal, a block must be evicted
    from ALL three tiers. The bottleneck is the smallest offload cache (CPU or
    Disk), not total capacity.

    Args:
        g1_gpu_blocks: Number of GPU cache blocks (--num-gpu-blocks-override); >= 1
        g2_cpu_blocks: Number of CPU cache blocks (DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS); >= 0
        g3_disk_blocks: Number of disk cache blocks (DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS); >= 0
        block_size: Tokens per block (default 16 for vLLM); >= 1
        gpu_utilization_per_request: Fraction of GPU capacity to use per request,
            in (0, 1] (default 0.6)
            - Too high (>0.8): requests may cause immediate evictions
            - Too low (<0.3): too many requests needed, slow test
        offload_overflow_factor: How many times to overflow the smallest offload
            cache (default 100.0); >= 0. A high value is needed because:
            - Blocks persist in offload caches longer than GPU
            - Offloading is asynchronous - not all blocks make it
            - Need enough volume to ensure blocks cycle through all tiers

    Returns:
        dict with:
            - max_tokens: Maximum tokens per request
            - num_requests: Number of requests to send (at least 1)
            - blocks_per_request: Estimated blocks consumed per request
            - total_capacity_blocks: Total blocks across all tiers
            - total_blocks_generated: Expected total blocks generated
            - min_offload_cache: The smallest offload cache (bottleneck)

    Raises:
        ValueError: If any argument is outside the ranges documented above.

    Example:
        >>> params = compute_deduplication_test_params(
        ...     g1_gpu_blocks=30,
        ...     g2_cpu_blocks=10,
        ...     g3_disk_blocks=10,
        ... )
        >>> params["max_tokens"], params["num_requests"], params["blocks_per_request"]
        (288, 56, 18)
        >>> params["total_capacity_blocks"], params["total_blocks_generated"]
        (50, 1008)
        >>> params["min_offload_cache"]
        10
    """
    from math import floor

    if g1_gpu_blocks < 1:
        raise ValueError(f"g1_gpu_blocks must be >= 1, got {g1_gpu_blocks}")
    if g2_cpu_blocks < 0 or g3_disk_blocks < 0:
        raise ValueError(
            "g2_cpu_blocks and g3_disk_blocks must be >= 0, "
            f"got {g2_cpu_blocks} and {g3_disk_blocks}"
        )
    if block_size < 1:
        raise ValueError(f"block_size must be >= 1, got {block_size}")
    # Written as a negated chained comparison so NaN is rejected too.
    if not 0 < gpu_utilization_per_request <= 1:
        raise ValueError(
            "gpu_utilization_per_request must be in (0, 1], "
            f"got {gpu_utilization_per_request}"
        )
    if offload_overflow_factor < 0:
        raise ValueError(
            f"offload_overflow_factor must be >= 0, got {offload_overflow_factor}"
        )

    # Tolerance for float products that are mathematically integral:
    # e.g. 100 * 0.29 == 28.999999999999996, which a bare int() truncates to 28.
    eps = 1e-9

    # 1. max_tokens: fits in GPU with margin
    # Use gpu_utilization to leave headroom for the model to process without
    # triggering evictions mid-generation
    max_blocks_per_request = max(
        1, floor(g1_gpu_blocks * gpu_utilization_per_request + eps)
    )
    max_tokens = max_blocks_per_request * block_size

    # 2. num_requests: overflow the smallest offload cache many times
    # The bottleneck for "last source" removal is the smallest offload cache.
    # Blocks must be evicted from GPU AND both CPU and Disk.
    # Since blocks are replicated to all tiers, we need to cycle through
    # the smallest cache many times to ensure blocks get evicted from ALL sources.
    min_offload_cache = min(g2_cpu_blocks, g3_disk_blocks)
    total_capacity_blocks = g1_gpu_blocks + g2_cpu_blocks + g3_disk_blocks

    # Generate enough unique blocks to overflow the smallest offload cache
    # many times, ensuring blocks eventually get evicted from all sources
    unique_blocks_needed = floor(min_offload_cache * offload_overflow_factor + eps)
    # Integer ceiling division: ceil(a / b) without float rounding.
    num_requests = max(1, -(-unique_blocks_needed // max_blocks_per_request))

    # Calculate expected total blocks generated
    total_blocks_generated = num_requests * max_blocks_per_request

    return {
        "max_tokens": max_tokens,
        "num_requests": num_requests,
        "blocks_per_request": max_blocks_per_request,
        "total_capacity_blocks": total_capacity_blocks,
        "total_blocks_generated": total_blocks_generated,
        "min_offload_cache": min_offload_cache,
    }


def extract_consolidator_stats(log_path: Path, engine: str = "vllm") -> dict:
    """Extract KV Event Consolidator statistics by scanning a worker log file.

    Args:
        log_path: Path (or path string) of the worker log to scan.
        engine: Engine name ("vllm" or "trtllm"). Currently unused: the same
            log patterns are applied for every engine. Kept for call-site
            compatibility.

    Returns:
        dict with:
            - store_events: number of "will publish STORE event" matches
            - remove_events: number of "will publish REMOVE event" matches
            - dedup_events: number of "DEDUP: ... added to source" matches
            - published_events: number of case-insensitive
              "Publish(ing|ed) ... event ... to router" matches
            - consolidator_started: whether "KV Event Consolidator ... started"
              appears (case-insensitive)
            - consolidator_port: port parsed from the first
              "publish to tcp://HOST:PORT", or None
        Counters keep their zero/None defaults when the file is missing or
        cannot be read (the read error is logged as a warning).
    """
    stats = {
        "store_events": 0,
        "remove_events": 0,
        "dedup_events": 0,
        "published_events": 0,
        "consolidator_started": False,
        "consolidator_port": None,
    }

    log_path = Path(log_path)  # also accept plain string paths
    if not log_path.exists():
        return stats

    try:
        # errors="replace": a stray non-UTF-8 byte in engine output must not
        # make every counter silently read as zero.
        content = log_path.read_text(encoding="utf-8", errors="replace")
    except OSError as e:
        logger.warning(f"Error extracting consolidator stats: {e}")
        return stats

    # Check if consolidator started
    # Actual log: "KV Event Consolidator fully started and ready"
    stats["consolidator_started"] = bool(
        re.search(r"KV Event Consolidator.*started", content, re.IGNORECASE)
    )

    # Extract consolidator output port from logs
    # Look for: "Starting KV Event Consolidator: subscribe from ..., publish to tcp://0.0.0.0:PORT"
    port_match = re.search(r"publish to tcp://[^:]+:(\d+)", content)
    if port_match:
        stats["consolidator_port"] = int(port_match.group(1))

    # Count event types (all at debug level)
    # Actual: "stored in first source ... will publish STORE event"
    stats["store_events"] = len(re.findall(r"will publish STORE event", content))
    # Actual: "removed from last source ... will publish REMOVE event"
    stats["remove_events"] = len(re.findall(r"will publish REMOVE event", content))
    # Actual: "DEDUP: Block ... added to source"
    stats["dedup_events"] = len(re.findall(r"DEDUP:.*added to source", content))
    # Actual: "Publishing N consolidated event(s) to router" or "Published batch with N event(s)"
    # NOTE(review): the "Published batch with N event(s)" form only matches if
    # the same line also contains "to router" after "event" — confirm against
    # the consolidator's actual log format.
    stats["published_events"] = len(
        re.findall(r"Publish(?:ing|ed).*event.*to router", content, re.IGNORECASE)
    )
    return stats


def wait_for_worker_registration(
    frontend_url: str, max_wait_seconds: int = 120, poll_interval: int = 2
) -> bool:
    """
    Poll the frontend ``/health`` endpoint until a worker registers.

    A worker counts as registered once the JSON health response has a
    non-empty ``"instances"`` entry. Any request or parsing failure is logged
    at debug level and polling continues.

    Args:
        frontend_url: Base URL of the frontend (e.g., "http://localhost:8000");
            a trailing slash is tolerated.
        max_wait_seconds: Maximum time to wait for registration
        poll_interval: Seconds between health checks

    Returns:
        True if worker registered, False if timeout
    """
    health_url = f"{frontend_url.rstrip('/')}/health"
    # time.monotonic() is immune to wall-clock adjustments, unlike time.time().
    start_time = time.monotonic()
    deadline = start_time + max_wait_seconds

    while time.monotonic() < deadline:
        try:
            response = requests.get(health_url, timeout=5)
            health_data = response.json()
            if health_data.get("instances"):
                elapsed = time.monotonic() - start_time
                logger.info(f"Worker registered after {elapsed:.1f}s")
                return True
        except Exception as e:
            # Frontend not up yet, connection refused, non-JSON body, ...:
            # keep polling until the deadline.
            logger.debug(f"Health check failed: {e}")

        # Never sleep past the deadline.
        time.sleep(max(0.0, min(poll_interval, deadline - time.monotonic())))

    elapsed = time.monotonic() - start_time
    logger.error(f"Worker failed to register after {elapsed:.1f}s")
    logger.error("Check worker logs for initialization errors")
    return False


@pytest.fixture
def frontend_server(test_directory, runtime_services):
    """Run the Dynamo frontend (with its embedded KV router) for one test.

    ``runtime_services`` is requested only so it is set up first.
    NOTE(review): presumably it provides the NATS/etcd endpoints referenced
    below — confirm.

    Yields:
        dict with the managed ``process``, its ``port``, the ``base_url`` and
        the ``log_file`` path. The process is stopped when the fixture exits.
    """
    logger.info("Starting Dynamo frontend with KV router")

    base_url = f"http://localhost:{FRONTEND_PORT}"
    frontend_command = [
        "python",
        "-m",
        "dynamo.frontend",
        "--http-port",
        str(FRONTEND_PORT),
        "--router-mode",
        "kv",
        "--router-reset-states",
    ]
    frontend_env = {
        **os.environ,
        "RUST_BACKTRACE": "1",
        "NATS_SERVER": "nats://localhost:4222",
        "ETCD_ENDPOINTS": "http://localhost:2379",
        "DYN_LOG": "debug",  # Enable debug logs for consolidator visibility
    }

    # Frontend logs get their own directory so they don't collide with the worker's.
    frontend_log_dir = (Path(test_directory) / "frontend").absolute()
    frontend_log_dir.mkdir(parents=True, exist_ok=True)

    with ManagedProcess(
        command=frontend_command,
        env=frontend_env,
        health_check_urls=[f"{base_url}/health"],
        timeout=120,  # frontend + router initialization can be slow
        working_dir=str(test_directory),
        display_output=False,
        log_dir=str(frontend_log_dir),
    ) as frontend_process:
        # ManagedProcess may relocate the log, so ask it for the real path.
        log_file = Path(frontend_process._log_path)
        logger.info(f"Frontend started on port {FRONTEND_PORT}, log file: {log_file}")

        yield {
            "process": frontend_process,
            "port": FRONTEND_PORT,
            "base_url": base_url,
            "log_file": log_file,
        }

    # Cleanup is handled by ManagedProcess's context-manager exit.
    logger.info("Frontend server stopped")


@pytest.fixture(params=AVAILABLE_ENGINES)
def engine_type(request):
    """Return the engine name ("vllm" or "trtllm") for the current parametrization.

    Parameters come from AVAILABLE_ENGINES, so only importable engines are used.
    """
    selected_engine = request.param
    return selected_engine


@pytest.fixture
def llm_worker(frontend_server, test_directory, runtime_services, engine_type):
    """Run an LLM worker (vLLM or TensorRT-LLM) with KVBM connector and KV Event Consolidator.

    Waits for the worker to register with the frontend (only warning on
    timeout), then takes a snapshot of consolidator stats from the worker log.

    Yields:
        dict with ``process``, ``model_id``, ``log_file``,
        ``consolidator_stats`` (start-up snapshot) and ``engine``.
    """
    engine = engine_type
    model_id = os.environ.get("CONSOLIDATOR_MODEL_ID", "Qwen/Qwen3-0.6B")
    logger.info(
        f"Starting {engine.upper()} worker with KVBM connector and model {model_id}"
    )

    if engine == "vllm":
        command = [
            "python",
            "-m",
            "dynamo.vllm",
            "--model",
            model_id,
            "--connector",
            "kvbm",
            "--enforce-eager",  # For faster startup in tests
        ]
    else:  # trtllm: KVBM connector settings come from an engine-args YAML file
        # Absolute path so the worker's working directory does not matter.
        trtllm_config = create_trtllm_config(test_directory).absolute()
        command = [
            "python",
            "-m",
            "dynamo.trtllm",
            "--model-path",
            model_id,
            "--served-model-name",
            model_id,
            "--extra-engine-args",
            str(trtllm_config),
            "--publish-events-and-metrics",
        ]

    env = {
        **os.environ,
        "RUST_BACKTRACE": "1",
        "NATS_SERVER": "nats://localhost:4222",
        "ETCD_ENDPOINTS": "http://localhost:2379",
        "DYN_KVBM_CPU_CACHE_GB": "5",
        "DYN_KVBM_DISK_CACHE_GB": "5",
        "DYN_LOG": "debug",  # Enable debug logs for consolidator visibility
    }
    if engine == "trtllm":
        # ZMQ port for the TensorRT-LLM consolidator.
        env["DYN_KVBM_TRTLLM_ZMQ_PORT"] = "20081"

    # Worker logs get their own directory so they don't collide with the frontend's.
    worker_log_dir = (Path(test_directory) / engine).absolute()
    worker_log_dir.mkdir(parents=True, exist_ok=True)

    with ManagedProcess(
        command=command,
        env=env,
        health_check_urls=[],  # Workers don't expose HTTP endpoints
        timeout=300,  # model loading plus consolidator init
        working_dir=str(test_directory),
        display_output=False,
        log_dir=str(worker_log_dir),
        # NOTE(review): presumably keeps the already-running frontend alive — confirm.
        terminate_existing=False,
    ) as worker_process:
        # ManagedProcess may relocate the log, so ask it for the real path.
        log_file = Path(worker_process._log_path)
        logger.info(f"Worker log file: {log_file}")
        logger.info(
            f"Waiting for {engine.upper()} worker and consolidator to initialize..."
        )

        if not wait_for_worker_registration(frontend_server["base_url"]):
            logger.warning("Continuing test despite worker registration failure")

        # Grace period for the consolidator to finish initializing.
        time.sleep(5)

        stats = extract_consolidator_stats(log_file, engine)
        if stats["consolidator_started"]:
            logger.info("Consolidator detected in logs")
        else:
            logger.warning("Consolidator may not have started - check logs")

        yield {
            "process": worker_process,
            "model_id": model_id,
            "log_file": log_file,
            "consolidator_stats": stats,
            "engine": engine,
        }

    # Cleanup is handled by ManagedProcess's context-manager exit.
    logger.info(f"{engine.upper()} worker stopped")


@pytest.fixture
def tester(frontend_server, llm_worker):
    """Return an ApiTester that sends requests through the frontend to the worker's model."""
    base_url = frontend_server["base_url"]
    served_model = llm_worker["model_id"]
    return ApiTester(base_url=base_url, model_id=served_model)


class TestConsolidatorRouterE2E:
    """E2E tests for KV Event Consolidator with Router."""

    SYSTEM_PROMPT = "You are a helpful AI assistant."

    # Common error patterns to check in logs
    ERROR_PATTERNS = [
        r"error.*block_size=0",
        r"Failed to parse block_hash",
        r"panic",
        r"fatal",
    ]

    def assert_no_errors_in_logs(
        self, worker_log: Path, frontend_log: Path, engine: str = "vllm"
    ):
        """Fail if the worker or frontend log matches any of ``ERROR_PATTERNS``.

        The worker log is checked first, so its failure is reported first.
        """
        worker_label = f"{engine.upper()} Worker"
        worker_hits = check_logs_for_patterns(
            worker_log, self.ERROR_PATTERNS, worker_label
        )
        assert not worker_hits, f"Errors in {worker_label} logs: {worker_hits}"

        frontend_label = "Frontend/Router"
        frontend_hits = check_logs_for_patterns(
            frontend_log, self.ERROR_PATTERNS, frontend_label
        )
        assert not frontend_hits, f"Errors in {frontend_label} logs: {frontend_hits}"

    def send_concurrent_requests(
        self,
        tester: ApiTester,
        num_requests: int,
        max_tokens: int = 50,
        content_template: str = "Request {i}: Tell me about topic {i}",
        max_workers: int = 5,
    ):
        """Send ``num_requests`` chat requests concurrently and collect the outcomes.

        Args:
            tester: Client used to send each chat request.
            num_requests: Number of requests; request ``i`` uses
                ``content_template.format(i=i)`` as its user message.
            max_tokens: ``max_tokens`` passed to every request.
            content_template: Message template with an ``{i}`` placeholder.
            max_workers: Maximum number of requests in flight at once.

        Returns:
            ``(successes, results)``: ``results[i]`` is ``(True, response)`` when
            request ``i`` succeeded, or ``(False, error_message)`` if it raised.
        """

        def send_request(i: int):
            try:
                messages = [{"role": "user", "content": content_template.format(i=i)}]
                response = tester.send_chat_request(messages, max_tokens=max_tokens)
                return True, response
            except Exception as e:
                logger.error(f"Request {i} failed: {e}")
                return False, str(e)

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # executor.map yields results in input order, so results[i] is request i.
            results = list(executor.map(send_request, range(num_requests)))

        successes = sum(1 for success, _ in results if success)
        logger.info(f"Concurrent requests: {successes}/{num_requests} succeeded")
        return successes, results

    def test_basic_consolidator_flow(self, tester, llm_worker, frontend_server):
        """
        Basic consolidator flow:
        1. Send a few sequential chat requests through the frontend
        2. Verify the consolidator started, saw STORE events and published events
        3. Verify worker and frontend/router logs contain no error patterns
        """
        engine = llm_worker["engine"]
        logger.info(f"TEST: Basic Consolidator Flow ({engine.upper()})")

        prompts = (
            "What is machine learning?",
            "Explain neural networks.",
            "Tell me about transformers.",
        )
        for number, prompt in enumerate(prompts, start=1):
            messages = [{"role": "user", "content": prompt}]
            reply = tester.send_chat_request(messages)["choices"][0]["message"][
                "content"
            ]
            logger.info(
                f"Request {number}/{len(prompts)}: {prompt[:30]}... => {reply[:40]}..."
            )

        # Give the worker time to flush its logs.
        time.sleep(5)

        worker_log = llm_worker["log_file"]
        consolidator_stats = extract_consolidator_stats(worker_log, engine)
        logger.info(f"Consolidator stats: {consolidator_stats}")

        assert consolidator_stats["consolidator_started"], "Consolidator did not start"
        assert consolidator_stats["store_events"] > 0, "No store events processed"
        assert consolidator_stats["published_events"] > 0, "No events published"

        self.assert_no_errors_in_logs(worker_log, frontend_server["log_file"], engine)

        logger.info(f"Basic consolidator flow test passed ({engine.upper()})")

    def test_consolidator_handles_concurrent_requests(
        self, tester, llm_worker, frontend_server
    ):
        """
        Stress the consolidator with concurrent requests:
        1. Fire a burst of short requests in parallel
        2. Require at least 90% success and no error patterns in the logs
        3. Require that STORE events were processed
        """
        engine = llm_worker["engine"]
        engine_label = engine.upper()
        logger.info(f"TEST: Concurrent Request Handling ({engine_label})")

        total = 10
        succeeded, _ = self.send_concurrent_requests(
            tester,
            total,
            max_tokens=20,
            content_template="Request {i}: Count to 5",
        )

        # Give the worker time to flush its logs.
        time.sleep(5)

        failed = total - succeeded
        assert succeeded >= total * 0.9, f"Too many failed requests: {failed}"

        worker_log = llm_worker["log_file"]
        self.assert_no_errors_in_logs(worker_log, frontend_server["log_file"], engine)

        store_events = extract_consolidator_stats(worker_log, engine)["store_events"]
        assert store_events > 0, "No events processed during concurrent load"

        logger.info(f"Concurrent request handling test passed ({engine_label})")

    def test_store_deduplication_across_sources(
        self, tester, llm_worker, frontend_server
    ):
        """
        Test STORE event deduplication across engine (G1) and KVBM (G2/G3).

        When a block is stored in G1 (GPU), it is also offloaded to G2 (CPU) and
        G3 (Disk), so the consolidator receives STORE events for the same block
        from both the engine and KVBM.

        Test Scenario:
        1. Send requests -> blocks are stored in G1 and offloaded to G2/G3
        2. The first source to report a block (engine or KVBM, depending on
           timing) makes the consolidator queue one STORE event for the router
        3. Later reports of the same block from other sources are logged as
           DEDUP and are NOT published again

        This verifies that the router receives exactly one STORE event per
        first-source store, even though the block exists in several storage
        tiers, and that deduplication reduces the STORE traffic to the router.
        """
        engine = llm_worker["engine"]
        logger.info(f"Starting STORE deduplication test ({engine.upper()})")

        # Phase 1: send requests to generate STORE events
        logger.info("Sending concurrent requests to generate STORE events")
        num_requests = 10
        successes, _ = self.send_concurrent_requests(
            tester, num_requests, max_tokens=50
        )
        assert (
            successes >= num_requests * 0.9
        ), f"Too many failed requests: {num_requests - successes}"

        # Wait for events to be processed
        time.sleep(5)

        # Phase 2: Analyze consolidator logs
        logger.info("Phase 2: Analyzing STORE event deduplication")
        worker_log = llm_worker["log_file"]
        # errors="replace": stray non-UTF-8 bytes in engine output must not
        # abort the analysis.
        log_content = Path(worker_log).read_text(encoding="utf-8", errors="replace")

        # Order-agnostic counting: the "first source" may be the engine or KVBM.
        first_source_stores = len(
            re.findall(
                r"stored in first source \w+.*will publish STORE event", log_content
            )
        )
        # Later sources for an already-known block:
        # "DEDUP: Block ... added to source X"
        dedup_stores = len(
            re.findall(
                r"DEDUP: Block \d+ \(seq_hash=\d+\) added to source \w+", log_content
            )
        )
        total_stores_received = first_source_stores + dedup_stores

        # STORE events actually published to the router
        published_stores = len(re.findall(r"will publish STORE event", log_content))

        logger.info(
            f"STORE events from first source (will publish): {first_source_stores}"
        )
        logger.info(f"STORE events from second source (DEDUP): {dedup_stores}")
        logger.info(f"Total STORE events received: {total_stores_received}")
        logger.info(f"STORE events published to router: {published_stores}")

        # 1. We should receive STORE events from both sources (order doesn't matter)
        assert (
            first_source_stores > 0
        ), f"Expected STORE events from first source (could be {engine.upper()} or KVBM)"
        assert (
            dedup_stores > 0
        ), "Expected DEDUP STORE events from second source (proves deduplication working)"

        # 2. Published stores should equal first source stores
        #    (each unique block is published once when first stored, regardless of which source)
        assert (
            published_stores == first_source_stores
        ), f"Expected published events ({published_stores}) to equal first-source stores ({first_source_stores})"

        # 3. Deduplication must reduce router traffic: strictly fewer STORE
        #    events are published than the consolidator received in total.
        assert published_stores < total_stores_received, (
            f"Expected fewer published STORE events ({published_stores}) than "
            f"received from all sources ({total_stores_received})"
        )

        # 4. Check for errors in logs
        self.assert_no_errors_in_logs(worker_log, frontend_server["log_file"], engine)

        logger.info(f"STORE deduplication test passed ({engine.upper()})")

    @pytest.mark.parametrize("engine_type", AVAILABLE_ENGINES)
701
    def test_remove_deduplication_across_sources(
702
        self, test_directory, runtime_services, engine_type
703
704
    ):
        """
705
        Test REMOVE event deduplication across G1 (engine GPU), G2 (KVBM CPU), G3 (KVBM disk):
706
707
708
709
710
711
712

        When blocks are stored in G1 (GPU), they are AUTOMATICALLY
        replicated to G2 (CPU) and G3 (Disk) simultaneously.

        Test Scenario:
        1. Configure very small GPU cache (30 blocks) and slightly larger KVBM caches (50 blocks each)
        2. Send 25 requests with 100 tokens each → blocks stored in G1 AND offloaded to G2/G3
713
        3. GPU fills up (30 blocks) → blocks evicted from G1 → consolidator receives REMOVE from engine
714
715
716
717
718
719
           → consolidator sees blocks still exist in G2/G3 → does NOT publish REMOVE to router
        4. Some blocks only exist in G1 (not replicated) → when evicted → published to router

        This verifies: REMOVE is only sent to router when a block is removed from ALL sources.
        Deduplication prevents unnecessary REMOVE events when blocks are still cached in G2/G3.
        """
720
721
722

        engine = engine_type
        logger.info(f"Starting REMOVE deduplication test ({engine.upper()})")
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745

        # Start frontend with router
        frontend_command = [
            "python",
            "-m",
            "dynamo.frontend",
            "--http-port",
            str(FRONTEND_PORT),
            "--router-mode",
            "kv",
            "--router-reset-states",
        ]

        frontend_env = os.environ.copy()
        frontend_env.update(
            {
                "RUST_BACKTRACE": "1",
                "NATS_SERVER": "nats://localhost:4222",
                "ETCD_ENDPOINTS": "http://localhost:2379",
                "DYN_LOG": "debug",
            }
        )

746
        frontend_log_dir = Path(os.path.join(test_directory, "frontend")).absolute()
747
748
749
750
751
752
753
754
755
        frontend_log_dir.mkdir(parents=True, exist_ok=True)

        with ManagedProcess(
            command=frontend_command,
            env=frontend_env,
            health_check_urls=[f"http://localhost:{FRONTEND_PORT}/health"],
            timeout=120,
            working_dir=str(test_directory),
            display_output=False,
756
            log_dir=str(frontend_log_dir),  # Absolute path keeps logs in test directory
757
        ) as _frontend_process:
758
759
            # Get actual log file path from ManagedProcess
            frontend_log = Path(_frontend_process._log_path)
760
761
            logger.info(f"Frontend started on port {FRONTEND_PORT}")

762
            # Start worker with constrained GPU blocks but larger KVBM blocks
763
764
            model_id = os.environ.get("CONSOLIDATOR_MODEL_ID", "Qwen/Qwen3-0.6B")

765
766
767
768
769
770
771
772
773
774
775
776
            # Fixed cache tier sizes
            g1_gpu_blocks = 10  # Very small GPU cache to force evictions
            g2_cpu_blocks = 5  # Smaller than GPU but large enough to retain blocks
            g3_disk_blocks = 5  # Smaller than GPU but large enough to retain blocks

            # Compute optimal test parameters for this configuration
            test_params = compute_deduplication_test_params(
                g1_gpu_blocks=g1_gpu_blocks,
                g2_cpu_blocks=g2_cpu_blocks,
                g3_disk_blocks=g3_disk_blocks,
            )

777
778
779
780
781
782
783
784
785
786
787
788
789
            # Build command based on engine type
            if engine == "vllm":
                worker_command = [
                    "python",
                    "-m",
                    "dynamo.vllm",
                    "--model",
                    model_id,
                    "--connector",
                    "kvbm",
                    "--enforce-eager",
                    "--enable-prefix-caching",
                    "--num-gpu-blocks-override",
790
                    str(g1_gpu_blocks),
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
                ]
            else:  # trtllm
                # Create TensorRT-LLM config file with KVBM connector
                # Use small GPU cache (0.01 = 1% of GPU memory) to force evictions
                # This ensures blocks will be evicted from GPU while still in KVBM
                # Small enough to trigger evictions but large enough to handle sequence requirements
                config_path = create_trtllm_config(test_directory)
                worker_command = [
                    "python",
                    "-m",
                    "dynamo.trtllm",
                    "--model-path",
                    model_id,
                    "--served-model-name",
                    model_id,
                    "--extra-engine-args",
                    str(
                        config_path.absolute()
                    ),  # Use absolute path to avoid working directory issues
                    "--publish-events-and-metrics",
                ]

            worker_env = os.environ.copy()
            worker_env.update(
815
816
817
818
                {
                    "RUST_BACKTRACE": "1",
                    "NATS_SERVER": "nats://localhost:4222",
                    "ETCD_ENDPOINTS": "http://localhost:2379",
819
820
                    "DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS": str(g2_cpu_blocks),
                    "DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS": str(g3_disk_blocks),
821
822
823
824
                    "DYN_LOG": "debug",
                }
            )

825
826
827
828
            # Set ZMQ port for TensorRT-LLM consolidator
            if engine == "trtllm":
                worker_env["DYN_KVBM_TRTLLM_ZMQ_PORT"] = "20081"

829
            worker_log_dir = Path(os.path.join(test_directory, engine)).absolute()
830
            worker_log_dir.mkdir(parents=True, exist_ok=True)
831
832

            with ManagedProcess(
833
834
                command=worker_command,
                env=worker_env,
835
836
837
838
                health_check_urls=[],
                timeout=300,
                working_dir=str(test_directory),
                display_output=False,
839
840
841
                log_dir=str(
                    worker_log_dir
                ),  # Absolute path keeps logs in test directory
842
                terminate_existing=False,
843
            ) as _worker_process:
844
845
                # Get actual log file path from ManagedProcess (it may modify log_dir to use temp directory)
                worker_log = Path(_worker_process._log_path)
846
                logger.info(f"Waiting for {engine.upper()} worker to initialize...")
847
848
849
850
851
852
853

                # Wait for worker to register with frontend
                worker_registered = wait_for_worker_registration(
                    f"http://localhost:{FRONTEND_PORT}"
                )

                if not worker_registered:
854
855
856
                    pytest.fail(
                        f"{engine.upper()} worker failed to register with frontend"
                    )
857
858
859
860
861
862
863
864
865
866
867

                # Additional wait for consolidator to fully initialize
                time.sleep(5)

                # Create tester
                tester = ApiTester(
                    base_url=f"http://localhost:{FRONTEND_PORT}", model_id=model_id
                )

                # Phase 1: Send requests to fill GPU cache
                logger.info("Phase 1: Filling GPU cache with diverse prompts")
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
                num_requests = test_params["num_requests"]
                max_tokens = test_params["max_tokens"]
                concurrency = 2  # Parallel requests to create cache pressure
                logger.info(
                    f"Computed test params: max_tokens={max_tokens}, "
                    f"num_requests={num_requests}, concurrency={concurrency}, "
                    f"blocks_per_request={test_params['blocks_per_request']}, "
                    f"total_capacity={test_params['total_capacity_blocks']} blocks, "
                    f"min_offload_cache={test_params['min_offload_cache']} blocks, "
                    f"total_blocks_generated={test_params['total_blocks_generated']}"
                )

                # Use parallel requests to create concurrent cache pressure
                # This forces the GPU cache to actually fill up (multiple requests
                # holding blocks simultaneously) rather than recycling blocks
                # between serial requests
                from concurrent.futures import ThreadPoolExecutor, as_completed

                def send_request(request_idx: int) -> tuple[int, bool]:
                    """Send a single chat request and return (index, success).

                    The prompt embeds ``request_idx`` so each request's text differs.

                    Args:
                        request_idx: Index of the request; used in the prompt and
                            echoed back so the caller can correlate results.

                    Returns:
                        ``(request_idx, success)``. ``success`` is True when the first
                        choice's message contains a ``content`` key. Transport errors
                        (``requests.RequestException``) and payloads that lack
                        ``choices[0].message`` are logged and reported as ``False``
                        rather than raised, so the caller's failure count stays
                        accurate instead of ``future.result()`` aborting the loop.
                    """
                    prompt = f"Tell me a unique story about topic {request_idx}. Make it very long and detailed with many paragraphs."
                    try:
                        response = tester.send_chat_request(
                            messages=[{"role": "user", "content": prompt}],
                            max_tokens=max_tokens,
                        )
                    except requests.RequestException:
                        logger.exception(f"Request {request_idx} failed")
                        return (request_idx, False)

                    try:
                        # An error body (or any payload without choices/message) would
                        # otherwise raise here and propagate out of the worker thread.
                        success = "content" in response["choices"][0]["message"]
                    except (KeyError, IndexError, TypeError):
                        logger.exception(
                            f"Request {request_idx} returned a malformed response: {response!r}"
                        )
                        return (request_idx, False)
                    return (request_idx, success)

                completed_count = 0
                failed_count = 0
                with ThreadPoolExecutor(max_workers=concurrency) as executor:
                    futures = {
                        executor.submit(send_request, i): i for i in range(num_requests)
                    }
                    for future in as_completed(futures):
                        _request_idx, success = future.result()
                        if success:
                            completed_count += 1
                        else:
                            failed_count += 1
                        if completed_count % 10 == 0 or completed_count == num_requests:
                            logger.info(
                                f"Progress: {completed_count}/{num_requests} completed"
                            )

                logger.info(
                    f"All requests finished: {completed_count} succeeded, {failed_count} failed"
                )
                assert failed_count == 0, f"{failed_count} requests failed"
922

923
924
925
926
927
928
929
930
931
                # Wait for requests to complete and blocks to be freed
                # With GUARANTEED_NO_EVICT, blocks are freed when requests complete (not evicted)
                # We need to wait long enough for requests to finish and blocks to be freed
                # For vLLM with FIFO, evictions happen immediately when cache fills.
                wait_time = 5 if engine == "trtllm" else 5
                logger.info(
                    f"Waiting {wait_time}s for requests to complete and blocks to be freed..."
                )
                time.sleep(wait_time)
932
933
934

                # Phase 2: Analyze consolidator logs
                logger.info("Phase 2: Analyzing consolidator deduplication behavior")
935
936
937
938
939
940
941
942
943
944
                log_content = worker_log.read_text()

                # Count blocks removed but still in another source (deduplication working!)
                # Order-agnostic: checks for any removal where block still exists in another source
                # Pattern: "removed from source X, still in Y source(s): [sources]"
                removes_but_still_in_other_source = len(
                    re.findall(
                        r"removed from source \w+, still in \d+ source\(s\)",
                        log_content,
                    )
945
946
                )

947
948
949
950
951
952
953
                # Count blocks removed from last source (will publish REMOVE)
                # Order-agnostic: checks for any removal from last source, regardless of which source
                removes_from_last_source = len(
                    re.findall(
                        r"removed from last source \w+.*will publish REMOVE event",
                        log_content,
                    )
954
955
956
957
958
959
960
961
                )

                # Count REMOVE events actually published to router
                published_removes = len(
                    re.findall(r"will publish REMOVE event", log_content)
                )

                logger.info(
962
                    f"Blocks removed but still in another source (deduplication working): {removes_but_still_in_other_source}"
963
964
                )
                logger.info(
965
                    f"Blocks removed from last source (will publish): {removes_from_last_source}"
966
967
968
969
                )
                logger.info(f"REMOVE events published to router: {published_removes}")

                # Assertions:
970
                # 1. We should see removals where blocks still exist in another source
971
                #    This proves deduplication is working (REMOVE not sent to router yet)
972
                #    Order doesn't matter - could be engine→KVBM or KVBM→engine
973
                assert (
974
975
                    removes_but_still_in_other_source > 0
                ), f"Expected removals where blocks still exist in another source (deduplication working) for {engine.upper()}"
976
977

                # 2. REMOVE events should be published for last-source removals
978
                #    Order doesn't matter - could be engine or KVBM as last source
979
980
981
982
                assert (
                    published_removes > 0
                ), "Expected REMOVE events to be published for last-source removals"

983
984
985
986
987
                # 3. Published removes should equal removes from last source
                assert (
                    published_removes == removes_from_last_source
                ), f"Expected published REMOVE events ({published_removes}) to equal last-source removals ({removes_from_last_source})"

988
                # 3. Check for errors in logs
989
                self.assert_no_errors_in_logs(worker_log, frontend_log, engine)