Unverified Commit d953f9d0 authored by Schwinn Saereesitthipitak's avatar Schwinn Saereesitthipitak Committed by GitHub
Browse files

feat: add endpoint instance registration/unregistration for sleep/wake (#5203)


Signed-off-by: default avatarSchwinn Saereesitthipitak <schwinns@nvidia.com>
Co-authored-by: default avatarClaude Opus 4.5 <noreply@anthropic.com>
parent 602ce0ed
......@@ -854,6 +854,35 @@ impl Endpoint {
fn metrics(&self) -> prometheus_metrics::RuntimeMetrics {
prometheus_metrics::RuntimeMetrics::from_endpoint(self.inner.clone())
}
/// Unregister this endpoint instance from discovery.
///
/// This removes the endpoint from the instances bucket, preventing the router
/// from sending requests to this worker. Use this when a worker is sleeping
/// and should not receive any requests.
fn unregister_endpoint_instance<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
let inner = self.inner.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
inner
.unregister_endpoint_instance()
.await
.map_err(to_pyerr)?;
Ok(())
})
}
/// Re-register this endpoint instance to discovery.
///
/// This adds the endpoint back to the instances bucket, allowing the router
/// to send requests to this worker again. Use this when a worker wakes up
/// and should start receiving requests.
fn register_endpoint_instance<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
let inner = self.inner.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
inner.register_endpoint_instance().await.map_err(to_pyerr)?;
Ok(())
})
}
}
#[pymethods]
......
......@@ -205,6 +205,26 @@ class Endpoint:
"""
...
async def unregister_endpoint_instance(self) -> None:
"""
Unregister this endpoint instance from discovery.
This removes the endpoint from the instances bucket, preventing the router
from sending requests to this worker. Use this when a worker is sleeping
and should not receive any requests.
"""
...
async def register_endpoint_instance(self) -> None:
"""
Re-register this endpoint instance to discovery.
This adds the endpoint back to the instances bucket, allowing the router
to send requests to this worker again. Use this when a worker wakes up
and should start receiving requests.
"""
...
class Client:
"""
......
......@@ -311,3 +311,88 @@ pub async fn build_transport_type(
build_transport_type_inner(mode, endpoint_id, connection_id)
}
impl Endpoint {
/// Unregister this endpoint instance from discovery.
///
/// This removes the endpoint from the instances bucket, preventing the router
/// from sending requests to this worker. Use this when a worker is sleeping
/// and should not receive any requests.
pub async fn unregister_endpoint_instance(&self) -> anyhow::Result<()> {
let drt = self.drt();
let instance_id = drt.connection_id();
let endpoint_id = self.id();
// Get the transport type for the endpoint
let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
let instance = crate::discovery::DiscoveryInstance::Endpoint(Instance {
namespace: endpoint_id.namespace,
component: endpoint_id.component,
endpoint: endpoint_id.name,
instance_id,
transport,
});
let discovery = drt.discovery();
if let Err(e) = discovery.unregister(instance).await {
let endpoint_id = self.id();
tracing::error!(
%endpoint_id,
error = %e,
"Unable to unregister endpoint instance from discovery"
);
anyhow::bail!(
"Unable to unregister endpoint instance from discovery. Check discovery service status"
);
}
tracing::info!(
instance_id = instance_id,
"Successfully unregistered endpoint instance from discovery - worker removed from routing pool"
);
Ok(())
}
/// Re-register this endpoint instance to discovery.
///
/// This adds the endpoint back to the instances bucket, allowing the router
/// to send requests to this worker again. Use this when a worker wakes up
/// and should start receiving requests.
pub async fn register_endpoint_instance(&self) -> anyhow::Result<()> {
let drt = self.drt();
let instance_id = drt.connection_id();
let endpoint_id = self.id();
// Get the transport type for the endpoint
let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
let spec = crate::discovery::DiscoverySpec::Endpoint {
namespace: endpoint_id.namespace,
component: endpoint_id.component,
endpoint: endpoint_id.name,
transport,
};
let discovery = drt.discovery();
if let Err(e) = discovery.register(spec).await {
let endpoint_id = self.id();
tracing::error!(
%endpoint_id,
error = %e,
"Unable to re-register endpoint instance to discovery"
);
anyhow::bail!(
"Unable to re-register endpoint instance to discovery. Check discovery service status"
);
}
tracing::info!(
instance_id = instance_id,
"Successfully re-registered endpoint instance to discovery - worker added back to routing pool"
);
Ok(())
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment