Unverified Commit 781331c6 authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

feat: unregister discovery instance (#4459)

parent 3ac39b78
......@@ -141,6 +141,7 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(lora_name_to_id, m)?)?;
m.add_function(wrap_pyfunction!(log_message, m)?)?;
m.add_function(wrap_pyfunction!(register_llm, m)?)?;
m.add_function(wrap_pyfunction!(unregister_llm, m)?)?;
m.add_function(wrap_pyfunction!(fetch_llm, m)?)?;
m.add_function(wrap_pyfunction!(llm::entrypoint::make_engine, m)?)?;
m.add_function(wrap_pyfunction!(llm::entrypoint::run_input, m)?)?;
......@@ -331,6 +332,18 @@ fn register_llm<'p>(
})
}
/// Detach the model served on `endpoint` from discovery.
///
/// Returns a Python awaitable. Awaiting it runs
/// `LocalModel::detach_model_from_endpoint` on the wrapped endpoint; a failure
/// is converted with `to_pyerr` and surfaced to the Python caller.
#[pyfunction]
#[pyo3(signature = (endpoint))]
fn unregister_llm<'p>(py: Python<'p>, endpoint: Endpoint) -> PyResult<Bound<'p, PyAny>> {
    // The endpoint is moved into the future so it stays alive for the whole call.
    let detach = async move {
        let outcome = LocalModel::detach_model_from_endpoint(&endpoint.inner).await;
        outcome.map_err(to_pyerr)
    };
    pyo3_async_runtimes::tokio::future_into_py(py, detach)
}
/// Download a model from Hugging Face, returning its local path
/// Example: `model_path = await fetch_llm("Qwen/Qwen3-0.6B")`
#[pyfunction]
......
......@@ -40,5 +40,6 @@ from dynamo._core import lora_name_to_id as lora_name_to_id
from dynamo._core import make_engine
from dynamo._core import register_llm as register_llm
from dynamo._core import run_input
from dynamo._core import unregister_llm as unregister_llm
from .exceptions import HttpError
......@@ -5,6 +5,7 @@ use std::fs;
use std::path::{Path, PathBuf};
use dynamo_runtime::component::Endpoint;
use dynamo_runtime::discovery::DiscoveryInstance;
use dynamo_runtime::discovery::DiscoverySpec;
use dynamo_runtime::protocols::EndpointId;
use dynamo_runtime::slug::Slug;
......@@ -446,6 +447,29 @@ impl LocalModel {
Ok(())
}
/// Remove the model registration associated with `endpoint` from discovery.
///
/// Builds a `DiscoveryInstance::Model` from the endpoint's namespace, component
/// and name together with the runtime's connection id, then passes it to the
/// runtime's discovery client for unregistration.
///
/// # Errors
/// Propagates any error returned by the discovery client's `unregister` call.
pub async fn detach_model_from_endpoint(endpoint: &Endpoint) -> anyhow::Result<()> {
    let runtime = endpoint.drt();
    let instance_id = runtime.connection_id();
    let id = endpoint.id();

    // The card payload is left as JSON null.
    // NOTE(review): presumably only the identifying fields are needed to locate
    // the entry — confirm against the discovery backends.
    let model_instance = DiscoveryInstance::Model {
        namespace: id.namespace,
        component: id.component,
        endpoint: id.name,
        instance_id,
        card_json: serde_json::Value::Null,
    };

    runtime.discovery().unregister(model_instance).await?;
    tracing::info!("Successfully unregistered model from discovery");
    Ok(())
}
}
/// A random endpoint to use for internal communication
......
......@@ -124,6 +124,23 @@ impl Discovery for KubeDiscoveryClient {
Ok(instance)
}
/// Remove `instance` from the locally held discovery metadata.
///
/// Endpoint instances are removed via `unregister_endpoint`, model instances
/// via `unregister_model_card`.
async fn unregister(&self, instance: DiscoveryInstance) -> Result<()> {
    // TODO: propagate metadata changes to other pods. The current
    // implementation deletes the entry from the local metadata only; it does
    // not invalidate the service metadata cached on other pods.
    let mut local_metadata = self.metadata.write().await;
    let removal = match &instance {
        DiscoveryInstance::Endpoint(_) => local_metadata.unregister_endpoint(&instance),
        DiscoveryInstance::Model { .. } => local_metadata.unregister_model_card(&instance),
    };
    removal?;
    Ok(())
}
async fn list(&self, query: DiscoveryQuery) -> Result<Vec<DiscoveryInstance>> {
tracing::debug!("KubeDiscoveryClient::list called with query={:?}", query);
......
......@@ -203,6 +203,62 @@ impl Discovery for KVStoreDiscovery {
Ok(instance)
}
/// Delete the key-value store entry that backs `instance`.
///
/// Endpoint instances are removed from `INSTANCES_BUCKET` and model instances
/// from `MODELS_BUCKET`; the key is built from the namespace, component,
/// endpoint name and instance id.
///
/// A missing bucket is treated as "already removed": a warning is logged and
/// `Ok(())` is returned.
///
/// # Errors
/// Propagates errors from looking up the bucket or deleting the key.
async fn unregister(&self, instance: DiscoveryInstance) -> Result<()> {
    let (bucket_name, key_path) = match &instance {
        DiscoveryInstance::Endpoint(inst) => {
            let key = Self::endpoint_key(
                &inst.namespace,
                &inst.component,
                &inst.endpoint,
                inst.instance_id,
            );
            tracing::debug!(
                "Unregistering endpoint instance_id={}, namespace={}, component={}, endpoint={}, key={}",
                inst.instance_id,
                inst.namespace,
                inst.component,
                inst.endpoint,
                key
            );
            (INSTANCES_BUCKET, key)
        }
        DiscoveryInstance::Model {
            namespace,
            component,
            endpoint,
            instance_id,
            ..
        } => {
            let key = Self::model_key(namespace, component, endpoint, *instance_id);
            tracing::debug!(
                "Unregistering model instance_id={}, namespace={}, component={}, endpoint={}, key={}",
                instance_id,
                namespace,
                component,
                endpoint,
                key
            );
            (MODELS_BUCKET, key)
        }
    };

    // Get the bucket - if it doesn't exist, the instance is already removed from the KV store
    let Some(bucket) = self.store.get_bucket(bucket_name).await? else {
        tracing::warn!(
            "Bucket {} does not exist, instance already removed",
            bucket_name
        );
        return Ok(());
    };

    // `key_path` is not used again, so move it into the key instead of cloning it.
    let key = crate::storage::key_value_store::Key::from_raw(key_path);

    // Delete the entry from the bucket
    bucket.delete(&key).await?;
    Ok(())
}
async fn list(&self, query: DiscoveryQuery) -> Result<Vec<DiscoveryInstance>> {
let prefix = Self::query_prefix(&query);
let bucket_name = if prefix.starts_with(INSTANCES_BUCKET) {
......
......@@ -60,6 +60,34 @@ impl DiscoveryMetadata {
}
}
/// Unregister an endpoint instance
pub fn unregister_endpoint(&mut self, instance: &DiscoveryInstance) -> Result<()> {
if let DiscoveryInstance::Endpoint(inst) = instance {
let key = make_endpoint_key(&inst.namespace, &inst.component, &inst.endpoint);
self.endpoints.remove(&key);
Ok(())
} else {
anyhow::bail!("Cannot unregister non-endpoint instance as endpoint")
}
}
/// Unregister a model card instance
pub fn unregister_model_card(&mut self, instance: &DiscoveryInstance) -> Result<()> {
if let DiscoveryInstance::Model {
namespace,
component,
endpoint,
..
} = instance
{
let key = make_endpoint_key(namespace, component, endpoint);
self.model_cards.remove(&key);
Ok(())
} else {
anyhow::bail!("Cannot unregister non-model-card instance as model card")
}
}
/// Get all registered endpoints
pub fn get_all_endpoints(&self) -> Vec<DiscoveryInstance> {
self.endpoints.values().cloned().collect()
......
......@@ -140,6 +140,18 @@ impl Discovery for MockDiscovery {
Ok(instance)
}
/// Drop every registered instance whose instance id matches that of `instance`.
///
/// NOTE(review): matching is by instance id only, so entries of any kind that
/// share the id are removed together — confirm this is intended for the mock.
async fn unregister(&self, instance: DiscoveryInstance) -> Result<()> {
    let target_id = instance.instance_id();
    let mut registered = self.registry.instances.lock().unwrap();
    registered.retain(|candidate| candidate.instance_id() != target_id);
    Ok(())
}
async fn list(&self, query: DiscoveryQuery) -> Result<Vec<DiscoveryInstance>> {
let instances = self.registry.instances.lock().unwrap();
Ok(instances
......
......@@ -198,6 +198,9 @@ pub trait Discovery: Send + Sync {
/// Registers an object in the discovery plane with the instance id
async fn register(&self, spec: DiscoverySpec) -> Result<DiscoveryInstance>;
/// Unregisters an instance from the discovery plane.
///
/// Consumes the `DiscoveryInstance` that describes the entry to remove.
/// NOTE(review): which fields identify the entry is backend-specific — confirm
/// against each implementation before relying on a particular field.
async fn unregister(&self, instance: DiscoveryInstance) -> Result<()>;
/// Returns a list of currently registered instances for the given discovery query
/// This is a one-time snapshot without watching for changes
async fn list(&self, query: DiscoveryQuery) -> Result<Vec<DiscoveryInstance>>;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment