Unverified Commit b6603d90 authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

feat: add Anthropic Messages API endpoint (/v1/messages) (#6231)


Signed-off-by: default avatarMatej Kosec <mkosec@nvidia.com>
Signed-off-by: default avatarMarko Kosec <mkosec@nvidia.com>
Co-authored-by: default avatarClaude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: default avatarMatej Kosec <mkosec@nvidia.com>
parent 858f33fc
...@@ -81,6 +81,7 @@ class FrontendConfig(ConfigBase): ...@@ -81,6 +81,7 @@ class FrontendConfig(ConfigBase):
request_plane: str request_plane: str
event_plane: str event_plane: str
chat_processor: str chat_processor: str
enable_anthropic_api: bool
exp_python_factory: bool exp_python_factory: bool
def validate(self) -> None: def validate(self) -> None:
...@@ -492,6 +493,16 @@ class FrontendArgGroup(ArgGroup): ...@@ -492,6 +493,16 @@ class FrontendArgGroup(ArgGroup):
help="Determines how events are published [nats|zmq]", help="Determines how events are published [nats|zmq]",
choices=["nats", "zmq"], choices=["nats", "zmq"],
) )
add_negatable_bool_argument(
g,
flag_name="--enable-anthropic-api",
env_var="DYN_ENABLE_ANTHROPIC_API",
default=False,
help=(
"[EXPERIMENTAL] Enable Anthropic Messages API endpoint (/v1/messages). "
"This feature is experimental and may change."
),
)
add_argument( add_argument(
g, g,
flag_name="--chat-processor", flag_name="--chat-processor",
......
...@@ -233,6 +233,9 @@ async def async_main(): ...@@ -233,6 +233,9 @@ async def async_main():
if config.kserve_grpc_server and config.grpc_metrics_port: if config.kserve_grpc_server and config.grpc_metrics_port:
kwargs["http_metrics_port"] = config.grpc_metrics_port kwargs["http_metrics_port"] = config.grpc_metrics_port
if config.enable_anthropic_api:
os.environ["DYN_ENABLE_ANTHROPIC_API"] = "1"
if config.chat_processor == "vllm": if config.chat_processor == "vllm":
assert ( assert (
vllm_flags is not None vllm_flags is not None
......
...@@ -51,6 +51,7 @@ fn generate_openapi() -> anyhow::Result<()> { ...@@ -51,6 +51,7 @@ fn generate_openapi() -> anyhow::Result<()> {
.enable_cmpl_endpoints(true) .enable_cmpl_endpoints(true)
.enable_embeddings_endpoints(true) .enable_embeddings_endpoints(true)
.enable_responses_endpoints(true) .enable_responses_endpoints(true)
.enable_anthropic_endpoints(true)
.build() .build()
.context("failed to build HttpService for OpenAPI generation")?; .context("failed to build HttpService for OpenAPI generation")?;
......
...@@ -18,6 +18,8 @@ pub enum EndpointType { ...@@ -18,6 +18,8 @@ pub enum EndpointType {
Videos, Videos,
/// Responses API /// Responses API
Responses, Responses,
/// Anthropic Messages API
AnthropicMessages,
} }
impl EndpointType { impl EndpointType {
...@@ -29,6 +31,7 @@ impl EndpointType { ...@@ -29,6 +31,7 @@ impl EndpointType {
Self::Images => "images", Self::Images => "images",
Self::Videos => "videos", Self::Videos => "videos",
Self::Responses => "responses", Self::Responses => "responses",
Self::AnthropicMessages => "anthropic_messages",
} }
} }
...@@ -40,6 +43,7 @@ impl EndpointType { ...@@ -40,6 +43,7 @@ impl EndpointType {
Self::Images, Self::Images,
Self::Videos, Self::Videos,
Self::Responses, Self::Responses,
Self::AnthropicMessages,
] ]
} }
} }
...@@ -262,6 +262,7 @@ impl KserveServiceConfigBuilder { ...@@ -262,6 +262,7 @@ impl KserveServiceConfigBuilder {
.enable_cmpl_endpoints(false) .enable_cmpl_endpoints(false)
.enable_embeddings_endpoints(false) .enable_embeddings_endpoints(false)
.enable_responses_endpoints(false) .enable_responses_endpoints(false)
.enable_anthropic_endpoints(false)
.build()?; .build()?;
// Share the HTTP service's model manager and metrics object with gRPC state // Share the HTTP service's model manager and metrics object with gRPC state
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
//! //!
//! The [`service_v2::HttpService`] can be further extended to host any [`axum::Router`] using the [`service_v2::HttpServiceConfigBuilder`]. //! The [`service_v2::HttpService`] can be further extended to host any [`axum::Router`] using the [`service_v2::HttpServiceConfigBuilder`].
mod anthropic;
mod openai; mod openai;
pub mod busy_threshold; pub mod busy_threshold;
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! HTTP handler for the Anthropic Messages API (`/v1/messages`).
//!
//! This is a translation layer: incoming Anthropic requests are converted to
//! chat completions, processed by the existing engine, and responses/streams
//! are converted back to Anthropic format.
use std::sync::Arc;
use axum::{
Json, Router,
body::Body,
extract::State,
http::{HeaderMap, Request, StatusCode},
middleware::{self, Next},
response::{
IntoResponse, Response,
sse::{KeepAlive, Sse},
},
routing::post,
};
use dynamo_runtime::pipeline::{AsyncEngineContextProvider, Context};
use futures::{StreamExt, stream};
use tracing::Instrument;
use super::{
RouteDoc,
disconnect::{ConnectionHandle, create_connection_monitor, monitor_for_disconnects},
metrics::{Endpoint, process_response_and_observe_metrics},
service_v2,
};
use crate::protocols::anthropic::stream_converter::AnthropicStreamConverter;
use crate::protocols::anthropic::types::{
AnthropicCountTokensRequest, AnthropicCountTokensResponse, AnthropicCreateMessageRequest,
AnthropicErrorBody, AnthropicErrorResponse, chat_completion_to_anthropic_response,
};
use crate::protocols::openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionResponse,
aggregator::ChatCompletionAggregator,
};
use crate::request_template::RequestTemplate;
// Re-use helpers from the openai module (sibling under service/)
use super::openai::{get_body_limit, get_or_create_request_id};
// ---------------------------------------------------------------------------
// Router
// ---------------------------------------------------------------------------
/// Creates the router for the `/v1/messages` and `/v1/messages/count_tokens` endpoints.
pub fn anthropic_messages_router(
state: Arc<service_v2::State>,
template: Option<RequestTemplate>,
path: Option<String>,
) -> (Vec<RouteDoc>, Router) {
let path = path.unwrap_or("/v1/messages".to_string());
let count_tokens_path = format!("{}/count_tokens", &path);
let doc = RouteDoc::new(axum::http::Method::POST, &path);
let count_doc = RouteDoc::new(axum::http::Method::POST, &count_tokens_path);
let router = Router::new()
.route(&path, post(handler_anthropic_messages))
.route(&count_tokens_path, post(handler_count_tokens))
.layer(middleware::from_fn(anthropic_error_middleware))
.layer(axum::extract::DefaultBodyLimit::max(get_body_limit()))
.with_state((state, template));
(vec![doc, count_doc], router)
}
// ---------------------------------------------------------------------------
// Error middleware
// ---------------------------------------------------------------------------
/// Converts 422 validation errors to Anthropic error format.
async fn anthropic_error_middleware(request: Request<Body>, next: Next) -> Response {
let response = next.run(request).await;
if response.status() == StatusCode::UNPROCESSABLE_ENTITY {
let (_parts, body) = response.into_parts();
let body_bytes = axum::body::to_bytes(body, get_body_limit())
.await
.unwrap_or_default();
let error_message = String::from_utf8_lossy(&body_bytes).to_string();
return anthropic_error(
StatusCode::BAD_REQUEST,
"invalid_request_error",
&error_message,
);
}
response
}
// ---------------------------------------------------------------------------
// Handlers
// ---------------------------------------------------------------------------
/// Top-level HTTP handler for POST /v1/messages.
async fn handler_anthropic_messages(
State((state, template)): State<(Arc<service_v2::State>, Option<RequestTemplate>)>,
headers: HeaderMap,
Json(request): Json<AnthropicCreateMessageRequest>,
) -> Result<Response, Response> {
// Validate required fields
if request.messages.is_empty() {
return Err(anthropic_error(
StatusCode::BAD_REQUEST,
"invalid_request_error",
"messages: field required",
));
}
if request.max_tokens == 0 {
return Err(anthropic_error(
StatusCode::BAD_REQUEST,
"invalid_request_error",
"max_tokens: must be greater than 0",
));
}
// Create request context
let request_id = get_or_create_request_id(None, &headers);
let request = Context::with_id(request, request_id);
let context = request.context();
// Create connection handles
let (mut connection_handle, stream_handle) =
create_connection_monitor(context.clone(), Some(state.metrics_clone())).await;
let response =
tokio::spawn(anthropic_messages(state, template, request, stream_handle).in_current_span())
.await
.map_err(|e| {
anthropic_error(
StatusCode::INTERNAL_SERVER_ERROR,
"api_error",
&format!("Failed to await messages task: {:?}", e),
)
})?;
connection_handle.disarm();
response
}
/// Core logic for the Anthropic Messages endpoint.
#[tracing::instrument(level = "debug", skip_all, fields(request_id = %request.id()))]
async fn anthropic_messages(
state: Arc<service_v2::State>,
template: Option<RequestTemplate>,
mut request: Context<AnthropicCreateMessageRequest>,
mut stream_handle: ConnectionHandle,
) -> Result<Response, Response> {
let streaming = request.stream;
let request_id = request.id().to_string();
// Apply template defaults before capturing model (must happen first so
// engine lookup and metrics use the resolved model name).
if let Some(template) = template {
if request.model.is_empty() {
request.model = template.model.clone();
}
if request.temperature.is_none() {
request.temperature = Some(template.temperature);
}
if request.max_tokens == 0 {
request.max_tokens = template.max_completion_tokens;
}
}
let model = request.model.clone();
let http_queue_guard = state.metrics_clone().create_http_queue_guard(&model);
tracing::trace!("Received Anthropic messages request: {:?}", &*request);
let (orig_request, context) = request.into_parts();
let model_for_resp = orig_request.model.clone();
// Convert Anthropic request -> Chat Completion request
let chat_request: NvCreateChatCompletionRequest =
orig_request.try_into().map_err(|e: anyhow::Error| {
tracing::error!(
request_id,
error = %e,
"Failed to convert AnthropicCreateMessageRequest to NvCreateChatCompletionRequest",
);
anthropic_error(
StatusCode::BAD_REQUEST,
"invalid_request_error",
&format!("Failed to convert request: {}", e),
)
})?;
let request = context.map(|_req| chat_request);
tracing::trace!("Getting chat completions engine for model: {}", model);
let engine = state
.manager()
.get_chat_completions_engine(&model)
.map_err(|_| {
anthropic_error(
StatusCode::NOT_FOUND,
"not_found_error",
&format!("Model '{}' not found", model),
)
})?;
let parsing_options = state.manager().get_parsing_options(&model);
let mut response_collector = state.metrics_clone().create_response_collector(&model);
tracing::trace!("Issuing generate call for Anthropic messages");
let engine_stream = engine.generate(request).await.map_err(|e| {
anthropic_error(
StatusCode::INTERNAL_SERVER_ERROR,
"api_error",
&format!("Failed to generate completions: {}", e),
)
})?;
let ctx = engine_stream.context();
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::AnthropicMessages, streaming);
if streaming {
stream_handle.arm();
use std::sync::atomic::{AtomicBool, Ordering};
let mut converter = AnthropicStreamConverter::new(model_for_resp);
let start_events = converter.emit_start_events();
let converter = std::sync::Arc::new(std::sync::Mutex::new(converter));
let converter_end = converter.clone();
let saw_error = std::sync::Arc::new(AtomicBool::new(false));
let saw_error_end = saw_error.clone();
let mut http_queue_guard = Some(http_queue_guard);
let event_stream = engine_stream
.inspect(move |response| {
process_response_and_observe_metrics(
response,
&mut response_collector,
&mut http_queue_guard,
);
})
.filter_map(move |annotated_chunk| {
let converter = converter.clone();
let saw_error = saw_error.clone();
async move {
if annotated_chunk.data.is_none() {
if annotated_chunk.event.as_deref() == Some("error") {
saw_error.store(true, Ordering::Release);
}
return None;
}
let stream_resp = annotated_chunk.data?;
let mut conv = converter.lock().expect("converter lock poisoned");
let events = conv.process_chunk(&stream_resp);
Some(stream::iter(events))
}
})
.flatten();
let start_stream = stream::iter(start_events);
let done_stream = stream::once(async move {
let mut conv = converter_end.lock().expect("converter lock poisoned");
let end_events = if saw_error_end.load(Ordering::Acquire) {
conv.emit_error_events()
} else {
conv.emit_end_events()
};
stream::iter(end_events)
})
.flatten();
let full_stream = start_stream.chain(event_stream).chain(done_stream);
let full_stream = full_stream.map(|result| result.map_err(axum::Error::new));
let stream = monitor_for_disconnects(full_stream, ctx, inflight_guard, stream_handle);
let mut sse_stream = Sse::new(stream);
if let Some(keep_alive) = state.sse_keep_alive() {
sse_stream = sse_stream.keep_alive(KeepAlive::default().interval(keep_alive));
}
Ok(sse_stream.into_response())
} else {
// Non-streaming path: aggregate stream into single response
// Check first event for backend errors using the openai helper
let stream_with_check = super::openai::check_for_backend_error(engine_stream)
.await
.map_err(|(status, json_err)| {
tracing::error!(request_id, %status, ?json_err, "Backend error detected");
anthropic_error(
StatusCode::INTERNAL_SERVER_ERROR,
"api_error",
"Backend error during generation",
)
})?;
let mut http_queue_guard = Some(http_queue_guard);
let stream = stream_with_check.inspect(move |response| {
process_response_and_observe_metrics(
response,
&mut response_collector,
&mut http_queue_guard,
);
});
let chat_response =
NvCreateChatCompletionResponse::from_annotated_stream(stream, parsing_options.clone())
.await
.map_err(|e| {
tracing::error!(request_id, "Failed to fold messages stream: {:?}", e);
anthropic_error(
StatusCode::INTERNAL_SERVER_ERROR,
"api_error",
&format!("Failed to fold messages stream: {}", e),
)
})?;
let response = chat_completion_to_anthropic_response(chat_response, &model_for_resp);
inflight_guard.mark_ok();
Ok(Json(response).into_response())
}
}
// ---------------------------------------------------------------------------
// Count tokens
// ---------------------------------------------------------------------------
/// Handler for POST /v1/messages/count_tokens.
/// Returns an estimated input token count using a len/3 heuristic.
async fn handler_count_tokens(
State((_state, _template)): State<(Arc<service_v2::State>, Option<RequestTemplate>)>,
Json(request): Json<AnthropicCountTokensRequest>,
) -> Result<Response, Response> {
let tokens = request.estimate_tokens();
Ok(Json(AnthropicCountTokensResponse {
input_tokens: tokens,
})
.into_response())
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/// Build an Anthropic-formatted error response.
/// Maps HTTP status codes to Anthropic error types following the Anthropic API spec.
fn anthropic_error(status: StatusCode, error_type: &str, message: &str) -> Response {
let mapped_type = match status.as_u16() {
400 => "invalid_request_error",
401 => "authentication_error",
403 => "permission_error",
404 => "not_found_error",
429 => "rate_limit_error",
503 | 529 => "overloaded_error",
// Use the caller-provided type for other codes (e.g. 500 → "api_error")
_ => error_type,
};
(
status,
Json(AnthropicErrorResponse {
object_type: "error".to_string(),
error: AnthropicErrorBody {
error_type: mapped_type.to_string(),
message: message.to_string(),
},
}),
)
.into_response()
}
...@@ -297,6 +297,9 @@ pub enum Endpoint { ...@@ -297,6 +297,9 @@ pub enum Endpoint {
/// OAI Responses /// OAI Responses
Responses, Responses,
/// Anthropic Messages
AnthropicMessages,
/// Tensor /// Tensor
Tensor, Tensor,
} }
...@@ -948,6 +951,7 @@ impl std::fmt::Display for Endpoint { ...@@ -948,6 +951,7 @@ impl std::fmt::Display for Endpoint {
Endpoint::Images => write!(f, "images"), Endpoint::Images => write!(f, "images"),
Endpoint::Videos => write!(f, "videos"), Endpoint::Videos => write!(f, "videos"),
Endpoint::Responses => write!(f, "responses"), Endpoint::Responses => write!(f, "responses"),
Endpoint::AnthropicMessages => write!(f, "anthropic_messages"),
Endpoint::Tensor => write!(f, "tensor"), Endpoint::Tensor => write!(f, "tensor"),
} }
} }
...@@ -962,6 +966,7 @@ impl Endpoint { ...@@ -962,6 +966,7 @@ impl Endpoint {
Endpoint::Images => "images", Endpoint::Images => "images",
Endpoint::Videos => "videos", Endpoint::Videos => "videos",
Endpoint::Responses => "responses", Endpoint::Responses => "responses",
Endpoint::AnthropicMessages => "anthropic_messages",
Endpoint::Tensor => "tensor", Endpoint::Tensor => "tensor",
} }
} }
......
...@@ -68,7 +68,7 @@ const VALIDATION_PREFIX: &str = "Validation: "; ...@@ -68,7 +68,7 @@ const VALIDATION_PREFIX: &str = "Validation: ";
// Default axum max body limit without configuring is 2MB: https://docs.rs/axum/latest/axum/extract/struct.DefaultBodyLimit.html // Default axum max body limit without configuring is 2MB: https://docs.rs/axum/latest/axum/extract/struct.DefaultBodyLimit.html
/// Default body limit in bytes (45MB) to support 500k+ token payloads. /// Default body limit in bytes (45MB) to support 500k+ token payloads.
/// Can be configured at compile time using the DYN_FRONTEND_BODY_LIMIT_MB environment variable /// Can be configured at compile time using the DYN_FRONTEND_BODY_LIMIT_MB environment variable
fn get_body_limit() -> usize { pub(super) fn get_body_limit() -> usize {
std::env::var(env_llm::DYN_HTTP_BODY_LIMIT_MB) std::env::var(env_llm::DYN_HTTP_BODY_LIMIT_MB)
.ok() .ok()
.and_then(|s| s.parse::<usize>().ok()) .and_then(|s| s.parse::<usize>().ok())
...@@ -248,7 +248,7 @@ pub async fn smart_json_error_middleware(request: Request<Body>, next: Next) -> ...@@ -248,7 +248,7 @@ pub async fn smart_json_error_middleware(request: Request<Body>, next: Next) ->
/// Get the request ID from a primary source, or next from the headers, or lastly create a new one if not present /// Get the request ID from a primary source, or next from the headers, or lastly create a new one if not present
// TODO: Similar function exists in lib/llm/src/grpc/service/openai.rs but with different signature and simpler logic // TODO: Similar function exists in lib/llm/src/grpc/service/openai.rs but with different signature and simpler logic
fn get_or_create_request_id(primary: Option<&str>, headers: &HeaderMap) -> String { pub(super) fn get_or_create_request_id(primary: Option<&str>, headers: &HeaderMap) -> String {
// Try to get request id from trace context // Try to get request id from trace context
if let Some(trace_context) = get_distributed_tracing_context() if let Some(trace_context) = get_distributed_tracing_context()
&& let Some(x_dynamo_request_id) = trace_context.x_dynamo_request_id && let Some(x_dynamo_request_id) = trace_context.x_dynamo_request_id
...@@ -821,7 +821,7 @@ fn extract_backend_error_if_present<T: serde::Serialize>( ...@@ -821,7 +821,7 @@ fn extract_backend_error_if_present<T: serde::Serialize>(
/// Checks if the first event in the stream is a backend error. /// Checks if the first event in the stream is a backend error.
/// Returns Err(ErrorResponse) if error detected, Ok(stream) otherwise. /// Returns Err(ErrorResponse) if error detected, Ok(stream) otherwise.
async fn check_for_backend_error( pub(super) async fn check_for_backend_error(
mut stream: impl futures::Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> mut stream: impl futures::Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>>
+ Send + Send
+ Unpin + Unpin
......
...@@ -23,6 +23,7 @@ use crate::request_template::RequestTemplate; ...@@ -23,6 +23,7 @@ use crate::request_template::RequestTemplate;
use anyhow::Result; use anyhow::Result;
use axum_server::tls_rustls::RustlsConfig; use axum_server::tls_rustls::RustlsConfig;
use derive_builder::Builder; use derive_builder::Builder;
use dynamo_runtime::config::env_is_truthy;
use dynamo_runtime::config::environment_names::llm as env_llm; use dynamo_runtime::config::environment_names::llm as env_llm;
use dynamo_runtime::discovery::Discovery; use dynamo_runtime::discovery::Discovery;
use dynamo_runtime::logging::make_request_span; use dynamo_runtime::logging::make_request_span;
...@@ -48,6 +49,7 @@ struct StateFlags { ...@@ -48,6 +49,7 @@ struct StateFlags {
images_endpoints_enabled: AtomicBool, images_endpoints_enabled: AtomicBool,
videos_endpoints_enabled: AtomicBool, videos_endpoints_enabled: AtomicBool,
responses_endpoints_enabled: AtomicBool, responses_endpoints_enabled: AtomicBool,
anthropic_endpoints_enabled: AtomicBool,
} }
impl StateFlags { impl StateFlags {
...@@ -59,6 +61,9 @@ impl StateFlags { ...@@ -59,6 +61,9 @@ impl StateFlags {
EndpointType::Images => self.images_endpoints_enabled.load(Ordering::Relaxed), EndpointType::Images => self.images_endpoints_enabled.load(Ordering::Relaxed),
EndpointType::Videos => self.videos_endpoints_enabled.load(Ordering::Relaxed), EndpointType::Videos => self.videos_endpoints_enabled.load(Ordering::Relaxed),
EndpointType::Responses => self.responses_endpoints_enabled.load(Ordering::Relaxed), EndpointType::Responses => self.responses_endpoints_enabled.load(Ordering::Relaxed),
EndpointType::AnthropicMessages => {
self.anthropic_endpoints_enabled.load(Ordering::Relaxed)
}
} }
} }
...@@ -82,6 +87,9 @@ impl StateFlags { ...@@ -82,6 +87,9 @@ impl StateFlags {
EndpointType::Responses => self EndpointType::Responses => self
.responses_endpoints_enabled .responses_endpoints_enabled
.store(enabled, Ordering::Relaxed), .store(enabled, Ordering::Relaxed),
EndpointType::AnthropicMessages => self
.anthropic_endpoints_enabled
.store(enabled, Ordering::Relaxed),
} }
} }
} }
...@@ -103,6 +111,7 @@ impl State { ...@@ -103,6 +111,7 @@ impl State {
images_endpoints_enabled: AtomicBool::new(false), images_endpoints_enabled: AtomicBool::new(false),
videos_endpoints_enabled: AtomicBool::new(false), videos_endpoints_enabled: AtomicBool::new(false),
responses_endpoints_enabled: AtomicBool::new(false), responses_endpoints_enabled: AtomicBool::new(false),
anthropic_endpoints_enabled: AtomicBool::new(false),
}, },
cancel_token, cancel_token,
} }
...@@ -187,6 +196,9 @@ pub struct HttpServiceConfig { ...@@ -187,6 +196,9 @@ pub struct HttpServiceConfig {
#[builder(default = "true")] #[builder(default = "true")]
enable_responses_endpoints: bool, enable_responses_endpoints: bool,
#[builder(default = "false")]
enable_anthropic_endpoints: bool,
#[builder(default = "None")] #[builder(default = "None")]
request_template: Option<RequestTemplate>, request_template: Option<RequestTemplate>,
...@@ -345,6 +357,8 @@ static HTTP_SVC_CMP_PATH_ENV: &str = "DYN_HTTP_SVC_CMP_PATH"; ...@@ -345,6 +357,8 @@ static HTTP_SVC_CMP_PATH_ENV: &str = "DYN_HTTP_SVC_CMP_PATH";
static HTTP_SVC_EMB_PATH_ENV: &str = "DYN_HTTP_SVC_EMB_PATH"; static HTTP_SVC_EMB_PATH_ENV: &str = "DYN_HTTP_SVC_EMB_PATH";
/// Environment variable to set the responses endpoint path (default: `/v1/responses`) /// Environment variable to set the responses endpoint path (default: `/v1/responses`)
static HTTP_SVC_RESPONSES_PATH_ENV: &str = "DYN_HTTP_SVC_RESPONSES_PATH"; static HTTP_SVC_RESPONSES_PATH_ENV: &str = "DYN_HTTP_SVC_RESPONSES_PATH";
/// Environment variable to set the anthropic messages endpoint path (default: `/v1/messages`)
static HTTP_SVC_ANTHROPIC_PATH_ENV: &str = "DYN_HTTP_SVC_ANTHROPIC_PATH";
impl HttpServiceConfigBuilder { impl HttpServiceConfigBuilder {
pub fn build(self) -> Result<HttpService, anyhow::Error> { pub fn build(self) -> Result<HttpService, anyhow::Error> {
...@@ -379,6 +393,10 @@ impl HttpServiceConfigBuilder { ...@@ -379,6 +393,10 @@ impl HttpServiceConfigBuilder {
state state
.flags .flags
.set(&EndpointType::Responses, config.enable_responses_endpoints); .set(&EndpointType::Responses, config.enable_responses_endpoints);
state.flags.set(
&EndpointType::AnthropicMessages,
config.enable_anthropic_endpoints,
);
// enable prometheus metrics // enable prometheus metrics
let registry = metrics::Registry::new(); let registry = metrics::Registry::new();
...@@ -501,7 +519,6 @@ impl HttpServiceConfigBuilder { ...@@ -501,7 +519,6 @@ impl HttpServiceConfigBuilder {
request_template.clone(), request_template.clone(),
var(HTTP_SVC_RESPONSES_PATH_ENV).ok(), var(HTTP_SVC_RESPONSES_PATH_ENV).ok(),
); );
let mut endpoint_routes = HashMap::new(); let mut endpoint_routes = HashMap::new();
endpoint_routes.insert(EndpointType::Chat, (chat_docs, chat_route)); endpoint_routes.insert(EndpointType::Chat, (chat_docs, chat_route));
endpoint_routes.insert(EndpointType::Completion, (cmpl_docs, cmpl_route)); endpoint_routes.insert(EndpointType::Completion, (cmpl_docs, cmpl_route));
...@@ -510,6 +527,19 @@ impl HttpServiceConfigBuilder { ...@@ -510,6 +527,19 @@ impl HttpServiceConfigBuilder {
endpoint_routes.insert(EndpointType::Videos, (videos_docs, videos_route)); endpoint_routes.insert(EndpointType::Videos, (videos_docs, videos_route));
endpoint_routes.insert(EndpointType::Responses, (responses_docs, responses_route)); endpoint_routes.insert(EndpointType::Responses, (responses_docs, responses_route));
if env_is_truthy(env_llm::DYN_ENABLE_ANTHROPIC_API) {
tracing::warn!("Anthropic Messages API (/v1/messages) is experimental.");
let (anthropic_docs, anthropic_route) = super::anthropic::anthropic_messages_router(
state.clone(),
request_template.clone(),
var(HTTP_SVC_ANTHROPIC_PATH_ENV).ok(),
);
endpoint_routes.insert(
EndpointType::AnthropicMessages,
(anthropic_docs, anthropic_route),
);
}
for endpoint_type in EndpointType::all() { for endpoint_type in EndpointType::all() {
let state_route = state.clone(); let state_route = state.clone();
if !endpoint_routes.contains_key(&endpoint_type) { if !endpoint_routes.contains_key(&endpoint_type) {
......
...@@ -129,6 +129,14 @@ impl ModelType { ...@@ -129,6 +129,14 @@ impl ModelType {
let mut endpoint_types = Vec::new(); let mut endpoint_types = Vec::new();
if self.contains(Self::Chat) { if self.contains(Self::Chat) {
endpoint_types.push(crate::endpoint_type::EndpointType::Chat); endpoint_types.push(crate::endpoint_type::EndpointType::Chat);
// Translation layers over chat completions
endpoint_types.push(crate::endpoint_type::EndpointType::Responses);
// AnthropicMessages is gated by DYN_ENABLE_ANTHROPIC_API env var
if dynamo_runtime::config::env_is_truthy(
dynamo_runtime::config::environment_names::llm::DYN_ENABLE_ANTHROPIC_API,
) {
endpoint_types.push(crate::endpoint_type::EndpointType::AnthropicMessages);
}
} }
if self.contains(Self::Completions) { if self.contains(Self::Completions) {
endpoint_types.push(crate::endpoint_type::EndpointType::Completion); endpoint_types.push(crate::endpoint_type::EndpointType::Completion);
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
pub mod anthropic;
pub mod codec; pub mod codec;
pub mod common; pub mod common;
pub mod openai; pub mod openai;
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Anthropic Messages API protocol types and conversion logic.
//!
//! This module provides types for the Anthropic Messages API (`/v1/messages`)
//! and conversion logic to/from the internal chat completions representation.
pub mod stream_converter;
pub mod types;
pub use types::*;
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Converts a stream of chat completion SSE chunks into Anthropic Messages API SSE events.
//!
//! The event sequence follows the Anthropic streaming spec:
//! `message_start` -> `content_block_start` -> N x `content_block_delta` ->
//! `content_block_stop` -> `message_delta` -> `message_stop`
use std::collections::HashSet;
use axum::response::sse::Event;
use dynamo_async_openai::types::ChatCompletionMessageContent;
use uuid::Uuid;
use super::types::{
AnthropicDelta, AnthropicErrorBody, AnthropicMessageDeltaBody, AnthropicMessageResponse,
AnthropicResponseContentBlock, AnthropicStopReason, AnthropicStreamEvent, AnthropicUsage,
};
use crate::protocols::openai::chat_completions::NvCreateChatCompletionStreamResponse;
/// State machine that converts a chat completion stream into Anthropic SSE events.
pub struct AnthropicStreamConverter {
model: String,
message_id: String,
// Text tracking
text_block_started: bool,
text_block_closed: bool,
text_block_index: u32,
// Token usage (from engine)
input_token_count: u32,
output_token_count: u32,
// Tool call tracking
tool_call_states: Vec<ToolCallState>,
tool_calls_sent: HashSet<String>,
// Block index counter
next_block_index: u32,
// Stop reason
stop_reason: Option<AnthropicStopReason>,
}
struct ToolCallState {
id: String,
name: String,
accumulated_args: String,
block_index: u32,
started: bool,
}
impl AnthropicStreamConverter {
pub fn new(model: String) -> Self {
Self {
model,
message_id: format!("msg_{}", Uuid::new_v4().simple()),
text_block_started: false,
text_block_closed: false,
text_block_index: 0,
input_token_count: 0,
output_token_count: 0,
tool_call_states: Vec::new(),
tool_calls_sent: HashSet::new(),
next_block_index: 0,
stop_reason: None,
}
}
/// Emit the initial `message_start` event.
pub fn emit_start_events(&mut self) -> Vec<Result<Event, anyhow::Error>> {
let message = AnthropicMessageResponse {
id: self.message_id.clone(),
object_type: "message".to_string(),
role: "assistant".to_string(),
content: vec![],
model: self.model.clone(),
stop_reason: None,
stop_sequence: None,
usage: AnthropicUsage {
input_tokens: 0,
output_tokens: 0,
},
};
let event = AnthropicStreamEvent::MessageStart { message };
vec![make_sse_event("message_start", &event)]
}
/// Process a single chat completion stream chunk and return zero or more SSE events.
pub fn process_chunk(
&mut self,
chunk: &NvCreateChatCompletionStreamResponse,
) -> Vec<Result<Event, anyhow::Error>> {
let mut events = Vec::new();
// Capture real token usage from engine when available (typically on the final chunk).
if let Some(usage) = &chunk.usage {
self.input_token_count = usage.prompt_tokens;
self.output_token_count = usage.completion_tokens;
}
for choice in &chunk.choices {
let delta = &choice.delta;
// Track finish reason
if let Some(ref fr) = choice.finish_reason {
self.stop_reason = Some(match fr {
dynamo_async_openai::types::FinishReason::Stop => AnthropicStopReason::EndTurn,
dynamo_async_openai::types::FinishReason::Length => {
AnthropicStopReason::MaxTokens
}
dynamo_async_openai::types::FinishReason::ToolCalls => {
AnthropicStopReason::ToolUse
}
dynamo_async_openai::types::FinishReason::ContentFilter => {
AnthropicStopReason::EndTurn
}
dynamo_async_openai::types::FinishReason::FunctionCall => {
AnthropicStopReason::ToolUse
}
});
}
// Handle text content deltas
let content_text = match &delta.content {
Some(ChatCompletionMessageContent::Text(text)) => Some(text.as_str()),
_ => None,
};
if let Some(text) = content_text
&& !text.is_empty()
{
// Emit content_block_start on first text
if !self.text_block_started {
self.text_block_started = true;
self.text_block_index = self.next_block_index;
self.next_block_index += 1;
let block_start = AnthropicStreamEvent::ContentBlockStart {
index: self.text_block_index,
content_block: AnthropicResponseContentBlock::Text {
text: String::new(),
},
};
events.push(make_sse_event("content_block_start", &block_start));
}
// Emit text delta
let block_delta = AnthropicStreamEvent::ContentBlockDelta {
index: self.text_block_index,
delta: AnthropicDelta::TextDelta {
text: text.to_string(),
},
};
events.push(make_sse_event("content_block_delta", &block_delta));
}
// Handle tool call deltas
if let Some(tool_calls) = &delta.tool_calls {
// Close the text block before opening any tool blocks.
// Anthropic streaming spec requires each block to be closed
// (content_block_stop) before the next block starts.
if self.text_block_started && !self.text_block_closed {
self.text_block_closed = true;
let block_stop = AnthropicStreamEvent::ContentBlockStop {
index: self.text_block_index,
};
events.push(make_sse_event("content_block_stop", &block_stop));
}
for tc in tool_calls {
let tc_index = tc.index as usize;
// Ensure we have state for this tool call index
while self.tool_call_states.len() <= tc_index {
let block_index = self.next_block_index;
self.next_block_index += 1;
self.tool_call_states.push(ToolCallState {
id: String::new(),
name: String::new(),
accumulated_args: String::new(),
block_index,
started: false,
});
}
// Update id and name if provided
if let Some(id) = &tc.id {
self.tool_call_states[tc_index].id = id.clone();
}
if let Some(func) = &tc.function {
if let Some(name) = &func.name {
self.tool_call_states[tc_index].name = name.clone();
}
if let Some(args) = &func.arguments {
// Emit content_block_start on first delta for this tool call
if !self.tool_call_states[tc_index].started {
let tc_id = self.tool_call_states[tc_index].id.clone();
// Dedup guard: skip if we've already emitted this tool call ID
if !tc_id.is_empty() && self.tool_calls_sent.contains(&tc_id) {
continue;
}
self.tool_call_states[tc_index].started = true;
let block_index = self.tool_call_states[tc_index].block_index;
let tc_name = self.tool_call_states[tc_index].name.clone();
if !tc_id.is_empty() {
self.tool_calls_sent.insert(tc_id.clone());
}
let block_start = AnthropicStreamEvent::ContentBlockStart {
index: block_index,
content_block: AnthropicResponseContentBlock::ToolUse {
id: tc_id,
name: tc_name,
input: serde_json::json!({}),
},
};
events.push(make_sse_event("content_block_start", &block_start));
}
self.tool_call_states[tc_index]
.accumulated_args
.push_str(args);
let block_index = self.tool_call_states[tc_index].block_index;
let block_delta = AnthropicStreamEvent::ContentBlockDelta {
index: block_index,
delta: AnthropicDelta::InputJsonDelta {
partial_json: args.clone(),
},
};
events.push(make_sse_event("content_block_delta", &block_delta));
}
}
}
}
}
events
}
/// Emit the final events when the stream ends.
pub fn emit_end_events(&mut self) -> Vec<Result<Event, anyhow::Error>> {
let mut events = Vec::new();
// Close text block if started and not already closed mid-stream
if self.text_block_started && !self.text_block_closed {
let block_stop = AnthropicStreamEvent::ContentBlockStop {
index: self.text_block_index,
};
events.push(make_sse_event("content_block_stop", &block_stop));
}
// Close tool call blocks
for tc in &self.tool_call_states {
if tc.started {
let block_stop = AnthropicStreamEvent::ContentBlockStop {
index: tc.block_index,
};
events.push(make_sse_event("content_block_stop", &block_stop));
}
}
// Emit message_delta with stop_reason and real token usage from engine
let message_delta = AnthropicStreamEvent::MessageDelta {
delta: AnthropicMessageDeltaBody {
stop_reason: self.stop_reason.clone(),
stop_sequence: None,
},
usage: AnthropicUsage {
input_tokens: self.input_token_count,
output_tokens: self.output_token_count,
},
};
events.push(make_sse_event("message_delta", &message_delta));
// Emit message_stop
let message_stop = AnthropicStreamEvent::MessageStop {};
events.push(make_sse_event("message_stop", &message_stop));
events
}
/// Emit error events when the stream ends due to a backend error.
pub fn emit_error_events(&mut self) -> Vec<Result<Event, anyhow::Error>> {
let error_event = AnthropicStreamEvent::Error {
error: AnthropicErrorBody {
error_type: "api_error".to_string(),
message: "An internal error occurred during generation.".to_string(),
},
};
vec![make_sse_event("error", &error_event)]
}
}
fn make_sse_event(event_type: &str, event: &AnthropicStreamEvent) -> Result<Event, anyhow::Error> {
let data = serde_json::to_string(event)?;
Ok(Event::default().event(event_type).data(data))
}
/// A tagged event for testing: the event type string paired with the
/// serialized stream event. This avoids needing to parse `axum::sse::Event`
/// (which doesn't implement `Display`).
#[cfg(test)]
#[derive(Debug)]
struct TaggedEvent {
event_type: String,
data: AnthropicStreamEvent,
}
#[cfg(test)]
fn make_tagged_event(event_type: &str, event: &AnthropicStreamEvent) -> TaggedEvent {
TaggedEvent {
event_type: event_type.to_string(),
data: event.clone(),
}
}
#[cfg(test)]
impl AnthropicStreamConverter {
/// Like `process_chunk` but returns tagged events for test assertions.
fn process_chunk_tagged(
&mut self,
chunk: &NvCreateChatCompletionStreamResponse,
) -> Vec<TaggedEvent> {
let mut events = Vec::new();
if let Some(usage) = &chunk.usage {
self.input_token_count = usage.prompt_tokens;
self.output_token_count = usage.completion_tokens;
}
for choice in &chunk.choices {
let delta = &choice.delta;
if let Some(ref fr) = choice.finish_reason {
self.stop_reason = Some(match fr {
dynamo_async_openai::types::FinishReason::Stop => AnthropicStopReason::EndTurn,
dynamo_async_openai::types::FinishReason::Length => {
AnthropicStopReason::MaxTokens
}
dynamo_async_openai::types::FinishReason::ToolCalls => {
AnthropicStopReason::ToolUse
}
dynamo_async_openai::types::FinishReason::ContentFilter => {
AnthropicStopReason::EndTurn
}
dynamo_async_openai::types::FinishReason::FunctionCall => {
AnthropicStopReason::ToolUse
}
});
}
let content_text = match &delta.content {
Some(ChatCompletionMessageContent::Text(text)) => Some(text.as_str()),
_ => None,
};
if let Some(text) = content_text
&& !text.is_empty()
{
if !self.text_block_started {
self.text_block_started = true;
self.text_block_index = self.next_block_index;
self.next_block_index += 1;
let ev = AnthropicStreamEvent::ContentBlockStart {
index: self.text_block_index,
content_block: AnthropicResponseContentBlock::Text {
text: String::new(),
},
};
events.push(make_tagged_event("content_block_start", &ev));
}
self.output_token_count += 1;
let ev = AnthropicStreamEvent::ContentBlockDelta {
index: self.text_block_index,
delta: AnthropicDelta::TextDelta {
text: text.to_string(),
},
};
events.push(make_tagged_event("content_block_delta", &ev));
}
if let Some(tool_calls) = &delta.tool_calls {
if self.text_block_started && !self.text_block_closed {
self.text_block_closed = true;
let ev = AnthropicStreamEvent::ContentBlockStop {
index: self.text_block_index,
};
events.push(make_tagged_event("content_block_stop", &ev));
}
for tc in tool_calls {
let tc_index = tc.index as usize;
while self.tool_call_states.len() <= tc_index {
let block_index = self.next_block_index;
self.next_block_index += 1;
self.tool_call_states.push(ToolCallState {
id: String::new(),
name: String::new(),
accumulated_args: String::new(),
block_index,
started: false,
});
}
if let Some(id) = &tc.id {
self.tool_call_states[tc_index].id = id.clone();
}
if let Some(func) = &tc.function {
if let Some(name) = &func.name {
self.tool_call_states[tc_index].name = name.clone();
}
if let Some(args) = &func.arguments {
if !self.tool_call_states[tc_index].started {
let tc_id = self.tool_call_states[tc_index].id.clone();
if !tc_id.is_empty() && self.tool_calls_sent.contains(&tc_id) {
continue;
}
self.tool_call_states[tc_index].started = true;
let block_index = self.tool_call_states[tc_index].block_index;
let tc_name = self.tool_call_states[tc_index].name.clone();
if !tc_id.is_empty() {
self.tool_calls_sent.insert(tc_id.clone());
}
let ev = AnthropicStreamEvent::ContentBlockStart {
index: block_index,
content_block: AnthropicResponseContentBlock::ToolUse {
id: tc_id,
name: tc_name,
input: serde_json::json!({}),
},
};
events.push(make_tagged_event("content_block_start", &ev));
}
self.tool_call_states[tc_index]
.accumulated_args
.push_str(args);
let block_index = self.tool_call_states[tc_index].block_index;
let ev = AnthropicStreamEvent::ContentBlockDelta {
index: block_index,
delta: AnthropicDelta::InputJsonDelta {
partial_json: args.clone(),
},
};
events.push(make_tagged_event("content_block_delta", &ev));
}
}
}
}
}
events
}
/// Like `emit_end_events` but returns tagged events for test assertions.
fn emit_end_events_tagged(&mut self) -> Vec<TaggedEvent> {
let mut events = Vec::new();
if self.text_block_started && !self.text_block_closed {
let ev = AnthropicStreamEvent::ContentBlockStop {
index: self.text_block_index,
};
events.push(make_tagged_event("content_block_stop", &ev));
}
for tc in &self.tool_call_states {
if tc.started {
let ev = AnthropicStreamEvent::ContentBlockStop {
index: tc.block_index,
};
events.push(make_tagged_event("content_block_stop", &ev));
}
}
let ev = AnthropicStreamEvent::MessageDelta {
delta: AnthropicMessageDeltaBody {
stop_reason: self.stop_reason.clone(),
stop_sequence: None,
},
usage: AnthropicUsage {
input_tokens: self.input_token_count,
output_tokens: self.output_token_count,
},
};
events.push(make_tagged_event("message_delta", &ev));
let ev = AnthropicStreamEvent::MessageStop {};
events.push(make_tagged_event("message_stop", &ev));
events
}
}
#[cfg(test)]
mod tests {
use super::*;
use dynamo_async_openai::types::{
ChatChoiceStream, ChatCompletionMessageContent, ChatCompletionMessageToolCallChunk,
ChatCompletionStreamResponseDelta, ChatCompletionToolType, FunctionCallStream,
};
fn text_chunk(text: &str) -> NvCreateChatCompletionStreamResponse {
#[allow(deprecated)]
NvCreateChatCompletionStreamResponse {
id: "chat-1".into(),
choices: vec![ChatChoiceStream {
index: 0,
delta: ChatCompletionStreamResponseDelta {
content: Some(ChatCompletionMessageContent::Text(text.into())),
function_call: None,
tool_calls: None,
role: None,
refusal: None,
reasoning_content: None,
},
finish_reason: None,
stop_reason: None,
logprobs: None,
}],
created: 0,
model: "test".into(),
service_tier: None,
system_fingerprint: None,
object: "chat.completion.chunk".into(),
usage: None,
nvext: None,
}
}
fn tool_call_chunk(
tc_index: u32,
id: Option<&str>,
name: Option<&str>,
args: Option<&str>,
) -> NvCreateChatCompletionStreamResponse {
#[allow(deprecated)]
NvCreateChatCompletionStreamResponse {
id: "chat-1".into(),
choices: vec![ChatChoiceStream {
index: 0,
delta: ChatCompletionStreamResponseDelta {
content: None,
function_call: None,
tool_calls: Some(vec![ChatCompletionMessageToolCallChunk {
index: tc_index,
id: id.map(String::from),
r#type: Some(ChatCompletionToolType::Function),
function: Some(FunctionCallStream {
name: name.map(String::from),
arguments: args.map(String::from),
}),
}]),
role: None,
refusal: None,
reasoning_content: None,
},
finish_reason: None,
stop_reason: None,
logprobs: None,
}],
created: 0,
model: "test".into(),
service_tier: None,
system_fingerprint: None,
object: "chat.completion.chunk".into(),
usage: None,
nvext: None,
}
}
fn event_types(events: &[TaggedEvent]) -> Vec<&str> {
events.iter().map(|e| e.event_type.as_str()).collect()
}
/// Regression test: text block must be closed (content_block_stop)
/// before the tool_use block starts (content_block_start).
///
/// Without this fix, the text block stop was batched at the end,
/// causing Claude Code's streaming parser to receive out-of-order
/// events and fail to execute tool calls ("Error editing file").
#[test]
fn test_text_block_stops_before_tool_block_starts() {
let mut conv = AnthropicStreamConverter::new("test-model".into());
// Stream some text
let text_events = conv.process_chunk_tagged(&text_chunk("I'll edit the file."));
assert_eq!(
event_types(&text_events),
vec!["content_block_start", "content_block_delta"]
);
// Stream a tool call — text block must close first
let tool_events = conv.process_chunk_tagged(&tool_call_chunk(
0,
Some("call-1"),
Some("Edit"),
Some("{\"file_path\":\"/tmp/test.txt\"}"),
));
assert_eq!(
event_types(&tool_events),
vec![
"content_block_stop",
"content_block_start",
"content_block_delta"
],
"text block must be closed before tool block starts"
);
// Verify indices: stop=0 (text), start=1 (tool)
match &tool_events[0].data {
AnthropicStreamEvent::ContentBlockStop { index } => assert_eq!(*index, 0),
other => panic!("expected ContentBlockStop, got {other:?}"),
}
match &tool_events[1].data {
AnthropicStreamEvent::ContentBlockStart {
index,
content_block,
} => {
assert_eq!(*index, 1);
match content_block {
AnthropicResponseContentBlock::ToolUse { name, .. } => {
assert_eq!(name, "Edit");
}
other => panic!("expected ToolUse, got {other:?}"),
}
}
other => panic!("expected ContentBlockStart, got {other:?}"),
}
// End events should NOT duplicate the text block stop
let end_events = conv.emit_end_events_tagged();
assert_eq!(
event_types(&end_events),
vec!["content_block_stop", "message_delta", "message_stop"],
"only tool block stop in end events (text already closed)"
);
match &end_events[0].data {
AnthropicStreamEvent::ContentBlockStop { index } => assert_eq!(*index, 1),
other => panic!("expected tool stop at index 1, got {other:?}"),
}
}
/// Tool-only response (no preceding text): no spurious stop events.
#[test]
fn test_tool_only_response_no_text_block() {
let mut conv = AnthropicStreamConverter::new("test-model".into());
let tool_events = conv.process_chunk_tagged(&tool_call_chunk(
0,
Some("call-1"),
Some("Read"),
Some("{\"path\":\"/tmp/test.txt\"}"),
));
assert_eq!(
event_types(&tool_events),
vec!["content_block_start", "content_block_delta"]
);
let end_events = conv.emit_end_events_tagged();
assert_eq!(
event_types(&end_events),
vec!["content_block_stop", "message_delta", "message_stop"]
);
}
/// Text-only response: stop emitted in end events (no early close).
#[test]
fn test_text_only_response_stop_in_end_events() {
let mut conv = AnthropicStreamConverter::new("test-model".into());
conv.process_chunk_tagged(&text_chunk("Hello world"));
let end_events = conv.emit_end_events_tagged();
assert_eq!(
event_types(&end_events),
vec!["content_block_stop", "message_delta", "message_stop"]
);
match &end_events[0].data {
AnthropicStreamEvent::ContentBlockStop { index } => assert_eq!(*index, 0),
other => panic!("expected text stop at index 0, got {other:?}"),
}
}
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Anthropic Messages API types and conversion logic.
//!
//! All request/response types for the `/v1/messages` endpoint, plus
//! bidirectional conversion to/from the internal chat completions format.
use dynamo_async_openai::types::{
ChatCompletionMessageToolCall, ChatCompletionNamedToolChoice,
ChatCompletionRequestAssistantMessage, ChatCompletionRequestAssistantMessageContent,
ChatCompletionRequestMessage, ChatCompletionRequestSystemMessage,
ChatCompletionRequestSystemMessageContent, ChatCompletionRequestToolMessage,
ChatCompletionRequestToolMessageContent, ChatCompletionRequestUserMessage,
ChatCompletionRequestUserMessageContent, ChatCompletionTool, ChatCompletionToolChoiceOption,
ChatCompletionToolType, FunctionName, FunctionObject,
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::protocols::openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionResponse,
};
use crate::protocols::openai::common_ext::CommonExt;
// ---------------------------------------------------------------------------
// Custom deserializers
// ---------------------------------------------------------------------------
/// Deserialize `system` from either a plain string or an array of text blocks.
/// The Anthropic API accepts both `"system": "text"` and
/// `"system": [{"type": "text", "text": "..."}]`.
fn deserialize_system_prompt<'de, D>(deserializer: D) -> Result<Option<String>, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum SystemPrompt {
Text(String),
Blocks(Vec<SystemBlock>),
}
#[derive(Deserialize)]
struct SystemBlock {
text: String,
}
let maybe: Option<SystemPrompt> = Option::deserialize(deserializer)?;
Ok(maybe.map(|sp| match sp {
SystemPrompt::Text(s) => s,
SystemPrompt::Blocks(blocks) => blocks
.into_iter()
.map(|b| b.text)
.collect::<Vec<_>>()
.join("\n"),
}))
}
// ---------------------------------------------------------------------------
// Request types
// ---------------------------------------------------------------------------
/// Top-level request body for `POST /v1/messages`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnthropicCreateMessageRequest {
/// The model to use (e.g. "claude-sonnet-4-20250514").
pub model: String,
/// The maximum number of tokens to generate.
pub max_tokens: u32,
/// The conversation messages.
pub messages: Vec<AnthropicMessage>,
/// Optional system prompt (string or array of `{"type":"text","text":"..."}` blocks).
#[serde(
default,
skip_serializing_if = "Option::is_none",
deserialize_with = "deserialize_system_prompt"
)]
pub system: Option<String>,
/// Sampling temperature (0.0 - 1.0).
#[serde(skip_serializing_if = "Option::is_none")]
pub temperature: Option<f32>,
/// Nucleus sampling parameter.
#[serde(skip_serializing_if = "Option::is_none")]
pub top_p: Option<f32>,
/// Top-K sampling parameter.
#[serde(skip_serializing_if = "Option::is_none")]
pub top_k: Option<u32>,
/// Custom stop sequences.
#[serde(skip_serializing_if = "Option::is_none")]
pub stop_sequences: Option<Vec<String>>,
/// Whether to stream the response.
#[serde(default)]
pub stream: bool,
/// Optional metadata (e.g. user_id).
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<serde_json::Value>,
/// Tools the model may call.
#[serde(skip_serializing_if = "Option::is_none")]
pub tools: Option<Vec<AnthropicTool>>,
/// How the model should choose which tool to call.
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_choice: Option<AnthropicToolChoice>,
}
/// A single message in the conversation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnthropicMessage {
pub role: AnthropicRole,
#[serde(flatten)]
pub content: AnthropicMessageContent,
}
/// The role of a message sender.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum AnthropicRole {
User,
Assistant,
}
/// Message content — either a plain string or an array of content blocks.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum AnthropicMessageContent {
/// Plain text content.
Text { content: String },
/// Array of structured content blocks.
Blocks { content: Vec<AnthropicContentBlock> },
}
/// A single content block within a message.
///
/// Uses a custom deserializer so that unknown block types (e.g. `citations`,
/// `server_tool_use`, `redacted_thinking`) are captured as `Unknown` instead
/// of causing a hard deserialization failure. This is important because Claude
/// Code may send block types that we don't yet handle.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
pub enum AnthropicContentBlock {
/// Text content block.
#[serde(rename = "text")]
Text { text: String },
/// Image content block.
#[serde(rename = "image")]
Image { source: AnthropicImageSource },
/// Tool use request from assistant.
#[serde(rename = "tool_use")]
ToolUse {
id: String,
name: String,
input: serde_json::Value,
},
/// Tool result from user.
#[serde(rename = "tool_result")]
ToolResult {
tool_use_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
content: Option<ToolResultContent>,
#[serde(skip_serializing_if = "Option::is_none")]
is_error: Option<bool>,
},
/// Thinking content block from assistant (extended thinking / reasoning).
#[serde(rename = "thinking")]
Thinking { thinking: String, signature: String },
/// Catch-all for unrecognized block types. Silently accepted and skipped
/// during conversion so that new Anthropic features don't break the endpoint.
#[serde(skip)]
Unknown { block_type: String },
}
/// Content of a `tool_result` block — either a plain string or an array of
/// content blocks (the Anthropic API accepts both).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ToolResultContent {
Text(String),
Blocks(Vec<ToolResultContentBlock>),
}
impl ToolResultContent {
/// Extract the text content, concatenating array blocks if needed.
pub fn into_text(self) -> String {
match self {
ToolResultContent::Text(s) => s,
ToolResultContent::Blocks(blocks) => blocks
.into_iter()
.filter_map(|b| match b {
ToolResultContentBlock::Text { text } => Some(text),
ToolResultContentBlock::Other(_) => None,
})
.collect::<Vec<_>>()
.join(""),
}
}
}
/// A content block within a `tool_result.content` array.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ToolResultContentBlock {
Text {
text: String,
},
/// Catch-all for non-text blocks (images, etc.) in tool results.
Other(serde_json::Value),
}
/// Custom deserializer for `AnthropicContentBlock` that handles unknown types
/// gracefully. Since serde's `#[serde(other)]` is not supported on internally
/// tagged enums, we deserialize as `Value` first and dispatch manually.
impl<'de> Deserialize<'de> for AnthropicContentBlock {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let value = serde_json::Value::deserialize(deserializer)?;
let block_type = value
.get("type")
.and_then(|t| t.as_str())
.unwrap_or("")
.to_string();
match block_type.as_str() {
"text" => {
let text = value
.get("text")
.and_then(|t| t.as_str())
.unwrap_or("")
.to_string();
Ok(AnthropicContentBlock::Text { text })
}
"image" => {
let source: AnthropicImageSource =
serde_json::from_value(value.get("source").cloned().unwrap_or_default())
.map_err(serde::de::Error::custom)?;
Ok(AnthropicContentBlock::Image { source })
}
"tool_use" => {
let id = value
.get("id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let name = value
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let input = value.get("input").cloned().unwrap_or(serde_json::json!({}));
Ok(AnthropicContentBlock::ToolUse { id, name, input })
}
"tool_result" => {
let tool_use_id = value
.get("tool_use_id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let content: Option<ToolResultContent> = value
.get("content")
.cloned()
.and_then(|v| serde_json::from_value(v).ok());
let is_error = value.get("is_error").and_then(|v| v.as_bool());
Ok(AnthropicContentBlock::ToolResult {
tool_use_id,
content,
is_error,
})
}
"thinking" => {
let thinking = value
.get("thinking")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let signature = value
.get("signature")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
Ok(AnthropicContentBlock::Thinking {
thinking,
signature,
})
}
other => {
tracing::debug!("Unknown Anthropic content block type '{}', skipping", other);
Ok(AnthropicContentBlock::Unknown {
block_type: other.to_string(),
})
}
}
}
}
/// Image source for image content blocks.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnthropicImageSource {
#[serde(rename = "type")]
pub source_type: String,
pub media_type: String,
pub data: String,
}
/// A tool definition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnthropicTool {
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
pub input_schema: serde_json::Value,
}
/// Tool choice specification.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum AnthropicToolChoice {
/// Named tool: `{type: "tool", name: "..."}`
/// Must be listed before Simple so serde tries the stricter shape first.
Named(AnthropicToolChoiceNamed),
/// Simple mode: "auto", "any", or "none".
Simple(AnthropicToolChoiceSimple),
}
/// Simple tool choice modes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnthropicToolChoiceSimple {
#[serde(rename = "type")]
pub choice_type: AnthropicToolChoiceMode,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum AnthropicToolChoiceMode {
Auto,
Any,
None,
Tool,
}
/// Named tool choice.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnthropicToolChoiceNamed {
#[serde(rename = "type")]
pub choice_type: AnthropicToolChoiceMode,
pub name: String,
}
// ---------------------------------------------------------------------------
// Response types
// ---------------------------------------------------------------------------
/// Response body for `POST /v1/messages` (non-streaming).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnthropicMessageResponse {
pub id: String,
#[serde(rename = "type")]
pub object_type: String,
pub role: String,
pub content: Vec<AnthropicResponseContentBlock>,
pub model: String,
pub stop_reason: Option<AnthropicStopReason>,
pub stop_sequence: Option<String>,
pub usage: AnthropicUsage,
}
/// A content block in the response.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum AnthropicResponseContentBlock {
#[serde(rename = "text")]
Text { text: String },
#[serde(rename = "tool_use")]
ToolUse {
id: String,
name: String,
input: serde_json::Value,
},
}
/// Token usage information.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct AnthropicUsage {
pub input_tokens: u32,
pub output_tokens: u32,
}
/// Reason the model stopped generating.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AnthropicStopReason {
EndTurn,
MaxTokens,
StopSequence,
ToolUse,
}
// ---------------------------------------------------------------------------
// Streaming types
// ---------------------------------------------------------------------------
/// SSE event types for the Anthropic streaming API.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum AnthropicStreamEvent {
#[serde(rename = "message_start")]
MessageStart { message: AnthropicMessageResponse },
#[serde(rename = "content_block_start")]
ContentBlockStart {
index: u32,
content_block: AnthropicResponseContentBlock,
},
#[serde(rename = "content_block_delta")]
ContentBlockDelta { index: u32, delta: AnthropicDelta },
#[serde(rename = "content_block_stop")]
ContentBlockStop { index: u32 },
#[serde(rename = "message_delta")]
MessageDelta {
delta: AnthropicMessageDeltaBody,
usage: AnthropicUsage,
},
#[serde(rename = "message_stop")]
MessageStop {},
#[serde(rename = "ping")]
Ping {},
#[serde(rename = "error")]
Error { error: AnthropicErrorBody },
}
/// Delta content in a streaming content_block_delta event.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum AnthropicDelta {
#[serde(rename = "text_delta")]
TextDelta { text: String },
#[serde(rename = "input_json_delta")]
InputJsonDelta { partial_json: String },
}
/// The delta body in a message_delta event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnthropicMessageDeltaBody {
pub stop_reason: Option<AnthropicStopReason>,
#[serde(skip_serializing_if = "Option::is_none")]
pub stop_sequence: Option<String>,
}
// ---------------------------------------------------------------------------
// Error types
// ---------------------------------------------------------------------------
/// Anthropic API error response wrapper.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnthropicErrorResponse {
#[serde(rename = "type")]
pub object_type: String,
pub error: AnthropicErrorBody,
}
/// Error body within an error response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnthropicErrorBody {
#[serde(rename = "type")]
pub error_type: String,
pub message: String,
}
impl AnthropicErrorResponse {
/// Create an `invalid_request_error` response.
pub fn invalid_request(message: impl Into<String>) -> Self {
Self {
object_type: "error".to_string(),
error: AnthropicErrorBody {
error_type: "invalid_request_error".to_string(),
message: message.into(),
},
}
}
/// Create an `api_error` (internal server error) response.
pub fn api_error(message: impl Into<String>) -> Self {
Self {
object_type: "error".to_string(),
error: AnthropicErrorBody {
error_type: "api_error".to_string(),
message: message.into(),
},
}
}
/// Create a `not_found_error` response.
pub fn not_found(message: impl Into<String>) -> Self {
Self {
object_type: "error".to_string(),
error: AnthropicErrorBody {
error_type: "not_found_error".to_string(),
message: message.into(),
},
}
}
}
// ---------------------------------------------------------------------------
// Conversion: AnthropicCreateMessageRequest -> NvCreateChatCompletionRequest
// ---------------------------------------------------------------------------
impl TryFrom<AnthropicCreateMessageRequest> for NvCreateChatCompletionRequest {
type Error = anyhow::Error;
fn try_from(req: AnthropicCreateMessageRequest) -> Result<Self, Self::Error> {
let mut messages = Vec::new();
// Prepend system message if present
if let Some(system_text) = &req.system {
messages.push(ChatCompletionRequestMessage::System(
ChatCompletionRequestSystemMessage {
content: ChatCompletionRequestSystemMessageContent::Text(system_text.clone()),
name: None,
},
));
}
// Convert each Anthropic message
for msg in &req.messages {
match (&msg.role, &msg.content) {
// User with plain text
(AnthropicRole::User, AnthropicMessageContent::Text { content }) => {
messages.push(ChatCompletionRequestMessage::User(
ChatCompletionRequestUserMessage {
content: ChatCompletionRequestUserMessageContent::Text(content.clone()),
name: None,
},
));
}
// User with content blocks
(AnthropicRole::User, AnthropicMessageContent::Blocks { content: blocks }) => {
convert_user_blocks(blocks, &mut messages)?;
}
// Assistant with plain text
(AnthropicRole::Assistant, AnthropicMessageContent::Text { content }) => {
messages.push(ChatCompletionRequestMessage::Assistant(
ChatCompletionRequestAssistantMessage {
content: Some(ChatCompletionRequestAssistantMessageContent::Text(
content.clone(),
)),
reasoning_content: None,
refusal: None,
name: None,
audio: None,
tool_calls: None,
#[allow(deprecated)]
function_call: None,
},
));
}
// Assistant with content blocks (may contain tool_use)
(AnthropicRole::Assistant, AnthropicMessageContent::Blocks { content: blocks }) => {
convert_assistant_blocks(blocks, &mut messages);
}
}
}
// Convert tools
let tools = req.tools.as_ref().map(|t| convert_anthropic_tools(t));
// Convert tool_choice
let tool_choice = req.tool_choice.as_ref().map(convert_anthropic_tool_choice);
// Convert stop_sequences -> stop
let stop = req
.stop_sequences
.map(dynamo_async_openai::types::Stop::StringArray);
Ok(NvCreateChatCompletionRequest {
inner: dynamo_async_openai::types::CreateChatCompletionRequest {
messages,
model: req.model,
temperature: req.temperature,
top_p: req.top_p,
max_completion_tokens: Some(req.max_tokens),
stop,
tools,
tool_choice,
stream: Some(true), // Always stream internally
stream_options: Some(dynamo_async_openai::types::ChatCompletionStreamOptions {
include_usage: true,
continuous_usage_stats: false,
}),
..Default::default()
},
common: CommonExt {
top_k: req.top_k.map(|k| k as i32),
..Default::default()
},
nvext: None,
chat_template_args: None,
media_io_kwargs: None,
unsupported_fields: Default::default(),
})
}
}
/// Convert user-role content blocks into chat completion messages.
/// Tool results become separate Tool messages; text/image blocks become user messages.
fn convert_user_blocks(
blocks: &[AnthropicContentBlock],
messages: &mut Vec<ChatCompletionRequestMessage>,
) -> Result<(), anyhow::Error> {
// Gather text blocks for a single user message, emit tool_result blocks as Tool messages.
let mut text_parts = Vec::new();
for block in blocks {
match block {
AnthropicContentBlock::Text { text } => {
text_parts.push(text.clone());
}
AnthropicContentBlock::ToolResult {
tool_use_id,
content,
..
} => {
// Flush any accumulated text first
if !text_parts.is_empty() {
let combined = text_parts.join("");
messages.push(ChatCompletionRequestMessage::User(
ChatCompletionRequestUserMessage {
content: ChatCompletionRequestUserMessageContent::Text(combined),
name: None,
},
));
text_parts.clear();
}
let text = content.clone().map(|c| c.into_text()).unwrap_or_default();
messages.push(ChatCompletionRequestMessage::Tool(
ChatCompletionRequestToolMessage {
content: ChatCompletionRequestToolMessageContent::Text(text),
tool_call_id: tool_use_id.clone(),
},
));
}
AnthropicContentBlock::Image { .. } => {
tracing::warn!(
"Image content blocks are not supported in the Anthropic-to-chat-completions conversion; replaced with placeholder text."
);
text_parts.push("[image]".to_string());
}
AnthropicContentBlock::ToolUse { .. }
| AnthropicContentBlock::Thinking { .. }
| AnthropicContentBlock::Unknown { .. } => {
// tool_use/thinking/unknown in a user message: skip
}
}
}
// Flush remaining text
if !text_parts.is_empty() {
let combined = text_parts.join("");
messages.push(ChatCompletionRequestMessage::User(
ChatCompletionRequestUserMessage {
content: ChatCompletionRequestUserMessageContent::Text(combined),
name: None,
},
));
}
Ok(())
}
/// Convert assistant-role content blocks into chat completion messages.
/// Text blocks become an assistant message; tool_use blocks become tool_calls on an assistant message.
/// Thinking blocks are passed through as `reasoning_content`.
fn convert_assistant_blocks(
blocks: &[AnthropicContentBlock],
messages: &mut Vec<ChatCompletionRequestMessage>,
) {
let mut text_content = String::new();
let mut thinking_content = String::new();
let mut tool_calls = Vec::new();
for block in blocks {
match block {
AnthropicContentBlock::Text { text } => {
text_content.push_str(text);
}
AnthropicContentBlock::Thinking { thinking, .. } => {
if !thinking_content.is_empty() {
thinking_content.push('\n');
}
thinking_content.push_str(thinking);
}
AnthropicContentBlock::ToolUse { id, name, input } => {
tool_calls.push(ChatCompletionMessageToolCall {
id: id.clone(),
r#type: ChatCompletionToolType::Function,
function: dynamo_async_openai::types::FunctionCall {
name: name.clone(),
arguments: serde_json::to_string(input).unwrap_or_default(),
},
});
}
_ => {}
}
}
let content = if text_content.is_empty() {
None
} else {
Some(ChatCompletionRequestAssistantMessageContent::Text(
text_content,
))
};
let reasoning = if thinking_content.is_empty() {
None
} else {
Some(thinking_content)
};
let tc = if tool_calls.is_empty() {
None
} else {
Some(tool_calls)
};
messages.push(ChatCompletionRequestMessage::Assistant(
ChatCompletionRequestAssistantMessage {
content,
reasoning_content: reasoning,
refusal: None,
name: None,
audio: None,
tool_calls: tc,
#[allow(deprecated)]
function_call: None,
},
));
}
/// Convert Anthropic tools to ChatCompletionTools.
fn convert_anthropic_tools(tools: &[AnthropicTool]) -> Vec<ChatCompletionTool> {
tools
.iter()
.map(|tool| ChatCompletionTool {
r#type: ChatCompletionToolType::Function,
function: FunctionObject {
name: tool.name.clone(),
description: tool.description.clone(),
parameters: Some(tool.input_schema.clone()),
strict: None,
},
})
.collect()
}
/// Convert Anthropic tool_choice to ChatCompletionToolChoiceOption.
fn convert_anthropic_tool_choice(tc: &AnthropicToolChoice) -> ChatCompletionToolChoiceOption {
match tc {
AnthropicToolChoice::Simple(simple) => match simple.choice_type {
AnthropicToolChoiceMode::Auto => ChatCompletionToolChoiceOption::Auto,
AnthropicToolChoiceMode::Any => ChatCompletionToolChoiceOption::Required,
AnthropicToolChoiceMode::None => ChatCompletionToolChoiceOption::None,
AnthropicToolChoiceMode::Tool => {
// {"type": "tool"} without a "name" field is invalid per the Anthropic spec.
// It deserialized as Simple because Named requires the name field.
// Treat as "any" (required) since the caller wants a specific tool but
// didn't specify which — this is the closest semantic match.
tracing::warn!(
"tool_choice has type 'tool' without a 'name' field; treating as 'any' (required)"
);
ChatCompletionToolChoiceOption::Required
}
},
AnthropicToolChoice::Named(named) => {
ChatCompletionToolChoiceOption::Named(ChatCompletionNamedToolChoice {
r#type: ChatCompletionToolType::Function,
function: FunctionName {
name: named.name.clone(),
},
})
}
}
}
// ---------------------------------------------------------------------------
// Conversion: NvCreateChatCompletionResponse -> AnthropicMessageResponse
// ---------------------------------------------------------------------------
/// Convert a completed chat completion response into an Anthropic Messages response.
pub fn chat_completion_to_anthropic_response(
chat_resp: NvCreateChatCompletionResponse,
model: &str,
) -> AnthropicMessageResponse {
let msg_id = format!("msg_{}", Uuid::new_v4().simple());
let choice = chat_resp.choices.into_iter().next();
let mut content = Vec::new();
let mut stop_reason = None;
if let Some(choice) = choice {
// Map finish_reason
stop_reason = choice.finish_reason.map(|fr| match fr {
dynamo_async_openai::types::FinishReason::Stop => AnthropicStopReason::EndTurn,
dynamo_async_openai::types::FinishReason::Length => AnthropicStopReason::MaxTokens,
dynamo_async_openai::types::FinishReason::ToolCalls => AnthropicStopReason::ToolUse,
dynamo_async_openai::types::FinishReason::ContentFilter => AnthropicStopReason::EndTurn,
dynamo_async_openai::types::FinishReason::FunctionCall => AnthropicStopReason::ToolUse,
});
// Extract tool calls
if let Some(tool_calls) = choice.message.tool_calls {
for tc in tool_calls {
let input: serde_json::Value =
serde_json::from_str(&tc.function.arguments).unwrap_or(serde_json::json!({}));
content.push(AnthropicResponseContentBlock::ToolUse {
id: tc.id,
name: tc.function.name,
input,
});
}
}
// Extract text content
let text = match choice.message.content {
Some(dynamo_async_openai::types::ChatCompletionMessageContent::Text(t)) => Some(t),
Some(dynamo_async_openai::types::ChatCompletionMessageContent::Parts(_)) => {
tracing::warn!(
"Multimodal (Parts) content in chat completion response replaced with placeholder text in Anthropic conversion."
);
Some("[multimodal content]".to_string())
}
None => None,
};
if let Some(text) = text {
// Text goes first in the content array
content.insert(0, AnthropicResponseContentBlock::Text { text });
}
}
// Ensure there's at least one content block
if content.is_empty() {
content.push(AnthropicResponseContentBlock::Text {
text: String::new(),
});
}
// Map usage
let usage = chat_resp
.usage
.map(|u| AnthropicUsage {
input_tokens: u.prompt_tokens,
output_tokens: u.completion_tokens,
})
.unwrap_or_default();
AnthropicMessageResponse {
id: msg_id,
object_type: "message".to_string(),
role: "assistant".to_string(),
content,
model: model.to_string(),
stop_reason,
stop_sequence: None,
usage,
}
}
// ---------------------------------------------------------------------------
// Count tokens
// ---------------------------------------------------------------------------
/// Request body for `POST /v1/messages/count_tokens`.
#[derive(Debug, Clone, Deserialize)]
pub struct AnthropicCountTokensRequest {
pub model: String,
pub messages: Vec<AnthropicMessage>,
#[serde(
default,
skip_serializing_if = "Option::is_none",
deserialize_with = "deserialize_system_prompt"
)]
pub system: Option<String>,
#[serde(default)]
pub tools: Option<Vec<AnthropicTool>>,
}
/// Response body for `POST /v1/messages/count_tokens`.
#[derive(Debug, Clone, Serialize)]
pub struct AnthropicCountTokensResponse {
pub input_tokens: u32,
}
impl AnthropicCountTokensRequest {
/// Estimate input token count using a `len/3` heuristic.
pub fn estimate_tokens(&self) -> u32 {
let mut total_len: usize = 0;
if let Some(system) = &self.system {
total_len += system.len();
}
for msg in &self.messages {
// Count role
total_len += match msg.role {
AnthropicRole::User => 4,
AnthropicRole::Assistant => 9,
};
// Count content
match &msg.content {
AnthropicMessageContent::Text { content } => total_len += content.len(),
AnthropicMessageContent::Blocks { content } => {
for block in content {
total_len += estimate_block_len(block);
}
}
}
}
if let Some(tools) = &self.tools {
for tool in tools {
total_len += tool.name.len();
if let Some(desc) = &tool.description {
total_len += desc.len();
}
total_len += tool.input_schema.to_string().len();
}
}
let tokens = total_len / 3;
if tokens == 0 && total_len > 0 {
1
} else {
tokens as u32
}
}
}
fn estimate_block_len(block: &AnthropicContentBlock) -> usize {
match block {
AnthropicContentBlock::Text { text } => text.len(),
AnthropicContentBlock::ToolUse { name, input, .. } => name.len() + input.to_string().len(),
AnthropicContentBlock::ToolResult { content, .. } => content
.as_ref()
.map(|c| match c {
ToolResultContent::Text(s) => s.len(),
ToolResultContent::Blocks(blocks) => blocks
.iter()
.map(|b| match b {
ToolResultContentBlock::Text { text } => text.len(),
ToolResultContentBlock::Other(v) => v.to_string().len(),
})
.sum(),
})
.unwrap_or(0),
AnthropicContentBlock::Thinking { thinking, .. } => thinking.len(),
AnthropicContentBlock::Image { .. } => 256, // rough estimate for image metadata
AnthropicContentBlock::Unknown { .. } => 0,
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_simple_user_message_conversion() {
let req = AnthropicCreateMessageRequest {
model: "test-model".into(),
max_tokens: 100,
messages: vec![AnthropicMessage {
role: AnthropicRole::User,
content: AnthropicMessageContent::Text {
content: "Hello!".into(),
},
}],
system: None,
temperature: Some(0.7),
top_p: None,
top_k: None,
stop_sequences: None,
stream: false,
metadata: None,
tools: None,
tool_choice: None,
};
let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap();
assert_eq!(chat_req.inner.model, "test-model");
assert_eq!(chat_req.inner.max_completion_tokens, Some(100));
assert_eq!(chat_req.inner.temperature, Some(0.7));
assert_eq!(chat_req.inner.messages.len(), 1);
match &chat_req.inner.messages[0] {
ChatCompletionRequestMessage::User(u) => match &u.content {
ChatCompletionRequestUserMessageContent::Text(t) => {
assert_eq!(t, "Hello!");
}
_ => panic!("expected text content"),
},
_ => panic!("expected user message"),
}
}
#[test]
fn test_system_message_prepended() {
let req = AnthropicCreateMessageRequest {
model: "test-model".into(),
max_tokens: 100,
messages: vec![AnthropicMessage {
role: AnthropicRole::User,
content: AnthropicMessageContent::Text {
content: "Hi".into(),
},
}],
system: Some("You are helpful.".into()),
temperature: None,
top_p: None,
top_k: None,
stop_sequences: None,
stream: false,
metadata: None,
tools: None,
tool_choice: None,
};
let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap();
assert_eq!(chat_req.inner.messages.len(), 2);
assert!(matches!(
&chat_req.inner.messages[0],
ChatCompletionRequestMessage::System(_)
));
assert!(matches!(
&chat_req.inner.messages[1],
ChatCompletionRequestMessage::User(_)
));
}
#[test]
fn test_tool_use_blocks_conversion() {
let req = AnthropicCreateMessageRequest {
model: "test-model".into(),
max_tokens: 100,
messages: vec![
AnthropicMessage {
role: AnthropicRole::User,
content: AnthropicMessageContent::Text {
content: "What's the weather?".into(),
},
},
AnthropicMessage {
role: AnthropicRole::Assistant,
content: AnthropicMessageContent::Blocks {
content: vec![AnthropicContentBlock::ToolUse {
id: "tool_123".into(),
name: "get_weather".into(),
input: serde_json::json!({"location": "SF"}),
}],
},
},
AnthropicMessage {
role: AnthropicRole::User,
content: AnthropicMessageContent::Blocks {
content: vec![AnthropicContentBlock::ToolResult {
tool_use_id: "tool_123".into(),
content: Some(ToolResultContent::Text("72F and sunny".into())),
is_error: None,
}],
},
},
],
system: None,
temperature: None,
top_p: None,
top_k: None,
stop_sequences: None,
stream: false,
metadata: None,
tools: None,
tool_choice: None,
};
let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap();
assert_eq!(chat_req.inner.messages.len(), 3);
assert!(matches!(
&chat_req.inner.messages[0],
ChatCompletionRequestMessage::User(_)
));
assert!(matches!(
&chat_req.inner.messages[1],
ChatCompletionRequestMessage::Assistant(_)
));
assert!(matches!(
&chat_req.inner.messages[2],
ChatCompletionRequestMessage::Tool(_)
));
}
#[test]
fn test_stop_sequences_conversion() {
let req = AnthropicCreateMessageRequest {
model: "test-model".into(),
max_tokens: 100,
messages: vec![AnthropicMessage {
role: AnthropicRole::User,
content: AnthropicMessageContent::Text {
content: "Hi".into(),
},
}],
system: None,
temperature: None,
top_p: None,
top_k: None,
stop_sequences: Some(vec!["STOP".into(), "END".into()]),
stream: false,
metadata: None,
tools: None,
tool_choice: None,
};
let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap();
assert!(chat_req.inner.stop.is_some());
}
#[test]
fn test_tools_conversion() {
let req = AnthropicCreateMessageRequest {
model: "test-model".into(),
max_tokens: 100,
messages: vec![AnthropicMessage {
role: AnthropicRole::User,
content: AnthropicMessageContent::Text {
content: "Hi".into(),
},
}],
system: None,
temperature: None,
top_p: None,
top_k: None,
stop_sequences: None,
stream: false,
metadata: None,
tools: Some(vec![AnthropicTool {
name: "get_weather".into(),
description: Some("Get weather info".into()),
input_schema: serde_json::json!({
"type": "object",
"properties": {"location": {"type": "string"}},
"required": ["location"]
}),
}]),
tool_choice: Some(AnthropicToolChoice::Simple(AnthropicToolChoiceSimple {
choice_type: AnthropicToolChoiceMode::Auto,
})),
};
let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap();
assert!(chat_req.inner.tools.is_some());
let tools = chat_req.inner.tools.unwrap();
assert_eq!(tools.len(), 1);
assert_eq!(tools[0].function.name, "get_weather");
assert!(matches!(
chat_req.inner.tool_choice,
Some(ChatCompletionToolChoiceOption::Auto)
));
}
#[allow(deprecated)]
#[test]
fn test_chat_completion_to_anthropic_response() {
let chat_resp = NvCreateChatCompletionResponse {
id: "chatcmpl-xyz".into(),
choices: vec![dynamo_async_openai::types::ChatChoice {
index: 0,
message: dynamo_async_openai::types::ChatCompletionResponseMessage {
content: Some(
dynamo_async_openai::types::ChatCompletionMessageContent::Text(
"Hello!".to_string(),
),
),
refusal: None,
tool_calls: None,
role: dynamo_async_openai::types::Role::Assistant,
function_call: None,
audio: None,
reasoning_content: None,
},
finish_reason: Some(dynamo_async_openai::types::FinishReason::Stop),
stop_reason: None,
logprobs: None,
}],
created: 1726000000,
model: "test-model".into(),
service_tier: None,
system_fingerprint: None,
object: "chat.completion".to_string(),
usage: Some(dynamo_async_openai::types::CompletionUsage {
prompt_tokens: 10,
completion_tokens: 5,
total_tokens: 15,
prompt_tokens_details: None,
completion_tokens_details: None,
}),
nvext: None,
};
let response = chat_completion_to_anthropic_response(chat_resp, "test-model");
assert!(response.id.starts_with("msg_"));
assert_eq!(response.object_type, "message");
assert_eq!(response.role, "assistant");
assert_eq!(response.model, "test-model");
assert_eq!(response.stop_reason, Some(AnthropicStopReason::EndTurn));
assert_eq!(response.usage.input_tokens, 10);
assert_eq!(response.usage.output_tokens, 5);
assert_eq!(response.content.len(), 1);
match &response.content[0] {
AnthropicResponseContentBlock::Text { text } => {
assert_eq!(text, "Hello!");
}
_ => panic!("expected text block"),
}
}
#[test]
fn test_deserialize_simple_message() {
let json =
r#"{"model":"test","max_tokens":100,"messages":[{"role":"user","content":"Hello"}]}"#;
let req: AnthropicCreateMessageRequest = serde_json::from_str(json).unwrap();
assert_eq!(req.model, "test");
assert_eq!(req.max_tokens, 100);
assert_eq!(req.messages.len(), 1);
}
#[test]
fn test_deserialize_content_blocks() {
let json = r#"{
"model": "test",
"max_tokens": 100,
"messages": [{
"role": "user",
"content": [
{"type": "text", "text": "What is this?"},
{"type": "tool_result", "tool_use_id": "tool_1", "content": "result text"}
]
}]
}"#;
let req: AnthropicCreateMessageRequest = serde_json::from_str(json).unwrap();
assert_eq!(req.messages.len(), 1);
match &req.messages[0].content {
AnthropicMessageContent::Blocks { content } => {
assert_eq!(content.len(), 2);
}
_ => panic!("expected blocks content"),
}
}
#[test]
fn test_deserialize_thinking_block() {
let json = r#"{
"model": "test",
"max_tokens": 100,
"messages": [{
"role": "assistant",
"content": [
{"type": "thinking", "thinking": "Let me reason about this...", "signature": "sig123"},
{"type": "text", "text": "Here is my answer."}
]
}]
}"#;
let req: AnthropicCreateMessageRequest = serde_json::from_str(json).unwrap();
match &req.messages[0].content {
AnthropicMessageContent::Blocks { content } => {
assert_eq!(content.len(), 2);
match &content[0] {
AnthropicContentBlock::Thinking {
thinking,
signature,
} => {
assert_eq!(thinking, "Let me reason about this...");
assert_eq!(signature, "sig123");
}
other => panic!("expected Thinking, got {other:?}"),
}
}
_ => panic!("expected blocks content"),
}
}
#[test]
fn test_thinking_block_becomes_reasoning_content() {
let req = AnthropicCreateMessageRequest {
model: "test-model".into(),
max_tokens: 100,
messages: vec![AnthropicMessage {
role: AnthropicRole::Assistant,
content: AnthropicMessageContent::Blocks {
content: vec![
AnthropicContentBlock::Thinking {
thinking: "I should think...".into(),
signature: "sig".into(),
},
AnthropicContentBlock::Text {
text: "Answer".into(),
},
],
},
}],
system: None,
temperature: None,
top_p: None,
top_k: None,
stop_sequences: None,
stream: false,
metadata: None,
tools: None,
tool_choice: None,
};
let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap();
match &chat_req.inner.messages[0] {
ChatCompletionRequestMessage::Assistant(a) => {
assert_eq!(a.reasoning_content.as_deref(), Some("I should think..."));
match &a.content {
Some(ChatCompletionRequestAssistantMessageContent::Text(t)) => {
assert_eq!(t, "Answer");
}
other => panic!("expected text content, got {other:?}"),
}
}
other => panic!("expected assistant message, got {other:?}"),
}
}
#[test]
fn test_unknown_block_type_does_not_fail() {
let json = r#"{
"model": "test",
"max_tokens": 100,
"messages": [{
"role": "assistant",
"content": [
{"type": "text", "text": "hello"},
{"type": "server_tool_use", "id": "stu_1", "name": "web_search", "input": {}},
{"type": "redacted_thinking", "data": "encrypted"},
{"type": "text", "text": "world"}
]
}]
}"#;
let req: AnthropicCreateMessageRequest = serde_json::from_str(json).unwrap();
match &req.messages[0].content {
AnthropicMessageContent::Blocks { content } => {
assert_eq!(content.len(), 4);
assert!(matches!(&content[0], AnthropicContentBlock::Text { .. }));
assert!(matches!(
&content[1],
AnthropicContentBlock::Unknown { block_type } if block_type == "server_tool_use"
));
assert!(matches!(
&content[2],
AnthropicContentBlock::Unknown { block_type } if block_type == "redacted_thinking"
));
assert!(matches!(&content[3], AnthropicContentBlock::Text { .. }));
}
_ => panic!("expected blocks content"),
}
// Conversion should succeed, skipping unknown blocks
let chat_req: NvCreateChatCompletionRequest = AnthropicCreateMessageRequest {
model: "test".into(),
max_tokens: 100,
messages: req.messages,
system: None,
temperature: None,
top_p: None,
top_k: None,
stop_sequences: None,
stream: false,
metadata: None,
tools: None,
tool_choice: None,
}
.try_into()
.unwrap();
assert_eq!(chat_req.inner.messages.len(), 1);
}
#[test]
fn test_tool_result_string_content() {
let json = r#"{
"model": "test",
"max_tokens": 100,
"messages": [{
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": "t1", "content": "simple text"}
]
}]
}"#;
let req: AnthropicCreateMessageRequest = serde_json::from_str(json).unwrap();
match &req.messages[0].content {
AnthropicMessageContent::Blocks { content } => match &content[0] {
AnthropicContentBlock::ToolResult { content, .. } => {
let text = content.clone().unwrap().into_text();
assert_eq!(text, "simple text");
}
other => panic!("expected ToolResult, got {other:?}"),
},
_ => panic!("expected blocks"),
}
}
#[test]
fn test_tool_result_array_content() {
let json = r#"{
"model": "test",
"max_tokens": 100,
"messages": [{
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": "t1", "content": [
{"type": "text", "text": "line 1"},
{"type": "text", "text": "line 2"}
]}
]
}]
}"#;
let req: AnthropicCreateMessageRequest = serde_json::from_str(json).unwrap();
match &req.messages[0].content {
AnthropicMessageContent::Blocks { content } => match &content[0] {
AnthropicContentBlock::ToolResult { content, .. } => {
let text = content.clone().unwrap().into_text();
assert_eq!(text, "line 1line 2");
}
other => panic!("expected ToolResult, got {other:?}"),
},
_ => panic!("expected blocks"),
}
}
#[test]
fn test_count_tokens_estimate() {
let req = AnthropicCountTokensRequest {
model: "test".into(),
messages: vec![AnthropicMessage {
role: AnthropicRole::User,
content: AnthropicMessageContent::Text {
content: "Hello, world! This is a test message.".into(),
},
}],
system: Some("You are helpful.".into()),
tools: None,
};
let tokens = req.estimate_tokens();
assert!(tokens > 0, "should estimate non-zero tokens");
// "Hello, world! This is a test message." (37) + "You are helpful." (16) + role (4) = 57 / 3 = 19
assert_eq!(tokens, 19);
}
}
...@@ -216,6 +216,7 @@ fn compute_index(endpoint: &Endpoint, request_type: &RequestType, status: &Statu ...@@ -216,6 +216,7 @@ fn compute_index(endpoint: &Endpoint, request_type: &RequestType, status: &Statu
Endpoint::ChatCompletions => 1, Endpoint::ChatCompletions => 1,
Endpoint::Embeddings => todo!(), Endpoint::Embeddings => todo!(),
Endpoint::Responses => todo!(), Endpoint::Responses => todo!(),
Endpoint::AnthropicMessages => todo!(),
Endpoint::Tensor => todo!(), Endpoint::Tensor => todo!(),
Endpoint::Images => todo!(), Endpoint::Images => todo!(),
Endpoint::Videos => todo!(), Endpoint::Videos => todo!(),
......
...@@ -270,6 +270,9 @@ pub mod llm { ...@@ -270,6 +270,9 @@ pub mod llm {
/// LoRA cache directory path /// LoRA cache directory path
pub const DYN_LORA_PATH: &str = "DYN_LORA_PATH"; pub const DYN_LORA_PATH: &str = "DYN_LORA_PATH";
/// Enable the experimental Anthropic Messages API endpoint (/v1/messages)
pub const DYN_ENABLE_ANTHROPIC_API: &str = "DYN_ENABLE_ANTHROPIC_API";
/// Metrics configuration /// Metrics configuration
pub mod metrics { pub mod metrics {
/// Custom metrics prefix (overrides default "dynamo_frontend") /// Custom metrics prefix (overrides default "dynamo_frontend")
...@@ -446,6 +449,7 @@ mod tests { ...@@ -446,6 +449,7 @@ mod tests {
llm::DYN_HTTP_BODY_LIMIT_MB, llm::DYN_HTTP_BODY_LIMIT_MB,
llm::DYN_LORA_ENABLED, llm::DYN_LORA_ENABLED,
llm::DYN_LORA_PATH, llm::DYN_LORA_PATH,
llm::DYN_ENABLE_ANTHROPIC_API,
llm::metrics::DYN_METRICS_PREFIX, llm::metrics::DYN_METRICS_PREFIX,
// Model // Model
model::model_express::MODEL_EXPRESS_URL, model::model_express::MODEL_EXPRESS_URL,
......
...@@ -17,6 +17,8 @@ from tests.serve.common import ( ...@@ -17,6 +17,8 @@ from tests.serve.common import (
from tests.utils.constants import DefaultPort from tests.utils.constants import DefaultPort
from tests.utils.engine_process import EngineConfig from tests.utils.engine_process import EngineConfig
from tests.utils.payload_builder import ( from tests.utils.payload_builder import (
anthropic_messages_payload_default,
anthropic_messages_stream_payload_default,
chat_payload, chat_payload,
chat_payload_default, chat_payload_default,
completion_payload_default, completion_payload_default,
...@@ -288,6 +290,23 @@ sglang_configs = { ...@@ -288,6 +290,23 @@ sglang_configs = {
completion_payload_default(), completion_payload_default(),
], ],
), ),
"anthropic_messages": SGLangConfig(
name="anthropic_messages",
directory=sglang_dir,
script_name="agg.sh",
marks=[
pytest.mark.gpu_1,
pytest.mark.post_merge,
pytest.mark.timeout(240),
],
model="Qwen/Qwen3-0.6B",
env={"DYN_ENABLE_ANTHROPIC_API": "1"},
frontend_port=DefaultPort.FRONTEND.value,
request_payloads=[
anthropic_messages_payload_default(),
anthropic_messages_stream_payload_default(),
],
),
} }
......
...@@ -6,6 +6,8 @@ from typing import Any, Dict, List, Optional, Union ...@@ -6,6 +6,8 @@ from typing import Any, Dict, List, Optional, Union
from tests.utils.client import send_request from tests.utils.client import send_request
from tests.utils.constants import DefaultPort from tests.utils.constants import DefaultPort
from tests.utils.payloads import ( from tests.utils.payloads import (
AnthropicMessagesPayload,
AnthropicMessagesStreamPayload,
CachedTokensChatPayload, CachedTokensChatPayload,
ChatPayload, ChatPayload,
ChatPayloadWithLogprobs, ChatPayloadWithLogprobs,
...@@ -531,3 +533,56 @@ def responses_stream_payload_default( ...@@ -531,3 +533,56 @@ def responses_stream_payload_default(
expected_response=expected_response expected_response=expected_response
or ["AI", "knock", "joke", "think", "artificial", "intelligence"], or ["AI", "knock", "joke", "think", "artificial", "intelligence"],
) )
def anthropic_messages_payload_default(
repeat_count: int = 1,
expected_response: Optional[List[str]] = None,
expected_log: Optional[List[str]] = None,
max_tokens: int = 200,
temperature: float = 0.0,
) -> AnthropicMessagesPayload:
"""Create a default Anthropic Messages API payload (non-streaming)."""
return AnthropicMessagesPayload(
body={
"max_tokens": max_tokens,
"messages": [
{
"role": "user",
"content": TEXT_PROMPT,
}
],
"temperature": temperature,
},
repeat_count=repeat_count,
expected_log=expected_log or [],
expected_response=expected_response
or ["AI", "knock", "joke", "think", "artificial", "intelligence"],
)
def anthropic_messages_stream_payload_default(
repeat_count: int = 1,
expected_response: Optional[List[str]] = None,
expected_log: Optional[List[str]] = None,
max_tokens: int = 200,
temperature: float = 0.0,
) -> AnthropicMessagesStreamPayload:
"""Create a default Anthropic Messages API streaming payload."""
return AnthropicMessagesStreamPayload(
body={
"max_tokens": max_tokens,
"messages": [
{
"role": "user",
"content": TEXT_PROMPT,
}
],
"stream": True,
"temperature": temperature,
},
repeat_count=repeat_count,
expected_log=expected_log or [],
expected_response=expected_response
or ["AI", "knock", "joke", "think", "artificial", "intelligence"],
)
...@@ -611,6 +611,131 @@ class ResponsesStreamPayload(BasePayload): ...@@ -611,6 +611,131 @@ class ResponsesStreamPayload(BasePayload):
return ResponsesStreamPayload.extract_content(response) return ResponsesStreamPayload.extract_content(response)
@dataclass
class AnthropicMessagesPayload(BasePayload):
"""Payload for the Anthropic Messages API endpoint (/v1/messages)."""
endpoint: str = "/v1/messages"
@staticmethod
def extract_content(response):
"""Extract text content from an Anthropic Messages API response."""
response.raise_for_status()
result = response.json()
assert (
result.get("type") == "message"
), f"Expected type='message', got {result.get('type')}"
assert result.get("id", "").startswith(
"msg_"
), f"Expected id to start with 'msg_', got {result.get('id')}"
assert (
result.get("role") == "assistant"
), f"Expected role='assistant', got {result.get('role')}"
assert result.get("stop_reason") in (
"end_turn",
"max_tokens",
"stop_sequence",
"tool_use",
), f"Unexpected stop_reason: {result.get('stop_reason')}"
content = result.get("content", [])
assert len(content) > 0, "Response content is empty"
assert (
content[0].get("type") == "text"
), f"Expected content[0].type='text', got {content[0].get('type')}"
usage = result.get("usage", {})
assert "input_tokens" in usage, "Missing input_tokens in usage"
assert "output_tokens" in usage, "Missing output_tokens in usage"
return content[0].get("text", "")
def response_handler(self, response: Any) -> str:
return AnthropicMessagesPayload.extract_content(response)
@dataclass
class AnthropicMessagesStreamPayload(BasePayload):
"""Streaming payload for the Anthropic Messages API endpoint (/v1/messages).
Validates SSE event structure and lifecycle ordering per the Anthropic streaming spec.
"""
endpoint: str = "/v1/messages"
http_stream: bool = True
@staticmethod
def extract_content(response):
"""Parse SSE stream and validate Anthropic event structure."""
import json
response.raise_for_status()
events = []
event_type = ""
for line in response.iter_lines(decode_unicode=True):
if not line:
continue
if line.startswith("event: "):
event_type = line[len("event: ") :]
elif line.startswith("data: "):
data_str = line[len("data: ") :]
events.append((event_type, json.loads(data_str)))
event_types = [e[0] for e in events]
# Validate lifecycle event ordering
assert len(event_types) >= 3, f"Too few events: {event_types}"
assert (
event_types[0] == "message_start"
), f"First event should be message_start, got {event_types[0]}"
assert (
event_types[-1] == "message_stop"
), f"Last event should be message_stop, got {event_types[-1]}"
# Validate message_start structure
msg_start = events[0][1]
assert msg_start.get("type") == "message_start", "message_start missing type"
message = msg_start.get("message", {})
assert message.get("id", "").startswith(
"msg_"
), "message id should start with msg_"
assert message.get("role") == "assistant", "message role should be assistant"
# Validate required event types
assert "content_block_start" in event_types, "Missing content_block_start"
assert "content_block_delta" in event_types, "Missing content_block_delta"
assert "content_block_stop" in event_types, "Missing content_block_stop"
assert "message_delta" in event_types, "Missing message_delta"
# Validate message_delta has stop_reason
delta_events = [e for e in events if e[0] == "message_delta"]
assert (
len(delta_events) == 1
), f"Expected 1 message_delta, got {len(delta_events)}"
delta_body = delta_events[0][1].get("delta", {})
assert delta_body.get("stop_reason") in (
"end_turn",
"max_tokens",
"stop_sequence",
"tool_use",
), f"Unexpected stop_reason in message_delta: {delta_body.get('stop_reason')}"
# Collect text deltas
deltas = []
for e_type, e_data in events:
if e_type == "content_block_delta":
delta = e_data.get("delta", {})
if delta.get("type") == "text_delta":
deltas.append(delta.get("text", ""))
return "".join(deltas)
def response_handler(self, response: Any) -> str:
return AnthropicMessagesStreamPayload.extract_content(response)
@dataclass @dataclass
class EmbeddingPayload(BasePayload): class EmbeddingPayload(BasePayload):
"""Payload for embeddings endpoint.""" """Payload for embeddings endpoint."""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment