"examples/backends/trtllm/engine_configs/qwen3/decode.yaml" did not exist on "8354d3258c8864ee82622e0233295faff075e6c6"
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
E2E test for KV Event Consolidator with Router integration.

This test validates that:
1. vLLM/TensorRT-LLM with KVBM correctly emits KV events to the consolidator
2. The consolidator correctly deduplicates events from engine and KVBM
3. The router receives and processes consolidated events without warnings
"""

import concurrent.futures
16
import importlib.util
17
18
19
20
21
22
23
24
import logging
import os
import re
import time
from pathlib import Path

import pytest
import requests
25
import yaml
26

Richard Huo's avatar
Richard Huo committed
27
from tests.kvbm_integration.common import ApiTester, check_logs_for_patterns
28
29
from tests.utils.managed_process import ManagedProcess

30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

# Check if engines are available and build list of available engines
# Use find_spec first (fast check), then verify import works (functional check)
def _check_engine_available(module_name: str) -> bool:
    """Report whether an engine module is present and actually importable.

    ``find_spec`` is a cheap existence probe; the module is then imported for
    real because an installed package can still fail while importing.

    Args:
        module_name: Importable module name, e.g. ``"vllm"``.

    Returns:
        True if the module was found and imported without raising, else False.
    """
    try:
        if importlib.util.find_spec(module_name) is None:
            return False
        importlib.import_module(module_name)
    except Exception as exc:
        # This runs at collection time. Any import-time failure (not only
        # ImportError - e.g. OSError from a missing shared library or a
        # RuntimeError raised by the package itself) means "engine unavailable"
        # and must not abort collection of the whole module.
        # The module-level ``logger`` is not defined yet when this is called,
        # so fetch a logger locally.
        logging.getLogger(__name__).debug(
            "Engine module %r is not usable: %s", module_name, exc
        )
        return False
    return True


HAS_VLLM = _check_engine_available("vllm")
HAS_TRTLLM = _check_engine_available("tensorrt_llm")

# Engines used for parameterization: only those that imported, vllm listed first
AVAILABLE_ENGINES = [
    engine_name
    for engine_name, is_present in (("vllm", HAS_VLLM), ("trtllm", HAS_TRTLLM))
    if is_present
]
53

54
55
56
57
58
59
# Test markers
pytestmark = [
    pytest.mark.kvbm,
    pytest.mark.e2e,
    pytest.mark.slow,
    pytest.mark.gpu_1,
    # Skip the whole module when neither supported engine could be imported
    pytest.mark.skipif(not (HAS_VLLM or HAS_TRTLLM), reason="requires vllm or trtllm"),
]

# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# Constants
# HTTP port passed to the frontend via --http-port; test URLs are built from it
FRONTEND_PORT = 8000


@pytest.fixture
def test_directory(request):
    """Provide a directory, named after the requesting test node, for logs and temp files."""
    node_dir = Path(request.node.name)
    node_dir.mkdir(exist_ok=True, parents=True)
    # No teardown here: the directory and its logs are left in place for debugging
    yield node_dir


78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
def create_trtllm_config(test_directory: Path) -> Path:
    """Write a TensorRT-LLM engine config YAML that wires in the KVBM connector.

    Args:
        test_directory: Directory that receives ``trtllm_config.yaml``.

    Returns:
        Path of the written config file.
    """
    kv_cache_settings = {
        "enable_partial_reuse": False,
        "free_gpu_memory_fraction": 0.01,
    }
    connector_settings = {
        "connector_module": "kvbm.trtllm_integration.connector",
        "connector_scheduler_class": "DynamoKVBMConnectorLeader",
        "connector_worker_class": "DynamoKVBMConnectorWorker",
    }
    engine_config = {
        "backend": "pytorch",
        "cuda_graph_config": None,
        "kv_cache_config": kv_cache_settings,
        "build_config": {"max_seq_len": 4096},
        "kv_connector_config": connector_settings,
    }

    target = test_directory / "trtllm_config.yaml"
    with open(target, "w") as stream:
        stream.write(yaml.dump(engine_config))
    return target


def extract_consolidator_stats(log_path: Path, engine: str = "vllm") -> dict:
    """Extract consolidator event statistics from engine logs.

    Args:
        log_path: Worker log file to scan.
        engine: Engine name. Accepted so call sites read uniformly; the patterns
            below are the same for every engine and do not depend on it.

    Returns:
        dict with integer counts ``store_events``, ``remove_events``,
        ``dedup_events`` and ``published_events``, the boolean
        ``consolidator_started`` and ``consolidator_port`` (int, or None when no
        publish address was found). If the log is missing or cannot be read the
        zero/False/None defaults are returned.
    """
    stats = {
        "store_events": 0,
        "remove_events": 0,
        "dedup_events": 0,
        "published_events": 0,
        "consolidator_started": False,
        "consolidator_port": None,
    }

    if not log_path.exists():
        return stats

    try:
        # A subprocess log is not guaranteed to be valid UTF-8. Replace bad bytes
        # instead of failing, otherwise one stray byte would zero out every stat.
        content = log_path.read_text(encoding="utf-8", errors="replace")
    except OSError as e:
        logger.warning(f"Error extracting consolidator stats: {e}")
        return stats

    # Check if consolidator started
    # Actual log: "KV Event Consolidator fully started and ready"
    stats["consolidator_started"] = bool(
        re.search(r"KV Event Consolidator.*started", content, re.IGNORECASE)
    )

    # Extract consolidator output port from logs
    # Look for: "Starting KV Event Consolidator: subscribe from ..., publish to tcp://0.0.0.0:PORT"
    port_match = re.search(r"publish to tcp://[^:]+:(\d+)", content)
    if port_match:
        stats["consolidator_port"] = int(port_match.group(1))

    # Count event types (all at debug level)
    # Actual: "stored in first source ... will publish STORE event"
    stats["store_events"] = len(re.findall(r"will publish STORE event", content))
    # Actual: "removed from last source ... will publish REMOVE event"
    stats["remove_events"] = len(re.findall(r"will publish REMOVE event", content))
    # Actual: "DEDUP: Block ... added to source"
    stats["dedup_events"] = len(re.findall(r"DEDUP:.*added to source", content))
    # Actual: "Publishing N consolidated event(s) to router"
    # NOTE(review): a "Published batch with N event(s)" line is only counted when
    # it also contains "to router" - confirm that is the intended behaviour.
    stats["published_events"] = len(
        re.findall(r"Publish(?:ing|ed).*event.*to router", content, re.IGNORECASE)
    )

    return stats


def wait_for_worker_registration(
    frontend_url: str, max_wait_seconds: int = 120, poll_interval: int = 2
) -> bool:
    """
    Poll frontend health endpoint until a worker registers.

    A worker counts as registered once the ``/health`` JSON body has a truthy
    ``"instances"`` entry.

    Args:
        frontend_url: Base URL of the frontend (e.g., "http://localhost:8000")
        max_wait_seconds: Maximum time to wait for registration
        poll_interval: Seconds between health checks

    Returns:
        True if worker registered, False if timeout
    """
    # Monotonic clock: a wall-clock adjustment must not stretch or cut the deadline
    start_time = time.monotonic()

    while time.monotonic() - start_time < max_wait_seconds:
        try:
            response = requests.get(f"{frontend_url}/health", timeout=5)
            health_data = response.json()
            if health_data.get("instances"):
                elapsed = time.monotonic() - start_time
                logger.info(f"Worker registered after {elapsed:.1f}s")
                return True
        except Exception as e:
            # Deliberately best effort: connection errors, non-JSON bodies and
            # unexpected payload shapes all just mean "not ready yet, poll again"
            logger.debug(f"Health check failed: {e}")

        time.sleep(poll_interval)

    elapsed = time.monotonic() - start_time
    logger.error(f"Worker failed to register after {elapsed:.1f}s")
    logger.error("Check worker logs for initialization errors")
    return False


@pytest.fixture
def frontend_server(test_directory, runtime_services):
    """Launch the Dynamo frontend (KV router embedded) for the duration of a test.

    Yields:
        dict with "process" (object bound by the ManagedProcess context manager),
        "port", "base_url" and "log_file" (path under the frontend log directory).
    """
    logger.info("Starting Dynamo frontend with KV router")

    base_url = f"http://localhost:{FRONTEND_PORT}"

    # The frontend gets its own log directory so it does not clash with the worker's
    log_dir = test_directory / "frontend"
    log_dir.mkdir(parents=True, exist_ok=True)

    # Inherit the current environment, then pin service endpoints and log level
    launch_env = {
        **os.environ,
        "RUST_BACKTRACE": "1",
        "NATS_SERVER": "nats://localhost:4222",
        "ETCD_ENDPOINTS": "http://localhost:2379",
        "DYN_LOG": "debug",  # debug level so consolidator messages are visible
    }

    # The router runs inside the frontend process (--router-mode kv)
    launch_cmd = ["python", "-m", "dynamo.frontend"]
    launch_cmd += ["--http-port", str(FRONTEND_PORT)]
    launch_cmd += ["--router-mode", "kv"]
    launch_cmd += ["--router-reset-states"]

    managed = ManagedProcess(
        command=launch_cmd,
        env=launch_env,
        health_check_urls=[f"{base_url}/health"],
        timeout=120,  # generous: frontend and router both have to initialize
        working_dir=str(test_directory),
        display_output=False,
        log_dir=str(log_dir),
    )
    with managed as frontend_process:
        logger.info(f"Frontend started on port {FRONTEND_PORT}")

        yield {
            "process": frontend_process,
            "port": FRONTEND_PORT,
            "base_url": base_url,
            "log_file": log_dir / "python.log.txt",
        }

    # Leaving the with-block runs the context manager's __exit__
    logger.info("Frontend server stopped")


247
248
249
250
251
252
@pytest.fixture(params=AVAILABLE_ENGINES)
def engine_type(request):
    """Supply one engine name per parameterized run, drawn from AVAILABLE_ENGINES."""
    selected_engine = request.param
    return selected_engine


253
@pytest.fixture
def llm_worker(frontend_server, test_directory, runtime_services, engine_type):
    """Start LLM worker (vLLM or TensorRT-LLM) with KVBM connector and KV Event Consolidator.

    The model defaults to "Qwen/Qwen3-0.6B" and can be overridden with the
    CONSOLIDATOR_MODEL_ID environment variable.

    Yields:
        dict with "process" (object bound by the ManagedProcess context manager),
        "model_id", "log_file" (worker log path), "consolidator_stats" (parsed
        from the log once, right after startup) and "engine".
    """
    model_id = os.environ.get("CONSOLIDATOR_MODEL_ID", "Qwen/Qwen3-0.6B")
    engine = engine_type

    logger.info(
        f"Starting {engine.upper()} worker with KVBM connector and model {model_id}"
    )

    # Build command based on engine type
    if engine == "vllm":
        command = [
            "python",
            "-m",
            "dynamo.vllm",
            "--model",
            model_id,
            "--connector",
            "kvbm",
            "--enforce-eager",  # For faster startup in tests
        ]
    else:  # trtllm
        # Create TensorRT-LLM config file with KVBM connector
        config_path = create_trtllm_config(test_directory)
        command = [
            "python",
            "-m",
            "dynamo.trtllm",
            "--model-path",
            model_id,
            "--served-model-name",
            model_id,
            "--extra-engine-args",
            str(
                config_path.absolute()
            ),  # Use absolute path to avoid working directory issues
            "--publish-events-and-metrics",
        ]

    # Environment
    env = os.environ.copy()
    env.update(
        {
            "RUST_BACKTRACE": "1",
            "NATS_SERVER": "nats://localhost:4222",
            "ETCD_ENDPOINTS": "http://localhost:2379",
            "DYN_KVBM_CPU_CACHE_GB": "5",
            "DYN_KVBM_DISK_CACHE_GB": "5",
            "DYN_LOG": "debug",  # Enable debug logs for consolidator visibility
        }
    )

    # Set ZMQ port for TensorRT-LLM consolidator
    if engine == "trtllm":
        env["DYN_KVBM_TRTLLM_ZMQ_PORT"] = "20081"

    # Create separate log directory for worker to avoid conflicts with frontend
    worker_log_dir = test_directory / engine
    worker_log_dir.mkdir(parents=True, exist_ok=True)
    log_file = worker_log_dir / "python.log.txt"

    # Create managed process and start via context manager
    with ManagedProcess(
        command=command,
        env=env,
        health_check_urls=[],  # Workers don't expose HTTP endpoints
        timeout=300,  # Increased timeout for model loading and consolidator init
        working_dir=str(test_directory),
        display_output=False,
        log_dir=str(worker_log_dir),  # Separate log directory
        terminate_existing=False,
    ) as worker_process:
        logger.info(
            f"Waiting for {engine.upper()} worker and consolidator to initialize..."
        )

        # Wait for worker to register with frontend
        worker_registered = wait_for_worker_registration(frontend_server["base_url"])

        # A failed registration is only logged here; the test that uses this
        # fixture is left to fail on its own assertions.
        if not worker_registered:
            logger.warning("Continuing test despite worker registration failure")

        # Additional wait for consolidator to fully initialize
        time.sleep(5)

        # Verify consolidator started by checking logs
        stats = extract_consolidator_stats(log_file, engine)
        if not stats["consolidator_started"]:
            logger.warning("Consolidator may not have started - check logs")
        else:
            logger.info("Consolidator detected in logs")

        yield {
            "process": worker_process,
            "model_id": model_id,
            "log_file": log_file,
            "consolidator_stats": stats,
            "engine": engine,
        }

    # Cleanup happens automatically via context manager __exit__
    logger.info(f"{engine.upper()} worker stopped")
356
357
358


@pytest.fixture
def tester(frontend_server, llm_worker):
    """Provides a test client that sends requests to frontend.

    The client targets the frontend's base URL and uses the model id of the
    worker started by ``llm_worker``.
    """
    return ApiTester(
        base_url=frontend_server["base_url"],
        model_id=llm_worker["model_id"],
    )


class TestConsolidatorRouterE2E:
    """E2E tests for KV Event Consolidator with Router."""

    # NOTE(review): not referenced by any test visible in this file - confirm it is still needed
    SYSTEM_PROMPT = "You are a helpful AI assistant."

    # Common error patterns to check in logs
    # (regex strings passed to check_logs_for_patterns for the worker and frontend logs)
    ERROR_PATTERNS = [
        r"error.*block_size=0",
        r"Failed to parse block_hash",
        r"panic",
        r"fatal",
    ]

380
381
382
383
384
385
386
    def assert_no_errors_in_logs(
        self, worker_log: Path, frontend_log: Path, engine: str = "vllm"
    ):
        """Helper to check both worker and frontend logs for errors.

        Each log is scanned with ``check_logs_for_patterns`` against
        ``ERROR_PATTERNS``; the worker log is checked first.

        Args:
            worker_log: Path to the engine worker log.
            frontend_log: Path to the frontend/router log.
            engine: Engine name, used (upper-cased) only in labels and messages.

        Raises:
            AssertionError: If either scan returns a truthy result.
        """
        engine_name = engine.upper()
        worker_errors = check_logs_for_patterns(
            worker_log, self.ERROR_PATTERNS, f"{engine_name} Worker"
        )
        assert (
            not worker_errors
        ), f"Errors in {engine_name} Worker logs: {worker_errors}"

        frontend_errors = check_logs_for_patterns(
            frontend_log, self.ERROR_PATTERNS, "Frontend/Router"
        )
        assert not frontend_errors, f"Errors in Frontend/Router logs: {frontend_errors}"

    def send_concurrent_requests(
        self,
        tester: ApiTester,
        num_requests: int,
        max_tokens: int = 50,
        content_template: str = "Request {i}: Tell me about topic {i}",
    ):
        """Fire ``num_requests`` chat requests through a 5-thread pool.

        Args:
            tester: Client whose ``send_chat_request`` is called for each request.
            num_requests: Number of requests; request indices run from 0.
            max_tokens: Forwarded to every request.
            content_template: User message template, formatted with ``i=<index>``.

        Returns:
            Tuple ``(successes, results)``. ``results`` holds one
            ``(ok, payload)`` pair per request in index order: the response when
            ``ok`` is True, otherwise the error text.
        """

        def _attempt(index: int):
            # Any failure (including template formatting) is recorded, not raised
            try:
                payload = [
                    {"role": "user", "content": content_template.format(i=index)}
                ]
                reply = tester.send_chat_request(payload, max_tokens=max_tokens)
            except Exception as e:
                logger.error(f"Request {index} failed: {e}")
                return False, str(e)
            return True, reply

        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
            # map() submits everything up front and yields results in input order
            results = list(pool.map(_attempt, range(num_requests)))

        successes = len([outcome for outcome in results if outcome[0]])
        logger.info(f"Concurrent requests: {successes}/{num_requests} succeeded")
        return successes, results

423
    def test_basic_consolidator_flow(self, tester, llm_worker, frontend_server):
        """
        Test basic consolidator flow:
        1. Send requests
        2. Verify consolidator starts and processes events
        3. Verify router receives events without errors
        """
        engine = llm_worker["engine"]
        logger.info(f"TEST: Basic Consolidator Flow ({engine.upper()})")

        # Send 3 requests to frontend
        requests_data = [
            [{"role": "user", "content": "What is machine learning?"}],
            [{"role": "user", "content": "Explain neural networks."}],
            [{"role": "user", "content": "Tell me about transformers."}],
        ]

        total = len(requests_data)
        for i, messages in enumerate(requests_data, start=1):
            response = tester.send_chat_request(messages)
            content = response["choices"][0]["message"]["content"]
            # Only short previews of prompt and reply are logged
            logger.info(
                f"Request {i}/{total}: {messages[0]['content'][:30]}... => {content[:40]}..."
            )

        # Wait for logs to flush
        time.sleep(5)

        # Check worker logs for consolidator
        worker_log = llm_worker["log_file"]
        consolidator_stats = extract_consolidator_stats(worker_log, engine)

        logger.info(f"Consolidator stats: {consolidator_stats}")

        # Assertions
        assert consolidator_stats["consolidator_started"], "Consolidator did not start"
        assert consolidator_stats["store_events"] > 0, "No store events processed"
        assert consolidator_stats["published_events"] > 0, "No events published"

        # Check for errors in logs
        self.assert_no_errors_in_logs(worker_log, frontend_server["log_file"], engine)

        logger.info(f"Basic consolidator flow test passed ({engine.upper()})")
465
466

    def test_consolidator_handles_concurrent_requests(
        self, tester, llm_worker, frontend_server
    ):
        """
        Test consolidator under concurrent load:
        1. Send many requests quickly
        2. Verify no crashes or critical errors
        3. Verify all events processed
        """
        engine = llm_worker["engine"]
        logger.info(f"TEST: Concurrent Request Handling ({engine.upper()})")

        # Send 10 concurrent requests
        num_requests = 10
        successes, _ = self.send_concurrent_requests(
            tester,
            num_requests,
            max_tokens=20,
            content_template="Request {i}: Count to 5",
        )

        # Wait for logs to flush
        time.sleep(5)

        # Assertions
        # Up to 10% of the requests may fail before the test is considered broken
        assert (
            successes >= num_requests * 0.9
        ), f"Too many failed requests: {num_requests - successes}"

        # Check for errors in logs
        self.assert_no_errors_in_logs(
            llm_worker["log_file"], frontend_server["log_file"], engine
        )

        # Verify events were processed
        stats = extract_consolidator_stats(llm_worker["log_file"], engine)
        assert stats["store_events"] > 0, "No events processed during concurrent load"

        logger.info(f"Concurrent request handling test passed ({engine.upper()})")
505
506

    def test_store_deduplication_across_sources(
        self, tester, llm_worker, frontend_server
    ):
        """
        Test STORE event deduplication across engine (G1) and KVBM (G2/G3):

        When a block is stored in G1 (GPU), it's automatically offloaded
        to G2 (CPU) and G3 (Disk). This triggers STORE events from both engine and KVBM.

        Test Scenario:
        1. Send requests → blocks stored in engine (G1)
        2. Consolidator receives engine STORE events → queues them for publishing
        3. KVBM replicates blocks to G2/G3 → emits STORE events
        4. Consolidator sees blocks already exist → logs DEDUP message → does NOT publish again
        5. Result: Router receives ONE STORE event per unique block (from step 2)

        This verifies: Only one STORE event is sent to router per unique block,
        even though the block exists in multiple storage tiers (G1, G2, G3).
        KVBM replications are deduplicated and don't trigger duplicate router updates.
        """
        engine = llm_worker["engine"]
        logger.info(f"Starting STORE deduplication test ({engine.upper()})")

        # Phase 1: Send requests to generate STORE events
        logger.info("Sending concurrent requests to generate STORE events")
        num_requests = 10
        successes, _ = self.send_concurrent_requests(
            tester, num_requests, max_tokens=50
        )
        assert (
            successes >= num_requests * 0.9
        ), f"Too many failed requests: {num_requests - successes}"

        # Wait for events to be processed
        time.sleep(5)

        # Phase 2: Analyze consolidator logs
        logger.info("Phase 2: Analyzing STORE event deduplication")
        worker_log = llm_worker["log_file"]
        # errors="replace": an undecodable byte in the log must not abort the analysis
        log_content = worker_log.read_text(errors="replace")

        # Count STORE events - order-agnostic approach
        # First source stores (will publish) - could be engine or KVBM depending on timing
        first_source_stores = len(
            re.findall(
                r"stored in first source \w+.*will publish STORE event", log_content
            )
        )

        # Second source stores (DEDUP) - could be engine or KVBM depending on timing
        # Pattern: "DEDUP: Block ... added to source X"
        dedup_stores = len(
            re.findall(
                r"DEDUP: Block \d+ \(seq_hash=\d+\) added to source \w+", log_content
            )
        )

        # Count total STORE events received (from both sources)
        total_stores_received = first_source_stores + dedup_stores

        # Count STORE events actually published to router
        published_stores = len(re.findall(r"will publish STORE event", log_content))

        logger.info(
            f"STORE events from first source (will publish): {first_source_stores}"
        )
        logger.info(f"STORE events from second source (DEDUP): {dedup_stores}")
        logger.info(f"Total STORE events received: {total_stores_received}")
        logger.info(f"STORE events published to router: {published_stores}")

        # Assertions:
        # 1. We should receive STORE events from both sources (order doesn't matter)
        assert (
            first_source_stores > 0
        ), f"Expected STORE events from first source (could be {engine.upper()} or KVBM)"
        assert (
            dedup_stores > 0
        ), "Expected DEDUP STORE events from second source (proves deduplication working)"

        # 2. Published stores should equal first source stores
        #    (each unique block is published once when first stored, regardless of which source)
        assert (
            published_stores == first_source_stores
        ), f"Expected published events ({published_stores}) to equal first-source stores ({first_source_stores})"

        # 3. Total stores should be first source + second source (each block stored in both)
        #    NOTE(review): this holds by construction, because total_stores_received
        #    is defined above as exactly this sum; it verifies nothing about the logs.
        assert (
            total_stores_received == first_source_stores + dedup_stores
        ), f"Total should be first-source ({first_source_stores}) + second-source ({dedup_stores})"

        # 4. Check for errors in logs
        self.assert_no_errors_in_logs(worker_log, frontend_server["log_file"], engine)

        logger.info(f"STORE deduplication test passed ({engine.upper()})")
600

601
    @pytest.mark.parametrize("engine_type", AVAILABLE_ENGINES)
    def test_remove_deduplication_across_sources(
        self, test_directory, runtime_services, engine_type
    ):
        """
        Test REMOVE event deduplication across G1 (engine GPU), G2 (KVBM CPU), G3 (KVBM disk):

        When blocks are stored in G1 (GPU), they are AUTOMATICALLY
        replicated to G2 (CPU) and G3 (Disk) simultaneously.

        Test Scenario:
        1. Configure a very small GPU cache (vLLM: 30 blocks via --num-gpu-blocks-override;
           TensorRT-LLM: free_gpu_memory_fraction=0.01 from create_trtllm_config) and
           slightly larger KVBM caches (50 blocks each)
        2. Send 100 sequential requests with max_tokens=100 → blocks stored in G1 AND offloaded to G2/G3
        3. GPU fills up → blocks evicted from G1 → consolidator receives REMOVE from engine
           → consolidator sees blocks still exist in G2/G3 → does NOT publish REMOVE to router
        4. Some blocks only exist in G1 (not replicated) → when evicted → published to router

        This verifies: REMOVE is only sent to router when a block is removed from ALL sources.
        Deduplication prevents unnecessary REMOVE events when blocks are still cached in G2/G3.

        The frontend and worker are started inline here rather than through the
        frontend_server/llm_worker fixtures, presumably because the worker needs
        the cache-size overrides above.
        """

        engine = engine_type
        logger.info(f"Starting REMOVE deduplication test ({engine.upper()})")

        # Start frontend with router
        frontend_command = [
            "python",
            "-m",
            "dynamo.frontend",
            "--http-port",
            str(FRONTEND_PORT),
            "--router-mode",
            "kv",
            "--router-reset-states",
        ]

        frontend_env = os.environ.copy()
        frontend_env.update(
            {
                "RUST_BACKTRACE": "1",
                "NATS_SERVER": "nats://localhost:4222",
                "ETCD_ENDPOINTS": "http://localhost:2379",
                "DYN_LOG": "debug",
            }
        )

        frontend_log_dir = test_directory / "frontend"
        frontend_log_dir.mkdir(parents=True, exist_ok=True)
        frontend_log = frontend_log_dir / "python.log.txt"

        with ManagedProcess(
            command=frontend_command,
            env=frontend_env,
            health_check_urls=[f"http://localhost:{FRONTEND_PORT}/health"],
            timeout=120,
            working_dir=str(test_directory),
            display_output=False,
            log_dir=str(frontend_log_dir),
        ) as _frontend_process:
            logger.info(f"Frontend started on port {FRONTEND_PORT}")

            # Start worker with constrained GPU blocks but larger KVBM blocks
            model_id = os.environ.get("CONSOLIDATOR_MODEL_ID", "Qwen/Qwen3-0.6B")

            # Build command based on engine type
            if engine == "vllm":
                worker_command = [
                    "python",
                    "-m",
                    "dynamo.vllm",
                    "--model",
                    model_id,
                    "--connector",
                    "kvbm",
                    "--enforce-eager",
                    "--enable-prefix-caching",
                    "--num-gpu-blocks-override",
                    "30",  # Very small GPU cache to force evictions
                ]
            else:  # trtllm
                # Create TensorRT-LLM config file with KVBM connector
                # Use small GPU cache (0.01 = 1% of GPU memory) to force evictions
                # This ensures blocks will be evicted from GPU while still in KVBM
                # Small enough to trigger evictions but large enough to handle sequence requirements
                config_path = create_trtllm_config(test_directory)
                worker_command = [
                    "python",
                    "-m",
                    "dynamo.trtllm",
                    "--model-path",
                    model_id,
                    "--served-model-name",
                    model_id,
                    "--extra-engine-args",
                    str(
                        config_path.absolute()
                    ),  # Use absolute path to avoid working directory issues
                    "--publish-events-and-metrics",
                ]

            worker_env = os.environ.copy()
            worker_env.update(
                {
                    "RUST_BACKTRACE": "1",
                    "NATS_SERVER": "nats://localhost:4222",
                    "ETCD_ENDPOINTS": "http://localhost:2379",
                    "DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS": "50",  # Larger than GPU but still constrained
                    "DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS": "50",
                    "DYN_LOG": "debug",
                }
            )

            # Set ZMQ port for TensorRT-LLM consolidator
            if engine == "trtllm":
                worker_env["DYN_KVBM_TRTLLM_ZMQ_PORT"] = "20081"

            worker_log_dir = test_directory / engine
            worker_log_dir.mkdir(parents=True, exist_ok=True)
            worker_log = worker_log_dir / "python.log.txt"

            with ManagedProcess(
                command=worker_command,
                env=worker_env,
                health_check_urls=[],
                timeout=300,
                working_dir=str(test_directory),
                display_output=False,
                log_dir=str(worker_log_dir),
                terminate_existing=False,
            ) as _worker_process:
                logger.info(f"Waiting for {engine.upper()} worker to initialize...")

                # Wait for worker to register with frontend
                worker_registered = wait_for_worker_registration(
                    f"http://localhost:{FRONTEND_PORT}"
                )

                if not worker_registered:
                    pytest.fail(
                        f"{engine.upper()} worker failed to register with frontend"
                    )

                # Additional wait for consolidator to fully initialize
                time.sleep(5)

                # Create tester
                tester = ApiTester(
                    base_url=f"http://localhost:{FRONTEND_PORT}", model_id=model_id
                )

                # Phase 1: Send requests to fill GPU cache
                logger.info("Phase 1: Filling GPU cache with diverse prompts")
                num_requests = 100
                for i in range(num_requests):
                    # A distinct topic per request keeps the prompts from being identical
                    prompt = f"Tell me a unique story about topic {i}. Make it very long and detailed with many paragraphs."
                    response = tester.send_chat_request(
                        messages=[{"role": "user", "content": prompt}],
                        max_tokens=100,  # Increase tokens to use more blocks per request
                    )
                    assert "content" in response["choices"][0]["message"]
                    logger.info(f"Request {i+1}/{num_requests} completed")

                # Wait for requests to complete and blocks to be freed
                # With GUARANTEED_NO_EVICT, blocks are freed when requests complete (not evicted)
                # We need to wait long enough for requests to finish and blocks to be freed
                # For vLLM with FIFO, evictions happen immediately when cache fills.
                # The same wait is used for both engines.
                wait_time = 5
                logger.info(
                    f"Waiting {wait_time}s for requests to complete and blocks to be freed..."
                )
                time.sleep(wait_time)

                # Phase 2: Analyze consolidator logs
                logger.info("Phase 2: Analyzing consolidator deduplication behavior")
                # errors="replace": an undecodable byte in the log must not abort the analysis
                log_content = worker_log.read_text(errors="replace")

                # Count blocks removed but still in another source (deduplication working!)
                # Order-agnostic: checks for any removal where block still exists in another source
                # Pattern: "removed from source X, still in Y source(s): [sources]"
                removes_but_still_in_other_source = len(
                    re.findall(
                        r"removed from source \w+, still in \d+ source\(s\)",
                        log_content,
                    )
                )

                # Count blocks removed from last source (will publish REMOVE)
                # Order-agnostic: checks for any removal from last source, regardless of which source
                removes_from_last_source = len(
                    re.findall(
                        r"removed from last source \w+.*will publish REMOVE event",
                        log_content,
                    )
                )

                # Count REMOVE events actually published to router
                published_removes = len(
                    re.findall(r"will publish REMOVE event", log_content)
                )

                logger.info(
                    f"Blocks removed but still in another source (deduplication working): {removes_but_still_in_other_source}"
                )
                logger.info(
                    f"Blocks removed from last source (will publish): {removes_from_last_source}"
                )
                logger.info(f"REMOVE events published to router: {published_removes}")

                # Assertions:
                # 1. We should see removals where blocks still exist in another source
                #    This proves deduplication is working (REMOVE not sent to router yet)
                #    Order doesn't matter - could be engine→KVBM or KVBM→engine
                assert (
                    removes_but_still_in_other_source > 0
                ), f"Expected removals where blocks still exist in another source (deduplication working) for {engine.upper()}"

                # 2. REMOVE events should be published for last-source removals
                #    Order doesn't matter - could be engine or KVBM as last source
                assert (
                    published_removes > 0
                ), "Expected REMOVE events to be published for last-source removals"

                # 3. Published removes should equal removes from last source
                assert (
                    published_removes == removes_from_last_source
                ), f"Expected published REMOVE events ({published_removes}) to equal last-source removals ({removes_from_last_source})"

                # 4. Check for errors in logs
                self.assert_no_errors_in_logs(worker_log, frontend_log, engine)