Unverified Commit 5b4e5496 authored by mohammedabdulwahhab's avatar mohammedabdulwahhab Committed by GitHub
Browse files

fix: deploy command should support passing config (#626)


Signed-off-by: default avatarmohammedabdulwahhab <furkhan324@berkeley.edu>
Co-authored-by: default avatarhhzhang16 <54051230+hhzhang16@users.noreply.github.com>
Co-authored-by: default avatarJulien Mancuso <jmancuso@nvidia.com>
parent 2d746153
......@@ -89,6 +89,7 @@ async def create_deployment(deployment: CreateDeploymentSchema):
"ngc-organization": ownership["organization_id"],
"ngc-user": ownership["user_id"],
},
envs=deployment.envs,
)
# Create response schema
......
......@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict
from typing import Any, Dict, List, Optional
from kubernetes import client, config
......@@ -46,7 +46,11 @@ def create_custom_resource(
def create_dynamo_deployment(
name: str, namespace: str, dynamo_nim: str, labels: Dict[str, str]
name: str,
namespace: str,
dynamo_nim: str,
labels: Dict[str, str],
envs: Optional[List[Dict[str, str]]] = None,
) -> Dict[str, Any]:
"""
Create a DynamoDeployment custom resource.
......@@ -56,6 +60,7 @@ def create_dynamo_deployment(
namespace: Target namespace
dynamo_nim: Bento name and version (format: name:version)
labels: Resource labels
envs: Optional list of environment variables
Returns:
Created deployment
......@@ -64,7 +69,7 @@ def create_dynamo_deployment(
"apiVersion": "nvidia.com/v1alpha1",
"kind": "DynamoDeployment",
"metadata": {"name": name, "namespace": namespace, "labels": labels},
"spec": {"dynamoNim": dynamo_nim, "services": {}},
"spec": {"dynamoNim": dynamo_nim, "services": {}, "envs": envs if envs else []},
}
return create_custom_resource(
......
......@@ -43,6 +43,72 @@ spec:
properties:
dynamoNim:
type: string
envs:
items:
properties:
name:
type: string
value:
type: string
valueFrom:
properties:
configMapKeyRef:
properties:
key:
type: string
name:
default: ""
type: string
optional:
type: boolean
required:
- key
type: object
x-kubernetes-map-type: atomic
fieldRef:
properties:
apiVersion:
type: string
fieldPath:
type: string
required:
- fieldPath
type: object
x-kubernetes-map-type: atomic
resourceFieldRef:
properties:
containerName:
type: string
divisor:
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
resource:
type: string
required:
- resource
type: object
x-kubernetes-map-type: atomic
secretKeyRef:
properties:
key:
type: string
name:
default: ""
type: string
optional:
type: boolean
required:
- key
type: object
x-kubernetes-map-type: atomic
type: object
required:
- name
type: object
type: array
services:
additionalProperties:
properties:
......
......@@ -20,6 +20,7 @@
package v1alpha1
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
......@@ -36,6 +37,9 @@ type DynamoDeploymentSpec struct {
// if not set, the DynamoNimDeployment will be used as is
// +kubebuilder:validation:Optional
Services map[string]*DynamoNimDeployment `json:"services,omitempty"`
// Environment variables to be set in the deployment
// +kubebuilder:validation:Optional
Envs []corev1.EnvVar `json:"envs,omitempty"`
}
// DynamoDeploymentStatus defines the observed state of DynamoDeployment.
......
//go:build !ignore_autogenerated
/*
* SPDX-FileCopyrightText: Copyright (c) 2022 Atalaya Tech. Inc
* SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2022 Atalaya Tech, Inc
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
......@@ -27,8 +27,8 @@ import (
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/common"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/schemas"
"k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)
......@@ -84,7 +84,7 @@ func (in *BaseStatus) DeepCopyInto(out *BaseStatus) {
*out = *in
if in.Conditions != nil {
in, out := &in.Conditions, &out.Conditions
*out = make([]v1.Condition, len(*in))
*out = make([]metav1.Condition, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
......@@ -214,6 +214,13 @@ func (in *DynamoDeploymentSpec) DeepCopyInto(out *DynamoDeploymentSpec) {
(*out)[key] = outVal
}
}
if in.Envs != nil {
in, out := &in.Envs, &out.Envs
*out = make([]v1.EnvVar, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoDeploymentSpec.
......@@ -231,7 +238,7 @@ func (in *DynamoDeploymentStatus) DeepCopyInto(out *DynamoDeploymentStatus) {
*out = *in
if in.Conditions != nil {
in, out := &in.Conditions, &out.Conditions
*out = make([]v1.Condition, len(*in))
*out = make([]metav1.Condition, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
......@@ -363,7 +370,7 @@ func (in *DynamoNimDeploymentSpec) DeepCopyInto(out *DynamoNimDeploymentSpec) {
}
if in.Envs != nil {
in, out := &in.Envs, &out.Envs
*out = make([]corev1.EnvVar, len(*in))
*out = make([]v1.EnvVar, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
......@@ -408,12 +415,12 @@ func (in *DynamoNimDeploymentSpec) DeepCopyInto(out *DynamoNimDeploymentSpec) {
}
if in.LivenessProbe != nil {
in, out := &in.LivenessProbe, &out.LivenessProbe
*out = new(corev1.Probe)
*out = new(v1.Probe)
(*in).DeepCopyInto(*out)
}
if in.ReadinessProbe != nil {
in, out := &in.ReadinessProbe, &out.ReadinessProbe
*out = new(corev1.Probe)
*out = new(v1.Probe)
(*in).DeepCopyInto(*out)
}
if in.Replicas != nil {
......@@ -438,7 +445,7 @@ func (in *DynamoNimDeploymentStatus) DeepCopyInto(out *DynamoNimDeploymentStatus
*out = *in
if in.Conditions != nil {
in, out := &in.Conditions, &out.Conditions
*out = make([]v1.Condition, len(*in))
*out = make([]metav1.Condition, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
......@@ -590,14 +597,14 @@ func (in *DynamoNimRequestSpec) DeepCopyInto(out *DynamoNimRequestSpec) {
}
if in.ImageBuilderExtraContainerEnv != nil {
in, out := &in.ImageBuilderExtraContainerEnv, &out.ImageBuilderExtraContainerEnv
*out = make([]corev1.EnvVar, len(*in))
*out = make([]v1.EnvVar, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.ImageBuilderContainerResources != nil {
in, out := &in.ImageBuilderContainerResources, &out.ImageBuilderContainerResources
*out = new(corev1.ResourceRequirements)
*out = new(v1.ResourceRequirements)
(*in).DeepCopyInto(*out)
}
if in.OCIRegistryInsecure != nil {
......@@ -607,7 +614,7 @@ func (in *DynamoNimRequestSpec) DeepCopyInto(out *DynamoNimRequestSpec) {
}
if in.DownloaderContainerEnvFrom != nil {
in, out := &in.DownloaderContainerEnvFrom, &out.DownloaderContainerEnvFrom
*out = make([]corev1.EnvFromSource, len(*in))
*out = make([]v1.EnvFromSource, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
......@@ -629,7 +636,7 @@ func (in *DynamoNimRequestStatus) DeepCopyInto(out *DynamoNimRequestStatus) {
*out = *in
if in.Conditions != nil {
in, out := &in.Conditions, &out.Conditions
*out = make([]v1.Condition, len(*in))
*out = make([]metav1.Condition, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
......@@ -663,7 +670,7 @@ func (in *DynamoNimSpec) DeepCopyInto(out *DynamoNimSpec) {
}
if in.ImagePullSecrets != nil {
in, out := &in.ImagePullSecrets, &out.ImagePullSecrets
*out = make([]corev1.LocalObjectReference, len(*in))
*out = make([]v1.LocalObjectReference, len(*in))
copy(*out, *in)
}
}
......
......@@ -43,6 +43,72 @@ spec:
properties:
dynamoNim:
type: string
envs:
items:
properties:
name:
type: string
value:
type: string
valueFrom:
properties:
configMapKeyRef:
properties:
key:
type: string
name:
default: ""
type: string
optional:
type: boolean
required:
- key
type: object
x-kubernetes-map-type: atomic
fieldRef:
properties:
apiVersion:
type: string
fieldPath:
type: string
required:
- fieldPath
type: object
x-kubernetes-map-type: atomic
resourceFieldRef:
properties:
containerName:
type: string
divisor:
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
resource:
type: string
required:
- resource
type: object
x-kubernetes-map-type: atomic
secretKeyRef:
properties:
key:
type: string
name:
default: ""
type: string
optional:
type: boolean
required:
- key
type: object
x-kubernetes-map-type: atomic
type: object
required:
- name
type: object
type: array
services:
additionalProperties:
properties:
......
......@@ -23,6 +23,7 @@ import (
"strings"
"dario.cat/mergo"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
......@@ -130,6 +131,13 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
}
}
// Set common env vars on each of the dynamoNimDeployments
for _, deployment := range dynamoNimDeployments {
if len(dynamoDeployment.Spec.Envs) > 0 {
deployment.Spec.Envs = mergeEnvs(dynamoDeployment.Spec.Envs, deployment.Spec.Envs)
}
}
// reconcile the dynamoNimRequest
dynamoNimRequest := &nvidiacomv1alpha1.DynamoNimRequest{
ObjectMeta: metav1.ObjectMeta{
......@@ -182,6 +190,27 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
}
// mergeEnvs combines deployment-wide (common) environment variables with
// service-specific ones. A specific entry overrides a common entry with the
// same Name.
//
// The result is deterministic: common variables keep their relative order
// (with overridden values substituted in place), followed by specific-only
// variables in their relative order. The previous map-based implementation
// returned a randomly ordered slice, which produces spurious diffs on the
// generated objects and therefore needless reconcile churn.
func mergeEnvs(common, specific []corev1.EnvVar) []corev1.EnvVar {
	// Index the service-specific overrides by name for O(1) lookup.
	overrides := make(map[string]corev1.EnvVar, len(specific))
	for _, env := range specific {
		overrides[env.Name] = env
	}

	merged := make([]corev1.EnvVar, 0, len(common)+len(specific))
	seen := make(map[string]bool, len(common))

	// Emit the common variables first, substituting the specific value
	// whenever a name collides.
	for _, env := range common {
		if override, ok := overrides[env.Name]; ok {
			merged = append(merged, override)
		} else {
			merged = append(merged, env)
		}
		seen[env.Name] = true
	}

	// Append the specific variables that did not override anything.
	for _, env := range specific {
		if !seen[env.Name] {
			merged = append(merged, env)
		}
	}
	return merged
}
// SetupWithManager sets up the controller with the Manager.
func (r *DynamoDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
......
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controller
import (
"reflect"
"sort"
"testing"
corev1 "k8s.io/api/core/v1"
)
// Test_mergeEnvs exercises mergeEnvs over the interesting combinations:
// only specific vars, only common vars, disjoint names, and a name collision
// (where the specific value must win). The merged slice is sorted by name
// before comparison so the assertions do not depend on the order in which
// mergeEnvs happens to emit entries.
func Test_mergeEnvs(t *testing.T) {
	cases := []struct {
		name     string
		common   []corev1.EnvVar
		specific []corev1.EnvVar
		want     []corev1.EnvVar
	}{
		{
			name:     "no_common_envs",
			common:   []corev1.EnvVar{},
			specific: []corev1.EnvVar{{Name: "FOO", Value: "BAR"}},
			want:     []corev1.EnvVar{{Name: "FOO", Value: "BAR"}},
		},
		{
			name:     "no_specific_envs",
			common:   []corev1.EnvVar{{Name: "FOO", Value: "BAR"}},
			specific: []corev1.EnvVar{},
			want:     []corev1.EnvVar{{Name: "FOO", Value: "BAR"}},
		},
		{
			name:     "common_and_specific_envs",
			common:   []corev1.EnvVar{{Name: "FOO", Value: "BAR"}},
			specific: []corev1.EnvVar{{Name: "BAZ", Value: "QUX"}},
			want:     []corev1.EnvVar{{Name: "BAZ", Value: "QUX"}, {Name: "FOO", Value: "BAR"}},
		},
		{
			name:     "common_and_specific_envs_with_same_name",
			common:   []corev1.EnvVar{{Name: "FOO", Value: "BAR"}},
			specific: []corev1.EnvVar{{Name: "FOO", Value: "QUX"}},
			want:     []corev1.EnvVar{{Name: "FOO", Value: "QUX"}},
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := mergeEnvs(tc.common, tc.specific)
			sort.Slice(got, func(i, j int) bool { return got[i].Name < got[j].Name })
			if !reflect.DeepEqual(got, tc.want) {
				t.Errorf("mergeEnvs() = %v, want %v", got, tc.want)
			}
		})
	}
}
......@@ -17,7 +17,6 @@
from __future__ import annotations
import collections
import json
import logging
import os
......@@ -27,7 +26,8 @@ from typing import Optional
import click
import rich
import yaml
from .utils import resolve_service_config
if t.TYPE_CHECKING:
P = t.ParamSpec("P") # type: ignore
......@@ -36,96 +36,6 @@ if t.TYPE_CHECKING:
logger = logging.getLogger(__name__)
def _parse_service_arg(arg_name: str, arg_value: str) -> tuple[str, str, t.Any]:
"""Parse a single CLI argument into service name, key, and value."""
parts = arg_name.split(".")
service = parts[0]
nested_keys = parts[1:]
# Special case: if this is a ServiceArgs.envs.* path, keep value as string
if (
len(nested_keys) >= 2
and nested_keys[0] == "ServiceArgs"
and nested_keys[1] == "envs"
):
value: t.Union[str, int, float, bool, dict, list] = arg_value
else:
# Parse value based on type for non-env vars
try:
value = json.loads(arg_value)
except json.JSONDecodeError:
if arg_value.isdigit():
value = int(arg_value)
elif arg_value.replace(".", "", 1).isdigit() and arg_value.count(".") <= 1:
value = float(arg_value)
elif arg_value.lower() in ("true", "false"):
value = arg_value.lower() == "true"
else:
value = arg_value
# Build nested dict structure
result = value
for key in reversed(nested_keys[1:]):
result = {key: result}
return service, nested_keys[0], result
def _parse_service_args(args: list[str]) -> t.Dict[str, t.Any]:
service_configs: t.DefaultDict[str, t.Dict[str, t.Any]] = collections.defaultdict(
dict
)
def deep_update(d: dict, key: str, value: t.Any):
"""
Recursively updates nested dictionaries. We use this to process arguments like
---Worker.ServiceArgs.env.CUDA_VISIBLE_DEVICES="0,1"
The _parse_service_arg function will parse this into:
service = "Worker"
nested_keys = ["ServiceArgs", "envs", "CUDA_VISIBLE_DEVICES"]
And returns returns: ("VllmWorker", "ServiceArgs", {"envs": {"CUDA_VISIBLE_DEVICES": "0,1"}})
We then use deep_update to update the service_configs dictionary with this nested value.
"""
if isinstance(value, dict) and key in d and isinstance(d[key], dict):
for k, v in value.items():
deep_update(d[key], k, v)
else:
d[key] = value
index = 0
while index < len(args):
next_arg = args[index]
if not (next_arg.startswith("--") or "." not in next_arg):
continue
try:
if "=" in next_arg:
arg_name, arg_value = next_arg.split("=", 1)
index += 1
elif args[index + 1] == "=":
arg_name = next_arg
arg_value = args[index + 2]
index += 3
else:
arg_name = next_arg
arg_value = args[index + 1]
index += 2
if arg_value.startswith("-"):
raise ValueError("Service arg value can not start with -")
arg_name = arg_name[2:]
service, key, value = _parse_service_arg(arg_name, arg_value)
deep_update(service_configs[service], key, value)
except Exception:
raise ValueError(f"Error parsing service arg: {args[index]}")
return service_configs
def build_serve_command() -> click.Group:
from dynamo.sdk.lib.logging import configure_server_logging
......@@ -215,27 +125,8 @@ def build_serve_command() -> click.Group:
from dynamo.sdk.lib.service import LinkedServices
service_configs: dict[str, dict[str, t.Any]] = {}
# Load file if provided
if file:
with open(file) as f:
yaml_configs = yaml.safe_load(f)
# Initialize service_configs as empty dict if it's None
# Convert nested YAML structure to flat dict with dot notation
for service, configs in yaml_configs.items():
if service not in service_configs:
service_configs[service] = {}
for key, value in configs.items():
service_configs[service][key] = value
# Process service-specific options
cmdline_overrides: t.Dict[str, t.Any] = _parse_service_args(ctx.args)
for service, configs in cmdline_overrides.items():
if service not in service_configs:
service_configs[service] = {}
for key, value in configs.items():
service_configs[service][key] = value
# Resolve service configs from yaml file, command line args into a python dict
service_configs = resolve_service_config(file, ctx.args)
# Process depends
if depends:
......
......@@ -15,6 +15,7 @@
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
import collections
import contextlib
import json
import logging
......@@ -26,8 +27,13 @@ import typing as t
import click
import psutil
import yaml
from click import Command, Context
from dynamo.sdk.lib.logging import configure_server_logging
configure_server_logging()
logger = logging.getLogger(__name__)
DYN_LOCAL_STATE_DIR = "DYN_LOCAL_STATE_DIR"
......@@ -182,3 +188,150 @@ def save_dynamo_state(
json.dump(state, f)
logger.warning(f"Saved state to {state_file}")
def _parse_service_arg(arg_name: str, arg_value: str) -> tuple[str, str, t.Any]:
"""Parse a single CLI argument into service name, key, and value."""
parts = arg_name.split(".")
service = parts[0]
nested_keys = parts[1:]
# Special case: if this is a ServiceArgs.envs.* path, keep value as string
if (
len(nested_keys) >= 2
and nested_keys[0] == "ServiceArgs"
and nested_keys[1] == "envs"
):
value: t.Union[str, int, float, bool, dict, list] = arg_value
else:
# Parse value based on type for non-env vars
try:
value = json.loads(arg_value)
except json.JSONDecodeError:
if arg_value.isdigit():
value = int(arg_value)
elif arg_value.replace(".", "", 1).isdigit() and arg_value.count(".") <= 1:
value = float(arg_value)
elif arg_value.lower() in ("true", "false"):
value = arg_value.lower() == "true"
else:
value = arg_value
# Build nested dict structure
result = value
for key in reversed(nested_keys[1:]):
result = {key: result}
return service, nested_keys[0], result
def _parse_service_args(args: list[str]) -> t.Dict[str, t.Any]:
service_configs: t.DefaultDict[str, t.Dict[str, t.Any]] = collections.defaultdict(
dict
)
def deep_update(d: dict, key: str, value: t.Any):
"""
Recursively updates nested dictionaries. We use this to process arguments like
---Worker.ServiceArgs.env.CUDA_VISIBLE_DEVICES="0,1"
The _parse_service_arg function will parse this into:
service = "Worker"
nested_keys = ["ServiceArgs", "envs", "CUDA_VISIBLE_DEVICES"]
And returns: ("VllmWorker", "ServiceArgs", {"envs": {"CUDA_VISIBLE_DEVICES": "0,1"}})
We then use deep_update to update the service_configs dictionary with this nested value.
"""
if isinstance(value, dict) and key in d and isinstance(d[key], dict):
for k, v in value.items():
deep_update(d[key], k, v)
else:
d[key] = value
index = 0
while index < len(args):
next_arg = args[index]
if not (next_arg.startswith("--") or "." not in next_arg):
continue
try:
if "=" in next_arg:
arg_name, arg_value = next_arg.split("=", 1)
index += 1
elif args[index + 1] == "=":
arg_name = next_arg
arg_value = args[index + 2]
index += 3
else:
arg_name = next_arg
arg_value = args[index + 1]
index += 2
if arg_value.startswith("-"):
raise ValueError("Service arg value can not start with -")
arg_name = arg_name[2:]
service, key, value = _parse_service_arg(arg_name, arg_value)
deep_update(service_configs[service], key, value)
except Exception:
raise ValueError(f"Error parsing service arg: {args[index]}")
return service_configs
def resolve_service_config(
    config_file: str | t.TextIO | None = None,
    args: list[str] | None = None,
) -> dict[str, dict[str, t.Any]]:
    """Resolve service configuration from file and command line arguments.

    Precedence: if the ``DYN_DEPLOYMENT_CONFIG`` environment variable is set
    it is used exclusively (file and args are ignored); otherwise the YAML
    config file is loaded first and command line overrides are applied on top.

    Args:
        config_file: Path to YAML config file or an open file object
        args: List of command line arguments

    Returns:
        Dictionary mapping service names to their configurations
    """
    service_configs: dict[str, dict[str, t.Any]] = {}

    # Check for deployment config first
    if "DYN_DEPLOYMENT_CONFIG" in os.environ:
        try:
            deployment_config = yaml.safe_load(os.environ["DYN_DEPLOYMENT_CONFIG"])
            # BUG FIX: safe_load returns None for an empty document; fall
            # back to an empty dict so callers always get a dict.
            service_configs = deployment_config or {}
            logger.info(f"Successfully loaded deployment config: {service_configs}")
            logger.warning(
                "DYN_DEPLOYMENT_CONFIG found in environment - ignoring configuration file and command line arguments"
            )
        except Exception as e:
            logger.warning(f"Failed to parse DYN_DEPLOYMENT_CONFIG: {e}")
    else:
        # Load file if provided (accepts either a path or an open file object)
        if config_file:
            with open(config_file) if isinstance(
                config_file, str
            ) else contextlib.nullcontext(config_file) as f:
                yaml_configs = yaml.safe_load(f)
            logger.debug(f"Loaded config from file: {yaml_configs}")
            # BUG FIX: guard against None (empty YAML file) before .items(),
            # which previously raised AttributeError.
            for service, configs in (yaml_configs or {}).items():
                service_configs.setdefault(service, {})
                for key, value in configs.items():
                    service_configs[service][key] = value

        # Process service-specific options on top of the file config
        if args:
            cmdline_overrides = _parse_service_args(args)
            logger.debug(f"Applying command line overrides: {cmdline_overrides}")
            for service, configs in cmdline_overrides.items():
                service_configs.setdefault(service, {})
                for key, value in configs.items():
                    service_configs[service][key] = value

    logger.debug(f"Final resolved config: {service_configs}")
    return service_configs
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Example per-service configuration for the three-stage demo pipeline
# (Frontend -> Middle -> Backend). Each top-level key is a service name;
# `message` overrides that service's default reply fragment and
# `ServiceArgs` controls worker processes and resource requests.
Frontend:
  message: "earth"
  # HTTP port the frontend serves on
  port: 8000
  ServiceArgs:
    workers: 1
    resources:
      cpu: "1"
Middle:
  message: "moon"
  ServiceArgs:
    workers: 1
    resources:
      cpu: "1"
Backend:
  message: "mars"
  ServiceArgs:
    # two workers for the final stage
    workers: 2
    resources:
      cpu: "1"
......@@ -18,7 +18,11 @@ import logging
from pydantic import BaseModel
from dynamo.sdk import DYNAMO_IMAGE, api, depends, dynamo_endpoint, service
from dynamo.sdk.lib.config import ServiceConfig
from dynamo.sdk.lib.logging import configure_server_logging
# Configure logging
configure_server_logging()
logger = logging.getLogger(__name__)
"""
......@@ -61,13 +65,16 @@ class ResponseType(BaseModel):
class Backend:
def __init__(self) -> None:
logger.info("Starting backend")
config = ServiceConfig.get_instance()
self.message = config.get("Backend", {}).get("message", "back")
logger.info(f"Backend config message: {self.message}")
@dynamo_endpoint()
async def generate(self, req: RequestType):
"""Generate tokens."""
req_text = req.text
logger.info(f"Backend received: {req_text}")
text = f"{req_text}-back"
text = f"{req_text}-{self.message}"
for token in text.split():
yield f"Backend: {token}"
......@@ -81,13 +88,16 @@ class Middle:
def __init__(self) -> None:
logger.info("Starting middle")
config = ServiceConfig.get_instance()
self.message = config.get("Middle", {}).get("message", "mid")
logger.info(f"Middle config message: {self.message}")
@dynamo_endpoint()
async def generate(self, req: RequestType):
"""Forward requests to backend."""
req_text = req.text
logger.info(f"Middle received: {req_text}")
text = f"{req_text}-mid"
text = f"{req_text}-{self.message}"
next_request = RequestType(text=text).model_dump_json()
async for response in self.backend.generate(next_request):
logger.info(f"Middle received response: {response}")
......@@ -101,14 +111,19 @@ class Frontend:
middle = depends(Middle)
def __init__(self) -> None:
print("Starting frontend")
logger.info("Starting frontend")
config = ServiceConfig.get_instance()
self.message = config.get("Frontend", {}).get("message", "front")
self.port = config.get("Frontend", {}).get("port", 8000)
logger.info(f"Frontend config message: {self.message}")
logger.info(f"Frontend config port: {self.port}")
@api
async def generate(self, text):
"""Stream results from the pipeline."""
print(f"Frontend received: {text}")
print(f"Frontend received type: {type(text)}")
logger.info(f"Frontend received: {text}")
logger.info(f"Frontend received type: {type(text)}")
txt = RequestType(text=text)
print(f"Frontend sending: {type(txt)}")
logger.info(f"Frontend sending: {type(txt)}")
async for response in self.middle.generate(txt.model_dump_json()):
yield f"Frontend: {response}"
......@@ -30,6 +30,7 @@ dependencies = [
"types-psutil==7.0.0.20250218",
"kubernetes==32.0.1",
"ai-dynamo-runtime==0.1.1",
"distro",
]
classifiers = [
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment