"lib/vscode:/vscode.git/clone" did not exist on "491a21093f0e05bc4522ded98bc5e04fa61031c7"
Unverified Commit 0373b897 authored by Michael Feil's avatar Michael Feil Committed by GitHub
Browse files

feat: kv commit router (#3024)


Signed-off-by: default avatarmichaelfeil <me@michaelfeil.eu>
parent 2bb1b268
...@@ -381,7 +381,8 @@ impl KvRouter { ...@@ -381,7 +381,8 @@ impl KvRouter {
} }
} }
// NOTE: this would not be usable for now, should deprecate // NOTE: KVRouter works like a PushRouter,
// but without the reverse proxy functionality, but based on contract of 3 request types
#[async_trait] #[async_trait]
impl AsyncEngine<SingleIn<RouterRequest>, ManyOut<Annotated<RouterResponse>>, Error> for KvRouter { impl AsyncEngine<SingleIn<RouterRequest>, ManyOut<Annotated<RouterResponse>>, Error> for KvRouter {
async fn generate( async fn generate(
...@@ -389,11 +390,29 @@ impl AsyncEngine<SingleIn<RouterRequest>, ManyOut<Annotated<RouterResponse>>, Er ...@@ -389,11 +390,29 @@ impl AsyncEngine<SingleIn<RouterRequest>, ManyOut<Annotated<RouterResponse>>, Er
request: SingleIn<RouterRequest>, request: SingleIn<RouterRequest>,
) -> Result<ManyOut<Annotated<RouterResponse>>> { ) -> Result<ManyOut<Annotated<RouterResponse>>> {
let (request, ctx) = request.into_parts(); let (request, ctx) = request.into_parts();
let (worker_id, _) = self let context_id = ctx.context().id().to_string();
.find_best_match(ctx.id(), &request.tokens, None, true) // Handle different request types
.await?; let response = match request {
RouterRequest::New { tokens } => {
let (worker_id, overlap_blocks) = self
.find_best_match(&context_id, &tokens, None, true)
.await?;
RouterResponse::New {
worker_id,
overlap_blocks,
}
}
RouterRequest::MarkPrefill => {
self.mark_prefill_completed(&context_id).await;
RouterResponse::PrefillMarked { success: true }
}
RouterRequest::MarkFree => {
self.free(&context_id).await;
RouterResponse::FreeMarked { success: true }
}
};
let response = RouterResponse { worker_id };
let response = Annotated::from_data(response); let response = Annotated::from_data(response);
let stream = stream::iter(vec![response]); let stream = stream::iter(vec![response]);
Ok(ResponseStream::new(Box::pin(stream), ctx.context())) Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
......
...@@ -5,14 +5,30 @@ use crate::tokens::{SequenceHash, Token}; ...@@ -5,14 +5,30 @@ use crate::tokens::{SequenceHash, Token};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use uuid::Uuid; use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, Default)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RouterRequest { #[serde(tag = "method", rename_all = "snake_case")]
pub tokens: Vec<Token>, pub enum RouterRequest {
// ini
#[serde(rename = "new")]
New {
tokens: Vec<Token>,
},
MarkPrefill,
MarkFree,
} }
#[derive(Debug, Clone, Serialize, Deserialize, Default)] impl Default for RouterRequest {
pub struct RouterResponse { fn default() -> Self {
pub worker_id: i64, RouterRequest::New { tokens: vec![] }
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "method", rename_all = "snake_case")]
pub enum RouterResponse {
New { worker_id: i64, overlap_blocks: u32 },
PrefillMarked { success: bool },
FreeMarked { success: bool },
} }
#[derive(Debug)] #[derive(Debug)]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment