Unverified Commit c0c664a9 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

chore: better error handling in Router slot manager (#4496)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 55c66a63
......@@ -46,6 +46,7 @@ use crate::{
LocalBlockHash, RouterRequest, RouterResponse, WorkerSelectionResult, WorkerWithDpRank,
},
scheduler::{KvScheduler, KvSchedulerError, PotentialLoad, SchedulingRequest},
sequence::SequenceError,
subscriber::start_kv_router_background,
},
local_model::runtime_config::ModelRuntimeConfig,
......@@ -395,22 +396,26 @@ impl KvRouter {
compute_seq_hash_for_block(&block_hashes)
});
self.scheduler
if let Err(e) = self
.scheduler
.add_request(
request_id,
request_id.clone(),
maybe_seq_hashes,
isl_tokens,
overlap_blocks,
worker,
)
.await;
.await
{
tracing::warn!("Failed to add request {request_id}: {e}");
}
}
pub async fn mark_prefill_completed(&self, request_id: &str) -> Result<()> {
pub async fn mark_prefill_completed(&self, request_id: &str) -> Result<(), SequenceError> {
self.scheduler.mark_prefill_completed(request_id).await
}
pub async fn free(&self, request_id: &str) -> Result<()> {
pub async fn free(&self, request_id: &str) -> Result<(), SequenceError> {
self.scheduler.free(request_id).await
}
......@@ -629,7 +634,7 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
if !prefill_marked {
if let Err(e) = chooser.mark_prefill_completed(&context_id).await {
tracing::warn!("Failed to mark prefill completed for request {context_id}: {e:?}");
tracing::warn!("Failed to mark prefill completed for request {context_id}: {e}");
}
prefill_marked = true;
}
......@@ -660,7 +665,7 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
}
if let Err(e) = chooser.free(&context_id).await {
tracing::warn!("Failed to free request {context_id}: {e:?}");
tracing::warn!("Failed to free request {context_id}: {e}");
}
});
Ok(ResponseStream::new(wrapped_stream, stream_context))
......
......@@ -19,7 +19,7 @@ use super::RouterConfigOverride;
use super::WorkerSelector;
use super::indexer::OverlapScores;
use super::protocols::{DpRank, WorkerId, WorkerSelectionResult, WorkerWithDpRank};
use super::sequence::ActiveSequencesMultiWorker;
use super::sequence::{ActiveSequencesMultiWorker, SequenceError};
use crate::tokens::SequenceHash;
......@@ -263,9 +263,7 @@ impl KvScheduler {
)
.await
{
tracing::warn!(
"Failed to add request {request_id} to local slot tracker: {e:?}"
);
tracing::warn!("Failed to add request {request_id}: {e}");
}
}
Err(KvSchedulerError::NoEndpoints) => {
......@@ -332,20 +330,19 @@ impl KvScheduler {
isl: usize,
overlap: u32,
worker: WorkerWithDpRank,
) {
let _ = self
.slots
) -> Result<(), SequenceError> {
self.slots
.add_request(request_id, token_sequence, isl, overlap, worker)
.await;
.await
}
pub async fn mark_prefill_completed(&self, request_id: &str) -> Result<()> {
pub async fn mark_prefill_completed(&self, request_id: &str) -> Result<(), SequenceError> {
self.slots
.mark_prefill_completed(&request_id.to_string())
.await
}
pub async fn free(&self, request_id: &str) -> Result<()> {
pub async fn free(&self, request_id: &str) -> Result<(), SequenceError> {
self.slots.free(&request_id.to_string()).await
}
......
......@@ -43,6 +43,28 @@ use crate::kv_router::ACTIVE_SEQUENCES_SUBJECT;
use crate::local_model::runtime_config::ModelRuntimeConfig;
use dynamo_runtime::CancellationToken;
/// Errors that can occur during sequence management operations
#[derive(Debug, thiserror::Error)]
pub enum SequenceError {
#[error("Worker {worker:?} not found")]
WorkerNotFound { worker: WorkerWithDpRank },
#[error("Request {request_id} already exists (assigned to worker {worker:?})")]
DuplicateRequest {
request_id: String,
worker: WorkerWithDpRank,
},
#[error("Request {request_id} not found")]
RequestNotFound { request_id: String },
#[error("Failed to publish event: {0}")]
PublishFailed(#[from] anyhow::Error),
#[error("Failed to send command to worker: channel closed")]
WorkerChannelClosed,
}
/// Duration after which stale requests are forcibly expired (5 minutes)
const EXPIRY_DURATION: Duration = Duration::from_secs(300);
......@@ -620,9 +642,18 @@ impl ActiveSequencesMultiWorker {
isl: usize,
overlap: u32,
worker: WorkerWithDpRank,
) -> Result<()> {
) -> Result<(), SequenceError> {
// Check for worker existence
if !self.senders.contains_key(&worker) {
return Err(anyhow::anyhow!("Worker {:?} not found", worker));
return Err(SequenceError::WorkerNotFound { worker });
}
// Check for duplicate request
if let Some(existing_worker) = self.request_to_worker.get(&request_id) {
return Err(SequenceError::DuplicateRequest {
request_id,
worker: *existing_worker,
});
}
// Create response channel
......@@ -658,12 +689,12 @@ impl ActiveSequencesMultiWorker {
overlap,
resp_tx,
})
.map_err(|_| anyhow::anyhow!("Failed to send add_request command to worker"))?;
.map_err(|_| SequenceError::WorkerChannelClosed)?;
// Wait for response and handle removed requests
let removed_requests = resp_rx
.await
.map_err(|_| anyhow::anyhow!("Failed to receive response from worker"))?;
.map_err(|_| SequenceError::WorkerChannelClosed)?;
// Remove expired requests from request_to_worker mapping
for expired_id in &removed_requests {
......@@ -673,12 +704,21 @@ impl ActiveSequencesMultiWorker {
Ok(())
}
pub async fn free(&self, request_id: &RequestId) -> Result<()> {
let worker = self
.request_to_worker
.get(request_id)
.map(|entry| *entry)
.ok_or_else(|| anyhow::anyhow!("Request ID not found in request_to_worker mapping"))?;
/// Free all blocks associated with a request
///
/// Note: This operation is idempotent. Calling it multiple times for the same request
/// will log a warning but not return an error (double free is allowed).
pub async fn free(&self, request_id: &RequestId) -> Result<(), SequenceError> {
// Check if request exists - if not, it's already been freed (idempotent)
let Some(worker) = self.request_to_worker.get(request_id).map(|entry| *entry) else {
tracing::debug!("Request {request_id} not found, already freed (idempotent)");
return Ok(());
};
// Verify worker still exists
if !self.senders.contains_key(&worker) {
return Err(SequenceError::WorkerNotFound { worker });
}
// Publish event only if replica_sync is enabled
if self.replica_sync {
......@@ -700,7 +740,7 @@ impl ActiveSequencesMultiWorker {
.send(UpdateSequences::Free {
request_id: request_id.clone(),
})
.map_err(|_| anyhow::anyhow!("Failed to send free command to worker"))?;
.map_err(|_| SequenceError::WorkerChannelClosed)?;
self.request_to_worker.remove(request_id);
......@@ -708,12 +748,25 @@ impl ActiveSequencesMultiWorker {
}
/// Mark prefill as completed for a request
pub async fn mark_prefill_completed(&self, request_id: &RequestId) -> Result<()> {
///
/// Note: Calling this multiple times for the same request is allowed and will be a no-op
/// after the first call (idempotent).
pub async fn mark_prefill_completed(
&self,
request_id: &RequestId,
) -> Result<(), SequenceError> {
let worker = self
.request_to_worker
.get(request_id)
.map(|entry| *entry)
.ok_or_else(|| anyhow::anyhow!("Request ID not found in request_to_worker mapping"))?;
.ok_or_else(|| SequenceError::RequestNotFound {
request_id: request_id.clone(),
})?;
// Verify worker still exists
if !self.senders.contains_key(&worker) {
return Err(SequenceError::WorkerNotFound { worker });
}
// Publish event only if replica_sync is enabled
if self.replica_sync {
......@@ -735,9 +788,7 @@ impl ActiveSequencesMultiWorker {
.send(UpdateSequences::MarkPrefillCompleted {
request_id: request_id.clone(),
})
.map_err(|_| {
anyhow::anyhow!("Failed to send mark_prefill_completed command to worker")
})?;
.map_err(|_| SequenceError::WorkerChannelClosed)?;
Ok(())
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment