#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
E2E test for KV Event Consolidator with Router integration.

This test validates that:
1. vLLM/TensorRT-LLM with KVBM correctly emits KV events to the consolidator
2. The consolidator correctly deduplicates events from engine and KVBM
3. The router receives and processes consolidated events without warnings

"""

import concurrent.futures
import logging
import os
import re
import time
from pathlib import Path

import pytest
import requests
import yaml

from tests.kvbm_integration.common import ApiTester, check_logs_for_patterns
from tests.utils.managed_process import ManagedProcess
from tests.utils.test_output import resolve_test_output_path

# Check if engines are available and build list of available engines
from .common import check_module_available
32

33
34
# Probe once at import time which inference engines can be imported here.
HAS_VLLM = check_module_available("vllm")
HAS_TRTLLM = check_module_available("tensorrt_llm")

# Build list of available engines for parameterization.
# Order is fixed (vllm first, then trtllm) so generated test IDs stay stable.
AVAILABLE_ENGINES = [
    engine_name
    for engine_name, is_available in (("vllm", HAS_VLLM), ("trtllm", HAS_TRTLLM))
    if is_available
]
42

43
44
45
46
47
48
# Test markers applied to every test in this module.
pytestmark = [
    pytest.mark.kvbm,
    pytest.mark.e2e,
    pytest.mark.slow,
    pytest.mark.gpu_1,
    pytest.mark.pre_merge,
    # Skip the whole module when neither supported engine can be imported.
    pytest.mark.skipif(not (HAS_VLLM or HAS_TRTLLM), reason="requires vllm or trtllm"),
]

logger = logging.getLogger(__name__)

# Constants
# HTTP port the Dynamo frontend is started on and that tests send requests to.
FRONTEND_PORT = 8000


@pytest.fixture
def test_directory(request):
    """Create a test directory for logs and temporary files.

    The directory path is derived from the requesting test's node name via
    ``resolve_test_output_path`` and is created if it does not exist yet.

    Yields:
        Path: the (existing) per-test output directory.
    """
    test_dir = Path(resolve_test_output_path(request.node.name))
    test_dir.mkdir(parents=True, exist_ok=True)
    yield test_dir
    # Cleanup handled by pytest (logs are kept for debugging)


68
69
def create_trtllm_config(test_directory: Path) -> Path:
    """Create TensorRT-LLM config YAML file with KVBM connector configuration.

    Args:
        test_directory: Directory in which ``trtllm_config.yaml`` is written.

    Returns:
        Path to the written YAML file.
    """
    config_path = Path(test_directory) / "trtllm_config.yaml"
    config = {
        "backend": "pytorch",
        "cuda_graph_config": None,
        "kv_cache_config": {
            "enable_partial_reuse": False,
            "free_gpu_memory_fraction": 0.01,
        },
        "max_seq_len": 4096,
        # Connector classes are resolved by name from the KVBM TRT-LLM integration module.
        "kv_connector_config": {
            "connector_module": "kvbm.trtllm_integration.connector",
            "connector_scheduler_class": "DynamoKVBMConnectorLeader",
            "connector_worker_class": "DynamoKVBMConnectorWorker",
        },
    }
    # Explicit encoding so the file content does not depend on the host locale.
    with open(config_path, "w", encoding="utf-8") as f:
        yaml.dump(config, f)
    return config_path


90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def compute_deduplication_test_params(
    g1_gpu_blocks: int,
    g2_cpu_blocks: int,
    g3_disk_blocks: int,
    block_size: int = 16,
    gpu_utilization_per_request: float = 0.6,
    offload_overflow_factor: float = 100.0,
) -> dict:
    """
    Derive ``max_tokens`` and ``num_requests`` for the deduplication test.

    The cache tier sizes are fixed inputs; the outputs are chosen so that:
    1. A single request fits in the GPU cache with headroom to spare
    2. The combined output of all requests overflows every tier
    3. Blocks live in several sources at once, so deduplication is observable
    4. Blocks eventually leave ALL sources, which triggers REMOVE events

    Blocks are replicated to GPU, CPU and Disk at the same time, so a
    "last source" removal requires eviction from all three tiers. What limits
    that is the smaller of the two offload caches (CPU or Disk), not the sum
    of all capacities.

    Args:
        g1_gpu_blocks: Number of GPU cache blocks (--num-gpu-blocks-override)
        g2_cpu_blocks: Number of CPU cache blocks (DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS)
        g3_disk_blocks: Number of disk cache blocks (DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS)
        block_size: Tokens per block (default 16 for vLLM)
        gpu_utilization_per_request: Fraction of the GPU cache one request may
            occupy (default 0.6)
            - Too high (>0.8): requests may cause immediate evictions
            - Too low (<0.3): too many requests needed, slow test
        offload_overflow_factor: Multiple of the smallest offload cache that
            should be generated in total (default 100.0). It is large because:
            - Blocks persist in offload caches longer than GPU
            - Offloading is asynchronous - not all blocks make it
            - Need enough volume to ensure blocks cycle through all tiers

    Returns:
        dict with:
            - max_tokens: Maximum tokens per request
            - num_requests: Number of requests to send
            - blocks_per_request: Estimated blocks consumed per request
            - total_capacity_blocks: Total blocks across all tiers
            - total_blocks_generated: Expected total blocks generated
            - min_offload_cache: The smallest offload cache (bottleneck)

    Example:
        >>> config = compute_deduplication_test_params(
        ...     g1_gpu_blocks=30,
        ...     g2_cpu_blocks=10,
        ...     g3_disk_blocks=10,
        ... )
        >>> config
        {
            'max_tokens': 288,
            'num_requests': 56,
            'blocks_per_request': 18,
            'total_capacity_blocks': 50,
            'total_blocks_generated': 1008,
            'min_offload_cache': 10,
        }
    """
    import math

    # Per-request budget: a truncated fraction of the GPU cache, never below
    # one block, so generation does not evict its own blocks mid-request.
    per_request_blocks = int(g1_gpu_blocks * gpu_utilization_per_request)
    if per_request_blocks < 1:
        per_request_blocks = 1

    # The smaller offload tier is the bottleneck for "last source" removal.
    smallest_offload = min((g2_cpu_blocks, g3_disk_blocks))

    # Enough unique blocks to cycle through the bottleneck tier many times,
    # split into whole requests (at least one).
    target_unique_blocks = int(smallest_offload * offload_overflow_factor)
    request_count = math.ceil(target_unique_blocks / per_request_blocks)
    if request_count < 1:
        request_count = 1

    return dict(
        max_tokens=per_request_blocks * block_size,
        num_requests=request_count,
        blocks_per_request=per_request_blocks,
        total_capacity_blocks=g1_gpu_blocks + g2_cpu_blocks + g3_disk_blocks,
        total_blocks_generated=request_count * per_request_blocks,
        min_offload_cache=smallest_offload,
    )


186
187
def extract_consolidator_stats(log_path: Path, engine: str = "vllm") -> dict:
    """Extract consolidator event statistics from engine logs.

    Args:
        log_path: Path of the worker log file to scan.
        engine: Engine name supplied by callers; the parsing below does not
            depend on it.

    Returns:
        dict with the keys ``store_events``, ``remove_events``,
        ``dedup_events``, ``published_events`` (occurrence counts),
        ``consolidator_started`` (bool) and ``consolidator_port`` (int or
        None). If the log is missing or unreadable, the zero/False/None
        defaults are returned.
    """
    stats = {
        "store_events": 0,
        "remove_events": 0,
        "dedup_events": 0,
        "published_events": 0,
        "consolidator_started": False,
        "consolidator_port": None,
    }

    try:
        # errors="replace": a single undecodable byte in the log (e.g. raw
        # subprocess output) must not discard every statistic in the file.
        content = Path(log_path).read_text(encoding="utf-8", errors="replace")
    except FileNotFoundError:
        # Log not created yet: report the defaults silently.
        return stats
    except OSError as e:
        logger.warning(f"Error extracting consolidator stats: {e}")
        return stats

    # Check if consolidator started
    # Actual log: "KV Event Consolidator fully started and ready"
    stats["consolidator_started"] = bool(
        re.search(r"KV Event Consolidator.*started", content, re.IGNORECASE)
    )

    # Extract consolidator output port from logs
    # Look for: "Starting KV Event Consolidator: subscribe from ..., publish to tcp://0.0.0.0:PORT"
    port_match = re.search(r"publish to tcp://[^:]+:(\d+)", content)
    if port_match:
        stats["consolidator_port"] = int(port_match.group(1))

    # Count event types (all at debug level)
    # Actual: "stored in first source ... will publish STORE event"
    stats["store_events"] = len(re.findall(r"will publish STORE event", content))
    # Actual: "removed from last source ... will publish REMOVE event"
    stats["remove_events"] = len(re.findall(r"will publish REMOVE event", content))
    # Actual: "DEDUP: Block ... added to source"
    stats["dedup_events"] = len(re.findall(r"DEDUP:.*added to source", content))
    # Actual: "Publishing N consolidated event(s) to router" or "Published batch with N event(s)"
    # NOTE(review): the pattern requires "to router", so the second form is only
    # counted when that suffix is on the same line — confirm this is intended.
    stats["published_events"] = len(
        re.findall(r"Publish(?:ing|ed).*event.*to router", content, re.IGNORECASE)
    )

    return stats


def wait_for_worker_registration(
    frontend_url: str, max_wait_seconds: int = 120, poll_interval: int = 2
) -> bool:
    """
    Poll frontend health endpoint until a worker registers.

    A worker counts as registered once the JSON body of ``GET /health``
    contains a truthy ``"instances"`` entry.

    Args:
        frontend_url: Base URL of the frontend (e.g., "http://localhost:8000")
        max_wait_seconds: Maximum time to wait for registration
        poll_interval: Seconds between health checks

    Returns:
        True if worker registered, False if timeout
    """
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    start_time = time.monotonic()

    while time.monotonic() - start_time < max_wait_seconds:
        try:
            response = requests.get(f"{frontend_url}/health", timeout=5)
            health_data = response.json()
            if health_data.get("instances"):
                elapsed = time.monotonic() - start_time
                logger.info(f"Worker registered after {elapsed:.1f}s")
                return True
        except Exception as e:
            # Best-effort poll: connection errors and malformed bodies are
            # treated the same way, as "not ready yet", and retried.
            logger.debug(f"Health check failed: {e}")

        time.sleep(poll_interval)

    elapsed = time.monotonic() - start_time
    logger.error(f"Worker failed to register after {elapsed:.1f}s")
    logger.error("Check worker logs for initialization errors")
    return False


@pytest.fixture
def frontend_server(test_directory, runtime_services):
    """Start Dynamo frontend with embedded KV router.

    ``runtime_services`` is requested only for its side effect: the NATS and
    etcd services the frontend connects to are started by that fixture.

    Yields:
        dict with ``process`` (the ManagedProcess), ``port``, ``base_url`` and
        ``log_file`` (Path of the frontend log).
    """
    logger.info("Starting Dynamo frontend with KV router")

    # Frontend command - includes embedded router
    command = [
        "python",
        "-m",
        "dynamo.frontend",
        "--http-port",
        str(FRONTEND_PORT),
        "--router-mode",
        "kv",
        "--router-reset-states",
    ]

    # Environment
    env = os.environ.copy()
    env.update(
        {
            "RUST_BACKTRACE": "1",
            "NATS_SERVER": "nats://localhost:4222",
            "ETCD_ENDPOINTS": "http://localhost:2379",
            "DYN_LOG": "debug",  # Enable debug logs for consolidator visibility
        }
    )

    # Create separate log directory for frontend to avoid conflicts with vllm
    frontend_log_dir = Path(os.path.join(test_directory, "frontend")).absolute()
    frontend_log_dir.mkdir(parents=True, exist_ok=True)

    # Create managed process and start via context manager
    with ManagedProcess(
        command=command,
        env=env,
        health_check_urls=[f"http://localhost:{FRONTEND_PORT}/health"],
        timeout=120,  # Increased timeout for frontend+router initialization
        working_dir=str(test_directory),
        display_output=False,
        log_dir=str(frontend_log_dir),  # Absolute path keeps logs in test directory
        terminate_all_matching_process_names=False,  # Don't kill nats-server/etcd started by runtime_services
    ) as frontend_process:
        # Get actual log file path from ManagedProcess (it may modify log_dir to use temp directory)
        log_file = Path(frontend_process._log_path)
        logger.info(f"Frontend started on port {FRONTEND_PORT}, log file: {log_file}")

        yield {
            "process": frontend_process,
            "port": FRONTEND_PORT,
            "base_url": f"http://localhost:{FRONTEND_PORT}",
            "log_file": log_file,
        }

    # Cleanup happens automatically via context manager __exit__
    logger.info("Frontend server stopped")


333
334
335
336
337
338
@pytest.fixture(params=AVAILABLE_ENGINES)
def engine_type(request):
    """Yield one engine name per entry of AVAILABLE_ENGINES, so dependents run once per engine."""
    selected_engine = request.param
    return selected_engine


339
@pytest.fixture
def llm_worker(frontend_server, test_directory, runtime_services, engine_type):
    """Start LLM worker (vLLM or TensorRT-LLM) with KVBM connector and KV Event Consolidator.

    The model is taken from the ``CONSOLIDATOR_MODEL_ID`` environment variable
    (default ``Qwen/Qwen3-0.6B``). After launch the fixture waits for the
    worker to register with the frontend; a registration timeout is logged as
    a warning and does not abort the test.

    Yields:
        dict with ``process`` (the ManagedProcess), ``model_id``, ``log_file``
        (Path of the worker log), ``consolidator_stats`` (stats parsed from
        the log right after startup) and ``engine``.
    """
    model_id = os.environ.get("CONSOLIDATOR_MODEL_ID", "Qwen/Qwen3-0.6B")
    engine = engine_type

    logger.info(
        f"Starting {engine.upper()} worker with KVBM connector and model {model_id}"
    )

    # Build command based on engine type
    if engine == "vllm":
        command = [
            "python",
            "-m",
            "dynamo.vllm",
            "--model",
            model_id,
            "--connector",
            "kvbm",
            "--enforce-eager",  # For faster startup in tests
        ]
    else:  # trtllm
        # Create TensorRT-LLM config file with KVBM connector
        config_path = create_trtllm_config(test_directory)
        command = [
            "python",
            "-m",
            "dynamo.trtllm",
            "--model-path",
            model_id,
            "--served-model-name",
            model_id,
            "--extra-engine-args",
            str(
                config_path.absolute()
            ),  # Use absolute path to avoid working directory issues
            "--publish-events-and-metrics",
        ]

    # Environment
    env = os.environ.copy()
    env.update(
        {
            "RUST_BACKTRACE": "1",
            "NATS_SERVER": "nats://localhost:4222",
            "ETCD_ENDPOINTS": "http://localhost:2379",
            "DYN_KVBM_CPU_CACHE_GB": "5",
            "DYN_KVBM_DISK_CACHE_GB": "5",
            "DYN_LOG": "debug",  # Enable debug logs for consolidator visibility
        }
    )

    # Set ZMQ port for TensorRT-LLM consolidator
    if engine == "trtllm":
        env["DYN_KVBM_TRTLLM_ZMQ_PORT"] = "20081"

    # Create separate log directory for worker to avoid conflicts with frontend
    worker_log_dir = Path(os.path.join(test_directory, engine)).absolute()
    worker_log_dir.mkdir(parents=True, exist_ok=True)

    # Create managed process and start via context manager
    with ManagedProcess(
        command=command,
        env=env,
        health_check_urls=[],  # Workers don't expose HTTP endpoints
        timeout=300,  # Increased timeout for model loading and consolidator init
        working_dir=str(test_directory),
        display_output=False,
        log_dir=str(worker_log_dir),  # Absolute path keeps logs in test directory
        terminate_all_matching_process_names=False,
    ) as worker_process:
        # Get actual log file path from ManagedProcess (it may modify log_dir to use temp directory)
        log_file = Path(worker_process._log_path)
        logger.info(f"Worker log file: {log_file}")

        logger.info(
            f"Waiting for {engine.upper()} worker and consolidator to initialize..."
        )

        # Wait for worker to register with frontend
        worker_registered = wait_for_worker_registration(frontend_server["base_url"])

        if not worker_registered:
            # Deliberately non-fatal: the test itself will surface the failure.
            logger.warning("Continuing test despite worker registration failure")

        # Additional wait for consolidator to fully initialize
        time.sleep(5)

        # Verify consolidator started by checking logs
        stats = extract_consolidator_stats(log_file, engine)
        if not stats["consolidator_started"]:
            logger.warning("Consolidator may not have started - check logs")
        else:
            logger.info("Consolidator detected in logs")

        yield {
            "process": worker_process,
            "model_id": model_id,
            "log_file": log_file,
            "consolidator_stats": stats,
            "engine": engine,
        }

    # Cleanup happens automatically via context manager __exit__
    logger.info(f"{engine.upper()} worker stopped")
445
446
447


@pytest.fixture
def tester(frontend_server, llm_worker):
    """Provides a test client that sends requests to frontend.

    The client targets the frontend's base URL and uses the model ID the
    worker fixture was started with.
    """
    return ApiTester(
        base_url=frontend_server["base_url"],
        model_id=llm_worker["model_id"],
    )


class TestConsolidatorRouterE2E:
    """E2E tests for KV Event Consolidator with Router."""

    # Fixed system prompt string.
    # NOTE(review): not referenced by any method visible in this part of the file.
    SYSTEM_PROMPT = "You are a helpful AI assistant."

    # Common error patterns to check in logs
    # These regexes are passed to check_logs_for_patterns() by
    # assert_no_errors_in_logs() for both the worker and the frontend log.
    ERROR_PATTERNS = [
        r"error.*block_size=0",
        r"Failed to parse block_hash",
        r"panic",
        r"fatal",
    ]

469
470
471
472
473
474
475
    def assert_no_errors_in_logs(
        self, worker_log: Path, frontend_log: Path, engine: str = "vllm"
    ):
        """Helper to check both worker and frontend logs for errors.

        Args:
            worker_log: Path of the engine worker log.
            frontend_log: Path of the frontend/router log.
            engine: Engine name, used (upper-cased) only to label messages.

        Raises:
            AssertionError: if ``check_logs_for_patterns`` reports any match
                of ``ERROR_PATTERNS`` in either log. The worker log is checked
                first.
        """
        engine_name = engine.upper()
        worker_errors = check_logs_for_patterns(
            worker_log, self.ERROR_PATTERNS, f"{engine_name} Worker"
        )
        assert (
            not worker_errors
        ), f"Errors in {engine_name} Worker logs: {worker_errors}"

        frontend_errors = check_logs_for_patterns(
            frontend_log, self.ERROR_PATTERNS, "Frontend/Router"
        )
        assert not frontend_errors, f"Errors in Frontend/Router logs: {frontend_errors}"

    def send_concurrent_requests(
        self,
        tester: ApiTester,
        num_requests: int,
        max_tokens: int = 50,
        content_template: str = "Request {i}: Tell me about topic {i}",
    ):
        """Fire ``num_requests`` chat requests through a 5-thread pool.

        Each request's user message is ``content_template`` formatted with its
        index ``i``. A request that raises is logged and recorded as a failure
        instead of aborting the batch.

        Returns:
            Tuple ``(successes, results)``: the number of successful requests
            and, in request order, one ``(ok, payload)`` pair per request,
            where payload is the response on success or the error text on
            failure.
        """

        def _attempt(index: int):
            # Formatting stays inside the try so a bad template is reported
            # as a failed request as well.
            try:
                chat = [
                    {"role": "user", "content": content_template.format(i=index)}
                ]
                reply = tester.send_chat_request(chat, max_tokens=max_tokens)
            except Exception as e:
                logger.error(f"Request {index} failed: {e}")
                return False, str(e)
            return True, reply

        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
            # map() submits every call up front and yields results in input order.
            results = list(pool.map(_attempt, range(num_requests)))

        successes = sum(1 for ok, _ in results if ok)
        logger.info(f"Concurrent requests: {successes}/{num_requests} succeeded")
        return successes, results

512
    @pytest.mark.timeout(150)  # 4x measured (~37s), rounded up
    def test_basic_consolidator_flow(self, tester, llm_worker, frontend_server):
        """
        Test basic consolidator flow:
        1. Send requests
        2. Verify consolidator starts and processes events
        3. Verify router receives events without errors
        """
        engine = llm_worker["engine"]
        logger.info(f"TEST: Basic Consolidator Flow ({engine.upper()})")

        # Send 3 requests to frontend
        requests_data = [
            [{"role": "user", "content": "What is machine learning?"}],
            [{"role": "user", "content": "Explain neural networks."}],
            [{"role": "user", "content": "Tell me about transformers."}],
        ]

        total = len(requests_data)
        for position, messages in enumerate(requests_data, start=1):
            response = tester.send_chat_request(messages)
            content = response["choices"][0]["message"]["content"]
            logger.info(
                f"Request {position}/{total}: {messages[0]['content'][:30]}... => {content[:40]}..."
            )

        # Wait for logs to flush
        time.sleep(5)

        # Check worker logs for consolidator
        worker_log = llm_worker["log_file"]
        consolidator_stats = extract_consolidator_stats(worker_log, engine)

        logger.info(f"Consolidator stats: {consolidator_stats}")

        # Assertions
        assert consolidator_stats["consolidator_started"], "Consolidator did not start"
        assert consolidator_stats["store_events"] > 0, "No store events processed"
        assert consolidator_stats["published_events"] > 0, "No events published"

        # Check for errors in logs
        self.assert_no_errors_in_logs(worker_log, frontend_server["log_file"], engine)

        logger.info(f"Basic consolidator flow test passed ({engine.upper()})")
555

556
    @pytest.mark.timeout(170)  # 4x measured (~41s), rounded up
    def test_consolidator_handles_concurrent_requests(
        self, tester, llm_worker, frontend_server
    ):
        """
        Test consolidator under concurrent load:
        1. Send many requests quickly
        2. Verify no crashes or critical errors
        3. Verify all events processed
        """
        engine = llm_worker["engine"]
        logger.info(f"TEST: Concurrent Request Handling ({engine.upper()})")

        # Send 10 concurrent requests
        num_requests = 10
        successes, _ = self.send_concurrent_requests(
            tester,
            num_requests,
            max_tokens=20,
            content_template="Request {i}: Count to 5",
        )

        # Wait for logs to flush
        time.sleep(5)

        # Assertions
        # Up to 10% of the requests may fail before the test is considered broken.
        assert (
            successes >= num_requests * 0.9
        ), f"Too many failed requests: {num_requests - successes}"

        # Check for errors in logs
        self.assert_no_errors_in_logs(
            llm_worker["log_file"], frontend_server["log_file"], engine
        )

        # Verify events were processed
        stats = extract_consolidator_stats(llm_worker["log_file"], engine)
        assert stats["store_events"] > 0, "No events processed during concurrent load"

        logger.info(f"Concurrent request handling test passed ({engine.upper()})")
596

597
    @pytest.mark.timeout(180)  # 4x measured (~44s), rounded up
    def test_store_deduplication_across_sources(
        self, tester, llm_worker, frontend_server
    ):
        """
        Test STORE event deduplication across engine (G1) and KVBM (G2/G3):

        When a block is stored in G1 (GPU), it's automatically offloaded
        to G2 (CPU) and G3 (Disk). This triggers STORE events from both engine and KVBM.

        Test Scenario:
        1. Send requests → blocks stored in engine (G1)
        2. Consolidator receives engine STORE events → queues them for publishing
        3. KVBM replicates blocks to G2/G3 → emits STORE events
        4. Consolidator sees blocks already exist → logs DEDUP message → does NOT publish again
        5. Result: Router receives ONE STORE event per unique block (from step 2)

        This verifies: Only one STORE event is sent to router per unique block,
        even though the block exists in multiple storage tiers (G1, G2, G3).
        KVBM replications are deduplicated and don't trigger duplicate router updates.
        """
        engine = llm_worker["engine"]
        logger.info(f"Starting STORE deduplication test ({engine.upper()})")

        # Send requests to generate STORE events
        logger.info("Sending concurrent requests to generate STORE events")
        num_requests = 10
        successes, _ = self.send_concurrent_requests(
            tester, num_requests, max_tokens=50
        )
        assert (
            successes >= num_requests * 0.9
        ), f"Too many failed requests: {num_requests - successes}"

        # Wait for events to be processed
        time.sleep(5)

        # Phase 2: Analyze consolidator logs
        logger.info("Phase 2: Analyzing STORE event deduplication")
        worker_log = llm_worker["log_file"]
        # errors="replace": an undecodable byte in the log must not abort the analysis.
        log_content = worker_log.read_text(encoding="utf-8", errors="replace")

        # Count STORE events - order-agnostic approach
        # First source stores (will publish) - could be engine or KVBM depending on timing
        first_source_stores = len(
            re.findall(
                r"stored in first source \w+.*will publish STORE event", log_content
            )
        )

        # Second source stores (DEDUP) - could be engine or KVBM depending on timing
        # Pattern: "DEDUP: Block ... added to source X"
        dedup_stores = len(
            re.findall(
                r"DEDUP: Block \d+ \(seq_hash=\d+\) added to source \w+", log_content
            )
        )

        # Count total STORE events received (from both sources)
        total_stores_received = first_source_stores + dedup_stores

        # Count STORE events actually published to router
        published_stores = len(re.findall(r"will publish STORE event", log_content))

        logger.info(
            f"STORE events from first source (will publish): {first_source_stores}"
        )
        logger.info(f"STORE events from second source (DEDUP): {dedup_stores}")
        logger.info(f"Total STORE events received: {total_stores_received}")
        logger.info(f"STORE events published to router: {published_stores}")

        # Assertions:
        # 1. We should receive STORE events from both sources (order doesn't matter)
        assert (
            first_source_stores > 0
        ), f"Expected STORE events from first source (could be {engine.upper()} or KVBM)"
        assert (
            dedup_stores > 0
        ), "Expected DEDUP STORE events from second source (proves deduplication working)"

        # 2. Published stores should equal first source stores
        #    (each unique block is published once when first stored, regardless of which source)
        assert (
            published_stores == first_source_stores
        ), f"Expected published events ({published_stores}) to equal first-source stores ({first_source_stores})"

        # 3. Total stores should be first source + second source (each block stored in both)
        # NOTE(review): total_stores_received is defined above as exactly this
        # sum, so this assertion can never fail — consider replacing it with an
        # independently counted total.
        assert (
            total_stores_received == first_source_stores + dedup_stores
        ), f"Total should be first-source ({first_source_stores}) + second-source ({dedup_stores})"

        # 4. Check for errors in logs
        self.assert_no_errors_in_logs(worker_log, frontend_server["log_file"], engine)

        logger.info(f"STORE deduplication test passed ({engine.upper()})")
692

693
    @pytest.mark.timeout(340)  # 4x measured (~85s), rounded up
694
    @pytest.mark.parametrize("engine_type", AVAILABLE_ENGINES)
695
    def test_remove_deduplication_across_sources(
696
        self, test_directory, runtime_services, engine_type
697
698
    ):
        """
699
        Test REMOVE event deduplication across G1 (engine GPU), G2 (KVBM CPU), G3 (KVBM disk):
700
701
702
703
704
705
706

        When blocks are stored in G1 (GPU), they are AUTOMATICALLY
        replicated to G2 (CPU) and G3 (Disk) simultaneously.

        Test Scenario:
        1. Configure very small GPU cache (30 blocks) and slightly larger KVBM caches (50 blocks each)
        2. Send 25 requests with 100 tokens each → blocks stored in G1 AND offloaded to G2/G3
707
        3. GPU fills up (30 blocks) → blocks evicted from G1 → consolidator receives REMOVE from engine
708
709
710
711
712
713
           → consolidator sees blocks still exist in G2/G3 → does NOT publish REMOVE to router
        4. Some blocks only exist in G1 (not replicated) → when evicted → published to router

        This verifies: REMOVE is only sent to router when a block is removed from ALL sources.
        Deduplication prevents unnecessary REMOVE events when blocks are still cached in G2/G3.
        """
714
715
716

        engine = engine_type
        logger.info(f"Starting REMOVE deduplication test ({engine.upper()})")
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739

        # Start frontend with router
        frontend_command = [
            "python",
            "-m",
            "dynamo.frontend",
            "--http-port",
            str(FRONTEND_PORT),
            "--router-mode",
            "kv",
            "--router-reset-states",
        ]

        frontend_env = os.environ.copy()
        frontend_env.update(
            {
                "RUST_BACKTRACE": "1",
                "NATS_SERVER": "nats://localhost:4222",
                "ETCD_ENDPOINTS": "http://localhost:2379",
                "DYN_LOG": "debug",
            }
        )

740
        frontend_log_dir = Path(os.path.join(test_directory, "frontend")).absolute()
741
742
743
744
745
746
747
748
749
        frontend_log_dir.mkdir(parents=True, exist_ok=True)

        with ManagedProcess(
            command=frontend_command,
            env=frontend_env,
            health_check_urls=[f"http://localhost:{FRONTEND_PORT}/health"],
            timeout=120,
            working_dir=str(test_directory),
            display_output=False,
750
            log_dir=str(frontend_log_dir),  # Absolute path keeps logs in test directory
751
            terminate_all_matching_process_names=False,  # Don't kill nats-server/etcd started by runtime_services
752
        ) as _frontend_process:
753
754
            # Get actual log file path from ManagedProcess
            frontend_log = Path(_frontend_process._log_path)
755
756
            logger.info(f"Frontend started on port {FRONTEND_PORT}")

757
            # Start worker with constrained GPU blocks but larger KVBM blocks
758
759
            model_id = os.environ.get("CONSOLIDATOR_MODEL_ID", "Qwen/Qwen3-0.6B")

760
761
762
763
764
765
766
767
768
769
770
771
            # Fixed cache tier sizes
            g1_gpu_blocks = 10  # Very small GPU cache to force evictions
            g2_cpu_blocks = 5  # Smaller than GPU but large enough to retain blocks
            g3_disk_blocks = 5  # Smaller than GPU but large enough to retain blocks

            # Compute optimal test parameters for this configuration
            test_params = compute_deduplication_test_params(
                g1_gpu_blocks=g1_gpu_blocks,
                g2_cpu_blocks=g2_cpu_blocks,
                g3_disk_blocks=g3_disk_blocks,
            )

772
773
774
775
776
777
778
779
780
781
782
783
784
            # Build command based on engine type
            if engine == "vllm":
                worker_command = [
                    "python",
                    "-m",
                    "dynamo.vllm",
                    "--model",
                    model_id,
                    "--connector",
                    "kvbm",
                    "--enforce-eager",
                    "--enable-prefix-caching",
                    "--num-gpu-blocks-override",
785
                    str(g1_gpu_blocks),
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
                ]
            else:  # trtllm
                # Create TensorRT-LLM config file with KVBM connector
                # Use small GPU cache (0.01 = 1% of GPU memory) to force evictions
                # This ensures blocks will be evicted from GPU while still in KVBM
                # Small enough to trigger evictions but large enough to handle sequence requirements
                config_path = create_trtllm_config(test_directory)
                worker_command = [
                    "python",
                    "-m",
                    "dynamo.trtllm",
                    "--model-path",
                    model_id,
                    "--served-model-name",
                    model_id,
                    "--extra-engine-args",
                    str(
                        config_path.absolute()
                    ),  # Use absolute path to avoid working directory issues
                    "--publish-events-and-metrics",
                ]

            worker_env = os.environ.copy()
            worker_env.update(
810
811
812
813
                {
                    "RUST_BACKTRACE": "1",
                    "NATS_SERVER": "nats://localhost:4222",
                    "ETCD_ENDPOINTS": "http://localhost:2379",
814
815
                    "DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS": str(g2_cpu_blocks),
                    "DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS": str(g3_disk_blocks),
816
817
818
819
                    "DYN_LOG": "debug",
                }
            )

820
821
822
823
            # Set ZMQ port for TensorRT-LLM consolidator
            if engine == "trtllm":
                worker_env["DYN_KVBM_TRTLLM_ZMQ_PORT"] = "20081"

824
            worker_log_dir = Path(os.path.join(test_directory, engine)).absolute()
825
            worker_log_dir.mkdir(parents=True, exist_ok=True)
826
827

            with ManagedProcess(
828
829
                command=worker_command,
                env=worker_env,
830
831
832
833
                health_check_urls=[],
                timeout=300,
                working_dir=str(test_directory),
                display_output=False,
834
835
836
                log_dir=str(
                    worker_log_dir
                ),  # Absolute path keeps logs in test directory
837
                terminate_all_matching_process_names=False,
838
            ) as _worker_process:
839
840
                # Get actual log file path from ManagedProcess (it may modify log_dir to use temp directory)
                worker_log = Path(_worker_process._log_path)
841
                logger.info(f"Waiting for {engine.upper()} worker to initialize...")
842
843
844
845
846
847
848

                # Wait for worker to register with frontend
                worker_registered = wait_for_worker_registration(
                    f"http://localhost:{FRONTEND_PORT}"
                )

                if not worker_registered:
849
850
851
                    pytest.fail(
                        f"{engine.upper()} worker failed to register with frontend"
                    )
852
853
854
855
856
857
858
859
860
861
862

                # Additional wait for consolidator to fully initialize
                time.sleep(5)

                # Create tester
                tester = ApiTester(
                    base_url=f"http://localhost:{FRONTEND_PORT}", model_id=model_id
                )

                # Phase 1: Send requests to fill GPU cache
                logger.info("Phase 1: Filling GPU cache with diverse prompts")
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
                num_requests = test_params["num_requests"]
                max_tokens = test_params["max_tokens"]
                concurrency = 2  # Parallel requests to create cache pressure
                logger.info(
                    f"Computed test params: max_tokens={max_tokens}, "
                    f"num_requests={num_requests}, concurrency={concurrency}, "
                    f"blocks_per_request={test_params['blocks_per_request']}, "
                    f"total_capacity={test_params['total_capacity_blocks']} blocks, "
                    f"min_offload_cache={test_params['min_offload_cache']} blocks, "
                    f"total_blocks_generated={test_params['total_blocks_generated']}"
                )

                # Use parallel requests to create concurrent cache pressure
                # This forces the GPU cache to actually fill up (multiple requests
                # holding blocks simultaneously) rather than recycling blocks
                # between serial requests
                from concurrent.futures import ThreadPoolExecutor, as_completed

                def send_request(request_idx: int) -> tuple[int, bool]:
                    """Issue one chat request for ``request_idx`` and report the outcome.

                    Returns:
                        ``(request_idx, ok)``. ``ok`` is the result of testing whether
                        ``"content"`` is present in the first choice's message. When
                        the request raises ``requests.RequestException`` the error is
                        logged with its traceback and ``ok`` is ``False``.

                    Other exception types are not caught here.
                    """
                    # The topic index is embedded in the prompt so each request's
                    # prompt text is distinct.
                    chat_messages = [
                        {
                            "role": "user",
                            "content": f"Tell me a unique story about topic {request_idx}. Make it very long and detailed with many paragraphs.",
                        }
                    ]
                    try:
                        reply = tester.send_chat_request(
                            messages=chat_messages,
                            max_tokens=max_tokens,
                        )
                    except requests.RequestException:
                        logger.exception(f"Request {request_idx} failed")
                        return request_idx, False

                    # Only reached when the request itself did not raise.
                    first_message = reply["choices"][0]["message"]
                    return request_idx, "content" in first_message

                completed_count = 0
                failed_count = 0
                with ThreadPoolExecutor(max_workers=concurrency) as executor:
                    futures = {
                        executor.submit(send_request, i): i for i in range(num_requests)
                    }
                    for future in as_completed(futures):
                        _request_idx, success = future.result()
                        if success:
                            completed_count += 1
                        else:
                            failed_count += 1
                        if completed_count % 10 == 0 or completed_count == num_requests:
                            logger.info(
                                f"Progress: {completed_count}/{num_requests} completed"
                            )

                logger.info(
                    f"All requests finished: {completed_count} succeeded, {failed_count} failed"
                )
                assert failed_count == 0, f"{failed_count} requests failed"
917

918
919
920
921
922
923
924
925
926
                # Wait for requests to complete and blocks to be freed
                # With GUARANTEED_NO_EVICT, blocks are freed when requests complete (not evicted)
                # We need to wait long enough for requests to finish and blocks to be freed
                # For vLLM with FIFO, evictions happen immediately when cache fills.
                wait_time = 5 if engine == "trtllm" else 5
                logger.info(
                    f"Waiting {wait_time}s for requests to complete and blocks to be freed..."
                )
                time.sleep(wait_time)
927
928
929

                # Phase 2: Analyze consolidator logs
                logger.info("Phase 2: Analyzing consolidator deduplication behavior")
930
931
932
933
934
935
936
937
938
939
                log_content = worker_log.read_text()

                # Count blocks removed but still in another source (deduplication working!)
                # Order-agnostic: checks for any removal where block still exists in another source
                # Pattern: "removed from source X, still in Y source(s): [sources]"
                removes_but_still_in_other_source = len(
                    re.findall(
                        r"removed from source \w+, still in \d+ source\(s\)",
                        log_content,
                    )
940
941
                )

942
943
944
945
946
947
948
                # Count blocks removed from last source (will publish REMOVE)
                # Order-agnostic: checks for any removal from last source, regardless of which source
                removes_from_last_source = len(
                    re.findall(
                        r"removed from last source \w+.*will publish REMOVE event",
                        log_content,
                    )
949
950
951
952
953
954
955
956
                )

                # Count REMOVE events actually published to router
                published_removes = len(
                    re.findall(r"will publish REMOVE event", log_content)
                )

                logger.info(
957
                    f"Blocks removed but still in another source (deduplication working): {removes_but_still_in_other_source}"
958
959
                )
                logger.info(
960
                    f"Blocks removed from last source (will publish): {removes_from_last_source}"
961
962
963
964
                )
                logger.info(f"REMOVE events published to router: {published_removes}")

                # Assertions:
965
                # 1. We should see removals where blocks still exist in another source
966
                #    This proves deduplication is working (REMOVE not sent to router yet)
967
                #    Order doesn't matter - could be engine→KVBM or KVBM→engine
968
                assert (
969
970
                    removes_but_still_in_other_source > 0
                ), f"Expected removals where blocks still exist in another source (deduplication working) for {engine.upper()}"
971
972

                # 2. REMOVE events should be published for last-source removals
973
                #    Order doesn't matter - could be engine or KVBM as last source
974
975
976
977
                assert (
                    published_removes > 0
                ), "Expected REMOVE events to be published for last-source removals"

978
979
980
981
982
                # 3. Published removes should equal removes from last source
                assert (
                    published_removes == removes_from_last_source
                ), f"Expected published REMOVE events ({published_removes}) to equal last-source removals ({removes_from_last_source})"

983
                # 4. Check for errors in logs
984
                self.assert_no_errors_in_logs(worker_log, frontend_log, engine)