Unverified Commit 3292ed1b authored by Yongming Ding's avatar Yongming Ding Committed by GitHub
Browse files

feat(mocker): add MoE parallelism params to AIC perf model (#7856)


Signed-off-by: default avatarYongming Ding <yongmingd@nvidia.com>
parent b2cafb7f
......@@ -35,6 +35,9 @@ class AicSession:
model_path: str,
tp_size: int,
backend_version: str | None = None,
moe_tp_size: int | None = None,
moe_ep_size: int | None = None,
attention_dp_size: int | None = None,
):
version = backend_version or DEFAULT_BACKEND_VERSIONS.get(
backend_name, DEFAULT_BACKEND_VERSIONS["vllm"]
......@@ -49,7 +52,17 @@ class AicSession:
f"system={system!r}, backend={backend_name!r}, version={version!r}. "
f"Supported versions for this system/backend: {supported_versions}"
)
model_config = config.ModelConfig(tp_size=tp_size)
# Build ModelConfig. For MoE models, aic_moe_tp_size, aic_moe_ep_size, and
# aic_attention_dp_size must be set to satisfy AIC's constraint:
# tp_size * attention_dp_size == moe_tp_size * moe_ep_size
# AIC SDK validates this internally and raises a clear AssertionError if violated.
effective_dp = attention_dp_size or 1
model_config = config.ModelConfig(
tp_size=tp_size,
moe_tp_size=moe_tp_size,
moe_ep_size=moe_ep_size,
attention_dp_size=effective_dp,
)
model = get_model(
model_path=model_path,
model_config=model_config,
......@@ -144,6 +157,18 @@ def create_session(
model_path: str,
tp_size: int,
backend_version: str | None = None,
moe_tp_size: int | None = None,
moe_ep_size: int | None = None,
attention_dp_size: int | None = None,
) -> AicSession:
"""Factory function called from Rust via PyO3."""
return AicSession(backend_name, system, model_path, tp_size, backend_version)
return AicSession(
backend_name,
system,
model_path,
tp_size,
backend_version,
moe_tp_size,
moe_ep_size,
attention_dp_size,
)
......@@ -299,6 +299,27 @@ def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
help="Tensor parallel size for AIC latency prediction (default: 1). "
"Only affects AIC performance model lookups, not mocker scheduling.",
)
parser.add_argument(
"--aic-moe-tp-size",
type=int,
default=None,
help="MoE tensor-parallel size for AIC latency prediction. "
"Required for MoE models. Constraint: aic_tp_size * aic_attention_dp_size == aic_moe_tp_size * aic_moe_ep_size.",
)
parser.add_argument(
"--aic-moe-ep-size",
type=int,
default=None,
help="MoE expert-parallel size for AIC latency prediction. "
"Required for MoE models. Constraint: aic_tp_size * aic_attention_dp_size == aic_moe_tp_size * aic_moe_ep_size.",
)
parser.add_argument(
"--aic-attention-dp-size",
type=int,
default=None,
help="Attention data-parallel size for AIC latency prediction (default: 1). "
"Corresponds to the 'dp' dimension in AIC CLI output.",
)
parser.add_argument(
"--num-workers",
type=int,
......
......@@ -54,12 +54,18 @@ def build_mocker_engine_args(args: argparse.Namespace) -> MockEngineArgs:
aic_backend_version = None
aic_tp_size = None
aic_model_path = None
aic_moe_tp_size = None
aic_moe_ep_size = None
aic_attention_dp_size = None
if getattr(args, "aic_perf_model", False):
aic_backend = getattr(args, "engine_type", None) or "vllm"
aic_system = getattr(args, "aic_system", None)
aic_backend_version = getattr(args, "aic_backend_version", None)
aic_tp_size = getattr(args, "aic_tp_size", None)
aic_model_path = getattr(args, "model_path", None)
aic_moe_tp_size = getattr(args, "aic_moe_tp_size", None)
aic_moe_ep_size = getattr(args, "aic_moe_ep_size", None)
aic_attention_dp_size = getattr(args, "aic_attention_dp_size", None)
return MockEngineArgs(
engine_type=getattr(args, "engine_type", None) or "vllm",
num_gpu_blocks=getattr(args, "num_gpu_blocks", _DEFAULT_NUM_GPU_BLOCKS),
......@@ -81,6 +87,9 @@ def build_mocker_engine_args(args: argparse.Namespace) -> MockEngineArgs:
aic_backend_version=aic_backend_version,
aic_tp_size=aic_tp_size,
aic_model_path=aic_model_path,
aic_moe_tp_size=aic_moe_tp_size,
aic_moe_ep_size=aic_moe_ep_size,
aic_attention_dp_size=aic_attention_dp_size,
enable_local_indexer=not getattr(args, "durable_kv_events", False),
kv_transfer_bandwidth=getattr(args, "kv_transfer_bandwidth", None),
reasoning=_parse_reasoning_config(getattr(args, "reasoning", None)),
......
......@@ -55,6 +55,9 @@ def make_args(**overrides):
"aic_system": None,
"aic_backend_version": None,
"aic_tp_size": None,
"aic_moe_tp_size": None,
"aic_moe_ep_size": None,
"aic_attention_dp_size": None,
"model_path": None,
"is_prefill_worker": False,
"is_decode_worker": False,
......@@ -207,6 +210,9 @@ def test_build_mocker_engine_args_preserves_cli_mapped_fields(tmp_path):
"aic_backend_version": "0.5.6.post2",
"aic_tp_size": 8,
"aic_model_path": "/models/mock",
"aic_moe_tp_size": None,
"aic_moe_ep_size": None,
"aic_attention_dp_size": None,
"enable_local_indexer": True,
"bootstrap_port": None,
"kv_bytes_per_token": None,
......
......@@ -49,6 +49,7 @@ impl AicCallback for PyAicCallback {
/// Initialize an AIC callback by importing and calling the Python setup function.
///
/// Called once at mocker startup when `--aic-perf-model` is requested.
#[allow(clippy::too_many_arguments)]
pub(super) fn create_aic_callback(
py: Python<'_>,
backend_name: &str,
......@@ -56,11 +57,23 @@ pub(super) fn create_aic_callback(
model_path: &str,
tp_size: usize,
backend_version: Option<&str>,
moe_tp_size: Option<usize>,
moe_ep_size: Option<usize>,
attention_dp_size: Option<usize>,
) -> PyResult<Arc<dyn AicCallback>> {
let module = py.import("dynamo.mocker.aic_session")?;
let session = module.call_method1(
"create_session",
(backend_name, system, model_path, tp_size, backend_version),
(
backend_name,
system,
model_path,
tp_size,
backend_version,
moe_tp_size,
moe_ep_size,
attention_dp_size,
),
)?;
Ok(Arc::new(PyAicCallback {
session: session.into(),
......
......@@ -500,8 +500,21 @@ async fn select_engine(
.unwrap_or_else(|| local_model.card().source_path());
let backend_version = mocker_args.aic_backend_version.as_deref();
let tp_size = mocker_args.aic_tp_size.unwrap_or(1);
let moe_tp_size = mocker_args.aic_moe_tp_size;
let moe_ep_size = mocker_args.aic_moe_ep_size;
let attention_dp_size = mocker_args.aic_attention_dp_size;
match Python::with_gil(|py| {
create_aic_callback(py, &backend, system, model_name, tp_size, backend_version)
create_aic_callback(
py,
&backend,
system,
model_name,
tp_size,
backend_version,
moe_tp_size,
moe_ep_size,
attention_dp_size,
)
}) {
Ok(callback) => {
tracing::info!(
......
......@@ -133,7 +133,7 @@ impl MockEngineArgs {
#[pymethods]
impl MockEngineArgs {
#[new]
#[pyo3(signature = (engine_type="vllm", num_gpu_blocks=16384, block_size=0, max_num_seqs=Some(256), max_num_batched_tokens=Some(8192), enable_prefix_caching=true, enable_chunked_prefill=true, speedup_ratio=1.0, decode_speedup_ratio=1.0, dp_size=1, startup_time=None, worker_type="aggregated", planner_profile_data=None, aic_backend=None, aic_system=None, aic_backend_version=None, aic_tp_size=None, aic_model_path=None, enable_local_indexer=false, bootstrap_port=None, kv_bytes_per_token=None, kv_transfer_bandwidth=None, reasoning=None, zmq_kv_events_port=None, zmq_replay_port=None, preemption_mode="lifo", router_queue_policy=None, sglang=None))]
#[pyo3(signature = (engine_type="vllm", num_gpu_blocks=16384, block_size=0, max_num_seqs=Some(256), max_num_batched_tokens=Some(8192), enable_prefix_caching=true, enable_chunked_prefill=true, speedup_ratio=1.0, decode_speedup_ratio=1.0, dp_size=1, startup_time=None, worker_type="aggregated", planner_profile_data=None, aic_backend=None, aic_system=None, aic_backend_version=None, aic_tp_size=None, aic_model_path=None, aic_moe_tp_size=None, aic_moe_ep_size=None, aic_attention_dp_size=None, enable_local_indexer=false, bootstrap_port=None, kv_bytes_per_token=None, kv_transfer_bandwidth=None, reasoning=None, zmq_kv_events_port=None, zmq_replay_port=None, preemption_mode="lifo", router_queue_policy=None, sglang=None))]
#[allow(clippy::too_many_arguments)]
fn new(
engine_type: &str,
......@@ -154,6 +154,9 @@ impl MockEngineArgs {
aic_backend_version: Option<String>,
aic_tp_size: Option<usize>,
aic_model_path: Option<String>,
aic_moe_tp_size: Option<usize>,
aic_moe_ep_size: Option<usize>,
aic_attention_dp_size: Option<usize>,
enable_local_indexer: bool,
bootstrap_port: Option<u16>,
kv_bytes_per_token: Option<usize>,
......@@ -195,6 +198,9 @@ impl MockEngineArgs {
.aic_backend_version(aic_backend_version)
.aic_tp_size(aic_tp_size)
.aic_model_path(aic_model_path)
.aic_moe_tp_size(aic_moe_tp_size)
.aic_moe_ep_size(aic_moe_ep_size)
.aic_attention_dp_size(aic_attention_dp_size)
.enable_local_indexer(enable_local_indexer)
.bootstrap_port(bootstrap_port)
.kv_bytes_per_token(kv_bytes_per_token)
......@@ -272,6 +278,9 @@ impl MockEngineArgs {
"aic_backend_version": self.inner.aic_backend_version,
"aic_tp_size": self.inner.aic_tp_size,
"aic_model_path": self.inner.aic_model_path,
"aic_moe_tp_size": self.inner.aic_moe_tp_size,
"aic_moe_ep_size": self.inner.aic_moe_ep_size,
"aic_attention_dp_size": self.inner.aic_attention_dp_size,
"enable_local_indexer": self.inner.enable_local_indexer,
"bootstrap_port": self.inner.bootstrap_port,
"kv_bytes_per_token": self.inner.kv_bytes_per_token,
......@@ -386,6 +395,36 @@ impl MockEngineArgs {
self.inner.aic_model_path = value;
}
#[getter]
fn aic_moe_tp_size(&self) -> Option<usize> {
self.inner.aic_moe_tp_size
}
#[setter]
fn set_aic_moe_tp_size(&mut self, value: Option<usize>) {
self.inner.aic_moe_tp_size = value;
}
#[getter]
fn aic_moe_ep_size(&self) -> Option<usize> {
self.inner.aic_moe_ep_size
}
#[setter]
fn set_aic_moe_ep_size(&mut self, value: Option<usize>) {
self.inner.aic_moe_ep_size = value;
}
#[getter]
fn aic_attention_dp_size(&self) -> Option<usize> {
self.inner.aic_attention_dp_size
}
#[setter]
fn set_aic_attention_dp_size(&mut self, value: Option<usize>) {
self.inner.aic_attention_dp_size = value;
}
#[getter]
fn worker_type(&self) -> &'static str {
match self.inner.worker_type {
......@@ -415,7 +454,7 @@ impl MockEngineArgs {
}
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (bootstrap_port=None, zmq_kv_events_port=None, zmq_replay_port=None, kv_bytes_per_token=None, num_gpu_blocks=None, aic_backend=None, aic_system=None, aic_backend_version=None, aic_tp_size=None, aic_model_path=None, enable_prefix_caching=None, worker_type=None))]
#[pyo3(signature = (bootstrap_port=None, zmq_kv_events_port=None, zmq_replay_port=None, kv_bytes_per_token=None, num_gpu_blocks=None, aic_backend=None, aic_system=None, aic_backend_version=None, aic_tp_size=None, aic_model_path=None, aic_moe_tp_size=None, aic_moe_ep_size=None, aic_attention_dp_size=None, enable_prefix_caching=None, worker_type=None))]
fn with_overrides(
&self,
bootstrap_port: Option<u16>,
......@@ -428,6 +467,9 @@ impl MockEngineArgs {
aic_backend_version: Option<String>,
aic_tp_size: Option<usize>,
aic_model_path: Option<String>,
aic_moe_tp_size: Option<usize>,
aic_moe_ep_size: Option<usize>,
aic_attention_dp_size: Option<usize>,
enable_prefix_caching: Option<bool>,
worker_type: Option<String>,
) -> PyResult<Self> {
......@@ -462,6 +504,15 @@ impl MockEngineArgs {
if let Some(model_path) = aic_model_path {
inner.aic_model_path = Some(model_path);
}
if let Some(moe_tp_size) = aic_moe_tp_size {
inner.aic_moe_tp_size = Some(moe_tp_size);
}
if let Some(moe_ep_size) = aic_moe_ep_size {
inner.aic_moe_ep_size = Some(moe_ep_size);
}
if let Some(attention_dp_size) = aic_attention_dp_size {
inner.aic_attention_dp_size = Some(attention_dp_size);
}
if let Some(enable_prefix_caching) = enable_prefix_caching {
inner.enable_prefix_caching = enable_prefix_caching;
}
......@@ -880,6 +931,9 @@ fn materialize_replay_mocker_args(
.ok_or_else(|| PyException::new_err("--aic-perf-model requires --model-path"))?;
let backend_version = args.aic_backend_version.clone();
let tp_size = args.aic_tp_size.unwrap_or(1);
let moe_tp_size = args.aic_moe_tp_size;
let moe_ep_size = args.aic_moe_ep_size;
let attention_dp_size = args.aic_attention_dp_size;
let callback = create_aic_callback(
py,
&backend,
......@@ -887,6 +941,9 @@ fn materialize_replay_mocker_args(
&model_name,
tp_size,
backend_version.as_deref(),
moe_tp_size,
moe_ep_size,
attention_dp_size,
)
.map_err(|e| {
PyException::new_err(format!(
......
......@@ -1278,6 +1278,9 @@ class MockEngineArgs:
aic_backend_version: Optional[str] = None,
aic_tp_size: Optional[int] = None,
aic_model_path: Optional[str] = None,
aic_moe_tp_size: Optional[int] = None,
aic_moe_ep_size: Optional[int] = None,
aic_attention_dp_size: Optional[int] = None,
enable_local_indexer: bool = False,
bootstrap_port: Optional[int] = None,
kv_bytes_per_token: Optional[int] = None,
......@@ -1359,6 +1362,24 @@ class MockEngineArgs:
@aic_model_path.setter
def aic_model_path(self, value: Optional[str]) -> None: ...
@property
def aic_moe_tp_size(self) -> Optional[int]: ...
@aic_moe_tp_size.setter
def aic_moe_tp_size(self, value: Optional[int]) -> None: ...
@property
def aic_moe_ep_size(self) -> Optional[int]: ...
@aic_moe_ep_size.setter
def aic_moe_ep_size(self, value: Optional[int]) -> None: ...
@property
def aic_attention_dp_size(self) -> Optional[int]: ...
@aic_attention_dp_size.setter
def aic_attention_dp_size(self, value: Optional[int]) -> None: ...
@property
def worker_type(self) -> str: ...
......@@ -1381,6 +1402,9 @@ class MockEngineArgs:
aic_backend_version: Optional[str] = None,
aic_tp_size: Optional[int] = None,
aic_model_path: Optional[str] = None,
aic_moe_tp_size: Optional[int] = None,
aic_moe_ep_size: Optional[int] = None,
aic_attention_dp_size: Optional[int] = None,
enable_prefix_caching: Optional[bool] = None,
worker_type: Optional[str] = None,
) -> "MockEngineArgs": ...
......
......@@ -326,6 +326,25 @@ pub struct MockEngineArgs {
#[builder(default = "None")]
pub aic_model_path: Option<String>,
/// MoE tensor-parallel size for AIC latency prediction (e.g., 4 for pure MoE-TP).
/// Required for MoE models; must satisfy: aic_tp_size * aic_attention_dp_size == aic_moe_tp_size * aic_moe_ep_size.
#[serde(skip)]
#[builder(default = "None")]
pub aic_moe_tp_size: Option<usize>,
/// MoE expert-parallel size for AIC latency prediction (e.g., 4 for pure EP).
/// Required for MoE models; must satisfy: aic_tp_size * aic_attention_dp_size == aic_moe_tp_size * aic_moe_ep_size.
#[serde(skip)]
#[builder(default = "None")]
pub aic_moe_ep_size: Option<usize>,
/// Attention data-parallel size for AIC latency prediction (default: 1).
/// Corresponds to the `dp` dimension in AIC CLI output.
/// Must satisfy: aic_tp_size * aic_attention_dp_size == aic_moe_tp_size * aic_moe_ep_size.
#[serde(skip)]
#[builder(default = "None")]
pub aic_attention_dp_size: Option<usize>,
/// Enable worker-local KV indexer for tracking this worker's own KV cache state
#[builder(default = "false")]
pub enable_local_indexer: bool,
......@@ -493,6 +512,9 @@ impl MockEngineArgs {
"aic_backend_version",
"aic_tp_size",
"aic_model_path",
"aic_moe_tp_size",
"aic_moe_ep_size",
"aic_attention_dp_size",
"enable_local_indexer",
"bootstrap_port",
"kv_bytes_per_token",
......@@ -771,6 +793,21 @@ impl MockEngineArgs {
{
builder = builder.aic_model_path(Some(s.to_string()));
}
if let Some(v) = extra_args.get("aic_moe_tp_size")
&& let Some(n) = v.as_u64()
{
builder = builder.aic_moe_tp_size(Some(n as usize));
}
if let Some(v) = extra_args.get("aic_moe_ep_size")
&& let Some(n) = v.as_u64()
{
builder = builder.aic_moe_ep_size(Some(n as usize));
}
if let Some(v) = extra_args.get("aic_attention_dp_size")
&& let Some(n) = v.as_u64()
{
builder = builder.aic_attention_dp_size(Some(n as usize));
}
// Build the MockEngineArgs with either defaults or overridden values
builder
.build()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment