#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
E2E test for KV Event Consolidator with Router integration.

This test validates that:
1. vLLM/TensorRT-LLM with KVBM correctly emits KV events to the consolidator
2. The consolidator correctly deduplicates events from engine and KVBM
3. The router receives and processes consolidated events without warnings
"""

import concurrent.futures
import logging
import os
import re
import time
from pathlib import Path

import pytest
import requests
import yaml

from tests.kvbm_integration.common import ApiTester, check_logs_for_patterns
from tests.utils.managed_process import ManagedProcess

from .common import check_module_available

# Check which inference engines are importable so tests are parameterized over
# (and skipped for) only the engines present in this environment.
HAS_VLLM = check_module_available("vllm")
HAS_TRTLLM = check_module_available("tensorrt_llm")

# Engines used for parameterization, in a stable order ("vllm" before "trtllm").
AVAILABLE_ENGINES = [
    engine
    for engine, available in (("vllm", HAS_VLLM), ("trtllm", HAS_TRTLLM))
    if available
]

# Test markers applied to every test in this module.
pytestmark = [
    pytest.mark.kvbm,
    pytest.mark.e2e,
    pytest.mark.slow,
    pytest.mark.gpu_1,
    pytest.mark.pre_merge,
    pytest.mark.skipif(not (HAS_VLLM or HAS_TRTLLM), reason="requires vllm or trtllm"),
]

logger = logging.getLogger(__name__)

# Port the Dynamo frontend (with embedded KV router) listens on.
FRONTEND_PORT = 8000


@pytest.fixture
def test_directory(request):
    """Provide a per-test directory for logs and generated config files.

    The directory is named after the pytest node and created relative to the
    current working directory. Nothing removes it afterwards: it is kept on
    purpose so the logs can be inspected after a run.
    """
    directory = Path(request.node.name)
    directory.mkdir(exist_ok=True, parents=True)
    yield directory
    # No teardown: the directory and its logs are intentionally left in place.


def create_trtllm_config(
    test_directory: Path,
    free_gpu_memory_fraction: float = 0.01,
    max_seq_len: int = 4096,
) -> Path:
    """Write a TensorRT-LLM engine config YAML that enables the KVBM connector.

    Args:
        test_directory: Directory in which ``trtllm_config.yaml`` is written.
        free_gpu_memory_fraction: Value for ``kv_cache_config.free_gpu_memory_fraction``.
            The small default keeps the GPU KV cache tiny so blocks get evicted
            (and therefore offloaded to KVBM) quickly.
        max_seq_len: Maximum sequence length accepted by the engine.

    Returns:
        Path to the written config file (an existing file is overwritten).
    """
    config_path = Path(test_directory) / "trtllm_config.yaml"
    config = {
        "backend": "pytorch",
        "cuda_graph_config": None,
        "kv_cache_config": {
            "enable_partial_reuse": False,
            "free_gpu_memory_fraction": free_gpu_memory_fraction,
        },
        "max_seq_len": max_seq_len,
        # Route KV cache management through the Dynamo KVBM connector.
        "kv_connector_config": {
            "connector_module": "kvbm.trtllm_integration.connector",
            "connector_scheduler_class": "DynamoKVBMConnectorLeader",
            "connector_worker_class": "DynamoKVBMConnectorWorker",
        },
    }
    with open(config_path, "w", encoding="utf-8") as f:
        yaml.safe_dump(config, f)
    return config_path


def compute_deduplication_test_params(
    g1_gpu_blocks: int,
    g2_cpu_blocks: int,
    g3_disk_blocks: int,
    block_size: int = 16,
    gpu_utilization_per_request: float = 0.6,
    offload_overflow_factor: float = 100.0,
) -> dict:
    """
    Compute max_tokens and num_requests for the deduplication test.

    Given fixed block allocations for each storage tier, compute test parameters
    that ensure the test lands in the "Goldilocks zone" where:
    1. Each request fits within GPU cache (with safety margin)
    2. Total tokens generated overflow all tiers
    3. Deduplication behavior is observable (blocks exist in multiple sources)
    4. Blocks are eventually removed from ALL sources (triggering REMOVE events)

    The key insight is that blocks are replicated across GPU, CPU, and Disk
    simultaneously. To trigger "last source" removal, a block must be evicted
    from ALL three tiers. The bottleneck is the smallest offload cache (CPU or
    Disk), not total capacity.

    Only generated tokens are budgeted here: prompt tokens also occupy blocks
    but are not accounted for.

    Args:
        g1_gpu_blocks: Number of GPU cache blocks (--num-gpu-blocks-override)
        g2_cpu_blocks: Number of CPU cache blocks (DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS)
        g3_disk_blocks: Number of disk cache blocks (DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS)
        block_size: Tokens per block (default 16 for vLLM)
        gpu_utilization_per_request: Fraction of GPU capacity to use per request (default 0.6)
            - Too high (>0.8): requests may cause immediate evictions
            - Too low (<0.3): too many requests needed, slow test
        offload_overflow_factor: How many times to overflow the smallest offload
            cache (default 100.0). High value needed because:
            - Blocks persist in offload caches longer than GPU
            - Offloading is asynchronous - not all blocks make it
            - Need enough volume to ensure blocks cycle through all tiers

    Returns:
        dict with:
            - max_tokens: Maximum tokens per request
            - num_requests: Number of requests to send
            - blocks_per_request: Estimated blocks consumed per request
            - total_capacity_blocks: Total blocks across all tiers
            - total_blocks_generated: Expected total blocks generated
            - min_offload_cache: The smallest offload cache (bottleneck)

    Raises:
        ValueError: If ``block_size`` is less than 1 or any block count is negative.

    Example:
        >>> compute_deduplication_test_params(
        ...     g1_gpu_blocks=30, g2_cpu_blocks=10, g3_disk_blocks=10
        ... )  # doctest: +NORMALIZE_WHITESPACE
        {'max_tokens': 288, 'num_requests': 56, 'blocks_per_request': 18,
         'total_capacity_blocks': 50, 'total_blocks_generated': 1008,
         'min_offload_cache': 10}
    """
    from math import ceil

    # Reject configurations that cannot yield meaningful parameters.
    if block_size < 1:
        raise ValueError(f"block_size must be >= 1, got {block_size}")
    for tier_name, tier_blocks in (
        ("g1_gpu_blocks", g1_gpu_blocks),
        ("g2_cpu_blocks", g2_cpu_blocks),
        ("g3_disk_blocks", g3_disk_blocks),
    ):
        if tier_blocks < 0:
            raise ValueError(f"{tier_name} must be non-negative, got {tier_blocks}")

    # 1. max_tokens: fits in GPU with margin.
    # Use gpu_utilization to leave headroom for the model to process without
    # triggering evictions mid-generation.
    max_blocks_per_request = max(1, int(g1_gpu_blocks * gpu_utilization_per_request))
    max_tokens = max_blocks_per_request * block_size

    # 2. num_requests: overflow the smallest offload cache many times.
    # Blocks must be evicted from GPU AND both CPU and Disk before a REMOVE is
    # published, and since blocks are replicated to all tiers, the smallest
    # offload cache is the bottleneck that must be cycled through.
    min_offload_cache = min(g2_cpu_blocks, g3_disk_blocks)
    total_capacity_blocks = g1_gpu_blocks + g2_cpu_blocks + g3_disk_blocks

    unique_blocks_needed = int(min_offload_cache * offload_overflow_factor)
    num_requests = max(1, ceil(unique_blocks_needed / max_blocks_per_request))

    # Expected total blocks generated across all requests.
    total_blocks_generated = num_requests * max_blocks_per_request

    return {
        "max_tokens": max_tokens,
        "num_requests": num_requests,
        "blocks_per_request": max_blocks_per_request,
        "total_capacity_blocks": total_capacity_blocks,
        "total_blocks_generated": total_blocks_generated,
        "min_offload_cache": min_offload_cache,
    }


def extract_consolidator_stats(log_path: Path, engine: str = "vllm") -> dict:
    """Extract KV Event Consolidator statistics from a worker log file.

    The event counts come from debug-level consolidator log lines, so they are
    only populated when the worker runs with ``DYN_LOG=debug``.

    Args:
        log_path: Path (or path string) of the worker log. A missing file
            yields the default statistics (all counts zero).
        engine: Engine name; currently unused, kept for API compatibility.

    Returns:
        dict with ``store_events``, ``remove_events``, ``dedup_events`` and
        ``published_events`` (int counts), ``consolidator_started`` (bool) and
        ``consolidator_port`` (int, or None if no publish address was logged).
    """
    stats = {
        "store_events": 0,
        "remove_events": 0,
        "dedup_events": 0,
        "published_events": 0,
        "consolidator_started": False,
        "consolidator_port": None,
    }

    log_path = Path(log_path)
    if not log_path.exists():
        return stats

    try:
        # Decode explicitly and replace invalid bytes: a single non-UTF-8 byte
        # in the log must not make every statistic silently read as zero.
        content = log_path.read_text(encoding="utf-8", errors="replace")
    except OSError as e:
        logger.warning(f"Error extracting consolidator stats: {e}")
        return stats

    # Check if consolidator started
    # Actual log: "KV Event Consolidator fully started and ready"
    stats["consolidator_started"] = bool(
        re.search(r"KV Event Consolidator.*started", content, re.IGNORECASE)
    )

    # Extract consolidator output port from logs
    # Look for: "Starting KV Event Consolidator: subscribe from ..., publish to tcp://0.0.0.0:PORT"
    port_match = re.search(r"publish to tcp://[^:]+:(\d+)", content)
    if port_match:
        stats["consolidator_port"] = int(port_match.group(1))

    # Count event types (all at debug level)
    # Actual: "stored in first source ... will publish STORE event"
    stats["store_events"] = len(re.findall(r"will publish STORE event", content))
    # Actual: "removed from last source ... will publish REMOVE event"
    stats["remove_events"] = len(re.findall(r"will publish REMOVE event", content))
    # Actual: "DEDUP: Block ... added to source"
    stats["dedup_events"] = len(re.findall(r"DEDUP:.*added to source", content))
    # Actual: "Publishing N consolidated event(s) to router" or "Published batch with N event(s)"
    stats["published_events"] = len(
        re.findall(r"Publish(?:ing|ed).*event.*to router", content, re.IGNORECASE)
    )

    return stats


def wait_for_worker_registration(
    frontend_url: str, max_wait_seconds: int = 120, poll_interval: int = 2
) -> bool:
    """
    Poll the frontend ``/health`` endpoint until at least one worker instance appears.

    Args:
        frontend_url: Base URL of the frontend (e.g., "http://localhost:8000")
        max_wait_seconds: Maximum time to wait for registration
        poll_interval: Seconds between health checks

    Returns:
        True if a worker registered (the health payload has non-empty
        ``instances``), False if the timeout elapsed first.
    """
    # Monotonic clock: system clock adjustments cannot shorten or stretch the timeout.
    start_time = time.monotonic()

    while time.monotonic() - start_time < max_wait_seconds:
        try:
            response = requests.get(f"{frontend_url}/health", timeout=5)
            health_data = response.json()
            if health_data.get("instances"):
                elapsed = time.monotonic() - start_time
                logger.info(f"Worker registered after {elapsed:.1f}s")
                return True
        except Exception as e:
            # Best-effort polling: a failed request, a non-JSON body or an
            # unexpected payload shape is logged and retried.
            logger.debug(f"Health check failed: {e}")

        # Never sleep past the deadline.
        remaining = max_wait_seconds - (time.monotonic() - start_time)
        if remaining > 0:
            time.sleep(min(poll_interval, remaining))

    elapsed = time.monotonic() - start_time
    logger.error(f"Worker failed to register after {elapsed:.1f}s")
    logger.error("Check worker logs for initialization errors")
    return False


@pytest.fixture
def frontend_server(test_directory, runtime_services):
    """Start the Dynamo frontend with its embedded KV router for one test.

    ``runtime_services`` is requested so the nats-server/etcd it starts are
    available; the frontend is launched with ``terminate_existing=False`` so
    those processes are not killed.

    Yields:
        dict with ``process`` (the ManagedProcess), ``port``, ``base_url`` and
        ``log_file`` (actual path of the frontend log).
    """
    logger.info("Starting Dynamo frontend with KV router")

    # Frontend command - includes embedded router
    command = [
        "python",
        "-m",
        "dynamo.frontend",
        "--http-port",
        str(FRONTEND_PORT),
        "--router-mode",
        "kv",
        "--router-reset-states",
    ]

    env = os.environ.copy()
    env.update(
        {
            "RUST_BACKTRACE": "1",
            "NATS_SERVER": "nats://localhost:4222",
            "ETCD_ENDPOINTS": "http://localhost:2379",
            "DYN_LOG": "debug",  # Enable debug logs for consolidator visibility
        }
    )

    # Separate log directory for the frontend to avoid conflicts with the worker
    frontend_log_dir = (Path(test_directory) / "frontend").absolute()
    frontend_log_dir.mkdir(parents=True, exist_ok=True)

    with ManagedProcess(
        command=command,
        env=env,
        health_check_urls=[f"http://localhost:{FRONTEND_PORT}/health"],
        timeout=120,  # Increased timeout for frontend+router initialization
        working_dir=str(test_directory),
        display_output=False,
        log_dir=str(frontend_log_dir),  # Absolute path keeps logs in test directory
        terminate_existing=False,  # Don't kill nats-server/etcd started by runtime_services
    ) as frontend_process:
        # ManagedProcess may relocate its log (e.g. to a temp directory), so read
        # the actual path back from it.
        log_file = Path(frontend_process._log_path)
        logger.info(f"Frontend started on port {FRONTEND_PORT}, log file: {log_file}")

        yield {
            "process": frontend_process,
            "port": FRONTEND_PORT,
            "base_url": f"http://localhost:{FRONTEND_PORT}",
            "log_file": log_file,
        }

    # The context manager's __exit__ has stopped the process by this point.
    logger.info("Frontend server stopped")


@pytest.fixture(params=AVAILABLE_ENGINES)
def engine_type(request):
    """Return the engine name for the current parameterization.

    Dependent tests run once per engine listed in ``AVAILABLE_ENGINES``, i.e.
    only for engines that are installed in this environment.
    """
    selected_engine = request.param
    return selected_engine


@pytest.fixture
def llm_worker(frontend_server, test_directory, runtime_services, engine_type):
    """Start an LLM worker (vLLM or TensorRT-LLM) with the KVBM connector.

    The worker runs with ``DYN_LOG=debug`` so the consolidator's log lines can
    be parsed by ``extract_consolidator_stats``. The model defaults to
    ``Qwen/Qwen3-0.6B`` and can be overridden via ``CONSOLIDATOR_MODEL_ID``.

    Yields:
        dict with ``process`` (the ManagedProcess), ``model_id``, ``log_file``,
        ``consolidator_stats`` (snapshot taken right after startup) and ``engine``.
    """
    model_id = os.environ.get("CONSOLIDATOR_MODEL_ID", "Qwen/Qwen3-0.6B")
    engine = engine_type

    logger.info(
        f"Starting {engine.upper()} worker with KVBM connector and model {model_id}"
    )

    # Build command based on engine type
    if engine == "vllm":
        command = [
            "python",
            "-m",
            "dynamo.vllm",
            "--model",
            model_id,
            "--connector",
            "kvbm",
            "--enforce-eager",  # For faster startup in tests
        ]
    else:  # trtllm
        # Create TensorRT-LLM config file with KVBM connector
        config_path = create_trtllm_config(test_directory)
        command = [
            "python",
            "-m",
            "dynamo.trtllm",
            "--model-path",
            model_id,
            "--served-model-name",
            model_id,
            "--extra-engine-args",
            # Absolute path avoids depending on the worker's working directory
            str(config_path.absolute()),
            "--publish-events-and-metrics",
        ]

    env = os.environ.copy()
    env.update(
        {
            "RUST_BACKTRACE": "1",
            "NATS_SERVER": "nats://localhost:4222",
            "ETCD_ENDPOINTS": "http://localhost:2379",
            "DYN_KVBM_CPU_CACHE_GB": "5",
            "DYN_KVBM_DISK_CACHE_GB": "5",
            "DYN_LOG": "debug",  # Enable debug logs for consolidator visibility
        }
    )

    # Set ZMQ port for TensorRT-LLM consolidator
    if engine == "trtllm":
        env["DYN_KVBM_TRTLLM_ZMQ_PORT"] = "20081"

    # Separate log directory for the worker to avoid conflicts with the frontend
    worker_log_dir = (Path(test_directory) / engine).absolute()
    worker_log_dir.mkdir(parents=True, exist_ok=True)

    with ManagedProcess(
        command=command,
        env=env,
        health_check_urls=[],  # Workers don't expose HTTP endpoints
        timeout=300,  # Increased timeout for model loading and consolidator init
        working_dir=str(test_directory),
        display_output=False,
        log_dir=str(worker_log_dir),  # Absolute path keeps logs in test directory
        terminate_existing=False,
    ) as worker_process:
        # ManagedProcess may relocate its log (e.g. to a temp directory), so read
        # the actual path back from it.
        log_file = Path(worker_process._log_path)
        logger.info(f"Worker log file: {log_file}")

        logger.info(
            f"Waiting for {engine.upper()} worker and consolidator to initialize..."
        )

        # Readiness is observed through the frontend, since the worker itself
        # has no health URL.
        worker_registered = wait_for_worker_registration(frontend_server["base_url"])
        if not worker_registered:
            logger.warning("Continuing test despite worker registration failure")

        # Additional wait for consolidator to fully initialize
        time.sleep(5)

        # Verify consolidator started by checking logs
        stats = extract_consolidator_stats(log_file, engine)
        if not stats["consolidator_started"]:
            logger.warning("Consolidator may not have started - check logs")
        else:
            logger.info("Consolidator detected in logs")

        yield {
            "process": worker_process,
            "model_id": model_id,
            "log_file": log_file,
            "consolidator_stats": stats,
            "engine": engine,
        }

    # The context manager's __exit__ has stopped the process by this point.
    logger.info(f"{engine.upper()} worker stopped")


@pytest.fixture
def tester(frontend_server, llm_worker):
    """Provide an ApiTester that sends requests to the frontend for the worker's model."""
    return ApiTester(
        base_url=frontend_server["base_url"],
        model_id=llm_worker["model_id"],
    )


class TestConsolidatorRouterE2E:
    """E2E tests for KV Event Consolidator with Router."""

    SYSTEM_PROMPT = "You are a helpful AI assistant."

    # Regex patterns that indicate a failure when found in worker or frontend logs.
    ERROR_PATTERNS = [
        r"error.*block_size=0",
        r"Failed to parse block_hash",
        r"panic",
        r"fatal",
    ]

    def assert_no_errors_in_logs(
        self, worker_log: Path, frontend_log: Path, engine: str = "vllm"
    ):
        """Assert that neither the worker nor the frontend log matches ERROR_PATTERNS.

        Both logs are always scanned, so a worker failure does not hide
        frontend/router errors in the assertion message.

        Args:
            worker_log: Path to the engine worker's log file.
            frontend_log: Path to the frontend/router log file.
            engine: Engine name, used only to label the worker in messages.

        Raises:
            AssertionError: If either log matches; the message includes what
                ``check_logs_for_patterns`` returned for each offending log.
        """
        engine_name = engine.upper()
        worker_label = f"{engine_name} Worker"
        findings = {
            worker_label: check_logs_for_patterns(
                worker_log, self.ERROR_PATTERNS, worker_label
            ),
            "Frontend/Router": check_logs_for_patterns(
                frontend_log, self.ERROR_PATTERNS, "Frontend/Router"
            ),
        }
        failures = [
            f"Errors in {label} logs: {errors}"
            for label, errors in findings.items()
            if errors
        ]
        assert not failures, "; ".join(failures)

    def send_concurrent_requests(
        self,
        tester: ApiTester,
        num_requests: int,
        max_tokens: int = 50,
        content_template: str = "Request {i}: Tell me about topic {i}",
        max_workers: int = 5,
    ):
        """Send ``num_requests`` chat requests concurrently through ``tester``.

        Request ``i`` (for ``i`` in ``range(num_requests)``) carries a single
        user message built from ``content_template.format(i=i)``. Failures are
        logged and recorded instead of raised, so callers can assert on a
        success ratio.

        Args:
            tester: Client used to send chat requests.
            num_requests: Number of requests to send.
            max_tokens: ``max_tokens`` passed with each request.
            content_template: Message template; ``{i}`` is the request index.
            max_workers: Maximum number of requests in flight at once.

        Returns:
            ``(successes, results)`` where ``results[i]`` is ``(True, response)``
            or ``(False, error_message)`` for request ``i``.
        """

        def send_request(i: int):
            try:
                messages = [{"role": "user", "content": content_template.format(i=i)}]
                response = tester.send_chat_request(messages, max_tokens=max_tokens)
            except Exception as e:
                logger.error(f"Request {i} failed: {e}")
                return False, str(e)
            return True, response

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # executor.map preserves submission order, so results[i] is request i.
            results = list(executor.map(send_request, range(num_requests)))

        successes = sum(1 for success, _ in results if success)
        logger.info(f"Concurrent requests: {successes}/{num_requests} succeeded")
        return successes, results

511
    def test_basic_consolidator_flow(self, tester, llm_worker, frontend_server):
512
513
514
515
516
517
        """
        Test basic consolidator flow:
        1. Send requests
        2. Verify consolidator starts and processes events
        3. Verify router receives events without errors
        """
518
519
        engine = llm_worker["engine"]
        logger.info(f"TEST: Basic Consolidator Flow ({engine.upper()})")
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537

        # Send 3 requests to frontend
        requests_data = [
            [{"role": "user", "content": "What is machine learning?"}],
            [{"role": "user", "content": "Explain neural networks."}],
            [{"role": "user", "content": "Tell me about transformers."}],
        ]

        for i, messages in enumerate(requests_data):
            response = tester.send_chat_request(messages)
            content = response["choices"][0]["message"]["content"]
            logger.info(
                f"Request {i+1}/3: {messages[0]['content'][:30]}... => {content[:40]}..."
            )

        # Wait for logs to flush
        time.sleep(5)

538
539
540
        # Check worker logs for consolidator
        worker_log = llm_worker["log_file"]
        consolidator_stats = extract_consolidator_stats(worker_log, engine)
541
542
543
544
545
546
547
548
549

        logger.info(f"Consolidator stats: {consolidator_stats}")

        # Assertions
        assert consolidator_stats["consolidator_started"], "Consolidator did not start"
        assert consolidator_stats["store_events"] > 0, "No store events processed"
        assert consolidator_stats["published_events"] > 0, "No events published"

        # Check for errors in logs
550
        self.assert_no_errors_in_logs(worker_log, frontend_server["log_file"], engine)
551

552
        logger.info(f"Basic consolidator flow test passed ({engine.upper()})")
553
554

    def test_consolidator_handles_concurrent_requests(
        self, tester, llm_worker, frontend_server
    ):
        """
        Test consolidator under concurrent load:
        1. Send many requests quickly
        2. Verify no crashes or critical errors
        3. Verify STORE events were processed
        """
        engine = llm_worker["engine"]
        logger.info(f"TEST: Concurrent Request Handling ({engine.upper()})")

        # Send 10 concurrent requests
        num_requests = 10
        successes, _ = self.send_concurrent_requests(
            tester,
            num_requests,
            max_tokens=20,
            content_template="Request {i}: Count to 5",
        )

        # Wait for logs to flush
        time.sleep(5)

        # Tolerate at most 10% failed requests
        assert (
            successes >= num_requests * 0.9
        ), f"Too many failed requests: {num_requests - successes}"

        # Check for errors in logs
        self.assert_no_errors_in_logs(
            llm_worker["log_file"], frontend_server["log_file"], engine
        )

        # Verify events were processed
        stats = extract_consolidator_stats(llm_worker["log_file"], engine)
        assert stats["store_events"] > 0, "No events processed during concurrent load"

        logger.info(f"Concurrent request handling test passed ({engine.upper()})")

    def test_store_deduplication_across_sources(
        self, tester, llm_worker, frontend_server
    ):
        """
        Test STORE event deduplication across engine (G1) and KVBM (G2/G3):

        When a block is stored in G1 (GPU), it's automatically offloaded
        to G2 (CPU) and G3 (Disk). This triggers STORE events from both engine and KVBM.

        Test Scenario:
        1. Send requests → blocks stored in engine (G1)
        2. Consolidator receives engine STORE events → queues them for publishing
        3. KVBM replicates blocks to G2/G3 → emits STORE events
        4. Consolidator sees blocks already exist → logs DEDUP message → does NOT publish again
        5. Result: Router receives ONE STORE event per unique block (from step 2)

        This verifies: Only one STORE event is sent to router per unique block,
        even though the block exists in multiple storage tiers (G1, G2, G3).
        KVBM replications are deduplicated and don't trigger duplicate router updates.
        """
        engine = llm_worker["engine"]
        logger.info(f"Starting STORE deduplication test ({engine.upper()})")

        # Phase 1: send requests to generate STORE events
        logger.info("Sending concurrent requests to generate STORE events")
        num_requests = 10
        successes, _ = self.send_concurrent_requests(
            tester, num_requests, max_tokens=50
        )
        assert (
            successes >= num_requests * 0.9
        ), f"Too many failed requests: {num_requests - successes}"

        # Wait for events to be processed
        time.sleep(5)

        # Phase 2: Analyze consolidator logs
        logger.info("Phase 2: Analyzing STORE event deduplication")
        worker_log = llm_worker["log_file"]
        # Explicit encoding; replace invalid bytes so a stray byte can't abort the analysis.
        log_content = worker_log.read_text(encoding="utf-8", errors="replace")

        # First-source stores (will publish) - engine or KVBM depending on timing
        first_source_stores = len(
            re.findall(
                r"stored in first source \w+.*will publish STORE event", log_content
            )
        )

        # Second-source stores (DEDUP) - engine or KVBM depending on timing
        # Pattern: "DEDUP: Block ... added to source X"
        dedup_stores = len(
            re.findall(
                r"DEDUP: Block \d+ \(seq_hash=\d+\) added to source \w+", log_content
            )
        )

        # Total STORE events received by the consolidator (from all sources)
        total_stores_received = first_source_stores + dedup_stores

        # STORE events actually published to the router
        published_stores = len(re.findall(r"will publish STORE event", log_content))

        logger.info(
            f"STORE events from first source (will publish): {first_source_stores}"
        )
        logger.info(f"STORE events from second source (DEDUP): {dedup_stores}")
        logger.info(f"Total STORE events received: {total_stores_received}")
        logger.info(f"STORE events published to router: {published_stores}")

        # 1. STORE events must arrive from both sources (order doesn't matter)
        assert (
            first_source_stores > 0
        ), f"Expected STORE events from first source (could be {engine.upper()} or KVBM)"
        assert (
            dedup_stores > 0
        ), "Expected DEDUP STORE events from second source (proves deduplication working)"

        # 2. Every published STORE must come from a first-source store: each unique
        #    block is published once, when first stored, regardless of the source.
        assert (
            published_stores == first_source_stores
        ), f"Expected published events ({published_stores}) to equal first-source stores ({first_source_stores})"

        # 3. Check for errors in logs
        self.assert_no_errors_in_logs(worker_log, frontend_server["log_file"], engine)

        logger.info(f"STORE deduplication test passed ({engine.upper()})")
    @pytest.mark.parametrize("engine_type", AVAILABLE_ENGINES)
690
    def test_remove_deduplication_across_sources(
691
        self, test_directory, runtime_services, engine_type
692
693
    ):
        """
694
        Test REMOVE event deduplication across G1 (engine GPU), G2 (KVBM CPU), G3 (KVBM disk):
695
696
697
698
699
700
701

        When blocks are stored in G1 (GPU), they are AUTOMATICALLY
        replicated to G2 (CPU) and G3 (Disk) simultaneously.

        Test Scenario:
        1. Configure very small GPU cache (30 blocks) and slightly larger KVBM caches (50 blocks each)
        2. Send 25 requests with 100 tokens each → blocks stored in G1 AND offloaded to G2/G3
702
        3. GPU fills up (30 blocks) → blocks evicted from G1 → consolidator receives REMOVE from engine
703
704
705
706
707
708
           → consolidator sees blocks still exist in G2/G3 → does NOT publish REMOVE to router
        4. Some blocks only exist in G1 (not replicated) → when evicted → published to router

        This verifies: REMOVE is only sent to router when a block is removed from ALL sources.
        Deduplication prevents unnecessary REMOVE events when blocks are still cached in G2/G3.
        """
709
710
711

        engine = engine_type
        logger.info(f"Starting REMOVE deduplication test ({engine.upper()})")
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734

        # Start frontend with router
        frontend_command = [
            "python",
            "-m",
            "dynamo.frontend",
            "--http-port",
            str(FRONTEND_PORT),
            "--router-mode",
            "kv",
            "--router-reset-states",
        ]

        frontend_env = os.environ.copy()
        frontend_env.update(
            {
                "RUST_BACKTRACE": "1",
                "NATS_SERVER": "nats://localhost:4222",
                "ETCD_ENDPOINTS": "http://localhost:2379",
                "DYN_LOG": "debug",
            }
        )

735
        frontend_log_dir = Path(os.path.join(test_directory, "frontend")).absolute()
736
737
738
739
740
741
742
743
744
        frontend_log_dir.mkdir(parents=True, exist_ok=True)

        with ManagedProcess(
            command=frontend_command,
            env=frontend_env,
            health_check_urls=[f"http://localhost:{FRONTEND_PORT}/health"],
            timeout=120,
            working_dir=str(test_directory),
            display_output=False,
745
            log_dir=str(frontend_log_dir),  # Absolute path keeps logs in test directory
746
            terminate_existing=False,  # Don't kill nats-server/etcd started by runtime_services
747
        ) as _frontend_process:
748
749
            # Get actual log file path from ManagedProcess
            frontend_log = Path(_frontend_process._log_path)
750
751
            logger.info(f"Frontend started on port {FRONTEND_PORT}")

752
            # Start worker with constrained GPU blocks but larger KVBM blocks
753
754
            model_id = os.environ.get("CONSOLIDATOR_MODEL_ID", "Qwen/Qwen3-0.6B")

755
756
757
758
759
760
761
762
763
764
765
766
            # Fixed cache tier sizes
            g1_gpu_blocks = 10  # Very small GPU cache to force evictions
            g2_cpu_blocks = 5  # Smaller than GPU but large enough to retain blocks
            g3_disk_blocks = 5  # Smaller than GPU but large enough to retain blocks

            # Compute optimal test parameters for this configuration
            test_params = compute_deduplication_test_params(
                g1_gpu_blocks=g1_gpu_blocks,
                g2_cpu_blocks=g2_cpu_blocks,
                g3_disk_blocks=g3_disk_blocks,
            )

767
768
769
770
771
772
773
774
775
776
777
778
779
            # Build command based on engine type
            if engine == "vllm":
                worker_command = [
                    "python",
                    "-m",
                    "dynamo.vllm",
                    "--model",
                    model_id,
                    "--connector",
                    "kvbm",
                    "--enforce-eager",
                    "--enable-prefix-caching",
                    "--num-gpu-blocks-override",
780
                    str(g1_gpu_blocks),
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
                ]
            else:  # trtllm
                # Create TensorRT-LLM config file with KVBM connector
                # Use small GPU cache (0.01 = 1% of GPU memory) to force evictions
                # This ensures blocks will be evicted from GPU while still in KVBM
                # Small enough to trigger evictions but large enough to handle sequence requirements
                config_path = create_trtllm_config(test_directory)
                worker_command = [
                    "python",
                    "-m",
                    "dynamo.trtllm",
                    "--model-path",
                    model_id,
                    "--served-model-name",
                    model_id,
                    "--extra-engine-args",
                    str(
                        config_path.absolute()
                    ),  # Use absolute path to avoid working directory issues
                    "--publish-events-and-metrics",
                ]

            worker_env = os.environ.copy()
            worker_env.update(
805
806
807
808
                {
                    "RUST_BACKTRACE": "1",
                    "NATS_SERVER": "nats://localhost:4222",
                    "ETCD_ENDPOINTS": "http://localhost:2379",
809
810
                    "DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS": str(g2_cpu_blocks),
                    "DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS": str(g3_disk_blocks),
811
812
813
814
                    "DYN_LOG": "debug",
                }
            )

815
816
817
818
            # Set ZMQ port for TensorRT-LLM consolidator
            if engine == "trtllm":
                worker_env["DYN_KVBM_TRTLLM_ZMQ_PORT"] = "20081"

819
            worker_log_dir = Path(os.path.join(test_directory, engine)).absolute()
820
            worker_log_dir.mkdir(parents=True, exist_ok=True)
821
822

            with ManagedProcess(
823
824
                command=worker_command,
                env=worker_env,
825
826
827
828
                health_check_urls=[],
                timeout=300,
                working_dir=str(test_directory),
                display_output=False,
829
830
831
                log_dir=str(
                    worker_log_dir
                ),  # Absolute path keeps logs in test directory
832
                terminate_existing=False,
833
            ) as _worker_process:
834
835
                # Get actual log file path from ManagedProcess (it may modify log_dir to use temp directory)
                worker_log = Path(_worker_process._log_path)
836
                logger.info(f"Waiting for {engine.upper()} worker to initialize...")
837
838
839
840
841
842
843

                # Wait for worker to register with frontend
                worker_registered = wait_for_worker_registration(
                    f"http://localhost:{FRONTEND_PORT}"
                )

                if not worker_registered:
844
845
846
                    pytest.fail(
                        f"{engine.upper()} worker failed to register with frontend"
                    )
847
848
849
850
851
852
853
854
855
856
857

                # Additional wait for consolidator to fully initialize
                time.sleep(5)

                # Create tester
                tester = ApiTester(
                    base_url=f"http://localhost:{FRONTEND_PORT}", model_id=model_id
                )

                # Phase 1: Send requests to fill GPU cache
                logger.info("Phase 1: Filling GPU cache with diverse prompts")
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
                num_requests = test_params["num_requests"]
                max_tokens = test_params["max_tokens"]
                concurrency = 2  # Parallel requests to create cache pressure
                logger.info(
                    f"Computed test params: max_tokens={max_tokens}, "
                    f"num_requests={num_requests}, concurrency={concurrency}, "
                    f"blocks_per_request={test_params['blocks_per_request']}, "
                    f"total_capacity={test_params['total_capacity_blocks']} blocks, "
                    f"min_offload_cache={test_params['min_offload_cache']} blocks, "
                    f"total_blocks_generated={test_params['total_blocks_generated']}"
                )

                # Use parallel requests to create concurrent cache pressure
                # This forces the GPU cache to actually fill up (multiple requests
                # holding blocks simultaneously) rather than recycling blocks
                # between serial requests
                from concurrent.futures import ThreadPoolExecutor, as_completed

                def send_request(request_idx: int) -> tuple[int, bool]:
                    """Send one chat request and report whether it produced content.

                    Args:
                        request_idx: Index embedded in the prompt so each request's
                            prompt text differs.

                    Returns:
                        ``(request_idx, success)``. ``success`` is True when the
                        request completed and the first choice's ``message`` has a
                        ``"content"`` key. Transport errors (``requests`` exceptions)
                        and malformed response bodies are logged and reported as
                        ``success=False`` rather than raised. A bad response is
                        therefore counted as a failure by the caller instead of
                        escaping through ``future.result()`` and aborting the
                        ``as_completed`` loop.
                    """
                    prompt = f"Tell me a unique story about topic {request_idx}. Make it very long and detailed with many paragraphs."
                    try:
                        response = tester.send_chat_request(
                            messages=[{"role": "user", "content": prompt}],
                            max_tokens=max_tokens,
                        )
                    except requests.RequestException:
                        logger.exception(f"Request {request_idx} failed")
                        return (request_idx, False)

                    # Parse defensively: a missing key, an empty "choices" list, or
                    # a non-dict payload should count as a failed request, not crash
                    # the result-collection loop.
                    try:
                        success = "content" in response["choices"][0]["message"]
                    except (KeyError, IndexError, TypeError):
                        logger.exception(
                            f"Request {request_idx} returned a malformed response"
                        )
                        return (request_idx, False)
                    return (request_idx, success)

                completed_count = 0
                failed_count = 0
                with ThreadPoolExecutor(max_workers=concurrency) as executor:
                    futures = {
                        executor.submit(send_request, i): i for i in range(num_requests)
                    }
                    for future in as_completed(futures):
                        _request_idx, success = future.result()
                        if success:
                            completed_count += 1
                        else:
                            failed_count += 1
                        if completed_count % 10 == 0 or completed_count == num_requests:
                            logger.info(
                                f"Progress: {completed_count}/{num_requests} completed"
                            )

                logger.info(
                    f"All requests finished: {completed_count} succeeded, {failed_count} failed"
                )
                assert failed_count == 0, f"{failed_count} requests failed"
912

913
914
915
916
917
918
919
920
921
                # Wait for requests to complete and blocks to be freed
                # With GUARANTEED_NO_EVICT, blocks are freed when requests complete (not evicted)
                # We need to wait long enough for requests to finish and blocks to be freed
                # For vLLM with FIFO, evictions happen immediately when cache fills.
                wait_time = 5 if engine == "trtllm" else 5
                logger.info(
                    f"Waiting {wait_time}s for requests to complete and blocks to be freed..."
                )
                time.sleep(wait_time)
922
923
924

                # Phase 2: Analyze consolidator logs
                logger.info("Phase 2: Analyzing consolidator deduplication behavior")
925
926
927
928
929
930
931
932
933
934
                log_content = worker_log.read_text()

                # Count blocks removed but still in another source (deduplication working!)
                # Order-agnostic: checks for any removal where block still exists in another source
                # Pattern: "removed from source X, still in Y source(s): [sources]"
                removes_but_still_in_other_source = len(
                    re.findall(
                        r"removed from source \w+, still in \d+ source\(s\)",
                        log_content,
                    )
935
936
                )

937
938
939
940
941
942
943
                # Count blocks removed from last source (will publish REMOVE)
                # Order-agnostic: checks for any removal from last source, regardless of which source
                removes_from_last_source = len(
                    re.findall(
                        r"removed from last source \w+.*will publish REMOVE event",
                        log_content,
                    )
944
945
946
947
948
949
950
951
                )

                # Count REMOVE events actually published to router
                published_removes = len(
                    re.findall(r"will publish REMOVE event", log_content)
                )

                logger.info(
952
                    f"Blocks removed but still in another source (deduplication working): {removes_but_still_in_other_source}"
953
954
                )
                logger.info(
955
                    f"Blocks removed from last source (will publish): {removes_from_last_source}"
956
957
958
959
                )
                logger.info(f"REMOVE events published to router: {published_removes}")

                # Assertions:
960
                # 1. We should see removals where blocks still exist in another source
961
                #    This proves deduplication is working (REMOVE not sent to router yet)
962
                #    Order doesn't matter - could be engine→KVBM or KVBM→engine
963
                assert (
964
965
                    removes_but_still_in_other_source > 0
                ), f"Expected removals where blocks still exist in another source (deduplication working) for {engine.upper()}"
966
967

                # 2. REMOVE events should be published for last-source removals
968
                #    Order doesn't matter - could be engine or KVBM as last source
969
970
971
972
                assert (
                    published_removes > 0
                ), "Expected REMOVE events to be published for last-source removals"

973
974
975
976
977
                # 3. Published removes should equal removes from last source
                assert (
                    published_removes == removes_from_last_source
                ), f"Expected published REMOVE events ({published_removes}) to equal last-source removals ({removes_from_last_source})"

978
                # 3. Check for errors in logs
979
                self.assert_no_errors_in_logs(worker_log, frontend_log, engine)