Unverified commit 2a95ef63, authored by ishandhanani, committed by GitHub
Browse files

fix(responses): align wire shape with OpenResponses spec + add compliance CI (#8283)


Signed-off-by: Anant Sharma <anants@nvidia.com>
Co-authored-by: Anant Sharma <anants@nvidia.com>
parent 4410a2c5
......@@ -1595,6 +1595,18 @@ async fn responses(
service_tier: request.inner.service_tier,
include: request.inner.include.clone(),
truncation: request.inner.truncation,
// Upstream `CreateResponse` doesn't carry these yet; plumbed through so
// the response serializer can default to 0.0 without hardcoding at the
// build site. When upstream (or our shadow) adds the fields, sourcing
// from the request becomes a one-line change here.
presence_penalty: None,
frequency_penalty: None,
// Pass-through metadata — accepted on the request, echoed back on the
// response so the caller can confirm receipt. Dynamo doesn't act on
// these; see `validate_response_unsupported_fields` for rationale.
prompt_cache_key: request.inner.prompt_cache_key.clone(),
prompt_cache_retention: request.inner.prompt_cache_retention,
safety_identifier: request.inner.safety_identifier.clone(),
};
let request_id = request.id().to_string();
let (orig_request, context) = request.into_parts();
......@@ -1830,6 +1842,24 @@ pub fn validate_response_unsupported_fields(
VALIDATION_PREFIX.to_string() + "`prompt` is not supported.",
));
}
// Reject directive fields that change semantics if silently dropped.
// `max_tool_calls` is a hard cap on tool invocations — accepting it
// without enforcement would let a caller send `max_tool_calls: 5` and
// see `max_tool_calls: null` in the response, assuming their limit was
// honored. Fail loud until real enforcement lands.
//
// Pass-through metadata fields (`prompt_cache_key`,
// `prompt_cache_retention`, `safety_identifier`) are deliberately
// accepted and echoed back on the response instead. They're hints for
// OpenAI's caching/moderation backends, not directives — Codex sends
// `prompt_cache_key` on every request — and the OpenResponses spec
// includes them on the response body, so echoing the caller's value
// makes receipt observable without needing a real backend.
if inner.max_tool_calls.is_some() {
return Some(ErrorMessage::not_implemented_error(
VALIDATION_PREFIX.to_string() + "`max_tool_calls` is not supported.",
));
}
None
}
......@@ -2714,6 +2744,7 @@ mod tests {
})
}),
),
("max_tool_calls", Box::new(|r| r.max_tool_calls = Some(5))),
];
for (field, set_field) in unsupported_cases {
......@@ -2724,6 +2755,43 @@ mod tests {
}
}
/// `prompt_cache_key`, `prompt_cache_retention` and `safety_identifier` are
/// pass-through metadata: validation must let them through so the response
/// serializer can echo them back to the caller. Codex sends
/// `prompt_cache_key` on every request — rejecting it broke `codex exec`
/// end-to-end.
#[test]
fn test_validate_unsupported_fields_accepts_passthrough_metadata() {
    // Each entry sets exactly one pass-through field on an otherwise valid
    // request. The closures capture nothing, so they coerce to plain
    // function pointers and need no boxing.
    let mutators: [(&str, fn(&mut CreateResponse)); 3] = [
        ("prompt_cache_key", |r: &mut CreateResponse| {
            r.prompt_cache_key = Some("ck-1".into())
        }),
        ("prompt_cache_retention", |r: &mut CreateResponse| {
            r.prompt_cache_retention =
                Some(dynamo_protocols::types::responses::PromptCacheRetention::InMemory)
        }),
        ("safety_identifier", |r: &mut CreateResponse| {
            r.safety_identifier = Some("user-hash".into())
        }),
    ];

    for (field, apply) in mutators {
        let mut req = make_base_request();
        apply(&mut req.inner);
        assert!(
            validate_response_unsupported_fields(&req).is_none(),
            "Expected `{field}` to be accepted as pass-through metadata"
        );
    }
}
#[test]
fn test_validate_chat_completion_required_fields_empty_messages() {
let request = NvCreateChatCompletionRequest {
......
......@@ -9,9 +9,10 @@ use dynamo_protocols::types::responses::{
AssistantRole, FunctionCallOutput, FunctionToolCall, IncludeEnum, InputContent, InputItem,
InputOutputMessageContent, InputParam, InputRole, InputTokenDetails, Instructions, Item,
MessageItem, OutputItem, OutputMessage, OutputMessageContent, OutputStatus, OutputTextContent,
OutputTokenDetails, Reasoning, ReasoningItem, Response, ResponseTextParam, ResponseUsage,
Role as ResponseRole, ServiceTier, Status, SummaryPart, SummaryTextContent,
TextResponseFormatConfiguration, Tool, ToolChoiceOptions, ToolChoiceParam, Truncation,
OutputTokenDetails, PromptCacheRetention, Reasoning, ReasoningItem, Response,
ResponseTextParam, ResponseUsage, Role as ResponseRole, ServiceTier, Status, SummaryPart,
SummaryTextContent, TextResponseFormatConfiguration, Tool, ToolChoiceOptions, ToolChoiceParam,
Truncation,
};
use dynamo_protocols::types::{
ChatCompletionMessageToolCall, ChatCompletionNamedToolChoice,
......@@ -63,7 +64,7 @@ pub struct NvCreateResponse {
pub nvext: Option<NvExt>,
}
#[derive(ToSchema, Serialize, Deserialize, Validate, Debug, Clone)]
#[derive(ToSchema, Deserialize, Validate, Debug, Clone)]
pub struct NvResponse {
/// Flattened Response fields (includes upstream + extended spec fields).
#[serde(flatten)]
......@@ -73,6 +74,78 @@ pub struct NvResponse {
/// NVIDIA extension field for response metadata (worker IDs, etc.)
#[serde(skip_serializing_if = "Option::is_none")]
pub nvext: Option<serde_json::Value>,
/// OpenResponses spec requires these as non-null scalars on every response,
/// but async-openai's `Response` doesn't model them. Populated from the
/// originating request. Surfaced during serialization (see `Serialize`
/// impl below); not persisted as top-level fields on the inner struct.
#[serde(default)]
pub presence_penalty: f32,
#[serde(default)]
pub frequency_penalty: f32,
#[serde(default)]
pub store: bool,
}
/// Patch an already-serialized `Response` JSON object to match the
/// OpenResponses spec. Applied both to one-shot `NvResponse` serialization
/// and to every `Response` embedded inside a streaming event payload.
///
/// Reconciles two spec gaps between upstream async-openai's `Response` and
/// the OpenResponses spec:
///
/// 1. Fields the spec requires as `T | null` that upstream marks
/// `Option<T>` with `skip_serializing_if = Option::is_none`. These are
/// silently dropped when None; the spec wants them present as null.
/// 2. Fields the spec requires (`presence_penalty`, `frequency_penalty`,
/// `store`) that are absent from upstream `Response` entirely.
///
/// Rather than fork the upstream output chain (which would cascade into
/// `OutputItem`, streaming events, and a long tail of sub-types, per
/// `lib/protocols/CLAUDE.md`), we patch the serialized JSON. Adds a
/// single `serde_json::to_value` round-trip per response, which is
/// negligible next to tokenization/inference cost.
pub(crate) fn patch_response_for_spec(
obj: &mut serde_json::Map<String, serde_json::Value>,
presence_penalty: f32,
frequency_penalty: f32,
store: bool,
) {
for key in dynamo_protocols::types::responses::SPEC_NULLABLE_REQUIRED_RESPONSE_FIELDS {
obj.entry(*key).or_insert(serde_json::Value::Null);
}
obj.insert(
"presence_penalty".into(),
serde_json::json!(presence_penalty),
);
obj.insert(
"frequency_penalty".into(),
serde_json::json!(frequency_penalty),
);
obj.insert("store".into(), serde_json::json!(store));
}
/// Hand-written serializer: emits the inner upstream `Response` as JSON,
/// patches it to the OpenResponses wire shape, then appends `nvext` when it
/// is set.
impl Serialize for NvResponse {
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        let mut value = serde_json::to_value(&self.inner).map_err(serde::ser::Error::custom)?;

        // Only a JSON object can be patched; anything else is forwarded to
        // the serializer unchanged.
        if let Some(fields) = value.as_object_mut() {
            patch_response_for_spec(
                fields,
                self.presence_penalty,
                self.frequency_penalty,
                self.store,
            );
            // `nvext` is written only when present, so a `None` never
            // appears as a key in the output.
            if let Some(ext) = self.nvext.as_ref() {
                fields.insert("nvext".to_owned(), ext.clone());
            }
        }

        value.serialize(serializer)
    }
}
/// Implements `NvExtProvider` for `NvCreateResponse`,
......@@ -244,6 +317,24 @@ fn convert_input_content_to_text(content: &[InputContent]) -> String {
.join("")
}
/// Concatenate the text parts of an upstream `InputContent` slice.
///
/// Mirrors `convert_input_content_to_text`, but for upstream's enum rather
/// than the Dynamo-owned shadow. Upstream's type is what appears inside
/// `FunctionCallOutput::Content` and `EasyInputContent::ContentList`, neither
/// of which is Dynamo-owned, so payloads deserialized through those paths
/// arrive as upstream variants.
///
/// Only `InputText` parts contribute; every other variant is skipped. Parts
/// are joined with no separator, in their original order.
fn convert_upstream_input_content_to_text(
    content: &[dynamo_protocols::types::responses::UpstreamInputContent],
) -> String {
    use dynamo_protocols::types::responses::UpstreamInputContent;

    let mut joined = String::new();
    for part in content {
        if let UpstreamInputContent::InputText(t) = part {
            joined.push_str(t.text.as_str());
        }
    }
    joined
}
/// Accumulator for consecutive assistant-side items (OutputMessage, FunctionCall,
/// Reasoning, assistant EasyMessage). Chat Completions represents an assistant
/// turn as a single message carrying `content`, `tool_calls`, and
......@@ -406,7 +497,9 @@ fn convert_input_items_to_messages(
std::mem::take(&mut pending).flush_into(&mut messages);
let output_text = match &fco.output {
FunctionCallOutput::Text(text) => text.clone(),
FunctionCallOutput::Content(parts) => convert_input_content_to_text(parts),
FunctionCallOutput::Content(parts) => {
convert_upstream_input_content_to_text(parts)
}
};
messages.push(ChatCompletionRequestMessage::Tool(
ChatCompletionRequestToolMessage {
......@@ -444,7 +537,7 @@ fn convert_input_items_to_messages(
text.clone()
}
dynamo_protocols::types::responses::EasyInputContent::ContentList(parts) => {
convert_input_content_to_text(parts)
convert_upstream_input_content_to_text(parts)
}
};
match easy.role {
......@@ -740,6 +833,22 @@ pub struct ResponseParams {
pub service_tier: Option<ServiceTier>,
pub include: Option<Vec<IncludeEnum>>,
pub truncation: Option<Truncation>,
/// OpenResponses spec requires these fields on the response body. Upstream
/// `CreateResponse` doesn't model them on the request yet, so for now they
/// pass through as `None`; the response serializer defaults to 0.0 (the
/// effective sglang default). Wired through `ResponseParams` anyway so
/// that when upstream relaxes or we shadow `CreateResponse`, threading a
/// real value becomes a one-line change at the request-extraction site.
pub presence_penalty: Option<f32>,
pub frequency_penalty: Option<f32>,
/// Pass-through metadata fields. Codex and other clients send these as
/// hints for OpenAI's caching/moderation backends; Dynamo doesn't act on
/// them, but the spec includes them on the response body so we echo back
/// what the caller sent rather than silently dropping. Echoing makes
/// receipt observable to the client without needing a real backend.
pub prompt_cache_key: Option<String>,
pub prompt_cache_retention: Option<PromptCacheRetention>,
pub safety_identifier: Option<String>,
}
/// Normalize tools so that `FunctionTool.strict` is always set.
......@@ -880,13 +989,13 @@ pub fn chat_completion_to_response(
.include
.as_ref()
.is_some_and(|inc| inc.contains(&IncludeEnum::MessageOutputTextLogprobs));
if !keep_logprobs {
for item in &mut output {
if let OutputItem::Message(msg) = item {
for content in &mut msg.content {
if let OutputMessageContent::OutputText(text) = content {
text.logprobs = None;
}
if let OutputMessageContent::OutputText(text) = content
&& (!keep_logprobs || text.logprobs.is_none())
{
text.logprobs = Some(Vec::new());
}
}
}
......@@ -936,10 +1045,10 @@ pub fn chat_completion_to_response(
max_output_tokens: params.max_output_tokens,
previous_response_id: api_context.and_then(|ctx| ctx.previous_response_id.clone()),
prompt: None,
prompt_cache_key: None,
prompt_cache_retention: None,
prompt_cache_key: params.prompt_cache_key.clone(),
prompt_cache_retention: params.prompt_cache_retention,
reasoning: params.reasoning.clone(),
safety_identifier: None,
safety_identifier: params.safety_identifier.clone(),
service_tier: Some(params.service_tier.unwrap_or(ServiceTier::Auto)),
top_logprobs: Some(0),
usage: chat_resp.usage.map(|u| ResponseUsage {
......@@ -964,6 +1073,9 @@ pub fn chat_completion_to_response(
Ok(NvResponse {
inner: response,
nvext,
presence_penalty: params.presence_penalty.unwrap_or(0.0),
frequency_penalty: params.frequency_penalty.unwrap_or(0.0),
store: params.store.unwrap_or(false),
})
}
......@@ -2475,7 +2587,10 @@ thinking
}
#[test]
fn test_include_logprobs_stripped_by_default() {
fn test_include_logprobs_empty_by_default() {
// OpenResponses schema requires `logprobs` to be an array. When the
// caller did not request them via `include`, emit an empty array
// rather than null.
let chat_resp = make_chat_resp_with_text("hello");
let params = ResponseParams::default();
let resp = chat_completion_to_response(chat_resp, &params, None).unwrap();
......@@ -2484,9 +2599,10 @@ thinking
if let OutputItem::Message(msg) = item {
for content in &msg.content {
if let OutputMessageContent::OutputText(t) = content {
assert!(
t.logprobs.is_none(),
"logprobs should be stripped by default"
assert_eq!(
t.logprobs.as_deref(),
Some(&[][..]),
"logprobs should be an empty array by default"
);
}
}
......@@ -2543,11 +2659,35 @@ thinking
assert_eq!(resp.inner.truncation, Some(Truncation::Disabled));
}
/// Validate the JSON wire shape of NvResponse.
///
/// The migration to upstream async-openai v0.34 removed fields that were
/// incorrectly present on our old local Response type (they belong on the
/// request, not the response, per the OpenAI Responses API spec).
/// The OpenResponses spec carries `prompt_cache_key`,
/// `prompt_cache_retention` and `safety_identifier` on the response body.
/// Codex sends `prompt_cache_key` on every request; the value is echoed back
/// so the caller can confirm receipt, without any caching semantics being
/// enforced. The other two fields follow the same pattern.
#[test]
fn test_response_echoes_passthrough_metadata() {
    let cache_key = "cache-key-codex-1";
    let safety_id = "user-abc";

    let params = ResponseParams {
        prompt_cache_key: Some(cache_key.into()),
        prompt_cache_retention: Some(PromptCacheRetention::InMemory),
        safety_identifier: Some(safety_id.into()),
        ..Default::default()
    };
    let resp =
        chat_completion_to_response(make_chat_resp_with_text("hello"), &params, None).unwrap();

    // Every value supplied on the params must come back unchanged.
    let echoed = &resp.inner;
    assert_eq!(echoed.prompt_cache_key.as_deref(), Some(cache_key));
    assert_eq!(
        echoed.prompt_cache_retention,
        Some(PromptCacheRetention::InMemory)
    );
    assert_eq!(echoed.safety_identifier.as_deref(), Some(safety_id));
}
/// Validate the JSON wire shape of NvResponse matches the OpenResponses
/// spec: required scalars always present, nullable-required fields
/// emitted as `null` when None.
#[test]
fn test_response_wire_format_shape() {
let chat_resp = make_chat_resp_with_text("hello");
......@@ -2555,14 +2695,14 @@ thinking
let resp = chat_completion_to_response(chat_resp, &params, None).unwrap();
let json = serde_json::to_value(&resp).unwrap();
// Fields that were on our old local type but are NOT in the OpenAI
// Responses API spec -- they are request-level, not response-level.
assert!(json.get("frequency_penalty").is_none());
assert!(json.get("presence_penalty").is_none());
assert!(json.get("store").is_none());
assert!(json.get("max_tool_calls").is_none());
// Required scalars the spec mandates on every response. Upstream
// async-openai's Response struct doesn't model these; NvResponse's
// custom serializer injects them.
assert_eq!(json["frequency_penalty"], 0.0);
assert_eq!(json["presence_penalty"], 0.0);
assert_eq!(json["store"], false);
// Fields that should be present with expected values
// Other required fields with expected values
assert_eq!(json["object"], "response");
assert_eq!(json["status"], "completed");
assert_eq!(json["metadata"], serde_json::json!({}));
......@@ -2570,12 +2710,25 @@ thinking
assert!(json["output"][0].get("id").is_some());
assert!(json["output"][0].get("status").is_some());
// Optional fields with None should be omitted (upstream uses skip_serializing_if)
assert!(json.get("error").is_none());
assert!(json.get("incomplete_details").is_none());
assert!(json.get("billing").is_none());
assert!(json.get("conversation").is_none());
assert!(json.get("safety_identifier").is_none());
// Nullable-required fields must be present as null (not missing).
for key in [
"error",
"incomplete_details",
"billing",
"conversation",
"safety_identifier",
"max_tool_calls",
"instructions",
"previous_response_id",
"prompt_cache_key",
"reasoning",
] {
assert_eq!(
json.get(key),
Some(&serde_json::Value::Null),
"expected {key} to be present as null"
);
}
// nvext should be omitted when None
assert!(json.get("nvext").is_none());
......
......@@ -155,10 +155,10 @@ impl ResponseStreamConverter {
.as_ref()
.and_then(|ctx| ctx.previous_response_id.clone()),
prompt: None,
prompt_cache_key: None,
prompt_cache_retention: None,
prompt_cache_key: self.params.prompt_cache_key.clone(),
prompt_cache_retention: self.params.prompt_cache_retention,
reasoning: self.params.reasoning.clone(),
safety_identifier: None,
safety_identifier: self.params.safety_identifier.clone(),
service_tier: Some(self.params.service_tier.unwrap_or(ServiceTier::Auto)),
top_logprobs: Some(0),
usage: self.usage.clone(),
......@@ -173,13 +173,13 @@ impl ResponseStreamConverter {
sequence_number: self.next_seq(),
response: self.make_response(Status::InProgress, vec![]),
});
events.push(make_sse_event(&created));
events.push(self.make_sse_event(&created));
let in_progress = ResponseStreamEvent::ResponseInProgress(ResponseInProgressEvent {
sequence_number: self.next_seq(),
response: self.make_response(Status::InProgress, vec![]),
});
events.push(make_sse_event(&in_progress));
events.push(self.make_sse_event(&in_progress));
events
}
......@@ -249,7 +249,7 @@ impl ResponseStreamConverter {
}),
},
);
events.push(make_sse_event(&item_added));
events.push(self.make_sse_event(&item_added));
let part_added = ResponseStreamEvent::ResponseContentPartAdded(
ResponseContentPartAddedEvent {
......@@ -264,7 +264,7 @@ impl ResponseStreamConverter {
}),
},
);
events.push(make_sse_event(&part_added));
events.push(self.make_sse_event(&part_added));
}
// Emit text delta
......@@ -278,7 +278,7 @@ impl ResponseStreamConverter {
delta: content.to_string(),
logprobs: Some(vec![]),
});
events.push(make_sse_event(&text_delta));
events.push(self.make_sse_event(&text_delta));
}
// Handle tool call deltas
......@@ -332,7 +332,7 @@ impl ResponseStreamConverter {
}),
},
);
events.push(make_sse_event(&item_added));
events.push(self.make_sse_event(&item_added));
}
self.function_call_items[tc_index]
......@@ -355,7 +355,7 @@ impl ResponseStreamConverter {
delta: args.clone(),
},
);
events.push(make_sse_event(&args_delta));
events.push(self.make_sse_event(&args_delta));
// Emit done + output_item.done immediately if the tool call
// arrived complete in a single chunk (id + name + args all present).
......@@ -382,7 +382,7 @@ impl ResponseStreamConverter {
name: Some(fc_name.clone()),
},
);
events.push(make_sse_event(&args_done));
events.push(self.make_sse_event(&args_done));
let item_done = ResponseStreamEvent::ResponseOutputItemDone(
ResponseOutputItemDoneEvent {
......@@ -398,7 +398,7 @@ impl ResponseStreamConverter {
}),
},
);
events.push(make_sse_event(&item_done));
events.push(self.make_sse_event(&item_done));
}
}
}
......@@ -423,7 +423,7 @@ impl ResponseStreamConverter {
text: self.accumulated_text.clone(),
logprobs: Some(vec![]),
});
events.push(make_sse_event(&text_done));
events.push(self.make_sse_event(&text_done));
let part_done =
ResponseStreamEvent::ResponseContentPartDone(ResponseContentPartDoneEvent {
......@@ -437,7 +437,7 @@ impl ResponseStreamConverter {
logprobs: Some(vec![]),
}),
});
events.push(make_sse_event(&part_done));
events.push(self.make_sse_event(&part_done));
let item_done =
ResponseStreamEvent::ResponseOutputItemDone(ResponseOutputItemDoneEvent {
......@@ -455,7 +455,7 @@ impl ResponseStreamConverter {
status: OutputStatus::Completed,
}),
});
events.push(make_sse_event(&item_done));
events.push(self.make_sse_event(&item_done));
}
// Close any function call items not already done inline
......@@ -483,7 +483,7 @@ impl ResponseStreamConverter {
name: Some(fc_name.clone()),
},
);
events.push(make_sse_event(&args_done));
events.push(self.make_sse_event(&args_done));
let item_done =
ResponseStreamEvent::ResponseOutputItemDone(ResponseOutputItemDoneEvent {
......@@ -498,7 +498,7 @@ impl ResponseStreamConverter {
status: Some(OutputStatus::Completed),
}),
});
events.push(make_sse_event(&item_done));
events.push(self.make_sse_event(&item_done));
}
// Build the final output vector from accumulated state
......@@ -534,7 +534,7 @@ impl ResponseStreamConverter {
sequence_number: self.next_seq(),
response: self.make_response(Status::Completed, output),
});
events.push(make_sse_event(&completed));
events.push(self.make_sse_event(&completed));
events
}
......@@ -547,16 +547,33 @@ impl ResponseStreamConverter {
sequence_number: self.next_seq(),
response: self.make_response(Status::Failed, vec![]),
});
events.push(make_sse_event(&failed));
events.push(self.make_sse_event(&failed));
events
}
}
fn make_sse_event(event: &ResponseStreamEvent) -> Result<Event, anyhow::Error> {
impl ResponseStreamConverter {
/// Serialize a stream event, patching any embedded `response` object to
/// satisfy the OpenResponses schema. Takes `&self` so spec-required
/// sampling params can be sourced from the originating request via
/// `self.params` rather than hardcoded at each emit site.
fn make_sse_event(&self, event: &ResponseStreamEvent) -> Result<Event, anyhow::Error> {
let event_type = get_event_type(event);
let data = serde_json::to_string(event)?;
let mut value = serde_json::to_value(event)?;
if let serde_json::Value::Object(ref mut obj) = value
&& let Some(serde_json::Value::Object(inner)) = obj.get_mut("response")
{
super::patch_response_for_spec(
inner,
self.params.presence_penalty.unwrap_or(0.0),
self.params.frequency_penalty.unwrap_or(0.0),
self.params.store.unwrap_or(false),
);
}
let data = serde_json::to_string(&value)?;
Ok(Event::default().event(event_type).data(data))
}
}
fn get_event_type(event: &ResponseStreamEvent) -> &'static str {
......@@ -677,22 +694,7 @@ mod tests {
};
fn default_params() -> ResponseParams {
ResponseParams {
model: None,
temperature: None,
top_p: None,
max_output_tokens: None,
parallel_tool_calls: None,
store: None,
tools: None,
tool_choice: None,
instructions: None,
reasoning: None,
text: None,
service_tier: None,
include: None,
truncation: None,
}
ResponseParams::default()
}
fn tool_call_chunk(
......
......@@ -35,6 +35,13 @@ use serde::{Deserialize, Serialize};
// shadow their upstream counterparts where no dual-side conflict exists.
pub use async_openai::types::responses::*;
// Re-export upstream's pre-shadow `InputContent` under an explicit alias.
// Needed because `FunctionCallOutput::Content` and `EasyInputContent::ContentList`
// are non-owned upstream types that carry upstream's original `InputContent`
// inline, so downstream consumers occasionally need to name it alongside the
// Dynamo-owned shadow defined further down this module.
pub use async_openai::types::responses::InputContent as UpstreamInputContent;
// Re-export from parent module for backward compat.
pub use crate::types::ImageDetail;
pub use crate::types::ReasoningEffort;
......@@ -51,6 +58,40 @@ pub type ResponseStream = std::pin::Pin<
Box<dyn futures::Stream<Item = Result<ResponseStreamEvent, crate::error::OpenAIError>> + Send>,
>;
/// Serde field names on upstream `Response` that the OpenResponses spec
/// requires as `T | null`, but that async-openai declares as `Option<T>` with
/// `skip_serializing_if = Option::is_none` — so a `None` disappears from the
/// wire shape where the spec wants an explicit `null`.
///
/// Consumed by `patch_response_for_spec`, which adds a `null` entry for each
/// listed key that is missing from the serialized response object.
///
/// Colocated here (next to the upstream `Response` re-export) rather than in
/// `lib/llm/src/protocols/openai/responses/mod.rs` so that when upstream's
/// `Response` gains a new nullable-required field, the reviewer editing this
/// module is looking directly at the authoritative list.
///
/// Maintenance rules:
/// - Keep the entries sorted alphabetically.
/// - Each entry must match the serde field name on `Response` exactly.
/// - Do not list a field that response construction always populates (e.g.
///   `metadata`, `parallel_tool_calls`, `temperature`, `text`, `tool_choice`,
///   `tools`, `top_p`, `top_logprobs`, `truncation`, `service_tier`,
///   `background`): it is always present on the wire, so listing it here
///   would be noise.
pub const SPEC_NULLABLE_REQUIRED_RESPONSE_FIELDS: &[&str] = &[
"billing",
"completed_at",
"conversation",
"error",
"incomplete_details",
"instructions",
"max_output_tokens",
"max_tool_calls",
"previous_response_id",
"prompt",
"prompt_cache_key",
"prompt_cache_retention",
"reasoning",
"safety_identifier",
"usage",
];
// ---------------------------------------------------------------------------
// Input-side assistant message (relaxed vs upstream OutputMessage)
// ---------------------------------------------------------------------------
......@@ -68,6 +109,19 @@ where
Option::<Vec<T>>::deserialize(deserializer).map(Option::unwrap_or_default)
}
/// Serde `deserialize_with` helper that maps an explicit `null` to
/// `T::default()`; any other value is deserialized as `T`.
///
/// Scalar counterpart to `deserialize_null_as_empty_vec`. Pair it with
/// `#[serde(default)]` so that a missing field is covered too: on its own,
/// plain `#[serde(default)]` rejects an explicit `null` because serde tries
/// to deserialize the null into `T` and fails. Real clients do emit `null`
/// for unset enum-ish fields (e.g. the OpenAI Agents SDK sending
/// `"detail": null` on `input_image` parts).
fn deserialize_null_as_default<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
    T: Deserialize<'de> + Default,
    D: serde::Deserializer<'de>,
{
    // Going through `Option<T>` is what makes `null` acceptable: it becomes
    // `None`, which is then swapped for the default.
    let parsed = Option::<T>::deserialize(deserializer)?;
    Ok(parsed.unwrap_or_default())
}
/// Relaxed counterpart to upstream `OutputTextContent` for input-side content.
/// `annotations` tolerates both missing and explicit `null`; upstream requires
/// it to be a present non-null array.
......@@ -107,6 +161,45 @@ pub struct InputOutputMessage {
pub status: Option<OutputStatus>,
}
// ---------------------------------------------------------------------------
// Input-side image / content / message (shadow upstream, relaxed shapes)
// ---------------------------------------------------------------------------
/// Relaxed counterpart to upstream `InputImageContent`. `detail` defaults to
/// `ImageDetail::Auto` when the client omits it — OpenAI's hosted API and the
/// OpenResponses spec both accept this shape, but upstream's struct marks
/// `detail` as required.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct InputImageContent {
/// Image detail level. Falls back to the type's default when the field is
/// missing (`default`) or sent as an explicit `null`
/// (`deserialize_null_as_default`).
#[serde(default, deserialize_with = "deserialize_null_as_default")]
pub detail: ImageDetail,
/// Optional file identifier; left out of the serialized form when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub file_id: Option<String>,
/// Optional image URL; left out of the serialized form when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub image_url: Option<String>,
}
/// Parts of an input message: text, image, or file. Mirrors upstream
/// `InputContent` but routes `InputImage` through the Dynamo-owned relaxed
/// `InputImageContent` above.
///
/// Internally tagged on the `type` field with snake_case variant names, i.e.
/// `"input_text"`, `"input_image"` and `"input_file"`.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InputContent {
/// Text part (`"type": "input_text"`).
InputText(InputTextContent),
/// Image part (`"type": "input_image"`), using the relaxed Dynamo-owned
/// payload type.
InputImage(InputImageContent),
/// File part (`"type": "input_file"`).
InputFile(InputFileContent),
}
/// User / system / developer input message. Shadows upstream `InputMessage`
/// so we can route through the Dynamo-owned `InputContent` chain.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
pub struct InputMessage {
/// Ordered content parts of the message, using the Dynamo-owned
/// `InputContent` enum.
pub content: Vec<InputContent>,
/// Role of the message author.
pub role: InputRole,
/// Optional item status; defaults to `None` when absent on input and is
/// left out of the serialized form when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub status: Option<OutputStatus>,
}
// ---------------------------------------------------------------------------
// Input-side Item / Message / InputItem / InputParam (shadow upstream)
// ---------------------------------------------------------------------------
......@@ -271,6 +364,33 @@ mod tests {
}
}
/// An `input_image` part that omits `detail` must deserialize with
/// `ImageDetail::Auto`.
#[test]
fn input_image_without_detail_defaults_to_auto() {
    // `detail` is deliberately left out of the payload.
    let payload = serde_json::json!({
        "type": "input_image",
        "image_url": "https://example.com/cat.jpg"
    });
    let parsed = serde_json::from_value::<InputContent>(payload).unwrap();
    let detail = match parsed {
        InputContent::InputImage(image) => image.detail,
        other => panic!("expected InputImage, got {other:?}"),
    };
    assert_eq!(detail, ImageDetail::Auto);
}
/// An `input_image` part carrying `"detail": null` must deserialize with
/// `ImageDetail::Auto` rather than being rejected.
#[test]
fn input_image_with_explicit_null_detail_defaults_to_auto() {
    // `detail` is present but explicitly null.
    let payload = serde_json::json!({
        "type": "input_image",
        "image_url": "https://example.com/cat.jpg",
        "detail": null
    });
    let parsed = serde_json::from_value::<InputContent>(payload).unwrap();
    let detail = match parsed {
        InputContent::InputImage(image) => image.detail,
        other => panic!("expected InputImage, got {other:?}"),
    };
    assert_eq!(detail, ImageDetail::Auto);
}
#[test]
fn assistant_message_without_content_field_deserializes() {
// Bare assistant shell — no `content` field at all. Seen in real
......
......@@ -232,6 +232,7 @@ markers = [
"post_merge: marks tests to run after merge",
"parallel: marks tests that can run in parallel with pytest-xdist",
"nightly: marks tests to run nightly",
"frontend_api_surface_compliance: marks tests that validate Dynamo's HTTP API surface (Responses/Anthropic wire shape, tool-call routing) against upstream compliance harnesses",
"weekly: marks tests to run weekly",
"release: marks tests to run on release pipelines",
"gpu_0: marks tests that don't require GPU",
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Frontend API-surface compliance suite against a live Dynamo frontend.
Subject under test is Dynamo's HTTP surface (`/v1/responses` and
`/v1/messages` wire shapes, tool-call routing through both); sglang is
just the backend vehicle for producing real traffic. Runs three suites
sequentially against one server:
1. Upstream OpenResponses compliance-test.ts harness (bun/TypeScript
validator against zod schemas generated from the OpenAPI spec).
2. `codex exec` smoke — forces the shell tool-call path through
`/v1/responses`.
3. `claude -p` smoke — forces the Bash tool-call path through
`/v1/messages` (Anthropic Messages API).
All external tooling (bun, node, the OpenResponses suite, and the codex /
claude CLIs) is installed lazily at test time by session-scoped fixtures
into a session-shared cache directory. Versions and the OpenResponses
SHA are pinned as module-level constants. FileLock coordination makes
concurrent xdist workers share a single install.
"""
import logging
import os
import platform
import shlex
import shutil
import subprocess
import tarfile
import time
import zipfile
from pathlib import Path
import pytest
import requests
from filelock import FileLock
from tests.serve.common import WORKSPACE_DIR
from tests.utils.engine_process import EngineConfig, EngineProcess
logger = logging.getLogger(__name__)
sglang_dir = os.environ.get("SGLANG_DIR") or os.path.join(
WORKSPACE_DIR, "examples/backends/sglang"
)
COMPLIANCE_MODEL = "Qwen/Qwen3-VL-2B-Instruct"
# Pinned external-tool versions. Bun and node are pinned for reproducibility.
# The agent CLIs (@openai/codex, @anthropic-ai/claude-code) float to @latest
# so we automatically pick up protocol fixes — they're client-side harnesses,
# not Dynamo surface.
BUN_VERSION = "1.3.12"
NODE_VERSION = "20.19.0"
OPENRESPONSES_REPO = "https://github.com/openresponses/openresponses.git"
OPENRESPONSES_SHA = "fa29df5"
# Retry budget for network-touching installs. Exponential backoff starting
# at 2s; 3 attempts caps the worst-case wait at ~6s before we surface a
# clear "upstream unavailable" error.
_RETRY_COUNT = 3
_RETRY_BACKOFF_INITIAL_S = 2.0
# Env keys forwarded into codex/claude subprocesses. These agents run with tool
# permissions (`--dangerously-bypass-approvals-and-sandbox`, `--dangerously-skip-permissions`),
# and even against a local model they may emit telemetry; inheriting the whole
# CI environment would expose `GITHUB_TOKEN`, AWS creds, registry credentials,
# etc. Keep to a minimal allowlist covering only what the runtime needs:
# PATH to resolve the binaries, locale/TLS/proxy for HTTPS, HOME so Node/bun
# finds per-user caches, and NVIDIA/CUDA vars so any GPU-touching side effects
# see the same device the test was given.
_SUBPROCESS_ENV_ALLOWLIST: frozenset[str] = frozenset(
{
"PATH",
"HOME",
"LANG",
"LC_ALL",
"TZ",
"SSL_CERT_FILE",
"SSL_CERT_DIR",
"REQUESTS_CA_BUNDLE",
"CURL_CA_BUNDLE",
"HTTP_PROXY",
"HTTPS_PROXY",
"NO_PROXY",
"http_proxy",
"https_proxy",
"no_proxy",
"LD_LIBRARY_PATH",
"CUDA_VISIBLE_DEVICES",
"NVIDIA_VISIBLE_DEVICES",
"NVIDIA_DRIVER_CAPABILITIES",
}
)
def _agent_subprocess_env(
    extra_env: dict[str, str], path_prepend: list[Path] | None = None
) -> dict[str, str]:
    """Assemble the environment handed to a codex/claude subprocess.

    Only variables named in `_SUBPROCESS_ENV_ALLOWLIST` are copied from
    `os.environ`. When `path_prepend` is given, those directories are placed
    in front of the copied PATH (or become the whole PATH if none was
    copied). `extra_env` is applied last, so its entries override everything
    else, including PATH.
    """
    env: dict[str, str] = {}
    for key in _SUBPROCESS_ENV_ALLOWLIST:
        value = os.environ.get(key)
        if value is not None:
            env[key] = value

    if path_prepend:
        front = os.pathsep.join(map(str, path_prepend))
        inherited = env.get("PATH", "")
        if inherited:
            env["PATH"] = f"{front}{os.pathsep}{inherited}"
        else:
            env["PATH"] = front

    env.update(extra_env)
    return env
# ---------------------------------------------------------------------------
# Tool-install fixtures
# ---------------------------------------------------------------------------
def _retry_network_op(fn, description: str):
    """Call `fn()` up to `_RETRY_COUNT` times and return its first result.

    `OSError`, `requests.RequestException` and
    `subprocess.CalledProcessError` are treated as retryable; any other
    exception propagates immediately. Between attempts the wait doubles,
    starting at `_RETRY_BACKOFF_INITIAL_S`. Once the budget is exhausted a
    `RuntimeError` chained to the last failure is raised; for a
    `CalledProcessError` the captured stdout/stderr are appended to the
    message.
    """
    retryable = (OSError, requests.RequestException, subprocess.CalledProcessError)
    failure: BaseException | None = None

    for attempt in range(1, _RETRY_COUNT + 1):
        try:
            return fn()
        except retryable as exc:
            failure = exc
        if attempt == _RETRY_COUNT:
            # Budget spent: no point sleeping before we give up.
            break
        delay = _RETRY_BACKOFF_INITIAL_S * (2 ** (attempt - 1))
        logger.warning(
            "%s failed (attempt %d/%d): %s — retrying in %.1fs",
            description,
            attempt,
            _RETRY_COUNT,
            failure,
            delay,
        )
        time.sleep(delay)

    captured = ""
    if isinstance(failure, subprocess.CalledProcessError):
        captured = f"\nstdout:\n{failure.stdout or ''}\nstderr:\n{failure.stderr or ''}"
    raise RuntimeError(
        f"{description} failed after {_RETRY_COUNT} attempts: {failure}{captured}"
    ) from failure
def _download_url(url: str, dest: Path) -> None:
    """Download `url` to `dest` without ever exposing a half-written file.

    The body is streamed in 64 KiB chunks into a sibling `<dest>.part` file,
    which is renamed onto `dest` only after the transfer finished. A non-2xx
    status raises via `raise_for_status()` before anything is written.
    """
    partial = dest.with_suffix(dest.suffix + ".part")
    with requests.get(url, stream=True, timeout=60) as response:
        response.raise_for_status()
        with partial.open("wb") as sink:
            for block in response.iter_content(chunk_size=64 * 1024):
                if not block:
                    # Skip empty chunks rather than issuing empty writes.
                    continue
                sink.write(block)
    partial.rename(dest)
def _bun_arch() -> str:
m = platform.machine()
if m == "x86_64":
return "x64"
if m == "aarch64":
return "aarch64"
raise RuntimeError(f"Unsupported machine architecture for bun: {m}")
def _node_arch() -> str:
m = platform.machine()
if m == "x86_64":
return "x64"
if m == "aarch64":
return "arm64"
raise RuntimeError(f"Unsupported machine architecture for node: {m}")
@pytest.fixture(scope="session")
def _tools_cache(tmp_path_factory) -> Path:
    """Session-shared cache directory for downloaded compliance tooling.

    Lives under the pytest temp root so it is cleaned up with the rest of
    the session's temp files. The directory is created if missing and its
    path returned.

    Under pytest-xdist every worker gets its *own* basetemp (a per-worker
    subdirectory of the session temp root), so using `getbasetemp()`
    directly would give each worker a private cache and make the FileLock
    coordination in the install fixtures pointless. In a worker process
    (identified by the `PYTEST_XDIST_WORKER` env var that xdist sets) we
    therefore step up to the parent directory, which all workers share.
    """
    root = Path(tmp_path_factory.getbasetemp())
    if os.environ.get("PYTEST_XDIST_WORKER"):
        # Per-worker basetemp -> shared session temp root.
        root = root.parent
    base = root / "_frontend_api_surface_tools"
    base.mkdir(parents=True, exist_ok=True)
    return base
@pytest.fixture(scope="session")
def _bun_binary(_tools_cache) -> Path:
    """Path to a bun executable at `BUN_VERSION`, downloading it on first use.

    The whole check-and-install sequence runs under a FileLock in the tools
    cache, so concurrent callers sharing that cache perform one download.
    An existing `bun` file in the install directory counts as installed.
    """
    root = _tools_cache / f"bun-{BUN_VERSION}"
    target = root / "bun"
    with FileLock(str(_tools_cache / "bun.lock")):
        if not target.exists():
            root.mkdir(parents=True, exist_ok=True)
            arch = _bun_arch()
            archive = root / "bun.zip"
            release_url = (
                "https://github.com/oven-sh/bun/releases/download/"
                f"bun-v{BUN_VERSION}/bun-linux-{arch}.zip"
            )

            def _fetch() -> None:
                _download_url(release_url, archive)

            _retry_network_op(
                _fetch,
                description=f"download bun v{BUN_VERSION} ({arch})",
            )
            with zipfile.ZipFile(archive) as bundle:
                bundle.extractall(root)
            # The zip unpacks into `bun-linux-<arch>/`; copy the binary up
            # to the stable location callers use and make it executable.
            shutil.copy(root / f"bun-linux-{arch}" / "bun", target)
            target.chmod(0o755)
            archive.unlink(missing_ok=True)
    return target
@pytest.fixture(scope="session")
def _node_bin(_tools_cache) -> Path:
    """Path to the `bin/` directory of a node runtime at `NODE_VERSION`.

    Installs on first use under a FileLock in the tools cache. The runtime
    counts as installed once both `bin/node` and `bin/npm` exist.
    """
    root = _tools_cache / f"node-v{NODE_VERSION}"
    bin_dir = root / "bin"
    with FileLock(str(_tools_cache / "node.lock")):
        installed = all((bin_dir / exe).exists() for exe in ("node", "npm"))
        if not installed:
            root.mkdir(parents=True, exist_ok=True)
            arch = _node_arch()
            release = f"node-v{NODE_VERSION}-linux-{arch}"
            tarball_name = f"{release}.tar.xz"
            tar_path = root / tarball_name
            url = f"https://nodejs.org/dist/v{NODE_VERSION}/{tarball_name}"
            _retry_network_op(
                lambda: _download_url(url, tar_path),
                description=f"download node v{NODE_VERSION} ({arch})",
            )
            with tarfile.open(tar_path) as archive:
                # `filter="data"` is the safe extraction filter added in 3.12
                # and required in 3.14; passing it explicitly silences the
                # pytest filterwarnings=error escalation of the
                # DeprecationWarning.
                archive.extractall(root, filter="data")
            # Flatten: hoist the contents of the versioned top-level folder
            # into the install dir, then drop the now-empty folder.
            unpacked = root / release
            for entry in unpacked.iterdir():
                shutil.move(str(entry), str(root / entry.name))
            unpacked.rmdir()
            tar_path.unlink(missing_ok=True)
    return bin_dir
@pytest.fixture(scope="session")
def _openresponses_suite(_tools_cache, _bun_binary) -> Path:
    """Pinned-SHA clone of the OpenResponses compliance suite with bun deps
    installed; returns the checkout directory.

    A `.installed` sentinel file marks a completed setup, so an interrupted
    prior install forces a clean redo. All three setup steps go through
    `_retry_network_op`:

    * clone — each attempt first removes whatever a previous attempt left
      behind, because git refuses to clone into a non-empty directory and a
      retry would otherwise fail for that reason alone;
    * checkout — the clone uses `--filter=blob:none`, so checking out the
      pinned commit fetches the blobs it needs from the remote and is just
      as exposed to network blips as the clone itself;
    * `bun install --frozen-lockfile`.

    Raises `RuntimeError` (from `_retry_network_op`) when a step keeps
    failing.
    """
    install_dir = _tools_cache / f"openresponses-{OPENRESPONSES_SHA}"
    sentinel = install_dir / ".installed"

    def _clone() -> None:
        # Start every attempt from a clean slate (stale or partial tree).
        if install_dir.exists():
            shutil.rmtree(install_dir)
        subprocess.run(
            [
                "git",
                "clone",
                "--filter=blob:none",
                OPENRESPONSES_REPO,
                str(install_dir),
            ],
            check=True,
            capture_output=True,
            text=True,
        )

    def _checkout() -> None:
        subprocess.run(
            ["git", "-C", str(install_dir), "checkout", OPENRESPONSES_SHA],
            check=True,
            capture_output=True,
            text=True,
        )

    def _install_deps() -> None:
        subprocess.run(
            [str(_bun_binary), "install", "--frozen-lockfile"],
            cwd=str(install_dir),
            check=True,
            capture_output=True,
            text=True,
        )

    with FileLock(str(_tools_cache / "openresponses.lock")):
        if sentinel.exists():
            return install_dir
        _retry_network_op(_clone, description="clone openresponses")
        _retry_network_op(
            _checkout,
            description=f"checkout openresponses {OPENRESPONSES_SHA}",
        )
        _retry_network_op(_install_deps, description="bun install openresponses deps")
        # Only written once every step succeeded.
        sentinel.touch()
        return install_dir
def _install_npm_cli(
    tools_cache: Path,
    node_bin: Path,
    package: str,
    binary_name: str,
    slot: str,
) -> Path:
    """npm-install `package` under `{tools_cache}/{slot}` and return the path
    of its `node_modules/.bin/{binary_name}` entry point.

    Guarded by a per-slot FileLock; if the entry point already exists the
    install is skipped. npm runs with the current environment plus
    `node_bin` prepended to PATH. Shared by the codex and claude fixtures.
    """
    prefix = tools_cache / slot
    entry_point = prefix / "node_modules" / ".bin" / binary_name
    with FileLock(str(tools_cache / f"{slot}.lock")):
        if not entry_point.exists():
            prefix.mkdir(parents=True, exist_ok=True)
            npm_env = dict(os.environ)
            npm_env["PATH"] = f"{node_bin}{os.pathsep}{os.environ.get('PATH', '')}"
            npm_cmd = [
                str(node_bin / "npm"),
                "install",
                "--prefix",
                str(prefix),
                package,
            ]

            def _npm_install():
                return subprocess.run(
                    npm_cmd,
                    env=npm_env,
                    check=True,
                    capture_output=True,
                    text=True,
                )

            _retry_network_op(_npm_install, description=f"npm install {package}")
    return entry_point
@pytest.fixture(scope="session")
def _codex_cli(_tools_cache, _node_bin) -> Path:
    """Path to the `codex` entry point, installed from `@openai/codex` into
    the `codex` slot of the tools cache."""
    codex_entry = _install_npm_cli(
        tools_cache=_tools_cache,
        node_bin=_node_bin,
        package="@openai/codex",
        binary_name="codex",
        slot="codex",
    )
    return codex_entry
@pytest.fixture(scope="session")
def _claude_cli(_tools_cache, _node_bin) -> Path:
    """Path to the `claude` entry point, installed from
    `@anthropic-ai/claude-code` into the `claude` slot of the tools cache."""
    claude_entry = _install_npm_cli(
        tools_cache=_tools_cache,
        node_bin=_node_bin,
        package="@anthropic-ai/claude-code",
        binary_name="claude",
        slot="claude",
    )
    return claude_entry
# ---------------------------------------------------------------------------
# Test
# ---------------------------------------------------------------------------
@pytest.mark.sglang
@pytest.mark.e2e
@pytest.mark.gpu_1
@pytest.mark.model(COMPLIANCE_MODEL)
@pytest.mark.profiled_vram_gib(6.0)
@pytest.mark.requested_sglang_kv_tokens(512)
# Time budget: tool-install fixtures (~30-60s on the first run of a session,
# near-zero on a cache hit) + sglang cold start (30-60s) + up to 180s each for
# the bun compliance suite, codex exec and claude exec + two inter-suite
# health checks + teardown. 750s leaves headroom for CI variance without
# masking real hangs.
@pytest.mark.timeout(750)
@pytest.mark.frontend_api_surface_compliance
@pytest.mark.pre_merge
def test_frontend_api_surface_compliance(
    request,
    runtime_services_dynamic_ports,
    dynamo_dynamic_ports,
    predownload_models,
    tmp_path,
    _bun_binary,
    _node_bin,
    _openresponses_suite,
    _codex_cli,
    _claude_cli,
):
    """Bring up one frontend + sglang worker and run three checks against it:
    the upstream OpenResponses compliance suite, a codex tool-call smoke
    test, and a claude tool-call smoke test, with a health probe between
    consecutive steps."""
    frontend_port = int(dynamo_dynamic_ports.frontend_port)
    system_port = int(dynamo_dynamic_ports.system_ports[0])

    # Qwen3-VL-2B-specific flags: vision-model CUDA graph workaround plus
    # model-aware reasoning/tool-call parsers. agg.sh's pass-through loop
    # forwards them verbatim to `dynamo.sglang`.
    #
    # The tool-call parser is `hermes`, not `qwen3_coder`: Qwen3-VL-Instruct
    # emits `<tool_call>{"name":..., "arguments":...}</tool_call>` (JSON
    # inside the tags — Hermes-style), whereas `qwen3_coder` expects the
    # XML-structured `<tool_call><function=name><parameter=k>v</parameter>
    # </function></tool_call>` that Qwen3-Coder models emit. With the wrong
    # parser, tool calls stay raw text in the response and end-to-end agent
    # flows (codex exec, etc.) break.
    engine_flags = [
        "--model-path",
        COMPLIANCE_MODEL,
        "--disable-piecewise-cuda-graph",
        "--dyn-reasoning-parser",
        "qwen3",
        "--dyn-tool-call-parser",
        "hermes",
    ]
    config = EngineConfig(
        name="responses_compliance",
        directory=sglang_dir,
        marks=[],
        request_payloads=[],
        model=COMPLIANCE_MODEL,
        script_name="agg.sh",
        script_args=engine_flags,
        timeout=360,
        env={},
        frontend_port=frontend_port,
    )

    merged_env = {
        "DYN_HTTP_PORT": str(frontend_port),
        "DYN_SYSTEM_PORT": str(system_port),
        # agg.sh doesn't forward frontend args, but the frontend reads this
        # env var directly. Enables /v1/messages for the claude smoke step.
        "DYN_ENABLE_ANTHROPIC_API": "1",
    }

    codex_home = tmp_path / "codex_home"
    _write_codex_config(codex_home, frontend_port)

    # The marker file is something the agents can only "see" by actually
    # invoking their shell/Bash tool: a model answering from its prior,
    # without running `ls`, won't print the name and the assertion fails.
    # That proves the tool-call paths through the frontend end-to-end (both
    # /v1/responses for codex and /v1/messages for claude), not just text
    # generation.
    agent_cwd = tmp_path / "agent_cwd"
    agent_cwd.mkdir()
    marker_filename = "dynamo_compliance_marker.txt"
    marker_path = agent_cwd / marker_filename
    marker_path.write_text("compliance-smoke")

    # Isolated HOME so claude doesn't write session state into the runner's
    # ~/.claude during CI / local invocation.
    claude_home = tmp_path / "claude_home"
    claude_home.mkdir()

    with EngineProcess.from_script(config, request, extra_env=merged_env):
        _run_bun_compliance(
            bun_binary=_bun_binary,
            openresponses_dir=_openresponses_suite,
            frontend_port=frontend_port,
        )

        _wait_for_frontend_healthy(frontend_port)
        _run_codex_exec_smoke(
            codex_cli=_codex_cli,
            node_bin=_node_bin,
            codex_home=codex_home,
            cwd=agent_cwd,
            marker_filename=marker_filename,
        )

        _wait_for_frontend_healthy(frontend_port)
        _run_claude_exec_smoke(
            claude_cli=_claude_cli,
            node_bin=_node_bin,
            claude_home=claude_home,
            cwd=agent_cwd,
            marker_filename=marker_filename,
            frontend_port=frontend_port,
        )
def _attach_subprocess_log(
name: str,
cmd: list[str],
result: subprocess.CompletedProcess,
extra_env: dict[str, str] | None = None,
cwd: str | None = None,
) -> None:
"""Attach a reproducible transcript of `cmd` to the Allure report.
Lands in `test-results/allure-results/<uuid>-attachment.txt`, which the
CI workflow uploads as an artifact on every run (pass or fail). Contents
are a cut-and-paste-able shell invocation plus the raw stdout + stderr
so a failing CI run can be reproduced locally from the artifact alone.
Only explicitly listed env vars (`extra_env`) are recorded — not the
inherited `os.environ` — to avoid leaking runner secrets into the
artifact. CI runners keep HF tokens and cloud creds in env vars the
subprocess inherits; we don't need those in the log to reproduce.
"""
# Local import: `allure` is only available inside the test image (via
# allure-pytest). Pre-commit's collection-only pytest runs in a clean
# uvx env without it, so a module-level import would fail collection.
import allure
lines: list[str] = []
if cwd:
lines.append(f"$ cd {shlex.quote(cwd)}")
if extra_env:
for k, v in sorted(extra_env.items()):
lines.append(f"$ export {k}={shlex.quote(v)}")
lines.append("$ " + " ".join(shlex.quote(c) for c in cmd))
lines.append("")
lines.append(f"exit: {result.returncode}")
lines.append("")
lines.append("=== stdout ===")
lines.append(result.stdout or "(empty)")
lines.append("")
lines.append("=== stderr ===")
lines.append(result.stderr or "(empty)")
allure.attach(
"\n".join(lines),
name=name,
attachment_type=allure.attachment_type.TEXT,
)
def _wait_for_frontend_healthy(
    frontend_port: int, timeout_s: float = 15.0, model: str = COMPLIANCE_MODEL
) -> None:
    """Confirm the frontend is still serving before the next subprocess fires.

    Polls `GET /v1/models` every 0.5s until `model` appears in the listing
    or `timeout_s` elapses, then fails the test via `pytest.fail`.

    Without this check, if the previous step accidentally destabilized the
    server (e.g. a hang that its timeout cut short), the next agent's
    failure looks identical to "the agent is broken" in CI logs. The probe
    collapses that ambiguity: if the frontend has crashed or the worker has
    deregistered, fail here with a clear message rather than letting the
    next step run and time out.

    The failure message reports the last thing observed — an exception, a
    non-OK status, or the model ids that were listed — so a registry that
    answers but no longer lists the model is distinguishable from a dead
    socket.

    Args:
        frontend_port: Port the frontend listens on at localhost.
        timeout_s: Total time to keep polling before failing.
        model: Model id that must appear in the `/v1/models` listing.
    """
    url = f"http://localhost:{frontend_port}/v1/models"
    deadline = time.monotonic() + timeout_s
    last_observation = "no response received"
    while time.monotonic() < deadline:
        try:
            resp = requests.get(url, timeout=2)
            if resp.ok:
                listed = [m.get("id") for m in resp.json().get("data", [])]
                if model in listed:
                    return
                last_observation = f"HTTP {resp.status_code}, models listed: {listed}"
            else:
                last_observation = f"HTTP {resp.status_code}"
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # decode error is not a RequestException subclass.
            last_observation = repr(e)
        time.sleep(0.5)
    pytest.fail(
        f"frontend unhealthy — /v1/models did not list "
        f"{model!r} within {timeout_s}s (last observation: {last_observation})"
    )
def _run_bun_compliance(
    bun_binary: Path, openresponses_dir: Path, frontend_port: int
) -> None:
    """Run the suite's `bin/compliance-test.ts` with bun against the local
    frontend and fail the test on a non-zero exit.

    The run is capped at 180s, its transcript is attached to the Allure
    report, and any stdout/stderr is echoed to the module logger.
    """
    base_url = f"http://localhost:{frontend_port}/v1"
    logger.info("Running OpenResponses compliance suite against %s", base_url)

    suite_dir = str(openresponses_dir)
    cmd = [str(bun_binary), "run", "bin/compliance-test.ts"]
    cmd += ["--base-url", base_url]
    cmd += ["--api-key", "sk-compliance-dummy"]
    cmd += ["--model", COMPLIANCE_MODEL]
    cmd.append("--verbose")

    result = subprocess.run(
        cmd,
        cwd=suite_dir,
        capture_output=True,
        text=True,
        timeout=180,
    )
    _attach_subprocess_log(
        name="bun_compliance_suite.log",
        cmd=cmd,
        result=result,
        cwd=suite_dir,
    )

    if result.stdout:
        logger.info("compliance stdout:\n%s", result.stdout)
    if result.stderr:
        logger.info("compliance stderr:\n%s", result.stderr)

    if result.returncode == 0:
        return
    pytest.fail(
        f"OpenResponses compliance suite failed (exit={result.returncode}).\n"
        f"stdout:\n{result.stdout}\n\nstderr:\n{result.stderr}"
    )
def _write_codex_config(codex_home, frontend_port: int) -> None:
"""Emit a minimal ~/.codex/config.toml pointing Codex at Dynamo.
Using a per-test CODEX_HOME keeps the runner's global Codex state
(if any) untouched.
"""
codex_home.mkdir(parents=True, exist_ok=True)
config_path = codex_home / "config.toml"
config_path.write_text(
f"""
[model_providers.local]
name = "local-dynamo"
base_url = "http://localhost:{frontend_port}/v1"
wire_api = "responses"
env_key = "LOCAL_API_KEY"
""".lstrip()
)
def _run_codex_exec_smoke(
    codex_cli: Path, node_bin: Path, codex_home, cwd, marker_filename: str
) -> None:
    """Run `codex exec` in `cwd` against the Dynamo Responses endpoint and
    require that its stdout mentions `marker_filename`.

    The prompt asks codex to list `cwd`, which holds only the marker file —
    a name the model cannot guess from prior knowledge. If codex answers
    without invoking its shell tool the marker is absent from stdout and the
    test fails, so a pass demonstrates the full Responses API tool-calling
    chain rather than plain text generation. A non-zero exit fails the test
    too. The run is capped at 180s and its transcript is attached to the
    Allure report.
    """
    logger.info("Running codex exec smoke test against CODEX_HOME=%s", codex_home)

    # HOME is isolated for codex just as it is for claude. CODEX_HOME scopes
    # codex's own state, but under `--dangerously-bypass-approvals-and-sandbox`
    # the agent's shell tool inherits HOME for whatever shell/helper reads and
    # writes happen. Pointing it at `codex_home` keeps everything inside
    # `tmp_path`.
    home_dir = str(codex_home)
    extra_env = {
        "CODEX_HOME": home_dir,
        "HOME": home_dir,
        "LOCAL_API_KEY": "sk-none",
    }
    # codex is a node script (`#!/usr/bin/env node`): put the fixture-installed
    # node runtime first on PATH so the shebang resolves to it rather than to
    # the runner's system node (if any).
    env = _agent_subprocess_env(extra_env, path_prepend=[node_bin])

    prompt = "What files exist in the current working directory? Use your shell tool to run ls and report each filename verbatim from the output."
    cmd = [str(codex_cli), "-m", COMPLIANCE_MODEL, "-c", "model_provider=local"]
    cmd += ["exec", prompt, "--dangerously-bypass-approvals-and-sandbox"]

    workdir = str(cwd)
    result = subprocess.run(
        cmd,
        cwd=workdir,
        env=env,
        capture_output=True,
        text=True,
        timeout=180,
    )
    _attach_subprocess_log(
        name="codex_exec_smoke.log",
        cmd=cmd,
        result=result,
        extra_env=extra_env,
        cwd=workdir,
    )

    if result.stdout:
        logger.info("codex stdout:\n%s", result.stdout)
    if result.stderr:
        logger.info("codex stderr:\n%s", result.stderr)

    if result.returncode != 0:
        pytest.fail(
            f"codex exec failed (exit={result.returncode}).\n"
            f"stdout:\n{result.stdout}\n\nstderr:\n{result.stderr}"
        )

    marker_seen = marker_filename in result.stdout
    if not marker_seen:
        pytest.fail(
            "codex exec did not report the marker file — expected stdout to "
            f"contain {marker_filename!r} (implies the shell tool was invoked "
            f"and actually ran `ls` in {cwd}). Got:\n{result.stdout}"
        )
def _run_claude_exec_smoke(
    claude_cli: Path,
    node_bin: Path,
    claude_home,
    cwd,
    marker_filename: str,
    frontend_port: int,
) -> None:
    """Run `claude -p` in `cwd` against the Dynamo Anthropic Messages
    endpoint and require that its stdout mentions `marker_filename`.

    Mirrors the codex step but exercises /v1/messages: if claude answers
    without invoking its Bash tool the marker is absent from stdout and the
    test fails, so a pass demonstrates the full Anthropic Messages +
    tool-calling chain rather than plain text generation. A non-zero exit
    fails the test too. The run is capped at 180s and its transcript is
    attached to the Allure report.

    HOME points at `claude_home` so claude does not write session state into
    the runner's `~/.claude`. `ANTHROPIC_AUTH_TOKEN` is supplied even though
    Dynamo ignores its value: on a fresh HOME with no cached OAuth the CLI
    aborts with "Not logged in" unless a bearer is present.
    """
    base_url = f"http://localhost:{frontend_port}"
    logger.info("Running claude exec smoke test against %s", base_url)

    extra_env = {
        "HOME": str(claude_home),
        "ANTHROPIC_BASE_URL": base_url,
        "ANTHROPIC_AUTH_TOKEN": "sk-none",
    }
    # claude shells out to `node` internally: put the fixture-installed
    # runtime first on PATH instead of relying on the runner's node.
    env = _agent_subprocess_env(extra_env, path_prepend=[node_bin])

    prompt = "What files exist in the current working directory? Use your shell tool to run ls and report each filename verbatim from the output."
    cmd = [str(claude_cli), "--model", COMPLIANCE_MODEL]
    cmd += ["--dangerously-skip-permissions", "-p", prompt]

    workdir = str(cwd)
    result = subprocess.run(
        cmd,
        cwd=workdir,
        env=env,
        capture_output=True,
        text=True,
        timeout=180,
    )
    _attach_subprocess_log(
        name="claude_exec_smoke.log",
        cmd=cmd,
        result=result,
        extra_env=extra_env,
        cwd=workdir,
    )

    if result.stdout:
        logger.info("claude stdout:\n%s", result.stdout)
    if result.stderr:
        logger.info("claude stderr:\n%s", result.stderr)

    if result.returncode != 0:
        pytest.fail(
            f"claude -p failed (exit={result.returncode}).\n"
            f"stdout:\n{result.stdout}\n\nstderr:\n{result.stderr}"
        )

    marker_seen = marker_filename in result.stdout
    if not marker_seen:
        pytest.fail(
            "claude -p did not report the marker file — expected stdout to "
            f"contain {marker_filename!r} (implies the Bash tool was invoked "
            f"and actually ran `ls` in {cwd}). Got:\n{result.stdout}"
        )
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment