// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 use dynamo_parsers::{ParserResult, ReasoningParser, ReasoningParserType, ReasoningParserWrapper}; use super::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse}; use crate::{ local_model::runtime_config, protocols::common::{self}, types::TokenIdType, }; /// Provides a method for generating a [`DeltaGenerator`] from a chat completion request. impl NvCreateChatCompletionRequest { /// Creates a [`DeltaGenerator`] instance based on the chat completion request. /// /// # Returns /// * [`DeltaGenerator`] configured with model name and response options. pub fn response_generator( &self, runtime_config: runtime_config::ModelRuntimeConfig, ) -> DeltaGenerator { let options = DeltaGeneratorOptions { enable_usage: true, enable_logprobs: self.inner.logprobs.unwrap_or(false) || self.inner.top_logprobs.unwrap_or(0) > 0, runtime_config, }; DeltaGenerator::new(self.inner.model.clone(), options) } } /// Configuration options for the [`DeltaGenerator`], controlling response behavior. #[derive(Debug, Clone, Default)] pub struct DeltaGeneratorOptions { /// Determines whether token usage statistics should be included in the response. pub enable_usage: bool, /// Determines whether log probabilities should be included in the response. pub enable_logprobs: bool, pub runtime_config: runtime_config::ModelRuntimeConfig, } /// Generates incremental chat completion responses in a streaming fashion. #[derive(Debug)] pub struct DeltaGenerator { /// Unique identifier for the chat completion session. id: String, /// Object type, representing a streamed chat completion response. object: String, /// Timestamp (Unix epoch) when the response was created. created: u32, model: String, /// Optional system fingerprint for version tracking. system_fingerprint: Option, /// Optional service tier information for the response. service_tier: Option, /// Tracks token usage for the completion request. usage: dynamo_async_openai::types::CompletionUsage, /// Counter tracking the number of messages issued. msg_counter: u64, /// Configuration options for response generation. options: DeltaGeneratorOptions, /// Reasoning Parser object /// This is used to parse reasoning content in the response. reasoning_parser: ReasoningParserWrapper, } impl DeltaGenerator { /// Creates a new [`DeltaGenerator`] instance with the specified model and options. /// /// # Arguments /// * `model` - The model name used for response generation. /// * `options` - Configuration options for enabling usage and log probabilities. /// /// # Returns /// * A new instance of [`DeltaGenerator`]. pub fn new(model: String, options: DeltaGeneratorOptions) -> Self { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(); // SAFETY: Casting from `u64` to `u32` could lead to precision loss after `u32::MAX`, // but this will not be an issue until 2106. let now: u32 = now.try_into().expect("timestamp exceeds u32::MAX"); let usage = dynamo_async_openai::types::CompletionUsage { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0, prompt_tokens_details: None, completion_tokens_details: None, }; // Reasoning parser type // This is hardcoded for now, but can be made configurable later. // TODO: Make parser type configurable once front-end integration is determined // Change to GptOss to test GptOSS parser // Reasoning parser wrapper let reasoning_parser = ReasoningParserType::get_reasoning_parser_from_name( options .runtime_config .reasoning_parser .as_deref() .unwrap_or("basic"), ); Self { id: format!("chatcmpl-{}", uuid::Uuid::new_v4()), object: "chat.completion.chunk".to_string(), created: now, model, system_fingerprint: None, service_tier: None, usage, msg_counter: 0, options, reasoning_parser, } } /// Updates the prompt token usage count. /// /// # Arguments /// * `isl` - The number of prompt tokens used. pub fn update_isl(&mut self, isl: u32) { self.usage.prompt_tokens = isl; } pub fn create_logprobs( &self, tokens: Vec, token_ids: &[TokenIdType], logprobs: Option, top_logprobs: Option, ) -> Option { if !self.options.enable_logprobs || logprobs.is_none() { return None; } let toks = tokens .into_iter() .zip(token_ids) .map(|(token, token_id)| (token.unwrap_or_default(), *token_id)) .collect::>(); let tok_lps = toks .iter() .zip(logprobs.unwrap()) .map(|(_, lp)| lp as f32) .collect::>(); let content = top_logprobs.map(|top_logprobs| { toks.iter() .zip(tok_lps) .zip(top_logprobs) .map(|(((t, tid), lp), top_lps)| { let mut found_selected_token = false; let mut converted_top_lps = top_lps .iter() .map(|top_lp| { let top_t = top_lp.token.clone().unwrap_or_default(); let top_tid = top_lp.token_id; found_selected_token = found_selected_token || top_tid == *tid; dynamo_async_openai::types::TopLogprobs { token: top_t, logprob: top_lp.logprob as f32, bytes: None, } }) .collect::>(); if !found_selected_token { // If the selected token is not in the top logprobs, add it converted_top_lps.push(dynamo_async_openai::types::TopLogprobs { token: t.clone(), logprob: lp, bytes: None, }); } dynamo_async_openai::types::ChatCompletionTokenLogprob { token: t.clone(), logprob: lp, bytes: None, top_logprobs: converted_top_lps, } }) .collect() }); Some(dynamo_async_openai::types::ChatChoiceLogprobs { content, refusal: None, }) } fn create_reasoning_content( &mut self, text: &Option, token_ids: &[u32], ) -> Option { let text_ref = text.as_deref().unwrap_or(""); if text_ref.is_empty() && token_ids.is_empty() { return None; } let parser_result = self .reasoning_parser .parse_reasoning_streaming_incremental(text_ref, token_ids); Some(parser_result) } /// Creates a choice within a chat completion response. /// /// # Arguments /// * `index` - The index of the choice in the completion response. /// * `text` - The text content for the response. /// * `finish_reason` - The reason why the response finished (e.g., stop, length, etc.). /// * `logprobs` - Optional log probabilities of the generated tokens. /// /// # Returns /// * An [`dynamo_async_openai::types::CreateChatCompletionStreamResponse`] instance representing the choice. #[allow(deprecated)] pub fn create_choice( &mut self, index: u32, text: Option, reasoning_content: Option, finish_reason: Option, logprobs: Option, ) -> NvCreateChatCompletionStreamResponse { let delta = dynamo_async_openai::types::ChatCompletionStreamResponseDelta { content: text, function_call: None, tool_calls: None, role: if self.msg_counter == 0 { Some(dynamo_async_openai::types::Role::Assistant) } else { None }, refusal: None, reasoning_content, }; let choice = dynamo_async_openai::types::ChatChoiceStream { index, delta, finish_reason, logprobs, }; let choices = vec![choice]; let mut usage = self.usage.clone(); if self.options.enable_usage { usage.total_tokens = usage.prompt_tokens + usage.completion_tokens; } dynamo_async_openai::types::CreateChatCompletionStreamResponse { id: self.id.clone(), object: self.object.clone(), created: self.created, model: self.model.clone(), system_fingerprint: self.system_fingerprint.clone(), choices, usage: if self.options.enable_usage { Some(usage) } else { None }, service_tier: self.service_tier.clone(), } } } /// Implements the [`crate::protocols::openai::DeltaGeneratorExt`] trait for [`DeltaGenerator`], allowing /// it to transform backend responses into OpenAI-style streaming responses. impl crate::protocols::openai::DeltaGeneratorExt for DeltaGenerator { /// Converts a backend response into a structured OpenAI-style streaming response. /// /// # Arguments /// * `delta` - The backend response containing generated text and metadata. /// /// # Returns /// * `Ok(NvCreateChatCompletionStreamResponse)` if conversion succeeds. /// * `Err(anyhow::Error)` if an error occurs. fn choice_from_postprocessor( &mut self, delta: crate::protocols::common::llm_backend::BackendOutput, ) -> anyhow::Result { // Aggregate token usage if enabled. if self.options.enable_usage { // SAFETY: Casting from `usize` to `u32` could lead to precision loss after `u32::MAX`, // but this will not be an issue until context lengths exceed 4_294_967_295. let token_length: u32 = delta .token_ids .len() .try_into() .expect("token_ids length exceeds u32::MAX"); self.usage.completion_tokens += token_length; } let logprobs = self.create_logprobs( delta.tokens, &delta.token_ids, delta.log_probs, delta.top_logprobs, ); // Map backend finish reasons to OpenAI's finish reasons. let finish_reason = match delta.finish_reason { Some(common::FinishReason::EoS) => Some(dynamo_async_openai::types::FinishReason::Stop), Some(common::FinishReason::Stop) => { Some(dynamo_async_openai::types::FinishReason::Stop) } Some(common::FinishReason::Length) => { Some(dynamo_async_openai::types::FinishReason::Length) } Some(common::FinishReason::Cancelled) => { Some(dynamo_async_openai::types::FinishReason::Stop) } Some(common::FinishReason::ContentFilter) => { Some(dynamo_async_openai::types::FinishReason::ContentFilter) } Some(common::FinishReason::Error(err_msg)) => { return Err(anyhow::anyhow!(err_msg)); } None => None, }; let reasoning_parser_result = self .create_reasoning_content(&delta.text, &delta.token_ids) .unwrap_or_default(); let (normal_text, reasoning_content) = ( reasoning_parser_result.get_some_normal_text(), reasoning_parser_result.get_some_reasoning(), ); // Create the streaming response. let index = 0; let stream_response = self.create_choice( index, normal_text, reasoning_content, finish_reason, logprobs, ); Ok(stream_response) } fn get_isl(&self) -> Option { Some(self.usage.prompt_tokens) } }