Unverified Commit 33e72720 authored by hhzhang16's avatar hhzhang16 Committed by GitHub
Browse files

feat: support k8s target in dynamo deploy command (#1104)

parent 31ff2370
## Provision S3-compatible cloud object storage:
The Dynamo API Server requires a s3-compatible object store to store Dynamo NIMs.
The Dynamo API Server requires an S3-compatible object store to store Dynamo Components.
## Provision PostgreSQL Database
The Dynamo API Server requires a PostgreSQL database to store data entity and version metadata.
......@@ -23,4 +23,4 @@ ai-dynamo-store
uv pip install -e ".[dev]"
#### Run docker container locally
earthly +docker && docker run -it my-registry/ai-dynamo-store:latest
\ No newline at end of file
earthly +docker && docker run -it my-registry/ai-dynamo-store:latest
......@@ -39,7 +39,7 @@ class TimeCreatedUpdated(SQLModel):
)
class DynamoNimUploadStatus(str, Enum):
class DynamoComponentUploadStatus(str, Enum):
Pending = "pending"
Uploading = "uploading"
Success = "success"
......@@ -62,22 +62,22 @@ class TransmissionStrategy(str, Enum):
"""
class CreateDynamoNimRequest(BaseModel):
class CreateDynamoComponentRequest(BaseModel):
name: str
description: str
labels: Optional[Dict[str, str]] = None
class CreateDynamoNimVersionRequest(BaseModel):
class CreateDynamoComponentVersionRequest(BaseModel):
description: str
version: str
manifest: DynamoNimVersionManifestSchema
manifest: DynamoComponentVersionManifestSchema
build_at: datetime
labels: Optional[list[Dict[str, str]]] = None
class UpdateDynamoNimVersionRequest(BaseModel):
manifest: DynamoNimVersionManifestSchema
class UpdateDynamoComponentVersionRequest(BaseModel):
manifest: DynamoComponentVersionManifestSchema
labels: Optional[list[Dict[str, str]]] = None
......@@ -113,8 +113,8 @@ class ListQuerySchema(BaseModel):
class ResourceType(str, Enum):
Organization = "organization"
Cluster = "cluster"
DynamoNim = "dynamo_nim"
DynamoNimVersion = "dynamo_nim_version"
DynamoComponent = "dynamo_component"
DynamoComponentVersion = "dynamo_component_version"
Deployment = "deployment"
DeploymentRevision = "deployment_revision"
TerminalRecord = "terminal_record"
......@@ -156,29 +156,29 @@ class UserSchema(BaseModel):
last_name: str
class DynamoNimVersionApiSchema(BaseModel):
class DynamoComponentVersionApiSchema(BaseModel):
route: str
doc: str
input: str
output: str
class DynamoNimVersionManifestSchema(BaseModel):
class DynamoComponentVersionManifestSchema(BaseModel):
service: str
bentoml_version: str
apis: Dict[str, DynamoNimVersionApiSchema]
bentoml_version: Optional[str] = None
apis: Dict[str, DynamoComponentVersionApiSchema]
size_bytes: int
def _validate_manifest(v):
try:
# Validate that the 'manifest' matches the DynamoManifestSchema
return DynamoNimVersionManifestSchema.model_validate(v).model_dump()
return DynamoComponentVersionManifestSchema.model_validate(v).model_dump()
except ValidationError as e:
raise ValueError(f"Invalid manifest schema: {e}")
class DynamoNimVersionSchema(ResourceSchema):
class DynamoComponentVersionSchema(ResourceSchema):
bento_repository_uid: str
version: str
description: str
......@@ -192,7 +192,7 @@ class DynamoNimVersionSchema(ResourceSchema):
presigned_urls_deprecated: bool = False
transmission_strategy: TransmissionStrategy
upload_id: str = ""
manifest: Optional[Union[DynamoNimVersionManifestSchema, Dict[str, Any]]]
manifest: Optional[Union[DynamoComponentVersionManifestSchema, Dict[str, Any]]]
build_at: datetime
@field_validator("manifest")
......@@ -200,31 +200,31 @@ class DynamoNimVersionSchema(ResourceSchema):
return _validate_manifest(v)
class DynamoNimVersionFullSchema(DynamoNimVersionSchema):
repository: DynamoNimSchema
class DynamoComponentVersionFullSchema(DynamoComponentVersionSchema):
repository: DynamoComponentSchema
class DynamoNimSchema(ResourceSchema):
latest_bento: Optional[DynamoNimVersionSchema]
latest_bentos: Optional[List[DynamoNimVersionSchema]]
class DynamoComponentSchema(ResourceSchema):
latest_bento: Optional[DynamoComponentVersionSchema]
latest_bentos: Optional[List[DynamoComponentVersionSchema]]
n_bentos: int
description: str
class DynamoNimSchemaWithDeploymentsSchema(DynamoNimSchema):
class DynamoComponentSchemaWithDeploymentsSchema(DynamoComponentSchema):
deployments: List[str] = [] # mocked for now
class DynamoNimSchemaWithDeploymentsListSchema(BaseListSchema):
items: List[DynamoNimSchemaWithDeploymentsSchema]
class DynamoComponentSchemaWithDeploymentsListSchema(BaseListSchema):
items: List[DynamoComponentSchemaWithDeploymentsSchema]
class DynamoNimVersionsWithNimListSchema(BaseListSchema):
items: List[DynamoNimVersionWithNimSchema]
class DynamoComponentVersionsWithNimListSchema(BaseListSchema):
items: List[DynamoComponentVersionWithNimSchema]
class DynamoNimVersionWithNimSchema(DynamoNimVersionSchema):
repository: DynamoNimSchema
class DynamoComponentVersionWithNimSchema(DynamoComponentVersionSchema):
repository: DynamoComponentSchema
"""
......@@ -232,16 +232,16 @@ class DynamoNimVersionWithNimSchema(DynamoNimVersionSchema):
"""
class BaseDynamoNimModel(TimeCreatedUpdated, AsyncAttrs):
class BaseDynamoComponentModel(TimeCreatedUpdated, AsyncAttrs):
deleted_at: Optional[datetime] = SQLField(nullable=True, default=None)
class DynamoNimVersionBase(BaseDynamoNimModel):
class DynamoComponentVersionBase(BaseDynamoComponentModel):
version: str = SQLField(default=None)
description: str = SQLField(default="")
file_path: Optional[str] = SQLField(default=None)
file_oid: Optional[str] = SQLField(default=None) # Used for GIT Lfs access
upload_status: DynamoNimUploadStatus = SQLField()
upload_status: DynamoComponentUploadStatus = SQLField()
image_build_status: ImageBuildStatus = SQLField()
image_build_status_syncing_at: Optional[datetime] = SQLField(default=None)
image_build_status_updated_at: Optional[datetime] = SQLField(default=None)
......@@ -249,7 +249,7 @@ class DynamoNimVersionBase(BaseDynamoNimModel):
upload_finished_at: Optional[datetime] = SQLField(default=None)
upload_finished_reason: str = SQLField(default="")
manifest: Optional[
Union[DynamoNimVersionManifestSchema, Dict[str, Any]]
Union[DynamoComponentVersionManifestSchema, Dict[str, Any]]
] = SQLField(
default=None, sa_column=Column(JSON)
) # JSON-like field for the manifest
......@@ -260,6 +260,6 @@ class DynamoNimVersionBase(BaseDynamoNimModel):
return _validate_manifest(v)
class DynamoNimBase(BaseDynamoNimModel):
class DynamoComponentBase(BaseDynamoComponentModel):
name: str = SQLField(default="", unique=True)
description: str = SQLField(default="")
......@@ -40,13 +40,13 @@ from .utils import build_latest_revision_from_cr, get_deployment_status, get_url
router = APIRouter(prefix="/api/v2/deployments", tags=["deployments"])
def sanitize_deployment_name(name: Optional[str], dynamo_nim: str) -> str:
def sanitize_deployment_name(name: Optional[str], dynamo_component: str) -> str:
"""
Resolve a name for the DynamoGraphDeployment that will work safely in k8s
Args:
name: Optional custom name
dynamo_nim: Bento name and version (format: name:version)
dynamo_component: Component name and version (format: name:version)
Returns:
A unique deployment name that is at most 63 characters
......@@ -55,11 +55,11 @@ def sanitize_deployment_name(name: Optional[str], dynamo_nim: str) -> str:
# If name is provided, truncate it to 63
base_name = name[:63]
else:
# Generate base name from dynamoNim
dynamo_nim_parts = dynamo_nim.split(":")
if len(dynamo_nim_parts) != 2:
raise ValueError("Invalid dynamoNim format, expected 'name:version'")
base_name = f"dep-{dynamo_nim_parts[0]}-{dynamo_nim_parts[1]}"
# Generate base name from dynamo_component
dynamo_component_parts = dynamo_component.split(":")
if len(dynamo_component_parts) != 2:
raise ValueError("Invalid dynamo_component format, expected 'name:version'")
base_name = f"dep-{dynamo_component_parts[0]}-{dynamo_component_parts[1]}"
# Truncate to 63 chars
base_name = base_name[:63]
......@@ -91,7 +91,7 @@ async def create_deployment(deployment: CreateDeploymentSchema):
created_crd = create_dynamo_deployment(
name=deployment_name,
namespace=kube_namespace,
dynamo_nim=deployment.bento,
dynamo_component=deployment.bento or deployment.component,
labels={
"ngc-organization": ownership["organization_id"],
"ngc-user": ownership["user_id"],
......
......@@ -73,7 +73,7 @@ def create_custom_resource(
def create_dynamo_deployment(
name: str,
namespace: str,
dynamo_nim: str,
dynamo_component: str,
labels: Dict[str, str],
envs: Optional[List[Dict[str, str]]] = None,
) -> Dict[str, Any]:
......@@ -83,7 +83,7 @@ def create_dynamo_deployment(
Args:
name: Deployment name
namespace: Target namespace
dynamo_nim: Bento name and version (format: name:version)
dynamo_component: Bento name and version (format: name:version)
labels: Resource labels
envs: Optional list of environment variables
......@@ -95,7 +95,7 @@ def create_dynamo_deployment(
"kind": "DynamoGraphDeployment",
"metadata": {"name": name, "namespace": namespace, "labels": labels},
"spec": {
"dynamoGraph": dynamo_nim,
"dynamoGraph": dynamo_component,
"services": {},
"envs": envs if envs else [],
},
......
......@@ -22,7 +22,7 @@ from sqlalchemy import Column, DateTime
from sqlmodel import Field as SQLField
from sqlmodel import UniqueConstraint
from .components import DynamoNimBase, DynamoNimVersionBase
from .components import DynamoComponentBase, DynamoComponentVersionBase
"""
This file stores all of the models/tables stored in the SQL database.
......@@ -72,12 +72,14 @@ def make_aware(dt: Optional[datetime]) -> Optional[datetime]:
return dt
class DynamoNimVersion(DynamoNimVersionBase, table=True):
"""A row in the dynamo nim table."""
class DynamoComponentVersion(DynamoComponentVersionBase, table=True):
"""A row in the dynamo component table."""
__tablename__ = "dynamonimversion"
__tablename__ = "dynamocomponentversion"
__table_args__ = (
UniqueConstraint("dynamo_nim_id", "version", name="version_unique_per_nim"),
UniqueConstraint(
"dynamo_component_id", "version", name="version_unique_per_component"
),
)
id: str = SQLField(default_factory=new_compound_entity_id, primary_key=True)
......@@ -95,13 +97,13 @@ class DynamoNimVersion(DynamoNimVersionBase, table=True):
# upload_finished_at: datetime = SQLField(sa_column=Column(DateTime, nullable=True))
build_at: datetime = SQLField(sa_column=Column(DateTime, nullable=False))
dynamo_nim_id: str = SQLField(foreign_key="dynamonim.id")
dynamo_component_id: str = SQLField(foreign_key="dynamocomponent.id")
class DynamoNim(DynamoNimBase, table=True):
"""A row in the dynamo nim table."""
class DynamoComponent(DynamoComponentBase, table=True):
"""A row in the dynamo component table."""
__tablename__ = "dynamonim"
__tablename__ = "dynamocomponent"
id: str = SQLField(default_factory=new_compound_entity_id, primary_key=True)
......
......@@ -56,7 +56,8 @@ class DeploymentConfigSchema(BaseModel):
class UpdateDeploymentSchema(DeploymentConfigSchema):
bento: str
bento: Optional[str] = None
component: Optional[str] = None
class CreateDeploymentSchema(UpdateDeploymentSchema):
......
......@@ -38,14 +38,14 @@ func NewApiStoreClient(endpoint string) *ApiStoreClient {
}
func (c *ApiStoreClient) GetDynamoComponent(ctx context.Context, name, version string) (component *schemas.DynamoComponent, err error) {
url_ := urlJoin(c.endpoint, fmt.Sprintf("/api/v1/dynamo_nims/%s/versions/%s", name, version))
url_ := urlJoin(c.endpoint, fmt.Sprintf("/api/v1/dynamo_components/%s/versions/%s", name, version))
component = &schemas.DynamoComponent{}
_, err = DoJsonRequest(ctx, "GET", url_, nil, nil, nil, component, nil)
return
}
func (c *ApiStoreClient) PresignDynamoComponentDownloadURL(ctx context.Context, name, version string) (component *schemas.DynamoComponent, err error) {
url_ := urlJoin(c.endpoint, fmt.Sprintf("/api/v1/dynamo_nims/%s/versions/%s/presign_download_url", name, version))
url_ := urlJoin(c.endpoint, fmt.Sprintf("/api/v1/dynamo_components/%s/versions/%s/presign_download_url", name, version))
component = &schemas.DynamoComponent{}
_, err = DoJsonRequest(ctx, "PATCH", url_, nil, nil, nil, component, nil)
return
......
......@@ -1000,7 +1000,7 @@ func (r *DynamoComponentReconciler) generateImageBuilderPodTemplateSpec(ctx cont
r.Recorder.Eventf(opt.DynamoComponent, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Got presigned url for dynamoComponent %s from api store service", opt.DynamoComponent.Spec.DynamoComponent)
dynamoComponentDownloadURL = dynamoComponent_.PresignedDownloadUrl
} else {
dynamoComponentDownloadURL = fmt.Sprintf("%s/api/v1/dynamo_nims/%s/versions/%s/download", apiStoreConf.Endpoint, dynamoComponentRepositoryName, dynamoComponentVersion)
dynamoComponentDownloadURL = fmt.Sprintf("%s/api/v1/dynamo_components/%s/versions/%s/download", apiStoreConf.Endpoint, dynamoComponentRepositoryName, dynamoComponentVersion)
}
}
......
......@@ -154,7 +154,7 @@ func RetrieveDynamoGraphDownloadURL(ctx context.Context, dynamoDeployment *v1alp
recorder.Eventf(dynamoDeployment, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Got presigned url for dynamo graph %s from api store service", dynamoDeployment.Spec.DynamoGraph)
dynamoGraphDownloadURL = dynamoComponent_.PresignedDownloadUrl
} else {
dynamoGraphDownloadURL = fmt.Sprintf("%s/api/v1/dynamo_nims/%s/versions/%s/download", apiStoreConf.Endpoint, dynamoComponentRepositoryName, dynamoComponentVersion)
dynamoGraphDownloadURL = fmt.Sprintf("%s/api/v1/dynamo_components/%s/versions/%s/download", apiStoreConf.Endpoint, dynamoComponentRepositoryName, dynamoComponentVersion)
}
return &dynamoGraphDownloadURL, nil
......
This diff is collapsed.
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import typing as t
from bentoml._internal.cloud import BentoCloudClient
from bentoml._internal.cloud.client import RestApiClient
from bentoml._internal.cloud.config import CloudClientConfig, CloudClientContext
from bentoml._internal.cloud.deployment import DeploymentConfigParameters
from bentoml._internal.configuration.containers import BentoMLContainer
from bentoml.exceptions import BentoMLException, CLIException, CloudRESTApiClientError
from rich.console import Console
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.sdk.core.protocol.deployment import Deployment as ProtocolDeployment
from dynamo.sdk.core.protocol.deployment import (
DeploymentManager,
DeploymentResponse,
DeploymentStatus,
)
# Module-level logging/console setup; runs once at import time.
# Configure logging to suppress INFO HTTP logs
# (only WARNING and above from the HTTP libraries will be emitted)
logging.getLogger("httpx").setLevel(logging.WARNING) # HTTP client library logs
logging.getLogger("httpcore").setLevel(logging.WARNING) # HTTP core library logs
# Project helper imported from dynamo.runtime.logging; presumably installs
# Dynamo's logging handlers/format -- TODO confirm whether it must run after
# the setLevel calls above or whether the order is incidental.
configure_dynamo_logging()
# Logger named after this module; used by the deployment manager below.
logger = logging.getLogger(__name__)
# Shared Rich console used to print user-facing error messages.
# NOTE(review): highlight=False looks intended to disable Rich's automatic
# syntax highlighting of printed text -- confirm against the Rich docs.
console = Console(highlight=False)
class BentoCloudDeploymentManager(DeploymentManager):
    """
    Implementation of DeploymentManager that talks to the BentoCloud deployment API.

    Handles all BentoCloud-specific config parameter building, error handling, and API calls.
    Accepts **kwargs for backend-specific options.
    Raises exceptions for errors; CLI handles user interaction.

    Errors from the deployment API are re-raised as ``RuntimeError`` whose single
    argument is a 3-tuple ``(status_code, message, None)``; the status code is an
    HTTP-style code chosen by this class (400, 404, 409 or 500).
    """

    # Backend status string -> protocol status. Any status not listed here
    # (including a missing one) is reported as PENDING.
    _STATUS_MAP = {
        "running": DeploymentStatus.RUNNING,
        "failed": DeploymentStatus.FAILED,
        "deploying": DeploymentStatus.IN_PROGRESS,
        "terminated": DeploymentStatus.TERMINATED,
    }

    def __init__(self, endpoint: str):
        """Store the endpoint (without trailing slashes) and log in immediately.

        Args:
            endpoint: Base URL of the Dynamo Cloud API.

        Raises:
            BentoMLException: If the login performed by ``_login_to_cloud`` fails.
        """
        self.endpoint = endpoint.rstrip("/")
        self._cloud_client = self._login_to_cloud()

    def _login_to_cloud(self) -> "BentoCloudClient":
        """Connect to Dynamo Cloud and return an authenticated BentoCloudClient.

        Looks up the current user and organization, saves a cloud client
        context for the endpoint, and returns a client bound to it.

        Raises:
            BentoMLException: On any failure. The original error is chained as
                the cause and a short message is printed to the console first.
        """
        try:
            logger.info(f"Running against Dynamo Cloud at {self.endpoint}")
            api_token = ""  # Using empty string for now as it's not used
            cloud_rest_client = RestApiClient(self.endpoint, api_token)

            user = cloud_rest_client.v1.get_current_user()
            if user is None:
                raise CLIException("current user is not found")

            org = cloud_rest_client.v1.get_current_organization()
            if org is None:
                raise CLIException("current organization is not found")

            # Prefer the context configured in the BentoML container; fall back
            # to the name of the currently selected context.
            current_context_name = CloudClientConfig.get_config().current_context_name
            cloud_context = BentoMLContainer.cloud_context.get()
            ctx = CloudClientContext(
                name=cloud_context
                if cloud_context is not None
                else current_context_name,
                endpoint=self.endpoint,
                api_token=api_token,
                email=user.email,
            )
            ctx.save()

            logger.debug(
                f"Configured Dynamo Cloud credentials (current-context: {ctx.name})"
            )
            logger.debug(f"Logged in as {user.email} at {org.name} organization")
            return BentoCloudClient(endpoint=self.endpoint, api_key=api_token)
        except CloudRESTApiClientError as e:
            if e.error_code == 401:
                console.print(
                    f":police_car_light: Error validating token: HTTP 401: Bad credentials ({self.endpoint}/api-token)"
                )
            else:
                console.print(
                    f":police_car_light: Error validating token: HTTP {e.error_code}"
                )
            raise BentoMLException(f"Failed to login to Dynamo Cloud: {str(e)}") from e
        except Exception as e:
            # Boundary handler: also wraps the CLIException raised above so
            # callers only ever see BentoMLException from a failed login.
            console.print(
                f":police_car_light: Error connecting to Dynamo Cloud: {str(e)}"
            )
            raise BentoMLException(f"Failed to login to Dynamo Cloud: {str(e)}") from e

    def create_deployment(
        self, deployment: ProtocolDeployment, **kwargs
    ) -> DeploymentResponse:
        """Create a deployment from ``deployment`` and return it as a dict.

        Args:
            deployment: Deployment description; ``pipeline`` is used as the
                bento reference, falling back to ``namespace`` when unset.
            **kwargs: ``dev`` (default ``False``) is forwarded to the config.

        Raises:
            RuntimeError: ``(400, ...)`` if config verification fails,
                ``(409, ...)`` if the error message contains "already exists",
                ``(500, ...)`` for any other BentoML error.
        """
        config_params = DeploymentConfigParameters(
            name=deployment.name,
            bento=deployment.pipeline or deployment.namespace,
            envs=deployment.envs,
            secrets=None,
            cli=True,
            dev=kwargs.get("dev", False),
        )
        try:
            config_params.verify()
        except BentoMLException as e:
            raise RuntimeError(
                (400, f"Config verification error: {str(e)}", None)
            ) from e

        try:
            deployment_obj = self._cloud_client.deployment.create(
                deployment_config_params=config_params
            )
            return deployment_obj.to_dict()
        except BentoMLException as e:
            error_msg = str(e)
            if "already exists" in error_msg:
                raise RuntimeError((409, error_msg, None)) from e
            raise RuntimeError((500, error_msg, None)) from e

    def update_deployment(
        self, deployment_id: str, deployment: ProtocolDeployment
    ) -> DeploymentResponse:
        """Update the deployment named ``deployment_id`` with new env vars.

        Only ``deployment.envs`` is sent; the deployment is addressed by
        ``deployment_id``.

        Raises:
            RuntimeError: ``(400, ...)`` if config verification fails,
                ``(500, ...)`` if the update call fails.
        """
        config_params = DeploymentConfigParameters(
            name=deployment_id,
            envs=deployment.envs,
            cli=True,
        )
        try:
            config_params.verify(create=False)
        except BentoMLException as e:
            raise RuntimeError(
                (400, f"Config verification error: {str(e)}", None)
            ) from e

        try:
            # Use a separate name so the `deployment` parameter is not rebound
            # to an object of a different type.
            updated = self._cloud_client.deployment.update(
                deployment_config_params=config_params
            )
            return updated.to_dict()
        except BentoMLException as e:
            raise RuntimeError((500, f"Deployment update error: {str(e)}", None)) from e

    def get_deployment(self, deployment_id: str) -> DeploymentResponse:
        """Return the deployment named ``deployment_id`` as a dict.

        Raises:
            RuntimeError: ``(404, message, None)`` on any BentoML error.
        """
        try:
            return self._cloud_client.deployment.get(name=deployment_id).to_dict()
        except BentoMLException as e:
            raise RuntimeError((404, str(e), None)) from e

    def list_deployments(self) -> list[DeploymentResponse]:
        """Return all deployments as dicts.

        Items without a ``to_dict`` method are converted with ``vars()``.

        Raises:
            RuntimeError: ``(500, message, None)`` on any BentoML error.
        """
        try:
            deployments = self._cloud_client.deployment.list()
            return [
                d.to_dict() if hasattr(d, "to_dict") else vars(d) for d in deployments
            ]
        except BentoMLException as e:
            raise RuntimeError((500, str(e), None)) from e

    def delete_deployment(self, deployment_id: str) -> None:
        """Delete the deployment named ``deployment_id``.

        Raises:
            RuntimeError: ``(404, message, None)`` on any BentoML error.
        """
        try:
            self._cloud_client.deployment.delete(name=deployment_id)
        except BentoMLException as e:
            raise RuntimeError((404, str(e), None)) from e

    def get_status(
        self,
        deployment_id: str,
    ) -> DeploymentStatus:
        """Map the backend status of ``deployment_id`` to a DeploymentStatus.

        Unrecognised or missing statuses are reported as PENDING.
        """
        dep = self._cloud_client.deployment.get(name=deployment_id)
        # The status is only compared, never rendered, so no markup escaping
        # is needed here.
        status = dep._schema.status if dep._schema.status else "unknown"
        return self._STATUS_MAP.get(status, DeploymentStatus.PENDING)

    def wait_until_ready(
        self, deployment_id: str, timeout: int = 3600
    ) -> t.Tuple[DeploymentResponse, bool]:
        """Block until the deployment is ready or ``timeout`` seconds elapse.

        Returns:
            The deployment as a dict and ``True`` when the backend's
            ``wait_until_ready`` returned 0, otherwise ``False``.
        """
        dep = self._cloud_client.deployment.get(name=deployment_id)
        retcode = dep.wait_until_ready(timeout=timeout)
        return dep.to_dict(), retcode == 0

    def get_endpoint_urls(
        self,
        deployment_id: str,
    ) -> list[str]:
        """Return the endpoint URLs of ``deployment_id`` (empty list if none)."""
        dep = self.get_deployment(deployment_id)
        # Re-fetch through the v2 client using the name/cluster from the dict.
        latest = self._cloud_client.deployment._client.v2.get_deployment(
            dep["name"], dep["cluster"]
        )
        urls = getattr(latest, "urls", None)
        return urls if urls is not None else []
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import os
import tarfile
import time
import typing as t
from datetime import datetime
import requests
from dynamo.sdk.core.protocol.deployment import (
Deployment,
DeploymentManager,
DeploymentResponse,
DeploymentStatus,
Service,
)
class KubernetesDeploymentManager(DeploymentManager):
    """
    Implementation of DeploymentManager that talks to the dynamo_store deployment API.

    Accepts **kwargs for backend-specific options.
    Handles error reporting and payload construction according to the API schema.
    Raises exceptions for errors; CLI handles user interaction.

    HTTP failures from the deployment endpoints are re-raised as
    ``RuntimeError`` whose single argument is a 3-tuple
    ``(status_code, message, url)``; ``status_code`` is ``None`` when the
    error carries no response.
    """

    # Seconds to sleep between polls in wait_until_ready.
    _POLL_INTERVAL_SECONDS = 5

    # Backend status string -> protocol status. Any status not listed here
    # (including a missing one) is reported as PENDING.
    _STATUS_MAP = {
        "running": DeploymentStatus.RUNNING,
        "failed": DeploymentStatus.FAILED,
        "deploying": DeploymentStatus.IN_PROGRESS,
        "terminated": DeploymentStatus.TERMINATED,
    }

    def __init__(self, endpoint: str):
        """Create a manager bound to ``endpoint`` (trailing slashes removed)."""
        self.endpoint = endpoint.rstrip("/")
        self.session = requests.Session()
        self.namespace = "default"

    @staticmethod
    def _error_details(exc: requests.HTTPError) -> t.Tuple[t.Optional[int], str]:
        """Return ``(status_code, message)`` for an HTTP error.

        The response is compared against ``None`` explicitly: a
        ``requests.Response`` is falsy for 4xx/5xx statuses, so a plain
        truthiness test would discard exactly the responses we care about.
        """
        response = exc.response
        if response is None:
            return None, str(exc)
        return response.status_code, response.text

    @classmethod
    def _status_from_response(cls, dep: DeploymentResponse) -> DeploymentStatus:
        """Map the ``status`` field of a deployment dict to a DeploymentStatus."""
        return cls._STATUS_MAP.get(
            dep.get("status", "unknown"), DeploymentStatus.PENDING
        )

    def _upload_pipeline(self, pipeline: str, entry_service: Service, **kwargs) -> None:
        """Upload the entire pipeline as a single component/version, with a manifest of all services.

        Registers the component and the version if a GET for them does not
        return 200, then uploads ``entry_service.path`` as a tar archive.

        Args:
            pipeline: Pipeline reference in ``name:version`` form.
            entry_service: Entry service; supplies the manifest fields and the
                build directory (``path``).
            **kwargs: ``build_at`` (datetime or ISO-format string) is used as
                the build timestamp; the current UTC time is used when it is
                missing or cannot be parsed.

        Raises:
            ValueError: If ``pipeline`` is not of the form ``name:version``.
            FileNotFoundError: If ``entry_service.path`` is not a directory.
            RuntimeError: If a registration or upload request is rejected.
        """
        session = self.session
        endpoint = self.endpoint

        parts = pipeline.split(":")
        if len(parts) != 2 or not all(parts):
            raise ValueError(
                f"Invalid pipeline format {pipeline!r}, expected 'name:version'"
            )
        pipeline_name, pipeline_version = parts

        # Register the component unless it already exists.
        comp_url = f"{endpoint}/api/v1/dynamo_components"
        comp_get_url = f"{endpoint}/api/v1/dynamo_components/{pipeline_name}"
        if session.get(comp_get_url).status_code != 200:
            comp_payload = {
                "name": pipeline_name,
                "description": "Registered by Dynamo's KubernetesDeploymentManager",
            }
            resp = session.post(comp_url, json=comp_payload)
            # 409 is accepted: the component was created concurrently.
            if resp.status_code not in (200, 201, 409):
                raise RuntimeError(f"Failed to create component: {resp.text}")

        # Register the version unless it already exists.
        ver_url = f"{endpoint}/api/v1/dynamo_components/{pipeline_name}/versions"
        ver_get_url = f"{endpoint}/api/v1/dynamo_components/{pipeline_name}/versions/{pipeline_version}"
        if session.get(ver_get_url).status_code != 200:
            build_at = kwargs.get("build_at") or datetime.utcnow()
            if isinstance(build_at, str):
                try:
                    build_at = datetime.fromisoformat(build_at)
                except ValueError:
                    # Unparseable timestamp: fall back to "now" (best effort).
                    build_at = datetime.utcnow()
            manifest = {
                "service": entry_service.service_name,
                "apis": entry_service.apis,
                "size_bytes": entry_service.size_bytes,
            }
            ver_payload = {
                "name": entry_service.name,
                "description": f"Auto-registered version for {pipeline}",
                "resource_type": "dynamo_component_version",
                "version": entry_service.version,
                "manifest": manifest,
                "build_at": build_at.isoformat(),
            }
            resp = session.post(ver_url, json=ver_payload)
            if resp.status_code not in (200, 201, 409):
                raise RuntimeError(f"Failed to create component version: {resp.text}")

        # Upload the graph
        build_dir = entry_service.path
        if not build_dir or not os.path.isdir(build_dir):
            raise FileNotFoundError(f"Built pipeline directory not found: {build_dir}")
        # The archive is built fully in memory before being sent.
        tar_stream = io.BytesIO()
        with tarfile.open(fileobj=tar_stream, mode="w") as tar:
            tar.add(build_dir, arcname=".")
        tar_stream.seek(0)
        upload_url = f"{endpoint}/api/v1/dynamo_components/{pipeline_name}/versions/{pipeline_version}/upload"
        upload_headers = {"Content-Type": "application/x-tar"}
        resp = session.put(upload_url, data=tar_stream, headers=upload_headers)
        if resp.status_code not in (200, 201, 204):
            raise RuntimeError(f"Failed to upload pipeline artifact: {resp.text}")

    def create_deployment(self, deployment: Deployment, **kwargs) -> DeploymentResponse:
        """Create a new deployment. Ensures all components and versions are registered/uploaded before creating the deployment.

        Args:
            deployment: Deployment description; ``pipeline`` (falling back to
                ``namespace``) is used as the component reference.
            **kwargs: ``dev`` (default ``False``) is sent in the payload; all
                kwargs are also forwarded to the pipeline upload.

        Returns:
            The JSON body returned by the deployment API.

        Raises:
            ValueError: If ``deployment.entry_service`` is not set.
            RuntimeError: ``(409, msg, None)`` if the response text contains
                "already exists", otherwise ``(status, msg, url)``.
        """
        if deployment.entry_service is None:
            # Fail early with a clear message instead of an AttributeError
            # part-way through the upload.
            raise ValueError("deployment.entry_service is required to create a deployment")

        component = deployment.pipeline or deployment.namespace
        # Upload the pipeline to the API store before creating the deployment.
        self._upload_pipeline(
            pipeline=component,
            entry_service=deployment.entry_service,
            **kwargs,
        )

        # Now create the deployment
        payload = {
            "name": deployment.name,
            "component": component,
            "dev": kwargs.get("dev", False),
            "envs": deployment.envs,
        }
        payload = {k: v for k, v in payload.items() if v is not None}
        url = f"{self.endpoint}/api/v2/deployments"
        try:
            resp = self.session.post(url, json=payload)
            resp.raise_for_status()
            return resp.json()
        except requests.HTTPError as e:
            status, msg = self._error_details(e)
            if "already exists" in msg:
                raise RuntimeError((409, msg, None)) from e
            raise RuntimeError((status, msg, url)) from e

    def update_deployment(
        self, deployment_id: str, deployment: Deployment, **kwargs
    ) -> None:
        """Update an existing deployment.

        Args:
            deployment_id: ID of the deployment to update.
            deployment: New deployment description.
            **kwargs: ``access_authorization`` (default ``False``) is sent in
                the payload.

        Raises:
            RuntimeError: ``(status, msg, url)`` if the request fails.
        """
        # NOTE(review): `deployment.services` holds Service dataclass
        # instances, which the JSON encoder presumably cannot serialize when
        # the list is non-empty -- confirm and convert if needed.
        payload = {
            "name": deployment.name,
            "component": deployment.pipeline or deployment.namespace,
            "envs": deployment.envs,
            "services": deployment.services,
            "access_authorization": kwargs.get("access_authorization", False),
        }
        payload = {k: v for k, v in payload.items() if v is not None}
        url = f"{self.endpoint}/api/v2/deployments/{deployment_id}"
        try:
            resp = self.session.put(url, json=payload)
            resp.raise_for_status()
        except requests.HTTPError as e:
            status, msg = self._error_details(e)
            raise RuntimeError((status, msg, url)) from e

    def get_deployment(self, deployment_id: str) -> DeploymentResponse:
        """Get deployment details.

        Raises:
            RuntimeError: ``(status, msg, url)`` if the request fails.
        """
        url = f"{self.endpoint}/api/v2/deployments/{deployment_id}"
        try:
            resp = self.session.get(url)
            resp.raise_for_status()
            return resp.json()
        except requests.HTTPError as e:
            status, msg = self._error_details(e)
            raise RuntimeError((status, msg, url)) from e

    def list_deployments(self) -> t.List[DeploymentResponse]:
        """List all deployments.

        Returns:
            The ``items`` list of the response body (empty if absent).

        Raises:
            RuntimeError: ``(status, msg, url)`` if the request fails.
        """
        url = f"{self.endpoint}/api/v2/deployments"
        try:
            resp = self.session.get(url)
            resp.raise_for_status()
            return resp.json().get("items", [])
        except requests.HTTPError as e:
            status, msg = self._error_details(e)
            raise RuntimeError((status, msg, url)) from e

    def delete_deployment(self, deployment_id: str) -> None:
        """Delete a deployment.

        Raises:
            RuntimeError: ``(status, msg, url)`` if the request fails.
        """
        url = f"{self.endpoint}/api/v2/deployments/{deployment_id}"
        try:
            resp = self.session.delete(url)
            resp.raise_for_status()
        except requests.HTTPError as e:
            status, msg = self._error_details(e)
            raise RuntimeError((status, msg, url)) from e

    def get_status(
        self,
        deployment_id: str,
    ) -> DeploymentStatus:
        """Fetch ``deployment_id`` and map its status to a DeploymentStatus.

        Unrecognised or missing statuses are reported as PENDING.
        """
        return self._status_from_response(self.get_deployment(deployment_id))

    def wait_until_ready(
        self, deployment_id: str, timeout: int = 3600
    ) -> t.Tuple[DeploymentResponse, bool]:
        """Poll the deployment until it is running, failed, or timed out.

        The deployment is always fetched at least once, so a value is returned
        even when ``timeout`` is zero or negative. The status is derived from
        the same response that is returned, so the two are never out of sync.

        Returns:
            The last fetched deployment and ``True`` if it reached RUNNING;
            ``False`` if it FAILED or the timeout elapsed.
        """
        deadline = time.monotonic() + timeout
        while True:
            dep = self.get_deployment(deployment_id)
            status = self._status_from_response(dep)
            if status == DeploymentStatus.RUNNING:
                return dep, True
            if status == DeploymentStatus.FAILED:
                return dep, False
            if time.monotonic() >= deadline:
                return dep, False
            time.sleep(self._POLL_INTERVAL_SECONDS)

    def get_endpoint_urls(
        self,
        deployment_id: str,
    ) -> t.List[str]:
        """Return the ``urls`` of ``deployment_id`` (empty list if absent)."""
        dep = self.get_deployment(deployment_id)
        return dep.get("urls", [])
......@@ -77,32 +77,52 @@ class DeploymentStatus(str, Enum):
"""Status of a dynamo deployment."""
PENDING = "pending"
IN_PROGRESS = "in_progress"
IN_PROGRESS = "in progress"
RUNNING = "running"
FAILED = "failed"
TERMINATED = "terminate"
SCALED_TO_ZERO = "scaled_to_zero"
SCALED_TO_ZERO = "scaled to zero"
@property
def color(self) -> str:
return {
DeploymentStatus.RUNNING: "green",
DeploymentStatus.IN_PROGRESS: "yellow",
DeploymentStatus.PENDING: "yellow",
DeploymentStatus.FAILED: "red",
DeploymentStatus.TERMINATED: "red",
DeploymentStatus.SCALED_TO_ZERO: "yellow",
}.get(self, "white")
@dataclass
class ScalingPolicy:
policy: str
parameters: dict[str, t.Union[int, float, str]] = field(default_factory=dict)
parameters: t.Dict[str, t.Union[int, float, str]] = field(default_factory=dict)
@dataclass
class Env:
name: str
value: str = ""
@dataclass
class Service:
"""A single component."""
"""The entry service of a deployment."""
service_name: str
name: str
namespace: str
class_name: str
id: str | None = None
cmd: list[str] = field(default_factory=list)
version: str
path: str
cmd: t.List[str] = field(default_factory=list)
resources: Resources | None = None
environment: dict[str, str] = field(default_factory=dict)
secrets: list[str] = field(default_factory=list)
envs: t.List[Env] = field(default_factory=list)
secrets: t.List[str] = field(default_factory=list)
scaling: ScalingPolicy = field(default_factory=lambda: ScalingPolicy(policy="none"))
apis: dict = field(default_factory=dict)
size_bytes: int = 0
@dataclass
......@@ -111,21 +131,28 @@ class Deployment:
name: str
namespace: str
services: list[Service] = field(default_factory=list)
pipeline: t.Optional[str] = None
entry_service: t.Optional[Service] = None
envs: t.Optional[t.List[dict]] = None
# Type alias for deployment responses (e.g., from backend APIs)
DeploymentResponse = t.Dict[str, t.Any]
class DeploymentManager(ABC):
"""Interface for managing dynamo graph deployments."""
@abstractmethod
def create_deployment(self, deployment: Deployment) -> str:
def create_deployment(self, deployment: Deployment, **kwargs) -> DeploymentResponse:
"""Create new deployment.
Args:
deployment: Deployment configuration
**kwargs: Additional backend-specific arguments
Returns:
The ID of the created deployment
The created deployment
"""
pass
......@@ -140,7 +167,7 @@ class DeploymentManager(ABC):
pass
@abstractmethod
def get_deployment(self, deployment_id: str) -> dict[str, t.Any]:
def get_deployment(self, deployment_id: str) -> DeploymentResponse:
"""Get deployment details.
Args:
......@@ -152,7 +179,7 @@ class DeploymentManager(ABC):
pass
@abstractmethod
def list_deployments(self) -> list[dict[str, t.Any]]:
def list_deployments(self) -> t.List[DeploymentResponse]:
"""List all deployments.
Returns:
......@@ -170,10 +197,13 @@ class DeploymentManager(ABC):
pass
@abstractmethod
def get_status(self, deployment_id: str) -> DeploymentStatus:
def get_status(
self,
deployment_id: str,
) -> DeploymentStatus:
"""Get the current status of a deployment.
Args:
Args (one of):
deployment_id: The ID of the deployment
Returns:
......@@ -182,7 +212,9 @@ class DeploymentManager(ABC):
pass
@abstractmethod
def wait_until_ready(self, deployment_id: str, timeout: int = 3600) -> bool:
def wait_until_ready(
self, deployment_id: str, timeout: int = 3600
) -> t.Tuple[DeploymentResponse, bool]:
"""Wait until a deployment is ready.
Args:
......@@ -190,15 +222,18 @@ class DeploymentManager(ABC):
timeout: Maximum time to wait in seconds
Returns:
True if deployment became ready, False if timed out
Tuple of deployment response and a boolean indicating if the deployment became ready
"""
pass
@abstractmethod
def get_endpoint_urls(self, deployment_id: str) -> list[str]:
def get_endpoint_urls(
self,
deployment_id: str,
) -> t.List[str]:
"""Get the list of endpoint urls attached to a deployment.
Args:
Args (one of):
deployment_id: The ID of the deployment
Returns:
......
......@@ -22,6 +22,8 @@ from typing import Any, Dict, Generic, List, Optional, Set, Tuple, Type, TypeVar
from fastapi import FastAPI
from dynamo.sdk.core.protocol.deployment import Env
T = TypeVar("T", bound=object)
......@@ -74,6 +76,17 @@ class ServiceInterface(Generic[T], ABC):
"""Get the service configuration"""
pass
@property
def dependencies(self) -> Dict[str, "DependencyInterface"]:
    """Return the service dependencies; the base implementation has none."""
    no_dependencies: Dict[str, "DependencyInterface"] = {}
    return no_dependencies
@property
@abstractmethod
def envs(self) -> List[Env]:
    """Return the service's environment variables.

    Abstract: concrete services must override it. The base body yields an
    empty list.
    """
    no_envs: List[Env] = []
    return no_envs
@property
@abstractmethod
def inner(self) -> Type[T]:
......@@ -110,20 +123,12 @@ class ServiceInterface(Generic[T], ABC):
"""Inject configuration from environment into service configs"""
pass
@property
# @abstractmethod
def dependencies(self) -> Dict[str, "DependencyInterface"]:
    """Get the service dependencies.

    The ``@abstractmethod`` decorator above is commented out, so this
    property is concrete and returns an empty dict.

    NOTE(review): a ``dependencies`` property with the same body also appears
    earlier in this chunk; confirm which of the two the file keeps.
    """
    return {}
# @property
@abstractmethod
def get_service_configs(self) -> Dict[str, ServiceConfig]:
    """Get all services.

    Declared abstract; the ``@property`` decorator above is commented out, so
    this is a plain method. The base body returns an empty dict.
    """
    return {}
@property
# @abstractmethod
def service_configs(self) -> List[ServiceConfig]:
    """Get all service configs.

    The ``@abstractmethod`` decorator above is commented out, so this
    property is concrete and returns an empty list.
    """
    return []
......
......@@ -23,6 +23,7 @@ from _bentoml_sdk.service.dependency import Dependency as BentoDependency
from fastapi import FastAPI
from dynamo.sdk.core.decorators.endpoint import DynamoClient, DynamoEndpoint
from dynamo.sdk.core.protocol.deployment import Env
from dynamo.sdk.core.protocol.interface import (
DependencyInterface,
DeploymentTarget,
......@@ -154,6 +155,10 @@ class BentoServiceAdapter(ServiceMixin, ServiceInterface[T]):
def inner(self) -> Type[T]:
    """Return the ``inner`` attribute of the wrapped BentoML service."""
    wrapped_service = self._bentoml_service
    return wrapped_service.inner
@property
def envs(self) -> List[Env]:
    """Return the ``envs`` attribute of the wrapped BentoML service."""
    wrapped_service = self._bentoml_service
    return wrapped_service.envs
def get_endpoints(self) -> Dict[str, DynamoEndpointInterface]:
    """Return the adapter's endpoint mapping stored in ``self._endpoints``."""
    endpoints = self._endpoints
    return endpoints
......
......@@ -29,6 +29,7 @@ from circus.watcher import Watcher
from fastapi import FastAPI
from dynamo.sdk.core.decorators.endpoint import DynamoClient, DynamoEndpoint
from dynamo.sdk.core.protocol.deployment import Env
from dynamo.sdk.core.protocol.interface import (
DependencyInterface,
DeploymentTarget,
......@@ -115,6 +116,10 @@ class LocalService(ServiceMixin, ServiceInterface[T]):
def config(self) -> ServiceConfig:
    """Return the service configuration stored in ``self._config``."""
    service_config = self._config
    return service_config
@property
def envs(self) -> List[Env]:
    """Return the ``"envs"`` entry of the service config, or an empty list."""
    configured_envs = self._config.get("envs", [])
    return configured_envs
@property
def inner(self) -> Type[T]:
    """Return the wrapped class stored in ``self._inner_cls``."""
    wrapped_cls = self._inner_cls
    return wrapped_cls
......
......@@ -23,6 +23,9 @@ import os
import sys
from typing import Optional, TypeVar
import yaml
from dynamo.sdk.core.protocol.deployment import Service
from dynamo.sdk.lib.service import DynamoService
logger = logging.getLogger(__name__)
......@@ -191,3 +194,62 @@ def _do_import(import_str: str, working_dir: str) -> DynamoService:
object.__setattr__(instance, "_import_str", import_str_val)
return instance
def _get_dir_size(path: str) -> int:
    """Return the combined size in bytes of the files found under *path*.

    The tree is traversed with ``os.walk``; any listed name for which
    ``os.path.isfile`` is false is ignored. The total is also logged at
    INFO level.
    """
    candidates = (
        os.path.join(root, filename)
        for root, _subdirs, filenames in os.walk(path)
        for filename in filenames
    )
    total = sum(
        os.path.getsize(candidate)
        for candidate in candidates
        if os.path.isfile(candidate)
    )
    logger.info(f"Total size of {path}: {total} bytes")
    return total
def load_entry_service(
    pipeline_tag: str, build_dir: str = "~/bentoml/bentos"
) -> Service:
    """Load the entry service of a built pipeline as a deployment ``Service``.

    Args:
        pipeline_tag: Tag of a built pipeline in the form ``name:version``
            (e.g. ``frontend:2uk2fwzvqsswvs7t``).
        build_dir: Root directory of built pipelines, laid out as
            ``<build_dir>/<name>/<version>``. A leading ``~`` is expanded.

    Returns:
        A ``Service`` for the entry in the ``services`` list of the
        pipeline's ``bento.yaml`` whose name equals its ``entry_service`` key.

    Raises:
        ValueError: If ``pipeline_tag`` is not of the form ``name:version``,
            if ``bento.yaml`` does not contain a mapping, or if no listed
            service matches ``entry_service``.
        FileNotFoundError: If the pipeline directory or its ``bento.yaml``
            does not exist.
    """
    # Split on the first colon only. Besides a missing colon, also reject an
    # empty name or version ("name:", ":version"), which would otherwise
    # resolve to a parent directory and fail later with a misleading error.
    name, separator, version = pipeline_tag.partition(":")
    if not (separator and name and version):
        raise ValueError("pipeline_tag must be in the form name:version")

    graph_dir = os.path.expanduser(f"{build_dir}/{name}/{version}")
    if not os.path.isdir(graph_dir):
        raise FileNotFoundError(f"Pipeline directory not found: {graph_dir}")

    config_path = os.path.join(graph_dir, "bento.yaml")
    if not os.path.isfile(config_path):
        raise FileNotFoundError(
            f"Pipeline config (bento.yaml) not found in {graph_dir}"
        )
    with open(config_path, encoding="utf-8") as f:
        graph_cfg = yaml.safe_load(f)
    # safe_load yields None for an empty document and may yield a list or
    # scalar; fail with a clear error instead of an AttributeError below.
    if not isinstance(graph_cfg, dict):
        raise ValueError(
            f"Pipeline config (bento.yaml) in {graph_dir} is not a mapping"
        )

    # Add src_dir to sys.path if needed
    # (presumably so the pipeline's own modules can be imported later -- confirm).
    src_dir = os.path.join(graph_dir, "src")
    if src_dir not in sys.path:
        sys.path.insert(0, src_dir)

    # Read the entry-service name once instead of on every iteration, and
    # tolerate an explicit ``services: null`` in the config.
    entry_name = graph_cfg.get("entry_service")
    entry_cfg = next(
        (
            svc
            for svc in graph_cfg.get("services") or []
            if svc["name"] == entry_name
        ),
        None,
    )
    if entry_cfg is None:
        raise ValueError("No entry service found in the pipeline")

    return Service(
        service_name=graph_cfg.get("service"),
        name=entry_cfg["name"],
        namespace="default",
        version=version,
        path=graph_dir,
        # An explicit ``envs: null`` is treated like a missing key.
        envs=graph_cfg.get("envs") or [],
        apis={},
        # Total size of the built pipeline directory; only computed once an
        # entry service is known to exist.
        size_bytes=_get_dir_size(graph_dir),
    )
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment