Unverified Commit 1c9412d2 authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

feat(lpu): add opaque engine_data field to nvext response with per-request opt-in via extra_fields

parent 3f83c597
......@@ -300,6 +300,7 @@ impl
index: data.index,
completion_usage: data.completion_usage,
disaggregated_params: data.disaggregated_params,
engine_data: data.engine_data,
})
})
});
......
......@@ -323,6 +323,7 @@ mod tests {
index: None,
disaggregated_params: None,
completion_usage: None,
engine_data: None,
})
}
......
......@@ -106,6 +106,11 @@ pub struct BackendOutput {
/// Disaggregated execution parameters (for prefill/decode separation)
#[serde(default, skip_serializing_if = "Option::is_none")]
pub disaggregated_params: Option<serde_json::Value>,
/// Opaque engine data passed through from the backend worker to the response.
/// Dynamo does not inspect this field; it is serialized as-is into `nvext.engine_data`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub engine_data: Option<serde_json::Value>,
}
/// The LLM engine and backend will manage its own state, specifically translating how a
......@@ -167,6 +172,11 @@ pub struct LLMEngineOutput {
// Token usage information
#[serde(default, skip_serializing_if = "Option::is_none")]
pub completion_usage: Option<CompletionUsage>,
/// Opaque engine data passed through from the backend worker to the response.
/// Dynamo does not inspect this field; it is serialized as-is into `nvext.engine_data`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub engine_data: Option<serde_json::Value>,
}
impl LLMEngineOutput {
......@@ -186,6 +196,7 @@ impl LLMEngineOutput {
disaggregated_params: None,
extra_args: None,
completion_usage: None,
engine_data: None,
}
}
......@@ -205,6 +216,7 @@ impl LLMEngineOutput {
disaggregated_params: None,
extra_args: None,
completion_usage: None,
engine_data: None,
}
}
......@@ -224,6 +236,7 @@ impl LLMEngineOutput {
disaggregated_params: None,
extra_args: None,
completion_usage: None,
engine_data: None,
}
}
......@@ -243,6 +256,7 @@ impl LLMEngineOutput {
disaggregated_params: None,
extra_args: None,
completion_usage: None,
engine_data: None,
}
}
}
......
......@@ -418,6 +418,7 @@ impl crate::protocols::openai::DeltaGeneratorExt<NvCreateChatCompletionStreamRes
self.tracker.as_ref(),
delta.disaggregated_params.as_ref(),
finish_reason.is_some(),
delta.engine_data,
) && let Ok(nvext_json) = serde_json::to_value(&nvext_response)
{
stream_response.nvext = Some(nvext_json);
......@@ -558,6 +559,58 @@ mod tests {
"token_ids": [11, 22, 33],
"routed_experts": {"layer_0": [1, 3]}
})),
engine_data: None,
}
}
/// Test helper: builds a streaming chat completion request for `"test-model"` with a single
/// user message and `nvext.extra_fields` set to `fields`.
fn create_test_request_with_extra_fields(fields: Vec<String>) -> NvCreateChatCompletionRequest {
    // One plain-text user turn is enough to drive the response generator.
    let user_message = ChatCompletionRequestMessage::User(ChatCompletionRequestUserMessage {
        content: ChatCompletionRequestUserMessageContent::Text("test".to_string()),
        name: None,
    });

    let inner = CreateChatCompletionRequest {
        model: "test-model".to_string(),
        messages: vec![user_message],
        stream: Some(true),
        stream_options: None,
        ..Default::default()
    };

    // The requested response fields travel in the request-side nvext extension.
    let nvext = crate::protocols::openai::nvext::NvExt::builder()
        .extra_fields(fields)
        .build()
        .unwrap();

    NvCreateChatCompletionRequest {
        inner,
        common: Default::default(),
        nvext: Some(nvext),
        chat_template_args: None,
        media_io_kwargs: None,
        unsupported_fields: Default::default(),
    }
}
/// Test fixture: a one-token `BackendOutput` with finish reason `Stop` that carries a sample
/// `engine_data` JSON object with three numeric entries.
fn make_backend_output_with_engine_data() -> crate::protocols::common::llm_backend::BackendOutput
{
    let token_text = "hello".to_string();

    // Sample payload; the key names are arbitrary from the point of view of these tests.
    let engine_payload = serde_json::json!({
        "kv_transfer_time_ms": 12.3,
        "disaggregated_kv_transfer_time_ms": 8.1,
        "prefill_compute_time_ms": 45.6
    });

    crate::protocols::common::llm_backend::BackendOutput {
        token_ids: vec![42],
        tokens: vec![Some(token_text.clone())],
        text: Some(token_text),
        cum_log_probs: None,
        log_probs: None,
        top_logprobs: None,
        finish_reason: Some(crate::protocols::common::FinishReason::Stop),
        stop_reason: None,
        index: Some(0),
        completion_usage: None,
        disaggregated_params: None,
        engine_data: Some(engine_payload),
    }
}
......@@ -653,4 +706,93 @@ mod tests {
assert!(nvext_json.get("timing").is_none());
assert!(nvext_json.get("token_ids").is_none());
}
/// Requesting `"engine_data"` via `nvext.extra_fields` must surface the backend-provided
/// payload under `nvext.engine_data` without modification.
#[test]
fn test_engine_data_included_when_requested_via_extra_fields() {
    let request = create_test_request_with_extra_fields(vec!["engine_data".to_string()]);
    let mut generator = request.response_generator("req-engine-1".to_string());

    let backend_output = make_backend_output_with_engine_data();
    // Capture the payload before `backend_output` is moved into the generator so the
    // complete value, not just selected keys, can be compared afterwards.
    let expected = backend_output
        .engine_data
        .clone()
        .expect("fixture should carry engine_data");

    let response = generator
        .choice_from_postprocessor(backend_output)
        .expect("should produce a response");

    let nvext = response.nvext.expect("nvext should be present");
    let engine_data = nvext
        .get("engine_data")
        .expect("engine_data should be present");

    // Spot checks on individual keys keep failures easy to read.
    assert_eq!(engine_data["kv_transfer_time_ms"], 12.3);
    assert_eq!(engine_data["prefill_compute_time_ms"], 45.6);
    // Whole-value comparison: no key may be dropped, added, or rewritten on the way through.
    assert_eq!(
        engine_data, &expected,
        "engine_data should be forwarded as-is"
    );
}
/// `engine_data` supplied by the backend must not show up in `nvext` for the default test
/// request, which does not ask for it.
#[test]
fn test_engine_data_excluded_when_not_requested() {
    let request = create_test_request();
    let mut generator = request.response_generator("req-engine-2".to_string());
    let backend_output = make_backend_output_with_engine_data();
    let response = generator
        .choice_from_postprocessor(backend_output)
        .expect("should produce a response");

    // nvext may or may not be present (tracker may inject worker_id),
    // but engine_data specifically must be absent (a JSON null is tolerated).
    if let Some(nvext) = &response.nvext {
        assert!(
            nvext.get("engine_data").is_none_or(|value| value.is_null()),
            "engine_data should not be present when not requested"
        );
    }
}
/// Opting in to a different extra field (`"timing"`) must not cause `engine_data` to be emitted.
#[test]
fn test_engine_data_excluded_when_other_extra_fields_requested() {
    let request = create_test_request_with_extra_fields(vec!["timing".to_string()]);
    let mut generator = request.response_generator("req-engine-3".to_string());
    let backend_output = make_backend_output_with_engine_data();
    let response = generator
        .choice_from_postprocessor(backend_output)
        .expect("should produce a response");

    // Other nvext fields may legitimately be present; only engine_data is checked
    // (a JSON null is tolerated).
    if let Some(nvext) = &response.nvext {
        assert!(
            nvext.get("engine_data").is_none_or(|value| value.is_null()),
            "engine_data should not be present when only timing is requested"
        );
    }
}
/// When the request opts in to `engine_data` but the backend supplies none, the response
/// must not contain an `engine_data` entry.
#[test]
fn test_engine_data_none_from_backend_no_nvext_noise() {
    let request = create_test_request_with_extra_fields(vec!["engine_data".to_string()]);
    let mut generator = request.response_generator("req-engine-4".to_string());
    let backend_output = crate::protocols::common::llm_backend::BackendOutput {
        token_ids: vec![42],
        tokens: vec![Some("hello".to_string())],
        text: Some("hello".to_string()),
        cum_log_probs: None,
        log_probs: None,
        top_logprobs: None,
        finish_reason: Some(crate::protocols::common::FinishReason::Stop),
        stop_reason: None,
        index: Some(0),
        completion_usage: None,
        disaggregated_params: None,
        engine_data: None, // engine didn't provide any data
    };
    let response = generator
        .choice_from_postprocessor(backend_output)
        .expect("should produce a response");

    // engine_data is None from backend, so nvext.engine_data should be absent
    // (a JSON null is tolerated).
    if let Some(nvext) = &response.nvext {
        assert!(
            nvext.get("engine_data").is_none_or(|value| value.is_null()),
            "engine_data should not appear when backend provides None"
        );
    }
}
}
......@@ -312,6 +312,7 @@ impl crate::protocols::openai::DeltaGeneratorExt<NvCreateCompletionResponse> for
self.tracker.as_ref(),
delta.disaggregated_params.as_ref(),
finish_reason.is_some(),
delta.engine_data,
) && let Ok(nvext_json) = serde_json::to_value(&nvext_response)
{
response.nvext = Some(nvext_json);
......@@ -405,6 +406,49 @@ mod tests {
"token_ids": [11, 22, 33],
"routed_experts": {"layer_0": [1, 3]}
})),
engine_data: None,
}
}
/// Test helper: builds a completion request for `"test-model"` with the prompt `"test"` and
/// `nvext.extra_fields` set to `fields`.
fn create_test_request_with_extra_fields(fields: Vec<String>) -> NvCreateCompletionRequest {
    let prompt = Prompt::String("test".to_string());
    let inner = CreateCompletionRequestArgs::default()
        .model("test-model")
        .prompt(prompt)
        .build()
        .expect("completion request");

    // The requested response fields travel in the request-side nvext extension.
    let nvext = crate::protocols::openai::nvext::NvExt::builder()
        .extra_fields(fields)
        .build()
        .unwrap();

    NvCreateCompletionRequest {
        inner,
        common: Default::default(),
        nvext: Some(nvext),
        metadata: None,
        unsupported_fields: Default::default(),
    }
}
/// Test fixture: a one-token `BackendOutput` with finish reason `Stop` that carries a sample
/// `engine_data` JSON object with three numeric entries.
fn make_backend_output_with_engine_data() -> BackendOutput {
    let token_text = "hello".to_string();

    // Sample payload; the key names are arbitrary from the point of view of these tests.
    let engine_payload = serde_json::json!({
        "kv_transfer_time_ms": 12.3,
        "disaggregated_kv_transfer_time_ms": 8.1,
        "prefill_compute_time_ms": 45.6
    });

    BackendOutput {
        token_ids: vec![42],
        tokens: vec![Some(token_text.clone())],
        text: Some(token_text),
        cum_log_probs: None,
        log_probs: None,
        top_logprobs: None,
        finish_reason: Some(common::FinishReason::Stop),
        stop_reason: None,
        index: Some(0),
        completion_usage: None,
        disaggregated_params: None,
        engine_data: Some(engine_payload),
    }
}
......@@ -500,4 +544,93 @@ mod tests {
assert!(nvext_json.get("timing").is_none());
assert!(nvext_json.get("token_ids").is_none());
}
/// Requesting `"engine_data"` via `nvext.extra_fields` must surface the backend-provided
/// payload under `nvext.engine_data` without modification.
#[test]
fn test_engine_data_included_when_requested_via_extra_fields() {
    let request = create_test_request_with_extra_fields(vec!["engine_data".to_string()]);
    let mut generator = request.response_generator("req-engine-1".to_string());

    let backend_output = make_backend_output_with_engine_data();
    // Capture the payload before `backend_output` is moved into the generator so the
    // complete value, not just selected keys, can be compared afterwards.
    let expected = backend_output
        .engine_data
        .clone()
        .expect("fixture should carry engine_data");

    let response = generator
        .choice_from_postprocessor(backend_output)
        .expect("should produce a response");

    let nvext = response.nvext.expect("nvext should be present");
    let engine_data = nvext
        .get("engine_data")
        .expect("engine_data should be present");

    // Spot checks on individual keys keep failures easy to read.
    assert_eq!(engine_data["kv_transfer_time_ms"], 12.3);
    assert_eq!(engine_data["prefill_compute_time_ms"], 45.6);
    // Whole-value comparison: no key may be dropped, added, or rewritten on the way through.
    assert_eq!(
        engine_data, &expected,
        "engine_data should be forwarded as-is"
    );
}
/// `engine_data` supplied by the backend must not show up in `nvext` for the default test
/// request, which does not ask for it.
#[test]
fn test_engine_data_excluded_when_not_requested() {
    let request = create_test_request();
    let mut generator = request.response_generator("req-engine-2".to_string());
    let backend_output = make_backend_output_with_engine_data();
    let response = generator
        .choice_from_postprocessor(backend_output)
        .expect("should produce a response");

    // nvext may or may not be present (tracker may inject worker_id),
    // but engine_data specifically must be absent (a JSON null is tolerated).
    if let Some(nvext) = &response.nvext {
        assert!(
            nvext.get("engine_data").is_none_or(|value| value.is_null()),
            "engine_data should not be present when not requested"
        );
    }
}
/// Opting in to a different extra field (`"timing"`) must not cause `engine_data` to be emitted.
#[test]
fn test_engine_data_excluded_when_other_extra_fields_requested() {
    let request = create_test_request_with_extra_fields(vec!["timing".to_string()]);
    let mut generator = request.response_generator("req-engine-3".to_string());
    let backend_output = make_backend_output_with_engine_data();
    let response = generator
        .choice_from_postprocessor(backend_output)
        .expect("should produce a response");

    // Other nvext fields may legitimately be present; only engine_data is checked
    // (a JSON null is tolerated).
    if let Some(nvext) = &response.nvext {
        assert!(
            nvext.get("engine_data").is_none_or(|value| value.is_null()),
            "engine_data should not be present when only timing is requested"
        );
    }
}
/// When the request opts in to `engine_data` but the backend supplies none, the response
/// must not contain an `engine_data` entry.
#[test]
fn test_engine_data_none_from_backend_no_nvext_noise() {
    let request = create_test_request_with_extra_fields(vec!["engine_data".to_string()]);
    let mut generator = request.response_generator("req-engine-4".to_string());
    let backend_output = BackendOutput {
        token_ids: vec![42],
        tokens: vec![Some("hello".to_string())],
        text: Some("hello".to_string()),
        cum_log_probs: None,
        log_probs: None,
        top_logprobs: None,
        finish_reason: Some(common::FinishReason::Stop),
        stop_reason: None,
        index: Some(0),
        completion_usage: None,
        disaggregated_params: None,
        engine_data: None, // engine didn't provide any data
    };
    let response = generator
        .choice_from_postprocessor(backend_output)
        .expect("should produce a response");

    // engine_data is None from backend, so nvext.engine_data should be absent
    // (a JSON null is tolerated).
    if let Some(nvext) = &response.nvext {
        assert!(
            nvext.get("engine_data").is_none_or(|value| value.is_null()),
            "engine_data should not appear when backend provides None"
        );
    }
}
}
......@@ -114,6 +114,11 @@ pub struct NvExtResponse {
/// Routed expert capture payload (SGLang-specific)
#[serde(skip_serializing_if = "Option::is_none")]
pub routed_experts: Option<serde_json::Value>,
/// Opaque engine data passed through from the backend worker.
/// Dynamo does not inspect this; it is forwarded as-is to the client.
#[serde(skip_serializing_if = "Option::is_none")]
pub engine_data: Option<serde_json::Value>,
}
/// Response nvext fields requested for a given request.
......@@ -130,6 +135,7 @@ pub struct NvExtResponseFieldSelection {
pub timing: bool,
pub token_ids: bool,
pub routed_experts: bool,
pub engine_data: bool,
}
impl NvExtResponseFieldSelection {
......@@ -145,6 +151,7 @@ impl NvExtResponseFieldSelection {
"worker_id" => selection.worker_id = true,
"timing" => selection.timing = true,
"routed_experts" => selection.routed_experts = true,
"engine_data" => selection.engine_data = true,
_ => {}
}
}
......@@ -176,11 +183,13 @@ impl NvExtResponseFieldSelection {
/// - `routed_experts` requires the selection flag **and** a `"routed_experts"` key on
/// `disaggregated_params` (cloned as-is, no validation).
/// - `timing` requires the selection flag, `finish_reason_present == true`, **and** a tracker.
/// - `engine_data` requires the selection flag **and** a non-`None` `engine_data_from_backend`.
pub fn build_response_nvext(
&self,
tracker: Option<&std::sync::Arc<crate::protocols::common::timing::RequestTracker>>,
disaggregated_params: Option<&serde_json::Value>,
finish_reason_present: bool,
engine_data_from_backend: Option<serde_json::Value>,
) -> Option<NvExtResponse> {
let worker_id = if self.worker_id {
tracker.and_then(|t| t.get_worker_info())
......@@ -210,10 +219,17 @@ impl NvExtResponseFieldSelection {
None
};
let engine_data = if self.engine_data {
engine_data_from_backend
} else {
None
};
if worker_id.is_none()
&& token_ids.is_none()
&& routed_experts.is_none()
&& timing.is_none()
&& engine_data.is_none()
{
return None;
}
......@@ -223,6 +239,7 @@ impl NvExtResponseFieldSelection {
timing,
token_ids,
routed_experts,
engine_data,
})
}
}
......@@ -273,7 +290,7 @@ pub struct NvExt {
/// Extra fields to be included in the response's nvext
/// This is a list of field names that should be populated in the response
/// Supported fields include "worker_id", "timing", "routed_experts",
/// Supported fields include "worker_id", "timing", "routed_experts", "engine_data",
/// which map to fields in NvExtResponse.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
......@@ -677,11 +694,11 @@ mod tests {
fn test_build_response_nvext_all_false_returns_none() {
let sel = sel_all_false();
assert!(
sel.build_response_nvext(None, None, false).is_none(),
sel.build_response_nvext(None, None, false, None).is_none(),
"no fields selected → None"
);
assert!(
sel.build_response_nvext(None, None, true).is_none(),
sel.build_response_nvext(None, None, true, None).is_none(),
"finish_reason alone does not force emission"
);
}
......@@ -696,7 +713,7 @@ mod tests {
// finish_reason=false: worker_id still emitted (only timing is finish-gated).
let out = sel
.build_response_nvext(Some(&tracker), None, false)
.build_response_nvext(Some(&tracker), None, false, None)
.expect("worker_id should emit regardless of finish_reason");
assert!(out.worker_id.is_some());
......@@ -715,7 +732,7 @@ mod tests {
// timing alone + finish_reason=false → nothing to emit, returns None.
assert!(
sel.build_response_nvext(Some(&tracker), None, false)
sel.build_response_nvext(Some(&tracker), None, false, None)
.is_none(),
"timing is gated on finish_reason_present"
);
......@@ -730,7 +747,7 @@ mod tests {
let tracker = tracker_with_prefill_worker();
let out = sel
.build_response_nvext(Some(&tracker), None, true)
.build_response_nvext(Some(&tracker), None, true, None)
.expect("timing should emit on finish");
assert!(out.timing.is_some());
......@@ -746,7 +763,7 @@ mod tests {
..Default::default()
};
// finish=true but no tracker → timing not populated → None.
assert!(sel.build_response_nvext(None, None, true).is_none());
assert!(sel.build_response_nvext(None, None, true, None).is_none());
}
#[test]
......@@ -758,7 +775,7 @@ mod tests {
let params = disagg_params_full();
let out = sel
.build_response_nvext(None, Some(&params), false)
.build_response_nvext(None, Some(&params), false, None)
.expect("token_ids should emit when present");
assert_eq!(out.token_ids, Some(vec![11u32, 22, 33]));
......@@ -777,7 +794,7 @@ mod tests {
let params = serde_json::json!({ "token_ids": "not-an-array" });
assert!(
sel.build_response_nvext(None, Some(&params), false)
sel.build_response_nvext(None, Some(&params), false, None)
.is_none(),
"malformed token_ids silently suppressed; nothing else selected → None"
);
......@@ -792,7 +809,7 @@ mod tests {
let params = disagg_params_full();
let out = sel
.build_response_nvext(None, Some(&params), false)
.build_response_nvext(None, Some(&params), false, None)
.expect("routed_experts should emit when present");
assert_eq!(
......@@ -808,12 +825,13 @@ mod tests {
timing: true,
token_ids: true,
routed_experts: true,
engine_data: false,
};
let tracker = tracker_with_prefill_worker();
let params = disagg_params_full();
let out = sel
.build_response_nvext(Some(&tracker), Some(&params), true)
.build_response_nvext(Some(&tracker), Some(&params), true, None)
.expect("all fields selected and available → Some");
assert!(out.worker_id.is_some());
......@@ -843,6 +861,7 @@ mod tests {
timing: true,
token_ids: false, // only enabled via query_instance_id
routed_experts: true,
engine_data: false,
}
);
}
......
......@@ -110,6 +110,7 @@ fn build_backend_outputs_with_cached_tokens(cached_tokens: Option<u32>) -> Vec<B
index: Some(0),
completion_usage: None,
disaggregated_params: None,
engine_data: None,
},
BackendOutput {
token_ids: vec![1917],
......@@ -123,6 +124,7 @@ fn build_backend_outputs_with_cached_tokens(cached_tokens: Option<u32>) -> Vec<B
index: Some(0),
completion_usage: None,
disaggregated_params: None,
engine_data: None,
},
BackendOutput {
token_ids: vec![0],
......@@ -145,6 +147,7 @@ fn build_backend_outputs_with_cached_tokens(cached_tokens: Option<u32>) -> Vec<B
completion_tokens_details: None,
}),
disaggregated_params: None,
engine_data: None,
},
]
}
......
......@@ -134,6 +134,7 @@ fn build_backend_output(text: &str) -> BackendOutput {
index: Some(0),
completion_usage: None,
disaggregated_params: None,
engine_data: None,
}
}
......@@ -302,6 +303,7 @@ async fn test_streaming_named_tool_buffers_until_finish() {
index: Some(0),
completion_usage: None,
disaggregated_params: None,
engine_data: None,
};
let response = generator
......@@ -369,6 +371,7 @@ async fn test_streaming_required_tool_parallel() {
index: Some(0),
completion_usage: None,
disaggregated_params: None,
engine_data: None,
};
let response = generator
......@@ -438,6 +441,7 @@ fn test_no_tool_choice_outputs_normal_text() {
index: Some(0),
completion_usage: None,
disaggregated_params: None,
engine_data: None,
};
let response = generator
......
......@@ -50,6 +50,7 @@ fn build_backend_output_with_finish(text: &str, finish: common::FinishReason) ->
index: Some(0),
completion_usage: None,
disaggregated_params: None,
engine_data: None,
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment