Commit 602352ce authored by Neelay Shah, committed by GitHub
Browse files

chore: rename dynamo (#44)


Co-authored-by: Biswa Panda <biswa.panda@gmail.com>
parent ecf53ce2
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package schemas
// OrganizationSchema is the JSON shape of an organization: the fields of the
// embedded ResourceSchema plus the creator and a free-text description.
type OrganizationSchema struct {
ResourceSchema
// Creator is a pointer and has no omitempty, so a nil value is emitted as
// JSON null rather than dropped.
Creator *UserSchema `json:"creator"`
Description string `json:"description"`
}
// OrganizationFullSchema embeds OrganizationSchema and currently adds no
// fields of its own; it only provides a distinct type name.
type OrganizationFullSchema struct {
OrganizationSchema
}
// OrganizationListSchema is a list payload: the fields of the embedded
// BaseListSchema plus the organizations themselves under "items".
type OrganizationListSchema struct {
BaseListSchema
Items []*OrganizationSchema `json:"items"`
}
// UpdateOrganizationSchema carries the mutable fields of an organization.
type UpdateOrganizationSchema struct {
// Description is a pointer; presumably nil means "leave unchanged" —
// confirm against the code that consumes this schema.
Description *string `json:"description"`
}
// CreateOrganizationSchema carries the name and description supplied for a
// new organization.
type CreateOrganizationSchema struct {
Name string `json:"name"`
Description string `json:"description"`
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package schemas
// MemberRole is the string-typed role attached to an organization member.
type MemberRole string
// The MemberRole values declared by this package, with their serialized
// string forms.
const (
MemberRoleGuest MemberRole = "guest"
MemberRoleDeveloper MemberRole = "developer"
MemberRoleAdmin MemberRole = "admin"
)
// OrganizationMemberSchema is the JSON shape of a membership record: the
// embedded BaseSchema fields, the member's role, and the user and
// organization the membership links.
type OrganizationMemberSchema struct {
BaseSchema
Role MemberRole `json:"role"`
// Creator is a pointer and is emitted as JSON null when nil.
Creator *UserSchema `json:"creator"`
// User and Organization are embedded by value, so they are always present
// in the output.
User UserSchema `json:"user"`
Organization OrganizationSchema `json:"organization"`
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package schemas
// OwnershipSchema pairs an organization identifier with a user identifier.
// The fields carry no JSON tags, so encoding/json would use the Go field
// names ("OrganizationId", "UserId") if this type were serialized.
type OwnershipSchema struct {
OrganizationId string
UserId string
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package schemas
// IResourceSchema is the read-only view common to resource schemas: a
// resource type and a name. ResourceSchema satisfies it through value
// receivers.
type IResourceSchema interface {
// GetType reports the kind of resource.
GetType() ResourceType
// GetName reports the resource's name.
GetName() string
}
// ResourceSchema holds the fields shared by every named resource returned by
// the API: the embedded BaseSchema, a name, a label list and the resource
// type discriminator.
//
// NOTE(review): the enum tag on ResourceType lists fewer values than the
// ResourceType constants declared in this package (terminal_record, label and
// yatai_component are absent) — confirm whether that is intentional.
type ResourceSchema struct {
	BaseSchema
	Name         string            `json:"name"`
	Labels       []LabelItemSchema `json:"labels"`
	ResourceType ResourceType      `json:"resource_type" enum:"user,organization,cluster,compound_nim,compound_nim_version,deployment,deployment_revision,model_repository,model,api_token"`
}

// GetType returns the resource type stored on the schema.
func (rs ResourceSchema) GetType() ResourceType {
	kind := rs.ResourceType
	return kind
}

// GetName returns the resource name stored on the schema.
func (rs ResourceSchema) GetName() string {
	name := rs.Name
	return name
}

// TypeName returns the resource type converted to a plain string. Unlike the
// two getters above it is declared on the pointer receiver.
func (rs *ResourceSchema) TypeName() string {
	kind := rs.ResourceType
	return string(kind)
}
// ResourceItem is one set of compute resource quantities. Every field is
// optional and dropped from the JSON output when empty.
// NOTE(review): the quantity format is not defined here — presumably
// Kubernetes-style quantity strings; confirm.
type ResourceItem struct {
CPU string `json:"cpu,omitempty"`
Memory string `json:"memory,omitempty"`
GPU string `json:"gpu,omitempty"`
// Custom holds additional quantities keyed by name.
Custom map[string]string `json:"custom,omitempty"`
}
// Resources groups an optional "requests" and an optional "limits"
// ResourceItem; either is dropped from the JSON output when nil.
type Resources struct {
Requests *ResourceItem `json:"requests,omitempty"`
Limits *ResourceItem `json:"limits,omitempty"`
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package schemas
// ResourceType is the string discriminator naming the kind of a resource.
type ResourceType string

// The resource kinds declared by this package, with their serialized string
// forms. Note that ResourceTypeCompoundAIComponent serializes as
// "yatai_component", which does not mirror its Go identifier.
const (
	ResourceTypeUser                ResourceType = "user"
	ResourceTypeOrganization        ResourceType = "organization"
	ResourceTypeCluster             ResourceType = "cluster"
	ResourceTypeCompoundNim         ResourceType = "compound_nim"
	ResourceTypeCompoundNimVersion  ResourceType = "compound_nim_version"
	ResourceTypeDeployment          ResourceType = "deployment"
	ResourceTypeDeploymentRevision  ResourceType = "deployment_revision"
	ResourceTypeTerminalRecord      ResourceType = "terminal_record"
	ResourceTypeModelRepository     ResourceType = "model_repository"
	ResourceTypeModel               ResourceType = "model"
	ResourceTypeLabel               ResourceType = "label"
	ResourceTypeApiToken            ResourceType = "api_token"
	ResourceTypeCompoundAIComponent ResourceType = "yatai_component"
)

// Ptr returns a pointer to a fresh copy of the receiver, which is handy for
// filling optional *ResourceType fields from a constant.
func (rt ResourceType) Ptr() *ResourceType {
	copied := rt
	return &copied
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package schemas
// UserSchema is the JSON shape of a user: the embedded ResourceSchema fields
// plus first name, last name and email address.
type UserSchema struct {
ResourceSchema
FirstName string `json:"first_name"`
LastName string `json:"last_name"`
Email string `json:"email"`
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package schemas
// VersionSchema carries build identification strings: a version, a git commit
// and a build date. All three are plain strings; their format is not
// constrained by this type.
type VersionSchema struct {
Version string `json:"version"`
GitCommit string `json:"git_commit"`
BuildDate string `json:"build_date"`
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package schemasv2
import "github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/schemas"
// ClusterSchema is the v2 JSON shape of a cluster. It embeds the v1
// schemas.ResourceSchema and adds a description, the owning organization's
// name and the creator.
type ClusterSchema struct {
schemas.ResourceSchema
Description string `json:"description"`
OrganizationName string `json:"organization_name"`
// Creator is emitted as JSON null when nil.
Creator *schemas.UserSchema `json:"creator"`
// IsFirst is optional and dropped from the output when nil.
// NOTE(review): its meaning is not established in this file — confirm.
IsFirst *bool `json:"is_first,omitempty"`
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package schemasv2
import "github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/schemas"
// DeploymentSchema is the v2 JSON shape of a deployment. It embeds the v1
// schemas.ResourceSchema and adds the creator, the cluster, the current
// status, the URLs, the latest revision and the Kubernetes namespace.
type DeploymentSchema struct {
schemas.ResourceSchema
Creator *schemas.UserSchema `json:"creator"`
Cluster *ClusterSchema `json:"cluster"`
// Status is documented through the enum tag with six values.
Status schemas.DeploymentStatus `json:"status" enum:"unknown,non-deployed,running,unhealthy,failed,deploying"`
URLs []string `json:"urls"`
LatestRevision *schemas.DeploymentRevisionSchema `json:"latest_revision"`
KubeNamespace string `json:"kube_namespace"`
}
// GetDeploymentSchema holds the deployment name bound from the request URI
// (parameter "deploymentName", marked required).
type GetDeploymentSchema struct {
	DeploymentName string `uri:"deploymentName" binding:"required"`
}

// ToV1 builds the equivalent v1 schemas.GetDeploymentSchema. The v2 request
// carries only the deployment name, so the caller supplies the cluster name
// and Kubernetes namespace.
func (s *GetDeploymentSchema) ToV1(clusterName string, namespace string) *schemas.GetDeploymentSchema {
	v1 := new(schemas.GetDeploymentSchema)
	v1.GetClusterSchema = schemas.GetClusterSchema{ClusterName: clusterName}
	v1.KubeNamespace = namespace
	v1.DeploymentName = s.DeploymentName
	return v1
}
// CreateDeploymentSchema is the payload for a new deployment: every field of
// the embedded UpdateDeploymentSchema plus the deployment's name.
type CreateDeploymentSchema struct {
UpdateDeploymentSchema
Name string `json:"name"`
}
// UpdateDeploymentSchema carries the embedded DeploymentConfigSchema plus the
// Compound NIM reference.
type UpdateDeploymentSchema struct {
DeploymentConfigSchema
// CompoundNim is exchanged under the JSON key "bento", not "compound_nim".
CompoundNim string `json:"bento"`
}
// DeploymentConfigSchema is the configurable part of a deployment.
type DeploymentConfigSchema struct {
AccessAuthorization bool `json:"access_authorization"`
// Envs and Secrets are untyped (interface{}); their expected shape is not
// defined here. Both are dropped from the output when nil.
Envs interface{} `json:"envs,omitempty"`
Secrets interface{} `json:"secrets,omitempty"`
// Services maps a key (presumably the service name — confirm) to its spec.
Services map[string]ServiceSpec `json:"services"`
}
// ServiceSpec is the per-service part of a deployment configuration: scaling
// bounds, configuration overrides and optional external services.
type ServiceSpec struct {
Scaling ScalingSpec `json:"scaling"`
ConfigOverrides ConfigOverridesSpec `json:"config_overrides"`
ExternalServices map[string]schemas.ExternalService `json:"external_services,omitempty"`
// ColdStartTimeout is optional. NOTE(review): the unit is not stated in
// this file — presumably seconds; confirm.
ColdStartTimeout *int32 `json:"cold_start_timeout,omitempty"`
}
// ScalingSpec holds the minimum and maximum replica counts for a service.
// No validation (for example min <= max) is performed by this type.
type ScalingSpec struct {
MinReplicas int `json:"min_replicas"`
MaxReplicas int `json:"max_replicas"`
}
// ConfigOverridesSpec holds the per-service overrides; at present that is
// only the resource requests and limits.
type ConfigOverridesSpec struct {
Resources schemas.Resources `json:"resources"`
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package services
import (
"fmt"
"strings"
"gorm.io/gorm"
)
// BaseListOption carries the paging and text-search options shared by list
// queries. Every field is optional; a nil pointer leaves that aspect of the
// query untouched.
type BaseListOption struct {
// Start is applied as the query OFFSET by BindQueryWithLimit.
Start *uint
// Count is applied as the query LIMIT by BindQueryWithLimit.
Count *uint
// Search is a single substring matched with LIKE against the keyword
// columns by BindQueryWithKeywords; an empty string is ignored.
Search *string
// Keywords are substrings that must all match within one keyword column.
Keywords *[]string
// KeywordFieldNames names the columns searched; when nil only "name" is used.
KeywordFieldNames *[]string
}
// BindQueryWithLimit applies the paging options to query and returns the
// resulting query: Count becomes the LIMIT and Start becomes the OFFSET.
// Options that are nil are skipped.
func (opt BaseListOption) BindQueryWithLimit(query *gorm.DB) *gorm.DB {
	if limit := opt.Count; limit != nil {
		query = query.Limit(int(*limit))
	}
	if offset := opt.Start; offset != nil {
		query = query.Offset(int(*offset))
	}
	return query
}
// BindQueryWithKeywords narrows query using the text-search options and
// returns the resulting query.
//
// The searched columns are KeywordFieldNames, or just "name" when that option
// is nil or empty. Table and column names are quoted through the statement's
// quoter; the user-supplied terms are always passed as bound parameters.
//
//   - Search (when non-empty) must appear as a substring in at least one column.
//   - Keywords (when non-empty) must all appear as substrings of the same
//     column, for at least one column.
//
// An empty Keywords or KeywordFieldNames slice used to produce an empty
// "()" group, which is invalid SQL; both cases are now treated like nil.
//
// LIKE wildcards ("%" and "_") inside the terms are not escaped.
func (opt BaseListOption) BindQueryWithKeywords(query *gorm.DB, tableName string) *gorm.DB {
	tableName = query.Statement.Quote(tableName)

	keywordFieldNames := []string{"name"}
	if opt.KeywordFieldNames != nil && len(*opt.KeywordFieldNames) > 0 {
		keywordFieldNames = *opt.KeywordFieldNames
	}

	// Build each fully qualified, quoted column reference once; both filters
	// below reuse them.
	columns := make([]string, 0, len(keywordFieldNames))
	for _, keywordFieldName := range keywordFieldNames {
		columns = append(columns, fmt.Sprintf("%s.%s", tableName, query.Statement.Quote(keywordFieldName)))
	}

	if opt.Search != nil && *opt.Search != "" {
		pattern := fmt.Sprintf("%%%s%%", *opt.Search)
		sqlPieces := make([]string, 0, len(columns))
		args := make([]interface{}, 0, len(columns))
		for _, column := range columns {
			sqlPieces = append(sqlPieces, column+" LIKE ?")
			args = append(args, pattern)
		}
		query = query.Where(fmt.Sprintf("(%s)", strings.Join(sqlPieces, " OR ")), args...)
	}

	if opt.Keywords != nil && len(*opt.Keywords) > 0 {
		keywords := *opt.Keywords
		sqlPieces := make([]string, 0, len(columns))
		args := make([]interface{}, 0, len(columns)*len(keywords))
		for _, column := range columns {
			// Every keyword must match this column (AND); columns are
			// alternatives to one another (OR).
			perColumn := make([]string, 0, len(keywords))
			for _, keyword := range keywords {
				perColumn = append(perColumn, column+" LIKE ?")
				args = append(args, fmt.Sprintf("%%%s%%", keyword))
			}
			sqlPieces = append(sqlPieces, fmt.Sprintf("(%s)", strings.Join(perColumn, " AND ")))
		}
		query = query.Where(fmt.Sprintf("(%s)", strings.Join(sqlPieces, " OR ")), args...)
	}

	return query
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package services
import (
"context"
"errors"
"strings"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/common/consts"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/database"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/models"
"k8s.io/apimachinery/pkg/util/validation"
"github.com/rs/zerolog/log"
"gorm.io/gorm"
)
// clusterService groups the database operations on cluster records. It has no
// fields, so it holds no state of its own.
type clusterService struct{}
// ClusterService is the package-level instance through which the cluster
// operations are called.
var ClusterService = clusterService{}
// CreateClusterOption is the input to clusterService.Create.
type CreateClusterOption struct {
// CreatorId is stored as the cluster's creator user id.
CreatorId string
// OrganizationId is stored as the owning organization id.
OrganizationId string
// Name must be a valid DNS-1035 label; Create rejects anything else.
Name string
Description string
// KubeConfig is stored verbatim on the cluster record.
KubeConfig string
}
// UpdateClusterOption is the input to clusterService.Update. A nil field is
// left unchanged; a non-nil field overwrites the stored value.
type UpdateClusterOption struct {
Description *string
KubeConfig *string
}
// ListClusterOption is the input to clusterService.List. Nil filters are not
// applied.
//
// As written, List consults Ids, Names, OrganizationId, Order and the paging
// part of BaseListOption. CreatorIds and the Search/Keywords options of
// BaseListOption are not read by List.
type ListClusterOption struct {
BaseListOption
OrganizationId *string
// Ids and Names: a non-nil but empty slice makes List return no rows.
Ids *[]uint
Names *[]string
CreatorIds *[]uint
// Order is passed to the query's ORDER BY as-is; when nil, "id DESC" is used.
Order *string
}
// Create inserts a new cluster record built from opt and returns it.
//
// opt.Name must be a valid DNS-1035 label; otherwise the validation messages
// are joined with ";" and returned as the error, and nothing is written.
// A database failure is returned unchanged.
func (s *clusterService) Create(ctx context.Context, opt CreateClusterOption) (*models.Cluster, error) {
	if problems := validation.IsDNS1035Label(opt.Name); len(problems) > 0 {
		return nil, errors.New(strings.Join(problems, ";"))
	}

	tx := s.getDB(ctx)
	log.Info().Msg("Starting create cluster transaction")

	record := models.Cluster{
		Description:           opt.Description,
		KubeConfig:            opt.KubeConfig,
		Resource:              models.Resource{Name: opt.Name},
		OrganizationAssociate: models.OrganizationAssociate{OrganizationId: opt.OrganizationId},
		CreatorAssociate:      models.CreatorAssociate{UserId: opt.CreatorId},
	}

	result := tx.Create(&record)
	if result.Error != nil {
		return nil, result.Error
	}

	log.Info().Msg("Finished create cluster transaction")
	return &record, nil
}
// Update writes the non-nil fields of opt to the cluster row identified by
// c.ID and, once the write has succeeded, mirrors the same values onto c.
//
// When opt sets nothing, c is returned untouched and no query is issued.
// On a database error the error is logged and returned, the result is nil and
// c is left unmodified.
//
// Only the names of the updated columns are logged. The previous
// implementation logged the whole value map, which wrote the kubeconfig
// contents to the application log.
func (s *clusterService) Update(ctx context.Context, c *models.Cluster, opt UpdateClusterOption) (*models.Cluster, error) {
	updaters := make(map[string]interface{}, 2)
	fields := make([]string, 0, 2)

	if opt.Description != nil {
		updaters["description"] = *opt.Description
		fields = append(fields, "description")
	}
	if opt.KubeConfig != nil {
		updaters["kube_config"] = *opt.KubeConfig
		fields = append(fields, "kube_config")
	}
	if len(updaters) == 0 {
		return c, nil
	}

	db := s.getDB(ctx)
	log.Info().Msgf("Updating cluster fields: %s", strings.Join(fields, ", "))

	if err := db.Where("id = ?", c.ID).Updates(updaters).Error; err != nil {
		log.Error().Msgf("Failed to update cluster: %s", err.Error())
		return nil, err
	}

	// The row is updated; bring the in-memory model in line with it.
	if opt.Description != nil {
		c.Description = *opt.Description
	}
	if opt.KubeConfig != nil {
		c.KubeConfig = *opt.KubeConfig
	}
	return c, nil
}
// Get loads the cluster whose primary key equals id.
//
// A query error is logged and returned as-is. If the query reports success
// but the loaded record has a zero ID, consts.ErrNotFound is returned.
func (s *clusterService) Get(ctx context.Context, id uint) (*models.Cluster, error) {
	found := new(models.Cluster)
	tx := s.getDB(ctx)
	if err := tx.Where("id = ?", id).First(found).Error; err != nil {
		log.Error().Msgf("Failed to get cluster by id %d: %s", id, err.Error())
		return nil, err
	}
	if found.ID == 0 {
		return nil, consts.ErrNotFound
	}
	return found, nil
}
// GetByUid loads the cluster whose uid column equals uid.
//
// A query error is logged and returned as-is. If the query reports success
// but the loaded record has a zero ID, consts.ErrNotFound is returned.
func (s *clusterService) GetByUid(ctx context.Context, uid string) (*models.Cluster, error) {
	found := new(models.Cluster)
	tx := s.getDB(ctx)
	if err := tx.Where("uid = ?", uid).First(found).Error; err != nil {
		log.Error().Msgf("Failed to get cluster by uid %s: %s", uid, err.Error())
		return nil, err
	}
	if found.ID == 0 {
		return nil, consts.ErrNotFound
	}
	return found, nil
}
// GetByName loads the cluster called name inside the organization identified
// by organizationId.
//
// A query error is logged and returned as-is. If the query reports success
// but the loaded record has a zero ID, consts.ErrNotFound is returned.
func (s *clusterService) GetByName(ctx context.Context, organizationId string, name string) (*models.Cluster, error) {
	found := new(models.Cluster)
	tx := s.getDB(ctx).
		Where("organization_id = ?", organizationId).
		Where("name = ?", name)
	if err := tx.First(found).Error; err != nil {
		log.Error().Msgf("Failed to get cluster by name %s: %s", name, err.Error())
		return nil, err
	}
	if found.ID == 0 {
		return nil, consts.ErrNotFound
	}
	return found, nil
}
// GetIdByName selects only the id column of the cluster called name inside
// the given organization. The id is returned together with whatever error the
// query produced; on error the id is the zero value of the freshly declared
// record.
//
// NOTE(review): organizationId is a uint here but a string in GetByName —
// confirm which type the column actually uses.
func (s *clusterService) GetIdByName(ctx context.Context, organizationId uint, name string) (uint, error) {
	var row models.Cluster
	result := s.getDB(ctx).
		Select("id").
		Where("organization_id = ?", organizationId).
		Where("name = ?", name).
		First(&row)
	return row.ID, result.Error
}
// List returns the clusters matching opt together with the total number of
// matches before paging.
//
// Filters are applied in the order Ids, Names, OrganizationId. A non-nil but
// empty Ids or Names slice short-circuits to an empty result without
// querying. The total is counted first, then the paging options are applied.
// Ordering is only added when Ids is nil: opt.Order if given, otherwise
// "id DESC".
func (s *clusterService) List(ctx context.Context, opt ListClusterOption) ([]*models.Cluster, uint, error) {
	result := []*models.Cluster{}
	tx := s.getDB(ctx)

	if ids := opt.Ids; ids != nil {
		if len(*ids) == 0 {
			return result, 0, nil
		}
		tx = tx.Where("id in (?)", *ids)
	}
	if names := opt.Names; names != nil {
		if len(*names) == 0 {
			return result, 0, nil
		}
		tx = tx.Where("name in (?)", *names)
	}
	if orgID := opt.OrganizationId; orgID != nil {
		tx = tx.Where("organization_id = ?", *orgID)
	}

	var matched int64
	if err := tx.Count(&matched).Error; err != nil {
		return nil, 0, err
	}

	tx = opt.BindQueryWithLimit(tx)
	switch {
	case opt.Ids != nil:
		// An explicit id list is returned without an ORDER BY clause.
	case opt.Order == nil:
		tx = tx.Order("id DESC")
	default:
		tx = tx.Order(*opt.Order)
	}

	if err := tx.Find(&result).Error; err != nil {
		return nil, 0, err
	}
	return result, uint(matched), nil
}
// getDB returns the database session for ctx, scoped to the Cluster model.
func (s *clusterService) getDB(ctx context.Context) *gorm.DB {
	session := database.DatabaseUtil.GetDBSession(ctx)
	return session.Model(&models.Cluster{})
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package services
import (
"context"
"strings"
"time"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/common/consts"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/database"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/models"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/schemas"
"github.com/pkg/errors"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"k8s.io/apimachinery/pkg/util/validation"
)
// compoundComponentService groups the database operations on compound
// component records. It has no fields, so it holds no state of its own.
type compoundComponentService struct{}
// CompoundComponentService is the package-level instance through which the
// compound component operations are called.
var CompoundComponentService = compoundComponentService{}
// CreateCompoundComponentOption is the input to
// compoundComponentService.Create.
//
// NOTE(review): Create as written does not read CreatorId or OrganizationId;
// confirm whether they should be persisted.
type CreateCompoundComponentOption struct {
CreatorId uint
OrganizationId uint
ClusterId uint
// Name and KubeNamespace must both be valid DNS-1035 labels.
Name string
Description string
Version string
KubeNamespace string
Manifest *schemas.CompoundComponentManifestSchema
}
// UpdateCompoundComponentOption is the input to
// compoundComponentService.Update. A nil field is left unchanged.
//
// The double-pointer fields separate "do not touch" (outer pointer nil) from
// "set to this value", where the value itself may be a nil pointer.
type UpdateCompoundComponentOption struct {
Description *string
Version *string
LatestInstalledAt **time.Time
LatestHeartbeatAt **time.Time
Manifest **schemas.CompoundComponentManifestSchema
}
// Create inserts a new compound component record built from opt and returns
// it.
//
// opt.Name and opt.KubeNamespace are validated, in that order, as DNS-1035
// labels; the first one that fails has its validation messages joined with
// ";" and returned as the error. LatestInstalledAt and LatestHeartbeatAt are
// both initialised to the current time and point at the same time value.
func (s *compoundComponentService) Create(ctx context.Context, opt CreateCompoundComponentOption) (*models.CompoundComponent, error) {
	for _, label := range []string{opt.Name, opt.KubeNamespace} {
		if problems := validation.IsDNS1035Label(label); len(problems) > 0 {
			return nil, errors.New(strings.Join(problems, ";"))
		}
	}

	createdAt := time.Now()
	record := models.CompoundComponent{
		Resource:          models.Resource{Name: opt.Name},
		ClusterAssociate:  models.ClusterAssociate{ClusterId: opt.ClusterId},
		Version:           opt.Version,
		Manifest:          opt.Manifest,
		KubeNamespace:     opt.KubeNamespace,
		Description:       opt.Description,
		LatestInstalledAt: &createdAt,
		LatestHeartbeatAt: &createdAt,
	}

	if err := s.getDB(ctx).Create(&record).Error; err != nil {
		return nil, err
	}
	return &record, nil
}
// Update writes the non-nil fields of opt to the row identified by b.ID and,
// once the write has succeeded, mirrors the same values onto b.
//
// When opt sets nothing, b is returned untouched and no query is issued. On a
// database error the result is nil and b is left unmodified.
func (s *compoundComponentService) Update(ctx context.Context, b *models.CompoundComponent, opt UpdateCompoundComponentOption) (*models.CompoundComponent, error) {
	changes := map[string]interface{}{}
	if opt.Description != nil {
		changes["description"] = *opt.Description
	}
	if opt.LatestHeartbeatAt != nil {
		changes["latest_heartbeat_at"] = *opt.LatestHeartbeatAt
	}
	if opt.LatestInstalledAt != nil {
		changes["latest_installed_at"] = *opt.LatestInstalledAt
	}
	if opt.Version != nil {
		changes["version"] = *opt.Version
	}
	if opt.Manifest != nil {
		changes["manifest"] = *opt.Manifest
	}
	if len(changes) == 0 {
		return b, nil
	}

	if err := s.getDB(ctx).Where("id = ?", b.ID).Updates(changes).Error; err != nil {
		return nil, err
	}

	// The row is updated; bring the in-memory model in line with it.
	if opt.Manifest != nil {
		b.Manifest = *opt.Manifest
	}
	if opt.Version != nil {
		b.Version = *opt.Version
	}
	if opt.LatestInstalledAt != nil {
		b.LatestInstalledAt = *opt.LatestInstalledAt
	}
	if opt.LatestHeartbeatAt != nil {
		b.LatestHeartbeatAt = *opt.LatestHeartbeatAt
	}
	if opt.Description != nil {
		b.Description = *opt.Description
	}
	return b, nil
}
// Get loads the compound component whose primary key equals id, preloading
// its associations.
//
// A query error is returned as-is. If the query reports success but the
// loaded record has a zero ID, consts.ErrNotFound is returned.
func (s *compoundComponentService) Get(ctx context.Context, id uint) (*models.CompoundComponent, error) {
	found := new(models.CompoundComponent)
	tx := s.getDB(ctx).Preload(clause.Associations)
	if err := tx.Where("id = ?", id).First(found).Error; err != nil {
		return nil, err
	}
	if found.ID == 0 {
		return nil, consts.ErrNotFound
	}
	return found, nil
}
// GetByUid loads the compound component whose uid column equals uid,
// preloading its associations.
//
// A query error is returned as-is. If the query reports success but the
// loaded record has a zero ID, consts.ErrNotFound is returned.
func (s *compoundComponentService) GetByUid(ctx context.Context, uid string) (*models.CompoundComponent, error) {
	found := new(models.CompoundComponent)
	tx := s.getDB(ctx).Preload(clause.Associations)
	if err := tx.Where("uid = ?", uid).First(found).Error; err != nil {
		return nil, err
	}
	if found.ID == 0 {
		return nil, consts.ErrNotFound
	}
	return found, nil
}
// GetByName loads the compound component called name within the cluster
// identified by clusterId. Associations are not preloaded here.
//
// A query error is wrapped with the component name. If the query reports
// success but the loaded record has a zero ID, consts.ErrNotFound is returned.
func (s *compoundComponentService) GetByName(ctx context.Context, clusterId uint, name string) (*models.CompoundComponent, error) {
	found := new(models.CompoundComponent)
	tx := s.getDB(ctx).
		Where("cluster_id = ?", clusterId).
		Where("name = ?", name)
	if err := tx.First(found).Error; err != nil {
		return nil, errors.Wrapf(err, "get compoundComponent %s", name)
	}
	if found.ID == 0 {
		return nil, consts.ErrNotFound
	}
	return found, nil
}
// ListByUids loads every compound component whose uid is in uids, preloading
// associations. An empty uids slice yields an empty, non-nil result without
// querying. The slice is returned even when the query fails, alongside the
// error.
func (s *compoundComponentService) ListByUids(ctx context.Context, uids []string) ([]*models.CompoundComponent, error) {
	found := make([]*models.CompoundComponent, 0, len(uids))
	if len(uids) > 0 {
		result := s.getDB(ctx).
			Preload(clause.Associations).
			Where("uid in (?)", uids).
			Find(&found)
		return found, result.Error
	}
	return found, nil
}
// ListCompoundComponentOption holds the optional filters of
// compoundComponentService.List. Nil filters are not applied; all non-nil
// filters are combined, so setting both ClusterId and ClusterIds requires a
// row to satisfy both.
type ListCompoundComponentOption struct {
Ids *[]uint `json:"ids"`
ClusterId *uint `json:"cluster_id"`
ClusterIds *[]uint `json:"cluster_ids"`
OrganizationId *uint `json:"organization_id"`
}
// List returns the compound components matching every non-nil filter in opt,
// with associations preloaded. No paging or ordering is applied. On a query
// error the result is nil.
func (s *compoundComponentService) List(ctx context.Context, opt ListCompoundComponentOption) ([]*models.CompoundComponent, error) {
	tx := s.getDB(ctx).Preload(clause.Associations)

	if orgID := opt.OrganizationId; orgID != nil {
		tx = tx.Where("organization_id = ?", *orgID)
	}
	if clusterIDs := opt.ClusterIds; clusterIDs != nil {
		tx = tx.Where("cluster_id in (?)", *clusterIDs)
	}
	if clusterID := opt.ClusterId; clusterID != nil {
		tx = tx.Where("cluster_id = ?", *clusterID)
	}
	if ids := opt.Ids; ids != nil {
		tx = tx.Where("id in (?)", *ids)
	}

	found := []*models.CompoundComponent{}
	if err := tx.Find(&found).Error; err != nil {
		return nil, err
	}
	return found, nil
}
// getDB returns the database session for ctx, scoped to the
// CompoundComponent model.
func (s *compoundComponentService) getDB(ctx context.Context) *gorm.DB {
	session := database.DatabaseUtil.GetDBSession(ctx)
	return session.Model(&models.CompoundComponent{})
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package services
import (
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/common/client"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/common/env"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/schemas"
"github.com/rs/zerolog/log"
)
// datastoreService groups the calls made to the external datastore. It has
// no fields, so it holds no state of its own.
type datastoreService struct{}
// DatastoreService is the package-level instance through which those calls
// are made.
var DatastoreService = datastoreService{}
// This service talks to the Nemo Datastore microservice.
// Note: do not issue write requests through it, because transactionality is
// not guaranteed for calls made this way.
//
// GetCompoundNimVersion fetches one Compound NIM version from the datastore
// with an HTTP GET on
// <nds-url>/api/v1/bento_repositories/<compoundNim>/bentos/<compoundNimVersion>
// and decodes the JSON response body.
//
// A request failure or a decoding failure is logged and returned. The two
// path segments are inserted into the URL as given, without escaping, and
// ctx is not forwarded to the HTTP client.
func (s *datastoreService) GetCompoundNimVersion(ctx context.Context, compoundNim string, compoundNimVersion string) (*schemas.CompoundNimVersionFullSchema, error) {
	baseURL := env.GetNdsUrl()
	endpoint := fmt.Sprintf("%s/api/v1/bento_repositories/%s/bentos/%s", baseURL, compoundNim, compoundNimVersion)

	_, payload, err := client.SendRequestJSON(endpoint, http.MethodGet, nil)
	if err != nil {
		log.Error().Msgf("Failed to get Compound NIM version %s:%s from %s", compoundNim, compoundNimVersion, baseURL)
		return nil, err
	}

	decoded := new(schemas.CompoundNimVersionFullSchema)
	if err = json.Unmarshal(payload, decoded); err != nil {
		log.Error().Msgf("Failed to unmarshal into a Compound NIM version schema: %s", err.Error())
		return nil, err
	}
	return decoded, nil
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package services
import (
"context"
"fmt"
"strings"
"time"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/common/consts"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/database"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/models"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/schemas"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"gorm.io/gorm"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/validation"
)
// deploymentService groups the database operations on deployment records. It
// has no fields, so it holds no state of its own.
type deploymentService struct{}
// DeploymentService is the package-level instance through which the
// deployment operations are called.
var DeploymentService = deploymentService{}
// CreateDeploymentOption is the input to deploymentService.Create.
type CreateDeploymentOption struct {
// CreatorId is stored as the deployment's creator user id.
CreatorId string
ClusterId uint
// Name and KubeNamespace must both be valid DNS-1035 labels.
Name string
Description string
KubeNamespace string
}
// UpdateDeploymentOption is the input to deploymentService.Update. A nil
// field is left unchanged; a non-nil field overwrites the stored value.
type UpdateDeploymentOption struct {
Description *string
Status *schemas.DeploymentStatus
}
// UpdateDeploymentStatusOption is the input to
// deploymentService.UpdateStatus (defined outside this view). Terminate sets
// only Status.
// NOTE(review): the double-pointer time fields presumably separate "do not
// touch" from "set to this value, possibly nil" — confirm in UpdateStatus.
type UpdateDeploymentStatusOption struct {
Status *schemas.DeploymentStatus
SyncingAt **time.Time
UpdatedAt **time.Time
}
// ListDeploymentOption collects the optional filters, paging and ordering for
// listing deployments. The method that consumes it is outside this view, so
// the exact effect of each field is not documented here.
// NOTE(review): the id fields mix string and uint types (for example
// CreatorId is *string while CreatorIds is *[]uint) — confirm against the
// column types.
type ListDeploymentOption struct {
BaseListOption
ClusterId *uint
CreatorId *string
LastUpdaterId *uint
OrganizationId *string
ClusterIds *[]string
CreatorIds *[]uint
LastUpdaterIds *[]uint
OrganizationIds *[]string
Ids *[]uint
CompoundNimVersionIds *[]uint
Statuses *[]schemas.DeploymentStatus
Order *string
CompoundNimName *string
CompoundNimTag *string
}
// Create inserts a new deployment record built from opt and returns it.
//
// opt.Name and opt.KubeNamespace are validated, in that order, as DNS-1035
// labels; the first one that fails has its validation messages joined with
// ";" and returned as the error. The new record starts in the non-deployed
// status and receives a freshly generated UUID as its KubeDeployToken. A
// database failure is logged and returned.
func (s *deploymentService) Create(ctx context.Context, opt CreateDeploymentOption) (*models.Deployment, error) {
	for _, label := range []string{opt.Name, opt.KubeNamespace} {
		if problems := validation.IsDNS1035Label(label); len(problems) > 0 {
			return nil, errors.New(strings.Join(problems, ";"))
		}
	}

	token := uuid.New().String()
	record := models.Deployment{
		Resource:         models.Resource{Name: opt.Name},
		ClusterAssociate: models.ClusterAssociate{ClusterId: opt.ClusterId},
		CreatorAssociate: models.CreatorAssociate{UserId: opt.CreatorId},
		KubeNamespace:    opt.KubeNamespace,
		KubeDeployToken:  token,
		Status:           schemas.DeploymentStatusNonDeployed,
		Description:      opt.Description,
	}

	tx := s.getDB(ctx)
	if err := tx.Create(&record).Error; err != nil {
		log.Error().Msgf("Failed to create deployment %s", err.Error())
		return nil, err
	}
	return &record, nil
}
// Update writes the non-nil fields of opt to the row identified by b.ID and,
// once the write has succeeded, mirrors the same values onto b.
//
// When opt sets nothing, b is returned untouched and no query is issued. On a
// database error the result is nil and b is left unmodified.
func (s *deploymentService) Update(ctx context.Context, b *models.Deployment, opt UpdateDeploymentOption) (*models.Deployment, error) {
	changes := map[string]interface{}{}
	if opt.Description != nil {
		changes["description"] = *opt.Description
	}
	if opt.Status != nil {
		changes["status"] = *opt.Status
	}
	if len(changes) == 0 {
		return b, nil
	}

	log.Info().Msgf("Updating deployment with updaters %+v", changes)
	if err := s.getDB(ctx).Where("id = ?", b.ID).Updates(changes).Error; err != nil {
		return nil, err
	}

	// The row is updated; bring the in-memory model in line with it.
	if opt.Status != nil {
		b.Status = *opt.Status
	}
	if opt.Description != nil {
		b.Description = *opt.Description
	}
	return b, nil
}
// Get loads the deployment whose primary key equals id.
//
// A query error is logged and returned as-is. If the query reports success
// but the loaded record has a zero ID, consts.ErrNotFound is returned.
func (s *deploymentService) Get(ctx context.Context, id uint) (*models.Deployment, error) {
	found := new(models.Deployment)
	if err := s.getDB(ctx).Where("id = ?", id).First(found).Error; err != nil {
		log.Error().Msgf("Failed to get deployment by id %d: %s", id, err.Error())
		return nil, err
	}
	if found.ID == 0 {
		return nil, consts.ErrNotFound
	}
	return found, nil
}
// GetByUid loads the deployment whose uid column equals uid.
//
// A query error is logged and returned as-is. If the query reports success
// but the loaded record has a zero ID, consts.ErrNotFound is returned.
func (s *deploymentService) GetByUid(ctx context.Context, uid string) (*models.Deployment, error) {
	found := new(models.Deployment)
	if err := s.getDB(ctx).Where("uid = ?", uid).First(found).Error; err != nil {
		log.Error().Msgf("Failed to get deployment by uid %s: %s", uid, err.Error())
		return nil, err
	}
	if found.ID == 0 {
		return nil, consts.ErrNotFound
	}
	return found, nil
}
// GetByName loads the deployment called name in the given cluster and
// Kubernetes namespace, regardless of who created it.
//
// A query error is logged and returned as-is. If the query reports success
// but the loaded record has a zero ID, consts.ErrNotFound is returned.
func (s *deploymentService) GetByName(ctx context.Context, clusterId uint, kubeNamespace, name string) (*models.Deployment, error) {
	var deployment models.Deployment
	err := s.getDB(ctx).
		Where("cluster_id = ?", clusterId).
		Where("kube_namespace = ?", kubeNamespace).
		Where("name = ?", name).
		First(&deployment).Error
	if err != nil {
		// This lookup does not filter by creator; the message previously
		// claimed "by name and creator", which belongs to GetByNameAndCreator.
		log.Error().Msgf("Failed to get deployment by name %s: %s", name, err.Error())
		return nil, err
	}
	if deployment.ID == 0 {
		return nil, consts.ErrNotFound
	}
	return &deployment, nil
}
// GetByNameAndCreator loads the deployment identified by cluster, Kubernetes
// namespace and name, restricted to rows whose user_id equals creatorId.
//
// Query errors are logged and returned as-is. consts.ErrNotFound is returned
// when the query reports no error but the loaded record has a zero ID.
func (s *deploymentService) GetByNameAndCreator(ctx context.Context, clusterId uint, kubeNamespace, name string, creatorId string) (*models.Deployment, error) {
	var deployment models.Deployment
	err := s.getDB(ctx).
		Where("cluster_id = ?", clusterId).
		Where("kube_namespace = ?", kubeNamespace).
		Where("name = ?", name).
		Where("user_id = ?", creatorId).
		First(&deployment).Error
	if err != nil {
		// The message previously said only "by name", hiding the fact that
		// the creator filter was part of the failed lookup.
		log.Error().Msgf("Failed to get deployment by name and creator %s: %s", name, err.Error())
		return nil, err
	}
	if deployment.ID == 0 {
		return nil, consts.ErrNotFound
	}
	return &deployment, nil
}
// Delete removes the deployment row via an Unscoped delete.
//
// Only deployments whose status is Terminated or Terminating may be deleted;
// any other status yields an error and no query is issued. The deployment
// passed in is returned together with the error of the delete query.
func (s *deploymentService) Delete(ctx context.Context, deployment *models.Deployment) (*models.Deployment, error) {
	deletable := deployment.Status == schemas.DeploymentStatusTerminated ||
		deployment.Status == schemas.DeploymentStatusTerminating
	if !deletable {
		return nil, errors.New("deployment is not terminated")
	}

	deleteErr := s.getDB(ctx).Unscoped().Delete(deployment).Error
	return deployment, deleteErr
}
// Terminate moves the deployment to the Terminating status, terminates its
// active revision(s) and finally re-synchronises the deployment status.
//
// The revision listing is limited to a single entry (start 0, count 1), so at
// most one active revision is terminated per call.
// Any failure before the final sync returns (nil, err); the result of the
// final SyncStatus call is returned alongside the deployment.
func (s *deploymentService) Terminate(ctx context.Context, deployment *models.Deployment) (*models.Deployment, error) {
	deployment, err := s.UpdateStatus(ctx, deployment, UpdateDeploymentStatusOption{
		Status: schemas.DeploymentStatusTerminating.Ptr(),
	})
	if err != nil {
		return nil, err
	}

	var (
		offset uint = 0
		limit  uint = 1
	)
	listOpt := ListDeploymentRevisionOption{
		BaseListOption: BaseListOption{
			Start: &offset,
			Count: &limit,
		},
		DeploymentId: &deployment.ID,
		Status:       schemas.DeploymentRevisionStatusActive.Ptr(),
	}
	activeRevisions, _, err := DeploymentRevisionService.List(ctx, listOpt)
	if err != nil {
		return nil, err
	}
	log.Info().Msgf("Fetched %d active deployment revisions to terminate", len(activeRevisions))

	for i := range activeRevisions {
		if err = DeploymentRevisionService.Terminate(ctx, activeRevisions[i]); err != nil {
			return nil, err
		}
	}

	_, err = s.SyncStatus(ctx, deployment)
	return deployment, err
}
// UpdateStatus writes the non-nil fields of opt (status, status_syncing_at,
// status_updated_at) to the deployment row.
//
// The in-memory deployment is updated before the query runs, so it carries
// the new values even when the returned error is non-nil. The same deployment
// pointer is always returned.
func (s *deploymentService) UpdateStatus(ctx context.Context, deployment *models.Deployment, opt UpdateDeploymentStatusOption) (*models.Deployment, error) {
	columns := make(map[string]interface{})

	if status := opt.Status; status != nil {
		deployment.Status = *status
		columns["status"] = *status
	}
	if syncingAt := opt.SyncingAt; syncingAt != nil {
		deployment.StatusSyncingAt = *syncingAt
		columns["status_syncing_at"] = *syncingAt
	}
	if updatedAt := opt.UpdatedAt; updatedAt != nil {
		deployment.StatusUpdatedAt = *updatedAt
		columns["status_updated_at"] = *updatedAt
	}

	log.Info().Msgf("Updating deployment with updaters %+v", columns)
	result := s.getDB(ctx).Where("id = ?", deployment.ID).Updates(columns)
	return deployment, result.Error
}
// SyncStatus refreshes the deployment's status from Kubernetes.
//
// It first records the time the sync started (status_syncing_at), then asks
// getStatusFromK8s for the current status, and finally stores that status
// together with the time it was obtained (status_updated_at).
//
// Returns the status that was determined. If recording the sync time fails,
// the deployment's existing status is returned with the error; if the
// Kubernetes lookup or the final update fails, the status produced by
// getStatusFromK8s is returned with the error.
func (s *deploymentService) SyncStatus(ctx context.Context, d *models.Deployment) (schemas.DeploymentStatus, error) {
	// Each timestamp gets its own variable. UpdateStatus stores the pointer on
	// the model, so reusing one variable for both timestamps (as before) made
	// the second time.Now() overwrite the in-memory StatusSyncingAt after it
	// had already been persisted with the earlier value.
	syncStartedAt := time.Now()
	syncStartedAtPtr := &syncStartedAt
	_, err := s.UpdateStatus(ctx, d, UpdateDeploymentStatusOption{
		SyncingAt: &syncStartedAtPtr,
	})
	if err != nil {
		log.Error().Msgf("Failed to update sync time for deployment %s: %s", d.Name, err.Error())
		return d.Status, err
	}

	currentStatus, err := s.getStatusFromK8s(ctx, d)
	if err != nil {
		log.Error().Msgf("Failed to get deployment status from k8s for deployment %s: %s", d.Name, err.Error())
		return currentStatus, err
	}

	statusUpdatedAt := time.Now()
	statusUpdatedAtPtr := &statusUpdatedAt
	_, err = s.UpdateStatus(ctx, d, UpdateDeploymentStatusOption{
		Status:    &currentStatus,
		UpdatedAt: &statusUpdatedAtPtr,
	})
	if err != nil {
		return currentStatus, err
	}
	return currentStatus, nil
}
// List returns the deployments matching opt together with the total number of
// matches before pagination.
//
// Every non-nil filter in opt narrows the result: ids, compound NIM name, tag
// or version ids (matched against the deployment targets of the active
// revision), cluster id(s), statuses, organization (through the cluster
// table) and creator. Keyword filtering and limit/offset are delegated to
// opt.BindQueryWithKeywords and opt.BindQueryWithLimit.
//
// Results are ordered by opt.Order when set, otherwise by deployment.id
// descending.
func (s *deploymentService) List(ctx context.Context, opt ListDeploymentOption) ([]*models.Deployment, uint, error) {
	query := s.getDB(ctx)
	if opt.Ids != nil {
		query = query.Where("deployment.id in (?)", *opt.Ids)
	}
	query = query.Joins("LEFT JOIN deployment_revision ON deployment_revision.deployment_id = deployment.id AND deployment_revision.status = ?", schemas.DeploymentRevisionStatusActive)
	// NOTE(review): this join is built unconditionally; whether it also ends
	// up on `query` when none of the compound NIM filters is set depends on
	// GORM's chaining semantics — confirm before restructuring.
	joinOnDeploymentTargets := query.Joins("LEFT JOIN deployment_target ON deployment_target.deployment_revision_id = deployment_revision.id")
	if opt.CompoundNimName != nil {
		// Tags have the form "<name>:<version>", so match on the name prefix.
		query = joinOnDeploymentTargets.Where("deployment_target.compound_nim_version_tag LIKE ?", *opt.CompoundNimName+":%")
	}
	if opt.CompoundNimTag != nil {
		query = joinOnDeploymentTargets.Where("deployment_target.compound_nim_version_tag = ?", *opt.CompoundNimTag)
	}
	if opt.CompoundNimVersionIds != nil {
		query = joinOnDeploymentTargets.Where("deployment_target.compound_nim_version_id IN (?)", *opt.CompoundNimVersionIds)
	}
	if opt.ClusterId != nil {
		query = query.Where("deployment.cluster_id = ?", *opt.ClusterId)
	}
	if opt.ClusterIds != nil {
		query = query.Where("deployment.cluster_id IN (?)", *opt.ClusterIds)
	}
	if opt.Statuses != nil {
		query = query.Where("deployment.status IN (?)", *opt.Statuses)
	}
	if opt.OrganizationId != nil {
		query = query.Joins("LEFT JOIN cluster ON cluster.id = deployment.cluster_id")
		query = query.Where("cluster.organization_id = ?", *opt.OrganizationId)
	}
	if opt.CreatorId != nil {
		query = query.Where("deployment.user_id = ?", *opt.CreatorId)
	}
	query = opt.BindQueryWithKeywords(query, "deployment")
	query = query.Select("deployment_revision.*, deployment.*")

	// Count before the limit is bound so total reflects all matches.
	var total int64
	if err := query.Count(&total).Error; err != nil {
		return nil, 0, err
	}

	query = opt.BindQueryWithLimit(query)
	if opt.Order != nil {
		query = query.Order(*opt.Order)
	} else {
		// The result of Order was previously discarded; keep it so the
		// default ordering does not depend on in-place mutation of query.
		query = query.Order("deployment.id DESC")
	}

	deployments := make([]*models.Deployment, 0)
	if err := query.Find(&deployments).Error; err != nil {
		return nil, 0, err
	}
	return deployments, uint(total), nil
}
// getDB returns the database session bound to ctx, scoped to the Deployment model.
func (s *deploymentService) getDB(ctx context.Context) *gorm.DB {
	return database.DatabaseUtil.GetDBSession(ctx).Model(&models.Deployment{})
}
// getStatusFromK8s derives the deployment's current status from the pods in
// its cluster and namespace.
//
// Decision order:
//  1. Image builder pods (selected by the compound NIM name/version labels of
//     the active revision's targets): a waiting or running main container
//     means ImageBuilding; a main container terminated with a non-zero exit
//     code means ImageBuildFailed.
//  2. No deployment pods at all: Terminated when the deployment is
//     terminating/terminated, Deploying when it is already deploying,
//     otherwise NonDeployed.
//  3. A deployment already marked Terminated keeps that status.
//  4. Otherwise the pod phases decide: while terminating, Terminated once no
//     pod is running; running plus failed pods is Unhealthy (Deploying if
//     some are still pending); any pending pod is Deploying; only running
//     pods is Running.
//
// If none of these rules applies (for example only failed or succeeded pods),
// DeploymentStatusUnknown is returned. On any lookup error
// DeploymentStatusUnknown is returned together with the error.
func (s *deploymentService) getStatusFromK8s(ctx context.Context, d *models.Deployment) (schemas.DeploymentStatus, error) {
	defaultStatus := schemas.DeploymentStatusUnknown

	cluster, err := ClusterService.Get(ctx, d.ClusterId)
	if err != nil {
		return defaultStatus, err
	}

	namespace := d.KubeNamespace
	_, podLister, err := GetPodInformer(ctx, cluster, namespace)
	if err != nil {
		return defaultStatus, err
	}

	// Collect the targets of all active revisions of this deployment.
	activeStatus := schemas.DeploymentRevisionStatusActive
	deploymentRevisions, _, err := DeploymentRevisionService.List(ctx, ListDeploymentRevisionOption{
		DeploymentId: &d.ID,
		Status:       &activeStatus,
	})
	if err != nil {
		return defaultStatus, err
	}

	deploymentRevisionIds := make([]uint, 0, len(deploymentRevisions))
	for _, deploymentRevision := range deploymentRevisions {
		deploymentRevisionIds = append(deploymentRevisionIds, deploymentRevision.ID)
	}

	deploymentTargets, _, err := DeploymentTargetService.List(ctx, ListDeploymentTargetOption{
		DeploymentRevisionIds: &deploymentRevisionIds,
	})
	if err != nil {
		return defaultStatus, err
	}

	// Find the image builder pods belonging to each target's compound NIM.
	imageBuilderPods := make([]*apiv1.Pod, 0)
	for _, deploymentTarget := range deploymentTargets {
		compoundNimParts := strings.Split(deploymentTarget.CompoundNimVersionTag, ":")
		if len(compoundNimParts) != 2 {
			return defaultStatus, errors.Errorf("Invalid format for CompoundNIM version tag %s. Expected 2 parts got %d", deploymentTarget.CompoundNimVersionTag, len(compoundNimParts))
		}

		imageBuilderPodsSelector, err := labels.Parse(fmt.Sprintf("%s=%s,%s=%s", consts.KubeLabelCompoundNim, compoundNimParts[0], consts.KubeLabelCompoundNimVersion, compoundNimParts[1]))
		if err != nil {
			return defaultStatus, err
		}

		targetPods, err := K8sService.ListPodsBySelector(ctx, podLister, imageBuilderPodsSelector)
		if err != nil {
			return defaultStatus, err
		}
		imageBuilderPods = append(imageBuilderPods, targetPods...)
	}
	log.Info().Msgf("Fetched %d image builder jobs", len(imageBuilderPods))

	// An in-progress or failed image build takes precedence over pod phases.
	for _, imageBuilderPod := range imageBuilderPods {
		for _, container := range imageBuilderPod.Status.ContainerStatuses {
			if container.Name != consts.KubeImageBuilderMainContainer {
				continue
			}
			if container.State.Waiting != nil || container.State.Running != nil {
				return schemas.DeploymentStatusImageBuilding, nil
			}
			if container.State.Terminated != nil && container.State.Terminated.ExitCode != 0 {
				return schemas.DeploymentStatusImageBuildFailed, nil
			}
		}
	}

	pods, err := K8sService.ListPodsByDeployment(ctx, podLister, d)
	if err != nil {
		return defaultStatus, err
	}
	log.Info().Msgf("Fetched %d pods", len(pods))

	if len(pods) == 0 {
		if d.Status == schemas.DeploymentStatusTerminating || d.Status == schemas.DeploymentStatusTerminated {
			return schemas.DeploymentStatusTerminated, nil
		}
		if d.Status == schemas.DeploymentStatusDeploying {
			return schemas.DeploymentStatusDeploying, nil
		}
		return schemas.DeploymentStatusNonDeployed, nil
	}

	if d.Status == schemas.DeploymentStatusTerminated {
		return d.Status, nil
	}

	hasFailed := false
	hasRunning := false
	hasPending := false
	for _, p := range pods {
		log.Info().Msgf("pod %s has status %s", p.Name, p.Status.Phase)
		switch p.Status.Phase {
		case apiv1.PodRunning:
			hasRunning = true
		case apiv1.PodFailed:
			hasFailed = true
		case apiv1.PodPending:
			hasPending = true
		}
	}

	// Start from Unknown so that a pod set matching none of the cases below
	// no longer yields the empty zero-value status, which callers would
	// otherwise persist.
	deploymentStatus := defaultStatus
	switch {
	case d.Status == schemas.DeploymentStatusTerminating:
		if hasRunning {
			deploymentStatus = schemas.DeploymentStatusTerminating
		} else {
			deploymentStatus = schemas.DeploymentStatusTerminated
		}
	case hasFailed && hasRunning:
		if hasPending {
			deploymentStatus = schemas.DeploymentStatusDeploying
		} else {
			deploymentStatus = schemas.DeploymentStatusUnhealthy
		}
	case hasPending:
		deploymentStatus = schemas.DeploymentStatusDeploying
	case hasRunning:
		deploymentStatus = schemas.DeploymentStatusRunning
	}

	log.Info().Msgf("The current status of the deployment is %s", deploymentStatus)
	return deploymentStatus, nil
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package services
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/common/consts"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/common/utils"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/crds"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/models"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/schemas"
"github.com/rs/zerolog/log"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)
// deploymentManagementService groups the calls made to the external DMS HTTP
// API (host and port come from the DMS_HOST / DMS_PORT environment variables).
// It carries no state.
type deploymentManagementService struct{}
// DeploymentManagementService is the package-level instance used by callers.
var DeploymentManagementService = deploymentManagementService{}
// DMSConfiguration is a versioned configuration envelope: Version is
// serialized as "version" and the free-form payload Data as "data".
type DMSConfiguration struct {
Version string `json:"version"`
Data interface{} `json:"data"`
}
// DMSCreateRequest is the JSON body posted to the DMS deployments endpoint to
// create one custom resource.
type DMSCreateRequest struct {
// Name and Namespace identify the resource to create.
Name string `json:"name"`
Namespace string `json:"namespace"`
// ResourceType selects the kind of custom resource; serialized as "type".
ResourceType crds.CustomResourceType `json:"type"`
// Configuration carries the resource-type specific configuration payload.
Configuration interface{} `json:"configuration"`
// Labels are string key/value pairs sent along with the resource.
Labels map[string]string `json:"labels"`
}
// DMSResponseStatus is the "status" object embedded in a DMS create response.
type DMSResponseStatus struct {
Status string `json:"status"`
Message string `json:"message"`
}
// DMSCreateResponse is the JSON document decoded from a successful DMS create
// call.
type DMSCreateResponse struct {
// Id is the identifier DMS assigned to the created resource.
Id string `json:"id"`
Status DMSResponseStatus `json:"status"`
Configuration interface{} `json:"configuration"`
}
// Create registers the kube resources for deploymentTarget with DMS.
//
// Two resources are posted to the DMS deployments endpoint, in order: the
// CompoundNimDeployment and the CompoundNimRequest built by
// transformToDMSRequestsV1alpha1. The ids returned by DMS are stored on
// deploymentTarget as KubeDeploymentId and KubeRequestId respectively; the
// target is not persisted here.
//
// If anything fails after the owning deployment was loaded, Delete is invoked
// to remove whatever was already created. deployOption is currently unused.
//
// Returns the updated deploymentTarget, or (nil, err) on failure.
func (s *deploymentManagementService) Create(ctx context.Context, deploymentTarget *models.DeploymentTarget, deployOption *models.DeployOption, ownership *schemas.OwnershipSchema) (*models.DeploymentTarget, error) {
	dmsHost, dmsPort, err := getDMSPortAndHost()
	if err != nil {
		log.Error().Msg(err.Error())
		return nil, err
	}
	url := fmt.Sprintf("http://%s:%s/v1/deployments", dmsHost, dmsPort)

	deployment, err := DeploymentService.Get(ctx, deploymentTarget.DeploymentId)
	if err != nil {
		log.Info().Msg("Could not find associated deployment")
		return nil, err
	}

	// Roll back partially created resources on any later failure. The closure
	// reads the function-level err, so err must never be shadowed below.
	defer func() {
		if err == nil {
			return
		}
		if cleanupErr := s.Delete(ctx, deploymentTarget); cleanupErr != nil {
			log.Error().Msgf("Failed to clean up DMS resources after failed create: %s", cleanupErr.Error())
		}
	}()

	compoundNimDeployment, compoundNimRequest := s.transformToDMSRequestsV1alpha1(deployment, deploymentTarget, ownership)

	body, err := sendRequest(compoundNimDeployment, url, http.MethodPost)
	if err != nil {
		return nil, err
	}
	var deploymentResult DMSCreateResponse
	if err = json.Unmarshal(body, &deploymentResult); err != nil {
		log.Error().Msgf("Failed to unmarshal DMS deployment response: %s", err.Error())
		return nil, err
	}
	deploymentTarget.KubeDeploymentId = deploymentResult.Id

	body, err = sendRequest(compoundNimRequest, url, http.MethodPost)
	if err != nil {
		return nil, err
	}
	// Decode into a fresh value: json.Unmarshal leaves fields absent from the
	// document untouched, so reusing the first result could silently carry
	// the deployment id over into KubeRequestId.
	var requestResult DMSCreateResponse
	if err = json.Unmarshal(body, &requestResult); err != nil {
		log.Error().Msgf("Failed to unmarshal DMS request response: %s", err.Error())
		return nil, err
	}
	deploymentTarget.KubeRequestId = requestResult.Id

	return deploymentTarget, nil
}
// Delete removes the DMS resources recorded on deploymentTarget.
//
// A DELETE is issued for KubeDeploymentId and for KubeRequestId, each only
// when the id is non-empty. Both deletions are always attempted: previously a
// failure of the first one returned immediately and left the request resource
// behind.
//
// Returns nil when every attempted deletion succeeded. If only one fails its
// error is returned; if both fail, the deployment error is wrapped and the
// request error is appended to the message.
func (s *deploymentManagementService) Delete(ctx context.Context, deploymentTarget *models.DeploymentTarget) error {
	dmsHost, dmsPort, err := getDMSPortAndHost()
	if err != nil {
		log.Error().Msg(err.Error())
		return err
	}

	var deploymentErr, requestErr error

	if deploymentTarget.KubeDeploymentId != "" {
		urlDeployment := fmt.Sprintf("http://%s:%s/v1/deployments/%s", dmsHost, dmsPort, deploymentTarget.KubeDeploymentId)
		_, deploymentErr = sendRequest(nil, urlDeployment, http.MethodDelete)
	}

	if deploymentTarget.KubeRequestId != "" {
		// NOTE(review): this URL reads DMS_HOST/DMS_PORT straight from the
		// environment instead of reusing dmsHost/dmsPort. Left as-is here
		// because it is the visible use of the file's "os" import; switch to
		// dmsHost/dmsPort together with an import cleanup.
		urlRequest := fmt.Sprintf("http://%s:%s/v1/deployments/%s", os.Getenv("DMS_HOST"), os.Getenv("DMS_PORT"), deploymentTarget.KubeRequestId)
		_, requestErr = sendRequest(nil, urlRequest, http.MethodDelete)
	}

	switch {
	case deploymentErr != nil && requestErr != nil:
		return fmt.Errorf("%w; deleting request resource also failed: %v", deploymentErr, requestErr)
	case deploymentErr != nil:
		return deploymentErr
	default:
		return requestErr
	}
}
// transformToDMSRequestsV1alpha1 builds the two DMS create requests for a
// deployment target:
//
//   - compoundNimDeployment: named after the deployment, carrying the target's
//     resources, external services and optional liveness/readiness probes;
//   - compoundNimRequest: named after the translated version tag, carrying the
//     original (untranslated) version tag.
//
// Both requests live in the deployment's kube namespace and are labelled with
// the organization and user ids from ownership.
//
// deploymentTarget.Config and its Resources are dereferenced without a nil
// check, so both must be set by the caller.
func (s *deploymentManagementService) transformToDMSRequestsV1alpha1(deployment *models.Deployment, deploymentTarget *models.DeploymentTarget, ownership *schemas.OwnershipSchema) (compoundNimDeployment DMSCreateRequest, compoundNimRequest DMSCreateRequest) {
	rfc1123Tag := s.translateCompoundNimVersionTagToRFC1123(deploymentTarget.CompoundNimVersionTag)
	liveness, readiness := createProbeSpecs(deploymentTarget.Config.DeploymentOverrides)
	namespace := deployment.KubeNamespace

	// Each request gets its own label map instance.
	ownerLabels := func() map[string]string {
		return map[string]string{
			consts.NgcOrganizationHeaderName: ownership.OrganizationId,
			consts.NgcUserHeaderName:         ownership.UserId,
		}
	}

	deploymentData := crds.CompoundNimDeploymentData{
		CompoundNimVersion: rfc1123Tag,
		Resources:          *deploymentTarget.Config.Resources,
		ExternalServices:   deploymentTarget.Config.ExternalServices,
		LivenessProbe:      liveness,
		ReadinessProbe:     readiness,
	}
	compoundNimDeployment = DMSCreateRequest{
		Name:         deployment.Name,
		Namespace:    namespace,
		ResourceType: crds.CompoundNimDeployment,
		Configuration: crds.CompoundNimDeploymentConfigurationV1Alpha1{
			Data:    deploymentData,
			Version: crds.ApiVersion,
		},
		Labels: ownerLabels(),
	}

	requestData := crds.CompoundNimRequestData{
		CompoundNimVersionTag: deploymentTarget.CompoundNimVersionTag,
	}
	compoundNimRequest = DMSCreateRequest{
		Name:         rfc1123Tag,
		Namespace:    namespace,
		ResourceType: crds.CompoundNimRequest,
		Configuration: crds.CompoundNimRequestConfigurationV1Alpha1{
			Data:    requestData,
			Version: crds.ApiVersion,
		},
		Labels: ownerLabels(),
	}

	return compoundNimDeployment, compoundNimRequest
}
// createProbeSpecs builds the liveness and readiness HTTP probes for a
// deployment.
//
// Probes are only produced when deploymentOverrides carries a
// ColdStartTimeout, which becomes the initial delay of both probes; otherwise
// both results are nil. The liveness probe hits /livez (20s timeout, 6
// failures), the readiness probe hits /readyz (5s timeout, 12 failures), both
// on the named compound NIM container port.
func createProbeSpecs(deploymentOverrides *schemas.DeploymentOverrides) (livenessProbe *corev1.Probe, readinessProbe *corev1.Probe) {
	if deploymentOverrides == nil || deploymentOverrides.ColdStartTimeout == nil {
		return nil, nil
	}

	httpProbe := func(path string, timeoutSeconds, failureThreshold int32) *corev1.Probe {
		return &corev1.Probe{
			InitialDelaySeconds: *deploymentOverrides.ColdStartTimeout,
			TimeoutSeconds:      timeoutSeconds,
			FailureThreshold:    failureThreshold,
			ProbeHandler: corev1.ProbeHandler{
				HTTPGet: &corev1.HTTPGetAction{
					Path: path,
					Port: intstr.FromString(consts.CompoundNimContainerPortName),
				},
			},
		}
	}

	return httpProbe("/livez", 20, 6), httpProbe("/readyz", 5, 12)
}
// getDMSPortAndHost reads the DMS endpoint from the DMS_HOST and DMS_PORT
// environment variables.
//
// Despite the name, the results are ordered (host, port, error). The first
// lookup that fails aborts with two empty strings and that error.
func getDMSPortAndHost() (string, string, error) {
	var endpoint [2]string
	for i, key := range [...]string{"DMS_HOST", "DMS_PORT"} {
		value, err := utils.MustGetEnv(key)
		if err != nil {
			return "", "", err
		}
		endpoint[i] = value
	}
	return endpoint[0], endpoint[1], nil
}
// translateCompoundNimVersionTagToRFC1123 turns a Compound NIM version tag
// into a string usable as an RFC 1123 DNS label.
//
// It applies exactly these transformations:
//  1. every ":" is replaced with "--", because colons are not permitted in
//     DNS labels;
//  2. a result longer than 63 bytes (the RFC 1123 label limit) is truncated
//     to 63 bytes;
//  3. after truncation, trailing "-" characters are removed, since a label
//     must not end with a hyphen and the cut may land inside a "--".
//
// No other validation or normalisation (such as lower-casing) is performed.
//
// Example: "nim:latest" becomes "nim--latest".
func (s *deploymentManagementService) translateCompoundNimVersionTagToRFC1123(tag string) string {
	const maxLabelLength = 63

	translated := strings.ReplaceAll(tag, ":", "--")
	if len(translated) > maxLabelLength {
		translated = strings.TrimRight(translated[:maxLabelLength], "-")
	}
	return translated
}
// sendRequest performs a single HTTP request and returns the response body.
//
// A non-nil payload is marshalled to JSON and sent as the request body with
// Content-Type application/json. A nil payload sends no body at all
// (previously the literal JSON value "null" was transmitted, e.g. on DELETE).
//
// Any response status other than 200 OK is reported as an error that includes
// the status line and the response body. Underlying errors are wrapped with
// %w so callers can inspect them with errors.Is / errors.As.
//
// NOTE(review): the client has no timeout, so a DMS endpoint that never
// answers blocks the caller indefinitely — consider a shared client with a
// timeout.
func sendRequest(payload interface{}, url string, method string) ([]byte, error) {
	// reqBody stays a nil interface when there is no payload; passing a typed
	// nil reader to http.NewRequest would not be treated as "no body".
	var reqBody io.Reader
	if payload != nil {
		jsonData, err := json.Marshal(payload)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal request: %w", err)
		}
		reqBody = bytes.NewBuffer(jsonData)
	}

	req, err := http.NewRequest(method, url, reqBody)
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}
	if reqBody != nil {
		req.Header.Set("Content-Type", "application/json")
	}

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to send request: %w", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response body: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("received non-OK response: %v, %s", resp.Status, body)
	}
	return body, nil
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package services
import (
"context"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/common/consts"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/database"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/models"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/schemas"
"github.com/rs/zerolog/log"
"gorm.io/gorm"
)
// deploymentRevisionService groups the persistence and lifecycle operations
// on deployment revisions. It carries no state.
type deploymentRevisionService struct{}
// DeploymentRevisionService is the package-level instance used by callers.
var DeploymentRevisionService = deploymentRevisionService{}
// CreateDeploymentRevisionOption holds the values copied onto a new
// deployment revision: its creator, owning deployment and initial status.
type CreateDeploymentRevisionOption struct {
CreatorId string
DeploymentId uint
Status schemas.DeploymentRevisionStatus
}
// UpdateDeploymentRevisionOption describes a partial update of a deployment
// revision; a nil field means "leave unchanged".
type UpdateDeploymentRevisionOption struct {
Status *schemas.DeploymentRevisionStatus
}
// ListDeploymentRevisionOption holds the optional filters and paging settings
// for listing deployment revisions. Every nil filter is ignored; non-nil
// filters are combined.
type ListDeploymentRevisionOption struct {
// BaseListOption supplies the paging applied through BindQueryWithLimit.
BaseListOption
// DeploymentId restricts results to one deployment.
DeploymentId *uint
// DeploymentIds restricts results to any of the given deployments.
DeploymentIds *[]uint
// Ids restricts results to the given revision ids.
Ids *[]uint
// Status restricts results to revisions with this status.
Status *schemas.DeploymentRevisionStatus
}
// Create inserts a new deployment revision built from opt (creator, owning
// deployment and status) and returns the stored record, or (nil, err) when
// the insert fails.
func (s *deploymentRevisionService) Create(ctx context.Context, opt CreateDeploymentRevisionOption) (*models.DeploymentRevision, error) {
	revision := &models.DeploymentRevision{
		CreatorAssociate:    models.CreatorAssociate{UserId: opt.CreatorId},
		DeploymentAssociate: models.DeploymentAssociate{DeploymentId: opt.DeploymentId},
		Status:              opt.Status,
	}

	if err := s.getDB(ctx).Create(revision).Error; err != nil {
		return nil, err
	}
	return revision, nil
}
// Update persists the status from opt for deploymentRevision and mirrors it
// onto the in-memory model once the write succeeded.
//
// When opt.Status is nil nothing is written and the revision is returned
// unchanged. On a database error the error is logged, the model is left
// untouched and (nil, err) is returned.
func (s *deploymentRevisionService) Update(ctx context.Context, deploymentRevision *models.DeploymentRevision, opt UpdateDeploymentRevisionOption) (*models.DeploymentRevision, error) {
	changes := map[string]interface{}{}
	if opt.Status != nil {
		changes["status"] = *opt.Status
	}
	if len(changes) == 0 {
		return deploymentRevision, nil
	}

	log.Info().Msgf("Updating deployment revision with updaters: %+v", changes)
	if err := s.getDB(ctx).Where("id = ?", deploymentRevision.ID).Updates(changes).Error; err != nil {
		log.Error().Msgf("Failed to update deployment revision: %s", err.Error())
		return nil, err
	}

	// Status is the only updatable field, so reaching this point implies it was set.
	deploymentRevision.Status = *opt.Status
	return deploymentRevision, nil
}
// Get loads the deployment revision with the given primary key.
//
// Query errors are logged and returned as-is. consts.ErrNotFound is returned
// when the query reports no error but the loaded record has a zero ID.
func (s *deploymentRevisionService) Get(ctx context.Context, id uint) (*models.DeploymentRevision, error) {
	found := new(models.DeploymentRevision)
	if err := s.getDB(ctx).Where("id = ?", id).First(found).Error; err != nil {
		log.Error().Msgf("Failed to get deployment revision by id %d: %s", id, err.Error())
		return nil, err
	}
	if found.ID == 0 {
		return nil, consts.ErrNotFound
	}
	return found, nil
}
// GetByUid loads the deployment revision whose uid column equals uid.
//
// Query errors are logged and returned as-is. consts.ErrNotFound is returned
// when the query reports no error but the loaded record has a zero ID.
func (s *deploymentRevisionService) GetByUid(ctx context.Context, uid string) (*models.DeploymentRevision, error) {
	found := new(models.DeploymentRevision)
	if err := s.getDB(ctx).Where("uid = ?", uid).First(found).Error; err != nil {
		log.Error().Msgf("Failed to get deployment revision by uid %s: %s", uid, err.Error())
		return nil, err
	}
	if found.ID == 0 {
		return nil, consts.ErrNotFound
	}
	return found, nil
}
// List returns the deployment revisions matching opt, newest id first,
// together with the total number of matches before paging is applied.
//
// Non-nil filters (deployment id, deployment ids, status, revision ids) are
// combined; paging comes from opt.BindQueryWithLimit.
func (s *deploymentRevisionService) List(ctx context.Context, opt ListDeploymentRevisionOption) ([]*models.DeploymentRevision, uint, error) {
	q := s.getDB(ctx)
	if opt.DeploymentId != nil {
		q = q.Where("deployment_revision.deployment_id = ?", *opt.DeploymentId)
	}
	if opt.DeploymentIds != nil {
		q = q.Where("deployment_revision.deployment_id in (?)", *opt.DeploymentIds)
	}
	if opt.Status != nil {
		q = q.Where("deployment_revision.status = ?", *opt.Status)
	}
	if opt.Ids != nil {
		q = q.Where("deployment_revision.id in (?)", *opt.Ids)
	}
	q = q.Select("distinct(deployment_revision.*)")

	// Count first so the total is not affected by the paging bound below.
	var matchCount int64
	if err := q.Count(&matchCount).Error; err != nil {
		return nil, 0, err
	}

	revisions := make([]*models.DeploymentRevision, 0)
	q = opt.BindQueryWithLimit(q)
	if err := q.Order("deployment_revision.id DESC").Find(&revisions).Error; err != nil {
		return nil, 0, err
	}
	return revisions, uint(matchCount), nil
}
// GetDeployOption builds the deploy option for a revision. Only the force
// flag is carried over; ctx and deploymentRevision are not consulted and the
// returned error is always nil.
func (s *deploymentRevisionService) GetDeployOption(ctx context.Context, deploymentRevision *models.DeploymentRevision, force bool) (*models.DeployOption, error) {
	return &models.DeployOption{Force: force}, nil
}
// Terminate terminates every deployment target of the revision and then marks
// the revision as inactive.
//
// The first failure aborts the operation and is returned: failing to list the
// targets, failing to terminate one of them, or failing to update the
// revision status. Previously a failed target listing was only logged, so the
// revision was marked inactive although none of its targets had been
// terminated.
func (s *deploymentRevisionService) Terminate(ctx context.Context, deploymentRevision *models.DeploymentRevision) (err error) {
	deploymentTargets, _, err := DeploymentTargetService.List(ctx, ListDeploymentTargetOption{
		DeploymentRevisionId: &deploymentRevision.ID,
	})
	if err != nil {
		log.Error().Msgf("Failed to fetch deployment targets when terminating revision: %s", err.Error())
		return err
	}

	for _, target := range deploymentTargets {
		if _, err = DeploymentTargetService.Terminate(ctx, target); err != nil {
			log.Error().Msgf("Error occurred when terminating targets for revision: %s", err.Error())
			return err
		}
	}

	status := schemas.DeploymentRevisionStatusInactive
	_, err = s.Update(ctx, deploymentRevision, UpdateDeploymentRevisionOption{
		Status: &status,
	})
	if err != nil {
		log.Error().Msgf("Failed to set revision status to inactive: %s", err.Error())
		return err
	}
	return nil
}
// Deploy deploys the targets of a deployment revision one after another.
//
// The owning deployment must exist. When deploymentTargets is empty, the
// revision's targets are loaded from the database instead. The first error
// stops the operation and is returned.
func (s *deploymentRevisionService) Deploy(ctx context.Context, deploymentRevision *models.DeploymentRevision, deploymentTargets []*models.DeploymentTarget, ownership *schemas.OwnershipSchema, force bool) (err error) {
	if _, err = DeploymentService.Get(ctx, deploymentRevision.DeploymentId); err != nil {
		return err
	}

	deployOption, err := s.GetDeployOption(ctx, deploymentRevision, force)
	if err != nil {
		return err
	}

	targets := deploymentTargets
	if len(targets) == 0 {
		targets, _, err = DeploymentTargetService.List(ctx, ListDeploymentTargetOption{
			DeploymentRevisionId: &deploymentRevision.ID,
		})
		if err != nil {
			return err
		}
	}

	// Targets are deployed sequentially on purpose: goroutines cannot be used
	// here because of a pgx transaction bug.
	for _, target := range targets {
		if _, err = DeploymentTargetService.Deploy(ctx, target, deployOption, ownership); err != nil {
			return err
		}
	}
	return nil
}
// getDB returns the database session bound to ctx, scoped to the
// DeploymentRevision model.
func (s *deploymentRevisionService) getDB(ctx context.Context) *gorm.DB {
	return database.DatabaseUtil.GetDBSession(ctx).Model(&models.DeploymentRevision{})
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package services
import (
"context"
"fmt"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/common/consts"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/database"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/models"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/schemas"
"github.com/rs/zerolog/log"
"gorm.io/gorm"
)
// deploymentTargetService groups the persistence and lifecycle operations on
// deployment targets. It carries no state.
type deploymentTargetService struct{}
// DeploymentTargetService is the package-level instance used by callers.
var DeploymentTargetService = deploymentTargetService{}
// CreateDeploymentTargetOption holds the values copied onto a new deployment
// target.
type CreateDeploymentTargetOption struct {
CreatorId string
DeploymentId uint
DeploymentRevisionId uint
CompoundNimVersionId string
CompoundNimVersionTag string
// Config may be nil, in which case Create fills in a default configuration.
Config *schemas.DeploymentTargetConfig
}
// UpdateDeploymentTargetOption describes a partial update of a deployment
// target.
type UpdateDeploymentTargetOption struct {
// Config is a pointer to the new config pointer: a nil outer pointer means
// "leave the config unchanged"; otherwise the pointed-to value is stored.
Config **schemas.DeploymentTargetConfig
}
// ListDeploymentTargetOption holds the optional filters and paging settings
// for listing deployment targets. Every nil filter is ignored; non-nil
// filters are combined.
type ListDeploymentTargetOption struct {
// BaseListOption supplies the paging applied through BindQueryWithLimit.
BaseListOption
// DeploymentRevisionStatus keeps only targets whose revision has this status.
DeploymentRevisionStatus *schemas.DeploymentRevisionStatus
// DeploymentId / DeploymentIds restrict results to the given deployment(s).
DeploymentId *uint
DeploymentIds *[]uint
// DeploymentRevisionId / DeploymentRevisionIds restrict results to the given revision(s).
DeploymentRevisionId *uint
DeploymentRevisionIds *[]uint
// Type restricts results to targets of this type.
Type *schemas.DeploymentTargetType
}
// Create inserts a new deployment target built from opt and returns the
// stored record, or (nil, err) when the insert fails.
//
// When opt.Config is nil a default configuration is used: requests of 500m
// CPU / 1G memory, limits of 1000m CPU / 2G memory, and an HPA configuration
// with CPU 80, GPU 80, 2 minimum and 10 maximum replicas.
func (s *deploymentTargetService) Create(ctx context.Context, opt CreateDeploymentTargetOption) (*models.DeploymentTarget, error) {
	config := opt.Config
	if config == nil {
		// Each call yields a pointer to its own copy of the value.
		int32Ptr := func(v int32) *int32 { return &v }

		config = &schemas.DeploymentTargetConfig{
			Resources: &schemas.Resources{
				Requests: &schemas.ResourceItem{CPU: "500m", Memory: "1G"},
				Limits:   &schemas.ResourceItem{CPU: "1000m", Memory: "2G"},
			},
			HPAConf: &schemas.DeploymentTargetHPAConf{
				CPU:         int32Ptr(80),
				GPU:         int32Ptr(80),
				MinReplicas: int32Ptr(2),
				MaxReplicas: int32Ptr(10),
			},
		}
	}

	target := &models.DeploymentTarget{
		CreatorAssociate:    models.CreatorAssociate{UserId: opt.CreatorId},
		DeploymentAssociate: models.DeploymentAssociate{DeploymentId: opt.DeploymentId},
		DeploymentRevisionAssociate: models.DeploymentRevisionAssociate{
			DeploymentRevisionId: opt.DeploymentRevisionId,
		},
		CompoundNimVersionAssociate: models.CompoundNimVersionAssociate{
			CompoundNimVersionId:  opt.CompoundNimVersionId,
			CompoundNimVersionTag: opt.CompoundNimVersionTag,
		},
		Config: config,
	}

	if err := s.getDB(ctx).Create(target).Error; err != nil {
		return nil, err
	}
	return target, nil
}
// Get loads the deployment target with the given primary key.
//
// Query errors are logged and returned as-is. consts.ErrNotFound is returned
// when the query reports no error but the loaded record has a zero ID.
func (s *deploymentTargetService) Get(ctx context.Context, id uint) (*models.DeploymentTarget, error) {
	var deploymentTarget models.DeploymentTarget
	err := s.getDB(ctx).Where("id = ?", id).First(&deploymentTarget).Error
	if err != nil {
		// The message used to say "deployment revision", a copy-paste
		// leftover that pointed log readers at the wrong table.
		log.Error().Msgf("Failed to get deployment target by id %d: %s", id, err.Error())
		return nil, err
	}
	if deploymentTarget.ID == 0 {
		return nil, consts.ErrNotFound
	}
	return &deploymentTarget, nil
}
// GetByUid loads the deployment target whose uid column equals uid.
//
// Query errors are logged and returned as-is. consts.ErrNotFound is returned
// when the query reports no error but the loaded record has a zero ID.
func (s *deploymentTargetService) GetByUid(ctx context.Context, uid string) (*models.DeploymentTarget, error) {
	var deploymentTarget models.DeploymentTarget
	err := s.getDB(ctx).Where("uid = ?", uid).First(&deploymentTarget).Error
	if err != nil {
		// The message used to say "deployment revision", a copy-paste
		// leftover that pointed log readers at the wrong table.
		log.Error().Msgf("Failed to get deployment target by uid %s: %s", uid, err.Error())
		return nil, err
	}
	if deploymentTarget.ID == 0 {
		return nil, consts.ErrNotFound
	}
	return &deploymentTarget, nil
}
// List returns the deployment targets matching opt, ordered by ascending id,
// together with the total number of matches before paging is applied.
//
// A non-nil DeploymentRevisionStatus inner-joins deployment_revision so only
// targets of revisions with that status remain. The remaining non-nil filters
// (deployment id(s), revision id(s), type) are combined. Paging comes from
// opt.BindQueryWithLimit.
func (s *deploymentTargetService) List(ctx context.Context, opt ListDeploymentTargetOption) ([]*models.DeploymentTarget, uint, error) {
	q := s.getDB(ctx)
	if opt.DeploymentRevisionStatus != nil {
		q = q.Joins("INNER JOIN deployment_revision ON deployment_revision.id = deployment_target.deployment_revision_id and deployment_revision.status = ?", *opt.DeploymentRevisionStatus)
	}
	if opt.DeploymentId != nil {
		q = q.Where("deployment_target.deployment_id = ?", *opt.DeploymentId)
	}
	if opt.DeploymentRevisionId != nil {
		q = q.Where("deployment_target.deployment_revision_id = ?", *opt.DeploymentRevisionId)
	}
	if opt.DeploymentIds != nil {
		q = q.Where("deployment_target.deployment_id in (?)", *opt.DeploymentIds)
	}
	if opt.DeploymentRevisionIds != nil {
		q = q.Where("deployment_target.deployment_revision_id in (?)", *opt.DeploymentRevisionIds)
	}
	if opt.Type != nil {
		q = q.Where("deployment_target.type = ?", *opt.Type)
	}

	// Count first so the total is not affected by the paging bound below.
	var matchCount int64
	if err := q.Count(&matchCount).Error; err != nil {
		return nil, 0, err
	}

	targets := make([]*models.DeploymentTarget, 0)
	q = opt.BindQueryWithLimit(q)
	if err := q.Order("deployment_target.id ASC").Find(&targets).Error; err != nil {
		return nil, 0, err
	}
	return targets, uint(matchCount), nil
}
// Update persists the config from opt for deployment target b and mirrors it
// onto the in-memory model once the write succeeded.
//
// When opt.Config is nil nothing is written and b is returned unchanged.
// Unlike the sibling services, b is returned even when the write fails (with
// the model left untouched), alongside the error.
func (s *deploymentTargetService) Update(ctx context.Context, b *models.DeploymentTarget, opt UpdateDeploymentTargetOption) (*models.DeploymentTarget, error) {
	changes := map[string]interface{}{}
	if opt.Config != nil {
		changes["config"] = *opt.Config
	}
	if len(changes) == 0 {
		return b, nil
	}

	log.Info().Msgf("Updating deployment target with updaters: %+v", changes)
	updateErr := s.getDB(ctx).Where("id = ?", b.ID).Updates(changes).Error
	if updateErr == nil {
		// Config is the only updatable field, so it is known to be set here.
		b.Config = *opt.Config
	}
	return b, updateErr
}
// Deploy saves deploymentTarget to the database.
//
// If saving fails, the kube resources referenced by the target are removed
// through DeploymentManagementService.Delete (a failure of that cleanup is
// only logged) and an error describing the failed save is returned.
//
// NOTE(review): deployOption and ownership are not used here and no kube
// resources are created in this method, although the error text refers to
// "after creating kube resources" — confirm whether a call to
// DeploymentManagementService.Create is expected before the save.
func (s *deploymentTargetService) Deploy(ctx context.Context, deploymentTarget *models.DeploymentTarget, deployOption *models.DeployOption, ownership *schemas.OwnershipSchema) (*models.DeploymentTarget, error) {
	saveErr := s.getDB(ctx).Where("id = ?", deploymentTarget.ID).Save(deploymentTarget).Error
	if saveErr == nil {
		return deploymentTarget, nil
	}

	if cleanupErr := DeploymentManagementService.Delete(ctx, deploymentTarget); cleanupErr != nil {
		log.Error().Msg("Failed to clean up kube resources for erroneous deployment")
	}
	return nil, fmt.Errorf("failed to update deploymentTarget after creating kube resources: %s", saveErr.Error())
}
// Terminate removes the kube resources of deploymentTarget through
// DeploymentManagementService.Delete. The database record is not modified.
//
// Returns the target on success; on failure the error is logged (identifying
// the target by its compound NIM version tag) and (nil, err) is returned.
func (s *deploymentTargetService) Terminate(ctx context.Context, deploymentTarget *models.DeploymentTarget) (*models.DeploymentTarget, error) {
	if deleteErr := DeploymentManagementService.Delete(ctx, deploymentTarget); deleteErr != nil {
		log.Error().Msgf("Failed to terminate kube resources for deployment target %s\n", deploymentTarget.CompoundNimVersionTag)
		return nil, deleteErr
	}
	return deploymentTarget, nil
}
// getDB returns the database session bound to ctx, scoped to the
// DeploymentTarget model.
func (s *deploymentTargetService) getDB(ctx context.Context) *gorm.DB {
	return database.DatabaseUtil.GetDBSession(ctx).Model(&models.DeploymentTarget{})
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package services
import (
"context"
"encoding/json"
"fmt"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/common/consts"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/models"
"github.com/ghodss/yaml"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientCmdApi "k8s.io/client-go/tools/clientcmd/api"
clientCmdLatest "k8s.io/client-go/tools/clientcmd/api/latest"
clientCmdApiV1 "k8s.io/client-go/tools/clientcmd/api/v1"
)
// k8sService is the field-less implementation of IK8sService used by this package.
type k8sService struct{}
// K8sService is the package-level IK8sService instance; it is a variable, so it
// can be reassigned to another implementation.
var K8sService IK8sService = &k8sService{}
// GetK8sClient builds a Kubernetes clientset.
//
// When kubeConfig is empty, the in-cluster configuration is tried first; if
// that is unavailable, the default kubeconfig file reported by client-go's
// default loading rules is used instead.
//
// When kubeConfig is non-empty, it is treated as the YAML content of a
// kubeconfig document (not a file path): it is converted to JSON, decoded into
// the v1 kubeconfig type, converted to client-go's internal representation and
// turned into a REST config.
//
// It returns an error if any of those steps fails, or if the converted
// kubeconfig object does not have the expected type.
func (s *k8sService) GetK8sClient(kubeConfig string) (kubernetes.Interface, error) {
	var (
		restConfig *rest.Config
		err        error
	)

	if kubeConfig == "" {
		restConfig, err = rest.InClusterConfig()
		if err != nil {
			// In-cluster config could not be loaded; fall back to the default
			// kubeconfig file.
			defaultPath := clientcmd.NewDefaultClientConfigLoadingRules().GetDefaultFilename()
			restConfig, err = clientcmd.BuildConfigFromFlags("", defaultPath)
			if err != nil {
				return nil, err
			}
		}
	} else {
		var jsonBytes []byte
		jsonBytes, err = yaml.YAMLToJSON([]byte(kubeConfig))
		if err != nil {
			return nil, err
		}

		configV1 := clientCmdApiV1.Config{}
		if err = json.Unmarshal(jsonBytes, &configV1); err != nil {
			return nil, err
		}

		var configObject runtime.Object
		configObject, err = clientCmdLatest.Scheme.ConvertToVersion(&configV1, clientCmdApi.SchemeGroupVersion)
		if err != nil {
			return nil, err
		}

		// Checked assertion: report an unexpected conversion result as an
		// error instead of panicking.
		configInternal, ok := configObject.(*clientCmdApi.Config)
		if !ok {
			return nil, fmt.Errorf("unexpected kubeconfig object type %T", configObject)
		}

		// NOTE(review): the empty Server default presumably prevents client-go
		// from substituting its own default API server address — confirm.
		restConfig, err = clientcmd.NewDefaultClientConfig(*configInternal, &clientcmd.ConfigOverrides{
			ClusterDefaults: clientCmdApi.Cluster{Server: ""},
		}).ClientConfig()
		if err != nil {
			return nil, err
		}
	}

	clientSet, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return nil, err
	}
	return clientSet, nil
}
// ListPodsByDeployment lists the pods whose
// consts.KubeLabelCompoundNimVersionDeployment label equals deployment.Name.
// It returns an error if the label expression cannot be parsed or if the
// lister fails.
func (s *k8sService) ListPodsByDeployment(ctx context.Context, podLister v1.PodNamespaceLister, deployment *models.Deployment) ([]*apiv1.Pod, error) {
	expr := fmt.Sprintf("%s = %s", consts.KubeLabelCompoundNimVersionDeployment, deployment.Name)
	selector, parseErr := labels.Parse(expr)
	if parseErr != nil {
		return nil, parseErr
	}
	return s.ListPodsBySelector(ctx, podLister, selector)
}
// ListPodsBySelector returns the pods from podLister that match selector.
// When the lister reports an error, the pod slice is discarded and only the
// error is returned. ctx is not used by this implementation.
func (s *k8sService) ListPodsBySelector(ctx context.Context, podLister v1.PodNamespaceLister, selector labels.Selector) ([]*apiv1.Pod, error) {
	matched, listErr := podLister.List(selector)
	if listErr == nil {
		return matched, nil
	}
	return nil, listErr
}
// IK8sService describes the Kubernetes helpers exposed by this package:
// building a clientset and listing pods through a namespace-scoped lister.
type IK8sService interface {
// GetK8sClient builds a Kubernetes clientset from the given kubeconfig string.
GetK8sClient(string) (kubernetes.Interface, error)
// ListPodsByDeployment lists the pods associated with the given deployment.
ListPodsByDeployment(context.Context, v1.PodNamespaceLister, *models.Deployment) ([]*apiv1.Pod, error)
// ListPodsBySelector lists the pods matching the given label selector.
ListPodsBySelector(context.Context, v1.PodNamespaceLister, labels.Selector) ([]*apiv1.Pod, error)
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package services
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/dynemo-ai/dynemo/deploy/compoundai/api-server/api/models"
"k8s.io/client-go/informers"
informerAppsV1 "k8s.io/client-go/informers/apps/v1"
informerCoreV1 "k8s.io/client-go/informers/core/v1"
informerNetworkingV1 "k8s.io/client-go/informers/networking/v1"
listerAppsV1 "k8s.io/client-go/listers/apps/v1"
listerCoreV1 "k8s.io/client-go/listers/core/v1"
listerNetworkingV1 "k8s.io/client-go/listers/networking/v1"
"k8s.io/client-go/tools/cache"
)
// CacheKey is the key type of informerFactoryCache. getSharedInformerFactory
// builds it from the cluster name, followed by ":" and the namespace when a
// namespace is given.
type CacheKey string
var (
// informerSyncTimeout bounds how long startAndSyncInformer waits for an
// informer's cache to report that it has synced.
informerSyncTimeout = 30 * time.Second
// informerFactoryCache maps a CacheKey to a shared informer factory.
informerFactoryCache = make(map[CacheKey]informers.SharedInformerFactory)
// informerFactoryCacheRW is locked by getSharedInformerFactory while it
// accesses informerFactoryCache.
informerFactoryCacheRW = &sync.RWMutex{}
)
// getSharedInformerFactoryOption carries the inputs of getSharedInformerFactory.
type getSharedInformerFactoryOption struct {
// cluster supplies the name used in the cache key and the kubeconfig used to
// build the client.
cluster *models.Cluster
// namespace, when non-nil, restricts the informer factory to that namespace.
namespace *string
}
// getSharedInformerFactory returns a shared informer factory for the cluster
// (and, optionally, namespace) described by option.
//
// The lookup key is the cluster name, or "<cluster name>:<namespace>" when a
// namespace is set. If informerFactoryCache holds a factory for that key it is
// returned; otherwise a client is built from the cluster's kubeconfig and a new
// factory (resync period 0, namespace-restricted when requested) is created.
// An error is returned only when the client cannot be built.
//
// NOTE(review): this function never stores the newly created factory in
// informerFactoryCache, so unless something else populates the map every call
// builds a fresh client and factory. Confirm whether that is intentional
// before adding a store: callers run informers with their own context, so
// reusing a factory across calls would also share informer lifetimes.
func getSharedInformerFactory(option *getSharedInformerFactoryOption) (informers.SharedInformerFactory, error) {
	key := CacheKey(option.cluster.Name)
	if ns := option.namespace; ns != nil {
		key = CacheKey(fmt.Sprintf("%s:%s", option.cluster.Name, *ns))
	}

	informerFactoryCacheRW.Lock()
	defer informerFactoryCacheRW.Unlock()

	if cached, found := informerFactoryCache[key]; found {
		return cached, nil
	}

	clientset, err := K8sService.GetK8sClient(option.cluster.KubeConfig)
	if err != nil {
		return nil, err
	}

	var factoryOptions []informers.SharedInformerOption
	if option.namespace != nil {
		factoryOptions = append(factoryOptions, informers.WithNamespace(*option.namespace))
	}
	return informers.NewSharedInformerFactoryWithOptions(clientset, 0, factoryOptions...), nil
}
// startAndSyncInformer starts informer in a background goroutine that runs
// until ctx is done, then waits for the informer's cache to sync. The wait is
// bounded by informerSyncTimeout (and by ctx itself); if it ends before the
// cache has synced, an error is returned.
func startAndSyncInformer(ctx context.Context, informer cache.SharedIndexInformer) (err error) {
	go informer.Run(ctx.Done())

	syncCtx, stop := context.WithTimeout(ctx, informerSyncTimeout)
	defer stop()

	if synced := cache.WaitForCacheSync(syncCtx.Done(), informer.HasSynced); synced {
		return nil
	}
	return errors.New("timed out waiting for caches to sync informer")
}
// GetPodInformer returns a Pod informer for the given cluster and namespace,
// together with a lister scoped to that namespace. The informer is started and
// its cache synced before returning; an error is returned if the informer
// factory cannot be obtained or the sync does not complete.
func GetPodInformer(ctx context.Context, cluster *models.Cluster, namespace string) (informerCoreV1.PodInformer, listerCoreV1.PodNamespaceLister, error) {
	opt := &getSharedInformerFactoryOption{cluster: cluster, namespace: &namespace}
	factory, err := getSharedInformerFactory(opt)
	if err != nil {
		return nil, nil, err
	}

	pods := factory.Core().V1().Pods()
	if syncErr := startAndSyncInformer(ctx, pods.Informer()); syncErr != nil {
		return nil, nil, syncErr
	}
	return pods, pods.Lister().Pods(namespace), nil
}
// GetDeploymentInformer returns a Deployment informer for the given cluster and
// namespace, together with a lister scoped to that namespace. The informer is
// started and its cache synced before returning; an error is returned if the
// informer factory cannot be obtained or the sync does not complete.
func GetDeploymentInformer(ctx context.Context, kubeCluster *models.Cluster, namespace string) (informerAppsV1.DeploymentInformer, listerAppsV1.DeploymentNamespaceLister, error) {
	opt := &getSharedInformerFactoryOption{cluster: kubeCluster, namespace: &namespace}
	factory, err := getSharedInformerFactory(opt)
	if err != nil {
		return nil, nil, err
	}

	deployments := factory.Apps().V1().Deployments()
	if syncErr := startAndSyncInformer(ctx, deployments.Informer()); syncErr != nil {
		return nil, nil, syncErr
	}
	return deployments, deployments.Lister().Deployments(namespace), nil
}
// GetIngressInformer returns an Ingress informer for the given cluster and
// namespace, together with a lister scoped to that namespace. The informer is
// started and its cache synced before returning; an error is returned if the
// informer factory cannot be obtained or the sync does not complete.
func GetIngressInformer(ctx context.Context, kubeCluster *models.Cluster, namespace string) (informerNetworkingV1.IngressInformer, listerNetworkingV1.IngressNamespaceLister, error) {
	opt := &getSharedInformerFactoryOption{cluster: kubeCluster, namespace: &namespace}
	factory, err := getSharedInformerFactory(opt)
	if err != nil {
		return nil, nil, err
	}

	ingresses := factory.Networking().V1().Ingresses()
	if syncErr := startAndSyncInformer(ctx, ingresses.Informer()); syncErr != nil {
		return nil, nil, syncErr
	}
	return ingresses, ingresses.Lister().Ingresses(namespace), nil
}
// GetEventInformer returns an Event informer for the given cluster and
// namespace, together with a lister scoped to that namespace. The informer is
// started and its cache synced before returning; an error is returned if the
// informer factory cannot be obtained or the sync does not complete.
func GetEventInformer(ctx context.Context, cluster *models.Cluster, namespace string) (informerCoreV1.EventInformer, listerCoreV1.EventNamespaceLister, error) {
	opt := &getSharedInformerFactoryOption{cluster: cluster, namespace: &namespace}
	factory, err := getSharedInformerFactory(opt)
	if err != nil {
		return nil, nil, err
	}

	events := factory.Core().V1().Events()
	if syncErr := startAndSyncInformer(ctx, events.Informer()); syncErr != nil {
		return nil, nil, syncErr
	}
	return events, events.Lister().Events(namespace), nil
}
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Docker Compose definition that runs a single Postgres 16.2 container.
version: "3"
services:
###
# Postgres service
# adapted from https://github.com/docker-library/docs/blob/master/postgres/README.md#-via-docker-compose-or-docker-stack-deploy
###
postgres:
image: postgres:16.2
restart: always
environment:
# NOTE(review): user and password are hard-coded in plain text — presumably
# intended for local development only; confirm before using elsewhere.
PGUSER: postgres
POSTGRES_USER: postgres
POSTGRES_PASSWORD: pgadmin
POSTGRES_DB: postgres
ports:
# Publishes container port 5432 on host port 5432.
- "5432:5432"
volumes:
# Host directory for the database files; falls back to ./local/data/postgres
# when DOCKER_VOLUME_DIRECTORY is unset or empty.
- ${DOCKER_VOLUME_DIRECTORY:-./local/data/postgres}:/var/lib/postgresql/data
healthcheck:
# Runs pg_isready inside the container every 30s (30s timeout, 3 retries).
test: ["CMD-SHELL", "pg_isready"]
interval: 30s
timeout: 30s
retries: 3
\ No newline at end of file
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment