Unverified Commit fb62e2cf authored by Anant Sharma's avatar Anant Sharma Committed by GitHub
Browse files

ci: add kvbm bindings to pre merge checks (#6042)


Signed-off-by: default avatarAnant Sharma <anants@nvidia.com>
parent bf6840e6
...@@ -68,8 +68,7 @@ jobs: ...@@ -68,8 +68,7 @@ jobs:
runs-on: runs-on:
group: Fastchecker group: Fastchecker
strategy: strategy:
# removing kvbm from here - it will fail to test with nixl dep enabled matrix: { dir: ['.', 'lib/bindings/python', 'lib/runtime/examples', 'launch/dynamo-run', 'lib/bindings/kvbm'] }
matrix: { dir: ['.', 'lib/bindings/python', 'lib/runtime/examples', 'launch/dynamo-run'] }
permissions: permissions:
contents: read contents: read
steps: steps:
...@@ -125,7 +124,7 @@ jobs: ...@@ -125,7 +124,7 @@ jobs:
runs-on: runs-on:
group: Fastchecker group: Fastchecker
strategy: strategy:
matrix: { dir: ['.', 'lib/bindings/python', 'lib/runtime/examples', 'launch/dynamo-run'] } matrix: { dir: ['.', 'lib/bindings/python', 'lib/runtime/examples', 'launch/dynamo-run', 'lib/bindings/kvbm'] }
permissions: permissions:
contents: read contents: read
steps: steps:
......
...@@ -1740,6 +1740,7 @@ dependencies = [ ...@@ -1740,6 +1740,7 @@ dependencies = [
"derive-getters", "derive-getters",
"derive_builder", "derive_builder",
"dynamo-kv-router", "dynamo-kv-router",
"dynamo-runtime",
"dynamo-tokens", "dynamo-tokens",
"ndarray", "ndarray",
"ndarray-interp", "ndarray-interp",
...@@ -1751,6 +1752,7 @@ dependencies = [ ...@@ -1751,6 +1752,7 @@ dependencies = [
"tokio-util", "tokio-util",
"tracing", "tracing",
"uuid", "uuid",
"validator",
] ]
[[package]] [[package]]
......
...@@ -6,9 +6,7 @@ use anyhow::Result; ...@@ -6,9 +6,7 @@ use anyhow::Result;
use dynamo_llm::block_manager::block::{ use dynamo_llm::block_manager::block::{
data::logical::distributed_leader_worker::DistributedLeaderWorkerResources, locality::Logical, data::logical::distributed_leader_worker::DistributedLeaderWorkerResources, locality::Logical,
}; };
use dynamo_llm::block_manager::kv_consolidator::{ use dynamo_llm::block_manager::kv_consolidator::EventSource;
EventSource, KvEventConsolidatorConfig,
};
use dynamo_llm::block_manager::offload::filter::FrequencyFilter; use dynamo_llm::block_manager::offload::filter::FrequencyFilter;
use dynamo_llm::block_manager::{BasicMetadata, BlockParallelismStrategy}; use dynamo_llm::block_manager::{BasicMetadata, BlockParallelismStrategy};
use dynamo_runtime::DistributedRuntime; use dynamo_runtime::DistributedRuntime;
...@@ -368,7 +366,8 @@ impl BlockManagerBuilder { ...@@ -368,7 +366,8 @@ impl BlockManagerBuilder {
} }
if let Some((engine_ep, output_ep, engine_source)) = self.consolidator_config { if let Some((engine_ep, output_ep, engine_source)) = self.consolidator_config {
config_builder = config_builder.consolidator_config(engine_ep, output_ep, engine_source); config_builder =
config_builder.consolidator_config(engine_ep, output_ep, engine_source);
} }
let config = config_builder.build()?; let config = config_builder.build()?;
......
...@@ -17,17 +17,17 @@ const DEFAULT_LOG_INTERVAL_SECS: u64 = 5; ...@@ -17,17 +17,17 @@ const DEFAULT_LOG_INTERVAL_SECS: u64 = 5;
/// Cache statistics entry for a single request /// Cache statistics entry for a single request
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
struct CacheStatsEntry { struct CacheStatsEntry {
host_blocks: u64, // Blocks found in host cache host_blocks: u64, // Blocks found in host cache
disk_blocks: u64, // Blocks found in disk cache disk_blocks: u64, // Blocks found in disk cache
total_blocks: u64, // Total blocks queried from host/disk total_blocks: u64, // Total blocks queried from host/disk
} }
/// Aggregated cache statistics for the current sliding window /// Aggregated cache statistics for the current sliding window
#[derive(Default)] #[derive(Default)]
struct AggregatedStats { struct AggregatedStats {
total_blocks_queried: u64, // Total blocks queried from host/disk (same for both tiers) total_blocks_queried: u64, // Total blocks queried from host/disk (same for both tiers)
host_blocks_hit: u64, // Blocks found in host cache host_blocks_hit: u64, // Blocks found in host cache
disk_blocks_hit: u64, // Blocks found in disk cache disk_blocks_hit: u64, // Blocks found in disk cache
} }
/// Cache statistics tracker with sliding window /// Cache statistics tracker with sliding window
......
...@@ -143,6 +143,7 @@ impl KvbmWorker { ...@@ -143,6 +143,7 @@ impl KvbmWorker {
#[pymethods] #[pymethods]
impl KvbmWorker { impl KvbmWorker {
#[new] #[new]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (num_device_blocks, page_size, tensors, device_id=0, dtype_width_bytes=2, drt=None, layout_blocking=false, device_layout_type=None, host_layout_type=None, disk_layout_type=None))] #[pyo3(signature = (num_device_blocks, page_size, tensors, device_id=0, dtype_width_bytes=2, drt=None, layout_blocking=false, device_layout_type=None, host_layout_type=None, disk_layout_type=None))]
fn new( fn new(
num_device_blocks: usize, num_device_blocks: usize,
......
...@@ -21,7 +21,7 @@ use dynamo_llm::block_manager::{ ...@@ -21,7 +21,7 @@ use dynamo_llm::block_manager::{
data::logical::distributed_leader_worker::DistributedLeaderWorkerResources, data::logical::distributed_leader_worker::DistributedLeaderWorkerResources,
locality::Logical, locality::Logical,
}, },
connector::{*, protocol::RequestType}, connector::{protocol::RequestType, *},
kv_consolidator::EventSource, kv_consolidator::EventSource,
}; };
use dynamo_llm::tokens::{SaltHash, TokenBlockSequence, Tokens}; use dynamo_llm::tokens::{SaltHash, TokenBlockSequence, Tokens};
...@@ -221,7 +221,7 @@ impl Leader for KvConnectorLeader { ...@@ -221,7 +221,7 @@ impl Leader for KvConnectorLeader {
); );
// the number of device matched tokens should be less than or equal to the number of tokens in the request // the number of device matched tokens should be less than or equal to the number of tokens in the request
debug_assert!(num_computed_tokens % self.block_size == 0); debug_assert!(num_computed_tokens.is_multiple_of(self.block_size));
let shared_slot = self.slot_manager().get_slot(&request_id)?; let shared_slot = self.slot_manager().get_slot(&request_id)?;
let mut slot = shared_slot let mut slot = shared_slot
...@@ -262,7 +262,9 @@ impl Leader for KvConnectorLeader { ...@@ -262,7 +262,9 @@ impl Leader for KvConnectorLeader {
// return the number of external tokens that are ready for onboarding // return the number of external tokens that are ready for onboarding
// we always return true here as we always asynchronously onboard matched blocks // we always return true here as we always asynchronously onboard matched blocks
if let SlotState::OnboardStaged(num_external_tokens) = slot.state() { if let SlotState::OnboardStaged(num_external_tokens) = slot.state() {
debug_assert!((num_computed_tokens + num_external_tokens) % self.block_size == 0); debug_assert!(
(num_computed_tokens + num_external_tokens).is_multiple_of(self.block_size)
);
tracing::debug!( tracing::debug!(
request_id = request_id, request_id = request_id,
"scheduling onboarding for {} external tokens", "scheduling onboarding for {} external tokens",
...@@ -427,7 +429,13 @@ impl Leader for KvConnectorLeader { ...@@ -427,7 +429,13 @@ impl Leader for KvConnectorLeader {
.get(request_id) .get(request_id)
.unwrap_or(&0); .unwrap_or(&0);
slot.apply_scheduler_output(&[], &[], new_req.num_computed_tokens, scheduled_tokens, None)?; slot.apply_scheduler_output(
&[],
&[],
new_req.num_computed_tokens,
scheduled_tokens,
None,
)?;
let pending_ops_opt = slot.take_pending_operations(); let pending_ops_opt = slot.take_pending_operations();
......
...@@ -152,12 +152,11 @@ impl KvConnectorLeaderRecorder { ...@@ -152,12 +152,11 @@ impl KvConnectorLeaderRecorder {
if let (Some(vllm_ep), Some(output_ep)) = if let (Some(vllm_ep), Some(output_ep)) =
(consolidator_vllm_ep, consolidator_output_ep) (consolidator_vllm_ep, consolidator_output_ep)
{ {
block_manager_builder = block_manager_builder = block_manager_builder.consolidator_config(
block_manager_builder.consolidator_config( vllm_ep,
vllm_ep, Some(output_ep),
Some(output_ep), EventSource::Vllm,
EventSource::Vllm, );
);
} }
let block_manager = match block_manager_builder.build().await { let block_manager = match block_manager_builder.build().await {
......
...@@ -174,6 +174,7 @@ pub struct ConnectorSlotManager<R: RequestKey> { ...@@ -174,6 +174,7 @@ pub struct ConnectorSlotManager<R: RequestKey> {
/// Cache statistics tracker /// Cache statistics tracker
cache_stats: Arc<CacheStatsTracker>, cache_stats: Arc<CacheStatsTracker>,
/// KVBM metrics for exposing cache hit rates /// KVBM metrics for exposing cache hit rates
#[allow(dead_code)]
kvbm_metrics: KvbmMetrics, kvbm_metrics: KvbmMetrics,
/// Minimum priority threshold for host offload filtering (read once at init) /// Minimum priority threshold for host offload filtering (read once at init)
offload_min_priority: u32, offload_min_priority: u32,
...@@ -779,8 +780,8 @@ impl Slot for VllmConnectorSlot { ...@@ -779,8 +780,8 @@ impl Slot for VllmConnectorSlot {
let block_size = self.block_size; let block_size = self.block_size;
// Convert cached tokens to blocks (rounding up) // Convert cached tokens to blocks (rounding up)
let host_blocks = (self.tokens_cached_from_host + block_size - 1) / block_size; let host_blocks = self.tokens_cached_from_host.div_ceil(block_size);
let disk_blocks = (self.tokens_cached_from_disk + block_size - 1) / block_size; let disk_blocks = self.tokens_cached_from_disk.div_ceil(block_size);
tracing::debug!( tracing::debug!(
request_id = %self.request_id, request_id = %self.request_id,
...@@ -864,7 +865,7 @@ impl Slot for VllmConnectorSlot { ...@@ -864,7 +865,7 @@ impl Slot for VllmConnectorSlot {
let block_size = self.block_manager.block_size(); let block_size = self.block_manager.block_size();
let num_computed_blocks = num_computed_tokens / block_size; let num_computed_blocks = num_computed_tokens / block_size;
debug_assert!(num_computed_tokens % block_size == 0); debug_assert!(num_computed_tokens.is_multiple_of(block_size));
let sequence_hashes = self let sequence_hashes = self
.sequence() .sequence()
......
...@@ -4,8 +4,6 @@ ...@@ -4,8 +4,6 @@
use super::*; use super::*;
use crate::block_manager::BlockManagerBuilder; use crate::block_manager::BlockManagerBuilder;
use dynamo_llm::block_manager::connector::protocol::RequestType;
use dynamo_llm::block_manager::kv_consolidator::EventSource;
use crate::block_manager::vllm::connector::leader::slot::{ use crate::block_manager::vllm::connector::leader::slot::{
ConnectorSlotManager, SlotManager, SlotState, ConnectorSlotManager, SlotManager, SlotState,
}; };
...@@ -15,6 +13,8 @@ use crate::block_manager::vllm::connector::leader::{ ...@@ -15,6 +13,8 @@ use crate::block_manager::vllm::connector::leader::{
use crate::block_manager::{distributed::KvbmLeader as PyKvbmLeader, vllm::KvbmRequest}; use crate::block_manager::{distributed::KvbmLeader as PyKvbmLeader, vllm::KvbmRequest};
use crate::get_current_tokio_handle; use crate::get_current_tokio_handle;
use anyhow; use anyhow;
use dynamo_llm::block_manager::connector::protocol::RequestType;
use dynamo_llm::block_manager::kv_consolidator::EventSource;
use dynamo_llm::block_manager::metrics_kvbm::{KvbmMetrics, KvbmMetricsRegistry}; use dynamo_llm::block_manager::metrics_kvbm::{KvbmMetrics, KvbmMetricsRegistry};
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::{Arc, OnceLock}; use std::sync::{Arc, OnceLock};
...@@ -190,7 +190,7 @@ impl Leader for KvConnectorLeader { ...@@ -190,7 +190,7 @@ impl Leader for KvConnectorLeader {
// TRTLLM could match partial blocks if enable_partial_reuse = True, // TRTLLM could match partial blocks if enable_partial_reuse = True,
// immediately return 0 to simplify things. // immediately return 0 to simplify things.
if num_computed_tokens % self.block_size != 0 { if !num_computed_tokens.is_multiple_of(self.block_size) {
return Ok((0, false)); return Ok((0, false));
} }
...@@ -215,7 +215,9 @@ impl Leader for KvConnectorLeader { ...@@ -215,7 +215,9 @@ impl Leader for KvConnectorLeader {
// return the number of external tokens that are ready for onboarding // return the number of external tokens that are ready for onboarding
// we always return true here as we always asynchronously onboard matched blocks // we always return true here as we always asynchronously onboard matched blocks
if let SlotState::OnboardStaged(num_external_tokens) = slot.state() { if let SlotState::OnboardStaged(num_external_tokens) = slot.state() {
debug_assert!((num_computed_tokens + num_external_tokens) % self.block_size == 0); debug_assert!(
(num_computed_tokens + num_external_tokens).is_multiple_of(self.block_size)
);
tracing::debug!( tracing::debug!(
request_id = request_id, request_id = request_id,
"scheduling onboarding for {} external tokens", "scheduling onboarding for {} external tokens",
......
...@@ -25,6 +25,7 @@ use dynamo_runtime::DistributedRuntime; ...@@ -25,6 +25,7 @@ use dynamo_runtime::DistributedRuntime;
use dynamo_runtime::utils::task::CriticalTaskExecutionHandle; use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
pub trait Worker: Send + Sync { pub trait Worker: Send + Sync {
#[allow(clippy::too_many_arguments)]
fn register_kv_caches( fn register_kv_caches(
&mut self, &mut self,
num_device_blocks: usize, num_device_blocks: usize,
...@@ -483,6 +484,7 @@ impl PyKvConnectorWorker { ...@@ -483,6 +484,7 @@ impl PyKvConnectorWorker {
Ok(Self { connector_worker }) Ok(Self { connector_worker })
} }
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (num_device_blocks, page_size, device_id, dtype_width_bytes, kv_caches, raw_event_handles, device_layout_type=None, host_layout_type=None, disk_layout_type=None))] #[pyo3(signature = (num_device_blocks, page_size, device_id, dtype_width_bytes, kv_caches, raw_event_handles, device_layout_type=None, host_layout_type=None, disk_layout_type=None))]
pub fn register_kv_caches( pub fn register_kv_caches(
&mut self, &mut self,
......
...@@ -719,7 +719,7 @@ mod tests { ...@@ -719,7 +719,7 @@ mod tests {
// Prefill count should remain unchanged // Prefill count should remain unchanged
assert_eq!(slot.num_tokens(SlotPosition::Prefill), 4); assert_eq!(slot.num_tokens(SlotPosition::Prefill), 4);
if expected_total % BLOCK_SIZE == 0 { if expected_total.is_multiple_of(BLOCK_SIZE) {
assert_eq!(slot.mutable.len(), 0); assert_eq!(slot.mutable.len(), 0);
assert_eq!(slot.immutable.len(), expected_total / BLOCK_SIZE); assert_eq!(slot.immutable.len(), expected_total / BLOCK_SIZE);
} else { } else {
......
...@@ -9,7 +9,9 @@ use std::{fmt::Display, sync::Arc}; ...@@ -9,7 +9,9 @@ use std::{fmt::Display, sync::Arc};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use dynamo_runtime::{self as rs, RuntimeConfig, logging, traits::DistributedRuntimeProvider, config}; use dynamo_runtime::{
self as rs, RuntimeConfig, config, logging, traits::DistributedRuntimeProvider,
};
use dynamo_llm::{self as llm_rs}; use dynamo_llm::{self as llm_rs};
...@@ -23,8 +25,7 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> { ...@@ -23,8 +25,7 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
// Initialize tokio runtime first to avoid panics when OTEL_EXPORT_ENABLED=1 // Initialize tokio runtime first to avoid panics when OTEL_EXPORT_ENABLED=1
init_pyo3_tokio_rt(); init_pyo3_tokio_rt();
if config::env_is_truthy("OTEL_EXPORT_ENABLED") if config::env_is_truthy("OTEL_EXPORT_ENABLED") {
{
// OTLP batch exporter needs runtime context to spawn background tasks // OTLP batch exporter needs runtime context to spawn background tasks
let handle = get_current_tokio_handle(); let handle = get_current_tokio_handle();
let _guard = handle.enter(); let _guard = handle.enter();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment