// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 use std::sync::Arc; use super::{NvCreateCompletionRequest, NvCreateCompletionResponse}; use crate::{ protocols::{ common::{self, timing::RequestTracker}, openai::{ convert_backend_top_logprobs, nvext::{NvExtProvider, NvExtResponse, TimingInfo}, }, }, types::TokenIdType, }; impl NvCreateCompletionRequest { /// Enables usage tracking for non-streaming requests to comply with OpenAI API specification. /// /// According to OpenAI API spec, non-streaming completion responses (stream=false) /// must always include usage statistics. This method ensures `stream_options.include_usage` /// is set to `true` for non-streaming requests. /// /// Reference: https://platform.openai.com/docs/api-reference/completions/create /// /// # Arguments /// * `original_stream_flag` - The original value of the `stream` field before any internal processing pub fn enable_usage_for_nonstreaming(&mut self, original_stream_flag: bool) { if !original_stream_flag { // For non-streaming requests (stream=false), enable usage by default if self.inner.stream_options.is_none() { self.inner.stream_options = Some(dynamo_async_openai::types::ChatCompletionStreamOptions { include_usage: true, continuous_usage_stats: false, }); } else if let Some(ref mut opts) = self.inner.stream_options { // If stream_options exists, ensure include_usage is true for non-streaming opts.include_usage = true; } } } // put this method on the request // inspect the request to extract options pub fn response_generator(&self, request_id: String) -> DeltaGenerator { // Enable tracking if: // 1. Client requested timing in extra_fields, OR // 2. query_instance_id annotation is present (needs worker_id tracking for response) let enable_tracking = self .nvext() .map(|nv| { nv.extra_fields .as_ref() .is_some_and(|fields| fields.iter().any(|f| f == "timing")) || nv.annotations.as_ref().is_some_and(|annots| { annots.iter().any(|a| a.starts_with("query_instance_id")) }) }) .unwrap_or(false); let options = DeltaGeneratorOptions { enable_usage: self .inner .stream_options .as_ref() .map(|opts| opts.include_usage) .unwrap_or(false), continuous_usage_stats: self .inner .stream_options .as_ref() .map(|opts| opts.continuous_usage_stats) .unwrap_or(false), enable_logprobs: self.inner.logprobs.unwrap_or(0) > 0, enable_tracking, }; DeltaGenerator::new(self.inner.model.clone(), options, request_id) } } #[derive(Debug, Clone, Default)] pub struct DeltaGeneratorOptions { pub enable_usage: bool, pub continuous_usage_stats: bool, pub enable_logprobs: bool, pub enable_tracking: bool, } pub struct DeltaGenerator { id: String, object: String, created: u32, model: String, system_fingerprint: Option, usage: dynamo_async_openai::types::CompletionUsage, options: DeltaGeneratorOptions, tracker: Option>, } impl DeltaGenerator { pub fn new(model: String, options: DeltaGeneratorOptions, request_id: String) -> Self { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(); // SAFETY: Casting from `u64` to `u32` could lead to precision loss after `u32::MAX`, // but this will not be an issue until 2106. let now: u32 = now.try_into().expect("timestamp exceeds u32::MAX"); // Previously, our home-rolled CompletionUsage impl'd Default // PR !387 - https://github.com/64bit/async-openai/pull/387 let usage = dynamo_async_openai::types::CompletionUsage { completion_tokens: 0, prompt_tokens: 0, total_tokens: 0, completion_tokens_details: None, prompt_tokens_details: None, }; let completion_id = format!("cmpl-{request_id}"); // Always create request tracker for per-worker metrics (TTFT, ITL per worker_id). // The enable_tracking option only controls whether timing info is included in the response. let tracker = Some(Arc::new(RequestTracker::new())); Self { id: completion_id, object: "text_completion".to_string(), created: now, model, system_fingerprint: None, usage, options, tracker, } } /// Returns the request tracker if tracking is enabled, for sharing with PreprocessedRequest. pub fn tracker(&self) -> Option> { self.tracker.clone() } pub fn update_isl(&mut self, isl: u32) { self.usage.prompt_tokens = isl; } pub fn create_logprobs( &self, tokens: Vec, token_ids: Vec, logprobs: Option, top_logprobs: Option, ) -> Option { if !self.options.enable_logprobs || logprobs.is_none() { return None; } let toks = tokens .into_iter() .zip(token_ids) .map(|(token, token_id)| (token.unwrap_or_default(), token_id)) .collect::>(); let tok_lps = toks .iter() .zip(logprobs.unwrap()) .map(|(_, lp)| lp as f32) .collect::>(); let top_lps = top_logprobs.map_or(vec![], |top_logprobs| { toks.iter() .zip(tok_lps.iter()) .zip(top_logprobs.iter()) .map(|(((t, tid), lp), top_lps)| { let converted = convert_backend_top_logprobs(top_lps, t, *tid, *lp); serde_json::to_value(converted).unwrap() }) .collect() }); Some(dynamo_async_openai::types::Logprobs { tokens: toks.iter().map(|(t, _)| t.clone()).collect(), token_logprobs: tok_lps.into_iter().map(Some).collect(), text_offset: vec![], top_logprobs: top_lps, }) } pub fn create_choice( &self, index: u32, text: Option, finish_reason: Option, logprobs: Option, ) -> NvCreateCompletionResponse { // todo - update for tool calling // According to OpenAI spec: when stream_options.include_usage is true, // all intermediate chunks should have usage: null // The final usage chunk will be sent separately with empty choices let inner = dynamo_async_openai::types::CreateCompletionResponse { id: self.id.clone(), object: self.object.clone(), created: self.created, model: self.model.clone(), system_fingerprint: self.system_fingerprint.clone(), choices: vec![dynamo_async_openai::types::Choice { text: text.unwrap_or_default(), index, finish_reason, logprobs, }], usage: if self.options.enable_usage && self.options.continuous_usage_stats { Some(self.get_usage()) } else { None }, }; NvCreateCompletionResponse { inner, nvext: None } } /// Creates a final usage-only chunk for OpenAI compliance. /// This should be sent after the last content chunk when stream_options.include_usage is true. /// /// # Returns /// * A [`NvCreateCompletionResponse`] with empty choices and usage stats. pub fn create_usage_chunk(&self) -> NvCreateCompletionResponse { let usage = self.get_usage(); let inner = dynamo_async_openai::types::CreateCompletionResponse { id: self.id.clone(), object: self.object.clone(), created: self.created, model: self.model.clone(), system_fingerprint: self.system_fingerprint.clone(), choices: vec![], // Empty choices for usage-only chunk usage: Some(usage), }; NvCreateCompletionResponse { inner, nvext: None } } /// Check if usage tracking is enabled pub fn is_usage_enabled(&self) -> bool { self.options.enable_usage } /// Check if continuous usage tracking is enabled pub fn is_continuous_usage_enabled(&self) -> bool { self.options.continuous_usage_stats } pub fn get_usage(&self) -> dynamo_async_openai::types::CompletionUsage { let mut usage = self.usage.clone(); usage.total_tokens = usage.prompt_tokens.saturating_add(usage.completion_tokens); usage } } impl crate::protocols::openai::DeltaGeneratorExt for DeltaGenerator { fn choice_from_postprocessor( &mut self, delta: common::llm_backend::BackendOutput, ) -> anyhow::Result { // Aggregate token usage even if usage tracking is disabled for metrics tracking // SAFETY: Casting from `usize` to `u32` could lead to precision loss after `u32::MAX`, // but this will not be an issue until context lengths exceed 4_294_967_295. let token_length: u32 = delta .token_ids .len() .try_into() .expect("token_ids length exceeds u32::MAX"); self.usage.completion_tokens += token_length; // If backend provides completion_usage, use it to update usage stats // This is critical for prompt embeddings where prompt_tokens comes from // the embedding sequence length computed by the worker if let Some(completion_usage) = delta.completion_usage.as_ref() { // Update prompt_tokens from worker if provided (e.g., for embeddings) self.usage.prompt_tokens = completion_usage.prompt_tokens; // Propagate prompt token details if provided if let Some(prompt_details) = completion_usage.prompt_tokens_details.as_ref() { self.usage.prompt_tokens_details = Some(prompt_details.clone()); } } let logprobs = self.create_logprobs( delta.tokens, delta.token_ids, delta.log_probs, delta.top_logprobs, ); let finish_reason = delta.finish_reason.map(Into::into); // create choice let index = delta.index.unwrap_or(0); let mut response = self.create_choice(index, delta.text.clone(), finish_reason, logprobs); // Get worker_id info from tracker (set by KvPushRouter based on phase) let worker_id_info = self.tracker.as_ref().and_then(|t| t.get_worker_info()); let token_ids = delta .disaggregated_params .as_ref() .and_then(|params| params.get("token_ids")) .and_then(|v| serde_json::from_value::>(v.clone()).ok()); let routed_experts = delta .disaggregated_params .as_ref() .and_then(|params| params.get("routed_experts")) .cloned(); // Get timing info if this is the final response (has finish_reason) let timing_info: Option = if finish_reason.is_some() { self.tracker.as_ref().map(|tracker| { tracker.record_finish(); tracker.get_timing_info() }) } else { None }; // Inject nvext if we have worker_id, token_ids, timing, or routed experts. if worker_id_info.is_some() || token_ids.is_some() || timing_info.is_some() || routed_experts.is_some() { let nvext_response = NvExtResponse { worker_id: worker_id_info.clone(), timing: timing_info, token_ids: token_ids.clone(), routed_experts, }; if let Ok(nvext_json) = serde_json::to_value(&nvext_response) { response.nvext = Some(nvext_json); if let Some(ref info) = worker_id_info { tracing::debug!( "Injected worker_id into completions nvext: prefill={:?}, decode={:?}", info.prefill_worker_id, info.decode_worker_id ); } if let Some(ref tokens) = token_ids { tracing::debug!( "Injected token_ids into completions nvext: {} tokens", tokens.len() ); } } } Ok(response) } fn get_isl(&self) -> Option { Some(self.usage.prompt_tokens) } fn create_usage_chunk(&self) -> NvCreateCompletionResponse { DeltaGenerator::create_usage_chunk(self) } fn is_usage_enabled(&self) -> bool { DeltaGenerator::is_usage_enabled(self) } fn is_continuous_usage_enabled(&self) -> bool { DeltaGenerator::is_continuous_usage_enabled(self) } fn get_usage(&self) -> dynamo_async_openai::types::CompletionUsage { DeltaGenerator::get_usage(self) } fn tracker(&self) -> Option> { self.tracker.clone() } }