Unverified commit 9a93eb75, authored by Graham King, committed by GitHub
Browse files

fix: Remove http temp cancel token, use real one (#6344)


Signed-off-by: Graham King <grahamk@nvidia.com>
parent 9352da7a
...@@ -24,6 +24,7 @@ pub async fn run( ...@@ -24,6 +24,7 @@ pub async fn run(
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut grpc_service_builder = kserve::KserveService::builder() let mut grpc_service_builder = kserve::KserveService::builder()
.port(engine_config.local_model().http_port()) // [WIP] generalize port.. .port(engine_config.local_model().http_port()) // [WIP] generalize port..
.http_cancel_token(Some(distributed_runtime.primary_token()))
.with_request_template(engine_config.local_model().request_template()); .with_request_template(engine_config.local_model().request_template());
// Set HTTP metrics port if provided (for parallel test execution) // Set HTTP metrics port if provided (for parallel test execution)
......
...@@ -48,6 +48,8 @@ pub async fn run( ...@@ -48,6 +48,8 @@ pub async fn run(
if let Some(http_host) = local_model.http_host() { if let Some(http_host) = local_model.http_host() {
http_service_builder = http_service_builder.host(http_host); http_service_builder = http_service_builder.host(http_host);
} }
http_service_builder =
http_service_builder.cancel_token(Some(distributed_runtime.primary_token()));
http_service_builder = http_service_builder =
http_service_builder.with_request_template(engine_config.local_model().request_template()); http_service_builder.with_request_template(engine_config.local_model().request_template());
......
...@@ -177,6 +177,9 @@ pub struct KserveServiceConfig { ...@@ -177,6 +177,9 @@ pub struct KserveServiceConfig {
#[builder(setter(into), default = "String::from(\"0.0.0.0\")")] #[builder(setter(into), default = "String::from(\"0.0.0.0\")")]
http_metrics_host: String, http_metrics_host: String,
#[builder(default = "None")]
http_cancel_token: Option<CancellationToken>,
/// gRPC server tuning configuration. /// gRPC server tuning configuration.
/// Default: GrpcTuningConfig::from_env() - reads from environment variables with fallback to defaults. /// Default: GrpcTuningConfig::from_env() - reads from environment variables with fallback to defaults.
#[builder(default = "GrpcTuningConfig::from_env()")] #[builder(default = "GrpcTuningConfig::from_env()")]
...@@ -257,6 +260,7 @@ impl KserveServiceConfigBuilder { ...@@ -257,6 +260,7 @@ impl KserveServiceConfigBuilder {
let http_service = http_service::HttpService::builder() let http_service = http_service::HttpService::builder()
.port(config.http_metrics_port) .port(config.http_metrics_port)
.host(config.http_metrics_host.clone()) .host(config.http_metrics_host.clone())
.cancel_token(config.http_cancel_token)
// Disable all inference endpoints - only use for metrics/health // Disable all inference endpoints - only use for metrics/health
.enable_chat_endpoints(false) .enable_chat_endpoints(false)
.enable_cmpl_endpoints(false) .enable_cmpl_endpoints(false)
......
...@@ -208,6 +208,9 @@ pub struct HttpServiceConfig { ...@@ -208,6 +208,9 @@ pub struct HttpServiceConfig {
#[builder(default = "None")] #[builder(default = "None")]
discovery: Option<Arc<dyn Discovery>>, discovery: Option<Arc<dyn Discovery>>,
#[builder(default = "None")]
cancel_token: Option<CancellationToken>,
} }
impl HttpService { impl HttpService {
...@@ -369,22 +372,17 @@ impl HttpServiceConfigBuilder { ...@@ -369,22 +372,17 @@ impl HttpServiceConfigBuilder {
let config: HttpServiceConfig = self.build_internal()?; let config: HttpServiceConfig = self.build_internal()?;
let model_manager = Arc::new(ModelManager::new()); let model_manager = Arc::new(ModelManager::new());
// Create a temporary cancel token for building - will be replaced in spawn/run let cancel_token = config.cancel_token.unwrap_or_default();
let temp_cancel_token = CancellationToken::new();
// Use the provided discovery client, or fall back to a no-op memory-backed one // Use the provided discovery client, or fall back to a no-op memory-backed one
// (for in-process modes that don't need discovery) // (for in-process modes that don't need discovery)
let discovery_client = config.discovery.unwrap_or_else(|| { let discovery_client = config.discovery.unwrap_or_else(|| {
use dynamo_runtime::discovery::KVStoreDiscovery; use dynamo_runtime::discovery::KVStoreDiscovery;
Arc::new(KVStoreDiscovery::new( Arc::new(KVStoreDiscovery::new(
dynamo_runtime::storage::kv::Manager::memory(), dynamo_runtime::storage::kv::Manager::memory(),
temp_cancel_token.child_token(), cancel_token.child_token(),
)) as Arc<dyn Discovery> )) as Arc<dyn Discovery>
}); });
let state = Arc::new(State::new( let state = Arc::new(State::new(model_manager, discovery_client, cancel_token));
model_manager,
discovery_client,
temp_cancel_token,
));
state state
.flags .flags
.set(&EndpointType::Chat, config.enable_chat_endpoints); .set(&EndpointType::Chat, config.enable_chat_endpoints);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment