Unverified commit d68e4b8a, authored by GuanLuo and committed by GitHub
Browse files

fix: enhance gRPC frontend to return output in raw content field for Triton...


fix: enhance gRPC frontend to return output in raw content field for Triton client compatibility (#3600)
Signed-off-by: Guan Luo <gluo@nvidia.com>
Signed-off-by: Guan Luo <41310872+GuanLuo@users.noreply.github.com>
parent 03bd92de
...@@ -28,6 +28,8 @@ pytest-forked ...@@ -28,6 +28,8 @@ pytest-forked
pytest-md-report pytest-md-report
pytest-mypy pytest-mypy
pytest-timeout pytest-timeout
# Triton client to Dynamo gRPC server
tritonclient[grpc]
# add types library stub for PyYAML # add types library stub for PyYAML
types-PyYAML types-PyYAML
types-requests types-requests
...@@ -22,7 +22,7 @@ use tokio_stream::{Stream, StreamExt}; ...@@ -22,7 +22,7 @@ use tokio_stream::{Stream, StreamExt};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::grpc::service::openai::completion_response_stream; use crate::grpc::service::openai::completion_response_stream;
use crate::grpc::service::tensor::tensor_response_stream; use crate::grpc::service::tensor::{ExtendedNvCreateTensorResponse, tensor_response_stream};
use std::convert::{TryFrom, TryInto}; use std::convert::{TryFrom, TryInto};
use tonic::{Request, Response, Status, transport::Server}; use tonic::{Request, Response, Status, transport::Server};
...@@ -195,18 +195,21 @@ impl GrpcInferenceService for KserveService { ...@@ -195,18 +195,21 @@ impl GrpcInferenceService for KserveService {
// [gluo TODO] refactor to reuse code, inference logic is largely the same // [gluo TODO] refactor to reuse code, inference logic is largely the same
if self.state().is_tensor_model(&model) { if self.state().is_tensor_model(&model) {
// Fallback handling by assuming the model is OpenAI Completions model let set_raw_output_contents = !request.raw_input_contents.is_empty();
let tensor_request: NvCreateTensorRequest = NvCreateTensorRequest::try_from(request) let tensor_request: NvCreateTensorRequest = NvCreateTensorRequest::try_from(request)
.map_err(|e| Status::invalid_argument(format!("Failed to parse request: {}", e)))?; .map_err(|e| Status::invalid_argument(format!("Failed to parse request: {}", e)))?;
let stream = tensor_response_stream(self.state_clone(), tensor_request, false).await?; let stream = tensor_response_stream(self.state_clone(), tensor_request, false).await?;
let tensor_response = NvCreateTensorResponse::from_annotated_stream(stream) let tensor_response = ExtendedNvCreateTensorResponse {
.await response: NvCreateTensorResponse::from_annotated_stream(stream)
.map_err(|e| { .await
tracing::error!("Failed to fold completions stream: {:?}", e); .map_err(|e| {
Status::internal(format!("Failed to fold completions stream: {}", e)) tracing::error!("Failed to fold completions stream: {:?}", e);
})?; Status::internal(format!("Failed to fold completions stream: {}", e))
})?,
set_raw_output_contents,
};
let mut reply: ModelInferResponse = tensor_response.try_into().map_err(|e| { let mut reply: ModelInferResponse = tensor_response.try_into().map_err(|e| {
Status::invalid_argument(format!("Failed to parse response: {}", e)) Status::invalid_argument(format!("Failed to parse response: {}", e))
...@@ -216,6 +219,8 @@ impl GrpcInferenceService for KserveService { ...@@ -216,6 +219,8 @@ impl GrpcInferenceService for KserveService {
return Ok(Response::new(reply)); return Ok(Response::new(reply));
} }
// [gluo FIXME] check model existence first, otherwise the true error
// is masked by "Failed to parse request" below.
// Fallback handling by assuming the model is OpenAI Completions model // Fallback handling by assuming the model is OpenAI Completions model
let mut completion_request: NvCreateCompletionRequest = request let mut completion_request: NvCreateCompletionRequest = request
.try_into() .try_into()
...@@ -298,6 +303,7 @@ impl GrpcInferenceService for KserveService { ...@@ -298,6 +303,7 @@ impl GrpcInferenceService for KserveService {
if state.is_tensor_model(&model) { if state.is_tensor_model(&model) {
// Must keep track of 'request_id' which will be returned in corresponding response // Must keep track of 'request_id' which will be returned in corresponding response
let request_id = request.id.clone(); let request_id = request.id.clone();
let set_raw_output_contents = !request.raw_input_contents.is_empty();
let tensor_request: NvCreateTensorRequest = request.try_into().map_err(|e| { let tensor_request: NvCreateTensorRequest = request.try_into().map_err(|e| {
Status::invalid_argument(format!("Failed to parse request: {}", e)) Status::invalid_argument(format!("Failed to parse request: {}", e))
})?; })?;
...@@ -308,6 +314,9 @@ impl GrpcInferenceService for KserveService { ...@@ -308,6 +314,9 @@ impl GrpcInferenceService for KserveService {
while let Some(response) = stream.next().await { while let Some(response) = stream.next().await {
match response.data { match response.data {
Some(data) => { Some(data) => {
let data = ExtendedNvCreateTensorResponse {response: data,
set_raw_output_contents,
};
let mut reply = ModelStreamInferResponse::try_from(data).map_err(|e| { let mut reply = ModelStreamInferResponse::try_from(data).map_err(|e| {
Status::invalid_argument(format!("Failed to parse response: {}", e)) Status::invalid_argument(format!("Failed to parse response: {}", e))
})?; })?;
......
...@@ -34,6 +34,13 @@ use tonic::Status; ...@@ -34,6 +34,13 @@ use tonic::Status;
/// Dynamo Annotation for the request ID /// Dynamo Annotation for the request ID
pub const ANNOTATION_REQUEST_ID: &str = "request_id"; pub const ANNOTATION_REQUEST_ID: &str = "request_id";
/// Extends [`NvCreateTensorResponse`] with options that control the
/// conversion to `ModelInferResponse` / `ModelStreamInferResponse`.
///
/// The gRPC handlers build this wrapper right before converting a tensor
/// response into the wire-level reply type.
pub struct ExtendedNvCreateTensorResponse {
/// The tensor response produced by the engine that is to be converted.
pub response: NvCreateTensorResponse,
/// When `true`, the conversion appends each output tensor's data to
/// `raw_output_contents`; when `false`, it fills the typed `contents`
/// field of the output tensor instead. The handlers set this from whether
/// the incoming request carried any `raw_input_contents`.
pub set_raw_output_contents: bool,
}
/// Tensor Request Handler /// Tensor Request Handler
/// ///
/// This method will handle the incoming request for model type tensor. The endpoint is a "source" /// This method will handle the incoming request for model type tensor. The endpoint is a "source"
...@@ -456,10 +463,11 @@ impl tensor::Tensor { ...@@ -456,10 +463,11 @@ impl tensor::Tensor {
} }
} }
impl TryFrom<NvCreateTensorResponse> for inference::ModelInferResponse { impl TryFrom<ExtendedNvCreateTensorResponse> for inference::ModelInferResponse {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(response: NvCreateTensorResponse) -> Result<Self, Self::Error> { fn try_from(extended_response: ExtendedNvCreateTensorResponse) -> Result<Self, Self::Error> {
let response = extended_response.response;
let mut infer_response = inference::ModelInferResponse { let mut infer_response = inference::ModelInferResponse {
model_name: response.model, model_name: response.model,
model_version: "1".to_string(), model_version: "1".to_string(),
...@@ -475,97 +483,146 @@ impl TryFrom<NvCreateTensorResponse> for inference::ModelInferResponse { ...@@ -475,97 +483,146 @@ impl TryFrom<NvCreateTensorResponse> for inference::ModelInferResponse {
name: tensor.metadata.name.clone(), name: tensor.metadata.name.clone(),
datatype: tensor.metadata.data_type.to_string(), datatype: tensor.metadata.data_type.to_string(),
shape: tensor.metadata.shape.clone(), shape: tensor.metadata.shape.clone(),
contents: match &tensor.data { ..Default::default()
tensor::FlattenTensor::Bool(data) => Some(inference::InferTensorContents { });
bool_contents: data.clone(), if extended_response.set_raw_output_contents {
..Default::default() infer_response.add_raw_output_contents(tensor)?;
}), } else {
tensor::FlattenTensor::Uint8(data) => { infer_response.fill_last_tensor_contents(tensor);
Some(inference::InferTensorContents { }
uint_contents: data.iter().map(|&x| x as u32).collect(), }
..Default::default()
})
}
tensor::FlattenTensor::Uint16(data) => { Ok(infer_response)
Some(inference::InferTensorContents { }
uint_contents: data.iter().map(|&x| x as u32).collect(), }
..Default::default()
})
}
tensor::FlattenTensor::Uint32(data) => { impl inference::ModelInferResponse {
Some(inference::InferTensorContents { /// Serializes the tensor data into a standardized little-endian byte format
uint_contents: data.clone(), /// and appends it to the raw_output_contents field.
..Default::default() ///
}) /// This ensures consistent cross-platform representation of numerical values
} /// regardless of the host machine's native endianness. Each tensor element is
/// flattened and converted to its corresponding little-endian byte sequence,
/// matching the protocol format expected by Triton Inference Server and
/// similar inference runtimes.
pub fn add_raw_output_contents(
&mut self,
tensor: &tensor::Tensor,
) -> Result<(), anyhow::Error> {
let raw_content = match &tensor.data {
tensor::FlattenTensor::Bool(data) => {
data.iter().map(|&b| if b { 1u8 } else { 0u8 }).collect()
}
tensor::FlattenTensor::Uint8(data) => data.clone(),
tensor::FlattenTensor::Uint16(data) => {
data.iter().flat_map(|&x| x.to_le_bytes()).collect()
}
tensor::FlattenTensor::Uint32(data) => {
data.iter().flat_map(|&x| x.to_le_bytes()).collect()
}
tensor::FlattenTensor::Uint64(data) => {
data.iter().flat_map(|&x| x.to_le_bytes()).collect()
}
tensor::FlattenTensor::Int8(data) => data.iter().map(|&x| x as u8).collect(),
tensor::FlattenTensor::Int16(data) => {
data.iter().flat_map(|&x| x.to_le_bytes()).collect()
}
tensor::FlattenTensor::Int32(data) => {
data.iter().flat_map(|&x| x.to_le_bytes()).collect()
}
tensor::FlattenTensor::Int64(data) => {
data.iter().flat_map(|&x| x.to_le_bytes()).collect()
}
tensor::FlattenTensor::Float32(data) => {
data.iter().flat_map(|&x| x.to_le_bytes()).collect()
}
tensor::FlattenTensor::Float64(data) => {
data.iter().flat_map(|&x| x.to_le_bytes()).collect()
}
tensor::FlattenTensor::Bytes(data) => {
let mut bytes = Vec::new();
for item in data {
let len = item.len() as u32;
bytes.extend_from_slice(&len.to_le_bytes());
bytes.extend_from_slice(item);
}
bytes
}
};
self.raw_output_contents.push(raw_content);
Ok(())
}
tensor::FlattenTensor::Uint64(data) => { pub fn fill_last_tensor_contents(&mut self, tensor: &tensor::Tensor) {
Some(inference::InferTensorContents { if self.outputs.is_empty() {
uint64_contents: data.clone(), return;
..Default::default() }
}) self.outputs.last_mut().unwrap().contents = match &tensor.data {
} tensor::FlattenTensor::Bool(data) => Some(inference::InferTensorContents {
bool_contents: data.clone(),
..Default::default()
}),
tensor::FlattenTensor::Uint8(data) => Some(inference::InferTensorContents {
uint_contents: data.iter().map(|&x| x as u32).collect(),
..Default::default()
}),
tensor::FlattenTensor::Int8(data) => Some(inference::InferTensorContents { tensor::FlattenTensor::Uint16(data) => Some(inference::InferTensorContents {
int_contents: data.iter().map(|&x| x as i32).collect(), uint_contents: data.iter().map(|&x| x as u32).collect(),
..Default::default() ..Default::default()
}), }),
tensor::FlattenTensor::Int16(data) => {
Some(inference::InferTensorContents {
int_contents: data.iter().map(|&x| x as i32).collect(),
..Default::default()
})
}
tensor::FlattenTensor::Int32(data) => { tensor::FlattenTensor::Uint32(data) => Some(inference::InferTensorContents {
Some(inference::InferTensorContents { uint_contents: data.clone(),
int_contents: data.clone(), ..Default::default()
..Default::default() }),
})
}
tensor::FlattenTensor::Int64(data) => { tensor::FlattenTensor::Uint64(data) => Some(inference::InferTensorContents {
Some(inference::InferTensorContents { uint64_contents: data.clone(),
int64_contents: data.clone(), ..Default::default()
..Default::default() }),
})
}
tensor::FlattenTensor::Float32(data) => { tensor::FlattenTensor::Int8(data) => Some(inference::InferTensorContents {
Some(inference::InferTensorContents { int_contents: data.iter().map(|&x| x as i32).collect(),
fp32_contents: data.clone(), ..Default::default()
..Default::default() }),
}) tensor::FlattenTensor::Int16(data) => Some(inference::InferTensorContents {
} int_contents: data.iter().map(|&x| x as i32).collect(),
..Default::default()
}),
tensor::FlattenTensor::Float64(data) => { tensor::FlattenTensor::Int32(data) => Some(inference::InferTensorContents {
Some(inference::InferTensorContents { int_contents: data.clone(),
fp64_contents: data.clone(), ..Default::default()
..Default::default() }),
})
}
tensor::FlattenTensor::Bytes(data) => { tensor::FlattenTensor::Int64(data) => Some(inference::InferTensorContents {
Some(inference::InferTensorContents { int64_contents: data.clone(),
bytes_contents: data.clone(), ..Default::default()
..Default::default() }),
})
}
},
..Default::default()
});
}
Ok(infer_response) tensor::FlattenTensor::Float32(data) => Some(inference::InferTensorContents {
fp32_contents: data.clone(),
..Default::default()
}),
tensor::FlattenTensor::Float64(data) => Some(inference::InferTensorContents {
fp64_contents: data.clone(),
..Default::default()
}),
tensor::FlattenTensor::Bytes(data) => Some(inference::InferTensorContents {
bytes_contents: data.clone(),
..Default::default()
}),
};
} }
} }
impl TryFrom<NvCreateTensorResponse> for inference::ModelStreamInferResponse { impl TryFrom<ExtendedNvCreateTensorResponse> for inference::ModelStreamInferResponse {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(response: NvCreateTensorResponse) -> Result<Self, Self::Error> { fn try_from(response: ExtendedNvCreateTensorResponse) -> Result<Self, Self::Error> {
match inference::ModelInferResponse::try_from(response) { match inference::ModelInferResponse::try_from(response) {
Ok(response) => Ok(inference::ModelStreamInferResponse { Ok(response) => Ok(inference::ModelStreamInferResponse {
infer_response: Some(response), infer_response: Some(response),
......
...@@ -264,6 +264,22 @@ pub mod kserve_test { ...@@ -264,6 +264,22 @@ pub mod kserve_test {
} }
} }
/// rstest fixture producing a `UINT32` input tensor named `int_input`.
///
/// The tensor values come from `input` and are carried in the typed
/// `uint_contents` field (not in raw contents).
///
/// NOTE(review): the default value holds three elements while the declared
/// shape is `[1]` — confirm whether the mismatch is intentional.
#[fixture]
fn int_input(
    #[default(vec![42,43,44])] input: Vec<u32>,
) -> inference::model_infer_request::InferInputTensor {
    // Build the typed payload first so the tensor literal below stays flat.
    let typed_contents = inference::InferTensorContents {
        uint_contents: input,
        ..Default::default()
    };

    inference::model_infer_request::InferInputTensor {
        name: "int_input".into(),
        datatype: "UINT32".into(),
        shape: vec![1],
        contents: Some(typed_contents),
        ..Default::default()
    }
}
#[fixture] #[fixture]
fn service_with_engines( fn service_with_engines(
#[default(8990)] port: u16, #[default(8990)] port: u16,
...@@ -342,6 +358,7 @@ pub mod kserve_test { ...@@ -342,6 +358,7 @@ pub mod kserve_test {
StreamInferCancellation = 8993, StreamInferCancellation = 8993,
ModelInfo = 8994, ModelInfo = 8994,
TensorModel = 8995, TensorModel = 8995,
TensorModelTypes = 8996,
} }
#[rstest] #[rstest]
...@@ -1119,6 +1136,72 @@ pub mod kserve_test { ...@@ -1119,6 +1136,72 @@ pub mod kserve_test {
} }
} }
/// Registers a tensor model served by `TensorEngine`, sends one unary
/// `model_infer` request carrying the `int_input` fixture tensor, and validates
/// the reply against the inputs with no expected raw output contents.
///
/// NOTE(review): the registered config declares an `Int32` input named
/// "input" and a `Bool` output named "output", while the request sends a
/// `UINT32` tensor named "int_input" — confirm this mismatch is intended.
#[rstest]
#[tokio::test]
async fn test_tensor_infer_dtypes(
    #[with(TestPort::TensorModelTypes as u16)] service_with_engines: (
        KserveService,
        Arc<SplitEngine>,
        Arc<AlwaysFailEngine>,
        Arc<LongRunningEngine>,
    ),
    int_input: inference::model_infer_request::InferInputTensor,
) {
    let model_name = "tensor";
    let service = &service_with_engines.0;

    // Start the server; the handle is kept alive for the whole test.
    let _running = RunningService::spawn(service.clone());
    let mut client = get_ready_client(TestPort::TensorModelTypes as u16, 5).await;

    // Describe the tensor model: one input and one output.
    let input_metadata = tensor::TensorMetadata {
        name: "input".to_string(),
        data_type: tensor::DataType::Int32,
        shape: vec![1],
    };
    let output_metadata = tensor::TensorMetadata {
        name: "output".to_string(),
        data_type: tensor::DataType::Bool,
        shape: vec![-1],
    };

    let mut card = ModelDeploymentCard::with_name_only(model_name);
    card.model_type = ModelType::TensorBased;
    card.model_input = ModelInput::Tensor;
    card.runtime_config = ModelRuntimeConfig {
        tensor_model_config: Some(tensor::TensorModelConfig {
            name: model_name.to_string(),
            inputs: vec![input_metadata],
            outputs: vec![output_metadata],
        }),
        ..Default::default()
    };

    // Register the engine and then store the card with the model manager.
    let engine = Arc::new(TensorEngine {});
    service
        .model_manager()
        .add_tensor_model(model_name, card.mdcsum(), engine.clone())
        .unwrap();
    let _ = service.model_manager().save_model_card("key", card);

    // Issue the unary inference request.
    let inputs = vec![int_input];
    let infer_request = ModelInferRequest {
        model_name: model_name.into(),
        model_version: "1".into(),
        id: "1234".into(),
        inputs: inputs.clone(),
        ..Default::default()
    };
    let response = client
        .model_infer(tonic::Request::new(infer_request))
        .await
        .unwrap();

    // An empty map means no raw output contents are expected in the reply.
    let no_raw_outputs = std::collections::HashMap::new();
    validate_tensor_response(response, model_name, inputs, no_raw_outputs);
}
#[rstest] #[rstest]
#[tokio::test] #[tokio::test]
async fn test_tensor_infer( async fn test_tensor_infer(
...@@ -1273,7 +1356,12 @@ pub mod kserve_test { ...@@ -1273,7 +1356,12 @@ pub mod kserve_test {
}); });
let response = client.model_infer(request).await.unwrap(); let response = client.model_infer(request).await.unwrap();
validate_tensor_response(response, model_name, inputs); validate_tensor_response(
response,
model_name,
inputs,
std::collections::HashMap::new(),
);
// streaming response in model_infer(), expect failure // streaming response in model_infer(), expect failure
let repeat = inference::model_infer_request::InferInputTensor { let repeat = inference::model_infer_request::InferInputTensor {
...@@ -1359,11 +1447,20 @@ pub mod kserve_test { ...@@ -1359,11 +1447,20 @@ pub mod kserve_test {
"Expected successful inference" "Expected successful inference"
); );
let text_input_str = "dummy input";
let input_len = text_input_str.len() as u32;
let mut serialized_text_input = input_len.to_le_bytes().to_vec();
serialized_text_input.extend_from_slice(text_input_str.as_bytes());
let serialized_repeat = 2i32.to_le_bytes().to_vec();
if let Some(response) = &response.infer_response { if let Some(response) = &response.infer_response {
validate_tensor_response( validate_tensor_response(
Response::new(response.clone()), Response::new(response.clone()),
model_name, model_name,
inputs.clone(), inputs.clone(),
std::collections::HashMap::from([
("text_input".into(), serialized_text_input.clone()),
("repeat".into(), serialized_repeat.clone()),
]),
); );
} }
response_idx += 1; response_idx += 1;
...@@ -1376,6 +1473,7 @@ pub mod kserve_test { ...@@ -1376,6 +1473,7 @@ pub mod kserve_test {
response: Response<ModelInferResponse>, response: Response<ModelInferResponse>,
model_name: &str, model_name: &str,
inputs: Vec<inference::model_infer_request::InferInputTensor>, inputs: Vec<inference::model_infer_request::InferInputTensor>,
expected_raw_outputs: std::collections::HashMap<String, Vec<u8>>,
) { ) {
assert_eq!( assert_eq!(
response.get_ref().model_name, response.get_ref().model_name,
...@@ -1397,7 +1495,12 @@ pub mod kserve_test { ...@@ -1397,7 +1495,12 @@ pub mod kserve_test {
inputs.len(), inputs.len(),
"Expected the same number of outputs as inputs", "Expected the same number of outputs as inputs",
); );
for output in &response.get_ref().outputs { assert_eq!(
response.get_ref().raw_output_contents.len(),
expected_raw_outputs.len(),
"Expected the same number of raw_output_contents as expected_raw_outputs",
);
for (idx, output) in response.get_ref().outputs.iter().enumerate() {
let mut found = false; let mut found = false;
for input in &inputs { for input in &inputs {
if input.name != output.name { if input.name != output.name {
...@@ -1418,6 +1521,18 @@ pub mod kserve_test { ...@@ -1418,6 +1521,18 @@ pub mod kserve_test {
"Expected output shape to be '{:?}', got '{:?}'", "Expected output shape to be '{:?}', got '{:?}'",
input.shape, output.shape input.shape, output.shape
); );
if expected_raw_outputs.contains_key(&output.name) {
assert_eq!(
&response.get_ref().raw_output_contents[idx],
expected_raw_outputs.get(&output.name).unwrap(),
"Expected output contents to match raw_input_contents",
);
} else {
assert_eq!(
output.contents, input.contents,
"Expected output contents to match input contents",
);
}
found = true; found = true;
break; break;
} }
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Usage: `TEST_END_TO_END=1 python echo_tensor_worker.py` to run this worker as a tensor-based echo worker.
import uvloop
from dynamo.llm import ModelInput, ModelRuntimeConfig, ModelType, register_llm
from dynamo.runtime import DistributedRuntime, dynamo_worker
@dynamo_worker(static=False)
async def echo_tensor_worker(runtime: DistributedRuntime):
    """Register a tensor-based "echo" model and serve it.

    Creates the `echo` component in the `tensor` namespace, registers a
    tensor model config with one Bytes input and one Bytes output on the
    `generate` endpoint, then serves that endpoint with `generate`.
    """
    echo_component = runtime.namespace("tensor").component("echo")
    await echo_component.create_service()
    endpoint = echo_component.endpoint("generate")

    input_specs = [
        {"name": "dummy_input", "data_type": "Bytes", "shape": [-1]},
    ]
    output_specs = [{"name": "dummy_output", "data_type": "Bytes", "shape": [-1]}]
    model_config = {
        "name": "echo",
        "inputs": input_specs,
        "outputs": output_specs,
    }

    runtime_config = ModelRuntimeConfig()
    runtime_config.set_tensor_model_config(model_config)
    # Round-trip check: the config read back must equal what was just set.
    assert model_config == runtime_config.get_tensor_model_config()

    # [gluo FIXME] register_llm will attempt to load a LLM model,
    # which is not well-defined for Tensor yet. Currently provide
    # a valid model name to pass the registration.
    await register_llm(
        ModelInput.Tensor,
        ModelType.TensorBased,
        endpoint,
        "Qwen/Qwen3-0.6B",
        "echo",
        runtime_config=runtime_config,
    )

    await endpoint.serve_endpoint(generate)
async def generate(request, context):
    """Echo handler: yield one response holding the request's model and tensors.

    Args:
        request: mapping with at least the keys "model" and "tensors".
        context: request context supplied by the caller; not used here.

    Yields:
        A single dict with the request's "model" and "tensors" values.
    """
    print(f"Echoing request: {request}")
    echoed = {field: request[field] for field in ("model", "tensors")}
    yield echoed
if __name__ == "__main__":
    # Script entry point: run the worker coroutine via uvloop.
    worker_coroutine = echo_tensor_worker()
    uvloop.run(worker_coroutine)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import logging
import os
import shutil
import pytest
import triton_echo_client
from tests.conftest import EtcdServer, NatsServer
from tests.utils.constants import QWEN
from tests.utils.managed_process import ManagedProcess
logger = logging.getLogger(__name__)
TEST_MODEL = QWEN
class DynamoFrontendProcess(ManagedProcess):
    """Managed process that launches `dynamo.frontend` with the KServe gRPC server flag."""

    def __init__(self, request):
        """Remove any stale per-test log directory, then configure the frontend process.

        Args:
            request: pytest request object; `request.node.name` is used to
                build the log directory name.
        """
        log_dir = f"{request.node.name}_frontend"

        # Start from a clean log directory; a missing one is not an error.
        try:
            shutil.rmtree(log_dir)
        except FileNotFoundError:
            pass
        else:
            logger.info(f"Cleaned up existing log directory: {log_dir}")

        super().__init__(
            command=["python", "-m", "dynamo.frontend", "--kserve-grpc-server"],
            display_output=True,
            terminate_existing=True,
            log_dir=log_dir,
        )
class MockWorkerProcess(ManagedProcess):
    """Managed process that runs `echo_tensor_worker.py` as the test worker.

    Readiness is determined by polling the worker's system health endpoint.
    """

    def __init__(
        self,
        request,
        worker_id: str = "mocker-worker",
        system_port: int = 8083,
    ):
        """Configure the worker process and its health check.

        Args:
            request: pytest request object; `request.node.name` is used to
                build the log directory name.
            worker_id: label used in log messages and the log directory name.
            system_port: port for the worker's system endpoint. The same value
                is exported as `DYN_SYSTEM_PORT` and used in the health-check
                URL so the two cannot drift apart.
        """
        self.worker_id = worker_id

        command = [
            "python3",
            os.path.join(os.path.dirname(__file__), "echo_tensor_worker.py"),
        ]

        env = os.environ.copy()
        env["DYN_LOG"] = "debug"
        env["DYN_SYSTEM_ENABLED"] = "true"
        env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
        env["DYN_SYSTEM_PORT"] = str(system_port)

        log_dir = f"{request.node.name}_{worker_id}"
        # Start from a clean log directory; a missing one is not an error.
        try:
            shutil.rmtree(log_dir)
        except FileNotFoundError:
            pass

        super().__init__(
            command=command,
            env=env,
            health_check_urls=[
                # gRPC doesn't expose endpoint for listing models, so skip this check
                # ("http://localhost:8000/v1/models", check_models_api),
                (f"http://localhost:{system_port}/health", self.is_ready),
            ],
            timeout=300,
            display_output=True,
            terminate_existing=False,
            stragglers=[],
            straggler_commands=["echo_tensor_worker.py"],
            log_dir=log_dir,
        )

    def is_ready(self, response) -> bool:
        """Return True when the health response reports status "ready".

        Args:
            response: HTTP response object exposing a `json()` method.

        Returns:
            False if the body is not valid JSON, is not a JSON object, or its
            "status" field is anything other than "ready".
        """
        try:
            payload = response.json()
        except ValueError:
            logger.warning("%s health response is not valid JSON", self.worker_id)
            return False

        # A valid-JSON body that is not an object (e.g. a list or string) has no
        # "status" field; treat it as not ready instead of raising.
        status = payload.get("status") if isinstance(payload, dict) else None

        is_ready = status == "ready"
        if is_ready:
            logger.info("%s status is ready", self.worker_id)
        else:
            logger.warning("%s status is not ready: %s", self.worker_id, status)
        return is_ready
@pytest.fixture(scope="module")
def runtime_services(request):
    """Module-scoped fixture that runs NATS and etcd for this test file.

    Yields:
        A `(nats_process, etcd_process)` tuple; both are shut down when the
        module's tests finish (etcd first, then NATS).
    """
    with NatsServer(request) as nats_process, EtcdServer(request) as etcd_process:
        yield nats_process, etcd_process
@pytest.fixture(scope="module")
def start_services(request, runtime_services):
    """Module-scoped fixture that starts the frontend, then the worker.

    Depends on `runtime_services`. The worker is started only after the
    frontend context has been entered.
    """
    frontend = DynamoFrontendProcess(request)
    with frontend:
        logger.info("Frontend started for tests")
        worker = MockWorkerProcess(request)
        with worker:
            logger.info("Worker started for tests")
            yield
@pytest.mark.usefixtures("start_services")
@pytest.mark.pre_merge
@pytest.mark.model(TEST_MODEL)
def test_echo() -> None:
    """Run the Triton echo client against the services started by `start_services`."""
    run_echo_inference = triton_echo_client.run_infer
    run_echo_inference()
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import sys
import numpy as np
import tritonclient.grpc as grpcclient
def run_infer(server_url="localhost:8000", model_name="echo"):
    """Send one inference request to the echo model and check the echoed tensors.

    Builds an INT32 tensor and a BYTES tensor of 16 elements each, sends them
    with the Triton gRPC client, and asserts that the tensors returned under
    the same names equal the inputs.

    Args:
        server_url: `host:port` of the gRPC inference server.
        model_name: name of the model to query.

    Raises:
        SystemExit: with status 1 if the client cannot be created.
        AssertionError: if a returned tensor differs from its input.
    """
    try:
        triton_client = grpcclient.InferenceServerClient(url=server_url)
    except Exception as e:
        print("channel creation failed: " + str(e))
        # Exit non-zero so a failed channel creation is not reported as success.
        sys.exit(1)

    try:
        # Infer
        inputs = [
            grpcclient.InferInput("INPUT0", [16], "INT32"),
            grpcclient.InferInput("INPUT1", [16], "BYTES"),
        ]

        # Create the data for the two input tensors. The first holds the
        # integers 0..15; the second holds their UTF-8 encoded decimal strings.
        input0_data = np.arange(start=0, stop=16, dtype=np.int32).reshape([16])
        input1_data = np.array(
            [str(x).encode("utf-8") for x in input0_data],
            dtype=np.object_,
        ).reshape([16])

        # Initialize the data
        inputs[0].set_data_from_numpy(input0_data)
        inputs[1].set_data_from_numpy(input1_data)

        # No explicit outputs are requested; the reply is read back by name.
        results = triton_client.infer(model_name=model_name, inputs=inputs)

        # Get the output arrays from the results
        output0_data = results.as_numpy("INPUT0")
        output1_data = results.as_numpy("INPUT1")

        assert np.array_equal(input0_data, output0_data)
        assert np.array_equal(input1_data, output1_data)
    finally:
        # Release the gRPC channel even when inference or an assertion fails.
        triton_client.close()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment