"ssh:/git@developer.sourcefind.cn:2222/OpenDAS/dynamo.git" did not exist on "9281c95f33a8f286168bb5a9ef4a56ed34b23ef0"
Unverified Commit 2d3fb39f authored by jain-ria's avatar jain-ria Committed by GitHub
Browse files

fix: remove http endpoint for clearing kv blocks (#1629)

parent e84b1e77
...@@ -29,14 +29,12 @@ pub async fn run( ...@@ -29,14 +29,12 @@ pub async fn run(
engine_config: EngineConfig, engine_config: EngineConfig,
template: Option<RequestTemplate>, template: Option<RequestTemplate>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
let http_service = service_v2::HttpService::builder() let http_service = service_v2::HttpService::builder()
.port(flags.http_port) .port(flags.http_port)
.enable_chat_endpoints(true) .enable_chat_endpoints(true)
.enable_cmpl_endpoints(true) .enable_cmpl_endpoints(true)
.enable_embeddings_endpoints(true) .enable_embeddings_endpoints(true)
.with_request_template(template) .with_request_template(template)
.runtime(Some(Arc::new(distributed_runtime)))
.build()?; .build()?;
match engine_config { match engine_config {
EngineConfig::Dynamic => { EngineConfig::Dynamic => {
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
mod openai; mod openai;
pub mod clear_kv_blocks;
pub mod error; pub mod error;
pub mod health; pub mod health;
pub mod metrics; pub mod metrics;
......
...@@ -20,6 +20,8 @@ use std::sync::Arc; ...@@ -20,6 +20,8 @@ use std::sync::Arc;
use dynamo_runtime::{pipeline::PushRouter, stream::StreamExt}; use dynamo_runtime::{pipeline::PushRouter, stream::StreamExt};
pub const CLEAR_KV_ENDPOINT: &str = "clear_kv_blocks";
pub fn clear_kv_blocks_router( pub fn clear_kv_blocks_router(
state: Arc<service_v2::State>, state: Arc<service_v2::State>,
path: Option<String>, path: Option<String>,
...@@ -68,7 +70,7 @@ async fn clear_kv_blocks_handler( ...@@ -68,7 +70,7 @@ async fn clear_kv_blocks_handler(
message: Option<String>| { message: Option<String>| {
let mut result = json!({ let mut result = json!({
"name": name, "name": name,
"endpoint": format!("{}/{}/clear_kv_blocks", ns, comp), "endpoint": format!("{}/{}/{}", ns, comp, CLEAR_KV_ENDPOINT),
"status": status, "status": status,
}); });
if success { if success {
...@@ -123,7 +125,7 @@ async fn clear_kv_blocks_handler( ...@@ -123,7 +125,7 @@ async fn clear_kv_blocks_handler(
}; };
let endpoint: dynamo_runtime::component::Endpoint = let endpoint: dynamo_runtime::component::Endpoint =
component_obj.endpoint("clear_kv_blocks"); component_obj.endpoint(CLEAR_KV_ENDPOINT);
let client = match endpoint.client().await { let client = match endpoint.client().await {
Ok(c) => c, Ok(c) => c,
...@@ -190,7 +192,7 @@ async fn clear_kv_blocks_handler( ...@@ -190,7 +192,7 @@ async fn clear_kv_blocks_handler(
let instances_filtered = instances let instances_filtered = instances
.clone() .clone()
.into_iter() .into_iter()
.filter(|instance| instance.endpoint == "clear_kv_blocks") .filter(|instance| instance.endpoint == CLEAR_KV_ENDPOINT)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
if instances_filtered.is_empty() { if instances_filtered.is_empty() {
...@@ -214,7 +216,7 @@ async fn clear_kv_blocks_handler( ...@@ -214,7 +216,7 @@ async fn clear_kv_blocks_handler(
for instance in &instances_filtered { for instance in &instances_filtered {
let instance_name = format!("{}-instance-{}", entry.name, instance.id()); let instance_name = format!("{}-instance-{}", entry.name, instance.id());
match router.round_robin(().into()).await { match router.direct(().into(), instance.id()).await {
Ok(mut stream) => match stream.next().await { Ok(mut stream) => match stream.next().await {
Some(response) => { Some(response) => {
add_worker_result( add_worker_result(
......
...@@ -11,7 +11,6 @@ use crate::discovery::ModelManager; ...@@ -11,7 +11,6 @@ use crate::discovery::ModelManager;
use crate::request_template::RequestTemplate; use crate::request_template::RequestTemplate;
use anyhow::Result; use anyhow::Result;
use derive_builder::Builder; use derive_builder::Builder;
use dynamo_runtime::DistributedRuntime;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
...@@ -19,7 +18,6 @@ use tokio_util::sync::CancellationToken; ...@@ -19,7 +18,6 @@ use tokio_util::sync::CancellationToken;
pub struct State { pub struct State {
metrics: Arc<Metrics>, metrics: Arc<Metrics>,
manager: Arc<ModelManager>, manager: Arc<ModelManager>,
runtime: Option<Arc<DistributedRuntime>>,
} }
impl State { impl State {
...@@ -27,15 +25,6 @@ impl State { ...@@ -27,15 +25,6 @@ impl State {
Self { Self {
manager, manager,
metrics: Arc::new(Metrics::default()), metrics: Arc::new(Metrics::default()),
runtime: None,
}
}
pub fn with_runtime(manager: Arc<ModelManager>, runtime: Arc<DistributedRuntime>) -> Self {
Self {
manager,
metrics: Arc::new(Metrics::default()),
runtime: Some(runtime),
} }
} }
...@@ -52,11 +41,6 @@ impl State { ...@@ -52,11 +41,6 @@ impl State {
self.manager.clone() self.manager.clone()
} }
/// Get the DistributedRuntime if available
pub fn runtime(&self) -> Option<&DistributedRuntime> {
self.runtime.as_ref().map(|r| r.as_ref())
}
// TODO // TODO
pub fn sse_keep_alive(&self) -> Option<Duration> { pub fn sse_keep_alive(&self) -> Option<Duration> {
None None
...@@ -96,9 +80,6 @@ pub struct HttpServiceConfig { ...@@ -96,9 +80,6 @@ pub struct HttpServiceConfig {
#[builder(default = "None")] #[builder(default = "None")]
request_template: Option<RequestTemplate>, request_template: Option<RequestTemplate>,
#[builder(default = "None")]
runtime: Option<Arc<DistributedRuntime>>,
} }
impl HttpService { impl HttpService {
...@@ -153,11 +134,7 @@ impl HttpServiceConfigBuilder { ...@@ -153,11 +134,7 @@ impl HttpServiceConfigBuilder {
let config: HttpServiceConfig = self.build_internal()?; let config: HttpServiceConfig = self.build_internal()?;
let model_manager = Arc::new(ModelManager::new()); let model_manager = Arc::new(ModelManager::new());
let state = if let Some(runtime) = config.runtime { let state = Arc::new(State::new(model_manager));
Arc::new(State::with_runtime(model_manager, runtime))
} else {
Arc::new(State::new(model_manager))
};
// enable prometheus metrics // enable prometheus metrics
let registry = metrics::Registry::new(); let registry = metrics::Registry::new();
...@@ -171,7 +148,6 @@ impl HttpServiceConfigBuilder { ...@@ -171,7 +148,6 @@ impl HttpServiceConfigBuilder {
metrics::router(registry, None), metrics::router(registry, None),
super::openai::list_models_router(state.clone(), None), super::openai::list_models_router(state.clone(), None),
super::health::health_check_router(state.clone(), None), super::health::health_check_router(state.clone(), None),
super::clear_kv_blocks::clear_kv_blocks_router(state.clone(), None),
]; ];
if config.enable_chat_endpoints { if config.enable_chat_endpoints {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment