Unverified Commit 94fa72ca authored by Kris Hung's avatar Kris Hung Committed by GitHub
Browse files

fix: Fix KVBM GPU memory leak (#4171)


Signed-off-by: default avatarkrishung5 <krish@nvidia.com>
parent 06bc1580
...@@ -507,31 +507,33 @@ impl Leader for KvConnectorLeader { ...@@ -507,31 +507,33 @@ impl Leader for KvConnectorLeader {
// grab the slot // grab the slot
let shared_slot = self.slot_manager().get_slot(&request_id)?; let shared_slot = self.slot_manager().get_slot(&request_id)?;
// mark the slot as finished // Acquire lock BEFORE marking as finished
// This ensures we check state and prevent new operations from being created
let mut slot = shared_slot let mut slot = shared_slot
.lock() .lock()
.map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?; .map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
slot.mark_as_finished(self.iteration_counter)?;
// todo: allow the request to resolve when it should exit // Mark the slot as finished (sets state to Finishing if there are operations,
// the request may have some outstanding operations // or Finished if all operations are complete)
// we would like to inform it to shutdown, then have it signal to the work that is officially gone, slot.mark_as_finished(self.iteration_counter)?;
// then we can remove the slot and trigger the worker to clean up as well.
// remove the request from the inflight requests // remove the request from the inflight requests
self.inflight_requests.remove(&request_id); self.inflight_requests.remove(&request_id);
// remove it from the manager as we will never use it again
self.slot_manager().remove_slot(&request_id)?;
// if the slot has finished, we can return false to vllm, indicating all gpu blocks are free to be reused // if the slot has finished, we can return false to vllm, indicating all gpu blocks are free to be reused
// otherwise, we return true, which means there are still outstanding operations on gpu blocks which // otherwise, we return true, which means there are still outstanding operations on gpu blocks which
// must be awaited before the gpu blocks can be reused. if we return true, then it is the worker side // must be awaited before the gpu blocks can be reused. if we return true, then it is the worker side
// of the connector api which will be used to inform vllm that the request is finished. // of the connector api which will be used to inform vllm that the request is finished.
if let SlotState::Finished = slot.state() { if let SlotState::Finished = slot.state() {
// All operations complete - safe to remove slot and tell vLLM blocks are free
self.slot_manager().remove_slot(&request_id)?;
Ok(false) Ok(false)
} else { } else {
debug_assert!(matches!(slot.state(), SlotState::Finishing)); debug_assert!(matches!(slot.state(), SlotState::Finishing));
// Still has pending operations - keep slot alive for worker to process
// Don't remove slot here. Worker needs it to process the finish event.
// Worker will remove it after verifying all operations are complete.
// The lock on the slot prevents new operations from being created in offload_blocks()
Ok(true) Ok(true)
} }
} }
......
...@@ -712,14 +712,35 @@ impl Slot for VllmConnectorSlot { ...@@ -712,14 +712,35 @@ impl Slot for VllmConnectorSlot {
} }
fn mark_as_finished(&mut self, _iteration: u64) -> Result<(), SlotError> { fn mark_as_finished(&mut self, _iteration: u64) -> Result<(), SlotError> {
self.state = SlotState::Finishing; // Check if there are any pending operations
tracing::info!( let has_pending_ops = self
request_id = %self.request_id, .pending_operations
"request set to finish: cached_gpu_tokens: {}; cached_host_tokens: {}; cached_disk_tokens: {}", .as_ref()
self.tokens_cached_from_device, .map(|ops| !ops.is_empty())
self.tokens_cached_from_host, .unwrap_or(false);
self.tokens_cached_from_disk
); if has_pending_ops {
// There are pending operations - need to wait for them to complete
self.state = SlotState::Finishing;
tracing::debug!(
request_id = %self.request_id,
pending_operations = self.pending_operations.as_ref().unwrap().len(),
"request set to finish (with pending operations): cached_gpu_tokens: {}; cached_host_tokens: {}; cached_disk_tokens: {}",
self.tokens_cached_from_device,
self.tokens_cached_from_host,
self.tokens_cached_from_disk
);
} else {
// No pending operations - can immediately mark as finished
self.state = SlotState::Finished;
tracing::debug!(
request_id = %self.request_id,
"request set to finished (no pending operations): cached_gpu_tokens: {}; cached_host_tokens: {}; cached_disk_tokens: {}",
self.tokens_cached_from_device,
self.tokens_cached_from_host,
self.tokens_cached_from_disk
);
}
Ok(()) Ok(())
} }
...@@ -989,6 +1010,12 @@ impl VllmConnectorSlot { ...@@ -989,6 +1010,12 @@ impl VllmConnectorSlot {
block_ids: &[BlockId], block_ids: &[BlockId],
token_blocks: &[TokenBlock], token_blocks: &[TokenBlock],
) -> Result<(), SlotError> { ) -> Result<(), SlotError> {
// Check if slot is in Finishing state before creating operations
// If we're finishing, don't create new operations
if matches!(self.state, SlotState::Finishing | SlotState::Finished) {
return Ok(());
}
assert!(block_ids.len() == token_blocks.len()); assert!(block_ids.len() == token_blocks.len());
let operation_id = uuid::Uuid::new_v4(); let operation_id = uuid::Uuid::new_v4();
...@@ -1173,8 +1200,8 @@ impl LocalTransferEngine { ...@@ -1173,8 +1200,8 @@ impl LocalTransferEngine {
task_token: CancellationToken, task_token: CancellationToken,
kvbm_metrics: KvbmMetrics, kvbm_metrics: KvbmMetrics,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let (onboard_tx, mut onboard_rx) = mpsc::unbounded_channel(); let (onboard_tx, mut onboard_rx) = mpsc::unbounded_channel::<LocalOnboardRequest>();
let (offload_tx, mut offload_rx) = mpsc::unbounded_channel(); let (offload_tx, mut offload_rx) = mpsc::unbounded_channel::<LocalOffloadRequest>();
// Clone resources needed for tasks // Clone resources needed for tasks
let block_manager_offload = self.block_manager.clone(); let block_manager_offload = self.block_manager.clone();
...@@ -1212,6 +1239,10 @@ impl LocalTransferEngine { ...@@ -1212,6 +1239,10 @@ impl LocalTransferEngine {
tracing::debug!("LocalOffloadTask: received cancellation signal"); tracing::debug!("LocalOffloadTask: received cancellation signal");
break; break;
} }
let request_id = req.request_id.clone();
let operation_id = req.operation_id;
if let Err(e) = process_offload_request( if let Err(e) = process_offload_request(
req, req,
&block_manager_offload, &block_manager_offload,
...@@ -1221,6 +1252,30 @@ impl LocalTransferEngine { ...@@ -1221,6 +1252,30 @@ impl LocalTransferEngine {
.await .await
{ {
tracing::error!("LocalOffloadTask: error processing request: {:?}", e); tracing::error!("LocalOffloadTask: error processing request: {:?}", e);
// Create a fake/immediate transfer request that completes instantly.
// Otherwise, worker side might stuck and cause memory leak.
let fake_xfer = BlockTransferRequest {
from_pool: BlockTransferPool::Device, // Use valid Device->Host transfer type
to_pool: BlockTransferPool::Host, // (offload path, but no blocks)
blocks: vec![], // Empty - nothing to transfer
connector_req: Some(LeaderTransferRequest {
request_id: request_id.clone(),
uuid: operation_id,
requirement: None,
request_type: RequestType::Immediate, // Immediate = completes instantly
}),
};
match leader_offload.transfer_blocks_request(fake_xfer).await {
Ok(notify_receiver) => {
// Wait for the fake transfer to "complete" (should be instant)
let _ = notify_receiver.await;
}
Err(_xfer_err) => {
// Failed to create completion notification - error already logged above
}
}
} }
} }
Ok(()) Ok(())
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment