// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 use super::*; use anyhow::Result; use dynamo_llm::block_manager::block::{ data::logical::distributed_leader_worker::DistributedLeaderWorkerResources, locality::Logical, }; use dynamo_llm::block_manager::{BasicMetadata, BlockParallelismStrategy}; use pyo3::PyResult; use tokio_util::sync::CancellationToken; mod controller; mod distributed; pub mod vllm; /// Add bingings from this crate to the provided module pub fn add_to_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; vllm::add_to_module(m)?; Ok(()) } type VllmBlockManager = dynamo_llm::block_manager::KvBlockManager< Logical, BasicMetadata, >; type VllmController = Arc< dynamo_llm::block_manager::controller::Controller< Logical, BasicMetadata, >, >; #[pyclass] #[derive(Clone)] pub struct BlockManager { inner: VllmBlockManager, drt: DistributedRuntime, _controller: Option, } // TODO: This is in desperate need of a massive refactor. We bind and instantiate this in Python, but we never actually use it. #[pymethods] #[allow(unused_variables)] impl BlockManager { #[new] #[pyo3(signature = (worker_id, leader = None, page_size = 32, num_device_blocks = None, disable_device_pool = false))] fn new( worker_id: u64, leader: Option, page_size: usize, num_device_blocks: Option, disable_device_pool: bool, ) -> PyResult { let cancel_token = CancellationToken::new(); let mut config = dynamo_llm::block_manager::KvBlockManagerConfig::builder().runtime( dynamo_llm::block_manager::KvManagerRuntimeConfig::builder() .worker_id(worker_id) .cancellation_token(cancel_token.clone()) .build() .map_err(to_pyerr)?, ); let model_config = dynamo_llm::block_manager::KvManagerModelConfig::builder() .num_layers(1) .outer_dim(1) .page_size(page_size) .inner_dim(1); config = config.model(model_config.build().map_err(to_pyerr)?); let (leader, drt) = if let Some(leader) = leader { let (leader, rt) = leader.dissolve(); if !disable_device_pool { config = config.device_layout( dynamo_llm::block_manager::KvManagerLayoutConfig::builder() .num_blocks(leader.num_device_blocks()) .logical(Some(BlockParallelismStrategy::LeaderWorkerSharded)) .build() .map_err(to_pyerr)?, ); } if leader.num_host_blocks() > 0 { tracing::info!("Using {} host blocks", leader.num_host_blocks()); config = config.host_layout( dynamo_llm::block_manager::KvManagerLayoutConfig::builder() .num_blocks(leader.num_host_blocks()) .logical(Some(BlockParallelismStrategy::LeaderWorkerSharded)) .build() .map_err(to_pyerr)?, ); } if leader.num_disk_blocks() > 0 { tracing::info!("Using {} disk blocks", leader.num_disk_blocks()); config = config.disk_layout( dynamo_llm::block_manager::KvManagerLayoutConfig::builder() .num_blocks(leader.num_disk_blocks()) .logical(Some(BlockParallelismStrategy::LeaderWorkerSharded)) .build() .map_err(to_pyerr)?, ); } (Some(leader), rt) } else { tracing::info!("Leader not provided. Block transfer functionality will be disabled."); // let num_device_blocks = num_device_blocks // .expect("num_device_blocks must be provided if leader is not provided"); // config = config.device_layout( // dynamo_llm::block_manager::KvManagerLayoutConfig::builder() // .num_blocks(num_device_blocks) // .logical(Some(BlockParallelismStrategy::LeaderWorkerSharded)) // .build() // .map_err(to_pyerr)?, // ); unimplemented!("Leader not provided"); // ( // None, // Arc::new( // tokio::runtime::Builder::new_multi_thread() // .enable_all() // .build() // .map_err(to_pyerr)?, // ), // ) }; let rt = drt.inner().runtime().primary(); let config = config.build().map_err(to_pyerr)?; Ok(BlockManager { inner: rt .block_on(async { let resources = DistributedLeaderWorkerResources::new(leader, cancel_token.child_token())?; dynamo_llm::block_manager::KvBlockManager::< Logical, BasicMetadata, >::new(config, resources) .await }) .map_err(to_pyerr)?, drt, _controller: None, }) } fn block_size(&self) -> usize { self.inner.block_size() } fn init_controller(&mut self, component: Component) -> PyResult<()> { if self._controller.is_some() { tracing::warn!("Controller already initialized. Ignoring init_controller call."); return Ok(()); } let block_manager = self.inner.clone(); let controller = self .drt .inner() .runtime() .primary() .block_on(controller::Controller::new( block_manager, component.inner.clone(), )) .map_err(to_pyerr)?; self._controller = Some(Arc::new(controller)); let instance_id = component .inner .drt() .primary_lease() .map(|lease| lease.id()) .ok_or_else(|| to_pyerr(anyhow::anyhow!("no instance id")))?; tracing::info!( "Dynamo KVBM Controller: {}.{}:{}", component.inner.namespace().name(), component.inner.name(), instance_id ); Ok(()) } } impl BlockManager { #[inline(always)] pub fn get_block_manager(&self) -> &VllmBlockManager { &self.inner } } #[derive(Default)] pub struct BlockManagerBuilder { worker_id: u64, leader: Option, page_size: usize, disable_device_pool: bool, } impl BlockManagerBuilder { pub fn new() -> Self { Self { page_size: 32, // default consistent with BlockManager::new ..Default::default() } } pub fn worker_id(mut self, id: u64) -> Self { self.worker_id = id; self } pub fn page_size(mut self, ps: usize) -> Self { self.page_size = ps; self } pub fn leader(mut self, l: distributed::KvbmLeader) -> Self { self.leader = Some(l); self } pub fn disable_device_pool(mut self, yes: bool) -> Self { self.disable_device_pool = yes; self } /// Async build (call from an async context). pub async fn build(self) -> Result { let worker_id = self.worker_id; let leader = self.leader.ok_or_else(|| { anyhow::anyhow!("leader is required (runtime is always taken from leader)") })?; // Get (inner leader handle, runtime) from the provided leader. let (leader_inner, drt) = leader.dissolve(); let cancel_token = CancellationToken::new(); // Runtime & model config let runtime_config = dynamo_llm::block_manager::KvManagerRuntimeConfig::builder() .worker_id(worker_id) .cancellation_token(cancel_token.clone()) .build()?; let mut config = dynamo_llm::block_manager::KvBlockManagerConfig::builder().runtime(runtime_config); let model_config = dynamo_llm::block_manager::KvManagerModelConfig::builder() .num_layers(1) .outer_dim(1) .page_size(self.page_size) .inner_dim(1) .build()?; config = config.model(model_config); // Layouts derived from leader’s counts if !self.disable_device_pool { config = config.device_layout( dynamo_llm::block_manager::KvManagerLayoutConfig::builder() .num_blocks(leader_inner.num_device_blocks()) .logical(Some(BlockParallelismStrategy::LeaderWorkerSharded)) .build()?, ); } if leader_inner.num_host_blocks() > 0 { config = config.host_layout( dynamo_llm::block_manager::KvManagerLayoutConfig::builder() .num_blocks(leader_inner.num_host_blocks()) .logical(Some(BlockParallelismStrategy::LeaderWorkerSharded)) .build()?, ); } if leader_inner.num_disk_blocks() > 0 { config = config.disk_layout( dynamo_llm::block_manager::KvManagerLayoutConfig::builder() .num_blocks(leader_inner.num_disk_blocks()) .logical(Some(BlockParallelismStrategy::LeaderWorkerSharded)) .build()?, ); } let config = config.build()?; let resources = DistributedLeaderWorkerResources::new(Some(leader_inner), cancel_token.child_token())?; let inner = dynamo_llm::block_manager::KvBlockManager::< Logical, BasicMetadata, >::new(config, resources) .await?; Ok(BlockManager { inner, drt, _controller: None, }) } }