Unverified Commit d675d221 authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

feat: decouple dynamo sdk to support mutiple deployment targets (#905)

parent 5d5235bc
......@@ -37,7 +37,7 @@ The code for the pipeline looks like this:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from dynamo.sdk import DYNAMO_IMAGE, depends, dynamo_endpoint, service
from dynamo.sdk import DYNAMO_IMAGE, depends, dynamo_endpoint, service, dynamo_api
class RequestType(BaseModel):
......@@ -49,7 +49,7 @@ class ResponseType(BaseModel):
@service(
dynamo={"enabled": True, "namespace": "inference"},
dynamo={"namespace": "inference"},
)
class Backend:
@dynamo_endpoint()
......@@ -60,7 +60,7 @@ class Backend:
@service(
dynamo={"enabled": True, "namespace": "inference"},
dynamo={"namespace": "inference"},
)
class Middle:
backend = depends(Backend)
......@@ -77,13 +77,13 @@ app = FastAPI(title="Hello World!")
@service(
dynamo={"enabled": True, "namespace": "inference"},
dynamo={"namespace": "inference"},
app=app,
)
class Frontend:
middle = depends(Middle)
@dynamo_endpoint(is_api=True)
@dynamo_api()
async def generate(self, request: RequestType):
async def content_generator():
async for response in self.middle.generate(request.model_dump_json()):
......
......@@ -33,7 +33,6 @@ A Service is a core building block for a project. You can think of it as a logic
```python
@service(
dynamo={
"enabled": True,
"namespace": "dynamo",
},
resources={"gpu": 2, "cpu": "10", "memory": "20Gi"},
......@@ -53,7 +52,7 @@ Let's walk through an example to understand how you write a dynamo service.
```python
import ServiceB
@service(dynamo={"enabled": True, "namespace": "dynamo"}, resources={"gpu": 1})
@service(dynamo={"namespace": "dynamo"}, resources={"gpu": 1})
class ServiceA:
# Define service dependencies
service_b = depends(ServiceB)
......@@ -182,7 +181,7 @@ The most basic method is to specify parameters directly in the service decorator
```python
@service(
dynamo={"enabled": True, "namespace": "prod"},
dynamo={"namespace": "prod"},
resources={"gpu": 2, "cpu": "4", "memory": "16Gi"},
workers=2,
)
......@@ -346,7 +345,7 @@ Here's a comprehensive example showing how all these pieces fit together:
```python
@service(
dynamo={"enabled": True, "namespace": "default"},
dynamo={"namespace": "default"},
resources={"gpu": 1},
workers=1,
)
......
......@@ -16,12 +16,10 @@
from typing import Any
from bentoml import on_shutdown as async_on_shutdown
from bentoml._internal.context import server_context # type: ignore
from dynamo.sdk.lib.decorators import async_on_start, dynamo_endpoint
from dynamo.sdk.lib.dependency import depends
from dynamo.sdk.lib.image import DYNAMO_IMAGE
from dynamo.sdk.lib.service import service
from dynamo.sdk.core.decorators.endpoint import dynamo_api, dynamo_endpoint
from dynamo.sdk.core.lib import DYNAMO_IMAGE, depends, service
from dynamo.sdk.lib.decorators import async_on_start
dynamo_context: dict[str, Any] = {}
......@@ -32,6 +30,6 @@ __all__ = [
"depends",
"dynamo_context",
"dynamo_endpoint",
"server_context",
"dynamo_api",
"service",
]
......@@ -21,7 +21,6 @@ User facing python APIs for managing local bentos and build new bentos.
from __future__ import annotations
import json
import logging
import os
import typing as t
......@@ -29,7 +28,6 @@ import typing as t
import fs
import fs.errors
import fs.mirror
import yaml
from bentoml._internal.bento.bento import BENTO_PROJECT_DIR_NAME, BENTO_README_FILENAME
from bentoml._internal.bento.bento import Bento as BaseBento
from bentoml._internal.bento.bento import (
......@@ -40,7 +38,6 @@ from bentoml._internal.bento.bento import (
BentoRunnerInfo,
BentoServiceInfo,
get_default_svc_readme,
get_service_import_str,
)
from bentoml._internal.bento.build_config import BentoBuildConfig, BentoPathSpec
from bentoml._internal.configuration.containers import BentoMLContainer
......@@ -54,7 +51,7 @@ from fs.copy import copy_file
from fs.tempfs import TempFS
from simple_di import Provide, inject
from dynamo.sdk.lib.service import LinkedServices
from dynamo.sdk.core.protocol.interface import LinkedServices
logger = logging.getLogger(__name__)
......@@ -94,9 +91,12 @@ class Bento(BaseBento):
# TODO: At some point we need this to take place within the load function
LinkedServices.remove_unused_edges()
inner = svc.get_bentoml_service().inner
name = f"{inner.__module__}:{inner.__name__}"
setattr(svc.get_bentoml_service(), "_import_str", name)
if not build_config.service:
object.__setattr__(build_config, "service", get_service_import_str(svc))
is_legacy = isinstance(svc, Service)
object.__setattr__(build_config, "service", name)
is_legacy = isinstance(svc.get_bentoml_service(), Service)
# Apply default build options
image: Image | None = None
disable_image = "no_image" in enabled_features or is_legacy
......@@ -114,10 +114,10 @@ class Bento(BaseBento):
if build_config.name is not None
else to_snake_case(svc.name)
)
build_config.envs.extend(svc.envs)
build_config.labels.update(svc.labels)
if svc.image is not None:
image = Image(base_image=svc.image)
# build_config.envs.extend(svc.envs)
# build_config.labels.update(svc.labels)
# if svc.image is not None:
# image = Image(base_image=svc.image)
if not disable_image:
image = populate_image_from_build_config(image, build_config, build_ctx)
build_config = build_config.with_defaults()
......@@ -216,14 +216,6 @@ class Bento(BaseBento):
else:
f.write(build_config.description)
# Create 'apis/openapi.yaml' file
bento_fs.makedir("apis")
with bento_fs.open(fs.path.combine("apis", "openapi.yaml"), "w") as f:
yaml.dump(svc.openapi_spec, f)
if not is_legacy:
with bento_fs.open(fs.path.combine("apis", "schema.json"), "w") as f:
json.dump(svc.schema(), f, indent=2)
if image is None:
bento_info = BentoInfo(
tag=tag,
......@@ -256,22 +248,18 @@ class Bento(BaseBento):
schema=svc.schema() if not is_legacy else {},
)
else:
svc = svc.get_bentoml_service()
services = [
BentoServiceInfo.from_service(s) for s in svc.all_services().values()
]
bento_info = BentoInfoV2(
tag=tag,
service=svc, # type: ignore # attrs converters do not typecheck
entry_service=svc.name,
labels=build_config.labels,
models=models,
services=(
[
BentoServiceInfo.from_service(s)
for s in svc.all_services().values()
]
if not is_legacy
else []
),
services=(services if not is_legacy else []),
envs=build_config.envs,
schema=svc.schema() if not is_legacy else {},
image=image.freeze(bento_fs, build_config.envs, platform),
)
......
......@@ -37,6 +37,7 @@ from rich.syntax import Syntax
from simple_di import Provide, inject
from dynamo.sdk.cli.bento_util import Bento
from dynamo.sdk.core.runner import TargetEnum
if t.TYPE_CHECKING:
from bentoml._internal.bento import BentoStore
......@@ -171,6 +172,12 @@ def build(
help="Containerize the Dynamo pipeline after building. Shortcut for 'dynamo build && dynamo containerize'.",
),
platform: str = typer.Option(None, "--platform", help="Platform to build for"),
target: TargetEnum = typer.Option(
TargetEnum.BENTO,
"--target",
help="Specify the target: 'dynamo' or 'bento'.",
case_sensitive=False,
),
) -> None:
"""Build a new Dynamo pipeline from the specified path.
......@@ -179,6 +186,10 @@ def build(
from bentoml._internal.configuration import get_quiet_mode, set_quiet_mode
from bentoml._internal.log import configure_logging
from dynamo.sdk.cli.utils import configure_target_environment
configure_target_environment(target)
# Validate output format
valid_outputs = ["tag", "default"]
if output not in valid_outputs:
......@@ -196,12 +207,16 @@ def build(
else:
build_ctx = dynamo_pipeline
if target != TargetEnum.BENTO:
raise NotImplementedError(
"currently only bento based build target is supported"
)
bento = build_bentofile(
service=service,
build_ctx=build_ctx,
platform=platform,
)
containerize_cmd = f"dynamo containerize {bento.tag}"
if output == "tag":
......
......@@ -29,7 +29,8 @@ import typer
from rich.console import Console
from rich.panel import Panel
from .utils import resolve_service_config
from dynamo.sdk.cli.utils import resolve_service_config
from dynamo.sdk.core.runner import TargetEnum
if t.TYPE_CHECKING:
P = t.ParamSpec("P") # type: ignore
......@@ -85,16 +86,23 @@ def serve(
False,
help="Save a snapshot of your service state to a file that allows planner to edit your deployment configuration",
),
target: TargetEnum = typer.Option(
TargetEnum.DYNAMO,
"--target",
help="Specify the target: 'dynamo' or 'bento'.",
case_sensitive=False,
),
):
"""Locally serve a Dynamo pipeline.
Starts a local server for the specified Dynamo pipeline.
"""
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.sdk.cli.utils import configure_target_environment
from dynamo.sdk.core.protocol.interface import LinkedServices
from dynamo.sdk.lib.loader import find_and_load_service
from dynamo.sdk.lib.service import LinkedServices
configure_target_environment(target)
# Extract extra arguments not captured by typer
service_configs = resolve_service_config(config_file, ctx.args)
......@@ -138,7 +146,7 @@ def serve(
svc = find_and_load_service(dynamo_pipeline, working_dir=working_dir)
logger.info(f"Loaded service: {svc.name}")
logger.info("Dependencies: %s", [dep.on.name for dep in svc.dependencies.values()])
logger.debug("Dependencies: %s", [dep.on.name for dep in svc.dependencies.values()])
LinkedServices.remove_unused_edges()
from dynamo.sdk.cli.serving import serve_dynamo_graph # type: ignore
......@@ -153,7 +161,6 @@ def serve(
border_style="green",
)
)
serve_dynamo_graph(
dynamo_pipeline,
working_dir=working_dir_str,
......@@ -162,4 +169,5 @@ def serve(
dependency_map=runner_map_dict,
service_name=service_name,
enable_local_planner=enable_local_planner,
target=target,
)
......@@ -22,8 +22,6 @@ import inspect
import json
import logging
import os
import signal
import sys
import time
import typing as t
from typing import Any
......@@ -35,7 +33,8 @@ from fastapi.responses import StreamingResponse
from dynamo.runtime import DistributedRuntime, dynamo_endpoint, dynamo_worker
from dynamo.sdk import dynamo_context
from dynamo.sdk.lib.service import LinkedServices
from dynamo.sdk.core.protocol.interface import DynamoTransport, LinkedServices
from dynamo.sdk.lib.loader import find_and_load_service
from dynamo.sdk.lib.utils import get_host_port
logger = logging.getLogger(__name__)
......@@ -43,7 +42,7 @@ logger = logging.getLogger(__name__)
def add_fastapi_routes(app, service, class_instance):
"""
Add FastAPI routes for Dynamo endpoints marked with is_api=True.
Add FastAPI routes for Dynamo endpoints supporting HTTP transport.
Args:
app: FastAPI app instance
......@@ -53,7 +52,7 @@ def add_fastapi_routes(app, service, class_instance):
added_routes = []
for name, endpoint in service.get_dynamo_endpoints().items():
if endpoint.is_api:
if DynamoTransport.HTTP in endpoint.transports:
path = name if name.startswith("/") else f"/{name}"
# Bind the method to the class instance
bound_method = endpoint.func.__get__(class_instance)
......@@ -85,25 +84,6 @@ def add_fastapi_routes(app, service, class_instance):
return added_routes
class GracefulExit(SystemExit):
"""Exception to signal a graceful exit."""
pass
def setup_signal_handlers():
"""Setup signal handlers for graceful shutdown."""
def signal_handler(sig, frame):
logger.info(f"Received signal {sig}, initiating graceful shutdown")
raise GracefulExit(0)
# Register SIGINT and SIGTERM handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGQUIT, signal_handler)
@click.command()
@click.argument("bento_identifier", type=click.STRING, required=False, default=".")
@click.option("--service-name", type=click.STRING, required=False, default="")
......@@ -130,6 +110,12 @@ def setup_signal_handlers():
default=None,
help="If set, use this custom component name instead of the default service name",
)
@click.option(
"--target",
type=click.STRING,
default="dynamo",
help="Specify the target: 'dynamo' or 'bento'.",
)
def main(
bento_identifier: str,
service_name: str,
......@@ -137,21 +123,21 @@ def main(
worker_env: str | None,
worker_id: int | None,
custom_component_name: str | None,
target: str,
) -> None:
# hack to avoid bentoml from respawning the workers after their leases are revoked
os.environ["BENTOML_CONTAINERIZED"] = "true"
"""Start a worker for the given service - either Dynamo or regular service"""
from _bentoml_impl.loader import import_service
from bentoml._internal.container import BentoMLContainer
from bentoml._internal.context import server_context
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.sdk.cli.utils import configure_target_environment
from dynamo.sdk.core.runner import TargetEnum
# TODO: completely disable signal handlers in serve_dynamo. It interferes with arbiter shutdown
# setup_signal_handlers()
configure_target_environment(TargetEnum(target))
run_id = service_name
dynamo_context["service_name"] = service_name
dynamo_context["runner_map"] = runner_map
dynamo_context["worker_id"] = worker_id
......@@ -167,15 +153,13 @@ def main(
f"the maximum worker ID is {len(env_list)}"
)
os.environ.update(env_list[worker_key])
service = import_service(bento_identifier)
service = find_and_load_service(bento_identifier)
if service_name and service_name != service.name:
service = service.find_dependent_by_name(service_name)
# Set namespace in dynamo_context if service is a dynamo component
if service.is_dynamo_component():
namespace, _ = service.dynamo_address()
dynamo_context["namespace"] = namespace
namespace, _ = service.dynamo_address()
dynamo_context["namespace"] = namespace
configure_dynamo_logging(service_name=service_name, worker_id=worker_id)
if runner_map:
......@@ -186,158 +170,161 @@ def main(
# TODO: test this with a deep chain of services
LinkedServices.remove_unused_edges()
# Check if Dynamo is enabled for this service
if service.is_dynamo_component():
if worker_id is not None:
server_context.worker_index = worker_id
@dynamo_worker()
async def worker(runtime: DistributedRuntime):
global dynamo_context
dynamo_context["runtime"] = runtime
if service_name and service_name != service.name:
server_context.service_type = "service"
else:
server_context.service_type = "entry_service"
server_context.service_name = service.name
# Get Dynamo configuration and create component
namespace, component_name = service.dynamo_address()
logger.info(f"Registering component {namespace}/{component_name}")
component = runtime.namespace(namespace).component(component_name)
if worker_id is not None:
server_context.worker_index = worker_id
@dynamo_worker()
async def worker(runtime: DistributedRuntime):
global dynamo_context
dynamo_context["runtime"] = runtime
if service_name and service_name != service.name:
server_context.service_type = "service"
else:
server_context.service_type = "entry_service"
server_context.service_name = service.name
# Get Dynamo configuration and create component
namespace, component_name = service.dynamo_address()
logger.info(f"Registering component {namespace}/{component_name}")
component = runtime.namespace(namespace).component(component_name)
try:
try:
# if a custom lease is specified we need to create the service with that lease
lease = None
if service._dynamo_config.custom_lease:
lease = await component.create_service_with_custom_lease(
ttl=service._dynamo_config.custom_lease.ttl
)
lease_id = lease.id()
dynamo_context["lease"] = lease
logger.info(
f"Created {service.name} component with custom lease id {lease_id}"
)
else:
# Create service first
await component.create_service()
logger.info(f"Created {service.name} component")
# Set runtime on all dependencies
for dep in service.dependencies.values():
dep.set_runtime(runtime)
logger.debug(f"Set runtime for dependency: {dep}")
# Then register all Dynamo endpoints
dynamo_endpoints = service.get_dynamo_endpoints()
if not dynamo_endpoints:
error_msg = f"FATAL ERROR: No Dynamo endpoints found in service {service.name}!"
logger.error(error_msg)
raise ValueError(error_msg)
endpoints = []
for name, endpoint in dynamo_endpoints.items():
td_endpoint = component.endpoint(name)
logger.debug(f"Registering endpoint '{name}'")
endpoints.append(td_endpoint)
# Bind an instance of inner to the endpoint
dynamo_context["component"] = component
dynamo_context["endpoints"] = endpoints
class_instance = service.inner()
dynamo_handlers = []
for name, endpoint in dynamo_endpoints.items():
bound_method = endpoint.func.__get__(class_instance)
# Only pass request type for now, use Any for response
# TODO: Handle a dynamo_endpoint not having types
# TODO: Handle multiple endpoints in a single component
dynamo_wrapped_method = dynamo_endpoint(endpoint.request_type, Any)(
bound_method
)
dynamo_handlers.append(dynamo_wrapped_method)
# Run startup hooks before setting up endpoints
for name, member in vars(class_instance.__class__).items():
if callable(member) and getattr(
member, "__dynamo_startup_hook__", False
):
logger.debug(f"Running startup hook: {name}")
result = getattr(class_instance, name)()
if inspect.isawaitable(result):
# await on startup hook async_on_start
await result
logger.debug(f"Completed async startup hook: {name}")
else:
logger.info(f"Completed startup hook: {name}")
logger.info(
f"Starting {service.name} instance with all registered endpoints"
# Set runtime on all dependencies
for dep in service.dependencies.values():
dep.set_runtime(runtime)
logger.debug(f"Set runtime for dependency: {dep}")
# Then register all Dynamo endpoints
dynamo_endpoints = service.get_dynamo_endpoints()
endpoints = []
for name, endpoint in dynamo_endpoints.items():
td_endpoint = component.endpoint(name)
logger.debug(f"Registering endpoint '{name}'")
endpoints.append(td_endpoint)
# Bind an instance of inner to the endpoint
dynamo_context["component"] = component
dynamo_context["endpoints"] = endpoints
class_instance = service.inner()
dynamo_handlers = []
for name, endpoint in dynamo_endpoints.items():
bound_method = endpoint.func.__get__(class_instance)
# Only pass request type for now, use Any for response
# TODO: Handle a dynamo_endpoint not having types
# TODO: Handle multiple endpoints in a single component
dynamo_wrapped_method = dynamo_endpoint(endpoint.request_type, Any)(
bound_method
)
# TODO:bis: convert to list
logger.info(f"Serving {service.name} with primary lease")
# Launch serve_endpoint for all endpoints concurrently
tasks = [
endpoint.serve_endpoint(handler)
for endpoint, handler in zip(endpoints, dynamo_handlers)
]
dynamo_handlers.append(dynamo_wrapped_method)
# Run startup hooks before setting up endpoints
for name, member in vars(class_instance.__class__).items():
if callable(member) and getattr(
member, "__dynamo_startup_hook__", False
):
logger.debug(f"Running startup hook: {name}")
result = getattr(class_instance, name)()
if inspect.isawaitable(result):
# await on startup hook async_on_start
await result
logger.debug(f"Completed async startup hook: {name}")
else:
logger.info(f"Completed startup hook: {name}")
logger.info(
f"Starting {service.name} instance with all registered endpoints"
)
# Launch serve_endpoint for all endpoints concurrently
tasks = [
endpoint.serve_endpoint(handler)
for endpoint, handler in zip(endpoints, dynamo_handlers)
]
if tasks:
# Wait for all tasks to complete
await asyncio.gather(*tasks)
except GracefulExit:
logger.info(f"[{run_id}] Gracefully shutting down {service.name}")
# Add any specific cleanup needed
return None
except Exception as e:
logger.error(f"Error in Dynamo component setup: {str(e)}")
raise
# if the service has a FastAPI app, add the worker as an event handler
def web_worker():
try:
if not service.app:
return
# Create the class instance
class_instance = service.inner()
# TODO: init hooks
# Add API routes to the FastAPI app
added_routes = add_fastapi_routes(service.app, service, class_instance)
if added_routes:
# Configure uvicorn with graceful shutdown
host, port = get_host_port()
config = uvicorn.Config(
service.app, host=host, port=port, log_level="info"
)
server = uvicorn.Server(config)
# Start the server with graceful shutdown handling
logger.info(
f"Starting FastAPI server on {config.host}:{config.port} with routes: {added_routes}"
)
server.run()
else:
logger.warning("No API routes found, not starting FastAPI server")
# Keep the process running until interrupted
logger.info("Service is running, press Ctrl+C to stop")
while True:
try:
# Sleep in small increments to respond to signals quickly
time.sleep(0.1)
except (KeyboardInterrupt, GracefulExit):
logger.info("Gracefully shutting down FastAPI process")
break
except GracefulExit:
logger.info("Gracefully shutting down FastAPI service")
except Exception as e:
logger.error(f"Error in web worker: {str(e)}")
raise
try:
uvloop.install()
if service.app:
web_worker()
else:
asyncio.run(worker())
except GracefulExit:
logger.info("Exiting gracefully")
sys.exit(0)
except KeyboardInterrupt:
logger.info("Interrupted, shutting down gracefully")
sys.exit(0)
msg = f"No Dynamo endpoints found in service {service.name} but keeping service alive"
logger.info(msg)
# Even with no endpoints, we should keep the service running
# until explicitly terminated
try:
# Create an event to wait on indefinitely until interrupted
stop_event = asyncio.Event()
# Wait for the event that will never be set unless interrupted
await stop_event.wait()
except asyncio.CancelledError:
logger.info("Service execution cancelled")
except KeyboardInterrupt:
logger.info("Service interrupted by user")
except Exception as e:
logger.exception(
f"Unexpected error while keeping service alive: {e}"
)
finally:
logger.info("Service shutting down")
except Exception as e:
logger.error(f"Error in Dynamo component setup: {str(e)}")
raise
# if the service has a FastAPI app, add the worker as an event handler
def web_worker():
if not service.app:
return
# Create the class instance
class_instance = service.inner()
# TODO: init hooks
# Add API routes to the FastAPI app
added_routes = add_fastapi_routes(service.app, service, class_instance)
if added_routes:
# Configure uvicorn with graceful shutdown
host, port = get_host_port()
config = uvicorn.Config(service.app, host=host, port=port, log_level="info")
server = uvicorn.Server(config)
# Start the server with graceful shutdown handling
logger.info(
f"Starting FastAPI server on {config.host}:{config.port} with routes: {added_routes}"
)
server.run()
else:
logger.warning("No API routes found, not starting FastAPI server")
# Keep the process running until interrupted
logger.info("Service is running, press Ctrl+C to stop")
while True:
try:
# Sleep in small increments to respond to signals quickly
time.sleep(0.1)
except KeyboardInterrupt:
logger.info("Gracefully shutting down FastAPI process")
break
uvloop.install()
start_http_server = False
for endpoint in service.get_dynamo_endpoints().values():
logger.debug(f"Checking transports for endpoint: {endpoint.transports}")
if DynamoTransport.HTTP in endpoint.transports:
start_http_server = True
break
if start_http_server:
web_worker()
else:
asyncio.run(worker())
if __name__ == "__main__":
try:
main()
except (GracefulExit, KeyboardInterrupt):
logger.info("Exiting gracefully")
sys.exit(0)
except Exception as e:
logger.error(f"Error in main: {str(e)}")
sys.exit(1)
main()
......@@ -33,6 +33,7 @@ from circus.watcher import Watcher
from simple_di import inject
from dynamo.sdk.cli.circus import CircusRunner
from dynamo.sdk.core.runner import TargetEnum
from .allocator import NVIDIA_GPU, ResourceAllocator
from .circus import _get_server_socket
......@@ -55,7 +56,9 @@ logger = logging.getLogger(__name__)
_DYNAMO_WORKER_SCRIPT = "dynamo.sdk.cli.serve_dynamo"
def _get_dynamo_worker_script(bento_identifier: str, svc_name: str) -> list[str]:
def _get_dynamo_worker_script(
bento_identifier: str, svc_name: str, target: TargetEnum
) -> list[str]:
args = [
"-m",
_DYNAMO_WORKER_SCRIPT,
......@@ -64,6 +67,8 @@ def _get_dynamo_worker_script(bento_identifier: str, svc_name: str) -> list[str]
svc_name,
"--worker-id",
"$(CIRCUS.WID)",
"--target",
target,
]
return args
......@@ -75,19 +80,19 @@ def create_dynamo_watcher(
scheduler: ResourceAllocator,
working_dir: Optional[str] = None,
env: Optional[Dict[str, str]] = None,
target: TargetEnum = TargetEnum.DYNAMO,
) -> tuple[Watcher, CircusSocket, str]:
"""Create a watcher for a Dynamo service in the dependency graph"""
from dynamo.sdk.cli.circus import create_circus_watcher
num_workers, resource_envs = scheduler.get_resource_envs(svc)
uri, socket = _get_server_socket(svc, uds_path)
args = _get_dynamo_worker_script(bento_identifier, svc.name)
args = _get_dynamo_worker_script(bento_identifier, svc.name, target)
if resource_envs:
args.extend(["--worker-env", json.dumps(resource_envs)])
# Update env to include ServiceConfig and service-specific environment variables
worker_env = env.copy() if env else {}
# Pass through the main service config
if "DYNAMO_SERVICE_CONFIG" in os.environ:
worker_env["DYNAMO_SERVICE_CONFIG"] = os.environ["DYNAMO_SERVICE_CONFIG"]
......@@ -161,6 +166,7 @@ def serve_dynamo_graph(
dependency_map: dict[str, str] | None = None,
service_name: str = "",
enable_local_planner: bool = False,
target: TargetEnum = TargetEnum.DYNAMO,
) -> CircusRunner:
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.sdk.cli.circus import create_arbiter, create_circus_watcher
......@@ -197,7 +203,6 @@ def serve_dynamo_graph(
if service_name:
logger.info(f"Service '{service_name}' running in standalone mode")
standalone = True
if service_name and service_name != svc.name:
svc = svc.find_dependent_by_name(service_name)
num_workers, resource_envs = allocator.get_resource_envs(svc)
......@@ -210,13 +215,6 @@ def serve_dynamo_graph(
for name, dep_svc in svc.all_services().items():
if name == svc.name or name in dependency_map:
continue
if not (
hasattr(dep_svc, "is_dynamo_component")
and dep_svc.is_dynamo_component()
):
raise RuntimeError(
f"Service {dep_svc.name} is not a Dynamo component"
)
namespaces.add(dep_svc.dynamo_address()[0])
if len(namespaces) > 1:
raise RuntimeError(
......@@ -232,13 +230,6 @@ def serve_dynamo_graph(
for name, dep_svc in svc.all_services().items():
if name == svc.name or name in dependency_map:
continue
if not (
hasattr(dep_svc, "is_dynamo_component")
and dep_svc.is_dynamo_component()
):
raise RuntimeError(
f"Service {dep_svc.name} is not a Dynamo component"
)
new_watcher, new_socket, uri = create_dynamo_watcher(
bento_id,
dep_svc,
......@@ -246,6 +237,7 @@ def serve_dynamo_graph(
allocator,
str(bento_path.absolute()),
env=env,
target=target,
)
watchers.append(new_watcher)
sockets.append(new_socket)
......@@ -263,42 +255,39 @@ def serve_dynamo_graph(
"$(CIRCUS.WID)",
]
if hasattr(svc, "is_dynamo_component") and svc.is_dynamo_component():
# resource_envs is the resource allocation (ie CUDA_VISIBLE_DEVICES) for each worker created by the allocator
# these resource_envs are passed to each individual worker's environment which is set in serve_dynamo
if resource_envs:
dynamo_args.extend(["--worker-env", json.dumps(resource_envs)])
# env is the base bentoml environment variables. We make a copy and update it to add any service configurations and additional env vars
worker_env = env.copy() if env else {}
# Pass through the main service config
if "DYNAMO_SERVICE_CONFIG" in os.environ:
worker_env["DYNAMO_SERVICE_CONFIG"] = os.environ[
"DYNAMO_SERVICE_CONFIG"
]
# Get service-specific environment variables from DYNAMO_SERVICE_ENVS
if "DYNAMO_SERVICE_ENVS" in os.environ:
try:
service_envs = json.loads(os.environ["DYNAMO_SERVICE_ENVS"])
if svc.name in service_envs:
service_args = service_envs[svc.name].get("ServiceArgs", {})
if "envs" in service_args:
worker_env.update(service_args["envs"])
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse DYNAMO_SERVICE_ENVS: {e}")
watcher = create_circus_watcher(
name=f"{namespace}_{svc.name}",
args=dynamo_args,
numprocesses=num_workers,
working_dir=str(bento_path.absolute()),
env=worker_env,
)
watchers.append(watcher)
logger.info(
f"Created watcher for {svc.name} with {num_workers} workers in the {namespace} namespace"
)
# resource_envs is the resource allocation (ie CUDA_VISIBLE_DEVICES) for each worker created by the allocator
# these resource_envs are passed to each individual worker's environment which is set in serve_dynamo
if resource_envs:
dynamo_args.extend(["--worker-env", json.dumps(resource_envs)])
# env is the base bentoml environment variables. We make a copy and update it to add any service configurations and additional env vars
worker_env = env.copy() if env else {}
# Pass through the main service config
if "DYNAMO_SERVICE_CONFIG" in os.environ:
worker_env["DYNAMO_SERVICE_CONFIG"] = os.environ["DYNAMO_SERVICE_CONFIG"]
# Get service-specific environment variables from DYNAMO_SERVICE_ENVS
if "DYNAMO_SERVICE_ENVS" in os.environ:
try:
service_envs = json.loads(os.environ["DYNAMO_SERVICE_ENVS"])
if svc.name in service_envs:
service_args = service_envs[svc.name].get("ServiceArgs", {})
if "envs" in service_args:
worker_env.update(service_args["envs"])
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse DYNAMO_SERVICE_ENVS: {e}")
watcher = create_circus_watcher(
name=f"{namespace}_{svc.name}",
args=dynamo_args,
numprocesses=num_workers,
working_dir=str(bento_path.absolute()),
env=worker_env,
)
watchers.append(watcher)
logger.info(
f"Created watcher for {svc.name} with {num_workers} workers in the {namespace} namespace"
)
# inject runner map now
inject_env = {"BENTOML_RUNNER_MAP": json.dumps(dependency_map)}
......
......@@ -32,6 +32,7 @@ import yaml
from click import Command, Context
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.sdk.core.runner import TargetEnum
configure_dynamo_logging()
......@@ -351,3 +352,20 @@ def resolve_service_config(
logger.debug(f"Final resolved config: {service_configs}")
return service_configs
def configure_target_environment(target: TargetEnum):
from dynamo.sdk.core.lib import set_target
if target == TargetEnum.BENTO:
from dynamo.sdk.core.runner.bentoml import BentoDeploymentTarget
target = BentoDeploymentTarget()
elif target == TargetEnum.DYNAMO:
from dynamo.sdk.core.runner.dynamo import LocalDeploymentTarget
target = LocalDeploymentTarget()
else:
raise ValueError(f"Invalid target: {target}")
logger.info(f"Setting deployment target to {target}")
set_target(target)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
import asyncio
from functools import wraps
from typing import Any, Callable, Dict, List, Optional, TypeVar, get_type_hints
from dynamo.runtime import DistributedRuntime
from dynamo.sdk.core.protocol.interface import (
DynamoEndpointInterface,
DynamoTransport,
ServiceInterface,
)
T = TypeVar("T")
class DynamoEndpoint(DynamoEndpointInterface):
"""
Base class for dynamo endpoints
Dynamo endpoints are methods decorated with @dynamo_endpoint.
"""
def __init__(
self,
func: Callable,
name: Optional[str] = None,
transports: Optional[List[DynamoTransport]] = None,
**kwargs,
):
self.func = func
self._name = name or func.__name__
self._transports = transports or [DynamoTransport.DEFAULT]
# Extract request type from hints
hints = get_type_hints(func)
args = list(hints.items())
# Skip self/cls argument if present
if args and args[0][0] in ("self", "cls"):
args = args[1:]
# Get request type from first arg if available
self.request_type = args[0][1] if args else None
wraps(func)(self)
# Store additional metadata
for key, value in kwargs.items():
setattr(self, key, value)
@property
def name(self) -> str:
return self._name
async def __call__(self, *args: Any, **kwargs: Any) -> Any:
return await self.func(*args, **kwargs)
@property
def transports(self) -> List[DynamoTransport]:
return self._transports
def dynamo_endpoint(
name: Optional[str] = None,
transports: Optional[List[DynamoTransport]] = None,
**kwargs,
) -> Callable[[Callable], DynamoEndpoint]:
"""Decorator for dynamo endpoints."""
def decorator(func: Callable) -> DynamoEndpoint:
return DynamoEndpoint(func, name, transports, **kwargs)
return decorator
def dynamo_api(
name: Optional[str] = None,
**kwargs,
) -> Callable[[Callable], DynamoEndpoint]:
"""Decorator for dynamo endpoints."""
def decorator(func: Callable) -> DynamoEndpoint:
return DynamoEndpoint(func, name, transports=[DynamoTransport.HTTP], **kwargs)
return decorator
class DynamoClient:
"""Client for calling Dynamo endpoints with streaming support"""
def __init__(self, service: ServiceInterface[Any]):
self._service = service
self._endpoints = service.get_dynamo_endpoints()
self._dynamo_clients: Dict[str, Any] = {}
self._runtime = None
def __getattr__(self, name: str) -> Any:
if name not in self._endpoints:
raise AttributeError(
f"No Dynamo endpoint '{name}' found on service '{self._service.name}'. "
f"Available endpoints: {list(self._endpoints.keys())}"
)
# For streaming endpoints, create/cache the stream function
if name not in self._dynamo_clients:
namespace, component_name = self._service.dynamo_address()
# Create async generator function that directly yields from the stream
async def get_stream(*args, **kwargs):
if self._runtime is not None:
runtime = self._runtime
else:
loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, False)
self._runtime = runtime
# Use existing runtime if available
try:
# TODO: bis - dont recreate the client every time
client = (
await runtime.namespace(namespace)
.component(component_name)
.endpoint(name)
.client()
)
# Directly yield items from the stream
stream = await client.generate(*args, **kwargs)
async for item in stream:
yield item.data()
except Exception as e:
raise e
self._dynamo_clients[name] = get_stream
return self._dynamo_clients[name]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
import os
from typing import Any, Dict, Optional, Type, TypeVar, Union
from fastapi import FastAPI
from dynamo.sdk.core.protocol.interface import (
DependencyInterface,
DeploymentTarget,
DynamoConfig,
ServiceConfig,
ServiceInterface,
)
T = TypeVar("T", bound=object)
# Note: global service provider.
# this should be set to a concrete implementation of the DeploymentTarget interface
_target: DeploymentTarget
DYNAMO_IMAGE = os.getenv("DYNAMO_IMAGE", "dynamo:latest-vllm")
def set_target(target: DeploymentTarget) -> None:
"""Set the global service provider implementation"""
global _target
_target = target
def get_target() -> DeploymentTarget:
"""Get the current service provider implementation"""
global _target
return _target
# TODO: dynamo_component
def service(
inner: Optional[Type[T]] = None,
/,
*,
dynamo: Optional[Union[Dict[str, Any], DynamoConfig]] = None,
app: Optional[FastAPI] = None,
**kwargs: Any,
) -> Any:
"""Service decorator that's adapter-agnostic"""
config = ServiceConfig(kwargs)
# Parse dict into DynamoConfig object
dynamo_config: Optional[DynamoConfig] = None
if dynamo is not None:
if isinstance(dynamo, dict):
dynamo_config = DynamoConfig(**dynamo)
else:
dynamo_config = dynamo
assert isinstance(dynamo_config, DynamoConfig)
def decorator(inner: Type[T]) -> ServiceInterface[T]:
provider = get_target()
if inner is not None:
dynamo_config.name = inner.__name__
return provider.create_service(
service_cls=inner,
config=config,
dynamo_config=dynamo_config,
app=app,
**kwargs,
)
ret = decorator(inner) if inner is not None else decorator
return ret
def depends(
on: Optional[ServiceInterface[T]] = None, **kwargs: Any
) -> DependencyInterface[T]:
"""Create a dependency using the current service provider"""
provider = get_target()
return provider.create_dependency(on=on, **kwargs)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any, Dict, Generic, List, Optional, Set, Tuple, Type, TypeVar
from fastapi import FastAPI
T = TypeVar("T", bound=object)
class DynamoTransport(Enum):
"""Transport types supported by Dynamo services"""
DEFAULT = auto()
HTTP = auto()
class ServiceConfig(Dict[str, Any]):
"""Base service configuration that can be extended by adapters"""
pass
class DynamoEndpointInterface(ABC):
"""Generic interface for service endpoints"""
@property
@abstractmethod
def name(self) -> str:
"""Get the name of this endpoint"""
pass
@abstractmethod
async def __call__(self, *args: Any, **kwargs: Any) -> Any:
"""Call the endpoint implementation"""
pass
@property
@abstractmethod
def transports(self) -> List[DynamoTransport]:
"""Get the transport type of this endpoint"""
return [DynamoTransport.DEFAULT]
class ServiceInterface(Generic[T], ABC):
"""Generic interface for service implementations"""
@property
@abstractmethod
def name(self) -> str:
"""Get the service name"""
pass
@property
@abstractmethod
def config(self) -> ServiceConfig:
"""Get the service configuration"""
pass
@property
@abstractmethod
def inner(self) -> Type[T]:
"""Get the inner service implementation class"""
pass
@abstractmethod
def get_endpoints(self) -> Dict[str, DynamoEndpointInterface]:
"""Get all registered endpoints"""
pass
@abstractmethod
def get_endpoint(self, name: str) -> DynamoEndpointInterface:
"""Get a specific endpoint by name"""
pass
@abstractmethod
def list_endpoints(self) -> List[str]:
"""List names of all registered endpoints"""
pass
@abstractmethod
def link(self, next_service: "ServiceInterface") -> "ServiceInterface":
"""Link this service to another service, creating a pipeline"""
pass
@abstractmethod
def remove_unused_edges(self, used_edges: Set["ServiceInterface"]) -> None:
"""Remove unused dependencies"""
pass
# @abstractmethod
def inject_config(self) -> None:
"""Inject configuration from environment into service configs"""
pass
@property
# @abstractmethod
def dependencies(self) -> Dict[str, "DependencyInterface"]:
"""Get the service dependencies"""
return {}
# @property
# @abstractmethod
def get_service_configs(self) -> Dict[str, ServiceConfig]:
"""Get all services"""
return {}
@property
# @abstractmethod
def service_configs(self) -> List[ServiceConfig]:
"""Get all service configs"""
return []
def all_services(self) -> Dict[str, "ServiceInterface"]:
"""Get all services"""
return {self.name: self}
def get_dynamo_endpoints(self) -> Dict[str, DynamoEndpointInterface]:
"""Get all Dynamo endpoints"""
endpoints = {}
for field in dir(self.inner):
value = getattr(self.inner, field)
if isinstance(value, DynamoEndpointInterface):
endpoints[value.name] = value
return endpoints
def __call__(self) -> T:
return self.inner()
def find_dependent_by_name(self, service_name: str) -> "ServiceInterface":
"""Find a dependent service by name"""
raise NotImplementedError()
def dynamo_address(self) -> tuple[str, str]:
raise NotImplementedError()
@dataclass
class LeaseConfig:
"""Configuration for custom dynamo leases"""
ttl: int = 1 # seconds
class DynamoConfig:
"""Configuration for Dynamo components"""
def __init__(
self,
enabled: bool = False,
name: Optional[str] = None,
namespace: Optional[str] = None,
custom_lease: Optional[LeaseConfig] = None,
**kwargs,
):
self.enabled = enabled
self.name = name
self.namespace = namespace
self.custom_lease = custom_lease
# Store any additional configuration options
for key, value in kwargs.items():
setattr(self, key, value)
class DeploymentTarget(ABC):
"""Interface for service provider implementations"""
@abstractmethod
def create_service(
self,
service_cls: Type[T],
config: ServiceConfig,
dynamo_config: Optional[DynamoConfig] = None,
app: Optional[FastAPI] = None,
**kwargs,
) -> ServiceInterface[T]:
"""Create a service instance"""
pass
@abstractmethod
def create_dependency(
self, on: Optional[ServiceInterface[T]] = None, **kwargs
) -> "DependencyInterface[T]":
"""Create a dependency on a service"""
pass
class DependencyInterface(Generic[T], ABC):
"""Generic interface for service dependencies"""
@property
@abstractmethod
def on(self) -> Optional[ServiceInterface[T]]:
"""Get the service this dependency is on"""
pass
@abstractmethod
def get(self, *args: Any, **kwargs: Any) -> Any:
"""Get the dependency client"""
pass
@abstractmethod
async def get_endpoint(self, name: str) -> Any:
"""Get a specific endpoint from the service"""
pass
def __get__(
self: "DependencyInterface[T]", instance: Any, owner: Any
) -> "DependencyInterface[T]" | T:
raise NotImplementedError()
class RuntimeLinkedServices:
"""
A class to track the linked services in the runtime.
"""
def __init__(self) -> None:
self.edges: Dict[ServiceInterface, Set[ServiceInterface]] = defaultdict(set)
def add(self, edge: Tuple[ServiceInterface, ServiceInterface]):
src, dest = edge
self.edges[src].add(dest)
# track the dest node as well so we can cleanup later
self.edges[dest]
def remove_unused_edges(self):
# this method is idempotent
if not self.edges:
return
# remove edges that are not in the current service
for u, vertices in self.edges.items():
u.remove_unused_edges(used_edges=vertices)
LinkedServices = RuntimeLinkedServices()
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
from enum import Enum
class TargetEnum(str, Enum):
"""The target deployment environment for the service"""
DYNAMO = "dynamo"
BENTO = "bento"
# SPDX-FileCopyrightText: Copyright (c) 2020 Atalaya Tech. Inc
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
from typing import Any, Dict, List, Optional, Set, Type, TypeVar
from _bentoml_sdk import Service as BentoService
from _bentoml_sdk.service.dependency import Dependency as BentoDependency
from fastapi import FastAPI
from dynamo.sdk.core.decorators.endpoint import DynamoClient, DynamoEndpoint
from dynamo.sdk.core.protocol.interface import (
DependencyInterface,
DeploymentTarget,
DynamoConfig,
DynamoEndpointInterface,
DynamoTransport,
LinkedServices,
ServiceConfig,
ServiceInterface,
)
T = TypeVar("T", bound=object)
class BentoEndpoint(DynamoEndpoint):
"""BentoML-specific endpoint implementation"""
def __init__(
self,
bentoml_endpoint: Any,
name: Optional[str] = None,
transports: Optional[List[DynamoTransport]] = None,
):
self.bentoml_endpoint = bentoml_endpoint
self._name = name or bentoml_endpoint.name
self._transports = transports or bentoml_endpoint.transports
@property
def name(self) -> str:
return self._name
async def __call__(self, *args: Any, **kwargs: Any) -> Any:
return await self.bentoml_endpoint(*args, **kwargs)
@property
def transports(self) -> List[DynamoTransport]:
return self._transports
class BentoMLService(ServiceInterface[T]):
"""BentoML adapter implementing the ServiceInterface"""
def __init__(
self,
bentoml_service: BentoService,
dynamo_config: Optional[DynamoConfig] = None,
app: Optional[FastAPI] = None,
):
self._bentoml_service = bentoml_service
name = bentoml_service.inner.__name__
self._dynamo_config = dynamo_config or DynamoConfig(
name=name, namespace="default"
)
self._endpoints: Dict[str, BentoEndpoint] = {}
if not app:
self.app = FastAPI(title=name)
else:
self.app = app
self._dependencies: Dict[str, "DependencyInterface"] = {}
# Map BentoML endpoints to our generic interface
for field_name in dir(bentoml_service.inner):
field = getattr(bentoml_service.inner, field_name)
if isinstance(field, DynamoEndpoint):
self._endpoints[field.name] = BentoEndpoint(
field, field.name, field.transports
)
if isinstance(field, DependencyInterface):
self._dependencies[field_name] = field
@property
def dependencies(self) -> dict[str, "DependencyInterface"]:
return self._dependencies
@property
def name(self) -> str:
return self._bentoml_service.name
@property
def config(self) -> ServiceConfig:
return ServiceConfig(self._bentoml_service.config)
@property
def inner(self) -> Type[T]:
return self._bentoml_service.inner
def get_endpoints(self) -> Dict[str, DynamoEndpointInterface]:
return self._endpoints
def get_endpoint(self, name: str) -> DynamoEndpointInterface:
if name not in self._endpoints:
raise ValueError(f"No endpoint found with name: {name}")
return self._endpoints[name]
def list_endpoints(self) -> List[str]:
return list(self._endpoints.keys())
def link(self, next_service: "ServiceInterface") -> "ServiceInterface":
# Check if the next service is a BentoML service adapter
LinkedServices.add((self, next_service))
return next_service
def remove_unused_edges(self, used_edges: Set["ServiceInterface"]) -> None:
current_deps = dict(self._dependencies)
for dep_key, dep_value in current_deps.items():
if dep_value.on not in used_edges:
del self._dependencies[dep_key]
# Add methods to expose underlying BentoML service when needed
def get_bentoml_service(self) -> BentoService:
return self._bentoml_service
def __call__(self) -> T:
instance = self.inner()
return instance
# TODO: add attribution to bentoml
def find_dependent_by_name(self, name: str) -> "ServiceInterface":
"""Find dynamo service by name"""
return self.all_services()[name]
def dynamo_address(self) -> tuple[str, str]:
return (self._dynamo_config.namespace, self._dynamo_config.name)
def all_services(self) -> dict[str, "ServiceInterface"]:
"""Get a map of the service and all recursive dependencies"""
services: dict[str, "ServiceInterface"] = {self.name: self}
for dep in self.dependencies.values():
services.update(dep.on.all_services())
return services
class BentoMLDependency(DependencyInterface[T]):
"""BentoML adapter implementing the DependencyInterface"""
def __init__(
self,
bentoml_dependency: BentoDependency,
on_service: Optional[BentoMLService[T]] = None,
):
self._bentoml_dependency = bentoml_dependency
self._on_service = on_service
self._dynamo_client = None
self._runtime = None
@property
def on(self) -> Optional[ServiceInterface[T]]:
return self._on_service
def get(self, *args: Any, **kwargs: Any) -> Any:
return self._bentoml_dependency.get(*args, **kwargs)
def set_runtime(self, runtime: Any) -> None:
"""Set the Dynamo runtime for this dependency"""
self._runtime = runtime
if self._dynamo_client:
self._dynamo_client._runtime = runtime
async def get_endpoint(self, name: str) -> Any:
# Implementation depends on what BentoML provides
# This is a simplified version
client = self.get()
if hasattr(client, name):
return getattr(client, name)
raise ValueError(f"No endpoint found with name: {name}")
# Add method to expose underlying BentoML dependency when needed
def get_bentoml_dependency(self) -> BentoDependency:
return self._bentoml_dependency
def __get__(
self: "DependencyInterface[T]", instance: Any, owner: Any
) -> "DependencyInterface[T]" | T | Any:
if instance is None:
return self
if self._dynamo_client is None:
self._dynamo_client = DynamoClient(self.on)
if self._runtime:
self._dynamo_client._runtime = self._runtime
return self._dynamo_client
class BentoDeploymentTarget(DeploymentTarget):
"""Kubernetes implementation of the DeploymentTarget"""
def create_service(
self,
service_cls: Type[T],
config: ServiceConfig,
dynamo_config: Optional[DynamoConfig] = None,
app: Optional[FastAPI] = None,
**kwargs,
) -> ServiceInterface[T]:
# Create BentoML service
image = kwargs.get("image")
envs = kwargs.get("envs", [])
bentoml_service = BentoService(
config=config,
inner=service_cls,
image=image,
envs=envs,
)
# Wrap in our adapter
return BentoMLService(bentoml_service, dynamo_config, app)
def create_dependency(
self, on: Optional[ServiceInterface[T]] = None, **kwargs
) -> DependencyInterface[T]:
url = kwargs.get("url")
deployment = kwargs.get("deployment")
cluster = kwargs.get("cluster")
# Get the underlying BentoML service if available
bentoml_service = None
if on is not None and isinstance(on, BentoMLService):
# this is underlying bentoml service
bentoml_service = on.get_bentoml_service()
# Create underlying BentoML dependency
bentoml_dependency = BentoDependency(
bentoml_service, url=url, deployment=deployment, cluster=cluster
)
# Wrap in our adapter
return BentoMLDependency(bentoml_dependency, on)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
import contextlib
import logging
import os
import shlex
import sys
from typing import Any, Dict, List, Optional, Set, Type, TypeVar
import psutil
from circus.arbiter import Arbiter
from circus.sockets import CircusSocket
from circus.watcher import Watcher
from fastapi import FastAPI
from dynamo.sdk.core.decorators.endpoint import DynamoClient, DynamoEndpoint
from dynamo.sdk.core.protocol.interface import (
DependencyInterface,
DeploymentTarget,
DynamoConfig,
DynamoEndpointInterface,
DynamoTransport,
LinkedServices,
ServiceConfig,
ServiceInterface,
)
logger = logging.getLogger(__name__)
T = TypeVar("T", bound=object)
MAX_AF_UNIX_PATH_LENGTH = 103
class LocalEndpoint(DynamoEndpoint):
"""Circus-specific endpoint implementation"""
def __init__(
self, name: str, service: "LocalService", transports: List[DynamoTransport]
):
self._name = name
self._service = service
self._transports = transports
@property
def name(self) -> str:
return self._name
class LocalService(ServiceInterface[T]):
"""Circus implementation of the ServiceInterface"""
def __init__(
self,
inner_cls: Type[T],
config: ServiceConfig,
dynamo_config: Optional[DynamoConfig] = None,
watcher: Optional[Watcher] = None,
socket: Optional[CircusSocket] = None,
app: Optional[FastAPI] = None,
):
self._inner_cls = inner_cls
self._config = config
name = inner_cls.__name__
self._dynamo_config = dynamo_config or DynamoConfig(
name=name, namespace="default"
)
self._watcher = watcher
self._socket = socket
self.app = app or FastAPI(title=name)
self._dependencies: Dict[str, "DependencyInterface"] = {}
self._endpoints: Dict[str, LocalEndpoint] = {}
for field_name in dir(inner_cls):
field = getattr(inner_cls, field_name)
if isinstance(field, DynamoEndpoint):
self._endpoints[field.name] = LocalEndpoint(
field.name, self, field.transports
)
if isinstance(field, DependencyInterface):
self._dependencies[field_name] = field
def find_dependent_by_name(self, name: str) -> "ServiceInterface":
return self.all_services()[name]
def all_services(self) -> dict[str, "ServiceInterface"]:
"""Get a map of the service and all recursive dependencies"""
services: dict[str, "ServiceInterface"] = {self.name: self}
for dependency in self.dependencies.values():
services.update(dependency.on.all_services())
return services
@property
def name(self) -> str:
return self._inner_cls.__name__
@property
def config(self) -> ServiceConfig:
return self._config
@property
def inner(self) -> Type[T]:
return self._inner_cls
def get_endpoints(self) -> Dict[str, DynamoEndpointInterface]:
return self._endpoints
def get_endpoint(self, name: str) -> DynamoEndpointInterface:
if name not in self._endpoints:
raise ValueError(f"No endpoint found with name: {name}")
return self._endpoints[name]
def list_endpoints(self) -> List[str]:
return list(self._endpoints.keys())
def link(self, next_service: "ServiceInterface") -> "ServiceInterface":
LinkedServices.add((self, next_service))
return next_service
def remove_unused_edges(self, used_edges: Set["ServiceInterface"]) -> None:
current_deps = dict(self._dependencies)
for dep_key, dep_value in current_deps.items():
if dep_value.on not in used_edges:
del self._dependencies[dep_key]
def dynamo_address(self) -> tuple[str, str]:
return (self._dynamo_config.namespace, self._dynamo_config.name)
@property
def dependencies(self) -> dict[str, "DependencyInterface"]:
return self._dependencies
@property
def endpoints(self) -> dict[str, "LocalEndpoint"]:
return self._endpoints
class LocalDependency(DependencyInterface[T]):
"""Circus implementation of the DependencyInterface"""
def __init__(
self,
on_service: Optional[LocalService[T]] = None,
):
self._on_service = on_service
self._dynamo_client = None
self._runtime = None
@property
def on(self) -> Optional[ServiceInterface[T]]:
return self._on_service
def get(self, *args: Any, **kwargs: Any) -> Any:
# Return a client that can communicate with the service
# through the circus socket
if not self._on_service:
raise ValueError("No service specified for this dependency")
return self._on_service
async def get_endpoint(self, name: str) -> Any:
# Get a specific endpoint from the service
if not self._on_service:
raise ValueError("No service specified for this dependency")
return await self._on_service.get_endpoint(name)
def __get__(
self: "DependencyInterface[T]", instance: Any, owner: Any
) -> "DependencyInterface[T]" | T | Any:
if instance is None:
return self
if self._dynamo_client is None:
self._dynamo_client = DynamoClient(self.on)
if self._runtime:
self._dynamo_client._runtime = self._runtime
return self._dynamo_client
def set_runtime(self, runtime: Any) -> None:
"""Set the Dynamo runtime for this dependency"""
self._runtime = runtime
if self._dynamo_client:
self._dynamo_client._runtime = runtime
class LocalDeploymentTarget(DeploymentTarget):
"""Circus implementation of the DeploymentTarget"""
def __init__(self):
self._arbiter = None
self._watchers = []
self._sockets = {}
def create_service(
self,
service_cls: Type[T],
config: ServiceConfig,
dynamo_config: Optional[DynamoConfig] = None,
**kwargs,
) -> ServiceInterface[T]:
# Get parameters needed for creating a circus watcher
cmd = kwargs.get("cmd", sys.executable)
args = kwargs.get("args", [])
env_vars = kwargs.get("env_vars", {})
# Create a socket for this service
socket_path = os.path.join(
os.environ.get("DYN_CIRCUS_SOCKET_DIR", "/tmp/circus"),
f"{service_cls.__name__}.sock",
)
# Ensure the socket path isn't too long
if len(socket_path) >= MAX_AF_UNIX_PATH_LENGTH:
raise ValueError(
f"Socket path '{socket_path}' exceeds maximum length of {MAX_AF_UNIX_PATH_LENGTH}"
)
# Create the socket
socket = CircusSocket(name=service_cls.__name__, path=socket_path)
self._sockets[service_cls.__name__] = socket
# Create a watcher for the service
watcher = Watcher(
name=service_cls.__name__,
cmd=shlex.quote(cmd) if psutil.POSIX else cmd,
args=args,
copy_env=True,
env=env_vars,
stop_children=True,
use_sockets=True,
graceful_timeout=86400,
respawn=True,
)
self._watchers.append(watcher)
# Create and return the service interface
return LocalService(
inner_cls=service_cls,
config=config,
dynamo_config=dynamo_config,
watcher=watcher,
socket=socket,
)
def create_dependency(
self, on: Optional[ServiceInterface[T]] = None, **kwargs
) -> DependencyInterface[T]:
# Ensure the dependency is on a LocalService
if on is not None and not isinstance(on, LocalService):
raise TypeError("LocalDependency can only depend on LocalService")
# Create and return the dependency interface
return LocalDependency(on)
def start_arbiter(self, threaded: bool = False, **kwargs: Any) -> Arbiter:
"""Start the circus arbiter with all configured watchers and sockets"""
if self._arbiter is not None:
logger.warning("Arbiter already started")
return self._arbiter
# Configure arbiter
endpoint_port = int(os.environ.get("DYN_CIRCUS_ENDPOINT_PORT", "41234"))
pubsub_port = int(os.environ.get("DYN_CIRCUS_PUBSUB_PORT", "52345"))
# Create arbiter with all sockets and watchers
arbiter = Arbiter(
watchers=self._watchers,
sockets=[socket for socket in self._sockets.values()],
endpoint=f"tcp://127.0.0.1:{endpoint_port}",
pubsub_endpoint=f"tcp://127.0.0.1:{pubsub_port}",
check_delay=kwargs.pop("check_delay", 10),
**kwargs,
)
# Start arbiter
arbiter.start()
self._arbiter = arbiter
return arbiter
def stop_arbiter(self) -> None:
"""Stop the circus arbiter and all managed processes"""
if self._arbiter is None:
logger.warning("No arbiter to stop")
return
self._arbiter.stop()
self._arbiter = None
@contextlib.contextmanager
def run_services(self, **kwargs: Any):
"""Context manager to run all services and clean up when done"""
try:
arbiter = self.start_arbiter(**kwargs)
yield arbiter
finally:
self.stop_arbiter()
......@@ -21,15 +21,21 @@ from typing import Any, get_type_hints
from pydantic import BaseModel
from dynamo.sdk.core.protocol.interface import DynamoTransport
class DynamoEndpoint:
"""Decorator class for Dynamo endpoints"""
def __init__(self, func: t.Callable, name: str | None = None, is_api: bool = False):
def __init__(
self,
func: t.Callable,
name: str | None = None,
transports: t.List[DynamoTransport] | None = None,
):
self.func = func
self.name = name or func.__name__
self.is_dynamo_endpoint = True
self.is_api = is_api
self._transports = transports or [DynamoTransport.DEFAULT]
# Extract request type from hints
hints = get_type_hints(func)
args = list(hints.items())
......@@ -78,7 +84,8 @@ def dynamo_endpoint(
"""
def decorator(func: t.Callable) -> DynamoEndpoint:
return DynamoEndpoint(func, name, is_api)
transports = [DynamoTransport.HTTP] if is_api else [DynamoTransport.DEFAULT]
return DynamoEndpoint(func, name, transports)
return decorator
......
......@@ -131,7 +131,7 @@ class DynamoDependency(Dependency[T]):
def get(self, *args: Any, **kwargs: Any) -> T | Any:
# If this is a Dynamo-enabled service, return the Dynamo client
if isinstance(self.on, DynamoService) and self.on.is_dynamo_component():
if isinstance(self.on, DynamoService):
if self._dynamo_client is None:
self._dynamo_client = DynamoClient(self.on)
if self._runtime:
......
......@@ -83,7 +83,6 @@ def find_and_load_service(
def _do_import(import_str: str, working_dir: str) -> DynamoService:
"""Internal function to handle the actual import logic"""
import_path, _, attrs_str = import_str.partition(":")
logger.info(f"Parsed import string - path: {import_path}, attributes: {attrs_str}")
......@@ -185,11 +184,6 @@ def _do_import(import_str: str, working_dir: str) -> DynamoService:
except (AttributeError, KeyError):
raise ValueError(f'Attribute "{attr}" not found in "{module_name}"')
if not isinstance(instance, DynamoService):
raise ValueError(
f'Object "{attrs_str}" in module "{module_name}" is not a DynamoService'
)
# Set import string for debugging/logging
if not hasattr(instance, "_import_str"):
import_str_val = f"{module_name}:{attrs_str}" if attrs_str else module_name
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment