Unverified Commit 088f7eeb authored by mohammedabdulwahhab's avatar mohammedabdulwahhab Committed by GitHub
Browse files

fix: add liveness and readiness probes to Dynamo SDK (#1187)


Co-authored-by: default avatarAnna Tchernych <atchernych@nvidia.com>
parent 69dcba7b
......@@ -10,6 +10,9 @@ const (
DynamoServicePortName = "http"
DynamoContainerPortName = "http"
DynamoHealthPort = 5000
DynamoHealthPortName = "health"
DynamoImageBuilderComponentName = "dynamo-image-builder"
DynamoApiServerComponentName = "api-server"
......
......@@ -1416,6 +1416,11 @@ func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx contex
args = append(args, "cd", "src", "&&", "uv", "run", "dynamo", "serve")
// ensure liveness and readiness probes are enabled for the dynamo components
args = append(args, "--system-app-port", fmt.Sprintf("%d", commonconsts.DynamoHealthPort))
args = append(args, "--enable-system-app")
args = append(args, "--use-default-health-checks")
// todo : remove this line when https://github.com/ai-dynamo/dynamo/issues/345 is fixed
enableDependsOption := false
if len(opt.dynamoComponentDeployment.Spec.ExternalServices) > 0 && enableDependsOption {
......@@ -1549,10 +1554,38 @@ func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx contex
Name: commonconsts.DynamoContainerPortName,
ContainerPort: int32(containerPort), // nolint: gosec
},
{
Protocol: corev1.ProtocolTCP,
Name: commonconsts.DynamoHealthPortName,
ContainerPort: int32(commonconsts.DynamoHealthPort),
},
},
SecurityContext: mainContainerSecurityContext,
}
// Set default probes if none are provided
if livenessProbe == nil {
container.LivenessProbe = &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/healthz",
Port: intstr.FromString(commonconsts.DynamoHealthPortName),
},
},
}
}
if readinessProbe == nil {
container.ReadinessProbe = &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/readyz",
Port: intstr.FromString(commonconsts.DynamoHealthPortName),
},
},
}
}
if opt.dynamoComponentDeployment.Spec.EnvFromSecret != nil {
container.EnvFrom = []corev1.EnvFromSource{
{
......
......@@ -18,7 +18,7 @@ from typing import Any
from bentoml import on_shutdown as async_on_shutdown
from dynamo.sdk.core.decorators.endpoint import api, endpoint
from dynamo.sdk.core.lib import DYNAMO_IMAGE, depends, service
from dynamo.sdk.core.lib import DYNAMO_IMAGE, depends, liveness, readiness, service
from dynamo.sdk.lib.decorators import async_on_start
dynamo_context: dict[str, Any] = {}
......@@ -32,4 +32,6 @@ __all__ = [
"endpoint",
"api",
"service",
"liveness",
"readiness",
]
......@@ -78,6 +78,26 @@ def serve(
help="The host to bind for the REST API server",
envvar="DYNAMO_HOST",
),
system_app_port: Optional[int] = typer.Option(
None,
help="The port to listen on for the system app. This is only supported when --service-name is set (only one service is started).",
envvar="DYNAMO_SYSTEM_APP_PORT",
),
system_app_host: Optional[str] = typer.Option(
None,
help="The host to bind for the system app.",
envvar="DYNAMO_SYSTEM_APP_HOST",
),
enable_system_app: bool = typer.Option(
False,
help="Enable the system app.",
envvar="DYNAMO_SYSTEM_APP_ENABLED",
),
use_default_health_checks: bool = typer.Option(
False,
"--use-default-health-checks",
help="Use default liveness and readiness health checks if none are provided.",
),
working_dir: Optional[Path] = typer.Option(
None,
help="When loading from source code, specify the directory to find the Service instance",
......@@ -175,4 +195,8 @@ def serve(
service_name=service_name,
enable_local_planner=enable_local_planner,
target=target,
system_app_port=system_app_port,
system_app_host=system_app_host,
enable_system_app=enable_system_app,
use_default_health_checks=use_default_health_checks,
)
......@@ -22,7 +22,6 @@ import inspect
import json
import logging
import os
import time
import typing as t
from typing import Any
......@@ -34,8 +33,12 @@ from fastapi.responses import StreamingResponse
from dynamo.runtime import DistributedRuntime, dynamo_endpoint, dynamo_worker
from dynamo.sdk import dynamo_context
from dynamo.sdk.core.protocol.interface import DynamoTransport, LinkedServices
from dynamo.sdk.core.runner.health import (
register_liveness_probe,
register_readiness_probe,
)
from dynamo.sdk.lib.loader import find_and_load_service
from dynamo.sdk.lib.utils import get_host_port
from dynamo.sdk.lib.utils import get_host_port, get_system_app_host_port
logger = logging.getLogger(__name__)
......@@ -173,8 +176,14 @@ def main(
if worker_id is not None:
server_context.worker_index = worker_id
# Instance of the inner class of the service should be the same across the dynamo_worker, web_worker, and system_app_worker
class_instance: Any = None
# will be set once dyn_worker has created class_instance
instanceReady = asyncio.Event()
@dynamo_worker()
async def worker(runtime: DistributedRuntime):
async def dyn_worker(runtime: DistributedRuntime):
nonlocal class_instance
global dynamo_context
dynamo_context["runtime"] = runtime
if service_name and service_name != service.name:
......@@ -203,23 +212,27 @@ def main(
endpoints = []
for name, endpoint in dynamo_endpoints.items():
td_endpoint = component.endpoint(name)
logger.debug(f"Registering endpoint '{name}'")
endpoints.append(td_endpoint)
# Bind an instance of inner to the endpoint
if DynamoTransport.DEFAULT in endpoint.transports:
td_endpoint = component.endpoint(name)
logger.debug(
f"Registering endpoint '{name}' with DEFAULT transport"
)
endpoints.append(td_endpoint)
# Bind an instance of inner to the endpoint
dynamo_context["component"] = component
dynamo_context["endpoints"] = endpoints
class_instance = service.inner()
dynamo_handlers = []
for name, endpoint in dynamo_endpoints.items():
bound_method = endpoint.func.__get__(class_instance)
# Only pass request type for now, use Any for response
# TODO: Handle an endpoint not having types
# TODO: Handle multiple endpoints in a single component
dynamo_wrapped_method = dynamo_endpoint(endpoint.request_type, Any)(
bound_method
)
dynamo_handlers.append(dynamo_wrapped_method)
if DynamoTransport.DEFAULT in endpoint.transports:
bound_method = endpoint.func.__get__(class_instance)
# Only pass request type for now, use Any for response
# TODO: Handle an endpoint not having types
# TODO: Handle multiple endpoints in a single component
dynamo_wrapped_method = dynamo_endpoint(endpoint.request_type, Any)(
bound_method
)
dynamo_handlers.append(dynamo_wrapped_method)
# Run startup hooks before setting up endpoints
for name, member in vars(class_instance.__class__).items():
if callable(member) and getattr(
......@@ -236,6 +249,8 @@ def main(
logger.info(
f"Starting {service.name} instance with all registered endpoints"
)
# signal that class_instance (and its setup) is done
instanceReady.set()
# Launch serve_endpoint for all endpoints concurrently
tasks = [
endpoint.serve_endpoint(handler)
......@@ -269,37 +284,68 @@ def main(
raise
# if the service has a FastAPI app, add the worker as an event handler
def web_worker():
async def web_worker():
# We want to wait until dyn_worker has initialized class_instance
await instanceReady.wait()
if not service.app:
return
# Create the class instance
class_instance = service.inner()
# TODO: init hooks
# Add API routes to the FastAPI app
added_routes = add_fastapi_routes(service.app, service, class_instance)
if added_routes:
# Configure uvicorn with graceful shutdown
host, port = get_host_port()
config = uvicorn.Config(service.app, host=host, port=port, log_level="info")
# Pass None to uvicorn setting to unify log style
config = uvicorn.Config(service.app, host=host, port=port, log_config=None)
server = uvicorn.Server(config)
# Start the server with graceful shutdown handling
logger.info(
f"Starting FastAPI server on {config.host}:{config.port} with routes: {added_routes}"
)
server.run()
await server.serve()
else:
logger.warning("No API routes found, not starting FastAPI server")
# Keep the process running until interrupted
logger.info("Service is running, press Ctrl+C to stop")
while True:
try:
# Sleep in small increments to respond to signals quickly
time.sleep(0.1)
except KeyboardInterrupt:
logger.info("Gracefully shutting down FastAPI process")
break
async def system_app_worker():
    """Serve the service's system app with its liveness/readiness routes.

    Waits for ``instanceReady`` so that ``class_instance`` exists before the
    probes are registered against it, then runs a uvicorn server on the
    host/port returned by ``get_system_app_host_port``.

    Raises:
        ValueError: If the service has no system app.
    """
    # dyn_worker sets this event once class_instance and its setup are done.
    await instanceReady.wait()

    if not service.system_app:
        raise ValueError("System app not defined for service")

    # The default-health-check switch is read from this process's environment.
    flag_value = os.environ.get("DYNAMO_SYSTEM_APP_USE_DEFAULT_HEALTH_CHECKS", "false")
    use_default_health_checks = flag_value.lower() == "true"
    if use_default_health_checks:
        logger.info("Using default health checks for liveness and readiness probes")

    # Liveness first, then readiness; both inspect the same instance.
    for register_probe in (register_liveness_probe, register_readiness_probe):
        register_probe(
            service.system_app, class_instance, use_default=use_default_health_checks
        )

    host, port = get_system_app_host_port()
    # log_config=None leaves logging configuration untouched by uvicorn.
    system_config = uvicorn.Config(
        service.system_app, host=host, port=port, log_config=None
    )
    server = uvicorn.Server(system_config)

    logger.info(f"Starting system app on {host}:{port}")
    await server.serve()
def should_start_system_app():
    """Tell whether ``DYNAMO_SYSTEM_APP_ENABLED`` asks for the system app.

    Only the value ``true`` (compared case-insensitively) enables it; an
    unset variable or any other value yields ``False``.
    """
    enabled_flag = os.environ.get("DYNAMO_SYSTEM_APP_ENABLED", "false")
    return enabled_flag.lower() == "true"
# Helper to launch fastapi server and dynamo worker concurrently
async def run_concurrent_workers(tasks):
    """Run every awaitable in ``tasks`` concurrently and wait for all of them.

    The awaitables are handed to ``asyncio.gather``; its result list is
    discarded, so this coroutine returns ``None``.
    """
    gathered = asyncio.gather(*tasks)
    await gathered
worker_tasks = []
uvloop.install()
start_http_server = False
......@@ -309,9 +355,15 @@ def main(
start_http_server = True
break
if start_http_server:
web_worker()
else:
asyncio.run(worker())
worker_tasks.append(web_worker())
if should_start_system_app():
logger.info("Starting system app")
worker_tasks.append(system_app_worker())
# Always start the dynamo worker, no reason not to
worker_tasks.append(dyn_worker())
asyncio.run(run_concurrent_workers(worker_tasks))
if __name__ == "__main__":
......
......@@ -167,6 +167,10 @@ def serve_dynamo_graph(
service_name: str = "",
enable_local_planner: bool = False,
target: TargetEnum = TargetEnum.DYNAMO,
system_app_port: Optional[int] = None,
system_app_host: Optional[str] = None,
enable_system_app: bool = False,
use_default_health_checks: bool = False,
) -> CircusRunner:
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.sdk.cli.circus import create_arbiter, create_circus_watcher
......@@ -198,11 +202,30 @@ def serve_dynamo_graph(
if dependency_map is None:
dependency_map = {}
# TODO: Only for testing, this will prevent any other dep services from getting started, relying entirely on configured deps in the runner-map
standalone = False
if service_name:
logger.info(f"Service '{service_name}' running in standalone mode")
standalone = True
# TODO: We are signaling by setting env vars to downstream subprocesses. Let's pass flags on our invocation of serve_dynamo instead. That way the API is defined at the top level.
# Signal downstream workers to start system app by setting DYNAMO_SYSTEM_APP_* env vars for each worker. They are respectively consumed in serve_dynamo.py
if enable_system_app:
env["DYNAMO_SYSTEM_APP_ENABLED"] = "true"
if system_app_port:
# Throw if not standalone mode. Should only be set in standalone mode.
# TODO: This might still cause issues if we are running in standalone, but have multiple workers, need to figure this one out
if not standalone:
raise ValueError(
"Specifying system app port is only supported in standalone mode (i.e --service-name is set)"
)
env["DYNAMO_SYSTEM_APP_PORT"] = str(system_app_port)
if system_app_host:
env["DYNAMO_SYSTEM_APP_HOST"] = system_app_host
# Only set use_default_health_checks if explicitly enabled
if use_default_health_checks:
env["DYNAMO_SYSTEM_APP_USE_DEFAULT_HEALTH_CHECKS"] = "true"
logger.info("Default health checks enabled for system app")
if service_name and service_name != svc.name:
svc = svc.find_dependent_by_name(service_name)
num_workers, resource_envs = allocator.get_resource_envs(svc)
......@@ -244,7 +267,8 @@ def serve_dynamo_graph(
dependency_map[name] = uri
# reserve one more to avoid conflicts
port_stack.enter_context(reserve_free_port())
else:
namespace, _ = svc.dynamo_address()
dynamo_args = [
"-m",
_DYNAMO_WORKER_SCRIPT,
......
......@@ -15,7 +15,7 @@
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
import os
from typing import Any, Dict, Optional, Type, TypeVar, Union
from typing import Any, Callable, Dict, Optional, Type, TypeVar, Union
from fastapi import FastAPI
......@@ -27,7 +27,7 @@ from dynamo.sdk.core.protocol.interface import (
ServiceInterface,
)
T = TypeVar("T", bound=object)
G = TypeVar("G", bound=Callable[..., Any])
# Note: global service provider.
# this should be set to a concrete implementation of the DeploymentTarget interface
......@@ -51,11 +51,12 @@ def get_target() -> DeploymentTarget:
# TODO: dynamo_component
def service(
inner: Optional[Type[T]] = None,
inner: Optional[Type[G]] = None,
/,
*,
dynamo: Optional[Union[Dict[str, Any], DynamoConfig]] = None,
app: Optional[FastAPI] = None,
system_app: Optional[FastAPI] = None,
**kwargs: Any,
) -> Any:
"""Service decorator that's adapter-agnostic"""
......@@ -70,7 +71,7 @@ def service(
assert isinstance(dynamo_config, DynamoConfig)
def decorator(inner: Type[T]) -> ServiceInterface[T]:
def decorator(inner: Type[G]) -> ServiceInterface[G]:
provider = get_target()
if inner is not None:
dynamo_config.name = inner.__name__
......@@ -79,6 +80,7 @@ def service(
config=config,
dynamo_config=dynamo_config,
app=app,
system_app=system_app,
**kwargs,
)
......@@ -87,8 +89,42 @@ def service(
def depends(
on: Optional[ServiceInterface[T]] = None, **kwargs: Any
) -> DependencyInterface[T]:
on: Optional[ServiceInterface[G]] = None, **kwargs: Any
) -> DependencyInterface[G]:
"""Create a dependency using the current service provider"""
provider = get_target()
return provider.create_dependency(on=on, **kwargs)
def liveness(func: G) -> G:
    """Mark ``func`` as a liveness probe handler.

    Sets ``__is_liveness_probe__ = True`` on ``func`` and returns the same
    object, so it can be used as a plain decorator.

    Raises:
        TypeError: If ``func`` is not callable.
    """
    if callable(func):
        setattr(func, "__is_liveness_probe__", True)
        return func
    raise TypeError("@liveness can only decorate callable methods")
def get_liveness_handler(obj):
    """Return the first liveness-marked callable attribute of ``obj``.

    Attributes are visited in ``dir(obj)`` order. An attribute qualifies when
    it is callable and has a truthy ``__is_liveness_probe__``. Returns
    ``None`` when nothing qualifies.
    """
    # Lazy generator: attributes after the first match are never fetched.
    members = (getattr(obj, name) for name in dir(obj))
    marked = (
        member
        for member in members
        if callable(member) and getattr(member, "__is_liveness_probe__", False)
    )
    return next(marked, None)
def readiness(func: G) -> G:
    """Mark ``func`` as a readiness probe handler.

    Sets ``__is_readiness_probe__ = True`` on ``func`` and returns the same
    object, so it can be used as a plain decorator.

    Raises:
        TypeError: If ``func`` is not callable.
    """
    if callable(func):
        setattr(func, "__is_readiness_probe__", True)
        return func
    raise TypeError("@readiness can only decorate callable methods")
def get_readiness_handler(obj):
    """Return the first readiness-marked callable attribute of ``obj``.

    Attributes are visited in ``dir(obj)`` order. An attribute qualifies when
    it is callable and has a truthy ``__is_readiness_probe__``. Returns
    ``None`` when nothing qualifies.
    """
    # Lazy generator: attributes after the first match are never fetched.
    members = (getattr(obj, name) for name in dir(obj))
    marked = (
        member
        for member in members
        if callable(member) and getattr(member, "__is_readiness_probe__", False)
    )
    return next(marked, None)
......@@ -73,6 +73,7 @@ class BentoServiceAdapter(ServiceMixin, ServiceInterface[T]):
config: ServiceConfig,
dynamo_config: Optional[DynamoConfig] = None,
app: Optional[FastAPI] = None,
system_app: Optional[FastAPI] = None,
**kwargs,
):
name = service_cls.__name__
......@@ -112,10 +113,8 @@ class BentoServiceAdapter(ServiceMixin, ServiceInterface[T]):
)
self._endpoints: Dict[str, BentoEndpoint] = {}
if not app:
self.app = FastAPI(title=name)
else:
self.app = app
self.app = app or FastAPI(title=name)
self.system_app = system_app or FastAPI(title=f"{name}-system")
self._dependencies: Dict[str, "DependencyInterface"] = {}
self._bentoml_service.config["dynamo"] = asdict(self._dynamo_config)
self._api_endpoints: list[str] = []
......@@ -263,6 +262,7 @@ class BentoDeploymentTarget(DeploymentTarget):
config: ServiceConfig,
dynamo_config: Optional[DynamoConfig] = None,
app: Optional[FastAPI] = None,
system_app: Optional[FastAPI] = None,
**kwargs,
) -> ServiceInterface[T]:
"""Create a BentoServiceAdapter with the given parameters"""
......@@ -271,6 +271,7 @@ class BentoDeploymentTarget(DeploymentTarget):
config=config,
dynamo_config=dynamo_config,
app=app,
system_app=system_app,
**kwargs,
)
......
......@@ -75,6 +75,7 @@ class LocalService(ServiceMixin, ServiceInterface[T]):
watcher: Optional[Watcher] = None,
socket: Optional[CircusSocket] = None,
app: Optional[FastAPI] = None,
system_app: Optional[FastAPI] = None,
):
self._inner_cls = inner_cls
self._config = config
......@@ -87,6 +88,7 @@ class LocalService(ServiceMixin, ServiceInterface[T]):
self._watcher = watcher
self._socket = socket
self.app = app or FastAPI(title=name)
self.system_app = system_app or FastAPI(title=f"{name}-system")
self._dependencies: Dict[str, "DependencyInterface"] = {}
self._endpoints: Dict[str, LocalEndpoint] = {}
for field_name in dir(inner_cls):
......@@ -216,6 +218,8 @@ class LocalDeploymentTarget(DeploymentTarget):
service_cls: Type[T],
config: ServiceConfig,
dynamo_config: Optional[DynamoConfig] = None,
app: Optional[FastAPI] = None,
system_app: Optional[FastAPI] = None,
**kwargs,
) -> ServiceInterface[T]:
# Get parameters needed for creating a circus watcher
......
# SPDX-FileCopyrightText: Copyright (c) 2020 Atalaya Tech. Inc
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
import inspect
import logging
from typing import Any, Awaitable, Union

from fastapi import FastAPI, Response
# TODO: These defaults should be set by the provider. For now, I'm just adding them so that something is exposed when we do --use-default-health-checks
def default_liveness_check() -> bool:
    """Fallback liveness check: unconditionally reports alive (``True``)."""
    return True
def default_readiness_check() -> bool:
    """Fallback readiness check: unconditionally reports ready (``True``)."""
    return True
def register_liveness_probe(
    app: FastAPI, instance: Any, route: str = "/healthz", use_default: bool = False
) -> None:
    """Register a liveness endpoint on ``app``.

    The first callable attribute of ``instance`` (in ``dir()`` order) with a
    truthy ``__is_liveness_probe__`` marker is used as the check. If there is
    none and ``use_default`` is true, ``default_liveness_check`` is used.
    If neither applies, no route is registered.

    The registered endpoint responds with:

    * 200 when the check returns a truthy value,
    * 503 when the check returns a falsy value,
    * 500 (exception text as the body) when the check raises; the exception
      is also logged with its traceback.

    Args:
        app (FastAPI): The FastAPI application to register the liveness route on.
        instance (Any): The service or component instance to inspect for a
            @liveness-decorated method.
        route (str, optional): The URL path to register the liveness endpoint
            under. Defaults to "/healthz".
        use_default (bool, optional): Whether to use the default health check
            if no decorated method is found. Defaults to False.
    """
    # Look the attribute up on the instance (not the class) so that an
    # ordinary method comes back bound and can be called with no arguments.
    decorated_method = None
    for attr in dir(instance):
        method = getattr(instance, attr)
        if callable(method) and getattr(method, "__is_liveness_probe__", False):
            decorated_method = method
            break

    if decorated_method is None and not use_default:
        # Do nothing if no @liveness decorator found and default not requested
        return

    # Resolve the check once at registration time instead of on every request.
    check_method = (
        decorated_method if decorated_method is not None else default_liveness_check
    )
    probe_logger = logging.getLogger(__name__)

    @app.get(route)
    async def liveness_check():
        try:
            result: Union[bool, Awaitable[bool]] = check_method()
            # The check may be sync or async; await only when needed.
            if inspect.isawaitable(result):
                result = await result
            return Response(status_code=200 if result else 503)
        except Exception as e:
            # Deliberately broad: a probe must answer even when the check is
            # broken. Log it so the failure is visible on the service side.
            probe_logger.exception("Liveness check for route %s raised", route)
            return Response(content=str(e), status_code=500)
def register_readiness_probe(
    app: FastAPI, instance: Any, route: str = "/readyz", use_default: bool = False
) -> None:
    """Register a readiness endpoint on ``app``.

    The first callable attribute of ``instance`` (in ``dir()`` order) with a
    truthy ``__is_readiness_probe__`` marker is used as the check. If there
    is none and ``use_default`` is true, ``default_readiness_check`` is used.
    If neither applies, no route is registered.

    The registered endpoint responds with:

    * 200 when the check returns a truthy value,
    * 503 when the check returns a falsy value,
    * 500 (exception text as the body) when the check raises; the exception
      is also logged with its traceback.

    Args:
        app (FastAPI): The FastAPI application to register the readiness route on.
        instance (Any): The service or component instance to inspect for a
            @readiness-decorated method.
        route (str, optional): The URL path to register the readiness endpoint
            under. Defaults to "/readyz".
        use_default (bool, optional): Whether to use the default health check
            if no decorated method is found. Defaults to False.
    """
    # Look the attribute up on the instance (not the class) so that an
    # ordinary method comes back bound and can be called with no arguments.
    decorated_method = None
    for attr in dir(instance):
        method = getattr(instance, attr)
        if callable(method) and getattr(method, "__is_readiness_probe__", False):
            decorated_method = method
            break

    if decorated_method is None and not use_default:
        # Do nothing if no @readiness decorator found and default not requested
        return

    # Resolve the check once at registration time instead of on every request.
    check_method = (
        decorated_method if decorated_method is not None else default_readiness_check
    )
    probe_logger = logging.getLogger(__name__)

    @app.get(route)
    async def readiness_check():
        try:
            result: Union[bool, Awaitable[bool]] = check_method()
            # The check may be sync or async; await only when needed.
            if inspect.isawaitable(result):
                result = await result
            return Response(status_code=200 if result else 503)
        except Exception as e:
            # Deliberately broad: a probe must answer even when the check is
            # broken. Log it so the failure is visible on the service side.
            probe_logger.exception("Readiness check for route %s raised", route)
            return Response(content=str(e), status_code=500)
......@@ -21,3 +21,10 @@ def get_host_port():
port = int(os.environ.get("DYNAMO_PORT", 8000))
host = os.environ.get("DYNAMO_HOST", "0.0.0.0")
return host, port
def get_system_app_host_port():
    """Return the ``(host, port)`` pair the system app should bind to.

    Both values come from environment variables:

    * ``DYNAMO_SYSTEM_APP_HOST`` - bind address, default ``"0.0.0.0"``.
    * ``DYNAMO_SYSTEM_APP_PORT`` - TCP port, default ``0`` (a random free
      port is chosen when the server binds).

    A variable that is unset or empty falls back to its default, so an empty
    value injected by a deployment manifest does not crash startup.

    Returns:
        tuple: ``(host, port)`` with ``host`` a ``str`` and ``port`` an ``int``.

    Raises:
        ValueError: If the port is not an integer in the range 0-65535.
    """
    host = os.environ.get("DYNAMO_SYSTEM_APP_HOST") or "0.0.0.0"
    raw_port = os.environ.get("DYNAMO_SYSTEM_APP_PORT", "").strip()
    if not raw_port:
        return host, 0

    try:
        port = int(raw_port)
    except ValueError:
        # Re-raise with the variable name so the misconfiguration is obvious.
        raise ValueError(
            f"DYNAMO_SYSTEM_APP_PORT must be an integer, got {raw_port!r}"
        ) from None
    if not 0 <= port <= 65535:
        raise ValueError(
            f"DYNAMO_SYSTEM_APP_PORT must be between 0 and 65535, got {port}"
        )
    return host, port
......@@ -19,7 +19,15 @@ from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.sdk import DYNAMO_IMAGE, api, depends, endpoint, service
from dynamo.sdk import (
DYNAMO_IMAGE,
api,
depends,
endpoint,
liveness,
readiness,
service,
)
from dynamo.sdk.lib.config import ServiceConfig
logger = logging.getLogger(__name__)
......@@ -136,3 +144,11 @@ class Frontend:
yield f"Frontend: {response}"
return StreamingResponse(content_generator())
@liveness
def is_alive(self):
    """Liveness handler for this service; always reports alive."""
    return True
@readiness
def is_ready(self):
    """Readiness handler for this service; always reports ready."""
    return True
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment