"lib/vscode:/vscode.git/clone" did not exist on "a6e7348400a331d08e7b843d0eb886d91134a272"
Unverified Commit 01819b87 authored by jthomson04's avatar jthomson04 Committed by GitHub
Browse files

feat: KVBM V2 optimized bounce buffer transfer + benchmark (#3947)


Signed-off-by: default avatarjthomson04 <jwillthomson19@gmail.com>
parent f57fd72f
......@@ -1602,6 +1602,19 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "console"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b430743a6eb14e9764d4260d4c0d8123087d504eeb9c48f2b2a5e810dd369df4"
dependencies = [
"encode_unicode",
"libc",
"once_cell",
"unicode-width 0.2.2",
"windows-sys 0.61.2",
]
[[package]]
name = "console-api"
version = "0.8.1"
......@@ -2370,7 +2383,7 @@ version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "658bce805d770f407bc62102fca7c2c64ceef2fbcb2b8bd19d2765ce093980de"
dependencies = [
"console",
"console 0.15.11",
"shell-words",
"tempfile",
"thiserror 1.0.69",
......@@ -2650,6 +2663,7 @@ dependencies = [
"bytes",
"candle-core 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono",
"clap 4.5.52",
"criterion 0.3.6",
"cudarc",
"dashmap 5.5.3",
......@@ -2671,6 +2685,7 @@ dependencies = [
"hyper 1.8.1",
"hyper-util",
"image",
"indicatif 0.18.3",
"insta",
"itertools 0.14.0",
"json-five",
......@@ -4050,8 +4065,8 @@ checksum = "629d8f3bbeda9d148036d6b0de0a3ab947abd08ce90626327fc3547a49d59d97"
dependencies = [
"dirs",
"futures",
"indicatif 0.17.11",
"http 1.4.0",
"indicatif",
"libc",
"log",
"num_cpus",
......@@ -4672,7 +4687,7 @@ version = "0.17.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235"
dependencies = [
"console",
"console 0.15.11",
"number_prefix",
"portable-atomic",
"rayon",
......@@ -4680,6 +4695,19 @@ dependencies = [
"web-time",
]
[[package]]
name = "indicatif"
version = "0.18.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9375e112e4b463ec1b1c6c011953545c65a30164fbab5b581df32b3abf0dcb88"
dependencies = [
"console 0.16.1",
"portable-atomic",
"unicode-width 0.2.2",
"unit-prefix",
"web-time",
]
[[package]]
name = "inlinable_string"
version = "0.1.15"
......@@ -4734,7 +4762,7 @@ version = "1.44.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5c943d4415edd8153251b6f197de5eb1640e56d84e8d9159bea190421c73698"
dependencies = [
"console",
"console 0.15.11",
"globset",
"once_cell",
"pest",
......@@ -6135,7 +6163,7 @@ dependencies = [
"http 1.4.0",
"image",
"indexmap 2.12.1",
"indicatif",
"indicatif 0.17.11",
"interprocess",
"itertools 0.14.0",
"libc",
......@@ -11546,6 +11574,12 @@ dependencies = [
"rand 0.8.5",
]
[[package]]
name = "unit-prefix"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81e544489bf3d8ef66c953931f56617f423cd4b5494be343d9b9d3dda037b9a3"
[[package]]
name = "universal-hash"
version = "0.5.1"
......
......@@ -22,6 +22,7 @@ testing-cuda = ["dep:cudarc"]
testing-nixl = ["dep:nixl-sys"]
testing-etcd = []
block-manager = ["dep:nixl-sys", "dep:cudarc", "dep:nix", "dep:aligned-vec"]
block-manager-bench = ["block-manager", "testing-full", "dep:clap", "dep:indicatif"]
cuda = ["dep:cudarc"]
integration = ["dynamo-runtime/integration"]
media-nixl = ["dep:nixl-sys", "dep:dynamo-memory"]
......@@ -105,6 +106,10 @@ nixl-sys = { version = "=0.7.1", optional = true }
cudarc = { workspace = true, optional = true }
nix = { version = "0.26", optional = true }
# block_manager_bench
clap = { version = "4.5.49", features = ["derive"], optional = true }
indicatif = { version = "0.18.0", optional = true }
# protocols
unicode-segmentation = "1.12"
......@@ -188,3 +193,8 @@ mockito = "1.7.0"
[build-dependencies]
tonic-build = { version = "0.13.1" }
[[bin]]
name = "bench_local_transfer_v2"
path = "bin/bench_local_transfer_v2.rs"
required-features = ["block-manager-bench"]
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use anyhow::Result;
use clap::Parser;
use core::time::Duration;
use indicatif::ProgressIterator;
use std::time::Instant;
use dynamo_llm::block_manager::v2::physical::{
layout::LayoutConfig,
transfer::{
BounceBufferSpec, NixlAgent, PhysicalLayout, StorageKind, TransferOptions,
TransportManager, executor::execute_transfer,
},
};
use std::sync::Arc;
#[derive(Parser)]
struct Args {
/// Amount of layers
#[clap(long, default_value_t = 24)]
num_layers: usize,
/// Inner dimension
#[clap(long, default_value_t = 4096)]
inner_dim: usize,
/// Block size
#[clap(long, default_value_t = 32)]
block_size: usize,
/// Amount of blocks per pool
#[clap(long, default_value_t = 16)]
num_blocks: usize,
/// Amount of blocks per transferred batch
#[clap(long, default_value_t = 4)]
blocks_per_batch: usize,
/// Amount of pinned bounce buffer blocks
#[clap(long, default_value_t = 2)]
num_bounce_blocks: usize,
/// Amount of iterations
#[clap(long, default_value_t = 100)]
iterations: usize,
}
struct DummyBounceBufferSpec {
pub layout: PhysicalLayout,
pub block_ids: Vec<usize>,
}
impl BounceBufferSpec for DummyBounceBufferSpec {
fn layout(&self) -> &PhysicalLayout {
&self.layout
}
fn block_ids(&self) -> &[usize] {
&self.block_ids
}
}
#[tokio::main]
pub async fn main() -> Result<()> {
let args = Args::parse();
// let manager = build_manager(&args).await?;
benchmark(&args).await?;
Ok(())
}
fn build_layout(
agent: NixlAgent,
config: LayoutConfig,
storage_kind: StorageKind,
) -> PhysicalLayout {
let builder = PhysicalLayout::builder(agent)
.with_config(config)
.fully_contiguous();
match storage_kind {
StorageKind::System => builder.allocate_system().build().unwrap(),
StorageKind::Pinned => builder.allocate_pinned(false).build().unwrap(),
StorageKind::Device(device_id) => builder.allocate_device(device_id).build().unwrap(),
StorageKind::Disk(_) => builder.allocate_disk(None).build().unwrap(),
}
}
fn get_bandwidth_gbs(latencies: Vec<Duration>, args: &Args) -> f64 {
let total_bytes =
args.num_layers * args.inner_dim * args.block_size * args.blocks_per_batch * 2;
let mean = latencies.iter().sum::<Duration>() / latencies.len() as u32;
total_bytes as f64 / mean.as_nanos() as f64
}
async fn benchmark(args: &Args) -> Result<()> {
let agent = NixlAgent::require_backends("test_agent", &["POSIX", "GDS_MT"])?;
let src_dst_config = LayoutConfig::builder()
.num_blocks(args.num_blocks)
.num_layers(args.num_layers)
.outer_dim(2)
.page_size(args.block_size)
.inner_dim(args.inner_dim)
.dtype_width_bytes(2)
.build()?;
let disk_layout = build_layout(agent.clone(), src_dst_config.clone(), StorageKind::Disk(0));
let device_layout = build_layout(
agent.clone(),
src_dst_config.clone(),
StorageKind::Device(0),
);
let bounce_config = LayoutConfig::builder()
.num_blocks(args.num_bounce_blocks)
.num_layers(args.num_layers)
.outer_dim(2)
.page_size(args.block_size)
.inner_dim(args.inner_dim)
.dtype_width_bytes(2)
.build()?;
let bounce_layout = build_layout(agent.clone(), bounce_config.clone(), StorageKind::Pinned);
let ctx = TransportManager::builder()
.worker_id(0)
.nixl_agent(agent)
.cuda_device_id(0)
.build()?;
let bounce_buffer_spec: Arc<dyn BounceBufferSpec> = Arc::new(DummyBounceBufferSpec {
layout: bounce_layout,
block_ids: (0..args.num_bounce_blocks).collect(),
});
let options = TransferOptions::builder()
.bounce_buffer(bounce_buffer_spec)
.build()?;
anyhow::ensure!(
args.blocks_per_batch <= args.num_blocks,
"blocks_per_batch must be less than or equal to num_blocks"
);
let blocks = (0..args.blocks_per_batch).collect::<Vec<_>>();
for (src, dst, name) in vec![
(disk_layout.clone(), device_layout.clone(), "disk_to_device"),
(device_layout, disk_layout, "device_to_disk"),
] {
println!("Starting {} benchmark...", name);
let mut latencies = Vec::new();
for _ in (0..args.iterations).progress() {
let options_clone = options.clone();
let start = Instant::now();
execute_transfer(
&src,
&dst,
blocks.as_slice(),
blocks.as_slice(),
options_clone,
ctx.context(),
)?
.await?;
let end = Instant::now();
let duration = end.duration_since(start);
latencies.push(duration);
}
println!(
"{} bandwidth: {:?} GB/s",
name,
get_bandwidth_gbs(latencies, args)
);
}
Ok(())
}
......@@ -17,6 +17,7 @@ use anyhow::Result;
use std::ops::Range;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Mutex;
// Re-export the NIXL transfer builder for public use
pub use nixl::NixlTransferBuilder;
......@@ -181,6 +182,64 @@ struct TwoHopTransferParams<'a> {
ctx: &'a TransferContext,
}
#[allow(clippy::too_many_arguments)]
async fn handle_buffered_transfer(
src: &PhysicalLayout,
bounce_layout: &PhysicalLayout,
dst: &PhysicalLayout,
src_block_ids: &[usize],
bounce_block_ids: &[usize],
dst_block_ids: &[usize],
first_strategy: TransferStrategy,
second_strategy: TransferStrategy,
layer_range: &Option<Range<usize>>,
ctx: &TransferContext,
) -> Result<()> {
let bounce_groups =
&bounce_block_ids[0..std::cmp::min(src_block_ids.len(), bounce_block_ids.len())];
let (bounce_group_0, bounce_group_1) = bounce_groups.split_at(bounce_groups.len() / 2);
let bounce_group_0 = bounce_group_0.to_vec();
let bounce_group_1 = bounce_group_1.to_vec();
let src_dst_iter = Arc::new(Mutex::new(src_block_ids.iter().zip(dst_block_ids.iter())));
let transfer_task = async move |bounce_group: &[usize]| -> Result<()> {
loop {
let (src_ids, dst_ids): (Vec<usize>, Vec<usize>);
{
let mut x = src_dst_iter.lock().await;
(src_ids, dst_ids) = x.by_ref().take(bounce_group.len()).unzip();
if src_ids.is_empty() {
break;
}
}
execute_two_hop_transfer_chunk(
src,
bounce_layout,
dst,
&src_ids,
&bounce_group[0..src_ids.len()],
&dst_ids,
first_strategy,
second_strategy,
layer_range,
ctx,
)
.await?;
}
Ok(())
};
let transfer_0 = transfer_task(&bounce_group_0);
let transfer_1 = transfer_task(&bounce_group_1);
futures::future::try_join(transfer_0, transfer_1).await?;
Ok(())
}
fn execute_two_hop_transfer(params: TwoHopTransferParams) -> Result<TransferCompleteNotification> {
let TwoHopTransferParams {
src,
......@@ -223,22 +282,26 @@ fn execute_two_hop_transfer(params: TwoHopTransferParams) -> Result<TransferComp
return;
}
if bounce_buffer_spec.block_ids().is_empty() {
tx.send(Err(anyhow::anyhow!(
"Bounce buffer must have at least one block."
)))
.unwrap();
return;
}
let num_bounce_blocks = bounce_buffer_spec.block_ids().len();
if num_bounce_blocks < src_block_ids.len() {
for (src_block_ids, dst_block_ids) in src_block_ids
.chunks(num_bounce_blocks)
.zip(dst_block_ids.chunks(num_bounce_blocks))
{
let bounce_block_ids_to_use =
&bounce_buffer_spec.block_ids()[..src_block_ids.len()];
if num_bounce_blocks == 1 {
let bounce_block = bounce_buffer_spec.block_ids()[0];
for (src_block_id, dst_block_id) in src_block_ids.iter().zip(dst_block_ids.iter()) {
if let Err(e) = execute_two_hop_transfer_chunk(
&src_clone,
bounce_buffer_spec.layout(),
&dst_clone,
src_block_ids,
bounce_block_ids_to_use,
dst_block_ids,
&[*src_block_id],
&[bounce_block],
&[*dst_block_id],
first_strategy,
second_strategy,
&options_clone.layer_range,
......@@ -246,28 +309,30 @@ fn execute_two_hop_transfer(params: TwoHopTransferParams) -> Result<TransferComp
)
.await
{
tx.send(Err(e)).unwrap();
let _ = tx.send(Err(e));
return;
}
}
tx.send(Ok(())).unwrap();
} else {
let bounce_block_ids_to_use = &bounce_buffer_spec.block_ids()[..src_block_ids.len()];
let result = execute_two_hop_transfer_chunk(
if let Err(e) = handle_buffered_transfer(
&src_clone,
bounce_buffer_spec.layout(),
&dst_clone,
src_block_ids.as_slice(),
bounce_block_ids_to_use,
dst_block_ids.as_slice(),
&src_block_ids,
bounce_buffer_spec.block_ids(),
&dst_block_ids,
first_strategy,
second_strategy,
&options_clone.layer_range,
&ctx_clone,
)
.await;
tx.send(result).unwrap();
.await
{
let _ = tx.send(Err(e));
return;
}
let _ = tx.send(Ok(()));
}
});
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment