Unverified Commit 2887cd1c authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

refactor(1/3): move `nvext` to `dynamo-llm` and move `anthropic` to `dynamo-async-openai` (#7564)

parent d6136f4a
This diff is collapsed.
......@@ -1182,10 +1182,6 @@ pub struct CreateChatCompletionResponse {
/// The object type, which is always `chat.completion`.
pub object: String,
pub usage: Option<CompletionUsage>,
/// NVIDIA extension field for response metadata (worker IDs, etc.)
#[serde(skip_serializing_if = "Option::is_none")]
pub nvext: Option<serde_json::Value>,
}
/// Parsed server side events stream until an \[DONE\] is received from server.
......@@ -1281,10 +1277,6 @@ pub struct CreateChatCompletionStreamResponse {
/// An optional field that will only be present when you set `stream_options: {"include_usage": true}` in your request.
/// When present, it contains a null value except for the last chunk which contains the token usage statistics for the entire request.
pub usage: Option<CompletionUsage>,
/// NVIDIA extension field for response metadata
#[serde(skip_serializing_if = "Option::is_none")]
pub nvext: Option<serde_json::Value>,
}
#[cfg(test)]
......
......@@ -224,10 +224,6 @@ pub struct CreateCompletionResponse {
/// The object type, which is always "text_completion"
pub object: String,
pub usage: Option<CompletionUsage>,
/// NVIDIA extension field for response metadata (worker IDs, etc.)
#[serde(skip_serializing_if = "Option::is_none")]
pub nvext: Option<serde_json::Value>,
}
/// Parsed server side events stream until an \[DONE\] is received from server.
......
......@@ -10,6 +10,7 @@
//! Types used in OpenAI API requests and responses.
//! These types are created from component schemas in the [OpenAPI spec](https://github.com/openai/openai-openapi)
pub mod anthropic;
mod assistant;
mod assistant_impls;
mod assistant_stream;
......
......@@ -90,6 +90,7 @@ where
tracing::warn!("audit: aggregation future canceled/failed");
// Return minimal response if aggregation failed
NvCreateChatCompletionResponse {
inner: dynamo_async_openai::types::CreateChatCompletionResponse {
id: String::new(),
created: 0,
usage: None,
......@@ -98,6 +99,7 @@ where
system_fingerprint: None,
choices: vec![],
service_tier: None,
},
nvext: None,
}
})
......@@ -125,6 +127,7 @@ where
Err(e) => {
tracing::warn!("fold aggregation failed: {e}");
let fallback = NvCreateChatCompletionResponse {
inner: dynamo_async_openai::types::CreateChatCompletionResponse {
id: String::new(),
created: 0,
usage: None,
......@@ -133,6 +136,7 @@ where
system_fingerprint: None,
choices: vec![],
service_tier: None,
},
nvext: None,
};
let _ = tx.send(fallback.clone());
......@@ -145,6 +149,7 @@ where
rx.await.unwrap_or_else(|_| {
tracing::warn!("fold aggregation future canceled");
NvCreateChatCompletionResponse {
inner: dynamo_async_openai::types::CreateChatCompletionResponse {
id: String::new(),
created: 0,
usage: None,
......@@ -153,6 +158,7 @@ where
system_fingerprint: None,
choices: vec![],
service_tier: None,
},
nvext: None,
}
})
......@@ -171,8 +177,8 @@ pub fn final_response_to_one_chunk_stream(
) -> std::pin::Pin<
Box<dyn futures::Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send>,
> {
let mut choices: Vec<ChatChoiceStream> = Vec::with_capacity(resp.choices.len());
for (idx, ch) in resp.choices.iter().enumerate() {
let mut choices: Vec<ChatChoiceStream> = Vec::with_capacity(resp.inner.choices.len());
for (idx, ch) in resp.inner.choices.iter().enumerate() {
// Convert FunctionCall to FunctionCallStream if present
#[allow(deprecated)]
let function_call = ch.message.function_call.as_ref().map(|fc| {
......@@ -222,14 +228,16 @@ pub fn final_response_to_one_chunk_stream(
}
let chunk = NvCreateChatCompletionStreamResponse {
id: resp.id.clone(),
inner: dynamo_async_openai::types::CreateChatCompletionStreamResponse {
id: resp.inner.id.clone(),
object: "chat.completion.chunk".to_string(),
created: resp.created,
model: resp.model.clone(),
system_fingerprint: resp.system_fingerprint.clone(),
service_tier: resp.service_tier.clone(),
created: resp.inner.created,
model: resp.inner.model.clone(),
system_fingerprint: resp.inner.system_fingerprint.clone(),
service_tier: resp.inner.service_tier.clone(),
choices,
usage: resp.usage.clone(),
usage: resp.inner.usage.clone(),
},
nvext: resp.nvext.clone(),
};
......@@ -275,6 +283,7 @@ mod tests {
};
let response = NvCreateChatCompletionStreamResponse {
inner: dynamo_async_openai::types::CreateChatCompletionStreamResponse {
id: "test-id".to_string(),
choices: vec![choice],
created: 1234567890,
......@@ -283,6 +292,7 @@ mod tests {
object: "chat.completion.chunk".to_string(),
usage: None,
service_tier: None,
},
nvext: None,
};
......@@ -314,6 +324,7 @@ mod tests {
};
let response = NvCreateChatCompletionStreamResponse {
inner: dynamo_async_openai::types::CreateChatCompletionStreamResponse {
id: "test-id".to_string(),
choices: vec![choice],
created: 1234567890,
......@@ -322,6 +333,7 @@ mod tests {
object: "chat.completion.chunk".to_string(),
usage: None,
service_tier: None,
},
nvext: None,
};
......@@ -339,7 +351,7 @@ mod tests {
chunk
.data
.as_ref()
.and_then(|d| d.choices.first())
.and_then(|d| d.inner.choices.first())
.and_then(|c| c.delta.content.as_ref())
.and_then(|content| match content {
ChatCompletionMessageContent::Text(text) => Some(text.clone()),
......@@ -396,7 +408,7 @@ mod tests {
assert_eq!(results.len(), 0, "Empty stream should produce no chunks");
// Verify fallback response (aggregation will fail on empty stream)
assert_eq!(final_resp.object, "chat.completion");
assert_eq!(final_resp.inner.object, "chat.completion");
// Should get fallback response, not panic
}
......@@ -415,7 +427,7 @@ mod tests {
assert_eq!(extract_content(&results[0]), "Single chunk");
// Verify aggregation
assert_eq!(final_resp.object, "chat.completion");
assert_eq!(final_resp.inner.object, "chat.completion");
}
#[tokio::test]
......@@ -423,6 +435,7 @@ mod tests {
// Test that metadata (id, event, comment) is preserved through passthrough
let chunk_with_metadata = Annotated {
data: Some(NvCreateChatCompletionStreamResponse {
inner: dynamo_async_openai::types::CreateChatCompletionStreamResponse {
id: "test-id".to_string(),
choices: vec![{
#[allow(deprecated)]
......@@ -449,6 +462,7 @@ mod tests {
object: "chat.completion.chunk".to_string(),
usage: None,
service_tier: None,
},
nvext: None,
}),
id: Some("correlation-123".to_string()),
......@@ -481,7 +495,7 @@ mod tests {
let (resp1, resp2) = tokio::join!(future1, future2);
// Both should complete successfully
assert_eq!(resp1.object, "chat.completion");
assert_eq!(resp2.object, "chat.completion");
assert_eq!(resp1.inner.object, "chat.completion");
assert_eq!(resp2.inner.object, "chat.completion");
}
}
......@@ -238,8 +238,9 @@ async fn evaluate(
match (item.data.as_ref(), item.event.as_deref()) {
(Some(data), _) => {
// Normal case
let choice = data.choices.first();
let chat_comp = choice.as_ref().unwrap();
let Some(chat_comp) = data.inner.choices.first() else {
continue;
};
if let Some(c) = &chat_comp.delta.content {
match c {
ChatCompletionMessageContent::Text(text) => {
......
......@@ -138,8 +138,9 @@ async fn main_loop(
match (item.data.as_ref(), item.event.as_deref()) {
(Some(data), _) => {
// Normal case
let entry = data.choices.first();
let chat_comp = entry.as_ref().unwrap();
let Some(chat_comp) = data.inner.choices.first() else {
continue;
};
if let Some(c) = &chat_comp.delta.content {
match c {
ChatCompletionMessageContent::Text(text) => {
......
......@@ -991,7 +991,7 @@ fn streaming_tool_dispatch_events(
};
let mut events = vec![];
for choice in &data.choices {
for choice in &data.inner.choices {
let Some(tool_calls) = &choice.delta.tool_calls else {
continue;
};
......@@ -1034,7 +1034,7 @@ fn accumulate_reasoning_dispatch(
};
let mut events = vec![];
for choice in &data.choices {
for choice in &data.inner.choices {
let buffer = buffers.entry(choice.index).or_default();
let has_reasoning = choice
.delta
......@@ -2892,7 +2892,8 @@ mod tests {
// Create a normal data event
let normal_event = Annotated::<NvCreateChatCompletionStreamResponse> {
data: Some(CreateChatCompletionStreamResponse {
data: Some(NvCreateChatCompletionStreamResponse {
inner: CreateChatCompletionStreamResponse {
id: "test-id".to_string(),
choices: vec![],
created: 0,
......@@ -2901,6 +2902,7 @@ mod tests {
object: "chat.completion.chunk".to_string(),
service_tier: None,
usage: None,
},
nvext: None,
}),
id: Some("msg-1".to_string()),
......@@ -3162,7 +3164,8 @@ mod tests {
fn make_stream_response(
choices: Vec<ChatChoiceStream>,
) -> Annotated<NvCreateChatCompletionStreamResponse> {
let response = CreateChatCompletionStreamResponse {
let response = NvCreateChatCompletionStreamResponse {
inner: CreateChatCompletionStreamResponse {
id: "test-id".to_string(),
choices,
created: 0,
......@@ -3171,6 +3174,7 @@ mod tests {
object: "chat.completion.chunk".to_string(),
usage: None,
service_tier: None,
},
nvext: None,
};
Annotated {
......
......@@ -128,7 +128,7 @@ impl LogprobExtractor for NvCreateChatCompletionStreamResponse {
fn extract_logprobs_by_choice(&self) -> HashMap<u32, Vec<TokenLogProbs>> {
let mut result = HashMap::new();
for choice in &self.choices {
for choice in &self.inner.choices {
let choice_index = choice.index;
let choice_logprobs = choice
......@@ -949,6 +949,7 @@ mod tests {
) -> NvCreateChatCompletionStreamResponse {
#[expect(deprecated)]
NvCreateChatCompletionStreamResponse {
inner: dynamo_async_openai::types::CreateChatCompletionStreamResponse {
id: "test_id".to_string(),
choices: vec![ChatChoiceStream {
index: 0,
......@@ -977,6 +978,7 @@ mod tests {
system_fingerprint: None,
object: "chat.completion.chunk".to_string(),
usage: None,
},
nvext: None,
}
}
......@@ -1012,6 +1014,7 @@ mod tests {
.collect();
NvCreateChatCompletionStreamResponse {
inner: dynamo_async_openai::types::CreateChatCompletionStreamResponse {
id: "test_id".to_string(),
choices,
created: 1234567890,
......@@ -1020,6 +1023,7 @@ mod tests {
system_fingerprint: None,
object: "chat.completion.chunk".to_string(),
usage: None,
},
nvext: None,
}
}
......@@ -1341,6 +1345,7 @@ mod tests {
// Test with choice that has no logprobs
#[expect(deprecated)]
let response = NvCreateChatCompletionStreamResponse {
inner: dynamo_async_openai::types::CreateChatCompletionStreamResponse {
id: "test_id".to_string(),
choices: vec![ChatChoiceStream {
index: 0,
......@@ -1366,6 +1371,7 @@ mod tests {
system_fingerprint: None,
object: "chat.completion.chunk".to_string(),
usage: None,
},
nvext: None,
};
......@@ -1573,6 +1579,7 @@ mod tests {
// In practice, this would have real logprobs data
NvCreateChatCompletionStreamResponse {
inner: dynamo_async_openai::types::CreateChatCompletionStreamResponse {
id: "test_id".to_string(),
choices: vec![],
created: 1234567890,
......@@ -1581,6 +1588,7 @@ mod tests {
system_fingerprint: None,
object: "chat.completion.chunk".to_string(),
usage: None,
},
nvext: None,
}
}
......
......@@ -1217,7 +1217,7 @@ impl OpenAIPreprocessor {
let processed_response = if let Some(ref mut parser) = state.reasoning_parser {
response.map_data(|mut data| {
// Process all choices, not just the first one
for choice in data.choices.iter_mut() {
for choice in data.inner.choices.iter_mut() {
// Reasoning parsing only applies to text content
if let Some(
dynamo_async_openai::types::ChatCompletionMessageContent::Text(
......
......@@ -111,7 +111,7 @@ pub fn maybe_wrap_stream(
let mut prefill_tx = Some(tx);
Box::pin(stream.map(move |item| {
if let Some(ref resp) = item.data {
for choice in &resp.choices {
for choice in &resp.inner.choices {
if let Some(ChatCompletionMessageContent::Text(ref text)) = choice.delta.content {
accumulated_text.push_str(text);
}
......
......@@ -106,7 +106,7 @@ impl AnthropicStreamConverter {
let mut events = Vec::new();
// Capture real token usage from engine when available (typically on the final chunk).
if let Some(usage) = &chunk.usage {
if let Some(usage) = &chunk.inner.usage {
self.input_token_count = usage.prompt_tokens;
self.output_token_count = usage.completion_tokens;
self.cached_token_count = usage
......@@ -115,7 +115,7 @@ impl AnthropicStreamConverter {
.and_then(|d| d.cached_tokens);
}
for choice in &chunk.choices {
for choice in &chunk.inner.choices {
let delta = &choice.delta;
// Track finish reason
......@@ -444,7 +444,7 @@ impl AnthropicStreamConverter {
) -> Vec<TaggedEvent> {
let mut events = Vec::new();
if let Some(usage) = &chunk.usage {
if let Some(usage) = &chunk.inner.usage {
self.input_token_count = usage.prompt_tokens;
self.output_token_count = usage.completion_tokens;
self.cached_token_count = usage
......@@ -453,7 +453,7 @@ impl AnthropicStreamConverter {
.and_then(|d| d.cached_tokens);
}
for choice in &chunk.choices {
for choice in &chunk.inner.choices {
let delta = &choice.delta;
if let Some(ref fr) = choice.finish_reason {
......@@ -722,6 +722,7 @@ mod tests {
fn text_chunk(text: &str) -> NvCreateChatCompletionStreamResponse {
#[allow(deprecated)]
NvCreateChatCompletionStreamResponse {
inner: dynamo_async_openai::types::CreateChatCompletionStreamResponse {
id: "chat-1".into(),
choices: vec![ChatChoiceStream {
index: 0,
......@@ -743,6 +744,7 @@ mod tests {
system_fingerprint: None,
object: "chat.completion.chunk".into(),
usage: None,
},
nvext: None,
}
}
......@@ -755,6 +757,7 @@ mod tests {
) -> NvCreateChatCompletionStreamResponse {
#[allow(deprecated)]
NvCreateChatCompletionStreamResponse {
inner: dynamo_async_openai::types::CreateChatCompletionStreamResponse {
id: "chat-1".into(),
choices: vec![ChatChoiceStream {
index: 0,
......@@ -784,6 +787,7 @@ mod tests {
system_fingerprint: None,
object: "chat.completion.chunk".into(),
usage: None,
},
nvext: None,
}
}
......@@ -908,6 +912,7 @@ mod tests {
fn reasoning_chunk(text: &str) -> NvCreateChatCompletionStreamResponse {
#[allow(deprecated)]
NvCreateChatCompletionStreamResponse {
inner: dynamo_async_openai::types::CreateChatCompletionStreamResponse {
id: "chat-1".into(),
choices: vec![ChatChoiceStream {
index: 0,
......@@ -929,6 +934,7 @@ mod tests {
system_fingerprint: None,
object: "chat.completion.chunk".into(),
usage: None,
},
nvext: None,
}
}
......
This diff is collapsed.
......@@ -64,21 +64,24 @@ pub struct NvCreateChatCompletionRequest {
}
/// A response structure for unary chat completion responses, embedding OpenAI's
/// `CreateChatCompletionResponse`.
///
/// # Fields
/// - `inner`: The base OpenAI unary chat completion response, embedded
/// using `serde(flatten)`.
pub type NvCreateChatCompletionResponse = dynamo_async_openai::types::CreateChatCompletionResponse;
/// `CreateChatCompletionResponse` with optional NVIDIA extension metadata.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct NvCreateChatCompletionResponse {
#[serde(flatten)]
pub inner: dynamo_async_openai::types::CreateChatCompletionResponse,
#[serde(skip_serializing_if = "Option::is_none")]
pub nvext: Option<serde_json::Value>,
}
/// A response structure for streamed chat completions, embedding OpenAI's
/// `CreateChatCompletionStreamResponse`.
///
/// # Fields
/// - `inner`: The base OpenAI streaming chat completion response, embedded
/// using `serde(flatten)`.
pub type NvCreateChatCompletionStreamResponse =
dynamo_async_openai::types::CreateChatCompletionStreamResponse;
/// `CreateChatCompletionStreamResponse` with optional NVIDIA extension metadata.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct NvCreateChatCompletionStreamResponse {
#[serde(flatten)]
pub inner: dynamo_async_openai::types::CreateChatCompletionStreamResponse,
#[serde(skip_serializing_if = "Option::is_none")]
pub nvext: Option<serde_json::Value>,
}
/// Implements `NvExtProvider` for `NvCreateChatCompletionRequest`,
/// providing access to NVIDIA-specific extensions.
......
......@@ -136,16 +136,16 @@ impl DeltaAggregator {
if aggregator.error.is_none()
&& let Some(delta) = delta.data
{
aggregator.id = delta.id;
aggregator.model = delta.model;
aggregator.created = delta.created;
aggregator.service_tier = delta.service_tier;
aggregator.id = delta.inner.id;
aggregator.model = delta.inner.model;
aggregator.created = delta.inner.created;
aggregator.service_tier = delta.inner.service_tier;
// Aggregate usage statistics if available.
if let Some(usage) = delta.usage {
if let Some(usage) = delta.inner.usage {
aggregator.usage = Some(usage);
}
if let Some(system_fingerprint) = delta.system_fingerprint {
if let Some(system_fingerprint) = delta.inner.system_fingerprint {
aggregator.system_fingerprint = Some(system_fingerprint);
}
......@@ -155,7 +155,7 @@ impl DeltaAggregator {
}
// Aggregate choices incrementally.
for choice in delta.choices {
for choice in delta.inner.choices {
let state_choice =
aggregator
.choices
......@@ -267,6 +267,7 @@ impl DeltaAggregator {
// Construct the final response object.
let response = NvCreateChatCompletionResponse {
inner: dynamo_async_openai::types::CreateChatCompletionResponse {
id: aggregator.id,
created: aggregator.created,
usage: aggregator.usage,
......@@ -275,6 +276,7 @@ impl DeltaAggregator {
system_fingerprint: aggregator.system_fingerprint,
choices,
service_tier: aggregator.service_tier,
},
nvext: aggregator.nvext,
};
......@@ -360,7 +362,7 @@ pub trait ChatCompletionAggregator {
) -> Result<NvCreateChatCompletionResponse, String>;
}
impl ChatCompletionAggregator for dynamo_async_openai::types::CreateChatCompletionResponse {
impl ChatCompletionAggregator for NvCreateChatCompletionResponse {
async fn from_annotated_stream(
stream: impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>>,
parsing_options: ParsingOptions,
......@@ -445,6 +447,7 @@ mod tests {
};
let data = NvCreateChatCompletionStreamResponse {
inner: dynamo_async_openai::types::CreateChatCompletionStreamResponse {
id: "test_id".to_string(),
model: "meta/llama-3.1-8b-instruct".to_string(),
created: 1234567890,
......@@ -453,6 +456,7 @@ mod tests {
system_fingerprint: None,
choices: vec![choice],
object: "chat.completion".to_string(),
},
nvext: None,
};
......@@ -479,13 +483,13 @@ mod tests {
let response = result.unwrap();
// Verify that the response is empty and has default values
assert_eq!(response.id, "");
assert_eq!(response.model, "");
assert_eq!(response.created, 0);
assert!(response.usage.is_none());
assert!(response.system_fingerprint.is_none());
assert_eq!(response.choices.len(), 0);
assert!(response.service_tier.is_none());
assert_eq!(response.inner.id, "");
assert_eq!(response.inner.model, "");
assert_eq!(response.inner.created, 0);
assert!(response.inner.usage.is_none());
assert!(response.inner.system_fingerprint.is_none());
assert_eq!(response.inner.choices.len(), 0);
assert!(response.inner.service_tier.is_none());
}
#[tokio::test]
......@@ -511,13 +515,13 @@ mod tests {
let response = result.unwrap();
// Verify the response fields
assert_eq!(response.id, "test_id");
assert_eq!(response.model, "meta/llama-3.1-8b-instruct");
assert_eq!(response.created, 1234567890);
assert!(response.usage.is_none());
assert!(response.system_fingerprint.is_none());
assert_eq!(response.choices.len(), 1);
let choice = &response.choices[0];
assert_eq!(response.inner.id, "test_id");
assert_eq!(response.inner.model, "meta/llama-3.1-8b-instruct");
assert_eq!(response.inner.created, 1234567890);
assert!(response.inner.usage.is_none());
assert!(response.inner.system_fingerprint.is_none());
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
assert_eq!(choice.index, 0);
assert_eq!(
choice.message.content.as_ref().unwrap(),
......@@ -525,7 +529,7 @@ mod tests {
);
assert!(choice.finish_reason.is_none());
assert_eq!(choice.message.role, dynamo_async_openai::types::Role::User);
assert!(response.service_tier.is_none());
assert!(response.inner.service_tier.is_none());
}
#[tokio::test]
......@@ -562,8 +566,8 @@ mod tests {
let response = result.unwrap();
// Verify the response fields
assert_eq!(response.choices.len(), 1);
let choice = &response.choices[0];
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
assert_eq!(choice.index, 0);
assert_eq!(
choice.message.content.as_ref().unwrap(),
......@@ -630,8 +634,8 @@ mod tests {
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.choices.len(), 1);
let choice = &response.choices[0];
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
assert_eq!(choice.index, 0);
assert_eq!(
......@@ -653,6 +657,7 @@ mod tests {
// Create a delta with multiple choices
// ALLOW: function_call is deprecated
let data = NvCreateChatCompletionStreamResponse {
inner: dynamo_async_openai::types::CreateChatCompletionStreamResponse {
id: "test_id".to_string(),
model: "test_model".to_string(),
created: 1234567890,
......@@ -664,7 +669,9 @@ mod tests {
index: 0,
delta: dynamo_async_openai::types::ChatCompletionStreamResponseDelta {
role: Some(dynamo_async_openai::types::Role::Assistant),
content: Some(ChatCompletionMessageContent::Text("Choice 0".to_string())),
content: Some(ChatCompletionMessageContent::Text(
"Choice 0".to_string(),
)),
function_call: None,
tool_calls: None,
refusal: None,
......@@ -678,7 +685,9 @@ mod tests {
index: 1,
delta: dynamo_async_openai::types::ChatCompletionStreamResponseDelta {
role: Some(dynamo_async_openai::types::Role::Assistant),
content: Some(ChatCompletionMessageContent::Text("Choice 1".to_string())),
content: Some(ChatCompletionMessageContent::Text(
"Choice 1".to_string(),
)),
function_call: None,
tool_calls: None,
refusal: None,
......@@ -690,6 +699,7 @@ mod tests {
},
],
object: "chat.completion".to_string(),
},
nvext: None,
};
......@@ -711,9 +721,9 @@ mod tests {
let mut response = result.unwrap();
// Verify the response fields
assert_eq!(response.choices.len(), 2);
response.choices.sort_by(|a, b| a.index.cmp(&b.index)); // Ensure the choices are ordered
let choice0 = &response.choices[0];
assert_eq!(response.inner.choices.len(), 2);
response.inner.choices.sort_by(|a, b| a.index.cmp(&b.index)); // Ensure the choices are ordered
let choice0 = &response.inner.choices[0];
assert_eq!(choice0.index, 0);
assert_eq!(
choice0.message.content.as_ref().unwrap(),
......@@ -728,7 +738,7 @@ mod tests {
dynamo_async_openai::types::Role::Assistant
);
let choice1 = &response.choices[1];
let choice1 = &response.inner.choices[1];
assert_eq!(choice1.index, 1);
assert_eq!(
choice1.message.content.as_ref().unwrap(),
......@@ -773,8 +783,8 @@ mod tests {
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.choices.len(), 1);
let choice = &response.choices[0];
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
// Verify tool calls are present
assert!(choice.message.tool_calls.is_some());
......@@ -816,8 +826,8 @@ mod tests {
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.choices.len(), 1);
let choice = &response.choices[0];
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
// Verify tool calls are present
assert!(choice.message.tool_calls.is_some());
......@@ -859,8 +869,8 @@ mod tests {
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.choices.len(), 1);
let choice = &response.choices[0];
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
// Verify tool calls are present
assert!(choice.message.tool_calls.is_some());
......@@ -900,8 +910,8 @@ mod tests {
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.choices.len(), 1);
let choice = &response.choices[0];
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
// Verify no tool calls are present
assert!(choice.message.tool_calls.is_none());
......@@ -928,7 +938,7 @@ mod tests {
// Manually set empty tool calls array
if let Some(ref mut data) = annotated_delta.data {
data.choices[0].delta.tool_calls = Some(vec![]); // Empty tool calls array
data.inner.choices[0].delta.tool_calls = Some(vec![]); // Empty tool calls array
}
let data = annotated_delta.data.unwrap();
......@@ -945,8 +955,8 @@ mod tests {
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.choices.len(), 1);
let choice = &response.choices[0];
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
// Verify tool calls array is empty
assert!(choice.message.tool_calls.is_none());
......@@ -992,8 +1002,8 @@ mod tests {
let response = result.unwrap();
// There should be one choice
assert_eq!(response.choices.len(), 1);
let choice = &response.choices[0];
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
// The tool_calls field should be present and parsed
assert!(choice.message.tool_calls.is_some());
......@@ -1050,8 +1060,8 @@ mod tests {
let response = result.unwrap();
// There should be one choice
assert_eq!(response.choices.len(), 1);
let choice = &response.choices[0];
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
// The finish_reason should be ToolCalls, not Stop, because tool calls are present
assert_eq!(
......
......@@ -278,7 +278,8 @@ impl DeltaGenerator {
// According to OpenAI spec: when stream_options.include_usage is true,
// all intermediate chunks should have usage: null
// The final usage chunk will be sent separately with empty choices
dynamo_async_openai::types::CreateChatCompletionStreamResponse {
NvCreateChatCompletionStreamResponse {
inner: dynamo_async_openai::types::CreateChatCompletionStreamResponse {
id: self.id.clone(),
object: self.object.clone(),
created: self.created,
......@@ -291,6 +292,7 @@ impl DeltaGenerator {
None
},
service_tier: self.service_tier.clone(),
},
nvext: None, // Will be populated by router layer if needed
}
}
......@@ -303,7 +305,8 @@ impl DeltaGenerator {
pub fn create_usage_chunk(&self) -> NvCreateChatCompletionStreamResponse {
let usage = self.get_usage();
dynamo_async_openai::types::CreateChatCompletionStreamResponse {
NvCreateChatCompletionStreamResponse {
inner: dynamo_async_openai::types::CreateChatCompletionStreamResponse {
id: self.id.clone(),
object: self.object.clone(),
created: self.created,
......@@ -312,6 +315,7 @@ impl DeltaGenerator {
choices: vec![], // Empty choices for usage-only chunk
usage: Some(usage),
service_tier: self.service_tier.clone(),
},
nvext: None,
}
}
......
......@@ -525,13 +525,13 @@ impl JailedStream {
// Process each item in the stream
while let Some(response) = stream.next().await {
if let Some(chat_response) = response.data.as_ref() {
last_stream_id.clone_from(&chat_response.id);
last_stream_model.clone_from(&chat_response.model);
last_stream_created = chat_response.created;
last_stream_id.clone_from(&chat_response.inner.id);
last_stream_model.clone_from(&chat_response.inner.model);
last_stream_created = chat_response.inner.created;
let mut all_emissions = Vec::new();
if chat_response.choices.is_empty() {
if chat_response.inner.choices.is_empty() {
// No choices processed (e.g., usage-only chunk)
// Pass through as-is to preserve usage and other metadata
yield response;
......@@ -539,7 +539,7 @@ impl JailedStream {
}
// Process each choice independently using the new architecture
for choice in &chat_response.choices {
for choice in &chat_response.inner.choices {
if let Some(ref content) = choice.delta.content {
// Jailing only applies to text content
let text_content = match content {
......@@ -676,6 +676,7 @@ impl JailedStream {
tracing::debug!("Stream ended while jailed, releasing accumulated content");
// Create a finalization response carrying forward real stream metadata
let dummy_response = NvCreateChatCompletionStreamResponse {
inner: dynamo_async_openai::types::CreateChatCompletionStreamResponse {
id: last_stream_id,
object: "chat.completion.chunk".to_string(),
created: last_stream_created,
......@@ -684,6 +685,7 @@ impl JailedStream {
usage: None,
service_tier: None,
system_fingerprint: None,
},
nvext: None,
};
......@@ -713,7 +715,7 @@ impl JailedStream {
EmissionMode::Packed => {
// Pack all choices into a single response
let mut response = base_response.clone();
response.choices = emissions.into_iter().map(|e| e.into_choice()).collect();
response.inner.choices = emissions.into_iter().map(|e| e.into_choice()).collect();
vec![Annotated {
data: Some(response),
......@@ -729,7 +731,7 @@ impl JailedStream {
.into_iter()
.map(|emission| {
let mut response = base_response.clone();
response.choices = vec![emission.into_choice()];
response.inner.choices = vec![emission.into_choice()];
Annotated {
data: Some(response),
......@@ -1013,7 +1015,7 @@ impl JailedStream {
while let Some(mut response) = input_stream.next().await {
// Track if any choice emitted tool calls
if let Some(ref data) = response.data {
for choice in &data.choices {
for choice in &data.inner.choices {
if choice.delta.tool_calls.is_some() {
has_tool_calls_per_choice.insert(choice.index, true);
}
......@@ -1022,7 +1024,7 @@ impl JailedStream {
// Fix finish_reason based on jail mode and whether tool calls were emitted
if let Some(ref mut data) = response.data {
for choice in &mut data.choices {
for choice in &mut data.inner.choices {
if let Some(finish) = choice.finish_reason {
// Only modify Stop finish reason, preserve Length/ContentFilter
if finish == FinishReason::Stop {
......
......@@ -48,6 +48,8 @@ pub struct NvCreateCompletionRequest {
pub struct NvCreateCompletionResponse {
#[serde(flatten)]
pub inner: dynamo_async_openai::types::CreateCompletionResponse,
#[serde(skip_serializing_if = "Option::is_none")]
pub nvext: Option<serde_json::Value>,
}
impl ContentProvider for dynamo_async_openai::types::Choice {
......@@ -296,9 +298,8 @@ impl ResponseFactory {
choices: vec![choice],
system_fingerprint: self.system_fingerprint.clone(),
usage,
nvext: None, // Will be populated by router layer if needed
};
NvCreateCompletionResponse { inner }
NvCreateCompletionResponse { inner, nvext: None }
}
}
......
......@@ -86,8 +86,8 @@ impl DeltaAggregator {
aggregator.system_fingerprint = Some(system_fingerprint);
}
// Aggregate nvext field (take the last non-None value)
if delta.inner.nvext.is_some() {
aggregator.nvext = delta.inner.nvext;
if delta.nvext.is_some() {
aggregator.nvext = delta.nvext;
}
// handle the choices
......@@ -168,10 +168,12 @@ impl DeltaAggregator {
object: "text_completion".to_string(),
system_fingerprint: aggregator.system_fingerprint,
choices,
nvext: aggregator.nvext,
};
let response = NvCreateCompletionResponse { inner };
let response = NvCreateCompletionResponse {
inner,
nvext: aggregator.nvext,
};
Ok(response)
}
......@@ -256,10 +258,9 @@ mod tests {
logprobs,
}],
object: "text_completion".to_string(),
nvext: None,
};
let response = NvCreateCompletionResponse { inner };
let response = NvCreateCompletionResponse { inner, nvext: None };
Annotated {
data: Some(response),
......@@ -387,10 +388,9 @@ mod tests {
},
],
object: "text_completion".to_string(),
nvext: None,
};
let response = NvCreateCompletionResponse { inner };
let response = NvCreateCompletionResponse { inner, nvext: None };
let annotated_delta = Annotated {
data: Some(response),
......
......@@ -218,10 +218,9 @@ impl DeltaGenerator {
} else {
None
},
nvext: None, // Will be populated by router layer if needed
};
NvCreateCompletionResponse { inner }
NvCreateCompletionResponse { inner, nvext: None }
}
/// Creates a final usage-only chunk for OpenAI compliance.
......@@ -240,10 +239,9 @@ impl DeltaGenerator {
system_fingerprint: self.system_fingerprint.clone(),
choices: vec![], // Empty choices for usage-only chunk
usage: Some(usage),
nvext: None, // Will be populated by router layer if needed
};
NvCreateCompletionResponse { inner }
NvCreateCompletionResponse { inner, nvext: None }
}
/// Check if usage tracking is enabled
......@@ -343,7 +341,7 @@ impl crate::protocols::openai::DeltaGeneratorExt<NvCreateCompletionResponse> for
};
if let Ok(nvext_json) = serde_json::to_value(&nvext_response) {
response.inner.nvext = Some(nvext_json);
response.nvext = Some(nvext_json);
if let Some(ref info) = worker_id_info {
tracing::debug!(
"Injected worker_id into completions nvext: prefill={:?}, decode={:?}",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment