Commit 5ddc7f7d authored by Maksim Khadkevich's avatar Maksim Khadkevich Committed by GitHub
Browse files

feat: moved compoundAI operator, APIserver, and examples (#10)

parent 14ce7e03
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
VERSION 0.8
############### ARTIFACTS TARGETS ##############################
# These targets are invoked in child Earthfiles to pass top-level files that are out of their build context
# https://docs.earthly.dev/earthly-0.6/best-practices#copying-files-from-outside-the-build-context
############### SHARED LIBRARY TARGETS ##############################
golang-base:
FROM golang:1.23
RUN apt-get update && apt-get install -y git && apt-get clean && rm -rf /var/lib/apt/lists/* && curl -sSfL https://github.com/golangci/golangci-lint/releases/download/v1.61.0/golangci-lint-1.61.0-linux-amd64.tar.gz | tar -xzv && mv golangci-lint-1.61.0-linux-amd64/golangci-lint /usr/local/bin/
############### ALL TARGETS ##############################
all-test:
BUILD ./deploy/compoundai/operator+test
# BUILD ./deploy/compoundai/api-server+test #TODO: mkhadkevich earthly tests fail https://gitlab-master.nvidia.com/aire/microservices/compoundai/-/jobs/144475821
all-docker:
ARG CI_REGISTRY_IMAGE=my-registry
ARG CI_COMMIT_SHA=latest
BUILD ./deploy/compoundai/operator+docker --CI_REGISTRY_IMAGE=$CI_REGISTRY_IMAGE --CI_COMMIT_SHA=$CI_COMMIT_SHA
BUILD ./deploy/compoundai/api-server+docker --CI_REGISTRY_IMAGE=$CI_REGISTRY_IMAGE --CI_COMMIT_SHA=$CI_COMMIT_SHA
all-lint:
BUILD ./deploy/compoundai/operator+lint
all:
BUILD +all-test
BUILD +all-docker
BUILD +all-lint
# For testing
custom:
ARG CI_REGISTRY_IMAGE=my-registry
ARG CI_COMMIT_SHA=latest
BUILD +all-test
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
root = "."
testdata_dir = "testdata"
tmp_dir = "tmp"
[build]
args_bin = []
bin = "./tmp/main"
cmd = "go build -o ./tmp/main ./api"
delay = 1000
exclude_dir = ["assets", "tmp", "vendor", "testdata"]
exclude_file = []
exclude_regex = ["_test.go"]
exclude_unchanged = false
follow_symlink = false
full_bin = ""
include_dir = []
include_ext = ["go", "tpl", "tmpl", "html"]
include_file = []
kill_delay = "0s"
log = "build-errors.log"
poll = false
poll_interval = 0
post_cmd = []
pre_cmd = []
rerun = false
rerun_delay = 500
send_interrupt = false
stop_on_error = false
[color]
app = ""
build = "yellow"
main = "magenta"
runner = "green"
watcher = "cyan"
[log]
main_only = false
time = false
[misc]
clean_on_exit = false
[proxy]
app_port = 0
enabled = false
proxy_port = 0
[screen]
clear_on_rebuild = false
keep_scroll = true
# Local development env
DB_USER="postgres"
DB_PASSWORD="pgadmin"
DB_HOST="localhost"
DB_PORT=5432
DB_NAME="postgres"
DMS_HOST="localhost"
DMS_PORT=8080
NDS_HOST="localhost"
NDS_PORT=8001
DEFAULT_KUBE_NAMESPACE="compoundai"
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
bin/*
Dockerfile.cross
# Test binary, built with `go test -c`
*.test
# Temporary folder used by Air
tmp
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Build the manager binary
FROM golang:1.23 AS builder
ARG TARGETOS
ARG TARGETARCH
WORKDIR /workspace
# Copy the Go Modules manifests
COPY go.mod go.mod
COPY go.sum go.sum
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
RUN go mod download
# Copy the go source
COPY api/ api/
COPY .env .env
# Build
# the GOARCH has not a default value to allow the binary be built according to the host where the command
# was called. For example, if we call make docker-build in a local env which has the Apple Silicon M1 SO
# the docker BUILDPLATFORM arg will be linux/arm64 when for Apple x86 it will be linux/amd64. Therefore,
# by leaving it empty we can ensure that the container and binary shipped on it will have the same platform.
RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o server api/main.go
# Use distroless as minimal base image to package the manager binary
# Refer to https://github.com/GoogleContainerTools/distroless for more details
FROM gcr.io/distroless/static:nonroot
WORKDIR /
COPY --from=builder /workspace/server .
COPY --from=builder /workspace/.env .
USER 65532:65532
ENTRYPOINT ["/server"]
VERSION 0.8
build:
FROM golang:1.23
ARG TARGETOS
ARG TARGETARCH
WORKDIR /workspace
COPY go.mod go.mod
COPY go.sum go.sum
RUN go mod download
COPY api/ api/
COPY .env .env
RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o server api/main.go
SAVE ARTIFACT /workspace/server
SAVE ARTIFACT /workspace/.env
#TODO: mkhadkevich earthly tests fail https://gitlab-master.nvidia.com/aire/microservices/compoundai/-/jobs/144475821
#test:
# FROM +build
# # copy test files
# COPY tests/ tests/
# RUN go test ./...
docker:
ARG CI_REGISTRY_IMAGE=my-registry
ARG CI_COMMIT_SHA=latest
ARG IMAGE=compound-api-server
FROM gcr.io/distroless/static:nonroot
WORKDIR /
COPY +build/server .
COPY +build/.env .
USER 65532:65532
ENTRYPOINT ["/server"]
SAVE IMAGE --push $CI_REGISTRY_IMAGE/$IMAGE:$CI_COMMIT_SHA
\ No newline at end of file
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package client
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
)
func SendRequestJSON(url string, method string, body *any) (*http.Response, []byte, error) {
var req *http.Request
var err error
if body != nil {
jsonData, err := json.Marshal(body)
if err != nil {
return nil, nil, fmt.Errorf("failed to marshal request: %v", err)
}
req, err = http.NewRequest(method, url, bytes.NewBuffer(jsonData))
if err != nil {
return nil, nil, fmt.Errorf("failed to create request: %v", err)
}
} else {
req, err = http.NewRequest(method, url, nil)
if err != nil {
return nil, nil, fmt.Errorf("failed to create request: %v", err)
}
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, nil, fmt.Errorf("failed to send request: %v", err)
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, nil, fmt.Errorf("received non-OK response: %v, %s", resp.Status, respBody)
}
return resp, respBody, nil
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package consts
import (
"errors"
"gorm.io/gorm"
)
var (
ErrNotFound = gorm.ErrRecordNotFound
ErrNoPermission = errors.New("no permission")
ErrEmptyData = errors.New("data is nil")
ErrNoImplemented = errors.New("no implemented")
ErrTimeout = errors.New("timeout")
YataiOrganizationHeaderName = "X-Yatai-Organization"
NgcOrganizationHeaderName = "Nv-Ngc-Org"
NgcUserHeaderName = "Nv-Actor-Id"
CompoundNimContainerPortName = "http"
)
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package consts
const (
KubeLabelCompoundNim = "yatai.ai/bento-repository"
KubeLabelCompoundNimVersion = "yatai.ai/bento"
KubeLabelCompoundNimVersionDeployment = "yatai.ai/bento-deployment"
KubeImageBuilderMainContainer = "builder"
)
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package env
import (
"github.com/joho/godotenv"
"github.com/rs/zerolog/log"
)
func SetupEnv() {
err := godotenv.Load()
if err != nil {
log.Fatal().Msgf("Failed to load env during setup %s", err.Error())
}
_, err = SetResourceScope()
if err != nil {
log.Fatal().Msgf("Failed to set resource scope during env setup %s", err.Error())
}
_, err = SetNdsHost()
if err != nil {
log.Fatal().Msgf("Failed to set nds urls during env setup %s", err.Error())
}
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package env
import (
"fmt"
"sync"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/common/utils"
)
var (
NdsHostBase string
once sync.Once
)
func GetNdsUrl() string {
baseUrl := GetNdsHost()
return fmt.Sprintf("http://%s", baseUrl)
}
func GetNdsHost() string {
return NdsHostBase
}
func SetNdsHost() (string, error) {
var err error
once.Do(func() { // We cache and reuse the same NDS host
NDS_HOST, syncErr := utils.MustGetEnv("NDS_HOST")
if syncErr != nil {
err = syncErr
return
}
NDS_PORT, syncErr := utils.MustGetEnv("NDS_PORT")
if syncErr != nil {
err = syncErr
return
}
NdsHostBase = fmt.Sprintf("%s:%s", NDS_HOST, NDS_PORT)
})
if err != nil {
return "", err
}
return NdsHostBase, nil
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package env
import (
"fmt"
"os"
"sync"
)
type ResourceScope string
const (
OrganizationScope ResourceScope = "organization"
UserScope ResourceScope = "user"
)
var (
ApplicationScope ResourceScope
getScopeOnce sync.Once
)
func SetResourceScope() (ResourceScope, error) {
var err error
getScopeOnce.Do(func() {
scope := os.Getenv("RESOURCE_SCOPE")
if scope == "" {
scope = string(UserScope)
}
switch ResourceScope(scope) {
case OrganizationScope, UserScope:
ApplicationScope = ResourceScope(scope)
default:
err = fmt.Errorf("invalid scope value: %s", scope)
return
}
})
if err != nil {
return "", err
}
return ApplicationScope, nil
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package utils
import (
"fmt"
"os"
)
func MustGetEnv(key string) (string, error) {
value := os.Getenv(key)
if value == "" {
return "", fmt.Errorf("environment variable %s is not set", key)
}
return value, nil
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controllers
import (
"fmt"
"github.com/gin-gonic/gin"
"github.com/rs/zerolog/log"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/converters"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/models"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/schemas"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/services"
)
type clusterController struct{}
var ClusterController = clusterController{}
func (s *clusterController) GetCluster(ctx *gin.Context, clusterName string) (*models.Cluster, error) {
ownership, err := GetOwnershipInfo(ctx)
if err != nil {
return nil, err
}
cluster, err := services.ClusterService.GetByName(ctx, ownership.OrganizationId, clusterName)
if err != nil {
return nil, err
}
return cluster, nil
}
func (c *clusterController) Create(ctx *gin.Context) {
var schema schemas.CreateClusterSchema
if err := ctx.ShouldBindJSON(&schema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
ownership, err := GetOwnershipInfo(ctx)
if err != nil {
ctx.JSON(400, err)
}
cluster, err := services.ClusterService.Create(ctx, services.CreateClusterOption{
Name: schema.Name,
OrganizationId: ownership.OrganizationId,
CreatorId: ownership.UserId,
Description: schema.Description,
KubeConfig: schema.KubeConfig,
})
if err != nil {
log.Info().Msgf("Failed to create cluster: %s", err.Error())
ctx.JSON(500, gin.Error{Err: err})
return
}
ctx.JSON(200, converters.ToClusterFullSchema(cluster))
}
func (c *clusterController) Update(ctx *gin.Context) {
var schema schemas.UpdateClusterSchema
clusterName := ctx.Param("clusterName")
if err := ctx.ShouldBindJSON(&schema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
cluster, err := c.GetCluster(ctx, clusterName)
if err != nil {
ctx.JSON(404, gin.H{"error": fmt.Sprintf("Could not find cluster with the name %s", clusterName)})
return
}
cluster, err = services.ClusterService.Update(ctx, cluster, services.UpdateClusterOption{
Description: schema.Description,
KubeConfig: schema.KubeConfig,
})
if err != nil {
log.Info().Msgf("Failed to update cluster: %s", err.Error())
ctx.JSON(500, gin.H{"error": fmt.Sprintf("Error updating cluster %s", err.Error())})
return
}
ctx.JSON(200, converters.ToClusterFullSchema(cluster))
}
func (c *clusterController) Get(ctx *gin.Context) {
clusterName := ctx.Param("clusterName")
cluster, err := c.GetCluster(ctx, clusterName)
if err != nil {
ctx.JSON(404, gin.H{"error": fmt.Sprintf("Could not find cluster with the name %s", clusterName)})
return
}
ctx.JSON(200, converters.ToClusterFullSchema(cluster))
}
func (c *clusterController) List(ctx *gin.Context) {
var schema schemas.ListQuerySchema
if err := ctx.ShouldBindQuery(&schema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
ownership, err := GetOwnershipInfo(ctx)
if err != nil {
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
clusters, total, err := services.ClusterService.List(ctx, services.ListClusterOption{
BaseListOption: services.BaseListOption{
Start: &schema.Start,
Count: &schema.Count,
Search: schema.Search,
},
OrganizationId: &ownership.OrganizationId,
})
if err != nil {
log.Info().Msgf("Failed to list clusters: %s", err.Error())
ctx.JSON(400, gin.H{"Error": fmt.Sprintf("List clusters %s", err.Error())})
return
}
clusterList := schemas.ClusterListSchema{
BaseListSchema: schemas.BaseListSchema{
Start: schema.Start,
Count: schema.Count,
Total: total,
},
Items: converters.ToClusterSchemaList(clusters),
}
ctx.JSON(200, clusterList)
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controllers
import (
"errors"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/common/consts"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/schemas"
"github.com/gin-gonic/gin"
)
const OwnershipInfoKey = "_ownershipInfoKey"
func GetOwnershipInfo(ctx *gin.Context) (*schemas.OwnershipSchema, error) {
ownership_ := ctx.Value(OwnershipInfoKey)
if ownership_ == nil {
return nil, consts.ErrNotFound
}
ownership, ok := ownership_.(*schemas.OwnershipSchema)
if !ok {
return nil, errors.New("current ownership is not an ownership struct")
}
return ownership, nil
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controllers
import (
"errors"
"fmt"
"strings"
"time"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/common/consts"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/converters"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/database"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/models"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/schemas"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/services"
"github.com/gin-gonic/gin"
"github.com/rs/zerolog/log"
)
type compoundComponentController struct{}
var CompoundComponentController = compoundComponentController{}
func (c *compoundComponentController) Register(ctx *gin.Context) {
var getCluster schemas.GetClusterSchema
var registerCompoundComponentSchema schemas.RegisterCompoundComponentSchema
if err := ctx.ShouldBindUri(&getCluster); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
if err := ctx.ShouldBindJSON(&registerCompoundComponentSchema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
names := []string{getCluster.ClusterName}
clusters, _, err := services.ClusterService.List(ctx, services.ListClusterOption{
Names: &names,
})
if err != nil {
errMsg := fmt.Sprintf("Failed to get clusters %s when registering Compound Component: %s", getCluster.ClusterName, err.Error())
log.Error().Msg(errMsg)
ctx.JSON(500, gin.H{"error": errMsg})
return
}
kubeNamespace := strings.TrimSpace(registerCompoundComponentSchema.KubeNamespace)
// nolint: ineffassign, staticcheck
tx, ctx_, df, err := database.DatabaseUtil.StartTransaction(ctx)
defer func() { df(err) }()
log.Info().Msgf("Registering compound component for %d clusters", len(clusters))
var compoundComponent *models.CompoundComponent
for _, cluster := range clusters {
compoundComponent, err = services.CompoundComponentService.GetByName(ctx_, cluster.ID, string(registerCompoundComponentSchema.Name))
isNotFound := errors.Is(err, consts.ErrNotFound)
if err != nil && !isNotFound {
log.Error().Msgf("Failed to get compoundComponent: %s", err.Error())
ctx.JSON(500, gin.H{"error": "failed to get compoundComponent"})
return
}
manifest := &schemas.CompoundComponentManifestSchema{
SelectorLabels: registerCompoundComponentSchema.SelectorLabels,
}
if registerCompoundComponentSchema.Manifest != nil {
manifest = registerCompoundComponentSchema.Manifest
}
if isNotFound {
compoundComponent, err = services.CompoundComponentService.Create(ctx_, services.CreateCompoundComponentOption{
ClusterId: cluster.ID,
Name: string(registerCompoundComponentSchema.Name),
KubeNamespace: kubeNamespace,
Version: registerCompoundComponentSchema.Version,
Manifest: manifest,
})
} else {
now := time.Now()
now_ := &now
opt := services.UpdateCompoundComponentOption{
LatestHeartbeatAt: &now_,
Version: &registerCompoundComponentSchema.Version,
Manifest: &manifest,
}
if compoundComponent.Version != registerCompoundComponentSchema.Version {
opt.LatestInstalledAt = &now_
}
compoundComponent, err = services.CompoundComponentService.Update(ctx_, compoundComponent, opt)
}
if err != nil {
log.Error().Msgf("Failed to register compoundComponent: %s", err.Error())
ctx.JSON(500, gin.H{"error": "failed to register compoundComponent"})
return
}
}
tx.Commit()
compoundComponentSchema, err := converters.ToCompoundComponentSchema(ctx, compoundComponent)
if err != nil {
log.Error().Msgf("Failed to convert compound component model to schema: %s", err.Error())
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
ctx.JSON(200, compoundComponentSchema)
}
func (c *compoundComponentController) ListAll(ctx *gin.Context) {
compoundComponents, err := services.CompoundComponentService.List(ctx, services.ListCompoundComponentOption{})
if err != nil {
errMsg := fmt.Sprintf("Failed to get all compoundComponents: %s", err.Error())
log.Error().Msg(errMsg)
ctx.JSON(400, gin.H{"error": errMsg})
return
}
compoundComponentSchema, err := converters.ToCompoundComponentSchemas(ctx, compoundComponents)
if err != nil {
log.Error().Msgf("Failed to convert compound component model to schema: %s", err.Error())
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
ctx.JSON(200, compoundComponentSchema)
}
func (c *compoundComponentController) List(ctx *gin.Context) {
var getCluster schemas.GetClusterSchema
if err := ctx.ShouldBindUri(&getCluster); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
names := []string{getCluster.ClusterName}
clusters, _, err := services.ClusterService.List(ctx, services.ListClusterOption{
Names: &names,
})
if err != nil {
errMsg := fmt.Sprintf("Failed to get clusters %s when registering Compound Component: %s", getCluster.ClusterName, err.Error())
log.Error().Msg(errMsg)
ctx.JSON(500, gin.H{"error": errMsg})
return
}
clusterIds := []uint{}
for _, cluster := range clusters {
clusterIds = append(clusterIds, cluster.ID)
}
compoundComponents, err := services.CompoundComponentService.List(ctx, services.ListCompoundComponentOption{
ClusterIds: &clusterIds,
})
if err != nil {
errMsg := fmt.Sprintf("Failed to get compoundComponents for the cluster %s: %s", getCluster.ClusterName, err.Error())
log.Error().Msg(errMsg)
ctx.JSON(500, gin.H{"error": errMsg})
return
}
compoundComponentSchema, err := converters.ToCompoundComponentSchemas(ctx, compoundComponents)
if err != nil {
log.Error().Msgf("Failed to convert compound component model to schema: %s", err.Error())
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
ctx.JSON(200, compoundComponentSchema)
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controllers
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/common/env"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/converters"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/database"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/mocks"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/models"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/schemas"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/schemasv2"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/services"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/invopop/jsonschema"
"github.com/rs/zerolog/log"
)
type deploymentController struct{}
var DeploymentController = deploymentController{}
type CreateDeploymentSchema struct {
schemas.CreateDeploymentSchema
}
func (c *deploymentController) Create(ctx *gin.Context) {
clusterName := ctx.Param("clusterName")
var schema CreateDeploymentSchema
if err := ctx.ShouldBindJSON(&schema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
cluster, err := ClusterController.GetCluster(ctx, clusterName)
if err != nil {
ctx.JSON(404, fmt.Sprintf("Could not find cluster with the name %s", clusterName))
return
}
deployment, err := c.createDeploymentHelper(ctx, cluster, schema)
if err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
log.Info().Msgf("Created deployment: %+v", deployment)
deploymentSchema, err := converters.ToDeploymentSchema(ctx, deployment)
if err != nil {
log.Error().Msgf("Failed to convert deployment model to schema: %s", err.Error())
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
ctx.JSON(200, deploymentSchema)
}
func (c *deploymentController) createDeploymentHelper(ctx *gin.Context, cluster *models.Cluster, schema CreateDeploymentSchema) (*models.Deployment, error) {
description := ""
if schema.Description != nil {
description = *schema.Description
}
_, ctx_, df, err := database.DatabaseUtil.StartTransaction(ctx)
defer func() { df(err) }() // Clean up the transaction
ownership, err := GetOwnershipInfo(ctx)
if err != nil {
return nil, err
}
deployment, err := services.DeploymentService.Create(ctx_, services.CreateDeploymentOption{
CreatorId: ownership.UserId,
ClusterId: cluster.ID,
Name: schema.Name,
Description: description,
KubeNamespace: schema.KubeNamespace,
})
if err != nil {
log.Error().Msgf("Creating deployment failed: %s", err.Error())
return nil, fmt.Errorf("creating deployment failed: %s", err.Error())
}
_, err = c.updateDeploymentEntities(ctx_, schema.UpdateDeploymentSchema, deployment, ownership)
if err != nil {
log.Error().Msgf("Failed to update deployment %s entities %s", deployment.Name, err.Error())
return nil, fmt.Errorf("failed to update deployment %s entities %s", deployment.Name, err.Error())
}
return deployment, nil
}
func (c *deploymentController) SyncStatus(ctx *gin.Context) {
var schema schemas.GetDeploymentSchema
if err := ctx.ShouldBindUri(&schema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
deployment, err := getDeployment(ctx, &schema)
if err != nil {
log.Error().Msgf("Could not find deployment with the name %s: %s", schema.DeploymentName, err.Error())
ctx.JSON(404, fmt.Sprintf("Could not find deployment with the name %s", schema.DeploymentName))
return
}
status, err := services.DeploymentService.SyncStatus(ctx, deployment)
if err != nil {
log.Error().Msgf("Failed to sync deployment %s status: %s", deployment.Name, err.Error())
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
deployment.Status = status
deploymentSchema, err := converters.ToDeploymentSchema(ctx, deployment)
if err != nil {
log.Error().Msgf("Failed to convert deployment model to schema: %s", err.Error())
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
ctx.JSON(200, deploymentSchema)
}
func (c *deploymentController) Update(ctx *gin.Context) {
var updateSchema schemas.UpdateDeploymentSchema
var getSchema schemas.GetDeploymentSchema
if err := ctx.ShouldBindUri(&getSchema); err != nil {
log.Error().Msgf("Error binding: %s", err.Error())
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
if err := ctx.ShouldBindJSON(&updateSchema); err != nil {
log.Error().Msgf("Error binding: %s", err.Error())
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
ownership, err := GetOwnershipInfo(ctx)
if err != nil {
ctx.JSON(500, err)
return
}
deployment, err := getDeployment(ctx, &getSchema)
if err != nil {
log.Error().Msgf("Could not find deployment with the name %s: %s", getSchema.DeploymentName, err.Error())
ctx.JSON(404, fmt.Sprintf("Could not find deployment with the name %s", getSchema.DeploymentName))
return
}
tx, ctx_, df, err := database.DatabaseUtil.StartTransaction(ctx)
defer func() { df(err) }() // Clean up the transaction
deployment, err = services.DeploymentService.Update(ctx_, deployment, services.UpdateDeploymentOption{
Description: updateSchema.Description,
})
if err != nil {
log.Error().Msgf("Could not update deployment with the name %s: %s", getSchema.DeploymentName, err.Error())
ctx.JSON(500, fmt.Sprintf("Could not update deployment with the name %s", getSchema.DeploymentName))
return
}
if updateSchema.DoNotDeploy {
deployment, err = c.updateDeploymentInformation(ctx_, updateSchema, deployment)
if err != nil {
log.Error().Msgf("Could not update deployment information %s: %s", getSchema.DeploymentName, err.Error())
ctx.JSON(500, err.Error())
return
}
} else {
deployment, err = c.updateDeploymentEntities(ctx_, updateSchema, deployment, ownership)
if err != nil {
log.Error().Msgf("Could not update deployment entities %s: %s", getSchema.DeploymentName, err.Error())
ctx.JSON(500, err.Error())
return
}
}
tx.Commit()
deploymentSchema, err := converters.ToDeploymentSchema(ctx, deployment)
if err != nil {
log.Error().Msgf("Failed to convert deployment model to schema: %s", err.Error())
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
ctx.JSON(200, deploymentSchema)
}
func (c *deploymentController) updateDeploymentEntities(ctx context.Context, schema schemas.UpdateDeploymentSchema, deployment *models.Deployment, ownership *schemas.OwnershipSchema) (*models.Deployment, error) {
compoundNimVersions := map[string]*schemas.CompoundNimVersionFullSchema{}
for _, target := range schema.Targets {
compoundNimVersionSchema, err := services.DatastoreService.GetCompoundNimVersion(ctx, target.CompoundNim, target.Version)
if err != nil {
return nil, err
}
compoundNimVersions[fmt.Sprintf("%s:%s", target.CompoundNim, target.Version)] = compoundNimVersionSchema
}
log.Info().Msgf("Found %d Compound NIM versions", len(compoundNimVersions))
// Mark previous revisions as inactive...
status_ := schemas.DeploymentRevisionStatusActive
oldDeploymentRevisions, total, err := services.DeploymentRevisionService.List(ctx, services.ListDeploymentRevisionOption{
DeploymentId: &deployment.ID,
Status: &status_,
})
if err != nil {
return nil, err
}
var oldDeploymentTargets = make([]*models.DeploymentTarget, 0)
log.Info().Msgf("Marking %d version as inactive", total)
for _, oldDeploymentRevision := range oldDeploymentRevisions {
_, err = services.DeploymentRevisionService.Update(ctx, oldDeploymentRevision, services.UpdateDeploymentRevisionOption{
Status: schemas.DeploymentRevisionStatusPtr(schemas.DeploymentRevisionStatusInactive),
})
if err != nil {
return nil, err
}
_oldDeploymentTargets, _, err := services.DeploymentTargetService.List(ctx, services.ListDeploymentTargetOption{
DeploymentRevisionId: &oldDeploymentRevision.ID,
})
oldDeploymentTargets = append(oldDeploymentTargets, _oldDeploymentTargets...)
if err != nil {
return nil, err
}
}
// Create a new revision
deploymentRevision, err := services.DeploymentRevisionService.Create(ctx, services.CreateDeploymentRevisionOption{
CreatorId: ownership.UserId,
DeploymentId: deployment.ID,
Status: schemas.DeploymentRevisionStatusActive,
})
if err != nil {
return nil, err
}
// Create deployment targets
deploymentTargets := make([]*models.DeploymentTarget, 0, len(schema.Targets))
for _, createDeploymentTargetSchema := range schema.Targets {
createDeploymentTargetSchema.Config.KubeResourceVersion = ""
createDeploymentTargetSchema.Config.KubeResourceUid = ""
compoundNimTag := fmt.Sprintf("%s:%s", createDeploymentTargetSchema.CompoundNim, createDeploymentTargetSchema.Version)
deploymentTarget, err := services.DeploymentTargetService.Create(ctx, services.CreateDeploymentTargetOption{
CreatorId: ownership.UserId,
DeploymentId: deployment.ID,
DeploymentRevisionId: deploymentRevision.ID,
CompoundNimVersionId: compoundNimVersions[compoundNimTag].Uid,
CompoundNimVersionTag: compoundNimTag,
Config: createDeploymentTargetSchema.Config,
})
if err != nil {
return nil, err
}
deploymentTargets = append(deploymentTargets, deploymentTarget)
}
log.Info().Msgf("Terminating %d inactive deployment targets", len(oldDeploymentTargets))
for _, oldDeploymentTarget := range oldDeploymentTargets {
_, err := services.DeploymentTargetService.Terminate(ctx, oldDeploymentTarget)
if err != nil {
return nil, err
}
}
// Deploy new revision
err = services.DeploymentRevisionService.Deploy(ctx, deploymentRevision, deploymentTargets, ownership, false)
if err != nil {
return nil, err
}
return deployment, nil
}
func (c *deploymentController) updateDeploymentInformation(ctx context.Context, schema schemas.UpdateDeploymentSchema, deployment *models.Deployment) (*models.Deployment, error) {
status_ := schemas.DeploymentRevisionStatusActive
activeReploymentRevisions, _, err := services.DeploymentRevisionService.List(ctx, services.ListDeploymentRevisionOption{
DeploymentId: &deployment.ID,
Status: &status_,
})
if err != nil {
return nil, err
}
targetSchemaCompoundVersionNims := map[string]*schemas.CreateDeploymentTargetSchema{}
for _, targetSchema := range schema.Targets {
targetSchemaCompoundVersionNims[fmt.Sprintf("%s:%s", targetSchema.CompoundNim, targetSchema.Version)] = targetSchema
}
var activeDeploymentTargets = make([]*models.DeploymentTarget, 0)
for _, activeReploymentRevision := range activeReploymentRevisions {
_activeDeploymentTargets, _, err := services.DeploymentTargetService.List(ctx, services.ListDeploymentTargetOption{
DeploymentRevisionId: &activeReploymentRevision.ID,
})
activeDeploymentTargets = append(activeDeploymentTargets, _activeDeploymentTargets...)
if err != nil {
return nil, err
}
}
for _, activeDeploymentTarget := range activeDeploymentTargets {
if createDeploymentTargetSchema, ok := targetSchemaCompoundVersionNims[activeDeploymentTarget.CompoundNimVersionTag]; ok {
config := activeDeploymentTarget.Config
config.KubeResourceUid = createDeploymentTargetSchema.Config.KubeResourceUid
config.KubeResourceVersion = createDeploymentTargetSchema.Config.KubeResourceVersion
_, err = services.DeploymentTargetService.Update(ctx, activeDeploymentTarget, services.UpdateDeploymentTargetOption{
Config: &config,
})
if err != nil {
return nil, err
}
}
}
return deployment, nil
}
func (c *deploymentController) Get(ctx *gin.Context) {
var schema schemas.GetDeploymentSchema
if err := ctx.ShouldBindUri(&schema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
deployment, err := getDeployment(ctx, &schema)
if err != nil {
log.Error().Msgf("Could not find deployment with the name %s: %s", schema.DeploymentName, err.Error())
ctx.JSON(404, fmt.Sprintf("Could not find deployment with the name %s", schema.DeploymentName))
return
}
deploymentSchema, err := converters.ToDeploymentSchema(ctx, deployment)
if err != nil {
log.Error().Msgf("Failed to convert deployment model to schema: %s", err.Error())
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
ctx.JSON(200, deploymentSchema)
}
func (c *deploymentController) Terminate(ctx *gin.Context) {
var schema schemas.GetDeploymentSchema
if err := ctx.ShouldBindUri(&schema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
deployment, err := getDeployment(ctx, &schema)
if err != nil {
log.Error().Msgf("Could not find deployment with the name %s: %s", schema.DeploymentName, err.Error())
ctx.JSON(404, fmt.Sprintf("Could not find deployment with the name %s", schema.DeploymentName))
return
}
deployment, err = c.doTerminate(ctx, deployment)
if err != nil {
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
deploymentSchema, err := converters.ToDeploymentSchema(ctx, deployment)
if err != nil {
log.Error().Msgf("Failed to convert deployment model to schema: %s", err.Error())
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
ctx.JSON(200, deploymentSchema)
}
func (c *deploymentController) doTerminate(ctx *gin.Context, deployment *models.Deployment) (*models.Deployment, error) {
tx, ctx_, df, err := database.DatabaseUtil.StartTransaction(ctx)
defer func() { df(err) }() // Clean up the transaction
deployment, err = services.DeploymentService.Terminate(ctx_, deployment)
if err != nil {
errMsg := fmt.Sprintf("Could not terminate deployment with the name: %s", err.Error())
log.Error().Msgf(errMsg)
return nil, errors.New(errMsg)
}
tx.Commit()
return deployment, nil
}
func (c *deploymentController) Delete(ctx *gin.Context) {
var schema schemas.GetDeploymentSchema
if err := ctx.ShouldBindUri(&schema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
deployment, err := getDeployment(ctx, &schema)
if err != nil {
log.Error().Msgf("Could not find deployment with the name %s: %s", schema.DeploymentName, err.Error())
ctx.JSON(404, fmt.Sprintf("Could not find deployment with the name %s", schema.DeploymentName))
return
}
deployment, err = services.DeploymentService.Delete(ctx, deployment)
if err != nil {
log.Error().Msgf("Could not delete deployment with the name %s: %s", schema.DeploymentName, err.Error())
ctx.JSON(500, gin.H{"error": fmt.Sprintf("Could not delete deployment with the name %s", schema.DeploymentName)})
return
}
deploymentSchema, err := converters.ToDeploymentSchema(ctx, deployment)
if err != nil {
log.Error().Msgf("Failed to convert deployment model to schema: %s", err.Error())
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
ctx.JSON(200, deploymentSchema)
}
func setListDeploymentOptionsScope(opt *services.ListDeploymentOption, ownership *schemas.OwnershipSchema) {
opt.OrganizationId = &ownership.OrganizationId
if env.ApplicationScope == env.UserScope {
opt.CreatorId = &ownership.UserId
}
}
func (c *deploymentController) ListClusterDeployments(ctx *gin.Context) {
var schema schemas.ListQuerySchema
var getCluster schemas.GetClusterSchema
if err := ctx.ShouldBindUri(&getCluster); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
if err := ctx.ShouldBindQuery(&schema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
cluster, err := ClusterController.GetCluster(ctx, getCluster.ClusterName)
if err != nil {
log.Error().Msgf("Could not find cluster with the name %s: %s", getCluster.ClusterName, err.Error())
ctx.JSON(404, gin.H{"error": fmt.Sprintf("Could not find cluster with the name %s", getCluster.ClusterName)})
return
}
ownership, err := GetOwnershipInfo(ctx)
if err != nil {
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
listOpt := services.ListDeploymentOption{
BaseListOption: services.BaseListOption{
Start: &schema.Start,
Count: &schema.Count,
Search: schema.Search,
},
ClusterId: &cluster.ID,
}
setListDeploymentOptionsScope(&listOpt, ownership)
deployments, total, err := services.DeploymentService.List(ctx, listOpt)
if err != nil {
log.Error().Msgf("Could not find deployments for the cluster %s with the following opts %+v: %s", getCluster.ClusterName, listOpt, err.Error())
ctx.JSON(500, gin.H{"error": fmt.Sprintf("Could not find deployments %s", err.Error())})
return
}
deploymentSchemas, err := converters.ToDeploymentSchemas(ctx, deployments)
if err != nil {
log.Error().Msgf("Failed to convert deployment model list to schema: %s", err.Error())
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
deploymentListSchema := &schemas.DeploymentListSchema{
BaseListSchema: schemas.BaseListSchema{
Total: total,
Start: schema.Start,
Count: schema.Count,
},
Items: deploymentSchemas,
}
ctx.JSON(200, deploymentListSchema)
}
func (c *deploymentController) ListCompoundNimDeployments(ctx *gin.Context) {
var schema schemas.ListQuerySchema
var getCompoundNimSchema schemas.GetCompoundNimSchema
if err := ctx.ShouldBindUri(&getCompoundNimSchema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
if err := ctx.ShouldBindQuery(&schema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
ownership, err := GetOwnershipInfo(ctx)
if err != nil {
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
deploymentOpt := services.ListDeploymentOption{
BaseListOption: services.BaseListOption{
Start: &schema.Start,
Count: &schema.Count,
Search: schema.Search,
},
CompoundNimName: &getCompoundNimSchema.CompoundNimName,
}
setListDeploymentOptionsScope(&deploymentOpt, ownership)
deployments, total, err := services.DeploymentService.List(ctx, deploymentOpt)
if err != nil {
log.Error().Msgf("Could not find deployments for the compound nim %s with the following opts %+v: %s", getCompoundNimSchema.CompoundNimName, deploymentOpt, err.Error())
ctx.JSON(500, gin.H{"error": fmt.Sprintf("Could not find deployments %s", err.Error())})
return
}
deploymentSchemas, err := converters.ToDeploymentSchemas(ctx, deployments)
if err != nil {
log.Error().Msgf("Failed to convert deployment model list to schema: %s", err.Error())
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
deploymentListSchema := &schemas.DeploymentListSchema{
BaseListSchema: schemas.BaseListSchema{
Total: total,
Start: schema.Start,
Count: schema.Count,
},
Items: deploymentSchemas,
}
ctx.JSON(200, deploymentListSchema)
}
func (c *deploymentController) ListCompoundNimVersionDeployments(ctx *gin.Context) {
var schema schemas.ListQuerySchema
var getCompoundNimVersionSchema schemas.GetCompoundNimVersionSchema
if err := ctx.ShouldBindUri(&getCompoundNimVersionSchema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
if err := ctx.ShouldBindQuery(&schema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
ownership, err := GetOwnershipInfo(ctx)
if err != nil {
ctx.JSON(500, err.Error())
return
}
deploymentOpt := services.ListDeploymentOption{
BaseListOption: services.BaseListOption{
Start: &schema.Start,
Count: &schema.Count,
Search: schema.Search,
},
CompoundNimTag: getCompoundNimVersionSchema.Tag(),
}
setListDeploymentOptionsScope(&deploymentOpt, ownership)
deployments, total, err := services.DeploymentService.List(ctx, deploymentOpt)
if err != nil {
log.Error().Msgf("Could not find deployments for the compound nim version %s with the following opts %+v: %s", *getCompoundNimVersionSchema.Tag(), deploymentOpt, err.Error())
ctx.JSON(500, gin.H{"error": fmt.Sprintf("Could not find deployments %s", err.Error())})
return
}
deploymentSchemas, err := converters.ToDeploymentSchemas(ctx, deployments)
if err != nil {
log.Error().Msgf("Failed to convert deployment model list to schema: %s", err.Error())
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
deploymentListSchema := &schemas.DeploymentListSchema{
BaseListSchema: schemas.BaseListSchema{
Total: total,
Start: schema.Start,
Count: schema.Count,
},
Items: deploymentSchemas,
}
ctx.JSON(200, deploymentListSchema)
}
func (c *deploymentController) ListDeployments(ctx *gin.Context) {
var schema schemas.ListQuerySchema
if err := ctx.ShouldBindQuery(&schema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
ownership, err := GetOwnershipInfo(ctx)
if err != nil {
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
listOpt := services.ListDeploymentOption{
BaseListOption: services.BaseListOption{
Start: &schema.Start,
Count: &schema.Count,
Search: schema.Search,
},
}
setListDeploymentOptionsScope(&listOpt, ownership)
deployments, total, err := services.DeploymentService.List(ctx, listOpt)
if err != nil {
log.Error().Msgf("Could not get all deployments for the cluster with the following opts %+v: %s", listOpt, err.Error())
ctx.JSON(500, gin.H{"error": fmt.Sprintf("Could not find deployments %s", err.Error())})
return
}
deploymentSchemas, err := converters.ToDeploymentSchemas(ctx, deployments)
if err != nil {
log.Error().Msgf("Failed to convert deployment model list to schema: %s", err.Error())
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
deploymentListSchema := &schemas.DeploymentListSchema{
BaseListSchema: schemas.BaseListSchema{
Total: total,
Start: schema.Start,
Count: schema.Count,
},
Items: deploymentSchemas,
}
ctx.JSON(200, deploymentListSchema)
}
func (c *deploymentController) CreationJSONSchema(ctx *gin.Context) {
reflector := jsonschema.Reflector{}
res := reflector.Reflect(schemas.CreateDeploymentSchema{})
if res != nil {
res.Version = "http://json-schema.org/draft-04/schema#"
}
ctx.JSON(200, res)
}
func getDeployment(ctx *gin.Context, s *schemas.GetDeploymentSchema) (*models.Deployment, error) {
cluster, err := ClusterController.GetCluster(ctx, s.ClusterName)
if err != nil {
return nil, err
}
ownership, err := GetOwnershipInfo(ctx)
if err != nil {
return nil, err
}
var deployment *models.Deployment
switch env.ApplicationScope {
case env.UserScope:
deployment, err = services.DeploymentService.GetByNameAndCreator(
ctx, cluster.ID, s.KubeNamespace, s.DeploymentName, ownership.UserId,
)
if err != nil {
return nil, err
}
case env.OrganizationScope:
deployment, err = services.DeploymentService.GetByName(
ctx, cluster.ID, s.KubeNamespace, s.DeploymentName,
)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unknown application scope: %s", env.ApplicationScope)
}
if err != nil {
return nil, err
}
return deployment, nil
}
// The start of the V2 deployment APIs
func (c *deploymentController) CreateV2(ctx *gin.Context) {
cluster, kubeNamespace, err := getClusterAndInfo(ctx)
if err != nil {
return // ctx set in helper function
}
var schema schemasv2.CreateDeploymentSchema
if err := ctx.ShouldBindJSON(&schema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
compoundNim, compoundNimVersion, err := parseCompoundNimVersion(schema.CompoundNim)
if err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
createDeploymentTarget, err := c.buildDeploymentTargetConfiguration(&schema.UpdateDeploymentSchema, compoundNim, compoundNimVersion)
if err != nil {
log.Error().Msgf("Failed to build createDeploymentTarget schema %s", err.Error())
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
// Determine the deployment name
deploymentName := schema.Name
if deploymentName == "" {
deploymentName = fmt.Sprintf("dep-%s-%s--%s", compoundNim, compoundNimVersion, uuid.New().String())
deploymentName = deploymentName[:63] // Max label length for k8s
}
fmt.Println("Deployment Name:", deploymentName)
body, _ := json.Marshal(createDeploymentTarget)
log.Info().Msgf("Got the following target: %s", body)
// Create the CreateDeploymentSchema instance
createDeploymentSchema := CreateDeploymentSchema{
CreateDeploymentSchema: schemas.CreateDeploymentSchema{
Name: deploymentName,
KubeNamespace: kubeNamespace,
UpdateDeploymentSchema: schemas.UpdateDeploymentSchema{
Targets: []*schemas.CreateDeploymentTargetSchema{
createDeploymentTarget,
},
},
},
}
deployment, err := c.createDeploymentHelper(ctx, cluster, createDeploymentSchema)
if err != nil {
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
// Getting mocked creator
creator := mocks.DefaultUser()
deploymentSchema, err := converters.ToDeploymentSchemaV2(ctx, cluster, deployment, creator)
if err != nil {
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
ctx.JSON(200, deploymentSchema)
}
func (c *deploymentController) GetV2(ctx *gin.Context) {
cluster, deployment, err := getDeploymentAndInfo(ctx)
if err != nil {
return // ctx set in helper function
}
// Getting mocked creator
creator := mocks.DefaultUser()
deploymentSchema, err := converters.ToDeploymentSchemaV2(ctx, cluster, deployment, creator)
if err != nil {
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
ctx.JSON(200, deploymentSchema)
}
func (c *deploymentController) UpdateV2(ctx *gin.Context) {
cluster, deployment, err := getDeploymentAndInfo(ctx)
if err != nil {
return // ctx set in helper function
}
var schema schemasv2.UpdateDeploymentSchema
if err := ctx.ShouldBindJSON(&schema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
ownership, err := GetOwnershipInfo(ctx)
if err != nil {
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
// Getting the k8s namespace
kubeNamespace := os.Getenv("DEFAULT_KUBE_NAMESPACE")
if kubeNamespace == "" {
kubeNamespace = "compoundai"
}
compoundNim, compoundNimVersion, err := parseCompoundNimVersion(schema.CompoundNim)
if err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
createDeploymentTarget, err := c.buildDeploymentTargetConfiguration(&schema, compoundNim, compoundNimVersion)
if err != nil {
log.Error().Msgf("Failed to build createDeploymentTarget schema %s", err.Error())
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
updateDeploymentSchema := schemas.UpdateDeploymentSchema{
Targets: []*schemas.CreateDeploymentTargetSchema{
createDeploymentTarget,
},
}
tx, ctx_, df, err := database.DatabaseUtil.StartTransaction(ctx)
defer func() { df(err) }() // Clean up the transaction
deployment, err = c.updateDeploymentEntities(ctx_, updateDeploymentSchema, deployment, ownership)
if err != nil {
log.Error().Msgf("Could not update deployment entities %s: %s", deployment.Name, err.Error())
ctx.JSON(500, err.Error())
return
}
tx.Commit()
// Getting mocked creator
creator := mocks.DefaultUser()
deploymentSchema, err := converters.ToDeploymentSchemaV2(ctx, cluster, deployment, creator)
if err != nil {
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
ctx.JSON(200, deploymentSchema)
}
func parseCompoundNimVersion(compoundNimTag string) (string, string, error) {
var compoundNim, compoundNimVersion string
compoundNimParts := strings.Split(compoundNimTag, ":")
if len(compoundNimParts) == 2 {
compoundNim = compoundNimParts[0]
compoundNimVersion = compoundNimParts[1]
} else {
return "", "", fmt.Errorf("invalid Compound Nim format, expected 'compoundnim:version'")
}
fmt.Println("Compound Nim:", compoundNim)
fmt.Println("Compound Nim Version:", compoundNimVersion)
return compoundNim, compoundNimVersion, nil
}
func (c *deploymentController) buildDeploymentTargetConfiguration(schema *schemasv2.UpdateDeploymentSchema, compoundNim, compoundNimVersion string) (*schemas.CreateDeploymentTargetSchema, error) {
// Extract the first service from Services map
var firstServiceSpec schemasv2.ServiceSpec
for _, serviceSpec := range schema.Services {
firstServiceSpec = serviceSpec
break
}
hpaMinReplica := int32(firstServiceSpec.Scaling.MinReplicas)
hpaMaxRepica := int32(firstServiceSpec.Scaling.MaxReplicas)
enableIngress := false
// Convert service configuration into CreateDeploymentTargetSchema
createDeploymentTarget := &schemas.CreateDeploymentTargetSchema{
CompoundNim: compoundNim,
Version: compoundNimVersion,
Config: &schemas.DeploymentTargetConfig{
HPAConf: &schemas.DeploymentTargetHPAConf{
MinReplicas: &hpaMinReplica,
MaxReplicas: &hpaMaxRepica,
},
Resources: &schemas.Resources{
Requests: &schemas.ResourceItem{
CPU: firstServiceSpec.ConfigOverrides.Resources.Requests.CPU,
GPU: firstServiceSpec.ConfigOverrides.Resources.Requests.GPU,
Memory: firstServiceSpec.ConfigOverrides.Resources.Requests.Memory,
},
Limits: &schemas.ResourceItem{
CPU: firstServiceSpec.ConfigOverrides.Resources.Limits.CPU,
GPU: firstServiceSpec.ConfigOverrides.Resources.Limits.GPU,
Memory: firstServiceSpec.ConfigOverrides.Resources.Limits.Memory,
},
},
DeploymentOverrides: &schemas.DeploymentOverrides{
ColdStartTimeout: firstServiceSpec.ColdStartTimeout,
},
// Assuming Envs, Runners, EnableIngress, DeploymentStrategy are default values or nil
EnableIngress: &enableIngress, // Assuming false as default
DeploymentStrategy: nil, // Assuming no specific strategy as default
ExternalServices: firstServiceSpec.ExternalServices,
},
}
return createDeploymentTarget, nil
}
func (c *deploymentController) TerminateV2(ctx *gin.Context) {
cluster, deployment, err := getDeploymentAndInfo(ctx)
if err != nil {
return // ctx set in helper function
}
deployment, err = c.doTerminate(ctx, deployment)
if err != nil {
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
// Getting mocked creator
creator := mocks.DefaultUser()
deploymentSchema, err := converters.ToDeploymentSchemaV2(ctx, cluster, deployment, creator)
if err != nil {
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
ctx.JSON(200, deploymentSchema)
}
func (c *deploymentController) DeleteV2(ctx *gin.Context) {
cluster, deployment, err := getDeploymentAndInfo(ctx)
if err != nil {
return // ctx set in helper function
}
deployment, err = services.DeploymentService.Delete(ctx, deployment)
if err != nil {
log.Error().Msgf("Could not delete deployment with the name %s: %s", deployment.Name, err.Error())
ctx.JSON(500, gin.H{"error": fmt.Sprintf("Could not delete deployment with the name %s", deployment.Name)})
return
}
// Getting mocked creator
creator := mocks.DefaultUser()
deploymentSchema, err := converters.ToDeploymentSchemaV2(ctx, cluster, deployment, creator)
if err != nil {
ctx.JSON(500, gin.H{"error": err.Error()})
return
}
ctx.JSON(200, deploymentSchema)
}
func getDeploymentAndInfo(ctx *gin.Context) (*models.Cluster, *models.Deployment, error) {
cluster, kubeNamespace, err := getClusterAndInfo(ctx)
if err != nil {
return nil, nil, err
}
var getSchema schemasv2.GetDeploymentSchema
if err := ctx.ShouldBindUri(&getSchema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return nil, nil, err
}
deployment, err := getDeployment(ctx, getSchema.ToV1(cluster.Name, kubeNamespace))
if err != nil {
log.Error().Msgf("Could not find deployment with the name %s: %s", getSchema.DeploymentName, err.Error())
ctx.JSON(404, fmt.Sprintf("Could not find deployment with the name %s", getSchema.DeploymentName))
return nil, nil, err
}
return cluster, deployment, err
}
func getClusterAndInfo(ctx *gin.Context) (*models.Cluster, string, error) {
clusterName := ctx.Query("cluster")
if clusterName == "" {
clusterName = "default"
}
log.Info().Msgf("Got clusterName: %s", clusterName)
cluster, err := ClusterController.GetCluster(ctx, clusterName)
if err != nil {
log.Error().Msgf("Could not find cluster with the name %s: %s", clusterName, err.Error())
ctx.JSON(404, gin.H{"error": fmt.Sprintf("Could not find cluster with the name %s", clusterName)})
return nil, "", err
}
// Getting the k8s namespace
kubeNamespace := os.Getenv("DEFAULT_KUBE_NAMESPACE")
if kubeNamespace == "" {
kubeNamespace = "compoundai"
}
return cluster, kubeNamespace, nil
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controllers
import (
"fmt"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/converters"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/schemas"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/services"
"github.com/gin-gonic/gin"
"github.com/rs/zerolog/log"
)
type deploymentRevisionController struct{}
var DeploymentRevisionController = deploymentRevisionController{}
func (c *deploymentRevisionController) List(ctx *gin.Context) {
var schema schemas.ListQuerySchema
var getSchema schemas.GetDeploymentSchema
if err := ctx.ShouldBindUri(&getSchema); err != nil {
log.Error().Msgf("Error binding: %s", err.Error())
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
if err := ctx.ShouldBindQuery(&schema); err != nil {
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
deployment, err := getDeployment(ctx, &getSchema)
if err != nil {
log.Error().Msgf("Could not find deployment with the name %s: %s", getSchema.DeploymentName, err.Error())
ctx.JSON(404, gin.H{"error": fmt.Sprintf("Could not find deployment with the name %s", getSchema.DeploymentName)})
return
}
deploymentRevisions, total, err := services.DeploymentRevisionService.List(ctx, services.ListDeploymentRevisionOption{
BaseListOption: services.BaseListOption{
Start: &schema.Start,
Count: &schema.Count,
Search: schema.Search,
},
DeploymentId: &deployment.ID,
})
if err != nil {
errMsg := fmt.Sprintf("Failed to get deployment revisions %s", err.Error())
log.Error().Msgf(errMsg)
ctx.JSON(500, gin.H{"error": errMsg})
return
}
deploymentRevisionSchemas, err := converters.ToDeploymentRevisionSchemas(ctx, deploymentRevisions)
if err != nil {
errMsg := fmt.Sprintf("Failed to convert models to deployment revision schemas %s", err.Error())
log.Error().Msgf(errMsg)
ctx.JSON(500, gin.H{"error": errMsg})
return
}
log.Info().Msgf("Got %d deployment revisions", len(deploymentRevisionSchemas))
deploymentRevisionListSchema := schemas.DeploymentRevisionListSchema{
BaseListSchema: schemas.BaseListSchema{
Total: total,
Start: schema.Start,
Count: schema.Count,
},
Items: deploymentRevisionSchemas,
}
ctx.JSON(200, deploymentRevisionListSchema)
}
func (c *deploymentRevisionController) Get(ctx *gin.Context) {
var schema schemas.GetDeploymentRevisionSchema
if err := ctx.ShouldBindUri(&schema); err != nil {
log.Error().Msgf("Error binding: %s", err.Error())
ctx.JSON(400, gin.H{"error": err.Error()})
return
}
_, err := getDeployment(ctx, &schema.GetDeploymentSchema)
if err != nil {
log.Error().Msgf("Could not find deployment with the name %s: %s", schema.DeploymentName, err.Error())
ctx.JSON(404, gin.H{"error": fmt.Sprintf("Could not find deployment with the name %s", schema.DeploymentName)})
return
}
deploymentRevision, err := services.DeploymentRevisionService.GetByUid(ctx, schema.RevisionUid)
if err != nil {
errMsg := fmt.Sprintf("Failed to get deployment revisions %s for %s", schema.DeploymentName, err.Error())
log.Error().Msgf(errMsg)
ctx.JSON(404, gin.H{"error": errMsg})
return
}
deploymentRevisionSchema, err := converters.ToDeploymentRevisionSchema(ctx, deploymentRevision)
if err != nil {
errMsg := fmt.Sprintf("Failed to convert model to deployment revision schema %s", err.Error())
log.Error().Msgf(errMsg)
ctx.JSON(500, gin.H{"error": errMsg})
return
}
ctx.JSON(200, deploymentRevisionSchema)
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controllers
import (
"net/http"
"github.com/gin-gonic/gin"
)
type healthController struct{}
var HealthController = healthController{}
func (h *healthController) Get(gin *gin.Context) {
gin.JSON(http.StatusOK, "ok")
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controllers
import (
"github.com/gin-gonic/gin"
)
type infoController struct{}
var InfoController = infoController{}
type InfoSchema struct {
IsSaas bool `json:"is_saas"`
SaasDomainSuffix string `json:"saas_domain_suffix"`
}
func (c *infoController) GetInfo(ctx *gin.Context) {
schema := InfoSchema{
IsSaas: true,
SaasDomainSuffix: "",
}
ctx.JSON(200, schema)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment