Unverified Commit 9e5407f2 authored by zhongdaor-nv's avatar zhongdaor-nv Committed by GitHub
Browse files

feat: python binding for kserve grpc frontend (#3739)


Signed-off-by: default avatarzhongdaor <zhongdaor@nvidia.com>
Signed-off-by: default avatarzhongdaor-nv <zhongdaor@nvidia.com>
Co-authored-by: default avatarcoderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: default avatarRyan McCormick <rmccormick@nvidia.com>
parent 8d0a5eba
# KServe gRPC Example
This directory contains a minimal Dynamo worker that serves a KServe-compatible
gRPC endpoint (`server.py`) and a Python client (`test_client.py`) that exercises
the endpoint using the Triton `tritonclient.grpc` API.
## Prerequisites
- The Dynamo Python bindings installed
- Client dependencies:
- `numpy`
- `tritonclient[grpc]`
You can install the Python dependencies into your active environment with:
```bash
uv pip install numpy tritonclient[grpc]
```
## Running the mock server
1. From the repository root, set `PYTHONPATH` so Python can locate the local
Dynamo package:
```bash
export PYTHONPATH=$(pwd)
```
2. Start the worker:
```bash
python lib/bindings/python/examples/kserve_grpc_service/server.py
```
The server registers a mock completions model named `mock_model` and listens
on `0.0.0.0:8787`. Leave this process running while you test the endpoint.
## Sending a request with the Triton client
With the server running, invoke the example client from a separate terminal:
```bash
python lib/bindings/python/examples/kserve_grpc_service/test_client.py \
--model mock_model \
--prompt "Hello from Dynamo!"
```
You can override the `--host`, `--port`, and `--prompt` options as needed. The script sends an inference request over gRPC using the `InferenceServerClient` and prints the decoded `ModelInferResponse` payload. You should see the prompt `Hello from Dynamo!` successfully received and printed by the server.
## Alternative tooling
For debugging purposes you can still call the endpoint directly with
[`grpcurl`](https://github.com/fullstorydev/grpcurl) by running
`grpcurl.sh` in this directory.
#!/usr/bin/env bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Invoke the mock KServe gRPC endpoint using grpcurl. Requires grpcurl installed.
# The service does not expose server reflection, so we point grpcurl at the proto files directly.
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROTO_DIR="${SCRIPT_DIR}/../../../../llm/src/grpc/protos"
HOST="${HOST:-127.0.0.1}"
PORT="${PORT:-8787}"
MODEL="mock_model"
if [[ $# -gt 0 ]]; then
PROMPTS=("$@")
else
PROMPTS=(
"Hello from Dynamo!"
"How are you today?"
"Tell me a joke."
)
fi
encode_base64() {
local text="$1"
python - "$text" <<'PY'
import base64
import sys
print(base64.b64encode(sys.argv[1].encode("utf-8")).decode("ascii"))
PY
}
run_infer() {
local prompt="$1"
local encoded
encoded="$(encode_base64 "$prompt")"
printf -- '---\nSending prompt: %s\n' "$prompt"
grpcurl \
-plaintext \
-import-path "${PROTO_DIR}" \
-proto kserve.proto \
-d "{
\"model_name\": \"${MODEL}\",
\"inputs\": [
{
\"name\": \"text_input\",
\"datatype\": \"BYTES\",
\"shape\": [1],
\"contents\": { \"bytesContents\": [\"${encoded}\"] }
}
]
}" \
"${HOST}:${PORT}" inference.GRPCInferenceService/ModelInfer
}
for prompt in "${PROMPTS[@]}"; do
run_infer "$prompt"
done
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import time
import uuid
import uvloop
from dynamo.llm import KserveGrpcService, PythonAsyncEngine
from dynamo.runtime import DistributedRuntime, dynamo_worker
class MockCompletionEngine:
def __init__(self, model_name: str) -> None:
self.model_name = model_name
def generate(self, request):
created = int(time.time())
request_id = f"cmpl-{uuid.uuid4()}"
print(f"{created} | Received request: {request}")
async def generator():
response = {
"id": request_id,
"object": "text_completion",
"created": created,
"model": self.model_name,
"choices": [
{
"index": 0,
"text": "Mock completion response from Dynamo.",
"logprobs": None,
"finish_reason": "stop",
}
],
"usage": {
"prompt_tokens": 5,
"completion_tokens": 5,
"total_tokens": 10,
},
}
yield response
return generator()
@dynamo_worker()
async def worker(runtime: DistributedRuntime):
model_name = "mock_model"
checksum = "mdcsum"
loop = asyncio.get_running_loop()
python_engine = MockCompletionEngine(model_name)
engine = PythonAsyncEngine(python_engine.generate, loop)
host = "0.0.0.0"
port = 8787
service = KserveGrpcService(port=port, host=host)
service.add_completions_model(model_name, checksum, engine)
print("Starting KServe gRPC service...")
shutdown_signal = service.run(runtime.child_token())
try:
print(
f"Serving endpoint: {host}:{port} inference.GRPCInferenceService/ModelInfer"
)
print(f"Serving completions models: {service.list_completions_models()}")
await shutdown_signal
except KeyboardInterrupt:
pass
except Exception as err: # pragma: no cover - example logging
print(f"Unexpected error occurred: {err}")
finally:
print("Shutting down worker...")
runtime.shutdown()
if __name__ == "__main__":
uvloop.install()
asyncio.run(worker())
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Simple test client for the mock KServe gRPC server example. The script uses
# the Triton gRPC client to issue a ModelInfer request against the running
# server.
import argparse
import numpy as np
import tritonclient.grpc as triton_grpc
from google.protobuf.json_format import MessageToDict
from tritonclient.utils import InferenceServerException
def main() -> None:
parser = argparse.ArgumentParser(
description="Send a test request to the KServe gRPC mock server."
)
parser.add_argument(
"--host",
default="127.0.0.1",
help="Host serving the gRPC endpoint (default: %(default)s)",
)
parser.add_argument(
"--port",
type=int,
default=8787,
help="Port of the gRPC endpoint (default: %(default)s)",
)
parser.add_argument(
"--model",
default="mock_model",
help="Model name to target (default: %(default)s)",
)
parser.add_argument(
"--prompt",
default="Hello from Dynamo!",
help="Prompt text encoded into the BYTES input tensor.",
)
args = parser.parse_args()
target = f"{args.host}:{args.port}"
client = triton_grpc.InferenceServerClient(url=target)
text_input = triton_grpc.InferInput("text_input", [1], "BYTES")
text_input.set_data_from_numpy(
np.array([args.prompt.encode("utf-8")], dtype=object)
)
try:
response = client.infer(args.model, inputs=[text_input])
except (
InferenceServerException
) as err: # pragma: no cover - informational error path
raise SystemExit(f"Inference request failed: {err}") from err
response_dict = MessageToDict(
response.get_response(),
preserving_proto_field_name=True,
)
print("Received response:")
print(response_dict)
if __name__ == "__main__":
main()
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::sync::Arc;
use pyo3::prelude::*;
use crate::{CancellationToken, engine::*, to_pyerr};
pub use dynamo_llm::grpc::service::kserve;
#[pyclass]
pub struct KserveGrpcService {
inner: kserve::KserveService,
}
#[pymethods]
impl KserveGrpcService {
#[new]
#[pyo3(signature = (port=None, host=None))]
pub fn new(port: Option<u16>, host: Option<String>) -> PyResult<Self> {
let mut builder = kserve::KserveService::builder();
if let Some(port) = port {
builder = builder.port(port);
}
if let Some(host) = host {
builder = builder.host(host);
}
let inner = builder.build().map_err(to_pyerr)?;
Ok(Self { inner })
}
pub fn add_completions_model(
&self,
model: String,
checksum: String,
engine: PythonAsyncEngine,
) -> PyResult<()> {
let engine = Arc::new(engine);
self.inner
.model_manager()
.add_completions_model(&model, &checksum, engine)
.map_err(to_pyerr)
}
pub fn add_chat_completions_model(
&self,
model: String,
checksum: String,
engine: PythonAsyncEngine,
) -> PyResult<()> {
let engine = Arc::new(engine);
self.inner
.model_manager()
.add_chat_completions_model(&model, &checksum, engine)
.map_err(to_pyerr)
}
pub fn add_tensor_model(
&self,
model: String,
checksum: String,
engine: PythonAsyncEngine,
) -> PyResult<()> {
let engine = Arc::new(engine);
self.inner
.model_manager()
.add_tensor_model(&model, &checksum, engine)
.map_err(to_pyerr)
}
pub fn remove_completions_model(&self, model: String) -> PyResult<()> {
self.inner
.model_manager()
.remove_completions_model(&model)
.map_err(to_pyerr)
}
pub fn remove_chat_completions_model(&self, model: String) -> PyResult<()> {
self.inner
.model_manager()
.remove_chat_completions_model(&model)
.map_err(to_pyerr)
}
pub fn remove_tensor_model(&self, model: String) -> PyResult<()> {
self.inner
.model_manager()
.remove_tensor_model(&model)
.map_err(to_pyerr)
}
pub fn list_chat_completions_models(&self) -> PyResult<Vec<String>> {
Ok(self.inner.model_manager().list_chat_completions_models())
}
pub fn list_completions_models(&self) -> PyResult<Vec<String>> {
Ok(self.inner.model_manager().list_completions_models())
}
pub fn list_tensor_models(&self) -> PyResult<Vec<String>> {
Ok(self.inner.model_manager().list_tensor_models())
}
fn run<'p>(&self, py: Python<'p>, token: CancellationToken) -> PyResult<Bound<'p, PyAny>> {
let service = self.inner.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
service.run(token.inner).await.map_err(to_pyerr)?;
Ok(())
})
}
}
...@@ -54,6 +54,7 @@ impl From<RouterMode> for RsRouterMode { ...@@ -54,6 +54,7 @@ impl From<RouterMode> for RsRouterMode {
mod context; mod context;
mod engine; mod engine;
mod http; mod http;
mod kserve_grpc;
mod llm; mod llm;
mod parsers; mod parsers;
mod planner; mod planner;
...@@ -176,6 +177,7 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> { ...@@ -176,6 +177,7 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<llm::kv::KvPushRouter>()?; m.add_class::<llm::kv::KvPushRouter>()?;
m.add_class::<llm::kv::KvPushRouterStream>()?; m.add_class::<llm::kv::KvPushRouterStream>()?;
m.add_class::<RouterMode>()?; m.add_class::<RouterMode>()?;
m.add_class::<kserve_grpc::KserveGrpcService>()?;
m.add("__version__", env!("CARGO_PKG_VERSION"))?; m.add("__version__", env!("CARGO_PKG_VERSION"))?;
m.add_class::<planner::VirtualConnectorCoordinator>()?; m.add_class::<planner::VirtualConnectorCoordinator>()?;
m.add_class::<planner::VirtualConnectorClient>()?; m.add_class::<planner::VirtualConnectorClient>()?;
......
...@@ -873,6 +873,17 @@ class HttpService: ...@@ -873,6 +873,17 @@ class HttpService:
... ...
class PythonAsyncEngine:
"""
Bridge a Python async generator onto Dynamo's AsyncEngine interface.
"""
def __init__(self, generator: Any, event_loop: Any) -> None:
"""Wrap a Python generator and event loop for use with Dynamo services."""
...
class HttpAsyncEngine: class HttpAsyncEngine:
""" """
An async engine for a distributed Dynamo http service. This is an extension of the An async engine for a distributed Dynamo http service. This is an extension of the
...@@ -882,6 +893,133 @@ class HttpAsyncEngine: ...@@ -882,6 +893,133 @@ class HttpAsyncEngine:
... ...
class KserveGrpcService:
"""
A gRPC service implementing the KServe protocol for dynamo applications.
Provides model management for completions, chat completions, and tensor-based models.
"""
def __init__(self, port: Optional[int] = None, host: Optional[str] = None) -> None:
"""
Create a new KServe gRPC service.
Args:
port: Optional port number to bind the service to
host: Optional host address to bind the service to
"""
...
def add_completions_model(
self,
model: str,
checksum: str,
engine: PythonAsyncEngine,
) -> None:
"""
Register a completions model with the service.
Args:
model: The model name
checksum: The model checksum
engine: The async engine to handle requests
"""
...
def add_chat_completions_model(
self,
model: str,
checksum: str,
engine: PythonAsyncEngine,
) -> None:
"""
Register a chat completions model with the service.
Args:
model: The model name
checksum: The model checksum
engine: The async engine to handle requests
"""
...
def add_tensor_model(
self,
model: str,
checksum: str,
engine: PythonAsyncEngine,
) -> None:
"""
Register a tensor-based model with the service.
Args:
model: The model name
checksum: The model checksum
engine: The async engine to handle requests
"""
...
def remove_completions_model(self, model: str) -> None:
"""
Remove a completions model from the service.
Args:
model: The model name to remove
"""
...
def remove_chat_completions_model(self, model: str) -> None:
"""
Remove a chat completions model from the service.
Args:
model: The model name to remove
"""
...
def remove_tensor_model(self, model: str) -> None:
"""
Remove a tensor model from the service.
Args:
model: The model name to remove
"""
...
def list_chat_completions_models(self) -> List[str]:
"""
List all registered chat completions models.
Returns:
List of model names
"""
...
def list_completions_models(self) -> List[str]:
"""
List all registered completions models.
Returns:
List of model names
"""
...
def list_tensor_models(self) -> List[str]:
"""
List all registered tensor models.
Returns:
List of model names
"""
...
async def run(self, token: CancellationToken) -> None:
"""
Run the KServe gRPC service.
Args:
token: Cancellation token to stop the service
"""
...
class ModelInput: class ModelInput:
"""What type of request this model needs: Text, Tokens or Tensor""" """What type of request this model needs: Text, Tokens or Tensor"""
... ...
...@@ -1438,7 +1576,9 @@ __all__ = [ ...@@ -1438,7 +1576,9 @@ __all__ = [
"Client", "Client",
"Component", "Component",
"Context", "Context",
"KserveGrpcService",
"ModelDeploymentCard", "ModelDeploymentCard",
"OAIChatPreprocessor", "OAIChatPreprocessor",
"PythonAsyncEngine",
"prometheus_names", "prometheus_names",
] ]
...@@ -19,6 +19,7 @@ from dynamo._core import EntrypointArgs as EntrypointArgs ...@@ -19,6 +19,7 @@ from dynamo._core import EntrypointArgs as EntrypointArgs
from dynamo._core import ForwardPassMetrics as ForwardPassMetrics from dynamo._core import ForwardPassMetrics as ForwardPassMetrics
from dynamo._core import HttpAsyncEngine as HttpAsyncEngine from dynamo._core import HttpAsyncEngine as HttpAsyncEngine
from dynamo._core import HttpService as HttpService from dynamo._core import HttpService as HttpService
from dynamo._core import KserveGrpcService as KserveGrpcService
from dynamo._core import KvEventPublisher as KvEventPublisher from dynamo._core import KvEventPublisher as KvEventPublisher
from dynamo._core import KvIndexer as KvIndexer from dynamo._core import KvIndexer as KvIndexer
from dynamo._core import KvPushRouter as KvPushRouter from dynamo._core import KvPushRouter as KvPushRouter
...@@ -29,6 +30,7 @@ from dynamo._core import ModelInput as ModelInput ...@@ -29,6 +30,7 @@ from dynamo._core import ModelInput as ModelInput
from dynamo._core import ModelRuntimeConfig as ModelRuntimeConfig from dynamo._core import ModelRuntimeConfig as ModelRuntimeConfig
from dynamo._core import ModelType as ModelType from dynamo._core import ModelType as ModelType
from dynamo._core import OverlapScores as OverlapScores from dynamo._core import OverlapScores as OverlapScores
from dynamo._core import PythonAsyncEngine as PythonAsyncEngine
from dynamo._core import RadixTree as RadixTree from dynamo._core import RadixTree as RadixTree
from dynamo._core import RouterConfig as RouterConfig from dynamo._core import RouterConfig as RouterConfig
from dynamo._core import RouterMode as RouterMode from dynamo._core import RouterMode as RouterMode
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment