// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

6
use super::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse};
Greg Clark's avatar
Greg Clark committed
7
use crate::{
8
    local_model::runtime_config::ModelRuntimeConfig,
9
    protocols::{
10
        common::{self, timing::RequestTracker},
11
12
        openai::{
            convert_backend_top_logprobs,
13
            nvext::{NvExtProvider, NvExtResponseFieldSelection},
14
15
            token_to_utf8_bytes,
        },
16
    },
Greg Clark's avatar
Greg Clark committed
17
18
    types::TokenIdType,
};
19

20
/// Provides a method for generating a [`DeltaGenerator`] from a chat completion request.
21
impl NvCreateChatCompletionRequest {
22
23
24
25
26
27
28
29
30
31
32
33
34
    /// Enables usage tracking for non-streaming requests to comply with OpenAI API specification.
    ///
    /// According to OpenAI API spec, non-streaming chat completion responses (stream=false)
    /// must always include usage statistics. This method ensures `stream_options.include_usage`
    /// is set to `true` for non-streaming requests.
    ///
    /// # Arguments
    /// * `original_stream_flag` - The original value of the `stream` field before any internal processing
    pub fn enable_usage_for_nonstreaming(&mut self, original_stream_flag: bool) {
        if !original_stream_flag {
            // For non-streaming requests (stream=false), enable usage by default
            if self.inner.stream_options.is_none() {
                self.inner.stream_options =
35
                    Some(dynamo_protocols::types::ChatCompletionStreamOptions {
36
                        include_usage: true,
37
                        continuous_usage_stats: false,
38
39
40
41
42
43
44
45
                    });
            } else if let Some(ref mut opts) = self.inner.stream_options {
                // If stream_options exists, ensure include_usage is true for non-streaming
                opts.include_usage = true;
            }
        }
    }

46
47
    /// Creates a [`DeltaGenerator`] instance based on the chat completion request.
    ///
48
49
50
    /// # Arguments
    /// * `request_id` - The request ID to use for the chat completion response ID.
    ///
51
52
    /// # Returns
    /// * [`DeltaGenerator`] configured with model name and response options.
53
    pub fn response_generator(&self, request_id: String) -> DeltaGenerator {
54
        let response_fields = NvExtResponseFieldSelection::from_nvext(self.nvext());
55

56
        let options = DeltaGeneratorOptions {
57
58
59
60
61
62
            enable_usage: self
                .inner
                .stream_options
                .as_ref()
                .map(|opts| opts.include_usage)
                .unwrap_or(false),
63
64
65
66
67
68
            continuous_usage_stats: self
                .inner
                .stream_options
                .as_ref()
                .map(|opts| opts.continuous_usage_stats)
                .unwrap_or(false),
Greg Clark's avatar
Greg Clark committed
69
70
            enable_logprobs: self.inner.logprobs.unwrap_or(false)
                || self.inner.top_logprobs.unwrap_or(0) > 0,
71
            response_fields,
72
            runtime_config: ModelRuntimeConfig::default(),
73
74
        };

75
        DeltaGenerator::new(self.inner.model.clone(), options, request_id)
76
77
78
    }
}

79
/// Configuration options for the [`DeltaGenerator`], controlling response behavior.
80
81
#[derive(Debug, Clone, Default)]
pub struct DeltaGeneratorOptions {
82
    /// Determines whether token usage statistics should be included in the response.
83
    pub enable_usage: bool,
84
85
    /// Determines whether continuous usage statistics should be included in the response.
    pub continuous_usage_stats: bool,
86
    /// Determines whether log probabilities should be included in the response.
87
    pub enable_logprobs: bool,
88
89
    /// Determines which nvext response fields may be emitted for this request.
    pub response_fields: NvExtResponseFieldSelection,
90

91
    pub runtime_config: ModelRuntimeConfig,
92
93
}

94
/// Generates incremental chat completion responses in a streaming fashion.
95
pub struct DeltaGenerator {
96
    /// Unique identifier for the chat completion session.
97
    id: String,
98
    /// Object type, representing a streamed chat completion response.
99
    object: String,
100
    /// Timestamp (Unix epoch) when the response was created.
Paul Hendricks's avatar
Paul Hendricks committed
101
    created: u32,
102
    model: String,
103
    /// Optional system fingerprint for version tracking.
104
    system_fingerprint: Option<String>,
105
    /// Optional service tier information for the response.
106
    service_tier: Option<dynamo_protocols::types::ServiceTierResponse>,
107
    /// Tracks token usage for the completion request.
108
    usage: dynamo_protocols::types::CompletionUsage,
109
    /// Counter tracking the number of messages issued.
110
    msg_counter: u64,
111
    /// Configuration options for response generation.
112
    options: DeltaGeneratorOptions,
113
114
    /// Optional request tracker for per-request metrics (shared with PreprocessedRequest).
    tracker: Option<Arc<RequestTracker>>,
115
116
117
}

impl DeltaGenerator {
118
119
120
121
122
    /// Creates a new [`DeltaGenerator`] instance with the specified model and options.
    ///
    /// # Arguments
    /// * `model` - The model name used for response generation.
    /// * `options` - Configuration options for enabling usage and log probabilities.
123
    /// * `request_id` - The request ID to use for the chat completion response.
124
125
126
    ///
    /// # Returns
    /// * A new instance of [`DeltaGenerator`].
127
    pub fn new(model: String, options: DeltaGeneratorOptions, request_id: String) -> Self {
128
129
130
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
131
132
133
134
135
            .as_secs();

        // SAFETY: Casting from `u64` to `u32` could lead to precision loss after `u32::MAX`,
        // but this will not be an issue until 2106.
        let now: u32 = now.try_into().expect("timestamp exceeds u32::MAX");
Paul Hendricks's avatar
Paul Hendricks committed
136

137
        let usage = dynamo_protocols::types::CompletionUsage {
Paul Hendricks's avatar
Paul Hendricks committed
138
139
140
141
142
143
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            prompt_tokens_details: None,
            completion_tokens_details: None,
        };
144

145
146
        let chatcmpl_id = format!("chatcmpl-{request_id}");

147
        // Always create request tracker for per-worker metrics (TTFT, ITL per worker_id).
148
149
        // `response_fields` only controls which nvext fields are returned to the client;
        // the tracker still records timing/ITL internally for metrics.
150
        let tracker = Some(Arc::new(RequestTracker::new()));
151

152
        Self {
153
            id: chatcmpl_id,
154
155
156
157
158
            object: "chat.completion.chunk".to_string(),
            created: now,
            model,
            system_fingerprint: None,
            service_tier: None,
Paul Hendricks's avatar
Paul Hendricks committed
159
            usage,
160
161
            msg_counter: 0,
            options,
162
            tracker,
163
164
165
        }
    }

166
167
168
169
170
    /// Hands out a shared handle to the request tracker, for sharing with
    /// PreprocessedRequest. Returns `None` when this generator holds no tracker.
    pub fn tracker(&self) -> Option<Arc<RequestTracker>> {
        self.tracker.as_ref().map(Arc::clone)
    }

171
172
173
174
    /// Updates the prompt token usage count.
    ///
    /// # Arguments
    /// * `isl` - The number of prompt tokens used.
Paul Hendricks's avatar
Paul Hendricks committed
175
    pub fn update_isl(&mut self, isl: u32) {
176
177
178
        self.usage.prompt_tokens = isl;
    }

Greg Clark's avatar
Greg Clark committed
179
180
181
    pub fn create_logprobs(
        &self,
        tokens: Vec<common::llm_backend::TokenType>,
182
        token_ids: &[TokenIdType],
Greg Clark's avatar
Greg Clark committed
183
184
        logprobs: Option<common::llm_backend::LogProbs>,
        top_logprobs: Option<common::llm_backend::TopLogprobs>,
185
    ) -> Option<dynamo_protocols::types::ChatChoiceLogprobs> {
Greg Clark's avatar
Greg Clark committed
186
187
188
189
190
191
192
        if !self.options.enable_logprobs || logprobs.is_none() {
            return None;
        }

        let toks = tokens
            .into_iter()
            .zip(token_ids)
193
            .map(|(token, token_id)| (token.unwrap_or_default(), *token_id))
Greg Clark's avatar
Greg Clark committed
194
195
196
197
198
199
200
201
202
203
204
205
            .collect::<Vec<(String, TokenIdType)>>();
        let tok_lps = toks
            .iter()
            .zip(logprobs.unwrap())
            .map(|(_, lp)| lp as f32)
            .collect::<Vec<f32>>();

        let content = top_logprobs.map(|top_logprobs| {
            toks.iter()
                .zip(tok_lps)
                .zip(top_logprobs)
                .map(|(((t, tid), lp), top_lps)| {
206
                    let converted = convert_backend_top_logprobs(&top_lps, t, *tid, lp);
207
                    dynamo_protocols::types::ChatCompletionTokenLogprob {
Greg Clark's avatar
Greg Clark committed
208
209
                        token: t.clone(),
                        logprob: lp,
210
211
                        bytes: token_to_utf8_bytes(t),
                        top_logprobs: converted,
Greg Clark's avatar
Greg Clark committed
212
213
214
215
216
                    }
                })
                .collect()
        });

217
        Some(dynamo_protocols::types::ChatChoiceLogprobs {
Greg Clark's avatar
Greg Clark committed
218
219
220
221
222
            content,
            refusal: None,
        })
    }

223
224
225
226
227
228
229
    /// Creates a choice within a chat completion response.
    ///
    /// # Arguments
    /// * `index` - The index of the choice in the completion response.
    /// * `text` - The text content for the response.
    /// * `finish_reason` - The reason why the response finished (e.g., stop, length, etc.).
    /// * `logprobs` - Optional log probabilities of the generated tokens.
230
    /// * `stop_reason` - Optional stop string or token that triggered the stop.
231
232
    ///
    /// # Returns
233
    /// * An [`dynamo_protocols::types::CreateChatCompletionStreamResponse`] instance representing the choice.
Paul Hendricks's avatar
Paul Hendricks committed
234
    #[allow(deprecated)]
235
    pub fn create_choice(
236
        &mut self,
Paul Hendricks's avatar
Paul Hendricks committed
237
        index: u32,
238
        text: Option<String>,
239
240
241
        finish_reason: Option<dynamo_protocols::types::FinishReason>,
        logprobs: Option<dynamo_protocols::types::ChatChoiceLogprobs>,
        stop_reason: Option<dynamo_protocols::types::StopReason>,
242
    ) -> NvCreateChatCompletionStreamResponse {
243
244
        let delta = dynamo_protocols::types::ChatCompletionStreamResponseDelta {
            content: text.map(dynamo_protocols::types::ChatCompletionMessageContent::Text),
245
246
            function_call: None,
            tool_calls: None,
247
            role: if self.msg_counter == 0 {
248
                Some(dynamo_protocols::types::Role::Assistant)
249
250
251
            } else {
                None
            },
Paul Hendricks's avatar
Paul Hendricks committed
252
            refusal: None,
253
            reasoning_content: None,
254
255
        };

256
        let choice = dynamo_protocols::types::ChatChoiceStream {
Paul Hendricks's avatar
Paul Hendricks committed
257
258
259
            index,
            delta,
            finish_reason,
260
            stop_reason,
Paul Hendricks's avatar
Paul Hendricks committed
261
262
263
264
265
            logprobs,
        };

        let choices = vec![choice];

266
267
268
        // According to OpenAI spec: when stream_options.include_usage is true,
        // all intermediate chunks should have usage: null
        // The final usage chunk will be sent separately with empty choices
269
        NvCreateChatCompletionStreamResponse {
270
            inner: dynamo_protocols::types::CreateChatCompletionStreamResponse {
271
272
273
274
275
276
277
278
279
280
281
282
                id: self.id.clone(),
                object: self.object.clone(),
                created: self.created,
                model: self.model.clone(),
                system_fingerprint: self.system_fingerprint.clone(),
                choices,
                usage: if self.options.enable_usage && self.options.continuous_usage_stats {
                    Some(self.get_usage())
                } else {
                    None
                },
                service_tier: self.service_tier.clone(),
283
            },
284
            nvext: None, // Will be populated by router layer if needed
285
        }
286
287
288
289
290
291
292
293
    }

    /// Creates a final usage-only chunk for OpenAI compliance.
    /// This should be sent after the last content chunk when stream_options.include_usage is true.
    ///
    /// # Returns
    /// * A [`CreateChatCompletionStreamResponse`] with empty choices and usage stats.
    pub fn create_usage_chunk(&self) -> NvCreateChatCompletionStreamResponse {
294
        let usage = self.get_usage();
295

296
        NvCreateChatCompletionStreamResponse {
297
            inner: dynamo_protocols::types::CreateChatCompletionStreamResponse {
298
299
300
301
302
303
304
305
306
                id: self.id.clone(),
                object: self.object.clone(),
                created: self.created,
                model: self.model.clone(),
                system_fingerprint: self.system_fingerprint.clone(),
                choices: vec![], // Empty choices for usage-only chunk
                usage: Some(usage),
                service_tier: self.service_tier.clone(),
            },
307
            nvext: None,
308
309
        }
    }
310
311
312
313
314

    /// Check if usage tracking is enabled
    pub fn is_usage_enabled(&self) -> bool {
        self.options.enable_usage
    }
315

316
317
318
319
320
    /// Check if continuous usage tracking is enabled
    pub fn is_continuous_usage_enabled(&self) -> bool {
        self.options.continuous_usage_stats
    }

321
    /// Returns a snapshot of the usage counters.
    ///
    /// `total_tokens` is recomputed on every call as `prompt_tokens + completion_tokens`,
    /// saturating at the integer maximum instead of overflowing.
    pub fn get_usage(&self) -> dynamo_protocols::types::CompletionUsage {
        let total_tokens = self
            .usage
            .prompt_tokens
            .saturating_add(self.usage.completion_tokens);

        dynamo_protocols::types::CompletionUsage {
            total_tokens,
            ..self.usage.clone()
        }
    }
326
327
}

328
/// Implements the [`crate::protocols::openai::DeltaGeneratorExt`] trait for [`DeltaGenerator`], allowing
329
/// it to transform backend responses into OpenAI-style streaming responses.
330
331
332
impl crate::protocols::openai::DeltaGeneratorExt<NvCreateChatCompletionStreamResponse>
    for DeltaGenerator
{
333
334
335
336
337
338
339
340
    /// Converts a backend response into a structured OpenAI-style streaming response.
    ///
    /// # Arguments
    /// * `delta` - The backend response containing generated text and metadata.
    ///
    /// # Returns
    /// * `Ok(NvCreateChatCompletionStreamResponse)` if conversion succeeds.
    /// * `Err(anyhow::Error)` if an error occurs.
341
342
343
    fn choice_from_postprocessor(
        &mut self,
        delta: crate::protocols::common::llm_backend::BackendOutput,
344
    ) -> anyhow::Result<NvCreateChatCompletionStreamResponse> {
345
346
347
348
349
350
351
352
353
354
355
        // Aggregate token usage even if usage tracking is disabled for metrics tracking
        // SAFETY: Casting from `usize` to `u32` could lead to precision loss after `u32::MAX`,
        // but this will not be an issue until context lengths exceed 4_294_967_295.
        let token_length: u32 = delta
            .token_ids
            .len()
            .try_into()
            .expect("token_ids length exceeds u32::MAX");

        self.usage.completion_tokens += token_length;

356
357
358
359
360
361
362
363
364
365
366
        // If backend provides completion_usage, use it to update usage stats
        // This is critical for prompt embeddings where prompt_tokens comes from
        // the embedding sequence length computed by the worker
        if let Some(completion_usage) = delta.completion_usage.as_ref() {
            // Update prompt_tokens from worker if provided (e.g., for embeddings)
            self.usage.prompt_tokens = completion_usage.prompt_tokens;

            // Propagate prompt token details if provided
            if let Some(prompt_details) = completion_usage.prompt_tokens_details.as_ref() {
                self.usage.prompt_tokens_details = Some(prompt_details.clone());
            }
367
368
        }

Greg Clark's avatar
Greg Clark committed
369
370
        let logprobs = self.create_logprobs(
            delta.tokens,
371
            &delta.token_ids,
Greg Clark's avatar
Greg Clark committed
372
373
374
            delta.log_probs,
            delta.top_logprobs,
        );
375

376
        // Map backend finish reasons to OpenAI's finish reasons.
377
        let finish_reason = match delta.finish_reason {
378
379
            Some(common::FinishReason::EoS) => Some(dynamo_protocols::types::FinishReason::Stop),
            Some(common::FinishReason::Stop) => Some(dynamo_protocols::types::FinishReason::Stop),
380
            Some(common::FinishReason::Length) => {
381
                Some(dynamo_protocols::types::FinishReason::Length)
382
383
            }
            Some(common::FinishReason::Cancelled) => {
384
                Some(dynamo_protocols::types::FinishReason::Stop)
385
            }
386
            Some(common::FinishReason::ContentFilter) => {
387
                Some(dynamo_protocols::types::FinishReason::ContentFilter)
388
            }
389
390
391
392
393
394
            Some(common::FinishReason::Error(err_msg)) => {
                return Err(anyhow::anyhow!(err_msg));
            }
            None => None,
        };

395
        // Create the streaming response.
396
        let index = 0;
397
398
399
400
401
402
403
        let mut stream_response = self.create_choice(
            index,
            delta.text,
            finish_reason,
            logprobs,
            delta.stop_reason,
        );
404

405
406
407
408
409
410
411
        // Record finish for timing/ITL accounting even when timing is not returned to the client.
        // Kept at call site because it's a side effect on the tracker — not a gating decision.
        if finish_reason.is_some()
            && let Some(ref tracker) = self.tracker
        {
            tracker.record_finish();
        }
412

413
414
415
416
417
418
419
420
421
        // Build the nvext response payload via the shared gating helper on
        // `NvExtResponseFieldSelection` (see `nvext.rs`). Both chat and
        // completions delta generators go through the same helper so the gating
        // rules stay in one place.
        if let Some(nvext_response) = self.options.response_fields.build_response_nvext(
            self.tracker.as_ref(),
            delta.disaggregated_params.as_ref(),
            finish_reason.is_some(),
        ) && let Ok(nvext_json) = serde_json::to_value(&nvext_response)
422
        {
423
424
425
426
427
428
429
430
431
432
433
434
435
            stream_response.nvext = Some(nvext_json);
            if let Some(ref info) = nvext_response.worker_id {
                tracing::debug!(
                    "Injected worker_id into chat completion nvext: prefill={:?}, decode={:?}",
                    info.prefill_worker_id,
                    info.decode_worker_id
                );
            }
            if let Some(ref tokens) = nvext_response.token_ids {
                tracing::debug!(
                    "Injected token_ids into chat completion nvext: {} tokens",
                    tokens.len()
                );
436
437
            }
        }
Paul Hendricks's avatar
Paul Hendricks committed
438

439
        Ok(stream_response)
440
    }
441
442
443
444

    fn get_isl(&self) -> Option<u32> {
        Some(self.usage.prompt_tokens)
    }
445
446
447
448
449
450
451
452

    fn create_usage_chunk(&self) -> NvCreateChatCompletionStreamResponse {
        DeltaGenerator::create_usage_chunk(self)
    }

    fn is_usage_enabled(&self) -> bool {
        DeltaGenerator::is_usage_enabled(self)
    }
453

454
455
456
457
    fn is_continuous_usage_enabled(&self) -> bool {
        DeltaGenerator::is_continuous_usage_enabled(self)
    }

458
    fn get_usage(&self) -> dynamo_protocols::types::CompletionUsage {
459
460
        DeltaGenerator::get_usage(self)
    }
461
462
463
464

    fn tracker(&self) -> Option<std::sync::Arc<crate::protocols::common::timing::RequestTracker>> {
        self.tracker.clone()
    }
465
}
466
467
468
469

#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocols::common::{self, llm_backend::BackendOutput, timing::WORKER_TYPE_PREFILL};
    use crate::protocols::openai::DeltaGeneratorExt;
    use dynamo_protocols::types::{
        ChatCompletionRequestMessage, ChatCompletionRequestUserMessage,
        ChatCompletionRequestUserMessageContent, CreateChatCompletionRequest,
    };

    /// Minimal non-streaming request with a single user message and no `stream_options`.
    fn create_test_request() -> NvCreateChatCompletionRequest {
        let messages = vec![ChatCompletionRequestMessage::User(
            ChatCompletionRequestUserMessage {
                content: ChatCompletionRequestUserMessageContent::Text("test".to_string()),
                name: None,
            },
        )];

        NvCreateChatCompletionRequest {
            inner: CreateChatCompletionRequest {
                model: "test-model".to_string(),
                messages,
                stream: Some(false),
                stream_options: None,
                ..Default::default()
            },
            common: Default::default(),
            nvext: None,
            chat_template_args: None,
            media_io_kwargs: None,
            unsupported_fields: Default::default(),
        }
    }

    #[test]
    fn test_enable_usage_for_nonstreaming_enables_usage() {
        // Test that non-streaming requests get usage enabled
        let mut request = create_test_request();
        assert!(request.inner.stream_options.is_none());

        request.enable_usage_for_nonstreaming(false); // false = non-streaming

        // Borrow once instead of unwrapping (and moving) the field for every assertion.
        let stream_options = request
            .inner
            .stream_options
            .as_ref()
            .expect("Non-streaming request should have stream_options created");
        assert!(
            stream_options.include_usage,
            "Non-streaming request should have include_usage=true for OpenAI compliance"
        );
        assert!(
            !stream_options.continuous_usage_stats,
            "Non-streaming request should have continuous_usage_stats=false for OpenAI compliance"
        );
    }

    #[test]
    fn test_enable_usage_for_nonstreaming_overrides_existing_options() {
        // Pre-existing options keep continuous_usage_stats but get include_usage forced on
        let mut request = create_test_request();
        request.inner.stream_options = Some(dynamo_protocols::types::ChatCompletionStreamOptions {
            include_usage: false,
            continuous_usage_stats: true,
        });

        request.enable_usage_for_nonstreaming(false); // false = non-streaming

        let stream_options = request
            .inner
            .stream_options
            .as_ref()
            .expect("existing stream_options should be kept");
        assert!(
            stream_options.include_usage,
            "Non-streaming request should have include_usage forced to true"
        );
        assert!(
            stream_options.continuous_usage_stats,
            "Existing continuous_usage_stats should be left untouched"
        );
    }

    #[test]
    fn test_enable_usage_for_nonstreaming_ignores_streaming() {
        // Test that streaming requests are not modified
        let mut request = create_test_request();
        assert!(request.inner.stream_options.is_none());

        request.enable_usage_for_nonstreaming(true); // true = streaming

        assert!(
            request.inner.stream_options.is_none(),
            "Streaming request should not have stream_options modified"
        );
    }

    /// Test request carrying the given nvext payload.
    fn make_request_with_nvext(
        nvext: crate::protocols::openai::nvext::NvExt,
    ) -> NvCreateChatCompletionRequest {
        let mut request = create_test_request();
        request.nvext = Some(nvext);
        request
    }

    /// Single-token backend output that finishes the stream and carries
    /// `token_ids` / `routed_experts` in its disaggregated params.
    fn final_backend_output() -> BackendOutput {
        BackendOutput {
            token_ids: vec![1],
            tokens: vec![Some("hello".to_string())],
            text: Some("hello".to_string()),
            cum_log_probs: None,
            log_probs: None,
            top_logprobs: None,
            finish_reason: Some(common::FinishReason::Stop),
            stop_reason: None,
            index: Some(0),
            completion_usage: None,
            disaggregated_params: Some(serde_json::json!({
                "token_ids": [11, 22, 33],
                "routed_experts": {"layer_0": [1, 3]}
            })),
        }
    }

    #[test]
    fn test_plain_request_without_extra_fields_omits_nvext() {
        let request = create_test_request();
        let mut generator = request.response_generator("req-no-nvext".to_string());
        let tracker = generator.tracker().expect("tracker");
        tracker.record_worker(42, Some(0), WORKER_TYPE_PREFILL);

        let response = generator
            .choice_from_postprocessor(final_backend_output())
            .expect("choice generation");

        assert!(response.nvext.is_none());
    }

    #[test]
    fn test_timing_extra_field_emits_timing_on_final_chunk() {
        use crate::protocols::openai::nvext::NvExt;
        let nvext = NvExt::builder()
            .extra_fields(vec!["timing".to_string()])
            .build()
            .unwrap();
        let mut generator =
            make_request_with_nvext(nvext).response_generator("req-timing".to_string());

        let response = generator
            .choice_from_postprocessor(final_backend_output())
            .expect("choice generation");

        let nvext_json = response.nvext.expect("nvext present for timing request");
        assert!(
            nvext_json.get("timing").is_some(),
            "timing should be emitted when extra_fields=[\"timing\"]"
        );
        assert!(nvext_json.get("worker_id").is_none());
        assert!(nvext_json.get("token_ids").is_none());
        assert!(nvext_json.get("routed_experts").is_none());
    }

    #[test]
    fn test_query_instance_id_emits_worker_id_and_token_ids() {
        use crate::protocols::openai::nvext::NvExt;
        let nvext = NvExt::builder()
            .annotations(vec!["query_instance_id:abc".to_string()])
            .build()
            .unwrap();
        let mut generator =
            make_request_with_nvext(nvext).response_generator("req-qid".to_string());
        let tracker = generator.tracker().expect("tracker");
        tracker.record_worker(42, Some(0), WORKER_TYPE_PREFILL);

        let response = generator
            .choice_from_postprocessor(final_backend_output())
            .expect("choice generation");

        let nvext_json = response
            .nvext
            .expect("nvext present for query_instance_id flow");
        assert!(nvext_json.get("worker_id").is_some());
        assert_eq!(
            nvext_json.get("token_ids"),
            Some(&serde_json::json!([11, 22, 33]))
        );
        // timing is NOT auto-enabled for query_instance_id — it is gated by `extra_fields: ["timing"]`.
        assert!(nvext_json.get("timing").is_none());
        assert!(nvext_json.get("routed_experts").is_none());
    }

    #[test]
    fn test_routed_experts_extra_field_emits_routed_experts() {
        use crate::protocols::openai::nvext::NvExt;
        let nvext = NvExt::builder()
            .extra_fields(vec!["routed_experts".to_string()])
            .build()
            .unwrap();
        let mut generator =
            make_request_with_nvext(nvext).response_generator("req-experts".to_string());

        let response = generator
            .choice_from_postprocessor(final_backend_output())
            .expect("choice generation");

        let nvext_json = response
            .nvext
            .expect("nvext present for routed_experts request");
        assert_eq!(
            nvext_json.get("routed_experts"),
            Some(&serde_json::json!({"layer_0": [1, 3]}))
        );
        assert!(nvext_json.get("worker_id").is_none());
        assert!(nvext_json.get("timing").is_none());
        assert!(nvext_json.get("token_ids").is_none());
    }
}