Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
dynamo
Commits
317b9614
Unverified
Commit
317b9614
authored
Apr 07, 2026
by
Julien Mancuso
Committed by
GitHub
Apr 07, 2026
Browse files
fix(operator): use ConfigMap for vLLM multinode wait-for-leader script (#7954)
parent
4cdc49c2
Changes
5
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
241 additions
and
66 deletions
+241
-66
deploy/operator/internal/controller/dynamographdeployment_controller.go
...r/internal/controller/dynamographdeployment_controller.go
+22
-0
deploy/operator/internal/discovery/resource.go
deploy/operator/internal/discovery/resource.go
+1
-1
deploy/operator/internal/dynamo/backend_vllm.go
deploy/operator/internal/dynamo/backend_vllm.go
+148
-26
deploy/operator/internal/dynamo/backend_vllm_test.go
deploy/operator/internal/dynamo/backend_vllm_test.go
+67
-36
deploy/operator/internal/dynamo/graph.go
deploy/operator/internal/dynamo/graph.go
+3
-3
No files found.
deploy/operator/internal/controller/dynamographdeployment_controller.go
View file @
317b9614
...
@@ -326,6 +326,13 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context
...
@@ -326,6 +326,13 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context
return
ReconcileResult
{},
fmt
.
Errorf
(
"failed to reconcile EPP resources: %w"
,
err
)
return
ReconcileResult
{},
fmt
.
Errorf
(
"failed to reconcile EPP resources: %w"
,
err
)
}
}
// Reconcile the wait-for-leader ConfigMap for multinode mp deployments
err
=
r
.
reconcileWaitLeaderConfigMap
(
ctx
,
dynamoDeployment
)
if
err
!=
nil
{
logger
.
Error
(
err
,
"Failed to reconcile wait-leader ConfigMap"
)
return
ReconcileResult
{},
fmt
.
Errorf
(
"failed to reconcile wait-leader ConfigMap: %w"
,
err
)
}
// Determine if any service is multinode
// Determine if any service is multinode
hasMultinode
:=
dynamoDeployment
.
HasAnyMultinodeService
()
hasMultinode
:=
dynamoDeployment
.
HasAnyMultinodeService
()
...
@@ -1582,6 +1589,21 @@ func (r *DynamoGraphDeploymentReconciler) reconcileEPPResources(ctx context.Cont
...
@@ -1582,6 +1589,21 @@ func (r *DynamoGraphDeploymentReconciler) reconcileEPPResources(ctx context.Cont
return
nil
return
nil
}
}
// reconcileWaitLeaderConfigMap ensures the wait-for-leader Python script
// ConfigMap exists for multinode DGDs. The ConfigMap is only mounted by
// vLLM mp worker pods (via UpdatePodSpec); for other backends it is inert.
func
(
r
*
DynamoGraphDeploymentReconciler
)
reconcileWaitLeaderConfigMap
(
ctx
context
.
Context
,
dgd
*
nvidiacomv1alpha1
.
DynamoGraphDeployment
)
error
{
if
!
dgd
.
HasAnyMultinodeService
()
{
return
nil
}
cm
:=
dynamo
.
GenerateWaitLeaderConfigMap
(
dgd
.
Name
,
dgd
.
Namespace
)
_
,
_
,
err
:=
commoncontroller
.
SyncResource
(
ctx
,
r
,
dgd
,
func
(
ctx
context
.
Context
)
(
*
corev1
.
ConfigMap
,
bool
,
error
)
{
return
cm
,
false
,
nil
})
return
err
}
func
(
r
*
DynamoGraphDeploymentReconciler
)
FinalizeResource
(
ctx
context
.
Context
,
dynamoDeployment
*
nvidiacomv1alpha1
.
DynamoGraphDeployment
)
error
{
func
(
r
*
DynamoGraphDeploymentReconciler
)
FinalizeResource
(
ctx
context
.
Context
,
dynamoDeployment
*
nvidiacomv1alpha1
.
DynamoGraphDeployment
)
error
{
// for now doing nothing
// for now doing nothing
return
nil
return
nil
...
...
deploy/operator/internal/discovery/resource.go
View file @
317b9614
...
@@ -55,7 +55,7 @@ func GetK8sDiscoveryRole(dgdName string, namespace string) *rbacv1.Role {
...
@@ -55,7 +55,7 @@ func GetK8sDiscoveryRole(dgdName string, namespace string) *rbacv1.Role {
Rules
:
[]
rbacv1
.
PolicyRule
{
Rules
:
[]
rbacv1
.
PolicyRule
{
{
{
APIGroups
:
[]
string
{
apiGroupCore
},
APIGroups
:
[]
string
{
apiGroupCore
},
Resources
:
[]
string
{
"endpoints"
},
Resources
:
[]
string
{
"endpoints"
,
"pods"
},
Verbs
:
[]
string
{
"get"
,
"list"
,
"watch"
},
Verbs
:
[]
string
{
"get"
,
"list"
,
"watch"
},
},
},
{
{
...
...
deploy/operator/internal/dynamo/backend_vllm.go
View file @
317b9614
...
@@ -2,6 +2,7 @@ package dynamo
...
@@ -2,6 +2,7 @@ package dynamo
import
(
import
(
"fmt"
"fmt"
"regexp"
"strconv"
"strconv"
"strings"
"strings"
...
@@ -9,6 +10,7 @@ import (
...
@@ -9,6 +10,7 @@ import (
commonconsts
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
commonconsts
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/featuregate"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/featuregate"
corev1
"k8s.io/api/core/v1"
corev1
"k8s.io/api/core/v1"
metav1
"k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log"
)
)
...
@@ -20,7 +22,9 @@ const (
...
@@ -20,7 +22,9 @@ const (
dataParallelSizeFlag
=
"--data-parallel-size"
dataParallelSizeFlag
=
"--data-parallel-size"
)
)
type
VLLMBackend
struct
{}
type
VLLMBackend
struct
{
ParentGraphDeploymentName
string
}
func
(
b
*
VLLMBackend
)
UpdateContainer
(
container
*
corev1
.
Container
,
numberOfNodes
int32
,
role
Role
,
component
*
v1alpha1
.
DynamoComponentDeploymentSharedSpec
,
serviceName
string
,
multinodeDeployer
MultinodeDeployer
)
{
func
(
b
*
VLLMBackend
)
UpdateContainer
(
container
*
corev1
.
Container
,
numberOfNodes
int32
,
role
Role
,
component
*
v1alpha1
.
DynamoComponentDeploymentSharedSpec
,
serviceName
string
,
multinodeDeployer
MultinodeDeployer
)
{
isMultinode
:=
numberOfNodes
>
1
isMultinode
:=
numberOfNodes
>
1
...
@@ -78,44 +82,162 @@ func (b *VLLMBackend) UpdateContainer(container *corev1.Container, numberOfNodes
...
@@ -78,44 +82,162 @@ func (b *VLLMBackend) UpdateContainer(container *corev1.Container, numberOfNodes
}
}
}
}
func
(
b
*
VLLMBackend
)
UpdatePodSpec
(
podSpec
*
corev1
.
PodSpec
,
numberOfNodes
int32
,
role
Role
,
component
*
v1alpha1
.
DynamoComponentDeploymentSharedSpec
,
serviceName
string
,
multinodeDeployer
MultinodeDeployer
)
{
const
(
if
numberOfNodes
<=
1
||
role
!=
RoleWorker
||
!
shouldUseMpBackend
(
component
.
Annotations
)
{
waitLeaderConfigMapSuffix
=
"wait-leader-script"
return
waitLeaderScriptKey
=
"wait-for-leader.py"
}
waitLeaderVolumeName
=
"wait-leader-script"
waitLeaderMountPath
=
"/scripts"
)
if
len
(
podSpec
.
Containers
)
==
0
{
// WaitLeaderScript is the Python script that verifies leader pod health via
return
// the K8s API before attempting a TCP connection. It reads LEADER_HOST and
}
// LEADER_PORT from environment variables so the script content is generic.
const
WaitLeaderScript
=
`import socket, time, json, ssl, urllib.request, os
leaderHostname
:=
multinodeDeployer
.
GetLeaderHostname
(
serviceName
)
SA = "/var/run/secrets/kubernetes.io/serviceaccount"
mainImage
:=
podSpec
.
Containers
[
0
]
.
Image
host = os.environ["LEADER_HOST"]
port = int(os.environ["LEADER_PORT"])
def _k8s_ctx():
return ssl.create_default_context(cafile=f"{SA}/ca.crt")
def _k8s_headers():
token = open(f"{SA}/token").read()
return {"Authorization": f"Bearer {token}"}
def _k8s_api():
ns = open(f"{SA}/namespace").read()
return f"https://kubernetes.default.svc/api/v1/namespaces/{ns}/pods"
def leader_pod_is_healthy():
try:
ip = socket.gethostbyname(host)
except socket.gaierror:
return False, "DNS resolution failed", None, None
try:
req = urllib.request.Request(
f"{_k8s_api()}?fieldSelector=status.podIP={ip}",
headers=_k8s_headers(),
)
resp = json.loads(urllib.request.urlopen(req, context=_k8s_ctx(), timeout=5).read())
pods = resp.get("items", [])
if not pods:
return False, f"no pod found with IP {ip}", None, ip
pod = pods[0]
name = pod["metadata"].get("name", "unknown")
uid = pod["metadata"].get("uid", "unknown")
phase = pod.get("status", {}).get("phase")
deletion_ts = pod["metadata"].get("deletionTimestamp")
info = f"ip={ip} pod={name} uid={uid} phase={phase} deletionTimestamp={deletion_ts}"
if deletion_ts:
return False, f"pod {name} is terminating", info, ip
if phase != "Running":
return False, f"pod {name} phase is {phase}", info, ip
return True, "", info, ip
except Exception as e:
# Fall back to TCP-only when the API is unavailable (e.g. 403 no RBAC)
return True, f"K8s API unavailable ({e}), falling back to TCP", f"ip={ip}", ip
waitScript
:=
fmt
.
Sprintf
(
`import socket, time
host, port = "%s", %s
print(f"Waiting for leader master port at {host}:{port}...", flush=True)
print(f"Waiting for leader master port at {host}:{port}...", flush=True)
time.sleep(5)
start = time.monotonic()
start = time.monotonic()
last_status = start
last_status = start
last_err = ""
last_err = ""
while True:
while True:
healthy, reason, pod_info, leader_ip = leader_pod_is_healthy()
if healthy:
try:
try:
s = socket.create_connection((
host
, port), timeout=2)
s = socket.create_connection((
leader_ip
, port), timeout=2)
s.close()
s.close()
elapsed = time.monotonic() - start
elapsed = time.monotonic() - start
print(f"Leader master port ready (waited {elapsed:.1f}s)", flush=True)
print(f"Leader master port ready (waited {elapsed:.1f}s)
[{pod_info}]
", flush=True)
break
break
except Exception as e:
except Exception as e:
last_err = f"{type(e).__name__}: {e}"
last_err = f"tcp: {type(e).__name__}: {e} [{pod_info}]"
else:
last_err = f"{reason} [{pod_info}]" if pod_info else reason
now = time.monotonic()
now = time.monotonic()
if now - last_status >= 30:
if now - last_status >= 30:
print(f"Still waiting for {host}:{port}... ({now - start:.0f}s elapsed, last
error
: {last_err})", flush=True)
print(f"Still waiting for {host}:{port}... ({now - start:.0f}s elapsed, last: {last_err})", flush=True)
last_status = now
last_status = now
time.sleep(2)
time.sleep(5)
`
,
leaderHostname
,
commonconsts
.
VLLMMpMasterPort
)
`
// k8sVarPattern matches Kubernetes $(VAR) env-var expansion syntax.
var
k8sVarPattern
=
regexp
.
MustCompile
(
`\$\((\w+)\)`
)
// k8sToShellVarSyntax converts Kubernetes $(VAR) references to shell ${VAR}
// so that variables can be expanded by a shell at runtime. Plain $VAR
// references (e.g. from LWS) are already valid shell syntax and left as-is.
func
k8sToShellVarSyntax
(
s
string
)
string
{
return
k8sVarPattern
.
ReplaceAllString
(
s
,
`${$1}`
)
}
// GetWaitLeaderConfigMapName returns the ConfigMap name for a given DGD.
func
GetWaitLeaderConfigMapName
(
dgdName
string
)
string
{
return
fmt
.
Sprintf
(
"%s-%s"
,
dgdName
,
waitLeaderConfigMapSuffix
)
}
// GenerateWaitLeaderConfigMap creates a ConfigMap containing the wait-for-leader
// Python script. One ConfigMap is created per DGD and owned by the DGD.
func
GenerateWaitLeaderConfigMap
(
dgdName
,
namespace
string
)
*
corev1
.
ConfigMap
{
return
&
corev1
.
ConfigMap
{
ObjectMeta
:
metav1
.
ObjectMeta
{
Name
:
GetWaitLeaderConfigMapName
(
dgdName
),
Namespace
:
namespace
,
Labels
:
map
[
string
]
string
{
commonconsts
.
KubeLabelDynamoGraphDeploymentName
:
dgdName
,
},
},
Data
:
map
[
string
]
string
{
waitLeaderScriptKey
:
WaitLeaderScript
,
},
}
}
func
(
b
*
VLLMBackend
)
UpdatePodSpec
(
podSpec
*
corev1
.
PodSpec
,
numberOfNodes
int32
,
role
Role
,
component
*
v1alpha1
.
DynamoComponentDeploymentSharedSpec
,
serviceName
string
,
multinodeDeployer
MultinodeDeployer
)
{
if
numberOfNodes
<=
1
||
role
!=
RoleWorker
||
!
shouldUseMpBackend
(
component
.
Annotations
)
{
return
}
if
len
(
podSpec
.
Containers
)
==
0
||
b
.
ParentGraphDeploymentName
==
""
{
return
}
leaderHostname
:=
multinodeDeployer
.
GetLeaderHostname
(
serviceName
)
mainImage
:=
podSpec
.
Containers
[
0
]
.
Image
cmName
:=
GetWaitLeaderConfigMapName
(
b
.
ParentGraphDeploymentName
)
podSpec
.
Volumes
=
append
(
podSpec
.
Volumes
,
corev1
.
Volume
{
Name
:
waitLeaderVolumeName
,
VolumeSource
:
corev1
.
VolumeSource
{
ConfigMap
:
&
corev1
.
ConfigMapVolumeSource
{
LocalObjectReference
:
corev1
.
LocalObjectReference
{
Name
:
cmName
,
},
},
},
})
// Use sh -c so the shell expands variable references at runtime.
// Grove/LWS env vars are appended to init containers AFTER our env
// vars, so Kubernetes $(VAR) expansion (which is order-dependent)
// cannot resolve them. The shell sees all env vars regardless of
// definition order.
shellHostname
:=
k8sToShellVarSyntax
(
leaderHostname
)
initContainer
:=
corev1
.
Container
{
initContainer
:=
corev1
.
Container
{
Name
:
"wait-for-leader-mp"
,
Name
:
"wait-for-leader-mp"
,
Image
:
mainImage
,
Image
:
mainImage
,
Command
:
[]
string
{
"python3"
,
"-c"
,
waitScript
},
Command
:
[]
string
{
"sh"
,
"-c"
,
fmt
.
Sprintf
(
`export LEADER_HOST="%s" LEADER_PORT="%s" && exec python3 %s/%s`
,
shellHostname
,
commonconsts
.
VLLMMpMasterPort
,
waitLeaderMountPath
,
waitLeaderScriptKey
)},
VolumeMounts
:
[]
corev1
.
VolumeMount
{
{
Name
:
waitLeaderVolumeName
,
MountPath
:
waitLeaderMountPath
,
ReadOnly
:
true
,
},
},
}
}
podSpec
.
InitContainers
=
append
(
podSpec
.
InitContainers
,
initContainer
)
podSpec
.
InitContainers
=
append
(
podSpec
.
InitContainers
,
initContainer
)
...
...
deploy/operator/internal/dynamo/backend_vllm_test.go
View file @
317b9614
...
@@ -560,7 +560,7 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
...
@@ -560,7 +560,7 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
}
}
func
TestVLLMBackend_UpdatePodSpec
(
t
*
testing
.
T
)
{
func
TestVLLMBackend_UpdatePodSpec
(
t
*
testing
.
T
)
{
backend
:=
&
VLLMBackend
{}
backend
:=
&
VLLMBackend
{
ParentGraphDeploymentName
:
"test-dgd"
}
tests
:=
[]
struct
{
tests
:=
[]
struct
{
name
string
name
string
...
@@ -570,10 +570,8 @@ func TestVLLMBackend_UpdatePodSpec(t *testing.T) {
...
@@ -570,10 +570,8 @@ func TestVLLMBackend_UpdatePodSpec(t *testing.T) {
multinodeDeployer
MultinodeDeployer
multinodeDeployer
MultinodeDeployer
initialPodSpec
*
corev1
.
PodSpec
initialPodSpec
*
corev1
.
PodSpec
expectInitContainer
bool
expectInitContainer
bool
expectedInitName
string
expectedInitImage
string
expectedInitImage
string
expectedInitCommandLen
int
expectedLeaderHost
string
expectWaitScriptContent
string
}{
}{
{
{
name
:
"mp worker with Grove deployer injects init container"
,
name
:
"mp worker with Grove deployer injects init container"
,
...
@@ -591,10 +589,8 @@ func TestVLLMBackend_UpdatePodSpec(t *testing.T) {
...
@@ -591,10 +589,8 @@ func TestVLLMBackend_UpdatePodSpec(t *testing.T) {
},
},
},
},
expectInitContainer
:
true
,
expectInitContainer
:
true
,
expectedInitName
:
"wait-for-leader-mp"
,
expectedInitImage
:
"vllm:latest"
,
expectedInitImage
:
"vllm:latest"
,
expectedInitCommandLen
:
3
,
expectedLeaderHost
:
"${GROVE_PCSG_NAME}-${GROVE_PCSG_INDEX}-test-service-ldr-0.${GROVE_HEADLESS_SERVICE}"
,
expectWaitScriptContent
:
"$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE)"
,
},
},
{
{
name
:
"mp worker with LWS deployer injects init container"
,
name
:
"mp worker with LWS deployer injects init container"
,
...
@@ -612,10 +608,8 @@ func TestVLLMBackend_UpdatePodSpec(t *testing.T) {
...
@@ -612,10 +608,8 @@ func TestVLLMBackend_UpdatePodSpec(t *testing.T) {
},
},
},
},
expectInitContainer
:
true
,
expectInitContainer
:
true
,
expectedInitName
:
"wait-for-leader-mp"
,
expectedInitImage
:
"vllm:v2"
,
expectedInitImage
:
"vllm:v2"
,
expectedInitCommandLen
:
3
,
expectedLeaderHost
:
"$LWS_LEADER_ADDRESS"
,
expectWaitScriptContent
:
"$LWS_LEADER_ADDRESS"
,
},
},
{
{
name
:
"mp leader does not inject init container"
,
name
:
"mp leader does not inject init container"
,
...
@@ -683,10 +677,8 @@ func TestVLLMBackend_UpdatePodSpec(t *testing.T) {
...
@@ -683,10 +677,8 @@ func TestVLLMBackend_UpdatePodSpec(t *testing.T) {
},
},
},
},
expectInitContainer
:
true
,
expectInitContainer
:
true
,
expectedInitName
:
"wait-for-leader-mp"
,
expectedInitImage
:
"vllm:latest"
,
expectedInitImage
:
"vllm:latest"
,
expectedInitCommandLen
:
3
,
expectedLeaderHost
:
"${GROVE_PCSG_NAME}-${GROVE_PCSG_INDEX}-test-service-ldr-0.${GROVE_HEADLESS_SERVICE}"
,
expectWaitScriptContent
:
"$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE)"
,
},
},
}
}
...
@@ -695,27 +687,66 @@ func TestVLLMBackend_UpdatePodSpec(t *testing.T) {
...
@@ -695,27 +687,66 @@ func TestVLLMBackend_UpdatePodSpec(t *testing.T) {
g
:=
gomega
.
NewGomegaWithT
(
t
)
g
:=
gomega
.
NewGomegaWithT
(
t
)
initialInitCount
:=
len
(
tt
.
initialPodSpec
.
InitContainers
)
initialInitCount
:=
len
(
tt
.
initialPodSpec
.
InitContainers
)
initialVolCount
:=
len
(
tt
.
initialPodSpec
.
Volumes
)
backend
.
UpdatePodSpec
(
tt
.
initialPodSpec
,
tt
.
numberOfNodes
,
tt
.
role
,
tt
.
component
,
"test-service"
,
tt
.
multinodeDeployer
)
backend
.
UpdatePodSpec
(
tt
.
initialPodSpec
,
tt
.
numberOfNodes
,
tt
.
role
,
tt
.
component
,
"test-service"
,
tt
.
multinodeDeployer
)
if
tt
.
expectInitContainer
{
if
tt
.
expectInitContainer
{
g
.
Expect
(
len
(
tt
.
initialPodSpec
.
InitContainers
))
.
To
(
gomega
.
Equal
(
initialInitCount
+
1
))
g
.
Expect
(
tt
.
initialPodSpec
.
InitContainers
)
.
To
(
gomega
.
HaveLen
(
initialInitCount
+
1
))
g
.
Expect
(
tt
.
initialPodSpec
.
Volumes
)
.
To
(
gomega
.
HaveLen
(
initialVolCount
+
1
))
injected
:=
tt
.
initialPodSpec
.
InitContainers
[
len
(
tt
.
initialPodSpec
.
InitContainers
)
-
1
]
injected
:=
tt
.
initialPodSpec
.
InitContainers
[
len
(
tt
.
initialPodSpec
.
InitContainers
)
-
1
]
g
.
Expect
(
injected
.
Name
)
.
To
(
gomega
.
Equal
(
tt
.
expectedInitName
))
g
.
Expect
(
injected
.
Name
)
.
To
(
gomega
.
Equal
(
"wait-for-leader-mp"
))
g
.
Expect
(
injected
.
Image
)
.
To
(
gomega
.
Equal
(
tt
.
expectedInitImage
))
g
.
Expect
(
injected
.
Image
)
.
To
(
gomega
.
Equal
(
tt
.
expectedInitImage
))
g
.
Expect
(
len
(
injected
.
Command
))
.
To
(
gomega
.
Equal
(
tt
.
expectedInitCommandLen
))
g
.
Expect
(
injected
.
Command
[
0
])
.
To
(
gomega
.
Equal
(
"python3"
))
expectedCmd
:=
fmt
.
Sprintf
(
g
.
Expect
(
injected
.
Command
[
1
])
.
To
(
gomega
.
Equal
(
"-c"
))
`export LEADER_HOST="%s" LEADER_PORT="%s" && exec python3 /scripts/wait-for-leader.py`
,
g
.
Expect
(
injected
.
Command
[
2
])
.
To
(
gomega
.
ContainSubstring
(
tt
.
expectWaitScriptContent
))
tt
.
expectedLeaderHost
,
commonconsts
.
VLLMMpMasterPort
)
g
.
Expect
(
injected
.
Command
[
2
])
.
To
(
gomega
.
ContainSubstring
(
"socket.create_connection"
))
g
.
Expect
(
injected
.
Command
)
.
To
(
gomega
.
Equal
([]
string
{
"sh"
,
"-c"
,
expectedCmd
}))
g
.
Expect
(
injected
.
Command
[
2
])
.
To
(
gomega
.
ContainSubstring
(
commonconsts
.
VLLMMpMasterPort
))
g
.
Expect
(
injected
.
Env
)
.
To
(
gomega
.
BeEmpty
())
g
.
Expect
(
injected
.
VolumeMounts
)
.
To
(
gomega
.
HaveLen
(
1
))
g
.
Expect
(
injected
.
VolumeMounts
[
0
]
.
Name
)
.
To
(
gomega
.
Equal
(
"wait-leader-script"
))
g
.
Expect
(
injected
.
VolumeMounts
[
0
]
.
MountPath
)
.
To
(
gomega
.
Equal
(
"/scripts"
))
g
.
Expect
(
injected
.
VolumeMounts
[
0
]
.
ReadOnly
)
.
To
(
gomega
.
BeTrue
())
vol
:=
tt
.
initialPodSpec
.
Volumes
[
len
(
tt
.
initialPodSpec
.
Volumes
)
-
1
]
g
.
Expect
(
vol
.
Name
)
.
To
(
gomega
.
Equal
(
"wait-leader-script"
))
g
.
Expect
(
vol
.
ConfigMap
)
.
ToNot
(
gomega
.
BeNil
())
g
.
Expect
(
vol
.
ConfigMap
.
Name
)
.
To
(
gomega
.
Equal
(
"test-dgd-wait-leader-script"
))
}
else
{
}
else
{
g
.
Expect
(
len
(
tt
.
initialPodSpec
.
InitContainers
))
.
To
(
gomega
.
Equal
(
initialInitCount
))
g
.
Expect
(
tt
.
initialPodSpec
.
InitContainers
)
.
To
(
gomega
.
HaveLen
(
initialInitCount
))
g
.
Expect
(
tt
.
initialPodSpec
.
Volumes
)
.
To
(
gomega
.
HaveLen
(
initialVolCount
))
}
}
})
})
}
}
}
}
func
TestGenerateWaitLeaderConfigMap
(
t
*
testing
.
T
)
{
g
:=
gomega
.
NewGomegaWithT
(
t
)
cm
:=
GenerateWaitLeaderConfigMap
(
"my-dgd"
,
"my-ns"
)
g
.
Expect
(
cm
.
Name
)
.
To
(
gomega
.
Equal
(
"my-dgd-wait-leader-script"
))
g
.
Expect
(
cm
.
Namespace
)
.
To
(
gomega
.
Equal
(
"my-ns"
))
g
.
Expect
(
cm
.
Labels
)
.
To
(
gomega
.
HaveKeyWithValue
(
commonconsts
.
KubeLabelDynamoGraphDeploymentName
,
"my-dgd"
))
g
.
Expect
(
cm
.
Data
)
.
To
(
gomega
.
HaveKey
(
"wait-for-leader.py"
))
script
:=
cm
.
Data
[
"wait-for-leader.py"
]
g
.
Expect
(
script
)
.
To
(
gomega
.
ContainSubstring
(
`os.environ["LEADER_HOST"]`
))
g
.
Expect
(
script
)
.
To
(
gomega
.
ContainSubstring
(
`os.environ["LEADER_PORT"]`
))
g
.
Expect
(
script
)
.
To
(
gomega
.
ContainSubstring
(
"leader_pod_is_healthy"
))
g
.
Expect
(
script
)
.
To
(
gomega
.
ContainSubstring
(
"kubernetes.default.svc"
))
g
.
Expect
(
script
)
.
To
(
gomega
.
ContainSubstring
(
"fieldSelector=status.podIP="
))
g
.
Expect
(
script
)
.
To
(
gomega
.
ContainSubstring
(
"deletionTimestamp"
))
g
.
Expect
(
script
)
.
To
(
gomega
.
ContainSubstring
(
"socket.create_connection"
))
g
.
Expect
(
script
)
.
To
(
gomega
.
ContainSubstring
(
"time.sleep(5)"
))
}
func
TestGetWaitLeaderConfigMapName
(
t
*
testing
.
T
)
{
g
:=
gomega
.
NewGomegaWithT
(
t
)
g
.
Expect
(
GetWaitLeaderConfigMapName
(
"foo"
))
.
To
(
gomega
.
Equal
(
"foo-wait-leader-script"
))
}
func
TestShouldUseMpBackend
(
t
*
testing
.
T
)
{
func
TestShouldUseMpBackend
(
t
*
testing
.
T
)
{
// Version-based gate behavior is tested in featuregate.TestOperatorOriginFeatureGate_IsEnabled.
// Version-based gate behavior is tested in featuregate.TestOperatorOriginFeatureGate_IsEnabled.
// These tests focus on the explicit override logic and its interaction with the feature gate.
// These tests focus on the explicit override logic and its interaction with the feature gate.
...
...
deploy/operator/internal/dynamo/graph.go
View file @
317b9614
...
@@ -895,12 +895,12 @@ type MultinodeDeployer interface {
...
@@ -895,12 +895,12 @@ type MultinodeDeployer interface {
}
}
// BackendFactory creates backend instances based on the framework type
// BackendFactory creates backend instances based on the framework type
func
BackendFactory
(
backendFramework
BackendFramework
,
operatorConfig
*
configv1alpha1
.
OperatorConfiguration
)
Backend
{
func
BackendFactory
(
backendFramework
BackendFramework
,
operatorConfig
*
configv1alpha1
.
OperatorConfiguration
,
parentGraphDeploymentName
string
)
Backend
{
switch
backendFramework
{
switch
backendFramework
{
case
BackendFrameworkSGLang
:
case
BackendFrameworkSGLang
:
return
&
SGLangBackend
{}
return
&
SGLangBackend
{}
case
BackendFrameworkVLLM
:
case
BackendFrameworkVLLM
:
return
&
VLLMBackend
{}
return
&
VLLMBackend
{
ParentGraphDeploymentName
:
parentGraphDeploymentName
}
case
BackendFrameworkTRTLLM
:
case
BackendFrameworkTRTLLM
:
return
&
TRTLLMBackend
{
return
&
TRTLLMBackend
{
MpiRunSecretName
:
operatorConfig
.
MPI
.
SSHSecretName
,
MpiRunSecretName
:
operatorConfig
.
MPI
.
SSHSecretName
,
...
@@ -1121,7 +1121,7 @@ func GenerateBasePodSpec(
...
@@ -1121,7 +1121,7 @@ func GenerateBasePodSpec(
if
multinodeDeployer
==
nil
{
if
multinodeDeployer
==
nil
{
return
nil
,
fmt
.
Errorf
(
"unsupported multinode deployment type: %s"
,
multinodeDeploymentType
)
return
nil
,
fmt
.
Errorf
(
"unsupported multinode deployment type: %s"
,
multinodeDeploymentType
)
}
}
backend
:=
BackendFactory
(
backendFramework
,
operatorConfig
)
backend
:=
BackendFactory
(
backendFramework
,
operatorConfig
,
parentGraphDeploymentName
)
if
backend
==
nil
{
if
backend
==
nil
{
return
nil
,
fmt
.
Errorf
(
"unsupported backend framework: %s"
,
backendFramework
)
return
nil
,
fmt
.
Errorf
(
"unsupported backend framework: %s"
,
backendFramework
)
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment