Commit 50564320 authored by Ryan Olson's avatar Ryan Olson Committed by GitHub
Browse files

fix: disabling sse keep-alive (#408)

parent 411f07e0
...@@ -52,6 +52,7 @@ use crate::types::openai::{ ...@@ -52,6 +52,7 @@ use crate::types::openai::{
use std::{ use std::{
collections::HashMap, collections::HashMap,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
time::Duration,
}; };
#[derive(Clone)] #[derive(Clone)]
...@@ -191,6 +192,7 @@ pub struct DeploymentState { ...@@ -191,6 +192,7 @@ pub struct DeploymentState {
completion_engines: Arc<Mutex<ModelEngines<OpenAICompletionsStreamingEngine>>>, completion_engines: Arc<Mutex<ModelEngines<OpenAICompletionsStreamingEngine>>>,
chat_completion_engines: Arc<Mutex<ModelEngines<OpenAIChatCompletionsStreamingEngine>>>, chat_completion_engines: Arc<Mutex<ModelEngines<OpenAIChatCompletionsStreamingEngine>>>,
metrics: Arc<Metrics>, metrics: Arc<Metrics>,
sse_keep_alive: Option<Duration>,
} }
impl DeploymentState { impl DeploymentState {
...@@ -199,6 +201,7 @@ impl DeploymentState { ...@@ -199,6 +201,7 @@ impl DeploymentState {
completion_engines: Arc::new(Mutex::new(ModelEngines::default())), completion_engines: Arc::new(Mutex::new(ModelEngines::default())),
chat_completion_engines: Arc::new(Mutex::new(ModelEngines::default())), chat_completion_engines: Arc::new(Mutex::new(ModelEngines::default())),
metrics: Arc::new(Metrics::default()), metrics: Arc::new(Metrics::default()),
sse_keep_alive: None,
} }
} }
......
...@@ -182,9 +182,13 @@ async fn completions( ...@@ -182,9 +182,13 @@ async fn completions(
let stream = stream.map(|response| Event::try_from(EventConverter::from(response))); let stream = stream.map(|response| Event::try_from(EventConverter::from(response)));
let stream = monitor_for_disconnects(stream.boxed(), ctx, inflight).await; let stream = monitor_for_disconnects(stream.boxed(), ctx, inflight).await;
Ok(Sse::new(stream) let mut sse_stream = Sse::new(stream);
.keep_alive(KeepAlive::default())
.into_response()) if let Some(keep_alive) = state.sse_keep_alive {
sse_stream = sse_stream.keep_alive(KeepAlive::default().interval(keep_alive));
}
Ok(sse_stream.into_response())
} else { } else {
let response = CompletionResponse::from_annotated_stream(stream.into()) let response = CompletionResponse::from_annotated_stream(stream.into())
.await .await
...@@ -270,9 +274,13 @@ async fn chat_completions( ...@@ -270,9 +274,13 @@ async fn chat_completions(
let stream = stream.map(|response| Event::try_from(EventConverter::from(response))); let stream = stream.map(|response| Event::try_from(EventConverter::from(response)));
let stream = monitor_for_disconnects(stream.boxed(), ctx, inflight).await; let stream = monitor_for_disconnects(stream.boxed(), ctx, inflight).await;
Ok(Sse::new(stream) let mut sse_stream = Sse::new(stream);
.keep_alive(KeepAlive::default())
.into_response()) if let Some(keep_alive) = state.sse_keep_alive {
sse_stream = sse_stream.keep_alive(KeepAlive::default().interval(keep_alive));
}
Ok(sse_stream.into_response())
} else { } else {
let response = NvCreateChatCompletionResponse::from_annotated_stream(stream.into()) let response = NvCreateChatCompletionResponse::from_annotated_stream(stream.into())
.await .await
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment