# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 import json import pytest from dynamo.llm import KvRouterConfig, MockEngineArgs from dynamo.replay import run_synthetic_trace_replay, run_trace_replay pytestmark = [ pytest.mark.gpu_0, pytest.mark.parallel, pytest.mark.pre_merge, ] MOONCAKE_TRACE_FIRST20 = """{"timestamp": 0, "input_length": 6755, "output_length": 500, "hash_ids": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]} {"timestamp": 0, "input_length": 7319, "output_length": 490, "hash_ids": [0, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27]} {"timestamp": 0, "input_length": 7234, "output_length": 794, "hash_ids": [0, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41]} {"timestamp": 0, "input_length": 2287, "output_length": 316, "hash_ids": [0, 42, 43, 44, 45]} {"timestamp": 0, "input_length": 9013, "output_length": 3, "hash_ids": [46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63]} {"timestamp": 0, "input_length": 6506, "output_length": 3, "hash_ids": [46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 64]} {"timestamp": 0, "input_length": 4824, "output_length": 173, "hash_ids": [0, 65, 66, 67, 68, 69, 70, 71, 72, 73]} {"timestamp": 0, "input_length": 3119, "output_length": 20, "hash_ids": [74, 75, 76, 77, 78, 79, 80]} {"timestamp": 0, "input_length": 23090, "output_length": 453, "hash_ids": [0, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125]} {"timestamp": 0, "input_length": 3135, "output_length": 19, "hash_ids": [74, 75, 76, 77, 78, 126, 127]} {"timestamp": 0, "input_length": 26874, "output_length": 458, "hash_ids": [0, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179]} {"timestamp": 0, "input_length": 10487, "output_length": 402, "hash_ids": [0, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199]} {"timestamp": 0, "input_length": 17448, "output_length": 610, "hash_ids": [0, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233]} {"timestamp": 0, "input_length": 6253, "output_length": 3, "hash_ids": [46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 234]} {"timestamp": 0, "input_length": 6725, "output_length": 32, "hash_ids": [46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 235, 236]} {"timestamp": 3052, "input_length": 13538, "output_length": 71, "hash_ids": [0, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246, 247, 248, 249, 250, 251, 252, 253, 254, 255, 256, 257, 258, 259, 260, 261, 262]} {"timestamp": 3052, "input_length": 87162, "output_length": 402, "hash_ids": [0, 263, 264, 265, 266, 267, 268, 269, 270, 271, 272, 273, 274, 275, 276, 277, 278, 279, 280, 281, 282, 283, 284, 285, 286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 298, 299, 300, 301, 302, 303, 304, 305, 306, 307, 308, 309, 310, 311, 312, 313, 314, 315, 316, 317, 318, 319, 320, 321, 322, 323, 324, 325, 326, 327, 328, 329, 330, 331, 332, 333, 334, 335, 336, 337, 338, 339, 340, 341, 342, 343, 344, 345, 346, 347, 348, 349, 350, 351, 352, 353, 354, 355, 356, 357, 358, 359, 360, 361, 362, 363, 364, 365, 366, 367, 368, 369, 370, 371, 372, 373, 374, 375, 376, 377, 378, 379, 380, 381, 382, 383, 384, 385, 386, 387, 388, 389, 390, 391, 392, 393, 394, 395, 396, 397, 398, 399, 400, 401, 402, 403, 404, 405, 406, 407, 408, 409, 410, 411, 412, 413, 414, 415, 416, 417, 418, 419, 420, 421, 422, 423, 424, 425, 426, 427, 428, 429, 430, 431, 432]} {"timestamp": 3052, "input_length": 6166, "output_length": 24, "hash_ids": [46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 433]} {"timestamp": 3052, "input_length": 6320, "output_length": 548, "hash_ids": [0, 434, 435, 436, 437, 438, 439, 440, 441, 442, 443, 444, 445]} {"timestamp": 3052, "input_length": 2007, "output_length": 354, "hash_ids": [0, 446, 447, 448]} """ def _write_trace_and_args(tmp_path): trace_path = tmp_path / "trace.jsonl" records = [ { "timestamp": 1000.0, "input_length": 64, "output_length": 2, "hash_ids": [101], }, { "timestamp": 1005.0, "input_length": 64, "output_length": 2, "hash_ids": [101], }, ] trace_path.write_text( "\n".join(json.dumps(record) for record in records) + "\n", encoding="utf-8", ) return trace_path def _write_vllm_args(tmp_path): args_path = tmp_path / "args.json" args_path.write_text( json.dumps( { "block_size": 64, "speedup_ratio": 1000.0, } ), encoding="utf-8", ) return args_path def _vllm_args(): return MockEngineArgs.from_json( json.dumps( { "block_size": 64, "speedup_ratio": 1000.0, } ) ) def _write_sglang_args(tmp_path): args_path = tmp_path / "sglang_args.json" args_path.write_text( json.dumps( { "engine_type": "sglang", "num_gpu_blocks": 512, "block_size": 64, "speedup_ratio": 1000.0, "sglang": { "page_size": 64, }, } ), encoding="utf-8", ) return args_path def _sglang_args(): return MockEngineArgs.from_json( json.dumps( { "engine_type": "sglang", "num_gpu_blocks": 512, "block_size": 64, "speedup_ratio": 1000.0, "sglang": { "page_size": 64, }, } ) ) def _write_router_config(tmp_path): config_path = tmp_path / "router_config.json" config_path.write_text( json.dumps( { "router_queue_threshold": 1.25, "router_event_threads": 1, "router_queue_policy": "wspt", "router_temperature": 0.0, "overlap_score_weight": 1.0, "use_kv_events": True, "durable_kv_events": False, "router_replica_sync": False, "router_track_active_blocks": True, "router_track_output_blocks": False, "router_assume_kv_reuse": True, "router_snapshot_threshold": 1000000, "router_reset_states": False, "router_ttl_secs": 120.0, "router_max_tree_size": 1048576, "router_prune_target_ratio": 0.8, "router_enable_cache_control": False, "skip_initial_worker_wait": False, "min_initial_workers": 1, "remote_indexer_component": None, } ), encoding="utf-8", ) return config_path def _router_config(): return KvRouterConfig.from_json( json.dumps( { "router_queue_threshold": 1.25, "router_event_threads": 1, "router_queue_policy": "wspt", "router_temperature": 0.0, "overlap_score_weight": 1.0, "use_kv_events": True, "durable_kv_events": False, "router_replica_sync": False, "router_track_active_blocks": True, "router_track_output_blocks": False, "router_assume_kv_reuse": True, "router_snapshot_threshold": 1000000, "router_reset_states": False, "router_ttl_secs": 120.0, "router_max_tree_size": 1048576, "router_prune_target_ratio": 0.8, "router_enable_cache_control": False, "skip_initial_worker_wait": False, "min_initial_workers": 1, "remote_indexer_component": None, } ) ) def _partial_router_config(): return KvRouterConfig( router_queue_threshold=1.25, router_event_threads=1, router_queue_policy="wspt", ) def _assert_basic_report_counts(report, *, num_requests, input_tokens, output_tokens): assert report["num_requests"] == num_requests assert report["completed_requests"] == num_requests assert report["total_input_tokens"] == num_requests * input_tokens assert report["total_output_tokens"] == num_requests * output_tokens @pytest.mark.parametrize("engine_type", ["vllm", "sglang"]) @pytest.mark.parametrize("replay_mode", ["offline", "online"]) @pytest.mark.parametrize("router_mode", ["round_robin", "kv_router"]) def test_run_trace_replay_smoke_matrix(tmp_path, engine_type, replay_mode, router_mode): trace_path = _write_trace_and_args(tmp_path) args_path = _vllm_args() if engine_type == "vllm" else _sglang_args() num_workers = 1 if router_mode == "round_robin" else 2 report = run_trace_replay( trace_path, extra_engine_args=args_path, num_workers=num_workers, replay_mode=replay_mode, router_mode=router_mode, ) _assert_basic_report_counts( report, num_requests=2, input_tokens=64, output_tokens=2, ) @pytest.mark.parametrize("engine_type", ["vllm", "sglang"]) @pytest.mark.parametrize("replay_mode", ["offline", "online"]) def test_run_trace_replay_invariant_counts_match(tmp_path, engine_type, replay_mode): trace_path = _write_trace_and_args(tmp_path) args_path = _vllm_args() if engine_type == "vllm" else _sglang_args() single = run_trace_replay( trace_path, extra_engine_args=args_path, num_workers=1, replay_mode=replay_mode, ) multi_round_robin = run_trace_replay( trace_path, extra_engine_args=args_path, num_workers=4, replay_mode=replay_mode, router_mode="round_robin", ) multi_kv_router = run_trace_replay( trace_path, extra_engine_args=args_path, num_workers=4, replay_mode=replay_mode, router_mode="kv_router", ) for field in ( "num_requests", "completed_requests", "total_input_tokens", "total_output_tokens", ): assert single[field] == multi_round_robin[field] assert single[field] == multi_kv_router[field] @pytest.mark.parametrize("engine_type", ["vllm", "sglang"]) @pytest.mark.parametrize("replay_mode", ["offline", "online"]) @pytest.mark.parametrize("router_mode", ["round_robin", "kv_router"]) def test_run_synthetic_trace_replay_smoke_matrix( tmp_path, engine_type, replay_mode, router_mode ): args_path = _vllm_args() if engine_type == "vllm" else _sglang_args() num_workers = 1 if router_mode == "round_robin" else 2 report = run_synthetic_trace_replay( 64, 2, 2, extra_engine_args=args_path, num_workers=num_workers, replay_mode=replay_mode, router_mode=router_mode, arrival_interval_ms=5.0, ) _assert_basic_report_counts( report, num_requests=2, input_tokens=64, output_tokens=2, ) @pytest.mark.parametrize("engine_type", ["vllm", "sglang"]) @pytest.mark.parametrize("replay_mode", ["offline", "online"]) def test_run_synthetic_trace_replay_invariant_counts_match( tmp_path, engine_type, replay_mode ): args_path = _vllm_args() if engine_type == "vllm" else _sglang_args() single = run_synthetic_trace_replay( 64, 2, 2, extra_engine_args=args_path, num_workers=1, replay_mode=replay_mode, arrival_interval_ms=5.0, ) multi_round_robin = run_synthetic_trace_replay( 64, 2, 2, extra_engine_args=args_path, num_workers=4, replay_mode=replay_mode, router_mode="round_robin", arrival_interval_ms=5.0, ) multi_kv_router = run_synthetic_trace_replay( 64, 2, 2, extra_engine_args=args_path, num_workers=4, replay_mode=replay_mode, router_mode="kv_router", arrival_interval_ms=5.0, ) for field in ( "num_requests", "completed_requests", "total_input_tokens", "total_output_tokens", ): assert single[field] == multi_round_robin[field] assert single[field] == multi_kv_router[field] @pytest.mark.parametrize("engine_type", ["vllm", "sglang"]) @pytest.mark.parametrize("replay_mode", ["offline", "online"]) def test_run_synthetic_concurrency_replay_counts_match( tmp_path, engine_type, replay_mode ): args_path = _vllm_args() if engine_type == "vllm" else _sglang_args() report = run_synthetic_trace_replay( 64, 2, 3, extra_engine_args=args_path, num_workers=2, replay_mode=replay_mode, replay_concurrency=2, ) _assert_basic_report_counts( report, num_requests=3, input_tokens=64, output_tokens=2, ) @pytest.mark.parametrize("replay_mode", ["offline", "online"]) def test_run_trace_replay_accepts_router_config(tmp_path, replay_mode): trace_path = _write_trace_and_args(tmp_path) args_path = _vllm_args() router_config_path = _router_config() report = run_trace_replay( trace_path, extra_engine_args=args_path, router_config=router_config_path, num_workers=2, replay_mode=replay_mode, router_mode="kv_router", ) _assert_basic_report_counts( report, num_requests=2, input_tokens=64, output_tokens=2, ) @pytest.mark.parametrize("replay_mode", ["offline", "online"]) def test_run_trace_replay_accepts_partial_router_config_json(tmp_path, replay_mode): trace_path = _write_trace_and_args(tmp_path) args_path = _vllm_args() report = run_trace_replay( trace_path, extra_engine_args=args_path, router_config=_partial_router_config(), num_workers=2, replay_mode=replay_mode, router_mode="kv_router", ) _assert_basic_report_counts( report, num_requests=2, input_tokens=64, output_tokens=2, ) @pytest.mark.parametrize("replay_mode", ["offline", "online"]) def test_run_trace_replay_accepts_partial_extra_engine_args_json(tmp_path, replay_mode): trace_path = _write_trace_and_args(tmp_path) report = run_trace_replay( trace_path, extra_engine_args=MockEngineArgs(block_size=64, speedup_ratio=1000.0), num_workers=1, replay_mode=replay_mode, ) _assert_basic_report_counts( report, num_requests=2, input_tokens=64, output_tokens=2, )