"...git@developer.sourcefind.cn:2222/OpenDAS/vllm_cscc.git" did not exist on "32e46e000f77499f4dd7c0bed194e33856f2df24"
Unverified Commit 329752d9 authored by Kyle McGill's avatar Kyle McGill Committed by GitHub
Browse files

fix: Synchronizing on new thread to avoid delaying TRTLLM (#5333)

parent 7952ea88
......@@ -1578,6 +1578,7 @@ dependencies = [
"tokio-util",
"tracing",
"url",
"utoipa",
"uuid",
]
......@@ -7754,6 +7755,8 @@ dependencies = [
"quote",
"regex",
"syn 2.0.110",
"url",
"uuid",
]
[[package]]
......
......@@ -26,8 +26,9 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker):
def callback():
    """Record the event and hand it to the connector without waiting on it.

    The connector receives the raw ``cuda_event`` handle and is responsible
    for running the queued offload operations once that event has completed,
    so this callback must not synchronize on the event itself.
    """
    self.event.record()
    # Non-blocking: passes event to Rust for async polling.
    # Do NOT call self.event.synchronize() / execute_offload_operations() here:
    # that would block the calling thread and run the offload path a second time.
    self._connector.submit_offload_on_event(self.event.cuda_event)
    # Returns immediately - no CPU blocking
return callback
......
......@@ -3,7 +3,7 @@
use dynamo_llm::block_manager::connector::protocol::TransferType;
use dynamo_llm::block_manager::connector::scheduler::{
Scheduler, TransferSchedulerClient, WorkerSchedulerClient,
Scheduler, SchedulerMessage, TransferSchedulerClient, WorkerSchedulerClient,
};
use std::collections::HashSet;
......@@ -48,6 +48,11 @@ pub trait Worker: Send + Sync {
finished_gen_req_ids: Vec<u64>,
started_loading_req_ids: Vec<u64>,
) -> (Vec<u64>, Vec<u64>);
/// Submit the pending offload operations so that they are sent to the scheduler only
/// after the CUDA event identified by `event` has completed, without blocking the caller.
///
/// Slot bookkeeping is done synchronously, while `&mut self` is held; waiting for the
/// event and forwarding the operations happens in the background. The
/// `KvConnectorWorker` implementation does the waiting on a dedicated `std::thread`
/// (a blocking wait on that thread, not an async task).
///
/// `event` is an integer event handle; the Python caller passes `event.cuda_event`.
///
/// Returns `Ok(())` once the work has been handed off; it does not report whether the
/// operations were eventually delivered to the scheduler.
fn submit_offload_on_event(&mut self, event: u64) -> anyhow::Result<()>;
}
pub struct KvConnectorWorker {
......@@ -394,6 +399,33 @@ impl Worker for KvConnectorWorker {
(finished_offloading, finished_onboarding)
}
/// Hands the queued offload operations to the scheduler once the event identified by
/// `event` has completed, without blocking the calling thread.
///
/// The queue in `self.offloading_operations` is drained immediately. Each drained
/// operation is first recorded on the connector (synchronously, while `&mut self` is
/// available), then a detached thread waits for the event and forwards the operations
/// as `SchedulerMessage::EnqueueRequest` messages.
///
/// Always returns `Ok(())`; delivery failures are logged from the background thread.
fn submit_offload_on_event(&mut self, event: u64) -> anyhow::Result<()> {
    // Take ownership of everything queued so far, leaving an empty queue behind.
    let operations = std::mem::take(&mut self.offloading_operations);

    // Nothing to offload: do not spend an OS thread just to wait on the event.
    // NOTE(review): assumes `event_sync_blocking` has no effect other than waiting.
    if operations.is_empty() {
        return Ok(());
    }

    // Bookkeeping done synchronously while we have &mut self
    for op in &operations {
        self.connector.record_operation(&op.request_id, op.uuid);
    }

    // Clone channel so the background thread owns its own sender
    let tx = self.connector.get_scheduler_tx();

    // Use std::thread since we may be in a subprocess without tokio runtime
    std::thread::spawn(move || {
        // Block this thread until event completes (doesn't block main thread)
        event_sync_blocking(event);

        // Send operations to scheduler
        for op in operations {
            if let Err(e) = tx.send(SchedulerMessage::EnqueueRequest(op)) {
                tracing::error!("Failed to send offload operation: {}", e);
                // A failed send on an unbounded channel means the receiving side is
                // gone, so every remaining send would fail the same way; stop here
                // instead of logging once per operation.
                // NOTE(review): assumes `mpsc` is `tokio::sync::mpsc` — confirm.
                break;
            }
        }
    });

    Ok(())
}
}
#[pyclass]
......@@ -473,4 +505,10 @@ impl PyTrtllmKvConnectorWorker {
self.connector_worker
.get_finished(finished_gen_req_ids, started_loading_req_ids)
}
/// Python-facing entry point: forwards `event` to the wrapped connector worker's
/// `submit_offload_on_event` and converts any error into a Python exception.
pub fn submit_offload_on_event(&mut self, event: u64) -> PyResult<()> {
    let outcome = self.connector_worker.submit_offload_on_event(event);
    outcome.map_err(to_pyerr)
}
}
......@@ -229,6 +229,18 @@ impl WorkerSchedulerClient {
}
}
}
/// Returns a clone of this client's scheduler sender.
///
/// The clone is an independent handle to the same channel, so it can be moved into
/// another thread or task and used without borrowing `self`.
pub fn get_scheduler_tx(&self) -> mpsc::UnboundedSender<SchedulerMessage> {
    let sender = &self.scheduler_tx;
    sender.clone()
}
/// Registers `uuid` as an operation of the slot keyed by `request_id`.
///
/// Bookkeeping only: the uuid is appended to the slot's `operations` list and nothing
/// is sent to the scheduler. Recording the operation is what lets the slot's
/// completion check (`is_complete()`) account for it.
///
/// # Panics
///
/// Panics with "slot does not exist" if no slot is registered for `request_id`.
pub fn record_operation(&mut self, request_id: &str, uuid: uuid::Uuid) {
    self.slots
        .get_mut(request_id)
        .expect("slot does not exist")
        .operations
        .push(uuid);
}
}
pub type Iteration = u64;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment