Unverified Commit 7fe89c74 authored by Kris Hung's avatar Kris Hung Committed by GitHub
Browse files

fix: Fix race condition in TP>1 when ImmediateTransferResult arrives before CreateSlot (#5393)

parent 3ee98925
......@@ -133,13 +133,23 @@ impl std::fmt::Debug for CachedRequestData {
}
}
/// Information about a new slot to be created on the worker.
///
/// One entry is appended to `ConnectorMetadata::new_slots` for every call to
/// `ConnectorMetadata::create_slot`. On the worker side each entry's fields are
/// forwarded to `create_slot_with_immediate_ops`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewSlotInfo {
/// The request ID for the new slot.
pub request_id: String,
/// Expected number of immediate (onboard) operations for this slot.
/// This enables proper completion tracking and avoids race conditions in TP>1.
///
/// The leader derives this value by counting the slot's pending operations whose
/// `request_type` is `RequestType::Immediate`; it is 0 when the slot has no
/// pending operations.
pub expected_immediate_ops: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectorMetadata {
/// The iteration at which the metadata was built.
pub iteration: u64,
/// The new slots that were created in this iteration.
pub new_slots: Vec<String>,
pub new_slots: Vec<NewSlotInfo>,
/// The operations that were initialized in this iteration.
pub operations: Vec<WorkerTransferRequest>,
......@@ -154,8 +164,12 @@ impl ConnectorMetadata {
}
}
pub fn create_slot(&mut self, request_id: String) {
self.new_slots.push(request_id);
/// Create a slot with the expected number of immediate operations.
pub fn create_slot(&mut self, request_id: String, expected_immediate_ops: u64) {
self.new_slots.push(NewSlotInfo {
request_id,
expected_immediate_ops,
});
}
pub fn add_operations(&mut self, xfer_reqs: Vec<WorkerTransferRequest>) {
......
......@@ -21,7 +21,7 @@ use dynamo_llm::block_manager::{
data::logical::distributed_leader_worker::DistributedLeaderWorkerResources,
locality::Logical,
},
connector::*,
connector::{*, protocol::RequestType},
kv_consolidator::EventSource,
};
use dynamo_llm::tokens::{SaltHash, TokenBlockSequence, Tokens};
......@@ -348,17 +348,32 @@ impl Leader for KvConnectorLeader {
//
// This is kind of a nice abstraction as it keeps the events simpler; however, we now create the request-slot
// once for onboarding (this loop), then again for prefill/decode (new_requests loop).
//
// TODO(krish): Consider a more deterministic way to count immediate ops.
// Currently we count by filtering pending_ops at runtime. A higher-level approach
// (e.g., tracking count when onboard_blocks is called, or deriving from architecture
// config) might be more robust against potential timing-related issues.
for request_id in onboarding_slots.iter() {
let shared_slot = self.slot_manager().get_slot(request_id)?;
let mut slot = shared_slot
.lock()
.map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
md.create_slot(request_id.clone());
let pending_ops_opt = slot.take_pending_operations();
if let Some(pending_ops) = slot.take_pending_operations() {
tracing::debug!("adding {} pending onboarding operations", pending_ops.len());
if let Some(pending_ops) = pending_ops_opt {
// Count immediate (onboard) operations for this slot
let num_immediate = pending_ops
.iter()
.filter(|op| op.request_type == RequestType::Immediate)
.count() as u64;
// Create slot with expected immediate ops BEFORE adding operations
md.create_slot(request_id.clone(), num_immediate);
md.add_operations(pending_ops);
} else {
// No operations, create slot with 0 expected immediate ops
md.create_slot(request_id.clone(), 0);
}
assert!(
......@@ -373,6 +388,19 @@ impl Leader for KvConnectorLeader {
// todo: update the code and abstraction to account for this two-phase lifecycle.
for new_req in &scheduler_output.new_requests {
let request_id = &new_req.request_id;
let already_created = md.new_slots.iter().any(|s| &s.request_id == request_id);
// Skip if this slot was already created in the onboarding_slots loop above.
// This prevents overwriting the slot with expected_immediate_ops=0 when it should have the correct count.
if already_created {
assert!(
inflight_requests.remove(request_id),
"request_id {request_id} not found in inflight_requests: "
);
continue;
}
assert!(
inflight_requests.remove(request_id),
"request_id {request_id} not found in inflight_requests: "
......@@ -383,9 +411,6 @@ impl Leader for KvConnectorLeader {
.lock()
.map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
// inform the worker that a new request-slot should be created
md.create_slot(new_req.request_id.clone());
slot.record_start_iteration(iteration)?;
debug_assert!(
......@@ -404,13 +429,21 @@ impl Leader for KvConnectorLeader {
slot.apply_scheduler_output(&[], &[], new_req.num_computed_tokens, scheduled_tokens)?;
if let Some(pending_ops) = slot.take_pending_operations() {
tracing::debug!(
"adding {} pending operations for slot {}",
pending_ops.len(),
new_req.request_id
);
let pending_ops_opt = slot.take_pending_operations();
if let Some(pending_ops) = pending_ops_opt {
// Count immediate (onboard) operations for this slot
let num_immediate = pending_ops
.iter()
.filter(|op| op.request_type == RequestType::Immediate)
.count() as u64;
// Create slot with expected immediate ops BEFORE adding operations
md.create_slot(new_req.request_id.clone(), num_immediate);
md.add_operations(pending_ops);
} else {
// No operations, create slot with 0 expected immediate ops
md.create_slot(new_req.request_id.clone(), 0);
}
}
......
......@@ -4,6 +4,7 @@
use super::*;
use crate::block_manager::BlockManagerBuilder;
use dynamo_llm::block_manager::connector::protocol::RequestType;
use dynamo_llm::block_manager::kv_consolidator::EventSource;
use crate::block_manager::vllm::connector::leader::slot::{
ConnectorSlotManager, SlotManager, SlotState,
......@@ -310,23 +311,50 @@ impl Leader for KvConnectorLeader {
//
// This is kind of a nice abstraction as it keeps the events simpler; however, we now create the request-slot
// once for onboarding (this loop), then again for prefill/decode (new_requests loop).
//
// TODO(krish): Consider a more deterministic way to count immediate ops.
// Currently we count by filtering pending_ops at runtime. A higher-level approach
// (e.g., tracking count when onboard_blocks is called, or deriving from architecture
// config) might be more robust against potential timing-related issues.
for request_id in onboarding_slots.iter() {
let shared_slot = self.slot_manager().get_slot(request_id)?;
let mut slot = shared_slot
.lock()
.map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
md.create_slot(request_id.clone());
let pending_ops_opt = slot.take_pending_operations();
if let Some(pending_ops) = slot.take_pending_operations() {
tracing::debug!("adding {} pending onboarding operations", pending_ops.len());
if let Some(pending_ops) = pending_ops_opt {
// Count immediate (onboard) operations for this slot
let num_immediate = pending_ops
.iter()
.filter(|op| op.request_type == RequestType::Immediate)
.count() as u64;
// Create slot with expected immediate ops BEFORE adding operations
md.create_slot(request_id.clone(), num_immediate);
md.add_operations(pending_ops);
} else {
md.create_slot(request_id.clone(), 0);
}
}
// todo: update the code and abstraction to account for this two-phase lifecycle.
for new_req in &scheduler_output.new_requests {
let request_id = &new_req.request_id;
let already_created = md.new_slots.iter().any(|s| &s.request_id == request_id);
// Skip if this slot was already created in the onboarding_slots loop above.
// This prevents overwriting the slot with expected_immediate_ops=0 when it should have the correct count.
if already_created {
assert!(
inflight_requests.remove(request_id),
"request_id {request_id} not found in inflight_requests: "
);
continue;
}
assert!(
inflight_requests.remove(request_id),
"request_id {request_id} not found in inflight_requests: "
......@@ -337,9 +365,6 @@ impl Leader for KvConnectorLeader {
.lock()
.map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
// inform the worker that a new request-slot should be created
md.create_slot(new_req.request_id.clone());
slot.record_start_iteration(iteration)?;
debug_assert!(
......@@ -363,13 +388,27 @@ impl Leader for KvConnectorLeader {
scheduled_tokens,
)?;
if let Some(pending_ops) = slot.take_pending_operations() {
let pending_ops_opt = slot.take_pending_operations();
if let Some(pending_ops) = pending_ops_opt {
// Count immediate (onboard) operations for this slot
let num_immediate = pending_ops
.iter()
.filter(|op| op.request_type == RequestType::Immediate)
.count() as u64;
// Create slot with expected immediate ops BEFORE adding operations
md.create_slot(new_req.request_id.clone(), num_immediate);
tracing::debug!(
"adding {} pending operations for slot {}",
"adding {} pending operations for slot {} ({} immediate)",
pending_ops.len(),
new_req.request_id
new_req.request_id,
num_immediate
);
md.add_operations(pending_ops);
} else {
md.create_slot(new_req.request_id.clone(), 0);
}
}
......
......@@ -187,9 +187,17 @@ impl Worker for KvConnectorWorker {
// - for each action in the metadata, add the action to the request slot
// - send the list of actions to the engine to track completion
for slot in metadata.new_slots {
debug_assert!(!self.connector.has_slot(&slot), "slot already exists");
self.connector.create_slot(slot)?;
for slot_info in metadata.new_slots {
debug_assert!(
!self.connector.has_slot(&slot_info.request_id),
"slot already exists"
);
// Create slot with expected immediate ops count BEFORE any operations arrive.
// This ensures proper completion tracking and avoids race conditions in TP>1.
self.connector.create_slot_with_immediate_ops(
slot_info.request_id,
slot_info.expected_immediate_ops,
)?;
}
let mut onboarding_operations = Vec::new();
......
......@@ -251,9 +251,17 @@ impl Worker for KvConnectorWorker {
// - for each action in the metadata, add the action to the request slot
// - send the list of actions to the engine to track completion
for slot in metadata.new_slots {
debug_assert!(!self.connector.has_slot(&slot), "slot already exists");
self.connector.create_slot(slot)?;
for slot_info in &metadata.new_slots {
debug_assert!(
!self.connector.has_slot(&slot_info.request_id),
"slot already exists"
);
// Create slot with expected immediate ops count BEFORE any operations arrive.
// This ensures proper completion tracking and avoids race conditions in TP>1.
self.connector.create_slot_with_immediate_ops(
slot_info.request_id.clone(),
slot_info.expected_immediate_ops,
)?;
}
let mut onboarding_operations = Vec::new();
......
......@@ -152,10 +152,22 @@ pub struct WorkerSchedulerClientSlot {
}
impl WorkerSchedulerClientSlot {
fn make_scheduler_slot_request(&self, request_id: String) -> SchedulerCreateSlotDetails {
/// Builds an empty worker-side slot: no recorded operations and a fresh
/// completion counter starting at zero.
fn new() -> Self {
    let operations = Vec::new();
    let completed = Arc::new(AtomicU64::new(0));
    Self {
        operations,
        completed,
    }
}
fn make_scheduler_slot_request(
&self,
request_id: String,
expected_immediate_ops: u64,
) -> SchedulerCreateSlotDetails {
SchedulerCreateSlotDetails {
request_id,
completed: self.completed.clone(),
expected_immediate_ops,
}
}
......@@ -165,14 +177,20 @@ impl WorkerSchedulerClientSlot {
}
impl WorkerSchedulerClient {
pub fn create_slot(&mut self, request_id: String) -> Result<(), SchedulerError> {
// create a request slot with the child token
// this will be the local worker slot
let slot = WorkerSchedulerClientSlot::default();
let request = slot.make_scheduler_slot_request(request_id.clone());
/// Create a slot with the expected number of immediate (onboard) operations.
/// This count is used to properly track completion and must match the number of
/// ImmediateTransferResult messages that will be received.
pub fn create_slot_with_immediate_ops(
&mut self,
request_id: String,
expected_immediate_ops: u64,
) -> Result<(), SchedulerError> {
// create a request slot
let slot = WorkerSchedulerClientSlot::new();
let request = slot.make_scheduler_slot_request(request_id.clone(), expected_immediate_ops);
// insert the slot into the local worker slots map
self.slots.insert(request_id, slot);
self.slots.insert(request_id.clone(), slot);
// send a request to insert the slot into the engine state
self.scheduler_tx
......@@ -181,6 +199,11 @@ impl WorkerSchedulerClient {
Ok(())
}
/// Creates a slot that expects zero immediate operations.
///
/// Kept for backward compatibility; it delegates to
/// `create_slot_with_immediate_ops` with an expected count of 0.
pub fn create_slot(&mut self, request_id: String) -> Result<(), SchedulerError> {
    let expected_immediate_ops = 0;
    self.create_slot_with_immediate_ops(request_id, expected_immediate_ops)
}
pub fn remove_slot(&mut self, request_id: &String) {
let slot = self.slots.remove(request_id).expect("slot does not exist");
assert!(slot.is_complete());
......@@ -222,11 +245,8 @@ impl WorkerSchedulerClient {
pub fn is_complete(&self, request_id: &str) -> bool {
match self.slots.get(request_id) {
Some(slot) => slot.completed.load(Ordering::Relaxed) == slot.operations.len() as u64,
None => {
tracing::debug!(request_id, "slot not found - likely aborted");
true
}
Some(slot) => slot.is_complete(),
None => true,
}
}
......@@ -382,17 +402,38 @@ impl Scheduler {
#[tracing::instrument(level = "debug", skip_all, fields(request_id = %req.request_id))]
fn add_slot(&mut self, req: SchedulerCreateSlotDetails) {
let request_id = req.request_id.clone();
debug_assert!(!self.slots.contains_key(&request_id), "slot already exists");
tracing::debug!("engine state adding slot");
let slot = SchedulerSlot::new(req);
if let Some(unprocessed_results) = self.unprocessed_immediate_results.remove(&request_id) {
tracing::debug!(
"found {} unprocessed immediate results; adding to slot",
unprocessed_results.len()
// In TP>1, multiple workers send CreateSlot for the same request_id.
// ImmediateTransferResults can arrive before ANY worker's slot is created.
//
// We need to apply the buffered count to EVERY worker's slot, not just the first one.
// Use `get` instead of `remove` to keep the buffered results available for all workers.
// The buffered results will be cleared when the request is removed (finished).
let slot = SchedulerSlot {
completed: req.completed,
};
// Check for buffered ImmediateTransferResults that arrived before the slot was created.
// Apply buffered count to this worker's slot.
if let Some(buffered_results) = self.unprocessed_immediate_results.get(&request_id) {
let num_buffered = buffered_results.len() as u64;
// Sanity check: buffered results should never exceed expected count.
// If this happens, there's a mismatch between leader's count and actual results.
debug_assert!(
num_buffered <= req.expected_immediate_ops,
"buffered results ({}) exceed expected immediate ops ({})",
num_buffered,
req.expected_immediate_ops
);
slot.completed
.fetch_add(unprocessed_results.len() as u64, Ordering::Relaxed);
// Use num_buffered (not expected_immediate_ops) because we only mark operations
// as complete that have actually completed. Remaining results will arrive later
// via handle_immediate_result() and increment the counter then.
slot.completed.fetch_add(num_buffered, Ordering::Relaxed);
}
self.slots.insert(request_id, slot);
}
......@@ -407,11 +448,9 @@ impl Scheduler {
"any scheduled request should be removed and enqueued/scheduled before the slot is removed"
);
let maybe_unprocessed_results = self.unprocessed_immediate_results.remove(&request_id);
debug_assert!(
maybe_unprocessed_results.is_none() || maybe_unprocessed_results.unwrap().is_empty(),
"any unprocessed immediate results should be removed before the slot is removed"
);
// In TP>1, buffered results are NOT removed in add_slot (they're applied to ALL workers).
// Clean them up here when the request is finished.
self.unprocessed_immediate_results.remove(&request_id);
tracing::debug!(
request_id,
......@@ -651,20 +690,14 @@ impl ScheduledTaskAsyncResult {
/// Payload describing a slot that should be registered with the scheduler.
///
/// Built by `WorkerSchedulerClientSlot::make_scheduler_slot_request` and consumed
/// by `Scheduler::add_slot`.
pub struct SchedulerCreateSlotDetails {
/// Identifier of the request that owns the slot; `Scheduler::add_slot` uses it
/// as the key when inserting into the scheduler's slot map.
pub request_id: String,
/// Completion counter shared with the worker-side slot (a clone of the same
/// `Arc`). `Scheduler::add_slot` adds the number of already-buffered immediate
/// results to it.
pub completed: Arc<AtomicU64>,
/// Expected number of immediate (onboard) operations for this slot.
/// In `Scheduler::add_slot` it is only checked in a debug assertion that the
/// number of buffered results does not exceed it.
pub expected_immediate_ops: u64,
}
/// Scheduler-side record kept for each registered request slot.
pub struct SchedulerSlot {
/// Completion counter taken from `SchedulerCreateSlotDetails::completed`, so it
/// refers to the same atomic as the worker-side slot that issued the request.
completed: Arc<AtomicU64>,
}
impl SchedulerSlot {
fn new(req: SchedulerCreateSlotDetails) -> Self {
Self {
completed: req.completed,
}
}
}
/// Scheduler interface exposing a per-iteration hook.
pub trait TaskScheduler {
/// Presumably notifies the scheduler that `iteration` is starting; fails with a
/// `SchedulerError`.
/// NOTE(review): no implementor is visible here — confirm the exact contract.
fn start_iteration(&mut self, iteration: u64) -> Result<(), SchedulerError>;
}
......@@ -731,15 +764,19 @@ mod tests {
scheduler.step().await;
assert_eq!(scheduler.unprocessed_immediate_results.len(), 1);
// the request is completed
worker_client.create_slot("test".to_string()).unwrap();
// the request is completed - create slot with expected_immediate_ops=1
worker_client
.create_slot_with_immediate_ops("test".to_string(), 1)
.unwrap();
assert!(!scheduler.slots.contains_key("test"));
scheduler.step().await;
assert!(scheduler.slots.contains_key("test"));
// the unprocessed results should now be processed
assert_eq!(scheduler.unprocessed_immediate_results.len(), 0);
// Buffered results are not removed in add_slot() - cleanup happens in remove_slot()
// when the request finishes. This ensures all workers in TP>1 can have the buffered
// count applied. The buffered count has already been applied to the slot's completed counter.
assert_eq!(scheduler.unprocessed_immediate_results.len(), 1);
// neither the worker nor the scheduler should have observed the completion yet
// this is because the worker has not yet requested it
......@@ -764,6 +801,26 @@ mod tests {
// the worker has not issued any operations yet
assert_eq!(worker_client.slots.get("test").unwrap().operations.len(), 0);
// enqueue the operation so is_complete() will return true (completed=1, operations.len()=1)
let worker_request = WorkerTransferRequest {
request_id: "test".to_string(),
uuid: operation_id,
transfer_type: TransferType::Load,
request_type: RequestType::Immediate,
};
worker_client.enqueue_request(worker_request);
assert_eq!(worker_client.slots.get("test").unwrap().operations.len(), 1);
assert!(worker_client.is_complete("test"));
// verify that remove_slot() cleans up the buffered results
assert_eq!(scheduler.unprocessed_immediate_results.len(), 1);
worker_client.remove_slot(&"test".to_string());
scheduler.step().await;
// after remove_slot(), the buffered results should be cleaned up
assert_eq!(scheduler.unprocessed_immediate_results.len(), 0);
assert!(!scheduler.slots.contains_key("test"));
}
/// This test verifies that the scheduler can handle the case where the transfer engine's
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment