Unverified Commit 6b67c1c4 authored by Yongming Ding's avatar Yongming Ding Committed by GitHub
Browse files

feat(mocker): integrate AIConfigurator for latency prediction (#7505)


Signed-off-by: default avatarYongming Ding <yongmingd@nvidia.com>
Co-authored-by: default avatarClaude Opus 4.6 (1M context) <noreply@anthropic.com>
parent c3908a36
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
AIC (AI Configurator) direct session wrapper for mocker perf model.
Provides a Python class that wraps the AIC InferenceSession and exposes
predict_prefill() and predict_decode() methods callable from Rust via PyO3.
"""
import logging
from aiconfigurator.sdk import config
from aiconfigurator.sdk.backends.factory import get_backend
from aiconfigurator.sdk.inference_session import InferenceSession
from aiconfigurator.sdk.models import get_model
from aiconfigurator.sdk.perf_database import get_database
logger = logging.getLogger(__name__)
DEFAULT_BACKEND_VERSIONS = {
"vllm": "0.12.0",
"sglang": "0.5.6.post2",
}
class AicSession:
"""Wraps AIC InferenceSession with predict_prefill/predict_decode methods."""
def __init__(
self,
backend_name: str,
system: str,
model_path: str,
tp_size: int,
backend_version: str | None = None,
):
version = backend_version or DEFAULT_BACKEND_VERSIONS.get(
backend_name, DEFAULT_BACKEND_VERSIONS["vllm"]
)
database = get_database(system=system, backend=backend_name, version=version)
model_config = config.ModelConfig(tp_size=tp_size)
model = get_model(
model_path=model_path,
model_config=model_config,
backend_name=backend_name,
)
backend = get_backend(backend_name)
self._session = InferenceSession(
model=model, database=database, backend=backend
)
self._config = config
logger.info(
"AIC session initialized: backend=%s, system=%s, model=%s, tp=%d",
backend_name,
system,
model_path,
tp_size,
)
def predict_prefill(
self, batch_size: int, isl: int, prefix: int, osl: int
) -> float:
"""Predict prefill latency in ms. Parameters match AIC RuntimeConfig."""
# AIC requires at least 1 new token (isl > prefix)
actual_prefix = min(prefix, isl - 1) if isl > 0 else 0
rt = self._config.RuntimeConfig(
batch_size=batch_size, isl=isl, osl=osl, prefix=actual_prefix
)
summary = self._session.run_static(mode="static_ctx", runtime_config=rt)
return sum(summary.get_context_latency_dict().values())
def predict_decode(self, batch_size: int, isl: int, osl: int) -> float:
"""Predict decode (generation) latency in ms."""
rt = self._config.RuntimeConfig(batch_size=batch_size, isl=isl, osl=osl)
summary = self._session.run_static(mode="static_gen", runtime_config=rt)
return sum(summary.get_generation_latency_dict().values())
def create_session(
backend_name: str,
system: str,
model_path: str,
tp_size: int,
backend_version: str | None = None,
) -> AicSession:
"""Factory function called from Rust via PyO3."""
return AicSession(backend_name, system, model_path, tp_size, backend_version)
......@@ -129,6 +129,19 @@ def create_temp_engine_args_file(args: argparse.Namespace) -> Path:
"engine_type": getattr(args, "engine_type", None),
}
# If --aic-perf-model is set, add AIC fields
if getattr(args, "aic_perf_model", False):
engine_type = getattr(args, "engine_type", None) or "vllm"
engine_args["aic_backend"] = engine_type
if getattr(args, "aic_system", None):
engine_args["aic_system"] = args.aic_system
if getattr(args, "aic_backend_version", None):
engine_args["aic_backend_version"] = args.aic_backend_version
if getattr(args, "aic_tp_size", None):
engine_args["aic_tp_size"] = args.aic_tp_size
if getattr(args, "model_path", None):
engine_args["aic_model_path"] = args.model_path
# Parse --reasoning JSON string into a nested object
reasoning_str = getattr(args, "reasoning", None)
if reasoning_str:
......@@ -364,6 +377,33 @@ def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
help="Path to profile results directory containing selected_prefill_interpolation/ and "
"selected_decode_interpolation/ subdirectories (default: None, uses hardcoded polynomials)",
)
parser.add_argument(
"--aic-perf-model",
action="store_true",
default=False,
help="Use direct AIC SDK calls for latency prediction. "
"Requires aiconfigurator SDK installed.",
)
parser.add_argument(
"--aic-system",
type=str,
default=None,
help="AIC system name (e.g., 'h200_sxm'). Used with --aic-perf-model.",
)
parser.add_argument(
"--aic-backend-version",
type=str,
default=None,
help="AIC backend engine version (e.g., '0.12.0' for vLLM, '0.5.6.post2' for SGLang). "
"If not set, uses the default version for the backend.",
)
parser.add_argument(
"--aic-tp-size",
type=int,
default=None,
help="Tensor parallel size for AIC latency prediction (default: 1). "
"Only affects AIC performance model lookups, not mocker scheduling.",
)
parser.add_argument(
"--num-workers",
type=int,
......
......@@ -73,7 +73,9 @@ async def worker():
while still sharing the same event loop and tokio runtime.
"""
args = parse_args()
profile_data_result = None
# Resolve planner-profile-data: convert profile results dir to NPZ if needed
profile_data_result = resolve_planner_profile_data(args.planner_profile_data)
args.planner_profile_data = profile_data_result.npz_path
# Offline replay does not need planner profile conversion or runtime setup.
if args.trace_file is not None:
......@@ -101,10 +103,6 @@ async def worker():
except Exception as e:
logger.warning(f"Failed to clean up temporary file: {e}")
# Resolve planner-profile-data: convert profile results dir to NPZ if needed
profile_data_result = resolve_planner_profile_data(args.planner_profile_data)
args.planner_profile_data = profile_data_result.npz_path
# Handle extra_engine_args: either use provided file or create from CLI args
if args.extra_engine_args:
# User provided explicit JSON file
......
......@@ -100,6 +100,10 @@ python -m dynamo.mocker \
| `--sglang-chunked-prefill-size` | 8192 | SGLang chunked-prefill chunk size |
| `--sglang-clip-max-new-tokens` | 4096 | SGLang admission-budget cap for max new tokens |
| `--sglang-schedule-conservativeness` | 1.0 | SGLang schedule conservativeness factor |
| `--aic-perf-model` | False | Use AIC SDK for latency prediction instead of interpolated/polynomial models. Requires `aiconfigurator` SDK installed (install with `pip install ai-dynamo[mocker]`) |
| `--aic-system` | `h200_sxm` | AIC system name (e.g., `h200_sxm`). Used with `--aic-perf-model` |
| `--aic-backend-version` | Auto | AIC backend engine version (e.g., `0.12.0` for vLLM). If not set, uses the default version for the backend |
| `--aic-tp-size` | 1 | Tensor parallel size for AIC latency prediction. Only affects AIC performance model lookups, not mocker scheduling |
| `--extra-engine-args` | None | Path to a JSON file with mocker configuration; overrides individual CLI arguments |
| `--stagger-delay` | -1 (auto) | Delay between worker launches (seconds). 0 disables, -1 enables auto mode |
| `--disaggregation-mode` | `agg` | Worker mode: `agg` (aggregated), `prefill`, or `decode` |
......@@ -157,6 +161,22 @@ python -m dynamo.mocker \
--speedup-ratio 1.0
```
### AIC Performance Model
To use the AIC SDK for latency prediction:
```bash
pip install ai-dynamo[mocker]
python -m dynamo.mocker \
--model-path nvidia/Llama-3.1-8B-Instruct-FP8 \
--engine-type vllm \
--aic-perf-model \
--aic-system h200_sxm
```
The AIC model automatically uses `--model-path` and `--engine-type` to select the appropriate performance data. Available systems include `h200_sxm`, `h100_sxm`, etc. (see AIC SDK documentation for the full list).
Example `--reasoning` configuration:
```bash
......@@ -257,12 +277,14 @@ Each active request is tracked as a sequence, managing its token blocks and gene
### Performance Model
The mocker supports two timing prediction modes:
The mocker supports three timing prediction modes:
**Polynomial Model (Default):** Uses hardcoded polynomial formulas that approximate typical GPU behavior. Prefill time scales quadratically with token count, while decode time depends on the total active KV cache size.
**Interpolated Model:** Loads actual profiling data from an NPZ file containing measured prefill and decode latencies. The mocker interpolates between data points to predict timing for any input size. This enables high-fidelity simulation matching a specific hardware configuration.
**AIC Model (`--aic-perf-model`):** Uses the NVIDIA AI Configurator (AIC) SDK for latency prediction. AIC provides calibrated performance models for specific GPU/model/engine combinations, predicting prefill and decode latency as a function of batch size, sequence length, and prefix cache hits. The model path is automatically derived from `--model-path`, and the engine type from `--engine-type`. This mode requires the `aiconfigurator` SDK, installable via `pip install ai-dynamo[mocker]`.
### Bootstrap Rendezvous (Disaggregated Serving)
For disaggregated prefill/decode deployments, prefill and decode workers coordinate via a simple TCP-based rendezvous protocol. The decode worker connects to the prefill worker's bootstrap port and waits until the prefill phase completes and KV cache is ready. Either side can arrive first—the rendezvous completes when both are ready.
......
......@@ -23,6 +23,7 @@
/// integration between Python tools and the Dynamo runtime.
use super::*;
pub mod aic_callback;
pub mod entrypoint;
pub mod fpm;
pub mod kv;
......
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Python↔Rust bridge for the AIC (AI Configurator) perf model.
//!
//! [`PyAicCallback`] wraps a Python `AicSession` object and implements the
//! [`AicCallback`] trait so the Rust mocker scheduler can call AIC latency
//! predictions without knowing about PyO3.
use std::sync::Arc;
use pyo3::prelude::*;
use dynamo_mocker::common::perf_model::AicCallback;
/// Wraps a Python AIC InferenceSession for direct calls from Rust.
///
/// The Python object must expose:
/// - `predict_prefill(batch_size, isl, prefix, osl) -> float`
/// - `predict_decode(batch_size, isl, osl) -> float`
pub(super) struct PyAicCallback {
pub(super) session: PyObject,
}
// Safety: PyAicCallback is only called via Python::with_gil which acquires the GIL.
unsafe impl Send for PyAicCallback {}
unsafe impl Sync for PyAicCallback {}
impl AicCallback for PyAicCallback {
fn predict_prefill(&self, batch_size: usize, isl: usize, prefix: usize, osl: usize) -> f64 {
Python::with_gil(|py| {
self.session
.call_method1(py, "predict_prefill", (batch_size, isl, prefix, osl))
.and_then(|r| r.extract::<f64>(py))
.unwrap_or_else(|e| panic!("AIC predict_prefill failed: {e}"))
})
}
fn predict_decode(&self, batch_size: usize, isl: usize, osl: usize) -> f64 {
Python::with_gil(|py| {
self.session
.call_method1(py, "predict_decode", (batch_size, isl, osl))
.and_then(|r| r.extract::<f64>(py))
.unwrap_or_else(|e| panic!("AIC predict_decode failed: {e}"))
})
}
}
/// Initialize an AIC callback by importing and calling the Python setup function.
///
/// Called once at mocker startup when `--aic-perf-model` is requested.
pub(super) fn create_aic_callback(
py: Python<'_>,
backend_name: &str,
system: &str,
model_path: &str,
tp_size: usize,
backend_version: Option<&str>,
) -> PyResult<Arc<dyn AicCallback>> {
let module = py.import("dynamo.mocker.aic_session")?;
let session = module.call_method1(
"create_session",
(backend_name, system, model_path, tp_size, backend_version),
)?;
Ok(Arc::new(PyAicCallback {
session: session.into(),
}))
}
......@@ -22,6 +22,9 @@ use dynamo_llm::local_model::{LocalModel, LocalModelBuilder};
use dynamo_llm::mocker::make_mocker_engine;
use dynamo_llm::model_card::ModelDeploymentCard as RsModelDeploymentCard;
use dynamo_llm::types::openai::chat_completions::OpenAIChatCompletionsStreamingEngine;
use dynamo_mocker::common::perf_model::PerfModel;
use super::aic_callback::create_aic_callback;
use dynamo_mocker::common::protocols::MockEngineArgs;
use dynamo_runtime::discovery::ModelCardInstanceId as RsModelCardInstanceId;
use dynamo_runtime::protocols::EndpointId;
......@@ -416,7 +419,7 @@ async fn select_engine(
}
}
EngineType::Mocker => {
let mocker_args = if let Some(extra_args_path) = args.extra_engine_args {
let mut mocker_args = if let Some(extra_args_path) = args.extra_engine_args {
MockEngineArgs::from_json_file(&extra_args_path).map_err(|e| {
anyhow::anyhow!(
"Failed to load mocker args from {:?}: {}",
......@@ -431,6 +434,38 @@ async fn select_engine(
MockEngineArgs::default()
};
// If aic_backend is set, create Python AIC callback and override perf_model
if let Some(ref backend_name) = mocker_args.aic_backend {
let backend = backend_name.clone();
let system = mocker_args.aic_system.as_deref().unwrap_or("h200_sxm");
let model_name = mocker_args
.aic_model_path
.as_deref()
.unwrap_or_else(|| local_model.card().source_path());
let backend_version = mocker_args.aic_backend_version.as_deref();
let tp_size = mocker_args.aic_tp_size.unwrap_or(1);
match Python::with_gil(|py| {
create_aic_callback(py, &backend, system, model_name, tp_size, backend_version)
}) {
Ok(callback) => {
tracing::info!(
"AIC perf model: backend={}, gpu={}, model={}, version={:?}",
backend,
system,
model_name,
backend_version
);
mocker_args.perf_model = Arc::new(PerfModel::from_aic_callback(callback));
}
Err(e) => {
return Err(anyhow::anyhow!(
"Failed to create AIC callback (--aic-perf-model was requested): {}",
e
));
}
}
}
let endpoint = local_model.endpoint_id().clone();
let engine =
......@@ -477,19 +512,53 @@ pub fn run_mocker_trace_replay(
num_workers: usize,
replay_concurrency: Option<isize>,
) -> PyResult<PyObject> {
let report = py.allow_threads(move || {
let args = if let Some(extra_args_path) = extra_engine_args {
MockEngineArgs::from_json_file(&extra_args_path).map_err(|e| {
anyhow::anyhow!(
// Load args before allow_threads so we can use the GIL for AIC callback creation.
let mut args = if let Some(ref extra_args_path) = extra_engine_args {
MockEngineArgs::from_json_file(extra_args_path).map_err(|e| {
PyException::new_err(format!(
"Failed to load mocker args from {:?}: {}",
extra_args_path,
e
)
extra_args_path, e
))
})?
} else {
MockEngineArgs::default()
};
// Create AIC callback if requested (requires GIL, must be done before allow_threads).
if let Some(ref backend_name) = args.aic_backend.clone() {
let backend = backend_name.clone();
let system = args.aic_system.as_deref().unwrap_or("h200_sxm").to_string();
let model_name = args
.aic_model_path
.clone()
.ok_or_else(|| PyException::new_err("--aic-perf-model requires --model-path"))?;
let backend_version = args.aic_backend_version.clone();
let tp_size = args.aic_tp_size.unwrap_or(1);
let callback = create_aic_callback(
py,
&backend,
&system,
&model_name,
tp_size,
backend_version.as_deref(),
)
.map_err(|e| {
PyException::new_err(format!(
"Failed to create AIC callback (--aic-perf-model was requested): {}",
e
))
})?;
tracing::info!(
"AIC perf model: backend={}, gpu={}, model={}, version={:?}",
backend,
system,
model_name,
backend_version
);
args.perf_model = Arc::new(PerfModel::from_aic_callback(callback));
}
let report = py.allow_threads(move || {
let replay_concurrency = replay_concurrency
.map(usize::try_from)
.transpose()
......
......@@ -25,6 +25,18 @@ pub trait DecodeInterpolator: Send + Sync {
fn interp(&self, x: f64, y: f64) -> Result<f64, InterpolateError>;
}
/// Callback trait for direct AIC SDK calls.
/// Implementors call the Python AIC SDK via PyO3 GIL.
pub trait AicCallback: Send + Sync {
/// Predict prefill latency in ms.
/// Parameters: (batch_size, isl, prefix, osl)
fn predict_prefill(&self, batch_size: usize, isl: usize, prefix: usize, osl: usize) -> f64;
/// Predict decode (generation) latency in ms.
/// Parameters: (batch_size, isl, osl)
fn predict_decode(&self, batch_size: usize, isl: usize, osl: usize) -> f64;
}
/// Wrapper to implement PrefillInterpolator for the concrete Interp1D type
struct PrefillInterp1D {
inner: ndarray_interp::interp1d::Interp1D<
......@@ -65,11 +77,14 @@ pub enum PerfModel {
#[default]
Polynomial,
/// Interpolation-based model using profiler data
/// Interpolators are built once and stored as trait objects
/// Decode axes: (active_kv_tokens, context_length)
Interpolated {
prefill_interp: Arc<dyn PrefillInterpolator>,
decode_interp: Arc<dyn DecodeInterpolator>,
},
/// AI Configurator SDK calls via Python callback.
/// Passes full parameters (batch_size, isl, prefix, osl) for maximum accuracy.
Aiconfigurator { callback: Arc<dyn AicCallback> },
}
impl Clone for PerfModel {
......@@ -83,6 +98,9 @@ impl Clone for PerfModel {
prefill_interp: Arc::clone(prefill_interp),
decode_interp: Arc::clone(decode_interp),
},
PerfModel::Aiconfigurator { callback } => PerfModel::Aiconfigurator {
callback: Arc::clone(callback),
},
}
}
}
......@@ -92,6 +110,7 @@ impl std::fmt::Debug for PerfModel {
match self {
PerfModel::Polynomial => write!(f, "PerfModel::Polynomial"),
PerfModel::Interpolated { .. } => write!(f, "PerfModel::Interpolated {{ .. }}"),
PerfModel::Aiconfigurator { .. } => write!(f, "PerfModel::Aiconfigurator"),
}
}
}
......@@ -188,49 +207,67 @@ impl PerfModel {
})
}
/// Predict prefill time in milliseconds given the number of new tokens
pub fn predict_prefill_time(&self, new_tokens: usize) -> f64 {
/// Create an Aiconfigurator perf model from a callback.
pub fn from_aic_callback(callback: Arc<dyn AicCallback>) -> Self {
PerfModel::Aiconfigurator { callback }
}
/// Predict prefill time in milliseconds.
///
/// Callers always pass all parameters; each variant uses what it needs:
/// - Polynomial/Interpolated: uses total new tokens across the batch
/// (`batch_size * (isl - prefix)`), modeling GPU processing total tokens in parallel
/// - Aiconfigurator: passes (batch_size, isl, prefix) directly to the AIC SDK
pub fn predict_prefill_time(&self, batch_size: usize, isl: usize, prefix: usize) -> f64 {
let new_tokens_per_req = isl.saturating_sub(prefix);
let time = match self {
PerfModel::Polynomial => {
// Original polynomial formula
let tokens = new_tokens as f64;
// Total tokens across the batch — GPU processes them in parallel
let tokens = (batch_size * new_tokens_per_req) as f64;
4.209989e-07 * tokens.powi(2) + 1.518344e-02 * tokens + 1.650142e+01
}
PerfModel::Interpolated { prefill_interp, .. } => {
// Use pre-built interpolator
let query = new_tokens as f64;
prefill_interp.interp(query).unwrap_or(0.0)
let tokens = (batch_size * new_tokens_per_req) as f64;
prefill_interp.interp(tokens).unwrap_or(0.0)
}
PerfModel::Aiconfigurator { callback } => {
callback.predict_prefill(batch_size, isl, prefix, 1)
}
};
// Ensure non-negative timing
let result = time.max(0.0);
tracing::trace!("Prefill time prediction: new_tokens={new_tokens}, time={result:.2}ms");
result
time.max(0.0)
}
/// Predict decode time in milliseconds given active KV tokens and context length
/// Predict decode time in milliseconds.
///
/// For the Polynomial variant, this computes active percentage as active_kv_tokens / 16384.
/// For the Interpolated variant, this performs 2D bilinear interpolation.
pub fn predict_decode_time(&self, active_kv_tokens: usize, context_length: usize) -> f64 {
/// Callers always pass all parameters; each variant uses what it needs:
/// - Polynomial: uses active_kv_tokens
/// - Interpolated: uses (active_kv_tokens, context_length)
/// - Aiconfigurator: uses (batch_size, context_length)
pub fn predict_decode_time(
&self,
batch_size: usize,
active_kv_tokens: usize,
context_length: usize,
) -> f64 {
if batch_size == 0 {
return 0.0;
}
let time = match self {
PerfModel::Polynomial => {
// Compute active percentage using default capacity
let active_perc = active_kv_tokens as f64 / 16384.0;
// Original polynomial formula
-25.74 * active_perc.powi(2) + 54.01 * active_perc + 5.74
}
PerfModel::Interpolated { decode_interp, .. } => {
// Use pre-built interpolator
let query_x = active_kv_tokens as f64;
let query_y = context_length as f64;
decode_interp.interp(query_x, query_y).unwrap_or(0.0)
PerfModel::Interpolated { decode_interp, .. } => decode_interp
.interp(active_kv_tokens as f64, context_length as f64)
.unwrap_or(0.0),
PerfModel::Aiconfigurator { callback } => {
callback.predict_decode(batch_size, context_length, 2)
}
};
// Token-emitting decode steps should not collapse onto the same timestamp.
let result = time.max(1.0);
tracing::trace!(
"Decode time prediction: active_kv_tokens={active_kv_tokens}, context_length={context_length}, time={result:.2}ms"
"Decode time prediction: batch_size={batch_size}, active_kv_tokens={active_kv_tokens}, context_length={context_length}, time={result:.2}ms"
);
result
}
......
......@@ -61,6 +61,9 @@ pub struct DirectRequest {
pub struct PrefillCost {
pub new_blocks: usize,
pub new_tokens: usize,
/// Number of tokens already cached (prefix hit).
/// isl = cached_tokens + new_tokens
pub cached_tokens: usize,
}
impl PrefillCost {
......@@ -70,7 +73,8 @@ impl PrefillCost {
perf_model: &PerfModel,
) -> f64 {
let tokens = new_tokens.unwrap_or(self.new_tokens);
perf_model.predict_prefill_time(tokens)
let isl = self.cached_tokens + tokens;
perf_model.predict_prefill_time(1, isl, self.cached_tokens)
}
}
......@@ -232,6 +236,35 @@ pub struct MockEngineArgs {
#[builder(default = "Arc::new(PerfModel::default())")]
pub perf_model: Arc<PerfModel>,
/// If set, indicates direct AIC SDK calls should be used.
/// The value is the backend name (e.g., "sglang", "vllm").
/// The Python layer reads this and overrides perf_model with an Aiconfigurator callback.
#[serde(skip)]
#[builder(default = "None")]
pub aic_backend: Option<String>,
/// AIC GPU system name (e.g., "h200_sxm"). Required when aic_backend is set.
#[serde(skip)]
#[builder(default = "None")]
pub aic_system: Option<String>,
/// AIC backend engine version (e.g., "0.12.0" for vLLM, "0.5.6.post2" for SGLang).
/// If None, uses the default version for the backend.
#[serde(skip)]
#[builder(default = "None")]
pub aic_backend_version: Option<String>,
/// Tensor parallel size for AIC latency prediction.
/// Only affects AIC performance model lookups, not mocker scheduling.
#[serde(skip)]
#[builder(default = "None")]
pub aic_tp_size: Option<usize>,
/// HuggingFace model path for AIC latency prediction (e.g., "nvidia/Llama-3.1-8B-Instruct-FP8").
#[serde(skip)]
#[builder(default = "None")]
pub aic_model_path: Option<String>,
/// Enable worker-local KV indexer for tracking this worker's own KV cache state
#[builder(default = "false")]
pub enable_local_indexer: bool,
......@@ -331,6 +364,11 @@ impl MockEngineArgs {
"is_prefill",
"is_decode",
"planner_profile_data",
"aic_backend",
"aic_system",
"aic_backend_version",
"aic_tp_size",
"aic_model_path",
"enable_local_indexer",
"bootstrap_port",
"kv_bytes_per_token",
......@@ -523,7 +561,7 @@ impl MockEngineArgs {
};
builder = builder.worker_type(worker_type);
// Load performance model from NPZ file if provided
// Load performance model from NPZ file if provided.
let perf_model = if let Some(path_str) = extra_args.get("planner_profile_data")
&& let Some(path_str) = path_str.as_str()
{
......@@ -547,6 +585,32 @@ impl MockEngineArgs {
};
builder = builder.perf_model(perf_model);
// Check for AIC direct mode fields
if let Some(backend) = extra_args.get("aic_backend")
&& let Some(backend_str) = backend.as_str()
{
builder = builder.aic_backend(Some(backend_str.to_string()));
}
if let Some(system) = extra_args.get("aic_system")
&& let Some(s) = system.as_str()
{
builder = builder.aic_system(Some(s.to_string()));
}
if let Some(version) = extra_args.get("aic_backend_version")
&& let Some(s) = version.as_str()
{
builder = builder.aic_backend_version(Some(s.to_string()));
}
if let Some(tp) = extra_args.get("aic_tp_size")
&& let Some(n) = tp.as_u64()
{
builder = builder.aic_tp_size(Some(n as usize));
}
if let Some(mp) = extra_args.get("aic_model_path")
&& let Some(s) = mp.as_str()
{
builder = builder.aic_model_path(Some(s.to_string()));
}
// Build the MockEngineArgs with either defaults or overridden values
builder
.build()
......
......@@ -376,6 +376,7 @@ impl KvManager {
PrefillCost {
new_blocks,
new_tokens,
cached_tokens,
}
}
}
......
......@@ -208,7 +208,18 @@ impl SglangScheduler {
}
// 4. Simulate prefill
simulate_prefill(admit.total_new_tokens, admit.can_run.len(), &config).await;
let batch_size = admit.can_run.len();
let mean_isl = if batch_size > 0 {
admit.total_isl / batch_size
} else {
0
};
let mean_prefix = if batch_size > 0 {
admit.total_prefix / batch_size
} else {
0
};
simulate_prefill(batch_size, mean_isl, mean_prefix, &config).await;
// Separate fully-prefilled from chunked requests
for mut req in admit.can_run {
......@@ -348,8 +359,10 @@ fn apply_schedule_policy(
struct AdmitResult {
can_run: Vec<SglangRequest>,
/// Total new tokens to prefill (computed before prefilled_tokens is updated).
total_new_tokens: usize,
/// Sum of ISL values across admitted requests (for computing mean).
total_isl: usize,
/// Sum of prefix (cached tokens) across admitted requests (for computing mean).
total_prefix: usize,
oom: bool,
}
......@@ -378,7 +391,8 @@ fn get_new_batch_prefill(
let mut can_run = Vec::new();
let mut rejected = VecDeque::new();
let mut oom = false;
let mut total_new_tokens: usize = 0;
let mut total_isl: usize = 0;
let mut total_prefix: usize = 0;
while let Some(mut req) = waiting.pop_front() {
let extend_input = req.extend_input_len() as f64;
......@@ -435,9 +449,8 @@ fn get_new_batch_prefill(
req.prefilled_tokens = chunk_end;
let actual_prefilled = (chunk_end - (req.token_ids.len() - extend_input as usize)) as f64;
// Only count cache-miss tokens for prefill timing (prefix hits skip compute)
let new_compute_tokens = chunk_end.saturating_sub(alloc.prefix_len);
total_new_tokens += new_compute_tokens;
total_isl += chunk_end;
total_prefix += alloc.prefix_len;
rem_total_tokens -= total_needed;
rem_input_tokens -= actual_prefilled;
rem_chunk_tokens -= actual_prefilled;
......@@ -455,22 +468,26 @@ fn get_new_batch_prefill(
AdmitResult {
can_run,
total_new_tokens,
total_isl,
total_prefix,
oom,
}
}
async fn simulate_prefill(total_new_tokens: usize, num_reqs: usize, config: &SglangConfig) {
if num_reqs == 0 {
return;
}
if config.worker_type == WorkerType::Decode {
async fn simulate_prefill(
batch_size: usize,
mean_isl: usize,
mean_prefix: usize,
config: &SglangConfig,
) {
if batch_size == 0 || config.worker_type == WorkerType::Decode {
return;
}
let start = Instant::now();
let prefill_time = config.perf_model.predict_prefill_time(total_new_tokens);
let prefill_time = config
.perf_model
.predict_prefill_time(batch_size, mean_isl, mean_prefix);
let total_time = Duration::from_secs_f64(prefill_time / 1000.0);
if config.speedup_ratio > 0.0 && total_time > Duration::ZERO {
......@@ -580,9 +597,10 @@ async fn simulate_decode(
.sum();
let avg_context = total_context / running.len();
let decode_time = config
let decode_time =
config
.perf_model
.predict_decode_time(total_context, avg_context);
.predict_decode_time(running.len(), total_context, avg_context);
let total_time = Duration::from_secs_f64(decode_time / 1000.0);
......
......@@ -350,12 +350,15 @@ fn simulate_prefill_step(
current_time_ms: f64,
apply_speedup: bool,
) -> Duration {
let mut total_time = Duration::ZERO;
let mut token_budget = args
.max_num_batched_tokens
.map_or(usize::MAX, |t| t.saturating_sub(state.decode.len()));
// Accumulate batch-level prefill stats for a single predict call after the loop.
let mut batch_count: usize = 0;
let mut batch_total_isl: usize = 0;
let mut batch_total_prefix: usize = 0;
'prefill: while token_budget > 0 {
// Drain prefill first, then pull from waiting one at a time.
if state.prefill.is_empty() {
......@@ -426,13 +429,13 @@ fn simulate_prefill_step(
seq.commit_allocation(cumulative);
}
// Accumulate prefill compute time only for the new tokens in this chunk.
// Accumulate per-request (isl, prefix) for batch-level prediction.
let new_tokens_in_chunk = chunk.min(remaining);
if args.worker_type != WorkerType::Decode && new_tokens_in_chunk > 0 {
total_time += Duration::from_secs_f64(
prefill_cost.predict_prefill_compute(Some(new_tokens_in_chunk), &args.perf_model)
/ 1000.0,
);
let isl = prefill_cost.cached_tokens + new_tokens_in_chunk;
batch_total_isl += isl;
batch_total_prefix += prefill_cost.cached_tokens;
batch_count += 1;
}
// Hit rate: fraction of tokens that were already cached.
......@@ -455,6 +458,18 @@ fn simulate_prefill_step(
}
}
// One batch-level prefill prediction instead of summing per-request predictions.
let total_time = if batch_count > 0 {
let mean_isl = batch_total_isl / batch_count;
let mean_prefix = batch_total_prefix / batch_count;
let ms = args
.perf_model
.predict_prefill_time(batch_count, mean_isl, mean_prefix);
Duration::from_secs_f64(ms / 1000.0)
} else {
Duration::ZERO
};
if !apply_speedup || args.speedup_ratio <= 0.0 || total_time <= Duration::ZERO {
return total_time;
}
......@@ -489,12 +504,13 @@ fn simulate_decode_step(
return Duration::ZERO;
}
let count = decode_lengths.len();
let active_kv_tokens = kv_manager.num_active_blocks() * args.block_size;
let total_length: usize = decode_lengths.iter().sum();
let context_length = total_length / decode_lengths.len();
let decoding_time = args
.perf_model
.predict_decode_time(active_kv_tokens, context_length);
let context_length = total_length / count;
let decoding_time =
args.perf_model
.predict_decode_time(count, active_kv_tokens, context_length);
let unscaled_time = Duration::from_secs_f64(decoding_time / 1000.0);
let effective_ratio = args.speedup_ratio * args.decode_speedup_ratio;
let total_time = if apply_speedup && effective_ratio > 0.0 && unscaled_time > Duration::ZERO {
......
......@@ -65,6 +65,10 @@ sglang = [
"cupy-cuda12x>=13.0.0",
]
mocker = [
"aiconfigurator>=0.7.0",
]
[project.entry-points.pytest11]
vllm_tests = "dynamo.vllm.tests.conftest"
trtllm_tests = "dynamo.trtllm.tests.conftest"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment