Unverified Commit 47477909 authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

feat: deprecate sdk as dependency (#2149)

parent 095ea3e7
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import shutil
import subprocess
import sys
import typer
from rich.console import Console
# Module-level Rich console used by `run` for its user-facing error output.
console = Console()
def run(ctx: typer.Context):
    """Execute dynamo-run with any additional arguments."""
    # NOTE: the docstring above is kept to one line on purpose; CLI frameworks
    # such as Typer surface a command's docstring as its help text.
    #
    # Behaviour:
    #   * exits with code 1 when `dynamo-run` is not on PATH or cannot be
    #     launched;
    #   * exits with the child's return code when `dynamo-run` itself fails;
    #   * returns normally when `dynamo-run` succeeds.
    # `ctx` is accepted for the CLI framework but is not read here.

    # Check if dynamo-run is available in PATH
    if shutil.which("dynamo-run") is None:
        console.print(
            "[bold red]Error:[/bold red] 'dynamo-run' is needed but not found.\n"
            "Please install it using: [bold cyan]cargo install dynamo-run[/bold cyan]",
            style="red",
        )
        raise typer.Exit(code=1)

    # Extract all arguments after 'run' (the first occurrence in sys.argv)
    args = sys.argv[sys.argv.index("run") + 1 :] if "run" in sys.argv else []
    command = ["dynamo-run"] + args

    try:
        result = subprocess.run(command)
    except Exception as e:
        console.print(f"[bold red]Error executing dynamo-run:[/bold red] {str(e)}")
        raise typer.Exit(code=1) from e

    # Propagate the child's failure instead of silently exiting 0. This is
    # raised outside the `try` so the broad `except Exception` above cannot
    # intercept it and rewrite the exit code.
    if result.returncode != 0:
        raise typer.Exit(code=result.returncode)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import subprocess
import sys
def run_executable(executable_name, args=None, capture_output=True, text=True):
    """
    Runs an executable located in the ``bin`` directory next to this module.

    Parameters:
        executable_name (str): Name of the executable file.
        args (iterable of str, optional): Arguments to pass to the executable.
            Any iterable (list, tuple, ...) is accepted; ``None`` or an empty
            iterable means "no arguments".
        capture_output (bool): Whether to capture stdout and stderr.
        text (bool): If True, returns output as string; otherwise bytes.

    Returns:
        subprocess.CompletedProcess: The result of the executed command.

    Raises:
        FileNotFoundError: If the executable is not a regular file in ``bin``.
        TypeError: If ``args`` is a single string rather than a sequence of
            arguments.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    bin_dir = os.path.join(script_dir, "bin")

    # Construct full path to executable
    executable_path = os.path.join(bin_dir, executable_name)

    # Check if executable exists
    if not os.path.isfile(executable_path):
        raise FileNotFoundError(
            f"Executable '{executable_name}' not found in {bin_dir}"
        )

    # A bare string would be splatted character by character below, so reject
    # it explicitly instead of launching a nonsensical command line.
    if isinstance(args, str):
        raise TypeError("args must be a sequence of strings, not a single str")

    # Unpacking works for lists, tuples and other iterables alike.
    command = [executable_path, *(args or ())]

    # Run the command using subprocess.run()
    return subprocess.run(command, capture_output=capture_output, text=text)
def dynamo_run(args=None):
    """
    Launch the bundled ``dynamo-run`` executable and return its exit code.

    When ``args`` is None the current process's ``sys.argv[1:]`` is forwarded
    unchanged; otherwise ``args`` is passed as given.
    """
    forwarded = sys.argv[1:] if args is None else args
    # Output is not captured so the child writes straight to our stdout/stderr.
    completed = run_executable("dynamo-run", args=forwarded, capture_output=False)
    return completed.returncode
def metrics(args=None):
    """
    Launch the bundled ``metrics`` executable and return its exit code.

    When ``args`` is None the current process's ``sys.argv[1:]`` is forwarded
    unchanged; otherwise ``args`` is passed as given.
    """
    forwarded = sys.argv[1:] if args is None else args
    # Output is not captured so the child writes straight to our stdout/stderr.
    completed = run_executable("metrics", args=forwarded, capture_output=False)
    return completed.returncode
# SPDX-FileCopyrightText: Copyright (c) 2020 Atalaya Tech. Inc
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
from __future__ import annotations
import json
import logging
import os
import sys
import typing as t
from pathlib import Path
from typing import List, Optional
import typer
from rich.console import Console
from rich.panel import Panel
from dynamo.sdk.cli.utils import (
is_local_planner_enabled,
raise_local_planner_warning,
resolve_service_config,
)
from dynamo.sdk.core.runner import TargetEnum
# Aliases that exist only for static type checkers; the guarded block is not
# executed at runtime.
if t.TYPE_CHECKING:
    P = t.ParamSpec("P") # type: ignore
    F = t.Callable[P, t.Any] # type: ignore

# Module logger (debug diagnostics) and Rich console (user-facing output).
logger = logging.getLogger(__name__)
console = Console()
def serve(
    ctx: typer.Context,
    graph: str = typer.Argument(..., help="The path to the Dynamo graph to serve"),
    service_name: str = typer.Option(
        "",
        help="Only serve the specified service. Don't serve any dependencies of this service.",
        envvar="DYNAMO_SERVICE_NAME",
    ),
    depends: List[str] = typer.Option(
        [],
        help="List of runner dependencies in name=value format",
        envvar="DYNAMO_SERVE_DEPENDS",
    ),
    config_file: Optional[Path] = typer.Option(
        None,
        "--config-file",
        "-f",
        help="Path to YAML config file for service configuration",
        exists=True,
    ),
    port: Optional[int] = typer.Option(
        None,
        "--port",
        "-p",
        help="The port to listen on for the REST API server",
        envvar="DYNAMO_PORT",
    ),
    host: Optional[str] = typer.Option(
        None,
        help="The host to bind for the REST API server",
        envvar="DYNAMO_HOST",
    ),
    system_app_port: Optional[int] = typer.Option(
        None,
        help="The port to listen on for the system app. This is only supported when --service-name is set (only one service is started).",
        envvar="DYNAMO_SYSTEM_APP_PORT",
    ),
    system_app_host: Optional[str] = typer.Option(
        None,
        help="The host to bind for the system app.",
        envvar="DYNAMO_SYSTEM_APP_HOST",
    ),
    enable_system_app: bool = typer.Option(
        False,
        help="Enable the system app.",
        envvar="DYNAMO_SYSTEM_APP_ENABLED",
    ),
    use_default_health_checks: bool = typer.Option(
        False,
        "--use-default-health-checks",
        help="Use default liveness and readiness health checks if none are provided.",
    ),
    working_dir: Optional[Path] = typer.Option(
        None,
        help="When loading from source code, specify the directory to find the Service instance",
    ),
    dry_run: bool = typer.Option(
        False,
        help="Print the final service configuration and exit without starting the server",
    ),
    target: TargetEnum = typer.Option(
        TargetEnum.DYNAMO,
        "--target",
        help="Specify the target: 'dynamo'",
        case_sensitive=False,
    ),
):
    """Locally serve a Dynamo graph.

    Starts a local server for the specified Dynamo graph.
    """
    # NOTE: the docstring above is left as-is because Typer shows it as the
    # command's help text.
    #
    # Flow: resolve the service configuration, parse --depends, optionally
    # print the configuration and exit (--dry-run), export the configuration
    # through DYNAMO_SERVICE_CONFIG, load the graph's service, then hand over
    # to serve_dynamo_graph().
    #
    # Exits with code 1 when a --depends entry is not in name=value form.
    from dynamo.runtime.logging import configure_dynamo_logging
    from dynamo.sdk.cli.utils import configure_target_environment
    from dynamo.sdk.core.protocol.interface import LinkedServices
    from dynamo.sdk.lib.loader import find_and_load_service

    configure_target_environment(target)

    # Extract extra arguments not captured by typer
    service_configs = resolve_service_config(config_file, ctx.args)

    # Process depends
    runner_map_dict = {}
    if depends:
        try:
            # Split on the FIRST '=' only, so values that themselves contain
            # '=' (e.g. URIs with query strings) stay intact. An entry with no
            # '=' yields a single-element list, which dict() rejects with
            # ValueError and is reported below.
            runner_map_dict = dict(s.split("=", maxsplit=1) for s in depends)
        except ValueError:
            console.print(
                "[bold red]Error:[/bold red] Invalid format for --depends option. Use format 'name=value'"
            )
            raise typer.Exit(code=1)

    if dry_run:
        console.print("[bold green]Service Configuration:[/bold green]")
        console.print_json(json.dumps(service_configs))
        console.print(
            "\n[bold green]Environment Variable that would be set:[/bold green]"
        )
        console.print(f"DYNAMO_SERVICE_CONFIG={json.dumps(service_configs)}")
        raise typer.Exit()

    configure_dynamo_logging()

    if service_configs:
        os.environ["DYNAMO_SERVICE_CONFIG"] = json.dumps(service_configs)

    # Default the working directory to the graph path when it is a directory,
    # otherwise to the current directory.
    if working_dir is None:
        if os.path.isdir(os.path.expanduser(graph)):
            working_dir = Path(os.path.expanduser(graph))
        else:
            working_dir = Path(".")

    # Convert Path objects to strings where string is required
    working_dir_str = str(working_dir)

    # Put the working directory first on sys.path so the graph is importable.
    if sys.path[0] != working_dir_str:
        sys.path.insert(0, working_dir_str)

    svc = find_and_load_service(graph, working_dir=working_dir)
    logger.debug(f"Loaded service: {svc.name}")
    logger.debug("Dependencies: %s", [dep.on.name for dep in svc.dependencies.values()])
    LinkedServices.remove_unused_edges()

    # Check if local planner is enabled
    enable_local_planner = is_local_planner_enabled(svc, service_configs)
    if enable_local_planner:
        # Raise warning if local planner is enabled, but workers for prefill or decode is > 1. Not supported.
        raise_local_planner_warning(svc, service_configs)

    from dynamo.sdk.cli.serving import serve_dynamo_graph  # type: ignore

    svc.inject_config()

    # Start the service
    console.print(
        Panel.fit(
            f"[bold]Starting Dynamo service:[/bold] [cyan]{graph}[/cyan]",
            title="[bold green]Dynamo Serve[/bold green]",
            border_style="green",
        )
    )
    # NOTE(review): --host/--port are accepted above but are not forwarded
    # here (the arguments are commented out) — confirm this is intentional.
    serve_dynamo_graph(
        graph,
        working_dir=working_dir_str,
        # host=host,
        # port=port,
        dependency_map=runner_map_dict,
        service_name=service_name,
        enable_local_planner=enable_local_planner,
        target=target,
        system_app_port=system_app_port,
        system_app_host=system_app_host,
        enable_system_app=enable_system_app,
        use_default_health_checks=use_default_health_checks,
    )
# SPDX-FileCopyrightText: Copyright (c) 2020 Atalaya Tech. Inc
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
from __future__ import annotations
import asyncio
import atexit
import inspect
import json
import logging
import os
import signal
import typing as t
from typing import Any
import typer
import uvicorn
import uvloop
from fastapi.responses import StreamingResponse
from dynamo.runtime import DistributedRuntime, dynamo_endpoint, dynamo_worker
from dynamo.sdk import dynamo_context
from dynamo.sdk.core.protocol.interface import DynamoTransport, LinkedServices
from dynamo.sdk.core.runner.health import (
register_liveness_probe,
register_readiness_probe,
)
from dynamo.sdk.lib.loader import find_and_load_service
from dynamo.sdk.lib.utils import get_host_port, get_system_app_host_port
# Module-level logger shared by the worker entry point and its helpers.
logger = logging.getLogger(__name__)
def add_fastapi_routes(app, service, class_instance):
    """
    Register a POST route on ``app`` for each HTTP-transport Dynamo endpoint.

    Args:
        app: FastAPI app instance
        service: Dynamo service instance
        class_instance: Instance of the service class

    Returns:
        list: The paths that were registered, in registration order.
    """
    registered = []
    for endpoint_name, endpoint in service.get_dynamo_endpoints().items():
        # Endpoints without HTTP transport are not exposed through FastAPI.
        if DynamoTransport.HTTP not in endpoint.transports:
            continue

        path = endpoint_name if endpoint_name.startswith("/") else f"/{endpoint_name}"

        # Bind the endpoint function to the service instance
        handler = endpoint.func.__get__(class_instance)

        # Generator and async-generator handlers are served as streams
        streaming = inspect.isasyncgenfunction(handler) or inspect.isgeneratorfunction(
            handler
        )

        route_options = {"methods": ["POST"]}
        if streaming:
            logger.info(f"Registering streaming endpoint {path}")
            route_options["response_class"] = StreamingResponse
        else:
            logger.info(f"Registering regular endpoint {path}")
        app.add_api_route(path, handler, **route_options)

        registered.append(path)
        logger.info(f"Added API route {path} to FastAPI app")
    return registered
# Typer application for this module; `main` below is registered as its command.
# Typer's pretty exception rendering is turned off.
app = typer.Typer(pretty_exceptions_enable=False)
@app.command()
def main(
    dynamo_identifier: str = typer.Argument(".", help="The dynamo identifier"),
    service_name: str = typer.Option("", help="Service name"),
    runner_map: str = typer.Option(
        None,
        envvar="DYNAMO_RUNNER_MAP",
        help="JSON string of runners map, default sets to envars `DYNAMO_RUNNER_MAP`",
    ),
    worker_env: str = typer.Option(None, help="Environment variables"),
    worker_id: int = typer.Option(
        None,
        help="If set, start the server as a bare worker with the given worker ID. Otherwise start a standalone server with a supervisor process.",
    ),
    custom_component_name: str = typer.Option(
        None,
        help="If set, use this custom component name instead of the default service name",
    ),
    target: str = typer.Option(
        "dynamo",
        help="Specify the target: 'dynamo' or 'dynamo'.",
    ),
) -> None:
    """Start a worker for the given service - either Dynamo or regular service"""
    # NOTE: the docstring above is left as-is because Typer shows it as the
    # command's help text.
    #
    # Overview:
    #   1. configure the target environment and seed `dynamo_context`;
    #   2. apply this worker's entry of --worker-env to os.environ;
    #   3. load the service (optionally narrowing to --service-name);
    #   4. define three coroutines that share one `class_instance`:
    #      `dyn_worker` (always), `web_worker` (when an endpoint uses HTTP
    #      transport) and `system_app_worker` (when DYNAMO_SYSTEM_APP_ENABLED
    #      is "true");
    #   5. install exit/signal handlers that run shutdown hooks;
    #   6. run the selected coroutines concurrently under asyncio.
    #
    # Raises IndexError when --worker-id does not address an entry of
    # --worker-env.
    # NOTE(review): `custom_component_name` is accepted but not read in this
    # function.
    from dynamo.runtime.logging import configure_dynamo_logging
    from dynamo.sdk.cli.utils import configure_target_environment
    from dynamo.sdk.core.runner import TargetEnum

    configure_target_environment(TargetEnum(target))

    dynamo_context["service_name"] = service_name
    dynamo_context["runner_map"] = runner_map
    dynamo_context["worker_id"] = worker_id

    # Ensure environment variables are set before we initialize
    if worker_env:
        env_list: list[dict[str, t.Any]] = json.loads(worker_env)
        if worker_id is not None:
            # Worker IDs are 1-based. Check BOTH bounds: without the lower
            # bound an ID of 0 (or below) would wrap around through negative
            # indexing and silently apply another worker's environment.
            if not 1 <= worker_id <= len(env_list):
                raise IndexError(
                    f"Worker ID {worker_id} is out of range, "
                    f"the maximum worker ID is {len(env_list)}"
                )
            os.environ.update(env_list[worker_id - 1])

    service = find_and_load_service(dynamo_identifier)
    if service_name and service_name != service.name:
        service = service.find_dependent_by_name(service_name)

    # Set namespace in dynamo_context if service is a dynamo component
    namespace, _ = service.dynamo_address()
    dynamo_context["namespace"] = namespace

    configure_dynamo_logging(service_name=service_name, worker_id=worker_id)

    # TODO: test this with a deep chain of services
    LinkedServices.remove_unused_edges()

    # Instance of the inner class of the service should be the same across the dynamo_worker, web_worker, and system_app_worker
    class_instance: Any = None
    # will be set once dyn_worker has created class_instance
    instanceReady = asyncio.Event()

    @dynamo_worker()
    async def dyn_worker(runtime: DistributedRuntime):
        # Creates the component, instantiates the service class, runs its
        # startup hooks and then serves every DEFAULT-transport endpoint.
        nonlocal class_instance
        global dynamo_context
        dynamo_context["runtime"] = runtime

        # Get Dynamo configuration and create component
        namespace, component_name = service.dynamo_address()
        logger.info(f"Registering component {namespace}/{component_name}")
        component = runtime.namespace(namespace).component(component_name)

        try:
            # Create service first
            await component.create_service()
            logger.info(f"Created {service.name} component")

            # Set runtime on all dependencies
            for dep in service.dependencies.values():
                dep.set_runtime(runtime)
                logger.debug(f"Set runtime for dependency: {dep}")

            # Then register all Dynamo endpoints
            dynamo_endpoints = service.get_dynamo_endpoints()

            endpoints = []
            for name, endpoint in dynamo_endpoints.items():
                if DynamoTransport.DEFAULT in endpoint.transports:
                    td_endpoint = component.endpoint(name)
                    logger.debug(
                        f"Registering endpoint '{name}' with DEFAULT transport"
                    )
                    endpoints.append(td_endpoint)

            # Bind an instance of inner to the endpoint
            dynamo_context["component"] = component
            dynamo_context["endpoints"] = endpoints
            class_instance = service.inner()
            # signal that class_instance (and its setup) is done
            instanceReady.set()

            # Built in the same order and with the same filter as `endpoints`
            # so the two lists can be zipped below.
            dynamo_handlers = []
            for name, endpoint in dynamo_endpoints.items():
                if DynamoTransport.DEFAULT in endpoint.transports:
                    bound_method = endpoint.func.__get__(class_instance)
                    # Only pass request type for now, use Any for response
                    # TODO: Handle an endpoint not having types
                    # TODO: Handle multiple endpoints in a single component
                    dynamo_wrapped_method = dynamo_endpoint(endpoint.request_type, Any)(
                        bound_method
                    )
                    dynamo_handlers.append(dynamo_wrapped_method)

            # Run startup hooks before setting up endpoints
            for name, member in vars(class_instance.__class__).items():
                if callable(member) and getattr(
                    member, "__dynamo_startup_hook__", False
                ):
                    logger.debug(f"Running startup hook: {name}")
                    result = getattr(class_instance, name)()
                    if inspect.isawaitable(result):
                        # await on startup hook async_on_start
                        await result
                        logger.debug(f"Completed async startup hook: {name}")
                    else:
                        logger.info(f"Completed startup hook: {name}")

            logger.info(
                f"Starting {service.name} instance with all registered endpoints"
            )
            # Launch serve_endpoint for all endpoints concurrently
            tasks = [
                endpoint.serve_endpoint(handler)
                for endpoint, handler in zip(endpoints, dynamo_handlers)
            ]
            if tasks:
                # Wait for all tasks to complete
                await asyncio.gather(*tasks)
            else:
                msg = f"No Dynamo endpoints found in service {service.name} but keeping service alive"
                logger.info(msg)
                # Even with no endpoints, we should keep the service running
                # until explicitly terminated
                try:
                    # Create an event to wait on indefinitely until interrupted
                    stop_event = asyncio.Event()
                    # Wait for the event that will never be set unless interrupted
                    await stop_event.wait()
                except asyncio.CancelledError:
                    logger.info("Service execution cancelled")
                except KeyboardInterrupt:
                    logger.info("Service interrupted by user")
                except Exception as e:
                    logger.exception(
                        f"Unexpected error while keeping service alive: {e}"
                    )
                finally:
                    logger.info("Service shutting down")
        except Exception as e:
            logger.error(f"Error in Dynamo component setup: {str(e)}")
            raise

    # if the service has a FastAPI app, add the worker as an event handler
    async def web_worker():
        # We want to wait until dyn_worker has initialized class_instance
        await instanceReady.wait()
        if not service.app:
            return

        # TODO: init hooks
        # Add API routes to the FastAPI app
        added_routes = add_fastapi_routes(service.app, service, class_instance)
        if added_routes:
            # Configure uvicorn with graceful shutdown
            host, port = get_host_port()
            # Pass None to uvicorn setting to unify log style
            config = uvicorn.Config(service.app, host=host, port=port, log_config=None)
            server = uvicorn.Server(config)
            # Start the server with graceful shutdown handling
            logger.info(
                f"Starting FastAPI server on {config.host}:{config.port} with routes: {added_routes}"
            )
            await server.serve()
        else:
            logger.warning("No API routes found, not starting FastAPI server")

    async def system_app_worker():
        # We want to wait until dyn_worker has initialized class_instance
        await instanceReady.wait()
        if not service.system_app:
            raise ValueError("System app not defined for service")

        # Register system endpoints
        use_default_health_checks = (
            os.environ.get(
                "DYNAMO_SYSTEM_APP_USE_DEFAULT_HEALTH_CHECKS", "false"
            ).lower()
            == "true"
        )
        if use_default_health_checks:
            logger.info("Using default health checks for liveness and readiness probes")
        register_liveness_probe(
            service.system_app, class_instance, use_default=use_default_health_checks
        )
        register_readiness_probe(
            service.system_app, class_instance, use_default=use_default_health_checks
        )
        # readiness, etc...

        host, port = get_system_app_host_port()
        server = uvicorn.Server(
            uvicorn.Config(service.system_app, host=host, port=port, log_config=None)
        )
        logger.info(f"Starting system app on {host}:{port}")
        await server.serve()

    def should_start_system_app():
        # The system app is opt-in through the DYNAMO_SYSTEM_APP_ENABLED env var.
        return os.environ.get("DYNAMO_SYSTEM_APP_ENABLED", "false").lower() == "true"

    # Helper to launch fastapi server and dynamo worker concurrently
    async def run_concurrent_workers(tasks):
        await asyncio.gather(*tasks)

    def exit_handler():
        """Exit handler that runs shutdown hooks before process termination."""
        if class_instance is not None:
            logger.info("Running shutdown hooks on exit")
            try:
                run_shutdown_hooks(class_instance)
                logger.info("Shutdown hooks completed successfully")
            except Exception as e:
                logger.error(f"Error running shutdown hooks: {e}")
        else:
            logger.debug("No class instance available for shutdown hooks")

    # Register the exit handler
    atexit.register(exit_handler)

    # Also handle signals for graceful shutdown
    def signal_handler(signum, frame):
        logger.info(f"Received signal {signum}, initiating graceful shutdown")
        exit_handler()
        # Exit the process after running shutdown hooks
        os._exit(0)

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    worker_tasks = []
    uvloop.install()

    # The HTTP server is only needed when at least one endpoint uses HTTP
    # transport.
    start_http_server = False
    for endpoint in service.get_dynamo_endpoints().values():
        logger.debug(f"Checking transports for endpoint: {endpoint.transports}")
        if DynamoTransport.HTTP in endpoint.transports:
            start_http_server = True
            break

    if start_http_server:
        worker_tasks.append(web_worker())
    if should_start_system_app():
        logger.info("Starting system app")
        worker_tasks.append(system_app_worker())
    # Always start the dynamo worker, no reason not to
    worker_tasks.append(dyn_worker())

    asyncio.run(run_concurrent_workers(worker_tasks))
def run_shutdown_hooks(class_instance):
    """Call each shutdown hook defined directly on the instance's class.

    A hook is any callable class attribute whose ``__dynamo_shutdown_hook__``
    attribute is truthy. Hooks run in class-definition order and are invoked
    as bound methods of ``class_instance``.
    """
    class_members = vars(class_instance.__class__)
    for attr_name, attr in class_members.items():
        is_hook = callable(attr) and getattr(attr, "__dynamo_shutdown_hook__", False)
        if not is_hook:
            continue
        getattr(class_instance, attr_name)()
# Run the Typer application when this module is executed as a script.
if __name__ == "__main__":
    app()
# SPDX-FileCopyrightText: Copyright (c) 2020 Atalaya Tech. Inc
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
from __future__ import annotations
import contextlib
import json
import logging
import os
import pathlib
import shutil
import tempfile
from typing import Any, Dict, Optional
from circus.sockets import CircusSocket
from circus.watcher import Watcher
from dynamo.sdk.cli.circus import CircusRunner
from dynamo.sdk.core.runner import TargetEnum
from .allocator import NVIDIA_GPU, ResourceAllocator
from .circus import _get_server_socket
from .utils import (
DYN_LOCAL_STATE_DIR,
ServiceProtocol,
reserve_free_port,
save_dynamo_state,
)
# Module-level logger for the serving helpers below.
logger = logging.getLogger(__name__)
# Module path of the per-worker entry point, passed to the interpreter via `-m`.
_DYNAMO_WORKER_SCRIPT = "dynamo.sdk.cli.serve_dynamo"


def _get_dynamo_worker_script(
    dynamo_identifier: str, svc_name: str, target: TargetEnum
) -> list[str]:
    """Return the argument list that launches one service worker module.

    The result is a fresh list: ``-m <worker module> <identifier>`` followed
    by the ``--service-name``, ``--worker-id`` and ``--target`` options. The
    worker id is the literal placeholder ``$(CIRCUS.WID)``.
    """
    module_invocation = ["-m", _DYNAMO_WORKER_SCRIPT, dynamo_identifier]
    worker_options = [
        "--service-name",
        svc_name,
        "--worker-id",
        "$(CIRCUS.WID)",
        "--target",
        target,
    ]
    return module_invocation + worker_options
def create_dynamo_watcher(
    dynamo_identifier: str,
    svc: ServiceProtocol,
    uds_path: str,
    scheduler: ResourceAllocator,
    working_dir: Optional[str] = None,
    env: Optional[Dict[str, str]] = None,
    target: TargetEnum = TargetEnum.DYNAMO,
) -> tuple[Watcher, CircusSocket, str]:
    """Build the circus watcher, socket and URI for one service of the graph.

    The worker environment starts as a copy of ``env`` and is then extended
    with ``DYNAMO_SERVICE_CONFIG`` (copied from this process's environment,
    if set) and with the ``ServiceArgs.envs`` entry for this service found in
    the JSON held by ``DYNAMO_SERVICE_ENVS`` (if set and parseable).

    Returns:
        A ``(watcher, socket, uri)`` tuple.
    """
    from dynamo.sdk.cli.circus import create_circus_watcher

    num_workers, resource_envs = scheduler.get_resource_envs(svc)
    uri, socket = _get_server_socket(svc, uds_path)

    cli_args = _get_dynamo_worker_script(dynamo_identifier, svc.name, target)
    if resource_envs:
        cli_args += ["--worker-env", json.dumps(resource_envs)]

    # Start from the caller's env; never mutate the dict that was passed in.
    worker_env = env.copy() if env else {}

    # Forward the main service config to the worker process
    service_config = os.environ.get("DYNAMO_SERVICE_CONFIG")
    if service_config is not None:
        worker_env["DYNAMO_SERVICE_CONFIG"] = service_config

    # Merge per-service environment variables from DYNAMO_SERVICE_ENVS
    raw_service_envs = os.environ.get("DYNAMO_SERVICE_ENVS")
    if raw_service_envs is not None:
        try:
            service_envs = json.loads(raw_service_envs)
        except json.JSONDecodeError as e:
            logger.warning(f"Failed to parse DYNAMO_SERVICE_ENVS: {e}")
        else:
            if svc.name in service_envs:
                service_args = service_envs[svc.name].get("ServiceArgs", {})
                if "envs" in service_args:
                    worker_env.update(service_args["envs"])
                    logger.info(
                        f"Added service-specific environment variables for {svc.name}"
                    )

    # The watcher is named after the service's own namespace
    namespace, _ = svc.dynamo_address()

    watcher = create_circus_watcher(
        name=f"{namespace}_{svc.name}",
        args=cli_args,
        numprocesses=num_workers,
        working_dir=working_dir,
        env=worker_env,
    )
    logger.info(f"Created watcher for {svc.name}'s in the {namespace} namespace")
    return watcher, socket, uri
def clear_namespace(namespace: str) -> None:
    """
    Check if utils/clear_namespace.py exists and run it to clear the namespace.

    The script path is relative to the current working directory. When the
    script is present it is run as ``-m utils.clear_namespace --namespace
    <namespace>`` with the same interpreter that is running this process.
    A failing script is logged as an error and not re-raised; a missing
    script is logged at debug level and skipped.

    Args:
        namespace: Namespace name passed to the script's ``--namespace`` option.
    """
    import os.path
    import subprocess
    import sys

    clear_script_path = "utils/clear_namespace.py"

    if os.path.exists(clear_script_path):
        logger.info(f"Clearing namespace {namespace} using {clear_script_path}")
        try:
            # Run the script and wait for it to complete. sys.executable is
            # used instead of a bare "python" so the child runs under the same
            # interpreter/virtualenv as this process ("python" may be absent
            # from PATH or resolve to a different installation).
            result = subprocess.run(
                [sys.executable, "-m", "utils.clear_namespace", "--namespace", namespace],
                check=True,
                capture_output=True,
                text=True,
            )
            logger.info(f"Clear namespace output: {result.stdout}")
            logger.info(f"Successfully cleared namespace {namespace}")
            if result.stderr:
                logger.info(f"Clear namespace stderr: {result.stderr}")
        except subprocess.CalledProcessError as e:
            # Best effort: a failed cleanup is reported but does not abort.
            logger.error(f"Failed to clear namespace {namespace}: {e.stderr}")
    else:
        logger.debug(
            f"Script not found at {clear_script_path}, skip namespace clearing"
        )
def serve_dynamo_graph(
graph: str,
working_dir: str | None = None,
dependency_map: dict[str, str] | None = None,
service_name: str = "",
enable_local_planner: bool = False,
target: TargetEnum = TargetEnum.DYNAMO,
system_app_port: Optional[int] = None,
system_app_host: Optional[str] = None,
enable_system_app: bool = False,
use_default_health_checks: bool = False,
) -> CircusRunner:
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.sdk.cli.circus import create_arbiter, create_circus_watcher
from dynamo.sdk.lib.loader import find_and_load_service
from .allocator import ResourceAllocator
configure_dynamo_logging(service_name=service_name)
namespace: str = ""
env: dict[str, Any] = {}
svc = find_and_load_service(graph, working_dir)
dynamo_path = pathlib.Path(working_dir or ".")
watchers: list[Watcher] = []
sockets: list[CircusSocket] = []
allocator = ResourceAllocator()
if dependency_map is None:
dependency_map = {}
standalone = False
if service_name:
logger.info(f"Service '{service_name}' running in standalone mode")
standalone = True
# TODO: We are signaling by setting env vars to downstream subprocesses. Let's pass flags on our invocation of serve_dynamo instead. That way the API is defined at the top level.
# Signal downstream workers to start system app by setting DYNAMO_SYSTEM_APP_* env vars for each worker. They are respectively consumed in serve_dynamo.py
if enable_system_app:
env["DYNAMO_SYSTEM_APP_ENABLED"] = "true"
if system_app_port:
# Throw if not standalone mode. Should only be set in standalone mode.
# TODO: This might still cause issues if we are running in standalone, but have multiple workers, need to figure this one out
if not standalone:
raise ValueError(
"Specifying system app port is only supported in standalone mode (i.e --service-name is set)"
)
env["DYNAMO_SYSTEM_APP_PORT"] = str(system_app_port)
if system_app_host:
env["DYNAMO_SYSTEM_APP_HOST"] = system_app_host
# Only set use_default_health_checks if explicitly enabled
if use_default_health_checks:
env["DYNAMO_SYSTEM_APP_USE_DEFAULT_HEALTH_CHECKS"] = "true"
logger.info("Default health checks enabled for system app")
if service_name and service_name != svc.name:
svc = svc.find_dependent_by_name(service_name)
num_workers, resource_envs = allocator.get_resource_envs(svc)
uds_path = tempfile.mkdtemp(prefix="dynamo-uds-")
try:
if not service_name and not standalone:
with contextlib.ExitStack() as port_stack:
# first check if all components has the same namespace
namespaces = set()
for name, dep_svc in svc.all_services().items():
if name == svc.name or name in dependency_map:
continue
namespaces.add(dep_svc.dynamo_address()[0])
if len(namespaces) > 1:
raise RuntimeError(
f"All components must have the same namespace, got {namespaces}"
)
else:
namespace = namespaces.pop() if namespaces else ""
logger.info(f"Serving dynamo graph with namespace {namespace}")
# clear residue etcd/nats entry (if any) under this namespace
logger.info(f"Clearing namespace {namespace} before serving")
clear_namespace(namespace)
for name, dep_svc in svc.all_services().items():
if name == svc.name or name in dependency_map:
continue
if not dep_svc.is_servable():
raise RuntimeError(
f"Service {dep_svc.name} is not servable. Please use link to override with a concrete implementation."
)
new_watcher, new_socket, uri = create_dynamo_watcher(
graph,
dep_svc,
uds_path,
allocator,
str(dynamo_path.absolute()),
env=env,
target=target,
)
watchers.append(new_watcher)
sockets.append(new_socket)
dependency_map[name] = uri
# reserve one more to avoid conflicts
port_stack.enter_context(reserve_free_port())
else:
namespace, _ = svc.dynamo_address()
dynamo_args = [
"-m",
_DYNAMO_WORKER_SCRIPT,
graph,
"--service-name",
svc.name,
"--worker-id",
"$(CIRCUS.WID)",
]
# resource_envs is the resource allocation (ie CUDA_VISIBLE_DEVICES) for each worker created by the allocator
# these resource_envs are passed to each individual worker's environment which is set in serve_dynamo
if resource_envs:
dynamo_args.extend(["--worker-env", json.dumps(resource_envs)])
# env is the base dynamo environment variables. We make a copy and update it to add any service configurations and additional env vars
worker_env = env.copy() if env else {}
# Pass through the main service config
if "DYNAMO_SERVICE_CONFIG" in os.environ:
worker_env["DYNAMO_SERVICE_CONFIG"] = os.environ["DYNAMO_SERVICE_CONFIG"]
# Get service-specific environment variables from DYNAMO_SERVICE_ENVS
if "DYNAMO_SERVICE_ENVS" in os.environ:
try:
service_envs = json.loads(os.environ["DYNAMO_SERVICE_ENVS"])
if svc.name in service_envs:
service_args = service_envs[svc.name].get("ServiceArgs", {})
if "envs" in service_args:
worker_env.update(service_args["envs"])
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse DYNAMO_SERVICE_ENVS: {e}")
watcher = create_circus_watcher(
name=f"{namespace}_{svc.name}",
args=dynamo_args,
numprocesses=num_workers,
working_dir=str(dynamo_path.absolute()),
env=worker_env,
)
watchers.append(watcher)
logger.info(
f"Created watcher for {svc.name} with {num_workers} workers in the {namespace} namespace"
)
# inject runner map now
inject_env = {"DYNAMO_RUNNER_MAP": json.dumps(dependency_map)}
for watcher in watchers:
if watcher.env is None:
watcher.env = inject_env
else:
watcher.env.update(inject_env)
arbiter_kwargs: dict[str, Any] = {
"watchers": watchers,
"sockets": sockets,
}
arbiter = create_arbiter(**arbiter_kwargs)
arbiter.exit_stack.callback(clear_namespace, namespace)
arbiter.exit_stack.callback(shutil.rmtree, uds_path, ignore_errors=True)
if enable_local_planner:
arbiter.exit_stack.callback(
shutil.rmtree,
os.environ.get(
DYN_LOCAL_STATE_DIR, os.path.expanduser("~/.dynamo/state")
),
ignore_errors=True,
)
logger.warn(f"arbiter: {arbiter.endpoint}")
# save deployment state for planner
if not namespace:
raise ValueError("No namespace found for service")
# Track GPU allocation for each component
component_resources = {}
logger.info(f"Building component resources for {len(watchers)} watchers")
for watcher in watchers:
component_name = watcher.name
logger.info(f"Processing watcher: {component_name}")
# Extract worker info including GPU allocation
worker_gpu_info: dict[str, Any] = {}
# Extract service name from watcher name
service_name = ""
if component_name.startswith(f"{namespace}"):
service_name = component_name.replace(f"{namespace}_", "", 1)
# Get GPU allocation from ResourceAllocator
if (
not worker_gpu_info
and hasattr(allocator, "_service_gpu_allocations")
and service_name
):
gpu_allocations = getattr(allocator, "_service_gpu_allocations", {})
if service_name in gpu_allocations:
logger.info(
f"Found GPU allocation for {service_name} in ResourceAllocator: {gpu_allocations[service_name]}"
)
worker_gpu_info["allocated_gpus"] = gpu_allocations[
service_name
]
# Store final worker GPU info
component_resources[component_name] = worker_gpu_info
logger.info(f"Final GPU info for {component_name}: {worker_gpu_info}")
logger.info(f"Completed component resources: {component_resources}")
# Now create components dict with resources included
components_dict = {
watcher.name: {
"watcher_name": watcher.name,
"cmd": watcher.cmd
+ " -m "
+ " ".join(
watcher.args[1:]
) # WAR because it combines python-m into 1 word
if hasattr(watcher, "args")
else watcher.cmd,
"resources": component_resources.get(watcher.name, {}),
}
for watcher in watchers
}
save_dynamo_state(
namespace,
arbiter.endpoint,
components=components_dict,
environment={
"DYNAMO_SERVICE_CONFIG": os.environ["DYNAMO_SERVICE_CONFIG"],
"SYSTEM_RESOURCES": {
"total_gpus": len(allocator.system_resources[NVIDIA_GPU]),
"gpu_info": [
str(gpu) for gpu in allocator.system_resources[NVIDIA_GPU]
],
},
},
)
arbiter.start(
cb=lambda _: logger.info( # type: ignore
(
"Starting Dynamo Service %s (Press CTRL+C to quit)"
if (
hasattr(svc, "is_dynamo_component")
and svc.is_dynamo_component()
)
else "Starting %s (Press CTRL+C to quit)"
),
*(
(svc.name,)
if (
hasattr(svc, "is_dynamo_component")
and svc.is_dynamo_component()
)
else (graph,)
),
),
)
return CircusRunner(arbiter=arbiter)
except Exception:
shutil.rmtree(uds_path, ignore_errors=True)
raise
# SPDX-FileCopyrightText: Copyright (c) 2020 Atalaya Tech. Inc
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
from __future__ import annotations
import collections
import contextlib
import json
import logging
import os
import pathlib
import socket
from typing import Any, DefaultDict, Dict, Iterator, Protocol, TextIO, Union
import typer
import yaml
from rich.console import Console
from dynamo.planner.defaults import BasePlannerDefaults # type: ignore[attr-defined]
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.sdk.core.protocol.interface import ComponentType
from dynamo.sdk.core.runner import TargetEnum
# Logging is configured as an import-time side effect, before the module logger is created.
configure_dynamo_logging()
logger = logging.getLogger(__name__)
# Rich console used for user-facing error output.
console = Console()
# Name of the environment variable that overrides the state directory
# (the fallback used by the state helpers is ~/.dynamo/state).
DYN_LOCAL_STATE_DIR = "DYN_LOCAL_STATE_DIR"
# Key under which the planner's settings are looked up in the service configs.
PLANNER_SERVICE_NAME = "Planner"
# Structural (duck-typed) description of a service object, used for type safety
class ServiceProtocol(Protocol):
    """Attributes and methods a service object must expose to satisfy the type checker."""

    name: str
    inner: Any
    models: list[Any]
    dynamo: Any

    def is_dynamo_component(self) -> bool:
        """Report whether the service is a dynamo component."""
        ...

    def dynamo_address(self) -> tuple[str, str]:
        """Return the service address as a pair of strings (callers unpack the first item as the namespace)."""
        ...
class PortReserver:
    """Context manager that binds a TCP socket to port 0 on ``host`` and keeps it open.

    Entering the context returns the port number the OS assigned; leaving the
    context closes the socket again.
    """

    def __init__(self, host: str = "localhost"):
        self.host = host
        self.socket: socket.socket | None = None
        self.port: int | None = None

    def __enter__(self) -> int:
        """Bind the socket and return the port that was assigned to it."""
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # Port 0 lets the operating system pick any currently free port.
            self.socket.bind((self.host, 0))
            bound_address = self.socket.getsockname()
            self.port = bound_address[1]
            return self.port
        except socket.error as e:
            # Release whatever was opened before reporting and propagating the failure.
            self.close_socket()
            logger.warning(f"Failed to reserve port on {self.host}: {str(e)}")
            raise

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the socket; exceptions raised inside the context are not suppressed."""
        self.close_socket()

    def close_socket(self):
        """Close the held socket if there is one, logging (not raising) close errors."""
        held = self.socket
        try:
            if held:
                held.close()
        except socket.error as e:
            logger.warning(f"Error while closing socket: {str(e)}")
            # Don't re-raise the exception as this is cleanup code
            return True
@contextlib.contextmanager
def reserve_free_port(
    host: str = "localhost",
) -> Iterator[int]:
    """
    Reserve a free port on ``host`` for the lifetime of the ``with`` block.

    Yields the reserved port number; the reservation is released on exit.
    """
    reserver = PortReserver(host)
    with reserver as reserved_port:
        yield reserved_port
def save_dynamo_state(
    namespace: str,
    circus_endpoint: str,
    components: dict[str, Any],
    environment: dict[str, Any],
):
    """Write the deployment state for ``namespace`` to ``<state_dir>/<namespace>.json``.

    The state directory comes from the environment variable named by
    ``DYN_LOCAL_STATE_DIR`` and falls back to ``~/.dynamo/state``; it is created
    if missing. Any existing file for the namespace is overwritten.
    """
    fallback_dir = os.path.expanduser("~/.dynamo/state")
    state_dir = os.environ.get(DYN_LOCAL_STATE_DIR, fallback_dir)
    os.makedirs(state_dir, exist_ok=True)

    state_file = os.path.join(state_dir, f"{namespace}.json")
    snapshot = dict(
        namespace=namespace,
        circus_endpoint=circus_endpoint,
        components=components,
        environment=environment,
    )
    with open(state_file, "w") as handle:
        json.dump(snapshot, handle)

    logger.warning(f"Saved state to {state_file}")
def append_dynamo_state(namespace: str, component_name: str, data: dict) -> None:
    """Merge ``data`` into one component's entry of an existing namespace state file.

    Does nothing (apart from logging) when the state file does not exist.
    Missing ``components`` / component entries are created before merging.
    """
    state_dir = os.environ.get(
        DYN_LOCAL_STATE_DIR, os.path.expanduser("~/.dynamo/state")
    )
    state_file = os.path.join(state_dir, f"{namespace}.json")

    if not os.path.exists(state_file):
        logger.warning(
            f"Skipping append to state file {state_file} because it doesn't exist"
        )
        return

    with open(state_file, "r") as source:
        state = json.load(source)

    component_entry = state.setdefault("components", {}).setdefault(
        component_name, {}
    )
    component_entry.update(data)

    logger.warning(f"Appending {data} to {component_name} in {state_file}")
    # Rewrite the whole file with the merged state.
    with open(state_file, "w") as sink:
        json.dump(state, sink)
def _parse_service_arg(arg_name: str, arg_value: str) -> tuple[str, str, Any]:
"""Parse a single CLI argument into service name, key, and value."""
parts = arg_name.split(".")
service = parts[0]
nested_keys = parts[1:]
# Special case: if this is a ServiceArgs.envs.* path, keep value as string
if (
len(nested_keys) >= 2
and nested_keys[0] == "ServiceArgs"
and nested_keys[1] == "envs"
):
value: Union[str, int, float, bool, dict, list] = arg_value
else:
# Parse value based on type for non-env vars
try:
value = json.loads(arg_value)
except json.JSONDecodeError:
if arg_value.isdigit():
value = int(arg_value)
elif arg_value.replace(".", "", 1).isdigit() and arg_value.count(".") <= 1:
value = float(arg_value)
elif arg_value.lower() in ("true", "false"):
value = arg_value.lower() == "true"
else:
value = arg_value
# Build nested dict structure
result = value
for key in reversed(nested_keys[1:]):
result = {key: result}
return service, nested_keys[0], result
def _parse_service_args(args: list[str]) -> Dict[str, Any]:
service_configs: DefaultDict[str, Dict[str, Any]] = collections.defaultdict(dict)
def deep_update(d: dict, key: str, value: Any):
"""
Recursively updates nested dictionaries. We use this to process arguments like
---Worker.ServiceArgs.env.CUDA_VISIBLE_DEVICES="0,1"
The _parse_service_arg function will parse this into:
service = "Worker"
nested_keys = ["ServiceArgs", "envs", "CUDA_VISIBLE_DEVICES"]
And returns: ("VllmWorker", "ServiceArgs", {"envs": {"CUDA_VISIBLE_DEVICES": "0,1"}})
We then use deep_update to update the service_configs dictionary with this nested value.
"""
if isinstance(value, dict) and key in d and isinstance(d[key], dict):
for k, v in value.items():
deep_update(d[key], k, v)
else:
d[key] = value
index = 0
while index < len(args):
next_arg = args[index]
if not (next_arg.startswith("--") or "." not in next_arg):
continue
try:
if "=" in next_arg:
arg_name, arg_value = next_arg.split("=", 1)
index += 1
elif args[index + 1] == "=":
arg_name = next_arg
arg_value = args[index + 2]
index += 3
else:
arg_name = next_arg
arg_value = args[index + 1]
index += 2
if arg_value.startswith("-"):
raise ValueError("Service arg value can not start with -")
arg_name = arg_name[2:]
service, key, value = _parse_service_arg(arg_name, arg_value)
deep_update(service_configs[service], key, value)
except Exception:
raise ValueError(f"Error parsing service arg: {args[index]}")
return service_configs
def resolve_service_config(
    config_file: pathlib.Path | TextIO | None = None,
    args: list[str] | None = None,
) -> dict[str, dict[str, Any]]:
    """Resolve service configuration from file and command line arguments.

    When ``DYN_DEPLOYMENT_CONFIG`` is set in the environment it is parsed as
    YAML and used as-is; the config file and command line arguments are then
    ignored. Otherwise the YAML file is loaded first and command line
    overrides replace top-level keys per service.

    Args:
        config_file: Path to YAML config file or file object
        args: List of command line arguments

    Returns:
        Dictionary mapping service names to their configurations
    """
    service_configs: dict[str, dict[str, Any]] = {}

    # Check for deployment config first
    if "DYN_DEPLOYMENT_CONFIG" in os.environ:
        try:
            deployment_config = yaml.safe_load(os.environ["DYN_DEPLOYMENT_CONFIG"])
            # Use deployment config directly; an empty document parses to None,
            # which must not leak out as the return value.
            service_configs = deployment_config or {}
            logger.info(f"Successfully loaded deployment config: {service_configs}")
            logger.warning(
                "DYN_DEPLOYMENT_CONFIG found in environment - ignoring configuration file and command line arguments"
            )
        except Exception as e:
            logger.warning(f"Failed to parse DYN_DEPLOYMENT_CONFIG: {e}")
    else:
        if config_file:
            # Paths are opened here; already-open file objects are used as given
            # and left open for the caller.
            source = (
                open(config_file)
                if isinstance(config_file, (str, pathlib.Path))
                else contextlib.nullcontext(config_file)
            )
            with source as f:
                # safe_load returns None for an empty file.
                yaml_configs = yaml.safe_load(f) or {}
                logger.debug(f"Loaded config from file: {yaml_configs}")
            # Copy each service's top-level keys into service_configs.
            for service, configs in yaml_configs.items():
                if service not in service_configs:
                    service_configs[service] = {}
                # A service listed with no body ("Worker:") parses to None.
                for key, value in (configs or {}).items():
                    service_configs[service][key] = value

        # Process command line overrides
        if args:
            cmdline_overrides = _parse_service_args(args)
            logger.info(f"Applying command line overrides: {cmdline_overrides}")
            for service, configs in cmdline_overrides.items():
                if service not in service_configs:
                    service_configs[service] = {}
                for key, value in configs.items():
                    service_configs[service][key] = value

    logger.info(f"Running dynamo serve with config: {service_configs}")
    return service_configs
def configure_target_environment(target: TargetEnum):
    """Install the deployment target matching ``target`` as the SDK's global target.

    Only ``TargetEnum.DYNAMO`` is supported; any other value raises ``ValueError``.
    """
    from dynamo.sdk.core.lib import set_target

    if not target == TargetEnum.DYNAMO:
        raise ValueError(f"Invalid target: {target}")

    # Imported lazily, only once the target is known to be the dynamo one.
    from dynamo.sdk.core.runner.dynamo import LocalDeploymentTarget

    deployment_target = LocalDeploymentTarget()
    logger.debug(f"Setting deployment target to {deployment_target}")
    set_target(deployment_target)
def is_local_planner_enabled(svc: Any, service_configs: dict) -> bool:
    """Tell whether the graph contains a planner configured for the local environment.

    Args:
        svc: The entrypoint service instance
        service_configs: Dictionary of service configurations

    Returns:
        bool: True if exactly one planner exists and its ``environment`` setting
        (or the planner default) is ``"local"``; False otherwise.

    Raises:
        typer.Exit: With code 1 when more than one planner is found.
    """
    # Dependencies first, then the entrypoint service itself.
    candidates = [*svc.all_services().values(), svc]
    planners = []
    for candidate in candidates:
        if candidate.config.dynamo.component_type == ComponentType.PLANNER:
            planners.append(candidate)

    if len(planners) > 1:
        console.print(
            "[bold red]Error:[/bold red] More than one planner found in the pipeline"
        )
        raise typer.Exit(code=1)

    if not planners:
        return False

    # Exactly one planner: its configured environment decides the answer.
    planner_settings = service_configs.get(PLANNER_SERVICE_NAME, {})
    configured_env = planner_settings.get(
        "environment", BasePlannerDefaults.environment
    )
    return configured_env == "local"
def raise_local_planner_warning(svc: Any, service_configs: dict) -> None:
    """Abort when an active local planner is combined with multi-worker prefill/decode.

    If the planner is not in no-op mode and a ``PrefillWorker`` or ``VllmWorker``
    service is configured with more than one worker, an error is logged and the
    CLI exits, because that combination is currently not supported.

    Args:
        svc: The service instance
        service_configs: Dictionary of service configurations

    Raises:
        typer.Exit: With code 1 when the unsupported combination is detected.
    """
    planner_settings = service_configs.get(PLANNER_SERVICE_NAME, {})
    # Resolve no-op setting, falling back to the planner default
    planner_is_noop = planner_settings.get(
        "no-operation", BasePlannerDefaults.no_operation
    )

    # Check worker counts across the dependencies and the service itself
    scaled_names = ("PrefillWorker", "VllmWorker")
    candidates = [*svc.all_services().values(), svc]
    over_one_flags = []
    for candidate in candidates:
        if candidate.name in scaled_names:
            over_one_flags.append(candidate.config.workers > 1)

    if any(over_one_flags) and not planner_is_noop:
        logger.error(
            "Local planner is enabled, but workers for prefill or decode is > 1. Local planner must be started with prefill and decode workers set to 1."
        )
        raise typer.Exit(code=1)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
import abc
import asyncio
import typing as t
from functools import wraps
from typing import (
Any,
Callable,
Dict,
List,
Optional,
Protocol,
TypeVar,
get_type_hints,
)
from dynamo.runtime import DistributedRuntime
from dynamo.sdk.core.protocol.interface import (
DynamoEndpointInterface,
DynamoTransport,
ServiceInterface,
)
T = TypeVar("T")
class AbstractDynamoEndpoint(Protocol):
    """Structural type for callables that carry the abstract-dynamo-endpoint marker."""

    # Marker attribute attached to functions flagged as abstract endpoints.
    __is_abstract_dynamo__: bool
class DynamoEndpoint(DynamoEndpointInterface):
    """
    Base class for dynamo endpoints

    Dynamo endpoints are methods decorated with @endpoint.

    Attributes:
        func: The wrapped endpoint function.
        request_type: Type hint of the first annotated parameter (ignoring
            ``self``/``cls``), or None when no parameter is annotated.
    """

    def __init__(
        self,
        func: Callable,
        name: Optional[str] = None,
        transports: Optional[List[DynamoTransport]] = None,
        **kwargs,
    ):
        """Wrap ``func`` as an endpoint.

        Args:
            func: The function implementing the endpoint.
            name: Endpoint name; defaults to ``func.__name__``.
            transports: Transports to expose on; defaults to ``[DynamoTransport.DEFAULT]``.
            **kwargs: Extra metadata, stored as attributes on the instance.
        """
        self.func = func
        self._name = name or func.__name__
        self._transports = transports or [DynamoTransport.DEFAULT]

        # Extract request type from hints. get_type_hints() also reports the
        # return annotation under the "return" key, which is not a parameter
        # and must not be mistaken for the request type.
        hints = get_type_hints(func)
        params = [(param, hint) for param, hint in hints.items() if param != "return"]
        # Skip self/cls argument if present
        if params and params[0][0] in ("self", "cls"):
            params = params[1:]
        # Get request type from first arg if available
        self.request_type = params[0][1] if params else None

        # Copy the function's metadata (name, docstring, ...) onto this instance.
        wraps(func)(self)
        # Store additional metadata
        for key, value in kwargs.items():
            setattr(self, key, value)

    @property
    def name(self) -> str:
        """Name of this endpoint."""
        return self._name

    async def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Await the wrapped function with the given arguments and return its result."""
        return await self.func(*args, **kwargs)

    @property
    def transports(self) -> List[DynamoTransport]:
        """Transports this endpoint is exposed on."""
        return self._transports
# Decorator used on interface classes to declare an endpoint without implementing it
def abstract_endpoint(func: t.Callable) -> t.Callable:
    """Flag ``func`` as an abstract dynamo endpoint and as an ``abc`` abstract method."""
    setattr(func, "__is_abstract_dynamo__", True)
    marked = abc.abstractmethod(func)
    return marked
def endpoint(
    name: Optional[str] = None,
    transports: Optional[List[DynamoTransport]] = None,
    **kwargs,
) -> Callable[[Callable], DynamoEndpoint]:
    """Decorator factory that turns a function into a ``DynamoEndpoint``.

    ``name``, ``transports`` and any extra keyword metadata are forwarded to
    the ``DynamoEndpoint`` constructor.
    """

    def wrap(fn: Callable) -> DynamoEndpoint:
        wrapped = DynamoEndpoint(fn, name, transports, **kwargs)
        return wrapped

    return wrap
def api(
    name: Optional[str] = None,
    **kwargs,
) -> Callable[[Callable], DynamoEndpoint]:
    """Decorator factory for endpoints exposed over the HTTP transport.

    Works like ``endpoint`` but always sets ``transports=[DynamoTransport.HTTP]``.
    """

    def wrap(fn: Callable) -> DynamoEndpoint:
        http_only = [DynamoTransport.HTTP]
        return DynamoEndpoint(fn, name, transports=http_only, **kwargs)

    return wrap
class DynamoClient:
    """Client for calling Dynamo endpoints with streaming support.

    Each endpoint of the wrapped service is available as an attribute holding
    an async generator function that streams the endpoint's responses.
    """

    def __init__(self, service: ServiceInterface[Any]):
        self._service = service
        self._endpoints = service.get_dynamo_endpoints()
        # Per-endpoint stream functions, created lazily on first attribute access.
        self._dynamo_clients: Dict[str, Any] = {}
        # Runtime created on first call and reused by later calls.
        self._runtime = None

    def __getattr__(self, name: str) -> Any:
        """Return the streaming function for endpoint ``name``.

        Raises:
            AttributeError: If the service has no endpoint called ``name``.
        """
        # __getattr__ only runs when normal lookup failed. If one of our own
        # attributes is missing (instance created without __init__, e.g. by
        # copy/pickle), reading self._endpoints below would re-enter this
        # method forever; fail fast with AttributeError instead.
        if name in ("_service", "_endpoints", "_dynamo_clients", "_runtime"):
            raise AttributeError(name)

        if name not in self._endpoints:
            raise AttributeError(
                f"No Dynamo endpoint '{name}' found on service '{self._service.name}'. "
                f"Available endpoints: {list(self._endpoints.keys())}"
            )

        # For streaming endpoints, create/cache the stream function
        if name not in self._dynamo_clients:
            namespace, component_name = self._service.dynamo_address()

            # Create async generator function that directly yields from the stream
            async def get_stream(*args, **kwargs):
                # Use existing runtime if available, otherwise create and remember one
                if self._runtime is not None:
                    runtime = self._runtime
                else:
                    loop = asyncio.get_running_loop()
                    runtime = DistributedRuntime(loop, False)
                    self._runtime = runtime

                # TODO: bis - dont recreate the client every time
                client = (
                    await runtime.namespace(namespace)
                    .component(component_name)
                    .endpoint(name)
                    .client()
                )
                # Directly yield items from the stream
                stream = await client.generate(*args, **kwargs)
                async for item in stream:
                    yield item.data()

            self._dynamo_clients[name] = get_stream

        return self._dynamo_clients[name]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
import logging
import os
from typing import Any, Callable, Dict, Optional, Type, TypeVar, Union
from fastapi import FastAPI
from dynamo.sdk.core.protocol.interface import (
AbstractService,
DependencyInterface,
DeploymentTarget,
DynamoConfig,
ServiceConfig,
ServiceInterface,
validate_dynamo_interfaces,
)
# Type variable for decorators that accept and return the same callable.
G = TypeVar("G", bound=Callable[..., Any])
# Note: global service provider.
# this should be set to a concrete implementation of the DeploymentTarget interface
# Only the annotation is declared here; no value is bound until set_target() runs.
_target: DeploymentTarget
# Global cache of placeholder services, keyed by AbstractService subclass
_abstract_service_cache: Dict[Type[AbstractService], ServiceInterface[Any]] = {}
logger = logging.getLogger(__name__)
# Image name, overridable through the DYNAMO_IMAGE environment variable.
DYNAMO_IMAGE = os.getenv("DYNAMO_IMAGE", "dynamo:latest-vllm")
def set_target(target: DeploymentTarget) -> None:
    """Install ``target`` as the module-wide deployment target (service provider)."""
    global _target
    _target = target
def get_target() -> DeploymentTarget:
    """Return the module-wide deployment target previously installed with ``set_target``."""
    # Reading a module global needs no ``global`` declaration.
    return _target
# Helper that maps an AbstractService class to its single placeholder service
def _get_or_create_abstract_service_instance(
    abstract_service_cls: Type[AbstractService],
) -> ServiceInterface[Any]:
    """
    Return the placeholder service for ``abstract_service_cls``, creating and
    caching it on first use so every dependency on that abstract service
    shares one instance.
    """
    try:
        return _abstract_service_cache[abstract_service_cls]
    except KeyError:
        pass

    # Interface validation is switched off: an abstract service has, by
    # definition, not implemented the endpoints it declares.
    placeholder = service(
        abstract_service_cls,
        dynamo=DynamoConfig(enabled=True),
        should_validate_dynamo_interfaces=False,
    )
    _abstract_service_cache[abstract_service_cls] = placeholder
    return placeholder
def service(
    inner: Optional[Type[G]] = None,
    /,
    *,
    app: Optional[FastAPI] = None,
    should_validate_dynamo_interfaces: bool = True,
    system_app: Optional[FastAPI] = None,
    **kwargs: Any,
) -> Any:
    """Service decorator that's adapter-agnostic.

    Usable both as ``@service`` (class passed positionally) and as
    ``@service(...)`` (returns a decorator). The remaining keyword arguments
    build the ``ServiceConfig`` and are also forwarded to the deployment
    target's ``create_service``.
    """
    config = ServiceConfig(**kwargs)

    def build(service_cls: Type[G]) -> ServiceInterface[G]:
        # Ensures that all declared dynamo endpoints on the parent interfaces are implemented
        if should_validate_dynamo_interfaces:
            validate_dynamo_interfaces(service_cls)

        provider = get_target()
        if service_cls is not None:
            # The dynamo component is named after the decorated class.
            config.dynamo.name = service_cls.__name__
        return provider.create_service(
            service_cls=service_cls,
            config=config,
            app=app,
            system_app=system_app,
            **kwargs,
        )

    if inner is None:
        return build
    return build(inner)
def depends(
    on: Optional[Union[ServiceInterface[G], Type[AbstractService]]] = None,
    **kwargs: Any,
) -> DependencyInterface[G]:
    """Create a dependency using the current service provider.

    If 'on' is an AbstractService type, a shared placeholder service is
    created (or reused) and the dependency points at it. If 'on' is a
    ServiceInterface instance it is used directly.

    Raises:
        TypeError: For any other value of 'on', including the default None.
    """
    provider = get_target()

    if isinstance(on, type) and issubclass(on, AbstractService):
        resolved: ServiceInterface[Any] = _get_or_create_abstract_service_instance(on)
    elif isinstance(on, ServiceInterface):
        resolved = on
    else:
        raise TypeError(
            "depends() expects 'on' to be a ServiceInterface, an AbstractService type"
        )
    return provider.create_dependency(on=resolved, **kwargs)
def liveness(func: G) -> G:
    """Mark ``func`` as the liveness probe and return it unchanged.

    Raises:
        TypeError: If ``func`` is not callable.
    """
    if callable(func):
        setattr(func, "__is_liveness_probe__", True)
        return func
    raise TypeError("@liveness can only decorate callable methods")
def get_liveness_handler(obj):
    """Return the first callable attribute of ``obj`` marked as liveness probe, or None.

    Attributes are examined in ``dir(obj)`` order.
    """
    members = (getattr(obj, attr_name) for attr_name in dir(obj))
    return next(
        (
            member
            for member in members
            if callable(member) and getattr(member, "__is_liveness_probe__", False)
        ),
        None,
    )
def readiness(func: G) -> G:
    """Mark ``func`` as the readiness probe and return it unchanged.

    Raises:
        TypeError: If ``func`` is not callable.
    """
    if callable(func):
        setattr(func, "__is_readiness_probe__", True)
        return func
    raise TypeError("@readiness can only decorate callable methods")
def get_readiness_handler(obj):
    """Return the first callable attribute of ``obj`` marked as readiness probe, or None.

    Attributes are examined in ``dir(obj)`` order.
    """
    members = (getattr(obj, attr_name) for attr_name in dir(obj))
    return next(
        (
            member
            for member in members
            if callable(member) and getattr(member, "__is_readiness_probe__", False)
        ),
        None,
    )
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import typing as t
from dataclasses import dataclass, field
@dataclass
class Resources:
"""Resources for a service."""
cpu: str | None = None # example: "3", "300m"
memory: str | None = None # example: "10Gi", "1024Mi"
gpu: str | None = None # example: "4"
def __post_init__(self):
# Validate and normalize CPU format (e.g., "3", "300m")
if self.cpu is not None:
self.cpu = self.cpu.strip()
if not (
self.cpu.isdigit() or (self.cpu[:-1].isdigit() and self.cpu[-1] == "m")
):
raise ValueError(
f"Invalid CPU format: {self.cpu}. Expected format like '3' or '300m'"
)
# Validate and normalize memory format (e.g., "10Gi", "1024Mi")
if self.memory is not None:
self.memory = self.memory.strip()
valid_suffixes = [
"Ki",
"Mi",
"Gi",
"Ti",
"Pi",
"Ei",
"K",
"M",
"G",
"T",
"P",
"E",
]
if not any(
self.memory.endswith(suffix) and self.memory[: -len(suffix)].isdigit()
for suffix in valid_suffixes
):
if not self.memory.isdigit():
raise ValueError(
f"Invalid memory format: {self.memory}. Expected format like '10Gi' or '1024Mi'"
)
# Validate and normalize GPU format (should be a number)
if self.gpu is not None:
self.gpu = self.gpu.strip()
if not self.gpu.isdigit():
raise ValueError(
f"Invalid GPU format: {self.gpu}. Expected a number like '1' or '4'"
)
@dataclass
class Env:
    """A single environment variable: a name and a value that defaults to empty."""

    name: str
    value: str = ""
@dataclass
class Service:
    """The entry service of a deployment.

    The first five fields are required; collection fields default to fresh
    empty containers for each instance.
    """

    # Required identification fields
    service_name: str
    name: str
    namespace: str
    version: str
    path: str
    # Optional settings
    cmd: t.List[str] = field(default_factory=list)
    resources: Resources | None = None
    envs: t.List[Env] = field(default_factory=list)
    secrets: t.List[str] = field(default_factory=list)
    apis: dict = field(default_factory=dict)
    size_bytes: int = 0
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
import abc
from abc import ABC, abstractmethod
from collections import defaultdict
from enum import Enum, auto
from typing import Any, Dict, Generic, List, Optional, Set, Tuple, Type, TypeVar
from fastapi import FastAPI
from pydantic import BaseModel, ConfigDict, Field, field_validator
from .deployment import Env
T = TypeVar("T", bound=object)
class AbstractService(abc.ABC):
    """Common base class that Dynamo service interfaces derive from."""
class LeaseConfig(BaseModel):
    """Settings for a custom dynamo lease."""

    # Lease time-to-live, in seconds
    ttl: int = 1
class ComponentType:
    """String constants naming the kinds of Dynamo components."""

    PLANNER = "planner"
class DynamoConfig(BaseModel):
    """Dynamo-specific settings of a component; every field is optional."""

    enabled: bool = True
    name: str | None = None
    namespace: str | None = None
    custom_lease: LeaseConfig | None = None
    # Indicates if this is a meta/system component
    component_type: str | None = None
class DynamoTransport(Enum):
    """Transports a Dynamo endpoint can be exposed on."""

    DEFAULT = auto()
    HTTP = auto()
class ResourceConfig(BaseModel):
    """Resource requests of a Dynamo service, stored as strings."""

    # Numeric inputs (e.g. gpu=1) are coerced to their string form
    model_config = ConfigDict(coerce_numbers_to_str=True)

    cpu: str = Field(default="1")
    memory: str = Field(default="500Mi")
    gpu: str = Field(default="0")
class KubernetesOverrides(BaseModel):
    """Kubernetes overrides accepted from the sdk; unknown fields are rejected."""

    model_config = ConfigDict(extra="forbid")

    entrypoint: List[str] | None = None
    cmd: List[str] | None = None

    @field_validator("entrypoint", "cmd", mode="before")
    @classmethod
    def _coerce_str_to_list(cls, v):
        """Wrap a lone string in a list; pass None and lists through unchanged."""
        if isinstance(v, str):
            return [v]
        if v is None or isinstance(v, list):
            return v
        raise TypeError("Must be str or list[str]")
class ServiceConfig(BaseModel):
    """Base configuration of a service; adapters may extend it.

    Only ``dynamo`` is required; every other field has a default.
    """

    dynamo: DynamoConfig
    resources: ResourceConfig = ResourceConfig()
    workers: int = 1
    image: Optional[str] = None
    envs: Optional[List[Env]] = None
    labels: Optional[Dict[str, str]] = None
    kubernetes_overrides: Optional[KubernetesOverrides] = None
class DynamoEndpointInterface(ABC):
    """Abstract contract every service endpoint must satisfy."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Name under which the endpoint is registered."""

    @abstractmethod
    async def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Invoke the endpoint implementation."""

    @property
    @abstractmethod
    def transports(self) -> List[DynamoTransport]:
        """Transports this endpoint is served over.

        The base implementation, reachable through ``super()``, returns
        ``[DynamoTransport.DEFAULT]``.
        """
        return [DynamoTransport.DEFAULT]
class ServiceInterface(Generic[T], ABC):
    """Generic interface for service implementations.

    ``T`` is the user-defined ("inner") class that the service wraps.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Get the service name"""
        pass

    @property
    @abstractmethod
    def config(self) -> ServiceConfig:
        """Get the service configuration"""
        pass

    @property
    def dependencies(self) -> Dict[str, "DependencyInterface"]:
        """Get the service dependencies (empty unless overridden)"""
        return {}

    @property
    @abstractmethod
    def envs(self) -> List[Env]:
        """Get the service's environment variables"""
        return []

    @property
    @abstractmethod
    def inner(self) -> Type[T]:
        """Get the inner service implementation class"""
        pass

    @abstractmethod
    def get_endpoints(self) -> Dict[str, DynamoEndpointInterface]:
        """Get all registered endpoints"""
        pass

    @abstractmethod
    def get_endpoint(self, name: str) -> DynamoEndpointInterface:
        """Get a specific endpoint by name"""
        pass

    @abstractmethod
    def list_endpoints(self) -> List[str]:
        """List names of all registered endpoints"""
        pass

    def link(self, next_service: "ServiceInterface") -> "ServiceInterface":
        """Link this service to another service, creating a pipeline.

        The concrete ``next_service`` is injected into the single dependency of
        this service whose target inner class is a base class of (or the same
        class as) ``next_service.inner``.

        Args:
            next_service: The concrete service implementation to link

        Returns:
            The next_service that was linked to this service

        Raises:
            ValueError: If ``next_service`` is not a ServiceInterface, if no
                matching dependency is found, or if several dependencies match
        """
        if not isinstance(next_service, ServiceInterface):
            raise ValueError(f"link must be passed a Service, got {type(next_service)}")

        # Only dependencies that already point at a service can be matched.
        inner_deps = [
            (dep.on.inner, dep_key, dep)
            for dep_key, dep in self.dependencies.items()
            if dep.on is not None
        ]  # type: ignore

        # Inner class of the service being linked in.
        curr_inner = next_service.inner

        # Dependencies whose target class next_service implements/inherits from.
        matching_deps = [
            (dep_inner, dep_key, original_dep)
            for dep_inner, dep_key, original_dep in inner_deps
            if issubclass(curr_inner, dep_inner)
        ]

        if not matching_deps:
            raise ValueError(
                f"{curr_inner.__name__} does not fulfill any dependencies required by {self.name}"
            )
        if len(matching_deps) > 1:
            # Report the dependency keys (second tuple element), not the dependency objects.
            dep_names = [dep_key for _, dep_key, _ in matching_deps]
            raise ValueError(
                f"{curr_inner.__name__} fulfills multiple dependencies required by {self.name}: {dep_names}"
            )

        _, _, matching_dep = matching_deps[0]
        # Hot swap the target of the existing dependency with the new service.
        matching_dep.on = next_service
        # Record the link in the module-level registry.
        LinkedServices.add((self, next_service))
        return next_service

    @abstractmethod
    def remove_unused_edges(self, used_edges: Set["ServiceInterface"]) -> None:
        """Remove unused dependencies"""
        pass

    @abstractmethod
    def inject_config(self) -> None:
        """Inject configuration from environment into service configs"""
        pass

    @abstractmethod
    def get_service_configs(self) -> Dict[str, ServiceConfig]:
        """Get all services"""
        return {}

    @property
    def service_configs(self) -> List[ServiceConfig]:
        """Get all service configs (empty unless overridden)"""
        return []

    def all_services(self) -> Dict[str, "ServiceInterface"]:
        """Get all services; the base implementation only knows about itself"""
        return {self.name: self}

    def get_dynamo_endpoints(self) -> Dict[str, DynamoEndpointInterface]:
        """Get all Dynamo endpoints declared as attributes of the inner class"""
        endpoints = {}
        for field in dir(self.inner):
            value = getattr(self.inner, field)
            if isinstance(value, DynamoEndpointInterface):
                endpoints[value.name] = value
        return endpoints

    def __call__(self) -> T:
        """Instantiate the inner class with no arguments"""
        return self.inner()

    def find_dependent_by_name(self, service_name: str) -> "ServiceInterface":
        """Find a dependent service by name"""
        raise NotImplementedError()

    def dynamo_address(self) -> tuple[str, str]:
        """Return the service's Dynamo address; implemented by subclasses"""
        raise NotImplementedError()

    def is_servable(self) -> bool:
        """Check if this service is ready to be served.

        A service is servable if:
        1. Its inner class is not a subclass of AbstractService (concrete service)
        2. Its inner class is a subclass of AbstractService and every abstract
           endpoint is implemented by a callable DynamoEndpointInterface
        """
        # If not an AbstractService, it's servable by default
        if not issubclass(self.inner, AbstractService):
            return True

        # For AbstractService, check implementations
        abstract_endpoints = _get_abstract_dynamo_endpoints(self.inner)
        if not abstract_endpoints:
            # No abstract endpoints to implement, so it's servable
            return True

        return all(
            _check_dynamo_endpoint_implemented(self.inner, name)
            for name in abstract_endpoints
        )  # type: ignore[return-value]
def _get_abstract_dynamo_endpoints(cls: type) -> Set[str]:
    """Collect the names of abstract Dynamo endpoints declared along ``cls``'s MRO.

    An attribute counts as an abstract endpoint when it carries a truthy
    ``__is_abstract_dynamo__`` marker.
    """
    found: Set[str] = set()
    for klass in cls.mro():
        for attr_name, attr in vars(klass).items():
            if getattr(attr, "__is_abstract_dynamo__", False):
                found.add(attr_name)
    return found
def _check_dynamo_endpoint_implemented(cls: type, name: str) -> bool:
    """Tell whether ``cls.<name>`` is a callable ``DynamoEndpointInterface``."""
    candidate = getattr(cls, name, None)
    if candidate is None:
        # Attribute is missing (or explicitly None).
        return False
    if not callable(candidate):
        return False
    return isinstance(candidate, DynamoEndpointInterface)
def validate_dynamo_interfaces(cls: type) -> None:
    """
    Validate that *cls* fully implements every abstract Dynamo endpoint
    declared in its ancestors and that each implementation is a callable
    DynamoEndpointInterface (i.e. was produced by the endpoint decorator).

    Raises:
        TypeError: If any required endpoint is missing, not callable, or not
            a DynamoEndpointInterface. All problems are reported in one message.
    """
    required = _get_abstract_dynamo_endpoints(cls)
    missing: List[str] = []
    undecorated: List[str] = []
    not_callable: List[Tuple[str, str]] = []

    # Iterate in sorted order: `required` is a set, so without sorting the
    # order of names in the error message would vary between runs.
    for name in sorted(required):
        impl = getattr(cls, name, None)
        if impl is None:
            missing.append(name)
            continue
        if not callable(impl):
            not_callable.append((name, type(impl).__name__))
            continue
        if not isinstance(impl, DynamoEndpointInterface):
            undecorated.append(name)

    problems = []
    if missing:
        problems.append(f"missing implementation(s): {', '.join(missing)}")
    if undecorated:
        problems.append(
            f"method(s) not decorated with @endpoint: {', '.join(undecorated)}"
        )
    if not_callable:
        problems.append(
            ", ".join(f"{n} must be callable, got {kind}" for n, kind in not_callable)
        )
    if problems:
        raise TypeError(
            f"{cls.__name__} violates Dynamo interface — " + "; ".join(problems)
        )
class DeploymentTarget(ABC):
    """Abstract factory implemented by each service provider.

    A deployment target knows how to build services and the dependencies
    between them.
    """

    @abstractmethod
    def create_service(
        self,
        service_cls: Type[T],
        config: ServiceConfig,
        app: Optional[FastAPI] = None,
        **kwargs,
    ) -> ServiceInterface[T]:
        """Build a service wrapping ``service_cls`` with the given config."""

    @abstractmethod
    def create_dependency(
        self, on: Optional[ServiceInterface[T]] = None, **kwargs
    ) -> "DependencyInterface[T]":
        """Build a dependency that points at the service ``on``."""
class DependencyInterface(Generic[T], ABC):
    """Abstract contract for a dependency of one service on another."""

    @property
    @abstractmethod
    def on(self) -> Optional[ServiceInterface[T]]:
        """Service this dependency points at, if any."""

    @on.setter
    @abstractmethod
    def on(self, value: Optional[ServiceInterface[T]]) -> None:
        """Re-point the dependency at ``value``."""

    @abstractmethod
    def get(self, *args: Any, **kwargs: Any) -> Any:
        """Return the client for the dependency."""

    @abstractmethod
    async def get_endpoint(self, name: str) -> Any:
        """Return the endpoint called ``name`` from the target service."""

    def __get__(
        self: "DependencyInterface[T]", instance: Any, owner: Any
    ) -> "DependencyInterface[T]" | T:
        # Descriptor hook; concrete dependencies must provide it.
        raise NotImplementedError()
class RuntimeLinkedServices:
    """
    Registry of the service links created at runtime.

    ``edges`` maps each service to the set of services it was linked to.
    """

    def __init__(self) -> None:
        self.edges: Dict[ServiceInterface, Set[ServiceInterface]] = defaultdict(set)

    def add(self, edge: Tuple[ServiceInterface, ServiceInterface]):
        """Record the directed link ``(source, target)``."""
        source, target = edge
        self.edges[source].add(target)
        # Register the target too (with no outgoing links yet) so it is
        # visited during cleanup.
        self.edges.setdefault(target, set())

    def remove_unused_edges(self):
        """Ask every tracked service to drop dependencies outside its recorded links.

        Does nothing when no link has been recorded.
        """
        for service, linked in self.edges.items():
            service.remove_unused_edges(used_edges=linked)
# Module-level link registry; ServiceInterface.link() records each (source, target) pair here.
LinkedServices = RuntimeLinkedServices()
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
from enum import Enum
class TargetEnum(str, Enum):
    """Deployment environments a service can target.

    Members are also ``str`` instances, so they compare equal to their raw value.
    """

    DYNAMO = "dynamo"
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
import json
import logging
import os
from typing import Any, ClassVar, Dict, Optional
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ServiceMixin:
    """Mixin that lets Dynamo services pick up configuration from the environment.

    Configuration is read from the ``DYNAMO_SERVICE_CONFIG`` environment
    variable, a JSON object keyed by service name.
    """

    # Most recently parsed DYNAMO_SERVICE_CONFIG, stored at class level.
    _global_service_configs: ClassVar[Dict[str, Dict[str, Any]]] = {}

    def all_services(self) -> Dict[str, Any]:
        """Return all services in the dependency chain (provided by the host class)."""
        raise NotImplementedError("")

    def inject_config(self) -> None:
        """Load DYNAMO_SERVICE_CONFIG and attach per-service ``ServiceArgs``.

        The parsed configuration is stored on the class. Each service listed in
        it receives a ``_service_args`` attribute: its ``ServiceArgs`` entry when
        present, otherwise ``{"workers": 1}`` unless it already has one.
        A missing or unparsable variable leaves everything untouched.
        """
        service_config_str = os.environ.get("DYNAMO_SERVICE_CONFIG")
        if not service_config_str:
            logger.debug("No DYNAMO_SERVICE_CONFIG found in environment")
            return

        try:
            service_configs = json.loads(service_config_str)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse DYNAMO_SERVICE_CONFIG: {e}")
            return
        else:
            logger.debug(f"Loaded service configs: {service_configs}")

        # Keep the whole parsed config at class level.
        cls = self.__class__
        cls._global_service_configs = service_configs

        all_services = self.all_services()
        logger.debug(f"Processing configs for services: {list(all_services.keys())}")
        for name, svc in all_services.items():
            if name not in service_configs:
                continue
            svc_config = service_configs[name]

            if "ServiceArgs" not in svc_config:
                logger.debug(f"No ServiceArgs found for {name}")
                # Fall back to the default config without clobbering an existing one.
                if not hasattr(svc, "_service_args"):
                    object.__setattr__(svc, "_service_args", {"workers": 1})
                continue

            logger.debug(
                f"Found ServiceArgs for {name}: {svc_config['ServiceArgs']}"
            )
            # Create the attribute via object.__setattr__ first, then assign.
            if not hasattr(svc, "_service_args"):
                object.__setattr__(svc, "_service_args", {})
            svc._service_args = svc_config["ServiceArgs"]

    def get_service_configs(self) -> Dict[str, Dict[str, Any]]:
        """Build the effective config of every service for resource allocation.

        Each config starts from ``{"workers": 1}``, is updated with the service's
        own ``_service_args`` (if any) and then with the ``ServiceArgs`` stored
        for it in the class-level configuration.

        Returns:
            Dict mapping service names to their configs
        """
        all_services = self.all_services()
        result = {}

        cls = self.__class__
        if not hasattr(cls, "_global_service_configs"):
            return result

        for name, svc in all_services.items():
            config = {"workers": 1}
            if hasattr(svc, "_service_args"):
                config.update(svc._service_args)
            if name in cls._global_service_configs:
                svc_config = cls._global_service_configs[name]
                if "ServiceArgs" in svc_config:
                    config.update(svc_config["ServiceArgs"])
            result[name] = config
            logger.debug(f"Built config for {name}: {config}")
        return result

    def _remove_service_args(self, service_name: str):
        """Preserve a service's ``envs`` from its ServiceArgs in DYNAMO_SERVICE_ENVS.

        NOTE(review): despite the method name, the code visible here only copies
        ``envs`` into DYNAMO_SERVICE_ENVS; it does not delete ServiceArgs from
        DYNAMO_SERVICE_CONFIG — confirm whether that is intended.
        """
        logger.debug(f"Removing service args for {service_name}")
        config_str = os.environ.get("DYNAMO_SERVICE_CONFIG")
        if not config_str:
            return

        config = json.loads(config_str)
        if service_name not in config or "ServiceArgs" not in config[service_name]:
            return

        service_args = config[service_name]["ServiceArgs"]
        if "envs" not in service_args:
            return

        # Merge into whatever DYNAMO_SERVICE_ENVS already holds.
        envs_config = json.loads(os.environ.get("DYNAMO_SERVICE_ENVS", "{}"))
        if service_name not in envs_config:
            envs_config[service_name] = {}
        envs_config[service_name]["ServiceArgs"] = {"envs": service_args["envs"]}
        os.environ["DYNAMO_SERVICE_ENVS"] = json.dumps(envs_config)

    def _get_service_args(self, service_name: str) -> Optional[dict]:
        """Return the ``ServiceArgs`` of ``service_name`` from DYNAMO_SERVICE_CONFIG.

        Returns ``None`` when the variable is unset or empty, or when the
        service or its ``ServiceArgs`` entry is absent.
        """
        config_str = os.environ.get("DYNAMO_SERVICE_CONFIG")
        if not config_str:
            return None
        return json.loads(config_str).get(service_name, {}).get("ServiceArgs")
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
import contextlib
import logging
import os
import shlex
import sys
from typing import Any, Dict, List, Optional, Set, Type, TypeVar
import psutil
from circus.arbiter import Arbiter
from circus.sockets import CircusSocket
from circus.watcher import Watcher
from fastapi import FastAPI
from dynamo.sdk.core.decorators.endpoint import DynamoClient, DynamoEndpoint
from dynamo.sdk.core.protocol.deployment import Env
from dynamo.sdk.core.protocol.interface import (
DependencyInterface,
DeploymentTarget,
DynamoEndpointInterface,
DynamoTransport,
ServiceConfig,
ServiceInterface,
)
from dynamo.sdk.core.runner.common import ServiceMixin
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Type variable for the inner service class wrapped by LocalService / LocalDependency.
T = TypeVar("T", bound=object)
# Socket paths of this length or longer are rejected in LocalDeploymentTarget.create_service.
# NOTE(review): presumably mirrors the OS limit on AF_UNIX socket paths — confirm for the target platforms.
MAX_AF_UNIX_PATH_LENGTH = 103
class LocalEndpoint(DynamoEndpoint):
    """Endpoint record used by the Circus-based local target.

    Stores the endpoint name, its owning service and its transports.
    """

    def __init__(
        self, name: str, service: "LocalService", transports: List[DynamoTransport]
    ):
        # The parent initializer is not invoked here; only these three
        # private attributes are set.
        self._transports = transports
        self._service = service
        self._name = name

    @property
    def name(self) -> str:
        """Name the endpoint was registered under."""
        return self._name
class LocalService(ServiceMixin, ServiceInterface[T]):
    """Circus implementation of the ServiceInterface"""

    def __init__(
        self,
        inner_cls: Type[T],
        config: ServiceConfig,
        watcher: Optional[Watcher] = None,
        socket: Optional[CircusSocket] = None,
        app: Optional[FastAPI] = None,
        system_app: Optional[FastAPI] = None,
    ):
        """Wrap ``inner_cls`` and index its endpoints and dependencies.

        Args:
            inner_cls: User-defined class implementing the service
            config: Service configuration
            watcher: Circus watcher managing the service process, if any
            socket: Circus socket for the service, if any
            app: FastAPI app to use; a new one titled after the class is
                created when omitted
            system_app: FastAPI system app; a new one titled
                ``"<name>-system"`` is created when omitted
        """
        self._inner_cls = inner_cls
        name = inner_cls.__name__
        self._config = config
        self._watcher = watcher
        self._socket = socket
        self.app = app or FastAPI(title=name)
        self.system_app = system_app or FastAPI(title=f"{name}-system")
        self._dependencies: Dict[str, "DependencyInterface"] = {}
        self._endpoints: Dict[str, LocalEndpoint] = {}

        # Scan class attributes for endpoint and dependency declarations.
        for field_name in dir(inner_cls):
            field = getattr(inner_cls, field_name)
            if isinstance(field, DynamoEndpoint):
                self._endpoints[field.name] = LocalEndpoint(
                    field.name, self, field.transports
                )
            if isinstance(field, DependencyInterface):
                self._dependencies[field_name] = field

    def find_dependent_by_name(self, name: str) -> "ServiceInterface":
        """Return the service called ``name`` from this service's dependency tree.

        Raises:
            KeyError: If no such service is reachable
        """
        return self.all_services()[name]

    def all_services(self) -> dict[str, "ServiceInterface"]:
        """Get a map of the service and all recursive dependencies"""
        services: dict[str, "ServiceInterface"] = {self.name: self}
        for dependency in self.dependencies.values():
            target = dependency.on
            if target is None:
                # A dependency may not point at any service yet; there is
                # nothing to traverse in that case.
                continue
            services.update(target.all_services())
        return services

    @property
    def name(self) -> str:
        """Name of the inner class"""
        return self._inner_cls.__name__

    @property
    def config(self) -> ServiceConfig:
        """Service configuration passed at construction"""
        return self._config

    @property
    def envs(self) -> List[Env]:
        """Environment variables from the config (empty list when unset)"""
        return self._config.envs or []

    @property
    def inner(self) -> Type[T]:
        """The wrapped user-defined class"""
        return self._inner_cls

    def get_endpoints(self) -> Dict[str, DynamoEndpointInterface]:
        """Get all registered endpoints, keyed by name"""
        return self._endpoints

    def get_endpoint(self, name: str) -> DynamoEndpointInterface:
        """Get a specific endpoint by name

        Raises:
            ValueError: If no endpoint with that name is registered
        """
        if name not in self._endpoints:
            raise ValueError(f"No endpoint found with name: {name}")
        return self._endpoints[name]

    def list_endpoints(self) -> List[str]:
        """List names of all registered endpoints"""
        return list(self._endpoints.keys())

    def link(self, next_service: "ServiceInterface") -> "ServiceInterface":
        """Link to ``next_service`` using the base implementation"""
        return super().link(next_service)

    def remove_unused_edges(self, used_edges: Set["ServiceInterface"]) -> None:
        """Drop every dependency whose target is not in ``used_edges``"""
        # Iterate over a copy because entries are deleted along the way.
        current_deps = dict(self._dependencies)
        for dep_key, dep_value in current_deps.items():
            if dep_value.on not in used_edges:
                del self._dependencies[dep_key]

    def dynamo_address(self) -> tuple[str, str]:
        """Return ``(namespace, name)`` from the service's Dynamo config"""
        return (self._config.dynamo.namespace, self._config.dynamo.name)

    @property
    def dependencies(self) -> dict[str, "DependencyInterface"]:
        """Dependencies declared on the inner class, keyed by attribute name"""
        return self._dependencies

    @property
    def endpoints(self) -> dict[str, "LocalEndpoint"]:
        """Registered endpoints, keyed by endpoint name"""
        return self._endpoints
class LocalDependency(DependencyInterface[T]):
    """Circus implementation of the DependencyInterface"""

    def __init__(
        self,
        on_service: Optional[LocalService[T]] = None,
    ):
        """Create a dependency, optionally already pointing at ``on_service``."""
        self._on_service = on_service
        # Created on first attribute access through the descriptor protocol.
        self._dynamo_client = None
        self._runtime = None

    @property
    def on(self) -> Optional[ServiceInterface[T]]:
        """Service this dependency points at, if any"""
        return self._on_service

    @on.setter
    def on(self, value: Optional[ServiceInterface[T]]) -> None:
        self._on_service = value

    def get(self, *args: Any, **kwargs: Any) -> Any:
        """Return the target service.

        Raises:
            ValueError: If the dependency does not point at a service
        """
        if not self._on_service:
            raise ValueError("No service specified for this dependency")
        return self._on_service

    async def get_endpoint(self, name: str) -> Any:
        """Return the endpoint called ``name`` from the target service.

        Raises:
            ValueError: If the dependency does not point at a service, or
                (from the service) if the endpoint does not exist
        """
        import inspect

        if not self._on_service:
            raise ValueError("No service specified for this dependency")
        endpoint = self._on_service.get_endpoint(name)
        # ServiceInterface.get_endpoint is synchronous, so awaiting its result
        # unconditionally fails; only await when an implementation actually
        # hands back an awaitable.
        if inspect.isawaitable(endpoint):
            endpoint = await endpoint
        return endpoint

    def __get__(
        self: "DependencyInterface[T]", instance: Any, owner: Any
    ) -> "DependencyInterface[T]" | T | Any:
        """Descriptor access: the dependency itself on the class, a client on instances."""
        if instance is None:
            return self
        if self._dynamo_client is None:
            self._dynamo_client = DynamoClient(self.on)
            if self._runtime:
                self._dynamo_client._runtime = self._runtime
        return self._dynamo_client

    def set_runtime(self, runtime: Any) -> None:
        """Set the Dynamo runtime for this dependency"""
        self._runtime = runtime
        # Propagate to a client that was already created.
        if self._dynamo_client:
            self._dynamo_client._runtime = runtime
class LocalDeploymentTarget(DeploymentTarget):
    """Circus implementation of the DeploymentTarget"""

    def __init__(self):
        self._arbiter = None
        self._watchers = []
        self._sockets = {}

    def create_service(
        self,
        service_cls: Type[T],
        config: ServiceConfig,
        app: Optional[FastAPI] = None,
        system_app: Optional[FastAPI] = None,
        **kwargs,
    ) -> ServiceInterface[T]:
        """Create a LocalService plus the Circus socket and watcher backing it.

        Args:
            service_cls: User-defined class implementing the service
            config: Service configuration
            app: FastAPI app handed to the service (it creates its own when None)
            system_app: FastAPI system app handed to the service (same fallback)
            **kwargs: Optional ``cmd`` (defaults to ``sys.executable``),
                ``args`` (defaults to ``[]``) and ``env_vars`` (defaults to ``{}``)
                for the watcher

        Raises:
            ValueError: If the socket path is MAX_AF_UNIX_PATH_LENGTH characters or longer
        """
        # Parameters needed for creating a circus watcher
        cmd = kwargs.get("cmd", sys.executable)
        args = kwargs.get("args", [])
        env_vars = kwargs.get("env_vars", {})

        # One socket per service, named after the service class
        socket_path = os.path.join(
            os.environ.get("DYN_CIRCUS_SOCKET_DIR", "/tmp/circus"),
            f"{service_cls.__name__}.sock",
        )
        if len(socket_path) >= MAX_AF_UNIX_PATH_LENGTH:
            raise ValueError(
                f"Socket path '{socket_path}' exceeds maximum length of {MAX_AF_UNIX_PATH_LENGTH}"
            )
        socket = CircusSocket(name=service_cls.__name__, path=socket_path)
        self._sockets[service_cls.__name__] = socket

        watcher = Watcher(
            name=service_cls.__name__,
            cmd=shlex.quote(cmd) if psutil.POSIX else cmd,
            args=args,
            copy_env=True,
            env=env_vars,
            stop_children=True,
            use_sockets=True,
            graceful_timeout=86400,
            respawn=True,
        )
        self._watchers.append(watcher)

        # Forward the caller-supplied apps; previously they were accepted but
        # silently dropped, so the service always built fresh ones.
        return LocalService(
            inner_cls=service_cls,
            config=config,
            watcher=watcher,
            socket=socket,
            app=app,
            system_app=system_app,
        )

    def create_dependency(
        self, on: Optional[ServiceInterface[T]] = None, **kwargs
    ) -> DependencyInterface[T]:
        """Create a LocalDependency on ``on``.

        Raises:
            TypeError: If ``on`` is given but is not a LocalService
        """
        if on is not None and not isinstance(on, LocalService):
            raise TypeError("LocalDependency can only depend on LocalService")
        return LocalDependency(on)

    def start_arbiter(self, threaded: bool = False, **kwargs: Any) -> Arbiter:
        """Start the circus arbiter with all configured watchers and sockets

        Ports come from ``DYN_CIRCUS_ENDPOINT_PORT`` (default 41234) and
        ``DYN_CIRCUS_PUBSUB_PORT`` (default 52345). ``check_delay`` defaults to
        10; remaining keyword arguments are passed to the Arbiter. Returns the
        existing arbiter if one was already started.
        """
        if self._arbiter is not None:
            logger.warning("Arbiter already started")
            return self._arbiter

        endpoint_port = int(os.environ.get("DYN_CIRCUS_ENDPOINT_PORT", "41234"))
        pubsub_port = int(os.environ.get("DYN_CIRCUS_PUBSUB_PORT", "52345"))

        arbiter = Arbiter(
            watchers=self._watchers,
            sockets=list(self._sockets.values()),
            endpoint=f"tcp://127.0.0.1:{endpoint_port}",
            pubsub_endpoint=f"tcp://127.0.0.1:{pubsub_port}",
            check_delay=kwargs.pop("check_delay", 10),
            **kwargs,
        )
        arbiter.start()
        self._arbiter = arbiter
        return arbiter

    def stop_arbiter(self) -> None:
        """Stop the circus arbiter and all managed processes"""
        if self._arbiter is None:
            logger.warning("No arbiter to stop")
            return
        self._arbiter.stop()
        self._arbiter = None

    @contextlib.contextmanager
    def run_services(self, **kwargs: Any):
        """Context manager to run all services and clean up when done"""
        try:
            arbiter = self.start_arbiter(**kwargs)
            yield arbiter
        finally:
            self.stop_arbiter()
# SPDX-FileCopyrightText: Copyright (c) 2020 Atalaya Tech. Inc
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
from typing import Any, Awaitable, Union
from fastapi import FastAPI, Response
# TODO: These defaults should be set by the provider. For now, I'm just adding them so that something is exposed when we do --use-default-health-checks
def default_liveness_check() -> bool:
    """Fallback liveness check: unconditionally reports the service as alive."""
    alive = True
    return alive
def default_readiness_check() -> bool:
    """Fallback readiness check: unconditionally reports the service as ready."""
    ready = True
    return ready
def register_liveness_probe(
    app: FastAPI, instance: Any, route: str = "/healthz", use_default: bool = False
) -> None:
    """Register a liveness route on ``app``.

    The first callable attribute of ``instance`` (in ``dir()`` order) flagged
    with ``__is_liveness_probe__`` is used as the check. Without one, the
    default always-true check is used when ``use_default`` is set; otherwise
    no route is registered.

    The route answers 200 when the check is truthy, 503 when it is falsy and
    500 (with the exception text as body) when the check raises.

    Args:
        app (FastAPI): Application to register the liveness route on.
        instance (Any): Service or component instance searched for the flagged method.
        route (str, optional): URL path of the liveness route. Defaults to "/healthz".
        use_default (bool, optional): Fall back to the default check when no
            flagged method exists. Defaults to False.
    """
    # Lazily walk the attributes and stop at the first flagged callable.
    attributes = (getattr(instance, attr) for attr in dir(instance))
    decorated_method = next(
        (
            candidate
            for candidate in attributes
            if callable(candidate)
            and getattr(candidate, "__is_liveness_probe__", False)
        ),
        None,
    )

    if not (decorated_method or use_default):
        # Nothing flagged and no default requested: leave the app untouched.
        return

    @app.get(route)
    async def liveness_check():
        try:
            check_method = decorated_method or default_liveness_check
            # The flagged method was read from the instance, so it is already bound.
            result: Union[bool, Awaitable[bool]] = check_method()
            if isinstance(result, Awaitable):
                result = await result
            status = 200 if result else 503
            return Response(status_code=status)
        except Exception as e:
            return Response(content=str(e), status_code=500)
def register_readiness_probe(
    app: FastAPI, instance: Any, route: str = "/readyz", use_default: bool = False
) -> None:
    """Register a readiness route on ``app``.

    The first callable attribute of ``instance`` (in ``dir()`` order) flagged
    with ``__is_readiness_probe__`` is used as the check. Without one, the
    default always-true check is used when ``use_default`` is set; otherwise
    no route is registered.

    The route answers 200 when the check is truthy, 503 when it is falsy and
    500 (with the exception text as body) when the check raises.

    Args:
        app (FastAPI): Application to register the readiness route on.
        instance (Any): Service or component instance searched for the flagged method.
        route (str, optional): URL path of the readiness route. Defaults to "/readyz".
        use_default (bool, optional): Fall back to the default check when no
            flagged method exists. Defaults to False.
    """
    # Lazily walk the attributes and stop at the first flagged callable.
    attributes = (getattr(instance, attr) for attr in dir(instance))
    decorated_method = next(
        (
            candidate
            for candidate in attributes
            if callable(candidate)
            and getattr(candidate, "__is_readiness_probe__", False)
        ),
        None,
    )

    if not (decorated_method or use_default):
        # Nothing flagged and no default requested: leave the app untouched.
        return

    @app.get(route)
    async def readiness_check():
        try:
            check_method = decorated_method or default_readiness_check
            # The flagged method was read from the instance, so it is already bound.
            result: Union[bool, Awaitable[bool]] = check_method()
            if isinstance(result, Awaitable):
                result = await result
            status = 200 if result else 503
            return Response(status_code=status)
        except Exception as e:
            return Response(content=str(e), status_code=500)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
def main() -> None:
    """Print the SDK greeting to standard output."""
    greeting = "Hello from dynamo sdk!"
    print(greeting)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import typing as t
from functools import wraps
from typing import Any, get_type_hints
from pydantic import BaseModel
from dynamo.sdk.core.protocol.interface import DynamoTransport
# Type variable for decorators in this module that return the callable they were given (async_on_start, on_shutdown).
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
class DynamoEndpoint:
    """Callable wrapper that marks a coroutine function as a Dynamo endpoint.

    Attributes:
        func: The wrapped function (awaited on every call).
        name: Endpoint name; defaults to the function's ``__name__``.
        request_type: Type hint of the first annotated parameter other than
            ``self``/``cls``, or ``None`` when no parameter is annotated.
    """

    def __init__(
        self,
        func: t.Callable,
        name: str | None = None,
        transports: t.List[DynamoTransport] | None = None,
    ):
        self.func = func
        self.name = name or func.__name__
        self._transports = transports or [DynamoTransport.DEFAULT]

        # get_type_hints only reports annotated names and includes the return
        # annotation under "return"; the latter is not a request parameter.
        hints = get_type_hints(func)
        params = [(pname, ptype) for pname, ptype in hints.items() if pname != "return"]
        # Skip an annotated self/cls argument.
        if params and params[0][0] in ("self", "cls"):
            params = params[1:]
        # Unannotated functions have no request type instead of raising IndexError.
        self.request_type = params[0][1] if params else None

        wraps(func)(self)

    def _expects_model(self) -> bool:
        """Tell whether the request type is a pydantic model class."""
        try:
            return issubclass(self.request_type, BaseModel)
        except TypeError:
            # request_type is None or a non-class hint such as List[int] or X | None.
            return False

    async def __call__(self, *args: t.Any, **kwargs: t.Any) -> Any:
        """Validate/normalize the request (second positional argument) and await ``func``.

        When the request type is a pydantic model, a ``str`` or ``dict`` request
        is validated into that model. Any pydantic model request is then passed
        on as a plain dict.
        """
        if len(args) > 1:
            call_args = list(args)
            request = call_args[1]
            # Validate request
            if self._expects_model() and isinstance(request, (str, dict)):
                request = self.request_type.model_validate(request)  # type: ignore
            # Convert Pydantic model to dict before passing to dynamo
            if isinstance(request, BaseModel):
                request = request.model_dump()
            call_args[1] = request
            args = call_args  # type: ignore
        return await self.func(*args, **kwargs)
def endpoint(
    name: str | None = None,
    is_api: bool = False,
) -> t.Callable[[t.Callable], DynamoEndpoint]:
    """Build a decorator that wraps a function in a ``DynamoEndpoint``.

    Args:
        name: Optional name for the endpoint. Defaults to the function name.
        is_api: When True the endpoint uses the HTTP transport; otherwise the
            default transport. Defaults to False.

    Example:
        @endpoint()
        async def my_endpoint(self, input: str) -> str:
            return input

        @endpoint(name="custom_name")
        async def another_endpoint(self, input: str) -> str:
            return input
    """

    def decorator(func: t.Callable) -> DynamoEndpoint:
        # A fresh transport list is built for every decorated function.
        if is_api:
            transports = [DynamoTransport.HTTP]
        else:
            transports = [DynamoTransport.DEFAULT]
        return DynamoEndpoint(func, name, transports)

    return decorator
def async_on_start(func: F) -> F:
    """Flag ``func`` as a startup hook and return it unchanged.

    The flag is the attribute ``__dynamo_startup_hook__`` set to ``True``.
    """
    func.__dynamo_startup_hook__ = True
    return func
def on_shutdown(func: F) -> F:
    """Flag ``func`` as a shutdown hook and return it unchanged.

    The flag is the attribute ``__dynamo_shutdown_hook__`` set to ``True``.
    """
    func.__dynamo_shutdown_hook__ = True
    return func
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from http import HTTPStatus
class DynamoException(Exception):
    """Root of the Dynamo SDK exception hierarchy.

    Subclasses that define their own ``error_code`` are registered in the
    shared ``error_mapping`` under that code.
    """

    error_code = HTTPStatus.INTERNAL_SERVER_ERROR
    # One dict shared by the whole hierarchy: status code -> exception class.
    error_mapping: dict[HTTPStatus, type[DynamoException]] = {}

    def __init_subclass__(cls) -> None:
        # Register only classes that declare error_code themselves, not
        # those that merely inherit it.
        if "error_code" in vars(cls):
            cls.error_mapping[cls.error_code] = cls

    def __init__(self, message: str, error_code: HTTPStatus | None = None):
        """Store ``message`` and the effective error code on the instance.

        Args:
            message: Human-readable description of the error.
            error_code: Overrides the class-level code when given (and truthy).
        """
        super().__init__(message)
        self.message = message
        self.error_code = error_code if error_code else self.error_code
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
# Image reference read from the DYNAMO_IMAGE environment variable, falling back to "dynamo:latest-vllm".
# NOTE(review): presumably a container image tag — confirm against the code that consumes it.
DYNAMO_IMAGE = os.getenv("DYNAMO_IMAGE", "dynamo:latest-vllm")
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment