Unverified Commit eac6a5fc authored by Michael Feil's avatar Michael Feil Committed by GitHub
Browse files

feat: router, metric only add queue isl tokens (#8136)


Signed-off-by: default avatarmichaelfeil <63565275+michaelfeil@users.noreply.github.com>
parent 9cf9ef18
......@@ -243,6 +243,10 @@ where
self.queue.pending_count()
}
pub fn pending_isl_tokens(&self) -> usize {
self.queue.pending_isl_tokens()
}
pub fn worker_type(&self) -> &'static str {
self.worker_type
}
......
......@@ -60,6 +60,9 @@ pub struct SchedulerQueue<
/// Number of requests currently parked in the pending queue.
/// Incremented after push, decremented after pop. Lock-free reads via `Relaxed` load.
pending_count: AtomicUsize,
/// Sum of `isl_tokens` for requests currently parked in the pending queue.
/// Incremented after push, decremented after pop. Lock-free reads via `Relaxed` load.
pending_isl_tokens: AtomicUsize,
slots: Arc<ActiveSequencesMultiWorker<P>>,
workers_with_configs: watch::Receiver<HashMap<WorkerId, C>>,
/// Cached threshold fraction; None means queueing is disabled.
......@@ -94,6 +97,7 @@ impl<
Self {
pending: Mutex::new(BinaryHeap::new()),
pending_count: AtomicUsize::new(0),
pending_isl_tokens: AtomicUsize::new(0),
slots,
workers_with_configs,
threshold_frac,
......@@ -151,8 +155,11 @@ impl<
tracing::debug!("all workers busy, queueing request");
let arrival_offset = self.start_time.elapsed();
let key = self.policy.enqueue_key(arrival_offset, &request);
let isl_tokens = request.isl_tokens;
self.pending.lock().await.push(QueueEntry { key, request });
self.pending_count.fetch_add(1, AtomicOrdering::Relaxed);
self.pending_isl_tokens
.fetch_add(isl_tokens, AtomicOrdering::Relaxed);
} else {
self.schedule(request, decay_now).await;
}
......@@ -189,6 +196,8 @@ impl<
break;
};
self.pending_count.fetch_sub(1, AtomicOrdering::Relaxed);
self.pending_isl_tokens
.fetch_sub(entry.request.isl_tokens, AtomicOrdering::Relaxed);
tracing::debug!("scheduling request from pending queue");
self.schedule(entry.request, decay_now).await;
}
......@@ -303,6 +312,11 @@ impl<
self.pending_count.load(AtomicOrdering::Relaxed)
}
/// Sum of `isl_tokens` for requests currently parked in the pending queue (lock-free).
pub fn pending_isl_tokens(&self) -> usize {
self.pending_isl_tokens.load(AtomicOrdering::Relaxed)
}
/// Check if all eligible workers are busy based on threshold.
/// When `allowed` is `Some`, only those worker IDs are considered;
/// otherwise all registered workers are checked.
......
......@@ -141,6 +141,7 @@ pub fn register_worker_load_metrics(
/// disaggregated mode. At most 2 label combinations.
pub struct RouterQueueMetrics {
pub pending_requests: IntGaugeVec,
pub pending_isl_tokens: IntGaugeVec,
}
pub static ROUTER_QUEUE_METRICS: LazyLock<RouterQueueMetrics> =
......@@ -157,6 +158,14 @@ pub static ROUTER_QUEUE_METRICS: LazyLock<RouterQueueMetrics> =
&[labels::WORKER_TYPE],
)
.expect("Failed to create router_queue_pending_requests gauge"),
pending_isl_tokens: IntGaugeVec::new(
Opts::new(
format!("{}_router_queue_pending_isl_tokens", name_prefix::FRONTEND),
"Sum of isl_tokens for requests pending in the router scheduler queue",
),
&[labels::WORKER_TYPE],
)
.expect("Failed to create router_queue_pending_isl_tokens gauge"),
});
impl RouterQueueMetrics {
......@@ -165,6 +174,12 @@ impl RouterQueueMetrics {
.with_label_values(&[worker_type])
.set(count as i64);
}
pub fn set_pending_isl_tokens(&self, worker_type: &str, tokens: usize) {
self.pending_isl_tokens
.with_label_values(&[worker_type])
.set(tokens as i64);
}
}
/// Register the router queue gauge with the given Prometheus registry.
......@@ -174,6 +189,7 @@ pub fn register_router_queue_metrics(
) -> Result<(), prometheus::Error> {
let m = &*ROUTER_QUEUE_METRICS;
registry.register(Box::new(m.pending_requests.clone()))?;
registry.register(Box::new(m.pending_isl_tokens.clone()))?;
Ok(())
}
......@@ -545,15 +561,30 @@ dynamo_frontend_worker_active_prefill_tokens{dp_rank=\"0\",worker_id=\"123\",wor
&[labels::WORKER_TYPE],
)
.unwrap(),
pending_isl_tokens: IntGaugeVec::new(
Opts::new(
format!("{}_router_queue_pending_isl_tokens", name_prefix::FRONTEND),
"Sum of isl_tokens for requests pending in the router scheduler queue",
),
&[labels::WORKER_TYPE],
)
.unwrap(),
};
registry
.register(Box::new(metrics.pending_requests.clone()))
.unwrap();
registry
.register(Box::new(metrics.pending_isl_tokens.clone()))
.unwrap();
metrics.set_pending("decode", 5);
metrics.set_pending_isl_tokens("decode", 1024);
let output = gather_pef(&registry);
let expected = "\
# HELP dynamo_frontend_router_queue_pending_isl_tokens Sum of isl_tokens for requests pending in the router scheduler queue
# TYPE dynamo_frontend_router_queue_pending_isl_tokens gauge
dynamo_frontend_router_queue_pending_isl_tokens{worker_type=\"decode\"} 1024
# HELP dynamo_frontend_router_queue_pending_requests Number of requests pending in the router scheduler queue
# TYPE dynamo_frontend_router_queue_pending_requests gauge
dynamo_frontend_router_queue_pending_requests{worker_type=\"decode\"} 5
......
......@@ -97,6 +97,8 @@ where
tokio::spawn(async move {
let mut recheck_interval = tokio::time::interval(Duration::from_secs(60));
ROUTER_QUEUE_METRICS.set_pending(worker_type, metrics_scheduler.pending_count());
ROUTER_QUEUE_METRICS
.set_pending_isl_tokens(worker_type, metrics_scheduler.pending_isl_tokens());
loop {
tokio::select! {
......@@ -109,8 +111,11 @@ where
.set_pending(worker_type, metrics_scheduler.pending_count());
}
_ = recheck_interval.tick() => {
ROUTER_QUEUE_METRICS
.set_pending(worker_type, metrics_scheduler.pending_count());
ROUTER_QUEUE_METRICS.set_pending(worker_type, metrics_scheduler.pending_count());
ROUTER_QUEUE_METRICS.set_pending_isl_tokens(
worker_type,
metrics_scheduler.pending_isl_tokens(),
);
}
}
}
......@@ -149,6 +154,7 @@ where
)
.await;
ROUTER_QUEUE_METRICS.set_pending(self.worker_type(), self.pending_count());
ROUTER_QUEUE_METRICS.set_pending_isl_tokens(self.worker_type(), self.pending_isl_tokens());
response
}
......@@ -163,12 +169,14 @@ where
pub async fn mark_prefill_completed(&self, request_id: &str) -> Result<(), SequenceError> {
self.inner.mark_prefill_completed(request_id).await?;
ROUTER_QUEUE_METRICS.set_pending(self.worker_type(), self.pending_count());
ROUTER_QUEUE_METRICS.set_pending_isl_tokens(self.worker_type(), self.pending_isl_tokens());
Ok(())
}
pub async fn free(&self, request_id: &str) -> Result<(), SequenceError> {
self.inner.free(request_id).await?;
ROUTER_QUEUE_METRICS.set_pending(self.worker_type(), self.pending_count());
ROUTER_QUEUE_METRICS.set_pending_isl_tokens(self.worker_type(), self.pending_isl_tokens());
Ok(())
}
......@@ -176,6 +184,10 @@ where
self.inner.pending_count()
}
pub fn pending_isl_tokens(&self) -> usize {
self.inner.pending_isl_tokens()
}
pub fn worker_type(&self) -> &'static str {
self.inner.worker_type()
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment