Unverified commit 012236ee, authored by MatejKosec and committed by GitHub
Browse files

feat(anthropic): add thinking block support and preamble stripping to /v1/messages (#7137)


Signed-off-by: Matej Kosec <mkosec@nvidia.com>
parent 9eec94b2
...@@ -71,6 +71,7 @@ class FrontendConfig(KvRouterConfigBase): ...@@ -71,6 +71,7 @@ class FrontendConfig(KvRouterConfigBase):
event_plane: str event_plane: str
chat_processor: str chat_processor: str
enable_anthropic_api: bool enable_anthropic_api: bool
strip_anthropic_preamble: bool
debug_perf: bool debug_perf: bool
preprocess_workers: int preprocess_workers: int
...@@ -344,6 +345,16 @@ class FrontendArgGroup(ArgGroup): ...@@ -344,6 +345,16 @@ class FrontendArgGroup(ArgGroup):
"This feature is experimental and may change." "This feature is experimental and may change."
), ),
) )
add_negatable_bool_argument(
g,
flag_name="--strip-anthropic-preamble",
env_var="DYN_STRIP_ANTHROPIC_PREAMBLE",
default=False,
help=(
"Strip the Claude Code billing preamble (x-anthropic-billing-header) "
"from the system prompt. Saves tokens and improves prompt caching."
),
)
add_argument( add_argument(
g, g,
flag_name="--dyn-chat-processor", flag_name="--dyn-chat-processor",
......
...@@ -259,6 +259,11 @@ async def async_main(): ...@@ -259,6 +259,11 @@ async def async_main():
if config.enable_anthropic_api: if config.enable_anthropic_api:
os.environ["DYN_ENABLE_ANTHROPIC_API"] = "1" os.environ["DYN_ENABLE_ANTHROPIC_API"] = "1"
if config.strip_anthropic_preamble:
os.environ["DYN_STRIP_ANTHROPIC_PREAMBLE"] = "1"
else:
os.environ.pop("DYN_STRIP_ANTHROPIC_PREAMBLE", None)
if config.chat_processor == "vllm": if config.chat_processor == "vllm":
assert ( assert (
vllm_flags is not None vllm_flags is not None
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
//! chat completions, processed by the existing engine, and responses/streams //! chat completions, processed by the existing engine, and responses/streams
//! are converted back to Anthropic format. //! are converted back to Anthropic format.
use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use axum::{ use axum::{
...@@ -21,6 +22,7 @@ use axum::{ ...@@ -21,6 +22,7 @@ use axum::{
}, },
routing::post, routing::post,
}; };
use dynamo_runtime::config::{env_is_truthy, environment_names::llm as env_llm};
use dynamo_runtime::pipeline::{AsyncEngineContextProvider, Context}; use dynamo_runtime::pipeline::{AsyncEngineContextProvider, Context};
use futures::{StreamExt, stream}; use futures::{StreamExt, stream};
use tracing::Instrument; use tracing::Instrument;
...@@ -31,16 +33,19 @@ use super::{ ...@@ -31,16 +33,19 @@ use super::{
metrics::{Endpoint, process_response_and_observe_metrics}, metrics::{Endpoint, process_response_and_observe_metrics},
service_v2, service_v2,
}; };
use crate::preprocessor::OpenAIPreprocessor;
use crate::protocols::anthropic::stream_converter::AnthropicStreamConverter; use crate::protocols::anthropic::stream_converter::AnthropicStreamConverter;
use crate::protocols::anthropic::types::{ use crate::protocols::anthropic::types::{
AnthropicCountTokensRequest, AnthropicCountTokensResponse, AnthropicCreateMessageRequest, AnthropicCountTokensRequest, AnthropicCountTokensResponse, AnthropicCreateMessageRequest,
AnthropicErrorBody, AnthropicErrorResponse, chat_completion_to_anthropic_response, AnthropicErrorBody, AnthropicErrorResponse, SystemContent,
chat_completion_to_anthropic_response,
}; };
use crate::protocols::openai::chat_completions::{ use crate::protocols::openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionResponse, NvCreateChatCompletionRequest, NvCreateChatCompletionResponse,
aggregator::ChatCompletionAggregator, NvCreateChatCompletionStreamResponse, aggregator::ChatCompletionAggregator,
}; };
use crate::request_template::RequestTemplate; use crate::request_template::RequestTemplate;
use crate::types::Annotated;
// Re-use helpers from the openai module (sibling under service/) // Re-use helpers from the openai module (sibling under service/)
use super::openai::{get_body_limit, get_or_create_request_id}; use super::openai::{get_body_limit, get_or_create_request_id};
...@@ -167,6 +172,11 @@ async fn anthropic_messages( ...@@ -167,6 +172,11 @@ async fn anthropic_messages(
} }
} }
// Strip Claude Code billing preamble from system prompt if enabled
if env_is_truthy(env_llm::DYN_STRIP_ANTHROPIC_PREAMBLE) {
strip_billing_preamble(&mut request.system);
}
let model = request.model.clone(); let model = request.model.clone();
let http_queue_guard = state.metrics_clone().create_http_queue_guard(&model); let http_queue_guard = state.metrics_clone().create_http_queue_guard(&model);
...@@ -175,6 +185,14 @@ async fn anthropic_messages( ...@@ -175,6 +185,14 @@ async fn anthropic_messages(
let (orig_request, context) = request.into_parts(); let (orig_request, context) = request.into_parts();
let model_for_resp = orig_request.model.clone(); let model_for_resp = orig_request.model.clone();
// Check if the Anthropic request explicitly enabled thinking. When thinking
// is enabled, reasoning-capable models' chat templates typically inject
// `<think>` into the prompt, so the completion starts mid-reasoning.
let thinking_enabled = orig_request
.thinking
.as_ref()
.is_some_and(|t| t.thinking_type == "enabled");
// Convert Anthropic request -> Chat Completion request // Convert Anthropic request -> Chat Completion request
let chat_request: NvCreateChatCompletionRequest = let chat_request: NvCreateChatCompletionRequest =
orig_request.try_into().map_err(|e: anyhow::Error| { orig_request.try_into().map_err(|e: anyhow::Error| {
...@@ -219,6 +237,28 @@ async fn anthropic_messages( ...@@ -219,6 +237,28 @@ async fn anthropic_messages(
let ctx = engine_stream.context(); let ctx = engine_stream.context();
// Apply reasoning parser to the engine stream if configured.
// The preprocessor (which normally handles this for the OpenAI path) is
// bypassed by the Anthropic endpoint, so we apply the same stream
// transform here. This populates `delta.reasoning_content` which the
// AnthropicStreamConverter translates into thinking content blocks.
//
// When thinking is enabled, the model's chat template likely injected
// `<think>` into the prompt (e.g., Qwen3.5), so the parser must start
// in reasoning mode — the completion begins mid-reasoning without an
// explicit `<think>` tag.
let engine_stream: Pin<
Box<dyn futures::Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send>,
> = if let Some(ref reasoning_parser_name) = parsing_options.reasoning_parser {
Box::pin(OpenAIPreprocessor::parse_reasoning_content_from_stream(
engine_stream,
reasoning_parser_name.clone(),
thinking_enabled,
))
} else {
Box::pin(engine_stream)
};
let mut inflight_guard = let mut inflight_guard =
state state
.metrics_clone() .metrics_clone()
...@@ -342,8 +382,11 @@ async fn anthropic_messages( ...@@ -342,8 +382,11 @@ async fn anthropic_messages(
/// Returns an estimated input token count using a len/3 heuristic. /// Returns an estimated input token count using a len/3 heuristic.
async fn handler_count_tokens( async fn handler_count_tokens(
State((_state, _template)): State<(Arc<service_v2::State>, Option<RequestTemplate>)>, State((_state, _template)): State<(Arc<service_v2::State>, Option<RequestTemplate>)>,
Json(request): Json<AnthropicCountTokensRequest>, Json(mut request): Json<AnthropicCountTokensRequest>,
) -> Result<Response, Response> { ) -> Result<Response, Response> {
if env_is_truthy(env_llm::DYN_STRIP_ANTHROPIC_PREAMBLE) {
strip_billing_preamble(&mut request.system);
}
let tokens = request.estimate_tokens(); let tokens = request.estimate_tokens();
Ok(Json(AnthropicCountTokensResponse { Ok(Json(AnthropicCountTokensResponse {
input_tokens: tokens, input_tokens: tokens,
...@@ -355,6 +398,22 @@ async fn handler_count_tokens( ...@@ -355,6 +398,22 @@ async fn handler_count_tokens(
// Helpers // Helpers
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
/// Strip the Claude Code billing preamble from the system prompt.
///
/// Claude Code prepends `x-anthropic-billing-header: cc_version=...; cch=...;\n`
/// to every system prompt. This varies per session and per release, wasting tokens
/// and preventing prompt prefix caching on the target model.
fn strip_billing_preamble(system: &mut Option<SystemContent>) {
if let Some(content) = system {
let trimmed = content.text.trim_start();
if trimmed.starts_with("x-anthropic-billing-header:")
&& let Some(newline_pos) = trimmed.find('\n')
{
content.text = trimmed[newline_pos + 1..].to_string();
}
}
}
/// Build an Anthropic-formatted error response. /// Build an Anthropic-formatted error response.
/// Maps HTTP status codes to Anthropic error types following the Anthropic API spec. /// Maps HTTP status codes to Anthropic error types following the Anthropic API spec.
fn anthropic_error(status: StatusCode, error_type: &str, message: &str) -> Response { fn anthropic_error(status: StatusCode, error_type: &str, message: &str) -> Response {
......
...@@ -209,7 +209,9 @@ impl OpenAIPreprocessor { ...@@ -209,7 +209,9 @@ impl OpenAIPreprocessor {
} }
/// Translate a [`NvCreateChatCompletionRequest`] request to a common completion request. /// Translate a [`NvCreateChatCompletionRequest`] request to a common completion request.
/// Returns both the common completion request and a hashmap of annotations. /// Returns the common completion request, a hashmap of annotations, and a boolean
/// indicating whether the rendered prompt ends with a reasoning start token (e.g.,
/// `<think>`), meaning the model's completion will begin mid-reasoning.
/// ///
/// Annotations evaluated by this method include: /// Annotations evaluated by this method include:
/// - `formatted_prompt` /// - `formatted_prompt`
...@@ -225,11 +227,20 @@ impl OpenAIPreprocessor { ...@@ -225,11 +227,20 @@ impl OpenAIPreprocessor {
&self, &self,
request: &R, request: &R,
tracker: Option<&RequestTracker>, tracker: Option<&RequestTracker>,
) -> Result<(PreprocessedRequest, HashMap<String, String>)> { ) -> Result<(PreprocessedRequest, HashMap<String, String>, bool)> {
let mut builder = self.builder(request)?; let mut builder = self.builder(request)?;
let formatted_prompt = self let formatted_prompt = self
.apply_template(request) .apply_template(request)
.with_context(|| "Failed to apply prompt template")?; .with_context(|| "Failed to apply prompt template")?;
// Check if the chat template injected a reasoning start token at the end
// of the prompt (e.g., Qwen3.5 appends `<think>\n` when enable_thinking
// is not explicitly false). If so, the model's completion starts
// mid-reasoning and the parser should begin in reasoning mode.
let prompt_injected_reasoning = formatted_prompt
.as_ref()
.is_some_and(|p| p.trim_end().ends_with("<think>"));
let annotations = self let annotations = self
.gather_tokens(request, &mut builder, formatted_prompt.clone(), tracker) .gather_tokens(request, &mut builder, formatted_prompt.clone(), tracker)
.with_context(|| "Failed to gather tokens")?; .with_context(|| "Failed to gather tokens")?;
...@@ -237,7 +248,7 @@ impl OpenAIPreprocessor { ...@@ -237,7 +248,7 @@ impl OpenAIPreprocessor {
.await .await
.with_context(|| "Failed to gather multimodal data")?; .with_context(|| "Failed to gather multimodal data")?;
Ok((builder.build()?, annotations)) Ok((builder.build()?, annotations, prompt_injected_reasoning))
} }
pub fn builder< pub fn builder<
...@@ -659,6 +670,7 @@ impl OpenAIPreprocessor { ...@@ -659,6 +670,7 @@ impl OpenAIPreprocessor {
&self, &self,
stream: S, stream: S,
request: &NvCreateChatCompletionRequest, request: &NvCreateChatCompletionRequest,
prompt_injected_reasoning: bool,
) -> anyhow::Result< ) -> anyhow::Result<
impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send + 'static, impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send + 'static,
> >
...@@ -683,6 +695,7 @@ impl OpenAIPreprocessor { ...@@ -683,6 +695,7 @@ impl OpenAIPreprocessor {
Box::pin(Self::parse_reasoning_content_from_stream( Box::pin(Self::parse_reasoning_content_from_stream(
stream, stream,
self.runtime_config.reasoning_parser.clone().unwrap(), // Safety: We already checked that parser is some, so gtg self.runtime_config.reasoning_parser.clone().unwrap(), // Safety: We already checked that parser is some, so gtg
prompt_injected_reasoning,
)) ))
} else { } else {
Box::pin(stream) Box::pin(stream)
...@@ -1104,18 +1117,30 @@ impl OpenAIPreprocessor { ...@@ -1104,18 +1117,30 @@ impl OpenAIPreprocessor {
// Motivation: Each transformation on the stream should be a separate step to allow for more flexibility // Motivation: Each transformation on the stream should be a separate step to allow for more flexibility
// Earlier reasoning parser logic was nested under delta generation logic in choice_from_postprocessor // Earlier reasoning parser logic was nested under delta generation logic in choice_from_postprocessor
// Since we have tool calling parsing as separate step, it makes sense to have reasoning parser as separate step as well // Since we have tool calling parsing as separate step, it makes sense to have reasoning parser as separate step as well
/// Apply reasoning parsing to the output stream, splitting content into
/// `reasoning_content` and normal `content` based on think tags.
///
/// When `prompt_injected_reasoning` is `true`, the parser starts in reasoning
/// mode immediately — use this when the chat template already appended the
/// reasoning start token (e.g., `<think>`) to the prompt, so the model's
/// completion begins with thinking content without an explicit start tag.
pub fn parse_reasoning_content_from_stream<S>( pub fn parse_reasoning_content_from_stream<S>(
stream: S, stream: S,
parser_name: String, parser_name: String,
prompt_injected_reasoning: bool,
) -> impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send ) -> impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send
where where
S: Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send + 'static, S: Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send + 'static,
{ {
// Initialize reasoning parser from parser_name // Initialize reasoning parser from parser_name
let reasoning_parser = Box::new(ReasoningParserType::get_reasoning_parser_from_name( let mut reasoning_parser = Box::new(ReasoningParserType::get_reasoning_parser_from_name(
parser_name.as_ref(), parser_name.as_ref(),
)) as Box<dyn ReasoningParser>; )) as Box<dyn ReasoningParser>;
if prompt_injected_reasoning {
reasoning_parser.set_in_reasoning(true);
}
let state = ReasoningState { let state = ReasoningState {
stream: Box::pin(stream), stream: Box::pin(stream),
reasoning_parser: Some(reasoning_parser), reasoning_parser: Some(reasoning_parser),
...@@ -1210,10 +1235,10 @@ impl ...@@ -1210,10 +1235,10 @@ impl
let tracker = response_generator.tracker(); let tracker = response_generator.tracker();
// convert the chat completion request to a common completion request // convert the chat completion request to a common completion request
let (mut common_request, annotations) = self let (mut common_request, annotations, prompt_injected_reasoning) = self
.preprocess_request(&request, tracker.as_deref()) .preprocess_request(&request, tracker.as_deref())
.await?; .await?;
tracing::trace!(request = ?common_request, "Pre-processed request"); tracing::trace!(request = ?common_request, prompt_injected_reasoning, "Pre-processed request");
// Attach the timing tracker to the request so downstream components can record metrics // Attach the timing tracker to the request so downstream components can record metrics
common_request.tracker = tracker; common_request.tracker = tracker;
...@@ -1248,7 +1273,8 @@ impl ...@@ -1248,7 +1273,8 @@ impl
context.clone(), context.clone(),
); );
let transformed_stream = self.postprocessor_parsing_stream(stream, &request)?; let transformed_stream =
self.postprocessor_parsing_stream(stream, &request, prompt_injected_reasoning)?;
// Apply audit aggregation strategy. // Apply audit aggregation strategy.
// The audit branch already returns Pin<Box<...>> from scan/fold_aggregate_with_future, // The audit branch already returns Pin<Box<...>> from scan/fold_aggregate_with_future,
......
...@@ -23,6 +23,10 @@ use crate::protocols::openai::chat_completions::NvCreateChatCompletionStreamResp ...@@ -23,6 +23,10 @@ use crate::protocols::openai::chat_completions::NvCreateChatCompletionStreamResp
pub struct AnthropicStreamConverter { pub struct AnthropicStreamConverter {
model: String, model: String,
message_id: String, message_id: String,
// Thinking/reasoning tracking
thinking_block_started: bool,
thinking_block_closed: bool,
thinking_block_index: u32,
// Text tracking // Text tracking
text_block_started: bool, text_block_started: bool,
text_block_closed: bool, text_block_closed: bool,
...@@ -53,6 +57,9 @@ impl AnthropicStreamConverter { ...@@ -53,6 +57,9 @@ impl AnthropicStreamConverter {
Self { Self {
model, model,
message_id: format!("msg_{}", Uuid::new_v4().simple()), message_id: format!("msg_{}", Uuid::new_v4().simple()),
thinking_block_started: false,
thinking_block_closed: false,
thinking_block_index: 0,
text_block_started: false, text_block_started: false,
text_block_closed: false, text_block_closed: false,
text_block_index: 0, text_block_index: 0,
...@@ -127,6 +134,36 @@ impl AnthropicStreamConverter { ...@@ -127,6 +134,36 @@ impl AnthropicStreamConverter {
}); });
} }
// Handle reasoning/thinking content deltas
if let Some(ref reasoning) = delta.reasoning_content
&& !reasoning.is_empty()
{
// Emit content_block_start on first thinking token
if !self.thinking_block_started {
self.thinking_block_started = true;
self.thinking_block_index = self.next_block_index;
self.next_block_index += 1;
let block_start = AnthropicStreamEvent::ContentBlockStart {
index: self.thinking_block_index,
content_block: AnthropicResponseContentBlock::Thinking {
thinking: String::new(),
signature: String::new(),
},
};
events.push(make_sse_event("content_block_start", &block_start));
}
// Emit thinking delta
let block_delta = AnthropicStreamEvent::ContentBlockDelta {
index: self.thinking_block_index,
delta: AnthropicDelta::ThinkingDelta {
thinking: reasoning.clone(),
},
};
events.push(make_sse_event("content_block_delta", &block_delta));
}
// Handle text content deltas // Handle text content deltas
let content_text = match &delta.content { let content_text = match &delta.content {
Some(ChatCompletionMessageContent::Text(text)) => Some(text.as_str()), Some(ChatCompletionMessageContent::Text(text)) => Some(text.as_str()),
...@@ -136,6 +173,26 @@ impl AnthropicStreamConverter { ...@@ -136,6 +173,26 @@ impl AnthropicStreamConverter {
if let Some(text) = content_text if let Some(text) = content_text
&& !text.is_empty() && !text.is_empty()
{ {
// Close thinking block before text starts (Anthropic spec: thinking → text → tool_use)
if self.thinking_block_started && !self.thinking_block_closed {
self.thinking_block_closed = true;
// Emit signature delta to close the thinking block.
// The engine doesn't produce Anthropic-style cryptographic signatures,
// so we use "erased" (the standard placeholder per the Anthropic spec).
let sig_delta = AnthropicStreamEvent::ContentBlockDelta {
index: self.thinking_block_index,
delta: AnthropicDelta::SignatureDelta {
signature: "erased".to_string(),
},
};
events.push(make_sse_event("content_block_delta", &sig_delta));
let block_stop = AnthropicStreamEvent::ContentBlockStop {
index: self.thinking_block_index,
};
events.push(make_sse_event("content_block_stop", &block_stop));
}
// Emit content_block_start on first text // Emit content_block_start on first text
if !self.text_block_started { if !self.text_block_started {
self.text_block_started = true; self.text_block_started = true;
...@@ -164,6 +221,22 @@ impl AnthropicStreamConverter { ...@@ -164,6 +221,22 @@ impl AnthropicStreamConverter {
// Handle tool call deltas // Handle tool call deltas
if let Some(tool_calls) = &delta.tool_calls { if let Some(tool_calls) = &delta.tool_calls {
// Close thinking block before tool blocks (if text never appeared)
if self.thinking_block_started && !self.thinking_block_closed {
self.thinking_block_closed = true;
let sig_delta = AnthropicStreamEvent::ContentBlockDelta {
index: self.thinking_block_index,
delta: AnthropicDelta::SignatureDelta {
signature: "erased".to_string(),
},
};
events.push(make_sse_event("content_block_delta", &sig_delta));
let block_stop = AnthropicStreamEvent::ContentBlockStop {
index: self.thinking_block_index,
};
events.push(make_sse_event("content_block_stop", &block_stop));
}
// Close the text block before opening any tool blocks. // Close the text block before opening any tool blocks.
// Anthropic streaming spec requires each block to be closed // Anthropic streaming spec requires each block to be closed
// (content_block_stop) before the next block starts. // (content_block_stop) before the next block starts.
...@@ -253,6 +326,22 @@ impl AnthropicStreamConverter { ...@@ -253,6 +326,22 @@ impl AnthropicStreamConverter {
pub fn emit_end_events(&mut self) -> Vec<Result<Event, anyhow::Error>> { pub fn emit_end_events(&mut self) -> Vec<Result<Event, anyhow::Error>> {
let mut events = Vec::new(); let mut events = Vec::new();
// Close thinking block if started and not already closed mid-stream
if self.thinking_block_started && !self.thinking_block_closed {
self.thinking_block_closed = true;
let sig_delta = AnthropicStreamEvent::ContentBlockDelta {
index: self.thinking_block_index,
delta: AnthropicDelta::SignatureDelta {
signature: "erased".to_string(),
},
};
events.push(make_sse_event("content_block_delta", &sig_delta));
let block_stop = AnthropicStreamEvent::ContentBlockStop {
index: self.thinking_block_index,
};
events.push(make_sse_event("content_block_stop", &block_stop));
}
// Close text block if started and not already closed mid-stream // Close text block if started and not already closed mid-stream
if self.text_block_started && !self.text_block_closed { if self.text_block_started && !self.text_block_closed {
let block_stop = AnthropicStreamEvent::ContentBlockStop { let block_stop = AnthropicStreamEvent::ContentBlockStop {
...@@ -367,6 +456,34 @@ impl AnthropicStreamConverter { ...@@ -367,6 +456,34 @@ impl AnthropicStreamConverter {
}); });
} }
// Handle reasoning/thinking content deltas
if let Some(ref reasoning) = delta.reasoning_content
&& !reasoning.is_empty()
{
if !self.thinking_block_started {
self.thinking_block_started = true;
self.thinking_block_index = self.next_block_index;
self.next_block_index += 1;
let ev = AnthropicStreamEvent::ContentBlockStart {
index: self.thinking_block_index,
content_block: AnthropicResponseContentBlock::Thinking {
thinking: String::new(),
signature: String::new(),
},
};
events.push(make_tagged_event("content_block_start", &ev));
}
let ev = AnthropicStreamEvent::ContentBlockDelta {
index: self.thinking_block_index,
delta: AnthropicDelta::ThinkingDelta {
thinking: reasoning.clone(),
},
};
events.push(make_tagged_event("content_block_delta", &ev));
}
let content_text = match &delta.content { let content_text = match &delta.content {
Some(ChatCompletionMessageContent::Text(text)) => Some(text.as_str()), Some(ChatCompletionMessageContent::Text(text)) => Some(text.as_str()),
_ => None, _ => None,
...@@ -375,6 +492,22 @@ impl AnthropicStreamConverter { ...@@ -375,6 +492,22 @@ impl AnthropicStreamConverter {
if let Some(text) = content_text if let Some(text) = content_text
&& !text.is_empty() && !text.is_empty()
{ {
// Close thinking block before text starts
if self.thinking_block_started && !self.thinking_block_closed {
self.thinking_block_closed = true;
let ev = AnthropicStreamEvent::ContentBlockDelta {
index: self.thinking_block_index,
delta: AnthropicDelta::SignatureDelta {
signature: "erased".to_string(),
},
};
events.push(make_tagged_event("content_block_delta", &ev));
let ev = AnthropicStreamEvent::ContentBlockStop {
index: self.thinking_block_index,
};
events.push(make_tagged_event("content_block_stop", &ev));
}
if !self.text_block_started { if !self.text_block_started {
self.text_block_started = true; self.text_block_started = true;
self.text_block_index = self.next_block_index; self.text_block_index = self.next_block_index;
...@@ -401,6 +534,22 @@ impl AnthropicStreamConverter { ...@@ -401,6 +534,22 @@ impl AnthropicStreamConverter {
} }
if let Some(tool_calls) = &delta.tool_calls { if let Some(tool_calls) = &delta.tool_calls {
// Close thinking block before tool blocks
if self.thinking_block_started && !self.thinking_block_closed {
self.thinking_block_closed = true;
let ev = AnthropicStreamEvent::ContentBlockDelta {
index: self.thinking_block_index,
delta: AnthropicDelta::SignatureDelta {
signature: "erased".to_string(),
},
};
events.push(make_tagged_event("content_block_delta", &ev));
let ev = AnthropicStreamEvent::ContentBlockStop {
index: self.thinking_block_index,
};
events.push(make_tagged_event("content_block_stop", &ev));
}
if self.text_block_started && !self.text_block_closed { if self.text_block_started && !self.text_block_closed {
self.text_block_closed = true; self.text_block_closed = true;
let ev = AnthropicStreamEvent::ContentBlockStop { let ev = AnthropicStreamEvent::ContentBlockStop {
...@@ -475,6 +624,22 @@ impl AnthropicStreamConverter { ...@@ -475,6 +624,22 @@ impl AnthropicStreamConverter {
fn emit_end_events_tagged(&mut self) -> Vec<TaggedEvent> { fn emit_end_events_tagged(&mut self) -> Vec<TaggedEvent> {
let mut events = Vec::new(); let mut events = Vec::new();
// Close thinking block if not already closed
if self.thinking_block_started && !self.thinking_block_closed {
self.thinking_block_closed = true;
let ev = AnthropicStreamEvent::ContentBlockDelta {
index: self.thinking_block_index,
delta: AnthropicDelta::SignatureDelta {
signature: "erased".to_string(),
},
};
events.push(make_tagged_event("content_block_delta", &ev));
let ev = AnthropicStreamEvent::ContentBlockStop {
index: self.thinking_block_index,
};
events.push(make_tagged_event("content_block_stop", &ev));
}
if self.text_block_started && !self.text_block_closed { if self.text_block_started && !self.text_block_closed {
let ev = AnthropicStreamEvent::ContentBlockStop { let ev = AnthropicStreamEvent::ContentBlockStop {
index: self.text_block_index, index: self.text_block_index,
...@@ -702,4 +867,116 @@ mod tests { ...@@ -702,4 +867,116 @@ mod tests {
other => panic!("expected text stop at index 0, got {other:?}"), other => panic!("expected text stop at index 0, got {other:?}"),
} }
} }
/// Build a single-choice stream chunk whose delta carries only
/// `reasoning_content`: no text, no tool calls, no role, no finish reason.
// NOTE(review): the lint allowance is presumably needed for the
// `function_call` field of the delta literal; confirm against the type.
#[allow(deprecated)]
fn reasoning_chunk(text: &str) -> NvCreateChatCompletionStreamResponse {
    let delta = ChatCompletionStreamResponseDelta {
        reasoning_content: Some(text.into()),
        content: None,
        function_call: None,
        tool_calls: None,
        role: None,
        refusal: None,
    };

    let choice = ChatChoiceStream {
        index: 0,
        delta,
        finish_reason: None,
        stop_reason: None,
        logprobs: None,
    };

    NvCreateChatCompletionStreamResponse {
        id: "chat-1".into(),
        object: "chat.completion.chunk".into(),
        model: "test".into(),
        created: 0,
        choices: vec![choice],
        service_tier: None,
        system_fingerprint: None,
        usage: None,
        nvext: None,
    }
}
/// Ordering check for a complete reasoning response: thinking, then text,
/// then tool_use.
///
/// Expects block indices thinking=0, text=1, tool=2, and that each block's
/// `content_block_stop` is emitted before the next block's
/// `content_block_start`.
#[test]
fn test_thinking_text_then_tool_call() {
    let mut converter = AnthropicStreamConverter::new("test-model".into());

    // Step 1: the first reasoning delta opens the thinking block at index 0.
    let thinking_events = converter.process_chunk_tagged(&reasoning_chunk("Let me think..."));
    assert_eq!(
        event_types(&thinking_events),
        vec!["content_block_start", "content_block_delta"]
    );
    assert!(matches!(
        &thinking_events[0].data,
        AnthropicStreamEvent::ContentBlockStart {
            index: 0,
            content_block: AnthropicResponseContentBlock::Thinking { .. }
        }
    ));

    // Step 2: the first text delta closes the thinking block (signature delta
    // followed by stop) and then opens the text block at index 1.
    let text_events = converter.process_chunk_tagged(&text_chunk("Hello!"));
    assert_eq!(
        event_types(&text_events),
        vec![
            "content_block_delta",
            "content_block_stop",
            "content_block_start",
            "content_block_delta"
        ]
    );
    assert!(matches!(
        &text_events[1].data,
        AnthropicStreamEvent::ContentBlockStop { index: 0 }
    ));
    assert!(matches!(
        &text_events[2].data,
        AnthropicStreamEvent::ContentBlockStart { index: 1, .. }
    ));

    // Step 3: a tool call closes the text block and opens the tool block at
    // index 2.
    let tool_chunk = tool_call_chunk(
        0,
        Some("call-1"),
        Some("Read"),
        Some("{\"path\":\"/tmp/test.txt\"}"),
    );
    let tool_events = converter.process_chunk_tagged(&tool_chunk);
    assert_eq!(
        event_types(&tool_events),
        vec![
            "content_block_stop",
            "content_block_start",
            "content_block_delta"
        ]
    );
    assert!(matches!(
        &tool_events[0].data,
        AnthropicStreamEvent::ContentBlockStop { index: 1 }
    ));
    assert!(matches!(
        &tool_events[1].data,
        AnthropicStreamEvent::ContentBlockStart { index: 2, .. }
    ));
}
/// When only reasoning was streamed (no text or tool call follows), the end
/// events must close the thinking block before the message-level events.
#[test]
fn test_thinking_only_closed_in_end_events() {
    let mut converter = AnthropicStreamConverter::new("test-model".into());

    // Open a thinking block; the events emitted here are not under test.
    converter.process_chunk_tagged(&reasoning_chunk("Deep thought..."));

    let closing_events = converter.emit_end_events_tagged();
    let expected_types = vec![
        "content_block_delta",
        "content_block_stop",
        "message_delta",
        "message_stop",
    ];
    assert_eq!(event_types(&closing_events), expected_types);
}
} }
...@@ -585,6 +585,8 @@ pub struct AnthropicMessageResponse { ...@@ -585,6 +585,8 @@ pub struct AnthropicMessageResponse {
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")] #[serde(tag = "type")]
pub enum AnthropicResponseContentBlock { pub enum AnthropicResponseContentBlock {
#[serde(rename = "thinking")]
Thinking { thinking: String, signature: String },
#[serde(rename = "text")] #[serde(rename = "text")]
Text { Text {
text: String, text: String,
...@@ -597,8 +599,6 @@ pub enum AnthropicResponseContentBlock { ...@@ -597,8 +599,6 @@ pub enum AnthropicResponseContentBlock {
name: String, name: String,
input: serde_json::Value, input: serde_json::Value,
}, },
#[serde(rename = "thinking")]
Thinking { thinking: String, signature: String },
#[serde(rename = "redacted_thinking")] #[serde(rename = "redacted_thinking")]
RedactedThinking { data: String }, RedactedThinking { data: String },
#[serde(rename = "server_tool_use")] #[serde(rename = "server_tool_use")]
...@@ -692,13 +692,12 @@ pub enum AnthropicStreamEvent { ...@@ -692,13 +692,12 @@ pub enum AnthropicStreamEvent {
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")] #[serde(tag = "type")]
pub enum AnthropicDelta { pub enum AnthropicDelta {
#[serde(rename = "thinking_delta")]
ThinkingDelta { thinking: String },
#[serde(rename = "text_delta")] #[serde(rename = "text_delta")]
TextDelta { text: String }, TextDelta { text: String },
#[serde(rename = "input_json_delta")] #[serde(rename = "input_json_delta")]
InputJsonDelta { partial_json: String }, InputJsonDelta { partial_json: String },
/// Incremental thinking content during extended thinking streaming.
#[serde(rename = "thinking_delta")]
ThinkingDelta { thinking: String },
/// Incremental signature for a thinking block (sent at the end). /// Incremental signature for a thinking block (sent at the end).
#[serde(rename = "signature_delta")] #[serde(rename = "signature_delta")]
SignatureDelta { signature: String }, SignatureDelta { signature: String },
......
...@@ -120,7 +120,7 @@ async fn postprocessor_parsing_stream_replays_unit_test_fixture() { ...@@ -120,7 +120,7 @@ async fn postprocessor_parsing_stream_replays_unit_test_fixture() {
let input_stream = stream::iter(input_chunks.into_iter().map(Annotated::from_data)); let input_stream = stream::iter(input_chunks.into_iter().map(Annotated::from_data));
let output_stream = preprocessor let output_stream = preprocessor
.postprocessor_parsing_stream(input_stream, &request) .postprocessor_parsing_stream(input_stream, &request, false)
.expect("postprocessor_parsing_stream should build"); .expect("postprocessor_parsing_stream should build");
let output_chunks: Vec<Annotated<NvCreateChatCompletionStreamResponse>> = let output_chunks: Vec<Annotated<NvCreateChatCompletionStreamResponse>> =
...@@ -176,7 +176,7 @@ async fn postprocessor_parsing_stream_replays_interval_20_fixture() { ...@@ -176,7 +176,7 @@ async fn postprocessor_parsing_stream_replays_interval_20_fixture() {
let input_stream = stream::iter(input_chunks.into_iter().map(Annotated::from_data)); let input_stream = stream::iter(input_chunks.into_iter().map(Annotated::from_data));
let output_stream = preprocessor let output_stream = preprocessor
.postprocessor_parsing_stream(input_stream, &request) .postprocessor_parsing_stream(input_stream, &request, false)
.expect("postprocessor_parsing_stream should build"); .expect("postprocessor_parsing_stream should build");
let output_chunks: Vec<Annotated<NvCreateChatCompletionStreamResponse>> = let output_chunks: Vec<Annotated<NvCreateChatCompletionStreamResponse>> =
......
...@@ -583,7 +583,7 @@ async fn test_media_url_passthrough(#[case] media_chunks: &[(&str, usize)]) { ...@@ -583,7 +583,7 @@ async fn test_media_url_passthrough(#[case] media_chunks: &[(&str, usize)]) {
let message = build_message("Test multimodal content", media_chunks); let message = build_message("Test multimodal content", media_chunks);
let request = Request::from(&message, None, None, mdc.slug().to_string()); let request = Request::from(&message, None, None, mdc.slug().to_string());
let (preprocessed, _annotations) = preprocessor let (preprocessed, _annotations, _) = preprocessor
.preprocess_request(&request, None) .preprocess_request(&request, None)
.await .await
.unwrap(); .unwrap();
...@@ -694,7 +694,7 @@ mod context_length_validation { ...@@ -694,7 +694,7 @@ mod context_length_validation {
r#"[{"role": "user", "content": "What is deep learning?"}]"#, r#"[{"role": "user", "content": "What is deep learning?"}]"#,
"test-model", "test-model",
); );
let (preprocessed, _) = preprocessor let (preprocessed, _, _) = preprocessor
.preprocess_request(&request, None) .preprocess_request(&request, None)
.await .await
.unwrap(); .unwrap();
......
...@@ -118,6 +118,7 @@ mod tests { ...@@ -118,6 +118,7 @@ mod tests {
let output_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream( let output_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream(
stream::iter(chunks), stream::iter(chunks),
parser.to_string(), parser.to_string(),
false,
); );
let mut output_stream = std::pin::pin!(output_stream); let mut output_stream = std::pin::pin!(output_stream);
let mut all_reasoning = String::new(); let mut all_reasoning = String::new();
...@@ -162,6 +163,7 @@ mod tests { ...@@ -162,6 +163,7 @@ mod tests {
let output_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream( let output_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream(
input_stream, input_stream,
runtime_config.reasoning_parser.unwrap(), runtime_config.reasoning_parser.unwrap(),
false,
); );
// Pin the stream and collect all output chunks // Pin the stream and collect all output chunks
...@@ -207,6 +209,7 @@ mod tests { ...@@ -207,6 +209,7 @@ mod tests {
let output_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream( let output_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream(
input_stream, input_stream,
runtime_config.reasoning_parser.unwrap(), runtime_config.reasoning_parser.unwrap(),
false,
); );
// Pin the stream and collect all output chunks // Pin the stream and collect all output chunks
...@@ -251,6 +254,7 @@ mod tests { ...@@ -251,6 +254,7 @@ mod tests {
let output_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream( let output_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream(
input_stream, input_stream,
runtime_config.reasoning_parser.unwrap(), runtime_config.reasoning_parser.unwrap(),
false,
); );
// Pin the stream and collect all output chunks // Pin the stream and collect all output chunks
...@@ -286,6 +290,7 @@ mod tests { ...@@ -286,6 +290,7 @@ mod tests {
let output_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream( let output_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream(
input_stream, input_stream,
runtime_config.reasoning_parser.unwrap(), runtime_config.reasoning_parser.unwrap(),
false,
); );
// Pin the stream and collect all output chunks // Pin the stream and collect all output chunks
...@@ -328,6 +333,7 @@ mod tests { ...@@ -328,6 +333,7 @@ mod tests {
let output_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream( let output_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream(
input_stream, input_stream,
runtime_config.reasoning_parser.unwrap(), runtime_config.reasoning_parser.unwrap(),
false,
); );
// Pin the stream and collect all output chunks // Pin the stream and collect all output chunks
...@@ -397,6 +403,7 @@ mod tests { ...@@ -397,6 +403,7 @@ mod tests {
let output_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream( let output_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream(
input_stream, input_stream,
"gpt_oss".to_string(), "gpt_oss".to_string(),
false,
); );
// Pin the stream and collect all output chunks // Pin the stream and collect all output chunks
...@@ -537,6 +544,7 @@ mod tests { ...@@ -537,6 +544,7 @@ mod tests {
let reasoning_parsed_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream( let reasoning_parsed_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream(
input_stream, input_stream,
"nemotron_deci".to_string(), "nemotron_deci".to_string(),
false,
); );
// Step 2: Apply tool calling jail transformation // Step 2: Apply tool calling jail transformation
...@@ -650,6 +658,7 @@ mod tests { ...@@ -650,6 +658,7 @@ mod tests {
let reasoning_parsed_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream( let reasoning_parsed_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream(
input_stream, input_stream,
"kimi_k25".to_string(), "kimi_k25".to_string(),
false,
); );
// Step 2: tool calling jail (kimi_k2) extracts tool calls from remaining content // Step 2: tool calling jail (kimi_k2) extracts tool calls from remaining content
...@@ -741,6 +750,7 @@ mod tests { ...@@ -741,6 +750,7 @@ mod tests {
let reasoning_parsed_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream( let reasoning_parsed_stream = OpenAIPreprocessor::parse_reasoning_content_from_stream(
input_stream, input_stream,
"gpt_oss".to_string(), "gpt_oss".to_string(),
false,
); );
let mut debug_stream = std::pin::pin!(reasoning_parsed_stream); let mut debug_stream = std::pin::pin!(reasoning_parsed_stream);
......
...@@ -152,6 +152,7 @@ async fn parse_response_stream( ...@@ -152,6 +152,7 @@ async fn parse_response_stream(
Box::pin(OpenAIPreprocessor::parse_reasoning_content_from_stream( Box::pin(OpenAIPreprocessor::parse_reasoning_content_from_stream(
stream, stream,
reasoning_parser, reasoning_parser,
false,
)) ))
} else { } else {
Box::pin(stream) Box::pin(stream)
......
...@@ -82,6 +82,15 @@ impl BasicReasoningParser { ...@@ -82,6 +82,15 @@ impl BasicReasoningParser {
} }
impl ReasoningParser for BasicReasoningParser { impl ReasoningParser for BasicReasoningParser {
/// Force the parser's reasoning state for the current request.
///
/// Passing `true` also records the start token as already stripped; passing
/// `false` only clears the reasoning flag and leaves the stripped marker
/// untouched.
fn set_in_reasoning(&mut self, in_reasoning: bool) {
    // When reasoning mode is forced on, the chat template has already
    // injected the start token into the prompt, so the parser must not keep
    // looking for it in the completion stream.
    self.stripped_think_start |= in_reasoning;
    self._in_reasoning = in_reasoning;
}
fn detect_and_parse_reasoning(&mut self, text: &str, _token_ids: &[u32]) -> ParserResult { fn detect_and_parse_reasoning(&mut self, text: &str, _token_ids: &[u32]) -> ParserResult {
let has_think_tag = text.contains(&self.think_start_token); let has_think_tag = text.contains(&self.think_start_token);
let in_reasoning = self._in_reasoning || has_think_tag; let in_reasoning = self._in_reasoning || has_think_tag;
......
...@@ -86,6 +86,14 @@ pub trait ReasoningParser: Send + std::fmt::Debug { ...@@ -86,6 +86,14 @@ pub trait ReasoningParser: Send + std::fmt::Debug {
text: &str, text: &str,
token_ids: &[u32], token_ids: &[u32],
) -> ParserResult; ) -> ParserResult;
/// Override the parser's initial reasoning state.
///
/// When called with `true`, the parser starts in reasoning mode without
/// waiting for the start token in the completion stream. Use this when the
/// chat template already injected the start token (e.g., `<think>`) into the
/// prompt, so it won't appear in the model's output.
///
/// The default implementation ignores the argument and does nothing;
/// parsers that support a per-request override provide their own
/// implementation.
fn set_in_reasoning(&mut self, _in_reasoning: bool) {
    // Default no-op for parsers that don't support per-request overrides.
}
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
...@@ -122,6 +130,10 @@ impl ReasoningParser for ReasoningParserWrapper { ...@@ -122,6 +130,10 @@ impl ReasoningParser for ReasoningParserWrapper {
self.parser self.parser
.parse_reasoning_streaming_incremental(text, token_ids) .parse_reasoning_streaming_incremental(text, token_ids)
} }
/// Forward the reasoning-state override to the wrapped parser.
fn set_in_reasoning(&mut self, in_reasoning: bool) {
    self.parser.set_in_reasoning(in_reasoning);
}
} }
impl ReasoningParserType { impl ReasoningParserType {
......
...@@ -280,6 +280,11 @@ pub mod llm { ...@@ -280,6 +280,11 @@ pub mod llm {
/// Enable the experimental Anthropic Messages API endpoint (/v1/messages) /// Enable the experimental Anthropic Messages API endpoint (/v1/messages)
pub const DYN_ENABLE_ANTHROPIC_API: &str = "DYN_ENABLE_ANTHROPIC_API"; pub const DYN_ENABLE_ANTHROPIC_API: &str = "DYN_ENABLE_ANTHROPIC_API";
/// Strip the Claude Code billing preamble (`x-anthropic-billing-header: ...`)
/// from the system prompt before forwarding to the target model. The preamble
/// varies per session and per release, wasting tokens and breaking prompt caching.
pub const DYN_STRIP_ANTHROPIC_PREAMBLE: &str = "DYN_STRIP_ANTHROPIC_PREAMBLE";
/// Metrics configuration /// Metrics configuration
pub mod metrics { pub mod metrics {
/// Custom metrics prefix (overrides default "dynamo_frontend") /// Custom metrics prefix (overrides default "dynamo_frontend")
...@@ -458,6 +463,7 @@ mod tests { ...@@ -458,6 +463,7 @@ mod tests {
llm::DYN_LORA_ENABLED, llm::DYN_LORA_ENABLED,
llm::DYN_LORA_PATH, llm::DYN_LORA_PATH,
llm::DYN_ENABLE_ANTHROPIC_API, llm::DYN_ENABLE_ANTHROPIC_API,
llm::DYN_STRIP_ANTHROPIC_PREAMBLE,
llm::metrics::DYN_METRICS_PREFIX, llm::metrics::DYN_METRICS_PREFIX,
// Model // Model
model::model_express::MODEL_EXPRESS_URL, model::model_express::MODEL_EXPRESS_URL,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment