Unverified Commit f27cdbcb authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

refactor: transition CLI to use typer for UX and testing (#703)


Co-authored-by: default avatarmohammedabdulwahhab <furkhan324@berkeley.edu>
parent 4d02a463
This diff is collapsed.
...@@ -19,66 +19,70 @@ from __future__ import annotations ...@@ -19,66 +19,70 @@ from __future__ import annotations
import importlib.metadata import importlib.metadata
import click import typer
import psutil from rich.console import Console
from dynamo.sdk.cli.cloud import app as cloud_app
def create_bentoml_cli() -> click.Command: from dynamo.sdk.cli.deployment import app as deployment_app
from bentoml._internal.context import server_context from dynamo.sdk.cli.deployment import deploy
from dynamo.sdk.cli.env import env
# from bentoml_cli.cloud import cloud_command from dynamo.sdk.cli.pipeline import build, get
# from bentoml_cli.containerize import containerize_command from dynamo.sdk.cli.run import run
from bentoml_cli.utils import get_entry_points from dynamo.sdk.cli.serve import serve
from dynamo.sdk.cli.bentos import bento_command console = Console()
from dynamo.sdk.cli.cloud import cloud_command
from dynamo.sdk.cli.deployment import deploy_command, deployment_command cli = typer.Typer(
from dynamo.sdk.cli.env import env_command context_settings={"help_option_names": ["-h", "--help"]},
from dynamo.sdk.cli.run import run_command name="dynamo",
from dynamo.sdk.cli.serve import serve_command no_args_is_help=True,
from dynamo.sdk.cli.utils import DynamoCommandGroup pretty_exceptions_enable=False,
)
server_context.service_type = "cli"
dynamo_version = importlib.metadata.version("ai-dynamo")
def version_callback(value: bool):
CONTEXT_SETTINGS = {"help_option_names": ("-h", "--help")} if value:
version = importlib.metadata.version("ai-dynamo")
@click.group(cls=DynamoCommandGroup, context_settings=CONTEXT_SETTINGS) console.print(
@click.version_option(dynamo_version, "-v", "--version") f"[bold green]Dynamo CLI[/bold green] version: [cyan]{version}[/cyan]"
def bentoml_cli(): # TODO: to be renamed to something.... )
""" raise typer.Exit()
The Dynamo CLI is a CLI for serving, containerizing, and deploying Dynamo applications.
It takes inspiration from and leverages core pieces of the BentoML deployment stack.
@cli.callback()
At a high level, you use `serve` to run a set of dynamo services locally, def main(
`build` and `containerize` to package them up for deployment, and then `cloud` version: bool = typer.Option(
and `deploy` to deploy them to a K8s cluster running the Dynamo Cloud Server False,
""" "--version",
"-v",
# Add top-level CLI commands help="Show the application version and exit.",
bentoml_cli.add_command(cloud_command) callback=version_callback,
bentoml_cli.add_single_command(bento_command, "build") is_eager=True,
bentoml_cli.add_single_command(bento_command, "get") ),
bentoml_cli.add_subcommands(serve_command) ):
bentoml_cli.add_subcommands(run_command) """
bentoml_cli.add_command(deploy_command) The Dynamo CLI is a CLI for serving, containerizing, and deploying Dynamo applications.
# bentoml_cli.add_command(containerize_command) It takes inspiration from and leverages core pieces of the BentoML deployment stack.
bentoml_cli.add_command(deployment_command)
bentoml_cli.add_command(env_command) At a high level, you use `serve` to run a set of dynamo services locally,
`build` and `containerize` to package them up for deployment, and then `cloud`
# Load commands from extensions and `deploy` to deploy them to a K8s cluster running the Dynamo Cloud
for ep in get_entry_points("bentoml.commands"): """
bentoml_cli.add_command(ep.load())
if psutil.WINDOWS: cli.command()(env)
import sys cli.command(
context_settings={"allow_extra_args": True, "ignore_unknown_options": True}
sys.stdout.reconfigure(encoding="utf-8") # type: ignore )(serve)
cli.command(
return bentoml_cli context_settings={"allow_extra_args": True, "ignore_unknown_options": True},
add_help_option=False,
)(run)
cli = create_bentoml_cli() cli.add_typer(cloud_app, name="cloud")
cli.add_typer(deployment_app, name="deployment")
cli.command()(deploy)
cli.command()(build)
cli.command()(get)
if __name__ == "__main__": if __name__ == "__main__":
cli() cli()
...@@ -17,115 +17,64 @@ ...@@ -17,115 +17,64 @@
from __future__ import annotations from __future__ import annotations
import sys
import click
import rich import rich
import typer
from bentoml._internal.cloud.client import RestApiClient from bentoml._internal.cloud.client import RestApiClient
from bentoml._internal.cloud.config import ( from bentoml._internal.cloud.config import CloudClientConfig, CloudClientContext
DEFAULT_ENDPOINT,
CloudClientConfig,
CloudClientContext,
)
from bentoml._internal.configuration.containers import BentoMLContainer from bentoml._internal.configuration.containers import BentoMLContainer
from bentoml._internal.utils import add_experimental_docstring
from bentoml._internal.utils.cattr import bentoml_cattr
from bentoml.exceptions import CLIException, CloudRESTApiClientError from bentoml.exceptions import CLIException, CloudRESTApiClientError
app = typer.Typer(
help="Interact with your Dynamo Cloud Server",
add_completion=True,
no_args_is_help=True,
)
console = rich.console.Console()
def build_cloud_command() -> click.Group:
@click.group(name="cloud")
@add_experimental_docstring
def cloud_command():
"""Interact with your Dynamo Cloud Server"""
@cloud_command.command() @app.command()
@click.option( def login(
"--endpoint", endpoint: str = typer.Argument(
type=click.STRING, ..., help="Dynamo Cloud endpoint", envvar="DYNAMO_CLOUD_API_ENDPOINT"
help="Dynamo Cloud endpoint",
default=DEFAULT_ENDPOINT,
envvar="DYNAMO_CLOUD_API_ENDPOINT",
show_default=True,
show_envvar=True,
required=True,
)
@click.option(
"--api-token",
type=click.STRING,
help="Dynamo Cloud user API token",
envvar="DYNAMO_CLOUD_API_KEY",
show_envvar=True,
required=True,
) )
def login(endpoint: str, api_token: str) -> None: # type: ignore ) -> None:
"""Connect to your Dynamo Cloud. You can find deployment instructions for this in our docs""" """Connect to your Dynamo Cloud. You can find deployment instructions for this in our docs"""
try: try:
cloud_rest_client = RestApiClient(endpoint, api_token) api_token = "" # Using empty string for now as it's not used
user = cloud_rest_client.v1.get_current_user() cloud_rest_client = RestApiClient(endpoint, api_token)
user = cloud_rest_client.v1.get_current_user()
if user is None: if user is None:
raise CLIException("current user is not found") raise CLIException("current user is not found")
org = cloud_rest_client.v1.get_current_organization() org = cloud_rest_client.v1.get_current_organization()
if org is None: if org is None:
raise CLIException("current organization is not found") raise CLIException("current organization is not found")
current_context_name = CloudClientConfig.get_config().current_context_name current_context_name = CloudClientConfig.get_config().current_context_name
cloud_context = BentoMLContainer.cloud_context.get() cloud_context = BentoMLContainer.cloud_context.get()
ctx = CloudClientContext(
name=cloud_context
if cloud_context is not None
else current_context_name,
endpoint=endpoint,
api_token=api_token,
email=user.email,
)
ctx.save()
rich.print(
f":white_check_mark: Configured Dynamo Cloud credentials (current-context: {ctx.name})"
)
rich.print(
f":white_check_mark: Logged in as [blue]{user.email}[/] at [blue]{org.name}[/] organization"
)
except CloudRESTApiClientError as e:
if e.error_code == 401:
rich.print(
f":police_car_light: Error validating token: HTTP 401: Bad credentials ({endpoint}/api-token)",
file=sys.stderr,
)
else:
rich.print(
f":police_car_light: Error validating token: HTTP {e.error_code}",
file=sys.stderr,
)
@cloud_command.command() ctx = CloudClientContext(
def current_context() -> None: # type: ignore name=cloud_context if cloud_context is not None else current_context_name,
"""Get current cloud context.""" endpoint=endpoint,
rich.print_json( api_token=api_token,
data=bentoml_cattr.unstructure(CloudClientConfig.get_config().get_context()) email=user.email,
) )
@cloud_command.command() ctx.save()
def list_context() -> None: # type: ignore console.print(
"""List all available context.""" f":white_check_mark: Configured Dynamo Cloud credentials (current-context: {ctx.name})"
config = CloudClientConfig.get_config()
rich.print_json(
data=bentoml_cattr.unstructure([i.name for i in config.contexts])
) )
console.print(
@cloud_command.command() f":white_check_mark: Logged in as [blue]{user.email}[/] at [blue]{org.name}[/] organization"
@click.argument("context_name", type=click.STRING) )
def update_current_context(context_name: str) -> None: # type: ignore except CloudRESTApiClientError as e:
"""Update current context""" if e.error_code == 401:
ctx = CloudClientConfig.get_config().set_current_context(context_name) console.print(
rich.print(f"Successfully switched to context: {ctx.name}") f":police_car_light: Error validating token: HTTP 401: Bad credentials ({endpoint}/api-token)"
)
return cloud_command else:
console.print(
f":police_car_light: Error validating token: HTTP {e.error_code}"
cloud_command = build_cloud_command() )
# SPDX-FileCopyrightText: Copyright (c) 2020 Atalaya Tech. Inc
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
from __future__ import annotations
import json
import logging
import os
import sys
import time
import typing as t
from datetime import datetime
import click
from bentoml._internal.cloud.base import Spinner
from bentoml.exceptions import BentoMLException, CLIException
from rich.console import Console
from simple_di import inject
from dynamo.sdk.cli.deployment import DynamoDeployment
logger = logging.getLogger(__name__)
@click.group(name="deploy")
def deploy_command_group():
"""Deploy to a cluster"""
pass
def convert_env_to_dict(env: tuple[str, ...] | None) -> list[dict[str, str]] | None:
if env is None:
return None
collected_envs: list[dict[str, str]] = []
if env:
for item in env:
if "=" in item:
name, value = item.split("=", 1)
else:
name = item
if name not in os.environ:
raise CLIException(f"Environment variable {name} not found")
value = os.environ[name]
collected_envs.append({"name": name, "value": value})
return collected_envs
def build_deploy_command() -> click.Command:
from bentoml._internal.utils import add_experimental_docstring
@click.command(name="deploy")
@click.argument("bento", type=click.STRING, default=".")
@click.option("-n", "--name", type=click.STRING, help="Deployment name")
@click.option(
"--namespace",
type=click.STRING,
default="default",
help="Kubernetes namespace to deploy to",
)
@click.option(
"--scaling-min",
type=click.INT,
default=1,
show_default=True,
help="Minimum scaling value",
)
@click.option(
"--scaling-max",
type=click.INT,
default=5,
show_default=True,
help="Maximum scaling value",
)
@click.option("--instance-type", type=click.STRING, help="Type of instance")
@click.option(
"--env",
type=click.STRING,
multiple=True,
default=[],
help="Environment variables in key=value format",
)
@click.option("--secret", type=click.STRING, multiple=True, help="Secret names")
@click.option(
"-f",
"--config-file",
type=click.Path(exists=True, dir_okay=False, readable=True),
help="Configuration file path",
)
@click.option(
"--wait/--no-wait", default=True, help="Wait for deployment to be ready"
)
@click.option(
"--timeout",
type=click.INT,
default=600,
help="Timeout for deployment readiness in seconds",
)
@click.option(
"--working-dir",
type=click.Path(),
default=None,
show_default=True,
help="Directory to find the Service instance",
)
@click.option(
"--access-authorization", type=click.BOOL, default=False, show_default=True
)
@click.option("--strategy", type=click.STRING, default="rolling-update")
@click.option("--version", type=click.STRING, help="Version tag for the Bento")
@add_experimental_docstring
def deploy_command(
bento: str | None,
name: str | None,
namespace: str | None = "default",
access_authorization: bool | None = False,
scaling_min: int | None = 1,
scaling_max: int | None = 5,
instance_type: str | None = None,
strategy: str | None = "rolling-update",
env: tuple[str, ...] | None = None,
secret: tuple[str] | None = None,
config_file: str | t.TextIO | None = None,
config_dict: str | None = None,
wait: bool = True,
timeout: int = 600,
working_dir: str | None = None,
version: str | None = None,
):
"""
Deploy a set of Dynamo services in a Bento to a K8s cluster
\b
BENTO is the serving target, it can be:
- a tag to a Bento in local Bento store
- a path to a built Bento
"""
from dynamo.sdk.lib.logging import configure_server_logging
configure_server_logging()
# Fix handling of None values
if working_dir is None:
if bento is not None and os.path.isdir(os.path.expanduser(bento)):
working_dir = os.path.expanduser(bento)
else:
working_dir = "."
# Make sure working_dir is in the front of sys.path for imports
if sys.path[0] != working_dir:
sys.path.insert(0, working_dir)
# Load the Bento to validate
import bentoml
from bentoml._internal.service.loader import load
# Check if the bento exists in the local store
bento_exists = False
bento_tag = None
try:
bentos = bentoml.list()
bento_tags = [str(b.tag) for b in bentos]
bento_exists = bento in bento_tags
if bento_exists:
bento_tag = bento
logger.debug("Verified Bento exists: %s", bento_tag)
else:
# If not a tag, check if it's a path to a built Bento
if bento is not None and os.path.isdir(bento):
service_name = os.path.basename(os.path.abspath(bento))
bento_version = (
version or f"v{datetime.now().strftime('%Y%m%d%H%M%S')}"
)
bento_tag = f"{service_name}:{bento_version}"
logger.debug(
"Using Bento from directory: %s with tag: %s", bento, bento_tag
)
else:
raise click.ClickException(
f"Invalid Bento reference: {bento}. Ensure it's a valid Bento tag or directory."
)
except Exception as exception_var:
logger.error("Bento validation failed:", exc_info=True)
raise click.ClickException(
f"Failed to validate bento: {str(exception_var)}"
)
# Load the service to validate it
svc = load(bento_identifier=bento_tag, working_dir=working_dir)
print(f"Service loaded: {svc}")
create_dynamo_deployment(
bento=bento_tag,
name=name,
namespace=namespace,
access_authorization=access_authorization,
scaling_min=scaling_min,
scaling_max=scaling_max,
instance_type=instance_type,
strategy=strategy,
env=env,
secret=secret,
config_file=config_file,
config_dict=config_dict,
wait=wait,
timeout=timeout,
)
return deploy_command
deploy_command = build_deploy_command()
@inject
def create_dynamo_deployment(
bento: str | None = None,
name: str | None = None,
namespace: str | None = "default",
access_authorization: bool | None = None,
scaling_min: int | None = None,
scaling_max: int | None = None,
instance_type: str | None = None,
strategy: str | None = None,
env: tuple[str, ...] | None = None,
secret: tuple[str] | None = None,
config_file: str | t.TextIO | None = None,
config_dict: str | None = None,
wait: bool = True,
timeout: int = 3600,
dev: bool = False,
) -> DynamoDeployment:
from bentoml._internal.cloud.deployment import DeploymentConfigParameters
from bentoml_cli.deployment import raise_deployment_config_error
from kubernetes import client, config
cfg_dict = None
if config_dict is not None and config_dict != "":
cfg_dict = json.loads(config_dict)
config_params = DeploymentConfigParameters(
name=name,
bento=bento,
cluster=namespace,
access_authorization=access_authorization,
scaling_max=scaling_max,
scaling_min=scaling_min,
instance_type=instance_type,
strategy=strategy,
envs=convert_env_to_dict(tuple(env) if env else None),
secrets=list(secret) if secret is not None else None,
config_file=config_file,
config_dict=cfg_dict,
cli=True,
dev=dev,
)
try:
config_params.verify()
except BentoMLException as exc:
error_message = str(exc)
raise_deployment_config_error(error_message, "create")
# Fix the deployment name generation
deployment_name = name
if deployment_name is None and bento is not None:
deployment_name = f"{bento.replace(':', '-').replace('/', '-')}-deployment"
print(f"Deployment name: {deployment_name}")
# Create the deployment object
deployment = DynamoDeployment.create_deployment(
deployment_name=deployment_name,
namespace=namespace,
config=config,
)
# Convert env tuple to k8s env format
env_vars = []
if env:
for e in env:
if "=" in e:
k, v = e.split("=", 1)
env_vars.append({"name": k, "value": v})
# Get the CRD payload
crd_payload = deployment.get_crd_payload(
bento=bento,
scaling_min=scaling_min or 1,
scaling_max=scaling_max or 5,
instance_type=instance_type,
env_vars=env_vars,
secret=list(secret) if secret else [],
)
console = Console(highlight=False)
with Spinner(console=console) as spinner:
try:
spinner.update("Creating deployment via Kubernetes operator")
config.load_kube_config()
api = client.CustomObjectsApi()
# Create the CRD
group = "nvidia.com"
version = "v1alpha1"
plural = "dynamodeployments"
created_crd = api.create_namespaced_custom_object(
group=group,
version=version,
namespace=namespace,
plural=plural,
body=crd_payload,
)
# Add validation and logging for the created CRD
if not created_crd:
raise click.ClickException(
"Failed to create deployment: No response from Kubernetes API"
)
logger.debug("Created CRD: %s", json.dumps(created_crd, indent=2))
spinner.log(
f':white_check_mark: Created deployment "{deployment_name}" in namespace "{namespace}"'
)
if wait:
spinner.update("Waiting for deployment to become ready")
start_time = time.time()
while time.time() - start_time < timeout:
# Check status using Kubernetes API
status_data = api.get_namespaced_custom_object(
group=group,
version=version,
namespace=namespace,
plural=plural,
name=deployment_name,
)
state = status_data.get("status", {}).get("state", "Pending")
conditions = status_data.get("status", {}).get("conditions", [])
events = status_data.get("status", {}).get("events", [])
logger.debug(f"Current deployment state: {state}")
logger.debug(f"Conditions: {json.dumps(conditions, indent=2)}")
logger.debug(f"Events: {json.dumps(events, indent=2)}")
logger.debug(f"Time elapsed: {int(time.time() - start_time)}s")
logger.debug(
f"Full status: {json.dumps(status_data.get('status', {}), indent=2)}"
)
# Check for successful states
if state.lower() in [
"running",
"ready",
"active",
"available",
"complete",
]:
if deployment.ingress_url:
spinner.log("[bold green]Deployment ready![/]")
spinner.log(
f"[bold]Ingress URL: {deployment.ingress_url}[/]"
)
else:
spinner.log("[bold green]Deployment ready![/]")
return deployment
# Check for failed states
elif state.lower() in [
"failed",
"error",
"unavailable",
"degraded",
]:
error_message = status_data.get("message", "Unknown error")
if conditions:
error_message = next(
(
c.get("message", error_message)
for c in conditions
if c.get("type", "").lower() == "failed"
),
error_message,
)
# Add more detailed error logging
logger.error(f"Deployment failed with state: {state}")
logger.error(f"Error message: {error_message}")
logger.error(f"Conditions: {json.dumps(conditions, indent=2)}")
logger.error(f"Events: {json.dumps(events, indent=2)}")
raise click.ClickException(
f"Deployment failed: {error_message}\n"
f"State: {state}\n"
f"Conditions: {json.dumps(conditions, indent=2)}\n"
f"Events: {json.dumps(events, indent=2)}"
)
time.sleep(5)
if time.time() - start_time >= timeout:
# Check if deployment exists but we just timed out waiting
try:
final_status = api.get_namespaced_custom_object(
group=group,
version=version,
namespace=namespace,
plural=plural,
name=deployment_name,
)
if final_status.get("status", {}).get("state", "").lower() in [
"running",
"ready",
"active",
"available",
"complete",
]:
if deployment.ingress_url:
spinner.log("[bold green]Deployment ready![/]")
spinner.log(
f"[bold]Ingress URL: {deployment.ingress_url}[/]"
)
else:
spinner.log("[bold green]Deployment ready![/]")
return deployment
except Exception as e:
logger.error("Timeout check failed", exc_info=True)
# Add timeout debug information
pods = api.list_namespaced_pod(
namespace, label_selector=f"app={deployment_name}"
)
logger.error(
f"Timeout reached. Pod statuses: {json.dumps([p.status for p in pods.items], indent=2)}"
)
raise click.ClickException(
f"Deployment timeout reached\n"
f"Pod statuses: {json.dumps([p.status for p in pods.items], indent=2)}"
)
return deployment
except Exception as e:
logger.error("Deployment failed", exc_info=True)
spinner.log(f"[bold red]Deployment failed: {str(e)}[/]")
raise SystemExit(1)
deploy_command = build_deploy_command()
...@@ -23,12 +23,12 @@ import re ...@@ -23,12 +23,12 @@ import re
import sys import sys
import typing as t import typing as t
from http import HTTPStatus from http import HTTPStatus
from typing import Any, Dict, List, Optional, TextIO
import click import typer
from bentoml._internal.cloud.base import Spinner from bentoml._internal.cloud.base import Spinner
from bentoml._internal.cloud.deployment import Deployment, DeploymentConfigParameters from bentoml._internal.cloud.deployment import Deployment, DeploymentConfigParameters
from bentoml._internal.configuration.containers import BentoMLContainer from bentoml._internal.configuration.containers import BentoMLContainer
from bentoml._internal.utils import add_experimental_docstring
from bentoml.exceptions import BentoMLException from bentoml.exceptions import BentoMLException
from rich.console import Console from rich.console import Console
from simple_di import Provide, inject from simple_di import Provide, inject
...@@ -44,13 +44,16 @@ configure_server_logging() ...@@ -44,13 +44,16 @@ configure_server_logging()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
app = typer.Typer(
help="Deploy Dynamo applications to Kubernetes cluster",
add_completion=True,
no_args_is_help=True,
)
console = Console(highlight=False)
if t.TYPE_CHECKING: if t.TYPE_CHECKING:
from bentoml._internal.cloud import BentoCloudClient from bentoml._internal.cloud import BentoCloudClient
TupleStrAny = tuple[str, ...]
else:
TupleStrAny = tuple
def raise_deployment_config_error(err: BentoMLException, action: str) -> t.NoReturn: def raise_deployment_config_error(err: BentoMLException, action: str) -> t.NoReturn:
if err.error_code == HTTPStatus.UNAUTHORIZED: if err.error_code == HTTPStatus.UNAUTHORIZED:
...@@ -62,207 +65,15 @@ def raise_deployment_config_error(err: BentoMLException, action: str) -> t.NoRet ...@@ -62,207 +65,15 @@ def raise_deployment_config_error(err: BentoMLException, action: str) -> t.NoRet
) from None ) from None
@click.command(name="deploy")
@click.argument(
"bento",
type=click.STRING,
required=False,
)
@click.option(
"-n",
"--name",
type=click.STRING,
help="Deployment name",
)
@click.option(
"-f",
"--config-file",
type=click.File(),
help="Configuration file path",
default=None,
)
@click.option(
"--wait/--no-wait",
type=click.BOOL,
is_flag=True,
help="Do not wait for deployment to be ready",
default=True,
)
@click.option(
"--timeout",
type=click.INT,
default=3600,
help="Timeout for deployment to be ready in seconds",
)
@click.pass_context
@add_experimental_docstring
def deploy_command(
ctx: click.Context,
bento: str | None,
name: str | None,
config_file: str | t.TextIO | None,
wait: bool,
timeout: int,
) -> None:
"""Create a deployment on BentoCloud.
\b
Create a deployment using parameters, or using config yaml file.
"""
create_deployment(
bento=bento,
name=name,
config_file=config_file,
wait=wait,
timeout=timeout,
args=ctx.args,
)
def build_deployment_command() -> click.Group:
@click.group(name="deployment")
@add_experimental_docstring
def deployment_command():
"""Deploy Dynamo applications to Kubernetes cluster"""
@deployment_command.command()
@click.argument(
"bento",
type=click.STRING,
required=False,
)
@click.option(
"-n",
"--name",
type=click.STRING,
help="Deployment name",
)
@click.option(
"-f",
"--config-file",
type=click.File(),
help="Configuration file path",
default=None,
)
@click.option(
"--wait/--no-wait",
type=click.BOOL,
is_flag=True,
help="Do not wait for deployment to be ready",
default=True,
)
@click.option(
"--timeout",
type=click.INT,
default=3600,
help="Timeout for deployment to be ready in seconds",
)
@click.pass_context
def create(
ctx: click.Context,
bento: str | None,
name: str | None,
config_file: str | t.TextIO | None,
wait: bool,
timeout: int,
) -> None:
"""Create a deployment on Dynamo Cloud.
\b
Create a deployment using parameters, or using config yaml file.
"""
create_deployment(
bento=bento,
name=name,
config_file=config_file,
wait=wait,
timeout=timeout,
args=ctx.args,
)
@deployment_command.command()
@click.argument("name", type=click.STRING)
@click.option(
"--cluster",
type=click.STRING,
help="Cluster name",
default=None,
)
def get(name: str, cluster: str | None) -> None:
"""Get deployment details from Dynamo Cloud.
\b
Get deployment details by name.
"""
get_deployment(name, cluster=cluster)
@deployment_command.command()
@click.option(
"--cluster",
type=click.STRING,
help="Cluster name",
default=None,
)
@click.option(
"--search",
type=click.STRING,
help="Search query",
default=None,
)
@click.option(
"--dev",
is_flag=True,
help="List development deployments",
default=False,
)
@click.option(
"-q",
"--query",
type=click.STRING,
help="Advanced query string",
default=None,
)
def list(
cluster: str | None, search: str | None, dev: bool, query: str | None
) -> None:
"""List all deployments from Dynamo Cloud.
\b
List and filter deployments.
"""
list_deployments(cluster=cluster, search=search, dev=dev, q=query)
@deployment_command.command()
@click.argument("name", type=click.STRING)
@click.option(
"--cluster",
type=click.STRING,
help="Cluster name",
default=None,
)
def delete(name: str, cluster: str | None) -> None:
"""Delete a deployment from Dynamo Cloud.
\b
Delete deployment by name.
"""
delete_deployment(name, cluster=cluster)
return deployment_command
deployment_command = build_deployment_command()
@inject @inject
def create_deployment( def create_deployment(
bento: str | None = None, bento: Optional[str] = None,
name: str | None = None, name: Optional[str] = None,
config_file: str | t.TextIO | None = None, config_file: Optional[TextIO] = None,
wait: bool = True, wait: bool = True,
timeout: int = 3600, timeout: int = 3600,
dev: bool = False, dev: bool = False,
args: list[str] | None = None, args: Optional[List[str]] = None,
_cloud_client: BentoCloudClient = Provide[BentoMLContainer.bentocloud_client], _cloud_client: BentoCloudClient = Provide[BentoMLContainer.bentocloud_client],
) -> Deployment: ) -> Deployment:
# Load config from file and serialize to env # Load config from file and serialize to env
...@@ -288,7 +99,6 @@ def create_deployment( ...@@ -288,7 +99,6 @@ def create_deployment(
print(f"Error: {str(e)}") print(f"Error: {str(e)}")
sys.exit(1) sys.exit(1)
console = Console(highlight=False)
with Spinner(console=console) as spinner: with Spinner(console=console) as spinner:
try: try:
# Create deployment with initial status message # Create deployment with initial status message
...@@ -330,7 +140,7 @@ def create_deployment( ...@@ -330,7 +140,7 @@ def create_deployment(
sys.exit(1) sys.exit(1)
def _get_urls(deployment: Deployment) -> list[str]: def _get_urls(deployment: Deployment) -> List[str]:
"""Get URLs from deployment.""" """Get URLs from deployment."""
latest = deployment._client.v2.get_deployment(deployment.name, deployment.cluster) latest = deployment._client.v2.get_deployment(deployment.name, deployment.cluster)
urls = latest.urls if hasattr(latest, "urls") else None urls = latest.urls if hasattr(latest, "urls") else None
...@@ -367,11 +177,10 @@ def _display_deployment_info(spinner: Spinner, deployment: Deployment) -> None: ...@@ -367,11 +177,10 @@ def _display_deployment_info(spinner: Spinner, deployment: Deployment) -> None:
@inject @inject
def get_deployment( def get_deployment(
name: str, name: str,
cluster: str | None = None, cluster: Optional[str] = None,
_cloud_client: BentoCloudClient = Provide[BentoMLContainer.bentocloud_client], _cloud_client: BentoCloudClient = Provide[BentoMLContainer.bentocloud_client],
) -> Deployment: ) -> Deployment:
"""Get deployment details from Dynamo Cloud.""" """Get deployment details from Dynamo Cloud."""
console = Console(highlight=False)
with Spinner(console=console) as spinner: with Spinner(console=console) as spinner:
try: try:
spinner.update(f'Getting deployment "{name}" from Dynamo Cloud...') spinner.update(f'Getting deployment "{name}" from Dynamo Cloud...')
...@@ -399,11 +208,10 @@ def get_deployment( ...@@ -399,11 +208,10 @@ def get_deployment(
@inject @inject
def delete_deployment( def delete_deployment(
name: str, name: str,
cluster: str | None = None, cluster: Optional[str] = None,
_cloud_client: BentoCloudClient = Provide[BentoMLContainer.bentocloud_client], _cloud_client: BentoCloudClient = Provide[BentoMLContainer.bentocloud_client],
) -> None: ) -> None:
"""Delete a deployment from Dynamo Cloud.""" """Delete a deployment from Dynamo Cloud."""
console = Console(highlight=False)
with Spinner(console=console) as spinner: with Spinner(console=console) as spinner:
try: try:
spinner.update(f'Deleting deployment "{name}" from Dynamo Cloud...') spinner.update(f'Deleting deployment "{name}" from Dynamo Cloud...')
...@@ -426,15 +234,14 @@ def delete_deployment( ...@@ -426,15 +234,14 @@ def delete_deployment(
@inject @inject
def list_deployments( def list_deployments(
cluster: str | None = None, cluster: Optional[str] = None,
search: str | None = None, search: Optional[str] = None,
dev: bool = False, dev: bool = False,
q: str | None = None, q: Optional[str] = None,
labels: t.List[dict[str, t.Any]] | None = None, labels: Optional[List[Dict[str, Any]]] = None,
_cloud_client: BentoCloudClient = Provide[BentoMLContainer.bentocloud_client], _cloud_client: BentoCloudClient = Provide[BentoMLContainer.bentocloud_client],
) -> None: ) -> None:
"""List all deployments from Dynamo Cloud.""" """List all deployments from Dynamo Cloud."""
console = Console(highlight=False)
with Spinner(console=console) as spinner: with Spinner(console=console) as spinner:
try: try:
# Handle label-based filtering # Handle label-based filtering
...@@ -467,3 +274,100 @@ def list_deployments( ...@@ -467,3 +274,100 @@ def list_deployments(
sys.exit(1) sys.exit(1)
spinner.log(f"[red]:x: Error:[/] Failed to list deployments: {str(e)}") spinner.log(f"[red]:x: Error:[/] Failed to list deployments: {str(e)}")
sys.exit(1) sys.exit(1)
@app.command()
def create(
ctx: typer.Context,
bento: Optional[str] = typer.Argument(None, help="Bento to deploy"),
name: Optional[str] = typer.Option(None, "--name", "-n", help="Deployment name"),
config_file: Optional[typer.FileText] = typer.Option(
None, "--config-file", "-f", help="Configuration file path"
),
wait: bool = typer.Option(
True, "--wait/--no-wait", help="Do not wait for deployment to be ready"
),
timeout: int = typer.Option(
3600, "--timeout", help="Timeout for deployment to be ready in seconds"
),
) -> None:
"""Create a deployment on Dynamo Cloud.
Create a deployment using parameters, or using config yaml file.
"""
create_deployment(
bento=bento,
name=name,
config_file=config_file,
wait=wait,
timeout=timeout,
args=ctx.args if hasattr(ctx, "args") else None,
)
@app.command()
def get(
name: str = typer.Argument(..., help="Deployment name"),
cluster: Optional[str] = typer.Option(None, "--cluster", help="Cluster name"),
) -> None:
"""Get deployment details from Dynamo Cloud.
Get deployment details by name.
"""
get_deployment(name, cluster=cluster)
@app.command()
def list(
cluster: Optional[str] = typer.Option(None, "--cluster", help="Cluster name"),
search: Optional[str] = typer.Option(None, "--search", help="Search query"),
dev: bool = typer.Option(False, "--dev", help="List development deployments"),
query: Optional[str] = typer.Option(
None, "--query", "-q", help="Advanced query string"
),
) -> None:
"""List all deployments from Dynamo Cloud.
List and filter deployments.
"""
list_deployments(cluster=cluster, search=search, dev=dev, q=query)
@app.command()
def delete(
name: str = typer.Argument(..., help="Deployment name"),
cluster: Optional[str] = typer.Option(None, "--cluster", help="Cluster name"),
) -> None:
"""Delete a deployment from Dynamo Cloud.
Delete deployment by name.
"""
delete_deployment(name, cluster=cluster)
def deploy(
ctx: typer.Context,
bento: Optional[str] = typer.Argument(None, help="Bento to deploy"),
name: Optional[str] = typer.Option(None, "--name", "-n", help="Deployment name"),
config_file: Optional[typer.FileText] = typer.Option(
None, "--config-file", "-f", help="Configuration file path"
),
wait: bool = typer.Option(
True, "--wait/--no-wait", help="Do not wait for deployment to be ready"
),
timeout: int = typer.Option(
3600, "--timeout", help="Timeout for deployment to be ready in seconds"
),
) -> None:
"""Create a deployment on Dynamo Cloud.
Create a deployment using parameters, or using config yaml file.
"""
create_deployment(
bento=bento,
name=name,
config_file=config_file,
wait=wait,
timeout=timeout,
args=ctx.args if hasattr(ctx, "args") else None,
)
...@@ -19,7 +19,6 @@ import platform ...@@ -19,7 +19,6 @@ import platform
import subprocess import subprocess
import sys import sys
import click
import distro import distro
import pkg_resources import pkg_resources
...@@ -141,34 +140,22 @@ def get_python_packages() -> str: ...@@ -141,34 +140,22 @@ def get_python_packages() -> str:
return "\n".join(out) return "\n".join(out)
def build_env_command() -> click.Command: def env() -> None:
@click.command(name="env") """Display information about the current environment."""
def env() -> None: print("System Information:")
"""Display information about the current environment.""" print(f"OS: {get_os_version()}")
click.echo("System Information:") print(f"Glibc Version: {get_glibc_version()}")
click.echo(f"OS: {get_os_version()}") print(f"GCC Version: {get_gcc_version()}")
click.echo(f"Glibc Version: {get_glibc_version()}") print(f"Cmake Version: {get_cmake_version()}")
click.echo(f"GCC Version: {get_gcc_version()}") print(f"Rust Version: {get_rust_version()}")
click.echo(f"Cmake Version: {get_cmake_version()}") print(f"Docker Version: {get_docker_version()}")
click.echo(f"Rust Version: {get_rust_version()}")
click.echo(f"Docker Version: {get_docker_version()}") print("\nCPU Information:")
print(f"{get_cpu_architecture()}")
click.echo("\nCPU Information:")
click.echo(f"{get_cpu_architecture()}") # Python Environment
py_version = sys.version.split()[0]
click.echo("\nGPU Information:") print(f"\nPython Version: {py_version}")
click.echo(f"Models and configurations: {query_nvidia_smi('gpu_name')}") print(f"Python Platform: {get_python_platform()}")
click.echo(f"Driver Version: {query_nvidia_smi('driver_version')}") print("\nPython Packages:")
click.echo(f"CUDA Runtime Version: {get_cuda_version()}") print(f"{get_python_packages()}")
click.echo(f"Topology: {get_gpu_topo()}")
# Get Python version
py_version = sys.version.split()[0]
click.echo(f"\nPython Version: {py_version}")
click.echo(f"Python Platform: {get_python_platform()}")
click.echo("\nPython Packages:")
click.echo(f"{get_python_packages()}")
return env
env_command = build_env_command()
# SPDX-FileCopyrightText: Copyright (c) 2020 Atalaya Tech. Inc
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
from __future__ import annotations
import json
import logging
import os
import subprocess
import typing as t
import attr
import typer
import yaml
from bentoml._internal.bento.bento import DEFAULT_BENTO_BUILD_FILES
from bentoml._internal.bento.build_config import BentoBuildConfig
from bentoml._internal.configuration.containers import BentoMLContainer
from bentoml._internal.utils.args import set_arguments
from bentoml._internal.utils.filesystem import resolve_user_filepath
from bentoml.exceptions import InvalidArgument
from rich.console import Console
from rich.syntax import Syntax
from simple_di import Provide, inject
from dynamo.sdk.lib.bento import Bento
if t.TYPE_CHECKING:
from bentoml._internal.bento import BentoStore
from bentoml._internal.container import DefaultBuilder
logger = logging.getLogger(__name__)
console = Console()
DYNAMO_FIGLET = """
██████╗ ██╗ ██╗███╗ ██╗ █████╗ ███╗ ███╗ ██████╗
██╔══██╗╚██╗ ██╔╝████╗ ██║██╔══██╗████╗ ████║██╔═══██╗
██║ ██║ ╚████╔╝ ██╔██╗ ██║███████║██╔████╔██║██║ ██║
██║ ██║ ╚██╔╝ ██║╚██╗██║██╔══██║██║╚██╔╝██║██║ ██║
██████╔╝ ██║ ██║ ╚████║██║ ██║██║ ╚═╝ ██║╚██████╔╝
╚═════╝ ╚═╝ ╚═╝ ╚═══╝╚═╝ ╚═╝╚═╝ ╚═╝ ╚═════╝
"""
@inject
def build_bentofile(
bentofile: str | None = None,
*,
service: str | None = None,
name: str | None = None,
version: str | None = None,
labels: dict[str, str] | None = None,
build_ctx: str | None = None,
platform: str | None = None,
bare: bool = False,
reload: bool = False,
args: dict[str, t.Any] | None = None,
_bento_store: BentoStore = Provide[BentoMLContainer.bento_store],
) -> Bento:
"""
Build a Dynamo pipeline based on options specified in a bentofile.yaml file.
"""
if args is not None:
set_arguments(**args)
if bentofile:
try:
bentofile = resolve_user_filepath(bentofile, None)
except FileNotFoundError:
raise InvalidArgument(f'bentofile "{bentofile}" not found')
else:
build_config = BentoBuildConfig.from_file(bentofile)
else:
for filename in DEFAULT_BENTO_BUILD_FILES:
try:
bentofile = resolve_user_filepath(filename, build_ctx)
except FileNotFoundError:
pass
else:
build_config = BentoBuildConfig.from_file(bentofile)
break
else:
build_config = BentoBuildConfig(service=service or "")
new_attrs: dict[str, t.Any] = {}
if name is not None:
new_attrs["name"] = name
if labels:
# Ensure both dictionaries are of type dict[str, str]
existing_labels: dict[str, str] = build_config.labels or {}
new_attrs["labels"] = {**existing_labels, **labels}
if new_attrs:
build_config = attr.evolve(build_config, **new_attrs)
bento = Bento.create(
build_config=build_config,
version=version,
build_ctx=build_ctx,
platform=platform,
bare=bare,
reload=reload,
)
if not bare:
return bento.save(_bento_store)
return bento
def get(
pipeline_tag: str = typer.Argument(
..., help="The tag of the Dynamo pipeline to display"
),
output: str = typer.Option(
"yaml",
"--output",
"-o",
help="Output format (json, yaml, or path)",
show_default=True,
),
) -> None:
"""Display Dynamo pipeline details.
Prints information about a Dynamo pipeline by its tag.
"""
# Validate output format
valid_outputs = ["json", "yaml", "path"]
if output not in valid_outputs:
console.print(f"[red]Error: Output format must be one of {valid_outputs}[/red]")
raise typer.Exit(code=1)
bento_store = BentoMLContainer.bento_store.get()
bento = bento_store.get(pipeline_tag)
if output == "path":
console.print(bento.path)
elif output == "json":
info = json.dumps(bento.info.to_dict(), indent=2, default=str)
console.print_json(info)
else:
info = yaml.dump(bento.info.to_dict(), indent=2, sort_keys=False)
console.print(Syntax(info, "yaml", background_color="default"))
def build(
dynamo_pipeline: str = typer.Argument(
..., help="Path to the Dynamo pipeline to build"
),
output: str = typer.Option(
"default",
"--output",
"-o",
help="Output log format. Use 'tag' to display only pipeline tag.",
show_default=True,
),
containerize: bool = typer.Option(
False,
"--containerize",
help="Containerize the Dynamo pipeline after building. Shortcut for 'dynamo build && dynamo containerize'.",
),
platform: str = typer.Option(None, "--platform", help="Platform to build for"),
) -> None:
"""Build a new Dynamo pipeline from the specified path.
Creates a packaged Dynamo pipeline ready for deployment. Optionally builds a docker container.
"""
from bentoml._internal.configuration import get_quiet_mode, set_quiet_mode
from bentoml._internal.log import configure_logging
# Validate output format
valid_outputs = ["tag", "default"]
if output not in valid_outputs:
console.print(f"[red]Error: Output format must be one of {valid_outputs}[/red]")
raise typer.Exit(code=1)
if output == "tag":
set_quiet_mode()
configure_logging()
service: str | None = None
build_ctx = "."
if ":" in dynamo_pipeline:
service = dynamo_pipeline
else:
build_ctx = dynamo_pipeline
bento = build_bentofile(
service=service,
build_ctx=build_ctx,
platform=platform,
)
containerize_cmd = f"dynamo containerize {bento.tag}"
if output == "tag":
console.print(f"__tag__:{bento.tag}")
else:
if not get_quiet_mode():
console.print(DYNAMO_FIGLET)
console.print(f"[green]Successfully built {bento.tag}.")
next_steps = []
if not containerize:
next_steps.append(
"\n\n* Containerize your Dynamo pipeline with `dynamo containerize`:\n"
f" $ {containerize_cmd} [or dynamo build --containerize]"
)
if next_steps:
console.print(f"\n[blue]Next steps: {''.join(next_steps)}[/]")
if containerize:
backend: DefaultBuilder = t.cast(
"DefaultBuilder", os.getenv("BENTOML_CONTAINERIZE_BACKEND", "docker")
)
try:
import bentoml
bentoml.container.health(backend)
except subprocess.CalledProcessError:
from bentoml.exceptions import BentoMLException
raise BentoMLException(f"Backend {backend} is not healthy")
bentoml.container.build(bento.tag, backend=backend)
...@@ -19,39 +19,30 @@ import shutil ...@@ -19,39 +19,30 @@ import shutil
import subprocess import subprocess
import sys import sys
import click import typer
from rich.console import Console
def build_run_command() -> click.Group: console = Console()
@click.group(name="run")
def cli():
pass def run(ctx: typer.Context):
"""Execute dynamo-run with any additional arguments."""
# set help_option_names to empty to let dynamo-run handles help option, instead of intercepting by "dynamo run" # Check if dynamo-run is available in PATH
@cli.command( if shutil.which("dynamo-run") is None:
context_settings=dict( console.print(
ignore_unknown_options=True, allow_extra_args=True, help_option_names=() "[bold red]Error:[/bold red] 'dynamo-run' is needed but not found.\n"
), "Please install it using: [bold cyan]cargo install dynamo-run[/bold cyan]",
) style="red",
def run() -> None: )
"""Call dynamo-run with remaining arguments""" raise typer.Exit(code=1)
# Check if dynamo-run is available in PATH
if shutil.which("dynamo-run") is None: # Extract all arguments after 'run'
click.echo( args = sys.argv[sys.argv.index("run") + 1 :] if "run" in sys.argv else []
"Error: 'dynamo-run' is needed but not found.\n"
"Please install it using: cargo install dynamo-run", command = ["dynamo-run"] + args
err=True,
) try:
sys.exit(1) subprocess.run(command)
except Exception as e:
command = ["dynamo-run"] + sys.argv[2:] console.print(f"[bold red]Error executing dynamo-run:[/bold red] {str(e)}")
try: raise typer.Exit(code=1)
subprocess.run(command)
except Exception as e:
click.echo(f"Error executing dynamo-run: {str(e)}", err=True)
sys.exit(1)
return cli
run_command = build_run_command()
...@@ -22,10 +22,12 @@ import logging ...@@ -22,10 +22,12 @@ import logging
import os import os
import sys import sys
import typing as t import typing as t
from typing import Optional from pathlib import Path
from typing import List, Optional
import click import typer
import rich from rich.console import Console
from rich.panel import Panel
from .utils import resolve_service_config from .utils import resolve_service_config
...@@ -34,144 +36,131 @@ if t.TYPE_CHECKING: ...@@ -34,144 +36,131 @@ if t.TYPE_CHECKING:
F = t.Callable[P, t.Any] # type: ignore F = t.Callable[P, t.Any] # type: ignore
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
console = Console()
def build_serve_command() -> click.Group: def serve(
from dynamo.sdk.lib.logging import configure_server_logging ctx: typer.Context,
dynamo_pipeline: str = typer.Argument(
@click.group(name="serve") ..., help="The path to the Dynamo pipeline to serve"
def cli(): ),
pass service_name: str = typer.Option(
"",
@cli.command(
context_settings=dict(
ignore_unknown_options=True,
allow_extra_args=True,
),
)
@click.argument("bento", type=click.STRING, default=".")
@click.option(
"--service-name",
type=click.STRING,
required=False,
default="",
envvar="BENTOML_SERVE_SERVICE_NAME",
help="Only serve the specified service. Don't serve any dependencies of this service.", help="Only serve the specified service. Don't serve any dependencies of this service.",
) envvar="DYNAMO_SERVE_SERVICE_NAME",
@click.option( ),
"--depends", depends: List[str] = typer.Option(
type=click.STRING, [],
multiple=True, help="List of runner dependencies in name=value format",
envvar="BENTOML_SERVE_DEPENDS", envvar="DYNAMO_SERVE_DEPENDS",
help="list of runners map", ),
) config_file: Optional[Path] = typer.Option(
@click.option( None,
"--config-file",
"-f", "-f",
"--file",
type=click.Path(exists=True),
help="Path to YAML config file for service configuration", help="Path to YAML config file for service configuration",
) exists=True,
@click.option( ),
"-p", port: Optional[int] = typer.Option(
None,
"--port", "--port",
type=click.INT, "-p",
help="The port to listen on for the REST api server if you are not using a dynamo service", help="The port to listen on for the REST API server",
envvar="BENTOML_PORT", envvar="DYNAMO_PORT",
show_envvar=True, ),
) host: Optional[str] = typer.Option(
@click.option( None,
"--host", help="The host to bind for the REST API server",
type=click.STRING, envvar="DYNAMO_HOST",
help="The host to bind for the REST api server if you are not using a dynamo service", ),
envvar="BENTOML_HOST", working_dir: Optional[Path] = typer.Option(
show_envvar=True, None,
)
@click.option(
"--working-dir",
type=click.Path(),
help="When loading from source code, specify the directory to find the Service instance", help="When loading from source code, specify the directory to find the Service instance",
default=None, ),
show_default=True, dry_run: bool = typer.Option(
) False,
@click.option(
"--dry-run",
is_flag=True,
help="Print the final service configuration and exit without starting the server", help="Print the final service configuration and exit without starting the server",
default=False, ),
) enable_planner: bool = typer.Option(
@click.option( False,
"--enable-planner",
is_flag=True,
help="Save a snapshot of your service state to a file that allows planner to edit your deployment configuration", help="Save a snapshot of your service state to a file that allows planner to edit your deployment configuration",
default=False, ),
) ):
@click.pass_context """Locally serve a Dynamo pipeline.
def serve(
ctx: click.Context, Starts a local server for the specified Dynamo pipeline.
bento: str, """
service_name: str,
depends: Optional[list[str]], # Warning: internal
dry_run: bool, from bentoml._internal.service.loader import load
port: int,
host: str, from dynamo.sdk.lib.logging import configure_server_logging
file: str | None, from dynamo.sdk.lib.service import LinkedServices
working_dir: str | None,
enable_planner: bool, # Extract extra arguments not captured by typer
**attrs: t.Any, service_configs = resolve_service_config(config_file, ctx.args)
) -> None:
"""Locally run connected Dynamo services. You can pass service-specific configuration options using --ServiceName.param=value format.""" # Process depends
# WARNING: internal runner_map_dict = {}
from bentoml._internal.service.loader import load if depends:
try:
from dynamo.sdk.lib.service import LinkedServices
# Resolve service configs from yaml file, command line args into a python dict
service_configs = resolve_service_config(file, ctx.args)
# Process depends
if depends:
runner_map_dict = dict([s.split("=", maxsplit=2) for s in depends or []]) runner_map_dict = dict([s.split("=", maxsplit=2) for s in depends or []])
else: except ValueError:
runner_map_dict = {} console.print(
"[bold red]Error:[/bold red] Invalid format for --depends option. Use format 'name=value'"
if dry_run: )
rich.print("[bold]Service Configuration:[/bold]") raise typer.Exit(code=1)
rich.print(json.dumps(service_configs, indent=2))
rich.print("\n[bold]Environment Variable that would be set:[/bold]") if dry_run:
rich.print(f"DYNAMO_SERVICE_CONFIG={json.dumps(service_configs)}") console.print("[bold green]Service Configuration:[/bold green]")
sys.exit(0) console.print_json(json.dumps(service_configs))
console.print(
configure_server_logging(service_name=service_name) "\n[bold green]Environment Variable that would be set:[/bold green]"
# Set environment variable with service configuration
if service_configs:
logger.info(f"Running dynamo serve with service configs {service_configs}")
os.environ["DYNAMO_SERVICE_CONFIG"] = json.dumps(service_configs)
if working_dir is None:
if os.path.isdir(os.path.expanduser(bento)):
working_dir = os.path.expanduser(bento)
else:
working_dir = "."
if sys.path[0] != working_dir:
sys.path.insert(0, working_dir)
svc = load(bento_identifier=bento, working_dir=working_dir)
LinkedServices.remove_unused_edges()
from dynamo.sdk.cli.serving import serve_http # type: ignore
svc.inject_config()
serve_http(
bento,
working_dir=working_dir,
host=host,
port=port,
dependency_map=runner_map_dict,
service_name=service_name,
enable_planner=enable_planner,
) )
console.print(f"DYNAMO_SERVICE_CONFIG={json.dumps(service_configs)}")
raise typer.Exit()
configure_server_logging()
if service_configs:
logger.info(f"Running dynamo serve with service configs {service_configs}")
os.environ["DYNAMO_SERVICE_CONFIG"] = json.dumps(service_configs)
if working_dir is None:
if os.path.isdir(os.path.expanduser(dynamo_pipeline)):
working_dir = Path(os.path.expanduser(dynamo_pipeline))
else:
working_dir = Path(".")
return cli # Convert Path objects to strings where string is required
working_dir_str = str(working_dir)
if sys.path[0] != working_dir_str:
sys.path.insert(0, working_dir_str)
serve_command = build_serve_command() svc = load(bento_identifier=dynamo_pipeline, working_dir=working_dir_str)
LinkedServices.remove_unused_edges()
from dynamo.sdk.cli.serving import serve_http # type: ignore
svc.inject_config()
# Start the service
console.print(
Panel.fit(
f"[bold]Starting Dynamo service:[/bold] [cyan]{dynamo_pipeline}[/cyan]",
title="[bold green]Dynamo Serve[/bold green]",
border_style="green",
)
)
serve_http(
dynamo_pipeline,
working_dir=working_dir_str,
host=host,
port=port,
dependency_map=runner_map_dict,
service_name=service_name,
enable_planner=enable_planner,
)
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
# limitations under the License. # limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES # Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
from __future__ import annotations
import collections import collections
import contextlib import contextlib
import json import json
...@@ -281,7 +283,7 @@ def _parse_service_args(args: list[str]) -> t.Dict[str, t.Any]: ...@@ -281,7 +283,7 @@ def _parse_service_args(args: list[str]) -> t.Dict[str, t.Any]:
def resolve_service_config( def resolve_service_config(
config_file: str | t.TextIO | None = None, config_file: pathlib.Path | t.TextIO | None = None,
args: list[str] | None = None, args: list[str] | None = None,
) -> dict[str, dict[str, t.Any]]: ) -> dict[str, dict[str, t.Any]]:
"""Resolve service configuration from file and command line arguments. """Resolve service configuration from file and command line arguments.
...@@ -308,10 +310,9 @@ def resolve_service_config( ...@@ -308,10 +310,9 @@ def resolve_service_config(
except Exception as e: except Exception as e:
logger.warning(f"Failed to parse DYN_DEPLOYMENT_CONFIG: {e}") logger.warning(f"Failed to parse DYN_DEPLOYMENT_CONFIG: {e}")
else: else:
# Load file if provided
if config_file: if config_file:
with open(config_file) if isinstance( with open(config_file) if isinstance(
config_file, str config_file, (str, pathlib.Path)
) else contextlib.nullcontext(config_file) as f: ) else contextlib.nullcontext(config_file) as f:
yaml_configs = yaml.safe_load(f) yaml_configs = yaml.safe_load(f)
logger.debug(f"Loaded config from file: {yaml_configs}") logger.debug(f"Loaded config from file: {yaml_configs}")
......
...@@ -13,12 +13,17 @@ ...@@ -13,12 +13,17 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import re
import subprocess import subprocess
import time import time
import pytest import pytest
from typer.testing import CliRunner
from dynamo.sdk.cli.cli import cli
pytestmark = pytest.mark.pre_merge pytestmark = pytest.mark.pre_merge
runner = CliRunner()
@pytest.fixture(scope="module", autouse=True) @pytest.fixture(scope="module", autouse=True)
...@@ -28,6 +33,21 @@ def setup_and_teardown(): ...@@ -28,6 +33,21 @@ def setup_and_teardown():
etcd = subprocess.Popen(["etcd"]) etcd = subprocess.Popen(["etcd"])
print("Setting up resources") print("Setting up resources")
# Run the serve command in dry-run mode with CLI runner to check it's working
result = runner.invoke(
cli,
[
"serve",
"pipeline:Frontend",
"--working-dir",
"deploy/dynamo/sdk/src/dynamo/sdk/tests",
"--Frontend.model=qwentastic",
"--Middle.bias=0.5",
"--dry-run",
],
)
# Now start the actual server using subprocess for the real integration test
server = subprocess.Popen( server = subprocess.Popen(
[ [
"dynamo", "dynamo",
...@@ -42,7 +62,7 @@ def setup_and_teardown(): ...@@ -42,7 +62,7 @@ def setup_and_teardown():
time.sleep(5) time.sleep(5)
yield yield result
# Teardown code # Teardown code
print("Tearing down resources") print("Tearing down resources")
...@@ -54,7 +74,17 @@ def setup_and_teardown(): ...@@ -54,7 +74,17 @@ def setup_and_teardown():
etcd.wait() etcd.wait()
async def test_pipeline(): async def test_pipeline(setup_and_teardown):
# Check the CLI command ran successfully
result = setup_and_teardown
assert result.exit_code == 0
# Clean the output to check for expected content
clean_output = re.sub(r"\x1b\[[0-9;]*m", "", result.output)
assert "Service Configuration:" in clean_output
assert '"Frontend": {' in clean_output
assert '"model": "qwentastic"' in clean_output
import asyncio import asyncio
import aiohttp import aiohttp
......
...@@ -25,7 +25,7 @@ export DEPLOYMENT_NAME="${DEPLOYMENT_NAME:-ci-hw}" ...@@ -25,7 +25,7 @@ export DEPLOYMENT_NAME="${DEPLOYMENT_NAME:-ci-hw}"
cd /workspace/examples/hello_world cd /workspace/examples/hello_world
# Step.1: Login to dynamo cloud # Step.1: Login to dynamo cloud
dynamo cloud login --api-token TEST-TOKEN --endpoint $DYNAMO_CLOUD dynamo cloud login $DYNAMO_CLOUD
# Step.2: build a dynamo nim with framework-less base # Step.2: build a dynamo nim with framework-less base
DYNAMO_TAG=$(dynamo build hello_world:Frontend | grep "Successfully built" | awk -F"\"" '{ print $2 }') DYNAMO_TAG=$(dynamo build hello_world:Frontend | grep "Successfully built" | awk -F"\"" '{ print $2 }')
......
...@@ -54,7 +54,7 @@ export KUBE_NS=hello-world ...@@ -54,7 +54,7 @@ export KUBE_NS=hello-world
export DYNAMO_CLOUD=https://${KUBE_NS}.dev.aire.nvidia.com export DYNAMO_CLOUD=https://${KUBE_NS}.dev.aire.nvidia.com
# Login to the Dynamo cloud # Login to the Dynamo cloud
dynamo cloud login --api-token TEST-TOKEN --endpoint $DYNAMO_CLOUD dynamo cloud login $DYNAMO_CLOUD
``` ```
### 2. Build the Dynamo Base Image ### 2. Build the Dynamo Base Image
......
...@@ -142,7 +142,7 @@ The deployment process involves two distinct build steps: ...@@ -142,7 +142,7 @@ The deployment process involves two distinct build steps:
export PROJECT_ROOT=$(pwd) export PROJECT_ROOT=$(pwd)
export KUBE_NS=hello-world # Must match your Kubernetes namespace export KUBE_NS=hello-world # Must match your Kubernetes namespace
export DYNAMO_CLOUD=https://${KUBE_NS}.dev.aire.nvidia.com export DYNAMO_CLOUD=https://${KUBE_NS}.dev.aire.nvidia.com
dynamo cloud login --api-token TEST-TOKEN --endpoint $DYNAMO_CLOUD dynamo cloud login $DYNAMO_CLOUD
``` ```
2. **Build the Dynamo Base Image** 2. **Build the Dynamo Base Image**
......
...@@ -185,7 +185,7 @@ You must have first followed the instructions in [deploy/dynamo/helm/README.md]( ...@@ -185,7 +185,7 @@ You must have first followed the instructions in [deploy/dynamo/helm/README.md](
export PROJECT_ROOT=$(pwd) export PROJECT_ROOT=$(pwd)
export KUBE_NS=dynamo-cloud # Note: This must match the Kubernetes namespace where you installed Dynamo Cloud export KUBE_NS=dynamo-cloud # Note: This must match the Kubernetes namespace where you installed Dynamo Cloud
export DYNAMO_CLOUD=https://${KUBE_NS}.dev.aire.nvidia.com # Externally accessible endpoint to the `dynamo-store` service within your Dynamo Cloud installation export DYNAMO_CLOUD=https://${KUBE_NS}.dev.aire.nvidia.com # Externally accessible endpoint to the `dynamo-store` service within your Dynamo Cloud installation
dynamo cloud login --api-token TEST-TOKEN --endpoint $DYNAMO_CLOUD dynamo cloud login $DYNAMO_CLOUD
``` ```
2. **Build the Dynamo Base Image** 2. **Build the Dynamo Base Image**
......
...@@ -31,6 +31,7 @@ dependencies = [ ...@@ -31,6 +31,7 @@ dependencies = [
"kubernetes==32.0.1", "kubernetes==32.0.1",
"ai-dynamo-runtime==0.1.1", "ai-dynamo-runtime==0.1.1",
"distro", "distro",
"typer",
] ]
classifiers = [ classifiers = [
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment