Unverified Commit 7afb5431 authored by mohammedabdulwahhab's avatar mohammedabdulwahhab Committed by GitHub
Browse files

fix: add kube impl for discovery and add metadata endpoint (#4136)


Signed-off-by: default avatarmohammedabdulwahhab <furkhan324@berkeley.edu>
Co-authored-by: default avatartmontfort <tmontfort@nvidia.com>
parent ec7af939
...@@ -226,6 +226,18 @@ version = "1.5.0" ...@@ -226,6 +226,18 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "async-broadcast"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "435a87a52755b8f27fcf321ac4f04b2802e337c8c4872923137471ec39c37532"
dependencies = [
"event-listener",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
[[package]] [[package]]
name = "async-channel" name = "async-channel"
version = "2.5.0" version = "2.5.0"
...@@ -578,6 +590,17 @@ dependencies = [ ...@@ -578,6 +590,17 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "backon"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef"
dependencies = [
"fastrand",
"gloo-timers",
"tokio",
]
[[package]] [[package]]
name = "backtrace" name = "backtrace"
version = "0.3.75" version = "0.3.75"
...@@ -2393,6 +2416,8 @@ dependencies = [ ...@@ -2393,6 +2416,8 @@ dependencies = [
"humantime", "humantime",
"inotify", "inotify",
"jsonschema", "jsonschema",
"k8s-openapi",
"kube",
"local-ip-address", "local-ip-address",
"log", "log",
"nid", "nid",
...@@ -3363,6 +3388,18 @@ dependencies = [ ...@@ -3363,6 +3388,18 @@ dependencies = [
"regex-syntax", "regex-syntax",
] ]
[[package]]
name = "gloo-timers"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994"
dependencies = [
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
]
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.3.27" version = "0.3.27"
...@@ -3531,6 +3568,26 @@ dependencies = [ ...@@ -3531,6 +3568,26 @@ dependencies = [
"windows-sys 0.60.2", "windows-sys 0.60.2",
] ]
[[package]]
name = "home"
version = "0.5.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc627f471c528ff0c4a49e1d5e60450c8f6461dd6d10ba9dcd3a61d3dff7728d"
dependencies = [
"windows-sys 0.61.0",
]
[[package]]
name = "hostname"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a56f203cd1c76362b69e3863fd987520ac36cf70a8c92627449b2f64a8cf7d65"
dependencies = [
"cfg-if 1.0.3",
"libc",
"windows-link 0.1.3",
]
[[package]] [[package]]
name = "hound" name = "hound"
version = "3.5.1" version = "3.5.1"
...@@ -3703,6 +3760,7 @@ dependencies = [ ...@@ -3703,6 +3760,7 @@ dependencies = [
"http 1.3.1", "http 1.3.1",
"hyper 1.7.0", "hyper 1.7.0",
"hyper-util", "hyper-util",
"log",
"rustls", "rustls",
"rustls-native-certs 0.8.1", "rustls-native-certs 0.8.1",
"rustls-pki-types", "rustls-pki-types",
...@@ -4270,6 +4328,18 @@ dependencies = [ ...@@ -4270,6 +4328,18 @@ dependencies = [
"unicode-general-category", "unicode-general-category",
] ]
[[package]]
name = "json-patch"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f300e415e2134745ef75f04562dd0145405c2f7fd92065db029ac4b16b57fe90"
dependencies = [
"jsonptr",
"serde",
"serde_json",
"thiserror 1.0.69",
]
[[package]] [[package]]
name = "json5" name = "json5"
version = "0.4.1" version = "0.4.1"
...@@ -4281,6 +4351,29 @@ dependencies = [ ...@@ -4281,6 +4351,29 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "jsonpath-rust"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c00ae348f9f8fd2d09f82a98ca381c60df9e0820d8d79fce43e649b4dc3128b"
dependencies = [
"pest",
"pest_derive",
"regex",
"serde_json",
"thiserror 2.0.16",
]
[[package]]
name = "jsonptr"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5a3cc660ba5d72bce0b3bb295bf20847ccbb40fd423f3f05b61273672e561fe"
dependencies = [
"serde",
"serde_json",
]
[[package]] [[package]]
name = "jsonschema" name = "jsonschema"
version = "0.17.1" version = "0.17.1"
...@@ -4321,6 +4414,18 @@ dependencies = [ ...@@ -4321,6 +4414,18 @@ dependencies = [
"rayon", "rayon",
] ]
[[package]]
name = "k8s-openapi"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d13f06d5326a915becaffabdfab75051b8cdc260c2a5c06c0e90226ede89a692"
dependencies = [
"base64 0.22.1",
"chrono",
"serde",
"serde_json",
]
[[package]] [[package]]
name = "kernel32-sys" name = "kernel32-sys"
version = "0.2.2" version = "0.2.2"
...@@ -4331,6 +4436,115 @@ dependencies = [ ...@@ -4331,6 +4436,115 @@ dependencies = [
"winapi-build", "winapi-build",
] ]
[[package]]
name = "kube"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48e7bb0b6a46502cc20e4575b6ff401af45cfea150b34ba272a3410b78aa014e"
dependencies = [
"k8s-openapi",
"kube-client",
"kube-core",
"kube-derive",
"kube-runtime",
]
[[package]]
name = "kube-client"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4987d57a184d2b5294fdad3d7fc7f278899469d21a4da39a8f6ca16426567a36"
dependencies = [
"base64 0.22.1",
"bytes",
"chrono",
"either",
"futures",
"home",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
"hyper 1.7.0",
"hyper-rustls",
"hyper-timeout",
"hyper-util",
"jsonpath-rust",
"k8s-openapi",
"kube-core",
"pem",
"rustls",
"secrecy",
"serde",
"serde_json",
"serde_yaml",
"thiserror 2.0.16",
"tokio",
"tokio-util",
"tower 0.5.2",
"tower-http",
"tracing",
]
[[package]]
name = "kube-core"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "914bbb770e7bb721a06e3538c0edd2babed46447d128f7c21caa68747060ee73"
dependencies = [
"chrono",
"derive_more 2.0.1",
"form_urlencoded",
"http 1.3.1",
"json-patch",
"k8s-openapi",
"schemars 1.0.4",
"serde",
"serde-value",
"serde_json",
"thiserror 2.0.16",
]
[[package]]
name = "kube-derive"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03dee8252be137772a6ab3508b81cd797dee62ee771112a2453bc85cbbe150d2"
dependencies = [
"darling 0.21.3",
"proc-macro2",
"quote",
"serde",
"serde_json",
"syn 2.0.106",
]
[[package]]
name = "kube-runtime"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6aea4de4b562c5cc89ab10300bb63474ae1fa57ff5a19275f2e26401a323e3fd"
dependencies = [
"ahash",
"async-broadcast",
"async-stream",
"backon",
"educe",
"futures",
"hashbrown 0.15.5",
"hostname",
"json-patch",
"k8s-openapi",
"kube-client",
"parking_lot",
"pin-project",
"serde",
"serde_json",
"thiserror 2.0.16",
"tokio",
"tokio-util",
"tracing",
]
[[package]] [[package]]
name = "kvbm-py3" name = "kvbm-py3"
version = "0.7.0" version = "0.7.0"
...@@ -4967,7 +5181,7 @@ dependencies = [ ...@@ -4967,7 +5181,7 @@ dependencies = [
"num-traits", "num-traits",
"objc", "objc",
"once_cell", "once_cell",
"ordered-float", "ordered-float 5.1.0",
"parking_lot", "parking_lot",
"radix_trie", "radix_trie",
"rand 0.9.2", "rand 0.9.2",
...@@ -5816,6 +6030,15 @@ version = "0.2.0" ...@@ -5816,6 +6030,15 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
[[package]]
name = "ordered-float"
version = "2.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c"
dependencies = [
"num-traits",
]
[[package]] [[package]]
name = "ordered-float" name = "ordered-float"
version = "5.1.0" version = "5.1.0"
...@@ -5921,6 +6144,16 @@ dependencies = [ ...@@ -5921,6 +6144,16 @@ dependencies = [
"syn 2.0.106", "syn 2.0.106",
] ]
[[package]]
name = "pem"
version = "3.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be"
dependencies = [
"base64 0.22.1",
"serde_core",
]
[[package]] [[package]]
name = "pem-rfc7468" name = "pem-rfc7468"
version = "0.7.0" version = "0.7.0"
...@@ -7566,7 +7799,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" ...@@ -7566,7 +7799,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fbf2ae1b8bc8e02df939598064d22402220cd5bbcca1c76f7d6a310974d5615" checksum = "3fbf2ae1b8bc8e02df939598064d22402220cd5bbcca1c76f7d6a310974d5615"
dependencies = [ dependencies = [
"dyn-clone", "dyn-clone",
"schemars_derive", "schemars_derive 0.8.22",
"serde", "serde",
"serde_json", "serde_json",
] ]
...@@ -7591,6 +7824,7 @@ checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0" ...@@ -7591,6 +7824,7 @@ checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0"
dependencies = [ dependencies = [
"dyn-clone", "dyn-clone",
"ref-cast", "ref-cast",
"schemars_derive 1.0.4",
"serde", "serde",
"serde_json", "serde_json",
] ]
...@@ -7607,6 +7841,18 @@ dependencies = [ ...@@ -7607,6 +7841,18 @@ dependencies = [
"syn 2.0.106", "syn 2.0.106",
] ]
[[package]]
name = "schemars_derive"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33d020396d1d138dc19f1165df7545479dcd58d93810dc5d646a16e55abefa80"
dependencies = [
"proc-macro2",
"quote",
"serde_derive_internals",
"syn 2.0.106",
]
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.2.0" version = "1.2.0"
...@@ -7748,6 +7994,16 @@ dependencies = [ ...@@ -7748,6 +7994,16 @@ dependencies = [
"typeid", "typeid",
] ]
[[package]]
name = "serde-value"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c"
dependencies = [
"ordered-float 2.10.1",
"serde",
]
[[package]] [[package]]
name = "serde_cbor" name = "serde_cbor"
version = "0.11.2" version = "0.11.2"
...@@ -8927,6 +9183,7 @@ dependencies = [ ...@@ -8927,6 +9183,7 @@ dependencies = [
"futures-sink", "futures-sink",
"futures-util", "futures-util",
"pin-project-lite", "pin-project-lite",
"slab",
"tokio", "tokio",
] ]
...@@ -9243,12 +9500,14 @@ version = "0.6.6" ...@@ -9243,12 +9500,14 @@ version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2"
dependencies = [ dependencies = [
"base64 0.22.1",
"bitflags 2.9.4", "bitflags 2.9.4",
"bytes", "bytes",
"futures-util", "futures-util",
"http 1.3.1", "http 1.3.1",
"http-body 1.0.1", "http-body 1.0.1",
"iri-string", "iri-string",
"mime",
"pin-project-lite", "pin-project-lite",
"tower 0.5.2", "tower 0.5.2",
"tower-layer", "tower-layer",
......
<!--
SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
-->
# Dynamo Service Discovery
## Overview
By default, Dynamo discovers endpoints and model cards through etcd. An experimental Kubernetes backend is available for discovery that uses native Kubernetes EndpointSlices, eliminating the dependency on etcd.
**Using DynamoGraphDeployment (Recommended):**
When deploying with the Dynamo operator, simply add the annotation to your DGD manifest:
```yaml
metadata:
annotations:
nvidia.com/dynamo-discover-backend: kubernetes
```
The operator will automatically configure the required EndpointSlices, labels, and pod environment variables. See [`dgd.yaml`](./dgd.yaml) for a complete example.
## Environment Variables
| **Variable** | **Description** | **Default** |
| ------------ | --------------- | ----------- |
| `DYN_DISCOVERY_BACKEND` | Discovery backend (`kv_store` for etcd or `kubernetes` for experimental EndpointSlice-based discovery) | `kv_store` |
## Metadata Endpoint
The Kubernetes backend exposes a `/metadata` endpoint on each pod that returns registered discovery information. This is used by the system status server to expose the discovery information to the clients on the discovery plane.
### Example Request
```bash
curl -s localhost:9090/metadata | jq
```
### Example Response
```json
{
"endpoints": {
"vllm-disagg/backend/generate": {
"component": "backend",
"endpoint": "generate",
"instance_id": 12345678901234567890,
"namespace": "vllm-disagg",
"transport": {
"nats_tcp": "vllm-disagg_backend.generate-abc123"
}
}
},
"model_cards": {}
}
```
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: dynamo
labels:
nvidia.com/dynamo-discovery-backend: kubernetes
spec:
envs:
- name: DYN_LOG
value: "debug"
services:
Frontend:
dynamoNamespace: dynamo
componentType: frontend
replicas: 1
envs:
- name: DYN_SYSTEM_PORT
value: "9090"
extraPodSpec:
mainContainer:
image: ${IMAGE}
VllmDecodeWorker:
dynamoNamespace: vllm-disagg
componentType: backend
replicas: 1
resources:
limits:
gpu: "1"
envs:
- name: DYN_SYSTEM_PORT
value: "9090"
- name: DYN_SYSTEM_STARTING_HEALTH_STATUS
value: "notready"
- name: DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS
value: "[\"generate\", \"clear_kv_blocks\"]"
readinessProbe:
httpGet:
path: /health
port: 9090
initialDelaySeconds: 60
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 60
livenessProbe:
httpGet:
path: /live
port: 9090
initialDelaySeconds: 60
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 60
extraPodSpec:
terminationGracePeriodSeconds: 120
mainContainer:
image: ${IMAGE}
workingDir: /workspace/components/backends/vllm
command:
- python3
- -m
- dynamo.vllm
args:
- --model
- Qwen/Qwen3-0.6B
VllmPrefillWorker:
dynamoNamespace: vllm-disagg
componentType: prefill
replicas: 1
resources:
limits:
gpu: "1"
envs:
- name: DYN_SYSTEM_PORT
value: "9090"
- name: DYN_SYSTEM_STARTING_HEALTH_STATUS
value: "notready"
- name: DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS
value: "[\"generate\", \"clear_kv_blocks\"]"
readinessProbe:
httpGet:
path: /health
port: 9090
initialDelaySeconds: 60
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 60
livenessProbe:
httpGet:
path: /live
port: 9090
initialDelaySeconds: 60
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 60
extraPodSpec:
terminationGracePeriodSeconds: 120
mainContainer:
image: ${IMAGE}
workingDir: /workspace/components/backends/vllm
command:
- python3
- -m
- dynamo.vllm
args:
- --model
- Qwen/Qwen3-0.6B
- --is-prefill-worker
\ No newline at end of file
...@@ -55,6 +55,12 @@ dependencies = [ ...@@ -55,6 +55,12 @@ dependencies = [
"equator", "equator",
] ]
[[package]]
name = "allocator-api2"
version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
[[package]] [[package]]
name = "android-tzdata" name = "android-tzdata"
version = "0.1.1" version = "0.1.1"
...@@ -173,6 +179,18 @@ version = "0.7.6" ...@@ -173,6 +179,18 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]]
name = "async-broadcast"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "435a87a52755b8f27fcf321ac4f04b2802e337c8c4872923137471ec39c37532"
dependencies = [
"event-listener",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
[[package]] [[package]]
name = "async-channel" name = "async-channel"
version = "2.5.0" version = "2.5.0"
...@@ -467,6 +485,17 @@ dependencies = [ ...@@ -467,6 +485,17 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "backon"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef"
dependencies = [
"fastrand",
"gloo-timers",
"tokio",
]
[[package]] [[package]]
name = "backtrace" name = "backtrace"
version = "0.3.75" version = "0.3.75"
...@@ -1144,8 +1173,18 @@ version = "0.20.11" ...@@ -1144,8 +1173,18 @@ version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee"
dependencies = [ dependencies = [
"darling_core", "darling_core 0.20.11",
"darling_macro", "darling_macro 0.20.11",
]
[[package]]
name = "darling"
version = "0.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0"
dependencies = [
"darling_core 0.21.3",
"darling_macro 0.21.3",
] ]
[[package]] [[package]]
...@@ -1162,13 +1201,38 @@ dependencies = [ ...@@ -1162,13 +1201,38 @@ dependencies = [
"syn 2.0.106", "syn 2.0.106",
] ]
[[package]]
name = "darling_core"
version = "0.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim",
"syn 2.0.106",
]
[[package]] [[package]]
name = "darling_macro" name = "darling_macro"
version = "0.20.11" version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead"
dependencies = [ dependencies = [
"darling_core", "darling_core 0.20.11",
"quote",
"syn 2.0.106",
]
[[package]]
name = "darling_macro"
version = "0.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81"
dependencies = [
"darling_core 0.21.3",
"quote", "quote",
"syn 2.0.106", "syn 2.0.106",
] ]
...@@ -1265,7 +1329,7 @@ version = "0.20.2" ...@@ -1265,7 +1329,7 @@ version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8"
dependencies = [ dependencies = [
"darling", "darling 0.20.11",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.106", "syn 2.0.106",
...@@ -1287,7 +1351,16 @@ version = "1.0.0" ...@@ -1287,7 +1351,16 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05"
dependencies = [ dependencies = [
"derive_more-impl", "derive_more-impl 1.0.0",
]
[[package]]
name = "derive_more"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678"
dependencies = [
"derive_more-impl 2.0.1",
] ]
[[package]] [[package]]
...@@ -1302,6 +1375,17 @@ dependencies = [ ...@@ -1302,6 +1375,17 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "derive_more-impl"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.106",
]
[[package]] [[package]]
name = "dialoguer" name = "dialoguer"
version = "0.11.0" version = "0.11.0"
...@@ -1607,6 +1691,8 @@ dependencies = [ ...@@ -1607,6 +1691,8 @@ dependencies = [
"futures", "futures",
"humantime", "humantime",
"inotify", "inotify",
"k8s-openapi",
"kube",
"local-ip-address", "local-ip-address",
"log", "log",
"nid", "nid",
...@@ -1621,6 +1707,7 @@ dependencies = [ ...@@ -1621,6 +1707,7 @@ dependencies = [
"rand 0.9.2", "rand 0.9.2",
"rayon", "rayon",
"regex", "regex",
"reqwest",
"serde", "serde",
"serde_json", "serde_json",
"socket2 0.5.10", "socket2 0.5.10",
...@@ -2387,6 +2474,18 @@ version = "0.3.3" ...@@ -2387,6 +2474,18 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280"
[[package]]
name = "gloo-timers"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994"
dependencies = [
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
]
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.4.12" version = "0.4.12"
...@@ -2441,6 +2540,8 @@ version = "0.15.5" ...@@ -2441,6 +2540,8 @@ version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [ dependencies = [
"allocator-api2",
"equivalent",
"foldhash", "foldhash",
] ]
...@@ -2503,6 +2604,17 @@ dependencies = [ ...@@ -2503,6 +2604,17 @@ dependencies = [
"windows-sys 0.59.0", "windows-sys 0.59.0",
] ]
[[package]]
name = "hostname"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a56f203cd1c76362b69e3863fd987520ac36cf70a8c92627449b2f64a8cf7d65"
dependencies = [
"cfg-if 1.0.3",
"libc",
"windows-link 0.1.3",
]
[[package]] [[package]]
name = "http" name = "http"
version = "1.3.1" version = "1.3.1"
...@@ -2587,6 +2699,7 @@ dependencies = [ ...@@ -2587,6 +2699,7 @@ dependencies = [
"http", "http",
"hyper", "hyper",
"hyper-util", "hyper-util",
"log",
"rustls", "rustls",
"rustls-native-certs 0.8.1", "rustls-native-certs 0.8.1",
"rustls-pki-types", "rustls-pki-types",
...@@ -3073,6 +3186,18 @@ dependencies = [ ...@@ -3073,6 +3186,18 @@ dependencies = [
"unicode-general-category", "unicode-general-category",
] ]
[[package]]
name = "json-patch"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f300e415e2134745ef75f04562dd0145405c2f7fd92065db029ac4b16b57fe90"
dependencies = [
"jsonptr",
"serde",
"serde_json",
"thiserror 1.0.69",
]
[[package]] [[package]]
name = "json5" name = "json5"
version = "0.4.1" version = "0.4.1"
...@@ -3084,6 +3209,29 @@ dependencies = [ ...@@ -3084,6 +3209,29 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "jsonpath-rust"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c00ae348f9f8fd2d09f82a98ca381c60df9e0820d8d79fce43e649b4dc3128b"
dependencies = [
"pest",
"pest_derive",
"regex",
"serde_json",
"thiserror 2.0.16",
]
[[package]]
name = "jsonptr"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5a3cc660ba5d72bce0b3bb295bf20847ccbb40fd423f3f05b61273672e561fe"
dependencies = [
"serde",
"serde_json",
]
[[package]] [[package]]
name = "jwalk" name = "jwalk"
version = "0.8.1" version = "0.8.1"
...@@ -3094,6 +3242,18 @@ dependencies = [ ...@@ -3094,6 +3242,18 @@ dependencies = [
"rayon", "rayon",
] ]
[[package]]
name = "k8s-openapi"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d13f06d5326a915becaffabdfab75051b8cdc260c2a5c06c0e90226ede89a692"
dependencies = [
"base64 0.22.1",
"chrono",
"serde",
"serde_json",
]
[[package]] [[package]]
name = "kernel32-sys" name = "kernel32-sys"
version = "0.2.2" version = "0.2.2"
...@@ -3104,6 +3264,115 @@ dependencies = [ ...@@ -3104,6 +3264,115 @@ dependencies = [
"winapi-build", "winapi-build",
] ]
[[package]]
name = "kube"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48e7bb0b6a46502cc20e4575b6ff401af45cfea150b34ba272a3410b78aa014e"
dependencies = [
"k8s-openapi",
"kube-client",
"kube-core",
"kube-derive",
"kube-runtime",
]
[[package]]
name = "kube-client"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4987d57a184d2b5294fdad3d7fc7f278899469d21a4da39a8f6ca16426567a36"
dependencies = [
"base64 0.22.1",
"bytes",
"chrono",
"either",
"futures",
"home",
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-rustls",
"hyper-timeout",
"hyper-util",
"jsonpath-rust",
"k8s-openapi",
"kube-core",
"pem",
"rustls",
"secrecy",
"serde",
"serde_json",
"serde_yaml",
"thiserror 2.0.16",
"tokio",
"tokio-util",
"tower",
"tower-http",
"tracing",
]
[[package]]
name = "kube-core"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "914bbb770e7bb721a06e3538c0edd2babed46447d128f7c21caa68747060ee73"
dependencies = [
"chrono",
"derive_more 2.0.1",
"form_urlencoded",
"http",
"json-patch",
"k8s-openapi",
"schemars 1.0.4",
"serde",
"serde-value",
"serde_json",
"thiserror 2.0.16",
]
[[package]]
name = "kube-derive"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03dee8252be137772a6ab3508b81cd797dee62ee771112a2453bc85cbbe150d2"
dependencies = [
"darling 0.21.3",
"proc-macro2",
"quote",
"serde",
"serde_json",
"syn 2.0.106",
]
[[package]]
name = "kube-runtime"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6aea4de4b562c5cc89ab10300bb63474ae1fa57ff5a19275f2e26401a323e3fd"
dependencies = [
"ahash",
"async-broadcast",
"async-stream",
"backon",
"educe",
"futures",
"hashbrown 0.15.5",
"hostname",
"json-patch",
"k8s-openapi",
"kube-client",
"parking_lot",
"pin-project",
"serde",
"serde_json",
"thiserror 2.0.16",
"tokio",
"tokio-util",
"tracing",
]
[[package]] [[package]]
name = "lalrpop-util" name = "lalrpop-util"
version = "0.20.2" version = "0.20.2"
...@@ -3284,7 +3553,7 @@ version = "0.2.3" ...@@ -3284,7 +3553,7 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d149aaa2965d70381709d9df4c7ee1fc0de1c614a4efc2ee356f5e43d68749f8" checksum = "d149aaa2965d70381709d9df4c7ee1fc0de1c614a4efc2ee356f5e43d68749f8"
dependencies = [ dependencies = [
"derive_more", "derive_more 1.0.0",
"malachite", "malachite",
"num-integer", "num-integer",
"num-traits", "num-traits",
...@@ -3993,6 +4262,15 @@ version = "0.2.0" ...@@ -3993,6 +4262,15 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
[[package]]
name = "ordered-float"
version = "2.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c"
dependencies = [
"num-traits",
]
[[package]] [[package]]
name = "ordered-multimap" name = "ordered-multimap"
version = "0.7.3" version = "0.7.3"
...@@ -4073,6 +4351,16 @@ dependencies = [ ...@@ -4073,6 +4351,16 @@ dependencies = [
"syn 2.0.106", "syn 2.0.106",
] ]
[[package]]
name = "pem"
version = "3.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be"
dependencies = [
"base64 0.22.1",
"serde_core",
]
[[package]] [[package]]
name = "pem-rfc7468" name = "pem-rfc7468"
version = "0.7.0" version = "0.7.0"
...@@ -5397,10 +5685,23 @@ checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0" ...@@ -5397,10 +5685,23 @@ checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0"
dependencies = [ dependencies = [
"dyn-clone", "dyn-clone",
"ref-cast", "ref-cast",
"schemars_derive",
"serde", "serde",
"serde_json", "serde_json",
] ]
[[package]]
name = "schemars_derive"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33d020396d1d138dc19f1165df7545479dcd58d93810dc5d646a16e55abefa80"
dependencies = [
"proc-macro2",
"quote",
"serde_derive_internals",
"syn 2.0.106",
]
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.2.0" version = "1.2.0"
...@@ -5473,10 +5774,11 @@ checksum = "1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc" ...@@ -5473,10 +5774,11 @@ checksum = "1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.219" version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
dependencies = [ dependencies = [
"serde_core",
"serde_derive", "serde_derive",
] ]
...@@ -5491,11 +5793,41 @@ dependencies = [ ...@@ -5491,11 +5793,41 @@ dependencies = [
"typeid", "typeid",
] ]
[[package]]
name = "serde-value"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c"
dependencies = [
"ordered-float",
"serde",
]
[[package]]
name = "serde_core"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad"
dependencies = [
"serde_derive",
]
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.219" version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.106",
]
[[package]]
name = "serde_derive_internals"
version = "0.29.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
...@@ -5601,7 +5933,7 @@ version = "3.14.0" ...@@ -5601,7 +5933,7 @@ version = "3.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de90945e6565ce0d9a25098082ed4ee4002e047cb59892c318d66821e14bb30f" checksum = "de90945e6565ce0d9a25098082ed4ee4002e047cb59892c318d66821e14bb30f"
dependencies = [ dependencies = [
"darling", "darling 0.20.11",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.106", "syn 2.0.106",
...@@ -6197,6 +6529,7 @@ dependencies = [ ...@@ -6197,6 +6529,7 @@ dependencies = [
"futures-sink", "futures-sink",
"futures-util", "futures-util",
"pin-project-lite", "pin-project-lite",
"slab",
"tokio", "tokio",
] ]
...@@ -6426,12 +6759,14 @@ version = "0.6.6" ...@@ -6426,12 +6759,14 @@ version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2"
dependencies = [ dependencies = [
"base64 0.22.1",
"bitflags 2.9.3", "bitflags 2.9.3",
"bytes", "bytes",
"futures-util", "futures-util",
"http", "http",
"http-body", "http-body",
"iri-string", "iri-string",
"mime",
"pin-project-lite", "pin-project-lite",
"tower", "tower",
"tower-layer", "tower-layer",
...@@ -6895,7 +7230,7 @@ version = "0.20.0" ...@@ -6895,7 +7230,7 @@ version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7df16e474ef958526d1205f6dda359fdfab79d9aa6d54bafcb92dcd07673dca" checksum = "b7df16e474ef958526d1205f6dda359fdfab79d9aa6d54bafcb92dcd07673dca"
dependencies = [ dependencies = [
"darling", "darling 0.20.11",
"once_cell", "once_cell",
"proc-macro-error2", "proc-macro-error2",
"proc-macro2", "proc-macro2",
......
...@@ -39,6 +39,7 @@ humantime = { workspace = true } ...@@ -39,6 +39,7 @@ humantime = { workspace = true }
parking_lot = { workspace = true } parking_lot = { workspace = true }
prometheus = { workspace = true } prometheus = { workspace = true }
rand = { workspace = true } rand = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
...@@ -75,6 +76,10 @@ regex = { version = "1" } ...@@ -75,6 +76,10 @@ regex = { version = "1" }
socket2 = { version = "0.5.8" } socket2 = { version = "0.5.8" }
tokio-rayon = { version = "2.1" } tokio-rayon = { version = "2.1" }
# Kubernetes discovery backend
kube = { version = "2.0.1", default-features = false, features = ["runtime", "derive", "client", "rustls-tls", "aws-lc-rs"] }
k8s-openapi = { version = "0.26.0", features = ["v1_32"] }
[dev-dependencies] [dev-dependencies]
assert_matches = { version = "1.5.0" } assert_matches = { version = "1.5.0" }
criterion = { version = "0.5", features = ["async_tokio"] } criterion = { version = "0.5", features = ["async_tokio"] }
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
mod daemon;
mod utils;
pub use utils::hash_pod_name;
use daemon::DiscoveryDaemon;
use utils::PodInfo;
use crate::CancellationToken;
use crate::discovery::{
Discovery, DiscoveryEvent, DiscoveryInstance, DiscoveryMetadata, DiscoveryQuery, DiscoverySpec,
DiscoveryStream, MetadataSnapshot,
};
use anyhow::Result;
use async_trait::async_trait;
use kube::Client as KubeClient;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Kubernetes-based discovery client
#[derive(Clone)]
pub struct KubeDiscoveryClient {
instance_id: u64,
metadata: Arc<RwLock<DiscoveryMetadata>>,
metadata_watch: tokio::sync::watch::Receiver<Arc<MetadataSnapshot>>,
}
impl KubeDiscoveryClient {
/// Create a new Kubernetes discovery client
///
/// # Arguments
/// * `metadata` - Shared metadata store (also used by system server)
/// * `cancel_token` - Cancellation token for shutdown
pub async fn new(
metadata: Arc<RwLock<DiscoveryMetadata>>,
cancel_token: CancellationToken,
) -> Result<Self> {
let pod_info = PodInfo::from_env()?;
let instance_id = hash_pod_name(&pod_info.pod_name);
tracing::info!(
"Initializing KubeDiscoveryClient: pod_name={}, instance_id={:x}, namespace={}",
pod_info.pod_name,
instance_id,
pod_info.pod_namespace
);
let kube_client = KubeClient::try_default()
.await
.map_err(|e| anyhow::anyhow!("Failed to create Kubernetes client: {}", e))?;
// Create watch channel with initial empty snapshot
let (watch_tx, watch_rx) = tokio::sync::watch::channel(Arc::new(MetadataSnapshot::empty()));
// Create and spawn daemon
let daemon = DiscoveryDaemon::new(kube_client, pod_info, cancel_token)?;
tokio::spawn(async move {
if let Err(e) = daemon.run(watch_tx).await {
tracing::error!("Discovery daemon failed: {}", e);
}
});
tracing::info!("Discovery daemon started");
Ok(Self {
instance_id,
metadata,
metadata_watch: watch_rx,
})
}
}
#[async_trait]
impl Discovery for KubeDiscoveryClient {
fn instance_id(&self) -> u64 {
self.instance_id
}
async fn register(&self, spec: DiscoverySpec) -> Result<DiscoveryInstance> {
let instance_id = self.instance_id();
let instance = spec.with_instance_id(instance_id);
tracing::debug!(
"Registering instance: {:?} with instance_id={:x}",
instance,
instance_id
);
// Write to local metadata
let mut metadata = self.metadata.write().await;
match &instance {
DiscoveryInstance::Endpoint(inst) => {
tracing::info!(
"Registered endpoint: namespace={}, component={}, endpoint={}, instance_id={:x}",
inst.namespace,
inst.component,
inst.endpoint,
instance_id
);
metadata.register_endpoint(instance.clone())?;
}
DiscoveryInstance::Model {
namespace,
component,
endpoint,
..
} => {
tracing::info!(
"Registered model card: namespace={}, component={}, endpoint={}, instance_id={:x}",
namespace,
component,
endpoint,
instance_id
);
metadata.register_model_card(instance.clone())?;
}
}
Ok(instance)
}
async fn list(&self, query: DiscoveryQuery) -> Result<Vec<DiscoveryInstance>> {
tracing::debug!("KubeDiscoveryClient::list called with query={:?}", query);
// Get current snapshot (may be empty if daemon hasn't fetched yet)
let snapshot = self.metadata_watch.borrow().clone();
tracing::debug!(
"List using snapshot seq={} with {} instances",
snapshot.sequence,
snapshot.instances.len()
);
// Filter snapshot by query
let instances = snapshot.filter(&query);
tracing::info!(
"KubeDiscoveryClient::list returning {} instances for query={:?}",
instances.len(),
query
);
Ok(instances)
}
async fn list_and_watch(
&self,
query: DiscoveryQuery,
cancel_token: Option<CancellationToken>,
) -> Result<DiscoveryStream> {
use tokio::sync::mpsc;
tracing::info!(
"KubeDiscoveryClient::list_and_watch started for query={:?}",
query
);
// Clone the watch receiver
let mut watch_rx = self.metadata_watch.clone();
// Create output stream
let (event_tx, event_rx) = mpsc::unbounded_channel();
// Generate unique stream identifier for tracing
let stream_id = uuid::Uuid::new_v4();
// Spawn task to process snapshots
tokio::spawn(async move {
let mut known_instances = HashSet::<u64>::new();
tracing::debug!(
stream_id = %stream_id,
"Watch started for query={:?}",
query
);
loop {
// Wait for next snapshot or cancellation
let watch_result = if let Some(ref token) = cancel_token {
tokio::select! {
result = watch_rx.changed() => result,
_ = token.cancelled() => {
tracing::info!(
stream_id = %stream_id,
"Watch cancelled via cancel token"
);
break;
}
}
} else {
watch_rx.changed().await
};
match watch_result {
Ok(()) => {
// Get latest snapshot
let snapshot = watch_rx.borrow_and_update().clone();
// Filter snapshot by query
let current_instances: HashSet<u64> = snapshot
.instances
.iter()
.filter_map(|(&instance_id, metadata)| {
let filtered = metadata.filter(&query);
if !filtered.is_empty() {
Some(instance_id)
} else {
None
}
})
.collect();
// Compute diff
let added: Vec<u64> = current_instances
.difference(&known_instances)
.copied()
.collect();
let removed: Vec<u64> = known_instances
.difference(&current_instances)
.copied()
.collect();
// Only log if there are changes
if !added.is_empty() || !removed.is_empty() {
tracing::debug!(
stream_id = %stream_id,
seq = snapshot.sequence,
added = added.len(),
removed = removed.len(),
total = current_instances.len(),
"Watch detected changes"
);
}
// Emit Added events
for instance_id in added {
if let Some(metadata) = snapshot.instances.get(&instance_id) {
let instances = metadata.filter(&query);
for instance in instances {
tracing::info!(
stream_id = %stream_id,
instance_id = format!("{:x}", instance.instance_id()),
"Emitting Added event"
);
if event_tx.send(Ok(DiscoveryEvent::Added(instance))).is_err() {
tracing::debug!(
stream_id = %stream_id,
"Watch receiver dropped"
);
return;
}
}
}
}
// Emit Removed events
for instance_id in removed {
tracing::info!(
stream_id = %stream_id,
instance_id = format!("{:x}", instance_id),
"Emitting Removed event"
);
if event_tx
.send(Ok(DiscoveryEvent::Removed(instance_id)))
.is_err()
{
tracing::debug!(stream_id = %stream_id, "Watch receiver dropped");
return;
}
}
// Update known set
known_instances = current_instances;
}
Err(_) => {
tracing::info!(
stream_id = %stream_id,
"Watch channel closed (daemon stopped)"
);
break;
}
}
}
});
// Convert receiver to stream
let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(event_rx);
Ok(Box::pin(stream))
}
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use crate::CancellationToken;
use crate::discovery::{DiscoveryMetadata, MetadataSnapshot};
use anyhow::Result;
use futures::StreamExt;
use k8s_openapi::api::discovery::v1::EndpointSlice;
use kube::{
Api, Client as KubeClient,
runtime::{WatchStreamExt, reflector, watcher, watcher::Config},
};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;
use super::utils::{PodInfo, extract_endpoint_info, hash_pod_name};
const SNAPSHOT_POLL_INTERVAL_MS: u64 = 5000;
const MAX_CONCURRENT_FETCHES: usize = 20;
const METADATA_FETCH_TIMEOUT_SECS: u64 = 5;
/// Discovers and aggregates metadata from pods in the cluster
#[derive(Clone)]
pub(super) struct DiscoveryDaemon {
/// Kubernetes client
kube_client: KubeClient,
/// HTTP client for fetching remote metadata
http_client: reqwest::Client,
/// Cache of remote pod metadata (instance_id -> metadata)
cache: Arc<RwLock<HashMap<u64, Arc<DiscoveryMetadata>>>>,
// This pod's info
pod_info: PodInfo,
cancel_token: CancellationToken,
}
impl DiscoveryDaemon {
pub fn new(
kube_client: KubeClient,
pod_info: PodInfo,
cancel_token: CancellationToken,
) -> Result<Self> {
let http_client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(METADATA_FETCH_TIMEOUT_SECS))
.build()
.map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {}", e))?;
Ok(Self {
kube_client,
http_client,
cache: Arc::new(RwLock::new(HashMap::new())),
pod_info,
cancel_token,
})
}
/// Run the discovery daemon
pub async fn run(
self,
watch_tx: tokio::sync::watch::Sender<Arc<MetadataSnapshot>>,
) -> Result<()> {
tracing::info!("Discovery daemon starting");
// Create reflector for ALL EndpointSlices in our namespace
let endpoint_slices: Api<EndpointSlice> =
Api::namespaced(self.kube_client.clone(), &self.pod_info.pod_namespace);
let (reader, writer) = reflector::store();
// Apply label selector to only watch discovery-enabled EndpointSlices
let watch_config =
Config::default().labels("nvidia.com/dynamo-discovery-backend=kubernetes");
tracing::info!(
"Daemon watching EndpointSlices with label: nvidia.com/dynamo-discovery-backend=kubernetes"
);
// Spawn reflector task (runs independently)
let reflector_stream = reflector(writer, watcher(endpoint_slices, watch_config))
.default_backoff()
.touched_objects()
.for_each(|res| {
match res {
Ok(obj) => {
tracing::debug!(
slice_name = obj.metadata.name.as_deref().unwrap_or("unknown"),
"Daemon reflector updated EndpointSlice"
);
}
Err(e) => {
tracing::warn!("Daemon reflector error: {}", e);
}
}
futures::future::ready(())
});
tokio::spawn(reflector_stream);
// Polling loop
let mut sequence = 0u64;
let mut prev_instance_ids: HashSet<u64> = HashSet::new();
let mut interval =
tokio::time::interval(std::time::Duration::from_millis(SNAPSHOT_POLL_INTERVAL_MS));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = interval.tick() => {
match self.aggregate_snapshot(&reader, sequence).await {
Ok(snapshot) => {
// Compare instance IDs to detect changes
let current_instance_ids: HashSet<u64> =
snapshot.instances.keys().copied().collect();
let instances_changed = current_instance_ids != prev_instance_ids;
if instances_changed {
// Compute what was added and removed
let added: Vec<u64> = current_instance_ids
.difference(&prev_instance_ids)
.copied()
.collect();
let removed: Vec<u64> = prev_instance_ids
.difference(&current_instance_ids)
.copied()
.collect();
tracing::info!(
"Daemon snapshot (seq={}): instances changed, total={}, added=[{}], removed=[{}]",
sequence,
current_instance_ids.len(),
added.iter().map(|id| format!("{:x}", id)).collect::<Vec<_>>().join(", "),
removed.iter().map(|id| format!("{:x}", id)).collect::<Vec<_>>().join(", ")
);
// Prune cache for removed instances
if !removed.is_empty() {
self.prune_cache(&removed).await;
}
// Broadcast the snapshot (only when changed)
if watch_tx.send(Arc::new(snapshot)).is_err() {
tracing::debug!("No watch subscribers, daemon stopping");
break;
}
prev_instance_ids = current_instance_ids;
} else {
tracing::trace!(
"Daemon snapshot (seq={}): no changes, {} instances",
sequence,
current_instance_ids.len()
);
}
sequence += 1;
}
Err(e) => {
tracing::error!("Failed to aggregate snapshot: {}", e);
// Continue on errors - don't crash daemon
}
}
}
_ = self.cancel_token.cancelled() => {
tracing::info!("Discovery daemon received cancellation");
break;
}
}
}
tracing::info!("Discovery daemon stopped");
Ok(())
}
/// Aggregate metadata from all pods into a snapshot
async fn aggregate_snapshot(
&self,
reader: &reflector::Store<EndpointSlice>,
sequence: u64,
) -> Result<MetadataSnapshot> {
let start = std::time::Instant::now();
// Extract ALL ready endpoints (instance_id, pod_name, pod_ip) directly from reflector
let all_endpoints: Vec<(u64, String, String)> = reader
.state()
.iter()
.flat_map(|arc_slice| extract_endpoint_info(arc_slice.as_ref()))
.collect();
tracing::trace!(
"Daemon found {} ready endpoints to fetch",
all_endpoints.len()
);
// Concurrent fetch: Fetch metadata for all endpoints in parallel
let fetch_futures = all_endpoints
.into_iter()
.map(|(instance_id, pod_name, pod_ip)| {
let daemon = self.clone();
async move {
match daemon.fetch_metadata(&pod_name, &pod_ip).await {
Ok(metadata) => Some((instance_id, metadata)),
Err(e) => {
tracing::warn!(
"Failed to fetch metadata for pod {} (instance_id={:x}): {}",
pod_name,
instance_id,
e
);
None
}
}
}
});
// Execute fetches concurrently with bounded parallelism
let results: Vec<_> = futures::stream::iter(fetch_futures)
.buffer_unordered(MAX_CONCURRENT_FETCHES)
.collect()
.await;
// Build the snapshot
let instances: HashMap<u64, Arc<DiscoveryMetadata>> =
results.into_iter().flatten().collect();
let elapsed = start.elapsed();
tracing::trace!(
"Daemon snapshot complete (seq={}): {} instances in {:?}",
sequence,
instances.len(),
elapsed
);
Ok(MetadataSnapshot {
instances,
sequence,
timestamp: std::time::Instant::now(),
})
}
/// Fetch metadata for a single pod (with caching)
async fn fetch_metadata(&self, pod_name: &str, pod_ip: &str) -> Result<Arc<DiscoveryMetadata>> {
let instance_id = hash_pod_name(pod_name);
// Check cache
{
let cache = self.cache.read().await;
if let Some(cached) = cache.get(&instance_id) {
tracing::trace!(
"Cache hit for pod_name={}, instance_id={:x}",
pod_name,
instance_id
);
return Ok(cached.clone());
}
}
// Cache miss: fetch from HTTP
let url = format!("http://{}:{}/metadata", pod_ip, self.pod_info.system_port);
tracing::debug!("Fetching metadata from {url}");
let response = self
.http_client
.get(&url)
.send()
.await
.map_err(|e| anyhow::anyhow!("Failed to fetch metadata from {}: {}", url, e))?;
let metadata: DiscoveryMetadata = response
.json()
.await
.map_err(|e| anyhow::anyhow!("Failed to parse metadata from {}: {}", url, e))?;
let metadata = Arc::new(metadata);
// Cache it
{
let mut cache = self.cache.write().await;
// Check again in case another task inserted while we were fetching
if let Some(existing) = cache.get(&instance_id) {
tracing::debug!(
"Another task cached metadata for instance_id={:x} while we were fetching",
instance_id
);
return Ok(existing.clone());
}
cache.insert(instance_id, metadata.clone());
tracing::debug!(
"Cached metadata for pod_name={}, instance_id={:x}",
pod_name,
instance_id
);
}
Ok(metadata)
}
/// Prune cache entries for removed instances
async fn prune_cache(&self, removed_ids: &[u64]) {
let mut cache = self.cache.write().await;
for id in removed_ids {
if cache.remove(id).is_some() {
tracing::debug!("Pruned cache for removed instance_id={:x}", id);
}
}
}
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use anyhow::Result;
use k8s_openapi::api::discovery::v1::EndpointSlice;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
/// Hash a pod name to get a consistent instance ID
pub fn hash_pod_name(pod_name: &str) -> u64 {
let mut hasher = DefaultHasher::new();
pod_name.hash(&mut hasher);
hasher.finish()
}
/// Extract endpoint information from an EndpointSlice
/// Returns (instance_id, pod_name, pod_ip) tuples for ready endpoints
pub(super) fn extract_endpoint_info(slice: &EndpointSlice) -> Vec<(u64, String, String)> {
let mut result = Vec::new();
let endpoints = &slice.endpoints;
for endpoint in endpoints {
// Check if endpoint is ready
let is_ready = endpoint
.conditions
.as_ref()
.and_then(|c| c.ready)
.unwrap_or(false);
if !is_ready {
continue;
}
// Get pod name from targetRef
let pod_name = match endpoint.target_ref.as_ref() {
Some(target_ref) => target_ref.name.as_deref().unwrap_or(""),
None => continue,
};
if pod_name.is_empty() {
continue;
}
let instance_id = hash_pod_name(pod_name);
// Get first IP only (avoid duplicate instance IDs)
if let Some(ip) = endpoint.addresses.first() {
result.push((instance_id, pod_name.to_string(), ip.clone()));
}
}
result
}
/// Pod information extracted from environment
#[derive(Debug, Clone)]
pub(super) struct PodInfo {
pub pod_name: String,
pub pod_namespace: String,
pub system_port: u16,
}
impl PodInfo {
/// Discover pod information from environment variables
pub fn from_env() -> Result<Self> {
let pod_name = std::env::var("POD_NAME")
.map_err(|_| anyhow::anyhow!("POD_NAME environment variable not set"))?;
let pod_namespace = std::env::var("POD_NAMESPACE").unwrap_or_else(|_| {
tracing::warn!("POD_NAMESPACE not set, defaulting to 'default'");
"default".to_string()
});
// Read system server port from config
let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
let system_port = config.system_port as u16;
Ok(Self {
pod_name,
pod_namespace,
system_port,
})
}
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use anyhow::Result;
use std::collections::HashMap;
use std::sync::Arc;
use super::{DiscoveryInstance, DiscoveryQuery};
/// Key for organizing metadata internally
/// Format: "namespace/component/endpoint"
fn make_endpoint_key(namespace: &str, component: &str, endpoint: &str) -> String {
format!("{namespace}/{component}/{endpoint}")
}
/// Metadata stored on each pod and exposed via HTTP endpoint
/// This struct holds all discovery registrations for this pod instance
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DiscoveryMetadata {
/// Registered endpoint instances (key: "namespace/component/endpoint")
endpoints: HashMap<String, DiscoveryInstance>,
/// Registered model card instances (key: "namespace/component/endpoint")
model_cards: HashMap<String, DiscoveryInstance>,
}
impl DiscoveryMetadata {
/// Create a new empty metadata store
pub fn new() -> Self {
Self {
endpoints: HashMap::new(),
model_cards: HashMap::new(),
}
}
/// Register an endpoint instance
pub fn register_endpoint(&mut self, instance: DiscoveryInstance) -> Result<()> {
if let DiscoveryInstance::Endpoint(ref inst) = instance {
let key = make_endpoint_key(&inst.namespace, &inst.component, &inst.endpoint);
self.endpoints.insert(key, instance);
Ok(())
} else {
anyhow::bail!("Cannot register non-endpoint instance as endpoint")
}
}
/// Register a model card instance
pub fn register_model_card(&mut self, instance: DiscoveryInstance) -> Result<()> {
if let DiscoveryInstance::Model {
ref namespace,
ref component,
ref endpoint,
..
} = instance
{
let key = make_endpoint_key(namespace, component, endpoint);
self.model_cards.insert(key, instance);
Ok(())
} else {
anyhow::bail!("Cannot register non-model-card instance as model card")
}
}
/// Get all registered endpoints
pub fn get_all_endpoints(&self) -> Vec<DiscoveryInstance> {
self.endpoints.values().cloned().collect()
}
/// Get all registered model cards
pub fn get_all_model_cards(&self) -> Vec<DiscoveryInstance> {
self.model_cards.values().cloned().collect()
}
/// Get all registered instances (endpoints and model cards)
pub fn get_all(&self) -> Vec<DiscoveryInstance> {
self.endpoints
.values()
.chain(self.model_cards.values())
.cloned()
.collect()
}
/// Filter this metadata by query
pub fn filter(&self, query: &DiscoveryQuery) -> Vec<DiscoveryInstance> {
let all_instances = match query {
DiscoveryQuery::AllEndpoints
| DiscoveryQuery::NamespacedEndpoints { .. }
| DiscoveryQuery::ComponentEndpoints { .. }
| DiscoveryQuery::Endpoint { .. } => self.get_all_endpoints(),
DiscoveryQuery::AllModels
| DiscoveryQuery::NamespacedModels { .. }
| DiscoveryQuery::ComponentModels { .. }
| DiscoveryQuery::EndpointModels { .. } => self.get_all_model_cards(),
};
filter_instances(all_instances, query)
}
}
impl Default for DiscoveryMetadata {
fn default() -> Self {
Self::new()
}
}
/// Filter instances by query predicate
fn filter_instances(
instances: Vec<DiscoveryInstance>,
query: &DiscoveryQuery,
) -> Vec<DiscoveryInstance> {
match query {
DiscoveryQuery::AllEndpoints | DiscoveryQuery::AllModels => instances,
DiscoveryQuery::NamespacedEndpoints { namespace } => instances
.into_iter()
.filter(|inst| match inst {
DiscoveryInstance::Endpoint(i) => &i.namespace == namespace,
_ => false,
})
.collect(),
DiscoveryQuery::ComponentEndpoints {
namespace,
component,
} => instances
.into_iter()
.filter(|inst| match inst {
DiscoveryInstance::Endpoint(i) => {
&i.namespace == namespace && &i.component == component
}
_ => false,
})
.collect(),
DiscoveryQuery::Endpoint {
namespace,
component,
endpoint,
} => instances
.into_iter()
.filter(|inst| match inst {
DiscoveryInstance::Endpoint(i) => {
&i.namespace == namespace
&& &i.component == component
&& &i.endpoint == endpoint
}
_ => false,
})
.collect(),
DiscoveryQuery::NamespacedModels { namespace } => instances
.into_iter()
.filter(|inst| match inst {
DiscoveryInstance::Model { namespace: ns, .. } => ns == namespace,
_ => false,
})
.collect(),
DiscoveryQuery::ComponentModels {
namespace,
component,
} => instances
.into_iter()
.filter(|inst| match inst {
DiscoveryInstance::Model {
namespace: ns,
component: comp,
..
} => ns == namespace && comp == component,
_ => false,
})
.collect(),
DiscoveryQuery::EndpointModels {
namespace,
component,
endpoint,
} => instances
.into_iter()
.filter(|inst| match inst {
DiscoveryInstance::Model {
namespace: ns,
component: comp,
endpoint: ep,
..
} => ns == namespace && comp == component && ep == endpoint,
_ => false,
})
.collect(),
}
}
/// Snapshot of all discovered instances and their metadata
#[derive(Clone, Debug)]
pub struct MetadataSnapshot {
/// Map of instance_id -> metadata
pub instances: HashMap<u64, Arc<DiscoveryMetadata>>,
/// Sequence number for debugging
pub sequence: u64,
/// Timestamp for observability
pub timestamp: std::time::Instant,
}
impl MetadataSnapshot {
pub fn empty() -> Self {
Self {
instances: HashMap::new(),
sequence: 0,
timestamp: std::time::Instant::now(),
}
}
/// Filter all instances in the snapshot by query
pub fn filter(&self, query: &DiscoveryQuery) -> Vec<DiscoveryInstance> {
self.instances
.values()
.flat_map(|metadata| metadata.filter(query))
.collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::component::{Instance, TransportType};
#[test]
fn test_metadata_serde() {
let mut metadata = DiscoveryMetadata::new();
// Add an endpoint
let instance = DiscoveryInstance::Endpoint(Instance {
namespace: "test".to_string(),
component: "comp1".to_string(),
endpoint: "ep1".to_string(),
instance_id: 123,
transport: TransportType::NatsTcp("nats://localhost:4222".to_string()),
});
metadata.register_endpoint(instance).unwrap();
// Serialize
let json = serde_json::to_string(&metadata).unwrap();
// Deserialize
let deserialized: DiscoveryMetadata = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.endpoints.len(), 1);
assert_eq!(deserialized.model_cards.len(), 0);
}
#[tokio::test]
async fn test_concurrent_registration() {
use tokio::sync::RwLock;
let metadata = Arc::new(RwLock::new(DiscoveryMetadata::new()));
// Spawn multiple tasks registering concurrently
let handles: Vec<_> = (0..10)
.map(|i| {
let metadata = metadata.clone();
tokio::spawn(async move {
let mut meta = metadata.write().await;
let instance = DiscoveryInstance::Endpoint(Instance {
namespace: "test".to_string(),
component: "comp1".to_string(),
endpoint: format!("ep{}", i),
instance_id: i,
transport: TransportType::NatsTcp("nats://localhost:4222".to_string()),
});
meta.register_endpoint(instance).unwrap();
})
})
.collect();
// Wait for all to complete
for handle in handles {
handle.await.unwrap();
}
// Verify all registrations succeeded
let meta = metadata.read().await;
assert_eq!(meta.endpoints.len(), 10);
}
#[tokio::test]
async fn test_metadata_accessors() {
let mut metadata = DiscoveryMetadata::new();
// Register endpoints
for i in 0..3 {
let instance = DiscoveryInstance::Endpoint(Instance {
namespace: "test".to_string(),
component: "comp1".to_string(),
endpoint: format!("ep{}", i),
instance_id: i,
transport: TransportType::NatsTcp("nats://localhost:4222".to_string()),
});
metadata.register_endpoint(instance).unwrap();
}
// Register model cards
for i in 0..2 {
let instance = DiscoveryInstance::Model {
namespace: "test".to_string(),
component: "comp1".to_string(),
endpoint: format!("ep{}", i),
instance_id: i,
card_json: serde_json::json!({"model": "test"}),
};
metadata.register_model_card(instance).unwrap();
}
assert_eq!(metadata.get_all_endpoints().len(), 3);
assert_eq!(metadata.get_all_model_cards().len(), 2);
assert_eq!(metadata.get_all().len(), 5);
}
}
...@@ -8,10 +8,17 @@ use serde::{Deserialize, Serialize}; ...@@ -8,10 +8,17 @@ use serde::{Deserialize, Serialize};
use std::pin::Pin; use std::pin::Pin;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
mod metadata;
pub use metadata::{DiscoveryMetadata, MetadataSnapshot};
mod mock; mod mock;
pub use mock::{MockDiscovery, SharedMockRegistry}; pub use mock::{MockDiscovery, SharedMockRegistry};
mod kv_store; mod kv_store;
pub use kv_store::KVStoreDiscovery; pub use kv_store::KVStoreDiscovery;
mod kube;
pub use kube::{KubeDiscoveryClient, hash_pod_name};
pub mod utils; pub mod utils;
use crate::component::TransportType; use crate::component::TransportType;
pub use utils::watch_and_extract_field; pub use utils::watch_and_extract_field;
......
...@@ -49,6 +49,10 @@ pub struct DistributedRuntime { ...@@ -49,6 +49,10 @@ pub struct DistributedRuntime {
// Service discovery client // Service discovery client
discovery_client: Arc<dyn discovery::Discovery>, discovery_client: Arc<dyn discovery::Discovery>,
// Discovery metadata (only used for Kubernetes backend)
// Shared with system status server to expose via /metadata endpoint
discovery_metadata: Option<Arc<tokio::sync::RwLock<discovery::DiscoveryMetadata>>>,
// local registry for components // local registry for components
// the registry allows us to use share runtime resources across instances of the same component object. // the registry allows us to use share runtime resources across instances of the same component object.
// take for example two instances of a client to the same remote component. The registry allows us to use // take for example two instances of a client to the same remote component. The registry allows us to use
...@@ -134,13 +138,37 @@ impl DistributedRuntime { ...@@ -134,13 +138,37 @@ impl DistributedRuntime {
let nats_client_for_metrics = nats_client.clone(); let nats_client_for_metrics = nats_client.clone();
// Initialize discovery backed by KV store // Initialize discovery client based on backend configuration
let discovery_client = { let discovery_backend =
std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "kv_store".to_string());
let (discovery_client, discovery_metadata) = match discovery_backend.as_str() {
"kubernetes" => {
tracing::info!("Initializing Kubernetes discovery backend");
let metadata = Arc::new(tokio::sync::RwLock::new(
crate::discovery::DiscoveryMetadata::new(),
));
let client = crate::discovery::KubeDiscoveryClient::new(
metadata.clone(),
runtime.primary_token(),
)
.await
.inspect_err(
|err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"),
)?;
(Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
}
_ => {
tracing::info!("Initializing KV store discovery backend");
use crate::discovery::KVStoreDiscovery; use crate::discovery::KVStoreDiscovery;
(
Arc::new(KVStoreDiscovery::new( Arc::new(KVStoreDiscovery::new(
store.clone(), store.clone(),
runtime.primary_token(), runtime.primary_token(),
)) as Arc<dyn Discovery> )) as Arc<dyn Discovery>,
None,
)
}
}; };
let distributed_runtime = Self { let distributed_runtime = Self {
...@@ -151,6 +179,7 @@ impl DistributedRuntime { ...@@ -151,6 +179,7 @@ impl DistributedRuntime {
tcp_server: Arc::new(OnceCell::new()), tcp_server: Arc::new(OnceCell::new()),
system_status_server: Arc::new(OnceLock::new()), system_status_server: Arc::new(OnceLock::new()),
discovery_client, discovery_client,
discovery_metadata,
component_registry: component::Registry::new(), component_registry: component::Registry::new(),
is_static, is_static,
instance_sources: Arc::new(Mutex::new(HashMap::new())), instance_sources: Arc::new(Mutex::new(HashMap::new())),
...@@ -194,6 +223,7 @@ impl DistributedRuntime { ...@@ -194,6 +223,7 @@ impl DistributedRuntime {
port, port,
cancel_token, cancel_token,
Arc::new(distributed_runtime.clone()), Arc::new(distributed_runtime.clone()),
distributed_runtime.discovery_metadata.clone(),
) )
.await .await
{ {
...@@ -283,7 +313,7 @@ impl DistributedRuntime { ...@@ -283,7 +313,7 @@ impl DistributedRuntime {
} }
pub fn connection_id(&self) -> u64 { pub fn connection_id(&self) -> u64 {
self.store.connection_id() self.discovery_client.instance_id()
} }
pub fn shutdown(&self) { pub fn shutdown(&self) {
......
...@@ -56,18 +56,33 @@ impl Clone for SystemStatusServerInfo { ...@@ -56,18 +56,33 @@ impl Clone for SystemStatusServerInfo {
pub struct SystemStatusState { pub struct SystemStatusState {
// global drt registry is for printing out the entire Prometheus format output // global drt registry is for printing out the entire Prometheus format output
root_drt: Arc<crate::DistributedRuntime>, root_drt: Arc<crate::DistributedRuntime>,
// Discovery metadata (only for Kubernetes backend)
discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
} }
impl SystemStatusState { impl SystemStatusState {
/// Create new system status server state with the provided distributed runtime /// Create new system status server state with the provided distributed runtime
pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> { pub fn new(
Ok(Self { root_drt: drt }) drt: Arc<crate::DistributedRuntime>,
discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
) -> anyhow::Result<Self> {
Ok(Self {
root_drt: drt,
discovery_metadata,
})
} }
/// Get a reference to the distributed runtime /// Get a reference to the distributed runtime
pub fn drt(&self) -> &crate::DistributedRuntime { pub fn drt(&self) -> &crate::DistributedRuntime {
&self.root_drt &self.root_drt
} }
/// Get a reference to the discovery metadata if available
pub fn discovery_metadata(
&self,
) -> Option<&Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>> {
self.discovery_metadata.as_ref()
}
} }
/// Start system status server with metrics support /// Start system status server with metrics support
...@@ -76,9 +91,10 @@ pub async fn spawn_system_status_server( ...@@ -76,9 +91,10 @@ pub async fn spawn_system_status_server(
port: u16, port: u16,
cancel_token: CancellationToken, cancel_token: CancellationToken,
drt: Arc<crate::DistributedRuntime>, drt: Arc<crate::DistributedRuntime>,
discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> { ) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
// Create system status server state with the provided distributed runtime // Create system status server state with the provided distributed runtime
let server_state = Arc::new(SystemStatusState::new(drt)?); let server_state = Arc::new(SystemStatusState::new(drt, discovery_metadata)?);
let health_path = server_state let health_path = server_state
.drt() .drt()
.system_health() .system_health()
...@@ -114,6 +130,13 @@ pub async fn spawn_system_status_server( ...@@ -114,6 +130,13 @@ pub async fn spawn_system_status_server(
move || metrics_handler(state) move || metrics_handler(state)
}), }),
) )
.route(
"/metadata",
get({
let state = Arc::clone(&server_state);
move || metadata_handler(state)
}),
)
.fallback(|| async { .fallback(|| async {
tracing::info!("[fallback handler] called"); tracing::info!("[fallback handler] called");
(StatusCode::NOT_FOUND, "Route not found").into_response() (StatusCode::NOT_FOUND, "Route not found").into_response()
...@@ -207,6 +230,42 @@ async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse { ...@@ -207,6 +230,42 @@ async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
(StatusCode::OK, response) (StatusCode::OK, response)
} }
/// Metadata handler
#[tracing::instrument(skip_all, level = "trace")]
async fn metadata_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
// Check if discovery metadata is available
let metadata = match state.discovery_metadata() {
Some(metadata) => metadata,
None => {
tracing::debug!("Metadata endpoint called but no discovery metadata available");
return (
StatusCode::NOT_FOUND,
"Discovery metadata not available".to_string(),
)
.into_response();
}
};
// Read the metadata
let metadata_guard = metadata.read().await;
// Serialize to JSON
match serde_json::to_string(&*metadata_guard) {
Ok(json) => {
tracing::trace!("Returning metadata: {} bytes", json.len());
(StatusCode::OK, json).into_response()
}
Err(e) => {
tracing::error!("Failed to serialize metadata: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to serialize metadata".to_string(),
)
.into_response()
}
}
}
// Regular tests: cargo test system_status_server --lib // Regular tests: cargo test system_status_server --lib
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment