Unverified Commit 18d9d1fa authored by Janelle Cai's avatar Janelle Cai Committed by GitHub
Browse files

feat(mocker): pre-fetch model and staggered launches (#5871)

parent 2b199540
...@@ -25,6 +25,7 @@ The mocker engine now supports a vLLM-style CLI interface with individual argume ...@@ -25,6 +25,7 @@ The mocker engine now supports a vLLM-style CLI interface with individual argume
- `--speedup-ratio`: Speed multiplier for token generation (default: 1.0). Higher values make the simulation engines run faster - `--speedup-ratio`: Speed multiplier for token generation (default: 1.0). Higher values make the simulation engines run faster
- `--data-parallel-size`: Number of data parallel workers to simulate (default: 1) - `--data-parallel-size`: Number of data parallel workers to simulate (default: 1)
- `--num-workers`: Number of mocker workers to launch in the same process (default: 1). All workers share the same tokio runtime and thread pool - `--num-workers`: Number of mocker workers to launch in the same process (default: 1). All workers share the same tokio runtime and thread pool
- `--stagger-delay`: Delay in seconds between launching each worker to avoid overwhelming etcd/NATS/frontend. Set to 0 to disable staggering. Use -1 for auto mode (stagger dependent on number of workers). Default: -1 (auto)
- `--is-prefill-worker` / `--is-decode-worker`: Whether the worker is a prefill or decode worker for disaggregated deployment. If not specified, mocker will be in aggregated mode. - `--is-prefill-worker` / `--is-decode-worker`: Whether the worker is a prefill or decode worker for disaggregated deployment. If not specified, mocker will be in aggregated mode.
### Example with individual arguments (vLLM-style): ### Example with individual arguments (vLLM-style):
......
...@@ -315,6 +315,17 @@ def parse_args(): ...@@ -315,6 +315,17 @@ def parse_args():
"Prefill workers listen on these ports; decode workers connect to them. " "Prefill workers listen on these ports; decode workers connect to them. "
"If not specified, bootstrap rendezvous is disabled.", "If not specified, bootstrap rendezvous is disabled.",
) )
parser.add_argument(
"--stagger-delay",
type=float,
default=-1.0,
help=(
"Delay in seconds between launching each worker to avoid overwhelming "
"etcd/NATS/frontend with many workers. Set to 0 to disable staggering. "
"Use -1 for auto mode (0.1s for 32-128 workers, 0.2s for >128 workers, 0 otherwise). "
"Default: -1 (auto)"
),
)
parser.add_argument( parser.add_argument(
"--store-kv", "--store-kv",
type=str, type=str,
......
...@@ -16,7 +16,7 @@ import uvloop ...@@ -16,7 +16,7 @@ import uvloop
os.environ.setdefault("DYN_COMPUTE_THREADS", "0") os.environ.setdefault("DYN_COMPUTE_THREADS", "0")
from dynamo.llm import EngineType, EntrypointArgs, make_engine, run_input from dynamo.llm import EngineType, EntrypointArgs, fetch_llm, make_engine, run_input
from dynamo.runtime import DistributedRuntime from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging from dynamo.runtime.logging import configure_dynamo_logging
...@@ -37,6 +37,24 @@ async def graceful_shutdown(runtimes: list): ...@@ -37,6 +37,24 @@ async def graceful_shutdown(runtimes: list):
logger.info("DistributedRuntime shutdown complete") logger.info("DistributedRuntime shutdown complete")
async def prefetch_model(model_path: str) -> None:
"""Pre-fetch model from HuggingFace to avoid rate limiting with many workers."""
if Path(model_path).exists():
logger.info(f"Using local model path: {model_path}")
return
logger.info(f"Pre-fetching model from HuggingFace: {model_path}")
try:
local_path = await fetch_llm(model_path, ignore_weights=True)
logger.info(f"Model cached at: {local_path}")
except Exception as e:
logger.warning(
f"Failed to pre-fetch model: {e}. "
"Workers will attempt individual downloads (may cause rate limiting)."
)
async def worker(): async def worker():
"""Main worker function that launches mocker instances. """Main worker function that launches mocker instances.
...@@ -59,6 +77,10 @@ async def worker(): ...@@ -59,6 +77,10 @@ async def worker():
extra_engine_args_path = create_temp_engine_args_file(args) extra_engine_args_path = create_temp_engine_args_file(args)
logger.info("Created MockEngineArgs from CLI arguments") logger.info("Created MockEngineArgs from CLI arguments")
# Pre-fetch model once to avoid HuggingFace rate limiting when launching many workers
if args.num_workers > 1 and args.model_path:
await prefetch_model(args.model_path)
try: try:
logger.info( logger.info(
f"Launching {args.num_workers} mocker worker(s) with isolated DistributedRuntime instances" f"Launching {args.num_workers} mocker worker(s) with isolated DistributedRuntime instances"
...@@ -76,6 +98,28 @@ async def worker(): ...@@ -76,6 +98,28 @@ async def worker():
del profile_data_result # Triggers tmpdir cleanup via __del__ del profile_data_result # Triggers tmpdir cleanup via __del__
def compute_stagger_delay(num_workers: int, stagger_delay: float) -> float:
"""Compute the stagger delay based on worker count to give the frontend time to process registrations.
Returns the delay in seconds between worker launches.
"""
if stagger_delay >= 0:
return stagger_delay
if stagger_delay != -1:
raise ValueError(
f"Invalid --stagger-delay value: {stagger_delay}. "
"Use -1 for auto mode, 0 to disable, or a positive value for explicit delay."
)
# Auto mode: stagger based on worker count
if num_workers <= 32:
return 0.0
elif num_workers <= 128:
return 0.1
else:
return 0.2
async def launch_workers(args, extra_engine_args_path): async def launch_workers(args, extra_engine_args_path):
"""Launch mocker worker(s) with isolated DistributedRuntime instances. """Launch mocker worker(s) with isolated DistributedRuntime instances.
...@@ -90,6 +134,21 @@ async def launch_workers(args, extra_engine_args_path): ...@@ -90,6 +134,21 @@ async def launch_workers(args, extra_engine_args_path):
runtimes = [] runtimes = []
per_worker_temp_files: list[Path] = [] per_worker_temp_files: list[Path] = []
stagger_delay = compute_stagger_delay(args.num_workers, args.stagger_delay)
batch_size = 32
batch_pause = 2.0
if stagger_delay > 0:
total_time = (args.num_workers - 1) * stagger_delay
if args.num_workers > batch_size:
num_batches = (args.num_workers + batch_size - 1) // batch_size
total_time += batch_pause * (num_batches - 1)
logger.info(
f"Staggering {args.num_workers} worker launches: "
f"{stagger_delay}s between workers, {batch_pause}s pause every {batch_size} workers "
f"(estimated total: {total_time:.1f}s)"
)
# Load base engine args if we need to create per-worker files with bootstrap_port # Load base engine args if we need to create per-worker files with bootstrap_port
base_engine_args = None base_engine_args = None
if args.bootstrap_ports_list: if args.bootstrap_ports_list:
...@@ -137,6 +196,17 @@ async def launch_workers(args, extra_engine_args_path): ...@@ -137,6 +196,17 @@ async def launch_workers(args, extra_engine_args_path):
future = run_input(runtime, args.endpoint, engine_config) future = run_input(runtime, args.endpoint, engine_config)
futures.append(future) futures.append(future)
# Stagger worker launches for large deployments
if stagger_delay > 0 and worker_id < args.num_workers - 1:
await asyncio.sleep(stagger_delay)
# Add extra pause between batches to let frontend catch up
if (worker_id + 1) % batch_size == 0:
logger.info(
f"Batch {(worker_id + 1) // batch_size} complete, "
f"pausing {batch_pause}s for frontend to process..."
)
await asyncio.sleep(batch_pause)
logger.info(f"All {args.num_workers} mocker worker(s) created and running") logger.info(f"All {args.num_workers} mocker worker(s) created and running")
# Set up signal handler for graceful shutdown # Set up signal handler for graceful shutdown
......
...@@ -87,6 +87,7 @@ python -m dynamo.mocker \ ...@@ -87,6 +87,7 @@ python -m dynamo.mocker \
| `--startup-time` | None | Simulated startup delay (seconds) | | `--startup-time` | None | Simulated startup delay (seconds) |
| `--planner-profile-data` | None | Path to NPZ file with timing data | | `--planner-profile-data` | None | Path to NPZ file with timing data |
| `--num-workers` | 1 | Workers per process | | `--num-workers` | 1 | Workers per process |
| `--stagger-delay` | -1 (auto) | Delay between worker launches (seconds). 0 disables, -1 enables auto mode |
| `--is-prefill-worker` | False | Prefill-only mode | | `--is-prefill-worker` | False | Prefill-only mode |
| `--is-decode-worker` | False | Decode-only mode | | `--is-decode-worker` | False | Decode-only mode |
| `--enable-local-indexer` | False | Enable local KV indexer | | `--enable-local-indexer` | False | Enable local KV indexer |
......
<!--
SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES.
All rights reserved.
SPDX-License-Identifier: Apache-2.0
-->
# Mocker: LLM Engine Simulation in Rust
The Mocker is a lightweight, high-fidelity simulation of an LLM inference engine, implemented entirely in Rust. It replicates the core scheduling, memory management, and timing behaviors of production engines without requiring a GPU, making it invaluable for testing Dynamo's routing, KV cache events, disaggregated serving, and planner components.
## Overview
The mocker simulates:
- **Block-based KV cache management** with LRU eviction
- **Continuous batching scheduler** with watermark-based admission control
- **Prefix caching** with hash-based block deduplication
- **Chunked prefill** for better batching efficiency
- **Realistic timing models** for prefill and decode phases
- **Disaggregated serving** (prefill/decode separation)
- **KV event publishing** for router integration
- **Data parallelism** (multiple DP ranks per engine)
> **Note:** While the mocker uses vLLM as its primary reference implementation, these core components—block-based KV cache management, continuous batching schedulers, LRU evictors, and prefix caching—are fundamental to all modern LLM inference engines, including SGLang and TensorRT-LLM. The architectural patterns simulated here are engine-agnostic and apply broadly across the inference ecosystem.
## Quick Start
### Basic Usage
```bash
# Launch a single mocker worker
python -m dynamo.mocker --model-path Qwen/Qwen3-0.6B
# Launch with custom KV cache configuration
python -m dynamo.mocker \
--model-path Qwen/Qwen3-0.6B \
--num-gpu-blocks-override 8192 \
--block-size 64 \
--max-num-seqs 256
# Launch with timing speedup for faster testing
python -m dynamo.mocker \
--model-path Qwen/Qwen3-0.6B \
--speedup-ratio 10.0
```
### Disaggregated Serving
```bash
# Launch prefill worker
python -m dynamo.mocker \
--model-path Qwen/Qwen3-0.6B \
--is-prefill-worker \
--bootstrap-ports 50100
# Launch decode worker (in another terminal)
python -m dynamo.mocker \
--model-path Qwen/Qwen3-0.6B \
--is-decode-worker
```
### Multiple Workers in One Process
```bash
# Launch 4 mocker workers sharing the same tokio runtime
python -m dynamo.mocker \
--model-path Qwen/Qwen3-0.6B \
--num-workers 4
```
## CLI Arguments
| Argument | Default | Description |
|----------|---------|-------------|
| `--model-path` | Required | HuggingFace model ID or local path for tokenizer |
| `--endpoint` | `dyn://dynamo.backend.generate` | Dynamo endpoint string |
| `--model-name` | Derived from model-path | Model name for API responses |
| `--num-gpu-blocks-override` | 16384 | Number of KV cache blocks |
| `--block-size` | 64 | Tokens per KV cache block |
| `--max-num-seqs` | 256 | Maximum concurrent sequences |
| `--max-num-batched-tokens` | 8192 | Maximum tokens per batch |
| `--enable-prefix-caching` | True | Enable prefix caching |
| `--enable-chunked-prefill` | True | Enable chunked prefill |
| `--watermark` | 0.01 | KV cache watermark (fraction reserved) |
| `--speedup-ratio` | 1.0 | Timing speedup factor |
| `--data-parallel-size` | 1 | Number of DP replicas |
| `--startup-time` | None | Simulated startup delay (seconds) |
| `--planner-profile-data` | None | Path to NPZ file with timing data |
| `--num-workers` | 1 | Workers per process |
| `--stagger-delay` | -1 (auto) | Delay between worker launches (seconds). 0 disables, -1 enables auto mode |
| `--is-prefill-worker` | False | Prefill-only mode |
| `--is-decode-worker` | False | Decode-only mode |
| `--enable-local-indexer` | False | Enable local KV indexer |
| `--bootstrap-ports` | None | Ports for P/D rendezvous |
## Architecture
The mocker is organized into several cooperating components that mirror the internal architecture of production LLM inference engines.
### Scheduler
The scheduler implements continuous batching, maintaining three logical queues:
1. **Waiting Queue** - Newly arrived requests awaiting scheduling
2. **Prefill Queue** - Requests scheduled for prefill
3. **Decode Queue** - Requests actively decoding (ordered by age for preemption)
Each iteration, the scheduler receives incoming requests, moves eligible requests from waiting to prefill based on available memory and compute budgets, simulates the prefill phase for queued requests, runs one decode step for all active sequences, and publishes metrics about current resource utilization.
When resources become constrained, the scheduler employs preemption: the oldest decoding request is evicted back to the waiting queue, its KV blocks are freed, and it will be rescheduled later. This mirrors how real engines handle memory pressure.
### KV Block Manager
The block manager tracks KV cache blocks using reference counting and an LRU eviction policy. Blocks exist in one of two pools:
- **Active Pool** - Blocks currently in use by one or more sequences, tracked with reference counts
- **Inactive Pool** - Blocks no longer actively referenced but kept for potential reuse (prefix caching)
When a sequence needs blocks, the manager first checks if they already exist (cache hit). If not, it allocates new blocks, potentially evicting the least-recently-used inactive blocks to make room. When a sequence completes or is preempted, its blocks are either moved to the inactive pool (for potential reuse) or freed entirely.
The following diagram illustrates the block lifecycle, based on vLLM's block manager design:
```
┌───── Cache hit (Use) ────┐
│ │
▼ │
┌───────────┐ ┌───────────┐ ┌──────────┴──────┐ ┌───────────┐
│ New Block │──────►│ Active │──────►│ Inactive │──────►│ Freed │
└───────────┘ alloc │ Pool │ deref │ Pool │ evict └───────────┘
│(ref_count)│ │ (LRU order) │
└─────┬─────┘ └─────────────────┘
│ destroy (preemption)
┌───────────┐
│ Freed │
└───────────┘
```
### Evictor
The LRU evictor maintains blocks ordered by their last access time, enabling O(1) eviction of the oldest unused block. It supports both normal insertion (for completed sequences) and front-insertion (for preempted sequences that should be evicted first if memory pressure continues).
### Sequence Tracking
Each active request is tracked as a sequence, managing its token blocks and generation state. As tokens are generated, the sequence tracks which blocks are partial (still being filled) versus full (complete and hashable for prefix caching). When a partial block fills up, it gets "promoted" to a full block with a content-based hash, enabling future cache hits from requests with matching prefixes.
### Performance Model
The mocker supports two timing prediction modes:
**Polynomial Model (Default):** Uses hardcoded polynomial formulas that approximate typical GPU behavior. Prefill time scales quadratically with token count, while decode time depends on the total active KV cache size.
**Interpolated Model:** Loads actual profiling data from an NPZ file containing measured prefill and decode latencies. The mocker interpolates between data points to predict timing for any input size. This enables high-fidelity simulation matching a specific hardware configuration.
### Bootstrap Rendezvous (Disaggregated Serving)
For disaggregated prefill/decode deployments, prefill and decode workers coordinate via a simple TCP-based rendezvous protocol. The decode worker connects to the prefill worker's bootstrap port and waits until the prefill phase completes and KV cache is ready. Either side can arrive first—the rendezvous completes when both are ready.
## Integration with Dynamo
### KV Event Publishing
When prefix caching is enabled, the mocker publishes KV cache events to the distributed runtime. These events notify the system when blocks are stored (new content cached) or removed (evicted). This enables the KV-aware router to make intelligent routing decisions based on which workers have which prefixes cached.
### Metrics Publishing
Each scheduler publishes metrics about its current state, including the number of active decode blocks per DP rank. The router uses these metrics for load-aware routing decisions.
## Testing Scenarios
The mocker is particularly useful for:
1. **Router Testing** - Validate KV-aware routing without GPUs
2. **Planner Testing** - Test SLA-based planners with realistic timing
3. **Fault Tolerance** - Test request migration, graceful shutdown
4. **Disaggregation** - Test P/D separation and KV transfer coordination
5. **Performance Modeling** - Prototype scheduling policies
6. **CI/CD** - Fast integration tests without hardware dependencies
## Comparison with Real Engines
| Feature | Real Engine | Mocker |
|---------|-------------|--------|
| GPU Required | Yes | No |
| Block Manager | Paged KV cache | Simulated blocks |
| Scheduler | Continuous batching | Continuous batching |
| Prefix Caching | Hash-based | Hash-based |
| Chunked Prefill | Supported | Supported |
| Preemption | Recompute/swap | Recompute (simulated) |
| Timing | Real execution | Model-based |
| KV Events | Native | Compatible |
| Data Parallelism | Multi-GPU | Simulated |
## Feature Gaps (WIP)
The following features are not yet supported by the mocker:
- **KV transfer latency simulation** - Disaggregated serving simulates the rendezvous handshake but does not model the actual KV cache transfer time between prefill and decode workers
- **Multi-tier memory** - No support for offloading KV cache to CPU/disk or onboarding back to GPU; potential future integration with KVBM
- **Multimodal support** - Currently only simulates text token processing; no vision encoder or cross-attention simulation
- **Native Rust reference counting** - Work in progress to use native Rc/Arc for block reference counting, enabling natural RAII patterns for simpler tracking
...@@ -429,11 +429,17 @@ fn unregister_llm<'p>( ...@@ -429,11 +429,17 @@ fn unregister_llm<'p>(
/// Download a model from Hugging Face, returning it's local path /// Download a model from Hugging Face, returning it's local path
/// Example: `model_path = await fetch_llm("Qwen/Qwen3-0.6B")` /// Example: `model_path = await fetch_llm("Qwen/Qwen3-0.6B")`
#[pyfunction] #[pyfunction]
#[pyo3(signature = (remote_name))] #[pyo3(signature = (remote_name, ignore_weights=false))]
fn fetch_llm<'p>(py: Python<'p>, remote_name: &str) -> PyResult<Bound<'p, PyAny>> { fn fetch_llm<'p>(
py: Python<'p>,
remote_name: &str,
ignore_weights: bool,
) -> PyResult<Bound<'p, PyAny>> {
let repo = remote_name.to_string(); let repo = remote_name.to_string();
pyo3_async_runtimes::tokio::future_into_py(py, async move { pyo3_async_runtimes::tokio::future_into_py(py, async move {
LocalModel::fetch(&repo, false).await.map_err(to_pyerr) LocalModel::fetch(&repo, ignore_weights)
.await
.map_err(to_pyerr)
}) })
} }
......
...@@ -1081,9 +1081,10 @@ def lora_name_to_id(lora_name: str) -> int: ...@@ -1081,9 +1081,10 @@ def lora_name_to_id(lora_name: str) -> int:
"""Generate a deterministic integer ID from a LoRA name using blake3 hash.""" """Generate a deterministic integer ID from a LoRA name using blake3 hash."""
... ...
async def fetch_llm(remote_name: str) -> str: async def fetch_llm(remote_name: str, ignore_weights: bool = False) -> str:
""" """
Download a model from Hugging Face, returning it's local path. Download a model from Hugging Face, returning it's local path.
If `ignore_weights` is True, only fetches tokenizer and config files.
Example: `model_path = await fetch_llm("Qwen/Qwen3-0.6B")` Example: `model_path = await fetch_llm("Qwen/Qwen3-0.6B")`
""" """
... ...
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment