"git@developer.sourcefind.cn:OpenDAS/openpcdet.git" did not exist on "3cf5763fdd69d3e40884103c1a86ec05215ea9b8"
Commit dd620825 authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

feat: add openai http service (#82)

parent fc4da345
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Smoke-test the OpenAI-compatible HTTP service expected on localhost:8000.
# NOTE: plain `echo "\n"` in bash prints a literal backslash-n (escapes are
# only interpreted with `echo -e`), so use printf for the section banners.

# list models
printf '\n\n### Listing models\n'
curl http://localhost:8000/v1/models

# create completion (streamed chat completion against the mock model)
printf '\n\n### Creating completions\n'
curl -X POST http://localhost:8000/v1/chat/completions \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
    "model": "mock_model",
    "messages": [
      {
        "role":"user",
        "content":"Hello! How are you?"
      }
    ],
    "max_tokens": 64,
    "stream": true,
    "temperature": 0.7,
    "top_p": 0.9,
    "frequency_penalty": 0.1,
    "presence_penalty": 0.2,
    "top_k": 5
  }'
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import time
import uuid
import uvloop
from dynamo.llm import HttpAsyncEngine, HttpService
from dynamo.runtime import DistributedRuntime, dynamo_worker
class MockEngine:
    """A stub chat engine that streams a fixed number of mock OpenAI-style
    `chat.completion.chunk` payloads, for exercising the HTTP frontend
    without a real model."""

    def __init__(self, model_name):
        # Model name echoed back in the `model` field of every chunk.
        self.model_name = model_name

    def generate(self, request):
        """Log the incoming request and return an async generator that yields
        five mock completion chunks; the final chunk carries
        ``finish_reason="stop"``.

        The request payload itself is not inspected — output is canned.
        """
        completion_id = f"chat-{uuid.uuid4()}"
        timestamp = int(time.time())
        served_model = self.model_name
        print(f"{timestamp} | Received request: {request}")

        async def stream():
            total = 5
            for index in range(total):
                is_last = index == total - 1
                yield {
                    "id": completion_id,
                    "object": "chat.completion.chunk",
                    "created": timestamp,
                    "model": served_model,
                    "choices": [
                        {
                            # NOTE: mirrors the original mock, which uses the
                            # chunk counter as the choice index.
                            "index": index,
                            "delta": {"role": None, "content": f"chunk{index}"},
                            "logprobs": None,
                            "finish_reason": "stop" if is_last else None,
                        }
                    ],
                }

        return stream()
@dynamo_worker()
async def worker(runtime: DistributedRuntime):
    """Stand up an OpenAI-compatible HTTP frontend on localhost:8000 backed by
    MockEngine, then block until the runtime signals shutdown."""
    host: str = "localhost"
    port: int = 8000
    model_name: str = "mock_model"

    # Bridge the Python async generator into the Rust HTTP service via the
    # current event loop.
    mock = MockEngine(model_name)
    engine = HttpAsyncEngine(mock.generate, asyncio.get_running_loop())

    service: HttpService = HttpService(port=port)
    service.add_chat_completions_model(model_name, engine)

    print("Starting service...")
    shutdown_signal = service.run(runtime.child_token())

    try:
        print(f"Serving endpoint: {host}:{port}/v1/models")
        print(f"Serving endpoint: {host}:{port}/v1/chat/completions")
        print(f"Serving the following models: {service.list_chat_completions_models()}")
        # Block until shutdown signal received
        await shutdown_signal
    except KeyboardInterrupt:
        # TODO: Handle KeyboardInterrupt gracefully in triton_worker
        # TODO: Caught by DistributedRuntime or HttpService, so it's not caught here
        pass
    except Exception as e:
        print(f"Unexpected error occurred: {e}")
    finally:
        print("Shutting down worker...")
        runtime.shutdown()
if __name__ == "__main__":
    # Install uvloop as the asyncio event-loop policy before starting the worker.
    uvloop.install()
    asyncio.run(worker())
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use pyo3::{exceptions::PyException, prelude::*};
use crate::{engine::*, to_pyerr, CancellationToken};
pub use dynamo_llm::http::service::{error as http_error, service_v2};
pub use dynamo_runtime::{
error,
pipeline::{async_trait, AsyncEngine, Data, ManyOut, SingleIn},
protocols::annotated::Annotated,
Error, Result,
};
/// Python-exposed wrapper around the Dynamo OpenAI-compatible HTTP service.
#[pyclass]
pub struct HttpService {
    // Underlying Rust service; a clone of this handle is moved into the
    // async task spawned by `run`.
    inner: service_v2::HttpService,
}
#[pymethods]
impl HttpService {
    /// Create a new HTTP service. Listens on `port`, defaulting to 8080
    /// when `None` is passed from Python.
    #[new]
    #[pyo3(signature = (port=None))]
    pub fn new(port: Option<u16>) -> PyResult<Self> {
        let builder = service_v2::HttpService::builder().port(port.unwrap_or(8080));
        let inner = builder.build().map_err(to_pyerr)?;
        Ok(Self { inner })
    }

    /// Register `engine` to serve completions requests for `model`.
    pub fn add_completions_model(&self, model: String, engine: HttpAsyncEngine) -> PyResult<()> {
        let engine = Arc::new(engine);
        self.inner
            .model_manager()
            .add_completions_model(&model, engine)
            .map_err(to_pyerr)
    }

    /// Register `engine` to serve chat-completions requests for `model`.
    pub fn add_chat_completions_model(
        &self,
        model: String,
        engine: HttpAsyncEngine,
    ) -> PyResult<()> {
        let engine = Arc::new(engine);
        self.inner
            .model_manager()
            .add_chat_completions_model(&model, engine)
            .map_err(to_pyerr)
    }

    /// Unregister the completions engine for `model`.
    pub fn remove_completions_model(&self, model: String) -> PyResult<()> {
        self.inner
            .model_manager()
            .remove_completions_model(&model)
            .map_err(to_pyerr)
    }

    /// Unregister the chat-completions engine for `model`.
    pub fn remove_chat_completions_model(&self, model: String) -> PyResult<()> {
        self.inner
            .model_manager()
            .remove_chat_completions_model(&model)
            .map_err(to_pyerr)
    }

    /// List the names of all registered chat-completions models.
    pub fn list_chat_completions_models(&self) -> PyResult<Vec<String>> {
        Ok(self.inner.model_manager().list_chat_completions_models())
    }

    /// List the names of all registered completions models.
    pub fn list_completions_models(&self) -> PyResult<Vec<String>> {
        Ok(self.inner.model_manager().list_completions_models())
    }

    /// Start the service and return a Python awaitable that resolves when the
    /// service stops; presumably cancelling `token` triggers shutdown — confirm
    /// against `service_v2::HttpService::run`.
    fn run<'p>(&self, py: Python<'p>, token: CancellationToken) -> PyResult<Bound<'p, PyAny>> {
        // Clone the service handle so the spawned future owns its own copy.
        let service = self.inner.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            service.run(token.inner).await.map_err(to_pyerr)?;
            Ok(())
        })
    }
}
/// Python Exception for HTTP errors
#[pyclass(extends=PyException)]
pub struct HttpError {
    // HTTP status code associated with the error.
    code: u16,
    // Human-readable error message.
    message: String,
}

#[pymethods]
impl HttpError {
    /// Construct an HttpError from a status code and message.
    #[new]
    pub fn new(code: u16, message: String) -> Self {
        HttpError { code, message }
    }

    /// HTTP status code (read-only Python property).
    #[getter]
    fn code(&self) -> u16 {
        self.code
    }

    /// Error message (read-only Python property).
    #[getter]
    fn message(&self) -> &str {
        &self.message
    }
}
/// Newtype over `PythonAsyncEngine` exposed to Python; adds HttpError-aware
/// error translation (see the `AsyncEngine` impl below).
#[pyclass]
#[derive(Clone)]
pub struct HttpAsyncEngine(pub PythonAsyncEngine);

impl From<PythonAsyncEngine> for HttpAsyncEngine {
    fn from(engine: PythonAsyncEngine) -> Self {
        Self(engine)
    }
}
#[pymethods]
impl HttpAsyncEngine {
    /// Create a new instance of the HttpAsyncEngine
    /// This is a simple extension of the PythonAsyncEngine that handles HttpError
    /// exceptions from Python and converts them to the Rust version of HttpError
    ///
    /// # Arguments
    /// - `generator`: a Python async generator that will be used to generate responses
    /// - `event_loop`: the Python event loop that will be used to run the generator
    ///
    /// # Errors
    /// Propagates any `PyErr` raised by `PythonAsyncEngine::new`.
    ///
    /// Note: In Rust land, the request and the response are both concrete; however, in
    /// Python land, the request and response are not strongly typed, meaning the generator
    /// could accept a different type of request or return a different type of response
    /// and we would not know until runtime.
    #[new]
    pub fn new(generator: PyObject, event_loop: PyObject) -> PyResult<Self> {
        Ok(PythonAsyncEngine::new(generator, event_loop)?.into())
    }
}
// `Serialize`/`Deserialize` bounds presumably come in via the `engine::*`
// glob import — confirm against crate::engine.
#[async_trait]
impl<Req, Resp> AsyncEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>, Error> for HttpAsyncEngine
where
    Req: Data + Serialize,
    Resp: Data + for<'de> Deserialize<'de>,
{
    /// Delegate to the inner `PythonAsyncEngine`, translating Python-raised
    /// `HttpError` exceptions into the Rust `http_error::HttpError` so the
    /// HTTP layer can surface the intended status code.
    async fn generate(&self, request: SingleIn<Req>) -> Result<ManyOut<Annotated<Resp>>, Error> {
        match self.0.generate(request).await {
            Ok(res) => Ok(res),
            // Inspect the error - if it was an HttpError from Python, extract the code and message
            // and return the rust version of HttpError
            Err(e) => {
                if let Some(py_err) = e.downcast_ref::<PyErr>() {
                    Python::with_gil(|py| {
                        // Attempt to extract the Python HttpError pyclass from
                        // the exception value; any other exception type falls
                        // through to the generic error branch below.
                        if let Ok(http_error_instance) = py_err
                            .clone_ref(py)
                            .into_value(py)
                            .extract::<PyRef<HttpError>>(py)
                        {
                            Err(http_error::HttpError {
                                code: http_error_instance.code,
                                message: http_error_instance.message.clone(),
                            })?
                        } else {
                            Err(error!("Python Error: {}", py_err.to_string()))
                        }
                    })
                } else {
                    // Not a Python error at all; propagate unchanged.
                    Err(e)
                }
            }
        }
    }
}
...@@ -35,6 +35,7 @@ use dynamo_runtime::{
use dynamo_llm::{self as llm_rs};
mod engine;
mod http;
mod llm;
type JsonServerStreamingIngress =
...@@ -76,6 +77,9 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<llm::kv::EndpointKvMetrics>()?;
m.add_class::<llm::kv::AggregatedMetrics>()?;
m.add_class::<llm::kv::KvMetricsAggregator>()?;
m.add_class::<http::HttpService>()?;
m.add_class::<http::HttpError>()?;
m.add_class::<http::HttpAsyncEngine>()?;
engine::add_to_module(m)?;
...
...@@ -333,3 +333,24 @@ class KvMetricsAggregator:
        Return the aggregated metrics of the endpoints.
        """
        ...
class HttpService:
    """
    An HTTP service for dynamo applications.
    It is an OpenAI-compatible HTTP ingress into the Dynamo Distributed Runtime.
    """

    ...
class HttpError:
    """
    An error that occurred in the HTTP service.
    """

    ...
class HttpAsyncEngine:
    """
    An async engine for a distributed Dynamo HTTP service. This is an extension
    of the Python-based AsyncEngine that handles HttpError exceptions from
    Python and converts them to the Rust version of HttpError.
    """

    ...
\ No newline at end of file
...@@ -14,6 +14,9 @@
# limitations under the License.
from dynamo._core import DisaggregatedRouter as DisaggregatedRouter
from dynamo._core import HttpAsyncEngine as HttpAsyncEngine
from dynamo._core import HttpError as HttpError
from dynamo._core import HttpService as HttpService
from dynamo._core import KvIndexer as KvIndexer
from dynamo._core import KvMetricsAggregator as KvMetricsAggregator
from dynamo._core import KvMetricsPublisher as KvMetricsPublisher
...
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment