Unverified Commit afb8495e authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

feat: decouple bento dependency (#1266)

parent e31f8d95
...@@ -347,9 +347,6 @@ RUN mkdir -p /opt/dynamo/bindings/wheels && \ ...@@ -347,9 +347,6 @@ RUN mkdir -p /opt/dynamo/bindings/wheels && \
RUN uv pip install /workspace/dist/ai_dynamo_runtime*cp312*.whl && \ RUN uv pip install /workspace/dist/ai_dynamo_runtime*cp312*.whl && \
uv pip install /workspace/dist/ai_dynamo*any.whl uv pip install /workspace/dist/ai_dynamo*any.whl
# TODO: remove kantoku as a transitive dependency
RUN uv pip uninstall kantoku circus && uv pip install circus==0.19.0
# Copy launch banner # Copy launch banner
RUN --mount=type=bind,source=./container/launch_message.txt,target=/workspace/launch_message.txt \ RUN --mount=type=bind,source=./container/launch_message.txt,target=/workspace/launch_message.txt \
sed '/^#\s/d' /workspace/launch_message.txt > ~/.launch_screen && \ sed '/^#\s/d' /workspace/launch_message.txt > ~/.launch_screen && \
...@@ -399,9 +396,6 @@ RUN uv pip install ai-dynamo[vllm] --find-links wheelhouse && \ ...@@ -399,9 +396,6 @@ RUN uv pip install ai-dynamo[vllm] --find-links wheelhouse && \
ln -sf $VIRTUAL_ENV/bin/* /usr/local/bin/ && \ ln -sf $VIRTUAL_ENV/bin/* /usr/local/bin/ && \
rm -r wheelhouse rm -r wheelhouse
# TODO: remove kantoku as a transitive dependency
RUN uv pip uninstall kantoku circus && uv pip install circus==0.19.0
# Tell vllm to use the Dynamo LLM C API for KV Cache Routing # Tell vllm to use the Dynamo LLM C API for KV Cache Routing
ENV VLLM_KV_CAPI_PATH="/opt/dynamo/bindings/lib/libdynamo_llm_capi.so" ENV VLLM_KV_CAPI_PATH="/opt/dynamo/bindings/lib/libdynamo_llm_capi.so"
......
...@@ -323,10 +323,6 @@ RUN . /opt/dynamo/venv/bin/activate && \ ...@@ -323,10 +323,6 @@ RUN . /opt/dynamo/venv/bin/activate && \
uv pip install /workspace/dist/ai_dynamo_runtime*cp312*.whl && \ uv pip install /workspace/dist/ai_dynamo_runtime*cp312*.whl && \
uv pip install /workspace/dist/ai_dynamo*any.whl uv pip install /workspace/dist/ai_dynamo*any.whl
# TODO: remove kantoku as a transitive dependency
RUN . /opt/dynamo/venv/bin/activate && \
uv pip uninstall kantoku circus && uv pip install circus==0.19.0
# Install dynamo.runtime and dynamo.llm wheels globally in container for tests # Install dynamo.runtime and dynamo.llm wheels globally in container for tests
# TODO: In future, we may use a virtualenv for everything and remove this. # TODO: In future, we may use a virtualenv for everything and remove this.
RUN pip install dist/ai_dynamo_runtime*cp312*.whl && \ RUN pip install dist/ai_dynamo_runtime*cp312*.whl && \
......
...@@ -438,9 +438,6 @@ RUN mkdir -p /opt/dynamo/bindings/wheels && \ ...@@ -438,9 +438,6 @@ RUN mkdir -p /opt/dynamo/bindings/wheels && \
RUN uv pip install /workspace/dist/ai_dynamo_runtime*cp312*.whl && \ RUN uv pip install /workspace/dist/ai_dynamo_runtime*cp312*.whl && \
uv pip install /workspace/dist/ai_dynamo*any.whl uv pip install /workspace/dist/ai_dynamo*any.whl
# TODO: remove kantoku as a transitive dependency
RUN uv pip uninstall kantoku circus && uv pip install circus==0.19.0
# Copy launch banner # Copy launch banner
RUN --mount=type=bind,source=./container/launch_message.txt,target=/workspace/launch_message.txt \ RUN --mount=type=bind,source=./container/launch_message.txt,target=/workspace/launch_message.txt \
sed '/^#\s/d' /workspace/launch_message.txt > ~/.launch_screen && \ sed '/^#\s/d' /workspace/launch_message.txt > ~/.launch_screen && \
...@@ -517,9 +514,6 @@ RUN uv pip install ai-dynamo[vllm] --find-links wheelhouse && \ ...@@ -517,9 +514,6 @@ RUN uv pip install ai-dynamo[vllm] --find-links wheelhouse && \
ln -sf $VIRTUAL_ENV/bin/* /usr/local/bin/ && \ ln -sf $VIRTUAL_ENV/bin/* /usr/local/bin/ && \
rm -r wheelhouse rm -r wheelhouse
# TODO: remove kantoku as a transitive dependency
RUN uv pip uninstall kantoku circus && uv pip install circus==0.19.0
# Tell vllm to use the Dynamo LLM C API for KV Cache Routing # Tell vllm to use the Dynamo LLM C API for KV Cache Routing
ENV VLLM_KV_CAPI_PATH="/opt/dynamo/bindings/lib/libdynamo_llm_capi.so" ENV VLLM_KV_CAPI_PATH="/opt/dynamo/bindings/lib/libdynamo_llm_capi.so"
......
...@@ -87,4 +87,4 @@ cd - ...@@ -87,4 +87,4 @@ cd -
# Install the Helm chart with the correct tag (SHA) # Install the Helm chart with the correct tag (SHA)
echo "Installing Helm chart with image: $docker_tag_for_registry" echo "Installing Helm chart with image: $docker_tag_for_registry"
HELM_RELEASE="${DYNAMO_MODULE//_/\-}" HELM_RELEASE="${DYNAMO_MODULE//_/\-}"
helm upgrade -i "$HELM_RELEASE" ./chart -f ~/bentoml/bentos/"$DYNAMO_NAME"/"$docker_sha"/bento.yaml --set image="$docker_tag_for_registry" --set dynamoIdentifier="$DYNAMO_IDENTIFIER" --set configFilePath="$DYNAMO_CONFIG_FILE" -n "$NAMESPACE" helm upgrade -i "$HELM_RELEASE" ./chart -f ~/.dynamo/packages/"$DYNAMO_MODULE"/"$DYNAMO_NAME"/dynamo.yaml --set image="$docker_tag_for_registry" --set dynamoIdentifier="$DYNAMO_IDENTIFIER" --set configFilePath="$DYNAMO_CONFIG_FILE" -n "$NAMESPACE"
\ No newline at end of file \ No newline at end of file
...@@ -20,19 +20,16 @@ from typing import Any ...@@ -20,19 +20,16 @@ from typing import Any
# TODO: Remove this line after the bentoml import is removed from this file # TODO: Remove this line after the bentoml import is removed from this file
warnings.filterwarnings("ignore", category=UserWarning, message=".*pkg_resources.*") warnings.filterwarnings("ignore", category=UserWarning, message=".*pkg_resources.*")
# flake8: noqa: E402
from bentoml import on_shutdown as async_on_shutdown
# flake8: noqa: E402 # flake8: noqa: E402
from dynamo.sdk.core.decorators.endpoint import api, endpoint from dynamo.sdk.core.decorators.endpoint import api, endpoint
from dynamo.sdk.core.lib import DYNAMO_IMAGE, depends, liveness, readiness, service from dynamo.sdk.core.lib import DYNAMO_IMAGE, depends, liveness, readiness, service
from dynamo.sdk.lib.decorators import async_on_start from dynamo.sdk.lib.decorators import async_on_start, on_shutdown
dynamo_context: dict[str, Any] = {} dynamo_context: dict[str, Any] = {}
__all__ = [ __all__ = [
"DYNAMO_IMAGE", "DYNAMO_IMAGE",
"async_on_shutdown", "on_shutdown",
"async_on_start", "async_on_start",
"depends", "depends",
"dynamo_context", "dynamo_context",
......
...@@ -21,8 +21,7 @@ import logging ...@@ -21,8 +21,7 @@ import logging
import os import os
from typing import Any from typing import Any
from _bentoml_sdk import Service from dynamo.sdk.core.protocol.interface import ServiceInterface
from simple_di import inject
# Import our own resource module # Import our own resource module
from dynamo.sdk.lib.resource import ( from dynamo.sdk.lib.resource import (
...@@ -149,10 +148,9 @@ class ResourceAllocator: ...@@ -149,10 +148,9 @@ class ResourceAllocator:
"""Get detailed statistics for all GPUs.""" """Get detailed statistics for all GPUs."""
return self.gpu_manager.get_gpu_stats() return self.gpu_manager.get_gpu_stats()
@inject
def get_resource_envs( def get_resource_envs(
self, self,
service: Service[Any], service: ServiceInterface[Any],
) -> tuple[int, list[dict[str, str]]]: ) -> tuple[int, list[dict[str, str]]]:
""" """
Get resource environment variables for a service. Get resource environment variables for a service.
......
# SPDX-FileCopyrightText: Copyright (c) 2020 Atalaya Tech. Inc
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
"""
User-facing Python APIs for managing local bentos and building new bentos.
"""
from __future__ import annotations
import logging
import os
import typing as t
import fs
import fs.errors
import fs.mirror
from bentoml._internal.bento.bento import BENTO_PROJECT_DIR_NAME, BENTO_README_FILENAME
from bentoml._internal.bento.bento import Bento as BaseBento
from bentoml._internal.bento.bento import (
BentoApiInfo,
BentoInfo,
BentoInfoV2,
BentoModelInfo,
BentoRunnerInfo,
BentoServiceInfo,
get_default_svc_readme,
)
from bentoml._internal.bento.build_config import BentoBuildConfig, BentoPathSpec
from bentoml._internal.configuration.containers import BentoMLContainer
from bentoml._internal.service import Service
from bentoml._internal.service.loader import load
from bentoml._internal.tag import Tag, to_snake_case
from bentoml._internal.utils.filesystem import copy_file_to_fs_folder
from bentoml._internal.utils.uri import encode_path_for_uri
from bentoml.exceptions import BentoMLException, InvalidArgument
from fs.copy import copy_file
from fs.tempfs import TempFS
from simple_di import Provide, inject
from dynamo.sdk.core.protocol.interface import LinkedServices
logger = logging.getLogger(__name__)
class Bento(BaseBento):
"""Dynamo's Bento class that extends BentoML's Bento with additional functionality."""
# create(): alternate constructor. Loads the service named by `build_config`,
# assembles the bento contents in a temporary filesystem and returns a Bento
# wrapping that filesystem.
#
# Parameters (as used in the body below):
#   build_config     - build options; `service` is filled in when it is empty.
#   version          - tag version; a new version is generated when None.
#   build_ctx        - build context directory; the current working directory when None.
#   platform         - forwarded to the Python env writer and to image.freeze().
#   bare             - when True the Bento is returned before bento.yaml is
#                      written and before validation.
#   reload           - forwarded to the service loader.
#   enabled_features - injected from BentoMLContainer; containing "no_image"
#                      disables image population from the build config.
#
# Raises InvalidArgument when the build context is not a directory, when an
# include starts with "../", or when a "file:" description points at a missing
# file; raises BentoMLException when validation of the result fails.
@classmethod
@inject
def create(
cls,
build_config: BentoBuildConfig,
version: t.Optional[str] = None,
build_ctx: t.Optional[str] = None,
platform: t.Optional[str] = None,
bare: bool = False,
reload: bool = False,
enabled_features: list[str] = Provide[BentoMLContainer.enabled_features],
) -> Bento:
# Imported at call time rather than at module import time.
from _bentoml_sdk.images import Image, populate_image_from_build_config
from _bentoml_sdk.models import BentoModel
# Normalise the build context: cwd by default, otherwise expand "~" and
# resolve symlinks so later path handling works on a real path.
build_ctx = (
os.getcwd()
if build_ctx is None
else os.path.realpath(os.path.expanduser(build_ctx))
)
if not os.path.isdir(build_ctx):
raise InvalidArgument(
f"Bento build context {build_ctx} does not exist or is not a directory."
)
BentoMLContainer.model_aliases.set(build_config.model_aliases)
# This also verifies that svc can be imported correctly
svc = load(build_config.service, working_dir=build_ctx, reload=reload)
# TODO: At some point we need this to take place within the load function
LinkedServices.remove_unused_edges()
# Record the "module:ClassName" import string of the wrapped inner class on
# the underlying BentoML service.
inner = svc.get_bentoml_service().inner
name = f"{inner.__module__}:{inner.__name__}"
setattr(svc.get_bentoml_service(), "_import_str", name)
if not build_config.service:
# object.__setattr__ bypasses the class's own __setattr__ — presumably
# because BentoBuildConfig is frozen; TODO confirm.
object.__setattr__(build_config, "service", name)
# NOTE(review): is_legacy inspects the wrapped BentoML service, while the
# branch a few lines below tests `svc` itself — confirm both checks are intended.
is_legacy = isinstance(svc.get_bentoml_service(), Service)
# Apply default build options
image: Image | None = None
disable_image = "no_image" in enabled_features or is_legacy
if isinstance(svc, Service):
# for < 1.2
bento_name = (
build_config.name if build_config.name is not None else svc.name
)
else:
# for >= 1.2
svc.inject_config()
bento_name = (
build_config.name
if build_config.name is not None
else to_snake_case(svc.name)
)
# build_config.envs.extend(svc.envs)
# build_config.labels.update(svc.labels)
if svc.image is not None:
image = Image(base_image=svc.image)
if not disable_image:
image = populate_image_from_build_config(image, build_config, build_ctx)
build_config = build_config.with_defaults()
tag = Tag(bento_name, version)
if version is None:
tag = tag.make_new_version()
logger.debug(
'Building BentoML service "%s" from build context "%s".', tag, build_ctx
)
# All bento contents are staged in a temporary filesystem.
bento_fs = TempFS(
identifier=f"bentoml_bento_{bento_name}",
temp_dir=BentoMLContainer.tmp_bento_store_dir.get(),
)
models: list[BentoModelInfo] = []
# Helper that keeps `models` free of duplicates.
def append_model(model: BentoModelInfo) -> None:
if model not in models:
models.append(model)
# Models listed in the build config take precedence; the legacy lookup on the
# service and its runners is only the fallback.
if build_config.models:
for model_spec in build_config.models:
model = BentoModel(model_spec.tag)
append_model(model.to_info(model_spec.alias))
elif is_legacy:
# XXX: legacy way to get models from service
# Add all models required by the service
for model in svc.models:
append_model(BentoModel(model.tag).to_info())
# Add all models required by service runners
for runner in svc.runners:
for model in runner.models:
append_model(BentoModel(model.tag).to_info())
if not bare:
ctx_fs = fs.open_fs(encode_path_for_uri(build_ctx))
# create ignore specs
specs = BentoPathSpec(build_config.include, build_config.exclude, build_ctx)
# Copy all files base on include and exclude, into `src` directory
# Only include patterns that literally start with "../" are rejected here.
relpaths = [s for s in build_config.include if s.startswith("../")]
if len(relpaths) != 0:
raise InvalidArgument(
"Paths outside of the build context directory cannot be included; use a symlink or copy those files into the working directory manually."
)
bento_fs.makedir(BENTO_PROJECT_DIR_NAME)
target_fs = bento_fs.opendir(BENTO_PROJECT_DIR_NAME)
# Persist the effective build config next to the copied sources.
with target_fs.open("bentofile.yaml", "w") as bentofile_yaml:
build_config.to_yaml(bentofile_yaml)
for dir_path, _, files in ctx_fs.walk():
for f in files:
path = fs.path.combine(dir_path, f.name).lstrip("/")
if specs.includes(path):
# Large files are still copied; the warning is informational only.
if ctx_fs.getsize(path) > 10 * 1024 * 1024:
logger.warning("File size is larger than 10MiB: %s", path)
target_fs.makedirs(dir_path, recreate=True)
copy_file(ctx_fs, path, target_fs, path)
# The Python/Conda/Docker environment files are only written when no image
# object was produced above.
if image is None:
# NOTE: we need to generate both Python and Conda
# first to make sure we can generate the Dockerfile correctly.
build_config.python.write_to_bento(
bento_fs, build_ctx, platform_=platform
)
build_config.conda.write_to_bento(bento_fs, build_ctx)
build_config.docker.write_to_bento(
bento_fs, build_ctx, build_config.conda
)
# Create `readme.md` file
# Three sources, in order: a "file:<path>" description, an existing readme in
# the build context (when no description is set), else generated/inline text.
if (
build_config.description is not None
and build_config.description.startswith("file:")
):
# [5:] drops the leading "file:" prefix.
file_name = build_config.description[5:].strip()
if not ctx_fs.exists(file_name):
raise InvalidArgument(f"File {file_name} does not exist.")
copy_file_to_fs_folder(
ctx_fs.getsyspath(file_name),
bento_fs,
dst_filename=BENTO_README_FILENAME,
)
elif build_config.description is None and ctx_fs.exists(
BENTO_README_FILENAME
):
copy_file_to_fs_folder(
ctx_fs.getsyspath(BENTO_README_FILENAME),
bento_fs,
dst_filename=BENTO_README_FILENAME,
)
else:
with bento_fs.open(BENTO_README_FILENAME, "w") as f:
if build_config.description is None:
f.write(get_default_svc_readme(svc, version))
else:
f.write(build_config.description)
# Two metadata formats: BentoInfo when there is no image, BentoInfoV2 (which
# carries the frozen image) otherwise.
if image is None:
bento_info = BentoInfo(
tag=tag,
service=svc, # type: ignore # attrs converters do not typecheck
entry_service=svc.name,
labels=build_config.labels,
models=models,
runners=(
[BentoRunnerInfo.from_runner(r) for r in svc.runners] # type: ignore # attrs converters do not typecheck
if is_legacy
else []
),
apis=(
[BentoApiInfo.from_inference_api(api) for api in svc.apis.values()]
if is_legacy
else []
),
services=(
[
BentoServiceInfo.from_service(s)
for s in svc.all_services().values()
]
if not is_legacy
else []
),
docker=build_config.docker,
python=build_config.python,
conda=build_config.conda,
envs=build_config.envs,
schema=svc.schema() if not is_legacy else {},
)
else:
# The service list is built from each service's underlying BentoML service
# before `svc` itself is rebound to its BentoML service below.
services = [
BentoServiceInfo.from_service(s.get_bentoml_service())
for s in svc.all_services().values()
]
svc = svc.get_bentoml_service()
bento_info = BentoInfoV2(
tag=tag,
service=svc, # type: ignore # attrs converters do not typecheck
entry_service=svc.name,
labels=build_config.labels,
models=models,
services=(services if not is_legacy else []),
envs=build_config.envs,
image=image.freeze(bento_fs, build_config.envs, platform),
)
res = Bento(tag, bento_fs, bento_info)
# A bare bento is returned as-is, without bento.yaml and without validation.
if bare:
return res
# Create bento.yaml
res.flush_info()
try:
res.validate()
except BentoMLException as e:
# `from None` drops the original exception context; its message is kept in
# the text of the new exception.
raise BentoMLException(f"Failed to create {res!s}: {e}") from None
return res
This diff is collapsed.
...@@ -592,7 +592,7 @@ def build( ...@@ -592,7 +592,7 @@ def build(
image_name, image_name,
"-f", "-f",
str(docker_file), str(docker_file),
output_path, str(output_path),
], ],
check=True, check=True,
) )
......
...@@ -26,7 +26,6 @@ from dynamo.sdk.cli.build import build ...@@ -26,7 +26,6 @@ from dynamo.sdk.cli.build import build
from dynamo.sdk.cli.deployment import app as deployment_app from dynamo.sdk.cli.deployment import app as deployment_app
from dynamo.sdk.cli.deployment import deploy from dynamo.sdk.cli.deployment import deploy
from dynamo.sdk.cli.env import env from dynamo.sdk.cli.env import env
from dynamo.sdk.cli.pipeline import get
from dynamo.sdk.cli.run import run from dynamo.sdk.cli.run import run
from dynamo.sdk.cli.serve import serve from dynamo.sdk.cli.serve import serve
...@@ -81,7 +80,6 @@ cli.command( ...@@ -81,7 +80,6 @@ cli.command(
cli.add_typer(deployment_app, name="deployment") cli.add_typer(deployment_app, name="deployment")
cli.command()(deploy) cli.command()(deploy)
cli.command()(build) cli.command()(build)
cli.command()(get)
if __name__ == "__main__": if __name__ == "__main__":
cli() cli()
# SPDX-FileCopyrightText: Copyright (c) 2020 Atalaya Tech. Inc
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
from __future__ import annotations
import json
import logging
import os
import subprocess
import typing as t
import attr
import typer
import yaml
from bentoml._internal.bento.bento import DEFAULT_BENTO_BUILD_FILES
from bentoml._internal.bento.build_config import BentoBuildConfig
from bentoml._internal.configuration.containers import BentoMLContainer
from bentoml._internal.utils.args import set_arguments
from bentoml._internal.utils.filesystem import resolve_user_filepath
from bentoml.exceptions import InvalidArgument
from rich.console import Console
from rich.syntax import Syntax
from simple_di import Provide, inject
from dynamo.sdk.cli.bento_util import Bento
from dynamo.sdk.core.runner import TargetEnum
if t.TYPE_CHECKING:
from bentoml._internal.bento import BentoStore
from bentoml._internal.container import DefaultBuilder
logger = logging.getLogger(__name__)
console = Console()
DYNAMO_FIGLET = """
██████╗ ██╗ ██╗███╗ ██╗ █████╗ ███╗ ███╗ ██████╗
██╔══██╗╚██╗ ██╔╝████╗ ██║██╔══██╗████╗ ████║██╔═══██╗
██║ ██║ ╚████╔╝ ██╔██╗ ██║███████║██╔████╔██║██║ ██║
██║ ██║ ╚██╔╝ ██║╚██╗██║██╔══██║██║╚██╔╝██║██║ ██║
██████╔╝ ██║ ██║ ╚████║██║ ██║██║ ╚═╝ ██║╚██████╔╝
╚═════╝ ╚═╝ ╚═╝ ╚═══╝╚═╝ ╚═╝╚═╝ ╚═╝ ╚═════╝
"""
@inject
def build_bentofile(
    bentofile: str | None = None,
    *,
    service: str | None = None,
    name: str | None = None,
    version: str | None = None,
    labels: dict[str, str] | None = None,
    build_ctx: str | None = None,
    platform: str | None = None,
    bare: bool = False,
    reload: bool = False,
    args: dict[str, t.Any] | None = None,
    _bento_store: BentoStore = Provide[BentoMLContainer.bento_store],
) -> Bento:
    """Build a Dynamo pipeline based on options specified in a bentofile.yaml file.

    The build config is taken from, in order: the explicit ``bentofile``; the
    first of ``DEFAULT_BENTO_BUILD_FILES`` that resolves inside ``build_ctx``;
    or a fresh ``BentoBuildConfig`` that only names ``service``.

    Args:
        bentofile: Path to a build file. When empty, the default build file
            names are searched instead.
        service: Service import string, used only when no build file is found.
        name: Overrides the ``name`` of the loaded build config when given.
        version: Forwarded to ``Bento.create``.
        labels: Merged over the build config's labels; these values win on
            key conflicts.
        build_ctx: Directory used to resolve default build files; also
            forwarded to ``Bento.create``.
        platform: Forwarded to ``Bento.create``.
        bare: Forwarded to ``Bento.create``; when true the result is not saved.
        reload: Forwarded to ``Bento.create``.
        args: When given, passed as keyword arguments to ``set_arguments``.
        _bento_store: Injected store the built bento is saved into.

    Returns:
        The value of ``bento.save(_bento_store)`` when ``bare`` is false,
        otherwise the bento returned by ``Bento.create``.

    Raises:
        InvalidArgument: If an explicit ``bentofile`` cannot be resolved. The
            underlying ``FileNotFoundError`` is attached as ``__cause__``.
    """
    if args is not None:
        set_arguments(**args)

    if bentofile:
        try:
            bentofile = resolve_user_filepath(bentofile, None)
        except FileNotFoundError as err:
            # Chain explicitly so the traceback shows the lookup failure as the
            # direct cause instead of an incidental "during handling" context.
            raise InvalidArgument(f'bentofile "{bentofile}" not found') from err
        build_config = BentoBuildConfig.from_file(bentofile)
    else:
        for filename in DEFAULT_BENTO_BUILD_FILES:
            try:
                bentofile = resolve_user_filepath(filename, build_ctx)
            except FileNotFoundError:
                continue  # try the next default build file name
            build_config = BentoBuildConfig.from_file(bentofile)
            break
        else:
            # No build file anywhere: fall back to a config naming the service.
            build_config = BentoBuildConfig(service=service or "")

    # Collect only the overrides that were actually requested so an untouched
    # config is passed through without being copied.
    new_attrs: dict[str, t.Any] = {}
    if name is not None:
        new_attrs["name"] = name
    if labels:
        # Ensure both dictionaries are of type dict[str, str]
        existing_labels: dict[str, str] = build_config.labels or {}
        new_attrs["labels"] = {**existing_labels, **labels}
    if new_attrs:
        build_config = attr.evolve(build_config, **new_attrs)

    bento = Bento.create(
        build_config=build_config,
        version=version,
        build_ctx=build_ctx,
        platform=platform,
        bare=bare,
        reload=reload,
    )
    if bare:
        return bento
    return bento.save(_bento_store)
def get(
    pipeline_tag: str = typer.Argument(
        ..., help="The tag of the Dynamo pipeline to display"
    ),
    output: str = typer.Option(
        "yaml",
        "--output",
        "-o",
        help="Output format (json, yaml, or path)",
        show_default=True,
    ),
) -> None:
    """Display Dynamo pipeline details.
    Prints information about a Dynamo pipeline by its tag.
    """
    # Reject unknown formats before touching the store.
    allowed_formats = ["json", "yaml", "path"]
    if output not in allowed_formats:
        console.print(
            f"[red]Error: Output format must be one of {allowed_formats}[/red]"
        )
        raise typer.Exit(code=1)

    # Look the pipeline up by tag in the configured bento store.
    store = BentoMLContainer.bento_store.get()
    pipeline = store.get(pipeline_tag)

    # "path" only needs the on-disk location, so the info dict is never built.
    if output == "path":
        console.print(pipeline.path)
        return

    details = pipeline.info.to_dict()
    if output == "json":
        # default=str stringifies any value json cannot serialise natively.
        console.print_json(json.dumps(details, indent=2, default=str))
        return

    # Remaining format is "yaml": keep key order and syntax-highlight the dump.
    rendered = yaml.dump(details, indent=2, sort_keys=False)
    console.print(Syntax(rendered, "yaml", background_color="default"))
# CLI entry point: builds a Dynamo pipeline through build_bentofile() and,
# when --containerize is given, builds a container image for the result.
def build(
dynamo_pipeline: str = typer.Argument(
..., help="Path to the Dynamo pipeline to build"
),
output: str = typer.Option(
"default",
"--output",
"-o",
help="Output log format. Use 'tag' to display only pipeline tag.",
show_default=True,
),
containerize: bool = typer.Option(
False,
"--containerize",
help="Containerize the Dynamo pipeline after building. Shortcut for 'dynamo build && dynamo containerize'.",
),
platform: str = typer.Option(None, "--platform", help="Platform to build for"),
target: TargetEnum = typer.Option(
TargetEnum.BENTO,
"--target",
help="Specify the target: 'dynamo' or 'bento'.",
case_sensitive=False,
),
) -> None:
"""Build a new Dynamo pipeline from the specified path.
Creates a packaged Dynamo pipeline ready for deployment. Optionally builds a docker container.
"""
# Imported at call time rather than at module import time.
from bentoml._internal.configuration import get_quiet_mode, set_quiet_mode
from bentoml._internal.log import configure_logging
from dynamo.sdk.cli.utils import configure_target_environment
configure_target_environment(target)
# Validate output format
valid_outputs = ["tag", "default"]
if output not in valid_outputs:
console.print(f"[red]Error: Output format must be one of {valid_outputs}[/red]")
raise typer.Exit(code=1)
# "tag" output switches BentoML to quiet mode before the build runs.
if output == "tag":
set_quiet_mode()
configure_logging()
# A ":" in the argument marks it as a service import string (built from the
# current directory); otherwise the argument is used as the build context path.
service: str | None = None
build_ctx = "."
if ":" in dynamo_pipeline:
service = dynamo_pipeline
else:
build_ctx = dynamo_pipeline
# Only the bento target has a build implementation.
if target != TargetEnum.BENTO:
raise NotImplementedError(
"currently only bento based build target is supported"
)
bento = build_bentofile(
service=service,
build_ctx=build_ctx,
platform=platform,
)
containerize_cmd = f"dynamo containerize {bento.tag}"
# In "tag" mode only the tag is printed, prefixed with "__tag__:".
if output == "tag":
console.print(f"__tag__:{bento.tag}")
else:
if not get_quiet_mode():
console.print(DYNAMO_FIGLET)
console.print(f"[green]Successfully built {bento.tag}.")
# Hint at the follow-up command unless containerization was already requested.
next_steps = []
if not containerize:
next_steps.append(
"\n\n* Containerize your Dynamo pipeline with `dynamo containerize`:\n"
f" $ {containerize_cmd} [or dynamo build --containerize]"
)
if next_steps:
console.print(f"\n[blue]Next steps: {''.join(next_steps)}[/]")
if containerize:
# Backend name comes from BENTOML_CONTAINERIZE_BACKEND ("docker" by default);
# t.cast only affects static typing, the value stays a plain string.
backend: DefaultBuilder = t.cast(
"DefaultBuilder", os.getenv("BENTOML_CONTAINERIZE_BACKEND", "docker")
)
try:
import bentoml
bentoml.container.health(backend)
except subprocess.CalledProcessError:
# A failing health check is reported as a BentoMLException.
from bentoml.exceptions import BentoMLException
raise BentoMLException(f"Backend {backend} is not healthy")
bentoml.container.build(bento.tag, backend=backend)
...@@ -18,10 +18,12 @@ ...@@ -18,10 +18,12 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import atexit
import inspect import inspect
import json import json
import logging import logging
import os import os
import signal
import typing as t import typing as t
from typing import Any from typing import Any
...@@ -113,13 +115,7 @@ def main( ...@@ -113,13 +115,7 @@ def main(
help="Specify the target: 'dynamo' or 'bento'.", help="Specify the target: 'dynamo' or 'bento'.",
), ),
) -> None: ) -> None:
# hack to avoid bentoml from respawning the workers after their leases are revoked
os.environ["BENTOML_CONTAINERIZED"] = "true"
"""Start a worker for the given service - either Dynamo or regular service""" """Start a worker for the given service - either Dynamo or regular service"""
from bentoml._internal.container import BentoMLContainer
from bentoml._internal.context import server_context
from dynamo.runtime.logging import configure_dynamo_logging from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.sdk.cli.utils import configure_target_environment from dynamo.sdk.cli.utils import configure_target_environment
from dynamo.sdk.core.runner import TargetEnum from dynamo.sdk.core.runner import TargetEnum
...@@ -150,17 +146,8 @@ def main( ...@@ -150,17 +146,8 @@ def main(
dynamo_context["namespace"] = namespace dynamo_context["namespace"] = namespace
configure_dynamo_logging(service_name=service_name, worker_id=worker_id) configure_dynamo_logging(service_name=service_name, worker_id=worker_id)
if runner_map:
BentoMLContainer.remote_runner_mapping.set(
t.cast(t.Dict[str, str], json.loads(runner_map))
)
# TODO: test this with a deep chain of services # TODO: test this with a deep chain of services
LinkedServices.remove_unused_edges() LinkedServices.remove_unused_edges()
# Check if Dynamo is enabled for this service
if worker_id is not None:
server_context.worker_index = worker_id
# Instance of the inner class of the service should be the same across the dynamo_worker, web_worker, and system_app_worker # Instance of the inner class of the service should be the same across the dynamo_worker, web_worker, and system_app_worker
class_instance: Any = None class_instance: Any = None
# will be set once dyn_worker has created class_instance # will be set once dyn_worker has created class_instance
...@@ -171,12 +158,6 @@ def main( ...@@ -171,12 +158,6 @@ def main(
nonlocal class_instance nonlocal class_instance
global dynamo_context global dynamo_context
dynamo_context["runtime"] = runtime dynamo_context["runtime"] = runtime
if service_name and service_name != service.name:
server_context.service_type = "service"
else:
server_context.service_type = "entry_service"
server_context.service_name = service.name
# Get Dynamo configuration and create component # Get Dynamo configuration and create component
namespace, component_name = service.dynamo_address() namespace, component_name = service.dynamo_address()
logger.info(f"Registering component {namespace}/{component_name}") logger.info(f"Registering component {namespace}/{component_name}")
...@@ -330,6 +311,31 @@ def main( ...@@ -330,6 +311,31 @@ def main(
async def run_concurrent_workers(tasks): async def run_concurrent_workers(tasks):
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
def exit_handler():
"""Exit handler that runs shutdown hooks before process termination."""
if class_instance is not None:
logger.info("Running shutdown hooks on exit")
try:
run_shutdown_hooks(class_instance)
logger.info("Shutdown hooks completed successfully")
except Exception as e:
logger.error(f"Error running shutdown hooks: {e}")
else:
logger.debug("No class instance available for shutdown hooks")
# Register the exit handler
atexit.register(exit_handler)
# Also handle signals for graceful shutdown
def signal_handler(signum, frame):
logger.info(f"Received signal {signum}, initiating graceful shutdown")
exit_handler()
# Exit the process after running shutdown hooks
os._exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
worker_tasks = [] worker_tasks = []
uvloop.install() uvloop.install()
...@@ -351,5 +357,13 @@ def main( ...@@ -351,5 +357,13 @@ def main(
asyncio.run(run_concurrent_workers(worker_tasks)) asyncio.run(run_concurrent_workers(worker_tasks))
def run_shutdown_hooks(class_instance):
    """Run all shutdown hooks on the class instance.

    A hook is any callable class attribute carrying a truthy
    ``__dynamo_shutdown_hook__`` marker. The whole MRO of the instance's class
    is searched, so hooks declared on base classes run as well (previously only
    the most-derived class was inspected and inherited hooks were skipped).

    For each attribute name, the most-derived definition decides whether it is
    a hook, and each name is invoked at most once. Hooks of the instance's own
    class run first, in definition order, followed by those of its bases in
    MRO order.

    Args:
        class_instance: The service instance whose hooks should be invoked.

    Raises:
        Exception: Whatever a hook raises propagates to the caller; hooks that
            come after a failing one are not run.
    """
    # NOTE(review): hooks are called synchronously; a coroutine function marked
    # as a hook would only produce an un-awaited coroutine — confirm hooks are sync.
    seen = set()
    for klass in class_instance.__class__.__mro__:
        for name, member in vars(klass).items():
            if name in seen:
                # A more-derived class already defined this name; its
                # definition (marked or not) takes precedence.
                continue
            seen.add(name)
            if callable(member) and getattr(member, "__dynamo_shutdown_hook__", False):
                # Resolve through the instance to get a bound method.
                shutdown_func = getattr(class_instance, name)
                shutdown_func()
if __name__ == "__main__": if __name__ == "__main__":
app() app()
...@@ -24,13 +24,10 @@ import os ...@@ -24,13 +24,10 @@ import os
import pathlib import pathlib
import shutil import shutil
import tempfile import tempfile
from typing import Any, Dict, Optional, TypeVar from typing import Any, Dict, Optional
# TODO: WARNING: internal but only for type checking in the deploy path i believe
from _bentoml_sdk import Service
from circus.sockets import CircusSocket from circus.sockets import CircusSocket
from circus.watcher import Watcher from circus.watcher import Watcher
from simple_di import inject
from dynamo.sdk.cli.circus import CircusRunner from dynamo.sdk.cli.circus import CircusRunner
from dynamo.sdk.core.runner import TargetEnum from dynamo.sdk.core.runner import TargetEnum
...@@ -44,13 +41,6 @@ from .utils import ( ...@@ -44,13 +41,6 @@ from .utils import (
save_dynamo_state, save_dynamo_state,
) )
# WARNING: internal
# Use Protocol as the base for type alias
AnyService = TypeVar("AnyService", bound=ServiceProtocol)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_DYNAMO_WORKER_SCRIPT = "dynamo.sdk.cli.serve_dynamo" _DYNAMO_WORKER_SCRIPT = "dynamo.sdk.cli.serve_dynamo"
...@@ -159,9 +149,8 @@ def clear_namespace(namespace: str) -> None: ...@@ -159,9 +149,8 @@ def clear_namespace(namespace: str) -> None:
) )
@inject(squeeze_none=True)
def serve_dynamo_graph( def serve_dynamo_graph(
bento_identifier: str | AnyService, dynamo_pipeline: str,
working_dir: str | None = None, working_dir: str | None = None,
dependency_map: dict[str, str] | None = None, dependency_map: dict[str, str] | None = None,
service_name: str = "", service_name: str = "",
...@@ -180,21 +169,10 @@ def serve_dynamo_graph( ...@@ -180,21 +169,10 @@ def serve_dynamo_graph(
configure_dynamo_logging(service_name=service_name) configure_dynamo_logging(service_name=service_name)
bento_id: str = ""
namespace: str = "" namespace: str = ""
env: dict[str, Any] = {} env: dict[str, Any] = {}
if isinstance(bento_identifier, Service): svc = find_and_load_service(dynamo_pipeline, working_dir)
svc = bento_identifier dynamo_path = pathlib.Path(working_dir or ".")
bento_id = svc.import_string
assert (
working_dir is None
), "working_dir should not be set when passing a service in process"
# use cwd
bento_path = pathlib.Path(".")
else:
svc = find_and_load_service(bento_identifier, working_dir)
bento_id = str(bento_identifier)
bento_path = pathlib.Path(working_dir or ".")
watchers: list[Watcher] = [] watchers: list[Watcher] = []
sockets: list[CircusSocket] = [] sockets: list[CircusSocket] = []
...@@ -254,11 +232,11 @@ def serve_dynamo_graph( ...@@ -254,11 +232,11 @@ def serve_dynamo_graph(
if name == svc.name or name in dependency_map: if name == svc.name or name in dependency_map:
continue continue
new_watcher, new_socket, uri = create_dynamo_watcher( new_watcher, new_socket, uri = create_dynamo_watcher(
bento_id, dynamo_pipeline,
dep_svc, dep_svc,
uds_path, uds_path,
allocator, allocator,
str(bento_path.absolute()), str(dynamo_path.absolute()),
env=env, env=env,
target=target, target=target,
) )
...@@ -272,7 +250,7 @@ def serve_dynamo_graph( ...@@ -272,7 +250,7 @@ def serve_dynamo_graph(
dynamo_args = [ dynamo_args = [
"-m", "-m",
_DYNAMO_WORKER_SCRIPT, _DYNAMO_WORKER_SCRIPT,
bento_identifier, dynamo_pipeline,
"--service-name", "--service-name",
svc.name, svc.name,
"--worker-id", "--worker-id",
...@@ -305,7 +283,7 @@ def serve_dynamo_graph( ...@@ -305,7 +283,7 @@ def serve_dynamo_graph(
name=f"{namespace}_{svc.name}", name=f"{namespace}_{svc.name}",
args=dynamo_args, args=dynamo_args,
numprocesses=num_workers, numprocesses=num_workers,
working_dir=str(bento_path.absolute()), working_dir=str(dynamo_path.absolute()),
env=worker_env, env=worker_env,
) )
watchers.append(watcher) watchers.append(watcher)
...@@ -428,7 +406,7 @@ def serve_dynamo_graph( ...@@ -428,7 +406,7 @@ def serve_dynamo_graph(
hasattr(svc, "is_dynamo_component") hasattr(svc, "is_dynamo_component")
and svc.is_dynamo_component() and svc.is_dynamo_component()
) )
else (bento_identifier,) else (dynamo_pipeline,)
), ),
), ),
) )
......
...@@ -22,7 +22,7 @@ from typing import Any, Dict, Generic, List, Optional, Set, Tuple, Type, TypeVar ...@@ -22,7 +22,7 @@ from typing import Any, Dict, Generic, List, Optional, Set, Tuple, Type, TypeVar
from fastapi import FastAPI from fastapi import FastAPI
from pydantic import BaseModel, ConfigDict, Field from pydantic import BaseModel, ConfigDict, Field
from dynamo.sdk.core.protocol.deployment import Env from .deployment import Env
T = TypeVar("T", bound=object) T = TypeVar("T", bound=object)
......
# SPDX-FileCopyrightText: Copyright (c) 2020 Atalaya Tech. Inc
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
from dataclasses import asdict
from typing import Any, Dict, List, Optional, Set, Type, TypeVar
from _bentoml_sdk import Service as BentoService
from _bentoml_sdk.service.dependency import Dependency as BentoDependency
from fastapi import FastAPI
from dynamo.sdk.core.decorators.endpoint import DynamoClient, DynamoEndpoint
from dynamo.sdk.core.protocol.deployment import Env
from dynamo.sdk.core.protocol.interface import (
DependencyInterface,
DeploymentTarget,
DynamoConfig,
DynamoEndpointInterface,
DynamoTransport,
LinkedServices,
ServiceConfig,
ServiceInterface,
)
from dynamo.sdk.core.runner.common import ServiceMixin
T = TypeVar("T", bound=object)
class BentoEndpoint(DynamoEndpoint):
    """Endpoint wrapper that forwards calls to a wrapped endpoint object.

    The wrapped object is awaited when this endpoint is called. Its ``name``
    and ``transports`` attributes are used as fallbacks when the corresponding
    constructor arguments are missing or falsy.
    """

    def __init__(
        self,
        bentoml_endpoint: Any,
        name: Optional[str] = None,
        transports: Optional[List[DynamoTransport]] = None,
    ):
        """Store the wrapped endpoint and resolve its name and transports.

        Args:
            bentoml_endpoint: The endpoint object to delegate to.
            name: Explicit endpoint name; falls back to
                ``bentoml_endpoint.name`` when falsy.
            transports: Explicit transports; falls back to
                ``bentoml_endpoint.transports`` when falsy.
        """
        self.bentoml_endpoint = bentoml_endpoint
        # The fallback attribute is only read when the explicit value is falsy.
        self._name = name if name else bentoml_endpoint.name
        self._transports = transports if transports else bentoml_endpoint.transports

    @property
    def transports(self) -> List[DynamoTransport]:
        """Transports resolved at construction time."""
        return self._transports

    @property
    def name(self) -> str:
        """Endpoint name resolved at construction time."""
        return self._name

    async def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Await the wrapped endpoint with the given arguments and return its result."""
        result = await self.bentoml_endpoint(*args, **kwargs)
        return result
class BentoServiceAdapter(ServiceMixin, ServiceInterface[T]):
    """Adapter that exposes a BentoML ``Service`` through ``ServiceInterface``.

    On construction the adapter builds the underlying ``BentoService`` from
    the wrapped class, indexes every ``DynamoEndpoint`` and
    ``DependencyInterface`` attribute found on that class, and stores the
    Dynamo configuration in the BentoML service config under ``"dynamo"``.
    """

    def __init__(
        self,
        service_cls: Type[T],
        config: ServiceConfig,
        dynamo_config: Optional[DynamoConfig] = None,
        app: Optional[FastAPI] = None,
        system_app: Optional[FastAPI] = None,
        **kwargs,
    ):
        """Create the BentoML service and index endpoints and dependencies.

        Args:
            service_cls: Class being wrapped. Its ``__name__`` is used as the
                default Dynamo name and as the FastAPI title.
            config: Service configuration. It is updated in place with the
                ``workers``/``resources`` keyword arguments and with service
                args.
            dynamo_config: Dynamo configuration. When falsy, a
                ``DynamoConfig`` named after the class in namespace
                ``"default"`` is used.
            app: FastAPI application; a new one titled after the class is
                created when falsy.
            system_app: FastAPI application; a new one titled
                ``"<class name>-system"`` is created when falsy.
            **kwargs: Recognised keys are ``image``, ``envs``, ``workers``
                and ``resources``.
        """
        name = service_cls.__name__
        self._dynamo_config = dynamo_config or DynamoConfig(
            name=name, namespace="default"
        )
        image = kwargs.get("image")
        # Work on a copy: the list is extended below, and extending the
        # caller's own list would leak service-arg envs back into the caller.
        envs = list(kwargs.get("envs") or [])

        # Attributes passed to the decorator override what is already in config.
        for attr in ["workers", "resources"]:
            if attr in kwargs:
                config[attr] = kwargs[attr]

        self.image = image

        # `_get_service_args` is not defined in this class (presumably it is
        # provided by ServiceMixin - confirm).
        service_args = self._get_service_args(name)
        if service_args:
            # Service args only fill in keys that config does not have yet...
            for key, value in service_args.items():
                if key not in config:
                    config[key] = value

            # ...except resources and workers, which always take precedence.
            if "resources" in service_args:
                config["resources"] = service_args["resources"]
            if "workers" in service_args:
                config["workers"] = service_args["workers"]

            # Service-arg envs are appended to decorator envs, or used as-is
            # when the decorator supplied none.
            if "envs" in service_args:
                if envs:
                    envs.extend(service_args["envs"])
                else:
                    envs = service_args["envs"]

        # Initialize BentoML service
        self._bentoml_service = BentoService(
            config=config,
            inner=service_cls,
            image=image,
            envs=envs or [],
        )

        self._endpoints: Dict[str, BentoEndpoint] = {}
        self.app = app or FastAPI(title=name)
        self.system_app = system_app or FastAPI(title=f"{name}-system")
        self._dependencies: Dict[str, "DependencyInterface"] = {}
        self._bentoml_service.config["dynamo"] = asdict(self._dynamo_config)
        self._api_endpoints: list[str] = []

        # Scan the class for endpoints and dependencies.
        for field_name in dir(service_cls):
            field = getattr(service_cls, field_name)
            if isinstance(field, DynamoEndpoint):
                self._endpoints[field.name] = BentoEndpoint(
                    field, field.name, field.transports
                )
                if DynamoTransport.HTTP in field.transports:
                    # Ensure endpoint path starts with '/'
                    path = (
                        field.name if field.name.startswith("/") else f"/{field.name}"
                    )
                    self._api_endpoints.append(path)
            if isinstance(field, DependencyInterface):
                self._dependencies[field_name] = field

        # If any API endpoints exist, mark service as HTTP-exposed and list endpoints
        if self._api_endpoints:
            self._bentoml_service.config["http_exposed"] = True
            self._bentoml_service.config["api_endpoints"] = self._api_endpoints.copy()

    @property
    def dependencies(self) -> dict[str, "DependencyInterface"]:
        """Dependencies found on the wrapped class, keyed by attribute name."""
        return self._dependencies

    @property
    def name(self) -> str:
        """Name reported by the underlying BentoML service."""
        return self._bentoml_service.name

    @property
    def config(self) -> ServiceConfig:
        """A ``ServiceConfig`` built from the BentoML service config on each access."""
        return ServiceConfig(self._bentoml_service.config)

    @property
    def inner(self) -> Type[T]:
        """The ``inner`` attribute of the underlying BentoML service."""
        return self._bentoml_service.inner

    @property
    def envs(self) -> List[Env]:
        """The ``envs`` attribute of the underlying BentoML service."""
        return self._bentoml_service.envs

    def get_endpoints(self) -> Dict[str, DynamoEndpointInterface]:
        """Return the endpoint map, keyed by endpoint name."""
        return self._endpoints

    def get_endpoint(self, name: str) -> DynamoEndpointInterface:
        """Return the endpoint called ``name``.

        Raises:
            ValueError: If no endpoint with that name was registered.
        """
        if name not in self._endpoints:
            raise ValueError(f"No endpoint found with name: {name}")
        return self._endpoints[name]

    def list_endpoints(self) -> List[str]:
        """Return the names of all registered endpoints."""
        return list(self._endpoints.keys())

    def link(self, next_service: "ServiceInterface") -> "ServiceInterface":
        """Record the edge ``(self, next_service)`` and return ``next_service``.

        Returning the target allows calls to be chained. No type check is
        performed on ``next_service``.
        """
        LinkedServices.add((self, next_service))
        return next_service

    def remove_unused_edges(self, used_edges: Set["ServiceInterface"]) -> None:
        """Delete every dependency whose target service is not in ``used_edges``."""
        # Iterate over a snapshot because entries are deleted during the loop.
        current_deps = dict(self._dependencies)
        for dep_key, dep_value in current_deps.items():
            if dep_value.on not in used_edges:
                del self._dependencies[dep_key]

    # Add methods to expose underlying BentoML service when needed
    def get_bentoml_service(self) -> BentoService:
        """Return the wrapped BentoML service object."""
        return self._bentoml_service

    def __call__(self) -> T:
        """Instantiate the wrapped class with no arguments and return the instance."""
        instance = self.inner()
        return instance

    def find_dependent_by_name(self, name: str) -> "ServiceInterface":
        """Find dynamo service by name; raises ``KeyError`` if it is unknown."""
        return self.all_services()[name]

    def dynamo_address(self) -> tuple[str, str]:
        """Return ``(namespace, name)`` from the Dynamo configuration."""
        return (self._dynamo_config.namespace, self._dynamo_config.name)

    def all_services(self) -> dict[str, "ServiceInterface"]:
        """Get a map of the service and all recursive dependencies"""
        services: dict[str, "ServiceInterface"] = {self.name: self}
        for dep in self.dependencies.values():
            services.update(dep.on.all_services())
        return services
class BentoMLDependency(DependencyInterface[T]):
    """Dependency adapter wrapping a BentoML ``Dependency`` object.

    It also acts as a descriptor: reading it from an instance yields a
    lazily created ``DynamoClient`` for the target service.
    """

    def __init__(
        self,
        bentoml_dependency: BentoDependency,
        on_service: Optional[BentoServiceAdapter[T]] = None,
    ):
        """Store the wrapped dependency and the service it points at."""
        self._runtime = None
        self._dynamo_client = None
        self._on_service = on_service
        self._bentoml_dependency = bentoml_dependency

    @property
    def on(self) -> Optional[ServiceInterface[T]]:
        """The service this dependency targets, or ``None``."""
        return self._on_service

    def get(self, *args: Any, **kwargs: Any) -> Any:
        """Delegate to ``get`` on the wrapped BentoML dependency."""
        return self._bentoml_dependency.get(*args, **kwargs)

    def set_runtime(self, runtime: Any) -> None:
        """Remember ``runtime`` and hand it to the client if one already exists."""
        self._runtime = runtime
        client = self._dynamo_client
        if client:
            client._runtime = runtime

    async def get_endpoint(self, name: str) -> Any:
        """Return attribute ``name`` of the object produced by ``get()``.

        Raises:
            ValueError: If that object has no attribute called ``name``.
        """
        client = self.get()
        if not hasattr(client, name):
            raise ValueError(f"No endpoint found with name: {name}")
        return getattr(client, name)

    # Add method to expose underlying BentoML dependency when needed
    def get_bentoml_dependency(self) -> BentoDependency:
        """Return the wrapped BentoML dependency object."""
        return self._bentoml_dependency

    def __get__(
        self: "DependencyInterface[T]", instance: Any, owner: Any
    ) -> "DependencyInterface[T]" | T | Any:
        """Descriptor access.

        Class-level access (``instance is None``) returns the descriptor
        itself. Instance-level access returns the cached ``DynamoClient``,
        creating it on first use and seeding it with the stored runtime.
        """
        if instance is None:
            return self
        if self._dynamo_client is not None:
            return self._dynamo_client
        self._dynamo_client = DynamoClient(self.on)
        if self._runtime:
            self._dynamo_client._runtime = self._runtime
        return self._dynamo_client
class BentoDeploymentTarget(DeploymentTarget):
    """``DeploymentTarget`` that produces BentoML-backed services and dependencies."""

    def create_service(
        self,
        service_cls: Type[T],
        config: ServiceConfig,
        dynamo_config: Optional[DynamoConfig] = None,
        app: Optional[FastAPI] = None,
        system_app: Optional[FastAPI] = None,
        **kwargs,
    ) -> ServiceInterface[T]:
        """Wrap ``service_cls`` in a ``BentoServiceAdapter``.

        All arguments, including extra keyword arguments, are forwarded to the
        adapter unchanged.
        """
        adapter = BentoServiceAdapter(
            service_cls=service_cls,
            config=config,
            dynamo_config=dynamo_config,
            app=app,
            system_app=system_app,
            **kwargs,
        )
        return adapter

    def create_dependency(
        self, on: Optional[ServiceInterface[T]] = None, **kwargs
    ) -> DependencyInterface[T]:
        """Create a ``BentoMLDependency`` on ``on``.

        The optional ``url``, ``deployment`` and ``cluster`` keyword arguments
        are passed to the underlying BentoML dependency; other keyword
        arguments are ignored.
        """
        # Only a BentoServiceAdapter can supply an underlying BentoML service;
        # anything else (including None) leaves it unset.
        if isinstance(on, BentoServiceAdapter):
            bentoml_service = on.get_bentoml_service()
        else:
            bentoml_service = None

        bentoml_dependency = BentoDependency(
            bentoml_service,
            url=kwargs.get("url"),
            deployment=kwargs.get("deployment"),
            cluster=kwargs.get("cluster"),
        )
        # Wrap in our adapter
        return BentoMLDependency(bentoml_dependency, on)
...@@ -23,6 +23,8 @@ from pydantic import BaseModel ...@@ -23,6 +23,8 @@ from pydantic import BaseModel
from dynamo.sdk.core.protocol.interface import DynamoTransport from dynamo.sdk.core.protocol.interface import DynamoTransport
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
class DynamoEndpoint: class DynamoEndpoint:
"""Decorator class for Dynamo endpoints""" """Decorator class for Dynamo endpoints"""
...@@ -90,8 +92,15 @@ def endpoint( ...@@ -90,8 +92,15 @@ def endpoint(
return decorator return decorator
def async_on_start(func: F) -> F:
    """Flag ``func`` as a startup hook and return it unchanged.

    The flag is the attribute ``__dynamo_startup_hook__`` set to ``True`` on
    the function object itself.
    """
    hook_flag = "__dynamo_startup_hook__"
    setattr(func, hook_flag, True)
    return func
def on_shutdown(func: F) -> F:
    """Flag ``func`` as a shutdown hook and return it unchanged.

    The flag is the attribute ``__dynamo_shutdown_hook__`` set to ``True`` on
    the function object itself.
    """
    hook_flag = "__dynamo_shutdown_hook__"
    setattr(func, hook_flag, True)
    return func
# SPDX-FileCopyrightText: Copyright (c) 2020 Atalaya Tech. Inc
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
import asyncio
from typing import Any, Dict, Optional, TypeVar
# WARNING: internal
from _bentoml_sdk.service import Service
from _bentoml_sdk.service.dependency import Dependency
from dynamo.runtime import DistributedRuntime
from dynamo.sdk.lib.service import DynamoService
T = TypeVar("T")
class DynamoClient:
    """Client for calling Dynamo endpoints with streaming support.

    Attribute access with an endpoint name returns an async generator
    function. Calling it resolves the endpoint through the runtime and yields
    ``item.data()`` for every item of the response stream. The generator
    function for each endpoint is created once and cached.
    """

    def __init__(self, service: "DynamoService[Any]"):
        """Bind the client to ``service`` and snapshot its endpoint map.

        Args:
            service: Object providing ``get_dynamo_endpoints()``, ``name`` and
                ``dynamo_address()``.
        """
        self._service = service
        self._endpoints = service.get_dynamo_endpoints()
        self._dynamo_clients: Dict[str, Any] = {}
        self._runtime = None

    def __getattr__(self, name: str) -> Any:
        """Return the streaming call function for endpoint ``name``.

        Raises:
            AttributeError: If ``name`` is not a known endpoint, or if the
                instance has not been initialised yet.
        """
        # __getattr__ only runs when normal lookup fails. On an instance made
        # without __init__ (copy/pickle use __new__), `self._endpoints` would
        # re-enter this method forever, so read it from __dict__ instead.
        endpoints = self.__dict__.get("_endpoints")
        if endpoints is None:
            raise AttributeError(name)

        if name not in endpoints:
            raise AttributeError(
                f"No Dynamo endpoint '{name}' found on service '{self._service.name}'. "
                f"Available endpoints: {list(endpoints.keys())}"
            )

        # For streaming endpoints, create/cache the stream function
        if name not in self._dynamo_clients:
            namespace, component_name = self._service.dynamo_address()

            # Create async generator function that directly yields from the stream
            async def get_stream(*args, **kwargs):
                if self._runtime is not None:
                    # Use existing runtime if available
                    runtime = self._runtime
                else:
                    # Create new runtime and store it for later calls
                    loop = asyncio.get_running_loop()
                    runtime = DistributedRuntime(loop, False)
                    self._runtime = runtime

                client = (
                    await runtime.namespace(namespace)
                    .component(component_name)
                    .endpoint(name)
                    .client()
                )
                # Directly yield items from the stream; errors propagate to
                # the caller unchanged.
                stream = await client.generate(*args, **kwargs)
                async for item in stream:
                    yield item.data()

            self._dynamo_clients[name] = get_stream

        return self._dynamo_clients[name]
class DynamoDependency(Dependency[T]):
    """Dependency that hands out a ``DynamoClient`` for Dynamo services."""

    def __init__(
        self,
        on: Service[T] | None = None,
        url: str | None = None,
        deployment: str | None = None,
        cluster: str | None = None,
    ):
        """Initialise the base dependency and the client/runtime slots."""
        super().__init__(on, url=url, deployment=deployment, cluster=cluster)
        self._runtime = None
        self._dynamo_client: Optional[DynamoClient] = None

    # offers an escape hatch to get the endpoint directly
    async def get_endpoint(self, name: str) -> Any:
        """Return a runtime client for endpoint ``name`` of the target service.

        The target's ``dynamo_address()`` supplies the namespace and component
        used to look the endpoint up on the stored runtime, i.e. the result of
        ``runtime.namespace(ns).component(comp).endpoint(name).client()``.

        Raises:
            ValueError: If no runtime has been set on this dependency.
        """
        # TODO: Read the runtime from the tdist since it is not stored in global
        if self._runtime is None:
            print(
                "Get Endpoint: Runtime not set for DynamoDependency. Cannot get endpoint."
            )
            raise ValueError("Runtime not set for DynamoDependency")

        address = self.on.dynamo_address()
        comp_ns, comp_name = address
        print("Get Endpoint: Dynamo ADDRESS: ", address)

        endpoint = self._runtime.namespace(comp_ns).component(comp_name).endpoint(name)
        return await endpoint.client()

    def set_runtime(self, runtime: Any) -> None:
        """Remember ``runtime`` and hand it to the client if one already exists."""
        self._runtime = runtime
        client = self._dynamo_client
        if client:
            client._runtime = runtime

    def get(self, *args: Any, **kwargs: Any) -> T | Any:
        """Return the Dynamo client for a ``DynamoService`` target.

        For any other target the call falls through to the base class ``get``.
        The client is created on first use and seeded with the stored runtime.
        """
        if not isinstance(self.on, DynamoService):
            # Otherwise fall back to normal BentoML dependency resolution
            return super().get(*args, **kwargs)

        if self._dynamo_client is None:
            self._dynamo_client = DynamoClient(self.on)
            if self._runtime:
                self._dynamo_client._runtime = self._runtime
        return self._dynamo_client
def depends(
    on: Service[T] | None = None,
    *,
    url: str | None = None,
    deployment: str | None = None,
    cluster: str | None = None,
) -> DynamoDependency[T]:
    """Build a ``DynamoDependency`` on the given service.

    Args:
        on: The service to depend on; must be a ``Service`` instance or None
        url: URL for remote service
        deployment: Deployment name
        cluster: Cluster name

    Returns:
        A ``DynamoDependency`` constructed from the arguments.

    Raises:
        TypeError: If ``on`` is neither ``None`` nor a ``Service`` instance.
    """
    is_valid_target = on is None or isinstance(on, Service)
    if not is_valid_target:
        raise TypeError("depends() expects a class decorated with @service()")

    dependency = DynamoDependency(
        on, url=url, deployment=deployment, cluster=cluster
    )
    return dependency
...@@ -26,7 +26,7 @@ from typing import Optional, TypeVar ...@@ -26,7 +26,7 @@ from typing import Optional, TypeVar
import yaml import yaml
from dynamo.sdk.core.protocol.deployment import Service from dynamo.sdk.core.protocol.deployment import Service
from dynamo.sdk.lib.service import DynamoService from dynamo.sdk.core.protocol.interface import ServiceInterface
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
T = TypeVar("T", bound=object) T = TypeVar("T", bound=object)
...@@ -35,7 +35,7 @@ T = TypeVar("T", bound=object) ...@@ -35,7 +35,7 @@ T = TypeVar("T", bound=object)
def find_and_load_service( def find_and_load_service(
import_str: str, import_str: str,
working_dir: Optional[str] = None, working_dir: Optional[str] = None,
) -> DynamoService: ) -> ServiceInterface:
"""Load a DynamoService instance from source code by providing an import string. """Load a DynamoService instance from source code by providing an import string.
Args: Args:
...@@ -84,7 +84,7 @@ def find_and_load_service( ...@@ -84,7 +84,7 @@ def find_and_load_service(
os.chdir(prev_cwd) os.chdir(prev_cwd)
def _do_import(import_str: str, working_dir: str) -> DynamoService: def _do_import(import_str: str, working_dir: str) -> ServiceInterface:
"""Internal function to handle the actual import logic""" """Internal function to handle the actual import logic"""
import_path, _, attrs_str = import_str.partition(":") import_path, _, attrs_str = import_str.partition(":")
logger.debug(f"Parsed import string - path: {import_path}, attributes: {attrs_str}") logger.debug(f"Parsed import string - path: {import_path}, attributes: {attrs_str}")
...@@ -139,7 +139,7 @@ def _do_import(import_str: str, working_dir: str) -> DynamoService: ...@@ -139,7 +139,7 @@ def _do_import(import_str: str, working_dir: str) -> DynamoService:
services = [ services = [
(name, obj) (name, obj)
for name, obj in module.__dict__.items() for name, obj in module.__dict__.items()
if isinstance(obj, DynamoService) if isinstance(obj, ServiceInterface)
] ]
logger.debug(f"Found {len(services)} DynamoService instances") logger.debug(f"Found {len(services)} DynamoService instances")
...@@ -178,7 +178,7 @@ def _do_import(import_str: str, working_dir: str) -> DynamoService: ...@@ -178,7 +178,7 @@ def _do_import(import_str: str, working_dir: str) -> DynamoService:
instance = module instance = module
for attr in attrs_str.split("."): for attr in attrs_str.split("."):
try: try:
if isinstance(instance, DynamoService): if isinstance(instance, ServiceInterface):
logger.debug(f"Following dependency link: {attr}") logger.debug(f"Following dependency link: {attr}")
instance = instance.dependencies[attr].on instance = instance.dependencies[attr].on
else: else:
...@@ -209,7 +209,7 @@ def _get_dir_size(path: str) -> int: ...@@ -209,7 +209,7 @@ def _get_dir_size(path: str) -> int:
def load_entry_service( def load_entry_service(
pipeline_tag: str, build_dir: str = "~/.dynamo/packages" pipeline_tag: str, build_dir: str = "~/.dynamo/packages"
) -> Service: ) -> ServiceInterface:
""" """
Given a built pipeline tag (e.g. frontend:2uk2fwzvqsswvs7t), load the entry service as a deployment Service instance. Given a built pipeline tag (e.g. frontend:2uk2fwzvqsswvs7t), load the entry service as a deployment Service instance.
""" """
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import json
import logging
import os
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar, Union
# WARNING: internal
from _bentoml_sdk import Service, ServiceConfig
from _bentoml_sdk.images import Image
from _bentoml_sdk.service.config import validate
from fastapi import FastAPI
from dynamo.sdk.core.protocol.interface import DynamoTransport, LinkedServices
from dynamo.sdk.lib.decorators import DynamoEndpoint
T = TypeVar("T", bound=object)
logger = logging.getLogger(__name__)
class ComponentType(str, Enum):
    """Kinds of Dynamo component; members are also plain ``str`` values."""

    PLANNER = "planner"
    # Further kinds can be added as new members, for example:
    # METRICS = "metrics"
    # MONITOR = "monitor"
@dataclass
class DynamoConfig:
    """Settings describing how a service takes part in Dynamo.

    Every field has a default, so ``DynamoConfig()`` is valid.
    """

    enabled: bool = False
    name: str | None = None
    namespace: str | None = None
    custom_lease: LeaseConfig | None = None
    # Indicates if this is a meta/system component
    component_type: ComponentType | None = None
@dataclass
class LeaseConfig:
    """Settings for a custom Dynamo lease."""

    # Lease time-to-live, in seconds.
    ttl: int = 1
class DynamoService(Service[T]):
    """A custom service class that extends BentoML's base Service with Dynamo capabilities.

    On construction it merges ``ServiceArgs`` read from the
    ``DYNAMO_SERVICE_CONFIG`` environment variable into the service config,
    resolves the Dynamo configuration, and indexes every ``DynamoEndpoint``
    attribute found on the wrapped class.
    """

    def __init__(
        self,
        config: ServiceConfig,
        inner: type[T],
        image: Optional[Image] = None,
        envs: Optional[list[dict[str, Any]]] = None,
        dynamo_config: Optional[DynamoConfig] = None,
        app: Optional[FastAPI] = None,
    ):
        """Build the service.

        Args:
            config: Service configuration; updated in place with validated
                ServiceArgs when any are present.
            inner: The wrapped class; its ``__name__`` keys the environment
                config lookup and is the default Dynamo name.
            image: Passed through to the base ``Service``.
            envs: Passed through to the base ``Service`` (``[]`` when falsy).
            dynamo_config: Base Dynamo configuration. When falsy, one named
                after the class in namespace ``"default"`` is created.
            app: Stored on the instance as ``self.app``.
        """
        service_name = inner.__name__
        service_args = self._get_service_args(service_name)
        self.app = app
        if service_args:
            # Validate and merge service args with existing config
            validated_args = validate(service_args)
            config.update(validated_args)
            self._remove_service_args(service_name)
        super().__init__(config=config, inner=inner, image=image, envs=envs or [])
        # Get any dynamo overrides from service_args
        dynamo_overrides = {}
        if service_args and "dynamo" in service_args:
            dynamo_overrides = service_args["dynamo"]
            logger.debug(f"Found dynamo overrides in service_args: {dynamo_overrides}")
        # Initialize Dynamo configuration with overrides
        base_config = (
            dynamo_config
            if dynamo_config
            else DynamoConfig(name=inner.__name__, namespace="default")
        )
        logger.debug(f"Initial base DynamoConfig: {asdict(base_config)}")
        # Apply overrides from service_args to base config
        # (keys that are not attributes of the config object are ignored)
        for key, value in dynamo_overrides.items():
            if hasattr(base_config, key):
                logger.debug(f"Applying override: {key}={value}")
                setattr(base_config, key, value)
        self._dynamo_config = base_config
        if self._dynamo_config.name is None:
            self._dynamo_config.name = inner.__name__
        logger.debug(f"Final DynamoConfig: {asdict(self._dynamo_config)}")
        # Add dynamo configuration to the service config
        # this allows for the config to be part of the service in bento.yaml
        self.config["dynamo"] = asdict(self._dynamo_config)
        # Register Dynamo endpoints
        self._dynamo_endpoints: Dict[str, DynamoEndpoint] = {}
        self._api_endpoints: list[str] = []
        for field in dir(inner):
            value = getattr(inner, field)
            if isinstance(value, DynamoEndpoint):
                self._dynamo_endpoints[value.name] = value
                # An endpoint without a `_transports` attribute is treated as
                # having only the DEFAULT transport.
                if DynamoTransport.HTTP in getattr(
                    value, "_transports", [DynamoTransport.DEFAULT]
                ):
                    # Ensure endpoint path starts with '/'
                    path = (
                        value.name if value.name.startswith("/") else f"/{value.name}"
                    )
                    self._api_endpoints.append(path)
        # If any API endpoints exist, mark service as HTTP-exposed and list endpoints
        if self._api_endpoints:
            self.config["http_exposed"] = True
            self.config["api_endpoints"] = self._api_endpoints.copy()
        self._linked_services: List[DynamoService] = []  # Track linked services

    def _get_service_args(self, service_name: str) -> Optional[dict]:
        """Get ServiceArgs from environment config if specified.

        Reads the JSON in ``DYNAMO_SERVICE_CONFIG`` and returns the
        ``ServiceArgs`` entry for ``service_name``. Returns ``None`` when the
        variable is unset or empty, or when the entry is absent.
        """
        config_str = os.environ.get("DYNAMO_SERVICE_CONFIG")
        if config_str:
            config = json.loads(config_str)
            service_config = config.get(service_name, {})
            return service_config.get("ServiceArgs")
        return None

    def dynamo_address(self) -> Tuple[Optional[str], Optional[str]]:
        """Get the Dynamo address for this component as ``(namespace, name)``.

        An entry for this service in the ``BENTOML_RUNNER_MAP`` JSON that
        starts with ``dynamo://`` takes precedence; otherwise the values from
        the Dynamo configuration are returned.

        Raises:
            ValueError: If ``BENTOML_RUNNER_MAP`` cannot be parsed.
        """
        # Check if we have a runner map with Dynamo address
        runner_map = os.environ.get("BENTOML_RUNNER_MAP")
        if runner_map:
            try:
                runners = json.loads(runner_map)
                if self.name in runners:
                    address = runners[self.name]
                    if address.startswith("dynamo://"):
                        # Parse dynamo://namespace/name into (namespace, name)
                        _, path = address.split("://", 1)
                        namespace, name = path.split("/", 1)
                        logger.debug(
                            f"Resolved Dynamo address from runner map: {namespace}/{name}"
                        )
                        return (namespace, name)
            except (json.JSONDecodeError, ValueError) as e:
                raise ValueError(f"Failed to parse BENTOML_RUNNER_MAP: {str(e)}") from e
        logger.debug(
            f"Using default Dynamo address: {self._dynamo_config.namespace}/{self._dynamo_config.name}"
        )
        return (self._dynamo_config.namespace, self._dynamo_config.name)

    def get_dynamo_endpoints(self) -> Dict[str, DynamoEndpoint]:
        """Get all registered Dynamo endpoints, keyed by endpoint name."""
        return self._dynamo_endpoints

    def get_dynamo_endpoint(self, name: str) -> DynamoEndpoint:
        """Get a specific Dynamo endpoint by name.

        Raises:
            ValueError: If no endpoint with that name is registered.
        """
        if name not in self._dynamo_endpoints:
            raise ValueError(f"No Dynamo endpoint found with name: {name}")
        return self._dynamo_endpoints[name]

    def list_dynamo_endpoints(self) -> List[str]:
        """List names of all registered Dynamo endpoints"""
        return list(self._dynamo_endpoints.keys())

    def remove_unused_edges(self, used_edges: Set[DynamoService]):
        """Delete every dependency whose target's ``inner`` is not in ``used_edges``."""
        # Iterate over a snapshot because entries are deleted during the loop.
        current_deps = dict(self.dependencies)
        for dep_key, dep_value in current_deps.items():
            if dep_value.on.inner not in used_edges:
                del self.dependencies[dep_key]

    def link(self, next_service: DynamoService):
        """Link this service to another service, creating a pipeline.

        The edge is recorded both on this instance and in ``LinkedServices``;
        ``next_service`` is returned so calls can be chained.
        """
        self._linked_services.append(next_service)
        LinkedServices.add((self, next_service))
        return next_service

    def _remove_service_args(self, service_name: str):
        """Copy the service's ServiceArgs ``envs`` into ``DYNAMO_SERVICE_ENVS``.

        NOTE(review): despite the name, nothing in this method deletes
        ServiceArgs from ``DYNAMO_SERVICE_CONFIG``; it only preserves the
        ``envs`` entry in a separate environment variable.
        """
        logger.debug(f"Removing service args for {service_name}")
        config_str = os.environ.get("DYNAMO_SERVICE_CONFIG")
        if config_str:
            config = json.loads(config_str)
            if service_name in config and "ServiceArgs" in config[service_name]:
                # Save envs to separate env var before removing ServiceArgs
                service_args = config[service_name]["ServiceArgs"]
                if "envs" in service_args:
                    service_envs = os.environ.get("DYNAMO_SERVICE_ENVS", "{}")
                    envs_config = json.loads(service_envs)
                    if service_name not in envs_config:
                        envs_config[service_name] = {}
                    envs_config[service_name]["ServiceArgs"] = {
                        "envs": service_args["envs"]
                    }
                    os.environ["DYNAMO_SERVICE_ENVS"] = json.dumps(envs_config)

    def inject_config(self) -> None:
        """Inject configuration from environment into service configs.

        This reads the DYNAMO_SERVICE_CONFIG environment variable, stores the
        parsed mapping on the class as ``_global_service_configs``, and sets
        ``_service_args`` on the services returned by ``all_services()``.
        Returns early when the variable is unset or is not valid JSON.
        """
        # Get service configs from environment
        service_config_str = os.environ.get("DYNAMO_SERVICE_CONFIG")
        if not service_config_str:
            logger.debug("No DYNAMO_SERVICE_CONFIG found in environment")
            return
        try:
            service_configs = json.loads(service_config_str)
            logger.debug(f"Loaded service configs: {service_configs}")
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse DYNAMO_SERVICE_CONFIG: {e}")
            return
        # Store the entire config at class level
        if not hasattr(DynamoService, "_global_service_configs"):
            setattr(DynamoService, "_global_service_configs", {})
        DynamoService._global_service_configs = service_configs
        # Process ServiceArgs for all services
        # (`all_services` is not defined in this class; presumably inherited
        # from the base Service - confirm)
        all_services = self.all_services()
        logger.debug(f"Processing configs for services: {list(all_services.keys())}")
        for name, svc in all_services.items():
            if name in service_configs:
                svc_config = service_configs[name]
                # Extract ServiceArgs if present
                if "ServiceArgs" in svc_config:
                    logger.debug(
                        f"Found ServiceArgs for {name}: {svc_config['ServiceArgs']}"
                    )
                    if not hasattr(svc, "_service_args"):
                        object.__setattr__(svc, "_service_args", {})
                    svc._service_args = svc_config["ServiceArgs"]
                else:
                    logger.debug(f"No ServiceArgs found for {name}")
                    # Set default config
                    # NOTE(review): nesting of this default under the `else`
                    # is reconstructed - verify against the original file.
                    if not hasattr(svc, "_service_args"):
                        object.__setattr__(svc, "_service_args", {"workers": 1})

    def get_service_configs(self) -> Dict[str, Dict[str, Any]]:
        """Get the service configurations for resource allocation.

        Each config starts as ``{"workers": 1}``, is updated with the
        service's ``_service_args`` if present, then with the ``ServiceArgs``
        from the class-level global config for that service.

        Returns:
            Dict mapping service names to their configs. The dict is empty
            when no global config has been stored on the class yet.
        """
        # Get all services in the dependency chain
        all_services = self.all_services()
        result = {}
        # If we have global configs, use them to build service configs
        if hasattr(DynamoService, "_global_service_configs"):
            for name, svc in all_services.items():
                # Start with default config
                config = {"workers": 1}
                # If service has specific args, use them
                if hasattr(svc, "_service_args"):
                    config.update(svc._service_args)
                # If there are global configs for this service, get ServiceArgs
                if name in DynamoService._global_service_configs:
                    svc_config = DynamoService._global_service_configs[name]
                    if "ServiceArgs" in svc_config:
                        config.update(svc_config["ServiceArgs"])
                result[name] = config
                logger.debug(f"Built config for {name}: {config}")
        return result
def service(
    inner: Optional[type[T]] = None,
    /,
    *,
    image: Optional[str] = None,
    envs: Optional[list[dict[str, Any]]] = None,
    dynamo: Optional[Union[Dict[str, Any], DynamoConfig]] = None,
    app: Optional[FastAPI] = None,
    **kwargs: Any,
) -> Any:
    """Class decorator that wraps a class in a ``DynamoService``.

    Usable both bare (``@service``) and with arguments (``@service(...)``).

    Args:
        inner: The class to wrap when used as a bare decorator.
        image: Forwarded to ``DynamoService``.
        envs: Forwarded to ``DynamoService`` (``[]`` when falsy).
        dynamo: Dynamo configuration, either a ``DynamoConfig`` or a dict that
            is expanded as ``DynamoConfig(**dynamo)``.
        app: Forwarded to ``DynamoService``.
        **kwargs: Used as the service ``config``.

    Returns:
        The ``DynamoService`` when ``inner`` is given, otherwise a decorator
        that produces one.

    Raises:
        TypeError: If the decorated object is already a ``Service``.
    """
    # Normalise the dynamo argument to a DynamoConfig (or None).
    dynamo_config: Optional[DynamoConfig] = None
    if dynamo is not None:
        dynamo_config = DynamoConfig(**dynamo) if isinstance(dynamo, dict) else dynamo

    def decorator(inner: type[T]) -> DynamoService[T]:
        if isinstance(inner, Service):
            raise TypeError("service() decorator can only be applied once")
        wrapped = DynamoService(
            config=kwargs,
            inner=inner,
            image=image,
            envs=envs or [],
            dynamo_config=dynamo_config,
            app=app,
        )
        return wrapped

    if inner is None:
        return decorator
    return decorator(inner)
...@@ -46,3 +46,23 @@ def test_gpu_resources(setup_and_teardown): ...@@ -46,3 +46,23 @@ def test_gpu_resources(setup_and_teardown):
assert dyn_svc.config.resources.cpu == "2" assert dyn_svc.config.resources.cpu == "2"
assert dyn_svc.config.resources.gpu == "1" assert dyn_svc.config.resources.gpu == "1"
assert dyn_svc.config.resources.memory == "4Gi" assert dyn_svc.config.resources.memory == "4Gi"
def test_gpu_resources_coercing_from_integers(setup_and_teardown):
    """Integer cpu/gpu resource values are exposed as strings on the service config."""
    from dynamo.sdk import service

    @service(
        resources={"cpu": 3, "gpu": 4, "memory": "4Gi"},
        dynamo={"namespace": "test"},
    )
    class MockService:
        def __init__(self) -> None:
            pass

    dyn_svc: ServiceInterface = MockService
    assert dyn_svc.config is not None  # type: ignore

    # The integers given above must come back as their string form.
    resources = dyn_svc.config.resources
    assert resources.cpu == "3"
    assert resources.gpu == "4"
    assert resources.memory == "4Gi"
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment