"lib/bindings/python/Cargo.lock" did not exist on "9d6643b7a59220fc4f3ef599c002241dd0bf9965"
Unverified Commit c392c341 authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

feat: add custom lease to worker components (#748)

parent 4e75b04b
...@@ -118,6 +118,18 @@ def main( ...@@ -118,6 +118,18 @@ def main(
component = runtime.namespace(namespace).component(component_name) component = runtime.namespace(namespace).component(component_name)
try: try:
# if a custom lease is specified we need to create the service with that lease
lease = None
if service._dynamo_config.custom_lease:
lease = await component.create_service_with_custom_lease(
ttl=service._dynamo_config.custom_lease.ttl
)
lease_id = lease.id()
dynamo_context["lease"] = lease
logger.info(
f"Created {service.name} component with custom lease id {lease_id}"
)
else:
# Create service first # Create service first
await component.create_service() await component.create_service()
logger.info(f"Created {service.name} component") logger.info(f"Created {service.name} component")
...@@ -170,7 +182,11 @@ def main( ...@@ -170,7 +182,11 @@ def main(
f"Starting {service.name} instance with all registered endpoints" f"Starting {service.name} instance with all registered endpoints"
) )
# TODO:bis: convert to list # TODO:bis: convert to list
result = await endpoints[0].serve_endpoint(twm[0]) if lease is None:
logger.info(f"Serving {service.name} with primary lease")
else:
logger.info(f"Serving {service.name} with lease: {lease.id()}")
result = await endpoints[0].serve_endpoint(twm[0], lease)
except Exception as e: except Exception as e:
logger.error(f"Error in Dynamo component setup: {str(e)}") logger.error(f"Error in Dynamo component setup: {str(e)}")
......
...@@ -66,6 +66,14 @@ class DynamoConfig: ...@@ -66,6 +66,14 @@ class DynamoConfig:
enabled: bool = False enabled: bool = False
name: str | None = None name: str | None = None
namespace: str | None = None namespace: str | None = None
custom_lease: LeaseConfig | None = None
@dataclass
class LeaseConfig:
"""Configuration for custom dynamo leases"""
ttl: int = 1 # seconds
class DynamoService(Service[T]): class DynamoService(Service[T]):
......
...@@ -33,6 +33,7 @@ from vllm.sampling_params import RequestOutputKind ...@@ -33,6 +33,7 @@ from vllm.sampling_params import RequestOutputKind
from dynamo.llm import KvMetricsPublisher from dynamo.llm import KvMetricsPublisher
from dynamo.sdk import async_on_start, depends, dynamo_context, dynamo_endpoint, service from dynamo.sdk import async_on_start, depends, dynamo_context, dynamo_endpoint, service
from dynamo.sdk.lib.service import LeaseConfig
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -41,6 +42,7 @@ logger = logging.getLogger(__name__) ...@@ -41,6 +42,7 @@ logger = logging.getLogger(__name__)
dynamo={ dynamo={
"enabled": True, "enabled": True,
"namespace": "dynamo", "namespace": "dynamo",
"custom_lease": LeaseConfig(ttl=1), # 1 second
}, },
resources={"gpu": 1, "cpu": "10", "memory": "20Gi"}, resources={"gpu": 1, "cpu": "10", "memory": "20Gi"},
workers=1, workers=1,
...@@ -142,6 +144,8 @@ class VllmWorker: ...@@ -142,6 +144,8 @@ class VllmWorker:
await self.disaggregated_router.async_init() await self.disaggregated_router.async_init()
else: else:
self.disaggregated_router = None self.disaggregated_router = None
self.lease = dynamo_context.get("lease")
logger.info("VllmWorker has been initialized") logger.info("VllmWorker has been initialized")
def shutdown_vllm_engine(self, signum, frame): def shutdown_vllm_engine(self, signum, frame):
...@@ -158,8 +162,13 @@ class VllmWorker: ...@@ -158,8 +162,13 @@ class VllmWorker:
async def create_metrics_publisher_endpoint(self): async def create_metrics_publisher_endpoint(self):
component = dynamo_context["component"] component = dynamo_context["component"]
# TODO: use the same child lease for metrics publisher endpoint and generate endpoint if self.lease is None:
await self.metrics_publisher.create_endpoint(component) logger.info("Creating metrics publisher endpoint with primary lease")
else:
logger.info(
f"Creating metrics publisher endpoint with lease: {self.lease.id()}"
)
await self.metrics_publisher.create_endpoint(component, self.lease)
def get_remote_prefill_request_callback(self): def get_remote_prefill_request_callback(self):
# TODO: integrate prefill_queue to dynamo endpoint # TODO: integrate prefill_queue to dynamo endpoint
......
...@@ -328,7 +328,9 @@ pub async fn run( ...@@ -328,7 +328,9 @@ pub async fn run(
.component(KV_PUBLISHER_COMPONENT)?; .component(KV_PUBLISHER_COMPONENT)?;
let kvp = Arc::new(KvMetricsPublisher::new()?); let kvp = Arc::new(KvMetricsPublisher::new()?);
let kvp_inner = kvp.clone(); let kvp_inner = kvp.clone();
tokio::spawn(async move { kvp_inner.create_endpoint(kvp_component).await }); tokio::spawn(
async move { kvp_inner.create_endpoint(kvp_component, None).await },
);
Some(kvp) Some(kvp)
} else { } else {
None None
......
...@@ -52,7 +52,7 @@ async def init(runtime: DistributedRuntime, ns: str): ...@@ -52,7 +52,7 @@ async def init(runtime: DistributedRuntime, ns: str):
# the server will gracefully shutdown (i.e., keep opened TCP streams finishes) # the server will gracefully shutdown (i.e., keep opened TCP streams finishes)
# after the lease is revoked # after the lease is revoked
await endpoint.serve_endpoint_with_lease(RequestHandler().generate, lease) await endpoint.serve_endpoint(RequestHandler().generate, lease)
if __name__ == "__main__": if __name__ == "__main__":
......
...@@ -412,46 +412,24 @@ impl Component { ...@@ -412,46 +412,24 @@ impl Component {
#[pymethods] #[pymethods]
impl Endpoint { impl Endpoint {
#[pyo3(signature = (generator, lease=None))]
fn serve_endpoint<'p>( fn serve_endpoint<'p>(
&self, &self,
py: Python<'p>, py: Python<'p>,
generator: PyObject, generator: PyObject,
lease: Option<&PyLease>,
) -> PyResult<Bound<'p, PyAny>> { ) -> PyResult<Bound<'p, PyAny>> {
let engine = Arc::new(engine::PythonAsyncEngine::new( let engine = Arc::new(engine::PythonAsyncEngine::new(
generator, generator,
self.event_loop.clone(), self.event_loop.clone(),
)?); )?);
let ingress = JsonServerStreamingIngress::for_engine(engine).map_err(to_pyerr)?; let ingress = JsonServerStreamingIngress::for_engine(engine).map_err(to_pyerr)?;
let builder = self.inner.endpoint_builder().handler(ingress); let mut builder = self.inner.endpoint_builder().handler(ingress);
pyo3_async_runtimes::tokio::future_into_py(py, async move { if lease.is_some() {
builder.start().await.map_err(to_pyerr)?; builder = builder.lease(lease.map(|l| l.inner.clone()));
Ok(())
})
} }
fn serve_endpoint_with_lease<'p>(
&self,
py: Python<'p>,
generator: PyObject,
lease: &PyLease,
) -> PyResult<Bound<'p, PyAny>> {
let engine = Arc::new(engine::PythonAsyncEngine::new(
generator,
self.event_loop.clone(),
)?);
let ingress = JsonServerStreamingIngress::for_engine(engine).map_err(to_pyerr)?;
// Create the builder with the ingress
let builder = self
.inner
.endpoint_builder()
.handler(ingress)
.lease(Some(lease.inner.clone()));
pyo3_async_runtimes::tokio::future_into_py(py, async move { pyo3_async_runtimes::tokio::future_into_py(py, async move {
// Start the endpoint
builder.start().await.map_err(to_pyerr)?; builder.start().await.map_err(to_pyerr)?;
Ok(()) Ok(())
}) })
} }
......
...@@ -73,16 +73,19 @@ impl KvMetricsPublisher { ...@@ -73,16 +73,19 @@ impl KvMetricsPublisher {
}) })
} }
#[pyo3(signature = (component, lease=None))]
fn create_endpoint<'p>( fn create_endpoint<'p>(
&self, &self,
py: Python<'p>, py: Python<'p>,
component: Component, component: Component,
lease: Option<&PyLease>,
) -> PyResult<Bound<'p, PyAny>> { ) -> PyResult<Bound<'p, PyAny>> {
let rs_publisher = self.inner.clone(); let rs_publisher = self.inner.clone();
let rs_component = component.inner.clone(); let rs_component = component.inner.clone();
let lease = lease.map(|l| l.inner.clone());
pyo3_async_runtimes::tokio::future_into_py(py, async move { pyo3_async_runtimes::tokio::future_into_py(py, async move {
rs_publisher rs_publisher
.create_endpoint(rs_component) .create_endpoint(rs_component, lease)
.await .await
.map_err(to_pyerr)?; .map_err(to_pyerr)?;
Ok(()) Ok(())
......
...@@ -23,6 +23,7 @@ use dynamo_runtime::{ ...@@ -23,6 +23,7 @@ use dynamo_runtime::{
SingleIn, SingleIn,
}, },
protocols::annotated::Annotated, protocols::annotated::Annotated,
transports::etcd::Lease,
Error, Result, Error, Result,
}; };
use futures::stream; use futures::stream;
...@@ -92,12 +93,12 @@ impl KvMetricsPublisher { ...@@ -92,12 +93,12 @@ impl KvMetricsPublisher {
self.tx.send(metrics) self.tx.send(metrics)
} }
pub async fn create_endpoint(&self, component: Component) -> Result<()> { pub async fn create_endpoint(&self, component: Component, lease: Option<Lease>) -> Result<()> {
let mut metrics_rx = self.rx.clone(); let mut metrics_rx = self.rx.clone();
let handler = Arc::new(KvLoadEndpoingHander::new(metrics_rx.clone())); let handler = Arc::new(KvLoadEndpoingHander::new(metrics_rx.clone()));
let handler = Ingress::for_engine(handler)?; let handler = Ingress::for_engine(handler)?;
component let builder = component
.endpoint(KV_METRICS_ENDPOINT) .endpoint(KV_METRICS_ENDPOINT)
.endpoint_builder() .endpoint_builder()
.stats_handler(move |_| { .stats_handler(move |_| {
...@@ -105,8 +106,9 @@ impl KvMetricsPublisher { ...@@ -105,8 +106,9 @@ impl KvMetricsPublisher {
serde_json::to_value(&*metrics).unwrap() serde_json::to_value(&*metrics).unwrap()
}) })
.handler(handler) .handler(handler)
.start() .lease(lease);
.await
builder.start().await
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment