Unverified Commit 08cb08c1 authored by Tzu-Ling Kan's avatar Tzu-Ling Kan Committed by GitHub
Browse files

feat: Canary Health Check. (#2903)


Signed-off-by: default avatartzulingk@nvidia.com <tzulingk@nvidia.com>
parent 7ce8d0ef
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
TRT-LLM-specific health check configuration.
This module defines the default health check payload for TRT-LLM backends.
"""
from dynamo.health_check import HealthCheckPayload
class TrtllmHealthCheckPayload(HealthCheckPayload):
"""
TRT-LLM-specific health check payload.
Provides TRT-LLM defaults and inherits environment override support from base class.
"""
def __init__(self):
"""
Initialize TRT-LLM health check payload with TRT-LLM-specific defaults.
"""
# Set TRT-LLM default payload - minimal request that completes quickly
self.default_payload = {
"messages": [{"role": "user", "content": "1"}],
"max_tokens": 1,
"temperature": 0.0,
"stream": False,
}
super().__init__()
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
vLLM-specific health check configuration.
This module defines the default health check payload for vLLM backends.
"""
from dynamo.health_check import HealthCheckPayload
class VllmHealthCheckPayload(HealthCheckPayload):
"""
vLLM-specific health check payload.
Provides vLLM defaults and inherits environment override support from base class.
"""
def __init__(self):
"""
Initialize vLLM health check payload with vLLM-specific defaults.
"""
# Set vLLM default payload - minimal request that completes quickly
# The handler expects token_ids, sampling_options, and stop_conditions
self.default_payload = {
"token_ids": [1], # Single token for minimal processing
"sampling_options": {
"max_tokens": 1,
"temperature": 0.0,
},
"stop_conditions": {
"stop": None,
"stop_token_ids": None,
"include_stop_str_in_output": False,
"ignore_eos": False,
"min_tokens": 0,
},
}
super().__init__()
...@@ -30,6 +30,7 @@ from .args import ( ...@@ -30,6 +30,7 @@ from .args import (
parse_args, parse_args,
) )
from .handlers import DecodeWorkerHandler, PrefillWorkerHandler from .handlers import DecodeWorkerHandler, PrefillWorkerHandler
from .health_check import VllmHealthCheckPayload
from .publisher import StatLoggerFactory from .publisher import StatLoggerFactory
configure_dynamo_logging() configure_dynamo_logging()
...@@ -151,6 +152,9 @@ async def init_prefill(runtime: DistributedRuntime, config: Config): ...@@ -151,6 +152,9 @@ async def init_prefill(runtime: DistributedRuntime, config: Config):
runtime, component, engine_client, default_sampling_params runtime, component, engine_client, default_sampling_params
) )
# Get health check payload (checks env var and falls back to vLLM default)
health_check_payload = VllmHealthCheckPayload().to_dict()
try: try:
logger.debug("Starting serve_endpoint for prefill worker") logger.debug("Starting serve_endpoint for prefill worker")
await asyncio.gather( await asyncio.gather(
...@@ -162,6 +166,7 @@ async def init_prefill(runtime: DistributedRuntime, config: Config): ...@@ -162,6 +166,7 @@ async def init_prefill(runtime: DistributedRuntime, config: Config):
handler.generate, handler.generate,
graceful_shutdown=True, graceful_shutdown=True,
metrics_labels=[("model", config.model)], metrics_labels=[("model", config.model)],
health_check_payload=health_check_payload,
), ),
clear_endpoint.serve_endpoint( clear_endpoint.serve_endpoint(
handler.clear_kv_blocks, metrics_labels=[("model", config.model)] handler.clear_kv_blocks, metrics_labels=[("model", config.model)]
...@@ -263,6 +268,9 @@ async def init(runtime: DistributedRuntime, config: Config): ...@@ -263,6 +268,9 @@ async def init(runtime: DistributedRuntime, config: Config):
custom_template_path=config.custom_jinja_template, custom_template_path=config.custom_jinja_template,
) )
# Get health check payload (checks env var and falls back to vLLM default)
health_check_payload = VllmHealthCheckPayload().to_dict()
try: try:
logger.debug("Starting serve_endpoint for decode worker") logger.debug("Starting serve_endpoint for decode worker")
await asyncio.gather( await asyncio.gather(
...@@ -272,6 +280,7 @@ async def init(runtime: DistributedRuntime, config: Config): ...@@ -272,6 +280,7 @@ async def init(runtime: DistributedRuntime, config: Config):
handler.generate, handler.generate,
graceful_shutdown=config.migration_limit <= 0, graceful_shutdown=config.migration_limit <= 0,
metrics_labels=[("model", config.model)], metrics_labels=[("model", config.model)],
health_check_payload=health_check_payload,
), ),
clear_endpoint.serve_endpoint( clear_endpoint.serve_endpoint(
handler.clear_kv_blocks, metrics_labels=[("model", config.model)] handler.clear_kv_blocks, metrics_labels=[("model", config.model)]
......
...@@ -817,7 +817,7 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing. ...@@ -817,7 +817,7 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing.
{Name: "DYN_PARENT_DGD_K8S_NAMESPACE", Value: "default"}, {Name: "DYN_PARENT_DGD_K8S_NAMESPACE", Value: "default"},
{Name: "DYN_SYSTEM_ENABLED", Value: "true"}, {Name: "DYN_SYSTEM_ENABLED", Value: "true"},
{Name: "DYN_SYSTEM_PORT", Value: "9090"}, {Name: "DYN_SYSTEM_PORT", Value: "9090"},
{Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Value: `["generate"]`}, {Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Value: "[\"generate\"]"},
{Name: "TEST_ENV_FROM_DYNAMO_COMPONENT_DEPLOYMENT_SPEC", Value: "test_value_from_dynamo_component_deployment_spec"}, {Name: "TEST_ENV_FROM_DYNAMO_COMPONENT_DEPLOYMENT_SPEC", Value: "test_value_from_dynamo_component_deployment_spec"},
{Name: "TEST_ENV_FROM_EXTRA_POD_SPEC", Value: "test_value_from_extra_pod_spec"}, {Name: "TEST_ENV_FROM_EXTRA_POD_SPEC", Value: "test_value_from_extra_pod_spec"},
}, },
...@@ -927,7 +927,7 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing. ...@@ -927,7 +927,7 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing.
{Name: "DYN_PARENT_DGD_K8S_NAMESPACE", Value: "default"}, {Name: "DYN_PARENT_DGD_K8S_NAMESPACE", Value: "default"},
{Name: "DYN_SYSTEM_ENABLED", Value: "true"}, {Name: "DYN_SYSTEM_ENABLED", Value: "true"},
{Name: "DYN_SYSTEM_PORT", Value: "9090"}, {Name: "DYN_SYSTEM_PORT", Value: "9090"},
{Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Value: `["generate"]`}, {Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Value: "[\"generate\"]"},
{Name: "TEST_ENV_FROM_DYNAMO_COMPONENT_DEPLOYMENT_SPEC", Value: "test_value_from_dynamo_component_deployment_spec"}, {Name: "TEST_ENV_FROM_DYNAMO_COMPONENT_DEPLOYMENT_SPEC", Value: "test_value_from_dynamo_component_deployment_spec"},
{Name: "TEST_ENV_FROM_EXTRA_POD_SPEC", Value: "test_value_from_extra_pod_spec"}, {Name: "TEST_ENV_FROM_EXTRA_POD_SPEC", Value: "test_value_from_extra_pod_spec"},
}, },
......
...@@ -1828,7 +1828,7 @@ func TestGenerateGrovePodCliqueSet(t *testing.T) { ...@@ -1828,7 +1828,7 @@ func TestGenerateGrovePodCliqueSet(t *testing.T) {
}, },
{ {
Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
Value: `["generate"]`, Value: "[\"generate\"]",
}, },
{ {
Name: "WORKER_ENV_1", Name: "WORKER_ENV_1",
...@@ -1979,7 +1979,7 @@ func TestGenerateGrovePodCliqueSet(t *testing.T) { ...@@ -1979,7 +1979,7 @@ func TestGenerateGrovePodCliqueSet(t *testing.T) {
}, },
{ {
Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
Value: `["generate"]`, Value: "[\"generate\"]",
}, },
{ {
Name: "WORKER_ENV_1", Name: "WORKER_ENV_1",
...@@ -2624,7 +2624,7 @@ func TestGenerateGrovePodCliqueSet(t *testing.T) { ...@@ -2624,7 +2624,7 @@ func TestGenerateGrovePodCliqueSet(t *testing.T) {
}, },
{ {
Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
Value: `["generate"]`, Value: "[\"generate\"]",
}, },
{ {
Name: "WORKER_ENV_1", Name: "WORKER_ENV_1",
...@@ -2763,7 +2763,7 @@ func TestGenerateGrovePodCliqueSet(t *testing.T) { ...@@ -2763,7 +2763,7 @@ func TestGenerateGrovePodCliqueSet(t *testing.T) {
}, },
{ {
Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
Value: `["generate"]`, Value: "[\"generate\"]",
}, },
{ {
Name: "WORKER_ENV_1", Name: "WORKER_ENV_1",
...@@ -4525,8 +4525,8 @@ func TestGenerateBasePodSpec_Worker(t *testing.T) { ...@@ -4525,8 +4525,8 @@ func TestGenerateBasePodSpec_Worker(t *testing.T) {
{Name: "DYN_PARENT_DGD_K8S_NAME", Value: "test-deployment"}, {Name: "DYN_PARENT_DGD_K8S_NAME", Value: "test-deployment"},
{Name: "DYN_PARENT_DGD_K8S_NAMESPACE", Value: "default"}, {Name: "DYN_PARENT_DGD_K8S_NAMESPACE", Value: "default"},
{Name: "DYN_SYSTEM_ENABLED", Value: "true"}, {Name: "DYN_SYSTEM_ENABLED", Value: "true"},
{Name: "DYN_SYSTEM_PORT", Value: "9090"},
{Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Value: "[\"generate\"]"}, {Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Value: "[\"generate\"]"},
{Name: "DYN_SYSTEM_PORT", Value: "9090"},
}, },
VolumeMounts: []corev1.VolumeMount{ VolumeMounts: []corev1.VolumeMount{
{ {
......
...@@ -1215,27 +1215,6 @@ dependencies = [ ...@@ -1215,27 +1215,6 @@ dependencies = [
"syn 2.0.106", "syn 2.0.106",
] ]
[[package]]
name = "derive_more"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05"
dependencies = [
"derive_more-impl",
]
[[package]]
name = "derive_more-impl"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.106",
"unicode-xid",
]
[[package]] [[package]]
name = "dialoguer" name = "dialoguer"
version = "0.11.0" version = "0.11.0"
...@@ -1463,10 +1442,8 @@ dependencies = [ ...@@ -1463,10 +1442,8 @@ dependencies = [
"anyhow", "anyhow",
"dynamo-async-openai", "dynamo-async-openai",
"lazy_static", "lazy_static",
"num-traits",
"openai-harmony", "openai-harmony",
"regex", "regex",
"rustpython-parser",
"serde", "serde",
"serde_json", "serde_json",
"tracing", "tracing",
...@@ -1485,7 +1462,6 @@ dependencies = [ ...@@ -1485,7 +1462,6 @@ dependencies = [
"dlpark", "dlpark",
"dynamo-async-openai", "dynamo-async-openai",
"dynamo-llm", "dynamo-llm",
"dynamo-parsers",
"dynamo-runtime", "dynamo-runtime",
"either", "either",
"futures", "futures",
...@@ -2238,15 +2214,6 @@ dependencies = [ ...@@ -2238,15 +2214,6 @@ dependencies = [
"version_check", "version_check",
] ]
[[package]]
name = "getopts"
version = "0.2.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfe4fbac503b8d1f88e6676011885f34b7174f46e59956bba534ba83abded4df"
dependencies = [
"unicode-width",
]
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.16" version = "0.2.16"
...@@ -2364,9 +2331,6 @@ name = "hashbrown" ...@@ -2364,9 +2331,6 @@ name = "hashbrown"
version = "0.14.5" version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
dependencies = [
"ahash",
]
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
...@@ -2844,33 +2808,12 @@ dependencies = [ ...@@ -2844,33 +2808,12 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "is-macro"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d57a3e447e24c22647738e4607f1df1e0ec6f72e16182c4cd199f647cdfb0e4"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.106",
]
[[package]] [[package]]
name = "is_terminal_polyfill" name = "is_terminal_polyfill"
version = "1.70.1" version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]]
name = "itertools"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57"
dependencies = [
"either",
]
[[package]] [[package]]
name = "itertools" name = "itertools"
version = "0.12.1" version = "0.12.1"
...@@ -2960,12 +2903,6 @@ dependencies = [ ...@@ -2960,12 +2903,6 @@ dependencies = [
"winapi-build", "winapi-build",
] ]
[[package]]
name = "lalrpop-util"
version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "507460a910eb7b32ee961886ff48539633b788a36b65692b95f225b844c82553"
[[package]] [[package]]
name = "lazy_static" name = "lazy_static"
version = "1.5.0" version = "1.5.0"
...@@ -3103,64 +3040,6 @@ version = "0.2.2" ...@@ -3103,64 +3040,6 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "670fdfda89751bc4a84ac13eaa63e205cf0fd22b4c9a5fbfa085b63c1f1d3a30" checksum = "670fdfda89751bc4a84ac13eaa63e205cf0fd22b4c9a5fbfa085b63c1f1d3a30"
[[package]]
name = "malachite"
version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fbdf9cb251732db30a7200ebb6ae5d22fe8e11397364416617d2c2cf0c51cb5"
dependencies = [
"malachite-base",
"malachite-nz",
"malachite-q",
]
[[package]]
name = "malachite-base"
version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ea0ed76adf7defc1a92240b5c36d5368cfe9251640dcce5bd2d0b7c1fd87aeb"
dependencies = [
"hashbrown 0.14.5",
"itertools 0.11.0",
"libm",
"ryu",
]
[[package]]
name = "malachite-bigint"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d149aaa2965d70381709d9df4c7ee1fc0de1c614a4efc2ee356f5e43d68749f8"
dependencies = [
"derive_more",
"malachite",
"num-integer",
"num-traits",
"paste",
]
[[package]]
name = "malachite-nz"
version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34a79feebb2bc9aa7762047c8e5495269a367da6b5a90a99882a0aeeac1841f7"
dependencies = [
"itertools 0.11.0",
"libm",
"malachite-base",
]
[[package]]
name = "malachite-q"
version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50f235d5747b1256b47620f5640c2a17a88c7569eebdf27cd9cb130e1a619191"
dependencies = [
"itertools 0.11.0",
"malachite-base",
"malachite-nz",
]
[[package]] [[package]]
name = "matchers" name = "matchers"
version = "0.1.0" version = "0.1.0"
...@@ -3858,44 +3737,6 @@ dependencies = [ ...@@ -3858,44 +3737,6 @@ dependencies = [
"indexmap 2.11.0", "indexmap 2.11.0",
] ]
[[package]]
name = "phf"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078"
dependencies = [
"phf_shared",
]
[[package]]
name = "phf_codegen"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aef8048c789fa5e851558d709946d6d79a8ff88c0440c587967f8e94bfb1216a"
dependencies = [
"phf_generator",
"phf_shared",
]
[[package]]
name = "phf_generator"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d"
dependencies = [
"phf_shared",
"rand 0.8.5",
]
[[package]]
name = "phf_shared"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5"
dependencies = [
"siphasher",
]
[[package]] [[package]]
name = "pin-project" name = "pin-project"
version = "1.1.10" version = "1.1.10"
...@@ -4939,63 +4780,6 @@ dependencies = [ ...@@ -4939,63 +4780,6 @@ dependencies = [
"untrusted", "untrusted",
] ]
[[package]]
name = "rustpython-ast"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cdaf8ee5c1473b993b398c174641d3aa9da847af36e8d5eb8291930b72f31a5"
dependencies = [
"is-macro",
"malachite-bigint",
"rustpython-parser-core",
"static_assertions",
]
[[package]]
name = "rustpython-parser"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "868f724daac0caf9bd36d38caf45819905193a901e8f1c983345a68e18fb2abb"
dependencies = [
"anyhow",
"is-macro",
"itertools 0.11.0",
"lalrpop-util",
"log",
"malachite-bigint",
"num-traits",
"phf",
"phf_codegen",
"rustc-hash 1.1.0",
"rustpython-ast",
"rustpython-parser-core",
"tiny-keccak",
"unic-emoji-char",
"unic-ucd-ident",
"unicode_names2",
]
[[package]]
name = "rustpython-parser-core"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4b6c12fa273825edc7bccd9a734f0ad5ba4b8a2f4da5ff7efe946f066d0f4ad"
dependencies = [
"is-macro",
"memchr",
"rustpython-parser-vendored",
]
[[package]]
name = "rustpython-parser-vendored"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04fcea49a4630a3a5d940f4d514dc4f575ed63c14c3e3ed07146634aed7f67a6"
dependencies = [
"memchr",
"once_cell",
]
[[package]] [[package]]
name = "rustversion" name = "rustversion"
version = "1.0.22" version = "1.0.22"
...@@ -5335,12 +5119,6 @@ dependencies = [ ...@@ -5335,12 +5119,6 @@ dependencies = [
"quote", "quote",
] ]
[[package]]
name = "siphasher"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
[[package]] [[package]]
name = "slab" name = "slab"
version = "0.4.11" version = "0.4.11"
...@@ -5667,15 +5445,6 @@ dependencies = [ ...@@ -5667,15 +5445,6 @@ dependencies = [
"time-core", "time-core",
] ]
[[package]]
name = "tiny-keccak"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237"
dependencies = [
"crunchy",
]
[[package]] [[package]]
name = "tinystr" name = "tinystr"
version = "0.8.1" version = "0.8.1"
...@@ -6139,58 +5908,6 @@ version = "0.2.2" ...@@ -6139,58 +5908,6 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eeba86d422ce181a719445e51872fa30f1f7413b62becb52e95ec91aa262d85c" checksum = "eeba86d422ce181a719445e51872fa30f1f7413b62becb52e95ec91aa262d85c"
[[package]]
name = "unic-char-property"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8c57a407d9b6fa02b4795eb81c5b6652060a15a7903ea981f3d723e6c0be221"
dependencies = [
"unic-char-range",
]
[[package]]
name = "unic-char-range"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0398022d5f700414f6b899e10b8348231abf9173fa93144cbc1a43b9793c1fbc"
[[package]]
name = "unic-common"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80d7ff825a6a654ee85a63e80f92f054f904f21e7d12da4e22f9834a4aaa35bc"
[[package]]
name = "unic-emoji-char"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b07221e68897210270a38bde4babb655869637af0f69407f96053a34f76494d"
dependencies = [
"unic-char-property",
"unic-char-range",
"unic-ucd-version",
]
[[package]]
name = "unic-ucd-ident"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e230a37c0381caa9219d67cf063aa3a375ffed5bf541a452db16e744bdab6987"
dependencies = [
"unic-char-property",
"unic-char-range",
"unic-ucd-version",
]
[[package]]
name = "unic-ucd-version"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96bd2f2237fe450fcd0a1d2f5f4e91711124f7857ba2e964247776ebeeb7b0c4"
dependencies = [
"unic-common",
]
[[package]] [[package]]
name = "unicase" name = "unicase"
version = "2.8.1" version = "2.8.1"
...@@ -6230,40 +5947,12 @@ version = "0.2.1" ...@@ -6230,40 +5947,12 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a1a07cc7db3810833284e8d372ccdc6da29741639ecc70c9ec107df0fa6154c" checksum = "4a1a07cc7db3810833284e8d372ccdc6da29741639ecc70c9ec107df0fa6154c"
[[package]]
name = "unicode-xid"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853"
[[package]] [[package]]
name = "unicode_categories" name = "unicode_categories"
version = "0.1.1" version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"
[[package]]
name = "unicode_names2"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1673eca9782c84de5f81b82e4109dcfb3611c8ba0d52930ec4a9478f547b2dd"
dependencies = [
"phf",
"unicode_names2_generator",
]
[[package]]
name = "unicode_names2_generator"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b91e5b84611016120197efd7dc93ef76774f4e084cd73c9fb3ea4a86c570c56e"
dependencies = [
"getopts",
"log",
"phf_codegen",
"rand 0.8.5",
]
[[package]] [[package]]
name = "unindent" name = "unindent"
version = "0.2.4" version = "0.2.4"
......
...@@ -565,24 +565,51 @@ impl Component { ...@@ -565,24 +565,51 @@ impl Component {
#[pymethods] #[pymethods]
impl Endpoint { impl Endpoint {
#[pyo3(signature = (generator, graceful_shutdown = true, metrics_labels = None))] #[pyo3(signature = (generator, graceful_shutdown = true, metrics_labels = None, health_check_payload = None))]
fn serve_endpoint<'p>( fn serve_endpoint<'p>(
&self, &self,
py: Python<'p>, py: Python<'p>,
generator: PyObject, generator: PyObject,
graceful_shutdown: Option<bool>, graceful_shutdown: Option<bool>,
metrics_labels: Option<Vec<(String, String)>>, metrics_labels: Option<Vec<(String, String)>>,
health_check_payload: Option<&Bound<'p, PyDict>>,
) -> PyResult<Bound<'p, PyAny>> { ) -> PyResult<Bound<'p, PyAny>> {
let engine = Arc::new(engine::PythonAsyncEngine::new( let engine = Arc::new(engine::PythonAsyncEngine::new(
generator, generator,
self.event_loop.clone(), self.event_loop.clone(),
)?); )?);
let ingress = JsonServerStreamingIngress::for_engine(engine).map_err(to_pyerr)?; let ingress = JsonServerStreamingIngress::for_engine(engine).map_err(to_pyerr)?;
let builder = self
// Convert Python dict to serde_json::Value if provided and validate it's an object
let health_payload_json = health_check_payload
.map(|dict| pythonize::depythonize::<serde_json::Value>(dict))
.transpose()
.map_err(|err| {
pyo3::exceptions::PyTypeError::new_err(format!(
"Failed to convert health_check_payload: {}",
err
))
})?;
// Require an object/dict
if let Some(ref payload) = health_payload_json {
if !payload.is_object() {
return Err(pyo3::exceptions::PyTypeError::new_err(
"health_check_payload must be a JSON object (dict)",
));
}
}
let mut builder = self
.inner .inner
.endpoint_builder() .endpoint_builder()
.metrics_labels(metrics_labels) .metrics_labels(metrics_labels)
.handler(ingress); .handler(ingress);
if let Some(payload) = health_payload_json {
builder = builder.health_check_payload(payload);
}
let graceful_shutdown = graceful_shutdown.unwrap_or(true); let graceful_shutdown = graceful_shutdown.unwrap_or(true);
pyo3_async_runtimes::tokio::future_into_py(py, async move { pyo3_async_runtimes::tokio::future_into_py(py, async move {
builder builder
......
...@@ -217,7 +217,7 @@ class Endpoint: ...@@ -217,7 +217,7 @@ class Endpoint:
... ...
async def serve_endpoint(self, handler: RequestHandler, graceful_shutdown: bool = True, metrics_labels: Optional[List[Tuple[str, str]]] = None) -> None: async def serve_endpoint(self, handler: RequestHandler, graceful_shutdown: bool = True, metrics_labels: Optional[List[Tuple[str, str]]] = None, health_check_payload: Optional[Dict[str, Any]] = None) -> None:
""" """
Serve an endpoint discoverable by all connected clients at Serve an endpoint discoverable by all connected clients at
`{{ namespace }}/components/{{ component_name }}/endpoints/{{ endpoint_name }}` `{{ namespace }}/components/{{ component_name }}/endpoints/{{ endpoint_name }}`
...@@ -226,6 +226,8 @@ class Endpoint: ...@@ -226,6 +226,8 @@ class Endpoint:
handler: The request handler function handler: The request handler function
graceful_shutdown: Whether to wait for inflight requests to complete during shutdown (default: True) graceful_shutdown: Whether to wait for inflight requests to complete during shutdown (default: True)
metrics_labels: Optional list of metrics labels to add to the metrics metrics_labels: Optional list of metrics labels to add to the metrics
health_check_payload: Optional dict containing the health check request payload
that will be used to verify endpoint health
""" """
... ...
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Health check utilities for Dynamo backends.
This module provides a base class for backend-specific health check payloads.
Each backend should extend HealthCheckPayload and define its default payload.
"""
import json
import logging
import os
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
def load_health_check_from_env(
env_var: str = "DYN_HEALTH_CHECK_PAYLOAD",
) -> Optional[Dict[str, Any]]:
"""
Load health check payload from environment variable.
Supports two formats:
1. JSON string: export DYN_HEALTH_CHECK_PAYLOAD='{"prompt": "test", "max_tokens": 1}'
2. File path: export DYN_HEALTH_CHECK_PAYLOAD='@/path/to/health_check.json'
Args:
env_var: Name of the environment variable to check (default: DYN_HEALTH_CHECK_PAYLOAD)
Returns:
Dict containing the health check payload, or None if not set.
"""
env_value = os.environ.get(env_var)
if not env_value:
return None
try:
if env_value.startswith("@"):
# Load from file
file_path = env_value[1:]
with open(file_path, "r") as f:
parsed = json.load(f)
else:
# Parse as JSON
parsed = json.loads(env_value)
if not isinstance(parsed, dict):
logger.warning(
"%s must be a JSON object (dict). Got: %s",
env_var,
type(parsed).__name__,
)
return None
return parsed
except (json.JSONDecodeError, FileNotFoundError, OSError) as e:
logger.warning("Failed to parse %s: %s", env_var, e)
return None
class HealthCheckPayload:
"""
Base class for managing health check payloads.
Each backend should extend this class and set self.default_payload
in their __init__ method.
Environment variable DYN_HEALTH_CHECK_PAYLOAD can override the default.
"""
default_payload: Dict[str, Any] # Type hint for mypy - set by subclasses
def __init__(self):
"""
Initialize health check payload.
Subclasses should call super().__init__() after setting self.default_payload.
"""
if not hasattr(self, "default_payload"):
raise NotImplementedError(
"Subclass must set self.default_payload before calling super().__init__()"
)
self._payload = None
def to_dict(self) -> Dict[str, Any]:
"""
Get the health check payload as a dictionary.
Returns the environment override if DYN_HEALTH_CHECK_PAYLOAD is set,
otherwise returns the default payload.
"""
if self._payload is None:
# Check for environment override
self._payload = load_health_check_from_env() or self.default_payload
return self._payload
...@@ -37,6 +37,13 @@ pub struct EndpointConfig { ...@@ -37,6 +37,13 @@ pub struct EndpointConfig {
/// Whether to wait for inflight requests to complete during shutdown /// Whether to wait for inflight requests to complete during shutdown
#[builder(default = "true")] #[builder(default = "true")]
graceful_shutdown: bool, graceful_shutdown: bool,
/// Health check payload for this endpoint
/// This payload will be sent to the endpoint during health checks
/// to verify it's responding properly
#[educe(Debug(ignore))]
#[builder(default, setter(into, strip_option))]
health_check_payload: Option<serde_json::Value>,
} }
impl EndpointConfigBuilder { impl EndpointConfigBuilder {
...@@ -52,8 +59,15 @@ impl EndpointConfigBuilder { ...@@ -52,8 +59,15 @@ impl EndpointConfigBuilder {
} }
pub async fn start(self) -> Result<()> { pub async fn start(self) -> Result<()> {
let (endpoint, lease, handler, stats_handler, metrics_labels, graceful_shutdown) = let (
self.build_internal()?.dissolve(); endpoint,
lease,
handler,
stats_handler,
metrics_labels,
graceful_shutdown,
health_check_payload,
) = self.build_internal()?.dissolve();
let lease = lease.or(endpoint.drt().primary_lease()); let lease = lease.or(endpoint.drt().primary_lease());
let lease_id = lease.as_ref().map(|l| l.id()).unwrap_or(0); let lease_id = lease.as_ref().map(|l| l.id()).unwrap_or(0);
...@@ -115,6 +129,23 @@ impl EndpointConfigBuilder { ...@@ -115,6 +129,23 @@ impl EndpointConfigBuilder {
let etcd_path = endpoint.etcd_path_with_lease_id(lease_id); let etcd_path = endpoint.etcd_path_with_lease_id(lease_id);
let etcd_client = endpoint.component.drt.etcd_client.clone(); let etcd_client = endpoint.component.drt.etcd_client.clone();
// Register health check target in SystemHealth if provided
if let Some(health_check_payload) = &health_check_payload {
let instance = Instance {
component: component_name.clone(),
endpoint: endpoint_name.clone(),
namespace: namespace_name.clone(),
instance_id: lease_id,
transport: TransportType::NatsTcp(subject.clone()),
};
tracing::debug!(subject = %subject, "Registering endpoint health check target");
let guard = system_health.lock().unwrap();
guard.register_health_check_target(&subject, instance, health_check_payload.clone());
if let Some(notifier) = guard.get_endpoint_health_check_notifier(&subject) {
handler.set_endpoint_health_check_notifier(notifier)?;
}
}
let cancel_token = if let Some(lease) = lease.as_ref() { let cancel_token = if let Some(lease) = lease.as_ref() {
// Create a new token that will be cancelled when EITHER the lease expires OR runtime shutdown occurs // Create a new token that will be cancelled when EITHER the lease expires OR runtime shutdown occurs
let combined_token = CancellationToken::new(); let combined_token = CancellationToken::new();
......
...@@ -21,6 +21,12 @@ const DEFAULT_SYSTEM_PORT: u16 = 9090; ...@@ -21,6 +21,12 @@ const DEFAULT_SYSTEM_PORT: u16 = 9090;
const DEFAULT_SYSTEM_HEALTH_PATH: &str = "/health"; const DEFAULT_SYSTEM_HEALTH_PATH: &str = "/health";
const DEFAULT_SYSTEM_LIVE_PATH: &str = "/live"; const DEFAULT_SYSTEM_LIVE_PATH: &str = "/live";
/// Default health check configuration
/// This is the wait time before sending canary health checks when no activity is detected
pub const DEFAULT_CANARY_WAIT_TIME_SECS: u64 = 10;
/// Default timeout for individual health check requests
pub const DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS: u64 = 3;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig { pub struct WorkerConfig {
/// Grace shutdown period for the system server. /// Grace shutdown period for the system server.
...@@ -124,6 +130,24 @@ pub struct RuntimeConfig { ...@@ -124,6 +130,24 @@ pub struct RuntimeConfig {
#[builder(default = "DEFAULT_SYSTEM_LIVE_PATH.to_string()")] #[builder(default = "DEFAULT_SYSTEM_LIVE_PATH.to_string()")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub system_live_path: String, pub system_live_path: String,
/// Enable active health checking with payloads
/// Set this at runtime with environment variable DYN_HEALTH_CHECK_ENABLED
#[builder(default = "false")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub health_check_enabled: bool,
/// Canary wait time in seconds (time to wait before sending health check when no activity)
/// Set this at runtime with environment variable DYN_CANARY_WAIT_TIME
#[builder(default = "DEFAULT_CANARY_WAIT_TIME_SECS")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub canary_wait_time_secs: u64,
/// Health check request timeout in seconds
/// Set this at runtime with environment variable DYN_HEALTH_CHECK_REQUEST_TIMEOUT
#[builder(default = "DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub health_check_request_timeout_secs: u64,
} }
impl fmt::Display for RuntimeConfig { impl fmt::Display for RuntimeConfig {
...@@ -150,6 +174,13 @@ impl fmt::Display for RuntimeConfig { ...@@ -150,6 +174,13 @@ impl fmt::Display for RuntimeConfig {
)?; )?;
write!(f, ", system_health_path={}", self.system_health_path)?; write!(f, ", system_health_path={}", self.system_health_path)?;
write!(f, ", system_live_path={}", self.system_live_path)?; write!(f, ", system_live_path={}", self.system_live_path)?;
write!(f, ", health_check_enabled={}", self.health_check_enabled)?;
write!(f, ", canary_wait_time_secs={}", self.canary_wait_time_secs)?;
write!(
f,
", health_check_request_timeout_secs={}",
self.health_check_request_timeout_secs
)?;
Ok(()) Ok(())
} }
...@@ -194,6 +225,37 @@ impl RuntimeConfig { ...@@ -194,6 +225,37 @@ impl RuntimeConfig {
_ => None, _ => None,
} }
})) }))
.merge(Env::prefixed("DYN_HEALTH_CHECK_").filter_map(|k| {
let full_key = format!("DYN_HEALTH_CHECK_{}", k.as_str());
// filters out empty environment variables
match std::env::var(&full_key) {
Ok(v) if !v.is_empty() => {
// Map DYN_HEALTH_CHECK_* to the correct field names
let mapped_key = match k.as_str() {
"ENABLED" => "health_check_enabled",
"REQUEST_TIMEOUT" => "health_check_request_timeout_secs",
_ => k.as_str(),
};
Some(mapped_key.into())
}
_ => None,
}
}))
.merge(Env::prefixed("DYN_CANARY_").filter_map(|k| {
let full_key = format!("DYN_CANARY_{}", k.as_str());
// filters out empty environment variables
match std::env::var(&full_key) {
Ok(v) if !v.is_empty() => {
// Map DYN_CANARY_* to the correct field names
let mapped_key = match k.as_str() {
"WAIT_TIME" => "canary_wait_time_secs",
_ => k.as_str(),
};
Some(mapped_key.into())
}
_ => None,
}
}))
} }
/// Load the runtime configuration from the environment and configuration files /// Load the runtime configuration from the environment and configuration files
...@@ -205,6 +267,15 @@ impl RuntimeConfig { ...@@ -205,6 +267,15 @@ impl RuntimeConfig {
/// ///
/// Environment variables are prefixed with `DYN_RUNTIME_` and `DYN_SYSTEM` /// Environment variables are prefixed with `DYN_RUNTIME_` and `DYN_SYSTEM`
pub fn from_settings() -> Result<RuntimeConfig> { pub fn from_settings() -> Result<RuntimeConfig> {
// Check for deprecated environment variable
if std::env::var("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS").is_ok() {
tracing::warn!(
"DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS is deprecated and no longer used. \
System health is now determined by endpoints that register with health check payloads. \
Please update your configuration to register health check payloads directly on endpoints."
);
}
let config: RuntimeConfig = Self::figment().extract()?; let config: RuntimeConfig = Self::figment().extract()?;
config.validate()?; config.validate()?;
Ok(config) Ok(config)
...@@ -227,6 +298,9 @@ impl RuntimeConfig { ...@@ -227,6 +298,9 @@ impl RuntimeConfig {
use_endpoint_health_status: vec![], use_endpoint_health_status: vec![],
system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(), system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(), system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
health_check_enabled: false,
canary_wait_time_secs: DEFAULT_CANARY_WAIT_TIME_SECS,
health_check_request_timeout_secs: DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS,
} }
} }
...@@ -256,6 +330,9 @@ impl Default for RuntimeConfig { ...@@ -256,6 +330,9 @@ impl Default for RuntimeConfig {
use_endpoint_health_status: vec![], use_endpoint_health_status: vec![],
system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(), system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(), system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
health_check_enabled: false,
canary_wait_time_secs: DEFAULT_CANARY_WAIT_TIME_SECS,
health_check_request_timeout_secs: DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS,
} }
} }
} }
......
...@@ -155,6 +155,31 @@ impl DistributedRuntime { ...@@ -155,6 +155,31 @@ impl DistributedRuntime {
); );
} }
// Start health check manager if enabled
if config.health_check_enabled {
let health_check_config = crate::health_check::HealthCheckConfig {
canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
request_timeout: std::time::Duration::from_secs(
config.health_check_request_timeout_secs,
),
};
// Start the health check manager (spawns per-endpoint monitoring tasks)
match crate::health_check::start_health_check_manager(
distributed_runtime.clone(),
Some(health_check_config),
)
.await
{
Ok(()) => tracing::info!(
"Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
config.canary_wait_time_secs,
config.health_check_request_timeout_secs
),
Err(e) => tracing::error!("Health check manager failed to start: {}", e),
}
}
Ok(distributed_runtime) Ok(distributed_runtime)
} }
......
This diff is collapsed.
...@@ -9,7 +9,6 @@ ...@@ -9,7 +9,6 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
sync::{Arc, OnceLock, Weak}, sync::{Arc, OnceLock, Weak},
time::Instant,
}; };
pub use anyhow::{ pub use anyhow::{
...@@ -24,6 +23,7 @@ pub use config::RuntimeConfig; ...@@ -24,6 +23,7 @@ pub use config::RuntimeConfig;
pub mod component; pub mod component;
pub mod discovery; pub mod discovery;
pub mod engine; pub mod engine;
pub mod health_check;
pub mod system_status_server; pub mod system_status_server;
pub use system_status_server::SystemStatusServerInfo; pub use system_status_server::SystemStatusServerInfo;
pub mod instances; pub mod instances;
...@@ -37,6 +37,7 @@ pub mod runtime; ...@@ -37,6 +37,7 @@ pub mod runtime;
pub mod service; pub mod service;
pub mod slug; pub mod slug;
pub mod storage; pub mod storage;
pub mod system_health;
pub mod traits; pub mod traits;
pub mod transports; pub mod transports;
pub mod utils; pub mod utils;
...@@ -45,6 +46,7 @@ pub mod worker; ...@@ -45,6 +46,7 @@ pub mod worker;
pub mod distributed; pub mod distributed;
pub use distributed::distributed_test_utils; pub use distributed::distributed_test_utils;
pub use futures::stream; pub use futures::stream;
pub use system_health::{HealthCheckTarget, SystemHealth};
pub use tokio_util::sync::CancellationToken; pub use tokio_util::sync::CancellationToken;
pub use worker::Worker; pub use worker::Worker;
...@@ -73,106 +75,6 @@ pub struct Runtime { ...@@ -73,106 +75,6 @@ pub struct Runtime {
graceful_shutdown_tracker: Arc<GracefulShutdownTracker>, graceful_shutdown_tracker: Arc<GracefulShutdownTracker>,
} }
/// Current Health Status
/// If use_endpoint_health_status is set then
/// initialize the endpoint_health hashmap to the
/// starting health status
#[derive(Clone)]
pub struct SystemHealth {
system_health: HealthStatus,
endpoint_health: HashMap<String, HealthStatus>,
use_endpoint_health_status: Vec<String>,
health_path: String,
live_path: String,
start_time: Instant,
uptime_gauge: OnceLock<prometheus::Gauge>,
}
impl SystemHealth {
pub fn new(
starting_health_status: HealthStatus,
use_endpoint_health_status: Vec<String>,
health_path: String,
live_path: String,
) -> Self {
let mut endpoint_health = HashMap::new();
for endpoint in &use_endpoint_health_status {
endpoint_health.insert(endpoint.clone(), starting_health_status.clone());
}
SystemHealth {
system_health: starting_health_status,
endpoint_health,
use_endpoint_health_status,
health_path,
live_path,
start_time: Instant::now(),
uptime_gauge: OnceLock::new(),
}
}
pub fn set_health_status(&mut self, status: HealthStatus) {
self.system_health = status;
}
pub fn set_endpoint_health_status(&mut self, endpoint: &str, status: HealthStatus) {
self.endpoint_health.insert(endpoint.to_string(), status);
}
/// Returns the overall health status and endpoint health statuses
pub fn get_health_status(&self) -> (bool, HashMap<String, String>) {
let mut endpoints: HashMap<String, String> = HashMap::new();
for (endpoint, ready) in &self.endpoint_health {
endpoints.insert(
endpoint.clone(),
if *ready == HealthStatus::Ready {
"ready".to_string()
} else {
"notready".to_string()
},
);
}
let healthy = if !self.use_endpoint_health_status.is_empty() {
self.use_endpoint_health_status.iter().all(|endpoint| {
self.endpoint_health
.get(endpoint)
.is_some_and(|status| *status == HealthStatus::Ready)
})
} else {
self.system_health == HealthStatus::Ready
};
(healthy, endpoints)
}
/// Initialize the uptime gauge using the provided metrics registry
pub fn initialize_uptime_gauge<T: crate::metrics::MetricsRegistry>(
&self,
registry: &T,
) -> anyhow::Result<()> {
let gauge = registry.create_gauge(
distributed_runtime::UPTIME_SECONDS,
"Total uptime of the DistributedRuntime in seconds",
&[],
)?;
self.uptime_gauge
.set(gauge)
.map_err(|_| anyhow::anyhow!("uptime_gauge already initialized"))?;
Ok(())
}
/// Get the current uptime as a Duration
pub fn uptime(&self) -> std::time::Duration {
self.start_time.elapsed()
}
/// Update the uptime gauge with the current uptime value
pub fn update_uptime_gauge(&self) {
if let Some(gauge) = self.uptime_gauge.get() {
gauge.set(self.uptime().as_secs_f64());
}
}
}
/// Type alias for runtime callback functions to reduce complexity /// Type alias for runtime callback functions to reduce complexity
/// ///
/// This type represents an Arc-wrapped callback function that can be: /// This type represents an Arc-wrapped callback function that can be:
......
...@@ -8,6 +8,7 @@ pub mod egress; ...@@ -8,6 +8,7 @@ pub mod egress;
pub mod ingress; pub mod ingress;
pub mod tcp; pub mod tcp;
use crate::SystemHealth;
use std::sync::{Arc, OnceLock}; use std::sync::{Arc, OnceLock};
use anyhow::Result; use anyhow::Result;
...@@ -272,6 +273,8 @@ struct RequestControlMessage { ...@@ -272,6 +273,8 @@ struct RequestControlMessage {
pub struct Ingress<Req: PipelineIO, Resp: PipelineIO> { pub struct Ingress<Req: PipelineIO, Resp: PipelineIO> {
segment: OnceLock<Arc<SegmentSource<Req, Resp>>>, segment: OnceLock<Arc<SegmentSource<Req, Resp>>>,
metrics: OnceLock<Arc<WorkHandlerMetrics>>, metrics: OnceLock<Arc<WorkHandlerMetrics>>,
/// Endpoint-specific notifier for health check timer resets
endpoint_health_check_notifier: OnceLock<Arc<tokio::sync::Notify>>,
} }
impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> { impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> {
...@@ -279,6 +282,7 @@ impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> { ...@@ -279,6 +282,7 @@ impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> {
Arc::new(Self { Arc::new(Self {
segment: OnceLock::new(), segment: OnceLock::new(),
metrics: OnceLock::new(), metrics: OnceLock::new(),
endpoint_health_check_notifier: OnceLock::new(),
}) })
} }
...@@ -342,6 +346,15 @@ pub trait PushWorkHandler: Send + Sync { ...@@ -342,6 +346,15 @@ pub trait PushWorkHandler: Send + Sync {
endpoint: &crate::component::Endpoint, endpoint: &crate::component::Endpoint,
metrics_labels: Option<&[(&str, &str)]>, metrics_labels: Option<&[(&str, &str)]>,
) -> Result<()>; ) -> Result<()>;
/// Set the endpoint-specific notifier for health check timer resets
fn set_endpoint_health_check_notifier(
&self,
_notifier: Arc<tokio::sync::Notify>,
) -> Result<()> {
// Default implementation for backwards compatibility
Ok(())
}
} }
/* /*
......
...@@ -126,6 +126,14 @@ where ...@@ -126,6 +126,14 @@ where
Ingress::add_metrics(self, endpoint, metrics_labels) Ingress::add_metrics(self, endpoint, metrics_labels)
} }
fn set_endpoint_health_check_notifier(&self, notifier: Arc<tokio::sync::Notify>) -> Result<()> {
use crate::pipeline::network::Ingress;
self.endpoint_health_check_notifier
.set(notifier)
.map_err(|_| anyhow::anyhow!("Endpoint health check notifier already set"))?;
Ok(())
}
async fn handle_payload(&self, payload: Bytes) -> Result<(), PipelineError> { async fn handle_payload(&self, payload: Bytes) -> Result<(), PipelineError> {
let start_time = std::time::Instant::now(); let start_time = std::time::Instant::now();
...@@ -306,6 +314,11 @@ where ...@@ -306,6 +314,11 @@ where
.inc(); .inc();
} }
} }
// Notify the health check manager that the stream has finished.
// This resets the timer, delaying the next canary health check.
if let Some(notifier) = self.endpoint_health_check_notifier.get() {
notifier.notify_one();
}
} }
// Ensure the metrics guard is not dropped until the end of the function. // Ensure the metrics guard is not dropped until the end of the function.
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! System health monitoring and health check management
use std::{
collections::HashMap,
sync::{Arc, OnceLock},
time::Instant,
};
use tokio::sync::mpsc;
use crate::component;
use crate::config::HealthStatus;
use crate::metrics::prometheus_names::distributed_runtime;
/// Health check target containing instance info and payload
#[derive(Clone, Debug)]
pub struct HealthCheckTarget {
pub instance: component::Instance,
pub payload: serde_json::Value,
}
/// Current Health Status
/// If use_endpoint_health_status is set then
/// initialize the endpoint_health hashmap to the
/// starting health status
#[derive(Clone)]
pub struct SystemHealth {
system_health: HealthStatus,
endpoint_health: Arc<std::sync::RwLock<HashMap<String, HealthStatus>>>,
/// Maps endpoint subject to health check target (instance + payload)
health_check_targets: Arc<std::sync::RwLock<HashMap<String, HealthCheckTarget>>>,
/// Maps endpoint subject to its specific health check notifier
health_check_notifiers: Arc<std::sync::RwLock<HashMap<String, Arc<tokio::sync::Notify>>>>,
/// Channel for new endpoint registrations
/// This solves the race condition where HealthCheckManager starts before endpoints are registered
/// Using a channel ensures no registrations are lost.
new_endpoint_tx: mpsc::UnboundedSender<String>,
new_endpoint_rx: Arc<std::sync::Mutex<Option<mpsc::UnboundedReceiver<String>>>>,
use_endpoint_health_status: Vec<String>,
health_path: String,
live_path: String,
start_time: Instant,
uptime_gauge: OnceLock<prometheus::Gauge>,
}
impl SystemHealth {
pub fn new(
starting_health_status: HealthStatus,
use_endpoint_health_status: Vec<String>,
health_path: String,
live_path: String,
) -> Self {
let mut endpoint_health = HashMap::new();
for endpoint in &use_endpoint_health_status {
endpoint_health.insert(endpoint.clone(), starting_health_status.clone());
}
// Create the channel for endpoint registration notifications
let (tx, rx) = mpsc::unbounded_channel();
SystemHealth {
system_health: starting_health_status,
endpoint_health: Arc::new(std::sync::RwLock::new(endpoint_health)),
health_check_targets: Arc::new(std::sync::RwLock::new(HashMap::new())),
health_check_notifiers: Arc::new(std::sync::RwLock::new(HashMap::new())),
new_endpoint_tx: tx,
new_endpoint_rx: Arc::new(std::sync::Mutex::new(Some(rx))),
use_endpoint_health_status,
health_path,
live_path,
start_time: Instant::now(),
uptime_gauge: OnceLock::new(),
}
}
pub fn set_health_status(&mut self, status: HealthStatus) {
self.system_health = status;
}
pub fn set_endpoint_health_status(&self, endpoint: &str, status: HealthStatus) {
let mut endpoint_health = self.endpoint_health.write().unwrap();
endpoint_health.insert(endpoint.to_string(), status);
}
/// Returns the overall health status and endpoint health statuses
/// System health is determined by ALL endpoints that have registered health checks
pub fn get_health_status(&self) -> (bool, HashMap<String, String>) {
let health_check_targets = self.health_check_targets.read().unwrap();
let endpoint_health = self.endpoint_health.read().unwrap();
let mut endpoints: HashMap<String, String> = HashMap::new();
for (endpoint, status) in endpoint_health.iter() {
endpoints.insert(
endpoint.clone(),
if *status == HealthStatus::Ready {
"ready".to_string()
} else {
"notready".to_string()
},
);
}
let healthy = if !self.use_endpoint_health_status.is_empty() {
self.use_endpoint_health_status.iter().all(|endpoint| {
endpoint_health
.get(endpoint)
.is_some_and(|status| *status == HealthStatus::Ready)
})
} else {
// If we have registered health check targets, use them to determine health
if !health_check_targets.is_empty() {
health_check_targets
.iter()
.all(|(endpoint_subject, _target)| {
endpoint_health
.get(endpoint_subject)
.is_some_and(|status| *status == HealthStatus::Ready)
})
} else {
// No health check targets registered, use simple system health
self.system_health == HealthStatus::Ready
}
};
(healthy, endpoints)
}
/// Register a health check target for an endpoint
pub fn register_health_check_target(
&self,
endpoint_subject: &str,
instance: component::Instance,
payload: serde_json::Value,
) {
let key = endpoint_subject.to_owned();
// Atomically check+insert under a single write lock to avoid races.
let inserted = {
let mut targets = self.health_check_targets.write().unwrap();
match targets.entry(key.clone()) {
std::collections::hash_map::Entry::Occupied(_) => false,
std::collections::hash_map::Entry::Vacant(v) => {
v.insert(HealthCheckTarget { instance, payload });
true
}
}
};
if !inserted {
tracing::warn!(
"Attempted to re-register health check for endpoint '{}'; ignoring.",
key
);
return;
}
// Create and store a unique notifier for this endpoint (idempotent).
{
let mut notifiers = self.health_check_notifiers.write().unwrap();
notifiers
.entry(key.clone())
.or_insert_with(|| Arc::new(tokio::sync::Notify::new()));
}
// Initialize endpoint health status conservatively to NotReady.
{
let mut endpoint_health = self.endpoint_health.write().unwrap();
endpoint_health
.entry(key.clone())
.or_insert(HealthStatus::NotReady);
}
if let Err(e) = self.new_endpoint_tx.send(key.clone()) {
tracing::error!(
"Failed to send endpoint '{}' registration to health check manager: {}. \
Health checks will not be performed for this endpoint.",
key,
e
);
}
}
/// Get all health check targets
pub fn get_health_check_targets(&self) -> Vec<(String, HealthCheckTarget)> {
let targets = self.health_check_targets.read().unwrap();
targets
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
}
/// Check if any health check targets are registered
pub fn has_health_check_targets(&self) -> bool {
let targets = self.health_check_targets.read().unwrap();
!targets.is_empty()
}
/// Get list of endpoints with health check targets
pub fn get_health_check_endpoints(&self) -> Vec<String> {
let targets = self.health_check_targets.read().unwrap();
targets.keys().cloned().collect()
}
/// Get health check target for a specific endpoint
pub fn get_health_check_target(&self, endpoint: &str) -> Option<HealthCheckTarget> {
let targets = self.health_check_targets.read().unwrap();
targets.get(endpoint).cloned()
}
/// Get the endpoint health status (Ready/NotReady)
pub fn get_endpoint_health_status(&self, endpoint: &str) -> Option<HealthStatus> {
let endpoint_health = self.endpoint_health.read().unwrap();
endpoint_health.get(endpoint).cloned()
}
/// Get the endpoint-specific health check notifier
pub fn get_endpoint_health_check_notifier(
&self,
endpoint_subject: &str,
) -> Option<Arc<tokio::sync::Notify>> {
let notifiers = self.health_check_notifiers.read().unwrap();
notifiers.get(endpoint_subject).cloned()
}
/// Take the receiver for new endpoint registrations (can only be called once)
/// This is used by HealthCheckManager to receive notifications of new endpoints
pub fn take_new_endpoint_receiver(&self) -> Option<mpsc::UnboundedReceiver<String>> {
self.new_endpoint_rx.lock().unwrap().take()
}
/// Initialize the uptime gauge using the provided metrics registry
pub fn initialize_uptime_gauge<T: crate::metrics::MetricsRegistry>(
&self,
registry: &T,
) -> anyhow::Result<()> {
let gauge = registry.create_gauge(
distributed_runtime::UPTIME_SECONDS,
"Total uptime of the DistributedRuntime in seconds",
&[],
)?;
self.uptime_gauge
.set(gauge)
.map_err(|_| anyhow::anyhow!("uptime_gauge already initialized"))?;
Ok(())
}
/// Get the current uptime as a Duration
pub fn uptime(&self) -> std::time::Duration {
self.start_time.elapsed()
}
/// Update the uptime gauge with the current uptime value
pub fn update_uptime_gauge(&self) {
if let Some(gauge) = self.uptime_gauge.get() {
gauge.set(self.uptime().as_secs_f64());
}
}
/// Get the health check path
pub fn health_path(&self) -> &str {
&self.health_path
}
/// Get the liveness check path
pub fn live_path(&self) -> &str {
&self.live_path
}
}
...@@ -84,15 +84,15 @@ pub async fn spawn_system_status_server( ...@@ -84,15 +84,15 @@ pub async fn spawn_system_status_server(
.system_health .system_health
.lock() .lock()
.unwrap() .unwrap()
.health_path .health_path()
.clone(); .to_string();
let live_path = server_state let live_path = server_state
.drt() .drt()
.system_health .system_health
.lock() .lock()
.unwrap() .unwrap()
.live_path .live_path()
.clone(); .to_string();
let app = Router::new() let app = Router::new()
.route( .route(
...@@ -156,9 +156,10 @@ pub async fn spawn_system_status_server( ...@@ -156,9 +156,10 @@ pub async fn spawn_system_status_server(
Ok((actual_address, handle)) Ok((actual_address, handle))
} }
/// Health handler /// Health handler with optional active health checking
#[tracing::instrument(skip_all, level = "trace")] #[tracing::instrument(skip_all, level = "trace")]
async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse { async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
// Get basic health status
let system_health = state.drt().system_health.lock().unwrap(); let system_health = state.drt().system_health.lock().unwrap();
let (healthy, endpoints) = system_health.get_health_status(); let (healthy, endpoints) = system_health.get_health_status();
let uptime = Some(system_health.uptime()); let uptime = Some(system_health.uptime());
...@@ -173,7 +174,7 @@ async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse { ...@@ -173,7 +174,7 @@ async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
let response = json!({ let response = json!({
"status": healthy_string, "status": healthy_string,
"uptime": uptime, "uptime": uptime,
"endpoints": endpoints "endpoints": endpoints,
}); });
tracing::trace!("Response {}", response.to_string()); tracing::trace!("Response {}", response.to_string());
...@@ -608,7 +609,8 @@ mod integration_tests { ...@@ -608,7 +609,8 @@ mod integration_tests {
// Create the ingress and start the endpoint service // Create the ingress and start the endpoint service
let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap(); let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();
// Start the service and endpoint // Start the service and endpoint with a health check payload
// This will automatically register the endpoint for health monitoring
tokio::spawn(async move { tokio::spawn(async move {
let _ = component let _ = component
.service_builder() .service_builder()
...@@ -618,6 +620,9 @@ mod integration_tests { ...@@ -618,6 +620,9 @@ mod integration_tests {
.endpoint(ENDPOINT_NAME) .endpoint(ENDPOINT_NAME)
.endpoint_builder() .endpoint_builder()
.handler(ingress) .handler(ingress)
.health_check_payload(serde_json::json!({
"test": "health_check"
}))
.start() .start()
.await; .await;
}); });
...@@ -702,4 +707,105 @@ mod integration_tests { ...@@ -702,4 +707,105 @@ mod integration_tests {
) )
.await; .await;
} }
#[cfg(feature = "integration")]
#[tokio::test]
async fn test_health_check_with_payload_and_timeout() {
// Test the complete health check flow with the new canary-based system:
crate::logging::init();
temp_env::async_with_vars(
[
("DYN_SYSTEM_ENABLED", Some("true")),
("DYN_SYSTEM_PORT", Some("0")),
("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
(
"DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
Some("[\"test.endpoint\"]"),
),
// Enable health check with short intervals for testing
("DYN_HEALTH_CHECK_ENABLED", Some("true")),
("DYN_CANARY_WAIT_TIME", Some("1")), // Send canary after 1 second of inactivity
("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")), // Immediately timeout to mimic unresponsiveness
("RUST_LOG", Some("info")), // Enable logging for test
],
async {
let drt = Arc::new(create_test_drt_async().await);
// Get system status server info
let system_info = drt
.system_status_server_info()
.expect("System status server should be started");
let addr = system_info.socket_addr;
let client = reqwest::Client::new();
let health_url = format!("http://{}/health", addr);
// Register an endpoint with health check payload
let endpoint = "test.endpoint";
let health_check_payload = serde_json::json!({
"prompt": "health check test",
"_health_check": true
});
// Register the endpoint and its health check payload
{
let system_health = drt.system_health.lock().unwrap();
system_health.register_health_check_target(
endpoint,
crate::component::Instance {
component: "test_component".to_string(),
endpoint: "health".to_string(),
namespace: "test_namespace".to_string(),
instance_id: 1,
transport: crate::component::TransportType::NatsTcp(
endpoint.to_string(),
),
},
health_check_payload.clone(),
);
}
// Check initial health - should be ready (default state)
let response = client.get(&health_url).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
assert_eq!(status, 503, "Should be unhealthy initially (default state)");
assert!(
body.contains("\"status\":\"notready\""),
"Should show notready status initially"
);
// Set endpoint to healthy state
drt.system_health
.lock()
.unwrap()
.set_endpoint_health_status(endpoint, HealthStatus::Ready);
// Check health again - should now be healthy
let response = client.get(&health_url).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
assert_eq!(status, 200, "Should be healthy due to recent response");
assert!(
body.contains("\"status\":\"ready\""),
"Should show ready status after response"
);
// Verify the endpoint status in SystemHealth directly
let endpoint_status = drt
.system_health
.lock()
.unwrap()
.get_endpoint_health_status(endpoint);
assert_eq!(
endpoint_status,
Some(HealthStatus::Ready),
"SystemHealth should show endpoint as Ready after response"
);
},
)
.await;
}
} }
...@@ -76,10 +76,10 @@ spec: ...@@ -76,10 +76,10 @@ spec:
envs: envs:
- name: DYN_SYSTEM_ENABLED - name: DYN_SYSTEM_ENABLED
value: "true" value: "true"
- name: DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS
value: "[\"generate\"]"
- name: DYN_SYSTEM_PORT - name: DYN_SYSTEM_PORT
value: "9090" value: "9090"
- name: DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS
value: "[\"generate\"]"
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
startupProbe: startupProbe:
......
...@@ -76,10 +76,10 @@ spec: ...@@ -76,10 +76,10 @@ spec:
envs: envs:
- name: DYN_SYSTEM_ENABLED - name: DYN_SYSTEM_ENABLED
value: "true" value: "true"
- name: DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS
value: "[\"generate\"]"
- name: DYN_SYSTEM_PORT - name: DYN_SYSTEM_PORT
value: "9090" value: "9090"
- name: DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS
value: "[\"generate\"]"
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
startupProbe: startupProbe:
...@@ -126,10 +126,10 @@ spec: ...@@ -126,10 +126,10 @@ spec:
envs: envs:
- name: DYN_SYSTEM_ENABLED - name: DYN_SYSTEM_ENABLED
value: "true" value: "true"
- name: DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS
value: "[\"generate\"]"
- name: DYN_SYSTEM_PORT - name: DYN_SYSTEM_PORT
value: "9090" value: "9090"
- name: DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS
value: "[\"generate\"]"
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
startupProbe: startupProbe:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment