Unverified Commit 7a341f86 authored by julienmancuso's avatar julienmancuso Committed by GitHub
Browse files

feat: simplify k8s deployment (#1708)

parent 5505507b
......@@ -180,7 +180,6 @@ all-docker:
ARG DOCKER_SERVER=my-registry
ARG IMAGE_TAG=latest
BUILD ./deploy/cloud/operator+docker --DOCKER_SERVER=$DOCKER_SERVER --IMAGE_TAG=$IMAGE_TAG
BUILD ./deploy/cloud/api-store+docker --DOCKER_SERVER=$DOCKER_SERVER --IMAGE_TAG=$IMAGE_TAG
all-lint:
BUILD ./deploy/cloud/operator+lint
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
.venv/
.env
*.egg-info
\ No newline at end of file
# Local development env
DB_USER="postgres"
DB_PASSWORD="pgadmin"
DB_HOST="localhost"
DB_PORT=5432
DB_NAME="postgres"
API_DATABASE_PORT="8001"
API_BACKEND_URL="http://localhost:8001"
DEFAULT_KUBE_NAMESPACE="dynamo"
DYN_OBJECT_STORE_ENDPOINT="http://localhost:9000"
DYN_OBJECT_STORE_KEY="dynamo-minio"
DYN_OBJECT_STORE_ID="dynamo-minio"
DYN_OBJECT_STORE_REGION="local"
DYN_OBJECT_STORE_BUCKET="dynamo-storage"
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
VERSION 0.8
uv-source:
FROM ghcr.io/astral-sh/uv:latest
SAVE ARTIFACT /uv
uv-base:
FROM python:3.12-slim
COPY +uv-source/uv /bin/uv
RUN uv venv
ENV PATH="/app/.venv/bin:$PATH"
WORKDIR /app
COPY uv.lock pyproject.toml README.md /app
RUN uv sync --frozen --no-install-project --no-dev --no-install-workspace --no-editable
# Copy project files
COPY ai_dynamo_store ai_dynamo_store
RUN uv pip install .
# Save the entire app directory with installed packages
SAVE ARTIFACT /app /app
docker:
ARG DOCKER_SERVER=my-registry
ARG IMAGE_TAG=latest
ARG IMAGE=dynamo-api-store
FROM nvcr.io/nvidia/distroless/python:3.12-v3.4.13-dev
# Copy the entire installed environment from uv-base
COPY +uv-base/app /app
WORKDIR /app
ENV PATH="/app/.venv/bin:$PATH"
ENTRYPOINT ["ai-dynamo-store"]
SAVE IMAGE --push $DOCKER_SERVER/$IMAGE:$IMAGE_TAG
## Provision S3-compatible cloud object storage:
The Dynamo API Server requires a s3-compatible object store to store Dynamo Components.
## Provision PostgreSQL Database
The Dynamo API Server requires a PostgreSQL database to store data entity and version metadata.
## Contributing
### Initialize a new virtual environment with uv
uv venv
### Activate the virtual environment
source .venv/bin/activate
### Install service
uv pip install .
### Start the service
ai-dynamo-store
### (Optional) Development workflow
#### Install dev dependencies
uv pip install -e ".[dev]"
#### Run docker container locally
earthly +docker && docker run -it my-registry/ai-dynamo-store:latest
# Local development env
DB_USER="postgres"
DB_PASSWORD="pgadmin"
DB_HOST="localhost"
DB_PORT=5432
DB_NAME="postgres"
API_DATABASE_PORT="8001"
API_BACKEND_URL="http://localhost:8001"
DEFAULT_KUBE_NAMESPACE="dynamo"
DYN_OBJECT_STORE_ENDPOINT="http://localhost:9000"
DYN_OBJECT_STORE_KEY="dynamo-minio"
DYN_OBJECT_STORE_ID="dynamo-minio"
DYN_OBJECT_STORE_REGION="local"
DYN_OBJECT_STORE_BUCKET="dynamo-storage"
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""AI Dynamo Store package."""
__version__ = "0.1.0"
from .app import run_app
__all__ = ["run_app"]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from collections import defaultdict
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from fastapi import Query
from pydantic import BaseModel, ValidationError, field_validator
from sqlalchemy import JSON, Column
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlmodel import Field as SQLField
from sqlmodel import SQLModel
class TimeCreatedUpdated(SQLModel):
created_at: datetime = SQLField(
default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
nullable=False,
)
updated_at: datetime = SQLField(
default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
nullable=False,
)
class DynamoComponentUploadStatus(str, Enum):
Pending = "pending"
Uploading = "uploading"
Success = "success"
Failed = "failed"
class ImageBuildStatus(str, Enum):
Pending = "pending"
Building = "building"
Success = "success"
Failed = "failed"
class TransmissionStrategy(str, Enum):
Proxy = "proxy"
"""
API Request Objects
"""
class CreateDynamoComponentRequest(BaseModel):
name: str
description: str
labels: Optional[Dict[str, str]] = None
class CreateDynamoComponentVersionRequest(BaseModel):
description: str
version: str
manifest: DynamoComponentVersionManifestSchema
build_at: datetime
labels: Optional[list[Dict[str, str]]] = None
class UpdateDynamoComponentVersionRequest(BaseModel):
manifest: DynamoComponentVersionManifestSchema
labels: Optional[list[Dict[str, str]]] = None
class ListQuerySchema(BaseModel):
start: int = Query(default=0, ge=0, alias="start")
count: int = Query(default=20, ge=0, alias="count")
search: Optional[str] = Query(None, alias="search")
q: Optional[str] = Query(default="", alias="q")
sort_asc: bool = Query(default=False)
def get_query_map(self) -> Dict[str, Any]:
if not self.q:
return {}
query = defaultdict(list)
for piece in self.q.split():
if ":" in piece:
k, v = piece.split(":")
query[k].append(v)
else:
# Todo: add search keywords
continue
return query
"""
API Schemas
"""
class ResourceType(str, Enum):
Organization = "organization"
Cluster = "cluster"
DynamoComponent = "dynamo_component"
DynamoComponentVersion = "dynamo_component_version"
Deployment = "deployment"
DeploymentRevision = "deployment_revision"
TerminalRecord = "terminal_record"
Label = "label"
class BaseSchema(BaseModel):
uid: str
created_at: datetime
updated_at: datetime
deleted_at: Optional[datetime] = None
class BaseListSchema(BaseModel):
total: int
start: int
count: int
class ResourceSchema(BaseSchema):
name: str
resource_type: ResourceType
labels: List[LabelItemSchema]
class LabelItemSchema(BaseModel):
key: str
value: str
class OrganizationSchema(ResourceSchema):
description: str
class UserSchema(BaseModel):
name: str
email: str
first_name: str
last_name: str
class DynamoComponentVersionApiSchema(BaseModel):
route: str
doc: str
input: str
output: str
class DynamoComponentVersionManifestSchema(BaseModel):
service: str
dynamo_version: Optional[str] = None
apis: Dict[str, DynamoComponentVersionApiSchema]
size_bytes: int
def _validate_manifest(v):
try:
# Validate that the 'manifest' matches the DynamoManifestSchema
return DynamoComponentVersionManifestSchema.model_validate(v).model_dump()
except ValidationError as e:
raise ValueError(f"Invalid manifest schema: {e}")
class DynamoComponentVersionSchema(ResourceSchema):
dynamo_repository_uid: str
version: str
description: str
image_build_status: ImageBuildStatus
upload_status: str
# upload_started_at: Optional[datetime]
# upload_finished_at: Optional[datetime]
upload_finished_reason: str
presigned_upload_url: str = ""
presigned_download_url: str = ""
presigned_urls_deprecated: bool = False
transmission_strategy: TransmissionStrategy
upload_id: str = ""
manifest: Optional[Union[DynamoComponentVersionManifestSchema, Dict[str, Any]]]
build_at: datetime
@field_validator("manifest")
def validate_manifest(cls, v):
return _validate_manifest(v)
class DynamoComponentVersionFullSchema(DynamoComponentVersionSchema):
repository: DynamoComponentSchema
class DynamoComponentSchema(ResourceSchema):
latest_dynamo: Optional[DynamoComponentVersionSchema]
latest_dynamos: Optional[List[DynamoComponentVersionSchema]]
n_dynamos: int
description: str
class DynamoComponentSchemaWithDeploymentsSchema(DynamoComponentSchema):
deployments: List[str] = [] # mocked for now
class DynamoComponentSchemaWithDeploymentsListSchema(BaseListSchema):
items: List[DynamoComponentSchemaWithDeploymentsSchema]
class DynamoComponentVersionsWithNimListSchema(BaseListSchema):
items: List[DynamoComponentVersionWithNimSchema]
class DynamoComponentVersionWithNimSchema(DynamoComponentVersionSchema):
repository: DynamoComponentSchema
"""
DB Models
"""
class BaseDynamoComponentModel(TimeCreatedUpdated, AsyncAttrs):
deleted_at: Optional[datetime] = SQLField(nullable=True, default=None)
class DynamoComponentVersionBase(BaseDynamoComponentModel):
version: str = SQLField(default=None)
description: str = SQLField(default="")
file_path: Optional[str] = SQLField(default=None)
file_oid: Optional[str] = SQLField(default=None) # Used for GIT Lfs access
upload_status: DynamoComponentUploadStatus = SQLField()
image_build_status: ImageBuildStatus = SQLField()
image_build_status_syncing_at: Optional[datetime] = SQLField(default=None)
image_build_status_updated_at: Optional[datetime] = SQLField(default=None)
upload_started_at: Optional[datetime] = SQLField(default=None)
upload_finished_at: Optional[datetime] = SQLField(default=None)
upload_finished_reason: str = SQLField(default="")
manifest: Optional[
Union[DynamoComponentVersionManifestSchema, Dict[str, Any]]
] = SQLField(
default=None, sa_column=Column(JSON)
) # JSON-like field for the manifest
build_at: datetime = SQLField()
@field_validator("manifest")
def validate_manifest(cls, v):
return _validate_manifest(v)
class DynamoComponentBase(BaseDynamoComponentModel):
name: str = SQLField(default="", unique=True)
description: str = SQLField(default="")
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from typing import Any, Dict, Optional
from fastapi import APIRouter, HTTPException, Query
from ..models.schemas import (
CreateDeploymentSchema,
DeploymentFullSchema,
DeploymentListResponse,
ResourceSchema,
UpdateDeploymentSchema,
create_default_cluster,
create_default_user,
)
from .k8s import (
create_dynamo_deployment,
delete_dynamo_deployment,
get_dynamo_deployment,
get_namespace,
list_dynamo_deployments,
update_dynamo_deployment,
)
from .utils import build_latest_revision_from_cr, get_deployment_status, get_urls
router = APIRouter(prefix="/api/v2/deployments", tags=["deployments"])
def sanitize_deployment_name(name: Optional[str], dynamo_component: str) -> str:
"""
Resolve a name for the DynamoGraphDeployment that will work safely in k8s
Args:
name: Optional custom name
dynamo_component: Component name and version (format: name:version)
Returns:
A unique deployment name that is at most 63 characters
"""
if name:
# If name is provided, truncate it to 63
base_name = name[:63]
else:
# Generate base name from dynamo_component
dynamo_component_parts = dynamo_component.split(":")
if len(dynamo_component_parts) != 2:
raise ValueError("Invalid dynamo_component format, expected 'name:version'")
base_name = f"dep-{dynamo_component_parts[0]}-{dynamo_component_parts[1]}"
# Truncate to 63 chars
base_name = base_name[:63]
return base_name
@router.post("", response_model=DeploymentFullSchema)
async def create_deployment(deployment: CreateDeploymentSchema):
"""
Create a new deployment.
Args:
deployment: The deployment configuration following CreateDeploymentSchema
Returns:
DeploymentFullSchema: The created deployment details
"""
try:
# Get ownership info for labels
ownership = {"organization_id": "default-org", "user_id": "default-user"}
# Get the k8s namespace from environment variable
kube_namespace = get_namespace()
# Generate deployment name
deployment_name = sanitize_deployment_name(deployment.name, deployment.dynamo)
# Create the deployment using helper function
created_crd = create_dynamo_deployment(
name=deployment_name,
namespace=kube_namespace,
dynamo_component=deployment.dynamo or deployment.component,
labels={
"ngc-organization": ownership["organization_id"],
"ngc-user": ownership["user_id"],
},
envs=deployment.envs,
)
# Create response schema
resource = ResourceSchema(
uid=created_crd["metadata"]["uid"],
name=created_crd["metadata"]["name"],
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
resource_type="deployment",
labels=[],
)
# Use helper functions for default resources
creator = create_default_user()
cluster = create_default_cluster(creator)
deployment_schema = DeploymentFullSchema(
**resource.dict(),
status="deploying",
kube_namespace=kube_namespace,
creator=creator,
cluster=cluster,
latest_revision=build_latest_revision_from_cr(created_crd),
manifest=None,
)
return deployment_schema
except Exception as e:
print("Error creating deployment:")
print(e)
raise HTTPException(status_code=500, detail=str(e))
@router.get("/{name}", response_model=DeploymentFullSchema)
def get_deployment(name: str) -> DeploymentFullSchema:
"""
Retrieve a deployment by name.
Args:
name: The name of the deployment to retrieve
Returns:
DeploymentFullSchema: The deployment details
"""
try:
kube_namespace = get_namespace()
cr = get_dynamo_deployment(
name=name,
namespace=kube_namespace,
)
deployment_schema = DeploymentFullSchema(
name=name,
created_at=cr["metadata"]["creationTimestamp"],
uid=cr["metadata"]["uid"],
resource_type="deployment",
labels=[],
kube_namespace=kube_namespace,
status=get_deployment_status(cr),
urls=get_urls(cr),
creator=create_default_user(),
cluster=create_default_cluster(create_default_user()),
latest_revision=build_latest_revision_from_cr(cr),
manifest=None,
)
return deployment_schema
except HTTPException as e:
raise e
except Exception as e:
print("Error retrieving deployment:")
print(e)
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/{name}", response_model=DeploymentFullSchema)
def delete_deployment(name: str) -> DeploymentFullSchema:
"""
Delete a deployment by name.
Args:
name: The name of the deployment to delete
Returns:
DeploymentFullSchema: The deleted deployment details
"""
try:
kube_namespace = get_namespace()
# Get deployment details before deletion
cr = get_dynamo_deployment(name, kube_namespace)
deployment_schema = DeploymentFullSchema(
name=name,
created_at=cr["metadata"]["creationTimestamp"],
uid=cr["metadata"]["uid"],
resource_type="deployment",
labels=[],
kube_namespace=kube_namespace,
status=get_deployment_status(cr),
urls=get_urls(cr),
creator=create_default_user(),
cluster=create_default_cluster(create_default_user()),
latest_revision=build_latest_revision_from_cr(cr),
manifest=None,
)
# Delete the deployment
delete_dynamo_deployment(name, kube_namespace)
return deployment_schema
except HTTPException as e:
raise e
except Exception as e:
print("Error deleting deployment:")
print(e)
raise HTTPException(status_code=500, detail=str(e))
@router.get("/", response_model=DeploymentListResponse)
@router.get("", response_model=DeploymentListResponse)
def list_deployments(
search: str = Query(default="", description="Search query"),
dev: bool = Query(default=False, description="Filter development deployments"),
q: str = Query(default="", description="Advanced query string"),
all: bool = Query(default=False, description="Return all deployments"),
count: str = Query(default="", description="Number of items to return"),
start: str = Query(default="", description="Starting index"),
cluster: str = Query(default="", description="Filter by cluster name"),
) -> Dict[str, Any]:
"""
List all deployments with optional filtering.
Args:
search: Simple text search
dev: Filter development deployments
q: Advanced query string
all: Whether to return all deployments
count: Number of deployments to return
start: Starting index for pagination
cluster: Filter by cluster name
Returns:
Dict containing paginated deployment list
"""
try:
# Convert count and start to integers if they're not empty
count_val = int(count) if count else None
start_val = int(start) if start else None
if count_val is not None and count_val <= 0:
raise HTTPException(status_code=400, detail="Count must be greater than 0")
if start_val is not None and start_val < 0:
raise HTTPException(status_code=400, detail="Start must be non-negative")
kube_namespace = get_namespace()
crs = list_dynamo_deployments(
namespace=kube_namespace,
label_selector=q,
)
deployments = []
for cr in crs:
deployment_schema = DeploymentFullSchema(
name=cr["metadata"]["name"],
created_at=cr["metadata"]["creationTimestamp"],
uid=cr["metadata"]["uid"],
resource_type="deployment",
labels=[],
kube_namespace=kube_namespace,
status=get_deployment_status(cr),
urls=get_urls(cr),
creator=create_default_user(),
cluster=create_default_cluster(create_default_user()),
latest_revision=build_latest_revision_from_cr(cr),
manifest=None,
)
# Apply cluster filter if provided
if cluster and cluster != deployment_schema.cluster.name:
continue
# Apply search filter if provided
if search and search.lower() not in deployment_schema.name.lower():
continue
# Apply dev filter if enabled and all is not True
if not all and dev and not deployment_schema.name.startswith("dev-"):
continue
deployments.append(deployment_schema)
# Handle pagination
total = len(deployments)
start_idx = start_val if start_val is not None else 0
if count_val is not None:
deployments = deployments[start_idx : start_idx + count_val]
else:
deployments = deployments[start_idx:]
return {
"start": start_idx,
"count": len(deployments),
"total": total,
"items": deployments,
}
except HTTPException as e:
raise e
except Exception as e:
print("Error listing deployments:")
print(e)
raise HTTPException(status_code=500, detail=str(e))
@router.put("/{name}", response_model=DeploymentFullSchema)
def update_deployment(name: str, deployment: UpdateDeploymentSchema):
"""
Update an existing deployment.
Args:
name: The name of the deployment to update (path param)
deployment: The new deployment configuration (body)
Returns:
updated deployment details
"""
try:
ownership = {"organization_id": "default-org", "user_id": "default-user"}
kube_namespace = get_namespace()
existing_deployment = get_deployment(name)
if existing_deployment.dynamo != deployment.dynamo:
raise HTTPException(
status_code=422,
detail="Cannot update the Dynamo components of a deployment.",
)
deployment_name = sanitize_deployment_name(name, deployment.dynamo)
updated_crd = update_dynamo_deployment(
name=deployment_name,
namespace=kube_namespace,
dynamo_nim=deployment.dynamo,
labels={
"ngc-organization": ownership["organization_id"],
"ngc-user": ownership["user_id"],
},
envs=deployment.envs,
)
resource = ResourceSchema(
uid=updated_crd["metadata"]["uid"],
name=updated_crd["metadata"]["name"],
created_at=updated_crd["metadata"].get(
"creationTimestamp", datetime.utcnow()
),
updated_at=datetime.utcnow(),
resource_type="deployment",
labels=[],
)
creator = create_default_user()
cluster = create_default_cluster(creator)
deployment_schema = DeploymentFullSchema(
**resource.dict(),
status=get_deployment_status(updated_crd),
kube_namespace=kube_namespace,
creator=creator,
cluster=cluster,
latest_revision=build_latest_revision_from_cr(updated_crd),
manifest=None,
urls=get_urls(updated_crd),
)
return deployment_schema
except Exception as e:
print("Error updating deployment:")
print(e)
raise HTTPException(status_code=500, detail=str(e))
This diff is collapsed.
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from fastapi import APIRouter
router = APIRouter()
@router.get("/healthz")
@router.get("/readyz")
async def health_check():
"""Health check endpoint.
Returns:
dict: Status information
"""
return {"status": "healthy"}
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from functools import wraps
from typing import Any, Dict, List, Optional
from fastapi import HTTPException
from kubernetes import client, config
class K8sResource:
def __init__(self, group: str, version: str, plural: str):
self.group = group
self.version = version
self.plural = plural
DynamoGraphDeployment = K8sResource(
group="nvidia.com",
version="v1alpha1",
plural="dynamographdeployments",
)
def ensure_kube_config(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
config.load_incluster_config()
except config.config_exception.ConfigException:
config.load_kube_config()
return func(*args, **kwargs)
return wrapper
@ensure_kube_config
def create_custom_resource(
group: str, version: str, namespace: str, plural: str, body: Dict[str, Any]
) -> Dict[str, Any]:
"""
Create a custom resource in Kubernetes.
Args:
group: API group
version: API version
namespace: Target namespace
plural: Resource plural name
body: Resource definition
Returns:
Created resource
"""
api = client.CustomObjectsApi()
return api.create_namespaced_custom_object(
group=group, version=version, namespace=namespace, plural=plural, body=body
)
def create_dynamo_deployment(
name: str,
namespace: str,
dynamo_component: str,
labels: Dict[str, str],
envs: Optional[List[Dict[str, str]]] = None,
) -> Dict[str, Any]:
"""
Create a DynamoGraphDeployment custom resource.
Args:
name: Deployment name
namespace: Target namespace
dynamo_component: Dynamo artifact name and version (format: name:version)
labels: Resource labels
envs: Optional list of environment variables
Returns:
Created deployment
"""
body = {
"apiVersion": "nvidia.com/v1alpha1",
"kind": "DynamoGraphDeployment",
"metadata": {"name": name, "namespace": namespace, "labels": labels},
"spec": {
"dynamoGraph": dynamo_component,
"services": {},
"envs": envs if envs else [],
},
}
return create_custom_resource(
group=DynamoGraphDeployment.group,
version=DynamoGraphDeployment.version,
namespace=namespace,
plural=DynamoGraphDeployment.plural,
body=body,
)
@ensure_kube_config
def get_dynamo_deployment(name: str, namespace: str) -> Dict[str, Any]:
"""
Get a DynamoGraphDeployment custom resource.
Args:
name: Deployment name
namespace: Target namespace
Returns:
Deployment
Raises:
HTTPException: If the deployment is not found or an error occurs
"""
api = client.CustomObjectsApi()
try:
return api.get_namespaced_custom_object(
group=DynamoGraphDeployment.group,
version=DynamoGraphDeployment.version,
namespace=namespace,
plural=DynamoGraphDeployment.plural,
name=name,
)
except client.rest.ApiException as e:
if e.status == 404:
raise HTTPException(status_code=404, detail="Deployment not found")
else:
raise HTTPException(status_code=500, detail=str(e))
def get_namespace() -> str:
"""
Get the namespace from the environment variable.
"""
return os.getenv("DEFAULT_KUBE_NAMESPACE", "dynamo")
@ensure_kube_config
def delete_dynamo_deployment(name: str, namespace: str) -> Dict[str, Any]:
"""
Delete a DynamoGraphDeployment custom resource.
"""
api = client.CustomObjectsApi()
try:
return api.delete_namespaced_custom_object(
group=DynamoGraphDeployment.group,
version=DynamoGraphDeployment.version,
namespace=namespace,
plural=DynamoGraphDeployment.plural,
name=name,
)
except client.rest.ApiException as e:
if e.status == 404:
raise HTTPException(status_code=404, detail="Deployment not found")
else:
raise HTTPException(status_code=500, detail=str(e))
@ensure_kube_config
def list_dynamo_deployments(
namespace: str,
label_selector: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""
List DynamoGraphDeployment custom resources.
Args:
namespace: Target namespace
label_selector: Optional label selector for filtering
Returns:
List of deployments
Raises:
HTTPException: If an error occurs during listing
"""
api = client.CustomObjectsApi()
try:
response = api.list_namespaced_custom_object(
group=DynamoGraphDeployment.group,
version=DynamoGraphDeployment.version,
namespace=namespace,
plural=DynamoGraphDeployment.plural,
label_selector=label_selector,
)
return response["items"]
except client.rest.ApiException as e:
raise HTTPException(status_code=500, detail=str(e))
@ensure_kube_config
def update_dynamo_deployment(
name: str,
namespace: str,
dynamo_nim: str,
labels: Dict[str, str],
envs: Optional[List[Dict[str, str]]] = None,
) -> Dict[str, Any]:
"""
Update a DynamoGraphDeployment custom resource.
Args:
name: Deployment name
namespace: Target namespace
dynamo_nim: Dynamo artifact name and version (format: name:version)
labels: Resource labels
envs: Optional list of environment variables
Returns:
Updated deployment
"""
# Fetch the current resource to get resourceVersion
current = get_dynamo_deployment(name, namespace)
resource_version = current["metadata"].get("resourceVersion")
if not resource_version:
raise RuntimeError("resourceVersion not found in current resource")
body = {
"apiVersion": "nvidia.com/v1alpha1",
"kind": "DynamoGraphDeployment",
"metadata": {
"name": name,
"namespace": namespace,
"labels": labels,
"resourceVersion": resource_version, # Required for update
},
"spec": {
"dynamoGraph": dynamo_nim,
"services": {},
"envs": envs if envs else [],
},
}
api = client.CustomObjectsApi()
try:
return api.replace_namespaced_custom_object(
group=DynamoGraphDeployment.group,
version=DynamoGraphDeployment.version,
namespace=namespace,
plural=DynamoGraphDeployment.plural,
name=name,
body=body,
)
except client.rest.ApiException as e:
if e.status == 404:
raise HTTPException(status_code=404, detail="Deployment not found")
else:
raise HTTPException(status_code=500, detail=str(e))
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from datetime import datetime, timezone
from typing import Optional
import base58
from sqlalchemy import Column, DateTime
from sqlmodel import Field as SQLField
from sqlmodel import UniqueConstraint
from .components import DynamoComponentBase, DynamoComponentVersionBase
"""
This file stores all of the models/tables stored in the SQL database.
This is needed because otherwise we get an error like so:
raise exc.InvalidRequestError(sqlalchemy.exc.InvalidRequestError:
When initializing mapper Mapper[Checkpoint(checkpoint)],
expression "relationship("Optional['Model']")" seems to be using a generic class as the
argument to relationship(); please state the generic argument using an annotation, e.g.
"parent_model: Mapped[Optional['Model']] = relationship()"
"""
def get_random_id(prefix: str) -> str:
u = uuid.uuid4()
return f"{prefix}-{base58.b58encode(u.bytes).decode('ascii')}"
def new_compound_entity_id() -> str:
return get_random_id("compound")
# Define a function to create timezone-naive datetime objects
def utc_now_naive() -> datetime:
"""Return current UTC time without timezone info for database compatibility"""
now = datetime.now(timezone.utc)
return now.replace(tzinfo=None)
# Utility function to strip timezone info from datetime objects
def make_naive(dt: Optional[datetime]) -> Optional[datetime]:
"""Convert a datetime to naive (no timezone) if it has timezone info"""
if dt is None:
return None
if dt.tzinfo is not None:
return dt.replace(tzinfo=None)
return dt
# Utility function to add UTC timezone to naive datetime objects
def make_aware(dt: Optional[datetime]) -> Optional[datetime]:
"""Add UTC timezone to naive datetime objects"""
if dt is None:
return None
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt
class DynamoComponentVersion(DynamoComponentVersionBase, table=True):
"""A row in the dynamo component table."""
__tablename__ = "dynamocomponentversion"
__table_args__ = (
UniqueConstraint(
"dynamo_component_id", "version", name="version_unique_per_component"
),
)
id: str = SQLField(default_factory=new_compound_entity_id, primary_key=True)
# Override the datetime fields to explicitly use timezone-naive datetimes
# created_at: datetime = SQLField(
# sa_column=Column(DateTime, nullable=False, default=utc_now_naive)
# )
# updated_at: datetime = SQLField(
# sa_column=Column(
# DateTime, nullable=False, default=utc_now_naive, onupdate=utc_now_naive
# )
# )
# upload_started_at: datetime = SQLField(sa_column=Column(DateTime, nullable=True))
# upload_finished_at: datetime = SQLField(sa_column=Column(DateTime, nullable=True))
build_at: datetime = SQLField(sa_column=Column(DateTime, nullable=False))
dynamo_component_id: str = SQLField(foreign_key="dynamocomponent.id")
class DynamoComponent(DynamoComponentBase, table=True):
"""A row in the dynamo component table."""
__tablename__ = "dynamocomponent"
id: str = SQLField(default_factory=new_compound_entity_id, primary_key=True)
# Override the datetime fields to explicitly use timezone-naive datetimes
# created_at: datetime = SQLField(
# sa_column=Column(DateTime, nullable=False, default=utc_now_naive)
# )
# updated_at: datetime = SQLField(
# sa_column=Column(
# DateTime, nullable=False, default=utc_now_naive, onupdate=utc_now_naive
# )
# )
# deleted_at: datetime = SQLField(sa_column=Column(DateTime, nullable=True))
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from typing import Any, AsyncGenerator
import boto3
from botocore.exceptions import ClientError
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlmodel import SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession
logger = logging.getLogger(__name__)
### SQL database
DB_URL_PARTS = ["DB_USER", "DB_PASSWORD", "DB_HOST", "DB_NAME"]
POSTGRES_DB_URL_FORMAT = (
"postgresql+asyncpg://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)
def get_db_url_from_env():
database_url = os.getenv("DATABASE_URL", None)
if database_url:
return database_url
db_creds = {key: os.getenv(key) for key in DB_URL_PARTS}
db_creds["DB_PORT"] = os.getenv("DB_PORT", "5432")
if all(list(db_creds.values())):
# we can construct db url from parts
return POSTGRES_DB_URL_FORMAT.format(**db_creds)
return None
database_url = get_db_url_from_env()
connect_args = {}
if not database_url: # default to sqlite in-memory
sqlite_file_name = "database.db"
database_url = f"sqlite+aiosqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
logger.warning(
"WARNING: Using SQLite in-memory database, no data persistence"
) # noqa: T201
os.environ["API_DATABASE_URL"] = database_url
engine = create_async_engine(
url=database_url, echo=True, pool_pre_ping=True, connect_args=connect_args
)
async def get_session() -> AsyncGenerator[AsyncSession, Any]:
async_session = async_sessionmaker(
bind=engine, class_=AsyncSession, expire_on_commit=False
)
async with async_session() as session:
yield session
async def create_db_and_tables_async():
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
### S3 storage
DYN_OBJECT_STORE_BUCKET = os.getenv("DYN_OBJECT_STORE_BUCKET", "dynamo-storage").lower()
def get_s3_client():
s3_key = os.getenv("DYN_OBJECT_STORE_ID")
s3_secret = os.getenv("DYN_OBJECT_STORE_KEY")
s3_url = os.getenv("DYN_OBJECT_STORE_ENDPOINT")
if not s3_url:
raise ValueError("DYN_OBJECT_STORE_ENDPOINT is required for S3 connection")
if not s3_key:
raise ValueError("DYN_OBJECT_STORE_ID is required for S3 authentication")
if not s3_secret:
raise ValueError("DYN_OBJECT_STORE_KEY is required for S3 authentication")
return boto3.client(
"s3",
aws_access_key_id=s3_key,
aws_secret_access_key=s3_secret,
endpoint_url=s3_url,
)
class S3Storage:
def __init__(self):
self.s3_client = get_s3_client()
self.bucket_name = DYN_OBJECT_STORE_BUCKET.replace("_", "-").lower()
self.ensure_bucket_exists()
def ensure_bucket_exists(self):
try:
self.s3_client.head_bucket(Bucket=self.bucket_name)
except ClientError as e:
if e.response["Error"]["Code"] == "404":
# Bucket doesn't exist, create it
try:
self.s3_client.create_bucket(Bucket=self.bucket_name)
except ClientError as create_error:
logger.error(
f"Failed to create bucket {self.bucket_name}: {create_error}"
)
raise
else:
logger.error(f"Error checking bucket {self.bucket_name}: {e}")
raise
def upload_file(self, file_data, object_name):
try:
self.s3_client.put_object(
Bucket=self.bucket_name, Key=object_name, Body=file_data
)
except ClientError as e:
logger.error(f"Error uploading file to S3: {e}")
raise
def download_file(self, object_name):
try:
response = self.s3_client.get_object(
Bucket=self.bucket_name, Key=object_name
)
return response["Body"].read()
except ClientError as e:
logger.error(f"Error downloading file from S3: {e}")
raise
S3_STORAGE_INSTANCE: S3Storage | None = None
def get_s3_storage() -> S3Storage:
global S3_STORAGE_INSTANCE
if S3_STORAGE_INSTANCE is None:
S3_STORAGE_INSTANCE = S3Storage()
assert isinstance(S3_STORAGE_INSTANCE, S3Storage)
return S3_STORAGE_INSTANCE
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from typing import Any, Dict, List
def get_deployment_status(resource: Dict[str, Any]) -> str:
"""
Get the current status of a deployment.
Maps operator status to Dynamo status values.
Returns lowercase status values matching Dynamo's DeploymentStatus enum.
"""
status = resource.get("status", {})
conditions = status.get("conditions", [])
state = status.get("state", "")
# First check Ready condition
for condition in conditions:
if condition.get("type") == "Ready":
if condition.get("status") == "True":
# If state is "successful", map to "running"
if state == "successful":
return "running"
return condition.get("message", "running").lower()
elif condition.get("message"):
return condition.get("message").lower()
# If no Ready condition or not True, check state
if state == "failed":
return "failed"
elif state == "pending":
return "deploying" # map pending to deploying to match Dynamo states
# Default fallback
return "unknown"
def get_urls(resource: Dict[str, Any]) -> List[str]:
"""
Get the URLs for a deployment.
Returns URLs as soon as they are available from EndpointExposed condition.
"""
urls = []
conditions = resource.get("status", {}).get("conditions", [])
# Check for EndpointExposed condition
for condition in conditions:
if (
condition.get("type") == "EndpointExposed"
and condition.get("status") == "True"
):
if message := condition.get("message"):
urls.append(message)
return urls
def build_latest_revision_from_cr(cr: dict) -> dict:
spec = cr.get("spec", {})
meta = cr.get("metadata", {})
now = datetime.utcnow().isoformat() + "Z"
dynamo_str = spec.get("dynamoGraph", "unknown:unknown")
if ":" in dynamo_str:
dynamo_name, dynamo_version = dynamo_str.split(":", 1)
else:
dynamo_name, dynamo_version = "unknown", "unknown"
# Dummy creator
creator = {"name": "system", "email": "", "first_name": "", "last_name": ""}
# Dummy repository
repository = {
"uid": "dummy-repo-uid",
"created_at": now,
"updated_at": now,
"deleted_at": None,
"name": dynamo_name,
"resource_type": "dynamo_repository",
"labels": [],
"description": "",
"latest_dynamo": None,
}
# Dummy dynamo
dynamo = {
"uid": "dummy-dynamo-uid",
"created_at": now,
"updated_at": now,
"deleted_at": None,
"name": dynamo_version,
"resource_type": "dynamo",
"labels": [],
"description": "",
"repository": repository,
"version": dynamo_version,
"image_build_status": "",
"upload_status": "",
"upload_finished_reason": "",
"presigned_upload_url": "",
"presigned_download_url": "",
}
# Target
target = {
"uid": "dummy-target-uid",
"created_at": now,
"updated_at": now,
"deleted_at": None,
"name": "default-target",
"resource_type": "deployment_target",
"labels": [],
"creator": creator,
"status": "running",
"config": {
"services": spec.get("services", {}),
"access_authorization": True,
"envs": spec.get("envs", []),
},
"dynamo": dynamo,
}
# Revision
return {
"uid": meta.get("uid", "dummy-uid"),
"created_at": meta.get("creationTimestamp", now),
"updated_at": meta.get("creationTimestamp", now),
"deleted_at": None,
"name": meta.get("name", "dummy-revision"),
"resource_type": "deployment_revision",
"labels": [],
"creator": creator,
"status": "running",
"targets": [target],
}
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import sys
import uvicorn
from fastapi import FastAPI
from .api.deployments import router as deployments_router
from .api.dynamo import router as dynamo_router # type: ignore
from .api.health_check import router as health_check_router
from .api.storage import create_db_and_tables_async
# Configure logging to write to stdout
def setup_logging():
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_format = logging.Formatter(
fmt="%(asctime)s %(levelname)s - %(module)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
console_handler.setFormatter(console_format)
logger = logging.getLogger("ai_dynamo_store")
logger.addHandler(console_handler)
return logger
logger = setup_logging()
async def initialize_database():
try:
await create_db_and_tables_async()
logger.info("Database initialized successfully")
except Exception as e:
logger.error(f"Error initializing database: {e}")
raise
async def run_app():
"""Create and configure the FastAPI application.
Returns:
FastAPI: The configured application instance
"""
app = FastAPI(
title="AI Dynamo Store",
description="AI Dynamo Store for managing Dynamo artifacts",
version="0.1.0",
)
app.include_router(health_check_router)
app.include_router(dynamo_router)
app.include_router(deployments_router)
port = int(os.getenv("SERVICE_PORT", "8000"))
await initialize_database()
# Start the FastAPI server
config = uvicorn.Config(app=app, host="0.0.0.0", port=port, log_level="info")
server = uvicorn.Server(config)
await server.serve()
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from .app import run_app
def main():
asyncio.run(run_app())
if __name__ == "__main__":
main()
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SPDX-FileCopyrightText: Copyright (c) 2022 Atalaya Tech. Inc
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class BaseSchema(BaseModel):
uid: str
created_at: datetime
updated_at: Optional[datetime] = None
deleted_at: Optional[datetime] = None
class ResourceSchema(BaseSchema):
name: str
resource_type: str
labels: List[Dict[str, str]]
class UserSchema(BaseModel):
name: str
email: str
first_name: str
last_name: str
class ClusterSchema(ResourceSchema):
description: str
organization_name: str
creator: UserSchema
is_first: bool = False
class DeploymentConfigSchema(BaseModel):
access_authorization: bool = False
envs: Optional[List[Dict[str, Any]]] = None
labels: Optional[List[Dict[str, str]]] = None
secrets: Optional[List[str]] = None
services: Dict[str, Dict] = Field(default_factory=dict)
class UpdateDeploymentSchema(DeploymentConfigSchema):
dynamo: Optional[str] = None
component: Optional[str] = None
class CreateDeploymentSchema(UpdateDeploymentSchema):
name: Optional[str] = None
dev: bool = False
class DeploymentSchema(ResourceSchema):
status: str
kube_namespace: str
creator: UserSchema
cluster: ClusterSchema
latest_revision: Optional[Dict] = None
manifest: Optional[Dict] = None
class DeploymentFullSchema(DeploymentSchema):
urls: List[str] = Field(default_factory=list)
class DeploymentListResponse(BaseModel):
start: int
count: int
total: int
items: List[DeploymentFullSchema]
def create_default_user() -> UserSchema:
"""Create a default user schema for testing/demo purposes."""
return UserSchema(
name="default-user",
email="default@example.com",
first_name="Default",
last_name="User",
)
def create_default_cluster(creator: UserSchema) -> ClusterSchema:
"""Create a default cluster schema for testing/demo purposes."""
return ClusterSchema(
uid="default-cluster",
name="default",
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
resource_type="cluster",
labels=[],
description="Default cluster",
organization_name="default-org",
creator=creator,
is_first=True,
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment