Unverified Commit f31732a2 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

feat: integrate mocker with dynamo-run and python cli (#1927)

parent aba60996
# Mocker engine
The mocker engine is a mock vLLM implementation designed for testing and development purposes. It simulates realistic token generation timing without requiring actual model inference, making it useful for:
- Testing distributed system components without GPU resources
- Benchmarking infrastructure and networking overhead
- Developing and debugging Dynamo components
- Load testing and performance analysis
**Basic usage:**
The `--model-path` is required but can point to any valid model path - the mocker doesn't actually load the model weights (but the pre-processor needs the tokenizer). The arguments `block-size`, `num-gpu-blocks`, `max-num-seqs`, `max-num-batched-tokens`, and `enable-prefix-caching` are common arguments shared with the real VLLM engine.
And below are arguments that are mocker-specific:
- `speedup_ratio`: Speed multiplier for token generation (default: 1.0). Higher values make the simulation engines run faster.
- `dp_size`: Number of data parallel workers to simulate (default: 1)
- `watermark`: KV cache watermark threshold as a fraction (default: 0.01). This argument also exists for the real VLLM engine but cannot be passed as an engine arg.
>[!NOTE]
>Currently, `enable_chunked_prefill` is always assumed to be false, which mirrors the vllm v0 behavior. This is also the current behavior in `examples/llm`. This will be updated in the near future as we move to support vllm v1 (and deprecate support for vllm v0).
```bash
echo '{"speedup_ratio": 10.0}' > mocker_args.json
python -m dynamo.mocker --model-path TinyLlama/TinyLlama-1.1B-Chat-v1.0 --extra-engine-args mocker_args.json
python -m dynamo.frontend --http-port 8080
```
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from dynamo.mocker.main import main
if __name__ == "__main__":
main()
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Usage: `python -m dynamo.mocker --model-path /data/models/Qwen3-0.6B-Q8_0.gguf --extra-engine-args args.json`
import argparse
from pathlib import Path
import uvloop
from dynamo.llm import EngineType, EntrypointArgs, make_engine, run_input
from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.runtime.logging import configure_dynamo_logging
DEFAULT_ENDPOINT = "dyn://dynamo.backend.generate"
configure_dynamo_logging()
@dynamo_worker(static=False)
async def worker(runtime: DistributedRuntime):
args = cmd_line_args()
# Create engine configuration
entrypoint_args = EntrypointArgs(
engine_type=EngineType.Mocker,
model_path=args.model_path,
model_name=args.model_name,
endpoint_id=args.endpoint,
extra_engine_args=args.extra_engine_args,
)
# Create and run the engine
# NOTE: only supports dyn endpoint for now
engine_config = await make_engine(runtime, entrypoint_args)
await run_input(runtime, args.endpoint, engine_config)
def cmd_line_args():
parser = argparse.ArgumentParser(
description="Mocker engine for testing Dynamo LLM infrastructure.",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"--model-path",
type=str,
help="Path to model directory or HuggingFace model ID for tokenizer",
)
parser.add_argument(
"--endpoint",
type=str,
default=DEFAULT_ENDPOINT,
help=f"Dynamo endpoint string (default: {DEFAULT_ENDPOINT})",
)
parser.add_argument(
"--model-name",
type=str,
default=None,
help="Model name for API responses (default: mocker-engine)",
)
parser.add_argument(
"--extra-engine-args",
type=Path,
help="Path to JSON file with mocker configuration "
"(num_gpu_blocks, speedup_ratio, etc.)",
)
return parser.parse_args()
def main():
uvloop.run(worker())
if __name__ == "__main__":
main()
......@@ -538,6 +538,32 @@ The output looks like this:
{"text":"What is the capital of Spain?","response":".The capital of Spain is Madrid.","tokens_in":7,"tokens_out":7,"elapsed_ms":855}
```
#### Mocker engine
The mocker engine is a mock vLLM implementation designed for testing and development purposes. It simulates realistic token generation timing without requiring actual model inference, making it useful for:
- Testing distributed system components without GPU resources
- Benchmarking infrastructure and networking overhead
- Developing and debugging Dynamo components
- Load testing and performance analysis
**Basic usage:**
The `--model-path` is required but can point to any valid model path - the mocker doesn't actually load the model weights. The arguments `block-size`, `num-gpu-blocks`, `max-num-seqs`, `max-num-batched-tokens`, and `enable-prefix-caching` are common arguments shared with the real VLLM engine.
And below are arguments that are mocker-specific:
- `speedup_ratio`: Speed multiplier for token generation (default: 1.0). Higher values make the simulation engines run faster.
- `dp_size`: Number of data parallel workers to simulate (default: 1)
- `watermark`: KV cache watermark threshold as a fraction (default: 0.01). This argument also exists for the real VLLM engine but cannot be passed as an engine arg.
>[!NOTE]
>Currently, `enable_chunked_prefill` is always assumed to be false, which mirrors the vllm v0 behavior. This is also the current behavior in `examples/llm`. This will be updated in the near future as we move to support vllm v1 (and deprecate support for vllm v0).
```bash
echo '{"speedup_ratio": 10.0}' > mocker_args.json
dynamo-run in=dyn://dynamo.mocker.generate out=mocker --model-path TinyLlama/TinyLlama-1.1B-Chat-v1.0 --extra-engine-args mocker_args.json
dynamo-run in=http out=dyn --router-mode kv
```
### Extra engine arguments
The vllm and sglang backends support passing any argument the engine accepts.
Put the arguments in a JSON file:
......
......@@ -20,6 +20,7 @@ use clap::ValueEnum;
use dynamo_llm::entrypoint::RouterConfig;
use dynamo_llm::kv_router::KvRouterConfig;
use dynamo_llm::local_model::LocalModel;
use dynamo_llm::mocker::protocols::MockEngineArgs;
use dynamo_runtime::pipeline::RouterMode as RuntimeRouterMode;
use crate::Output;
......@@ -212,6 +213,9 @@ impl Flags {
anyhow::bail!("--model-path should refer to a GGUF file. llama_cpp does not support safetensors.");
}
}
Output::Mocker => {
// nothing to check here
}
}
Ok(())
}
......@@ -241,6 +245,15 @@ impl Flags {
Ok(None)
}
}
pub fn mocker_config(&self) -> MockEngineArgs {
let Some(path) = &self.extra_engine_args else {
tracing::warn!("Did not specify extra engine args. Using default mocker args.");
return MockEngineArgs::default();
};
MockEngineArgs::from_json_file(path)
.unwrap_or_else(|e| panic!("Failed to build mocker engine args from {path:?}: {e}"))
}
}
#[derive(Default, PartialEq, Eq, ValueEnum, Clone, Debug, Copy)]
......
......@@ -9,6 +9,7 @@ use dynamo_llm::entrypoint::input::Input;
use dynamo_llm::entrypoint::EngineConfig;
use dynamo_llm::local_model::{LocalModel, LocalModelBuilder};
use dynamo_runtime::CancellationToken;
use dynamo_runtime::{DistributedRuntime, Runtime};
mod flags;
use either::Either;
......@@ -21,7 +22,7 @@ mod subprocess;
const CHILD_STOP_TIMEOUT: Duration = Duration::from_secs(2);
pub async fn run(
runtime: dynamo_runtime::Runtime,
runtime: Runtime,
in_opt: Input,
out_opt: Option<Output>,
flags: Flags,
......@@ -52,8 +53,7 @@ pub async fn run(
if let Input::Endpoint(path) = &in_opt {
builder.endpoint_id(Some(path.parse().with_context(|| path.clone())?));
let distributed_runtime =
dynamo_runtime::DistributedRuntime::from_settings(runtime.clone()).await?;
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
rt = Either::Right(distributed_runtime);
};
......@@ -70,8 +70,14 @@ pub async fn run(
flags.validate(&local_model, &out_opt)?;
// Make an engine from the local_model, flags and output.
let (engine_config, extra) =
engine_for(runtime.primary_token(), out_opt, flags.clone(), local_model).await?;
let (engine_config, extra) = engine_for(
runtime.primary_token(),
out_opt,
flags.clone(),
local_model,
rt.clone(),
)
.await?;
//
// Run in from an input
......@@ -96,6 +102,7 @@ async fn engine_for(
out_opt: Output,
flags: Flags,
local_model: LocalModel,
rt: Either<Runtime, DistributedRuntime>,
) -> anyhow::Result<(EngineConfig, Option<ExtraFuture>)> {
match out_opt {
Output::Dynamic => Ok((EngineConfig::Dynamic(Box::new(local_model)), None)),
......@@ -161,6 +168,25 @@ async fn engine_for(
)
.await
}
Output::Mocker => {
let Either::Right(drt) = rt else {
panic!("Mocker requires a distributed runtime to run.");
};
let args = flags.mocker_config();
let endpoint = local_model.endpoint_id().clone();
let engine =
dynamo_llm::mocker::engine::make_mocker_engine(drt, endpoint, args).await?;
Ok((
EngineConfig::StaticCore {
engine,
model: Box::new(local_model),
},
None,
))
}
}
}
......
......@@ -31,6 +31,8 @@ pub enum Output {
// Start vllm in a sub-process connecting via nats
// Sugar for `python vllm_inc.py --endpoint <thing> --model <thing>`
Vllm,
Mocker,
}
impl TryFrom<&str> for Output {
......@@ -47,6 +49,7 @@ impl TryFrom<&str> for Output {
"sglang" => Ok(Output::SgLang),
"trtllm" => Ok(Output::Trtllm),
"vllm" => Ok(Output::Vllm),
"mocker" => Ok(Output::Mocker),
"echo_full" => Ok(Output::EchoFull),
"echo_core" => Ok(Output::EchoCore),
......@@ -79,6 +82,7 @@ impl fmt::Display for Output {
Output::SgLang => "sglang",
Output::Trtllm => "trtllm",
Output::Vllm => "vllm",
Output::Mocker => "mocker",
Output::EchoFull => "echo_full",
Output::EchoCore => "echo_core",
......@@ -106,6 +110,7 @@ impl Output {
out.push(Output::SgLang.to_string());
out.push(Output::Trtllm.to_string());
out.push(Output::Vllm.to_string());
out.push(Output::Mocker.to_string());
out
}
......
......@@ -9,6 +9,7 @@ use pyo3::{exceptions::PyException, prelude::*};
use dynamo_llm::entrypoint::input::Input;
use dynamo_llm::entrypoint::EngineConfig as RsEngineConfig;
use dynamo_llm::local_model::{LocalModel, LocalModelBuilder};
use dynamo_llm::mocker::protocols::MockEngineArgs;
use dynamo_runtime::protocols::Endpoint as EndpointId;
#[pyclass(eq, eq_int)]
......@@ -19,6 +20,7 @@ pub enum EngineType {
MistralRs = 2,
LlamaCpp = 3,
Dynamic = 4,
Mocker = 5,
}
#[pyclass]
......@@ -34,13 +36,14 @@ pub(crate) struct EntrypointArgs {
//router_config: Option<RouterConfig>,
kv_cache_block_size: Option<u32>,
http_port: Option<u16>,
extra_engine_args: Option<PathBuf>,
}
#[pymethods]
impl EntrypointArgs {
#[allow(clippy::too_many_arguments)]
#[new]
#[pyo3(signature = (engine_type, model_path=None, model_name=None, model_config=None, endpoint_id=None, context_length=None, template_file=None, kv_cache_block_size=None, http_port=None))]
#[pyo3(signature = (engine_type, model_path=None, model_name=None, model_config=None, endpoint_id=None, context_length=None, template_file=None, kv_cache_block_size=None, http_port=None, extra_engine_args=None))]
pub fn new(
engine_type: EngineType,
model_path: Option<PathBuf>,
......@@ -52,6 +55,7 @@ impl EntrypointArgs {
//router_config: Option<RouterConfig>,
kv_cache_block_size: Option<u32>,
http_port: Option<u16>,
extra_engine_args: Option<PathBuf>,
) -> PyResult<Self> {
let endpoint_id_obj: Option<EndpointId> = match endpoint_id {
Some(eid) => Some(eid.parse().map_err(|_| {
......@@ -72,6 +76,7 @@ impl EntrypointArgs {
//router_config,
kv_cache_block_size,
http_port,
extra_engine_args,
})
}
}
......@@ -91,17 +96,17 @@ pub fn make_engine<'p>(
) -> PyResult<Bound<'p, PyAny>> {
let mut builder = LocalModelBuilder::default();
builder
.model_path(args.model_path)
.model_name(args.model_name)
.model_config(args.model_config)
.endpoint_id(args.endpoint_id)
.model_path(args.model_path.clone())
.model_name(args.model_name.clone())
.model_config(args.model_config.clone())
.endpoint_id(args.endpoint_id.clone())
.context_length(args.context_length)
.request_template(args.template_file)
.request_template(args.template_file.clone())
.kv_cache_block_size(args.kv_cache_block_size)
.http_port(args.http_port);
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let local_model = builder.build().await.map_err(to_pyerr)?;
let inner = select_engine(distributed_runtime, args.engine_type, local_model)
let inner = select_engine(distributed_runtime, args, local_model)
.await
.map_err(to_pyerr)?;
Ok(EngineConfig { inner })
......@@ -110,10 +115,10 @@ pub fn make_engine<'p>(
async fn select_engine(
#[allow(unused_variables)] distributed_runtime: super::DistributedRuntime,
engine_type: EngineType,
args: EntrypointArgs,
local_model: LocalModel,
) -> anyhow::Result<RsEngineConfig> {
let inner = match engine_type {
let inner = match args.engine_type {
EngineType::Echo => {
// There is no validation for the echo engine
RsEngineConfig::StaticFull {
......@@ -122,6 +127,36 @@ async fn select_engine(
}
}
EngineType::Dynamic => RsEngineConfig::Dynamic(Box::new(local_model)),
EngineType::Mocker => {
let mocker_args = if let Some(extra_args_path) = args.extra_engine_args {
MockEngineArgs::from_json_file(&extra_args_path).map_err(|e| {
anyhow::anyhow!(
"Failed to load mocker args from {:?}: {}",
extra_args_path,
e
)
})?
} else {
tracing::warn!(
"No extra_engine_args specified for mocker engine. Using default mocker args."
);
MockEngineArgs::default()
};
let endpoint = local_model.endpoint_id().clone();
let engine = dynamo_llm::mocker::engine::make_mocker_engine(
distributed_runtime.inner,
endpoint,
mocker_args,
)
.await?;
RsEngineConfig::StaticCore {
engine,
model: Box::new(local_model),
}
}
EngineType::MistralRs => {
#[cfg(feature = "mistralrs")]
{
......
......@@ -385,7 +385,7 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<LLMEngineOutput>, Error>
break;
};
if signal.completed && token_count < max_tokens {
if signal.completed && token_count < max_tokens - 1 {
let _ = stream_tx.send(LLMEngineOutput::error("Completion signal received before max tokens reached".to_string())).await;
break;
}
......@@ -513,7 +513,7 @@ pub async fn make_mocker_engine(
args: MockEngineArgs,
) -> Result<crate::backend::ExecutionContext, Error> {
// Create the mocker engine
tracing::info!("Creating mocker engine (service will be started in background)");
tracing::info!("Creating mocker engine with config: {args:?}");
let annotated_engine =
AnnotatedMockEngine::new(MockVllmEngine::new(args), distributed_runtime, endpoint);
......
......@@ -15,6 +15,8 @@
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::path::Path;
use uuid::Uuid;
use crate::kv_router::protocols::{
......@@ -97,10 +99,111 @@ pub struct MockEngineArgs {
pub dp_size: u32,
}
impl Default for MockEngineArgs {
fn default() -> MockEngineArgs {
MockEngineArgsBuilder::default()
.build()
.expect("Failed to build default MockEngineArgs")
}
}
impl MockEngineArgs {
pub fn builder() -> MockEngineArgsBuilder {
MockEngineArgsBuilder::default()
}
/// Create MockEngineArgs from a JSON file containing extra engine arguments
pub fn from_json_file(path: &Path) -> anyhow::Result<Self> {
let mut builder = Self::builder();
// Load and parse the JSON file
let file_content = std::fs::read_to_string(path)?;
let extra_args: HashMap<String, serde_json::Value> = serde_json::from_str(&file_content)?;
// Define valid field names
let valid_fields: HashSet<&str> = [
"num_gpu_blocks",
"block_size",
"max_num_seqs",
"max_num_batched_tokens",
"enable_prefix_caching",
"watermark",
"speedup_ratio",
"dp_size",
]
.iter()
.cloned()
.collect();
// Check for invalid arguments
let invalid_args: Vec<String> = extra_args
.keys()
.filter(|key| !valid_fields.contains(key.as_str()))
.cloned()
.collect();
if !invalid_args.is_empty() {
return Err(anyhow::anyhow!(
"Invalid arguments found in JSON file: {}. Valid arguments are: {:?}",
invalid_args.join(", "),
valid_fields
));
}
// Apply each extra argument to the builder
if let Some(value) = extra_args.get("num_gpu_blocks") {
if let Some(num) = value.as_u64() {
builder = builder.num_gpu_blocks(num as usize);
}
}
if let Some(value) = extra_args.get("block_size") {
if let Some(num) = value.as_u64() {
builder = builder.block_size(num as usize);
}
}
if let Some(value) = extra_args.get("max_num_seqs") {
if let Some(num) = value.as_u64() {
builder = builder.max_num_seqs(Some(num as usize));
}
}
if let Some(value) = extra_args.get("max_num_batched_tokens") {
if let Some(num) = value.as_u64() {
builder = builder.max_num_batched_tokens(Some(num as usize));
}
}
if let Some(value) = extra_args.get("enable_prefix_caching") {
if let Some(enabled) = value.as_bool() {
builder = builder.enable_prefix_caching(enabled);
}
}
if let Some(value) = extra_args.get("watermark") {
if let Some(num) = value.as_f64() {
builder = builder.watermark(num);
}
}
if let Some(value) = extra_args.get("speedup_ratio") {
if let Some(num) = value.as_f64() {
builder = builder.speedup_ratio(num);
}
}
if let Some(value) = extra_args.get("dp_size") {
if let Some(num) = value.as_u64() {
builder = builder.dp_size(num as u32);
}
}
// Build the MockEngineArgs with either defaults or overridden values
builder
.build()
.map_err(|e| anyhow::anyhow!("Failed to build MockEngineArgs: {}", e))
}
}
/// Note: This assumes block_hash and tokens_hash are the same, which is not correct in rare cases
......
......@@ -79,7 +79,7 @@ requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["deploy/sdk/src/dynamo", "components/planner/src/dynamo", "components/frontend/src/dynamo", "components/backends/llama_cpp/src/dynamo"]
packages = ["deploy/sdk/src/dynamo", "components/planner/src/dynamo", "components/frontend/src/dynamo", "components/backends/llama_cpp/src/dynamo", "components/backends/mocker/src/dynamo"]
# This section is for including the binaries in the wheel package
# but doesn't make them executable scripts in the venv bin directory
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment