delta.rs 12.3 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
5
use dynamo_parsers::{ParserResult, ReasoningParser, ReasoningParserType, ReasoningParserWrapper};

6
use super::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse};
Greg Clark's avatar
Greg Clark committed
7
8
9
10
use crate::{
    protocols::common::{self},
    types::TokenIdType,
};
11

12
/// Provides a method for generating a [`DeltaGenerator`] from a chat completion request.
13
impl NvCreateChatCompletionRequest {
14
15
16
17
    /// Creates a [`DeltaGenerator`] instance based on the chat completion request.
    ///
    /// # Returns
    /// * [`DeltaGenerator`] configured with model name and response options.
18
19
20
    pub fn response_generator(&self) -> DeltaGenerator {
        let options = DeltaGeneratorOptions {
            enable_usage: true,
Greg Clark's avatar
Greg Clark committed
21
22
            enable_logprobs: self.inner.logprobs.unwrap_or(false)
                || self.inner.top_logprobs.unwrap_or(0) > 0,
23
24
        };

Paul Hendricks's avatar
Paul Hendricks committed
25
        DeltaGenerator::new(self.inner.model.clone(), options)
26
27
28
    }
}

29
/// Switches that control which optional sections a [`DeltaGenerator`] attaches to each chunk.
///
/// `Default` leaves every switch off.
#[derive(Clone, Debug, Default)]
pub struct DeltaGeneratorOptions {
    /// When `true`, chunks carry a `usage` block (prompt, completion and total token counts).
    pub enable_usage: bool,
    /// When `true`, per-token log probabilities are attached to chunks if the backend
    /// supplies them.
    pub enable_logprobs: bool,
}

38
/// Generates incremental chat completion responses in a streaming fashion.
39
#[derive(Debug)]
40
pub struct DeltaGenerator {
41
    /// Unique identifier for the chat completion session.
42
    id: String,
43
    /// Object type, representing a streamed chat completion response.
44
    object: String,
45
    /// Timestamp (Unix epoch) when the response was created.
Paul Hendricks's avatar
Paul Hendricks committed
46
    created: u32,
47
    model: String,
48
    /// Optional system fingerprint for version tracking.
49
    system_fingerprint: Option<String>,
50
    /// Optional service tier information for the response.
51
    service_tier: Option<dynamo_async_openai::types::ServiceTierResponse>,
52
    /// Tracks token usage for the completion request.
53
    usage: dynamo_async_openai::types::CompletionUsage,
54
    /// Counter tracking the number of messages issued.
55
    msg_counter: u64,
56
    /// Configuration options for response generation.
57
    options: DeltaGeneratorOptions,
58
59
60
61

    /// Reasoning Parser object
    /// This is used to parse reasoning content in the response.
    reasoning_parser: ReasoningParserWrapper,
62
63
64
}

impl DeltaGenerator {
65
66
67
68
69
70
71
72
    /// Creates a new [`DeltaGenerator`] instance with the specified model and options.
    ///
    /// # Arguments
    /// * `model` - The model name used for response generation.
    /// * `options` - Configuration options for enabling usage and log probabilities.
    ///
    /// # Returns
    /// * A new instance of [`DeltaGenerator`].
73
74
75
76
    pub fn new(model: String, options: DeltaGeneratorOptions) -> Self {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
77
78
79
80
81
            .as_secs();

        // SAFETY: Casting from `u64` to `u32` could lead to precision loss after `u32::MAX`,
        // but this will not be an issue until 2106.
        let now: u32 = now.try_into().expect("timestamp exceeds u32::MAX");
Paul Hendricks's avatar
Paul Hendricks committed
82

83
        let usage = dynamo_async_openai::types::CompletionUsage {
Paul Hendricks's avatar
Paul Hendricks committed
84
85
86
87
88
89
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            prompt_tokens_details: None,
            completion_tokens_details: None,
        };
90

91
92
93
94
95
96
97
98
        // Reasoning parser type
        // This is hardcoded for now, but can be made configurable later.
        // TODO: Make parser type configurable once front-end integration is determined
        let reasoning_parser_type = ReasoningParserType::Basic;

        // Reasoning parser wrapper
        let reasoning_parser = reasoning_parser_type.get_reasoning_parser();

99
100
101
102
103
104
105
        Self {
            id: format!("chatcmpl-{}", uuid::Uuid::new_v4()),
            object: "chat.completion.chunk".to_string(),
            created: now,
            model,
            system_fingerprint: None,
            service_tier: None,
Paul Hendricks's avatar
Paul Hendricks committed
106
            usage,
107
108
            msg_counter: 0,
            options,
109
            reasoning_parser,
110
111
112
        }
    }

113
114
115
116
    /// Updates the prompt token usage count.
    ///
    /// # Arguments
    /// * `isl` - The number of prompt tokens used.
Paul Hendricks's avatar
Paul Hendricks committed
117
    pub fn update_isl(&mut self, isl: u32) {
118
119
120
        self.usage.prompt_tokens = isl;
    }

Greg Clark's avatar
Greg Clark committed
121
122
123
124
125
126
    pub fn create_logprobs(
        &self,
        tokens: Vec<common::llm_backend::TokenType>,
        token_ids: Vec<TokenIdType>,
        logprobs: Option<common::llm_backend::LogProbs>,
        top_logprobs: Option<common::llm_backend::TopLogprobs>,
127
    ) -> Option<dynamo_async_openai::types::ChatChoiceLogprobs> {
Greg Clark's avatar
Greg Clark committed
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
        if !self.options.enable_logprobs || logprobs.is_none() {
            return None;
        }

        let toks = tokens
            .into_iter()
            .zip(token_ids)
            .map(|(token, token_id)| (token.unwrap_or_default(), token_id))
            .collect::<Vec<(String, TokenIdType)>>();
        let tok_lps = toks
            .iter()
            .zip(logprobs.unwrap())
            .map(|(_, lp)| lp as f32)
            .collect::<Vec<f32>>();

        let content = top_logprobs.map(|top_logprobs| {
            toks.iter()
                .zip(tok_lps)
                .zip(top_logprobs)
                .map(|(((t, tid), lp), top_lps)| {
                    let mut found_selected_token = false;
                    let mut converted_top_lps = top_lps
                        .iter()
                        .map(|top_lp| {
                            let top_t = top_lp.token.clone().unwrap_or_default();
                            let top_tid = top_lp.token_id;
                            found_selected_token = found_selected_token || top_tid == *tid;
155
                            dynamo_async_openai::types::TopLogprobs {
Greg Clark's avatar
Greg Clark committed
156
157
158
159
160
                                token: top_t,
                                logprob: top_lp.logprob as f32,
                                bytes: None,
                            }
                        })
161
                        .collect::<Vec<dynamo_async_openai::types::TopLogprobs>>();
Greg Clark's avatar
Greg Clark committed
162
163
                    if !found_selected_token {
                        // If the selected token is not in the top logprobs, add it
164
                        converted_top_lps.push(dynamo_async_openai::types::TopLogprobs {
Greg Clark's avatar
Greg Clark committed
165
166
167
168
169
                            token: t.clone(),
                            logprob: lp,
                            bytes: None,
                        });
                    }
170
                    dynamo_async_openai::types::ChatCompletionTokenLogprob {
Greg Clark's avatar
Greg Clark committed
171
172
173
174
175
176
177
178
179
                        token: t.clone(),
                        logprob: lp,
                        bytes: None,
                        top_logprobs: converted_top_lps,
                    }
                })
                .collect()
        });

180
        Some(dynamo_async_openai::types::ChatChoiceLogprobs {
Greg Clark's avatar
Greg Clark committed
181
182
183
184
185
            content,
            refusal: None,
        })
    }

186
187
188
189
190
191
192
193
194
    fn create_reasoning_content(&mut self, text: Option<String>) -> Option<ParserResult> {
        let text = text?;
        let parser_result = self
            .reasoning_parser
            .parse_reasoning_streaming_incremental(&text);

        Some(parser_result)
    }

195
196
197
198
199
200
201
202
203
    /// Creates a choice within a chat completion response.
    ///
    /// # Arguments
    /// * `index` - The index of the choice in the completion response.
    /// * `text` - The text content for the response.
    /// * `finish_reason` - The reason why the response finished (e.g., stop, length, etc.).
    /// * `logprobs` - Optional log probabilities of the generated tokens.
    ///
    /// # Returns
204
    /// * An [`dynamo_async_openai::types::CreateChatCompletionStreamResponse`] instance representing the choice.
Paul Hendricks's avatar
Paul Hendricks committed
205
    #[allow(deprecated)]
206
    pub fn create_choice(
207
        &mut self,
Paul Hendricks's avatar
Paul Hendricks committed
208
        index: u32,
209
        text: Option<String>,
210
211
        finish_reason: Option<dynamo_async_openai::types::FinishReason>,
        logprobs: Option<dynamo_async_openai::types::ChatChoiceLogprobs>,
212
213
214
215
216
217
218
    ) -> NvCreateChatCompletionStreamResponse {
        let reasoning_parser_result = self.create_reasoning_content(text).unwrap_or_default();

        let (normal_text, reasoning_content) = (
            reasoning_parser_result.get_some_normal_text(),
            reasoning_parser_result.get_some_reasoning(),
        );
219
        let delta = dynamo_async_openai::types::ChatCompletionStreamResponseDelta {
220
            content: normal_text,
221
222
            function_call: None,
            tool_calls: None,
223
            role: if self.msg_counter == 0 {
224
                Some(dynamo_async_openai::types::Role::Assistant)
225
226
227
            } else {
                None
            },
Paul Hendricks's avatar
Paul Hendricks committed
228
            refusal: None,
229
            reasoning_content,
230
231
        };

232
        let choice = dynamo_async_openai::types::ChatChoiceStream {
Paul Hendricks's avatar
Paul Hendricks committed
233
234
235
236
237
238
239
240
            index,
            delta,
            finish_reason,
            logprobs,
        };

        let choices = vec![choice];

241
242
243
244
245
        let mut usage = self.usage.clone();
        if self.options.enable_usage {
            usage.total_tokens = usage.prompt_tokens + usage.completion_tokens;
        }

246
        dynamo_async_openai::types::CreateChatCompletionStreamResponse {
247
248
249
250
251
            id: self.id.clone(),
            object: self.object.clone(),
            created: self.created,
            model: self.model.clone(),
            system_fingerprint: self.system_fingerprint.clone(),
Paul Hendricks's avatar
Paul Hendricks committed
252
            choices,
253
            usage: if self.options.enable_usage {
254
                Some(usage)
255
256
257
258
259
260
261
262
            } else {
                None
            },
            service_tier: self.service_tier.clone(),
        }
    }
}

263
/// Implements the [`crate::protocols::openai::DeltaGeneratorExt`] trait for [`DeltaGenerator`], allowing
264
/// it to transform backend responses into OpenAI-style streaming responses.
265
266
267
impl crate::protocols::openai::DeltaGeneratorExt<NvCreateChatCompletionStreamResponse>
    for DeltaGenerator
{
268
269
270
271
272
273
274
275
    /// Converts a backend response into a structured OpenAI-style streaming response.
    ///
    /// # Arguments
    /// * `delta` - The backend response containing generated text and metadata.
    ///
    /// # Returns
    /// * `Ok(NvCreateChatCompletionStreamResponse)` if conversion succeeds.
    /// * `Err(anyhow::Error)` if an error occurs.
276
277
278
    fn choice_from_postprocessor(
        &mut self,
        delta: crate::protocols::common::llm_backend::BackendOutput,
279
    ) -> anyhow::Result<NvCreateChatCompletionStreamResponse> {
280
        // Aggregate token usage if enabled.
281
        if self.options.enable_usage {
282
283
284
285
286
287
288
289
290
            // SAFETY: Casting from `usize` to `u32` could lead to precision loss after `u32::MAX`,
            // but this will not be an issue until context lengths exceed 4_294_967_295.
            let token_length: u32 = delta
                .token_ids
                .len()
                .try_into()
                .expect("token_ids length exceeds u32::MAX");

            self.usage.completion_tokens += token_length;
291
292
        }

Greg Clark's avatar
Greg Clark committed
293
294
295
296
297
298
        let logprobs = self.create_logprobs(
            delta.tokens,
            delta.token_ids,
            delta.log_probs,
            delta.top_logprobs,
        );
299

300
        // Map backend finish reasons to OpenAI's finish reasons.
301
        let finish_reason = match delta.finish_reason {
302
303
304
305
306
307
308
309
310
311
            Some(common::FinishReason::EoS) => Some(dynamo_async_openai::types::FinishReason::Stop),
            Some(common::FinishReason::Stop) => {
                Some(dynamo_async_openai::types::FinishReason::Stop)
            }
            Some(common::FinishReason::Length) => {
                Some(dynamo_async_openai::types::FinishReason::Length)
            }
            Some(common::FinishReason::Cancelled) => {
                Some(dynamo_async_openai::types::FinishReason::Stop)
            }
312
            Some(common::FinishReason::ContentFilter) => {
313
                Some(dynamo_async_openai::types::FinishReason::ContentFilter)
314
            }
315
316
317
318
319
320
            Some(common::FinishReason::Error(err_msg)) => {
                return Err(anyhow::anyhow!(err_msg));
            }
            None => None,
        };

321
        // Create the streaming response.
322
        let index = 0;
Paul Hendricks's avatar
Paul Hendricks committed
323
324
        let stream_response = self.create_choice(index, delta.text, finish_reason, logprobs);

325
        Ok(stream_response)
326
    }
327
328
329
330

    fn get_isl(&self) -> Option<u32> {
        Some(self.usage.prompt_tokens)
    }
331
}