Unverified commit 63fbf498, authored by Graham King and committed by GitHub
Browse files

fix: Restore running single-process without etcd (#2342)

parent dbe48a1d
...@@ -20,20 +20,20 @@ use dynamo_runtime::{DistributedRuntime, Runtime}; ...@@ -20,20 +20,20 @@ use dynamo_runtime::{DistributedRuntime, Runtime};
/// Build and run an HTTP service /// Build and run an HTTP service
pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Result<()> { pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Result<()> {
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?; let mut http_service_builder = service_v2::HttpService::builder()
let etcd_client = distributed_runtime.etcd_client().clone();
let http_service = service_v2::HttpService::builder()
.port(engine_config.local_model().http_port()) .port(engine_config.local_model().http_port())
.enable_chat_endpoints(true) .enable_chat_endpoints(true)
.enable_cmpl_endpoints(true) .enable_cmpl_endpoints(true)
.enable_embeddings_endpoints(true) .enable_embeddings_endpoints(true)
.with_request_template(engine_config.local_model().request_template()) .with_request_template(engine_config.local_model().request_template());
.with_etcd_client(etcd_client.clone())
.build()?;
match engine_config { let http_service = match engine_config {
EngineConfig::Dynamic(_) => { EngineConfig::Dynamic(_) => {
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
let etcd_client = distributed_runtime.etcd_client();
// This allows the /health endpoint to query etcd for active instances
http_service_builder = http_service_builder.with_etcd_client(etcd_client.clone());
let http_service = http_service_builder.build()?;
match etcd_client { match etcd_client {
Some(ref etcd_client) => { Some(ref etcd_client) => {
let router_config = engine_config.local_model().router_config(); let router_config = engine_config.local_model().router_config();
...@@ -52,13 +52,15 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul ...@@ -52,13 +52,15 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
// Static endpoints don't need discovery // Static endpoints don't need discovery
} }
} }
http_service
} }
EngineConfig::StaticRemote(local_model) => { EngineConfig::StaticRemote(local_model) => {
let card = local_model.card(); let card = local_model.card();
let router_mode = local_model.router_config().router_mode; let router_mode = local_model.router_config().router_mode;
let dst_config = DistributedConfig::from_settings(true); let dst_config = DistributedConfig::from_settings(true); // true means static
let distributed_runtime = DistributedRuntime::new(runtime.clone(), dst_config).await?; let distributed_runtime = DistributedRuntime::new(runtime.clone(), dst_config).await?;
let http_service = http_service_builder.build()?;
let manager = http_service.model_manager(); let manager = http_service.model_manager();
let endpoint_id = local_model.endpoint_id(); let endpoint_id = local_model.endpoint_id();
...@@ -95,18 +97,23 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul ...@@ -95,18 +97,23 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
>(card, &client, router_mode, kv_chooser) >(card, &client, router_mode, kv_chooser)
.await?; .await?;
manager.add_completions_model(local_model.display_name(), completions_engine)?; manager.add_completions_model(local_model.display_name(), completions_engine)?;
http_service
} }
EngineConfig::StaticFull { engine, model, .. } => { EngineConfig::StaticFull { engine, model, .. } => {
let http_service = http_service_builder.build()?;
let engine = Arc::new(StreamingEngineAdapter::new(engine)); let engine = Arc::new(StreamingEngineAdapter::new(engine));
let manager = http_service.model_manager(); let manager = http_service.model_manager();
manager.add_completions_model(model.service_name(), engine.clone())?; manager.add_completions_model(model.service_name(), engine.clone())?;
manager.add_chat_completions_model(model.service_name(), engine)?; manager.add_chat_completions_model(model.service_name(), engine)?;
http_service
} }
EngineConfig::StaticCore { EngineConfig::StaticCore {
engine: inner_engine, engine: inner_engine,
model, model,
.. ..
} => { } => {
let http_service = http_service_builder.build()?;
let manager = http_service.model_manager(); let manager = http_service.model_manager();
let chat_pipeline = common::build_pipeline::< let chat_pipeline = common::build_pipeline::<
...@@ -122,8 +129,9 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul ...@@ -122,8 +129,9 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
>(model.card(), inner_engine) >(model.card(), inner_engine)
.await?; .await?;
manager.add_completions_model(model.service_name(), cmpl_pipeline)?; manager.add_completions_model(model.service_name(), cmpl_pipeline)?;
http_service
} }
} };
tracing::debug!( tracing::debug!(
"Supported routes: {:?}", "Supported routes: {:?}",
http_service http_service
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment