"components/vscode:/vscode.git/clone" did not exist on "d7c27e68eea25f1d6b0a89d3dede4700526a2771"
Unverified Commit 6d5da821 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

feat(dynamo-run): Allow setting context-length (#1157)

Llama 4 has a very large context length (aka n_ctx, model_max_length, max_model_len), and vllm won't start unless it can allocate enough KV cache for the entire context.

Allow passing `--context-length <N>` to `dynamo-run` to cap the context length so that long-context models fit in the available memory.

Future todo:
- Restrict every request's `max_tokens` to below the context length. Our pre-processor should do this by setting `stop_conditions.max_tokens`. The mistralrs engine wrapper must do it itself because it does not use the pre-processor.
- mistralrs and llamacpp currently have a hard-coded max context length if one is not provided on the command line. Change those to be the model's built-in max, read from the GGUF or tokenizer_config.json.
parent 27e92701
......@@ -28,7 +28,7 @@ It supports these engines: mistralrs, llamacpp, sglang, vllm, and tensorrt-llm.
Usage:
```
dynamo-run in=[http|text|dyn://<path>|batch:<folder>] out=echo_core|echo_full|mistralrs|llamacpp|sglang|vllm|dyn [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--base-gpu-id=0] [--extra-engine-args=args.json] [--router-mode random|round-robin|kv]
dynamo-run in=[http|text|dyn://<path>|batch:<folder>] out=echo_core|echo_full|mistralrs|llamacpp|sglang|vllm|dyn [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--context-length=N] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0] [--extra-engine-args=args.json] [--router-mode random|round-robin|kv]
```
Example: `dynamo run Qwen/Qwen3-0.6B`
......@@ -318,7 +318,7 @@ dynamo-run out=llamacpp ~/llms/Qwen3-0.6B-Q8_0.gguf # From https://huggingface.c
Note that in some cases we are unable to extract the tokenizer from the GGUF, and so a Hugging Face checkout of a matching model must also be passed. Dynamo uses the weights from the GGUF and the pre-processor (`tokenizer.json`, etc) from the `--model-config`:
```
dynamo-run out=llamacpp ~/llms/Llama-4-Scout-17B-16E-Instruct-UD-IQ1_S.gguf --model-config ~/llms/Llama-4-Scout-17B-16E-Instruct
dynamo-run out=llamacpp ~/llms/Llama-4-Scout-17B-16E-Instruct-UD-IQ1_S.gguf --context-length 32768 --model-config ~/llms/Llama-4-Scout-17B-16E-Instruct
```
If you have multiple GPUs, llama.cpp does automatic tensor parallelism. You do not need to pass any extra flags to dynamo-run to enable it.
......@@ -414,6 +414,8 @@ Inside that virtualenv:
To pass extra arguments to the vllm engine see [Extra engine arguments](#extra-engine-arguments) below.
vllm attempts to allocate enough KV cache for the full context length at startup. If that does not fit in your available memory, pass `--context-length <value>`.
**Multi-GPU**
Pass `--tensor-parallel-size <NUM-GPUS>` to `dynamo-run`.
......
......@@ -105,6 +105,12 @@ pub struct Flags {
#[arg(long, default_value = "round-robin")]
pub router_mode: RouterMode,
/// Max model context length. Reduce this if you don't have enough VRAM for the full model
/// context length (e.g. Llama 4).
/// Defaults to the model's max, which is usually model_max_length in tokenizer_config.json.
#[arg(long)]
pub context_length: Option<usize>,
/// Additional engine-specific arguments from a JSON file.
/// Contains a mapping of parameter names to values.
#[arg(long)]
......
......@@ -58,7 +58,7 @@ pub async fn run(
.clone()
.or(flags.model_path_flag.clone());
let local_model: LocalModel = match out_opt {
let mut local_model: LocalModel = match out_opt {
// If output is dynamic we are ingress and don't have a local model, but making an
// empty one cleans up the code.
Output::Dynamic => Default::default(),
......@@ -84,6 +84,7 @@ pub async fn run(
}
}
};
local_model.context_length = flags.context_length;
let mut extra: Option<Pin<Box<dyn Future<Output = ()> + Send>>> = None; // vllm and sglang sub-process
......@@ -145,6 +146,7 @@ pub async fn run(
&local_model,
&endpoint,
flags.tensor_parallel_size,
flags.context_length,
if flags.base_gpu_id == 0 {
None
} else {
......@@ -189,6 +191,7 @@ pub async fn run(
&local_model,
&endpoint,
flags.tensor_parallel_size,
flags.context_length,
None, // base_gpu_id. vllm uses CUDA_VISIBLE_DEVICES instead
None, // multi-node config. vllm uses `ray`, see guide
flags.extra_engine_args.as_deref(),
......@@ -215,8 +218,7 @@ pub async fn run(
anyhow::bail!("--model-path should refer to a GGUF file. llama_cpp does not support safetensors.");
}
let engine =
dynamo_engine_llamacpp::make_engine(cancel_token.clone(), local_model.path())
.await?;
dynamo_engine_llamacpp::make_engine(cancel_token.clone(), &local_model).await?;
EngineConfig::StaticCore {
engine,
model: Box::new(local_model),
......
......@@ -30,7 +30,7 @@ Example:
- OR: ./dynamo-run /data/models/Llama-3.2-1B-Instruct-Q4_K_M.gguf
"#;
const USAGE: &str = "USAGE: dynamo-run in=[http|text|dyn://<path>|batch:<folder>] out=ENGINE_LIST|dyn://<path> [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0] [--extra-engine-args=args.json] [--router-mode random|round-robin|kv]";
const USAGE: &str = "USAGE: dynamo-run in=[http|text|dyn://<path>|batch:<folder>] out=ENGINE_LIST|dyn://<path> [--http-port 8080] [--model-path <path>] [--model-name <served-model-name>] [--model-config <hf-repo>] [--tensor-parallel-size=1] [--context-length=N] [--num-nodes=1] [--node-rank=0] [--leader-addr=127.0.0.1:9876] [--base-gpu-id=0] [--extra-engine-args=args.json] [--router-mode random|round-robin|kv]";
fn main() -> anyhow::Result<()> {
// Set log level based on verbosity flag
......
......@@ -18,6 +18,8 @@ use dynamo_runtime::protocols::Endpoint as EndpointId;
pub mod sglang;
pub mod vllm;
// TODO: I guess make a config object?
#[allow(clippy::too_many_arguments)]
pub async fn start(
// The Python code to run
py_script: &'static str,
......@@ -27,6 +29,8 @@ pub async fn start(
endpoint: &EndpointId,
// How many GPUs to use
tensor_parallel_size: u32,
// Max context length to allow
context_length: Option<usize>,
// sglang which GPU to start from, on a multi-GPU system
// vllm uses CUDA_VISIBLE_DEVICES
base_gpu_id: Option<u32>,
......@@ -53,6 +57,11 @@ pub async fn start(
"--kv-block-size".to_string(),
dynamo_llm::DEFAULT_KV_BLOCK_SIZE.to_string(),
];
if let Some(context_length) = context_length {
args.push("--context-length".to_string());
args.push(context_length.to_string());
}
// sglang only
if let Some(base_gpu_id) = base_gpu_id {
args.push("--base-gpu-id".to_string());
......
......@@ -35,6 +35,7 @@ class Config:
base_gpu_id: int
tensor_parallel_size: int
kv_block_size: int
context_length: int
nnodes: int
node_rank: int
dist_init_addr: str
......@@ -92,6 +93,9 @@ async def init(runtime: DistributedRuntime, config: Config):
"base_gpu_id": config.base_gpu_id,
"page_size": config.kv_block_size,
}
if config.context_length:
arg_map["context_length"] = config.context_length
if config.dist_init_addr != "":
arg_map["trust_remote_code"] = True
arg_map["nnodes"] = config.nnodes
......@@ -164,6 +168,12 @@ def cmd_line_args():
parser.add_argument(
"--kv-block-size", type=int, default=16, help="Size of a KV cache block."
)
parser.add_argument(
"--context-length",
type=int,
default=None,
help="Max model context length. Defaults to models max, usually model_max_length from tokenizer_config.json. Reducing this reduces VRAM requirements.",
)
parser.add_argument(
"--nnodes", type=int, default=1, help="The number of machines SGLang will use"
)
......@@ -211,6 +221,7 @@ def cmd_line_args():
config.base_gpu_id = args.base_gpu_id
config.tensor_parallel_size = args.tensor_parallel_size
config.kv_block_size = args.kv_block_size
config.context_length = args.context_length
config.nnodes = args.nnodes
config.node_rank = args.node_rank
config.dist_init_addr = args.dist_init_addr
......
......@@ -45,6 +45,7 @@ class Config:
model_name: Optional[str]
tensor_parallel_size: int
kv_block_size: int
context_length: int
extra_engine_args: str
......@@ -157,6 +158,10 @@ async def init(runtime: DistributedRuntime, config: Config):
# KV routing relies on logging KV metrics
"disable_log_stats": False,
}
if config.context_length:
# Usually we want it to default to the max (from tokenizer_config.json)
arg_map["max_model_len"] = config.context_length
if config.extra_engine_args != "":
json_map = {}
# extra_engine_args is a filename
......@@ -232,6 +237,12 @@ def cmd_line_args():
parser.add_argument(
"--kv-block-size", type=int, default=16, help="Size of a KV cache block."
)
parser.add_argument(
"--context-length",
type=int,
default=None,
help="Max model context length. Defaults to models max, usually model_max_length from tokenizer_config.json. Reducing this reduces VRAM requirements.",
)
parser.add_argument(
"--extra-engine-args",
type=str,
......@@ -263,6 +274,7 @@ def cmd_line_args():
config.endpoint = parsed_endpoint_name
config.tensor_parallel_size = args.tensor_parallel_size
config.kv_block_size = args.kv_block_size
config.context_length = args.context_length
config.extra_engine_args = args.extra_engine_args
return config
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
num::NonZeroU32,
......@@ -34,15 +22,16 @@ use llama_cpp_2::{
token::LlamaToken,
};
use dynamo_llm::backend::ExecutionContext;
use dynamo_llm::protocols::common::llm_backend::{BackendInput, LLMEngineOutput};
use dynamo_llm::protocols::common::preprocessor::PreprocessedRequest;
use dynamo_llm::{backend::ExecutionContext, local_model::LocalModel};
/// If the user does not provide `max_tokens`, limit prompt + output to this many tokens.
const DEFAULT_MAX_TOKENS: u32 = 8192;
// I'm not entirely sure what this is. The model context size surely comes from the GGUF??
const CONTEXT_SIZE: u32 = 8192;
/// If the user does not provide a context length limit default to this.
/// TODO: This should come from GGUF key {model}.context_length
const CONTEXT_LENGTH: u32 = 32768;
static LLAMA_BACKEND: tokio::sync::OnceCell<LlamaBackend> = tokio::sync::OnceCell::const_new();
pub(crate) static LLAMA_MODEL: tokio::sync::OnceCell<LlamaModel> =
......@@ -59,9 +48,9 @@ unsafe impl Sync for ContextWrapper {} // LlamaContext has a NonNull which is !S
pub async fn make_engine(
cancel_token: CancellationToken,
model_path: &Path,
model: &LocalModel,
) -> pipeline_error::Result<ExecutionContext> {
let engine = LlamacppEngine::new(cancel_token, model_path).await?;
let engine = LlamacppEngine::new(cancel_token, model).await?;
let engine: ExecutionContext = Arc::new(engine);
Ok(engine)
}
......@@ -79,16 +68,20 @@ struct LlamacppEngine {
impl LlamacppEngine {
async fn new(
cancel_token: CancellationToken,
model_path: &Path,
model_config: &LocalModel,
) -> pipeline_error::Result<Self> {
let backend = LlamaBackend::init()?;
let model = load_model(&backend, model_path)?;
let model = load_model(&backend, model_config.path())?;
LLAMA_MODEL.set(model)?;
let (ctx_set, ctx_get) = tokio::sync::mpsc::channel(NUM_CONTEXTS);
// Safety: NonZeroU32::new only errors if we give it a zero
let context_size = NonZeroU32::new(CONTEXT_SIZE).unwrap();
let llama_ctx_params = LlamaContextParams::default().with_n_ctx(Some(context_size));
let n_ctx = NonZeroU32::new(
model_config
.context_length
.map(|n| n as u32)
.unwrap_or(CONTEXT_LENGTH),
);
let llama_ctx_params = LlamaContextParams::default().with_n_ctx(n_ctx);
for (i, ctx_holder) in LLAMA_CONTEXTS.iter().enumerate().take(NUM_CONTEXTS) {
let llama_ctx = LLAMA_MODEL
.get()
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::{num::NonZero, sync::Arc};
......@@ -139,7 +127,10 @@ impl MistralRsEngine {
.build(None)?
};
let max_seq_len = AutoDeviceMapParams::DEFAULT_MAX_SEQ_LEN;
// TODO: The default max seq len should come from the model not be hard coded
let max_seq_len = model
.context_length
.unwrap_or(AutoDeviceMapParams::DEFAULT_MAX_SEQ_LEN);
// Paged attention requires cuda
let paged_attention_config = if cfg!(feature = "cuda") && EXP_ENABLE_PAGED_ATTENTION {
......
......@@ -27,6 +27,10 @@ const DEFAULT_NAME: &str = "dynamo";
/// A model available to this process, together with its deployment card and an
/// optional cap on the context length.
pub struct LocalModel {
/// Location of the model.
/// NOTE(review): presumably an absolute filesystem path to the weights or checkout — confirm.
full_path: PathBuf,
/// Deployment card associated with this model.
card: ModelDeploymentCard,
/// The maximum context length the engine will allow us to send to the model.
/// If not set, this defaults to the engine's configured maximum.
pub context_length: Option<usize>,
}
impl Default for LocalModel {
......@@ -34,6 +38,7 @@ impl Default for LocalModel {
LocalModel {
full_path: PathBuf::new(),
card: ModelDeploymentCard::with_name_only(DEFAULT_NAME),
context_length: None,
}
}
}
......@@ -115,7 +120,11 @@ impl LocalModel {
let mut card = ModelDeploymentCard::load(&model_config_path).await?;
card.set_name(&model_name);
Ok(LocalModel { full_path, card })
Ok(LocalModel {
full_path,
card,
..Default::default()
})
}
/// Attach this model to the endpoint. This registers it on the network
......
......@@ -155,12 +155,10 @@ impl TcpClient {
}
};
tracing::debug!("joining reader and writer");
let mut stream = reader.unsplit(writer);
// await the tcp server to shutdown the socket connection
// set a timeout for the server shutdown
tracing::debug!("awaiting server shutdown");
let mut buf = vec![0u8; 1024];
let deadline = Instant::now() + Duration::from_secs(10);
loop {
......@@ -174,7 +172,6 @@ impl TcpClient {
})?;
if n == 0 {
// Server has closed (FIN)
log::debug!("server closed the connection");
break;
}
}
......@@ -254,7 +251,6 @@ async fn handle_reader(
}
}
_ = alive_tx.closed() => {
tracing::debug!("writer stream closed; shutting down");
break;
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment