Unverified commit 87e6e052, authored by Graham King, committed by GitHub
Browse files

fix: Interactive input actually stops; it no longer ignores the stop token (#3057)


Signed-off-by: Graham King <grahamk@nvidia.com>
parent bc29b594
...@@ -318,36 +318,19 @@ dynamo-run in=dyn://dynamo.mocker.generate out=mocker --model-path TinyLlama/Tin ...@@ -318,36 +318,19 @@ dynamo-run in=dyn://dynamo.mocker.generate out=mocker --model-path TinyLlama/Tin
dynamo-run in=http out=auto --router-mode kv dynamo-run in=http out=auto --router-mode kv
``` ```
### echo_full ### echo
The `echo_full` engine accepts un-processed requests and echoes the prompt back as the response. The `echo` engine echoes the prompt back as the response.
``` ```
dynamo-run in=http out=echo_full --model-name my_model dynamo-run in=http out=echo --model-name my_model
``` ```
### echo_core The echo engine uses a configurable delay between tokens to simulate generation speed. You can adjust this using the `DYN_TOKEN_ECHO_DELAY_MS` environment variable:
The `echo_core` engine accepts pre-processed requests and echoes the tokens back as the response. This is useful for testing pre-processing functionality as the response includes the full prompt template.
```
dynamo-run in=http out=echo_core --model-path <hf-repo-checkout>
```
Note that to use it with `in=http` you need to tell the post processor to ignore stop tokens from the template by adding `nvext.ignore_eos` like this:
```
curl -N -d '{"nvext": {"ignore_eos": true}, "stream": true, "model": "Qwen2.5-3B-Instruct", "max_completion_tokens": 4096, "messages":[{"role":"user", "content": "Tell me a story" }]}' ...
```
The default `in=text` sets that for you.
### Echo Configuration
Both echo engines use a configurable delay between tokens to simulate generation speed. You can adjust this using the `DYN_TOKEN_ECHO_DELAY_MS` environment variable:
``` ```
# Set token echo delay to 1ms (1000 tokens per second) # Set token echo delay to 1ms (1000 tokens per second)
DYN_TOKEN_ECHO_DELAY_MS=1 dynamo-run in=http out=echo_full DYN_TOKEN_ECHO_DELAY_MS=1 dynamo-run in=http out=echo
``` ```
The default delay is 10ms, which produces approximately 100 tokens per second. The default delay is 10ms, which produces approximately 100 tokens per second.
......
...@@ -204,14 +204,7 @@ impl Flags { ...@@ -204,14 +204,7 @@ impl Flags {
); );
} }
} }
Output::EchoFull => {} Output::Echo => {}
Output::EchoCore => {
if !local_model.card().has_tokenizer() {
anyhow::bail!(
"out=echo_core need to find the tokenizer. Pass flag --model-path <path>"
);
};
}
#[cfg(feature = "mistralrs")] #[cfg(feature = "mistralrs")]
Output::MistralRs => {} Output::MistralRs => {}
#[cfg(feature = "llamacpp")] #[cfg(feature = "llamacpp")]
......
...@@ -109,14 +109,9 @@ async fn engine_for( ...@@ -109,14 +109,9 @@ async fn engine_for(
// A single static backend, no etcd // A single static backend, no etcd
Ok(EngineConfig::StaticRemote(Box::new(local_model))) Ok(EngineConfig::StaticRemote(Box::new(local_model)))
} }
Output::EchoFull => Ok(EngineConfig::StaticFull { Output::Echo => Ok(EngineConfig::StaticFull {
model: Box::new(local_model),
engine: dynamo_llm::engines::make_engine_full(),
is_static: flags.static_worker,
}),
Output::EchoCore => Ok(EngineConfig::StaticCore {
engine: dynamo_llm::engines::make_engine_core(),
model: Box::new(local_model), model: Box::new(local_model),
engine: dynamo_llm::engines::make_echo_engine(),
is_static: flags.static_worker, is_static: flags.static_worker,
}), }),
#[cfg(feature = "mistralrs")] #[cfg(feature = "mistralrs")]
...@@ -213,7 +208,7 @@ fn gguf_default() -> Output { ...@@ -213,7 +208,7 @@ fn gguf_default() -> Output {
#[cfg(not(any(feature = "mistralrs", feature = "llamacpp")))] #[cfg(not(any(feature = "mistralrs", feature = "llamacpp")))]
{ {
Output::EchoFull Output::Echo
} }
} }
...@@ -225,6 +220,6 @@ fn safetensors_default() -> Output { ...@@ -225,6 +220,6 @@ fn safetensors_default() -> Output {
#[cfg(not(feature = "mistralrs"))] #[cfg(not(feature = "mistralrs"))]
{ {
Output::EchoFull Output::Echo
} }
} }
...@@ -5,11 +5,8 @@ use dynamo_runtime::protocols::ENDPOINT_SCHEME; ...@@ -5,11 +5,8 @@ use dynamo_runtime::protocols::ENDPOINT_SCHEME;
use std::fmt; use std::fmt;
pub enum Output { pub enum Output {
/// Accept un-preprocessed requests, echo the prompt back as the response /// Echos the prompt back as the response
EchoFull, Echo,
/// Accept preprocessed requests, echo the tokens back as the response
EchoCore,
/// Listen for models on nats/etcd, add/remove dynamically /// Listen for models on nats/etcd, add/remove dynamically
Auto, Auto,
...@@ -44,8 +41,7 @@ impl TryFrom<&str> for Output { ...@@ -44,8 +41,7 @@ impl TryFrom<&str> for Output {
"llamacpp" | "llama_cpp" => Ok(Output::LlamaCpp), "llamacpp" | "llama_cpp" => Ok(Output::LlamaCpp),
"mocker" => Ok(Output::Mocker), "mocker" => Ok(Output::Mocker),
"echo_full" => Ok(Output::EchoFull), "echo" | "echo_full" => Ok(Output::Echo),
"echo_core" => Ok(Output::EchoCore),
"dyn" | "auto" => Ok(Output::Auto), "dyn" | "auto" => Ok(Output::Auto),
...@@ -69,8 +65,7 @@ impl fmt::Display for Output { ...@@ -69,8 +65,7 @@ impl fmt::Display for Output {
Output::LlamaCpp => "llamacpp", Output::LlamaCpp => "llamacpp",
Output::Mocker => "mocker", Output::Mocker => "mocker",
Output::EchoFull => "echo_full", Output::Echo => "echo",
Output::EchoCore => "echo_core",
Output::Auto => "auto", Output::Auto => "auto",
Output::Static(endpoint) => &format!("{ENDPOINT_SCHEME}{endpoint}"), Output::Static(endpoint) => &format!("{ENDPOINT_SCHEME}{endpoint}"),
...@@ -82,11 +77,7 @@ impl fmt::Display for Output { ...@@ -82,11 +77,7 @@ impl fmt::Display for Output {
impl Output { impl Output {
#[allow(unused_mut)] #[allow(unused_mut)]
pub fn available_engines() -> Vec<String> { pub fn available_engines() -> Vec<String> {
let mut out = vec![ let mut out = vec!["echo".to_string(), Output::Mocker.to_string()];
"echo_core".to_string(),
"echo_full".to_string(),
Output::Mocker.to_string(),
];
#[cfg(feature = "mistralrs")] #[cfg(feature = "mistralrs")]
{ {
out.push(Output::MistralRs.to_string()); out.push(Output::MistralRs.to_string());
......
...@@ -1415,6 +1415,7 @@ dependencies = [ ...@@ -1415,6 +1415,7 @@ dependencies = [
"hf-hub", "hf-hub",
"humantime", "humantime",
"itertools 0.14.0", "itertools 0.14.0",
"json-five",
"memmap2", "memmap2",
"minijinja", "minijinja",
"minijinja-contrib", "minijinja-contrib",
...@@ -2929,6 +2930,16 @@ dependencies = [ ...@@ -2929,6 +2930,16 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "json-five"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56bf719068ddd382e66ee32cf044805aa8203bf9999b5af007bd0367fb681c4d"
dependencies = [
"serde",
"unicode-general-category",
]
[[package]] [[package]]
name = "jwalk" name = "jwalk"
version = "0.8.1" version = "0.8.1"
...@@ -6186,6 +6197,12 @@ version = "2.8.1" ...@@ -6186,6 +6197,12 @@ version = "2.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539"
[[package]]
name = "unicode-general-category"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b993bddc193ae5bd0d623b49ec06ac3e9312875fdae725a975c51db1cc1677f"
[[package]] [[package]]
name = "unicode-ident" name = "unicode-ident"
version = "1.0.18" version = "1.0.18"
......
...@@ -219,7 +219,7 @@ async fn select_engine( ...@@ -219,7 +219,7 @@ async fn select_engine(
// There is no validation for the echo engine // There is no validation for the echo engine
RsEngineConfig::StaticFull { RsEngineConfig::StaticFull {
model: Box::new(local_model), model: Box::new(local_model),
engine: dynamo_llm::engines::make_engine_full(), engine: dynamo_llm::engines::make_echo_engine(),
is_static: false, is_static: false,
} }
} }
......
...@@ -13,9 +13,6 @@ use dynamo_runtime::engine::{AsyncEngine, AsyncEngineContextProvider, ResponseSt ...@@ -13,9 +13,6 @@ use dynamo_runtime::engine::{AsyncEngine, AsyncEngineContextProvider, ResponseSt
use dynamo_runtime::pipeline::{Error, ManyOut, SingleIn}; use dynamo_runtime::pipeline::{Error, ManyOut, SingleIn};
use dynamo_runtime::protocols::annotated::Annotated; use dynamo_runtime::protocols::annotated::Annotated;
use crate::backend::ExecutionContext;
use crate::preprocessor::PreprocessedRequest;
use crate::protocols::common::llm_backend::LLMEngineOutput;
use crate::protocols::openai::{ use crate::protocols::openai::{
chat_completions::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse}, chat_completions::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse},
completions::{NvCreateCompletionRequest, NvCreateCompletionResponse, prompt_to_string}, completions::{NvCreateCompletionRequest, NvCreateCompletionResponse, prompt_to_string},
...@@ -65,53 +62,9 @@ pub static TOKEN_ECHO_DELAY: LazyLock<Duration> = LazyLock::new(|| { ...@@ -65,53 +62,9 @@ pub static TOKEN_ECHO_DELAY: LazyLock<Duration> = LazyLock::new(|| {
Duration::from_millis(delay_ms) Duration::from_millis(delay_ms)
}); });
/// Engine that accepts pre-processed requests and echos the tokens back as the response
/// The response will include the full prompt template.
/// Useful for testing pre-processing.
struct EchoEngineCore {}
pub fn make_engine_core() -> ExecutionContext {
Arc::new(EchoEngineCore {})
}
#[async_trait]
impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutput>>, Error>
for EchoEngineCore
{
async fn generate(
&self,
incoming_request: SingleIn<PreprocessedRequest>,
) -> Result<ManyOut<Annotated<LLMEngineOutput>>, Error> {
let (request, context) = incoming_request.into_parts();
let ctx = context.context();
let output = stream! {
for tok in request.token_ids {
tokio::time::sleep(*TOKEN_ECHO_DELAY).await;
yield delta_core(tok);
}
yield Annotated::from_data(LLMEngineOutput::stop());
};
Ok(ResponseStream::new(Box::pin(output), ctx))
}
}
fn delta_core(tok: u32) -> Annotated<LLMEngineOutput> {
let delta = LLMEngineOutput {
token_ids: vec![tok],
tokens: None,
text: None,
cum_log_probs: None,
log_probs: None,
top_logprobs: None,
finish_reason: None,
index: None,
};
Annotated::from_data(delta)
}
/// Engine that accepts un-preprocessed requests and echos the prompt back as the response /// Engine that accepts un-preprocessed requests and echos the prompt back as the response
/// Useful for testing ingress such as service-http. /// Useful for testing ingress such as service-http.
struct EchoEngineFull {} struct EchoEngine {}
/// Validate Engine that verifies request data /// Validate Engine that verifies request data
pub struct ValidateEngine<E> { pub struct ValidateEngine<E> {
...@@ -164,8 +117,8 @@ pub trait EmbeddingEngine: Send + Sync { ...@@ -164,8 +117,8 @@ pub trait EmbeddingEngine: Send + Sync {
) -> Result<ManyOut<Annotated<NvCreateEmbeddingResponse>>, Error>; ) -> Result<ManyOut<Annotated<NvCreateEmbeddingResponse>>, Error>;
} }
pub fn make_engine_full() -> Arc<dyn StreamingEngine> { pub fn make_echo_engine() -> Arc<dyn StreamingEngine> {
let engine = EchoEngineFull {}; let engine = EchoEngine {};
let data = EngineDispatcher::new(engine); let data = EngineDispatcher::new(engine);
Arc::new(data) Arc::new(data)
} }
...@@ -176,7 +129,7 @@ impl ...@@ -176,7 +129,7 @@ impl
SingleIn<NvCreateChatCompletionRequest>, SingleIn<NvCreateChatCompletionRequest>,
ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
Error, Error,
> for EchoEngineFull > for EchoEngine
{ {
async fn generate( async fn generate(
&self, &self,
...@@ -185,7 +138,9 @@ impl ...@@ -185,7 +138,9 @@ impl
let (request, context) = incoming_request.transfer(()); let (request, context) = incoming_request.transfer(());
let ctx = context.context(); let ctx = context.context();
let mut deltas = request.response_generator(ctx.id().to_string()); let mut deltas = request.response_generator(ctx.id().to_string());
let req = request.inner.messages.into_iter().next_back().unwrap(); let Some(req) = request.inner.messages.into_iter().next_back() else {
anyhow::bail!("Empty chat messages in request");
};
let prompt = match req { let prompt = match req {
dynamo_async_openai::types::ChatCompletionRequestMessage::User(user_msg) => { dynamo_async_openai::types::ChatCompletionRequestMessage::User(user_msg) => {
...@@ -223,7 +178,7 @@ impl ...@@ -223,7 +178,7 @@ impl
SingleIn<NvCreateCompletionRequest>, SingleIn<NvCreateCompletionRequest>,
ManyOut<Annotated<NvCreateCompletionResponse>>, ManyOut<Annotated<NvCreateCompletionResponse>>,
Error, Error,
> for EchoEngineFull > for EchoEngine
{ {
async fn generate( async fn generate(
&self, &self,
...@@ -256,7 +211,7 @@ impl ...@@ -256,7 +211,7 @@ impl
SingleIn<NvCreateEmbeddingRequest>, SingleIn<NvCreateEmbeddingRequest>,
ManyOut<Annotated<NvCreateEmbeddingResponse>>, ManyOut<Annotated<NvCreateEmbeddingResponse>>,
Error, Error,
> for EchoEngineFull > for EchoEngine
{ {
async fn generate( async fn generate(
&self, &self,
......
...@@ -310,56 +310,3 @@ where ...@@ -310,56 +310,3 @@ where
.link(frontend)?; .link(frontend)?;
Ok(engine) Ok(engine)
} }
#[cfg(test)]
mod tests {
use super::*;
use crate::types::openai::{
chat_completions::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse},
completions::{NvCreateCompletionRequest, NvCreateCompletionResponse},
};
const HF_PATH: &str = concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/data/sample-models/mock-llama-3.1-8b-instruct"
);
#[tokio::test]
async fn test_build_chat_completions_pipeline_core_engine_succeeds() -> anyhow::Result<()> {
// Create test model card
let card = ModelDeploymentCard::load(HF_PATH, None)?;
let engine = crate::engines::make_engine_core();
// Build pipeline for chat completions
let pipeline = build_pipeline::<
NvCreateChatCompletionRequest,
NvCreateChatCompletionStreamResponse,
>(&card, engine, card.tokenizer_hf()?)
.await?;
// Verify pipeline was created
assert!(Arc::strong_count(&pipeline) >= 1);
Ok(())
}
#[tokio::test]
async fn test_build_completions_pipeline_core_engine_succeeds() -> anyhow::Result<()> {
// Create test model card
let card = ModelDeploymentCard::load(HF_PATH, None)?;
let engine = crate::engines::make_engine_core();
// Build pipeline for completions
let pipeline = build_pipeline::<NvCreateCompletionRequest, NvCreateCompletionResponse>(
&card,
engine,
card.tokenizer_hf()?,
)
.await?;
// Verify pipeline was created
assert!(Arc::strong_count(&pipeline) >= 1);
Ok(())
}
}
...@@ -156,8 +156,8 @@ mod integration_tests { ...@@ -156,8 +156,8 @@ mod integration_tests {
.await .await
.map_err(|e| anyhow::anyhow!("Failed to create distributed runtime: {}", e))?; .map_err(|e| anyhow::anyhow!("Failed to create distributed runtime: {}", e))?;
let engine_config = EngineConfig::StaticCore { let engine_config = EngineConfig::StaticFull {
engine: crate::engines::make_engine_core(), engine: crate::engines::make_echo_engine(),
model: Box::new( model: Box::new(
crate::local_model::LocalModelBuilder::default() crate::local_model::LocalModelBuilder::default()
.model_name(Some("test-model".to_string())) .model_name(Some("test-model".to_string()))
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use crate::protocols::openai::nvext::NvExt;
use crate::request_template::RequestTemplate; use crate::request_template::RequestTemplate;
use crate::types::openai::chat_completions::{ use crate::types::openai::chat_completions::{
NvCreateChatCompletionRequest, OpenAIChatCompletionsStreamingEngine, NvCreateChatCompletionRequest, OpenAIChatCompletionsStreamingEngine,
...@@ -107,21 +106,11 @@ async fn main_loop( ...@@ -107,21 +106,11 @@ async fn main_loop(
.temperature(template.as_ref().map_or(0.7, |t| t.temperature)) .temperature(template.as_ref().map_or(0.7, |t| t.temperature))
.n(1) // only generate one response .n(1) // only generate one response
.build()?; .build()?;
let nvext = NvExt {
ignore_eos: Some(true),
..Default::default()
};
// TODO We cannot set min_tokens with async-openai
// if inspect_template {
// // This makes the pre-processor ignore stop tokens
// req_builder.min_tokens(8192);
// }
let req = NvCreateChatCompletionRequest { let req = NvCreateChatCompletionRequest {
inner, inner,
common: Default::default(), common: Default::default(),
nvext: Some(nvext), nvext: None,
}; };
// Call the model // Call the model
...@@ -150,8 +139,8 @@ async fn main_loop( ...@@ -150,8 +139,8 @@ async fn main_loop(
let _ = stdout.flush(); let _ = stdout.flush();
assistant_message += c; assistant_message += c;
} }
if chat_comp.finish_reason.is_some() { if let Some(reason) = chat_comp.finish_reason {
tracing::trace!("finish reason: {:?}", chat_comp.finish_reason.unwrap()); tracing::trace!("finish reason: {reason:?}");
break; break;
} }
} }
......
...@@ -210,7 +210,7 @@ impl LocalModelBuilder { ...@@ -210,7 +210,7 @@ impl LocalModelBuilder {
.map(RequestTemplate::load) .map(RequestTemplate::load)
.transpose()?; .transpose()?;
// echo_full engine doesn't need a path. It's an edge case, move it out of the way. // echo engine doesn't need a path. It's an edge case, move it out of the way.
if self.model_path.is_none() { if self.model_path.is_none() {
let mut card = ModelDeploymentCard::with_name_only( let mut card = ModelDeploymentCard::with_name_only(
self.model_name.as_deref().unwrap_or(DEFAULT_NAME), self.model_name.as_deref().unwrap_or(DEFAULT_NAME),
......
...@@ -432,21 +432,27 @@ mod integration_tests { ...@@ -432,21 +432,27 @@ mod integration_tests {
// Prepare test cases // Prepare test cases
let mut test_cases = vec![]; let mut test_cases = vec![];
if custom_health_path.is_none() { match custom_health_path {
// When using default paths, test the default paths None => {
test_cases.push(("/health", expected_status, expected_body)); // When using default paths, test the default paths
} else { test_cases.push(("/health", expected_status, expected_body));
// When using custom paths, default paths should not exist }
test_cases.push(("/health", 404, "Route not found")); Some(chp) => {
test_cases.push((custom_health_path.unwrap(), expected_status, expected_body)); // When using custom paths, default paths should not exist
test_cases.push(("/health", 404, "Route not found"));
test_cases.push((chp, expected_status, expected_body));
}
} }
if custom_live_path.is_none() { match custom_live_path {
// When using default paths, test the default paths None => {
test_cases.push(("/live", expected_status, expected_body)); // When using default paths, test the default paths
} else { test_cases.push(("/live", expected_status, expected_body));
// When using custom paths, default paths should not exist }
test_cases.push(("/live", 404, "Route not found")); Some(clp) => {
test_cases.push((custom_live_path.unwrap(), expected_status, expected_body)); // When using custom paths, default paths should not exist
test_cases.push(("/live", 404, "Route not found"));
test_cases.push((clp, expected_status, expected_body));
}
} }
test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found")); test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
assert_eq!(test_cases.len(), expected_num_tests); assert_eq!(test_cases.len(), expected_num_tests);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment