Unverified Commit 3c7c1d64 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

chore: pass in mocker engine args directly in python cli + default frontend port to 8000 (#2853)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 8d30753f
...@@ -7,17 +7,47 @@ The mocker engine is a mock vLLM implementation designed for testing and develop ...@@ -7,17 +7,47 @@ The mocker engine is a mock vLLM implementation designed for testing and develop
- Developing and debugging Dynamo components - Developing and debugging Dynamo components
- Load testing and performance analysis - Load testing and performance analysis
**Basic usage:** ## Basic usage
The `--model-path` is required but can point to any valid model path - the mocker doesn't actually load the model weights (but the pre-processor needs the tokenizer). The arguments `block_size`, `num_gpu_blocks`, `max_num_seqs`, `max_num_batched_tokens`, `enable_prefix_caching`, and `enable_chunked_prefill` are common arguments shared with the real VLLM engine. The mocker engine now supports a vLLM-style CLI interface with individual arguments for all configuration options.
And below are arguments that are mocker-specific: ### Required arguments:
- `speedup_ratio`: Speed multiplier for token generation (default: 1.0). Higher values make the simulation engines run faster. - `--model-path`: Path to model directory or HuggingFace model ID (required for tokenizer)
- `dp_size`: Number of data parallel workers to simulate (default: 1)
- `watermark`: KV cache watermark threshold as a fraction (default: 0.01). This argument also exists for the real VLLM engine but cannot be passed as an engine arg.
### MockEngineArgs parameters (vLLM-style):
- `--num-gpu-blocks-override`: Number of GPU blocks for KV cache (default: 16384)
- `--block-size`: Token block size for KV cache blocks (default: 64)
- `--max-num-seqs`: Maximum number of sequences per iteration (default: 256)
- `--max-num-batched-tokens`: Maximum number of batched tokens per iteration (default: 8192)
- `--enable-prefix-caching` / `--no-enable-prefix-caching`: Enable/disable automatic prefix caching (default: True)
- `--enable-chunked-prefill` / `--no-enable-chunked-prefill`: Enable/disable chunked prefill (default: True)
- `--watermark`: KV cache watermark threshold as a fraction (default: 0.01)
- `--speedup-ratio`: Speed multiplier for token generation (default: 1.0). Higher values make the simulation engines run faster
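- `--data-parallel-size`: Number of data parallel workers to simulate (default: 1)
- `--startup-time`: Simulated engine startup time in seconds (default: None, i.e. no startup delay)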
### Example with individual arguments (vLLM-style):
```bash ```bash
echo '{"speedup_ratio": 10.0}' > mocker_args.json # Start mocker with custom configuration
python -m dynamo.mocker --model-path TinyLlama/TinyLlama-1.1B-Chat-v1.0 --extra-engine-args mocker_args.json python -m dynamo.mocker \
--model-path TinyLlama/TinyLlama-1.1B-Chat-v1.0 \
--num-gpu-blocks-override 8192 \
--block-size 16 \
--speedup-ratio 10.0 \
--max-num-seqs 512 \
--enable-prefix-caching
# Start frontend server
python -m dynamo.frontend --http-port 8080 python -m dynamo.frontend --http-port 8080
``` ```
\ No newline at end of file
### Legacy JSON file support:
For backward compatibility, you can still provide configuration via a JSON file:
```bash
echo '{"speedup_ratio": 10.0, "num_gpu_blocks": 8192}' > mocker_args.json
python -m dynamo.mocker \
--model-path TinyLlama/TinyLlama-1.1B-Chat-v1.0 \
--extra-engine-args mocker_args.json
```
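Note: If `--extra-engine-args` is provided, the JSON file is used as-is and any individual engine arguments passed on the command line (for example `--block-size` or `--speedup-ratio`) are ignored.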
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
# Usage: `python -m dynamo.mocker --model-path /data/models/Qwen3-0.6B-Q8_0.gguf --extra-engine-args args.json` # Usage: `python -m dynamo.mocker --model-path /data/models/Qwen3-0.6B-Q8_0.gguf`
# Now supports vLLM-style individual arguments for MockEngineArgs
import argparse import argparse
import json
import logging
import os import os
import tempfile
from pathlib import Path from pathlib import Path
import uvloop import uvloop
...@@ -19,35 +23,94 @@ DYN_NAMESPACE = os.environ.get("DYN_NAMESPACE", "dynamo") ...@@ -19,35 +23,94 @@ DYN_NAMESPACE = os.environ.get("DYN_NAMESPACE", "dynamo")
DEFAULT_ENDPOINT = f"dyn://{DYN_NAMESPACE}.backend.generate" DEFAULT_ENDPOINT = f"dyn://{DYN_NAMESPACE}.backend.generate"
configure_dynamo_logging() configure_dynamo_logging()
logger = logging.getLogger(__name__)

# Attribute names on the parsed CLI namespace that are forwarded as
# MockEngineArgs JSON keys. argparse converts hyphens to underscores, and the
# parser's `dest=` settings already map --num-gpu-blocks-override to
# `num_gpu_blocks` and --data-parallel-size to `dp_size`.
_ENGINE_ARG_NAMES = (
    "num_gpu_blocks",
    "block_size",
    "max_num_seqs",
    "max_num_batched_tokens",
    "enable_prefix_caching",
    "enable_chunked_prefill",
    "watermark",
    "speedup_ratio",
    "dp_size",
    "startup_time",
)


def create_temp_engine_args_file(args) -> Path:
    """Write the engine arguments set on ``args`` to a temporary JSON file.

    Only attributes whose value is not ``None`` are written, so options the
    user did not pass are left out of the file. Values are not compared
    against any defaults; ``False`` and ``0`` are kept.

    Args:
        args: Parsed command-line namespace. Attributes that are missing are
            treated the same as attributes set to ``None``.

    Returns:
        Path to the JSON file. The file is created with ``delete=False``, so
        it still exists after this function returns and the caller is
        responsible for removing it.
    """
    candidates = {name: getattr(args, name, None) for name in _ENGINE_ARG_NAMES}
    engine_args = {
        name: value for name, value in candidates.items() if value is not None
    }

    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump(engine_args, f, indent=2)
        temp_path = Path(f.name)

    # Lazy %-style arguments avoid building the message when DEBUG is disabled.
    logger.debug("Created temporary MockEngineArgs file at %s", temp_path)
    logger.debug("MockEngineArgs: %s", engine_args)
    return temp_path
@dynamo_worker(static=False) @dynamo_worker(static=False)
async def worker(runtime: DistributedRuntime): async def worker(runtime: DistributedRuntime):
args = cmd_line_args() args = cmd_line_args()
# Create engine configuration # Handle extra_engine_args: either use provided file or create from CLI args
entrypoint_args = EntrypointArgs( if args.extra_engine_args:
engine_type=EngineType.Mocker, # User provided explicit JSON file
model_path=args.model_path, extra_engine_args_path = args.extra_engine_args
model_name=args.model_name, logger.info(f"Using provided MockEngineArgs from {extra_engine_args_path}")
endpoint_id=args.endpoint, else:
extra_engine_args=args.extra_engine_args, # Create temporary JSON file from CLI arguments
) extra_engine_args_path = create_temp_engine_args_file(args)
logger.info("Created MockEngineArgs from CLI arguments")
# Create and run the engine
# NOTE: only supports dyn endpoint for now try:
engine_config = await make_engine(runtime, entrypoint_args) # Create engine configuration
await run_input(runtime, args.endpoint, engine_config) entrypoint_args = EntrypointArgs(
engine_type=EngineType.Mocker,
model_path=args.model_path,
model_name=args.model_name,
endpoint_id=args.endpoint,
extra_engine_args=extra_engine_args_path,
)
# Create and run the engine
# NOTE: only supports dyn endpoint for now
engine_config = await make_engine(runtime, entrypoint_args)
await run_input(runtime, args.endpoint, engine_config)
finally:
# Clean up temporary file if we created one
if not args.extra_engine_args and extra_engine_args_path.exists():
try:
extra_engine_args_path.unlink()
logger.debug(f"Cleaned up temporary file {extra_engine_args_path}")
except Exception as e:
logger.warning(f"Failed to clean up temporary file: {e}")
def cmd_line_args(): def cmd_line_args():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Mocker engine for testing Dynamo LLM infrastructure.", description="Mocker engine for testing Dynamo LLM infrastructure with vLLM-style CLI.",
formatter_class=argparse.RawDescriptionHelpFormatter, formatter_class=argparse.RawDescriptionHelpFormatter,
) )
parser.add_argument( parser.add_argument(
"--version", action="version", version=f"Dynamo Mocker {__version__}" "--version", action="version", version=f"Dynamo Mocker {__version__}"
) )
# Basic configuration
parser.add_argument( parser.add_argument(
"--model-path", "--model-path",
type=str, type=str,
...@@ -63,13 +126,95 @@ def cmd_line_args(): ...@@ -63,13 +126,95 @@ def cmd_line_args():
"--model-name", "--model-name",
type=str, type=str,
default=None, default=None,
help="Model name for API responses (default: mocker-engine)", help="Model name for API responses (default: derived from model-path)",
) )
# MockEngineArgs parameters (similar to vLLM style)
parser.add_argument(
"--num-gpu-blocks-override",
type=int,
dest="num_gpu_blocks", # Maps to num_gpu_blocks in MockEngineArgs
default=None,
help="Number of GPU blocks for KV cache (default: 16384)",
)
parser.add_argument(
"--block-size",
type=int,
default=None,
help="Token block size for KV cache blocks (default: 64)",
)
parser.add_argument(
"--max-num-seqs",
type=int,
default=None,
help="Maximum number of sequences per iteration (default: 256)",
)
parser.add_argument(
"--max-num-batched-tokens",
type=int,
default=None,
help="Maximum number of batched tokens per iteration (default: 8192)",
)
parser.add_argument(
"--enable-prefix-caching",
action="store_true",
dest="enable_prefix_caching",
default=None,
help="Enable automatic prefix caching (default: True)",
)
parser.add_argument(
"--no-enable-prefix-caching",
action="store_false",
dest="enable_prefix_caching",
default=None,
help="Disable automatic prefix caching",
)
parser.add_argument(
"--enable-chunked-prefill",
action="store_true",
dest="enable_chunked_prefill",
default=None,
help="Enable chunked prefill (default: True)",
)
parser.add_argument(
"--no-enable-chunked-prefill",
action="store_false",
dest="enable_chunked_prefill",
default=None,
help="Disable chunked prefill",
)
parser.add_argument(
"--watermark",
type=float,
default=None,
help="Watermark value for the mocker engine (default: 0.01)",
)
parser.add_argument(
"--speedup-ratio",
type=float,
default=None,
help="Speedup ratio for mock execution (default: 1.0)",
)
parser.add_argument(
"--data-parallel-size",
type=int,
dest="dp_size",
default=None,
help="Number of data parallel replicas (default: 1)",
)
parser.add_argument(
"--startup-time",
type=float,
default=None,
help="Simulated engine startup time in seconds (default: None)",
)
# Legacy support - allow direct JSON file specification
parser.add_argument( parser.add_argument(
"--extra-engine-args", "--extra-engine-args",
type=Path, type=Path,
help="Path to JSON file with mocker configuration " help="Path to JSON file with mocker configuration. "
"(num_gpu_blocks, speedup_ratio, etc.)", "If provided, overrides individual CLI arguments.",
) )
return parser.parse_args() return parser.parse_args()
......
...@@ -101,7 +101,7 @@ def parse_args(): ...@@ -101,7 +101,7 @@ def parse_args():
parser.add_argument( parser.add_argument(
"--http-port", "--http-port",
type=int, type=int,
default=int(os.environ.get("DYN_HTTP_PORT", "8080")), default=int(os.environ.get("DYN_HTTP_PORT", "8000")),
help="HTTP port for the engine (u16). Can be set via DYN_HTTP_PORT env var.", help="HTTP port for the engine (u16). Can be set via DYN_HTTP_PORT env var.",
) )
parser.add_argument( parser.add_argument(
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! MockSchedulerEngine - AsyncEngine wrapper around the Scheduler //! MockSchedulerEngine - AsyncEngine wrapper around the Scheduler
//! //!
...@@ -76,6 +64,13 @@ impl MockVllmEngine { ...@@ -76,6 +64,13 @@ impl MockVllmEngine {
pub async fn start(&self, component: Component) -> Result<()> { pub async fn start(&self, component: Component) -> Result<()> {
let cancel_token = component.drt().runtime().child_token(); let cancel_token = component.drt().runtime().child_token();
// Simulate engine startup time if configured
if let Some(startup_time_secs) = self.engine_args.startup_time {
tracing::info!("Simulating engine startup time: {:.2}s", startup_time_secs);
tokio::time::sleep(Duration::from_secs_f64(startup_time_secs)).await;
tracing::info!("Engine startup simulation completed");
}
let (schedulers, kv_event_receiver) = self.start_schedulers( let (schedulers, kv_event_receiver) = self.start_schedulers(
self.engine_args.clone(), self.engine_args.clone(),
self.active_requests.clone(), self.active_requests.clone(),
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::{Eq, Ordering}; use std::cmp::{Eq, Ordering};
use std::collections::{BTreeSet, HashMap}; use std::collections::{BTreeSet, HashMap};
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! # KV Manager //! # KV Manager
//! A synchronous implementation of a block manager that handles MoveBlock signals for caching KV blocks. //! A synchronous implementation of a block manager that handles MoveBlock signals for caching KV blocks.
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use derive_builder::Builder; use derive_builder::Builder;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
...@@ -105,6 +93,10 @@ pub struct MockEngineArgs { ...@@ -105,6 +93,10 @@ pub struct MockEngineArgs {
#[builder(default = "1")] #[builder(default = "1")]
pub dp_size: u32, pub dp_size: u32,
/// Optional startup time in seconds to simulate engine initialization delay
#[builder(default = "None")]
pub startup_time: Option<f64>,
} }
impl Default for MockEngineArgs { impl Default for MockEngineArgs {
...@@ -139,6 +131,7 @@ impl MockEngineArgs { ...@@ -139,6 +131,7 @@ impl MockEngineArgs {
"watermark", "watermark",
"speedup_ratio", "speedup_ratio",
"dp_size", "dp_size",
"startup_time",
] ]
.iter() .iter()
.cloned() .cloned()
...@@ -214,6 +207,12 @@ impl MockEngineArgs { ...@@ -214,6 +207,12 @@ impl MockEngineArgs {
builder = builder.dp_size(num as u32); builder = builder.dp_size(num as u32);
} }
if let Some(value) = extra_args.get("startup_time")
&& let Some(num) = value.as_f64()
{
builder = builder.startup_time(Some(num));
}
// Build the MockEngineArgs with either defaults or overridden values // Build the MockEngineArgs with either defaults or overridden values
builder builder
.build() .build()
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::mocker::protocols::MoveBlock; use crate::mocker::protocols::MoveBlock;
use crate::tokens::blocks::UniqueBlock; use crate::tokens::blocks::UniqueBlock;
......
...@@ -176,7 +176,7 @@ def send_completion_request( ...@@ -176,7 +176,7 @@ def send_completion_request(
session = requests.Session() session = requests.Session()
try: try:
response = session.post( response = session.post(
"http://localhost:8080/v1/completions", "http://localhost:8000/v1/completions",
headers=headers, headers=headers,
json=payload, json=payload,
timeout=timeout, timeout=timeout,
...@@ -211,7 +211,7 @@ def send_chat_completion_request( ...@@ -211,7 +211,7 @@ def send_chat_completion_request(
session = requests.Session() session = requests.Session()
try: try:
response = session.post( response = session.post(
"http://localhost:8080/v1/chat/completions", "http://localhost:8000/v1/chat/completions",
headers=headers, headers=headers,
json=payload, json=payload,
timeout=timeout, timeout=timeout,
......
...@@ -165,7 +165,7 @@ def send_completion_request( ...@@ -165,7 +165,7 @@ def send_completion_request(
try: try:
response = requests.post( response = requests.post(
"http://localhost:8080/v1/completions", "http://localhost:8000/v1/completions",
headers=headers, headers=headers,
json=payload, json=payload,
timeout=timeout, timeout=timeout,
......
...@@ -170,7 +170,7 @@ def send_completion_request( ...@@ -170,7 +170,7 @@ def send_completion_request(
try: try:
response = requests.post( response = requests.post(
"http://localhost:8080/v1/completions", "http://localhost:8000/v1/completions",
headers=headers, headers=headers,
json=payload, json=payload,
timeout=timeout, timeout=timeout,
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
# i.e. nats and etcd are running # i.e. nats and etcd are running
# Overview: # Overview:
# This script deploys dynamo disaggregated serving without LMCache on port 8080 # This script deploys dynamo disaggregated serving without LMCache on port 8000
# Used as baseline for correctness testing # Used as baseline for correctness testing
set -e set -e
trap 'echo Cleaning up...; kill 0' EXIT trap 'echo Cleaning up...; kill 0' EXIT
...@@ -22,7 +22,7 @@ fi ...@@ -22,7 +22,7 @@ fi
echo "🚀 Starting dynamo disaggregated serving setup without LMCache:" echo "🚀 Starting dynamo disaggregated serving setup without LMCache:"
echo " Model: $MODEL_URL" echo " Model: $MODEL_URL"
echo " Port: 8080" echo " Port: 8000"
echo " Mode: Disaggregated (prefill + decode workers)" echo " Mode: Disaggregated (prefill + decode workers)"
# Kill any existing dynamo processes # Kill any existing dynamo processes
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
# i.e. nats and etcd are running # i.e. nats and etcd are running
# Overview: # Overview:
# This script deploys dynamo without LMCache on port 8080 # This script deploys dynamo without LMCache on port 8000
# Used as baseline for correctness testing # Used as baseline for correctness testing
set -e set -e
trap 'echo Cleaning up...; kill 0' EXIT trap 'echo Cleaning up...; kill 0' EXIT
...@@ -21,7 +21,7 @@ fi ...@@ -21,7 +21,7 @@ fi
echo "🚀 Starting dynamo setup without LMCache:" echo "🚀 Starting dynamo setup without LMCache:"
echo " Model: $MODEL_URL" echo " Model: $MODEL_URL"
echo " Port: 8080" echo " Port: 8000"
# Kill any existing dynamo processes # Kill any existing dynamo processes
echo "🧹 Cleaning up any existing dynamo processes..." echo "🧹 Cleaning up any existing dynamo processes..."
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
# i.e. nats and etcd are running # i.e. nats and etcd are running
# Overview: # Overview:
# This script deploys dynamo disaggregated serving with LMCache enabled on port 8080 # This script deploys dynamo disaggregated serving with LMCache enabled on port 8000
# Used for LMCache correctness testing # Used for LMCache correctness testing
set -e set -e
trap 'echo Cleaning up...; kill 0' EXIT trap 'echo Cleaning up...; kill 0' EXIT
...@@ -22,7 +22,7 @@ fi ...@@ -22,7 +22,7 @@ fi
echo "🚀 Starting dynamo disaggregated serving setup with LMCache:" echo "🚀 Starting dynamo disaggregated serving setup with LMCache:"
echo " Model: $MODEL_URL" echo " Model: $MODEL_URL"
echo " Port: 8080" echo " Port: 8000"
echo " Mode: Disaggregated (prefill + decode workers) + LMCache" echo " Mode: Disaggregated (prefill + decode workers) + LMCache"
echo " !! Remember to kill the old dynamo processes otherwise the port will be busy !!" echo " !! Remember to kill the old dynamo processes otherwise the port will be busy !!"
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
# i.e. nats and etcd are running # i.e. nats and etcd are running
# Overview: # Overview:
# This script deploys dynamo with LMCache enabled on port 8080 # This script deploys dynamo with LMCache enabled on port 8000
# Used for LMCache correctness testing # Used for LMCache correctness testing
set -e set -e
trap 'echo Cleaning up...; kill 0' EXIT trap 'echo Cleaning up...; kill 0' EXIT
...@@ -21,7 +21,7 @@ fi ...@@ -21,7 +21,7 @@ fi
echo "🚀 Starting dynamo setup with LMCache:" echo "🚀 Starting dynamo setup with LMCache:"
echo " Model: $MODEL_URL" echo " Model: $MODEL_URL"
echo " Port: 8080" echo " Port: 8000"
echo " !! Remmber to kill the old dynamo processes other wise the port will be busy !! " echo " !! Remmber to kill the old dynamo processes other wise the port will be busy !! "
# Kill any existing dynamo processes # Kill any existing dynamo processes
......
...@@ -175,7 +175,7 @@ if __name__ == "__main__": ...@@ -175,7 +175,7 @@ if __name__ == "__main__":
parser.add_argument("--result-file", type=str, required=False) parser.add_argument("--result-file", type=str, required=False)
parser.add_argument("--number-of-subjects", type=int, required=True) parser.add_argument("--number-of-subjects", type=int, required=True)
parser.add_argument("--host", type=str, default="localhost", help="Dynamo host") parser.add_argument("--host", type=str, default="localhost", help="Dynamo host")
parser.add_argument("--port", type=int, default=8080, help="Dynamo port") parser.add_argument("--port", type=int, default=8000, help="Dynamo port")
args = parser.parse_args() args = parser.parse_args()
if args.result_file is None: if args.result_file is None:
......
...@@ -174,7 +174,7 @@ if __name__ == "__main__": ...@@ -174,7 +174,7 @@ if __name__ == "__main__":
parser.add_argument("--result-file", type=str, required=False) parser.add_argument("--result-file", type=str, required=False)
parser.add_argument("--number-of-subjects", type=int, required=True) parser.add_argument("--number-of-subjects", type=int, required=True)
parser.add_argument("--host", type=str, default="localhost", help="Dynamo host") parser.add_argument("--host", type=str, default="localhost", help="Dynamo host")
parser.add_argument("--port", type=int, default=8080, help="Dynamo port") parser.add_argument("--port", type=int, default=8000, help="Dynamo port")
args = parser.parse_args() args = parser.parse_args()
if args.result_file is None: if args.result_file is None:
......
...@@ -67,7 +67,7 @@ sleep 30 ...@@ -67,7 +67,7 @@ sleep 30
# Check if server is responding # Check if server is responding
max_attempts=30 max_attempts=30
attempt=0 attempt=0
until curl -s http://localhost:8080/v1/models > /dev/null 2>&1; do until curl -s http://localhost:8000/v1/models > /dev/null 2>&1; do
attempt=$((attempt + 1)) attempt=$((attempt + 1))
if [ $attempt -gt $max_attempts ]; then if [ $attempt -gt $max_attempts ]; then
echo "❌ Server failed to start within timeout" echo "❌ Server failed to start within timeout"
...@@ -107,7 +107,7 @@ sleep 30 ...@@ -107,7 +107,7 @@ sleep 30
# Check if server is responding # Check if server is responding
attempt=0 attempt=0
until curl -s http://localhost:8080/v1/models > /dev/null 2>&1; do until curl -s http://localhost:8000/v1/models > /dev/null 2>&1; do
attempt=$((attempt + 1)) attempt=$((attempt + 1))
if [ $attempt -gt $max_attempts ]; then if [ $attempt -gt $max_attempts ]; then
echo "❌ Server failed to start within timeout" echo "❌ Server failed to start within timeout"
......
...@@ -49,7 +49,12 @@ TEST_PAYLOAD: Dict[str, Any] = { ...@@ -49,7 +49,12 @@ TEST_PAYLOAD: Dict[str, Any] = {
class MockerProcess: class MockerProcess:
"""Manages multiple mocker engine instances with the same namespace""" """Manages multiple mocker engine instances with the same namespace"""
def __init__(self, request, mocker_args_file: str, num_mockers: int = 1): def __init__(
self,
request,
mocker_args: Optional[Dict[str, Any]] = None,
num_mockers: int = 1,
):
# Generate a unique namespace suffix shared by all mockers # Generate a unique namespace suffix shared by all mockers
namespace_suffix = generate_random_suffix() namespace_suffix = generate_random_suffix()
self.namespace = f"test-namespace-{namespace_suffix}" self.namespace = f"test-namespace-{namespace_suffix}"
...@@ -57,6 +62,10 @@ class MockerProcess: ...@@ -57,6 +62,10 @@ class MockerProcess:
self.num_mockers = num_mockers self.num_mockers = num_mockers
self.mocker_processes = [] self.mocker_processes = []
# Default mocker args if not provided
if mocker_args is None:
mocker_args = {}
# Create multiple mocker processes with the same namespace # Create multiple mocker processes with the same namespace
for i in range(num_mockers): for i in range(num_mockers):
command = [ command = [
...@@ -65,12 +74,43 @@ class MockerProcess: ...@@ -65,12 +74,43 @@ class MockerProcess:
"dynamo.mocker", "dynamo.mocker",
"--model-path", "--model-path",
MODEL_NAME, MODEL_NAME,
"--extra-engine-args",
mocker_args_file,
"--endpoint", "--endpoint",
self.endpoint, self.endpoint,
] ]
# Add individual CLI arguments from mocker_args
if "speedup_ratio" in mocker_args:
command.extend(["--speedup-ratio", str(mocker_args["speedup_ratio"])])
if "block_size" in mocker_args:
command.extend(["--block-size", str(mocker_args["block_size"])])
if "num_gpu_blocks" in mocker_args:
command.extend(
["--num-gpu-blocks-override", str(mocker_args["num_gpu_blocks"])]
)
if "max_num_seqs" in mocker_args:
command.extend(["--max-num-seqs", str(mocker_args["max_num_seqs"])])
if "max_num_batched_tokens" in mocker_args:
command.extend(
[
"--max-num-batched-tokens",
str(mocker_args["max_num_batched_tokens"]),
]
)
if "enable_prefix_caching" in mocker_args:
if mocker_args["enable_prefix_caching"]:
command.append("--enable-prefix-caching")
else:
command.append("--no-enable-prefix-caching")
if "enable_chunked_prefill" in mocker_args:
if mocker_args["enable_chunked_prefill"]:
command.append("--enable-chunked-prefill")
else:
command.append("--no-enable-chunked-prefill")
if "watermark" in mocker_args:
command.extend(["--watermark", str(mocker_args["watermark"])])
if "dp_size" in mocker_args:
command.extend(["--data-parallel-size", str(mocker_args["dp_size"])])
process = ManagedProcess( process = ManagedProcess(
command=command, command=command,
timeout=60, timeout=60,
...@@ -291,13 +331,9 @@ def test_mocker_kv_router(request, runtime_services): ...@@ -291,13 +331,9 @@ def test_mocker_kv_router(request, runtime_services):
# runtime_services starts etcd and nats # runtime_services starts etcd and nats
logger.info("Starting mocker KV router test") logger.info("Starting mocker KV router test")
# Create mocker args file # Create mocker args dictionary
mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE} mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
mocker_args_file = os.path.join(request.node.name, "mocker_args.json")
with open(mocker_args_file, "w") as f:
json.dump(mocker_args, f)
try: try:
# Start KV router (frontend) # Start KV router (frontend)
frontend_port = PORT frontend_port = PORT
...@@ -306,9 +342,11 @@ def test_mocker_kv_router(request, runtime_services): ...@@ -306,9 +342,11 @@ def test_mocker_kv_router(request, runtime_services):
kv_router = KVRouterProcess(request, frontend_port) kv_router = KVRouterProcess(request, frontend_port)
kv_router.__enter__() kv_router.__enter__()
# Start mocker instances # Start mocker instances with the new CLI interface
logger.info(f"Starting {NUM_MOCKERS} mocker instances") logger.info(f"Starting {NUM_MOCKERS} mocker instances")
mockers = MockerProcess(request, mocker_args_file, num_mockers=NUM_MOCKERS) mockers = MockerProcess(
request, mocker_args=mocker_args, num_mockers=NUM_MOCKERS
)
logger.info(f"All mockers using endpoint: {mockers.endpoint}") logger.info(f"All mockers using endpoint: {mockers.endpoint}")
mockers.__enter__() mockers.__enter__()
...@@ -339,9 +377,6 @@ def test_mocker_kv_router(request, runtime_services): ...@@ -339,9 +377,6 @@ def test_mocker_kv_router(request, runtime_services):
if "mockers" in locals(): if "mockers" in locals():
mockers.__exit__(None, None, None) mockers.__exit__(None, None, None)
if os.path.exists(mocker_args_file):
os.unlink(mocker_args_file)
@pytest.mark.pre_merge @pytest.mark.pre_merge
def test_mocker_two_kv_router(request, runtime_services): def test_mocker_two_kv_router(request, runtime_services):
...@@ -353,13 +388,9 @@ def test_mocker_two_kv_router(request, runtime_services): ...@@ -353,13 +388,9 @@ def test_mocker_two_kv_router(request, runtime_services):
# runtime_services starts etcd and nats # runtime_services starts etcd and nats
logger.info("Starting mocker two KV router test") logger.info("Starting mocker two KV router test")
# Create mocker args file # Create mocker args dictionary
mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE} mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
mocker_args_file = os.path.join(request.node.name, "mocker_args.json")
with open(mocker_args_file, "w") as f:
json.dump(mocker_args, f)
kv_routers = [] kv_routers = []
try: try:
...@@ -372,9 +403,11 @@ def test_mocker_two_kv_router(request, runtime_services): ...@@ -372,9 +403,11 @@ def test_mocker_two_kv_router(request, runtime_services):
kv_router.__enter__() kv_router.__enter__()
kv_routers.append(kv_router) kv_routers.append(kv_router)
# Start mocker instances # Start mocker instances with the new CLI interface
logger.info(f"Starting {NUM_MOCKERS} mocker instances") logger.info(f"Starting {NUM_MOCKERS} mocker instances")
mockers = MockerProcess(request, mocker_args_file, num_mockers=NUM_MOCKERS) mockers = MockerProcess(
request, mocker_args=mocker_args, num_mockers=NUM_MOCKERS
)
logger.info(f"All mockers using endpoint: {mockers.endpoint}") logger.info(f"All mockers using endpoint: {mockers.endpoint}")
mockers.__enter__() mockers.__enter__()
...@@ -411,9 +444,6 @@ def test_mocker_two_kv_router(request, runtime_services): ...@@ -411,9 +444,6 @@ def test_mocker_two_kv_router(request, runtime_services):
if "mockers" in locals(): if "mockers" in locals():
mockers.__exit__(None, None, None) mockers.__exit__(None, None, None)
if os.path.exists(mocker_args_file):
os.unlink(mocker_args_file)
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.skip(reason="Flaky, temporarily disabled") @pytest.mark.skip(reason="Flaky, temporarily disabled")
...@@ -426,17 +456,13 @@ def test_mocker_kv_router_overload_503(request, runtime_services): ...@@ -426,17 +456,13 @@ def test_mocker_kv_router_overload_503(request, runtime_services):
# runtime_services starts etcd and nats # runtime_services starts etcd and nats
logger.info("Starting mocker KV router overload test for 503 status") logger.info("Starting mocker KV router overload test for 503 status")
# Create mocker args file with limited resources # Create mocker args dictionary with limited resources
mocker_args = { mocker_args = {
"speedup_ratio": 10, "speedup_ratio": 10,
"block_size": 4, # Smaller block size "block_size": 4, # Smaller block size
"num_gpu_blocks": 64, # Limited GPU blocks to exhaust quickly "num_gpu_blocks": 64, # Limited GPU blocks to exhaust quickly
} }
mocker_args_file = os.path.join(request.node.name, "mocker_args_overload.json")
with open(mocker_args_file, "w") as f:
json.dump(mocker_args, f)
try: try:
# Start KV router (frontend) with limited block size # Start KV router (frontend) with limited block size
frontend_port = PORT + 10 # Use different port to avoid conflicts frontend_port = PORT + 10 # Use different port to avoid conflicts
...@@ -475,9 +501,9 @@ def test_mocker_kv_router_overload_503(request, runtime_services): ...@@ -475,9 +501,9 @@ def test_mocker_kv_router_overload_503(request, runtime_services):
) )
kv_router.__enter__() kv_router.__enter__()
# Start single mocker instance with limited resources # Start single mocker instance with limited resources using the new CLI interface
logger.info("Starting single mocker instance with limited resources") logger.info("Starting single mocker instance with limited resources")
mockers = MockerProcess(request, mocker_args_file, num_mockers=1) mockers = MockerProcess(request, mocker_args=mocker_args, num_mockers=1)
logger.info(f"Mocker using endpoint: {mockers.endpoint}") logger.info(f"Mocker using endpoint: {mockers.endpoint}")
mockers.__enter__() mockers.__enter__()
...@@ -584,9 +610,6 @@ def test_mocker_kv_router_overload_503(request, runtime_services): ...@@ -584,9 +610,6 @@ def test_mocker_kv_router_overload_503(request, runtime_services):
if "mockers" in locals(): if "mockers" in locals():
mockers.__exit__(None, None, None) mockers.__exit__(None, None, None)
if os.path.exists(mocker_args_file):
os.unlink(mocker_args_file)
@pytest.mark.pre_merge @pytest.mark.pre_merge
def test_kv_push_router_bindings(request, runtime_services): def test_kv_push_router_bindings(request, runtime_services):
...@@ -599,17 +622,15 @@ def test_kv_push_router_bindings(request, runtime_services): ...@@ -599,17 +622,15 @@ def test_kv_push_router_bindings(request, runtime_services):
# runtime_services starts etcd and nats # runtime_services starts etcd and nats
logger.info("Starting KvPushRouter bindings test") logger.info("Starting KvPushRouter bindings test")
# Create mocker args file # Create mocker args dictionary
mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE} mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
mocker_args_file = os.path.join(request.node.name, "mocker_args.json")
with open(mocker_args_file, "w") as f:
json.dump(mocker_args, f)
try: try:
# Start mocker instances # Start mocker instances with the new CLI interface
logger.info(f"Starting {NUM_MOCKERS} mocker instances") logger.info(f"Starting {NUM_MOCKERS} mocker instances")
mockers = MockerProcess(request, mocker_args_file, num_mockers=NUM_MOCKERS) mockers = MockerProcess(
request, mocker_args=mocker_args, num_mockers=NUM_MOCKERS
)
logger.info(f"All mockers using endpoint: {mockers.endpoint}") logger.info(f"All mockers using endpoint: {mockers.endpoint}")
mockers.__enter__() mockers.__enter__()
...@@ -816,9 +837,6 @@ def test_kv_push_router_bindings(request, runtime_services): ...@@ -816,9 +837,6 @@ def test_kv_push_router_bindings(request, runtime_services):
if "mockers" in locals(): if "mockers" in locals():
mockers.__exit__(None, None, None) mockers.__exit__(None, None, None)
if os.path.exists(mocker_args_file):
os.unlink(mocker_args_file)
@pytest.mark.pre_merge @pytest.mark.pre_merge
def test_indexers_sync(request, runtime_services): def test_indexers_sync(request, runtime_services):
...@@ -830,17 +848,15 @@ def test_indexers_sync(request, runtime_services): ...@@ -830,17 +848,15 @@ def test_indexers_sync(request, runtime_services):
# runtime_services starts etcd and nats # runtime_services starts etcd and nats
logger.info("Starting indexers sync test") logger.info("Starting indexers sync test")
# Create mocker args file # Create mocker args dictionary
mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE} mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
mocker_args_file = os.path.join(request.node.name, "mocker_args.json")
with open(mocker_args_file, "w") as f:
json.dump(mocker_args, f)
try: try:
# Start mocker instances # Start mocker instances with the new CLI interface
logger.info(f"Starting {NUM_MOCKERS} mocker instances") logger.info(f"Starting {NUM_MOCKERS} mocker instances")
mockers = MockerProcess(request, mocker_args_file, num_mockers=NUM_MOCKERS) mockers = MockerProcess(
request, mocker_args=mocker_args, num_mockers=NUM_MOCKERS
)
logger.info(f"All mockers using endpoint: {mockers.endpoint}") logger.info(f"All mockers using endpoint: {mockers.endpoint}")
mockers.__enter__() mockers.__enter__()
...@@ -1048,9 +1064,6 @@ def test_indexers_sync(request, runtime_services): ...@@ -1048,9 +1064,6 @@ def test_indexers_sync(request, runtime_services):
if "mockers" in locals(): if "mockers" in locals():
mockers.__exit__(None, None, None) mockers.__exit__(None, None, None)
if os.path.exists(mocker_args_file):
os.unlink(mocker_args_file)
@pytest.mark.pre_merge @pytest.mark.pre_merge
def test_query_instance_id_returns_worker_and_tokens(request, runtime_services): def test_query_instance_id_returns_worker_and_tokens(request, runtime_services):
...@@ -1076,10 +1089,7 @@ def test_query_instance_id_returns_worker_and_tokens(request, runtime_services): ...@@ -1076,10 +1089,7 @@ def test_query_instance_id_returns_worker_and_tokens(request, runtime_services):
logger.info("Starting KV router query_instance_id annotation test") logger.info("Starting KV router query_instance_id annotation test")
mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE} mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
mocker_args_file = os.path.join(request.node.name, "mocker_args.json")
os.makedirs(request.node.name, exist_ok=True) os.makedirs(request.node.name, exist_ok=True)
with open(mocker_args_file, "w") as f:
json.dump(mocker_args, f)
try: try:
# Start KV router (frontend) # Start KV router (frontend)
...@@ -1090,7 +1100,9 @@ def test_query_instance_id_returns_worker_and_tokens(request, runtime_services): ...@@ -1090,7 +1100,9 @@ def test_query_instance_id_returns_worker_and_tokens(request, runtime_services):
# Start multiple mocker engines to ensure worker selection logic # Start multiple mocker engines to ensure worker selection logic
logger.info(f"Starting {NUM_MOCKERS} mocker instances") logger.info(f"Starting {NUM_MOCKERS} mocker instances")
mockers = MockerProcess(request, mocker_args_file, num_mockers=NUM_MOCKERS) mockers = MockerProcess(
request, mocker_args=mocker_args, num_mockers=NUM_MOCKERS
)
logger.info(f"All mockers using endpoint: {mockers.endpoint}") logger.info(f"All mockers using endpoint: {mockers.endpoint}")
mockers.__enter__() mockers.__enter__()
...@@ -1237,5 +1249,3 @@ def test_query_instance_id_returns_worker_and_tokens(request, runtime_services): ...@@ -1237,5 +1249,3 @@ def test_query_instance_id_returns_worker_and_tokens(request, runtime_services):
kv_router.__exit__(None, None, None) kv_router.__exit__(None, None, None)
if "mockers" in locals(): if "mockers" in locals():
mockers.__exit__(None, None, None) mockers.__exit__(None, None, None)
if os.path.exists(mocker_args_file):
os.unlink(mocker_args_file)
...@@ -92,7 +92,7 @@ class VLLMProcess(EngineProcess): ...@@ -92,7 +92,7 @@ class VLLMProcess(EngineProcess):
"""Simple process manager for vllm shell scripts""" """Simple process manager for vllm shell scripts"""
def __init__(self, config: VLLMConfig, request): def __init__(self, config: VLLMConfig, request):
self.port = 8080 self.port = 8000
self.config = config self.config = config
self.dir = config.directory self.dir = config.directory
script_path = os.path.join(self.dir, "launch", config.script_name) script_path = os.path.join(self.dir, "launch", config.script_name)
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment