Unverified Commit 2d2a1027 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

chore(dynamo-llm): Move the pre-processor to ingress side (#903)

Part of https://github.com/ai-dynamo/dynamo/issues/743
parent f6d03f2f
...@@ -1684,6 +1684,7 @@ dependencies = [ ...@@ -1684,6 +1684,7 @@ dependencies = [
"rtnetlink", "rtnetlink",
"serde", "serde",
"serde_json", "serde_json",
"tempfile",
"tokio", "tokio",
"tokio-util", "tokio-util",
"tracing", "tracing",
......
...@@ -64,6 +64,7 @@ rand = { version = "0.9.0" } ...@@ -64,6 +64,7 @@ rand = { version = "0.9.0" }
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = { version = "1" } serde_json = { version = "1" }
strum = { version = "0.27", features = ["derive"] } strum = { version = "0.27", features = ["derive"] }
tempfile = "3"
thiserror = { version = "2.0.11" } thiserror = { version = "2.0.11" }
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1" } tokio-stream = { version = "0.1" }
......
...@@ -76,15 +76,15 @@ async fn app(runtime: Runtime) -> Result<()> { ...@@ -76,15 +76,15 @@ async fn app(runtime: Runtime) -> Result<()> {
.component(&args.component)?; .component(&args.component)?;
let etcd_root = component.etcd_path(); let etcd_root = component.etcd_path();
// Create watchers for all model types // TODO: A single watcher already watches all model types and does the right thing.
let mut watcher_tasks = Vec::new(); // The paths need change here and in llmctl to not include the model_type
for model_type in ModelType::all() { // Create watchers for `Chat` and `Completion` model types
for model_type in [ModelType::Chat, ModelType::Completion] {
let etcd_path = format!("{}/models/{}/", etcd_root, model_type.as_str()); let etcd_path = format!("{}/models/{}/", etcd_root, model_type.as_str());
let state = Arc::new(ModelWatchState { let state = Arc::new(ModelWatchState {
prefix: etcd_path.clone(), prefix: etcd_path.clone(),
model_type,
manager: manager.clone(), manager: manager.clone(),
drt: distributed.clone(), drt: distributed.clone(),
}); });
...@@ -94,8 +94,7 @@ async fn app(runtime: Runtime) -> Result<()> { ...@@ -94,8 +94,7 @@ async fn app(runtime: Runtime) -> Result<()> {
etcd_client.kv_get_and_watch_prefix(etcd_path).await?; etcd_client.kv_get_and_watch_prefix(etcd_path).await?;
let (_prefix, _watcher, receiver) = models_watcher.dissolve(); let (_prefix, _watcher, receiver) = models_watcher.dissolve();
let watcher_task = tokio::spawn(model_watcher(state, receiver)); tokio::spawn(model_watcher(state, receiver));
watcher_tasks.push(watcher_task);
} }
} }
......
...@@ -57,6 +57,7 @@ hf-hub = { workspace = true } ...@@ -57,6 +57,7 @@ hf-hub = { workspace = true }
humantime = { workspace = true } humantime = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
tokio-util = { workspace = true } tokio-util = { workspace = true }
tracing = { workspace = true } tracing = { workspace = true }
......
...@@ -80,9 +80,8 @@ pub async fn run( ...@@ -80,9 +80,8 @@ pub async fn run(
); );
} }
let (service_name, engine, _inspect_template) = let prepared_engine = common::prepare_engine(runtime, flags, engine_config).await?;
common::prepare_engine(runtime, flags, engine_config).await?; let service_name_ref = Arc::new(prepared_engine.service_name);
let service_name_ref = Arc::new(service_name);
let pre_processor = if let Some(card) = maybe_card { let pre_processor = if let Some(card) = maybe_card {
Some(OpenAIPreprocessor::new(card).await?) Some(OpenAIPreprocessor::new(card).await?)
...@@ -129,7 +128,7 @@ pub async fn run( ...@@ -129,7 +128,7 @@ pub async fn run(
}; };
entry.request_id = request_id; entry.request_id = request_id;
let engine = engine.clone(); let engine = prepared_engine.engine.clone();
let pre_processor = pre_processor.clone(); let pre_processor = pre_processor.clone();
let tokens_in = tokens_in.clone(); let tokens_in = tokens_in.clone();
let tokens_out = tokens_out.clone(); let tokens_out = tokens_out.clone();
......
...@@ -20,8 +20,9 @@ use dynamo_llm::{ ...@@ -20,8 +20,9 @@ use dynamo_llm::{
engines::StreamingEngineAdapter, engines::StreamingEngineAdapter,
http::service::discovery::ModelNetworkName, http::service::discovery::ModelNetworkName,
model_card::ModelDeploymentCard, model_card::ModelDeploymentCard,
model_type::ModelType,
preprocessor::OpenAIPreprocessor, preprocessor::OpenAIPreprocessor,
protocols::common::llm_backend::{BackendInput, BackendOutput}, protocols::common::llm_backend::{BackendInput, BackendOutput, LLMEngineOutput},
types::{ types::{
openai::chat_completions::{ openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse, NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
...@@ -33,7 +34,8 @@ use dynamo_llm::{ ...@@ -33,7 +34,8 @@ use dynamo_llm::{
use dynamo_runtime::{ use dynamo_runtime::{
engine::{AsyncEngineStream, Data}, engine::{AsyncEngineStream, Data},
pipeline::{ pipeline::{
Context, ManyOut, Operator, PushRouter, ServiceBackend, ServiceFrontend, SingleIn, Source, Context, ManyOut, Operator, PushRouter, SegmentSource, ServiceBackend, ServiceFrontend,
SingleIn, Source,
}, },
DistributedRuntime, Runtime, DistributedRuntime, Runtime,
}; };
...@@ -41,12 +43,19 @@ use std::sync::Arc; ...@@ -41,12 +43,19 @@ use std::sync::Arc;
use crate::{flags::RouterMode, EngineConfig, Flags}; use crate::{flags::RouterMode, EngineConfig, Flags};
pub struct PreparedEngine {
pub service_name: String,
pub engine: OpenAIChatCompletionsStreamingEngine,
pub inspect_template: bool,
pub _cache_dir: Option<tempfile::TempDir>,
}
/// Turns an EngineConfig into an OpenAI chat-completions and completions supported StreamingEngine. /// Turns an EngineConfig into an OpenAI chat-completions and completions supported StreamingEngine.
pub async fn prepare_engine( pub async fn prepare_engine(
runtime: Runtime, runtime: Runtime,
flags: Flags, flags: Flags,
engine_config: EngineConfig, engine_config: EngineConfig,
) -> anyhow::Result<(String, OpenAIChatCompletionsStreamingEngine, bool)> { ) -> anyhow::Result<PreparedEngine> {
match engine_config { match engine_config {
EngineConfig::Dynamic(endpoint_id) => { EngineConfig::Dynamic(endpoint_id) => {
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?; let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
...@@ -57,12 +66,11 @@ pub async fn prepare_engine( ...@@ -57,12 +66,11 @@ pub async fn prepare_engine(
.endpoint(endpoint_id.name.clone()); .endpoint(endpoint_id.name.clone());
let client = endpoint.client().await?; let client = endpoint.client().await?;
let router = match &flags.router_mode { let mut cache_dir = None;
let engine: OpenAIChatCompletionsStreamingEngine = match &flags.router_mode {
RouterMode::Random | RouterMode::RoundRobin => { RouterMode::Random | RouterMode::RoundRobin => {
tracing::info!("Waiting for remote model.."); tracing::info!("Waiting for remote model..");
// We then use the ModelDeploymentCard's `requires_preprocessing`
// field to decide what kind of PushRouter to make.
let remote_endpoints = client.wait_for_endpoints().await?; let remote_endpoints = client.wait_for_endpoints().await?;
debug_assert!(!remote_endpoints.is_empty()); debug_assert!(!remote_endpoints.is_empty());
tracing::info!(count = remote_endpoints.len(), "Model(s) discovered"); tracing::info!(count = remote_endpoints.len(), "Model(s) discovered");
...@@ -71,16 +79,65 @@ pub async fn prepare_engine( ...@@ -71,16 +79,65 @@ pub async fn prepare_engine(
let Some(etcd_client) = distributed_runtime.etcd_client() else { let Some(etcd_client) = distributed_runtime.etcd_client() else {
anyhow::bail!("Cannot run distributed components without etcd"); anyhow::bail!("Cannot run distributed components without etcd");
}; };
let mdc = network_name.load_mdc(endpoint_id, etcd_client).await?; let network_entry = network_name.load_entry(etcd_client.clone()).await?;
if mdc.requires_preprocessing { let mut card = network_entry.load_mdc(endpoint_id, etcd_client).await?;
// Note requires_preprocessing is never true in our code right now
todo!("Ingress-side pre-processing not supported yet"); match network_entry.model_type {
} else { ModelType::Backend => {
// Download tokenizer.json etc to local disk
cache_dir = Some(
card.move_from_nats(distributed_runtime.nats_client())
.await?,
);
// The backend doesn't mind what we expose to the user (chat or
// completions), and this function is only used by text and batch input so
// the user doesn't see the HTTP request. So use Chat.
let frontend = SegmentSource::<
SingleIn<NvCreateChatCompletionRequest>,
ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
>::new();
let preprocessor =
OpenAIPreprocessor::new(card.clone()).await?.into_operator();
let backend = Backend::from_mdc(card.clone()).await?.into_operator();
let router =
PushRouter::<BackendInput, Annotated<LLMEngineOutput>>::from_client(
client,
flags.router_mode.into(),
)
.await?;
frontend
.link(preprocessor.forward_edge())?
.link(backend.forward_edge())?
.link(ServiceBackend::from_engine(Arc::new(router)))?
.link(backend.backward_edge())?
.link(preprocessor.backward_edge())?
.link(frontend)?
}
ModelType::Chat => Arc::new(
PushRouter::< PushRouter::<
NvCreateChatCompletionRequest, NvCreateChatCompletionRequest,
Annotated<NvCreateChatCompletionStreamResponse>, Annotated<NvCreateChatCompletionStreamResponse>,
>::from_client(client, flags.router_mode.into()) >::from_client(
.await? client, flags.router_mode.into()
)
.await?,
),
ModelType::Completion => {
anyhow::bail!("text and batch input only accept remote Chat models, not Completion");
/*
Arc::new(
PushRouter::<
CompletionRequest,
Annotated<CompletionResponse>,
>::from_client(
client, flags.router_mode.into()
)
.await?,
)
*/
}
} }
} }
RouterMode::KV => todo!(), RouterMode::KV => todo!(),
...@@ -89,16 +146,26 @@ pub async fn prepare_engine( ...@@ -89,16 +146,26 @@ pub async fn prepare_engine(
// The service_name isn't used for text chat outside of logs, // The service_name isn't used for text chat outside of logs,
// so use the path. That avoids having to listen on etcd for model registration. // so use the path. That avoids having to listen on etcd for model registration.
let service_name = endpoint.subject(); let service_name = endpoint.subject();
Ok((service_name, Arc::new(router), false)) Ok(PreparedEngine {
service_name,
engine,
inspect_template: false,
_cache_dir: cache_dir,
})
} }
EngineConfig::StaticFull { EngineConfig::StaticFull {
service_name, service_name,
engine, engine,
card: _card, card: _card,
} => { } => {
tracing::debug!("Model: {service_name}"); tracing::debug!("Model: {service_name} with engine pre-processing");
let engine = Arc::new(StreamingEngineAdapter::new(engine)); let engine = Arc::new(StreamingEngineAdapter::new(engine));
Ok((service_name, engine, false)) Ok(PreparedEngine {
service_name,
engine,
inspect_template: false,
_cache_dir: None,
})
} }
EngineConfig::StaticCore { EngineConfig::StaticCore {
service_name, service_name,
...@@ -111,8 +178,13 @@ pub async fn prepare_engine( ...@@ -111,8 +178,13 @@ pub async fn prepare_engine(
>(&card, inner_engine) >(&card, inner_engine)
.await?; .await?;
tracing::debug!("Model: {service_name} with pre-processing"); tracing::debug!("Model: {service_name} with Dynamo pre-processing");
Ok((service_name, pipeline, true)) Ok(PreparedEngine {
service_name,
engine: pipeline,
inspect_template: true,
_cache_dir: None,
})
} }
EngineConfig::None => unreachable!(), EngineConfig::None => unreachable!(),
} }
......
...@@ -13,16 +13,16 @@ ...@@ -13,16 +13,16 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::Arc; use std::{pin::Pin, sync::Arc};
use dynamo_llm::{ use dynamo_llm::{
backend::Backend, backend::Backend,
engines::StreamingEngineAdapter, engines::StreamingEngineAdapter,
http::service::discovery::{ModelEntry, ModelNetworkName}, http::service::discovery::{ModelEntry, ModelNetworkName},
key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager}, key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager},
model_card, model_card::{self, ModelDeploymentCard},
model_type::ModelType, model_type::ModelType,
preprocessor::OpenAIPreprocessor, preprocessor::{BackendInput, BackendOutput},
types::{ types::{
openai::chat_completions::{ openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse, NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
...@@ -31,9 +31,10 @@ use dynamo_llm::{ ...@@ -31,9 +31,10 @@ use dynamo_llm::{
}, },
}; };
use dynamo_runtime::pipeline::{ use dynamo_runtime::pipeline::{
network::Ingress, ManyOut, Operator, SegmentSource, ServiceBackend, SingleIn, Source, network::Ingress, Context, ManyOut, Operator, SegmentSource, ServiceBackend, SingleIn, Source,
}; };
use dynamo_runtime::{protocols::Endpoint, DistributedRuntime}; use dynamo_runtime::{component::Endpoint, engine::AsyncEngineStream};
use dynamo_runtime::{protocols::Endpoint as EndpointId, DistributedRuntime};
use crate::EngineConfig; use crate::EngineConfig;
...@@ -42,47 +43,65 @@ pub async fn run( ...@@ -42,47 +43,65 @@ pub async fn run(
path: String, path: String,
engine_config: EngineConfig, engine_config: EngineConfig,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// This will attempt to connect to NATS and etcd
let cancel_token = distributed_runtime.primary_token().clone(); let cancel_token = distributed_runtime.primary_token().clone();
let endpoint_id: Endpoint = path.parse()?; let endpoint_id: EndpointId = path.parse()?;
let etcd_client = distributed_runtime.etcd_client();
let (ingress, service_name, mut card, requires_preprocessing) = match engine_config { let (rt_fut, mut card) = match engine_config {
EngineConfig::StaticFull { EngineConfig::StaticFull {
service_name, service_name,
engine, engine,
card, mut card,
} => { } => {
let engine = Arc::new(StreamingEngineAdapter::new(engine)); let engine = Arc::new(StreamingEngineAdapter::new(engine));
(Ingress::for_engine(engine)?, service_name, card, false) card.requires_preprocessing = false;
let ingress_chat = Ingress::<
Context<NvCreateChatCompletionRequest>,
Pin<Box<dyn AsyncEngineStream<Annotated<NvCreateChatCompletionStreamResponse>>>>,
>::for_engine(engine)?;
let endpoint_chat = register(
distributed_runtime.clone(),
&service_name,
endpoint_id,
*card.clone(),
ModelType::Chat,
)
.await?;
let fut_chat = endpoint_chat
.endpoint_builder()
.handler(ingress_chat)
.start();
(fut_chat, card)
} }
EngineConfig::StaticCore { EngineConfig::StaticCore {
service_name, service_name,
engine: inner_engine, engine: inner_engine,
card, mut card,
} => { } => {
let frontend = SegmentSource::< // Pre-processing is done ingress-side, so it should be already done.
SingleIn<NvCreateChatCompletionRequest>, let frontend =
ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, SegmentSource::<SingleIn<BackendInput>, ManyOut<Annotated<BackendOutput>>>::new();
>::new();
let preprocessor = OpenAIPreprocessor::new(*card.clone())
.await?
.into_operator();
let backend = Backend::from_mdc(*card.clone()).await?.into_operator(); let backend = Backend::from_mdc(*card.clone()).await?.into_operator();
let engine = ServiceBackend::from_engine(inner_engine); let engine = ServiceBackend::from_engine(inner_engine);
let pipeline = frontend let pipeline = frontend
.link(preprocessor.forward_edge())?
.link(backend.forward_edge())? .link(backend.forward_edge())?
.link(engine)? .link(engine)?
.link(backend.backward_edge())? .link(backend.backward_edge())?
.link(preprocessor.backward_edge())?
.link(frontend)?; .link(frontend)?;
// TODO: switch last 'false' to 'true' once we have ingress-side pre-processing let ingress = Ingress::for_pipeline(pipeline)?;
(Ingress::for_pipeline(pipeline)?, service_name, card, false) card.requires_preprocessing = true;
let endpoint = register(
distributed_runtime.clone(),
&service_name,
endpoint_id,
*card.clone(),
ModelType::Backend,
)
.await?;
(endpoint.endpoint_builder().handler(ingress).start(), card)
} }
EngineConfig::Dynamic(_) => { EngineConfig::Dynamic(_) => {
anyhow::bail!("Cannot use endpoint for both in and out"); anyhow::bail!("Cannot use endpoint for both in and out");
...@@ -90,12 +109,32 @@ pub async fn run( ...@@ -90,12 +109,32 @@ pub async fn run(
EngineConfig::None => unreachable!(), EngineConfig::None => unreachable!(),
}; };
let model_registration = ModelEntry { tokio::select! {
name: service_name.to_string(), _ = rt_fut => {
endpoint: endpoint_id.clone(), tracing::debug!("Endpoint ingress ended");
model_type: ModelType::Chat, }
}; _ = cancel_token.cancelled() => {
}
}
// Cleanup on shutdown
if let Err(err) = card
.delete_from_nats(distributed_runtime.nats_client())
.await
{
tracing::error!(%err, "delete_from_nats error on shutdown");
}
Ok(())
}
async fn register(
distributed_runtime: DistributedRuntime,
service_name: &str,
endpoint_id: EndpointId,
mut card: ModelDeploymentCard,
model_type: ModelType,
) -> anyhow::Result<Endpoint> {
let component = distributed_runtime let component = distributed_runtime
.namespace(&endpoint_id.namespace)? .namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)?; .component(&endpoint_id.component)?;
...@@ -105,25 +144,30 @@ pub async fn run( ...@@ -105,25 +144,30 @@ pub async fn run(
.await? .await?
.endpoint(&endpoint_id.name); .endpoint(&endpoint_id.name);
if let Some(etcd_client) = etcd_client { // A static component doesn't have an etcd_client because it doesn't need to register
if let Some(etcd_client) = distributed_runtime.etcd_client() {
// Store model config files in NATS object store // Store model config files in NATS object store
let nats_client = distributed_runtime.nats_client(); let nats_client = distributed_runtime.nats_client();
card.move_to_nats(nats_client.clone()).await?; card.move_to_nats(nats_client.clone()).await?;
// Publish the Model Deployment Card to etcd // Publish the Model Deployment Card to etcd
let kvstore: Box<dyn KeyValueStore> = let kvstore: Box<dyn KeyValueStore> =
Box::new(EtcdStorage::new(etcd_client.clone(), endpoint_id)); Box::new(EtcdStorage::new(etcd_client.clone(), endpoint_id.clone()));
let card_store = Arc::new(KeyValueStoreManager::new(kvstore)); let card_store = Arc::new(KeyValueStoreManager::new(kvstore));
card.requires_preprocessing = requires_preprocessing; // Not used yet. Soon.
let key = card.slug().to_string(); let key = card.slug().to_string();
card_store card_store
.publish(model_card::BUCKET_NAME, None, &key, &mut *card.clone()) .publish(model_card::BUCKET_NAME, None, &key, &mut card)
.await?; .await?;
// Publish our ModelEntry to etcd. This allows ingress to find the model card. // Publish our ModelEntry to etcd. This allows ingress to find the model card.
// (Why don't we put the model card directly under this key?) // (Why don't we put the model card directly under this key?)
let network_name = ModelNetworkName::from_local(&endpoint, etcd_client.lease_id()); let network_name = ModelNetworkName::from_local(&endpoint, etcd_client.lease_id());
tracing::debug!("Registering with etcd as {network_name}"); tracing::debug!("Registering with etcd as {network_name}");
let model_registration = ModelEntry {
name: service_name.to_string(),
endpoint: endpoint_id.clone(),
model_type,
};
etcd_client etcd_client
.kv_create( .kv_create(
network_name.to_string(), network_name.to_string(),
...@@ -132,22 +176,5 @@ pub async fn run( ...@@ -132,22 +176,5 @@ pub async fn run(
) )
.await?; .await?;
} }
Ok(endpoint)
let rt_fut = endpoint.endpoint_builder().handler(ingress).start();
tokio::select! {
_ = rt_fut => {
tracing::debug!("Endpoint ingress ended");
}
_ = cancel_token.cancelled() => {
}
}
// Cleanup on shutdown
if let Err(err) = card
.delete_from_nats(distributed_runtime.nats_client())
.await
{
tracing::error!(%err, "delete_from_nats error on shutdown");
}
Ok(())
} }
...@@ -17,10 +17,10 @@ use std::sync::Arc; ...@@ -17,10 +17,10 @@ use std::sync::Arc;
use crate::input::common; use crate::input::common;
use crate::{EngineConfig, Flags}; use crate::{EngineConfig, Flags};
use dynamo_llm::http::service::ModelManager;
use dynamo_llm::{ use dynamo_llm::{
engines::StreamingEngineAdapter, engines::StreamingEngineAdapter,
http::service::{discovery, service_v2}, http::service::{discovery, service_v2},
model_type::ModelType,
request_template::RequestTemplate, request_template::RequestTemplate,
types::{ types::{
openai::chat_completions::{ openai::chat_completions::{
...@@ -29,6 +29,7 @@ use dynamo_llm::{ ...@@ -29,6 +29,7 @@ use dynamo_llm::{
openai::completions::{CompletionRequest, CompletionResponse}, openai::completions::{CompletionRequest, CompletionResponse},
}, },
}; };
use dynamo_runtime::transports::etcd;
use dynamo_runtime::{DistributedRuntime, Runtime}; use dynamo_runtime::{DistributedRuntime, Runtime};
/// Build and run an HTTP service /// Build and run an HTTP service
...@@ -57,17 +58,13 @@ pub async fn run( ...@@ -57,17 +58,13 @@ pub async fn run(
let network_prefix = component.service_name(); let network_prefix = component.service_name();
// Listen for models registering themselves in etcd, add them to HTTP service // Listen for models registering themselves in etcd, add them to HTTP service
let state = Arc::new(discovery::ModelWatchState { run_watcher(
prefix: network_prefix.clone(), distributed_runtime.clone(),
model_type: ModelType::Chat, http_service.model_manager().clone(),
manager: http_service.model_manager().clone(), etcd_client.clone(),
drt: distributed_runtime.clone(), &network_prefix,
}); )
tracing::info!("Waiting for remote model at {network_prefix}"); .await?;
let models_watcher =
etcd_client.kv_get_and_watch_prefix(network_prefix).await?;
let (_prefix, _watcher, receiver) = models_watcher.dissolve();
let _watcher_task = tokio::spawn(discovery::model_watcher(state, receiver));
} }
None => { None => {
// Static endpoints don't need discovery // Static endpoints don't need discovery
...@@ -109,3 +106,23 @@ pub async fn run( ...@@ -109,3 +106,23 @@ pub async fn run(
} }
http_service.run(runtime.primary_token()).await http_service.run(runtime.primary_token()).await
} }
/// Spawns a task that watches for new models in etcd at network_prefix,
/// and registers them with the ModelManager so that the HTTP service can use them.
async fn run_watcher(
distributed_runtime: DistributedRuntime,
model_manager: ModelManager,
etcd_client: etcd::Client,
network_prefix: &str,
) -> anyhow::Result<()> {
let state = Arc::new(discovery::ModelWatchState {
prefix: network_prefix.to_string(),
manager: model_manager,
drt: distributed_runtime.clone(),
});
tracing::info!("Watching for remote model at {network_prefix}");
let models_watcher = etcd_client.kv_get_and_watch_prefix(network_prefix).await?;
let (_prefix, _watcher, receiver) = models_watcher.dissolve();
let _watcher_task = tokio::spawn(discovery::model_watcher(state, receiver));
Ok(())
}
...@@ -36,17 +36,13 @@ pub async fn run( ...@@ -36,17 +36,13 @@ pub async fn run(
template: Option<RequestTemplate>, template: Option<RequestTemplate>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let cancel_token = runtime.primary_token(); let cancel_token = runtime.primary_token();
let (service_name, engine, inspect_template): ( let prepared_engine = common::prepare_engine(runtime, flags, engine_config).await?;
String,
OpenAIChatCompletionsStreamingEngine,
bool,
) = common::prepare_engine(runtime, flags, engine_config).await?;
main_loop( main_loop(
cancel_token, cancel_token,
&service_name, &prepared_engine.service_name,
engine, prepared_engine.engine,
single_prompt, single_prompt,
inspect_template, prepared_engine.inspect_template,
template, template,
) )
.await .await
......
...@@ -242,10 +242,6 @@ pub async fn run( ...@@ -242,10 +242,6 @@ pub async fn run(
"out=echo_core need to find the tokenizer. Pass flag --model-path <path>" "out=echo_core need to find the tokenizer. Pass flag --model-path <path>"
); );
}; };
// TODO: Switch to `true` once pre-processing moves ingress side
card.requires_preprocessing = false;
EngineConfig::StaticCore { EngineConfig::StaticCore {
service_name: card.service_name.clone(), service_name: card.service_name.clone(),
engine: dynamo_llm::engines::make_engine_core(), engine: dynamo_llm::engines::make_engine_core(),
......
...@@ -362,9 +362,11 @@ async fn list_models( ...@@ -362,9 +362,11 @@ async fn list_models(
let mut models = Vec::new(); let mut models = Vec::new();
let model_types = match model_type { let model_types = match model_type {
Some(mt) => vec![mt], Some(mt) => vec![mt],
None => ModelType::all(), None => vec![ModelType::Chat, ModelType::Completion],
}; };
// TODO: Do we need the model_type in etcd key?
for mt in model_types { for mt in model_types {
let prefix = format!("{}/models/{}/", component.etcd_path(), mt.as_str(),); let prefix = format!("{}/models/{}/", component.etcd_path(), mt.as_str(),);
......
...@@ -1064,6 +1064,7 @@ dependencies = [ ...@@ -1064,6 +1064,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"strum", "strum",
"tempfile",
"thiserror 2.0.12", "thiserror 2.0.12",
"tokenizers", "tokenizers",
"tokio", "tokio",
......
...@@ -50,6 +50,7 @@ rand = { workspace = true } ...@@ -50,6 +50,7 @@ rand = { workspace = true }
prometheus = { workspace = true } prometheus = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true } thiserror = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
tokio-stream = { workspace = true } tokio-stream = { workspace = true }
......
...@@ -21,20 +21,27 @@ use tokio::sync::mpsc::Receiver; ...@@ -21,20 +21,27 @@ use tokio::sync::mpsc::Receiver;
use dynamo_runtime::{ use dynamo_runtime::{
component::{self, ComponentEndpointInfo}, component::{self, ComponentEndpointInfo},
pipeline::network::egress::push_router::PushRouter, pipeline::{
network::egress::push_router::PushRouter, ManyOut, Operator, RouterMode, SegmentSource,
ServiceBackend, SingleIn, Source,
},
protocols::{self, annotated::Annotated}, protocols::{self, annotated::Annotated},
raise,
slug::Slug, slug::Slug,
transports::etcd::{self, KeyValue, WatchEvent}, transports::etcd::{self, KeyValue, WatchEvent},
DistributedRuntime, DistributedRuntime,
}; };
use super::ModelManager; use super::ModelManager;
use crate::model_type::ModelType;
use crate::protocols::openai::chat_completions::{ use crate::protocols::openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse, NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
}; };
use crate::protocols::openai::completions::{CompletionRequest, CompletionResponse}; use crate::protocols::openai::completions::{CompletionRequest, CompletionResponse};
use crate::{
backend::Backend,
model_type::ModelType,
preprocessor::{BackendInput, OpenAIPreprocessor},
protocols::common::llm_backend::LLMEngineOutput,
};
use crate::{ use crate::{
key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager}, key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager},
model_card::{self, ModelDeploymentCard}, model_card::{self, ModelDeploymentCard},
...@@ -58,6 +65,12 @@ pub struct ModelEntry { ...@@ -58,6 +65,12 @@ pub struct ModelEntry {
} }
impl ModelEntry { impl ModelEntry {
pub fn requires_preprocessing(&self) -> bool {
matches!(self.model_type, ModelType::Backend)
}
/// Fetch the ModelDeploymentCard from NATS.
/// This does not touch it's fields so you may need to call move_from_nats on it.
pub async fn load_mdc( pub async fn load_mdc(
&self, &self,
endpoint_id: protocols::Endpoint, endpoint_id: protocols::Endpoint,
...@@ -108,23 +121,30 @@ impl ModelNetworkName { ...@@ -108,23 +121,30 @@ impl ModelNetworkName {
) )
} }
/// Fetch the ModelEntry from etcd.
pub async fn load_entry(&self, etcd_client: etcd::Client) -> anyhow::Result<ModelEntry> {
let mut model_entries = etcd_client.kv_get(self.to_string(), None).await?;
if model_entries.is_empty() {
anyhow::bail!("No ModelEntry in etcd for key {self}");
}
let model_entry = model_entries.remove(0);
serde_json::from_slice(model_entry.value()).with_context(|| {
format!(
"Error deserializing JSON. Key={self}. JSON={}",
model_entry.value_str().unwrap_or("INVALID UTF-8")
)
})
}
/// Fetch the ModelDeploymentCard from NATS.
/// This does not touch it's fields so you may need to call move_from_nats on it.
/// TODO We have potentially two for each endpoint, one Chat and one Completion.
pub async fn load_mdc( pub async fn load_mdc(
&self, &self,
endpoint_id: protocols::Endpoint, endpoint_id: protocols::Endpoint,
etcd_client: etcd::Client, etcd_client: etcd::Client,
) -> anyhow::Result<ModelDeploymentCard> { ) -> anyhow::Result<ModelDeploymentCard> {
let network_name = self; let entry = self.load_entry(etcd_client.clone()).await?;
let model_entries = etcd_client.kv_get(network_name.to_string(), None).await?;
if model_entries.is_empty() {
anyhow::bail!("No ModelEntry in etcd for key {network_name}");
}
let entry: ModelEntry =
serde_json::from_slice(model_entries[0].value()).with_context(|| {
format!(
"Error deserializing JSON. Key={network_name}. JSON={}",
model_entries[0].value_str().unwrap_or("INVALID UTF-8")
)
})?;
entry.load_mdc(endpoint_id, etcd_client).await entry.load_mdc(endpoint_id, etcd_client).await
} }
} }
...@@ -143,7 +163,6 @@ impl std::fmt::Display for ModelNetworkName { ...@@ -143,7 +163,6 @@ impl std::fmt::Display for ModelNetworkName {
pub struct ModelWatchState { pub struct ModelWatchState {
pub prefix: String, pub prefix: String,
pub model_type: ModelType,
pub manager: ModelManager, pub manager: ModelManager,
pub drt: DistributedRuntime, pub drt: DistributedRuntime,
} }
...@@ -154,16 +173,6 @@ pub async fn model_watcher(state: Arc<ModelWatchState>, mut events_rx: Receiver< ...@@ -154,16 +173,6 @@ pub async fn model_watcher(state: Arc<ModelWatchState>, mut events_rx: Receiver<
while let Some(event) = events_rx.recv().await { while let Some(event) = events_rx.recv().await {
match event { match event {
WatchEvent::Put(kv) => { WatchEvent::Put(kv) => {
let key = match kv.key_str() {
Ok(key) => key,
Err(err) => {
tracing::error!(%err, ?kv, "Invalid UTF8 in model key");
continue;
}
};
tracing::debug!(key, "adding model");
// model_entry.name is the service name (e.g. "Llama-3.2-3B-Instruct")
let model_entry = match serde_json::from_slice::<ModelEntry>(kv.value()) { let model_entry = match serde_json::from_slice::<ModelEntry>(kv.value()) {
Ok(model_entry) => model_entry, Ok(model_entry) => model_entry,
Err(err) => { Err(err) => {
...@@ -179,18 +188,18 @@ pub async fn model_watcher(state: Arc<ModelWatchState>, mut events_rx: Receiver< ...@@ -179,18 +188,18 @@ pub async fn model_watcher(state: Arc<ModelWatchState>, mut events_rx: Receiver<
continue; continue;
} }
match handle_put(model_entry, state.clone()).await { match handle_put(&model_entry, state.clone()).await {
Ok((model_name, model_type)) => { Ok(()) => {
tracing::info!("added {} model: {}", model_type, model_name); tracing::info!(model_name = model_entry.name, "added model");
} }
Err(e) => { Err(e) => {
tracing::error!("error adding model: {}", e); tracing::error!(%e, "error adding model {}", model_entry.name);
} }
} }
} }
WatchEvent::Delete(kv) => match handle_delete(&kv, state.clone()).await { WatchEvent::Delete(kv) => match handle_delete(&kv, state.clone()).await {
Ok((model_name, model_type)) => { Ok(model_name) => {
tracing::info!("removed {} model: {}", model_type, model_name); tracing::info!("removed model {}", model_name);
} }
Err(e) => { Err(e) => {
tracing::error!("error removing model: {}", e); tracing::error!("error removing model: {}", e);
...@@ -200,41 +209,24 @@ pub async fn model_watcher(state: Arc<ModelWatchState>, mut events_rx: Receiver< ...@@ -200,41 +209,24 @@ pub async fn model_watcher(state: Arc<ModelWatchState>, mut events_rx: Receiver<
} }
} }
async fn handle_delete( async fn handle_delete(kv: &KeyValue, state: Arc<ModelWatchState>) -> anyhow::Result<&str> {
kv: &KeyValue,
state: Arc<ModelWatchState>,
) -> anyhow::Result<(&str, ModelType)> {
let key = kv.key_str()?; let key = kv.key_str()?;
tracing::debug!(key, "removing model"); tracing::debug!(key, "removing model");
let model_name = key.trim_start_matches(&state.prefix); let model_name = key.trim_start_matches(&state.prefix);
match state.model_type { // Ignore the errors because model could be either type
ModelType::Chat => state.manager.remove_chat_completions_model(model_name)?, let _ = state.manager.remove_chat_completions_model(model_name);
ModelType::Completion => state.manager.remove_completions_model(model_name)?, let _ = state.manager.remove_completions_model(model_name);
};
Ok((model_name, state.model_type)) Ok(model_name)
} }
// Handles a PUT event from etcd, this usually means adding a new model to the list of served // Handles a PUT event from etcd, this usually means adding a new model to the list of served
// models. // models.
// //
// If this method errors, for the near term, we will delete the offending key. // If this method errors, for the near term, we will delete the offending key.
async fn handle_put( async fn handle_put(model_entry: &ModelEntry, state: Arc<ModelWatchState>) -> anyhow::Result<()> {
model_entry: ModelEntry,
state: Arc<ModelWatchState>,
) -> anyhow::Result<(String, ModelType)> {
if model_entry.model_type != state.model_type {
raise!(
"model type mismatch: {} != {}",
model_entry.model_type,
state.model_type
);
}
match state.model_type {
ModelType::Chat => {
let endpoint_id = model_entry.endpoint.clone(); let endpoint_id = model_entry.endpoint.clone();
let client = state let client = state
.drt .drt
...@@ -248,51 +240,101 @@ async fn handle_put( ...@@ -248,51 +240,101 @@ async fn handle_put(
// Should be impossible because we only get here on an etcd event // Should be impossible because we only get here on an etcd event
anyhow::bail!("Missing etcd_client"); anyhow::bail!("Missing etcd_client");
}; };
let mdc = match model_entry.load_mdc(endpoint_id, etcd_client).await { let card = match model_entry.load_mdc(endpoint_id, etcd_client).await {
Ok(mdc) => Some(mdc), Ok(card) => {
tracing::debug!(card.display_name, "adding model");
Some(card)
}
Err(err) => { Err(err) => {
// `dynamo serve` isn't using MDC yet so can't be an error // `dynamo serve` isn't using MDC yet so can't be an error
tracing::info!(%err, "load_mdc did not complete"); tracing::info!(%err, "load_mdc did not complete");
None None
} }
}; };
match model_entry.model_type {
ModelType::Backend => {
// A Backend model expects pre-processed requests meaning it's up to us whether we
// handle Chat or Completions requests, so handle both.
let Some(mut card) = card else {
anyhow::bail!("Missing model deployment card");
};
// Download tokenizer.json etc to local disk
// This cache_dir is a tempfile::TempDir will be deleted on drop. I _think_
// OpenAIPreprocessor::new loads the files, so we can delete them after this
// function. Needs checking carefully, possibly we need to store it in state.
let _cache_dir = Some(card.move_from_nats(state.drt.nats_client()).await?);
let frontend = SegmentSource::<
SingleIn<NvCreateChatCompletionRequest>,
ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
>::new();
let preprocessor = OpenAIPreprocessor::new(card.clone()).await?.into_operator();
let backend = Backend::from_mdc(card.clone()).await?.into_operator();
let router = PushRouter::<BackendInput, Annotated<LLMEngineOutput>>::from_client(
client.clone(),
RouterMode::Random, // TODO how do we configure this?
)
.await?;
if mdc.is_some() && mdc.as_ref().unwrap().requires_preprocessing { let chat_engine = frontend
// Note requires_preprocessing is never true in our code right now .link(preprocessor.forward_edge())?
todo!("Ingress-side pre-processing not supported yet"); .link(backend.forward_edge())?
} else { .link(ServiceBackend::from_engine(Arc::new(router)))?
.link(backend.backward_edge())?
.link(preprocessor.backward_edge())?
.link(frontend)?;
state
.manager
.add_chat_completions_model(&model_entry.name, chat_engine)?;
let frontend = SegmentSource::<
SingleIn<CompletionRequest>,
ManyOut<Annotated<CompletionResponse>>,
>::new();
let preprocessor = OpenAIPreprocessor::new(card.clone()).await?.into_operator();
let backend = Backend::from_mdc(card.clone()).await?.into_operator();
let router = PushRouter::<BackendInput, Annotated<LLMEngineOutput>>::from_client(
client,
RouterMode::Random, // TODO how do we configure this?
)
.await?;
let completions_engine = frontend
.link(preprocessor.forward_edge())?
.link(backend.forward_edge())?
.link(ServiceBackend::from_engine(Arc::new(router)))?
.link(backend.backward_edge())?
.link(preprocessor.backward_edge())?
.link(frontend)?;
state
.manager
.add_completions_model(&model_entry.name, completions_engine)?;
}
ModelType::Chat => {
let push_router = PushRouter::< let push_router = PushRouter::<
NvCreateChatCompletionRequest, NvCreateChatCompletionRequest,
Annotated<NvCreateChatCompletionStreamResponse>, Annotated<NvCreateChatCompletionStreamResponse>,
>::from_client(client, Default::default()) >::from_client(client, Default::default())
.await?; .await?;
let engine = Arc::new(push_router);
state state
.manager .manager
.add_chat_completions_model(&model_entry.name, Arc::new(push_router))?; .add_chat_completions_model(&model_entry.name, engine)?;
}
} }
ModelType::Completion => { ModelType::Completion => {
let client = state
.drt
.namespace(model_entry.endpoint.namespace)?
.component(model_entry.endpoint.component)?
.endpoint(model_entry.endpoint.name)
.client()
.await?;
// TODO: Handle pre-processing once it moves ingress-side
let push_router = let push_router =
PushRouter::<CompletionRequest, Annotated<CompletionResponse>>::from_client( PushRouter::<CompletionRequest, Annotated<CompletionResponse>>::from_client(
client, client,
Default::default(), Default::default(),
) )
.await?; .await?;
let engine = Arc::new(push_router);
state state
.manager .manager
.add_completions_model(&model_entry.name, Arc::new(push_router))?; .add_completions_model(&model_entry.name, engine)?;
} }
} }
Ok((model_entry.name, state.model_type)) Ok(())
} }
...@@ -128,7 +128,7 @@ pub struct ModelDeploymentCard { ...@@ -128,7 +128,7 @@ pub struct ModelDeploymentCard {
/// Does this model expect preprocessing (tokenization, etc) to be already done? /// Does this model expect preprocessing (tokenization, etc) to be already done?
/// If this is true they get a BackendInput JSON. If this is false they get /// If this is true they get a BackendInput JSON. If this is false they get
/// a ChatCompletionRequest JSON. /// an NvCreateChatCompletionRequest JSON.
#[serde(default)] #[serde(default)]
pub requires_preprocessing: bool, pub requires_preprocessing: bool,
} }
...@@ -277,6 +277,58 @@ impl ModelDeploymentCard { ...@@ -277,6 +277,58 @@ impl ModelDeploymentCard {
Ok(()) Ok(())
} }
/// Move the files this MDC uses from the NATS object store to local disk.
/// Updates the URI's to point to the created files.
///
/// The returned TempDir must be kept alive, it cleans up on drop.
pub async fn move_from_nats(&mut self, nats_client: nats::Client) -> Result<tempfile::TempDir> {
let nats_addr = nats_client.addr();
let bucket_name = self.slug();
let target_dir = tempfile::TempDir::with_prefix(bucket_name.to_string())?;
tracing::debug!(
nats_addr,
%bucket_name,
target_dir = %target_dir.path().display(),
"Downloading model deployment card fields from NATS"
);
if let Some(ModelInfoType::HfConfigJson(ref src_url)) = self.model_info {
if nats::is_nats_url(src_url) {
let target = target_dir.path().join("config.json");
nats_client
.object_store_download(Url::parse(src_url)?, &target)
.await?;
self.model_info = Some(ModelInfoType::HfConfigJson(target.display().to_string()));
}
}
if let Some(PromptFormatterArtifact::HfTokenizerConfigJson(ref src_url)) =
self.prompt_formatter
{
if nats::is_nats_url(src_url) {
let target = target_dir.path().join("tokenizer_config.json");
nats_client
.object_store_download(Url::parse(src_url)?, &target)
.await?;
self.prompt_formatter = Some(PromptFormatterArtifact::HfTokenizerConfigJson(
target.display().to_string(),
));
}
}
if let Some(TokenizerKind::HfTokenizerJson(ref src_url)) = self.tokenizer {
if nats::is_nats_url(src_url) {
let target = target_dir.path().join("tokenizer.json");
nats_client
.object_store_download(Url::parse(src_url)?, &target)
.await?;
self.tokenizer = Some(TokenizerKind::HfTokenizerJson(target.display().to_string()));
}
}
Ok(target_dir)
}
/// Delete this card from the key-value store and it's URLs from the object store /// Delete this card from the key-value store and it's URLs from the object store
pub async fn delete_from_nats(&mut self, nats_client: nats::Client) -> Result<()> { pub async fn delete_from_nats(&mut self, nats_client: nats::Client) -> Result<()> {
let nats_addr = nats_client.addr(); let nats_addr = nats_client.addr();
......
...@@ -18,8 +18,12 @@ use strum::Display; ...@@ -18,8 +18,12 @@ use strum::Display;
#[derive(Copy, Debug, Clone, Display, Serialize, Deserialize, Eq, PartialEq)] #[derive(Copy, Debug, Clone, Display, Serialize, Deserialize, Eq, PartialEq)]
pub enum ModelType { pub enum ModelType {
// Chat Completions API
Chat, Chat,
/// Older completions API
Completion, Completion,
// Pre-processed requests
Backend,
} }
impl ModelType { impl ModelType {
...@@ -27,10 +31,11 @@ impl ModelType { ...@@ -27,10 +31,11 @@ impl ModelType {
match self { match self {
Self::Chat => "chat", Self::Chat => "chat",
Self::Completion => "completion", Self::Completion => "completion",
Self::Backend => "backend",
} }
} }
pub fn all() -> Vec<Self> { pub fn all() -> Vec<Self> {
vec![Self::Chat, Self::Completion] vec![Self::Chat, Self::Completion, Self::Backend]
} }
} }
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
use std::{collections::HashSet, sync::Arc}; use std::{collections::HashSet, sync::Arc};
use anyhow::{Ok, Result}; use anyhow::{Context, Ok, Result};
use minijinja::Environment; use minijinja::Environment;
use crate::model_card::model::{ModelDeploymentCard, PromptContextMixin, PromptFormatterArtifact}; use crate::model_card::model::{ModelDeploymentCard, PromptContextMixin, PromptFormatterArtifact};
...@@ -35,7 +35,8 @@ impl PromptFormatter { ...@@ -35,7 +35,8 @@ impl PromptFormatter {
.ok_or(anyhow::anyhow!("MDC does not contain a prompt formatter"))? .ok_or(anyhow::anyhow!("MDC does not contain a prompt formatter"))?
{ {
PromptFormatterArtifact::HfTokenizerConfigJson(file) => { PromptFormatterArtifact::HfTokenizerConfigJson(file) => {
let content = std::fs::read_to_string(file)?; let content = std::fs::read_to_string(&file)
.with_context(|| format!("fs:read_to_string '{file}'"))?;
let config: ChatTemplate = serde_json::from_str(&content)?; let config: ChatTemplate = serde_json::from_str(&content)?;
Self::from_parts( Self::from_parts(
config, config,
......
...@@ -155,7 +155,7 @@ impl Component { ...@@ -155,7 +155,7 @@ impl Component {
pub fn service_name(&self) -> String { pub fn service_name(&self) -> String {
let service_name = format!("{}_{}", self.namespace.name(), self.name); let service_name = format!("{}_{}", self.namespace.name(), self.name);
Slug::slugify_unique(&service_name).to_string() Slug::slugify(&service_name).to_string()
} }
pub fn path(&self) -> String { pub fn path(&self) -> String {
......
...@@ -36,6 +36,7 @@ use derive_builder::Builder; ...@@ -36,6 +36,7 @@ use derive_builder::Builder;
use futures::{StreamExt, TryStreamExt}; use futures::{StreamExt, TryStreamExt};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use tokio::fs::File as TokioFile; use tokio::fs::File as TokioFile;
use tokio::io::AsyncRead;
use tokio::time; use tokio::time;
use url::Url; use url::Url;
use validator::{Validate, ValidationError}; use validator::{Validate, ValidationError};
...@@ -157,6 +158,40 @@ impl Client { ...@@ -157,6 +158,40 @@ impl Client {
Ok(()) Ok(())
} }
/// Download file from NATS at this URL
pub async fn object_store_download(
&self,
nats_url: Url,
filepath: &Path,
) -> anyhow::Result<()> {
let mut disk_file = TokioFile::create(filepath).await?;
let (bucket_name, key) = url_to_bucket_and_key(&nats_url)?;
let context = self.jetstream();
let bucket = match context.get_object_store(&bucket_name).await {
Ok(bucket) => bucket,
Err(err) if err.to_string().contains("stream not found") => {
// err.source() is GetStreamError, which has a kind() which
// is GetStreamErrorKind::JetStream which wraps a jetstream::Error
// which has code 404. Phew. So yeah check the string for now.
anyhow::bail!("NATS get_object_store bucket does not exist: {bucket_name}. {err}.");
}
Err(err) => {
anyhow::bail!("NATS get_object_store error: {err}");
}
};
let mut obj_reader = bucket.get(&key).await.map_err(|e| {
anyhow::anyhow!(
"Failed downloading from bucket / object store {bucket_name}/{key}: {e}"
)
})?;
let _bytes_copied = tokio::io::copy(&mut obj_reader, &mut disk_file).await?;
Ok(())
}
/// Delete a bucket and all it's contents from the NATS object store /// Delete a bucket and all it's contents from the NATS object store
pub async fn object_store_delete_bucket(&self, bucket_name: &str) -> anyhow::Result<()> { pub async fn object_store_delete_bucket(&self, bucket_name: &str) -> anyhow::Result<()> {
let context = self.jetstream(); let context = self.jetstream();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment