test_consolidator_router_e2e.py 24.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
E2E test for KV Event Consolidator with Router integration.

This test validates that:
1. vLLM with KVBM correctly emits KV events to the consolidator
2. The consolidator correctly deduplicates events from vLLM and KVBM
3. The router receives and processes consolidated events without warnings

"""

import concurrent.futures
16
import importlib.util
17
18
19
20
21
22
23
24
25
import logging
import os
import re
import time
from pathlib import Path

import pytest
import requests

Richard Huo's avatar
Richard Huo committed
26
from tests.kvbm_integration.common import ApiTester, check_logs_for_patterns
27
28
from tests.utils.managed_process import ManagedProcess

29
30
31
# Check if vLLM is available
HAS_VLLM = importlib.util.find_spec("vllm") is not None

32
33
34
35
36
37
# Test markers
pytestmark = [
    pytest.mark.kvbm,
    pytest.mark.e2e,
    pytest.mark.slow,
    pytest.mark.gpu_1,
38
    pytest.mark.nightly,
39
    pytest.mark.skipif(not HAS_VLLM, reason="requires vllm"),
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
]

logger = logging.getLogger(__name__)

# Constants
FRONTEND_PORT = 8000


@pytest.fixture
def test_directory(request):
    """Create a test directory for logs and temporary files."""
    test_dir = Path(request.node.name)
    test_dir.mkdir(parents=True, exist_ok=True)
    yield test_dir
    # Cleanup handled by pytest (logs are kept for debugging)


def extract_consolidator_stats(log_path: Path) -> dict:
    """Extract consolidator event statistics from vLLM logs."""
    stats = {
        "store_events": 0,
        "remove_events": 0,
        "dedup_events": 0,
        "published_events": 0,
        "consolidator_started": False,
        "consolidator_port": None,
    }

    if not log_path.exists():
        return stats

    try:
        with open(log_path, "r") as f:
            content = f.read()

            # Check if consolidator started
            # Actual log: "KV Event Consolidator fully started and ready"
            stats["consolidator_started"] = bool(
                re.search(r"KV Event Consolidator.*started", content, re.IGNORECASE)
            )

            # Extract consolidator output port from logs
            # Look for: "Starting KV Event Consolidator: subscribe from ..., publish to tcp://0.0.0.0:PORT"
            port_match = re.search(r"publish to tcp://[^:]+:(\d+)", content)
            if port_match:
                stats["consolidator_port"] = int(port_match.group(1))

            # Count event types (all at debug level)
            # Actual: "stored in first source ... will publish STORE event"
            stats["store_events"] = len(
                re.findall(r"will publish STORE event", content)
            )
            # Actual: "removed from last source ... will publish REMOVE event"
            stats["remove_events"] = len(
                re.findall(r"will publish REMOVE event", content)
            )
            # Actual: "DEDUP: Block ... added to source"
            stats["dedup_events"] = len(re.findall(r"DEDUP:.*added to source", content))
            # Actual: "Publishing N consolidated event(s) to router" or "Published batch with N event(s)"
            stats["published_events"] = len(
                re.findall(
                    r"Publish(?:ing|ed).*event.*to router", content, re.IGNORECASE
                )
            )
    except Exception as e:
        logger.warning(f"Error extracting consolidator stats: {e}")

    return stats


def wait_for_worker_registration(
    frontend_url: str, max_wait_seconds: int = 120, poll_interval: int = 2
) -> bool:
    """
    Poll frontend health endpoint until a worker registers.

    Args:
        frontend_url: Base URL of the frontend (e.g., "http://localhost:8000")
        max_wait_seconds: Maximum time to wait for registration
        poll_interval: Seconds between health checks

    Returns:
        True if worker registered, False if timeout
    """

    start_time = time.time()

    while time.time() - start_time < max_wait_seconds:
        try:
            response = requests.get(f"{frontend_url}/health", timeout=5)
            health_data = response.json()
            if health_data.get("instances"):
                elapsed = time.time() - start_time
                logger.info(f"vLLM worker registered after {elapsed:.1f}s")
                return True
        except Exception as e:
            logger.debug(f"Health check failed: {e}")

        time.sleep(poll_interval)

    elapsed = time.time() - start_time
    logger.error(f"vLLM worker failed to register after {elapsed:.1f}s")
    logger.error("Check vLLM logs for initialization errors")
    return False


@pytest.fixture
def frontend_server(test_directory, runtime_services):
    """Start Dynamo frontend with embedded KV router."""
    logger.info("Starting Dynamo frontend with KV router")

    # Frontend command - includes embedded router
    command = [
        "python",
        "-m",
        "dynamo.frontend",
        "--http-port",
        str(FRONTEND_PORT),
        "--router-mode",
        "kv",
        "--router-reset-states",
    ]

    # Environment
    env = os.environ.copy()
    env.update(
        {
            "RUST_BACKTRACE": "1",
            "NATS_SERVER": "nats://localhost:4222",
            "ETCD_ENDPOINTS": "http://localhost:2379",
            "DYN_LOG": "debug",  # Enable debug logs for consolidator visibility
        }
    )

    # Create separate log directory for frontend to avoid conflicts with vllm
    frontend_log_dir = test_directory / "frontend"
    frontend_log_dir.mkdir(parents=True, exist_ok=True)
    log_file = frontend_log_dir / "python.log.txt"

    # Create managed process and start via context manager
    with ManagedProcess(
        command=command,
        env=env,
        health_check_urls=[f"http://localhost:{FRONTEND_PORT}/health"],
        timeout=120,  # Increased timeout for frontend+router initialization
        working_dir=str(test_directory),
        display_output=False,
        log_dir=str(frontend_log_dir),  # Separate log directory
    ) as frontend_process:
        logger.info(f"Frontend started on port {FRONTEND_PORT}")

        yield {
            "process": frontend_process,
            "port": FRONTEND_PORT,
            "base_url": f"http://localhost:{FRONTEND_PORT}",
            "log_file": log_file,
        }

    # Cleanup happens automatically via context manager __exit__
    logger.info("Frontend server stopped")


@pytest.fixture
def vllm_worker(frontend_server, test_directory, runtime_services):
    """Start vLLM worker with KVBM connector and KV Event Consolidator."""
    model_id = os.environ.get("CONSOLIDATOR_MODEL_ID", "Qwen/Qwen3-0.6B")

    logger.info(f"Starting vLLM worker with KVBM connector and model {model_id}")

    # vLLM worker command - consolidator is auto-enabled with KVBM
    command = [
        "python",
        "-m",
        "dynamo.vllm",
        "--model",
        model_id,
        "--connector",
        "kvbm",
        "--enforce-eager",  # For faster startup in tests
    ]

    # Environment
    env = os.environ.copy()
    env.update(
        {
            "RUST_BACKTRACE": "1",
            "NATS_SERVER": "nats://localhost:4222",
            "ETCD_ENDPOINTS": "http://localhost:2379",
            "DYN_KVBM_CPU_CACHE_GB": "5",
            "DYN_KVBM_DISK_CACHE_GB": "5",
            "DYN_LOG": "debug",  # Enable debug logs for consolidator visibility
        }
    )

    # Create separate log directory for vllm to avoid conflicts with frontend
    vllm_log_dir = test_directory / "vllm"
    vllm_log_dir.mkdir(parents=True, exist_ok=True)
    log_file = vllm_log_dir / "python.log.txt"

    # Create managed process and start via context manager
    with ManagedProcess(
        command=command,
        env=env,
        health_check_urls=[],  # Workers don't expose HTTP endpoints
        timeout=300,  # Increased timeout for model loading and consolidator init
        working_dir=str(test_directory),
        display_output=False,
        log_dir=str(vllm_log_dir),  # Separate log directory
        terminate_existing=False,
    ) as vllm_process:
        logger.info("Waiting for vLLM worker and consolidator to initialize...")

        # Wait for worker to register with frontend
        worker_registered = wait_for_worker_registration(frontend_server["base_url"])

        if not worker_registered:
            logger.warning("Continuing test despite worker registration failure")

        # Additional wait for consolidator to fully initialize
        time.sleep(5)

        # Verify consolidator started by checking logs
        stats = extract_consolidator_stats(log_file)
        if not stats["consolidator_started"]:
            logger.warning("Consolidator may not have started - check logs")
        else:
            logger.info("Consolidator detected in logs")

        yield {
            "process": vllm_process,
            "model_id": model_id,
            "log_file": log_file,
            "consolidator_stats": stats,
        }

    # Cleanup happens automatically via context manager __exit__
    logger.info("vLLM worker stopped")


@pytest.fixture
def tester(frontend_server, vllm_worker):
    """Provides a test client that sends requests to frontend."""
    return ApiTester(
        base_url=frontend_server["base_url"],
        model_id=vllm_worker["model_id"],
    )


class TestConsolidatorRouterE2E:
    """E2E tests for KV Event Consolidator with Router."""

    SYSTEM_PROMPT = "You are a helpful AI assistant."

    # Common error patterns to check in logs
    ERROR_PATTERNS = [
        r"error.*block_size=0",
        r"Failed to parse block_hash",
        r"panic",
        r"fatal",
    ]

    def assert_no_errors_in_logs(self, vllm_log: Path, frontend_log: Path):
        """Helper to check both vLLM and frontend logs for errors."""
        vllm_errors = check_logs_for_patterns(
            vllm_log, self.ERROR_PATTERNS, "vLLM Worker"
        )
        assert not vllm_errors, f"Errors in vLLM Worker logs: {vllm_errors}"

        frontend_errors = check_logs_for_patterns(
            frontend_log, self.ERROR_PATTERNS, "Frontend/Router"
        )
        assert not frontend_errors, f"Errors in Frontend/Router logs: {frontend_errors}"

    def send_concurrent_requests(
        self,
        tester: ApiTester,
        num_requests: int,
        max_tokens: int = 50,
        content_template: str = "Request {i}: Tell me about topic {i}",
    ):
        """Helper to send concurrent requests and return results."""

        def send_request(i: int):
            try:
                messages = [{"role": "user", "content": content_template.format(i=i)}]
                response = tester.send_chat_request(messages, max_tokens=max_tokens)
                return True, response
            except Exception as e:
                logger.error(f"Request {i} failed: {e}")
                return False, str(e)

        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(send_request, i) for i in range(num_requests)]
            results = [f.result() for f in futures]

        successes = sum(1 for success, _ in results if success)
        logger.info(f"Concurrent requests: {successes}/{num_requests} succeeded")
        return successes, results

    def test_basic_consolidator_flow(self, tester, vllm_worker, frontend_server):
        """
        Test basic consolidator flow:
        1. Send requests
        2. Verify consolidator starts and processes events
        3. Verify router receives events without errors
        """
        logger.info("TEST: Basic Consolidator Flow")

        # Send 3 requests to frontend
        requests_data = [
            [{"role": "user", "content": "What is machine learning?"}],
            [{"role": "user", "content": "Explain neural networks."}],
            [{"role": "user", "content": "Tell me about transformers."}],
        ]

        for i, messages in enumerate(requests_data):
            response = tester.send_chat_request(messages)
            content = response["choices"][0]["message"]["content"]
            logger.info(
                f"Request {i+1}/3: {messages[0]['content'][:30]}... => {content[:40]}..."
            )

        # Wait for logs to flush
        time.sleep(5)

        # Check vLLM worker logs for consolidator
        vllm_log = vllm_worker["log_file"]
        consolidator_stats = extract_consolidator_stats(vllm_log)

        logger.info(f"Consolidator stats: {consolidator_stats}")

        # Assertions
        assert consolidator_stats["consolidator_started"], "Consolidator did not start"
        assert consolidator_stats["store_events"] > 0, "No store events processed"
        assert consolidator_stats["published_events"] > 0, "No events published"

        # Check for errors in logs
        self.assert_no_errors_in_logs(vllm_log, frontend_server["log_file"])

        logger.info("Basic consolidator flow test passed")

    def test_consolidator_handles_concurrent_requests(
        self, tester, vllm_worker, frontend_server
    ):
        """
        Test consolidator under concurrent load:
        1. Send many requests quickly
        2. Verify no crashes or critical errors
        3. Verify all events processed
        """
        logger.info("TEST: Concurrent Request Handling")

        # Send 10 concurrent requests
        num_requests = 10
        successes, _ = self.send_concurrent_requests(
            tester,
            num_requests,
            max_tokens=20,
            content_template="Request {i}: Count to 5",
        )

        # Wait for logs to flush
        time.sleep(5)

        # Assertions
        assert (
            successes >= num_requests * 0.9
        ), f"Too many failed requests: {num_requests - successes}"

        # Check for errors in logs
        self.assert_no_errors_in_logs(
            vllm_worker["log_file"], frontend_server["log_file"]
        )

        # Verify events were processed
        stats = extract_consolidator_stats(vllm_worker["log_file"])
        assert stats["store_events"] > 0, "No events processed during concurrent load"

        logger.info("Concurrent request handling test passed")

    def test_store_deduplication_across_sources(
        self, tester, vllm_worker, frontend_server
    ):
        """
        Test STORE event deduplication across vLLM (G1) and KVBM (G2/G3):

        When a block is stored in G1 (GPU), it's automatically offloaded
        to G2 (CPU) and G3 (Disk). This triggers STORE events from both vLLM and KVBM.

        Test Scenario:
        1. Send requests → blocks stored in vLLM (G1)
        2. Consolidator receives vLLM STORE events → queues them for publishing
        3. KVBM replicates blocks to G2/G3 → emits STORE events
        4. Consolidator sees blocks already exist → logs DEDUP message → does NOT publish again
        5. Result: Router receives ONE STORE event per unique block (from step 2)

        This verifies: Only one STORE event is sent to router per unique block,
        even though the block exists in multiple storage tiers (G1, G2, G3).
        KVBM replications are deduplicated and don't trigger duplicate router updates.
        """
        logger.info("Starting STORE deduplication test")

        # Send requests to generate STORE events
        logger.info("Sending concurrent requests to generate STORE events")
        num_requests = 10
        successes, _ = self.send_concurrent_requests(
            tester, num_requests, max_tokens=50
        )
        assert (
            successes >= num_requests * 0.9
        ), f"Too many failed requests: {num_requests - successes}"

        # Wait for events to be processed
        time.sleep(5)

        # Phase 2: Analyze consolidator logs
        logger.info("Phase 2: Analyzing STORE event deduplication")
        vllm_log = vllm_worker["log_file"]
        log_content = vllm_log.read_text()

        # Count STORE events received from vLLM (first source = will publish)
        vllm_stores = len(
            re.findall(
                r"stored in first source Vllm.*will publish STORE event", log_content
            )
        )

        # Count STORE events received from KVBM (they appear as DEDUP messages)
        kvbm_stores = len(
            re.findall(
                r"DEDUP: Block \d+ \(seq_hash=\d+\) added to source Kvbm", log_content
            )
        )

        # Count total STORE events received (from both sources)
        total_stores_received = vllm_stores + kvbm_stores

        # Count STORE events actually published to router
        published_stores = len(re.findall(r"will publish STORE event", log_content))

        logger.info(f"STORE events received from vLLM: {vllm_stores}")
        logger.info(f"STORE events received from KVBM: {kvbm_stores}")
        logger.info(f"Total STORE events received: {total_stores_received}")
        logger.info(f"STORE events published to router: {published_stores}")

        # Assertions:
        # 1. We should receive STORE events from both vLLM and KVBM
        assert vllm_stores > 0, "Expected STORE events from vLLM"
        assert kvbm_stores > 0, "Expected STORE events from KVBM (replication to G2/G3)"

        # 2. Published stores should approximately equal vLLM stores
        #    (each unique block is published once when first stored in vLLM)
        assert (
            published_stores == vllm_stores
        ), f"Expected published events ({published_stores}) to equal vLLM stores ({vllm_stores})"

        # 3. Total stores should be vLLM + KVBM (each block stored in both)
        assert (
            total_stores_received == vllm_stores + kvbm_stores
        ), f"Total should be vLLM ({vllm_stores}) + KVBM ({kvbm_stores})"

        # 4. Check for errors in logs
        self.assert_no_errors_in_logs(vllm_log, frontend_server["log_file"])

        logger.info("STORE deduplication test passed")

    def test_remove_deduplication_across_sources(
        self, test_directory, runtime_services
    ):
        """
        Test REMOVE event deduplication across G1 (vLLM GPU), G2 (KVBM CPU), G3 (KVBM disk):

        When blocks are stored in G1 (GPU), they are AUTOMATICALLY
        replicated to G2 (CPU) and G3 (Disk) simultaneously.

        Test Scenario:
        1. Configure very small GPU cache (30 blocks) and slightly larger KVBM caches (50 blocks each)
        2. Send 25 requests with 100 tokens each → blocks stored in G1 AND offloaded to G2/G3
        3. GPU fills up (30 blocks) → blocks evicted from G1 → consolidator receives REMOVE from vLLM
           → consolidator sees blocks still exist in G2/G3 → does NOT publish REMOVE to router
        4. Some blocks only exist in G1 (not replicated) → when evicted → published to router

        This verifies: REMOVE is only sent to router when a block is removed from ALL sources.
        Deduplication prevents unnecessary REMOVE events when blocks are still cached in G2/G3.
        """
        logger.info("Starting REMOVE deduplication test")

        # Start frontend with router
        frontend_command = [
            "python",
            "-m",
            "dynamo.frontend",
            "--http-port",
            str(FRONTEND_PORT),
            "--router-mode",
            "kv",
            "--router-reset-states",
        ]

        frontend_env = os.environ.copy()
        frontend_env.update(
            {
                "RUST_BACKTRACE": "1",
                "NATS_SERVER": "nats://localhost:4222",
                "ETCD_ENDPOINTS": "http://localhost:2379",
                "DYN_LOG": "debug",
            }
        )

        frontend_log_dir = test_directory / "frontend"
        frontend_log_dir.mkdir(parents=True, exist_ok=True)
        frontend_log = frontend_log_dir / "python.log.txt"

        with ManagedProcess(
            command=frontend_command,
            env=frontend_env,
            health_check_urls=[f"http://localhost:{FRONTEND_PORT}/health"],
            timeout=120,
            working_dir=str(test_directory),
            display_output=False,
            log_dir=str(frontend_log_dir),
        ) as _frontend_process:
            logger.info(f"Frontend started on port {FRONTEND_PORT}")

            # Start vLLM worker with constrained GPU blocks but larger KVBM blocks
            model_id = os.environ.get("CONSOLIDATOR_MODEL_ID", "Qwen/Qwen3-0.6B")

            vllm_command = [
                "python",
                "-m",
                "dynamo.vllm",
                "--model",
                model_id,
                "--connector",
                "kvbm",
                "--enforce-eager",
                "--enable-prefix-caching",
                "--num-gpu-blocks-override",
                "30",  # Very small GPU cache to force evictions
            ]

            vllm_env = os.environ.copy()
            vllm_env.update(
                {
                    "RUST_BACKTRACE": "1",
                    "NATS_SERVER": "nats://localhost:4222",
                    "ETCD_ENDPOINTS": "http://localhost:2379",
                    "DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS": "50",  # Larger than GPU but still constrained
                    "DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS": "50",
                    "DYN_LOG": "debug",
                }
            )

            vllm_log_dir = test_directory / "vllm"
            vllm_log_dir.mkdir(parents=True, exist_ok=True)
            vllm_log = vllm_log_dir / "python.log.txt"

            with ManagedProcess(
                command=vllm_command,
                env=vllm_env,
                health_check_urls=[],
                timeout=300,
                working_dir=str(test_directory),
                display_output=False,
                log_dir=str(vllm_log_dir),
                terminate_existing=False,
            ) as _vllm_process:
                logger.info("Waiting for vLLM worker to initialize...")

                # Wait for worker to register with frontend
                worker_registered = wait_for_worker_registration(
                    f"http://localhost:{FRONTEND_PORT}"
                )

                if not worker_registered:
                    pytest.fail("vLLM worker failed to register with frontend")

                # Additional wait for consolidator to fully initialize
                time.sleep(5)

                # Create tester
                tester = ApiTester(
                    base_url=f"http://localhost:{FRONTEND_PORT}", model_id=model_id
                )

                # Phase 1: Send requests to fill GPU cache
                logger.info("Phase 1: Filling GPU cache with diverse prompts")
                for i in range(25):  # Send enough requests to trigger GPU eviction
                    prompt = f"Tell me a unique story about topic {i}. Make it very long and detailed with many paragraphs."
                    response = tester.send_chat_request(
                        messages=[{"role": "user", "content": prompt}],
                        max_tokens=100,  # Increase tokens to use more blocks per request
                    )
                    assert "content" in response["choices"][0]["message"]
                    logger.info(f"Request {i+1}/25 completed")

                # Wait for evictions to settle
                time.sleep(5)

                # Phase 2: Analyze consolidator logs
                logger.info("Phase 2: Analyzing consolidator deduplication behavior")
                log_content = vllm_log.read_text()

                # Count blocks removed from vLLM but still in KVBM (deduplication working!)
                vllm_removes_but_in_kvbm = len(
                    re.findall(r"removed from source Vllm, still in.*Kvbm", log_content)
                )

                # Count blocks removed from vLLM as last source (no KVBM copy)
                vllm_removes_last_source = len(
                    re.findall(r"removed from last source Vllm", log_content)
                )

                # Count REMOVE events actually published to router
                published_removes = len(
                    re.findall(r"will publish REMOVE event", log_content)
                )

                logger.info(
                    f"Blocks removed from vLLM (G1) but still in KVBM (G2/G3): {vllm_removes_but_in_kvbm}"
                )
                logger.info(
                    f"Blocks removed from vLLM as last source: {vllm_removes_last_source}"
                )
                logger.info(f"REMOVE events published to router: {published_removes}")

                # Assertions:
                # 1. We should see GPU evictions where blocks still exist in KVBM
                #    This proves deduplication is working (REMOVE not sent to router yet)
                assert (
                    vllm_removes_but_in_kvbm > 0
                ), "Expected GPU evictions where blocks still exist in KVBM (deduplication working)"

                # 2. REMOVE events should be published for last-source removals
                assert (
                    published_removes > 0
                ), "Expected REMOVE events to be published for last-source removals"

                # 3. Check for errors in logs
                self.assert_no_errors_in_logs(vllm_log, frontend_log)