"examples/vscode:/vscode.git/clone" did not exist on "fcdcb12fd59357852bcb66b731a7e0dc5190ee08"
Commit 7f136e29 authored by April Yang's avatar April Yang Committed by GitHub
Browse files

feat: update deploy api & sdk (#74)


Co-authored-by: Julien Mancuso <jmancuso@nvidia.com>
Co-authored-by: Hannah Zhang <hannahz@nvidia.com>
Co-authored-by: Biswa Panda <biswa.panda@gmail.com>
Co-authored-by: Maksim Khadkevich <mkhadkevich@nvidia.com>
parent dd238a26
......@@ -24,6 +24,11 @@ golang-base:
FROM golang:1.23
RUN apt-get update && apt-get install -y git && apt-get clean && rm -rf /var/lib/apt/lists/* && curl -sSfL https://github.com/golangci/golangci-lint/releases/download/v1.61.0/golangci-lint-1.61.0-linux-amd64.tar.gz | tar -xzv && mv golangci-lint-1.61.0-linux-amd64/golangci-lint /usr/local/bin/
operator-src:
FROM +golang-base
COPY ./deploy/dynamo/operator /artifacts/operator
SAVE ARTIFACT /artifacts/operator
############### ALL TARGETS ##############################
all-test:
BUILD ./deploy/dynamo/operator+test
......
......@@ -40,7 +40,7 @@ docker:
ARG CI_REGISTRY_IMAGE=my-registry
ARG CI_COMMIT_SHA=latest
ARG IMAGE_SUFFIX=dynamo-operator
FROM urm.nvidia.com/sw-gpu-ucs-hardened-docker/distroless/go:v3.1.3-amd64
FROM gcr.io/distroless/static-debian11
WORKDIR /
COPY +build/manager .
USER 65532:65532
......
......@@ -21,4 +21,5 @@ metadata:
app.kubernetes.io/managed-by: kustomize
name: dynamodeployment-sample
spec:
dynamoNim: frontend:2c4romhs6s33e4w7
# TODO(user): Add fields here
# EXAMPLE: dynamoNim: basic:dev
......@@ -90,6 +90,7 @@ const (
ContainerPortNameHTTPProxy = "http-proxy"
ServicePortNameHTTPNonProxy = "http-non-proxy"
HeaderNameDebug = "X-Yatai-Debug"
kDefaultIngressSuffix = "local"
)
var ServicePortHTTPNonProxy = commonconsts.BentoServicePort + 1
......@@ -1136,6 +1137,10 @@ func (r *DynamoNimDeploymentReconciler) createOrUpdateVirtualService(ctx context
if dynamoNimDeployment.Spec.Ingress.HostPrefix != nil {
vsName = *dynamoNimDeployment.Spec.Ingress.HostPrefix + vsName
}
ingressSuffix, found := os.LookupEnv("DYNAMO_INGRESS_SUFFIX")
if !found || ingressSuffix == "" {
ingressSuffix = kDefaultIngressSuffix
}
vs := &networkingv1beta1.VirtualService{
ObjectMeta: metav1.ObjectMeta{
Name: dynamoNimDeployment.Name,
......@@ -1143,7 +1148,7 @@ func (r *DynamoNimDeploymentReconciler) createOrUpdateVirtualService(ctx context
},
Spec: istioNetworking.VirtualService{
Hosts: []string{
fmt.Sprintf("%s.dev.aire.nvidia.com", vsName),
fmt.Sprintf("%s.%s", vsName, ingressSuffix),
},
Gateways: []string{"istio-system/ingress-alb"},
Http: []*istioNetworking.HTTPRoute{
......
......@@ -25,16 +25,15 @@ def create_bentoml_cli() -> click.Command:
from bentoml_cli.bentos import bento_command
from bentoml_cli.cloud import cloud_command
from bentoml_cli.containerize import containerize_command
from bentoml_cli.deployment import (
deploy_command,
deployment_command,
develop_command,
)
from bentoml_cli.deployment import deployment_command, develop_command
from bentoml_cli.env import env_command
from bentoml_cli.models import model_command
from bentoml_cli.secret import secret_command
from bentoml_cli.utils import BentoMLCommandGroup, get_entry_points
from dynamo.sdk.cli.delete import delete_command
from dynamo.sdk.cli.deploy import deploy_command
from dynamo.sdk.cli.list import list_command
from dynamo.sdk.cli.run import run_command
from dynamo.sdk.cli.serve import serve_command
from dynamo.sdk.cli.start import start_command
......@@ -61,6 +60,8 @@ def create_bentoml_cli() -> click.Command:
bentoml_cli.add_command(develop_command)
bentoml_cli.add_command(deployment_command)
bentoml_cli.add_command(secret_command)
bentoml_cli.add_command(list_command)
bentoml_cli.add_command(delete_command)
# Load commands from extensions
for ep in get_entry_points("bentoml.commands"):
bentoml_cli.add_command(ep.load())
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
import bentoml
import click
from bentoml._internal.cloud.base import Spinner
from bentoml_cli.utils import BentoMLCommandGroup
from kubernetes import client, config
from rich.console import Console
logger = logging.getLogger(__name__)
def build_delete_command() -> click.Group:
    """Build the ``delete`` click command group.

    Provides two subcommands:
      * ``delete bentos`` — remove bentos from the local bento store.
      * ``delete deployments`` — remove DynamoDeployment custom resources
        from a Kubernetes namespace.
    """

    @click.group(name="delete", cls=BentoMLCommandGroup)
    def cli():
        """Delete resources"""
        pass

    @cli.command(name="bentos")
    @click.argument("bento_tag", type=click.STRING, required=False)
    @click.option(
        "--all",
        is_flag=True,
        default=False,
        help="Delete all bentos in local store",
    )
    @click.option(
        "--force",
        is_flag=True,
        default=False,
        help="Skip confirmation prompt",
    )
    def delete_bentos(
        bento_tag: str | None = None,
        all: bool = False,
        force: bool = False,
    ):
        """
        Delete bentos from local store

        Args:
            bento_tag: Tag of the bento to delete
            all: Delete all bentos
            force: Skip confirmation prompt
        """
        console = Console(highlight=False)

        # Validate arguments: exactly one of <bento_tag> / --all must be given.
        if not bento_tag and not all:
            raise click.ClickException(
                "Either specify a bento tag or use --all to delete all bentos"
            )
        if bento_tag and all:
            raise click.ClickException("Cannot specify both a bento tag and --all flag")

        with Spinner(console=console) as spinner:
            try:
                # Get bentos to delete
                bentos_to_delete = []
                if all:
                    # Get all bentos
                    spinner.update("Fetching all bentos")
                    bentos = bentoml.list()
                    bentos_to_delete = [str(bento.tag) for bento in bentos]
                    if not bentos_to_delete:
                        spinner.log("No bentos found in local store")
                        return
                else:
                    # Check if the specified bento exists
                    if bento_tag is not None:
                        bentos_to_delete = [bento_tag]
                    else:
                        # This should never happen due to earlier validation, but handle it anyway
                        spinner.log("[bold red]No bento tag specified[/]")
                        return

                # Confirm deletion if not forced; spinner is stopped so the
                # interactive click.confirm prompt renders cleanly.
                if not force:
                    spinner.stop()
                    if all:
                        message = f"Are you sure you want to delete all {len(bentos_to_delete)} bentos from local store?"
                    else:
                        message = f"Are you sure you want to delete bento '{bento_tag}' from local store?"
                    if not click.confirm(message):
                        console.print("[yellow]Deletion cancelled[/]")
                        return
                    spinner.start()

                # Delete bentos one by one; a single failure is logged and
                # does not abort the remaining deletions.
                for tag in bentos_to_delete:
                    spinner.update(f"Deleting bento '{tag}'")
                    try:
                        bentoml.delete(tag)
                        spinner.log(f"[green]Successfully deleted bento '{tag}'[/]")
                    except Exception as e:
                        spinner.log(
                            f"[bold red]Failed to delete bento '{tag}': {str(e)}[/]"
                        )
                        logger.error(f"Failed to delete bento '{tag}'", exc_info=True)

                # Final summary
                if all:
                    spinner.log(
                        f"[bold green]Deleted {len(bentos_to_delete)} bentos from local store[/]"
                    )
            except Exception as e:
                logger.error("Deletion operation failed", exc_info=True)
                spinner.log(f"[bold red]Operation failed: {str(e)}[/]")
                raise SystemExit(1)

    @cli.command(name="deployments")
    @click.argument("deployment_name", type=click.STRING, required=False)
    @click.option(
        "--namespace",
        type=click.STRING,
        default="default",
        help="Kubernetes namespace containing the deployments",
    )
    @click.option(
        "--all",
        is_flag=True,
        default=False,
        help="Delete all deployments in the namespace",
    )
    @click.option(
        "--force",
        is_flag=True,
        default=False,
        help="Skip confirmation prompt",
    )
    def delete_deployments(
        deployment_name: str | None = None,
        namespace: str = "default",
        all: bool = False,
        force: bool = False,
    ):
        """
        Delete deployments from a Kubernetes namespace

        Args:
            deployment_name: Name of the deployment to delete
            namespace: Kubernetes namespace containing the deployments
            all: Delete all deployments in the namespace
            force: Skip confirmation prompt
        """
        console = Console(highlight=False)

        # Validate arguments: exactly one of <deployment_name> / --all.
        if not deployment_name and not all:
            raise click.ClickException(
                "Either specify a deployment name or use --all to delete all deployments"
            )
        if deployment_name and all:
            raise click.ClickException(
                "Cannot specify both a deployment name and --all flag"
            )

        # Load Kubernetes configuration
        try:
            config.load_kube_config()
            api = client.CustomObjectsApi()
        except Exception as e:
            logger.error("Failed to load Kubernetes configuration", exc_info=True)
            raise click.ClickException(
                f"Failed to load Kubernetes configuration: {str(e)}"
            )

        # Define the group, version, and plural for the CRD
        group = "nvidia.com"
        version = "v1alpha1"
        plural = "dynamodeployments"

        with Spinner(console=console) as spinner:
            try:
                # Get deployments to delete
                deployments_to_delete = []
                if all:
                    # Get all deployments in the namespace
                    spinner.update(
                        f"Fetching all deployments in namespace '{namespace}'"
                    )
                    deployments = api.list_namespaced_custom_object(
                        group=group,
                        version=version,
                        namespace=namespace,
                        plural=plural,
                    )
                    deployments_to_delete = [
                        item["metadata"]["name"]
                        for item in deployments.get("items", [])
                    ]
                    if not deployments_to_delete:
                        spinner.log(f"No deployments found in namespace '{namespace}'")
                        return
                else:
                    # Check if the specified deployment exists before asking
                    # the user to confirm; 404 is reported, anything else re-raised.
                    try:
                        api.get_namespaced_custom_object(
                            group=group,
                            version=version,
                            namespace=namespace,
                            plural=plural,
                            name=deployment_name,
                        )
                        deployments_to_delete = [deployment_name]
                    except client.rest.ApiException as e:
                        if e.status == 404:
                            spinner.log(
                                f"[bold red]Deployment '{deployment_name}' not found in namespace '{namespace}'[/]"
                            )
                            return
                        raise

                # Confirm deletion if not forced
                if not force:
                    spinner.stop()
                    if all:
                        message = f"Are you sure you want to delete all {len(deployments_to_delete)} deployments in namespace '{namespace}'?"
                    else:
                        message = f"Are you sure you want to delete deployment '{deployment_name}' in namespace '{namespace}'?"
                    if not click.confirm(message):
                        console.print("[yellow]Deletion cancelled[/]")
                        return
                    spinner.start()

                # Delete deployments; per-item failures are logged and the
                # loop continues with the remaining names.
                for name in deployments_to_delete:
                    spinner.update(
                        f"Deleting deployment '{name}' in namespace '{namespace}'"
                    )
                    try:
                        api.delete_namespaced_custom_object(
                            group=group,
                            version=version,
                            namespace=namespace,
                            plural=plural,
                            name=name,
                        )
                        spinner.log(
                            f"[green]Successfully deleted deployment '{name}'[/]"
                        )
                    except client.rest.ApiException as e:
                        if e.status == 404:
                            spinner.log(
                                f"[yellow]Deployment '{name}' not found or already deleted[/]"
                            )
                        else:
                            spinner.log(
                                f"[bold red]Failed to delete deployment '{name}': {str(e)}[/]"
                            )
                            logger.error(
                                f"Failed to delete deployment '{name}'", exc_info=True
                            )

                # Final summary
                if all:
                    spinner.log(
                        f"[bold green]Deleted {len(deployments_to_delete)} deployments from namespace '{namespace}'[/]"
                    )
            except Exception as e:
                logger.error("Deletion operation failed", exc_info=True)
                spinner.log(f"[bold red]Operation failed: {str(e)}[/]")
                raise SystemExit(1)

    return cli


delete_command = build_delete_command()
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import json
import logging
import os
import sys
import time
import typing as t
from datetime import datetime
import click
from bentoml._internal.cloud.base import Spinner
from bentoml.exceptions import BentoMLException, CLIException
from rich.console import Console
from simple_di import inject
from dynamo.sdk.cli.deployment import DynamoDeployment
logger = logging.getLogger(__name__)
def deploy_command_group():
    """Deploy 🍱 to a cluster"""


# NOTE(review): this group appears unused — `deploy_command` below is what
# gets registered with the CLI. Confirm whether the group can be removed.
deploy_command_group = click.group(name="deploy")(deploy_command_group)
def convert_env_to_dict(env: tuple[str, ...] | None) -> list[dict[str, str]] | None:
    """Convert ``KEY=VALUE`` strings into k8s-style env entries.

    A bare ``KEY`` (no ``=``) is resolved from the current process
    environment; a missing variable raises ``CLIException``. ``None``
    passes through unchanged; an empty tuple yields an empty list.
    """
    if env is None:
        return None
    entries: list[dict[str, str]] = []
    for raw in env:
        key, sep, val = raw.partition("=")
        if not sep:
            # Bare name: inherit the value from this process's environment.
            if key not in os.environ:
                raise CLIException(f"Environment variable {key} not found")
            val = os.environ[key]
        entries.append({"name": key, "value": val})
    return entries
def build_deploy_command() -> click.Command:
    """Build the ``deploy`` click command (deploy a Bento to a cluster)."""

    @click.command(name="deploy")
    @click.argument("bento", type=click.STRING, default=".")
    @click.option("-n", "--name", type=click.STRING, help="Deployment name")
    @click.option(
        "--namespace",
        type=click.STRING,
        default="default",
        help="Kubernetes namespace to deploy to",
    )
    @click.option(
        "--scaling-min",
        type=click.INT,
        default=1,
        show_default=True,
        help="Minimum scaling value",
    )
    @click.option(
        "--scaling-max",
        type=click.INT,
        default=5,
        show_default=True,
        help="Maximum scaling value",
    )
    @click.option("--instance-type", type=click.STRING, help="Type of instance")
    @click.option(
        "--env",
        type=click.STRING,
        multiple=True,
        default=[],
        help="Environment variables in key=value format",
    )
    @click.option("--secret", type=click.STRING, multiple=True, help="Secret names")
    @click.option(
        "-f",
        "--config-file",
        type=click.Path(exists=True, dir_okay=False, readable=True),
        help="Configuration file path",
    )
    @click.option(
        "--wait/--no-wait", default=True, help="Wait for deployment to be ready"
    )
    @click.option(
        "--timeout",
        type=click.INT,
        default=600,
        help="Timeout for deployment readiness in seconds",
    )
    @click.option(
        "--working-dir",
        type=click.Path(),
        default=None,
        show_default=True,
        help="Directory to find the Service instance",
    )
    @click.option(
        "--access-authorization", type=click.BOOL, default=False, show_default=True
    )
    @click.option("--strategy", type=click.STRING, default="rolling-update")
    @click.option("--version", type=click.STRING, help="Version tag for the Bento")
    def deploy_command(
        bento: str | None,
        name: str | None,
        namespace: str | None = "default",
        access_authorization: bool | None = False,
        scaling_min: int | None = 1,
        scaling_max: int | None = 5,
        instance_type: str | None = None,
        strategy: str | None = "rolling-update",
        env: tuple[str, ...] | None = None,
        secret: tuple[str] | None = None,
        config_file: str | t.TextIO | None = None,
        # NOTE(review): no --config-dict option is declared above, so this
        # parameter is always None when invoked through the CLI — confirm intent.
        config_dict: str | None = None,
        wait: bool = True,
        timeout: int = 600,
        working_dir: str | None = None,
        version: str | None = None,
    ):
        """
        Deploy 🍱 to a cluster

        \b
        BENTO is the serving target, it can be:
        - a tag to a Bento in local Bento store
        - a folder containing a valid 'bentofile.yaml'
        - a path to a built Bento
        """
        from bentoml._internal.log import configure_server_logging

        configure_server_logging()

        # Fix handling of None values: default the working dir to the bento
        # directory when BENTO is a path, otherwise to the current directory.
        if working_dir is None:
            if bento is not None and os.path.isdir(os.path.expanduser(bento)):
                working_dir = os.path.expanduser(bento)
            else:
                working_dir = "."

        # Make sure working_dir is in the front of sys.path for imports
        if sys.path[0] != working_dir:
            sys.path.insert(0, working_dir)

        # Load the Bento to validate
        import bentoml
        from bentoml._internal.service.loader import load

        # Check if the bento exists in the local store
        bento_exists = False
        bento_tag = None
        try:
            bentos = bentoml.list()
            bento_tags = [str(b.tag) for b in bentos]
            bento_exists = bento in bento_tags
            if bento_exists:
                bento_tag = bento
                logger.debug("Verified Bento exists: %s", bento_tag)
            else:
                # If not a tag, check if it's a path to a built Bento
                if bento is not None and os.path.isdir(bento):
                    service_name = os.path.basename(os.path.abspath(bento))
                    # Derive a tag: explicit --version, or a timestamp fallback.
                    bento_version = (
                        version or f"v{datetime.now().strftime('%Y%m%d%H%M%S')}"
                    )
                    bento_tag = f"{service_name}:{bento_version}"
                    logger.debug(
                        "Using Bento from directory: %s with tag: %s", bento, bento_tag
                    )
                else:
                    # NOTE(review): this ClickException is caught by the broad
                    # `except Exception` below and re-wrapped, so the user sees it
                    # prefixed with "Failed to validate bento:" — confirm intent.
                    raise click.ClickException(
                        f"Invalid Bento reference: {bento}. Ensure it's a valid Bento tag or directory."
                    )
        except Exception as exception_var:
            logger.error("Bento validation failed:", exc_info=True)
            raise click.ClickException(
                f"Failed to validate bento: {str(exception_var)}"
            )

        # Load the service to validate it
        svc = load(bento_identifier=bento_tag, working_dir=working_dir)
        print(f"Service loaded: {svc}")

        # Hand off to the Kubernetes-operator-backed deployment routine.
        create_dynamo_deployment(
            bento=bento_tag,
            name=name,
            namespace=namespace,
            access_authorization=access_authorization,
            scaling_min=scaling_min,
            scaling_max=scaling_max,
            instance_type=instance_type,
            strategy=strategy,
            env=env,
            secret=secret,
            config_file=config_file,
            config_dict=config_dict,
            wait=wait,
            timeout=timeout,
        )

    return deploy_command


deploy_command = build_deploy_command()
@inject
def create_dynamo_deployment(
    bento: str | None = None,
    name: str | None = None,
    namespace: str | None = "default",
    access_authorization: bool | None = None,
    scaling_min: int | None = None,
    scaling_max: int | None = None,
    instance_type: str | None = None,
    strategy: str | None = None,
    env: tuple[str, ...] | None = None,
    secret: tuple[str] | None = None,
    config_file: str | t.TextIO | None = None,
    config_dict: str | None = None,
    wait: bool = True,
    timeout: int = 3600,
    dev: bool = False,
) -> DynamoDeployment:
    """Create a DynamoDeployment custom resource via the Kubernetes API.

    Validates the deployment parameters with BentoML's
    ``DeploymentConfigParameters``, builds the CRD payload, submits it to the
    ``nvidia.com/v1alpha1`` ``dynamodeployments`` endpoint, and optionally
    polls the resource status until it becomes ready or ``timeout`` (seconds)
    elapses.

    Returns:
        The client-side ``DynamoDeployment`` handle.

    Raises:
        SystemExit: when creation or the readiness wait fails (errors are
            logged and surfaced on the spinner first).
    """
    from bentoml._internal.cloud.deployment import DeploymentConfigParameters
    from bentoml_cli.deployment import raise_deployment_config_error
    from kubernetes import client, config

    # Optional JSON config blob (string) -> dict.
    cfg_dict = None
    if config_dict is not None and config_dict != "":
        cfg_dict = json.loads(config_dict)

    config_params = DeploymentConfigParameters(
        name=name,
        bento=bento,
        cluster=namespace,
        access_authorization=access_authorization,
        scaling_max=scaling_max,
        scaling_min=scaling_min,
        instance_type=instance_type,
        strategy=strategy,
        envs=convert_env_to_dict(tuple(env) if env else None),
        secrets=list(secret) if secret is not None else None,
        config_file=config_file,
        config_dict=cfg_dict,
        cli=True,
        dev=dev,
    )
    try:
        config_params.verify()
    except BentoMLException as exc:
        error_message = str(exc)
        raise_deployment_config_error(error_message, "create")

    # Fix the deployment name generation: derive a DNS-safe name from the
    # bento tag when no explicit name was given.
    deployment_name = name
    if deployment_name is None and bento is not None:
        deployment_name = f"{bento.replace(':', '-').replace('/', '-')}-deployment"
    print(f"Deployment name: {deployment_name}")

    # Create the deployment object
    deployment = DynamoDeployment.create_deployment(
        deployment_name=deployment_name,
        namespace=namespace,
        config=config,
    )

    # Convert env tuple to k8s env format
    env_vars = []
    if env:
        for e in env:
            if "=" in e:
                k, v = e.split("=", 1)
                env_vars.append({"name": k, "value": v})

    # Get the CRD payload
    crd_payload = deployment.get_crd_payload(
        bento=bento,
        scaling_min=scaling_min or 1,
        scaling_max=scaling_max or 5,
        instance_type=instance_type,
        env_vars=env_vars,
        secret=list(secret) if secret else [],
    )

    console = Console(highlight=False)
    with Spinner(console=console) as spinner:
        try:
            spinner.update("Creating deployment via Kubernetes operator")
            config.load_kube_config()
            api = client.CustomObjectsApi()

            # Create the CRD
            group = "nvidia.com"
            version = "v1alpha1"
            plural = "dynamodeployments"
            created_crd = api.create_namespaced_custom_object(
                group=group,
                version=version,
                namespace=namespace,
                plural=plural,
                body=crd_payload,
            )

            # Add validation and logging for the created CRD
            if not created_crd:
                raise click.ClickException(
                    "Failed to create deployment: No response from Kubernetes API"
                )
            logger.debug("Created CRD: %s", json.dumps(created_crd, indent=2))
            spinner.log(
                f':white_check_mark: Created deployment "{deployment_name}" in namespace "{namespace}"'
            )

            if wait:
                spinner.update("Waiting for deployment to become ready")
                start_time = time.time()
                # Poll the CR status every 5s until ready/failed/timeout.
                while time.time() - start_time < timeout:
                    # Check status using Kubernetes API
                    status_data = api.get_namespaced_custom_object(
                        group=group,
                        version=version,
                        namespace=namespace,
                        plural=plural,
                        name=deployment_name,
                    )
                    state = status_data.get("status", {}).get("state", "Pending")
                    conditions = status_data.get("status", {}).get("conditions", [])
                    events = status_data.get("status", {}).get("events", [])
                    logger.debug(f"Current deployment state: {state}")
                    logger.debug(f"Conditions: {json.dumps(conditions, indent=2)}")
                    logger.debug(f"Events: {json.dumps(events, indent=2)}")
                    logger.debug(f"Time elapsed: {int(time.time() - start_time)}s")
                    logger.debug(
                        f"Full status: {json.dumps(status_data.get('status', {}), indent=2)}"
                    )

                    # Check for successful states
                    if state.lower() in [
                        "running",
                        "ready",
                        "active",
                        "available",
                        "complete",
                    ]:
                        if deployment.ingress_url:
                            spinner.log("[bold green]Deployment ready![/]")
                            spinner.log(
                                f"[bold]Ingress URL: {deployment.ingress_url}[/]"
                            )
                        else:
                            spinner.log("[bold green]Deployment ready![/]")
                        return deployment
                    # Check for failed states
                    elif state.lower() in [
                        "failed",
                        "error",
                        "unavailable",
                        "degraded",
                    ]:
                        error_message = status_data.get("message", "Unknown error")
                        if conditions:
                            # Prefer the message from a "Failed" condition if present.
                            error_message = next(
                                (
                                    c.get("message", error_message)
                                    for c in conditions
                                    if c.get("type", "").lower() == "failed"
                                ),
                                error_message,
                            )
                        # Add more detailed error logging
                        logger.error(f"Deployment failed with state: {state}")
                        logger.error(f"Error message: {error_message}")
                        logger.error(f"Conditions: {json.dumps(conditions, indent=2)}")
                        logger.error(f"Events: {json.dumps(events, indent=2)}")
                        # NOTE(review): this ClickException is caught by the outer
                        # `except Exception` below and turned into SystemExit(1).
                        raise click.ClickException(
                            f"Deployment failed: {error_message}\n"
                            f"State: {state}\n"
                            f"Conditions: {json.dumps(conditions, indent=2)}\n"
                            f"Events: {json.dumps(events, indent=2)}"
                        )
                    time.sleep(5)

                if time.time() - start_time >= timeout:
                    # Check if deployment exists but we just timed out waiting
                    try:
                        final_status = api.get_namespaced_custom_object(
                            group=group,
                            version=version,
                            namespace=namespace,
                            plural=plural,
                            name=deployment_name,
                        )
                        if final_status.get("status", {}).get("state", "").lower() in [
                            "running",
                            "ready",
                            "active",
                            "available",
                            "complete",
                        ]:
                            if deployment.ingress_url:
                                spinner.log("[bold green]Deployment ready![/]")
                                spinner.log(
                                    f"[bold]Ingress URL: {deployment.ingress_url}[/]"
                                )
                            else:
                                spinner.log("[bold green]Deployment ready![/]")
                            return deployment
                    except Exception as e:
                        logger.error("Timeout check failed", exc_info=True)
                    # Add timeout debug information
                    # NOTE(review): CustomObjectsApi has no list_namespaced_pod
                    # (that lives on CoreV1Api) — this path would raise
                    # AttributeError when the timeout is actually hit; verify.
                    pods = api.list_namespaced_pod(
                        namespace, label_selector=f"app={deployment_name}"
                    )
                    logger.error(
                        f"Timeout reached. Pod statuses: {json.dumps([p.status for p in pods.items], indent=2)}"
                    )
                    raise click.ClickException(
                        f"Deployment timeout reached\n"
                        f"Pod statuses: {json.dumps([p.status for p in pods.items], indent=2)}"
                    )
            return deployment
        except Exception as e:
            logger.error("Deployment failed", exc_info=True)
            spinner.log(f"[bold red]Deployment failed: {str(e)}[/]")
            raise SystemExit(1)


# NOTE(review): duplicate of the `deploy_command = build_deploy_command()`
# assignment earlier in this module — redundant; confirm one can be removed.
deploy_command = build_deploy_command()
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from typing import Any, Dict, Optional
from kubernetes import config as k8s_config
class DynamoDeployment:
    """Client-side handle for a DynamoDeployment custom resource.

    Holds display metadata (name, cluster/namespace, creator, timestamps)
    and renders the CRD manifest submitted to the operator.
    """

    def __init__(
        self,
        name: str,
        cluster: str,
        admin_console: str,
        created_at: str,
        created_by: str,
        _schema: str = "v1",
        ingress_base_url: Optional[str] = None,
    ):
        self.name = name
        self.cluster = cluster
        self.admin_console = admin_console
        self.created_at = created_at
        self.created_by = created_by
        self._schema = _schema
        # Ingress URL is only known when a base URL is supplied.
        if ingress_base_url:
            self.ingress_url = (
                f"{ingress_base_url}/api/v2/deployments/{name}?cluster={cluster}"
            )
        else:
            self.ingress_url = None

    @classmethod
    def create_deployment(
        cls, deployment_name: str, namespace: str, config: Any
    ) -> "DynamoDeployment":
        """Build a handle from the active kubeconfig context."""
        # Load kube config so the active context (and its user) is available.
        k8s_config.load_kube_config()
        active_context = k8s_config.list_kube_config_contexts()[1]
        username = active_context.get("context", {}).get("user", "unknown")
        # NOTE(review): `config` must support .get("ingress_base_url"); a raw
        # kubernetes `config` module does not — verify against callers.
        return cls(
            name=deployment_name,
            cluster=namespace,
            admin_console=f"kubectl get dynamodeployment {deployment_name} -n {namespace}",
            created_at=datetime.now().isoformat(),
            created_by=username,
            _schema="v1alpha1",
            ingress_base_url=config.get("ingress_base_url"),
        )

    def get_crd_payload(
        self,
        bento: str,
        scaling_min: int,
        scaling_max: int,
        instance_type: str,
        env_vars: list,
        secret: list,
    ) -> Dict[str, Any]:
        """Render the nvidia.com/v1alpha1 DynamoDeployment manifest."""
        # Normalize the bento reference into name:tag form.
        if ":" not in bento:
            bento = f"{bento}:latest"
        gpu_count = instance_type or "1"
        main_spec = {
            "dynamoNim": bento,
            "serviceName": self.name,
            "autoscaling": {
                "minReplicas": scaling_min or 1,
                "maxReplicas": scaling_max or 5,
            },
            "resources": {
                "requests": {"cpu": "4", "memory": "16Gi", "gpu": gpu_count},
                "limits": {"cpu": "8", "memory": "32Gi", "gpu": gpu_count},
            },
            "envs": env_vars,
            "ingress": {"enabled": True, "hostPrefix": self.name},
            "readinessProbe": {
                "httpGet": {"path": "/healthz", "port": 8080},
                "initialDelaySeconds": 5,
                "periodSeconds": 10,
            },
        }
        return {
            "apiVersion": "nvidia.com/v1alpha1",
            "kind": "DynamoDeployment",
            "metadata": {
                "name": self.name,
                "namespace": self.cluster,
                "labels": {
                    "app.kubernetes.io/name": "dynamo-kubernetes-operator",
                    "app.kubernetes.io/managed-by": "dynamo-cli",
                },
            },
            "spec": {
                "dynamoNim": bento,
                "services": {"main": {"spec": main_spec}},
            },
        }
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import json
import os
import typing as t
import bentoml
import click
from bentoml_cli.utils import BentoMLCommandGroup
from kubernetes import client, config
from rich import print as rich_print
from rich.table import Table
def build_list_command() -> click.Group:
    """Build the ``list`` click command group.

    Provides two subcommands:
      * ``list bentos`` — table of bentos in the local bento store.
      * ``list deployments`` — DynamoDeployment custom resources in a
        Kubernetes namespace, as a table or JSON.
    """

    @click.group(name="list", cls=BentoMLCommandGroup)
    def cli():
        """List resources"""
        pass

    @cli.command(name="bentos")
    def list_bentos():
        """List all bentos in local store"""
        bentos = bentoml.list()
        table = Table(box=None, expand=True)
        table.add_column("Tag", overflow="fold")
        table.add_column("Service", overflow="fold")
        table.add_column("Created At", overflow="fold")
        for bento in bentos:
            table.add_row(
                str(bento.tag),
                bento.info.service,
                bento.info.creation_time.strftime("%Y-%m-%d %H:%M:%S"),
            )
        rich_print(table)

    @cli.command(name="deployments")
    # FIX: --namespace previously had no default, so `None` was passed to
    # list_namespaced_custom_object, which rejects a None namespace. Default
    # to "default" for consistency with the deploy/delete commands.
    @click.option(
        "--namespace",
        type=click.STRING,
        default="default",
        show_default=True,
        help="Kubernetes namespace",
    )
    # NOTE(review): --cluster is accepted but currently unused below.
    @click.option("--cluster", type=click.STRING, help="Cluster name")
    @click.option(
        "-o",
        "--output",
        help="Display the output of this command.",
        type=click.Choice(["json", "table"]),
        default="table",
    )
    def list_deployments(
        namespace: str = "default",
        cluster: str | None = None,
        output: t.Literal["json", "table"] = "table",
    ):
        """List deployments"""
        config.load_kube_config()
        api = client.CustomObjectsApi()

        # Define the group, version, and plural for the CRD
        group = "nvidia.com"
        version = "v1alpha1"
        plural = "dynamodeployments"

        # Get the deployments from the Kubernetes API
        deployments = api.list_namespaced_custom_object(
            group=group,
            version=version,
            namespace=namespace,
            plural=plural,
        )

        if output == "json":
            rich_print(json.dumps(deployments, indent=2))
            return

        # Create table for output
        table = Table(box=None, expand=True)
        table.add_column("Name", overflow="fold")
        table.add_column("Namespace", overflow="fold")
        table.add_column("Status", overflow="fold")
        table.add_column("Created At", overflow="fold")
        table.add_column("Replicas", overflow="fold")
        table.add_column("Resources", overflow="fold")
        table.add_column("URL", overflow="fold")

        # Ingress host suffix used to reconstruct the external URL.
        ingress_suffix = os.getenv("DYNAMO_INGRESS_SUFFIX", "local")
        for item in deployments.get("items", []):
            metadata = item.get("metadata", {})
            spec = item.get("spec", {})
            services = spec.get("services", {}).get("main", {}).get("spec", {})
            resources = services.get("resources", {})
            ingress = services.get("ingress", {})

            # Format resources as "requests / limits" per line.
            resources_str = (
                f"CPU: {resources.get('requests', {}).get('cpu', 'N/A')} / {resources.get('limits', {}).get('cpu', 'N/A')}\n"
                f"Memory: {resources.get('requests', {}).get('memory', 'N/A')} / {resources.get('limits', {}).get('memory', 'N/A')}\n"
                f"GPU: {resources.get('requests', {}).get('gpu', 'N/A')} / {resources.get('limits', {}).get('gpu', 'N/A')}"
            )

            # Format URL from the ingress host prefix + configured suffix.
            url = (
                f"https://{ingress.get('hostPrefix', 'N/A')}.{ingress_suffix}"
                if ingress.get("enabled", False)
                else "N/A"
            )

            table.add_row(
                metadata.get("name", "N/A"),
                metadata.get("namespace", "N/A"),
                item.get("status", {}).get("state", "Unknown"),
                metadata.get("creationTimestamp", "N/A"),
                f"{services.get('autoscaling', {}).get('minReplicas', 'N/A')} - {services.get('autoscaling', {}).get('maxReplicas', 'N/A')}",
                resources_str,
                url,
            )
        rich_print(table)

    return cli


list_command = build_list_command()
......@@ -478,9 +478,14 @@ def serve_http(
arbiter.exit_stack.callback(shutil.rmtree, uds_path, ignore_errors=True)
arbiter.start(
cb=lambda _: logger.info( # type: ignore
(
"Starting Dynamo Service %s (%s/%s) listening on %s://%s:%d (Press CTRL+C to quit)"
if (hasattr(svc, "is_dynamo_component") and svc.is_dynamo_component())
else 'Starting production %s BentoServer from "%s" (Press CTRL+C to quit)',
if (
hasattr(svc, "is_dynamo_component")
and svc.is_dynamo_component()
)
else 'Starting production %s BentoServer from "%s" (Press CTRL+C to quit)'
),
*(
(svc.name, *svc.dynamo_address(), scheme, log_host, port)
if (
......
......@@ -47,13 +47,18 @@ class EtcdClient:
"""
Etcd is used for discovery in the DistributedRuntime
"""
async def kv_create_or_validate(self, key: str, value: bytes, lease_id: Optional[int] = None) -> None:
async def kv_create_or_validate(
self, key: str, value: bytes, lease_id: Optional[int] = None
) -> None:
"""
Atomically create a key if it does not exist, or validate the values are identical if the key exists.
"""
...
async def kv_put(self, key: str, value: bytes, lease_id: Optional[int] = None) -> None:
async def kv_put(
self, key: str, value: bytes, lease_id: Optional[int] = None
) -> None:
"""
Put a key-value pair into etcd
"""
......@@ -179,7 +184,12 @@ class DisaggregatedRouter:
sequence length thresholds.
"""
def __init__(self, drt: DistributedRuntime, model_name: str, default_max_local_prefill_length: int) -> None:
def __init__(
self,
drt: DistributedRuntime,
model_name: str,
default_max_local_prefill_length: int,
) -> None:
"""
Create a `DisaggregatedRouter` object.
......@@ -239,31 +249,30 @@ class KvMetricsPublisher:
this method will interact with KV router of the same component.
"""
def publish(self, request_active_slots: int,
def publish(
self,
request_active_slots: int,
request_total_slots: int,
kv_active_blocks: int,
kv_total_blocks: int,
num_requests_waiting: int,
gpu_cache_usage_perc: float,
gpu_prefix_cache_hit_rate: float
) -> None:
"""
Update the KV metrics being reported.
"""
...
class ModelDeploymentCard:
"""
A model deployment card is a collection of model information
"""
...
...
class OAIChatPreprocessor:
"""
A preprocessor for OpenAI chat completions
"""
...
async def start(self) -> None:
......@@ -272,12 +281,12 @@ class OAIChatPreprocessor:
"""
...
class Backend:
"""
LLM Backend engine manages resources and concurrency for executing inference
requests in LLM engines (trtllm, vllm, sglang etc)
"""
...
async def start(self, handler: RequestHandler) -> None:
......@@ -286,7 +295,6 @@ class Backend:
"""
...
class OverlapScores:
"""
A collection of prefix matching scores of workers for a given token ids.
......@@ -307,7 +315,9 @@ class KvIndexer:
Create a `KvIndexer` object
"""
def find_matches_for_request(self, token_ids: List[int], lora_id: int) -> OverlapScores:
def find_matches_for_request(
self, token_ids: List[int], lora_id: int
) -> OverlapScores:
"""
Return the overlapping scores of workers for the given token ids.
"""
......@@ -344,17 +354,19 @@ class KvMetricsAggregator:
"""
...
class HttpService:
"""
A HTTP service for dynamo applications.
It is a OpenAI compatible http ingress into the Dynamo Distributed Runtime.
"""
...
class HttpError:
"""
An error that occurred in the HTTP service
"""
...
class HttpAsyncEngine:
......@@ -363,4 +375,5 @@ class HttpAsyncEngine:
python based AsyncEngine that handles HttpError exceptions from Python and
converts them to the Rust version of HttpError
"""
...
......@@ -63,7 +63,7 @@ def dynamo_endpoint(
request_model: Union[Type[BaseModel], Type[Any]], response_model: Type[BaseModel]
) -> Callable:
def decorator(
func: Callable[..., AsyncGenerator[Any, None]]
func: Callable[..., AsyncGenerator[Any, None]],
) -> Callable[..., AsyncGenerator[Any, None]]:
@wraps(func)
async def wrapper(*args, **kwargs) -> AsyncGenerator[Any, None]:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment