Unverified Commit 6afa679c authored by Richard Huo's avatar Richard Huo Committed by GitHub
Browse files

chore: KVBM pip wheel (#3826)


Signed-off-by: default avatarAnant Sharma <anants@nvidia.com>
Co-authored-by: default avatarAnant Sharma <anants@nvidia.com>
parent e5c109d8
......@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use super::*;
use derive_getters::Dissolve;
use llm_rs::block_manager::distributed::{
KvbmLeader as KvbmLeaderImpl, KvbmLeaderConfig, KvbmLeaderNumBlocksConfig,
......@@ -57,7 +58,7 @@ fn get_leader_init_timeout_secs(override_key: &str) -> u64 {
#[derive(Clone, Dissolve)]
pub struct KvbmLeader {
leader: Arc<KvbmLeaderImpl>,
drt: DistributedRuntime,
drt: Option<Arc<rs::DistributedRuntime>>,
}
impl KvbmLeader {
......@@ -69,8 +70,16 @@ impl KvbmLeader {
#[pymethods]
impl KvbmLeader {
#[new]
#[pyo3(signature = (world_size, drt))]
fn new(world_size: usize, drt: DistributedRuntime) -> PyResult<Self> {
#[pyo3(signature = (world_size, drt=None))]
fn new(world_size: usize, drt: Option<PyObject>) -> PyResult<Self> {
let drt: Option<Arc<rs::DistributedRuntime>> = Python::with_gil(|py| {
if let Some(obj) = drt {
extract_distributed_runtime_from_obj(py, obj)
} else {
Ok(None)
}
})?;
let leader_init_timeout_sec: u64 =
get_leader_init_timeout_secs(LEADER_WORKER_INIT_TIMEOUT_SECS);
......@@ -86,7 +95,7 @@ impl KvbmLeader {
config.sanity_check().map_err(to_pyerr)?;
let rt = drt.inner().runtime().primary();
let rt = get_current_tokio_handle();
let leader =
rt.block_on(async move { KvbmLeaderImpl::new(config).await.map_err(to_pyerr) })?;
......
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use utils::{get_leader_zmq_ack_url, get_leader_zmq_pub_url};
use super::*;
use std::sync::Arc;
use utils::{get_leader_zmq_ack_url, get_leader_zmq_pub_url};
use llm_rs::block_manager::distributed::{
BlockTransferHandler as RustBlockTransferHandler, KvbmWorker as KvbmWorkerImpl,
......@@ -132,7 +131,7 @@ impl BlockTransferHandler {
#[derive(Clone)]
pub struct KvbmWorker {
inner: Arc<Mutex<KvbmWorkerImpl>>,
_drt: DistributedRuntime,
_drt: Option<Arc<rs::DistributedRuntime>>,
}
impl KvbmWorker {
......@@ -151,19 +150,21 @@ impl KvbmWorker {
tensors: Vec<Py<PyAny>>,
device_id: usize,
dtype_width_bytes: usize,
drt: Option<DistributedRuntime>,
drt: Option<PyObject>,
layout_blocking: bool,
device_layout_type: Option<PyLayoutType>,
host_layout_type: Option<PyLayoutType>,
disk_layout_type: Option<PyLayoutType>,
) -> PyResult<Self> {
let py_drt = drt.ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("DistributedRuntime (drt) must be provided")
let drt: Option<Arc<rs::DistributedRuntime>> = Python::with_gil(|py| {
if let Some(obj) = drt {
extract_distributed_runtime_from_obj(py, obj)
} else {
Ok(None)
}
})?;
// rusty drt
let drt = py_drt.inner.clone();
let rt = drt.runtime().primary();
let rt = get_current_tokio_handle();
let mut vllm_tensors: Vec<Arc<dyn TorchTensor>> = Vec::with_capacity(tensors.len());
......@@ -173,7 +174,7 @@ impl KvbmWorker {
}
let config = KvbmWorkerConfig::builder()
.drt(drt)
.cancel_token(get_current_cancel_token())
.num_device_blocks(num_device_blocks)
.page_size(page_size)
.tensors(vllm_tensors)
......@@ -208,7 +209,7 @@ impl KvbmWorker {
Ok(Self {
inner: Arc::new(Mutex::new(worker)),
_drt: py_drt,
_drt: drt,
})
}
}
......@@ -22,9 +22,9 @@ use dynamo_llm::{
tokens::{SaltHash, SequenceHash, TokenBlockSequence, Tokens},
};
// use crate::llm::block_manager::BlockManager as PyBlockManager;
use crate::llm::block_manager::BlockManager as PyBlockManager;
use crate::llm::block_manager::VllmBlockManager;
// use crate::block_manager::BlockManager as PyBlockManager;
use crate::block_manager::BlockManager as PyBlockManager;
use crate::block_manager::VllmBlockManager;
use crate::to_pyerr;
......
......@@ -6,15 +6,14 @@ pub mod slot;
use super::*;
use dynamo_llm::block_manager::metrics_kvbm::{KvbmMetrics, KvbmMetricsRegistry};
use dynamo_runtime::DistributedRuntime;
use slot::{ConnectorSlotManager, SlotError, SlotManager, SlotState};
use crate::DistributedRuntime as PyDistributedRuntime;
use crate::llm::block_manager::BlockManagerBuilder;
use crate::llm::block_manager::{
use crate::block_manager::BlockManagerBuilder;
use crate::block_manager::{
VllmBlockManager, distributed::KvbmLeader as PyKvbmLeader, vllm::KvbmRequest,
vllm::connector::leader::slot::VllmConnectorSlot,
};
use crate::get_current_tokio_handle;
use dynamo_llm::block_manager::{
BasicMetadata, DiskStorage, ImmutableBlock, PinnedStorage,
......@@ -89,7 +88,6 @@ pub struct KvConnectorLeader {
impl KvConnectorLeader {
fn new(
worker_id: String,
drt: PyDistributedRuntime,
page_size: usize,
leader_py: PyKvbmLeader,
consolidator_vllm_endpoint: Option<String>,
......@@ -101,8 +99,7 @@ impl KvConnectorLeader {
);
let leader = leader_py.get_inner().clone();
let drt = drt.inner().clone();
let handle: Handle = drt.runtime().primary();
let handle: Handle = get_current_tokio_handle();
let kvbm_metrics = KvbmMetrics::new(
&KvbmMetricsRegistry::default(),
......@@ -161,7 +158,6 @@ impl KvConnectorLeader {
let sm = ConnectorSlotManager::new(
block_manager.get_block_manager().clone(),
leader.clone(),
drt.clone(),
kvbm_metrics_clone.clone(),
);
......@@ -567,12 +563,14 @@ impl PyKvConnectorLeader {
#[pyo3(signature = (worker_id, drt, page_size, leader, consolidator_vllm_endpoint=None, consolidator_output_endpoint=None))]
pub fn new(
worker_id: String,
drt: PyDistributedRuntime,
drt: Option<PyObject>,
page_size: usize,
leader: PyKvbmLeader,
consolidator_vllm_endpoint: Option<String>,
consolidator_output_endpoint: Option<String>,
) -> Self {
) -> PyResult<Self> {
let _ = &drt; // drt is currently un-used in leader
// Initialize logging for the vLLM connector
dynamo_runtime::logging::init();
......@@ -583,7 +581,6 @@ impl PyKvConnectorLeader {
let connector_leader: Box<dyn Leader> = if enable_kvbm_record {
Box::new(recorder::KvConnectorLeaderRecorder::new(
worker_id,
drt,
page_size,
leader,
consolidator_vllm_endpoint,
......@@ -592,14 +589,13 @@ impl PyKvConnectorLeader {
} else {
Box::new(KvConnectorLeader::new(
worker_id,
drt,
page_size,
leader,
consolidator_vllm_endpoint,
consolidator_output_endpoint,
))
};
Self { connector_leader }
Ok(Self { connector_leader })
}
fn get_num_new_matched_tokens(
......
......@@ -87,7 +87,6 @@ pub struct KvConnectorLeaderRecorder {
impl KvConnectorLeaderRecorder {
pub fn new(
worker_id: String,
drt: PyDistributedRuntime,
page_size: usize,
leader_py: PyKvbmLeader,
consolidator_vllm_endpoint: Option<String>,
......@@ -99,8 +98,7 @@ impl KvConnectorLeaderRecorder {
);
let leader = leader_py.get_inner().clone();
let drt = drt.inner().clone();
let handle: Handle = drt.runtime().primary();
let handle: Handle = get_current_tokio_handle();
let kvbm_metrics = KvbmMetrics::new(
&KvbmMetricsRegistry::default(),
......@@ -113,9 +111,7 @@ impl KvConnectorLeaderRecorder {
let output_path = "/tmp/records.jsonl";
tracing::info!("recording events to {}", output_path);
let recorder = drt
.runtime()
.primary()
let recorder = get_current_tokio_handle()
.block_on(async { Recorder::new(token, &output_path, None, None, None).await })
.unwrap();
......@@ -123,8 +119,7 @@ impl KvConnectorLeaderRecorder {
let recorder_tx = recorder.event_sender();
// todo(kvbm): make this a critical task
drt.runtime()
.primary()
get_current_tokio_handle()
.spawn(Self::forward_unbounded_to_sender(unbounded_rx, recorder_tx));
let slot_manager_cell = Arc::new(OnceLock::new());
......@@ -172,7 +167,6 @@ impl KvConnectorLeaderRecorder {
let sm = ConnectorSlotManager::new(
block_manager.get_block_manager().clone(),
leader.clone(),
drt.clone(),
kvbm_metrics_clone.clone(),
);
......
......@@ -16,6 +16,8 @@ use dynamo_llm::{
use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
use tokio_util::sync::CancellationToken;
use crate::get_current_cancel_token;
use super::*;
#[derive(Debug, thiserror::Error)]
......@@ -191,7 +193,6 @@ impl<R: RequestKey> ConnectorSlotManager<R> {
pub fn new(
block_manager: VllmBlockManager,
leader: Arc<KvbmLeader>,
drt: DistributedRuntime,
kvbm_metrics: KvbmMetrics,
) -> Self {
tracing::debug!(
......@@ -202,15 +203,20 @@ impl<R: RequestKey> ConnectorSlotManager<R> {
let (xfer_tx, xfer_rx) = mpsc::unbounded_channel();
let mut xfer_engine = LocalTransferEngine::new(block_manager.clone(), leader, xfer_rx);
let primary_token = drt.primary_token();
let runtime_primary = drt.runtime().primary();
let drt_for_task = drt;
let primary_token = get_current_cancel_token();
let primary_token_clone = primary_token.clone();
let runtime_primary = get_current_tokio_handle();
let runtime_primary_clone = runtime_primary.clone();
let xfer_engine_task = CriticalTaskExecutionHandle::new_with_runtime(
|cancellation_token| async move {
xfer_engine
.execute(cancellation_token, drt_for_task, kvbm_metrics)
.execute(
cancellation_token,
runtime_primary_clone,
primary_token_clone,
kvbm_metrics,
)
.await
},
primary_token,
......@@ -1163,12 +1169,12 @@ impl LocalTransferEngine {
async fn execute(
&mut self,
cancellation_token: CancellationToken,
drt: DistributedRuntime,
task_handle: Handle,
task_token: CancellationToken,
kvbm_metrics: KvbmMetrics,
) -> anyhow::Result<()> {
let (onboard_tx, mut onboard_rx) = mpsc::unbounded_channel();
let (offload_tx, mut offload_rx) = mpsc::unbounded_channel();
let drt_clone = drt.clone();
// Clone resources needed for tasks
let block_manager_offload = self.block_manager.clone();
......@@ -1194,9 +1200,9 @@ impl LocalTransferEngine {
}
Ok(())
},
drt.primary_token(),
task_token.clone(),
"LocalOnboardTask",
&drt.runtime().primary(),
&task_handle,
)
.unwrap();
let offload_task = CriticalTaskExecutionHandle::new_with_runtime(
......@@ -1219,9 +1225,9 @@ impl LocalTransferEngine {
}
Ok(())
},
drt_clone.primary_token(),
task_token,
"LocalOffloadTask",
&drt_clone.runtime().primary(),
&task_handle,
)
.unwrap();
......
......@@ -3,15 +3,15 @@
use super::*;
use crate::DistributedRuntime as PyDistributedRuntime;
use crate::llm::block_manager::BlockManagerBuilder;
use crate::llm::block_manager::vllm::connector::leader::slot::{
use crate::block_manager::BlockManagerBuilder;
use crate::block_manager::vllm::connector::leader::slot::{
ConnectorSlotManager, SlotManager, SlotState,
};
use crate::llm::block_manager::vllm::connector::leader::{
use crate::block_manager::vllm::connector::leader::{
kvbm_metrics_endpoint_enabled, parse_kvbm_metrics_port,
};
use crate::llm::block_manager::{distributed::KvbmLeader as PyKvbmLeader, vllm::KvbmRequest};
use crate::block_manager::{distributed::KvbmLeader as PyKvbmLeader, vllm::KvbmRequest};
use crate::get_current_tokio_handle;
use anyhow;
use dynamo_llm::block_manager::metrics_kvbm::{KvbmMetrics, KvbmMetricsRegistry};
use std::collections::HashSet;
......@@ -63,20 +63,14 @@ pub struct KvConnectorLeader {
}
impl KvConnectorLeader {
fn new(
worker_id: u64,
drt: PyDistributedRuntime,
page_size: usize,
leader_py: PyKvbmLeader,
) -> Self {
fn new(worker_id: u64, page_size: usize, leader_py: PyKvbmLeader) -> Self {
tracing::info!(
"KvConnectorLeader initialized with worker_id: {}",
worker_id
);
let leader = leader_py.get_inner().clone();
let drt = drt.inner().clone();
let handle: Handle = drt.runtime().primary();
let handle: Handle = get_current_tokio_handle();
let kvbm_metrics = KvbmMetrics::new(
&KvbmMetricsRegistry::default(),
......@@ -120,7 +114,6 @@ impl KvConnectorLeader {
let sm = ConnectorSlotManager::new(
block_manager.get_block_manager().clone(),
leader.clone(),
drt.clone(),
kvbm_metrics_clone.clone(),
);
......@@ -448,13 +441,14 @@ impl PyTrtllmKvConnectorLeader {
#[pyo3(signature = (worker_id, drt, page_size, leader))]
pub fn new(
worker_id: u64,
drt: PyDistributedRuntime,
drt: Option<PyObject>,
page_size: usize,
leader: PyKvbmLeader,
) -> Self {
) -> PyResult<Self> {
let _ = &drt; // drt is currently un-used in leader
let connector_leader: Box<dyn Leader> =
Box::new(KvConnectorLeader::new(worker_id, drt, page_size, leader));
Self { connector_leader }
Box::new(KvConnectorLeader::new(worker_id, page_size, leader));
Ok(Self { connector_leader })
}
fn get_num_new_matched_tokens(
......
......@@ -10,18 +10,18 @@ use std::collections::HashSet;
use std::sync::{Arc, OnceLock};
use super::*;
use crate::llm::block_manager::distributed::{get_leader_zmq_ack_url, get_leader_zmq_pub_url};
use crate::llm::block_manager::vllm::connector::worker::event_sync_blocking;
use crate::block_manager::distributed::{get_leader_zmq_ack_url, get_leader_zmq_pub_url};
use crate::block_manager::vllm::connector::worker::event_sync_blocking;
use crate::{block_manager::distributed::VllmTensor, to_pyerr};
use dynamo_runtime::DistributedRuntime;
use crate::{
DistributedRuntime as PyDistributedRuntime, llm::block_manager::distributed::VllmTensor,
to_pyerr,
extract_distributed_runtime_from_obj, get_current_cancel_token, get_current_tokio_handle,
};
use anyhow;
use dynamo_llm::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig};
use dynamo_llm::block_manager::layout::LayoutType;
use dynamo_llm::block_manager::storage::torch::TorchTensor;
use dynamo_runtime::DistributedRuntime;
use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
pub trait Worker: Send + Sync {
......@@ -51,7 +51,7 @@ pub trait Worker: Send + Sync {
}
pub struct KvConnectorWorker {
drt: DistributedRuntime,
_drt: Option<Arc<DistributedRuntime>>,
kvbm_worker: OnceLock<KvbmWorker>,
connector: WorkerSchedulerClient,
transfer_client: TransferSchedulerClient,
......@@ -74,18 +74,18 @@ pub struct KvConnectorWorker {
}
impl KvConnectorWorker {
fn new(py_drt: PyDistributedRuntime, trtllm_rank: String) -> anyhow::Result<Self> {
let drt = py_drt.inner.clone();
let runtime = drt.runtime().primary();
fn new(drt: Option<Arc<DistributedRuntime>>, trtllm_rank: String) -> anyhow::Result<Self> {
let runtime = get_current_tokio_handle();
let (scheduler, worker_client, transfer_client) = Scheduler::new(drt.primary_token());
let (scheduler, worker_client, transfer_client) =
Scheduler::new(get_current_cancel_token());
CriticalTaskExecutionHandle::new_with_runtime(
move |_| {
let mut scheduler = scheduler;
async move { scheduler.run().await }
},
drt.primary_token(),
get_current_cancel_token(),
"kv-connector-scheduler-task",
&runtime,
)?
......@@ -97,7 +97,7 @@ impl KvConnectorWorker {
);
Ok(Self {
drt,
_drt: drt,
kvbm_worker: OnceLock::new(),
connector: worker_client,
transfer_client,
......@@ -131,7 +131,7 @@ impl Worker for KvConnectorWorker {
let kv_cache_tensors = vec![kv_cache_tensor as Arc<dyn TorchTensor>];
let config = KvbmWorkerConfig::builder()
.drt(self.drt.clone())
.cancel_token(get_current_cancel_token())
.num_device_blocks(num_device_blocks)
.page_size(page_size)
.tensors(kv_cache_tensors)
......@@ -147,7 +147,7 @@ impl Worker for KvConnectorWorker {
self.layer_events = raw_event_handles;
let worker = self.drt.runtime().primary().block_on(async move {
let worker = get_current_tokio_handle().block_on(async move {
let worker = KvbmWorker::new(config, true).await?;
anyhow::Ok(worker)
})?;
......@@ -405,9 +405,17 @@ pub struct PyTrtllmKvConnectorWorker {
impl PyTrtllmKvConnectorWorker {
#[new]
#[pyo3(signature = (py_drt, trtllm_rank))]
pub fn new(py_drt: PyDistributedRuntime, trtllm_rank: String) -> PyResult<Self> {
pub fn new(py_drt: Option<PyObject>, trtllm_rank: String) -> PyResult<Self> {
let drt: Option<Arc<DistributedRuntime>> = Python::with_gil(|py| {
if let Some(obj) = py_drt {
extract_distributed_runtime_from_obj(py, obj)
} else {
Ok(None)
}
})?;
let connector_worker: Box<dyn Worker> =
Box::new(KvConnectorWorker::new(py_drt, trtllm_rank).map_err(to_pyerr)?);
Box::new(KvConnectorWorker::new(drt, trtllm_rank).map_err(to_pyerr)?);
Ok(Self { connector_worker })
}
......
......@@ -10,13 +10,13 @@ use std::collections::HashSet;
use std::sync::{Arc, OnceLock};
use super::*;
use crate::llm::block_manager::distributed::{get_leader_zmq_ack_url, get_leader_zmq_pub_url};
use crate::block_manager::distributed::{get_leader_zmq_ack_url, get_leader_zmq_pub_url};
use crate::{block_manager::distributed::VllmTensor, to_pyerr};
use crate::block_manager::distributed::PyLayoutType;
use crate::{
DistributedRuntime as PyDistributedRuntime, llm::block_manager::distributed::VllmTensor,
to_pyerr,
extract_distributed_runtime_from_obj, get_current_cancel_token, get_current_tokio_handle,
};
use crate::llm::block_manager::distributed::PyLayoutType;
use anyhow;
use dynamo_llm::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig};
use dynamo_llm::block_manager::layout::LayoutType;
......@@ -51,7 +51,7 @@ pub trait Worker: Send + Sync {
}
pub struct KvConnectorWorker {
drt: DistributedRuntime,
_drt: Option<Arc<DistributedRuntime>>,
kvbm_worker: OnceLock<KvbmWorker>,
connector: WorkerSchedulerClient,
transfer_client: TransferSchedulerClient,
......@@ -76,18 +76,18 @@ pub struct KvConnectorWorker {
}
impl KvConnectorWorker {
fn new(py_drt: PyDistributedRuntime, vllm_worker_id: String) -> anyhow::Result<Self> {
let drt = py_drt.inner.clone();
let runtime = drt.runtime().primary();
fn new(drt: Option<Arc<DistributedRuntime>>, vllm_worker_id: String) -> anyhow::Result<Self> {
let runtime = get_current_tokio_handle();
let (scheduler, worker_client, transfer_client) = Scheduler::new(drt.primary_token());
let (scheduler, worker_client, transfer_client) =
Scheduler::new(get_current_cancel_token());
CriticalTaskExecutionHandle::new_with_runtime(
move |_| {
let mut scheduler = scheduler;
async move { scheduler.run().await }
},
drt.primary_token(),
get_current_cancel_token(),
"kv-connector-scheduler-task",
&runtime,
)?
......@@ -99,7 +99,7 @@ impl KvConnectorWorker {
);
Ok(Self {
drt,
_drt: drt,
kvbm_worker: OnceLock::new(),
connector: worker_client,
transfer_client,
......@@ -194,21 +194,21 @@ impl Worker for KvConnectorWorker {
};
let config = KvbmWorkerConfig::builder()
.drt(self.drt.clone())
.cancel_token(get_current_cancel_token())
.num_device_blocks(num_device_blocks)
.page_size(page_size)
.tensors(vllm_tensors)
.device_id(device_id)
.dtype_width_bytes(dtype_width_bytes)
.leader_pub_url(get_leader_zmq_pub_url())
.leader_ack_url(get_leader_zmq_ack_url())
.scheduler_client(Some(self.transfer_client.clone()))
.device_layout_type(detected_device_layout_type)
.host_layout_type(host_layout_type.unwrap_or(LayoutType::FullyContiguous))
.disk_layout_type(disk_layout_type.unwrap_or(LayoutType::FullyContiguous))
.leader_pub_url(get_leader_zmq_pub_url())
.leader_ack_url(get_leader_zmq_ack_url())
.build()?;
let worker = self.drt.runtime().primary().block_on(async move {
let worker = get_current_tokio_handle().block_on(async move {
let worker = KvbmWorker::new(config, false).await?;
anyhow::Ok(worker)
})?;
......@@ -447,9 +447,17 @@ pub struct PyKvConnectorWorker {
impl PyKvConnectorWorker {
#[new]
#[pyo3(signature = (py_drt, vllm_worker_id))]
pub fn new(py_drt: PyDistributedRuntime, vllm_worker_id: String) -> PyResult<Self> {
pub fn new(py_drt: Option<PyObject>, vllm_worker_id: String) -> PyResult<Self> {
let drt: Option<Arc<DistributedRuntime>> = Python::with_gil(|py| {
if let Some(obj) = py_drt {
extract_distributed_runtime_from_obj(py, obj)
} else {
Ok(None)
}
})?;
let connector_worker: Box<dyn Worker> =
Box::new(KvConnectorWorker::new(py_drt, vllm_worker_id).map_err(to_pyerr)?);
Box::new(KvConnectorWorker::new(drt, vllm_worker_id).map_err(to_pyerr)?);
Ok(Self { connector_worker })
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment