Unverified commit defe5de7, authored by Michael Feil, committed by GitHub
Browse files

feat: release gil for publisher (#3747)


Signed-off-by: michaelfeil <me@michaelfeil.eu>
parent 7cfac6b8
...@@ -263,7 +263,7 @@ impl KvEventPublisher { ...@@ -263,7 +263,7 @@ impl KvEventPublisher {
#[pyo3(signature = (event_id, token_ids, num_block_tokens, block_hashes, lora_id, parent_hash=None))] #[pyo3(signature = (event_id, token_ids, num_block_tokens, block_hashes, lora_id, parent_hash=None))]
fn publish_stored( fn publish_stored(
&mut self, &mut self,
_py: Python, py: Python,
event_id: u64, event_id: u64,
token_ids: Vec<u32>, token_ids: Vec<u32>,
num_block_tokens: Vec<u64>, num_block_tokens: Vec<u64>,
...@@ -271,38 +271,50 @@ impl KvEventPublisher { ...@@ -271,38 +271,50 @@ impl KvEventPublisher {
lora_id: u64, lora_id: u64,
parent_hash: Option<i64>, parent_hash: Option<i64>,
) -> PyResult<()> { ) -> PyResult<()> {
let block_hashes_u64: Vec<u64> = block_hashes.iter().map(|&h| h as u64).collect(); let kv_block_size = self.kv_block_size as u32;
let event = KvCacheEvent { let dp_rank = self.dp_rank;
event_id, let warning_count = self.warning_count.clone();
data: KvCacheEventData::Stored(KvCacheStoreData { let inner = self.inner.clone();
parent_hash: parent_hash.map(ExternalSequenceBlockHash::from),
blocks: create_stored_blocks( py.allow_threads(|| {
self.kv_block_size as u32, let block_hashes_u64: Vec<u64> = block_hashes.iter().map(|&h| h as u64).collect();
&token_ids, let event = KvCacheEvent {
&num_block_tokens, event_id,
&block_hashes_u64, data: KvCacheEventData::Stored(KvCacheStoreData {
lora_id, parent_hash: parent_hash.map(ExternalSequenceBlockHash::from),
&self.warning_count, blocks: create_stored_blocks(
), kv_block_size,
}), &token_ids,
dp_rank: self.dp_rank, &num_block_tokens,
}; &block_hashes_u64,
lora_id,
&warning_count,
),
}),
dp_rank,
};
self.inner.publish(event).map_err(to_pyerr) inner.publish(event).map_err(to_pyerr)
})
} }
fn publish_removed(&self, _py: Python, event_id: u64, block_hashes: Vec<i64>) -> PyResult<()> { fn publish_removed(&self, py: Python, event_id: u64, block_hashes: Vec<i64>) -> PyResult<()> {
let block_hashes: Vec<ExternalSequenceBlockHash> = block_hashes let dp_rank = self.dp_rank;
.into_iter() let inner = self.inner.clone();
.map(ExternalSequenceBlockHash::from)
.collect(); py.allow_threads(|| {
let event = KvCacheEvent { let block_hashes: Vec<ExternalSequenceBlockHash> = block_hashes
event_id, .into_iter()
data: KvCacheEventData::Removed(KvCacheRemoveData { block_hashes }), .map(ExternalSequenceBlockHash::from)
dp_rank: self.dp_rank, .collect();
}; let event = KvCacheEvent {
event_id,
data: KvCacheEventData::Removed(KvCacheRemoveData { block_hashes }),
dp_rank,
};
self.inner.publish(event).map_err(to_pyerr) inner.publish(event).map_err(to_pyerr)
})
} }
} }
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment