Unverified commit b495cd83 authored by Olga Andreeva, committed by GitHub

feat: Adding completions endpoint support to `dynamo run in=http` (#777)


Signed-off-by: Olga Andreeva <124622579+oandreeva-nv@users.noreply.github.com>
parent 1ff119c7
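
This commit exposes an OpenAI-style completions route from `dynamo run in=http` alongside the existing chat-completions route, by erasing engines behind a new object-safe `StreamingEngine` trait and reusing one generic pipeline builder for both request types. As a rough client-side sketch of what the new endpoint accepts (host, port, path, and model name below are illustrative assumptions, not taken from this diff; the snippet uses the `tokio`, `reqwest`, and `serde_json` crates):

use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Request body follows the OpenAI completions schema this commit targets.
    let body = json!({
        "model": "mock-llama-3.1-8b-instruct", // hypothetical model name
        "prompt": "The capital of France is",
        "max_tokens": 16,
    });
    let resp = reqwest::Client::new()
        .post("http://localhost:8080/v1/completions") // assumed address and path
        .json(&body)
        .send()
        .await?;
    println!("{}", resp.text().await?);
    Ok(())
}
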
@@ -13,10 +13,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::pin::Pin;
use crate::{flags::RouterMode, EngineConfig, Flags};
use dynamo_llm::{
backend::Backend,
backend::ExecutionContext,
engines::StreamingEngineAdapter,
model_card::model::ModelDeploymentCard,
preprocessor::OpenAIPreprocessor,
protocols::common::llm_backend::{BackendInput, BackendOutput},
types::{
openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
@@ -26,12 +32,13 @@ use dynamo_llm::{
},
};
use dynamo_runtime::{
pipeline::{ManyOut, Operator, ServiceBackend, ServiceFrontend, SingleIn, Source},
engine::{AsyncEngineStream, Data},
pipeline::{Context, ManyOut, Operator, ServiceBackend, ServiceFrontend, SingleIn, Source},
DistributedRuntime, Runtime,
};
use std::sync::Arc;
/// Turns an EngineConfig into an OpenAIChatCompletionsStreamingEngine.
/// Turns an EngineConfig into a StreamingEngine that supports both OpenAI chat completions and completions.
pub async fn prepare_engine(
runtime: Runtime,
flags: Flags,
@@ -69,6 +76,7 @@ pub async fn prepare_engine(
card: _card,
} => {
tracing::debug!("Model: {service_name}");
let engine = Arc::new(StreamingEngineAdapter::new(engine));
Ok((service_name, engine, false))
}
EngineConfig::StaticCore {
@@ -76,25 +84,11 @@
engine: inner_engine,
card,
} => {
let frontend = ServiceFrontend::<
SingleIn<NvCreateChatCompletionRequest>,
ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
>::new();
let preprocessor = OpenAIPreprocessor::new(*card.clone())
.await?
.into_operator();
let backend = Backend::from_tokenizer(card.tokenizer_hf()?)
.await?
.into_operator();
let engine = ServiceBackend::from_engine(inner_engine);
let pipeline = frontend
.link(preprocessor.forward_edge())?
.link(backend.forward_edge())?
.link(engine)?
.link(backend.backward_edge())?
.link(preprocessor.backward_edge())?
.link(frontend)?;
let pipeline = build_pipeline::<
NvCreateChatCompletionRequest,
NvCreateChatCompletionStreamResponse,
>(&card, inner_engine)
.await?;
tracing::debug!("Model: {service_name} with pre-processing");
Ok((service_name, pipeline, true))
@@ -102,3 +96,82 @@ pub async fn prepare_engine
EngineConfig::None => unreachable!(),
}
}
pub async fn build_pipeline<Req, Resp>(
card: &ModelDeploymentCard,
engine: ExecutionContext,
) -> anyhow::Result<Arc<ServiceFrontend<SingleIn<Req>, ManyOut<Annotated<Resp>>>>>
where
Req: Data,
Resp: Data,
OpenAIPreprocessor: Operator<
Context<Req>,
Pin<Box<dyn AsyncEngineStream<Annotated<Resp>>>>,
Context<BackendInput>,
Pin<Box<dyn AsyncEngineStream<Annotated<BackendOutput>>>>,
>,
{
let frontend = ServiceFrontend::<SingleIn<Req>, ManyOut<Annotated<Resp>>>::new();
let preprocessor = OpenAIPreprocessor::new((*card).clone())
.await?
.into_operator();
let backend = Backend::from_mdc((*card).clone()).await?.into_operator();
let engine = ServiceBackend::from_engine(engine);
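// Requests travel frontend -> preprocessor -> backend -> engine along the
// forward edges; the response stream returns through the same operators'
// backward edges, so detokenization and response shaping mirror the
// pre-processing steps.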
Ok(frontend
.link(preprocessor.forward_edge())?
.link(backend.forward_edge())?
.link(engine)?
.link(backend.backward_edge())?
.link(preprocessor.backward_edge())?
.link(frontend)?)
}
#[cfg(test)]
mod tests {
use super::*;
use dynamo_llm::types::openai::{
chat_completions::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse},
completions::{CompletionRequest, CompletionResponse},
};
const HF_PATH: &str = concat!(
env!("CARGO_MANIFEST_DIR"),
"/../../lib/llm/tests/data/sample-models/mock-llama-3.1-8b-instruct"
);
#[tokio::test]
async fn test_build_chat_completions_pipeline_core_engine_succeeds() -> anyhow::Result<()> {
// Create test model card
let card = ModelDeploymentCard::from_local_path(HF_PATH, None).await?;
let engine = dynamo_llm::engines::make_engine_core();
// Build pipeline for chat completions
let pipeline = build_pipeline::<
NvCreateChatCompletionRequest,
NvCreateChatCompletionStreamResponse,
>(&card, engine)
.await?;
// Verify pipeline was created
assert!(Arc::strong_count(&pipeline) >= 1);
Ok(())
}
#[tokio::test]
async fn test_build_completions_pipeline_core_engine_succeeds() -> anyhow::Result<()> {
// Create test model card
let card = ModelDeploymentCard::from_local_path(HF_PATH, None).await?;
let engine = dynamo_llm::engines::make_engine_core();
// Build pipeline for completions
let pipeline =
build_pipeline::<CompletionRequest, CompletionResponse>(&card, engine).await?;
// Verify pipeline was created
assert!(Arc::strong_count(&pipeline) >= 1);
Ok(())
}
}
@@ -17,6 +17,7 @@ use std::sync::Arc;
use dynamo_llm::{
backend::Backend,
engines::StreamingEngineAdapter,
http::service::discovery::ModelEntry,
key_value_store::{KeyValueStore, KeyValueStoreManager, NATSStorage},
model_card::{BUCKET_NAME, BUCKET_TTL},
@@ -53,7 +54,10 @@ pub async fn run(
service_name,
engine,
card,
} => (Ingress::for_engine(engine)?, service_name, card),
} => {
let engine = Arc::new(StreamingEngineAdapter::new(engine));
(Ingress::for_engine(engine)?, service_name, card)
}
EngineConfig::StaticCore {
service_name,
engine: inner_engine,
@@ -15,24 +15,20 @@
use std::sync::Arc;
use crate::input::common;
use crate::{EngineConfig, Flags};
use dynamo_llm::{
backend::Backend,
engines::StreamingEngineAdapter,
http::service::{discovery, service_v2},
model_type::ModelType,
preprocessor::OpenAIPreprocessor,
types::{
openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
},
Annotated,
openai::completions::{CompletionRequest, CompletionResponse},
},
};
use dynamo_runtime::{
pipeline::{ManyOut, Operator, ServiceBackend, ServiceFrontend, SingleIn, Source},
DistributedRuntime, Runtime,
};
use crate::{EngineConfig, Flags};
use dynamo_runtime::{DistributedRuntime, Runtime};
/// Build and run an HTTP service
pub async fn run(
@@ -80,35 +76,31 @@ pub async fn run(
engine,
..
} => {
http_service
.model_manager()
.add_chat_completions_model(&service_name, engine)?;
let engine = Arc::new(StreamingEngineAdapter::new(engine));
let manager = http_service.model_manager();
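// The adapter is behind an `Arc`, so registering it for both the completions
// and chat-completions routes only clones a pointer, not the engine.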
manager.add_completions_model(&service_name, engine.clone())?;
manager.add_chat_completions_model(&service_name, engine)?;
}
EngineConfig::StaticCore {
service_name,
engine: inner_engine,
card,
} => {
let frontend = ServiceFrontend::<
SingleIn<NvCreateChatCompletionRequest>,
ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
>::new();
let preprocessor = OpenAIPreprocessor::new(*card.clone())
.await?
.into_operator();
let backend = Backend::from_mdc(*card.clone()).await?.into_operator();
let engine = ServiceBackend::from_engine(inner_engine);
let manager = http_service.model_manager();
let chat_pipeline = common::build_pipeline::<
NvCreateChatCompletionRequest,
NvCreateChatCompletionStreamResponse,
>(&card, inner_engine.clone())
.await?;
manager.add_chat_completions_model(&service_name, chat_pipeline)?;
let pipeline = frontend
.link(preprocessor.forward_edge())?
.link(backend.forward_edge())?
.link(engine)?
.link(backend.backward_edge())?
.link(preprocessor.backward_edge())?
.link(frontend)?;
http_service
.model_manager()
.add_chat_completions_model(&service_name, pipeline)?;
let cmpl_pipeline = common::build_pipeline::<CompletionRequest, CompletionResponse>(
&card,
inner_engine,
)
.await?;
manager.add_completions_model(&service_name, cmpl_pipeline)?;
}
EngineConfig::None => unreachable!(),
}
@@ -18,9 +18,8 @@ use std::{future::Future, pin::Pin};
use std::{io::Read, sync::Arc};
use dynamo_llm::{
backend::ExecutionContext, kv_router::publisher::KvMetricsPublisher,
backend::ExecutionContext, engines::StreamingEngine, kv_router::publisher::KvMetricsPublisher,
model_card::model::ModelDeploymentCard,
types::openai::chat_completions::OpenAIChatCompletionsStreamingEngine,
};
use dynamo_runtime::{protocols::Endpoint, DistributedRuntime};
@@ -60,7 +59,7 @@ pub enum EngineConfig {
/// A Full service engine does its own tokenization and prompt formatting.
StaticFull {
service_name: String,
engine: OpenAIChatCompletionsStreamingEngine,
engine: Arc<dyn StreamingEngine>,
card: Box<ModelDeploymentCard>,
},
@@ -34,10 +34,12 @@ use dynamo_runtime::pipeline::error as pipeline_error;
use dynamo_runtime::pipeline::{Error, ManyOut, SingleIn};
use dynamo_runtime::protocols::annotated::Annotated;
use dynamo_llm::protocols::openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
use dynamo_llm::protocols::openai::{
chat_completions::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse},
completions::{prompt_to_string, CompletionRequest, CompletionResponse},
};
use dynamo_llm::types::openai::chat_completions::OpenAIChatCompletionsStreamingEngine;
use dynamo_llm::engines::{EngineDispatcher, StreamingEngine};
/// How many requests mistral will run at once in the paged attention scheduler.
/// It actually runs 1 fewer than this.
@@ -49,11 +51,9 @@ const PAGED_ATTENTION_MAX_NUM_SEQS: usize = 10;
/// finish_reason=stop and no tokens for one of the requests.
const EXP_ENABLE_PAGED_ATTENTION: bool = false;
pub async fn make_engine(
gguf_path: &Path,
) -> pipeline_error::Result<OpenAIChatCompletionsStreamingEngine> {
pub async fn make_engine(gguf_path: &Path) -> pipeline_error::Result<Arc<dyn StreamingEngine>> {
let engine = MistralRsEngine::new(gguf_path).await?;
let engine: OpenAIChatCompletionsStreamingEngine = Arc::new(engine);
let engine: Arc<dyn StreamingEngine> = Arc::new(EngineDispatcher::new(engine));
Ok(engine)
}
@@ -406,3 +406,130 @@ fn to_logit_bias(lb: HashMap<String, serde_json::Value>) -> HashMap<u32, f32> {
}
out
}
#[async_trait]
impl AsyncEngine<SingleIn<CompletionRequest>, ManyOut<Annotated<CompletionResponse>>, Error>
for MistralRsEngine
{
async fn generate(
&self,
request: SingleIn<CompletionRequest>,
) -> Result<ManyOut<Annotated<CompletionResponse>>, Error> {
let (request, context) = request.transfer(());
let ctx = context.context();
let (tx, mut rx) = channel(10_000);
let response_generator = request.response_generator();
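// An OpenAI completion prompt may arrive as a plain string, an array of
// strings, or token-id arrays; `prompt_to_string` flattens whichever variant
// arrived into the single string form mistral.rs expects.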
let messages = RequestMessage::Completion {
text: prompt_to_string(&request.inner.prompt),
echo_prompt: false,
best_of: Some(1),
};
let det = SamplingParams::deterministic();
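// Each sampling field below falls back to mistral.rs's deterministic
// defaults (`det`) when the request leaves it unset.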
// Allow deprecated because the `max_tokens` request field is deprecated.
#[allow(deprecated)]
let sampling_params = SamplingParams {
temperature: request
.inner
.temperature
.map(|t| t as f64)
.or(det.temperature),
top_p: request.inner.top_p.map(|t| t as f64).or(det.top_p),
top_n_logprobs: request
.inner
.logprobs
.map(|t| t as usize)
.unwrap_or(det.top_n_logprobs),
frequency_penalty: request.inner.frequency_penalty.or(det.frequency_penalty),
presence_penalty: request.inner.presence_penalty.or(det.presence_penalty),
stop_toks: request
.inner
.stop
.clone()
.map(to_stop_tokens)
.or(det.stop_toks),
max_len: request
.inner
.max_tokens
.map(|m| m as usize)
.or(det.max_len),
logits_bias: request
.inner
.logit_bias
.clone()
.map(to_logit_bias)
.or(det.logits_bias),
// These are not in async-openai yet
top_k: det.top_k,
min_p: det.min_p,
n_choices: 1,
dry_params: det.dry_params,
};
let request_id = self.mistralrs.next_request_id();
let mistralrs_request = Request::Normal(NormalRequest {
id: request_id,
messages,
sampling_params,
response: tx,
return_logprobs: false,
is_streaming: true,
constraint: Constraint::None,
suffix: None,
adapters: None,
tools: None,
tool_choice: None,
logits_processors: None,
return_raw_logits: false,
});
self.mistralrs.get_sender()?.send(mistralrs_request).await?;
let output = stream! {
while let Some(response) = rx.recv().await {
let response = match response.as_result() {
Ok(r) => r,
Err(err) => {
tracing::error!(request_id, %err, "Failed converting mistralrs channel response to result.");
break;
}
};
match response {
ResponseOk::CompletionChunk(c) => {
let from_assistant = c.choices[0].text.clone();
let finish_reason = match &c.choices[0].finish_reason.as_deref() {
Some("stop") | Some("canceled") => {
Some(FinishReason::Stop)
}
Some("length") => {
Some(FinishReason::Length)
}
Some(s) => {
tracing::warn!(request_id, stop_reason = s, "Unknown stop reason");
Some(FinishReason::Stop)
}
None => None,
};
#[allow(deprecated)]
let inner = response_generator.create_choice(0, Some(from_assistant), None);
let ann = Annotated{
id: None,
data: Some(inner),
event: None,
comment: None,
};
yield ann;
if finish_reason.is_some() {
break;
}
},
x => tracing::error!(request_id, "Unhandled. {x:?}"),
}
}
};
Ok(ResponseStream::new(Box::pin(output), ctx))
}
}
@@ -37,7 +37,7 @@ use tokio::sync::oneshot::Sender;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use dynamo_llm::backend::ExecutionContext;
use dynamo_llm::types::openai::chat_completions::OpenAIChatCompletionsStreamingEngine;
use dynamo_llm::engines::{EngineDispatcher, StreamingEngine};
/// Python snippet to import a file as a module
const PY_IMPORT: &CStr = cr#"
@@ -74,7 +74,7 @@ pub async fn make_string_engine(
cancel_token: CancellationToken,
py_file: &Path,
py_args: Vec<String>,
) -> pipeline_error::Result<OpenAIChatCompletionsStreamingEngine> {
) -> pipeline_error::Result<Arc<dyn StreamingEngine>> {
pyo3::prepare_freethreaded_python();
if let Ok(venv) = env::var("VIRTUAL_ENV") {
Python::with_gil(|py| {
@@ -85,7 +85,7 @@ }
}
let engine = new_engine(cancel_token, py_file, py_args).await?;
let engine: OpenAIChatCompletionsStreamingEngine = Arc::new(engine);
let engine: Arc<dyn StreamingEngine> = Arc::new(EngineDispatcher::new(engine));
Ok(engine)
}
@@ -28,10 +28,10 @@ use dynamo_runtime::protocols::annotated::Annotated;
use crate::backend::ExecutionContext;
use crate::preprocessor::BackendInput;
use crate::protocols::common::llm_backend::LLMEngineOutput;
use crate::protocols::openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
use crate::protocols::openai::{
chat_completions::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse},
completions::{prompt_to_string, CompletionRequest, CompletionResponse},
};
use crate::types::openai::chat_completions::OpenAIChatCompletionsStreamingEngine;
//
// The engines are each in their own crate under `lib/engines`
@@ -120,8 +120,35 @@ fn delta_core(tok: u32) -> Annotated<LLMEngineOutput> {
/// Engine that accepts un-preprocessed requests and echos the prompt back as the response
/// Useful for testing ingress such as service-http.
struct EchoEngineFull {}
pub fn make_engine_full() -> OpenAIChatCompletionsStreamingEngine {
Arc::new(EchoEngineFull {})
/// Engine that dispatches requests to either the OpenAI completions
/// or the OpenAI chat completions engine.
pub struct EngineDispatcher<E> {
inner: E,
}
impl<E> EngineDispatcher<E> {
pub fn new(inner: E) -> Self {
EngineDispatcher { inner }
}
}
/// Trait that allows handling both completion and chat completions requests
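/// Being object safe, it can be erased to `Arc<dyn StreamingEngine>` and
/// stored wherever one handle must serve both endpoints.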
#[async_trait]
pub trait StreamingEngine: Send + Sync {
async fn handle_completion(
&self,
req: SingleIn<CompletionRequest>,
) -> Result<ManyOut<Annotated<CompletionResponse>>, Error>;
async fn handle_chat(
&self,
req: SingleIn<NvCreateChatCompletionRequest>,
) -> Result<ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, Error>;
}
pub fn make_engine_full() -> Arc<dyn StreamingEngine> {
Arc::new(EngineDispatcher::new(EchoEngineFull {}))
}
#[async_trait]
@@ -176,3 +203,94 @@ impl
Ok(ResponseStream::new(Box::pin(output), ctx))
}
}
#[async_trait]
impl AsyncEngine<SingleIn<CompletionRequest>, ManyOut<Annotated<CompletionResponse>>, Error>
for EchoEngineFull
{
async fn generate(
&self,
incoming_request: SingleIn<CompletionRequest>,
) -> Result<ManyOut<Annotated<CompletionResponse>>, Error> {
let (request, context) = incoming_request.transfer(());
let deltas = request.response_generator();
let ctx = context.context();
let chars_string = prompt_to_string(&request.inner.prompt);
let output = stream! {
let mut id = 1;
for c in chars_string.chars() {
tokio::time::sleep(*TOKEN_ECHO_DELAY).await;
let response = deltas.create_choice(0, Some(c.to_string()), None);
yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None };
id += 1;
}
let response = deltas.create_choice(0, None, Some("stop".to_string()));
yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None };
};
Ok(ResponseStream::new(Box::pin(output), ctx))
}
}
#[async_trait]
impl<E> StreamingEngine for EngineDispatcher<E>
where
E: AsyncEngine<SingleIn<CompletionRequest>, ManyOut<Annotated<CompletionResponse>>, Error>
+ AsyncEngine<
SingleIn<NvCreateChatCompletionRequest>,
ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
Error,
> + Send
+ Sync,
{
async fn handle_completion(
&self,
req: SingleIn<CompletionRequest>,
) -> Result<ManyOut<Annotated<CompletionResponse>>, Error> {
self.inner.generate(req).await
}
async fn handle_chat(
&self,
req: SingleIn<NvCreateChatCompletionRequest>,
) -> Result<ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, Error> {
self.inner.generate(req).await
}
}
pub struct StreamingEngineAdapter(Arc<dyn StreamingEngine>);
impl StreamingEngineAdapter {
pub fn new(engine: Arc<dyn StreamingEngine>) -> Self {
StreamingEngineAdapter(engine)
}
}
#[async_trait]
impl AsyncEngine<SingleIn<CompletionRequest>, ManyOut<Annotated<CompletionResponse>>, Error>
for StreamingEngineAdapter
{
async fn generate(
&self,
req: SingleIn<CompletionRequest>,
) -> Result<ManyOut<Annotated<CompletionResponse>>, Error> {
self.0.handle_completion(req).await
}
}
#[async_trait]
impl
AsyncEngine<
SingleIn<NvCreateChatCompletionRequest>,
ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
Error,
> for StreamingEngineAdapter
{
async fn generate(
&self,
req: SingleIn<NvCreateChatCompletionRequest>,
) -> Result<ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, Error> {
self.0.handle_chat(req).await
}
}
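
Taken together, `EngineDispatcher` and `StreamingEngineAdapter` are inverse wrappers: the dispatcher erases two request-specific `AsyncEngine` implementations into one object-safe trait object, and the adapter recovers per-request-type `AsyncEngine` views from that object so it can slot back into a pipeline. A self-contained, synchronous sketch of the same shape (every name below is an illustrative stand-in, not the dynamo API):

use std::sync::Arc;

// Stand-in request types (hypothetical).
struct CompletionReq(String);
struct ChatReq(String);

// Stand-in for `AsyncEngine`, generic over the request type.
trait Engine<Req> {
    fn generate(&self, req: Req) -> String;
}

// Stand-in for `StreamingEngine`: object safe, one method per request shape.
trait Streaming {
    fn completion(&self, req: CompletionReq) -> String;
    fn chat(&self, req: ChatReq) -> String;
}

// Dispatcher: any engine implementing both `Engine` flavors becomes `Streaming`.
struct Dispatcher<E>(E);

impl<E: Engine<CompletionReq> + Engine<ChatReq>> Streaming for Dispatcher<E> {
    fn completion(&self, req: CompletionReq) -> String {
        Engine::<CompletionReq>::generate(&self.0, req)
    }
    fn chat(&self, req: ChatReq) -> String {
        Engine::<ChatReq>::generate(&self.0, req)
    }
}

// Adapter: a shared `dyn Streaming` regains both `Engine` impls.
struct Adapter(Arc<dyn Streaming>);

impl Engine<CompletionReq> for Adapter {
    fn generate(&self, req: CompletionReq) -> String {
        self.0.completion(req)
    }
}
impl Engine<ChatReq> for Adapter {
    fn generate(&self, req: ChatReq) -> String {
        self.0.chat(req)
    }
}

// A trivial engine that echoes the request text, for demonstration.
struct Echo;
impl Engine<CompletionReq> for Echo {
    fn generate(&self, req: CompletionReq) -> String {
        req.0
    }
}
impl Engine<ChatReq> for Echo {
    fn generate(&self, req: ChatReq) -> String {
        format!("chat: {}", req.0)
    }
}

fn main() {
    let erased: Arc<dyn Streaming> = Arc::new(Dispatcher(Echo));
    let adapted = Adapter(erased);
    assert_eq!(adapted.generate(CompletionReq("hi".into())), "hi");
    assert_eq!(adapted.generate(ChatReq("hi".into())), "chat: hi");
}
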
@@ -66,12 +66,7 @@ impl OAIChatLikeRequest for CompletionRequest {
},
);
// Convert to a JSON string first
let json_string =
serde_json::to_string(&vec![message]).expect("Serialization to JSON string failed");
// Convert to MiniJinja Value
minijinja::value::Value::from_safe_string(json_string)
minijinja::value::Value::from_serialize(vec![message])
}
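
The rendering change above replaces a double hop (serialize to a JSON string, then mark that string as safe) with `Value::from_serialize`, which hands MiniJinja a structured sequence the chat template can iterate and index directly. A minimal sketch of the difference (assumes the `minijinja`, `serde`, and `serde_json` crates; the `Message` type is an illustrative stand-in):

use minijinja::value::Value;
use serde::Serialize;

#[derive(Serialize)]
struct Message {
    role: &'static str,
    content: &'static str,
}

fn main() {
    let msgs = vec![Message { role: "user", content: "hi" }];

    // Before: the template saw one opaque, pre-rendered JSON string.
    let as_string =
        Value::from_safe_string(serde_json::to_string(&msgs).expect("serialize"));

    // After: the template sees a real sequence of maps, so expressions like
    // `messages[0].role` work without parsing JSON inside the template.
    let as_value = Value::from_serialize(&msgs);

    println!("{as_string}\n{as_value}");
}
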
fn should_add_generation_prompt(&self) -> bool {