Unverified Commit abc02c68 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

fix: llm/mocker: Remove the llm -> mocker crate dependency, move config (#6998)


Signed-off-by: default avatarGraham King <grahamk@nvidia.com>
Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent c536bf76
...@@ -1944,7 +1944,6 @@ dependencies = [ ...@@ -1944,7 +1944,6 @@ dependencies = [
"dynamo-bench", "dynamo-bench",
"dynamo-kv-router", "dynamo-kv-router",
"dynamo-memory", "dynamo-memory",
"dynamo-mocker",
"dynamo-parsers", "dynamo-parsers",
"dynamo-runtime", "dynamo-runtime",
"dynamo-tokens", "dynamo-tokens",
...@@ -2034,25 +2033,31 @@ name = "dynamo-mocker" ...@@ -2034,25 +2033,31 @@ name = "dynamo-mocker"
version = "1.0.0" version = "1.0.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bytes",
"dashmap 6.1.0", "dashmap 6.1.0",
"derive-getters", "derive-getters",
"derive_builder", "derive_builder",
"dynamo-kv-router", "dynamo-kv-router",
"dynamo-llm",
"dynamo-runtime", "dynamo-runtime",
"dynamo-tokens", "dynamo-tokens",
"futures",
"ndarray 0.16.1", "ndarray 0.16.1",
"ndarray-interp", "ndarray-interp",
"ndarray-npy", "ndarray-npy",
"rand 0.9.2", "rand 0.9.2",
"rmp-serde",
"rstest 0.18.2", "rstest 0.18.2",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-stream",
"tokio-timerfd", "tokio-timerfd",
"tokio-util", "tokio-util",
"tracing", "tracing",
"uuid", "uuid",
"validator", "validator",
"zeromq",
] ]
[[package]] [[package]]
......
...@@ -10,6 +10,7 @@ import json ...@@ -10,6 +10,7 @@ import json
import logging import logging
import os import os
import signal import signal
import socket
import tempfile import tempfile
from pathlib import Path from pathlib import Path
...@@ -17,7 +18,14 @@ import uvloop ...@@ -17,7 +18,14 @@ import uvloop
os.environ.setdefault("DYN_COMPUTE_THREADS", "0") os.environ.setdefault("DYN_COMPUTE_THREADS", "0")
from dynamo.llm import EngineType, EntrypointArgs, fetch_model, make_engine, run_input from dynamo.llm import (
EngineType,
EntrypointArgs,
ModelRuntimeConfig,
fetch_model,
make_engine,
run_input,
)
from dynamo.runtime import DistributedRuntime from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging from dynamo.runtime.logging import configure_dynamo_logging
...@@ -136,6 +144,46 @@ def compute_stagger_delay(num_workers: int, stagger_delay: float) -> float: ...@@ -136,6 +144,46 @@ def compute_stagger_delay(num_workers: int, stagger_delay: float) -> float:
return 0.2 return 0.2
def _build_runtime_config(
engine_args: dict,
) -> tuple[int, ModelRuntimeConfig]:
"""Build a ModelRuntimeConfig from the engine args dict.
Returns (kv_cache_block_size, runtime_config). Defaults match
the Rust MockEngineArgsBuilder so hand-crafted JSON files that
omit fields behave identically.
"""
is_prefill = engine_args.get("is_prefill", False)
is_decode = engine_args.get("is_decode", False)
rc = ModelRuntimeConfig()
rc.total_kv_blocks = engine_args.get("num_gpu_blocks", 16384)
if (v := engine_args.get("max_num_seqs")) is not None:
rc.max_num_seqs = v
if (v := engine_args.get("max_num_batched_tokens")) is not None:
rc.max_num_batched_tokens = v
rc.enable_local_indexer = (
engine_args.get("enable_local_indexer", False) and not is_decode
)
rc.data_parallel_size = engine_args.get("dp_size", 1)
bootstrap_port = engine_args.get("bootstrap_port")
if is_prefill and bootstrap_port is not None:
host = os.environ.get(
"DYN_HTTP_RPC_HOST", socket.gethostbyname(socket.gethostname())
)
rc.set_disaggregated_endpoint(
bootstrap_host=host, bootstrap_port=bootstrap_port
)
logger.info(
"Mocker prefill worker: publishing bootstrap endpoint to discovery "
f"(bootstrap_port={bootstrap_port})"
)
block_size = engine_args.get("block_size", 64)
return block_size, rc
async def launch_workers(args: argparse.Namespace, extra_engine_args_path: Path): async def launch_workers(args: argparse.Namespace, extra_engine_args_path: Path):
"""Launch mocker worker(s) with isolated DistributedRuntime instances. """Launch mocker worker(s) with isolated DistributedRuntime instances.
...@@ -165,14 +213,13 @@ async def launch_workers(args: argparse.Namespace, extra_engine_args_path: Path) ...@@ -165,14 +213,13 @@ async def launch_workers(args: argparse.Namespace, extra_engine_args_path: Path)
f"(estimated total: {total_time:.1f}s)" f"(estimated total: {total_time:.1f}s)"
) )
# Load base engine args if we need to create per-worker files # Always load base engine args for runtime config construction
with open(extra_engine_args_path) as f:
base_engine_args = json.load(f)
needs_per_worker_args = bool( needs_per_worker_args = bool(
args.bootstrap_ports_list or args.zmq_kv_events_ports_list args.bootstrap_ports_list or args.zmq_kv_events_ports_list
) )
base_engine_args = None
if needs_per_worker_args:
with open(extra_engine_args_path) as f:
base_engine_args = json.load(f)
for worker_id in range(args.num_workers): for worker_id in range(args.num_workers):
logger.info(f"Creating mocker worker {worker_id + 1}/{args.num_workers}") logger.info(f"Creating mocker worker {worker_id + 1}/{args.num_workers}")
...@@ -185,10 +232,9 @@ async def launch_workers(args: argparse.Namespace, extra_engine_args_path: Path) ...@@ -185,10 +232,9 @@ async def launch_workers(args: argparse.Namespace, extra_engine_args_path: Path)
) )
runtimes.append(runtime) runtimes.append(runtime)
# Determine which engine args file to use # Determine which engine args file and dict to use
worker_engine_args_path: Path | str worker_engine_args_path: Path | str
if needs_per_worker_args: if needs_per_worker_args:
assert base_engine_args is not None
worker_args = base_engine_args.copy() worker_args = base_engine_args.copy()
if args.bootstrap_ports_list: if args.bootstrap_ports_list:
worker_args["bootstrap_port"] = args.bootstrap_ports_list[worker_id] worker_args["bootstrap_port"] = args.bootstrap_ports_list[worker_id]
...@@ -204,8 +250,11 @@ async def launch_workers(args: argparse.Namespace, extra_engine_args_path: Path) ...@@ -204,8 +250,11 @@ async def launch_workers(args: argparse.Namespace, extra_engine_args_path: Path)
per_worker_temp_files.append(worker_engine_args_path) per_worker_temp_files.append(worker_engine_args_path)
logger.debug(f"Worker {worker_id}: per-worker args {worker_args}") logger.debug(f"Worker {worker_id}: per-worker args {worker_args}")
else: else:
worker_args = base_engine_args
worker_engine_args_path = extra_engine_args_path worker_engine_args_path = extra_engine_args_path
kv_cache_block_size, runtime_config = _build_runtime_config(worker_args)
# Create EntrypointArgs for this worker # Create EntrypointArgs for this worker
entrypoint_args = EntrypointArgs( entrypoint_args = EntrypointArgs(
engine_type=EngineType.Mocker, engine_type=EngineType.Mocker,
...@@ -213,6 +262,8 @@ async def launch_workers(args: argparse.Namespace, extra_engine_args_path: Path) ...@@ -213,6 +262,8 @@ async def launch_workers(args: argparse.Namespace, extra_engine_args_path: Path)
model_name=args.model_name, model_name=args.model_name,
endpoint_id=args.endpoint, endpoint_id=args.endpoint,
extra_engine_args=str(worker_engine_args_path), extra_engine_args=str(worker_engine_args_path),
runtime_config=runtime_config,
kv_cache_block_size=kv_cache_block_size,
is_prefill=args.is_prefill_worker, is_prefill=args.is_prefill_worker,
) )
......
...@@ -1568,7 +1568,6 @@ dependencies = [ ...@@ -1568,7 +1568,6 @@ dependencies = [
"dynamo-async-openai", "dynamo-async-openai",
"dynamo-kv-router", "dynamo-kv-router",
"dynamo-memory", "dynamo-memory",
"dynamo-mocker",
"dynamo-parsers", "dynamo-parsers",
"dynamo-runtime", "dynamo-runtime",
"dynamo-tokens", "dynamo-tokens",
...@@ -1641,31 +1640,6 @@ dependencies = [ ...@@ -1641,31 +1640,6 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "dynamo-mocker"
version = "1.0.0"
dependencies = [
"anyhow",
"dashmap 6.1.0",
"derive-getters",
"derive_builder",
"dynamo-kv-router",
"dynamo-runtime",
"dynamo-tokens",
"ndarray",
"ndarray-interp",
"ndarray-npy",
"rand 0.9.2",
"serde",
"serde_json",
"tokio",
"tokio-timerfd",
"tokio-util",
"tracing",
"uuid",
"validator",
]
[[package]] [[package]]
name = "dynamo-parsers" name = "dynamo-parsers"
version = "1.0.0" version = "1.0.0"
...@@ -3340,12 +3314,6 @@ dependencies = [ ...@@ -3340,12 +3314,6 @@ dependencies = [
"redox_syscall 0.7.3", "redox_syscall 0.7.3",
] ]
[[package]]
name = "linux-raw-sys"
version = "0.4.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab"
[[package]] [[package]]
name = "linux-raw-sys" name = "linux-raw-sys"
version = "0.12.1" version = "0.12.1"
...@@ -3772,31 +3740,6 @@ dependencies = [ ...@@ -3772,31 +3740,6 @@ dependencies = [
"rawpointer", "rawpointer",
] ]
[[package]]
name = "ndarray-interp"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e43087829efb5ec2736598e88587df286425b59df5a9ce991994cdd2c5855d3f"
dependencies = [
"ndarray",
"num-traits",
"thiserror 2.0.18",
]
[[package]]
name = "ndarray-npy"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b313788c468c49141a9d9b6131fc15f403e6ef4e8446a0b2e18f664ddb278a9"
dependencies = [
"byteorder",
"ndarray",
"num-complex",
"num-traits",
"py_literal",
"zip 2.4.2",
]
[[package]] [[package]]
name = "neli" name = "neli"
version = "0.7.4" version = "0.7.4"
...@@ -5000,19 +4943,6 @@ version = "0.1.28" ...@@ -5000,19 +4943,6 @@ version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5a041e753da8b807c9255f28de81879c78c876392ff2469cde94799b2896b9d" checksum = "b5a041e753da8b807c9255f28de81879c78c876392ff2469cde94799b2896b9d"
[[package]]
name = "py_literal"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "102df7a3d46db9d3891f178dcc826dc270a6746277a9ae6436f8d29fd490a8e1"
dependencies = [
"num-bigint",
"num-complex",
"num-traits",
"pest",
"pest_derive",
]
[[package]] [[package]]
name = "pyo3" name = "pyo3"
version = "0.23.5" version = "0.23.5"
...@@ -5610,19 +5540,6 @@ dependencies = [ ...@@ -5610,19 +5540,6 @@ dependencies = [
"semver", "semver",
] ]
[[package]]
name = "rustix"
version = "0.38.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154"
dependencies = [
"bitflags 2.11.0",
"errno",
"libc",
"linux-raw-sys 0.4.15",
"windows-sys 0.59.0",
]
[[package]] [[package]]
name = "rustix" name = "rustix"
version = "1.1.4" version = "1.1.4"
...@@ -5632,7 +5549,7 @@ dependencies = [ ...@@ -5632,7 +5549,7 @@ dependencies = [
"bitflags 2.11.0", "bitflags 2.11.0",
"errno", "errno",
"libc", "libc",
"linux-raw-sys 0.12.1", "linux-raw-sys",
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
...@@ -6383,7 +6300,7 @@ dependencies = [ ...@@ -6383,7 +6300,7 @@ dependencies = [
"fastrand", "fastrand",
"getrandom 0.4.2", "getrandom 0.4.2",
"once_cell", "once_cell",
"rustix 1.1.4", "rustix",
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
...@@ -6498,15 +6415,6 @@ dependencies = [ ...@@ -6498,15 +6415,6 @@ dependencies = [
"time-core", "time-core",
] ]
[[package]]
name = "timerfd"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84e482e368cf7efa2c8b570f476e5b9fd9fd5e9b9219fc567832b05f13511091"
dependencies = [
"rustix 0.38.44",
]
[[package]] [[package]]
name = "tiny-keccak" name = "tiny-keccak"
version = "2.0.2" version = "2.0.2"
...@@ -6647,19 +6555,6 @@ dependencies = [ ...@@ -6647,19 +6555,6 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tokio-timerfd"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87eecdae9a9b793843b1df7a64bc136f203443c1ca9889b3c4a39590afa51094"
dependencies = [
"futures-core",
"libc",
"slab",
"timerfd",
"tokio",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.18" version = "0.7.18"
...@@ -7308,7 +7203,7 @@ dependencies = [ ...@@ -7308,7 +7203,7 @@ dependencies = [
"serde_json", "serde_json",
"url", "url",
"utoipa", "utoipa",
"zip 3.0.0", "zip",
] ]
[[package]] [[package]]
...@@ -8207,23 +8102,6 @@ dependencies = [ ...@@ -8207,23 +8102,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "zip"
version = "2.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fabe6324e908f85a1c52063ce7aa26b68dcb7eb6dbc83a2d148403c9bc3eba50"
dependencies = [
"arbitrary",
"crc32fast",
"crossbeam-utils",
"displaydoc",
"flate2",
"indexmap 2.13.0",
"memchr",
"thiserror 2.0.18",
"zopfli",
]
[[package]] [[package]]
name = "zip" name = "zip"
version = "3.0.0" version = "3.0.0"
......
...@@ -1576,7 +1576,6 @@ dependencies = [ ...@@ -1576,7 +1576,6 @@ dependencies = [
"dynamo-async-openai", "dynamo-async-openai",
"dynamo-kv-router", "dynamo-kv-router",
"dynamo-memory", "dynamo-memory",
"dynamo-mocker",
"dynamo-parsers", "dynamo-parsers",
"dynamo-runtime", "dynamo-runtime",
"dynamo-tokens", "dynamo-tokens",
...@@ -1657,24 +1656,30 @@ name = "dynamo-mocker" ...@@ -1657,24 +1656,30 @@ name = "dynamo-mocker"
version = "1.0.0" version = "1.0.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bytes",
"dashmap 6.1.0", "dashmap 6.1.0",
"derive-getters", "derive-getters",
"derive_builder", "derive_builder",
"dynamo-kv-router", "dynamo-kv-router",
"dynamo-llm",
"dynamo-runtime", "dynamo-runtime",
"dynamo-tokens", "dynamo-tokens",
"futures",
"ndarray", "ndarray",
"ndarray-interp", "ndarray-interp",
"ndarray-npy", "ndarray-npy",
"rand 0.9.2", "rand 0.9.2",
"rmp-serde",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-stream",
"tokio-timerfd", "tokio-timerfd",
"tokio-util", "tokio-util",
"tracing", "tracing",
"uuid", "uuid",
"validator", "validator",
"zeromq",
] ]
[[package]] [[package]]
...@@ -1701,6 +1706,7 @@ dependencies = [ ...@@ -1701,6 +1706,7 @@ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
"dynamo-llm", "dynamo-llm",
"dynamo-mocker",
"dynamo-parsers", "dynamo-parsers",
"dynamo-runtime", "dynamo-runtime",
"futures", "futures",
......
...@@ -27,6 +27,7 @@ media-ffmpeg = ["dynamo-llm/media-ffmpeg"] ...@@ -27,6 +27,7 @@ media-ffmpeg = ["dynamo-llm/media-ffmpeg"]
[dependencies] [dependencies]
dynamo-runtime = { path = "../../runtime" } dynamo-runtime = { path = "../../runtime" }
dynamo-mocker = { path = "../../mocker" }
dynamo-parsers = { path = "../../parsers" } dynamo-parsers = { path = "../../parsers" }
anyhow = { version = "1" } anyhow = { version = "1" }
......
...@@ -18,12 +18,13 @@ use dynamo_llm::entrypoint::input::Input; ...@@ -18,12 +18,13 @@ use dynamo_llm::entrypoint::input::Input;
use dynamo_llm::kv_router::KvRouterConfig as RsKvRouterConfig; use dynamo_llm::kv_router::KvRouterConfig as RsKvRouterConfig;
use dynamo_llm::local_model::DEFAULT_HTTP_PORT; use dynamo_llm::local_model::DEFAULT_HTTP_PORT;
use dynamo_llm::local_model::{LocalModel, LocalModelBuilder}; use dynamo_llm::local_model::{LocalModel, LocalModelBuilder};
use dynamo_llm::mocker::protocols::MockEngineArgs;
use dynamo_llm::model_card::ModelDeploymentCard as RsModelDeploymentCard; use dynamo_llm::model_card::ModelDeploymentCard as RsModelDeploymentCard;
use dynamo_llm::types::openai::chat_completions::OpenAIChatCompletionsStreamingEngine; use dynamo_llm::types::openai::chat_completions::OpenAIChatCompletionsStreamingEngine;
use dynamo_mocker::common::protocols::MockEngineArgs;
use dynamo_runtime::discovery::ModelCardInstanceId as RsModelCardInstanceId; use dynamo_runtime::discovery::ModelCardInstanceId as RsModelCardInstanceId;
use dynamo_runtime::protocols::EndpointId; use dynamo_runtime::protocols::EndpointId;
use super::local_model::ModelRuntimeConfig;
use super::model_card::ModelDeploymentCard; use super::model_card::ModelDeploymentCard;
use crate::RouterMode; use crate::RouterMode;
use crate::engine::PythonAsyncEngine; use crate::engine::PythonAsyncEngine;
...@@ -183,6 +184,7 @@ pub(crate) struct EntrypointArgs { ...@@ -183,6 +184,7 @@ pub(crate) struct EntrypointArgs {
tls_cert_path: Option<PathBuf>, tls_cert_path: Option<PathBuf>,
tls_key_path: Option<PathBuf>, tls_key_path: Option<PathBuf>,
extra_engine_args: Option<PathBuf>, extra_engine_args: Option<PathBuf>,
runtime_config: Option<ModelRuntimeConfig>,
namespace: Option<String>, namespace: Option<String>,
namespace_prefix: Option<String>, namespace_prefix: Option<String>,
is_prefill: bool, is_prefill: bool,
...@@ -194,7 +196,7 @@ pub(crate) struct EntrypointArgs { ...@@ -194,7 +196,7 @@ pub(crate) struct EntrypointArgs {
impl EntrypointArgs { impl EntrypointArgs {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[new] #[new]
#[pyo3(signature = (engine_type, model_path=None, model_name=None, endpoint_id=None, context_length=None, template_file=None, router_config=None, kv_cache_block_size=None, http_host=None, http_port=None, http_metrics_port=None, tls_cert_path=None, tls_key_path=None, extra_engine_args=None, namespace=None, namespace_prefix=None, is_prefill=false, migration_limit=0, chat_engine_factory=None))] #[pyo3(signature = (engine_type, model_path=None, model_name=None, endpoint_id=None, context_length=None, template_file=None, router_config=None, kv_cache_block_size=None, http_host=None, http_port=None, http_metrics_port=None, tls_cert_path=None, tls_key_path=None, extra_engine_args=None, runtime_config=None, namespace=None, namespace_prefix=None, is_prefill=false, migration_limit=0, chat_engine_factory=None))]
pub fn new( pub fn new(
py: Python<'_>, py: Python<'_>,
engine_type: EngineType, engine_type: EngineType,
...@@ -211,6 +213,7 @@ impl EntrypointArgs { ...@@ -211,6 +213,7 @@ impl EntrypointArgs {
tls_cert_path: Option<PathBuf>, tls_cert_path: Option<PathBuf>,
tls_key_path: Option<PathBuf>, tls_key_path: Option<PathBuf>,
extra_engine_args: Option<PathBuf>, extra_engine_args: Option<PathBuf>,
runtime_config: Option<ModelRuntimeConfig>,
namespace: Option<String>, namespace: Option<String>,
namespace_prefix: Option<String>, namespace_prefix: Option<String>,
is_prefill: bool, is_prefill: bool,
...@@ -257,6 +260,7 @@ impl EntrypointArgs { ...@@ -257,6 +260,7 @@ impl EntrypointArgs {
tls_cert_path, tls_cert_path,
tls_key_path, tls_key_path,
extra_engine_args, extra_engine_args,
runtime_config,
namespace, namespace,
namespace_prefix, namespace_prefix,
is_prefill, is_prefill,
...@@ -301,6 +305,7 @@ pub fn make_engine<'p>( ...@@ -301,6 +305,7 @@ pub fn make_engine<'p>(
.tls_key_path(args.tls_key_path.clone()) .tls_key_path(args.tls_key_path.clone())
.is_mocker(matches!(args.engine_type, EngineType::Mocker)) .is_mocker(matches!(args.engine_type, EngineType::Mocker))
.extra_engine_args(args.extra_engine_args.clone()) .extra_engine_args(args.extra_engine_args.clone())
.runtime_config(args.runtime_config.clone().unwrap_or_default().inner)
.namespace(args.namespace.clone()) .namespace(args.namespace.clone())
.namespace_prefix(args.namespace_prefix.clone()); .namespace_prefix(args.namespace_prefix.clone());
pyo3_async_runtimes::tokio::future_into_py(py, async move { pyo3_async_runtimes::tokio::future_into_py(py, async move {
...@@ -419,12 +424,9 @@ async fn select_engine( ...@@ -419,12 +424,9 @@ async fn select_engine(
let endpoint = local_model.endpoint_id().clone(); let endpoint = local_model.endpoint_id().clone();
let engine = dynamo_llm::mocker::make_mocker_engine( let engine =
distributed_runtime.inner, dynamo_mocker::make_mocker_engine(distributed_runtime.inner, endpoint, mocker_args)
endpoint, .await?;
mocker_args,
)
.await?;
RsEngineConfig::InProcessTokens { RsEngineConfig::InProcessTokens {
engine, engine,
......
...@@ -6,7 +6,7 @@ use llm_rs::local_model::runtime_config::DisaggregatedEndpoint as RsDisaggregate ...@@ -6,7 +6,7 @@ use llm_rs::local_model::runtime_config::DisaggregatedEndpoint as RsDisaggregate
use llm_rs::local_model::runtime_config::ModelRuntimeConfig as RsModelRuntimeConfig; use llm_rs::local_model::runtime_config::ModelRuntimeConfig as RsModelRuntimeConfig;
#[pyclass] #[pyclass]
#[derive(Clone, Default)] #[derive(Clone, Debug, Default)]
pub struct ModelRuntimeConfig { pub struct ModelRuntimeConfig {
pub(crate) inner: RsModelRuntimeConfig, pub(crate) inner: RsModelRuntimeConfig,
} }
......
...@@ -452,11 +452,15 @@ class ModelRuntimeConfig: ...@@ -452,11 +452,15 @@ class ModelRuntimeConfig:
max_num_batched_tokens: int | None max_num_batched_tokens: int | None
tool_call_parser: str | None tool_call_parser: str | None
reasoning_parser: str | None reasoning_parser: str | None
data_parallel_start_rank: int
data_parallel_size: int
enable_local_indexer: bool enable_local_indexer: bool
runtime_data: dict[str, Any] runtime_data: dict[str, Any]
tensor_model_config: Any | None tensor_model_config: Any | None
data_parallel_size: int data_parallel_size: int
data_parallel_start_rank: int data_parallel_start_rank: int
bootstrap_host: str | None
bootstrap_port: int | None
def __init__(self) -> None: ... def __init__(self) -> None: ...
...@@ -476,6 +480,14 @@ class ModelRuntimeConfig: ...@@ -476,6 +480,14 @@ class ModelRuntimeConfig:
"""Set the disaggregated endpoint for the model""" """Set the disaggregated endpoint for the model"""
... ...
def set_tensor_model_config(self, tensor_model_config: Dict[str, Any]) -> None:
"""Set the tensor model configuration from a dictionary."""
...
def get_tensor_model_config(self) -> Any | None:
"""Get the tensor model configuration."""
...
class OverlapScores: class OverlapScores:
""" """
A collection of prefix matching scores of workers for a given token ids. A collection of prefix matching scores of workers for a given token ids.
...@@ -1597,7 +1609,9 @@ class EntrypointArgs: ...@@ -1597,7 +1609,9 @@ class EntrypointArgs:
tls_cert_path: Optional[str] = None, tls_cert_path: Optional[str] = None,
tls_key_path: Optional[str] = None, tls_key_path: Optional[str] = None,
extra_engine_args: Optional[str] = None, extra_engine_args: Optional[str] = None,
runtime_config: Optional[ModelRuntimeConfig] = None,
namespace: Optional[str] = None, namespace: Optional[str] = None,
namespace_prefix: Optional[str] = None,
is_prefill: bool = False, is_prefill: bool = False,
migration_limit: int = 0, migration_limit: int = 0,
chat_engine_factory: Optional[Callable] = None, chat_engine_factory: Optional[Callable] = None,
...@@ -1620,7 +1634,9 @@ class EntrypointArgs: ...@@ -1620,7 +1634,9 @@ class EntrypointArgs:
tls_cert_path: TLS certificate path (PEM format) tls_cert_path: TLS certificate path (PEM format)
tls_key_path: TLS key path (PEM format) tls_key_path: TLS key path (PEM format)
extra_engine_args: Path to extra engine arguments file extra_engine_args: Path to extra engine arguments file
runtime_config: Optional runtime configuration for discovery registration
namespace: Dynamo namespace for model discovery scoping namespace: Dynamo namespace for model discovery scoping
namespace_prefix: Optional namespace prefix
is_prefill: Whether this is a prefill worker is_prefill: Whether this is a prefill worker
migration_limit: Maximum number of request migrations (0=disabled) migration_limit: Maximum number of request migrations (0=disabled)
chat_engine_factory: Optional Python chat completions engine factory callback chat_engine_factory: Optional Python chat completions engine factory callback
......
...@@ -49,7 +49,6 @@ dynamo-runtime = { workspace = true } ...@@ -49,7 +49,6 @@ dynamo-runtime = { workspace = true }
dynamo-tokens = { workspace = true } dynamo-tokens = { workspace = true }
dynamo-kv-router = { workspace = true, features = ["metrics"] } dynamo-kv-router = { workspace = true, features = ["metrics"] }
dynamo-memory = { workspace = true } dynamo-memory = { workspace = true }
dynamo-mocker = { workspace = true }
# workspace # workspace
aho-corasick = "1.1" aho-corasick = "1.1"
......
...@@ -25,7 +25,6 @@ pub mod kv_router; ...@@ -25,7 +25,6 @@ pub mod kv_router;
pub mod local_model; pub mod local_model;
pub mod lora; pub mod lora;
pub mod migration; pub mod migration;
pub mod mocker;
pub mod model_card; pub mod model_card;
pub mod model_type; pub mod model_type;
pub mod namespace; pub mod namespace;
......
...@@ -11,10 +11,8 @@ use dynamo_runtime::discovery::DiscoverySpec; ...@@ -11,10 +11,8 @@ use dynamo_runtime::discovery::DiscoverySpec;
use dynamo_runtime::protocols::EndpointId; use dynamo_runtime::protocols::EndpointId;
use dynamo_runtime::slug::Slug; use dynamo_runtime::slug::Slug;
use dynamo_runtime::traits::DistributedRuntimeProvider; use dynamo_runtime::traits::DistributedRuntimeProvider;
use dynamo_runtime::utils::get_http_rpc_host_from_env;
use crate::entrypoint::RouterConfig; use crate::entrypoint::RouterConfig;
use crate::mocker::protocols::{MockEngineArgs, WorkerType};
use crate::model_card::ModelDeploymentCard; use crate::model_card::ModelDeploymentCard;
use crate::model_type::{ModelInput, ModelType}; use crate::model_type::{ModelInput, ModelType};
use crate::preprocessor::media::{MediaDecoder, MediaFetcher}; use crate::preprocessor::media::{MediaDecoder, MediaFetcher};
...@@ -235,41 +233,6 @@ impl LocalModelBuilder { ...@@ -235,41 +233,6 @@ impl LocalModelBuilder {
.map(RequestTemplate::load) .map(RequestTemplate::load)
.transpose()?; .transpose()?;
// Override runtime configs with mocker engine args (applies to both paths)
if self.is_mocker
&& let Some(path) = &self.extra_engine_args
{
let mocker_engine_args = MockEngineArgs::from_json_file(path)
.expect("Failed to load mocker engine args for runtime config overriding.");
self.kv_cache_block_size = mocker_engine_args.block_size as u32;
self.runtime_config.total_kv_blocks = Some(mocker_engine_args.num_gpu_blocks as u64);
self.runtime_config.max_num_seqs = mocker_engine_args.max_num_seqs.map(|v| v as u64);
self.runtime_config.max_num_batched_tokens =
mocker_engine_args.max_num_batched_tokens.map(|v| v as u64);
// Decode workers don't create the WorkerKvQuery endpoint (scheduler_component is None),
// so they must not advertise enable_local_indexer=true or the router will hang
// trying to query them during initial recovery.
self.runtime_config.enable_local_indexer = mocker_engine_args.enable_local_indexer
&& mocker_engine_args.worker_type != WorkerType::Decode;
self.runtime_config.data_parallel_size = mocker_engine_args.dp_size;
// Set bootstrap endpoint for prefill workers with bootstrap_port configured
if mocker_engine_args.worker_type == WorkerType::Prefill
&& let Some(port) = mocker_engine_args.bootstrap_port
{
let host = get_http_rpc_host_from_env();
self.runtime_config.disaggregated_endpoint =
Some(runtime_config::DisaggregatedEndpoint {
bootstrap_host: Some(host),
bootstrap_port: Some(port),
});
tracing::info!(
bootstrap_port = port,
"Mocker prefill worker: publishing bootstrap endpoint to discovery"
);
}
}
// frontend and echo engine don't need a path. // frontend and echo engine don't need a path.
if self.model_path.is_none() { if self.model_path.is_none() {
let mut card = ModelDeploymentCard::with_name_only( let mut card = ModelDeploymentCard::with_name_only(
......
...@@ -12,13 +12,14 @@ repository.workspace = true ...@@ -12,13 +12,14 @@ repository.workspace = true
[dependencies] [dependencies]
# repo # repo
dynamo-kv-router = { workspace = true }
dynamo-runtime = { workspace = true } dynamo-runtime = { workspace = true }
dynamo-tokens = { workspace = true } dynamo-tokens = { workspace = true }
dynamo-kv-router = { workspace = true }
# workspace # workspace
anyhow = { workspace = true } anyhow = { workspace = true }
validator = { workspace = true } bytes = { workspace = true }
futures = { workspace = true }
dashmap = { workspace = true } dashmap = { workspace = true }
derive_builder = { workspace = true } derive_builder = { workspace = true }
derive-getters = { workspace = true } derive-getters = { workspace = true }
...@@ -26,17 +27,25 @@ rand = { workspace = true } ...@@ -26,17 +27,25 @@ rand = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
tokio-stream = { workspace = true }
tokio-util = { workspace = true } tokio-util = { workspace = true }
tracing = { workspace = true } tracing = { workspace = true }
uuid = { workspace = true } uuid = { workspace = true }
validator = { workspace = true }
# crate-specific # crate-specific
ndarray = "0.16" ndarray = "0.16"
ndarray-npy = "0.9" ndarray-npy = "0.9"
ndarray-interp = "0.5" ndarray-interp = "0.5"
zeromq = "0.4.1"
rmp-serde = "1.3"
[target.'cfg(target_os = "linux")'.dependencies] [target.'cfg(target_os = "linux")'.dependencies]
dynamo-llm = { workspace = true }
tokio-timerfd = "0.2" tokio-timerfd = "0.2"
[target.'cfg(not(target_os = "linux"))'.dependencies]
dynamo-llm = { path = "../llm", default-features = false }
[dev-dependencies] [dev-dependencies]
rstest = "0.18.2" rstest = "0.18.2"
...@@ -10,4 +10,6 @@ ...@@ -10,4 +10,6 @@
pub mod cache; pub mod cache;
pub mod common; pub mod common;
pub mod kv_manager; pub mod kv_manager;
pub mod mocker;
pub use mocker::make_mocker_engine; // Re-export nicely for bindings
pub mod scheduler; pub mod scheduler;
...@@ -12,6 +12,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; ...@@ -12,6 +12,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use anyhow::Result; use anyhow::Result;
use bytes::Bytes; use bytes::Bytes;
use dashmap::DashMap; use dashmap::DashMap;
use dynamo_llm::backend::ExecutionContext;
use futures::StreamExt; use futures::StreamExt;
use rand::Rng; use rand::Rng;
use serde::Serialize; use serde::Serialize;
...@@ -30,21 +31,18 @@ use dynamo_runtime::{ ...@@ -30,21 +31,18 @@ use dynamo_runtime::{
traits::DistributedRuntimeProvider, traits::DistributedRuntimeProvider,
}; };
use crate::kv_router::publisher::{KvEventPublisher, KvEventSourceConfig, WorkerMetricsPublisher};
use crate::protocols::TokenIdType;
use crate::protocols::common::llm_backend::{LLMEngineOutput, PreprocessedRequest};
use dynamo_kv_router::protocols::{KvCacheEvent, KvCacheEventData}; use dynamo_kv_router::protocols::{KvCacheEvent, KvCacheEventData};
use dynamo_llm::kv_router::publisher::{
// Re-export from dynamo-mocker for convenience KvEventPublisher, KvEventSourceConfig, WorkerMetricsPublisher,
use dynamo_mocker::common::bootstrap::{BootstrapServer, connect_to_prefill};
use dynamo_mocker::common::protocols::OutputSignal;
pub use dynamo_mocker::common::protocols::{
DirectRequest, KvCacheEventSink, MockEngineArgs, MockEngineArgsBuilder,
}; };
use dynamo_mocker::common::utils::{compute_kv_transfer_delay, sleep_precise}; use dynamo_llm::protocols::TokenIdType;
pub use dynamo_mocker::common::{bootstrap, perf_model, protocols, running_mean, sequence}; use dynamo_llm::protocols::common::llm_backend::{LLMEngineOutput, PreprocessedRequest};
pub use dynamo_mocker::scheduler::Scheduler;
pub use dynamo_mocker::{kv_manager, scheduler}; use crate::common::bootstrap::{BootstrapServer, connect_to_prefill};
use crate::common::protocols::OutputSignal;
use crate::common::protocols::{DirectRequest, KvCacheEventSink, MockEngineArgs};
use crate::common::utils::{compute_kv_transfer_delay, sleep_precise};
use crate::scheduler::Scheduler;
pub const MOCKER_COMPONENT: &str = "mocker"; pub const MOCKER_COMPONENT: &str = "mocker";
...@@ -706,7 +704,7 @@ pub async fn make_mocker_engine( ...@@ -706,7 +704,7 @@ pub async fn make_mocker_engine(
distributed_runtime: DistributedRuntime, distributed_runtime: DistributedRuntime,
endpoint_id: dynamo_runtime::protocols::EndpointId, endpoint_id: dynamo_runtime::protocols::EndpointId,
args: MockEngineArgs, args: MockEngineArgs,
) -> Result<crate::backend::ExecutionContext, Error> { ) -> Result<ExecutionContext, Error> {
// Create the mocker engine // Create the mocker engine
tracing::info!("Creating mocker engine with config: {args:?}"); tracing::info!("Creating mocker engine with config: {args:?}");
let annotated_engine = let annotated_engine =
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment