Commit 8621d914 authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

feat: dynamo deploy hello world example to k8s (#205)

parent 988378ab
......@@ -29,6 +29,86 @@ operator-src:
COPY ./deploy/dynamo/operator /artifacts/operator
SAVE ARTIFACT /artifacts/operator
# artifact-base:
# FROM python:3.12-slim-bookworm
# WORKDIR /artifacts
# dynamo-source-artifacts:
# FROM +artifact-base
# COPY . /artifacts
# SAVE ARTIFACT /artifacts
# Exposes the `uv` binary from the upstream astral-sh image as an Earthly
# artifact so other targets can COPY it via `+uv-source/uv`.
# NOTE(review): `latest` is a mutable tag, so builds are not reproducible;
# consider pinning a release tag or digest.
uv-source:
FROM ghcr.io/astral-sh/uv:latest
SAVE ARTIFACT /uv
# Base layer shared by the build targets: Ubuntu 24.04 with Python headers,
# pip/venv, UCX and curl, the `uv` binary, and a Python 3.12 virtualenv at
# /opt/dynamo/venv that is put first on PATH.
dynamo-base:
    FROM ubuntu:24.04
    # update + install happen in one layer, and the apt lists are removed in
    # that same layer so they do not persist in the image.
    RUN apt-get update && \
        DEBIAN_FRONTEND=noninteractive apt-get install -yq python3-dev python3-pip python3-venv libucx0 curl && \
        rm -rf /var/lib/apt/lists/*
    COPY +uv-source/uv /bin/uv
    ENV CARGO_BUILD_JOBS=16
    # Create the virtualenv and seed it with pip.
    RUN mkdir /opt/dynamo && \
        uv venv /opt/dynamo/venv --python 3.12 && \
        . /opt/dynamo/venv/bin/activate && \
        uv pip install pip
    ENV VIRTUAL_ENV=/opt/dynamo/venv
    ENV PATH="${VIRTUAL_ENV}/bin:${PATH}"
# Adds the native build toolchain and a pinned Rust toolchain (installed via a
# checksum-verified rustup-init) on top of +dynamo-base.
rust-base:
    FROM +dynamo-base
    # Rust build/dev dependencies.
    # `apt-get` is used instead of `apt` (whose CLI is not meant for scripts);
    # the package lists are refreshed and removed within this one layer.
    RUN apt-get update -y && \
        apt-get install --no-install-recommends -y \
            build-essential \
            cmake \
            libssl-dev \
            pkg-config \
            protobuf-compiler \
            wget && \
        rm -rf /var/lib/apt/lists/*
    ENV RUSTUP_HOME=/usr/local/rustup
    ENV CARGO_HOME=/usr/local/cargo
    ENV PATH=/usr/local/cargo/bin:$PATH
    ENV RUST_VERSION=1.85.0
    ENV RUSTARCH=x86_64-unknown-linux-gnu
    # The sha256 below belongs to the rustup-init 1.28.1 binary for the
    # architecture in RUSTARCH; changing either value requires a new checksum.
    RUN wget --tries=3 --waitretry=5 "https://static.rust-lang.org/rustup/archive/1.28.1/${RUSTARCH}/rustup-init" && \
        echo "a3339fb004c3d0bb9862ba0bce001861fe5cbde9c10d16591eb3f39ee6cd3e7f *rustup-init" | sha256sum -c - && \
        chmod +x rustup-init && \
        ./rustup-init -y --no-modify-path --profile minimal --default-toolchain ${RUST_VERSION} --default-host ${RUSTARCH} && \
        rm rustup-init && \
        chmod -R a+w $RUSTUP_HOME $CARGO_HOME
# Builds the workspace on top of +rust-base and pushes the resulting image as
# $CI_REGISTRY_IMAGE/$IMAGE:$CI_COMMIT_SHA.
dynamo-base-docker:
ARG IMAGE=dynamo-base-docker
ARG CI_REGISTRY_IMAGE=my-registry
ARG CI_COMMIT_SHA=latest
FROM +rust-base
WORKDIR /workspace
# The whole build context is copied, so any source change invalidates the
# layers below.
COPY . /workspace/
ENV CARGO_TARGET_DIR=/workspace/target
# Release build with the listed features, generate docs, then copy the five
# binaries into /usr/local/bin.
RUN cargo build --release --locked --features mistralrs,sglang,vllm,python && \
cargo doc --no-deps && \
cp target/release/dynamo-run /usr/local/bin && \
cp target/release/http /usr/local/bin && \
cp target/release/llmctl /usr/local/bin && \
cp target/release/metrics /usr/local/bin && \
cp target/release/mock_worker /usr/local/bin
# Build the Python wheel into /workspace/dist and install it into the active venv.
RUN uv build --wheel --out-dir /workspace/dist && \
uv pip install /workspace/dist/ai_dynamo*any.whl
SAVE IMAGE --push $CI_REGISTRY_IMAGE/$IMAGE:$CI_COMMIT_SHA
############### ALL TARGETS ##############################
all-test:
BUILD ./deploy/dynamo/operator+test
......@@ -39,6 +119,7 @@ all-docker:
ARG CI_COMMIT_SHA=latest
BUILD ./deploy/dynamo/operator+docker --CI_REGISTRY_IMAGE=$CI_REGISTRY_IMAGE --CI_COMMIT_SHA=$CI_COMMIT_SHA
BUILD ./deploy/dynamo/api-server+docker --CI_REGISTRY_IMAGE=$CI_REGISTRY_IMAGE --CI_COMMIT_SHA=$CI_COMMIT_SHA
BUILD ./deploy/dynamo/api-store+docker --CI_REGISTRY_IMAGE=$CI_REGISTRY_IMAGE --CI_COMMIT_SHA=$CI_COMMIT_SHA
all-lint:
BUILD ./deploy/dynamo/operator+lint
......
# Framework-less development image: Ubuntu 24.04 + Python 3.12 virtualenv +
# pinned Rust toolchain, with the workspace built and the ai_dynamo wheel
# installed.
FROM ubuntu:24.04 AS dev

# update + install in a single layer; apt lists are removed in the same layer
# so they do not persist in the image.
RUN apt-get update && \
    DEBIAN_FRONTEND=noninteractive apt-get install -yq python3-dev python3-pip python3-venv libucx0 && \
    rm -rf /var/lib/apt/lists/*

# NOTE(review): `latest` is a mutable tag; consider pinning a uv release.
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

# Create the virtualenv and seed it with pip.
RUN mkdir /opt/dynamo && \
    uv venv /opt/dynamo/venv --python 3.12 && \
    . /opt/dynamo/venv/bin/activate && \
    uv pip install pip

ENV VIRTUAL_ENV=/opt/dynamo/venv
ENV PATH="${VIRTUAL_ENV}/bin:${PATH}"

# Rust build/dev dependencies.
# `apt-get` replaces `apt`, whose command-line interface is not intended for
# scripted use.
RUN apt-get update -y && \
    apt-get install --no-install-recommends -y \
        build-essential \
        cmake \
        libssl-dev \
        pkg-config \
        protobuf-compiler \
        wget && \
    rm -rf /var/lib/apt/lists/*

ENV RUSTUP_HOME=/usr/local/rustup \
    CARGO_HOME=/usr/local/cargo \
    PATH=/usr/local/cargo/bin:$PATH \
    RUST_VERSION=1.85.0 \
    RUSTARCH=x86_64-unknown-linux-gnu

# The sha256 below belongs to the rustup-init 1.28.1 binary for RUSTARCH;
# changing either value requires a new checksum.
RUN wget --tries=3 --waitretry=5 "https://static.rust-lang.org/rustup/archive/1.28.1/${RUSTARCH}/rustup-init" && \
    echo "a3339fb004c3d0bb9862ba0bce001861fe5cbde9c10d16591eb3f39ee6cd3e7f *rustup-init" | sha256sum -c - && \
    chmod +x rustup-init && \
    ./rustup-init -y --no-modify-path --profile minimal --default-toolchain $RUST_VERSION --default-host ${RUSTARCH} && \
    rm rustup-init && \
    chmod -R a+w $RUSTUP_HOME $CARGO_HOME

WORKDIR /workspace

COPY . /workspace/

# Optional build-time override for cargo's parallelism; unset unless passed
# with --build-arg.
ARG CARGO_BUILD_JOBS

ENV CARGO_TARGET_DIR=/workspace/target

# Release build with the listed features, generate docs, then copy the five
# binaries into /usr/local/bin.
RUN cargo build --release --locked --features mistralrs,sglang,vllm,python && \
    cargo doc --no-deps && \
    cp target/release/dynamo-run /usr/local/bin && \
    cp target/release/http /usr/local/bin && \
    cp target/release/llmctl /usr/local/bin && \
    cp target/release/metrics /usr/local/bin && \
    cp target/release/mock_worker /usr/local/bin

# Build the Python wheel and install it into the virtualenv.
RUN uv build --wheel --out-dir /workspace/dist && \
    uv pip install /workspace/dist/ai_dynamo*any.whl
......@@ -278,6 +278,7 @@ RUN cargo build --release --locked --features mistralrs,sglang,vllm,python && \
cp target/release/mock_worker /usr/local/bin
COPY deploy/dynamo/sdk /workspace/deploy/dynamo/sdk
COPY deploy/dynamo/api-store /workspace/deploy/dynamo/api-store
# Build dynamo wheel
RUN source /opt/dynamo/venv/bin/activate && \
cd /workspace/lib/bindings/python && \
......@@ -285,7 +286,10 @@ RUN source /opt/dynamo/venv/bin/activate && \
uv pip install /workspace/dist/ai_dynamo_runtime*cp312*.whl && \
cd /workspace && \
uv build --wheel --out-dir /workspace/dist && \
uv pip install /workspace/dist/ai_dynamo*any.whl
uv pip install /workspace/dist/ai_dynamo*any.whl && \
cd /workspace/deploy/dynamo/api-store && \
uv build --wheel --out-dir /workspace/dist && \
uv pip install /workspace/dist/ai_dynamo_store*any.whl
# Package the bindings
RUN mkdir -p /opt/dynamo/bindings/wheels && \
......
......@@ -43,7 +43,7 @@ PYTHON_PACKAGE_VERSION=${current_tag:-$latest_tag.dev+$commit_id}
# dependencies are specified in the /container/deps folder and
# installed within framework specific sections of the Dockerfile.
declare -A FRAMEWORKS=(["VLLM"]=1 ["TENSORRTLLM"]=2)
declare -A FRAMEWORKS=(["VLLM"]=1 ["TENSORRTLLM"]=2 ["NONE"]=3)
DEFAULT_FRAMEWORK=VLLM
SOURCE_DIR=$(dirname "$(readlink -f "$0")")
......@@ -60,6 +60,9 @@ TENSORRTLLM_PIP_WHEEL_PATH=""
VLLM_BASE_IMAGE="nvcr.io/nvidia/cuda-dl-base"
VLLM_BASE_IMAGE_TAG="25.01-cuda12.8-devel-ubuntu24.04"
NONE_BASE_IMAGE="ubuntu"
NONE_BASE_IMAGE_TAG="24.04"
NIXL_COMMIT=f35faf8ba4e725f1724177d0772200481d1d3446
NIXL_REPO=ai-dynamo/nixl.git
......@@ -283,6 +286,8 @@ if [[ $FRAMEWORK == "VLLM" ]]; then
DOCKERFILE=${SOURCE_DIR}/Dockerfile.vllm
elif [[ $FRAMEWORK == "TENSORRTLLM" ]]; then
DOCKERFILE=${SOURCE_DIR}/Dockerfile.tensorrt_llm
elif [[ $FRAMEWORK == "NONE" ]]; then
DOCKERFILE=${SOURCE_DIR}/Dockerfile.none
fi
if [[ $FRAMEWORK == "VLLM" ]]; then
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
.venv/
.env
*.egg-info
\ No newline at end of file
# Local development env
DB_USER="postgres"
DB_PASSWORD="pgadmin"
DB_HOST="localhost"
DB_PORT=5432
DB_NAME="postgres"
API_DATABASE_PORT="8001"
API_BACKEND_URL="http://localhost:8001"
DEFAULT_KUBE_NAMESPACE="dynamo"
DYN_OBJECT_STORE_ENDPOINT="http://localhost:9000"
DYN_OBJECT_STORE_KEY="dynamo-minio"
DYN_OBJECT_STORE_ID="dynamo-minio"
DYN_OBJECT_STORE_REGION="local"
DYN_OBJECT_STORE_BUCKET="dynamo-storage"
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
VERSION 0.8
# Exposes the `uv` binary from the upstream astral-sh image as an artifact
# consumed through `+uv-source/uv`.
# NOTE(review): `latest` is a mutable tag; consider pinning a release tag or digest.
uv-source:
FROM ghcr.io/astral-sh/uv:latest
SAVE ARTIFACT /uv
# Python 3.12 base for the API store: installs curl, creates the project
# virtualenv at /app/.venv and syncs the locked third-party dependencies
# (without the project itself) so that layer is cached independently of the
# source code.
uv-base:
    FROM python:3.12-slim
    COPY +uv-source/uv /bin/uv
    RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/*
    # WORKDIR must be set before `uv venv`: the venv is created in the current
    # directory and PATH below expects it at /app/.venv.
    WORKDIR /app
    RUN uv venv
    ENV PATH="/app/.venv/bin:$PATH"
    # Multi-source COPY: the destination is a directory and ends with a slash.
    COPY uv.lock pyproject.toml README.md /app/
    RUN uv sync --frozen --no-install-project --no-dev --no-install-workspace --no-editable
# Builds the API store image on top of +uv-base and pushes it as
# $CI_REGISTRY_IMAGE/$IMAGE:$CI_COMMIT_SHA (my-registry/dynamo-api-store:latest
# with the defaults below).
docker:
ARG CI_REGISTRY_IMAGE=my-registry
ARG CI_COMMIT_SHA=latest
ARG IMAGE=dynamo-api-store
FROM +uv-base
# Copy project files
COPY ai_dynamo_store ai_dynamo_store
# Install the project itself into the environment.
RUN uv pip install .
# Exec-form entrypoint: the `ai-dynamo-store` command is the container's main process.
ENTRYPOINT ["ai-dynamo-store"]
SAVE IMAGE --push $CI_REGISTRY_IMAGE/$IMAGE:$CI_COMMIT_SHA
\ No newline at end of file
## Provision S3-compatible cloud object storage:
The Dynamo API Server requires an S3-compatible object store to store Dynamo NIMs.
## Provision PostgreSQL Database
The Dynamo API Server requires a PostgreSQL database to store data entity and version metadata.
## Contributing
### Initialize a new virtual environment with uv
uv venv
### Activate the virtual environment
source .venv/bin/activate
### Install service
uv pip install .
### Start the service
ai-dynamo-store
### (Optional) Development workflow
#### Install dev dependencies
uv pip install -e ".[dev]"
#### Run docker container locally
earthly +docker && docker run -it my-registry/dynamo-api-store:latest
\ No newline at end of file
# Local development env
DB_USER="postgres"
DB_PASSWORD="pgadmin"
DB_HOST="localhost"
DB_PORT=5432
DB_NAME="postgres"
API_DATABASE_PORT="8001"
API_BACKEND_URL="http://localhost:8001"
DEFAULT_KUBE_NAMESPACE="dynamo"
DYN_OBJECT_STORE_ENDPOINT="http://localhost:9000"
DYN_OBJECT_STORE_KEY="dynamo-minio"
DYN_OBJECT_STORE_ID="dynamo-minio"
DYN_OBJECT_STORE_REGION="local"
DYN_OBJECT_STORE_BUCKET="dynamo-storage"
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""AI Dynamo Store package."""
__version__ = "0.1.0"
from .app import run_app
__all__ = ["run_app"]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from collections import defaultdict
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from fastapi import Query
from pydantic import BaseModel, ValidationError, field_validator
from sqlalchemy import JSON, Column
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlmodel import Field as SQLField
from sqlmodel import SQLModel
def _naive_utc_now() -> datetime:
    """Return the current UTC time with the tzinfo stripped (naive datetime)."""
    return datetime.now(timezone.utc).replace(tzinfo=None)


class TimeCreatedUpdated(SQLModel):
    """Mixin contributing non-nullable ``created_at`` / ``updated_at`` fields.

    Both fields default to the current naive-UTC time when an instance is
    created; nothing in this class refreshes ``updated_at`` afterwards.
    """

    created_at: datetime = SQLField(default_factory=_naive_utc_now, nullable=False)
    updated_at: datetime = SQLField(default_factory=_naive_utc_now, nullable=False)
class DynamoNimUploadStatus(str, Enum):
    """Upload state of a Dynamo NIM version; members compare equal to their string value."""

    Pending = "pending"
    Uploading = "uploading"
    Success = "success"
    Failed = "failed"


class ImageBuildStatus(str, Enum):
    """Image-build state of a Dynamo NIM version; members compare equal to their string value."""

    Pending = "pending"
    Building = "building"
    Success = "success"
    Failed = "failed"


class TransmissionStrategy(str, Enum):
    """How artifact bytes are transferred; ``proxy`` is the only value defined here."""

    Proxy = "proxy"
"""
API Request Objects
"""
class CreateDynamoNimRequest(BaseModel):
    """Request body for creating a Dynamo NIM (repository)."""

    name: str
    description: str
    # Optional key -> value mapping.
    labels: Optional[Dict[str, str]] = None


class CreateDynamoNimVersionRequest(BaseModel):
    """Request body for creating a version under an existing Dynamo NIM."""

    description: str
    version: str
    manifest: DynamoNimVersionManifestSchema
    build_at: datetime
    # Optional list of mappings (note: a list here, a single mapping on
    # CreateDynamoNimRequest).
    labels: Optional[list[Dict[str, str]]] = None


class UpdateDynamoNimVersionRequest(BaseModel):
    """Request body for updating a version; ``manifest`` is required."""

    manifest: DynamoNimVersionManifestSchema
    labels: Optional[list[Dict[str, str]]] = None
class ListQuerySchema(BaseModel):
    """Query-string parameters shared by the list endpoints.

    Attributes (all populated from the query string):
        start: zero-based offset, >= 0 (default 0).
        count: page size, >= 0 (default 20).
        search: optional free-text search term.
        q: optional query string; see ``get_query_map`` for the ``key:value`` form.
        sort_asc: ascending order when true (default false).
    """

    start: int = Query(default=0, ge=0, alias="start")
    count: int = Query(default=20, ge=0, alias="count")
    search: Optional[str] = Query(None, alias="search")
    q: Optional[str] = Query(default="", alias="q")
    sort_asc: bool = Query(default=False)

    def get_query_map(self) -> Dict[str, Any]:
        """Parse ``q`` into a mapping of key -> list of values.

        ``q`` is split on whitespace; every piece of the form ``key:value`` is
        collected under ``key``. Pieces without a colon are ignored for now.

        Returns:
            An empty dict when ``q`` is empty, otherwise a ``defaultdict(list)``.
        """
        if not self.q:
            return {}

        query = defaultdict(list)
        for piece in self.q.split():
            if ":" not in piece:
                # Todo: add search keywords
                continue
            # Split on the first colon only, so values that themselves contain
            # ":" (e.g. "tag:repo:1.0") no longer raise ValueError on unpacking.
            k, v = piece.split(":", 1)
            query[k].append(v)
        return query
"""
API Schemas
"""
class ResourceType(str, Enum):
    """Kinds of resources exposed through the API; members compare equal to their string value."""

    Organization = "organization"
    Cluster = "cluster"
    DynamoNim = "dynamo_nim"
    DynamoNimVersion = "dynamo_nim_version"
    Deployment = "deployment"
    DeploymentRevision = "deployment_revision"
    TerminalRecord = "terminal_record"
    Label = "label"
class BaseSchema(BaseModel):
    """Identity and timestamp fields common to API resources."""

    uid: str
    created_at: datetime
    updated_at: datetime
    deleted_at: Optional[datetime] = None


class BaseListSchema(BaseModel):
    """Pagination envelope fields: total, start and count."""

    total: int
    start: int
    count: int


class ResourceSchema(BaseSchema):
    """A named, typed resource carrying a list of labels."""

    name: str
    resource_type: ResourceType
    labels: List[LabelItemSchema]


class LabelItemSchema(BaseModel):
    """A single key/value label."""

    key: str
    value: str


class OrganizationSchema(ResourceSchema):
    """Organization resource: a ResourceSchema plus a description."""

    description: str


class UserSchema(BaseModel):
    """User fields returned by the API."""

    name: str
    email: str
    first_name: str
    last_name: str
class DynamoNimVersionApiSchema(BaseModel):
    """Description of one API entry listed in a version manifest."""

    route: str
    doc: str
    input: str
    output: str


class DynamoNimVersionManifestSchema(BaseModel):
    """Manifest attached to a Dynamo NIM version."""

    service: str
    bentoml_version: str
    # API entries keyed by string.
    apis: Dict[str, DynamoNimVersionApiSchema]
    size_bytes: int
def _validate_manifest(v):
try:
# Validate that the 'manifest' matches the DynamoManifestSchema
return DynamoNimVersionManifestSchema.model_validate(v).model_dump()
except ValidationError as e:
raise ValueError(f"Invalid manifest schema: {e}")
class DynamoNimVersionSchema(ResourceSchema):
    """API representation of a single Dynamo NIM version."""

    bento_repository_uid: str
    version: str
    description: str
    image_build_status: ImageBuildStatus
    upload_status: str
    # upload_started_at: Optional[datetime]
    # upload_finished_at: Optional[datetime]
    upload_finished_reason: str
    presigned_upload_url: str = ""
    presigned_download_url: str = ""
    presigned_urls_deprecated: bool = False
    transmission_strategy: TransmissionStrategy
    upload_id: str = ""
    manifest: Optional[Union[DynamoNimVersionManifestSchema, Dict[str, Any]]]
    build_at: datetime

    @field_validator("manifest")
    def validate_manifest(cls, v):
        """Delegate manifest validation to the module-level ``_validate_manifest``."""
        return _validate_manifest(v)
class DynamoNimVersionFullSchema(DynamoNimVersionSchema):
    """A version together with its owning Dynamo NIM."""

    repository: DynamoNimSchema


class DynamoNimSchema(ResourceSchema):
    """API representation of a Dynamo NIM and its versions."""

    # Required fields (no defaults); both accept None.
    latest_bento: Optional[DynamoNimVersionSchema]
    latest_bentos: Optional[List[DynamoNimVersionSchema]]
    n_bentos: int
    description: str


class DynamoNimSchemaWithDeploymentsSchema(DynamoNimSchema):
    """DynamoNimSchema extended with a list of deployments."""

    deployments: List[str] = []  # mocked for now


class DynamoNimSchemaWithDeploymentsListSchema(BaseListSchema):
    """Paginated list of Dynamo NIMs with deployments."""

    items: List[DynamoNimSchemaWithDeploymentsSchema]


class DynamoNimVersionsWithNimListSchema(BaseListSchema):
    """Paginated list of versions, each carrying its Dynamo NIM."""

    items: List[DynamoNimVersionWithNimSchema]


class DynamoNimVersionWithNimSchema(DynamoNimVersionSchema):
    """A version together with its owning Dynamo NIM (list-item form)."""

    repository: DynamoNimSchema
"""
DB Models
"""
class BaseDynamoNimModel(TimeCreatedUpdated, AsyncAttrs):
    """Common base for the DB models: timestamps plus a nullable ``deleted_at``."""

    deleted_at: Optional[datetime] = SQLField(nullable=True, default=None)


class DynamoNimVersionBase(BaseDynamoNimModel):
    """Fields of a stored Dynamo NIM version."""

    version: str = SQLField(default=None)
    description: str = SQLField(default="")
    file_path: Optional[str] = SQLField(default=None)
    file_oid: Optional[str] = SQLField(default=None)  # Used for GIT Lfs access

    upload_status: DynamoNimUploadStatus = SQLField()
    image_build_status: ImageBuildStatus = SQLField()
    image_build_status_syncing_at: Optional[datetime] = SQLField(default=None)
    image_build_status_updated_at: Optional[datetime] = SQLField(default=None)

    upload_started_at: Optional[datetime] = SQLField(default=None)
    upload_finished_at: Optional[datetime] = SQLField(default=None)
    upload_finished_reason: str = SQLField(default="")

    # JSON-like field for the manifest
    manifest: Optional[Union[DynamoNimVersionManifestSchema, Dict[str, Any]]] = SQLField(
        default=None, sa_column=Column(JSON)
    )
    build_at: datetime = SQLField()

    @field_validator("manifest")
    def validate_manifest(cls, v):
        """Delegate manifest validation to the module-level ``_validate_manifest``."""
        return _validate_manifest(v)


class DynamoNimBase(BaseDynamoNimModel):
    """Fields of a stored Dynamo NIM; ``name`` is declared unique."""

    name: str = SQLField(default="", unique=True)
    description: str = SQLField(default="")
# type: ignore # Ignore all mypy errors in this file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from datetime import datetime
from typing import Annotated, List, Optional
from fastapi import APIRouter, Body, Depends, HTTPException, Request, responses
from pydantic import ValidationError
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlmodel import col, desc, func, select
from sqlmodel.ext.asyncio.session import AsyncSession
from .components import (
CreateDynamoNimRequest,
CreateDynamoNimVersionRequest,
DynamoNimSchema,
DynamoNimSchemaWithDeploymentsListSchema,
DynamoNimSchemaWithDeploymentsSchema,
DynamoNimUploadStatus,
DynamoNimVersionFullSchema,
DynamoNimVersionSchema,
DynamoNimVersionsWithNimListSchema,
DynamoNimVersionWithNimSchema,
ImageBuildStatus,
ListQuerySchema,
OrganizationSchema,
ResourceType,
TransmissionStrategy,
UpdateDynamoNimVersionRequest,
UserSchema,
)
from .model import DynamoNim, DynamoNimVersion, make_aware, utc_now_naive
from .storage import S3Storage, get_s3_storage, get_session
# Tag under which every route in this module is grouped.
API_TAG_MODELS = "dynamo"
DEFAULT_LIMIT = 3
# Maps sort-key names to DynamoNim columns.
# NOTE(review): the key "update_at" maps to DynamoNim.updated_at and looks like
# a typo for "updated_at" — confirm against callers before renaming.
SORTABLE_COLUMNS = {
"created_at": col(DynamoNim.created_at),
"update_at": col(DynamoNim.updated_at),
}
# All routes below are mounted under /api/v1.
router = APIRouter(prefix="/api/v1")
logger = logging.getLogger(__name__)
@router.get(
    "/auth/current",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
async def login(
    request: Request,
):
    """Return a fixed placeholder user; the incoming request is not inspected."""
    placeholder_user = UserSchema(
        name="dynamo",
        email="dynamo@nvidia.com",
        first_name="dynamo",
        last_name="ai",
    )
    return placeholder_user
@router.get(
    "/current_org",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
async def current_org(
    request: Request,
):
    """Return a fixed placeholder organization; the incoming request is not inspected."""
    # Creation and update share the same hard-coded timestamp.
    fixed_timestamp = datetime(2024, 9, 18, 12, 0, 0)
    return OrganizationSchema(
        uid="uid",
        created_at=fixed_timestamp,
        updated_at=fixed_timestamp,
        deleted_at=None,
        name="nvidia",
        resource_type=ResourceType.Organization,
        labels=[],
        description="Dynamo default organization.",
    )
# GetDynamoNim is a FastAPI dependency that will perform stored model lookup.
async def dynamo_nim_handler(
    *,
    session: AsyncSession = Depends(get_session),
    dynamo_nim_name: str,
) -> DynamoNim:
    """Look up a Dynamo NIM by name.

    Raises:
        HTTPException: 404 when no row has the given name.
    """
    lookup = select(DynamoNim).where(DynamoNim.name == dynamo_nim_name)
    match = (await session.exec(lookup)).first()
    if match:
        return match
    raise HTTPException(status_code=404, detail="Record not found")


GetDynamoNim = Depends(dynamo_nim_handler)
@router.get(
    "/bento_repositories/{dynamo_nim_name}",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
@router.get(
    "/dynamo_nims/{dynamo_nim_name}",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
async def get_dynamo_nim(
    *,
    dynamo_nim: DynamoNim = GetDynamoNim,
    session: AsyncSession = Depends(get_session),
):
    """Return one Dynamo NIM together with all of its versions, newest first."""
    versions_query = (
        select(DynamoNimVersion)
        .where(DynamoNimVersion.dynamo_nim_id == dynamo_nim.id)
        .order_by(desc(DynamoNimVersion.created_at))
    )
    version_rows = (await session.exec(versions_query)).all()

    version_schemas = await convert_dynamo_nim_version_model_to_schema(
        session, list(version_rows), dynamo_nim
    )
    # The query orders by created_at descending, so index 0 is the newest version.
    newest = version_schemas[0] if version_schemas else None

    return DynamoNimSchema(
        uid=dynamo_nim.id,
        created_at=dynamo_nim.created_at,
        updated_at=dynamo_nim.updated_at,
        deleted_at=dynamo_nim.deleted_at,
        name=dynamo_nim.name,
        resource_type=ResourceType.DynamoNim,
        labels=[],
        description=dynamo_nim.description,
        latest_bento=newest,
        latest_bentos=version_schemas,
        n_bentos=len(version_rows),
    )
@router.post(
    "/bento_repositories",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
@router.post(
    "/dynamo_nims",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
async def create_dynamo_nim(
    *,
    session: AsyncSession = Depends(get_session),
    request: CreateDynamoNimRequest,
):
    """
    Create a new repository

    Raises:
        HTTPException: 422 when the request fails model validation or a
            Dynamo NIM with the same name already exists; 500 on any other
            database error.
    """
    try:
        db_dynamo_nim = DynamoNim.model_validate(request)
    except ValidationError as e:
        raise HTTPException(status_code=422, detail=json.loads(e.json())) from e  # type: ignore

    logger.debug("Creating repository...")

    try:
        session.add(db_dynamo_nim)
        await session.flush()
        await session.refresh(db_dynamo_nim)
    except IntegrityError as e:
        logger.error(f"Details: {str(e)}")
        await session.rollback()
        logger.error(
            f"The requested Dynamo NIM {db_dynamo_nim.name} already exists in the database"
        )
        raise HTTPException(
            status_code=422,
            detail=f"The Dynamo NIM {db_dynamo_nim.name} already exists in the database",
        ) from e  # type: ignore
    except SQLAlchemyError as e:
        logger.error("Something went wrong with adding the repository")
        # Roll back here as well, so the session is not left in a failed
        # transaction (the IntegrityError branch already did this).
        await session.rollback()
        raise HTTPException(status_code=500, detail=str(e)) from e

    await session.commit()
    logger.debug(
        f"Dynamo NIM {db_dynamo_nim.id} with name {db_dynamo_nim.name} saved to database"
    )

    # A freshly created Dynamo NIM has no versions yet.
    return DynamoNimSchema(
        uid=db_dynamo_nim.id,
        created_at=db_dynamo_nim.created_at,
        updated_at=db_dynamo_nim.updated_at,
        deleted_at=db_dynamo_nim.deleted_at,
        name=db_dynamo_nim.name,
        resource_type=ResourceType.DynamoNim,
        labels=[],
        description=db_dynamo_nim.description,
        latest_bentos=None,
        latest_bento=None,
        n_bentos=0,
    )
@router.get(
    "/bento_repositories",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
@router.get(
    "/dynamo_nims",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
async def get_dynamo_nim_list(
    *,
    session: AsyncSession = Depends(get_session),
    query_params: ListQuerySchema = Depends(),
):
    """Return one page of Dynamo NIMs, optionally filtered by name.

    ``q`` is matched case-insensitively as a substring of the name; results
    are ordered by ``created_at`` (ascending when ``sort_asc`` is true) and
    paginated with ``start`` / ``count``.

    Raises:
        HTTPException: 422 when building the response schemas fails validation.
    """
    try:
        # Base query using SQLModel's select
        statement = select(DynamoNim)

        # Handle search query 'q'
        if query_params.q:
            statement = statement.where(DynamoNim.name.ilike(f"%{query_params.q}%"))

        # Count the filtered rows through an explicit subquery. session.exec()
        # on a single-column select yields the scalar values directly (the same
        # way the other handlers in this module consume it), so the count is
        # read with .one().
        total_statement = select(func.count()).select_from(statement.subquery())
        total = (await session.exec(total_statement)).one() or 0

        # Apply pagination and sorting
        if query_params.sort_asc is not None:
            statement = statement.order_by(
                DynamoNim.created_at.asc()
                if query_params.sort_asc
                else DynamoNim.created_at.desc()
            )
        statement = statement.offset(query_params.start).limit(query_params.count)

        # Execute main query; exec() already returns model instances.
        dynamo_nims = list((await session.exec(statement)).all())

        dynamo_nim_schemas = await convert_dynamo_nim_model_to_schema(
            session, dynamo_nims
        )
        dynamo_nims_with_deployments = [
            DynamoNimSchemaWithDeploymentsSchema(
                **dynamo_nim_schema.model_dump(), deployments=[]
            )
            for dynamo_nim_schema in dynamo_nim_schemas
        ]

        return DynamoNimSchemaWithDeploymentsListSchema(
            total=total,
            start=query_params.start,
            count=query_params.count,
            items=dynamo_nims_with_deployments,
        )
    except ValidationError as e:
        raise HTTPException(status_code=422, detail=json.loads(e.json())) from e
async def dynamo_nim_version_handler(
    *,
    session: AsyncSession = Depends(get_session),
    dynamo_nim_name: str,
    version: str,
) -> tuple[DynamoNimVersion, DynamoNim]:
    """Resolve a (version, Dynamo NIM) pair from the NIM name and version string.

    Raises:
        HTTPException: 404 when nothing matches; 422 when more than one pair matches.
    """
    pair_query = select(DynamoNimVersion, DynamoNim).where(
        DynamoNimVersion.dynamo_nim_id == DynamoNim.id,
        DynamoNimVersion.version == version,
        DynamoNim.name == dynamo_nim_name,
    )
    matches = (await session.exec(pair_query)).all()

    if not matches:
        logger.error("No Dynamo NIM version record found")
        raise HTTPException(status_code=404, detail="Record not found")

    if len(matches) > 1:
        logger.error("Found multiple relations for Dynamo NIM version")
        raise HTTPException(
            status_code=422, detail="Found multiple relations for Dynamo NIM version"
        )

    return matches[0]


GetDynamoNimVersion = Depends(dynamo_nim_version_handler)
@router.get(
    "/bento_repositories/{dynamo_nim_name}/bentos/{version}",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
@router.get(
    "/dynamo_nims/{dynamo_nim_name}/versions/{version}",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
async def get_dynamo_nim_version(
    *,
    dynamo_nim_entities: tuple[DynamoNimVersion, DynamoNim] = GetDynamoNimVersion,
    session: AsyncSession = Depends(get_session),
):
    """Return one version together with its owning Dynamo NIM."""
    version_row, nim_row = dynamo_nim_entities

    version_schemas = await convert_dynamo_nim_version_model_to_schema(
        session, [version_row], nim_row
    )
    nim_schemas = await convert_dynamo_nim_model_to_schema(session, [nim_row])

    # Each converter was given exactly one model, so index 0 is its result.
    return DynamoNimVersionFullSchema(
        **version_schemas[0].model_dump(),
        repository=nim_schemas[0],
    )
@router.post(
    "/bento_repositories/{dynamo_nim_name}/bentos",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
@router.post(
    "/dynamo_nims/{dynamo_nim_name}/versions",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
async def create_dynamo_nim_version(
    request: CreateDynamoNimVersionRequest,
    dynamo_nim: DynamoNim = GetDynamoNim,
    session: AsyncSession = Depends(get_session),
):
    """
    Create a new Dynamo NIM version

    The new row starts with ``upload_status`` and ``image_build_status`` set to
    Pending.

    Raises:
        HTTPException: 422 when the request cannot be turned into a valid
            model or the version already exists; 500 on any other database
            error.
    """
    # Routed through the module logger instead of print().
    logger.debug("Create Dynamo NIM version request: %s", request)

    try:
        # Create without validation
        db_dynamo_nim_version = DynamoNimVersion(
            **request.model_dump(),
            dynamo_nim_id=dynamo_nim.id,
            upload_status=DynamoNimUploadStatus.Pending,
            image_build_status=ImageBuildStatus.Pending,
        )
        DynamoNimVersion.model_validate(db_dynamo_nim_version)
        tag = f"{dynamo_nim.name}:{db_dynamo_nim_version.version}"
    except ValidationError as e:
        raise HTTPException(status_code=422, detail=json.loads(e.json())) from e  # type: ignore
    except Exception as e:
        # Generic exceptions have no .json(); report their text instead.
        # Exception (not BaseException) so cancellation and interpreter-exit
        # signals are not converted into a 422.
        raise HTTPException(status_code=422, detail=str(e)) from e

    try:
        session.add(db_dynamo_nim_version)
        await session.flush()
        await session.refresh(db_dynamo_nim_version)
    except IntegrityError as e:
        logger.error(f"Details: {str(e)}")
        await session.rollback()
        logger.error(f"The Dynamo NIM {tag} already exists")
        raise HTTPException(
            status_code=422,
            detail=f"The Dynamo NIM version {tag} already exists",
        ) from e  # type: ignore
    except SQLAlchemyError as e:
        logger.error("Something went wrong with adding the Dynamo NIM")
        # Roll back here as well, so the session is not left in a failed
        # transaction (the IntegrityError branch already did this).
        await session.rollback()
        raise HTTPException(status_code=500, detail=str(e)) from e

    logger.debug(
        f"Commiting {dynamo_nim.name}:{db_dynamo_nim_version.version} to database"
    )
    await session.commit()

    schema = await convert_dynamo_nim_version_model_to_schema(
        session, [db_dynamo_nim_version]
    )
    return schema[0]
@router.get(
    "/bento_repositories/{dynamo_nim_name}/bentos",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
@router.get(
    "/dynamo_nims/{dynamo_nim_name}/versions",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
async def get_dynamo_nim_versions(
    *,
    dynamo_nim: DynamoNim = GetDynamoNim,
    session: AsyncSession = Depends(get_session),
    query_params: ListQuerySchema = Depends(),
):
    """Return one page of the versions of a Dynamo NIM, newest first.

    ``total`` is the number of versions of the Dynamo NIM; ``items`` holds at
    most ``count`` versions starting at offset ``start``.
    """
    dynamo_nim_schemas = await convert_dynamo_nim_model_to_schema(session, [dynamo_nim])
    dynamo_nim_schema = dynamo_nim_schemas[0]

    versions_statement = (
        select(DynamoNimVersion)
        .where(
            DynamoNimVersion.dynamo_nim_id == dynamo_nim.id,
        )
        .order_by(desc(DynamoNimVersion.created_at))
    )
    result = await session.exec(versions_statement)
    all_versions = list(result.all())
    total = len(all_versions)

    # Page from the rows already fetched for the total: this applies the
    # requested `start` offset (previously echoed back but ignored) and avoids
    # running the same query a second time.
    page_start = query_params.start
    dynamo_nim_versions = all_versions[page_start : page_start + query_params.count]

    dynamo_nim_version_schemas = await convert_dynamo_nim_version_model_to_schema(
        session, dynamo_nim_versions, dynamo_nim
    )

    items = [
        DynamoNimVersionWithNimSchema(
            **version.model_dump(), repository=dynamo_nim_schema
        )
        for version in dynamo_nim_version_schemas
    ]

    return DynamoNimVersionsWithNimListSchema(
        total=total, count=query_params.count, start=query_params.start, items=items
    )
@router.patch(
    "/bento_repositories/{dynamo_nim_name}/bentos/{version}",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
@router.patch(
    "/dynamo_nims/{dynamo_nim_name}/versions/{version}",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
async def update_dynamo_nim_version(
    *,
    dynamo_nim_entities: tuple[DynamoNimVersion, DynamoNim] = GetDynamoNimVersion,
    request: UpdateDynamoNimVersionRequest,
    session: AsyncSession = Depends(get_session),
):
    """Replace the stored manifest of a version with the one in the request.

    Only ``request.manifest`` is applied; ``request.labels`` is not read here.

    Raises:
        HTTPException: 500 when the database write fails.
    """
    version_row = dynamo_nim_entities[0]
    version_row.manifest = request.manifest.model_dump()

    try:
        session.add(version_row)
        await session.flush()
        await session.refresh(version_row)
    except SQLAlchemyError as e:
        logger.error("Something went wrong with adding the Dynamo NIM")
        raise HTTPException(status_code=500, detail=str(e))

    logger.debug("Updating Dynamo NIM")
    await session.commit()

    converted = await convert_dynamo_nim_version_model_to_schema(
        session, [version_row]
    )
    return converted[0]
@router.put(
    "/bento_repositories/{dynamo_nim_name}/bentos/{version}/upload",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
@router.put(
    "/dynamo_nims/{dynamo_nim_name}/versions/{version}/upload",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
async def upload_dynamo_nim_version(
    *,
    dynamo_nim_entities: tuple[DynamoNimVersion, DynamoNim] = GetDynamoNimVersion,
    file: Annotated[bytes, Body()],
    session: AsyncSession = Depends(get_session),
    s3_storage: S3Storage = Depends(get_s3_storage),
):
    """Store the request body in object storage and mark the version uploaded.

    The object key is ``<nim name>/<version>``. After the upload call returns,
    the version's ``upload_status`` is set to ``Success``, its
    ``upload_finished_at`` is set to the current naive UTC time, and the row
    is committed.

    Returns:
        ``{"message": "File uploaded successfully"}``.

    Raises:
        HTTPException: 500 for any exception raised while uploading or
            committing; the original error is only logged.
    """
    version_row, nim_row = dynamo_nim_entities
    storage_key = f"{nim_row.name}/{version_row.version}"

    try:
        s3_storage.upload_file(file, storage_key)
        version_row.upload_status = DynamoNimUploadStatus.Success
        # Stored as a naive UTC timestamp (see utc_now_naive).
        version_row.upload_finished_at = utc_now_naive()
        session.add(version_row)
        await session.commit()
    except Exception as upload_error:
        logger.error(f"Error uploading file: {upload_error}")
        raise HTTPException(status_code=500, detail="Failed to upload file")

    return {"message": "File uploaded successfully"}
def generate_file_path(version) -> str:
    """Return ``version`` rendered as text and prefixed with ``dynamo-``."""
    return "dynamo-{}".format(version)
@router.get(
    "/bento_repositories/{dynamo_nim_name}/bentos/{version}/download",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
@router.get(
    "/dynamo_nims/{dynamo_nim_name}/versions/{version}/download",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
async def download_dynamo_nim_version(
    *,
    dynamo_nim_entities: tuple[DynamoNimVersion, DynamoNim] = GetDynamoNimVersion,
    s3_storage: S3Storage = Depends(get_s3_storage),
):
    """Return the stored artifact of one Dynamo NIM version.

    The object is fetched under the key ``<nim name>/<version>`` and sent back
    as a single-chunk ``application/octet-stream`` streaming response.

    Raises:
        HTTPException: 500 for any exception raised while fetching the object
            or building the response; the original error is only logged.
    """
    version_row, nim_row = dynamo_nim_entities
    storage_key = f"{nim_row.name}/{version_row.version}"

    try:
        payload = s3_storage.download_file(storage_key)
        # The whole payload is already in memory; stream it as one chunk.
        chunks = iter([payload])
        return responses.StreamingResponse(
            chunks, media_type="application/octet-stream"
        )
    except Exception as download_error:
        logger.error(f"Error downloading file: {download_error}")
        raise HTTPException(status_code=500, detail="Failed to download file")
@router.patch(
    "/bento_repositories/{dynamo_nim_name}/bentos/{version}/start_upload",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
@router.patch(
    "/dynamo_nims/{dynamo_nim_name}/versions/{version}/start_upload",
    responses={
        200: {"description": "Successful Response"},
        422: {"description": "Validation Error"},
    },
    tags=[API_TAG_MODELS],
)
async def start_dynamo_nim_version_upload(
    *,
    dynamo_nim_entities: tuple[DynamoNimVersion, DynamoNim] = GetDynamoNimVersion,
    session: AsyncSession = Depends(get_session),
):
    """Flag one Dynamo NIM version as currently uploading.

    Sets ``upload_status`` to ``Uploading``, then flushes, refreshes and
    commits the row.

    Returns:
        The first schema produced by
        ``convert_dynamo_nim_version_model_to_schema`` for the updated row.

    Raises:
        HTTPException: 500 when flushing or refreshing raises
            ``SQLAlchemyError``.
    """
    version_row, _ = dynamo_nim_entities
    version_row.upload_status = DynamoNimUploadStatus.Uploading

    try:
        session.add(version_row)
        await session.flush()
        await session.refresh(version_row)
    except SQLAlchemyError as db_error:
        logger.error("Something went wrong with adding the Dynamo NIM")
        raise HTTPException(status_code=500, detail=str(db_error))

    logger.debug("Setting Dynamo NIM upload status to Uploading.")
    await session.commit()

    converted = await convert_dynamo_nim_version_model_to_schema(
        session, [version_row]
    )
    return converted[0]
@router.get("/api/v1/healthz")
async def health_check():
    """Answer health probes with a constant ``{"status": "ok"}`` payload."""
    status_payload = {"status": "ok"}
    return status_payload
"""
DB to Schema Converters
"""
async def convert_dynamo_nim_model_to_schema(
    session: AsyncSession, entities: List[DynamoNim]
) -> List[DynamoNimSchema]:
    """Turn ``DynamoNim`` rows into API schemas, including their newest versions.

    For every row, two queries are issued: a count of all versions belonging
    to the row, and a fetch of at most ``DEFAULT_LIMIT`` versions ordered
    newest-first by ``created_at``.

    Args:
        session: Database session used for the per-row version queries.
        entities: Rows to convert.

    Returns:
        One ``DynamoNimSchema`` per input row, in input order.

    Raises:
        HTTPException: 500 when a ``SQLAlchemyError`` is raised while
            processing a row.
    """
    schemas: List[DynamoNimSchema] = []

    for nim in entities:
        try:
            count_query = select(func.count(col(DynamoNimVersion.id))).where(
                DynamoNimVersion.dynamo_nim_id == nim.id
            )
            latest_query = (
                select(DynamoNimVersion)
                .where(
                    DynamoNimVersion.dynamo_nim_id == nim.id,
                )
                .order_by(desc(DynamoNimVersion.created_at))
                .limit(DEFAULT_LIMIT)
            )

            # A missing or zero count is reported as 0.
            version_count = (await session.exec(count_query)).first() or 0
            latest_rows = list((await session.exec(latest_query)).all())
            latest_schemas = await convert_dynamo_nim_version_model_to_schema(
                session, latest_rows, nim
            )
            newest_schema = latest_schemas[0] if latest_schemas else None

            # Stored timestamps are naive; attach UTC for API responses.
            schemas.append(
                DynamoNimSchema(
                    uid=nim.id,
                    created_at=make_aware(nim.created_at),
                    updated_at=make_aware(nim.updated_at),
                    deleted_at=(
                        make_aware(nim.deleted_at) if nim.deleted_at else None
                    ),
                    name=nim.name,
                    resource_type=ResourceType.DynamoNim,
                    labels=[],
                    latest_bento=newest_schema,
                    latest_bentos=latest_schemas,
                    n_bentos=version_count,
                    description=nim.description,
                )
            )
        except SQLAlchemyError as db_error:
            logger.error(
                "Something went wrong with getting associated Dynamo NIM versions"
            )
            raise HTTPException(status_code=500, detail=str(db_error))

    return schemas
async def convert_dynamo_nim_version_model_to_schema(
    session: AsyncSession,
    entities: List[DynamoNimVersion],
    dynamo_nim: Optional[DynamoNim] = None,
) -> List[DynamoNimVersionSchema]:
    """Turn ``DynamoNimVersion`` rows into API schemas.

    Args:
        session: Database session used to look up parent ``DynamoNim`` rows
            when ``dynamo_nim`` is not supplied.
        entities: Version rows to convert.
        dynamo_nim: Parent to use for every entity. When omitted, each
            entity's parent is resolved from its own ``dynamo_nim_id``.

    Returns:
        One ``DynamoNimVersionSchema`` per input row, in input order.

    Raises:
        HTTPException: 500 when an entity's parent cannot be found.
    """
    # Parents fetched from the database, keyed by id, so each is queried once.
    # The lookup result is kept in a per-entity local rather than being
    # written back to the ``dynamo_nim`` parameter: overwriting the parameter
    # made every later entity reuse the first entity's parent, even when it
    # belonged to a different DynamoNim.
    parents_by_id: dict = {}
    dynamo_nim_version_schemas = []

    for entity in entities:
        parent = dynamo_nim
        if not parent:
            parent_id = entity.dynamo_nim_id
            if parent_id not in parents_by_id:
                statement = select(DynamoNim).where(DynamoNim.id == parent_id)
                results = await session.exec(statement)
                parents_by_id[parent_id] = results.first()
            parent = parents_by_id[parent_id]

        if not parent:
            raise HTTPException(
                status_code=500, detail="Failed to find related Dynamo NIM"
            )  # Should never happen

        # NOTE(review): the timestamps below are the current time, not the
        # values stored on the row; the entity-based versions
        # (make_aware(entity.created_at) / updated_at / build_at) are disabled
        # in this file, as are upload_started_at / upload_finished_at and
        # entity.description. Confirm whether this is still intended.
        created_at = make_aware(utc_now_naive())
        updated_at = make_aware(utc_now_naive())
        build_at = make_aware(utc_now_naive())

        dynamo_nim_version_schemas.append(
            DynamoNimVersionSchema(
                description="",
                version=entity.version,
                image_build_status=entity.image_build_status,
                upload_status=str(entity.upload_status.value),
                upload_finished_reason=entity.upload_finished_reason,
                uid=entity.id,
                name=parent.name,
                created_at=created_at,
                resource_type=ResourceType.DynamoNimVersion,
                labels=[],
                manifest=entity.manifest,
                updated_at=updated_at,
                bento_repository_uid=parent.id,
                transmission_strategy=TransmissionStrategy.Proxy,
                build_at=build_at,
            )
        )

    return dynamo_nim_version_schemas
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from fastapi import APIRouter
router = APIRouter()
@router.get("/healthz")
@router.get("/readyz")
async def health_check():
    """Serve both the ``/healthz`` and ``/readyz`` probes.

    Returns:
        dict: Always ``{"status": "healthy"}``.
    """
    return dict(status="healthy")
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from datetime import datetime, timezone
from typing import Optional
import base58
from sqlalchemy import Column, DateTime
from sqlmodel import Field as SQLField
from sqlmodel import UniqueConstraint
from .components import DynamoNimBase, DynamoNimVersionBase
"""
This file stores all of the models/tables stored in the SQL database.
This is needed because otherwise we get an error like so:
raise exc.InvalidRequestError(sqlalchemy.exc.InvalidRequestError:
When initializing mapper Mapper[Checkpoint(checkpoint)],
expression "relationship("Optional['Model']")" seems to be using a generic class as the
argument to relationship(); please state the generic argument using an annotation, e.g.
"parent_model: Mapped[Optional['Model']] = relationship()"
"""
def get_random_id(prefix: str) -> str:
    """Return ``<prefix>-<token>``, where the token is a fresh UUID4 in base58."""
    token = base58.b58encode(uuid.uuid4().bytes).decode("ascii")
    return f"{prefix}-{token}"
def new_compound_entity_id() -> str:
    """Return a new random id carrying the ``compound`` prefix."""
    entity_id = get_random_id("compound")
    return entity_id
# Current UTC wall-clock time with the tzinfo removed
def utc_now_naive() -> datetime:
    """Return the current UTC time as a naive datetime (tzinfo stripped) for database compatibility"""
    return datetime.now(timezone.utc).replace(tzinfo=None)
# Utility function to turn aware datetime objects into naive UTC ones
def make_naive(dt: Optional[datetime]) -> Optional[datetime]:
    """Convert a datetime to a naive (no timezone) UTC datetime.

    Naive datetimes in this module represent UTC (see ``utc_now_naive`` and
    ``make_aware``), so an aware value is shifted to UTC before its tzinfo is
    dropped. Simply discarding the tzinfo would keep the local wall-clock
    reading and silently change the instant for any non-UTC input.

    Args:
        dt: Datetime to convert, or ``None``.

    Returns:
        ``None`` for ``None``; the same object when ``dt`` is already naive;
        otherwise the equivalent UTC time without tzinfo.
    """
    if dt is None:
        return None
    if dt.tzinfo is None:
        return dt
    if dt.utcoffset() is None:
        # A tzinfo that reports no offset gives nothing to convert from.
        return dt.replace(tzinfo=None)
    return dt.astimezone(timezone.utc).replace(tzinfo=None)
# Utility function that labels naive datetime objects as UTC
def make_aware(dt: Optional[datetime]) -> Optional[datetime]:
    """Attach the UTC timezone to a naive datetime.

    ``None`` and datetimes that already carry a tzinfo are returned unchanged.
    """
    if dt is not None and dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt
class DynamoNimVersion(DynamoNimVersionBase, table=True):
    """A row in the ``dynamonimversion`` table.

    A given ``version`` may appear only once per ``dynamo_nim_id``; this is
    enforced by the ``version_unique_per_nim`` constraint.
    """

    __tablename__ = "dynamonimversion"
    __table_args__ = (
        UniqueConstraint(
            "dynamo_nim_id",
            "version",
            name="version_unique_per_nim",
        ),
    )

    # Primary key; filled in by new_compound_entity_id when not supplied.
    id: str = SQLField(primary_key=True, default_factory=new_compound_entity_id)

    # Disabled overrides that would force timezone-naive datetime columns:
    # created_at: datetime = SQLField(
    #     sa_column=Column(DateTime, nullable=False, default=utc_now_naive)
    # )
    # updated_at: datetime = SQLField(
    #     sa_column=Column(
    #         DateTime, nullable=False, default=utc_now_naive, onupdate=utc_now_naive
    #     )
    # )
    # upload_started_at: datetime = SQLField(sa_column=Column(DateTime, nullable=True))
    # upload_finished_at: datetime = SQLField(sa_column=Column(DateTime, nullable=True))

    # Required DateTime column.
    build_at: datetime = SQLField(sa_column=Column(DateTime, nullable=False))

    # Foreign key to the owning row in the "dynamonim" table.
    dynamo_nim_id: str = SQLField(foreign_key="dynamonim.id")
class DynamoNim(DynamoNimBase, table=True):
    """A row in the ``dynamonim`` table."""

    __tablename__ = "dynamonim"

    # Primary key; filled in by new_compound_entity_id when not supplied.
    id: str = SQLField(primary_key=True, default_factory=new_compound_entity_id)

    # Disabled overrides that would force timezone-naive datetime columns:
    # created_at: datetime = SQLField(
    #     sa_column=Column(DateTime, nullable=False, default=utc_now_naive)
    # )
    # updated_at: datetime = SQLField(
    #     sa_column=Column(
    #         DateTime, nullable=False, default=utc_now_naive, onupdate=utc_now_naive
    #     )
    # )
    # deleted_at: datetime = SQLField(sa_column=Column(DateTime, nullable=True))
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from typing import Any, AsyncGenerator
import boto3
from botocore.exceptions import ClientError
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlmodel import SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession
logger = logging.getLogger(__name__)
### SQL database
# Environment variables that must all be set to build a Postgres URL.
DB_URL_PARTS = ["DB_USER", "DB_PASSWORD", "DB_HOST", "DB_NAME"]
POSTGRES_DB_URL_FORMAT = (
    "postgresql+asyncpg://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)


def get_db_url_from_env():
    """Work out the database URL from environment variables.

    ``DATABASE_URL`` wins when it is set and non-empty and is returned
    verbatim. Otherwise a Postgres URL is assembled from ``DB_USER``,
    ``DB_PASSWORD``, ``DB_HOST``, ``DB_NAME`` and ``DB_PORT`` (default
    ``5432``).

    ``DB_USER`` and ``DB_PASSWORD`` are expected as raw values; they are
    percent-encoded here so that characters such as ``@``, ``:`` or ``/`` in
    a password cannot corrupt the structure of the URL.

    Returns:
        The database URL, or ``None`` when ``DATABASE_URL`` is unset and any
        of the individual parts is missing or empty.
    """
    from urllib.parse import quote

    database_url = os.getenv("DATABASE_URL", None)
    if database_url:
        return database_url

    db_creds = {key: os.getenv(key) for key in DB_URL_PARTS}
    db_creds["DB_PORT"] = os.getenv("DB_PORT", "5432")
    if not all(db_creds.values()):
        return None

    # we can construct db url from parts
    for key in ("DB_USER", "DB_PASSWORD"):
        db_creds[key] = quote(db_creds[key], safe="")
    return POSTGRES_DB_URL_FORMAT.format(**db_creds)
# Resolve the database URL once at import time and build the shared engine.
database_url = get_db_url_from_env()
connect_args = {}
if not database_url:  # default to a local SQLite database file
    sqlite_file_name = "database.db"
    # Three slashes plus a relative name: a file in the working directory,
    # not an in-memory database.
    database_url = f"sqlite+aiosqlite:///{sqlite_file_name}"
    connect_args = {"check_same_thread": False}
    logger.warning(
        "No database configuration found; falling back to local SQLite file %s",
        sqlite_file_name,
    )
# NOTE(review): presumably read by another component of the service -- confirm.
os.environ["API_DATABASE_URL"] = database_url
engine = create_async_engine(
    url=database_url, echo=True, pool_pre_ping=True, connect_args=connect_args
)
async def get_session() -> AsyncGenerator[AsyncSession, Any]:
    """Yield one ``AsyncSession`` bound to the module engine.

    The session is opened in an ``async with`` block, so it is closed when
    the generator is finalized. ``expire_on_commit=False`` keeps loaded
    attributes readable after a commit.
    """
    session_factory = async_sessionmaker(
        bind=engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )
    async with session_factory() as db_session:
        yield db_session
async def create_db_and_tables_async():
    """Run ``SQLModel.metadata.create_all`` on a connection from the engine, inside ``engine.begin()``."""
    async with engine.begin() as connection:
        await connection.run_sync(SQLModel.metadata.create_all)
### S3 storage
# Bucket name from the environment, lower-cased; defaults to "dynamo-storage".
DYN_OBJECT_STORE_BUCKET = os.getenv("DYN_OBJECT_STORE_BUCKET", "dynamo-storage").lower()


def get_s3_client():
    """Create a boto3 S3 client from the ``DYN_OBJECT_STORE_*`` variables.

    Returns:
        A boto3 ``s3`` client pointed at ``DYN_OBJECT_STORE_ENDPOINT`` and
        authenticated with ``DYN_OBJECT_STORE_ID`` / ``DYN_OBJECT_STORE_KEY``.

    Raises:
        ValueError: When the endpoint, id or key (checked in that order) is
            unset or empty.
    """
    endpoint = os.getenv("DYN_OBJECT_STORE_ENDPOINT")
    access_key = os.getenv("DYN_OBJECT_STORE_ID")
    secret_key = os.getenv("DYN_OBJECT_STORE_KEY")

    # Checked in this order so the first missing setting is the one reported.
    required = (
        (endpoint, "DYN_OBJECT_STORE_ENDPOINT is required for S3 connection"),
        (access_key, "DYN_OBJECT_STORE_ID is required for S3 authentication"),
        (secret_key, "DYN_OBJECT_STORE_KEY is required for S3 authentication"),
    )
    for value, complaint in required:
        if not value:
            raise ValueError(complaint)

    return boto3.client(
        "s3",
        endpoint_url=endpoint,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )
class S3Storage:
    """Thin wrapper around a boto3 S3 client bound to a single bucket."""

    def __init__(self):
        """Create the client, derive the bucket name and ensure the bucket exists."""
        self.s3_client = get_s3_client()
        # Underscores in the configured name are replaced with hyphens.
        self.bucket_name = DYN_OBJECT_STORE_BUCKET.replace("_", "-").lower()
        self.ensure_bucket_exists()

    def ensure_bucket_exists(self):
        """Create the bucket when ``head_bucket`` reports error code ``404``.

        Raises:
            ClientError: When the check fails with any other code, or when
                creating the bucket fails. Both cases are logged first.
        """
        try:
            self.s3_client.head_bucket(Bucket=self.bucket_name)
        except ClientError as head_error:
            if head_error.response["Error"]["Code"] != "404":
                logger.error(f"Error checking bucket {self.bucket_name}: {head_error}")
                raise
            # Bucket doesn't exist, create it
            try:
                self.s3_client.create_bucket(Bucket=self.bucket_name)
            except ClientError as create_error:
                logger.error(
                    f"Failed to create bucket {self.bucket_name}: {create_error}"
                )
                raise

    def upload_file(self, file_data, object_name):
        """Store ``file_data`` in the bucket under the key ``object_name``.

        Raises:
            ClientError: Logged, then re-raised.
        """
        put_arguments = {
            "Bucket": self.bucket_name,
            "Key": object_name,
            "Body": file_data,
        }
        try:
            self.s3_client.put_object(**put_arguments)
        except ClientError as upload_error:
            logger.error(f"Error uploading file to S3: {upload_error}")
            raise

    def download_file(self, object_name):
        """Return everything read from the body of the object stored under ``object_name``.

        Raises:
            ClientError: Logged, then re-raised.
        """
        try:
            stored_object = self.s3_client.get_object(
                Bucket=self.bucket_name, Key=object_name
            )
            return stored_object["Body"].read()
        except ClientError as download_error:
            logger.error(f"Error downloading file from S3: {download_error}")
            raise
# Process-wide S3Storage, created on first use by get_s3_storage().
S3_STORAGE_INSTANCE: S3Storage | None = None


def get_s3_storage() -> S3Storage:
    """Return the shared ``S3Storage``, constructing it on the first call."""
    global S3_STORAGE_INSTANCE
    storage = S3_STORAGE_INSTANCE
    if storage is None:
        storage = S3Storage()
        S3_STORAGE_INSTANCE = storage
    assert isinstance(storage, S3Storage)
    return storage
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import sys
import uvicorn
from fastapi import FastAPI
from .api.dynamo import router as dynamo_router # type: ignore
from .api.health_check import router as health_check_router
from .api.storage import create_db_and_tables_async
# Configure logging to write to stdout
def setup_logging():
    """Configure and return the ``ai_dynamo_store`` logger.

    A stdout handler at INFO level is attached. The logger's own level is
    also set to INFO when nothing has configured it yet: left unset, the
    logger would inherit the root logger's level (WARNING unless the root has
    been reconfigured) and INFO records would be discarded before reaching
    the handler.

    Calling this function again does not attach a second handler.

    Returns:
        logging.Logger: The configured logger.
    """
    logger = logging.getLogger("ai_dynamo_store")
    if logger.level == logging.NOTSET:
        logger.setLevel(logging.INFO)

    # The handler is named so that a repeated call can recognize it.
    handler_name = "ai_dynamo_store.console"
    if any(handler.get_name() == handler_name for handler in logger.handlers):
        return logger

    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.set_name(handler_name)
    console_handler.setLevel(logging.INFO)
    console_format = logging.Formatter(
        fmt="%(asctime)s %(levelname)s - %(module)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    console_handler.setFormatter(console_format)
    logger.addHandler(console_handler)
    return logger


logger = setup_logging()
async def initialize_database():
    """Create the database tables, logging the outcome.

    Raises:
        Exception: Whatever ``create_db_and_tables_async`` raised, re-raised
            after being logged.
    """
    try:
        await create_db_and_tables_async()
    except Exception as init_error:
        logger.error(f"Error initializing database: {init_error}")
        raise
    else:
        logger.info("Database initialized successfully")
async def run_app():
    """Build the FastAPI application, initialize the database and serve it.

    The listening port comes from ``SERVICE_PORT`` (default ``8000``) and the
    server binds to ``0.0.0.0``. The coroutine finishes when
    ``server.serve()`` returns; nothing is returned to the caller.
    """
    application = FastAPI(
        title="AI Dynamo Store",
        description="AI Dynamo Store for managing Dynamo artifacts",
        version="0.1.0",
    )
    for api_router in (health_check_router, dynamo_router):
        application.include_router(api_router)

    listen_port = int(os.getenv("SERVICE_PORT", "8000"))

    await initialize_database()

    # Start the FastAPI server
    server = uvicorn.Server(
        uvicorn.Config(
            app=application, host="0.0.0.0", port=listen_port, log_level="info"
        )
    )
    await server.serve()
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from .app import run_app
def main():
    """Entry point: run the async application to completion on a new event loop."""
    app_coroutine = run_app()
    asyncio.run(app_coroutine)


if __name__ == "__main__":
    main()
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: "3"

services:
  ###
  # Postgres service
  # adapted from https://github.com/docker-library/docs/blob/master/postgres/README.md#-via-docker-compose-or-docker-stack-deploy
  ###
  postgres:
    image: postgres:16.2
    restart: always
    environment:
      # NOTE(review): PGUSER is presumably set so client tools run inside the
      # container (such as the pg_isready health check) default to this user -- confirm.
      PGUSER: postgres
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: pgadmin
      POSTGRES_DB: postgres
    ports:
      - "5432:5432"
    healthcheck:
      test:
        - CMD-SHELL
        - pg_isready
      interval: 30s
      timeout: 30s
      retries: 3

  ###
  # Minio
  # adapted from https://github.com/minio/minio/blob/master/docs/orchestration/docker-compose/docker-compose.yaml
  ###
  minio:
    image: quay.io/minio/minio:RELEASE.2024-06-29T01-20-47Z
    command: minio server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: dynamo-minio
      MINIO_ROOT_PASSWORD: dynamo-minio
    ports:
      # 9000 is published alongside the console address (:9001) set in the command above.
      - "9000:9000"
      - "9001:9001"
    healthcheck:
      test:
        - CMD
        - mc
        - ready
        - local
      interval: 5s
      timeout: 5s
      retries: 5
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "ai-dynamo-store"
version = "0.1.0"
description = "AI Dynamo Store for managing Dynamo artifacts"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
    "fastapi>=0.115.11",
    "uvicorn>=0.34.0",
    "sqlalchemy>=2.0.39",
    "pydantic>=2.10.6,<2.11.0",
    "aiosqlite==0.21.0",
    "asyncpg==0.30.0",
    "base58==2.1.1",
    "boto3==1.37.1",
    "botocore==1.37.1",
    "sqlmodel==0.0.22",
]

[project.optional-dependencies]
# Development and test tooling; each package is listed once.
dev = [
    "pytest>=7.0.0",
    "black>=23.0.0",
    "isort>=5.12.0",
    "mypy>=1.0.0",
    "ruff>=0.0.270",
    "pytest-asyncio>=0.21.0"
]

[project.scripts]
# Console command that invokes main() from the ai_dynamo_store.main module.
ai-dynamo-store = "ai_dynamo_store.main:main"
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment