Unverified Commit 3f6a7472 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

chore: Remove PreprocessedRequest alias BackendInput (#1307)

It was confusing to have two names for one type.

This tidy-up started in #1064 and is now complete.
parent 859944f4
...@@ -9,7 +9,7 @@ use dynamo_llm::{ ...@@ -9,7 +9,7 @@ use dynamo_llm::{
engines::StreamingEngineAdapter, engines::StreamingEngineAdapter,
model_card::ModelDeploymentCard, model_card::ModelDeploymentCard,
preprocessor::OpenAIPreprocessor, preprocessor::OpenAIPreprocessor,
protocols::common::llm_backend::{BackendInput, BackendOutput}, protocols::common::llm_backend::{BackendOutput, PreprocessedRequest},
types::{ types::{
openai::chat_completions::{ openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse, NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
...@@ -113,7 +113,7 @@ where ...@@ -113,7 +113,7 @@ where
OpenAIPreprocessor: Operator< OpenAIPreprocessor: Operator<
Context<Req>, Context<Req>,
Pin<Box<dyn AsyncEngineStream<Annotated<Resp>>>>, Pin<Box<dyn AsyncEngineStream<Annotated<Resp>>>>,
Context<BackendInput>, Context<PreprocessedRequest>,
Pin<Box<dyn AsyncEngineStream<Annotated<BackendOutput>>>>, Pin<Box<dyn AsyncEngineStream<Annotated<BackendOutput>>>>,
>, >,
{ {
......
...@@ -19,7 +19,7 @@ use dynamo_llm::{ ...@@ -19,7 +19,7 @@ use dynamo_llm::{
backend::Backend, backend::Backend,
engines::StreamingEngineAdapter, engines::StreamingEngineAdapter,
model_type::ModelType, model_type::ModelType,
preprocessor::{BackendInput, BackendOutput}, preprocessor::{BackendOutput, PreprocessedRequest},
types::{ types::{
openai::chat_completions::{ openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse, NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
...@@ -71,8 +71,10 @@ pub async fn run( ...@@ -71,8 +71,10 @@ pub async fn run(
mut model, mut model,
} => { } => {
// Pre-processing is done ingress-side, so it should be already done. // Pre-processing is done ingress-side, so it should be already done.
let frontend = let frontend = SegmentSource::<
SegmentSource::<SingleIn<BackendInput>, ManyOut<Annotated<BackendOutput>>>::new(); SingleIn<PreprocessedRequest>,
ManyOut<Annotated<BackendOutput>>,
>::new();
let backend = Backend::from_mdc(model.card().clone()) let backend = Backend::from_mdc(model.card().clone())
.await? .await?
.into_operator(); .into_operator();
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
use super::*; use super::*;
use crate::llm::model_card::ModelDeploymentCard; use crate::llm::model_card::ModelDeploymentCard;
use llm_rs::protocols::common::llm_backend::{BackendInput, BackendOutput}; use llm_rs::protocols::common::llm_backend::{BackendOutput, PreprocessedRequest};
use llm_rs::types::Annotated; use llm_rs::types::Annotated;
use dynamo_runtime::pipeline::{Operator, ServiceBackend, ServiceFrontend, Source}; use dynamo_runtime::pipeline::{Operator, ServiceBackend, ServiceFrontend, Source};
...@@ -44,8 +44,10 @@ impl Backend { ...@@ -44,8 +44,10 @@ impl Backend {
} }
fn start<'p>(&self, py: Python<'p>, generator: PyObject) -> PyResult<Bound<'p, PyAny>> { fn start<'p>(&self, py: Python<'p>, generator: PyObject) -> PyResult<Bound<'p, PyAny>> {
let frontend = let frontend = ServiceFrontend::<
ServiceFrontend::<SingleIn<BackendInput>, ManyOut<Annotated<BackendOutput>>>::new(); SingleIn<PreprocessedRequest>,
ManyOut<Annotated<BackendOutput>>,
>::new();
let backend = self.inner.into_operator(); let backend = self.inner.into_operator();
let engine = Arc::new(PythonAsyncEngine::new( let engine = Arc::new(PythonAsyncEngine::new(
......
...@@ -18,7 +18,7 @@ use crate::llm::model_card::ModelDeploymentCard; ...@@ -18,7 +18,7 @@ use crate::llm::model_card::ModelDeploymentCard;
use llm_rs::{ use llm_rs::{
preprocessor::OpenAIPreprocessor, preprocessor::OpenAIPreprocessor,
protocols::common::llm_backend::{BackendInput, BackendOutput}, protocols::common::llm_backend::{BackendOutput, PreprocessedRequest},
types::{ types::{
openai::chat_completions::{ openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse, NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
...@@ -60,7 +60,7 @@ impl OAIChatPreprocessor { ...@@ -60,7 +60,7 @@ impl OAIChatPreprocessor {
>::new(); >::new();
let network = let network =
SegmentSink::<SingleIn<BackendInput>, ManyOut<Annotated<BackendOutput>>>::new(); SegmentSink::<SingleIn<PreprocessedRequest>, ManyOut<Annotated<BackendOutput>>>::new();
let preprocessor = self.inner.into_operator(); let preprocessor = self.inner.into_operator();
let pipeline = frontend let pipeline = frontend
...@@ -77,7 +77,7 @@ impl OAIChatPreprocessor { ...@@ -77,7 +77,7 @@ impl OAIChatPreprocessor {
let endpoint = Arc::new(self.next.inner.clone()); let endpoint = Arc::new(self.next.inner.clone());
pyo3_async_runtimes::tokio::future_into_py(py, async move { pyo3_async_runtimes::tokio::future_into_py(py, async move {
let client = endpoint.client().await.map_err(to_pyerr)?; let client = endpoint.client().await.map_err(to_pyerr)?;
let router = PushRouter::<BackendInput, Annotated<BackendOutput>>::from_client( let router = PushRouter::<PreprocessedRequest, Annotated<BackendOutput>>::from_client(
client, client,
Default::default(), Default::default(),
) )
......
...@@ -23,7 +23,7 @@ use llama_cpp_2::{ ...@@ -23,7 +23,7 @@ use llama_cpp_2::{
LogOptions, LogOptions,
}; };
use dynamo_llm::protocols::common::llm_backend::{BackendInput, LLMEngineOutput}; use dynamo_llm::protocols::common::llm_backend::LLMEngineOutput;
use dynamo_llm::protocols::common::preprocessor::PreprocessedRequest; use dynamo_llm::protocols::common::preprocessor::PreprocessedRequest;
use dynamo_llm::{backend::ExecutionContext, local_model::LocalModel}; use dynamo_llm::{backend::ExecutionContext, local_model::LocalModel};
...@@ -119,12 +119,12 @@ fn load_model(backend: &LlamaBackend, model_path: &Path) -> Result<LlamaModel> { ...@@ -119,12 +119,12 @@ fn load_model(backend: &LlamaBackend, model_path: &Path) -> Result<LlamaModel> {
} }
#[async_trait] #[async_trait]
impl AsyncEngine<SingleIn<BackendInput>, ManyOut<Annotated<LLMEngineOutput>>, Error> impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutput>>, Error>
for LlamacppEngine for LlamacppEngine
{ {
async fn generate( async fn generate(
&self, &self,
request: SingleIn<BackendInput>, request: SingleIn<PreprocessedRequest>,
) -> Result<ManyOut<Annotated<LLMEngineOutput>>, Error> { ) -> Result<ManyOut<Annotated<LLMEngineOutput>>, Error> {
let (request, context) = request.into_parts(); let (request, context) = request.into_parts();
let ctx = context.context(); let ctx = context.context();
......
...@@ -44,7 +44,7 @@ use dynamo_runtime::{ ...@@ -44,7 +44,7 @@ use dynamo_runtime::{
use crate::protocols::{ use crate::protocols::{
common::{ common::{
llm_backend::{BackendInput, BackendOutput, FinishReason, LLMEngineOutput}, llm_backend::{BackendOutput, FinishReason, LLMEngineOutput, PreprocessedRequest},
StopConditions, StopConditions,
}, },
TokenIdType, TokenIdType,
...@@ -56,7 +56,7 @@ use tokenizers::Tokenizer as HfTokenizer; ...@@ -56,7 +56,7 @@ use tokenizers::Tokenizer as HfTokenizer;
pub type ExecutionOutputStream = Annotated<LLMEngineOutput>; pub type ExecutionOutputStream = Annotated<LLMEngineOutput>;
/// Context for executing LLM inference, engine consumes backend input and produces execution output stream /// Context for executing LLM inference, engine consumes backend input and produces execution output stream
pub type ExecutionContext = ServerStreamingEngine<BackendInput, ExecutionOutputStream>; pub type ExecutionContext = ServerStreamingEngine<PreprocessedRequest, ExecutionOutputStream>;
/// Backend handles resource management and orchestrates LLM execution /// Backend handles resource management and orchestrates LLM execution
#[allow(dead_code)] #[allow(dead_code)]
...@@ -121,16 +121,16 @@ impl Backend { ...@@ -121,16 +121,16 @@ impl Backend {
#[async_trait] #[async_trait]
impl impl
Operator< Operator<
SingleIn<BackendInput>, SingleIn<PreprocessedRequest>,
ManyOut<Annotated<BackendOutput>>, ManyOut<Annotated<BackendOutput>>,
SingleIn<BackendInput>, SingleIn<PreprocessedRequest>,
ManyOut<Annotated<LLMEngineOutput>>, ManyOut<Annotated<LLMEngineOutput>>,
> for Backend > for Backend
{ {
async fn generate( async fn generate(
&self, &self,
request: SingleIn<BackendInput>, request: SingleIn<PreprocessedRequest>,
next: ServerStreamingEngine<BackendInput, Annotated<LLMEngineOutput>>, next: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>>,
) -> Result<ManyOut<Annotated<BackendOutput>>> { ) -> Result<ManyOut<Annotated<BackendOutput>>> {
let stop_conditions = request.stop_conditions.clone(); let stop_conditions = request.stop_conditions.clone();
let next_stream = next.generate(request).await?; let next_stream = next.generate(request).await?;
......
...@@ -20,7 +20,7 @@ use crate::{ ...@@ -20,7 +20,7 @@ use crate::{
backend::Backend, backend::Backend,
kv_router::KvPushRouter, kv_router::KvPushRouter,
model_type::ModelType, model_type::ModelType,
preprocessor::{BackendInput, OpenAIPreprocessor}, preprocessor::{OpenAIPreprocessor, PreprocessedRequest},
protocols::common::llm_backend::LLMEngineOutput, protocols::common::llm_backend::LLMEngineOutput,
protocols::openai::chat_completions::{ protocols::openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse, NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
...@@ -196,11 +196,12 @@ impl ModelWatcher { ...@@ -196,11 +196,12 @@ impl ModelWatcher {
>::new(); >::new();
let preprocessor = OpenAIPreprocessor::new(card.clone()).await?.into_operator(); let preprocessor = OpenAIPreprocessor::new(card.clone()).await?.into_operator();
let backend = Backend::from_mdc(card.clone()).await?.into_operator(); let backend = Backend::from_mdc(card.clone()).await?.into_operator();
let router = PushRouter::<BackendInput, Annotated<LLMEngineOutput>>::from_client( let router =
client.clone(), PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client(
self.router_mode, client.clone(),
) self.router_mode,
.await?; )
.await?;
let service_backend = match self.router_mode { let service_backend = match self.router_mode {
RouterMode::Random | RouterMode::RoundRobin | RouterMode::Direct(_) => { RouterMode::Random | RouterMode::RoundRobin | RouterMode::Direct(_) => {
ServiceBackend::from_engine(Arc::new(router)) ServiceBackend::from_engine(Arc::new(router))
...@@ -231,11 +232,12 @@ impl ModelWatcher { ...@@ -231,11 +232,12 @@ impl ModelWatcher {
>::new(); >::new();
let preprocessor = OpenAIPreprocessor::new(card.clone()).await?.into_operator(); let preprocessor = OpenAIPreprocessor::new(card.clone()).await?.into_operator();
let backend = Backend::from_mdc(card.clone()).await?.into_operator(); let backend = Backend::from_mdc(card.clone()).await?.into_operator();
let router = PushRouter::<BackendInput, Annotated<LLMEngineOutput>>::from_client( let router =
client, PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client(
self.router_mode, client,
) self.router_mode,
.await?; )
.await?;
let service_backend = match self.router_mode { let service_backend = match self.router_mode {
RouterMode::Random | RouterMode::RoundRobin | RouterMode::Direct(_) => { RouterMode::Random | RouterMode::RoundRobin | RouterMode::Direct(_) => {
ServiceBackend::from_engine(Arc::new(router)) ServiceBackend::from_engine(Arc::new(router))
......
...@@ -26,7 +26,7 @@ use dynamo_runtime::pipeline::{Error, ManyOut, SingleIn}; ...@@ -26,7 +26,7 @@ use dynamo_runtime::pipeline::{Error, ManyOut, SingleIn};
use dynamo_runtime::protocols::annotated::Annotated; use dynamo_runtime::protocols::annotated::Annotated;
use crate::backend::ExecutionContext; use crate::backend::ExecutionContext;
use crate::preprocessor::BackendInput; use crate::preprocessor::PreprocessedRequest;
use crate::protocols::common::llm_backend::LLMEngineOutput; use crate::protocols::common::llm_backend::LLMEngineOutput;
use crate::protocols::openai::{ use crate::protocols::openai::{
chat_completions::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse}, chat_completions::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse},
...@@ -86,12 +86,12 @@ pub fn make_engine_core() -> ExecutionContext { ...@@ -86,12 +86,12 @@ pub fn make_engine_core() -> ExecutionContext {
} }
#[async_trait] #[async_trait]
impl AsyncEngine<SingleIn<BackendInput>, ManyOut<Annotated<LLMEngineOutput>>, Error> impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutput>>, Error>
for EchoEngineCore for EchoEngineCore
{ {
async fn generate( async fn generate(
&self, &self,
incoming_request: SingleIn<BackendInput>, incoming_request: SingleIn<PreprocessedRequest>,
) -> Result<ManyOut<Annotated<LLMEngineOutput>>, Error> { ) -> Result<ManyOut<Annotated<LLMEngineOutput>>, Error> {
let (request, context) = incoming_request.into_parts(); let (request, context) = incoming_request.into_parts();
let ctx = context.context(); let ctx = context.context();
......
...@@ -31,7 +31,7 @@ use crate::{ ...@@ -31,7 +31,7 @@ use crate::{
scheduler::{KvScheduler, KvSchedulerError, SchedulingRequest}, scheduler::{KvScheduler, KvSchedulerError, SchedulingRequest},
scoring::ProcessedEndpoints, scoring::ProcessedEndpoints,
}, },
preprocessor::BackendInput, preprocessor::PreprocessedRequest,
protocols::common::llm_backend::LLMEngineOutput, protocols::common::llm_backend::LLMEngineOutput,
tokens::TokenBlockSequence, tokens::TokenBlockSequence,
}; };
...@@ -173,13 +173,13 @@ impl AsyncEngine<SingleIn<RouterRequest>, ManyOut<Annotated<RouterResponse>>, Er ...@@ -173,13 +173,13 @@ impl AsyncEngine<SingleIn<RouterRequest>, ManyOut<Annotated<RouterResponse>>, Er
} }
pub struct KvPushRouter { pub struct KvPushRouter {
inner: PushRouter<BackendInput, Annotated<LLMEngineOutput>>, inner: PushRouter<PreprocessedRequest, Annotated<LLMEngineOutput>>,
chooser: Arc<KvRouter>, chooser: Arc<KvRouter>,
} }
impl KvPushRouter { impl KvPushRouter {
pub fn new( pub fn new(
inner: PushRouter<BackendInput, Annotated<LLMEngineOutput>>, inner: PushRouter<PreprocessedRequest, Annotated<LLMEngineOutput>>,
chooser: Arc<KvRouter>, chooser: Arc<KvRouter>,
) -> Self { ) -> Self {
KvPushRouter { inner, chooser } KvPushRouter { inner, chooser }
...@@ -187,12 +187,12 @@ impl KvPushRouter { ...@@ -187,12 +187,12 @@ impl KvPushRouter {
} }
#[async_trait] #[async_trait]
impl AsyncEngine<SingleIn<BackendInput>, ManyOut<Annotated<LLMEngineOutput>>, Error> impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutput>>, Error>
for KvPushRouter for KvPushRouter
{ {
async fn generate( async fn generate(
&self, &self,
request: SingleIn<BackendInput>, request: SingleIn<PreprocessedRequest>,
) -> Result<ManyOut<Annotated<LLMEngineOutput>>, Error> { ) -> Result<ManyOut<Annotated<LLMEngineOutput>>, Error> {
match self.inner.client.instance_source.as_ref() { match self.inner.client.instance_source.as_ref() {
InstanceSource::Static => self.inner.r#static(request).await, InstanceSource::Static => self.inner.r#static(request).await,
......
...@@ -55,7 +55,7 @@ use crate::tokenizers::{traits::Tokenizer, HuggingFaceTokenizer}; ...@@ -55,7 +55,7 @@ use crate::tokenizers::{traits::Tokenizer, HuggingFaceTokenizer};
use crate::preprocessor::prompt::PromptFormatter; use crate::preprocessor::prompt::PromptFormatter;
pub use crate::protocols::common::llm_backend::{BackendInput, BackendOutput}; pub use crate::protocols::common::llm_backend::{BackendOutput, PreprocessedRequest};
pub const ANNOTATION_FORMATTED_PROMPT: &str = "formatted_prompt"; pub const ANNOTATION_FORMATTED_PROMPT: &str = "formatted_prompt";
pub const ANNOTATION_TOKEN_IDS: &str = "token_ids"; pub const ANNOTATION_TOKEN_IDS: &str = "token_ids";
...@@ -121,9 +121,9 @@ impl OpenAIPreprocessor { ...@@ -121,9 +121,9 @@ impl OpenAIPreprocessor {
>( >(
&self, &self,
request: &R, request: &R,
) -> Result<(BackendInput, HashMap<String, String>)> { ) -> Result<(PreprocessedRequest, HashMap<String, String>)> {
let mut annotations = HashMap::new(); let mut annotations = HashMap::new();
let mut builder = BackendInput::builder(); let mut builder = PreprocessedRequest::builder();
let use_raw_prompt = request let use_raw_prompt = request
.nvext() .nvext()
...@@ -266,7 +266,7 @@ impl ...@@ -266,7 +266,7 @@ impl
Operator< Operator<
SingleIn<NvCreateChatCompletionRequest>, SingleIn<NvCreateChatCompletionRequest>,
ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
SingleIn<BackendInput>, SingleIn<PreprocessedRequest>,
ManyOut<Annotated<BackendOutput>>, ManyOut<Annotated<BackendOutput>>,
> for OpenAIPreprocessor > for OpenAIPreprocessor
{ {
...@@ -274,7 +274,11 @@ impl ...@@ -274,7 +274,11 @@ impl
&self, &self,
request: SingleIn<NvCreateChatCompletionRequest>, request: SingleIn<NvCreateChatCompletionRequest>,
next: Arc< next: Arc<
dyn AsyncEngine<SingleIn<BackendInput>, ManyOut<Annotated<BackendOutput>>, Error>, dyn AsyncEngine<
SingleIn<PreprocessedRequest>,
ManyOut<Annotated<BackendOutput>>,
Error,
>,
>, >,
) -> Result<ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, Error> { ) -> Result<ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, Error> {
// unpack the request // unpack the request
...@@ -320,7 +324,7 @@ impl ...@@ -320,7 +324,7 @@ impl
Operator< Operator<
SingleIn<CompletionRequest>, SingleIn<CompletionRequest>,
ManyOut<Annotated<CompletionResponse>>, ManyOut<Annotated<CompletionResponse>>,
SingleIn<BackendInput>, SingleIn<PreprocessedRequest>,
ManyOut<Annotated<BackendOutput>>, ManyOut<Annotated<BackendOutput>>,
> for OpenAIPreprocessor > for OpenAIPreprocessor
{ {
...@@ -328,7 +332,11 @@ impl ...@@ -328,7 +332,11 @@ impl
&self, &self,
request: SingleIn<CompletionRequest>, request: SingleIn<CompletionRequest>,
next: Arc< next: Arc<
dyn AsyncEngine<SingleIn<BackendInput>, ManyOut<Annotated<BackendOutput>>, Error>, dyn AsyncEngine<
SingleIn<PreprocessedRequest>,
ManyOut<Annotated<BackendOutput>>,
Error,
>,
>, >,
) -> Result<ManyOut<Annotated<CompletionResponse>>, Error> { ) -> Result<ManyOut<Annotated<CompletionResponse>>, Error> {
// unpack the request // unpack the request
......
...@@ -20,7 +20,6 @@ use crate::protocols::TokenIdType; ...@@ -20,7 +20,6 @@ use crate::protocols::TokenIdType;
pub type TokenType = Option<String>; pub type TokenType = Option<String>;
pub type LogProbs = Vec<f64>; pub type LogProbs = Vec<f64>;
pub use super::preprocessor::PreprocessedRequest as BackendInput; // TODO stop renaming this
pub use super::preprocessor::PreprocessedRequest; pub use super::preprocessor::PreprocessedRequest;
pub use super::FinishReason; pub use super::FinishReason;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment