Unverified Commit 7da510cf authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

chore: many bug fixes and improvements when testing planner (#2776)


Signed-off-by: default avatarhongkuanz <hongkuanz@nvidia.com>
Signed-off-by: default avatarhongkuan <hongkuanz@nvidia.com>
parent 945a5729
...@@ -15,7 +15,7 @@ spec: ...@@ -15,7 +15,7 @@ spec:
app: nixl-benchmark app: nixl-benchmark
spec: spec:
imagePullSecrets: imagePullSecrets:
- name: nvcrimagepullsecret - name: nvcr-imagepullsecret
containers: containers:
- name: nixl-benchmark - name: nixl-benchmark
image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:nixlbench-e42c07a8 image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:nixlbench-e42c07a8
......
...@@ -51,7 +51,7 @@ def main(args): ...@@ -51,7 +51,7 @@ def main(args):
isl, osl = get_isl_osl(t_req) isl, osl = get_isl_osl(t_req)
output_data.append( output_data.append(
{ {
"timestamp": t_req * 1000, # in ms "timestamp": int(t_req * 1000), # in ms, integer
"input_length": isl, "input_length": isl,
"output_length": osl, "output_length": osl,
"hash_ids": np.random.choice( "hash_ids": np.random.choice(
......
...@@ -111,19 +111,24 @@ spec: ...@@ -111,19 +111,24 @@ spec:
image: nvcr.io/nvidian/nim-llm-dev/sglang-runtime:hzhou-0811-1 image: nvcr.io/nvidian/nim-llm-dev/sglang-runtime:hzhou-0811-1
workingDir: /workspace/components/backends/sglang workingDir: /workspace/components/backends/sglang
command: command:
- /bin/sh - python3
- -c
args: args:
- >- - -m
python3 -m dynamo.sglang - dynamo.sglang
--model-path Qwen/Qwen3-0.6B - --model-path
--served-model-name Qwen/Qwen3-0.6B - Qwen/Qwen3-0.6B
--page-size 16 - --served-model-name
--tp 1 - Qwen/Qwen3-0.6B
--trust-remote-code - --page-size
--skip-tokenizer-init - "16"
--disaggregation-mode decode - --tp
--disaggregation-transfer-backend nixl - "1"
- --trust-remote-code
- --skip-tokenizer-init
- --disaggregation-mode
- decode
- --disaggregation-transfer-backend
- nixl
SGLangPrefillWorker: SGLangPrefillWorker:
dynamoNamespace: dynamo dynamoNamespace: dynamo
envFromSecret: hf-token-secret envFromSecret: hf-token-secret
...@@ -137,16 +142,21 @@ spec: ...@@ -137,16 +142,21 @@ spec:
image: nvcr.io/nvidian/nim-llm-dev/sglang-runtime:hzhou-0811-1 image: nvcr.io/nvidian/nim-llm-dev/sglang-runtime:hzhou-0811-1
workingDir: /workspace/components/backends/sglang workingDir: /workspace/components/backends/sglang
command: command:
- /bin/sh - python3
- -c
args: args:
- >- - -m
python3 -m dynamo.sglang - dynamo.sglang
--model-path Qwen/Qwen3-0.6B - --model-path
--served-model-name Qwen/Qwen3-0.6B - Qwen/Qwen3-0.6B
--page-size 16 - --served-model-name
--tp 1 - Qwen/Qwen3-0.6B
--trust-remote-code - --page-size
--skip-tokenizer-init - "16"
--disaggregation-mode prefill - --tp
--disaggregation-transfer-backend nixl - "1"
- --trust-remote-code
- --skip-tokenizer-init
- --disaggregation-mode
- prefill
- --disaggregation-transfer-backend
- nixl
...@@ -117,7 +117,9 @@ async def init(runtime: DistributedRuntime, config: Config): ...@@ -117,7 +117,9 @@ async def init(runtime: DistributedRuntime, config: Config):
# Requests queue until ready_event is set # Requests queue until ready_event is set
await asyncio.gather( await asyncio.gather(
generate_endpoint.serve_endpoint( generate_endpoint.serve_endpoint(
handler.generate, graceful_shutdown=False, metrics_labels=metrics_labels handler.generate,
graceful_shutdown == config.migration_limit <= 0,
metrics_labels=metrics_labels,
), ),
register_model(), register_model(),
) )
......
...@@ -121,10 +121,12 @@ spec: ...@@ -121,10 +121,12 @@ spec:
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.4.1 image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.4.1
workingDir: /workspace/components/backends/vllm workingDir: /workspace/components/backends/vllm
command: command:
- /bin/sh - python3
- -c
args: args:
- "python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --migration-limit=3" - -m
- dynamo.vllm
- --model
- Qwen/Qwen3-0.6B
VllmPrefillWorker: VllmPrefillWorker:
dynamoNamespace: vllm-disagg-planner dynamoNamespace: vllm-disagg-planner
envFromSecret: hf-token-secret envFromSecret: hf-token-secret
...@@ -144,7 +146,10 @@ spec: ...@@ -144,7 +146,10 @@ spec:
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.4.1 image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.4.1
workingDir: /workspace/components/backends/vllm workingDir: /workspace/components/backends/vllm
command: command:
- /bin/sh - python3
- -c
args: args:
- python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --is-prefill-worker --migration-limit=3 - -m
- dynamo.vllm
- --model
- Qwen/Qwen3-0.6B
- --is-prefill-worker
...@@ -84,8 +84,12 @@ async def worker(runtime: DistributedRuntime): ...@@ -84,8 +84,12 @@ async def worker(runtime: DistributedRuntime):
if config.is_prefill_worker: if config.is_prefill_worker:
await init_prefill(runtime, config) await init_prefill(runtime, config)
logger.debug("init_prefill completed")
else: else:
await init(runtime, config) await init(runtime, config)
logger.debug("init completed")
logger.debug("Worker function completed, exiting...")
def setup_vllm_engine(config, stat_logger=None): def setup_vllm_engine(config, stat_logger=None):
...@@ -147,6 +151,7 @@ async def init_prefill(runtime: DistributedRuntime, config: Config): ...@@ -147,6 +151,7 @@ async def init_prefill(runtime: DistributedRuntime, config: Config):
) )
try: try:
logger.debug("Starting serve_endpoint for prefill worker")
await asyncio.gather( await asyncio.gather(
# for prefill, we want to shutdown the engine after all prefill requests are finished because # for prefill, we want to shutdown the engine after all prefill requests are finished because
# (temp reason): we don't support re-routing prefill requests # (temp reason): we don't support re-routing prefill requests
...@@ -161,10 +166,12 @@ async def init_prefill(runtime: DistributedRuntime, config: Config): ...@@ -161,10 +166,12 @@ async def init_prefill(runtime: DistributedRuntime, config: Config):
handler.clear_kv_blocks, metrics_labels=[("model", config.model)] handler.clear_kv_blocks, metrics_labels=[("model", config.model)]
), ),
) )
logger.debug("serve_endpoint completed for prefill worker")
except Exception as e: except Exception as e:
logger.error(f"Failed to serve endpoints: {e}") logger.error(f"Failed to serve endpoints: {e}")
raise raise
finally: finally:
logger.debug("Cleaning up prefill worker")
handler.cleanup() handler.cleanup()
...@@ -254,22 +261,25 @@ async def init(runtime: DistributedRuntime, config: Config): ...@@ -254,22 +261,25 @@ async def init(runtime: DistributedRuntime, config: Config):
) )
try: try:
logger.debug("Starting serve_endpoint for decode worker")
await asyncio.gather( await asyncio.gather(
# for decode, we want to transfer the in-flight requests to other decode engines, # for decode, we want to transfer the in-flight requests to other decode engines,
# because waiting them to finish can take a long time for long OSLs # because waiting them to finish can take a long time for long OSLs
generate_endpoint.serve_endpoint( generate_endpoint.serve_endpoint(
handler.generate, handler.generate,
graceful_shutdown=False, graceful_shutdown=config.migration_limit <= 0,
metrics_labels=[("model", config.model)], metrics_labels=[("model", config.model)],
), ),
clear_endpoint.serve_endpoint( clear_endpoint.serve_endpoint(
handler.clear_kv_blocks, metrics_labels=[("model", config.model)] handler.clear_kv_blocks, metrics_labels=[("model", config.model)]
), ),
) )
logger.debug("serve_endpoint completed for decode worker")
except Exception as e: except Exception as e:
logger.error(f"Failed to serve endpoints: {e}") logger.error(f"Failed to serve endpoints: {e}")
raise raise
finally: finally:
logger.debug("Cleaning up decode worker")
# Cleanup background tasks # Cleanup background tasks
handler.cleanup() handler.cleanup()
......
...@@ -42,10 +42,14 @@ class BasePredictor(ABC): ...@@ -42,10 +42,14 @@ class BasePredictor(ABC):
def add_data_point(self, value): def add_data_point(self, value):
"""Add new data point to the buffer""" """Add new data point to the buffer"""
if not math.isnan(value): if math.isnan(value):
self.data_buffer.append(value) value = 0
if len(self.data_buffer) == 0 and value == 0:
# skip the beginning idle period
return
else: else:
self.data_buffer.append(0) self.data_buffer.append(value)
def get_last_value(self): def get_last_value(self):
"""Get the last value from the buffer""" """Get the last value from the buffer"""
...@@ -126,6 +130,10 @@ class ProphetPredictor(BasePredictor): ...@@ -126,6 +130,10 @@ class ProphetPredictor(BasePredictor):
# Use proper datetime for Prophet # Use proper datetime for Prophet
timestamp = self.start_date + timedelta(seconds=self.curr_step) timestamp = self.start_date + timedelta(seconds=self.curr_step)
value = 0 if math.isnan(value) else value value = 0 if math.isnan(value) else value
if len(self.data_buffer) == 0 and value == 0:
# skip the beginning idle period
return
self.data_buffer.append({"ds": timestamp, "y": value}) self.data_buffer.append({"ds": timestamp, "y": value})
self.curr_step += 1 self.curr_step += 1
......
...@@ -259,18 +259,24 @@ class Planner: ...@@ -259,18 +259,24 @@ class Planner:
# compute how many replicas are needed for prefill # compute how many replicas are needed for prefill
# here we assume the prefill bias is purely due to request queueing # here we assume the prefill bias is purely due to request queueing
# and we increase the number of prefill replicas linearly to account for the queueing delay # and we increase the number of prefill replicas linearly to account for the queueing delay
pred_prefill_load_per_gpu = ( pred_prefill_throughput = (
next_num_req next_num_req
* next_isl * next_isl
/ self.args.adjustment_interval / self.args.adjustment_interval
* min(1, self.p_correction_factor) * min(1, self.p_correction_factor)
) )
next_num_p = math.ceil( next_num_p = math.ceil(
pred_prefill_load_per_gpu pred_prefill_throughput
/ self.prefill_interpolator.interpolate_thpt_per_gpu(next_isl) / self.prefill_interpolator.interpolate_thpt_per_gpu(next_isl)
/ self.args.prefill_engine_num_gpu / self.args.prefill_engine_num_gpu
) )
logger.info(
f"Prefill calculation: {pred_prefill_throughput:.2f}(p_thpt) / "
f"{self.prefill_interpolator.interpolate_thpt_per_gpu(next_isl) * self.args.prefill_engine_num_gpu:.2f}(p_engine_cap) = "
f"{next_num_p}(num_p)"
)
# compute how many replicas are needed for decode # compute how many replicas are needed for decode
# 1. apply d_correction_factor to the ITL SLA # 1. apply d_correction_factor to the ITL SLA
# Prevent divide by zero when d_correction_factor is 0 (no metrics yet) # Prevent divide by zero when d_correction_factor is 0 (no metrics yet)
...@@ -290,14 +296,19 @@ class Planner: ...@@ -290,14 +296,19 @@ class Planner:
itl=corrected_itl, context_length=next_isl + next_osl / 2 itl=corrected_itl, context_length=next_isl + next_osl / 2
) )
# 3. compute number of decode replicas needed # 3. compute number of decode replicas needed
pred_decode_throughput = next_num_req * next_osl / self.args.adjustment_interval
next_num_d = math.ceil( next_num_d = math.ceil(
next_num_req pred_decode_throughput
* next_osl
/ self.args.adjustment_interval
/ pred_decode_thpt_per_gpu / pred_decode_thpt_per_gpu
/ self.args.decode_engine_num_gpu / self.args.decode_engine_num_gpu
) )
logger.info(
f"Decode calculation: {pred_decode_throughput:.2f}(d_thpt) / "
f"{pred_decode_thpt_per_gpu * self.args.decode_engine_num_gpu:.2f}(d_engine_cap) = "
f"{next_num_d}(num_d)"
)
# correct num_p and num_d based on the gpu budget # correct num_p and num_d based on the gpu budget
next_num_p = max(next_num_p, self.args.min_endpoint) next_num_p = max(next_num_p, self.args.min_endpoint)
next_num_d = max(next_num_d, self.args.min_endpoint) next_num_d = max(next_num_d, self.args.min_endpoint)
......
...@@ -176,46 +176,6 @@ impl ModelWatcher { ...@@ -176,46 +176,6 @@ impl ModelWatcher {
.await .await
.with_context(|| model_name.clone())?; .with_context(|| model_name.clone())?;
if !active_instances.is_empty() { if !active_instances.is_empty() {
let mut update_tx = true;
let mut model_type: ModelType = model_entry.model_type;
if model_entry.model_type == ModelType::Chat
&& self.manager.list_chat_completions_models().is_empty()
{
self.manager.remove_chat_completions_model(&model_name).ok();
model_type = ModelType::Chat;
} else if model_entry.model_type == ModelType::Completion
&& self.manager.list_completions_models().is_empty()
{
self.manager.remove_completions_model(&model_name).ok();
model_type = ModelType::Completion;
} else if model_entry.model_type == ModelType::Embedding
&& self.manager.list_embeddings_models().is_empty()
{
self.manager.remove_embeddings_model(&model_name).ok();
model_type = ModelType::Embedding;
} else if model_entry.model_type == ModelType::Backend {
if self.manager.list_chat_completions_models().is_empty() {
self.manager.remove_chat_completions_model(&model_name).ok();
model_type = ModelType::Chat;
}
if self.manager.list_completions_models().is_empty() {
self.manager.remove_completions_model(&model_name).ok();
if model_type == ModelType::Chat {
model_type = ModelType::Backend;
} else {
model_type = ModelType::Completion;
}
}
} else {
tracing::debug!(
"Model {} is still active in other instances, not removing",
model_name
);
update_tx = false;
}
if update_tx && let Some(tx) = &self.model_update_tx {
tx.send(ModelUpdate::Removed(model_type)).await.ok();
}
return Ok(None); return Ok(None);
} }
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
// limitations under the License. // limitations under the License.
use derive_getters::Dissolve; use derive_getters::Dissolve;
use tokio_util::sync::CancellationToken;
use super::*; use super::*;
...@@ -114,9 +115,52 @@ impl EndpointConfigBuilder { ...@@ -114,9 +115,52 @@ impl EndpointConfigBuilder {
.await .await
.map_err(|e| anyhow::anyhow!("Failed to start endpoint: {e}"))?; .map_err(|e| anyhow::anyhow!("Failed to start endpoint: {e}"))?;
let cancel_token = lease // Create a token that responds to both runtime shutdown and lease expiration
.map(|l| l.child_token()) let runtime_shutdown_token = endpoint.drt().child_token();
.unwrap_or_else(|| endpoint.drt().child_token());
// Extract all values needed from endpoint before any spawns
let namespace_name = endpoint.component.namespace.name.clone();
let component_name = endpoint.component.name.clone();
let endpoint_name = endpoint.name.clone();
let system_health = endpoint.drt().system_health.clone();
let subject = endpoint.subject_to(lease_id);
let etcd_path = endpoint.etcd_path_with_lease_id(lease_id);
let etcd_client = endpoint.component.drt.etcd_client.clone();
let cancel_token = if let Some(lease) = lease.as_ref() {
// Create a new token that will be cancelled when EITHER the lease expires OR runtime shutdown occurs
let combined_token = CancellationToken::new();
let combined_for_select = combined_token.clone();
let lease_token = lease.child_token();
// Use secondary runtime for this lightweight monitoring task
endpoint.drt().runtime().secondary().spawn(async move {
tokio::select! {
_ = lease_token.cancelled() => {
tracing::trace!("Lease cancelled, triggering endpoint shutdown");
}
_ = runtime_shutdown_token.cancelled() => {
tracing::trace!("Runtime shutdown triggered, cancelling endpoint");
}
}
combined_for_select.cancel();
});
combined_token
} else {
// No lease, just use runtime shutdown token
runtime_shutdown_token
};
// Register with graceful shutdown tracker if needed
if graceful_shutdown {
tracing::debug!(
"Registering endpoint '{}' with graceful shutdown tracker",
endpoint.name
);
let tracker = endpoint.drt().graceful_shutdown_tracker();
tracker.register_endpoint();
} else {
tracing::debug!("Endpoint '{}' has graceful_shutdown=false", endpoint.name);
}
let push_endpoint = PushEndpoint::builder() let push_endpoint = PushEndpoint::builder()
.service_handler(handler) .service_handler(handler)
...@@ -126,35 +170,54 @@ impl EndpointConfigBuilder { ...@@ -126,35 +170,54 @@ impl EndpointConfigBuilder {
.map_err(|e| anyhow::anyhow!("Failed to build push endpoint: {e}"))?; .map_err(|e| anyhow::anyhow!("Failed to build push endpoint: {e}"))?;
// launch in primary runtime // launch in primary runtime
let task = tokio::spawn(push_endpoint.start( let tracker_clone = if graceful_shutdown {
Some(endpoint.drt().graceful_shutdown_tracker())
} else {
None
};
// Create clones for the async closure
let namespace_name_for_task = namespace_name.clone();
let component_name_for_task = component_name.clone();
let endpoint_name_for_task = endpoint_name.clone();
let task = tokio::spawn(async move {
let result = push_endpoint
.start(
service_endpoint, service_endpoint,
endpoint.component.namespace.name.clone(), namespace_name_for_task,
endpoint.component.name.clone(), component_name_for_task,
endpoint.name.clone(), endpoint_name_for_task,
lease_id, lease_id,
endpoint.drt().system_health.clone(), system_health,
)); )
.await;
// Unregister from graceful shutdown tracker
if let Some(tracker) = tracker_clone {
tracing::debug!("Unregistering endpoint from graceful shutdown tracker");
tracker.unregister_endpoint();
}
result
});
// make the components service endpoint discovery in etcd // make the components service endpoint discovery in etcd
// client.register_service() // client.register_service()
let info = Instance { let info = Instance {
component: endpoint.component.name.clone(), component: component_name,
endpoint: endpoint.name.clone(), endpoint: endpoint_name,
namespace: endpoint.component.namespace.name.clone(), namespace: namespace_name,
instance_id: lease_id, instance_id: lease_id,
transport: TransportType::NatsTcp(endpoint.subject_to(lease_id)), transport: TransportType::NatsTcp(subject),
}; };
let info = serde_json::to_vec_pretty(&info)?; let info = serde_json::to_vec_pretty(&info)?;
if let Some(etcd_client) = &endpoint.component.drt.etcd_client if let Some(etcd_client) = &etcd_client
&& let Err(e) = etcd_client && let Err(e) = etcd_client
.kv_create( .kv_create(&etcd_path, info, Some(lease_id))
&endpoint.etcd_path_with_lease_id(lease_id),
info,
Some(lease_id),
)
.await .await
{ {
tracing::error!("Failed to register discoverable service: {:?}", e); tracing::error!("Failed to register discoverable service: {:?}", e);
......
...@@ -24,6 +24,7 @@ use crate::{ ...@@ -24,6 +24,7 @@ use crate::{
transports::{etcd, nats, tcp}, transports::{etcd, nats, tcp},
}; };
use super::utils::GracefulShutdownTracker;
use super::{Arc, DistributedRuntime, OK, OnceCell, Result, Runtime, SystemHealth, Weak, error}; use super::{Arc, DistributedRuntime, OK, OnceCell, Result, Runtime, SystemHealth, Weak, error};
use std::sync::OnceLock; use std::sync::OnceLock;
...@@ -260,6 +261,10 @@ impl DistributedRuntime { ...@@ -260,6 +261,10 @@ impl DistributedRuntime {
self.runtime.child_token() self.runtime.child_token()
} }
pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
self.runtime.graceful_shutdown_tracker()
}
pub fn instance_sources(&self) -> Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>> { pub fn instance_sources(&self) -> Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>> {
self.instance_sources.clone() self.instance_sources.clone()
} }
......
...@@ -23,7 +23,6 @@ use std::{ ...@@ -23,7 +23,6 @@ use std::{
sync::{Arc, OnceLock, Weak}, sync::{Arc, OnceLock, Weak},
time::Instant, time::Instant,
}; };
use tokio::sync::Mutex;
pub use anyhow::{ pub use anyhow::{
Context as ErrorContext, Error, Ok as OK, Result, anyhow as error, bail as raise, Context as ErrorContext, Error, Ok as OK, Result, anyhow as error, bail as raise,
...@@ -64,6 +63,7 @@ pub use worker::Worker; ...@@ -64,6 +63,7 @@ pub use worker::Worker;
use crate::metrics::prometheus_names::distributed_runtime; use crate::metrics::prometheus_names::distributed_runtime;
use component::{Endpoint, InstanceSource}; use component::{Endpoint, InstanceSource};
use utils::GracefulShutdownTracker;
use config::HealthStatus; use config::HealthStatus;
...@@ -81,6 +81,8 @@ pub struct Runtime { ...@@ -81,6 +81,8 @@ pub struct Runtime {
primary: RuntimeType, primary: RuntimeType,
secondary: RuntimeType, secondary: RuntimeType,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
endpoint_shutdown_token: CancellationToken,
graceful_shutdown_tracker: Arc<GracefulShutdownTracker>,
} }
/// Current Health Status /// Current Health Status
...@@ -271,7 +273,7 @@ pub struct DistributedRuntime { ...@@ -271,7 +273,7 @@ pub struct DistributedRuntime {
// startup. Will not start etcd. // startup. Will not start etcd.
is_static: bool, is_static: bool,
instance_sources: Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>, instance_sources: Arc<tokio::sync::Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,
// Health Status // Health Status
system_health: Arc<std::sync::Mutex<SystemHealth>>, system_health: Arc<std::sync::Mutex<SystemHealth>>,
......
...@@ -66,7 +66,7 @@ impl PushEndpoint { ...@@ -66,7 +66,7 @@ impl PushEndpoint {
// process shutdown // process shutdown
_ = self.cancellation_token.cancelled() => { _ = self.cancellation_token.cancelled() => {
tracing::info!("Shutting down service"); tracing::info!("PushEndpoint received cancellation signal, shutting down service");
if let Err(e) = endpoint.stop().await { if let Err(e) = endpoint.stop().await {
tracing::warn!("Failed to stop NATS service: {:?}", e); tracing::warn!("Failed to stop NATS service: {:?}", e);
} }
......
...@@ -25,13 +25,14 @@ ...@@ -25,13 +25,14 @@
//! Notes: We will need to do an evaluation on what is fully public, what is pub(crate) and what is //! Notes: We will need to do an evaluation on what is fully public, what is pub(crate) and what is
//! private; however, for now we are exposing most objects as fully public while the API is maturing. //! private; however, for now we are exposing most objects as fully public while the API is maturing.
use super::utils::GracefulShutdownTracker;
use super::{Result, Runtime, RuntimeType, error}; use super::{Result, Runtime, RuntimeType, error};
use crate::config::{self, RuntimeConfig}; use crate::config::{self, RuntimeConfig};
use futures::Future; use futures::Future;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use std::sync::Arc; use std::sync::{Arc, atomic::Ordering};
use tokio::{signal, task::JoinHandle}; use tokio::{signal, sync::Mutex, task::JoinHandle};
pub use tokio_util::sync::CancellationToken; pub use tokio_util::sync::CancellationToken;
...@@ -43,6 +44,9 @@ impl Runtime { ...@@ -43,6 +44,9 @@ impl Runtime {
// create a cancellation token // create a cancellation token
let cancellation_token = CancellationToken::new(); let cancellation_token = CancellationToken::new();
// create endpoint shutdown token as a child of the main token
let endpoint_shutdown_token = cancellation_token.child_token();
// secondary runtime for background ectd/nats tasks // secondary runtime for background ectd/nats tasks
let secondary = match secondary { let secondary = match secondary {
Some(secondary) => secondary, Some(secondary) => secondary,
...@@ -57,6 +61,8 @@ impl Runtime { ...@@ -57,6 +61,8 @@ impl Runtime {
primary: runtime, primary: runtime,
secondary, secondary,
cancellation_token, cancellation_token,
endpoint_shutdown_token,
graceful_shutdown_tracker: Arc::new(GracefulShutdownTracker::new()),
}) })
} }
...@@ -107,14 +113,48 @@ impl Runtime { ...@@ -107,14 +113,48 @@ impl Runtime {
self.cancellation_token.clone() self.cancellation_token.clone()
} }
/// Creates a child [`CancellationToken`] tied to the life-cycle of the [`Runtime`]'s root [`CancellationToken::child_token`] method. /// Creates a child [`CancellationToken`] tied to the life-cycle of the [`Runtime`]'s endpoint shutdown token.
pub fn child_token(&self) -> CancellationToken { pub fn child_token(&self) -> CancellationToken {
self.cancellation_token.child_token() self.endpoint_shutdown_token.child_token()
}
/// Get access to the graceful shutdown tracker
pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
self.graceful_shutdown_tracker.clone()
} }
/// Shuts down the [`Runtime`] instance /// Shuts down the [`Runtime`] instance
pub fn shutdown(&self) { pub fn shutdown(&self) {
self.cancellation_token.cancel(); tracing::info!("Runtime shutdown initiated");
// Spawn the shutdown coordination task BEFORE cancelling tokens
let tracker = self.graceful_shutdown_tracker.clone();
let main_token = self.cancellation_token.clone();
let endpoint_token = self.endpoint_shutdown_token.clone();
// Use the runtime handle to spawn the task
let handle = self.primary();
handle.spawn(async move {
// Phase 1: Cancel endpoint shutdown token to stop accepting new requests
tracing::info!("Phase 1: Cancelling endpoint shutdown token");
endpoint_token.cancel();
// Phase 2: Wait for all graceful endpoints to complete
tracing::info!("Phase 2: Waiting for graceful endpoints to complete");
let count = tracker.get_count();
tracing::info!("Active graceful endpoints: {}", count);
if count != 0 {
tracker.wait_for_completion().await;
}
// Phase 3: Now shutdown NATS/ETCD by cancelling the main token
tracing::info!(
"Phase 3: All graceful endpoints completed, shutting down NATS/ETCD connections"
);
main_token.cancel();
});
} }
} }
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
pub use tokio::time::{Duration, Instant}; pub use tokio::time::{Duration, Instant};
pub mod graceful_shutdown;
pub mod leader_worker_barrier; pub mod leader_worker_barrier;
pub mod pool; pub mod pool;
pub mod stream; pub mod stream;
...@@ -22,3 +23,5 @@ pub mod task; ...@@ -22,3 +23,5 @@ pub mod task;
pub mod tasks; pub mod tasks;
pub mod typed_prefix_watcher; pub mod typed_prefix_watcher;
pub mod worker_monitor; pub mod worker_monitor;
pub use graceful_shutdown::GracefulShutdownTracker;
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::Notify;
/// Tracks graceful shutdown state for endpoints
pub struct GracefulShutdownTracker {
active_endpoints: AtomicUsize,
shutdown_complete: Notify,
}
impl std::fmt::Debug for GracefulShutdownTracker {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GracefulShutdownTracker")
.field(
"active_endpoints",
&self.active_endpoints.load(Ordering::SeqCst),
)
.finish()
}
}
impl GracefulShutdownTracker {
pub(crate) fn new() -> Self {
Self {
active_endpoints: AtomicUsize::new(0),
shutdown_complete: Notify::new(),
}
}
pub(crate) fn register_endpoint(&self) {
let count = self.active_endpoints.fetch_add(1, Ordering::SeqCst);
tracing::debug!(
"Endpoint registered, total active: {} -> {}",
count,
count + 1
);
}
pub(crate) fn unregister_endpoint(&self) {
let prev = self.active_endpoints.fetch_sub(1, Ordering::SeqCst);
tracing::debug!(
"Endpoint unregistered, remaining active: {} -> {}",
prev,
prev - 1
);
if prev == 1 {
// Last endpoint completed
tracing::info!("Last endpoint completed, notifying all waiters");
self.shutdown_complete.notify_waiters();
}
}
/// Get the current count of active endpoints
pub(crate) fn get_count(&self) -> usize {
self.active_endpoints.load(Ordering::Acquire)
}
pub(crate) async fn wait_for_completion(&self) {
loop {
// Create the waiter BEFORE checking the condition
let notified = self.shutdown_complete.notified();
let count = self.active_endpoints.load(Ordering::SeqCst);
tracing::trace!("Checking completion status, active endpoints: {}", count);
if count == 0 {
tracing::debug!("All endpoints completed");
break;
}
// Only wait if there are still active endpoints
tracing::debug!("Waiting for {} endpoints to complete", count);
notified.await;
tracing::trace!("Received notification, rechecking...");
}
}
// This method is no longer needed since we can access the tracker directly
}
...@@ -45,7 +45,7 @@ python components/planner/src/dynamo/planner/utils/perf_interpolation.py \ ...@@ -45,7 +45,7 @@ python components/planner/src/dynamo/planner/utils/perf_interpolation.py \
--profile_results_dir tests/planner/profiling_results/H200_TP1P_TP1D/ \ --profile_results_dir tests/planner/profiling_results/H200_TP1P_TP1D/ \
--isl 3000 \ --isl 3000 \
--osl 300 \ --osl 300 \
--ttft 0.1 \ --ttft 0.2 \
--itl 0.01 --itl 0.01
# output: # output:
...@@ -54,7 +54,7 @@ TTFT=0.1s, ITL=0.01s ...@@ -54,7 +54,7 @@ TTFT=0.1s, ITL=0.01s
Using profile results from tests/planner/profiling_results/H200_TP1P_TP1D/ Using profile results from tests/planner/profiling_results/H200_TP1P_TP1D/
Interpolating prefill performance ... Interpolating prefill performance ...
Estimated TTFT=0.060s <= target TTFT=0.100s. Requests can queue 0.040s maximally while meeting TTFT SLA. Estimated TTFT=0.060s <= target TTFT=0.200s. Requests can queue 0.140s maximally while meeting TTFT SLA.
Estimated throughput: 49481.09 tokens/s/gpu. Request rate at 16.49 requests/s will saturate one GPU. Estimated throughput: 49481.09 tokens/s/gpu. Request rate at 16.49 requests/s will saturate one GPU.
Interpolating decode performance ... Interpolating decode performance ...
...@@ -74,17 +74,17 @@ For TP1 H200 engine, planner should scale between 1P1D and 3P3D. ...@@ -74,17 +74,17 @@ For TP1 H200 engine, planner should scale between 1P1D and 3P3D.
```bash ```bash
python benchmarks/sin_load_generator/sin_synth.py \ python benchmarks/sin_load_generator/sin_synth.py \
--time-duration 1800 \ --time-duration 1800 \
--request-rate-min 12 \ --request-rate-min 5 \
--request-rate-max 36 \ --request-rate-max 45 \
--request-rate-period 600 \ --request-rate-period 600 \
--isl1 3000 \ --isl1 3000 \
--osl1 300 \ --osl1 300 \
--isl2 3000 \ --isl2 3000 \
--osl2 300 \ --osl2 300 \
--output-file rr-12-36_i3000o300.jsonl --output-file rr-5-45_i3000o300.jsonl
``` ```
The dataset starts at 12 requests/s, increases to 36 requests/s at t=300s, decreases back to 12 requests/s at t=600s, and repeats. The dataset starts at 5 requests/s, increases to 45 requests/s at t=300s, decreases back to 5 requests/s at t=600s, and repeats.
The total duration is 30 minutes or 1800 seconds. The total duration is 30 minutes or 1800 seconds.
## Planner Dry Run ## Planner Dry Run
...@@ -105,15 +105,15 @@ python components/planner/test/planner_sla_dryrun.py \ ...@@ -105,15 +105,15 @@ python components/planner/test/planner_sla_dryrun.py \
--output-plot <path_to_output_plot> --output-plot <path_to_output_plot>
``` ```
For example, to dry run SLA planner for the previous FP8 8B on H200 using the generated `rr-12-36_i3000o300.jsonl` dataset, For example, to dry run SLA planner for the previous FP8 8B on H200 using the generated `rr-5-45_i3000o300.jsonl` dataset,
```bash ```bash
python components/planner/test/planner_sla_dryrun.py \ python components/planner/test/planner_sla_dryrun.py \
--ttft 0.1 \ --ttft 0.2 \
--itl 0.01 \ --itl 0.01 \
--adjustment-interval 60 \ --adjustment-interval 60 \
--profile-results-dir tests/planner/profiling_results/H200_TP1P_TP1D/ \ --profile-results-dir tests/planner/profiling_results/H200_TP1P_TP1D/ \
--dataset rr-12-36_i3000o300.jsonl \ --dataset rr-5-45_i3000o300.jsonl \
--start-num-p 1 \ --start-num-p 1 \
--start-num-d 1 \ --start-num-d 1 \
--output-plot dryrun_plot.png --output-plot dryrun_plot.png
...@@ -139,8 +139,9 @@ This directory contains comprehensive tests for validating the SLA planner's sca ...@@ -139,8 +139,9 @@ This directory contains comprehensive tests for validating the SLA planner's sca
1. **Unit Tests** (`test_replica_calculation.py`) - Test the mathematical formulas for calculating prefill and decode replicas in isolation 1. **Unit Tests** (`test_replica_calculation.py`) - Test the mathematical formulas for calculating prefill and decode replicas in isolation
2. **End-to-End Tests** (`scaling/run_scaling_test.sh`) - Test complete workflow including Kubernetes deployment, load generation, and pod scaling validation 2. **End-to-End Tests** (`scaling/run_scaling_test.sh`) - Test complete workflow including Kubernetes deployment, load generation, and pod scaling validation
3. **End-to-End Perf Tests** (see instructions below) - Compare performance (goodput and goodput/GPU) on deployments with and without sla planner
### Quick Start ### Quick Start for Unit Tests and End-to-End Tests
#### Run Unit Tests Only #### Run Unit Tests Only
Test the replica calculation logic without requiring Kubernetes: Test the replica calculation logic without requiring Kubernetes:
...@@ -190,6 +191,59 @@ The main test scenario validates prefill scaling for H200 with 1P1D → 2P1D con ...@@ -190,6 +191,59 @@ The main test scenario validates prefill scaling for H200 with 1P1D → 2P1D con
- **Total test duration**: ~7 minutes + scaling observation - **Total test duration**: ~7 minutes + scaling observation
- **Smart cleanup**: Only removes deployment if test created it (preserves existing deployments) - **Smart cleanup**: Only removes deployment if test created it (preserves existing deployments)
### Instructions for End-to-End Perf Tests
In this test, we compare performance (goodput and goodput/GPU) on deployments on the following four deployments using the aforementioned 8b FP8 model on H200 and the dataset used in dryrun:
- Config 1 with inefficient P/D ratio: 3xTP1P_1xTP1D_4GPU
`./perf_test_configs/disagg_8b_3p1d.yaml`
- Config 2 with best static deployment: 2xTP1P_2xTP1D_4GPU
`./perf_test_configs/disagg_8b_2p2d.yaml`
- Config 3 with inefficient parallelization mapping: 1xTP2P_1xTP2D_4GPU
`./perf_test_configs/disagg_8b_tp2.yaml`
- Config 4 with sla planner: `./perf_test_configs/disagg_8b_planner.yaml`
To run the test on each configuration, first deploy the corresponding DynamoGraphDeployment by
```bash
kubectl apply -f ./perf_test_configs/<config_file_name> -n <namespace>
```
When running deployment with sla-planner, to reduce the image pulling time, deploy a `DaemonSet` to cache the image in advance:
```bash
kubectl apply -f ./perf_test_configs/image_cache_daemonset.yaml -n <namespace>
```
Then, port-forward or shell into the frontend pod and run GenAI-Perf to get the goodput:
```bash
genai-perf profile \
--model nvidia/Llama-3.1-8B-Instruct-FP8 \
--tokenizer nvidia/Llama-3.1-8B-Instruct-FP8 \
--endpoint-type chat \
--url localhost:8000 \ # or the port-forwarded port
--streaming \
--input-file payload:/workspace/rr-5-45_i3000o300.jsonl \ # path to the generated load dataset \
--fixed-schedule True \
--goodput time_to_first_token:200 inter_token_latency:10 \
-- -v -max-threads 64 \
```
> [!NOTE]
> Sometimes, when sla planner scales down the number of workers, a few requests will error out and cause GenAI-Perf to stuck. We are aware of this issue and are working on fixing it.
#### E2E Perf Test Results
![Results](./figures/sla_planner_perf.png)
The table below shows the performance improvement of SLA planner across different deployment configurations:
| Baseline | Goodput Improvement | Goodput/GPU Improvement |
|---------------|-----------------|-------------------------|
| Inefficient P/D ratio | 725% | 600% |
| Inefficient parallelization mapping | 311% | 249% |
| Best static deployment | 52% | 29% |`
### Prerequisites ### Prerequisites
**For Unit Tests:** **For Unit Tests:**
......
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: vllm-agg
spec:
services:
Frontend:
dynamoNamespace: vllm-agg
componentType: main
replicas: 1
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 20
periodSeconds: 5
timeoutSeconds: 5
failureThreshold: 3
readinessProbe:
exec:
command:
- /bin/sh
- -c
- 'curl -s http://localhost:8000/health | jq -e ".status == \"healthy\""'
initialDelaySeconds: 60
periodSeconds: 60
timeoutSeconds: 30
failureThreshold: 10
resources:
requests:
cpu: "16"
memory: "10Gi"
limits:
cpu: "128"
memory: "100Gi"
extraPodSpec:
mainContainer:
image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:hzhou-0825-01
workingDir: /workspace/components/backends/vllm
command:
- /bin/sh
- -c
args:
- "python3 -m dynamo.frontend --http-port 8000"
VllmDecodeWorker:
dynamoNamespace: vllm-agg
envFromSecret: hf-token-secret
componentType: worker
replicas: 1
livenessProbe:
httpGet:
path: /live
port: 9090
periodSeconds: 5
timeoutSeconds: 30
failureThreshold: 1
readinessProbe:
httpGet:
path: /health
port: 9090
periodSeconds: 10
timeoutSeconds: 30
failureThreshold: 60
resources:
requests:
cpu: "16"
memory: "20Gi"
gpu: "1"
limits:
cpu: "128"
memory: "100Gi"
gpu: "1"
envs:
- name: DYN_SYSTEM_ENABLED
value: "true"
- name: DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS
value: "[\"generate\"]"
- name: DYN_SYSTEM_PORT
value: "9090"
extraPodSpec:
mainContainer:
startupProbe:
httpGet:
path: /health
port: 9090
periodSeconds: 10
failureThreshold: 60
image: nvcr.io/nvidian/nim-llm-dev/vllm-runtime:hzhou-0825-01
workingDir: /workspace/components/backends/vllm
command:
- /bin/sh
- -c
args:
- "python3 -m dynamo.vllm --model nvidia/Llama-3.1-8B-Instruct-FP8 --no-enable-prefix-caching 2>&1 | tee /tmp/vllm.log"
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment