Unverified Commit 90caf3ea authored by KrishnanPrash's avatar KrishnanPrash Committed by GitHub
Browse files

feat: Extend TensorRequest and TensorResponse to contain extra parameters (#3761)


Signed-off-by: Krishnan Prashanth <kprashanth@nvidia.com>
Signed-off-by: KrishnanPrash <140860868+KrishnanPrash@users.noreply.github.com>
Co-authored-by: Ryan McCormick <rmccormick@nvidia.com>
parent b77696da
......@@ -34,6 +34,8 @@ use tonic::Status;
/// Dynamo Annotation for the request ID
pub const ANNOTATION_REQUEST_ID: &str = "request_id";
use inference::infer_parameter::ParameterChoice;
// Extend the NvCreateTensorResponse to include options to control
// the conversion to ModelInferResponse / ModelStreamInferResponse
pub struct ExtendedNvCreateTensorResponse {
......@@ -193,6 +195,61 @@ fn get_or_create_request_id(primary: Option<&str>) -> String {
uuid.to_string()
}
/// Translate one KServe `InferParameter` into its Dynamo `ParameterValue` counterpart.
///
/// `key` is the parameter's name; it is only used to build the error message.
///
/// # Errors
///
/// Returns `Status::invalid_argument` when the parameter's `parameter_choice` is unset.
#[allow(clippy::result_large_err)]
pub fn kserve_param_to_dynamo(
    key: &str,
    param: &inference::InferParameter,
) -> Result<tensor::ParameterValue, Status> {
    // A single match handles both the "no value" case and the per-variant mapping.
    match &param.parameter_choice {
        None => Err(Status::invalid_argument(format!(
            "Parameter '{}' has no value",
            key
        ))),
        Some(ParameterChoice::BoolParam(flag)) => Ok(tensor::ParameterValue::Bool(*flag)),
        Some(ParameterChoice::Int64Param(number)) => Ok(tensor::ParameterValue::Int64(*number)),
        Some(ParameterChoice::StringParam(text)) => {
            Ok(tensor::ParameterValue::String(text.clone()))
        }
        Some(ParameterChoice::DoubleParam(number)) => Ok(tensor::ParameterValue::Double(*number)),
        Some(ParameterChoice::Uint64Param(number)) => Ok(tensor::ParameterValue::Uint64(*number)),
    }
}
/// Wrap a Dynamo `ParameterValue` in the matching KServe `InferParameter`.
///
/// The returned parameter always has `parameter_choice` set. String payloads are cloned
/// because the input is only borrowed.
pub fn dynamo_param_to_kserve(param: &tensor::ParameterValue) -> inference::InferParameter {
    inference::InferParameter {
        parameter_choice: Some(match param {
            tensor::ParameterValue::Bool(flag) => ParameterChoice::BoolParam(*flag),
            tensor::ParameterValue::Int64(number) => ParameterChoice::Int64Param(*number),
            tensor::ParameterValue::String(text) => ParameterChoice::StringParam(text.clone()),
            tensor::ParameterValue::Double(number) => ParameterChoice::DoubleParam(*number),
            tensor::ParameterValue::Uint64(number) => ParameterChoice::Uint64Param(*number),
        }),
    }
}
/// Convert a whole KServe parameter map into Dynamo `Parameters`.
///
/// # Errors
///
/// Stops at the first entry that `kserve_param_to_dynamo` rejects and returns its `Status`.
#[allow(clippy::result_large_err)]
fn convert_kserve_to_dynamo_params(
    kserve_params: &std::collections::HashMap<String, inference::InferParameter>,
) -> Result<tensor::Parameters, Status> {
    let mut converted = tensor::Parameters::with_capacity(kserve_params.len());
    for (name, raw) in kserve_params {
        // `?` aborts the whole conversion on the first invalid parameter.
        let value = kserve_param_to_dynamo(name, raw)?;
        converted.insert(name.clone(), value);
    }
    Ok(converted)
}
/// Convert Dynamo `Parameters` into a KServe parameter map, entry by entry.
///
/// This direction cannot fail: every `ParameterValue` has a KServe counterpart.
fn convert_dynamo_to_kserve_params(
    dynamo_params: &tensor::Parameters,
) -> std::collections::HashMap<String, inference::InferParameter> {
    let mut converted = std::collections::HashMap::with_capacity(dynamo_params.len());
    for (name, value) in dynamo_params {
        converted.insert(name.clone(), dynamo_param_to_kserve(value));
    }
    converted
}
impl TryFrom<inference::ModelInferRequest> for NvCreateTensorRequest {
type Error = Status;
......@@ -207,6 +264,9 @@ impl TryFrom<inference::ModelInferRequest> for NvCreateTensorRequest {
));
}
// Extract request-level parameters
let parameters = convert_kserve_to_dynamo_params(&request.parameters)?;
let mut tensor_request = NvCreateTensorRequest {
id: if !request.id.is_empty() {
Some(request.id.clone())
......@@ -215,17 +275,22 @@ impl TryFrom<inference::ModelInferRequest> for NvCreateTensorRequest {
},
model: request.model_name.clone(),
tensors: Vec::new(),
parameters,
nvext: None,
};
// iterate through inputs
for (idx, input) in request.inputs.into_iter().enumerate() {
// Extract per-tensor parameters
let tensor_parameters = convert_kserve_to_dynamo_params(&input.parameters)?;
let mut tensor = Tensor {
metadata: TensorMetadata {
name: input.name.clone(),
data_type: tensor::DataType::from_str(&input.datatype)
.map_err(|err| Status::invalid_argument(err.to_string()))?,
shape: input.shape.clone(),
parameters: tensor_parameters,
},
// Placeholder, will be filled below
data: tensor::FlattenTensor::Bool(Vec::new()),
......@@ -468,21 +533,29 @@ impl TryFrom<ExtendedNvCreateTensorResponse> for inference::ModelInferResponse {
fn try_from(extended_response: ExtendedNvCreateTensorResponse) -> Result<Self, Self::Error> {
let response = extended_response.response;
// Convert response-level parameters
let parameters = convert_dynamo_to_kserve_params(&response.parameters);
let mut infer_response = inference::ModelInferResponse {
model_name: response.model,
model_version: "1".to_string(),
id: response.id.unwrap_or_default(),
outputs: vec![],
parameters: ::std::collections::HashMap::<String, inference::InferParameter>::new(),
parameters,
raw_output_contents: vec![],
};
for tensor in &response.tensors {
// Convert per-tensor parameters
let tensor_parameters = convert_dynamo_to_kserve_params(&tensor.metadata.parameters);
infer_response
.outputs
.push(inference::model_infer_response::InferOutputTensor {
name: tensor.metadata.name.clone(),
datatype: tensor.metadata.data_type.to_string(),
shape: tensor.metadata.shape.clone(),
parameters: tensor_parameters,
..Default::default()
});
if extended_response.set_raw_output_contents {
......
......@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize, de::DeserializeOwned};
use crate::protocols::tensor;
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ModelRuntimeConfig {
pub total_kv_blocks: Option<u64>,
......
......@@ -6,6 +6,7 @@ use anyhow::Result;
use dynamo_runtime::protocols::annotated::AnnotationsProvider;
use futures::{Stream, StreamExt};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use validator::Validate;
// [gluo TODO] whether it makes sense to have aggregator for tensor..
......@@ -112,14 +113,18 @@ impl FlattenTensor {
}
}
#[derive(Serialize, Deserialize, Validate, Debug, Clone, Eq, PartialEq)]
#[derive(Serialize, Deserialize, Validate, Debug, Clone, PartialEq)]
/// Describes a tensor: its name, element data type, shape, and optional parameters.
pub struct TensorMetadata {
/// Name of the tensor.
pub name: String,
/// Element data type of the tensor.
pub data_type: DataType,
/// Dimensions of the tensor.
/// NOTE(review): tests in this change use `-1` as an output dimension, presumably to mark
/// a variable-length dimension — confirm.
pub shape: Vec<i64>,
/// Optional parameters for this tensor.
/// Left out of the serialized form when the map is empty, and filled with an empty map
/// when the field is missing during deserialization.
#[serde(skip_serializing_if = "HashMap::is_empty", default)]
pub parameters: Parameters,
}
#[derive(Serialize, Deserialize, Validate, Debug, Clone, Eq, PartialEq)]
#[derive(Serialize, Deserialize, Validate, Debug, Clone, PartialEq)]
pub struct TensorModelConfig {
pub name: String,
pub inputs: Vec<TensorMetadata>,
......@@ -183,6 +188,10 @@ pub struct NvCreateTensorRequest {
/// Input tensors.
pub tensors: Vec<Tensor>,
/// Optional request-level parameters
#[serde(skip_serializing_if = "HashMap::is_empty", default)]
pub parameters: Parameters,
#[serde(skip_serializing_if = "Option::is_none")]
pub nvext: Option<NvExt>,
}
......@@ -199,6 +208,10 @@ pub struct NvCreateTensorResponse {
/// Output tensors.
pub tensors: Vec<Tensor>,
/// Optional response-level parameters
#[serde(skip_serializing_if = "HashMap::is_empty", default)]
pub parameters: Parameters,
}
/// Implements `NvExtProvider` for `NvCreateTensorRequest`,
......@@ -291,3 +304,15 @@ impl NvCreateTensorResponse {
}
}
}
/// A single typed parameter value, as stored in a [`Parameters`] map.
///
/// Because of `rename_all = "snake_case"`, the variants are named `bool`, `int64`,
/// `string`, `double`, and `uint64` in the serialized form.
/// `Eq` is not derived: the `Double` variant holds an `f64`, which does not implement it.
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
#[serde(rename_all = "snake_case")]
pub enum ParameterValue {
/// Boolean value.
Bool(bool),
/// Signed 64-bit integer value.
Int64(i64),
/// Owned string value.
String(String),
/// 64-bit floating point value.
Double(f64),
/// Unsigned 64-bit integer value.
Uint64(u64),
}
/// Map from parameter name to its typed [`ParameterValue`].
pub type Parameters = HashMap<String, ParameterValue>;
......@@ -18,6 +18,7 @@ pub mod kserve_test {
use anyhow::Error;
use async_stream::stream;
use dynamo_llm::grpc::service::kserve::KserveService;
use dynamo_llm::grpc::service::kserve::inference as kserve_inference;
use dynamo_llm::protocols::{
Annotated,
openai::{
......@@ -224,6 +225,7 @@ pub mod kserve_test {
id: request.id.clone(),
model: request.model.clone(),
tensors: request.tensors.clone(),
parameters: Default::default(),
});
}
};
......@@ -1163,11 +1165,13 @@ pub mod kserve_test {
name: "input".to_string(),
data_type: tensor::DataType::Int32,
shape: vec![1],
parameters: Default::default(),
}],
outputs: vec![tensor::TensorMetadata {
name: "output".to_string(),
data_type: tensor::DataType::Bool,
shape: vec![-1],
parameters: Default::default(),
}],
}),
..Default::default()
......@@ -1293,11 +1297,13 @@ pub mod kserve_test {
name: "input".to_string(),
data_type: tensor::DataType::Bytes,
shape: vec![1],
parameters: Default::default(),
}],
outputs: vec![tensor::TensorMetadata {
name: "output".to_string(),
data_type: tensor::DataType::Bool,
shape: vec![-1],
parameters: Default::default(),
}],
}),
..Default::default()
......@@ -1541,4 +1547,62 @@ pub mod kserve_test {
}
}
}
/// Every KServe parameter variant must survive a KServe -> Dynamo -> KServe round trip.
#[test]
fn test_parameter_conversion_round_trip() {
    use kserve_inference::infer_parameter::ParameterChoice;

    // One representative value for each of the five parameter variants.
    let test_cases = vec![
        ("bool_param", ParameterChoice::BoolParam(true)),
        ("int64_param", ParameterChoice::Int64Param(42)),
        (
            "string_param",
            ParameterChoice::StringParam("test_value".to_string()),
        ),
        ("double_param", ParameterChoice::DoubleParam(2.5)),
        ("uint64_param", ParameterChoice::Uint64Param(9999)),
    ];

    for (name, choice) in test_cases {
        // `choice` is owned by this iteration and not used again, so it is moved
        // into the message rather than cloned.
        let kserve_param = kserve_inference::InferParameter {
            parameter_choice: Some(choice),
        };

        // Convert KServe -> Dynamo -> KServe.
        let dynamo_param =
            dynamo_llm::grpc::service::tensor::kserve_param_to_dynamo(name, &kserve_param)
                .expect("Conversion to Dynamo should succeed");
        let back_to_kserve =
            dynamo_llm::grpc::service::tensor::dynamo_param_to_kserve(&dynamo_param);

        // The value that comes back must equal the value that went in.
        assert_eq!(
            kserve_param.parameter_choice, back_to_kserve.parameter_choice,
            "Parameter '{}' failed round-trip conversion",
            name
        );
    }
}
/// A KServe parameter whose `parameter_choice` is unset must be rejected with a
/// "has no value" error.
#[test]
fn test_parameter_conversion_error_cases() {
    let empty_param = kserve_inference::InferParameter {
        parameter_choice: None,
    };

    match dynamo_llm::grpc::service::tensor::kserve_param_to_dynamo("empty_param", &empty_param) {
        // The conversion must not succeed for a parameter without a value.
        Ok(_) => panic!("Expected error for parameter with no value"),
        Err(status) => assert!(
            status.message().contains("has no value"),
            "Expected error message about missing value"
        ),
    }
}
}
......@@ -45,8 +45,20 @@ async def echo_tensor_worker(runtime: DistributedRuntime):
async def generate(request, context):
"""Echo tensors and parameters back to the client."""
print(f"Echoing request: {request}")
yield {"model": request["model"], "tensors": request["tensors"]}
params = {}
if "parameters" in request:
params.update(request["parameters"])
params["processed"] = {"bool": True}
yield {
"model": request["model"],
"tensors": request["tensors"],
"parameters": params,
}
if __name__ == "__main__":
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Test gRPC parameter passing with tensor models."""
import logging
import os
import shutil
import numpy as np
import pytest
import tritonclient.grpc as grpcclient
from tests.utils.managed_process import ManagedProcess
logger = logging.getLogger(__name__)
class DynamoFrontendProcess(ManagedProcess):
    """Managed process that runs ``dynamo.frontend`` with the KServe gRPC server flag."""

    def __init__(self, request):
        # Logs go to a per-test directory; clear leftovers from any previous run first.
        frontend_log_dir = f"{request.node.name}_frontend"
        shutil.rmtree(frontend_log_dir, ignore_errors=True)

        super().__init__(
            command=["python", "-m", "dynamo.frontend", "--kserve-grpc-server"],
            display_output=True,
            terminate_existing=True,
            log_dir=frontend_log_dir,
        )
class EchoTensorWorkerProcess(ManagedProcess):
    """Managed process that runs ``echo_tensor_worker.py`` located next to this test file."""

    def __init__(self, request):
        worker_script = os.path.join(os.path.dirname(__file__), "echo_tensor_worker.py")

        # Start from the current environment and layer the worker settings on top.
        worker_env = {
            **os.environ,
            "DYN_LOG": "debug",
            "DYN_SYSTEM_ENABLED": "true",
            "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS": '["generate"]',
            "DYN_SYSTEM_PORT": "8083",
        }

        def _reports_ready(response):
            # The worker counts as healthy once its health endpoint reports "ready".
            return response.json().get("status") == "ready"

        # Logs go to a per-test directory; clear leftovers from any previous run first.
        worker_log_dir = f"{request.node.name}_worker"
        shutil.rmtree(worker_log_dir, ignore_errors=True)

        super().__init__(
            command=["python3", worker_script],
            env=worker_env,
            health_check_urls=[("http://localhost:8083/health", _reports_ready)],
            timeout=300,
            display_output=True,
            log_dir=worker_log_dir,
        )
@pytest.fixture()
def start_services(request, runtime_services):
    """Run the frontend and the echo worker for the duration of one test.

    The frontend is started first and the worker second; both are torn down when the
    test finishes. ``runtime_services`` is requested only for its setup (presumably the
    fresh etcd/nats instances — confirm against that fixture).
    """
    with DynamoFrontendProcess(request), EchoTensorWorkerProcess(request):
        yield
def extract_params(param_map) -> dict:
    """Flatten a gRPC parameter map into a plain ``{name: value}`` dict.

    For each entry, the first populated field among the known ``*_param`` fields
    supplies the value. Entries with none of those fields set are left out.
    """
    candidate_fields = (
        "bool_param",
        "int64_param",
        "double_param",
        "string_param",
        "uint64_param",
    )

    extracted = {}
    for name, message in param_map.items():
        # Lazily probe the fields in order and stop at the first one that is set.
        populated = next(
            (field for field in candidate_fields if message.HasField(field)), None
        )
        if populated is not None:
            extracted[name] = getattr(message, populated)
    return extracted
@pytest.mark.e2e
@pytest.mark.pre_merge
@pytest.mark.parametrize(
    "request_params",
    [
        None,
        {"int_param": 8},
        {"str_param": "custom", "bool_param": True},
    ],
    ids=["no_params", "numeric_param", "mixed_params"],
)
def test_request_parameters(start_services, request_params):
    """Check that request-level gRPC parameters come back through the tensor model.

    The echo worker is expected to return the input tensor unchanged, to return every
    request parameter it received, and to add a ``processed`` flag. Together these show
    that parameters flow through the gRPC frontend in both directions.
    """
    triton_client = grpcclient.InferenceServerClient("localhost:8000")

    sent = np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float32)
    infer_input = grpcclient.InferInput("INPUT", sent.shape, "FP32")
    infer_input.set_data_from_numpy(sent)

    response = triton_client.infer(
        "echo", inputs=[infer_input], parameters=request_params
    )

    # The tensor payload must be echoed back unchanged.
    assert np.array_equal(sent, response.as_numpy("INPUT"))

    echoed = extract_params(response.get_response().parameters)
    assert echoed.get("processed") is True

    # With no request parameters there is nothing further to compare.
    for key, expected_value in (request_params or {}).items():
        assert key in echoed, f"Parameter '{key}' not echoed"
        actual = echoed[key]
        assert (
            actual == expected_value
        ), f"{key}: expected {expected_value}, got {actual}"
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment