Unverified Commit 641234cd authored by Graham King's avatar Graham King Committed by GitHub
Browse files

chore: Prevent duplicate components with different models. (#1103)

Each namespace is for a single pipeline, so a component must be model-unique. This means we can have several components with the same name running the same model (data parallel); their traffic will be routed according to `--router-mode`. However, we cannot have several components with the same name running different models.

Add an `ensure_unique` check to prevent that happening.
parent 770c230c
......@@ -79,8 +79,8 @@ pub async fn prepare_engine(
let Some(etcd_client) = distributed_runtime.etcd_client() else {
anyhow::bail!("Cannot run distributed components without etcd");
};
let network_entry = network_name.load_entry(etcd_client.clone()).await?;
let mut card = network_entry.load_mdc(endpoint_id, etcd_client).await?;
let network_entry = network_name.load_entry(&etcd_client).await?;
let mut card = network_entry.load_mdc(endpoint_id, &etcd_client).await?;
let engine: OpenAIChatCompletionsStreamingEngine = match network_entry.model_type {
ModelType::Backend => {
......
......@@ -66,7 +66,7 @@ impl ModelEntry {
pub async fn load_mdc(
&self,
endpoint_id: protocols::Endpoint,
etcd_client: etcd::Client,
etcd_client: &etcd::Client,
) -> anyhow::Result<ModelDeploymentCard> {
let kvstore: Box<dyn KeyValueStore> =
Box::new(EtcdStorage::new(etcd_client.clone(), endpoint_id));
......@@ -114,7 +114,7 @@ impl ModelNetworkName {
}
/// Fetch the ModelEntry from etcd.
pub async fn load_entry(&self, etcd_client: etcd::Client) -> anyhow::Result<ModelEntry> {
pub async fn load_entry(&self, etcd_client: &etcd::Client) -> anyhow::Result<ModelEntry> {
let mut model_entries = etcd_client.kv_get(self.to_string(), None).await?;
if model_entries.is_empty() {
anyhow::bail!("No ModelEntry in etcd for key {self}");
......@@ -134,9 +134,9 @@ impl ModelNetworkName {
/// Look up this name's `ModelEntry` in etcd, then load the `ModelDeploymentCard` via that entry.
///
/// An error from either step is returned to the caller.
pub async fn load_mdc(
&self,
endpoint_id: protocols::Endpoint,
etcd_client: etcd::Client,
etcd_client: &etcd::Client,
) -> anyhow::Result<ModelDeploymentCard> {
let entry = self.load_entry(etcd_client.clone()).await?;
let entry = self.load_entry(etcd_client).await?;
entry.load_mdc(endpoint_id, etcd_client).await
}
}
......@@ -262,7 +262,7 @@ impl ModelWatcher {
// Should be impossible because we only get here on an etcd event
anyhow::bail!("Missing etcd_client");
};
let card = match model_entry.load_mdc(endpoint_id, etcd_client).await {
let card = match model_entry.load_mdc(endpoint_id, &etcd_client).await {
Ok(card) => {
tracing::debug!(card.display_name, "adding model");
Some(card)
......
......@@ -5,7 +5,7 @@ use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use dynamo_runtime::component::Endpoint;
use dynamo_runtime::component::{Component, Endpoint};
use dynamo_runtime::traits::DistributedRuntimeProvider;
use crate::http::service::discovery::{ModelEntry, ModelNetworkName};
......@@ -126,6 +126,9 @@ impl LocalModel {
let Some(etcd_client) = endpoint.drt().etcd_client() else {
anyhow::bail!("Cannot attach to static endpoint");
};
self.ensure_unique(endpoint.component(), self.display_name())
.await?;
// Store model config files in NATS object store
let nats_client = endpoint.drt().nats_client();
self.card.move_to_nats(nats_client.clone()).await?;
......@@ -157,4 +160,25 @@ impl LocalModel {
)
.await
}
/// Ensure that each component serves only one model.
/// We can have multiple instances of the same model running using the same component name
/// (they get load balanced, and are differentiated in etcd by their lease_id).
/// We cannot have multiple models with the same component name.
///
/// Returns an error if there is already a component by this name serving a different model.
async fn ensure_unique(&self, component: &Component, model_name: &str) -> anyhow::Result<()> {
let Some(etcd_client) = component.drt().etcd_client() else {
// A static component is necessarily unique, it cannot register
return Ok(());
};
for endpoint_info in component.list_endpoints().await? {
let network_name: ModelNetworkName = (&endpoint_info).into();
let entry = network_name.load_entry(&etcd_client).await?;
if entry.name != model_name {
anyhow::bail!("Duplicate component. Attempt to register model {model_name} at {component}, which is already used by {network_name} running model {}.", entry.name);
}
}
Ok(())
}
}
......@@ -179,10 +179,28 @@ impl Component {
}
}
/// Get keys from etcd on the slug, splitting the endpoints and only returning the
/// set of unique endpoints.
pub async fn list_endpoints(&self) -> Vec<Endpoint> {
unimplemented!("endpoints")
/// List the endpoint entries stored in etcd under this component.
///
/// Every value found under the `"{etcd_path}/"` prefix is decoded as a JSON
/// `ComponentEndpointInfo` and returned; no de-duplication is performed here.
///
/// Returns an empty list when the runtime has no etcd client.
///
/// # Errors
/// Fails if the etcd prefix query fails, or if any stored value cannot be decoded. The
/// decode error includes the offending value to ease debugging.
pub async fn list_endpoints(&self) -> anyhow::Result<Vec<ComponentEndpointInfo>> {
    let Some(etcd_client) = self.drt.etcd_client() else {
        return Ok(vec![]);
    };
    let mut out = vec![];
    // The extra slash is important to only list exact component matches, not substrings.
    for kv in etcd_client
        .kv_get_prefix(format!("{}/", self.etcd_path()))
        .await?
    {
        let val = match serde_json::from_slice::<ComponentEndpointInfo>(kv.value()) {
            Ok(val) => val,
            Err(err) => {
                // Render the raw value lossily: a strict UTF-8 conversion could fail here
                // and hide the decode error we actually want to report.
                anyhow::bail!(
                    "Error converting etcd response to ComponentEndpointInfo: {err}. {}",
                    String::from_utf8_lossy(kv.value())
                );
            }
        };
        out.push(val);
    }
    Ok(out)
}
pub async fn scrape_stats(&self, timeout: Duration) -> Result<ServiceSet> {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment