Unverified commit 334ce551, authored by Thomas Montfort, committed by GitHub
Browse files

fix: k8s native discovery (#4305)

parent 9a765d6d
...@@ -43,6 +43,7 @@ const ( ...@@ -43,6 +43,7 @@ const (
KubeLabelDynamoBaseModelHash = "nvidia.com/dynamo-base-model-hash" KubeLabelDynamoBaseModelHash = "nvidia.com/dynamo-base-model-hash"
KubeAnnotationDynamoBaseModel = "nvidia.com/dynamo-base-model" KubeAnnotationDynamoBaseModel = "nvidia.com/dynamo-base-model"
KubeLabelDynamoDiscoveryBackend = "nvidia.com/dynamo-discovery-backend" KubeLabelDynamoDiscoveryBackend = "nvidia.com/dynamo-discovery-backend"
KubeLabelDynamoDiscoveryEnabled = "nvidia.com/dynamo-discovery-enabled"
KubeLabelValueFalse = "false" KubeLabelValueFalse = "false"
KubeLabelValueTrue = "true" KubeLabelValueTrue = "true"
......
...@@ -1298,6 +1298,10 @@ func (r *DynamoComponentDeploymentReconciler) generateService(opt generateResour ...@@ -1298,6 +1298,10 @@ func (r *DynamoComponentDeploymentReconciler) generateService(opt generateResour
if isK8sDiscovery { if isK8sDiscovery {
labels[commonconsts.KubeLabelDynamoDiscoveryBackend] = "kubernetes" labels[commonconsts.KubeLabelDynamoDiscoveryBackend] = "kubernetes"
} }
// Discovery is enabled for non frontend components
if isK8sDiscovery && !opt.dynamoComponentDeployment.IsFrontendComponent() {
labels[commonconsts.KubeLabelDynamoDiscoveryEnabled] = commonconsts.KubeLabelValueTrue
}
var servicePort corev1.ServicePort var servicePort corev1.ServicePort
if opt.dynamoComponentDeployment.IsFrontendComponent() { if opt.dynamoComponentDeployment.IsFrontendComponent() {
......
...@@ -368,8 +368,9 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co ...@@ -368,8 +368,9 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
// if k8s discovery is enabled, create a service for each component // if k8s discovery is enabled, create a service for each component
// else, only create for the frontend component // else, only create for the frontend component
if r.Config.IsK8sDiscoveryEnabled(dynamoDeployment.Annotations) || component.ComponentType == consts.ComponentTypeFrontend { isK8sDiscoveryEnabled := r.Config.IsK8sDiscoveryEnabled(dynamoDeployment.Annotations)
mainComponentService, err := dynamo.GenerateComponentService(ctx, dynamoDeployment, component, componentName) if isK8sDiscoveryEnabled || component.ComponentType == consts.ComponentTypeFrontend {
mainComponentService, err := dynamo.GenerateComponentService(ctx, dynamoDeployment, component, componentName, isK8sDiscoveryEnabled)
if err != nil { if err != nil {
logger.Error(err, "failed to generate the main component service") logger.Error(err, "failed to generate the main component service")
return "", "", "", fmt.Errorf("failed to generate the main component service: %w", err) return "", "", "", fmt.Errorf("failed to generate the main component service: %w", err)
......
...@@ -396,7 +396,7 @@ func getCliqueStartupDependencies( ...@@ -396,7 +396,7 @@ func getCliqueStartupDependencies(
return nil return nil
} }
func GenerateComponentService(ctx context.Context, dynamoDeployment *v1alpha1.DynamoGraphDeployment, component *v1alpha1.DynamoComponentDeploymentSharedSpec, componentName string) (*corev1.Service, error) { func GenerateComponentService(ctx context.Context, dynamoDeployment *v1alpha1.DynamoGraphDeployment, component *v1alpha1.DynamoComponentDeploymentSharedSpec, componentName string, isK8sDiscoveryEnabled bool) (*corev1.Service, error) {
if component.DynamoNamespace == nil { if component.DynamoNamespace == nil {
return nil, fmt.Errorf("expected DynamoComponentDeployment %s to have a dynamoNamespace", componentName) return nil, fmt.Errorf("expected DynamoComponentDeployment %s to have a dynamoNamespace", componentName)
} }
...@@ -431,6 +431,15 @@ func GenerateComponentService(ctx context.Context, dynamoDeployment *v1alpha1.Dy ...@@ -431,6 +431,15 @@ func GenerateComponentService(ctx context.Context, dynamoDeployment *v1alpha1.Dy
Ports: []corev1.ServicePort{servicePort}, Ports: []corev1.ServicePort{servicePort},
}, },
} }
if isK8sDiscoveryEnabled {
service.Labels = map[string]string{
commonconsts.KubeLabelDynamoDiscoveryBackend: "kubernetes",
}
// Discovery is enabled for non frontend components
if component.ComponentType != commonconsts.ComponentTypeFrontend {
service.Labels[commonconsts.KubeLabelDynamoDiscoveryEnabled] = commonconsts.KubeLabelValueTrue
}
}
return service, nil return service, nil
} }
......
...@@ -16,7 +16,7 @@ When deploying with the Dynamo operator, simply add the annotation to your DGD m ...@@ -16,7 +16,7 @@ When deploying with the Dynamo operator, simply add the annotation to your DGD m
```yaml ```yaml
metadata: metadata:
annotations: annotations:
nvidia.com/dynamo-discover-backend: kubernetes nvidia.com/dynamo-discovery-backend: kubernetes
``` ```
The operator will automatically configure the required EndpointSlices, labels, and pod environment variables. See [`dgd.yaml`](./dgd.yaml) for a complete example. The operator will automatically configure the required EndpointSlices, labels, and pod environment variables. See [`dgd.yaml`](./dgd.yaml) for a complete example.
......
...@@ -17,9 +17,6 @@ spec: ...@@ -17,9 +17,6 @@ spec:
dynamoNamespace: dynamo dynamoNamespace: dynamo
componentType: frontend componentType: frontend
replicas: 1 replicas: 1
envs:
- name: DYN_SYSTEM_PORT
value: "9090"
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
image: ${IMAGE} image: ${IMAGE}
...@@ -30,29 +27,6 @@ spec: ...@@ -30,29 +27,6 @@ spec:
resources: resources:
limits: limits:
gpu: "1" gpu: "1"
envs:
- name: DYN_SYSTEM_PORT
value: "9090"
- name: DYN_SYSTEM_STARTING_HEALTH_STATUS
value: "notready"
- name: DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS
value: "[\"generate\", \"clear_kv_blocks\"]"
readinessProbe:
httpGet:
path: /health
port: 9090
initialDelaySeconds: 60
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 60
livenessProbe:
httpGet:
path: /live
port: 9090
initialDelaySeconds: 60
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 60
extraPodSpec: extraPodSpec:
terminationGracePeriodSeconds: 120 terminationGracePeriodSeconds: 120
mainContainer: mainContainer:
...@@ -72,29 +46,6 @@ spec: ...@@ -72,29 +46,6 @@ spec:
resources: resources:
limits: limits:
gpu: "1" gpu: "1"
envs:
- name: DYN_SYSTEM_PORT
value: "9090"
- name: DYN_SYSTEM_STARTING_HEALTH_STATUS
value: "notready"
- name: DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS
value: "[\"generate\", \"clear_kv_blocks\"]"
readinessProbe:
httpGet:
path: /health
port: 9090
initialDelaySeconds: 60
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 60
livenessProbe:
httpGet:
path: /live
port: 9090
initialDelaySeconds: 60
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 60
extraPodSpec: extraPodSpec:
terminationGracePeriodSeconds: 120 terminationGracePeriodSeconds: 120
mainContainer: mainContainer:
......
...@@ -68,11 +68,12 @@ impl DiscoveryDaemon { ...@@ -68,11 +68,12 @@ impl DiscoveryDaemon {
let (reader, writer) = reflector::store(); let (reader, writer) = reflector::store();
// Apply label selector to only watch discovery-enabled EndpointSlices // Apply label selector to only watch discovery-enabled EndpointSlices
let watch_config = let watch_config = Config::default()
Config::default().labels("nvidia.com/dynamo-discovery-backend=kubernetes"); .labels("nvidia.com/dynamo-discovery-backend=kubernetes")
.labels("nvidia.com/dynamo-discovery-enabled=true");
tracing::info!( tracing::info!(
"Daemon watching EndpointSlices with label: nvidia.com/dynamo-discovery-backend=kubernetes" "Daemon watching EndpointSlices with labels: nvidia.com/dynamo-discovery-backend=kubernetes, nvidia.com/dynamo-discovery-enabled=true"
); );
// Spawn reflector task (runs independently) // Spawn reflector task (runs independently)
...@@ -181,8 +182,8 @@ impl DiscoveryDaemon { ...@@ -181,8 +182,8 @@ impl DiscoveryDaemon {
) -> Result<MetadataSnapshot> { ) -> Result<MetadataSnapshot> {
let start = std::time::Instant::now(); let start = std::time::Instant::now();
// Extract ALL ready endpoints (instance_id, pod_name, pod_ip) directly from reflector // Extract ALL ready endpoints (instance_id, pod_name, pod_ip, system_port) directly from reflector
let all_endpoints: Vec<(u64, String, String)> = reader let all_endpoints: Vec<(u64, String, String, u16)> = reader
.state() .state()
.iter() .iter()
.flat_map(|arc_slice| extract_endpoint_info(arc_slice.as_ref())) .flat_map(|arc_slice| extract_endpoint_info(arc_slice.as_ref()))
...@@ -194,25 +195,26 @@ impl DiscoveryDaemon { ...@@ -194,25 +195,26 @@ impl DiscoveryDaemon {
); );
// Concurrent fetch: Fetch metadata for all endpoints in parallel // Concurrent fetch: Fetch metadata for all endpoints in parallel
let fetch_futures = all_endpoints let fetch_futures =
.into_iter() all_endpoints
.map(|(instance_id, pod_name, pod_ip)| { .into_iter()
let daemon = self.clone(); .map(|(instance_id, pod_name, pod_ip, system_port)| {
async move { let daemon = self.clone();
match daemon.fetch_metadata(&pod_name, &pod_ip).await { async move {
Ok(metadata) => Some((instance_id, metadata)), match daemon.fetch_metadata(&pod_name, &pod_ip, system_port).await {
Err(e) => { Ok(metadata) => Some((instance_id, metadata)),
tracing::warn!( Err(e) => {
"Failed to fetch metadata for pod {} (instance_id={:x}): {}", tracing::warn!(
pod_name, "Failed to fetch metadata for pod {} (instance_id={:x}): {}",
instance_id, pod_name,
e instance_id,
); e
None );
None
}
} }
} }
} });
});
// Execute fetches concurrently with bounded parallelism // Execute fetches concurrently with bounded parallelism
let results: Vec<_> = futures::stream::iter(fetch_futures) let results: Vec<_> = futures::stream::iter(fetch_futures)
...@@ -241,7 +243,12 @@ impl DiscoveryDaemon { ...@@ -241,7 +243,12 @@ impl DiscoveryDaemon {
} }
/// Fetch metadata for a single pod (with caching) /// Fetch metadata for a single pod (with caching)
async fn fetch_metadata(&self, pod_name: &str, pod_ip: &str) -> Result<Arc<DiscoveryMetadata>> { async fn fetch_metadata(
&self,
pod_name: &str,
pod_ip: &str,
system_port: u16,
) -> Result<Arc<DiscoveryMetadata>> {
let instance_id = hash_pod_name(pod_name); let instance_id = hash_pod_name(pod_name);
// Check cache // Check cache
...@@ -258,7 +265,7 @@ impl DiscoveryDaemon { ...@@ -258,7 +265,7 @@ impl DiscoveryDaemon {
} }
// Cache miss: fetch from HTTP // Cache miss: fetch from HTTP
let url = format!("http://{}:{}/metadata", pod_ip, self.pod_info.system_port); let url = format!("http://{}:{}/metadata", pod_ip, system_port);
tracing::debug!("Fetching metadata from {url}"); tracing::debug!("Fetching metadata from {url}");
......
...@@ -13,11 +13,34 @@ pub fn hash_pod_name(pod_name: &str) -> u64 { ...@@ -13,11 +13,34 @@ pub fn hash_pod_name(pod_name: &str) -> u64 {
hasher.finish() hasher.finish()
} }
/// Extracts the system port from an EndpointSlice's port list.
///
/// Looks for the first port entry named `"system"` and returns its number.
/// Returns `None` when the slice has no ports, no entry is named `"system"`,
/// that entry carries no port number, or the number does not fit in a `u16`.
fn extract_system_port(slice: &EndpointSlice) -> Option<u16> {
    slice
        .ports
        .as_ref()?
        .iter()
        .find(|p| p.name.as_deref() == Some("system"))
        .and_then(|p| p.port)
        // Checked conversion rather than `as u16`: an out-of-range or negative
        // value would otherwise wrap silently (65536 -> 0, -1 -> 65535) and
        // yield a bogus port instead of being treated as "no system port".
        .and_then(|port| <u16 as std::convert::TryFrom<i32>>::try_from(port).ok())
}
/// Extract endpoint information from an EndpointSlice /// Extract endpoint information from an EndpointSlice
/// Returns (instance_id, pod_name, pod_ip) tuples for ready endpoints /// Returns (instance_id, pod_name, pod_ip, system_port) tuples for ready endpoints
pub(super) fn extract_endpoint_info(slice: &EndpointSlice) -> Vec<(u64, String, String)> { pub(super) fn extract_endpoint_info(slice: &EndpointSlice) -> Vec<(u64, String, String, u16)> {
let mut result = Vec::new(); // Extract system port from EndpointSlice ports
let system_port = match extract_system_port(slice) {
Some(port) => port,
None => {
let slice_name = slice.metadata.name.as_deref().unwrap_or("unknown");
tracing::warn!(
"EndpointSlice '{}' did not have a system port defined",
slice_name
);
return Vec::new();
}
};
let mut result = Vec::new();
let endpoints = &slice.endpoints; let endpoints = &slice.endpoints;
for endpoint in endpoints { for endpoint in endpoints {
...@@ -46,7 +69,7 @@ pub(super) fn extract_endpoint_info(slice: &EndpointSlice) -> Vec<(u64, String, ...@@ -46,7 +69,7 @@ pub(super) fn extract_endpoint_info(slice: &EndpointSlice) -> Vec<(u64, String,
// Get first IP only (avoid duplicate instance IDs) // Get first IP only (avoid duplicate instance IDs)
if let Some(ip) = endpoint.addresses.first() { if let Some(ip) = endpoint.addresses.first() {
result.push((instance_id, pod_name.to_string(), ip.clone())); result.push((instance_id, pod_name.to_string(), ip.clone(), system_port));
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment