Unverified Commit 41ff394f authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

chore: restructure mocker cli args handling, to include prefill/decode (#3847)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent a1b38af2
...@@ -198,6 +198,11 @@ for i in $(seq 1 $NUM_WORKERS); do ...@@ -198,6 +198,11 @@ for i in $(seq 1 $NUM_WORKERS); do
if [ "$DATA_PARALLEL_SIZE" -gt 1 ]; then if [ "$DATA_PARALLEL_SIZE" -gt 1 ]; then
MOCKER_ARGS+=("--data-parallel-size" "$DATA_PARALLEL_SIZE") MOCKER_ARGS+=("--data-parallel-size" "$DATA_PARALLEL_SIZE")
fi fi
if [ "$MODE" = "prefill" ]; then
MOCKER_ARGS+=("--is-prefill-worker")
elif [ "$MODE" = "decode" ]; then
MOCKER_ARGS+=("--is-decode-worker")
fi
MOCKER_ARGS+=("${EXTRA_ARGS[@]}") MOCKER_ARGS+=("${EXTRA_ARGS[@]}")
exec python -m dynamo.mocker "${MOCKER_ARGS[@]}" exec python -m dynamo.mocker "${MOCKER_ARGS[@]}"
......
...@@ -40,14 +40,5 @@ python -m dynamo.mocker \ ...@@ -40,14 +40,5 @@ python -m dynamo.mocker \
python -m dynamo.frontend --http-port 8000 python -m dynamo.frontend --http-port 8000
``` ```
### Legacy JSON file support: > [!Note]
For backward compatibility, you can still provide configuration via a JSON file: > Each mocker instance runs as a single process, and each DP worker (specified by `--data-parallel-size`) is spawned as a lightweight async task within that process. For benchmarking (e.g., router testing), you would much prefer launching one mocker instance with a large `--data-parallel-size` rather than multiple separate mocker instances to reduce overhead.
\ No newline at end of file
```bash
echo '{"speedup_ratio": 10.0, "num_gpu_blocks": 8192}' > mocker_args.json
python -m dynamo.mocker \
--model-path TinyLlama/TinyLlama-1.1B-Chat-v1.0 \
--extra-engine-args mocker_args.json
```
Note: If `--extra-engine-args` is provided, it overrides all individual CLI arguments.
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import argparse
import json
import logging
import os
import tempfile
from pathlib import Path
from . import __version__
DYN_NAMESPACE = os.environ.get("DYN_NAMESPACE", "dynamo")
DEFAULT_ENDPOINT = f"dyn://{DYN_NAMESPACE}.backend.generate"
logger = logging.getLogger(__name__)
def create_temp_engine_args_file(args) -> Path:
"""
Create a temporary JSON file with MockEngineArgs from CLI arguments.
Returns the path to the temporary file.
"""
engine_args = {}
# Only include non-None values that differ from defaults
# Note: argparse converts hyphens to underscores in attribute names
# Extract all potential engine arguments, using None as default for missing attributes
engine_args = {
"num_gpu_blocks": getattr(args, "num_gpu_blocks", None),
"block_size": getattr(args, "block_size", None),
"max_num_seqs": getattr(args, "max_num_seqs", None),
"max_num_batched_tokens": getattr(args, "max_num_batched_tokens", None),
"enable_prefix_caching": getattr(args, "enable_prefix_caching", None),
"enable_chunked_prefill": getattr(args, "enable_chunked_prefill", None),
"watermark": getattr(args, "watermark", None),
"speedup_ratio": getattr(args, "speedup_ratio", None),
"dp_size": getattr(args, "dp_size", None),
"startup_time": getattr(args, "startup_time", None),
"is_prefill": getattr(args, "is_prefill_worker", None),
"is_decode": getattr(args, "is_decode_worker", None),
}
# Remove None values to only include explicitly set arguments
engine_args = {k: v for k, v in engine_args.items() if v is not None}
# Create temporary file
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
json.dump(engine_args, f, indent=2)
temp_path = Path(f.name)
logger.debug(f"Created temporary MockEngineArgs file at {temp_path}")
logger.debug(f"MockEngineArgs: {engine_args}")
return temp_path
def validate_worker_type_args(args):
"""
Validate that is_prefill_worker and is_decode_worker are not both True.
Raises ValueError if validation fails.
"""
if args.is_prefill_worker and args.is_decode_worker:
raise ValueError(
"Cannot specify both --is-prefill-worker and --is-decode-worker. "
"A worker must be either prefill, decode, or aggregated (neither flag set)."
)
def parse_args():
parser = argparse.ArgumentParser(
description="Mocker engine for testing Dynamo LLM infrastructure with vLLM-style CLI.",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"--version", action="version", version=f"Dynamo Mocker {__version__}"
)
# Basic configuration
parser.add_argument(
"--model-path",
type=str,
help="Path to model directory or HuggingFace model ID for tokenizer",
)
parser.add_argument(
"--endpoint",
type=str,
default=DEFAULT_ENDPOINT,
help=f"Dynamo endpoint string (default: {DEFAULT_ENDPOINT})",
)
parser.add_argument(
"--model-name",
type=str,
default=None,
help="Model name for API responses (default: derived from model-path)",
)
# MockEngineArgs parameters (similar to vLLM style)
parser.add_argument(
"--num-gpu-blocks-override",
type=int,
dest="num_gpu_blocks", # Maps to num_gpu_blocks in MockEngineArgs
default=None,
help="Number of GPU blocks for KV cache (default: 16384)",
)
parser.add_argument(
"--block-size",
type=int,
default=None,
help="Token block size for KV cache blocks (default: 64)",
)
parser.add_argument(
"--max-num-seqs",
type=int,
default=None,
help="Maximum number of sequences per iteration (default: 256)",
)
parser.add_argument(
"--max-num-batched-tokens",
type=int,
default=None,
help="Maximum number of batched tokens per iteration (default: 8192)",
)
parser.add_argument(
"--enable-prefix-caching",
action="store_true",
dest="enable_prefix_caching",
default=None,
help="Enable automatic prefix caching (default: True)",
)
parser.add_argument(
"--no-enable-prefix-caching",
action="store_false",
dest="enable_prefix_caching",
default=None,
help="Disable automatic prefix caching",
)
parser.add_argument(
"--enable-chunked-prefill",
action="store_true",
dest="enable_chunked_prefill",
default=None,
help="Enable chunked prefill (default: True)",
)
parser.add_argument(
"--no-enable-chunked-prefill",
action="store_false",
dest="enable_chunked_prefill",
default=None,
help="Disable chunked prefill",
)
parser.add_argument(
"--watermark",
type=float,
default=None,
help="Watermark value for the mocker engine (default: 0.01)",
)
parser.add_argument(
"--speedup-ratio",
type=float,
default=None,
help="Speedup ratio for mock execution (default: 1.0)",
)
parser.add_argument(
"--data-parallel-size",
type=int,
dest="dp_size",
default=None,
help="Number of data parallel replicas (default: 1)",
)
parser.add_argument(
"--startup-time",
type=float,
default=None,
help="Simulated engine startup time in seconds (default: None)",
)
# Legacy support - allow direct JSON file specification
parser.add_argument(
"--extra-engine-args",
type=Path,
help="Path to JSON file with mocker configuration. "
"If provided, overrides individual CLI arguments.",
)
# Worker type configuration
parser.add_argument(
"--is-prefill-worker",
action="store_true",
default=False,
help="Register as Prefill model type instead of Chat+Completions (default: False)",
)
parser.add_argument(
"--is-decode-worker",
action="store_true",
default=False,
help="Mark this as a decode worker which does not publish KV events and skips prefill cost estimation (default: False)",
)
args = parser.parse_args()
validate_worker_type_args(args)
return args
...@@ -4,12 +4,7 @@ ...@@ -4,12 +4,7 @@
# Usage: `python -m dynamo.mocker --model-path /data/models/Qwen3-0.6B` # Usage: `python -m dynamo.mocker --model-path /data/models/Qwen3-0.6B`
# Now supports vLLM-style individual arguments for MockEngineArgs # Now supports vLLM-style individual arguments for MockEngineArgs
import argparse
import json
import logging import logging
import os
import tempfile
from pathlib import Path
import uvloop import uvloop
...@@ -17,55 +12,15 @@ from dynamo.llm import EngineType, EntrypointArgs, make_engine, run_input ...@@ -17,55 +12,15 @@ from dynamo.llm import EngineType, EntrypointArgs, make_engine, run_input
from dynamo.runtime import DistributedRuntime, dynamo_worker from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.runtime.logging import configure_dynamo_logging from dynamo.runtime.logging import configure_dynamo_logging
from . import __version__ from .args import create_temp_engine_args_file, parse_args
DYN_NAMESPACE = os.environ.get("DYN_NAMESPACE", "dynamo")
DEFAULT_ENDPOINT = f"dyn://{DYN_NAMESPACE}.backend.generate"
configure_dynamo_logging() configure_dynamo_logging()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def create_temp_engine_args_file(args) -> Path:
"""
Create a temporary JSON file with MockEngineArgs from CLI arguments.
Returns the path to the temporary file.
"""
engine_args = {}
# Only include non-None values that differ from defaults
# Note: argparse converts hyphens to underscores in attribute names
# Extract all potential engine arguments, using None as default for missing attributes
engine_args = {
"num_gpu_blocks": getattr(args, "num_gpu_blocks", None),
"block_size": getattr(args, "block_size", None),
"max_num_seqs": getattr(args, "max_num_seqs", None),
"max_num_batched_tokens": getattr(args, "max_num_batched_tokens", None),
"enable_prefix_caching": getattr(args, "enable_prefix_caching", None),
"enable_chunked_prefill": getattr(args, "enable_chunked_prefill", None),
"watermark": getattr(args, "watermark", None),
"speedup_ratio": getattr(args, "speedup_ratio", None),
"dp_size": getattr(args, "dp_size", None),
"startup_time": getattr(args, "startup_time", None),
}
# Remove None values to only include explicitly set arguments
engine_args = {k: v for k, v in engine_args.items() if v is not None}
# Create temporary file
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
json.dump(engine_args, f, indent=2)
temp_path = Path(f.name)
logger.debug(f"Created temporary MockEngineArgs file at {temp_path}")
logger.debug(f"MockEngineArgs: {engine_args}")
return temp_path
@dynamo_worker(static=False) @dynamo_worker(static=False)
async def worker(runtime: DistributedRuntime): async def worker(runtime: DistributedRuntime):
args = cmd_line_args() args = parse_args()
# Handle extra_engine_args: either use provided file or create from CLI args # Handle extra_engine_args: either use provided file or create from CLI args
if args.extra_engine_args: if args.extra_engine_args:
...@@ -85,6 +40,7 @@ async def worker(runtime: DistributedRuntime): ...@@ -85,6 +40,7 @@ async def worker(runtime: DistributedRuntime):
model_name=args.model_name, model_name=args.model_name,
endpoint_id=args.endpoint, endpoint_id=args.endpoint,
extra_engine_args=extra_engine_args_path, extra_engine_args=extra_engine_args_path,
is_prefill=args.is_prefill_worker,
) )
# Create and run the engine # Create and run the engine
...@@ -101,125 +57,6 @@ async def worker(runtime: DistributedRuntime): ...@@ -101,125 +57,6 @@ async def worker(runtime: DistributedRuntime):
logger.warning(f"Failed to clean up temporary file: {e}") logger.warning(f"Failed to clean up temporary file: {e}")
def cmd_line_args():
parser = argparse.ArgumentParser(
description="Mocker engine for testing Dynamo LLM infrastructure with vLLM-style CLI.",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"--version", action="version", version=f"Dynamo Mocker {__version__}"
)
# Basic configuration
parser.add_argument(
"--model-path",
type=str,
help="Path to model directory or HuggingFace model ID for tokenizer",
)
parser.add_argument(
"--endpoint",
type=str,
default=DEFAULT_ENDPOINT,
help=f"Dynamo endpoint string (default: {DEFAULT_ENDPOINT})",
)
parser.add_argument(
"--model-name",
type=str,
default=None,
help="Model name for API responses (default: derived from model-path)",
)
# MockEngineArgs parameters (similar to vLLM style)
parser.add_argument(
"--num-gpu-blocks-override",
type=int,
dest="num_gpu_blocks", # Maps to num_gpu_blocks in MockEngineArgs
default=None,
help="Number of GPU blocks for KV cache (default: 16384)",
)
parser.add_argument(
"--block-size",
type=int,
default=None,
help="Token block size for KV cache blocks (default: 64)",
)
parser.add_argument(
"--max-num-seqs",
type=int,
default=None,
help="Maximum number of sequences per iteration (default: 256)",
)
parser.add_argument(
"--max-num-batched-tokens",
type=int,
default=None,
help="Maximum number of batched tokens per iteration (default: 8192)",
)
parser.add_argument(
"--enable-prefix-caching",
action="store_true",
dest="enable_prefix_caching",
default=None,
help="Enable automatic prefix caching (default: True)",
)
parser.add_argument(
"--no-enable-prefix-caching",
action="store_false",
dest="enable_prefix_caching",
default=None,
help="Disable automatic prefix caching",
)
parser.add_argument(
"--enable-chunked-prefill",
action="store_true",
dest="enable_chunked_prefill",
default=None,
help="Enable chunked prefill (default: True)",
)
parser.add_argument(
"--no-enable-chunked-prefill",
action="store_false",
dest="enable_chunked_prefill",
default=None,
help="Disable chunked prefill",
)
parser.add_argument(
"--watermark",
type=float,
default=None,
help="Watermark value for the mocker engine (default: 0.01)",
)
parser.add_argument(
"--speedup-ratio",
type=float,
default=None,
help="Speedup ratio for mock execution (default: 1.0)",
)
parser.add_argument(
"--data-parallel-size",
type=int,
dest="dp_size",
default=None,
help="Number of data parallel replicas (default: 1)",
)
parser.add_argument(
"--startup-time",
type=float,
default=None,
help="Simulated engine startup time in seconds (default: None)",
)
# Legacy support - allow direct JSON file specification
parser.add_argument(
"--extra-engine-args",
type=Path,
help="Path to JSON file with mocker configuration. "
"If provided, overrides individual CLI arguments.",
)
return parser.parse_args()
def main(): def main():
uvloop.run(worker()) uvloop.run(worker())
......
...@@ -145,6 +145,7 @@ async fn engine_for( ...@@ -145,6 +145,7 @@ async fn engine_for(
engine, engine,
model: Box::new(local_model), model: Box::new(local_model),
is_static: flags.static_worker, is_static: flags.static_worker,
is_prefill: false,
}) })
} }
} }
......
...@@ -120,13 +120,14 @@ pub(crate) struct EntrypointArgs { ...@@ -120,13 +120,14 @@ pub(crate) struct EntrypointArgs {
namespace: Option<String>, namespace: Option<String>,
custom_backend_metrics_endpoint: Option<String>, custom_backend_metrics_endpoint: Option<String>,
custom_backend_metrics_polling_interval: Option<f64>, custom_backend_metrics_polling_interval: Option<f64>,
is_prefill: bool,
} }
#[pymethods] #[pymethods]
impl EntrypointArgs { impl EntrypointArgs {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[new] #[new]
#[pyo3(signature = (engine_type, model_path=None, model_name=None, endpoint_id=None, context_length=None, template_file=None, router_config=None, kv_cache_block_size=None, http_host=None, http_port=None, tls_cert_path=None, tls_key_path=None, extra_engine_args=None, namespace=None, custom_backend_metrics_endpoint=None, custom_backend_metrics_polling_interval=None))] #[pyo3(signature = (engine_type, model_path=None, model_name=None, endpoint_id=None, context_length=None, template_file=None, router_config=None, kv_cache_block_size=None, http_host=None, http_port=None, tls_cert_path=None, tls_key_path=None, extra_engine_args=None, namespace=None, custom_backend_metrics_endpoint=None, custom_backend_metrics_polling_interval=None, is_prefill=false))]
pub fn new( pub fn new(
engine_type: EngineType, engine_type: EngineType,
model_path: Option<PathBuf>, model_path: Option<PathBuf>,
...@@ -144,6 +145,7 @@ impl EntrypointArgs { ...@@ -144,6 +145,7 @@ impl EntrypointArgs {
namespace: Option<String>, namespace: Option<String>,
custom_backend_metrics_endpoint: Option<String>, custom_backend_metrics_endpoint: Option<String>,
custom_backend_metrics_polling_interval: Option<f64>, custom_backend_metrics_polling_interval: Option<f64>,
is_prefill: bool,
) -> PyResult<Self> { ) -> PyResult<Self> {
let endpoint_id_obj: Option<EndpointId> = endpoint_id.as_deref().map(EndpointId::from); let endpoint_id_obj: Option<EndpointId> = endpoint_id.as_deref().map(EndpointId::from);
if (tls_cert_path.is_some() && tls_key_path.is_none()) if (tls_cert_path.is_some() && tls_key_path.is_none())
...@@ -170,6 +172,7 @@ impl EntrypointArgs { ...@@ -170,6 +172,7 @@ impl EntrypointArgs {
namespace, namespace,
custom_backend_metrics_endpoint, custom_backend_metrics_endpoint,
custom_backend_metrics_polling_interval, custom_backend_metrics_polling_interval,
is_prefill,
}) })
} }
} }
...@@ -277,6 +280,7 @@ async fn select_engine( ...@@ -277,6 +280,7 @@ async fn select_engine(
engine, engine,
model: Box::new(local_model), model: Box::new(local_model),
is_static: false, is_static: false,
is_prefill: args.is_prefill,
} }
} }
}; };
......
...@@ -59,6 +59,7 @@ pub enum EngineConfig { ...@@ -59,6 +59,7 @@ pub enum EngineConfig {
engine: ExecutionContext, engine: ExecutionContext,
model: Box<LocalModel>, model: Box<LocalModel>,
is_static: bool, is_static: bool,
is_prefill: bool,
}, },
} }
......
...@@ -67,6 +67,7 @@ pub async fn run( ...@@ -67,6 +67,7 @@ pub async fn run(
engine: inner_engine, engine: inner_engine,
mut model, mut model,
is_static, is_static,
is_prefill,
} => { } => {
// Pre-processing is done ingress-side, so it should be already done. // Pre-processing is done ingress-side, so it should be already done.
let frontend = SegmentSource::< let frontend = SegmentSource::<
...@@ -83,8 +84,11 @@ pub async fn run( ...@@ -83,8 +84,11 @@ pub async fn run(
let ingress = Ingress::for_pipeline(pipeline)?; let ingress = Ingress::for_pipeline(pipeline)?;
if !is_static { if !is_static {
// Default to supporting both Chat and Completions endpoints let model_type = if is_prefill {
let model_type = ModelType::Chat | ModelType::Completions; ModelType::Prefill
} else {
ModelType::Chat | ModelType::Completions
};
model model
.attach(&endpoint, model_type, ModelInput::Tokens) .attach(&endpoint, model_type, ModelInput::Tokens)
.await?; .await?;
......
...@@ -61,6 +61,18 @@ pub struct OutputSignal { ...@@ -61,6 +61,18 @@ pub struct OutputSignal {
pub completed: bool, pub completed: bool,
} }
/// Worker type for disaggregated serving configurations
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum WorkerType {
/// Standard aggregated worker handling both prefill and decode
#[default]
Aggregated,
/// Dedicated prefill worker in disaggregated mode
Prefill,
/// Dedicated decode worker in disaggregated mode
Decode,
}
/// Configuration arguments for MockVllmEngine /// Configuration arguments for MockVllmEngine
#[derive(Debug, Clone, Serialize, Deserialize, Builder)] #[derive(Debug, Clone, Serialize, Deserialize, Builder)]
#[builder(pattern = "owned", build_fn(public))] #[builder(pattern = "owned", build_fn(public))]
...@@ -97,6 +109,10 @@ pub struct MockEngineArgs { ...@@ -97,6 +109,10 @@ pub struct MockEngineArgs {
/// Optional startup time in seconds to simulate engine initialization delay /// Optional startup time in seconds to simulate engine initialization delay
#[builder(default = "None")] #[builder(default = "None")]
pub startup_time: Option<f64>, pub startup_time: Option<f64>,
/// Worker type for disaggregated serving (Aggregated, Prefill, or Decode)
#[builder(default = "WorkerType::Aggregated")]
pub worker_type: WorkerType,
} }
impl Default for MockEngineArgs { impl Default for MockEngineArgs {
...@@ -132,6 +148,8 @@ impl MockEngineArgs { ...@@ -132,6 +148,8 @@ impl MockEngineArgs {
"speedup_ratio", "speedup_ratio",
"dp_size", "dp_size",
"startup_time", "startup_time",
"is_prefill",
"is_decode",
] ]
.iter() .iter()
.cloned() .cloned()
...@@ -213,6 +231,28 @@ impl MockEngineArgs { ...@@ -213,6 +231,28 @@ impl MockEngineArgs {
builder = builder.startup_time(Some(num)); builder = builder.startup_time(Some(num));
} }
// Parse worker type from is_prefill and is_decode flags
let is_prefill = extra_args
.get("is_prefill")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let is_decode = extra_args
.get("is_decode")
.and_then(|v| v.as_bool())
.unwrap_or(false);
// Determine worker type based on flags
let worker_type = match (is_prefill, is_decode) {
(false, false) => WorkerType::Aggregated,
(true, false) => WorkerType::Prefill,
(false, true) => WorkerType::Decode,
(true, true) => panic!(
"Invalid worker configuration: is_prefill and is_decode cannot both be true. \
Worker must be either Aggregated (both false), Prefill (is_prefill=true), or Decode (is_decode=true)."
),
};
builder = builder.worker_type(worker_type);
// Build the MockEngineArgs with either defaults or overridden values // Build the MockEngineArgs with either defaults or overridden values
builder builder
.build() .build()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment