Unverified Commit 68ac71c4 authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

feat: portable dynamo build (#1215)

parent 0594235b
...@@ -196,7 +196,7 @@ func RetrieveDynamoGraphConfigurationFile(ctx context.Context, url string) (*byt ...@@ -196,7 +196,7 @@ func RetrieveDynamoGraphConfigurationFile(ctx context.Context, url string) (*byt
} }
// Extract the YAML file // Extract the YAML file
yamlFileName := "bento.yaml" yamlFileName := "dynamo.yaml"
yamlContent, err := archive.ExtractFileFromTar(tarData, yamlFileName) yamlContent, err := archive.ExtractFileFromTar(tarData, yamlFileName)
if err != nil { if err != nil {
return nil, err return nil, err
......
# Use ARG to allow base image to be specified at build time
ARG BASE_IMAGE=__BASE_IMAGE__
FROM ${BASE_IMAGE}
# Build arguments for user configuration
ARG USER_ID=1024
ARG GROUP_ID=1024
ARG USERNAME=dynamo
ARG GROUPNAME=dynamo
ARG HOME_DIR=/home/${USERNAME}
# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV PATH="${HOME_DIR}/.local/bin:$PATH"
ENV PYTHONPATH="${HOME_DIR}/app:$PYTHONPATH"
# Create group and user
RUN if [ "$(id -u)" != "0" ]; then \
echo "Using sudo for user/group creation"; \
sudo groupadd --gid ${GROUP_ID} ${GROUPNAME} \
&& sudo useradd --uid ${USER_ID} --gid ${GROUP_ID} --create-home --shell /bin/bash ${USERNAME} \
&& sudo mkdir -p ${HOME_DIR}/app \
&& sudo mkdir -p ${HOME_DIR}/.local/bin \
&& sudo mkdir -p ${HOME_DIR}/.cache/pip \
&& sudo chown -R ${USERNAME}:${GROUPNAME} ${HOME_DIR}; \
else \
echo "Running as root, no sudo needed"; \
groupadd --gid ${GROUP_ID} ${GROUPNAME} \
&& useradd --uid ${USER_ID} --gid ${GROUP_ID} --create-home --shell /bin/bash ${USERNAME} \
&& mkdir -p ${HOME_DIR}/app \
&& mkdir -p ${HOME_DIR}/.local/bin \
&& mkdir -p ${HOME_DIR}/.cache/pip \
&& chown -R ${USERNAME}:${GROUPNAME} ${HOME_DIR}; \
fi
# Switch to non-root user
USER ${USERNAME}
WORKDIR ${HOME_DIR}/app
# Copy application code
COPY --chown=${USERNAME}:${GROUPNAME} . .
RUN chmod +x ${HOME_DIR}/app
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
from __future__ import annotations
import datetime
import importlib
import importlib.util
import inspect
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import typing as t
import uuid
from pathlib import Path
from typing import TypeVar
import typer
import yaml
from pydantic import BaseModel, Field
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from dynamo.sdk import DYNAMO_IMAGE
from dynamo.sdk.core.protocol.interface import (
DynamoTransport,
LinkedServices,
ServiceInterface,
)
from dynamo.sdk.core.runner import TargetEnum
logger = logging.getLogger(__name__)
console = Console()
T = TypeVar("T", bound=object)
DYNAMO_FIGLET = """
██████╗ ██╗ ██╗███╗ ██╗ █████╗ ███╗ ███╗ ██████╗
██╔══██╗╚██╗ ██╔╝████╗ ██║██╔══██╗████╗ ████║██╔═══██╗
██║ ██║ ╚████╔╝ ██╔██╗ ██║███████║██╔████╔██║██║ ██║
██║ ██║ ╚██╔╝ ██║╚██╗██║██╔══██║██║╚██╔╝██║██║ ██║
██████╔╝ ██║ ██║ ╚████║██║ ██║██║ ╚═╝ ██║╚██████╔╝
╚═════╝ ╚═╝ ╚═╝ ╚═══╝╚═╝ ╚═╝╚═╝ ╚═╝ ╚═════╝
"""
# --- Custom exceptions ---
class InvalidArgument(Exception):
"""Exception raised for invalid arguments."""
pass
class BuildError(Exception):
"""Exception raised for build errors."""
pass
# --- Data models ---
class Tag(BaseModel):
"""Tag for identifying a package."""
name: str
version: t.Optional[str] = None
def __str__(self) -> str:
if self.version:
return f"{self.name}:{self.version}"
return self.name
def make_new_version(self) -> Tag:
"""Create a new version based on timestamp."""
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
short_uuid = uuid.uuid4().hex[:8]
return Tag(name=self.name, version=f"{timestamp}_{short_uuid}")
class ServiceConfig(BaseModel):
"""Configuration for a service."""
name: str
service: str = "" # Fully qualified service name
models: t.List[str] = Field(default_factory=list)
dependencies: t.List[str] = Field(default_factory=list)
resource: t.Dict[str, t.Any] = Field(default_factory=dict)
workers: t.Optional[int] = None
image: str = "dynamo:latest"
dynamo: t.Dict[str, t.Any] = Field(default_factory=dict)
http_exposed: bool = False
api_endpoints: t.List[str] = Field(default_factory=list)
class ServiceInfo(BaseModel):
"""Information about a service."""
name: str
module_path: str
class_name: str
config: ServiceConfig
@classmethod
def from_service(cls, service: ServiceInterface[T]) -> ServiceInfo:
"""Create ServiceInfo from a service instance."""
service_class = service.inner
name = getattr(service, "name", service_class.__name__)
# Extract API endpoints if available
api_endpoints = []
for ep_name, endpoint in service.get_dynamo_endpoints().items():
if DynamoTransport.HTTP in endpoint.transports:
api_endpoints.append(f"/{ep_name}")
# Create config
config = ServiceConfig(
name=name,
service="",
resource=service.config.resource.model_dump(),
workers=service.config.workers,
image=service.config.image,
dynamo=service.config.dynamo.model_dump(),
http_exposed=len(api_endpoints) > 0,
api_endpoints=api_endpoints,
)
return cls(
name=name,
module_path=service.__module__,
class_name=service_class.__name__,
config=config,
)
class BuildConfig(BaseModel):
"""Configuration for building a Dynamo pipeline."""
service: str
name: t.Optional[str] = None
version: t.Optional[str] = None
tag: t.Optional[str] = None
include: t.List[str] = Field(
default_factory=lambda: [
"**/*.py",
"**/*.yaml",
"**/*.json",
"**/*.toml",
"**/*.md",
"**/*.sh",
]
)
exclude: t.List[str] = Field(
default_factory=lambda: [
"**/__pycache__/**",
"**/.git/**",
]
)
labels: t.Dict[str, str] = Field(default_factory=dict)
envs: t.List[str] = Field(default_factory=list)
docker: t.Dict[str, t.Any] = Field(default_factory=dict)
def to_yaml(self, file_obj: t.TextIO) -> None:
"""Write config to YAML file."""
yaml.dump(self.model_dump(), file_obj)
def with_defaults(self) -> BuildConfig:
"""Return config with default values filled in."""
return self
class ManifestInfo(BaseModel):
"""Information for generating a manifest file."""
service: str
name: str
version: str
creation_time: str
labels: t.Dict[str, str]
entry_service: str
services: t.List[ServiceInfo]
envs: t.List[str]
def to_dict(self) -> t.Dict[str, t.Any]:
"""Convert to dictionary for YAML serialization."""
result = self.model_dump()
# Convert ServiceInfo objects to dictionaries
services_dict = []
for service in result["services"]:
service_dict = {
"name": service["name"],
"service": service["config"]["service"],
"config": {
"resource": service["config"]["resource"],
"workers": service["config"]["workers"],
"image": service["config"]["image"],
"dynamo": service["config"]["dynamo"],
},
}
# Add HTTP configuration if exposed
if service["config"]["http_exposed"]:
service_dict["config"]["http_exposed"] = True
service_dict["config"]["api_endpoints"] = service["config"][
"api_endpoints"
]
services_dict.append(service_dict)
result["services"] = services_dict
return result
class PackageInfo(BaseModel):
"""Information about a built package."""
tag: Tag
path: str
service: str
services: t.List[ServiceInfo]
entry_service: str
labels: t.Dict[str, str]
envs: t.List[str]
def to_yaml(self) -> str:
"""Convert to YAML string."""
return yaml.dump(self.model_dump())
def to_manifest(self) -> ManifestInfo:
"""Convert to manifest information."""
return ManifestInfo(
service=self.service,
name=self.tag.name,
version=self.tag.version if self.tag.version else "",
creation_time=datetime.datetime.now().strftime(
"%Y-%m-%dT%H:%M:%S.%f+00:00"
),
labels=self.labels,
entry_service=self.entry_service,
services=self.services,
envs=self.envs,
)
class Package:
"""Dynamo package that bundles services for deployment."""
def __init__(self, tag: Tag, path: str, info: PackageInfo):
self.tag = tag
self.path = path
self.info = info
def __str__(self) -> str:
return str(self.tag)
@classmethod
def dynamo_service(
cls,
build_config: BuildConfig,
build_ctx: t.Optional[str] = None,
) -> t.Any:
"""Get a dynamo service from config."""
build_ctx = (
os.getcwd()
if build_ctx is None
else os.path.realpath(os.path.expanduser(build_ctx))
)
if not os.path.isdir(build_ctx):
raise InvalidArgument(
f"Build context {build_ctx} does not exist or is not a directory."
)
# Load the service
from dynamo.sdk.lib.loader import find_and_load_service
dyn_svc = find_and_load_service(build_config.service, working_dir=build_ctx)
# Clean up unused edges
LinkedServices.remove_unused_edges()
dyn_svc.inject_config()
return dyn_svc
@classmethod
def create(
cls,
build_config: BuildConfig,
build_ctx: str,
version: t.Optional[str] = None,
) -> Package:
dyn_svc = cls.dynamo_service(build_config, build_ctx)
# Get service name for package
package_name = cls.to_package_name(build_config.service)
# image: str = dyn_svc.image
# Use provided version or create new one
if version is None:
version = build_config.version
# Create tag with version
tag = Tag(name=package_name, version=version)
if version is None:
tag = tag.make_new_version()
logger.debug(
f'Building Dynamo package "{tag}" from build context "{build_ctx}".'
)
# Create temporary directory for package
package_dir = tempfile.mkdtemp(prefix=f"dynamo_package_{package_name}_")
# Copy files based on include/exclude patterns
cls.copy_files(
build_ctx, package_dir, build_config.include, build_config.exclude
)
# Get info about all services
all_services = list(dyn_svc.all_services().values())
services_info = [ServiceInfo.from_service(s) for s in all_services]
# Create package info
package_info = PackageInfo(
tag=tag,
service=build_config.service,
path=package_dir,
services=services_info,
entry_service=dyn_svc.name,
labels=build_config.labels,
envs=build_config.envs,
)
# Create the package
package = cls(tag, package_dir, package_info)
# Write package info and manifests
return package
def generate_manifests(self) -> None:
"""Generate manifest files for the package."""
with Progress(
SpinnerColumn(),
TextColumn("[bold green]Generating manifests..."),
transient=True,
) as progress:
progress.add_task("generate", total=None)
manifest = self.info.to_manifest()
manifest_dict = manifest.to_dict()
with open(os.path.join(self.path, "dynamo.yaml"), "w") as f:
yaml.dump(manifest_dict, f, default_flow_style=False)
@staticmethod
def load_service(service_path: str, working_dir: str) -> t.Any:
"""Load a service from a path."""
logger.info(f"Loading service from: {service_path}")
# Add working directory to sys.path
sys.path.insert(0, working_dir)
try:
# Handle module:class format
if ":" in service_path:
module_path, class_name = service_path.split(":", 1)
module = importlib.import_module(module_path)
return getattr(module, class_name)
# Handle direct Python file
elif service_path.endswith(".py"):
module_name = os.path.basename(service_path)[:-3]
spec = importlib.util.spec_from_file_location(
module_name, os.path.join(working_dir, service_path)
)
if spec is None or spec.loader is None:
raise ImportError(f"Could not load {service_path}")
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Find the service class
for attr_name in dir(module):
attr = getattr(module, attr_name)
if inspect.isclass(attr) and attr.__module__ == module.__name__:
# Simple heuristic - find a class defined in this module
return attr
raise ImportError(f"No service class found in {service_path}")
# Handle Python module
else:
module = importlib.import_module(service_path)
# Find the service class
for attr_name in dir(module):
attr = getattr(module, attr_name)
if inspect.isclass(attr) and attr.__module__ == module.__name__:
# Simple heuristic - find a class defined in this module
return attr
raise ImportError(f"No service class found in {service_path}")
finally:
# Remove working directory from sys.path
sys.path.pop(0)
@staticmethod
def to_package_name(name: str) -> str:
"""Convert CamelCase to snake_case."""
import re
name = name.split(":")[1].lower()
s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
s2 = re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1)
ret = s2.replace(":", "_")
print(f"Converting {name} to snake_case: {ret}")
return ret
@staticmethod
def _get_dockerfile_template(base_image: str = "dynamo:latest") -> str:
"""Get the Dockerfile template content with configurable base image."""
# Try to load the Dockerfile.template template from the CLI directory
cli_template_path = Path(__file__).parent / "Dockerfile.template"
if not cli_template_path.exists():
raise FileNotFoundError(
f"Dockerfile template not found at {cli_template_path}"
)
with open(cli_template_path, "r") as f:
template_content = f.read()
# Replace the base image placeholder with the actual base image
template_content = template_content.replace("__BASE_IMAGE__", base_image)
return template_content
@staticmethod
def copy_files(
source_dir: str,
target_dir: str,
include_patterns: t.List[str],
exclude_patterns: t.List[str],
) -> None:
"""Copy files based on include/exclude patterns."""
import glob
# Create set of all files to include
all_files = set()
for pattern in include_patterns:
pattern_path = os.path.join(source_dir, pattern)
matched_files = glob.glob(pattern_path, recursive=True)
all_files.update(matched_files)
# Remove excluded files
for pattern in exclude_patterns:
pattern_path = os.path.join(source_dir, pattern)
excluded_files = glob.glob(pattern_path, recursive=True)
all_files.difference_update(excluded_files)
# Copy each file preserving relative path
for file_path in all_files:
if os.path.isfile(file_path):
rel_path = os.path.relpath(file_path, source_dir)
target_path = os.path.join(target_dir, rel_path)
os.makedirs(os.path.dirname(target_path), exist_ok=True)
shutil.copy2(file_path, target_path)
def build(
service: str = typer.Argument(
..., help="Service specification in the format module:ServiceClass"
),
output_dir: t.Optional[str] = typer.Option(
None, "--output-dir", "-o", help="Output directory for the build"
),
force: bool = typer.Option(
False, "--force", "-f", help="Force overwrite of existing build"
),
containerize: bool = typer.Option(
False,
"--containerize",
help="Containerize the dynamo pipeline after building.",
),
) -> None:
"""Packages Dynamo service for deployment. Optionally builds a docker container."""
from dynamo.sdk.cli.utils import configure_target_environment
configure_target_environment(TargetEnum.DYNAMO)
# Determine output directory
if output_dir is None:
# Default to ~/.dynamo/packages/service_name
graph_name = service.rsplit(":", 1)[-1].lower()
dynamo_tag = generate_random_tag()
output_dir = str(Path.home() / ".dynamo" / "packages" / graph_name / dynamo_tag)
output_path = Path(output_dir)
# Check if output directory exists
if output_path.exists() and not force:
console.print(
f"[bold red]Output directory {output_dir} already exists. Use --force to overwrite.[/]"
)
raise typer.Exit(1)
source_dir = output_path / "src"
source_dir.mkdir(exist_ok=True, parents=True)
build_ctx = "."
build_config = BuildConfig(
service=service,
tag=dynamo_tag,
)
try:
# Create the package
package = Package.create(
build_config=build_config,
version=dynamo_tag,
build_ctx=build_ctx,
)
# Copy to output directory
with Progress(
SpinnerColumn(),
TextColumn(f"[bold green]Copying package to {output_dir}..."),
transient=True,
) as progress:
progress.add_task("copy", total=None)
for item in os.listdir(package.path):
s = os.path.join(package.path, item)
d = os.path.join(source_dir, item)
if os.path.isdir(s):
shutil.copytree(s, d, dirs_exist_ok=True)
else:
shutil.copy2(s, d)
# Update package path
package.path = output_dir
package.generate_manifests()
console.print(DYNAMO_FIGLET)
console.print(f"[green]Successfully built {package.tag}.")
console.print(f"[green]Output directory: {output_dir}")
next_steps = []
if not containerize:
next_steps.append(
"\n\n* Containerize your Dynamo pipeline with "
"`dynamo build --containerize <service_name>`:\n"
f" $ dynamo build --containerize {service}"
)
if next_steps:
console.print(f"\n[blue]Next steps: {''.join(next_steps)}[/]")
docker_dir = output_path / "env" / "docker"
docker_dir.mkdir(exist_ok=True, parents=True)
docker_file = docker_dir / "Dockerfile"
dockerfile_content = Package._get_dockerfile_template(DYNAMO_IMAGE)
with open(docker_file, "w") as f:
f.write(dockerfile_content)
if containerize:
# Generate Dockerfile next to dynamo.yaml using template
# Build Docker image
image_name = f"{package.tag.name}:{package.tag.version}"
with Progress(
SpinnerColumn(),
TextColumn(f"[bold green]Building Docker image {image_name}..."),
transient=True,
) as progress:
progress.add_task("docker", total=None)
subprocess.run(
[
"docker",
"build",
"-t",
image_name,
"-f",
str(docker_file),
output_path,
],
check=True,
)
console.print(f"[green]Successfully built Docker image {image_name}.")
except Exception as e:
console.print(f"[red]Error building package: {str(e)}")
raise
def generate_random_tag() -> str:
"""Generate a random tag for the Dynamo pipeline."""
return f"{uuid.uuid4().hex[:8]}"
if __name__ == "__main__":
typer.run(build)
...@@ -22,10 +22,11 @@ import importlib.metadata ...@@ -22,10 +22,11 @@ import importlib.metadata
import typer import typer
from rich.console import Console from rich.console import Console
from dynamo.sdk.cli.build import build
from dynamo.sdk.cli.deployment import app as deployment_app from dynamo.sdk.cli.deployment import app as deployment_app
from dynamo.sdk.cli.deployment import deploy from dynamo.sdk.cli.deployment import deploy
from dynamo.sdk.cli.env import env from dynamo.sdk.cli.env import env
from dynamo.sdk.cli.pipeline import build, get from dynamo.sdk.cli.pipeline import get
from dynamo.sdk.cli.run import run from dynamo.sdk.cli.run import run
from dynamo.sdk.cli.serve import serve from dynamo.sdk.cli.serve import serve
......
...@@ -140,7 +140,7 @@ def _handle_deploy_create( ...@@ -140,7 +140,7 @@ def _handle_deploy_create(
# TODO: hardcoding this is a hack to get the services for the deployment # TODO: hardcoding this is a hack to get the services for the deployment
# we should find a better way to do this once build is finished/generic # we should find a better way to do this once build is finished/generic
configure_target_environment(TargetEnum.BENTO) configure_target_environment(TargetEnum.DYNAMO)
entry_service = load_entry_service(pipeline) entry_service = load_entry_service(pipeline)
deployment_manager = get_deployment_manager(target, endpoint) deployment_manager = get_deployment_manager(target, endpoint)
......
...@@ -164,7 +164,7 @@ def serve( ...@@ -164,7 +164,7 @@ def serve(
sys.path.insert(0, working_dir_str) sys.path.insert(0, working_dir_str)
svc = find_and_load_service(dynamo_pipeline, working_dir=working_dir) svc = find_and_load_service(dynamo_pipeline, working_dir=working_dir)
logger.info(f"Loaded service: {svc.name}") logger.debug(f"Loaded service: {svc.name}")
logger.debug("Dependencies: %s", [dep.on.name for dep in svc.dependencies.values()]) logger.debug("Dependencies: %s", [dep.on.name for dep in svc.dependencies.values()])
LinkedServices.remove_unused_edges() LinkedServices.remove_unused_edges()
......
...@@ -363,11 +363,7 @@ def resolve_service_config( ...@@ -363,11 +363,7 @@ def resolve_service_config(
def configure_target_environment(target: TargetEnum): def configure_target_environment(target: TargetEnum):
from dynamo.sdk.core.lib import set_target from dynamo.sdk.core.lib import set_target
if target == TargetEnum.BENTO: if target == TargetEnum.DYNAMO:
from dynamo.sdk.core.runner.bentoml import BentoDeploymentTarget
target = BentoDeploymentTarget()
elif target == TargetEnum.DYNAMO:
from dynamo.sdk.core.runner.dynamo import LocalDeploymentTarget from dynamo.sdk.core.runner.dynamo import LocalDeploymentTarget
target = LocalDeploymentTarget() target = LocalDeploymentTarget()
...@@ -393,7 +389,7 @@ def is_local_planner_enabled(svc: Any, service_configs: dict) -> bool: ...@@ -393,7 +389,7 @@ def is_local_planner_enabled(svc: Any, service_configs: dict) -> bool:
planners = [ planners = [
node node
for node in nodes for node in nodes
if node.config.get("dynamo", {}).get("component_type") == ComponentType.PLANNER if node.config.dynamo.component_type == ComponentType.PLANNER
] ]
if len(planners) > 1: if len(planners) > 1:
...@@ -429,7 +425,7 @@ def raise_local_planner_warning(svc: Any, service_configs: dict) -> None: ...@@ -429,7 +425,7 @@ def raise_local_planner_warning(svc: Any, service_configs: dict) -> None:
nodes.append(svc) nodes.append(svc)
worker_names = ("PrefillWorker", "VllmWorker") worker_names = ("PrefillWorker", "VllmWorker")
worker_counts_greater_than_one = [ worker_counts_greater_than_one = [
node.config.get("workers", 1) > 1 for node in nodes if node.name in worker_names node.config.workers > 1 for node in nodes if node.name in worker_names
] ]
if any(worker_counts_greater_than_one) and not no_op: if any(worker_counts_greater_than_one) and not no_op:
......
...@@ -14,15 +14,15 @@ ...@@ -14,15 +14,15 @@
# limitations under the License. # limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES # Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
import logging
import os import os
from typing import Any, Callable, Dict, Optional, Type, TypeVar, Union from typing import Any, Callable, Optional, Type, TypeVar
from fastapi import FastAPI from fastapi import FastAPI
from dynamo.sdk.core.protocol.interface import ( from dynamo.sdk.core.protocol.interface import (
DependencyInterface, DependencyInterface,
DeploymentTarget, DeploymentTarget,
DynamoConfig,
ServiceConfig, ServiceConfig,
ServiceInterface, ServiceInterface,
) )
...@@ -33,6 +33,7 @@ G = TypeVar("G", bound=Callable[..., Any]) ...@@ -33,6 +33,7 @@ G = TypeVar("G", bound=Callable[..., Any])
# this should be set to a concrete implementation of the DeploymentTarget interface # this should be set to a concrete implementation of the DeploymentTarget interface
_target: DeploymentTarget _target: DeploymentTarget
logger = logging.getLogger(__name__)
DYNAMO_IMAGE = os.getenv("DYNAMO_IMAGE", "dynamo:latest-vllm") DYNAMO_IMAGE = os.getenv("DYNAMO_IMAGE", "dynamo:latest-vllm")
...@@ -49,36 +50,25 @@ def get_target() -> DeploymentTarget: ...@@ -49,36 +50,25 @@ def get_target() -> DeploymentTarget:
return _target return _target
# TODO: dynamo_component
def service( def service(
inner: Optional[Type[G]] = None, inner: Optional[Type[G]] = None,
/, /,
*, *,
dynamo: Optional[Union[Dict[str, Any], DynamoConfig]] = None,
app: Optional[FastAPI] = None, app: Optional[FastAPI] = None,
system_app: Optional[FastAPI] = None, system_app: Optional[FastAPI] = None,
**kwargs: Any, **kwargs: Any,
) -> Any: ) -> Any:
"""Service decorator that's adapter-agnostic""" """Service decorator that's adapter-agnostic"""
config = ServiceConfig(kwargs) config = ServiceConfig(**kwargs)
# Parse dict into DynamoConfig object logger.info(f"inner: {inner} config: {config}")
dynamo_config: Optional[DynamoConfig] = None
if dynamo is not None:
if isinstance(dynamo, dict):
dynamo_config = DynamoConfig(**dynamo)
else:
dynamo_config = dynamo
assert isinstance(dynamo_config, DynamoConfig)
def decorator(inner: Type[G]) -> ServiceInterface[G]: def decorator(inner: Type[G]) -> ServiceInterface[G]:
provider = get_target() provider = get_target()
if inner is not None: if inner is not None:
dynamo_config.name = inner.__name__ config.dynamo.name = inner.__name__
return provider.create_service( return provider.create_service(
service_cls=inner, service_cls=inner,
config=config, config=config,
dynamo_config=dynamo_config,
app=app, app=app,
system_app=system_app, system_app=system_app,
**kwargs, **kwargs,
......
...@@ -97,12 +97,16 @@ class DeploymentStatus(str, Enum): ...@@ -97,12 +97,16 @@ class DeploymentStatus(str, Enum):
@dataclass @dataclass
class ScalingPolicy: class ScalingPolicy:
"""Scaling policy."""
policy: str policy: str
parameters: t.Dict[str, t.Union[int, float, str]] = field(default_factory=dict) parameters: t.Dict[str, t.Union[int, float, str]] = field(default_factory=dict)
@dataclass @dataclass
class Env: class Env:
"""Environment variable."""
name: str name: str
value: str = "" value: str = ""
......
...@@ -16,17 +16,39 @@ ...@@ -16,17 +16,39 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from collections import defaultdict from collections import defaultdict
from dataclasses import dataclass
from enum import Enum, auto from enum import Enum, auto
from typing import Any, Dict, Generic, List, Optional, Set, Tuple, Type, TypeVar from typing import Any, Dict, Generic, List, Optional, Set, Tuple, Type, TypeVar
from fastapi import FastAPI from fastapi import FastAPI
from pydantic import BaseModel
from dynamo.sdk.core.protocol.deployment import Env from dynamo.sdk.core.protocol.deployment import Env
T = TypeVar("T", bound=object) T = TypeVar("T", bound=object)
class LeaseConfig(BaseModel):
"""Configuration for custom dynamo leases"""
ttl: int = 1 # seconds
class ComponentType:
"""Types of Dynamo components"""
PLANNER = "planner"
class DynamoConfig(BaseModel):
"""Configuration for Dynamo components"""
enabled: bool = True
name: str | None = None
namespace: str | None = None
custom_lease: LeaseConfig | None = None
component_type: str | None = None # Indicates if this is a meta/system component
class DynamoTransport(Enum): class DynamoTransport(Enum):
"""Transport types supported by Dynamo services""" """Transport types supported by Dynamo services"""
...@@ -34,10 +56,23 @@ class DynamoTransport(Enum): ...@@ -34,10 +56,23 @@ class DynamoTransport(Enum):
HTTP = auto() HTTP = auto()
class ServiceConfig(Dict[str, Any]): class ResourceConfig(BaseModel):
"""Configuration for Dynamo resources"""
cpu: int = 1
memory: str = "100Mi"
gpu: str = "0"
class ServiceConfig(BaseModel):
"""Base service configuration that can be extended by adapters""" """Base service configuration that can be extended by adapters"""
pass dynamo: DynamoConfig
resource: ResourceConfig = ResourceConfig()
workers: int = 1
image: str | None = None
envs: List[Env] | None = None
labels: Dict[str, str] | None = None
class DynamoEndpointInterface(ABC): class DynamoEndpointInterface(ABC):
...@@ -157,30 +192,6 @@ class ServiceInterface(Generic[T], ABC): ...@@ -157,30 +192,6 @@ class ServiceInterface(Generic[T], ABC):
raise NotImplementedError() raise NotImplementedError()
@dataclass
class LeaseConfig:
"""Configuration for custom dynamo leases"""
ttl: int = 1 # seconds
class ComponentType:
"""Types of Dynamo components"""
PLANNER = "planner"
@dataclass
class DynamoConfig:
"""Configuration for Dynamo components"""
enabled: bool = True
name: str | None = None
namespace: str | None = None
custom_lease: LeaseConfig | None = None
component_type: str | None = None # Indicates if this is a meta/system component
class DeploymentTarget(ABC): class DeploymentTarget(ABC):
"""Interface for service provider implementations""" """Interface for service provider implementations"""
...@@ -189,7 +200,6 @@ class DeploymentTarget(ABC): ...@@ -189,7 +200,6 @@ class DeploymentTarget(ABC):
self, self,
service_cls: Type[T], service_cls: Type[T],
config: ServiceConfig, config: ServiceConfig,
dynamo_config: Optional[DynamoConfig] = None,
app: Optional[FastAPI] = None, app: Optional[FastAPI] = None,
**kwargs, **kwargs,
) -> ServiceInterface[T]: ) -> ServiceInterface[T]:
......
...@@ -19,7 +19,6 @@ import logging ...@@ -19,7 +19,6 @@ import logging
import os import os
import shlex import shlex
import sys import sys
from dataclasses import asdict
from typing import Any, Dict, List, Optional, Set, Type, TypeVar from typing import Any, Dict, List, Optional, Set, Type, TypeVar
import psutil import psutil
...@@ -33,7 +32,6 @@ from dynamo.sdk.core.protocol.deployment import Env ...@@ -33,7 +32,6 @@ from dynamo.sdk.core.protocol.deployment import Env
from dynamo.sdk.core.protocol.interface import ( from dynamo.sdk.core.protocol.interface import (
DependencyInterface, DependencyInterface,
DeploymentTarget, DeploymentTarget,
DynamoConfig,
DynamoEndpointInterface, DynamoEndpointInterface,
DynamoTransport, DynamoTransport,
LinkedServices, LinkedServices,
...@@ -71,20 +69,15 @@ class LocalService(ServiceMixin, ServiceInterface[T]): ...@@ -71,20 +69,15 @@ class LocalService(ServiceMixin, ServiceInterface[T]):
self, self,
inner_cls: Type[T], inner_cls: Type[T],
config: ServiceConfig, config: ServiceConfig,
dynamo_config: Optional[DynamoConfig] = None,
watcher: Optional[Watcher] = None, watcher: Optional[Watcher] = None,
socket: Optional[CircusSocket] = None, socket: Optional[CircusSocket] = None,
app: Optional[FastAPI] = None, app: Optional[FastAPI] = None,
system_app: Optional[FastAPI] = None, system_app: Optional[FastAPI] = None,
): ):
self._inner_cls = inner_cls self._inner_cls = inner_cls
self._config = config
name = inner_cls.__name__ name = inner_cls.__name__
self._dynamo_config = dynamo_config or DynamoConfig(
name=name, namespace="default"
)
# Add the dynamo config to the service config # Add the dynamo config to the service config
self._config["dynamo"] = asdict(self._dynamo_config) self._config = config
self._watcher = watcher self._watcher = watcher
self._socket = socket self._socket = socket
self.app = app or FastAPI(title=name) self.app = app or FastAPI(title=name)
...@@ -120,7 +113,7 @@ class LocalService(ServiceMixin, ServiceInterface[T]): ...@@ -120,7 +113,7 @@ class LocalService(ServiceMixin, ServiceInterface[T]):
@property @property
def envs(self) -> List[Env]: def envs(self) -> List[Env]:
return self._config.get("envs", []) return self._config.envs or []
@property @property
def inner(self) -> Type[T]: def inner(self) -> Type[T]:
...@@ -148,7 +141,7 @@ class LocalService(ServiceMixin, ServiceInterface[T]): ...@@ -148,7 +141,7 @@ class LocalService(ServiceMixin, ServiceInterface[T]):
del self._dependencies[dep_key] del self._dependencies[dep_key]
def dynamo_address(self) -> tuple[str, str]: def dynamo_address(self) -> tuple[str, str]:
return (self._dynamo_config.namespace, self._dynamo_config.name) return (self._config.dynamo.namespace, self._config.dynamo.name)
@property @property
def dependencies(self) -> dict[str, "DependencyInterface"]: def dependencies(self) -> dict[str, "DependencyInterface"]:
...@@ -217,7 +210,6 @@ class LocalDeploymentTarget(DeploymentTarget): ...@@ -217,7 +210,6 @@ class LocalDeploymentTarget(DeploymentTarget):
self, self,
service_cls: Type[T], service_cls: Type[T],
config: ServiceConfig, config: ServiceConfig,
dynamo_config: Optional[DynamoConfig] = None,
app: Optional[FastAPI] = None, app: Optional[FastAPI] = None,
system_app: Optional[FastAPI] = None, system_app: Optional[FastAPI] = None,
**kwargs, **kwargs,
...@@ -261,7 +253,6 @@ class LocalDeploymentTarget(DeploymentTarget): ...@@ -261,7 +253,6 @@ class LocalDeploymentTarget(DeploymentTarget):
return LocalService( return LocalService(
inner_cls=service_cls, inner_cls=service_cls,
config=config, config=config,
dynamo_config=dynamo_config,
watcher=watcher, watcher=watcher,
socket=socket, socket=socket,
) )
......
...@@ -208,7 +208,7 @@ def _get_dir_size(path: str) -> int: ...@@ -208,7 +208,7 @@ def _get_dir_size(path: str) -> int:
def load_entry_service( def load_entry_service(
pipeline_tag: str, build_dir: str = "~/bentoml/bentos" pipeline_tag: str, build_dir: str = "~/.dynamo/packages"
) -> Service: ) -> Service:
""" """
Given a built pipeline tag (e.g. frontend:2uk2fwzvqsswvs7t), load the entry service as a deployment Service instance. Given a built pipeline tag (e.g. frontend:2uk2fwzvqsswvs7t), load the entry service as a deployment Service instance.
...@@ -220,7 +220,7 @@ def load_entry_service( ...@@ -220,7 +220,7 @@ def load_entry_service(
if not os.path.isdir(graph_dir): if not os.path.isdir(graph_dir):
raise FileNotFoundError(f"Pipeline directory not found: {graph_dir}") raise FileNotFoundError(f"Pipeline directory not found: {graph_dir}")
config_path = os.path.join(graph_dir, "bento.yaml") config_path = os.path.join(graph_dir, "dynamo.yaml")
if not os.path.isfile(config_path): if not os.path.isfile(config_path):
raise FileNotFoundError( raise FileNotFoundError(
f"Pipeline config (bento.yaml) not found in {graph_dir}" f"Pipeline config (bento.yaml) not found in {graph_dir}"
......
...@@ -23,14 +23,12 @@ pytestmark = pytest.mark.pre_merge ...@@ -23,14 +23,12 @@ pytestmark = pytest.mark.pre_merge
@pytest.fixture(scope="module", autouse=True) @pytest.fixture(scope="module", autouse=True)
def setup_and_teardown(): def setup_and_teardown():
configure_target_environment(TargetEnum.BENTO)
yield
configure_target_environment(TargetEnum.DYNAMO) configure_target_environment(TargetEnum.DYNAMO)
yield
def test_gpu_resources(setup_and_teardown): def test_gpu_resources(setup_and_teardown):
"""Test resource configurations""" """Test resource configurations"""
from _bentoml_sdk import Service as BentoService
from dynamo.sdk import service from dynamo.sdk import service
...@@ -42,7 +40,4 @@ def test_gpu_resources(setup_and_teardown): ...@@ -42,7 +40,4 @@ def test_gpu_resources(setup_and_teardown):
def __init__(self) -> None: def __init__(self) -> None:
pass pass
svc: BentoService = MyService.get_bentoml_service() # type: ignore assert MyService.config is not None # type: ignore
assert svc.config["resources"]["cpu"] == "2"
assert svc.config["resources"]["gpu"] == "1"
assert svc.config["resources"]["memory"] == "4Gi"
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment