Unverified commit 9ab148dc, authored by Ryan Olson and committed by GitHub
Browse files

feat: kvbm-physical (#6490)


Signed-off-by: Ryan Olson <rolson@nvidia.com>
parent 7546c193
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Physical layout types that combine abstract layouts with storage location metadata.
use crate::BlockId;
use super::{
FullyContiguousLayout, InnerShape, LayerSeparateLayout, Layout, MemoryRegion,
builder::{PhysicalLayoutBuilder, PhysicalLayoutBuilderDefault},
serialize::{LayoutDescriptor, LayoutTypeDetails},
};
use anyhow::{Result, anyhow};
use dynamo_memory::{
Buffer, MemoryDescriptor, StorageKind,
nixl::{MemType, NixlAgent, NixlDescriptor},
};
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::sync::Arc;
/// Runtime representation of a layout with its physical storage location.
///
/// A `PhysicalLayout` wraps an abstract [`Layout`] with information about where
/// its memory physically resides (GPU, host, disk) and the NIXL registration
/// metadata (agent name, memory type, device id) associated with it.
/// This enables the transfer system to select appropriate copy strategies and build
/// NIXL transfer descriptors.
///
/// Values are obtained through [`PhysicalLayout::builder`] or reconstructed from a
/// `LayoutDescriptor` with `from_descriptor`. Cloning is cheap with respect to the
/// layout itself: the `Arc` is shared, not deep-copied.
#[derive(Debug, Clone)]
pub struct PhysicalLayout {
/// The abstract layout defining memory organization
layout: Arc<dyn Layout>,
/// Physical storage location (System, Device, Pinned, Disk)
location: StorageKind,
/// NIXL registration metadata
nixl_metadata: NixlMetadata,
}
/// NIXL registration metadata carried by a [`PhysicalLayout`].
///
/// The record is serializable so it can travel inside a `LayoutDescriptor`, and its
/// `mem_type` / `device_id` are copied into the NIXL descriptors produced for
/// layouts reconstructed from such a descriptor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NixlMetadata {
/// Name of the NIXL agent this metadata refers to.
agent_name: String,
/// NIXL memory type of the described memory.
mem_type: MemType,
/// Device identifier forwarded to NIXL descriptors.
device_id: u64,
}
impl NixlMetadata {
pub fn new(agent_name: String, mem_type: MemType, device_id: u64) -> Self {
Self {
agent_name,
mem_type,
device_id,
}
}
pub fn agent_name(&self) -> &str {
&self.agent_name
}
#[inline(always)]
pub fn mem_type(&self) -> MemType {
self.mem_type
}
#[inline(always)]
pub fn device_id(&self) -> u64 {
self.device_id
}
}
impl PhysicalLayout {
    /// Create a typed builder that enforces NIXL registration.
    ///
    /// # Arguments
    /// * `agent` - NIXL agent handed to [`PhysicalLayoutBuilder::new`]
    pub fn builder(agent: NixlAgent) -> PhysicalLayoutBuilderDefault {
        PhysicalLayoutBuilder::new(agent)
    }

    /// Create a new local physical layout.
    ///
    /// # Arguments
    /// * `layout` - The abstract layout to wrap
    /// * `location` - Where the layout's memory resides
    /// * `nixl_metadata` - NIXL registration metadata stored alongside the layout
    pub(crate) fn new_local(
        layout: Arc<dyn Layout>,
        location: StorageKind,
        nixl_metadata: NixlMetadata,
    ) -> Self {
        Self {
            layout,
            location,
            nixl_metadata,
        }
    }

    /// Get the underlying layout.
    pub fn layout(&self) -> &Arc<dyn Layout> {
        &self.layout
    }

    /// Get the storage location.
    pub(crate) fn location(&self) -> StorageKind {
        self.location
    }

    /// Get the NIXL metadata.
    pub(crate) fn nixl_metadata(&self) -> &NixlMetadata {
        &self.nixl_metadata
    }

    /// Get the memory region for one (block, layer, outer) coordinate.
    ///
    /// This is a direct delegation to [`Layout::memory_region`] on the wrapped layout.
    ///
    /// # Arguments
    /// * `block_id` - Block identifier
    /// * `layer_id` - Layer identifier
    /// * `outer_id` - Outer dimension identifier
    ///
    /// # Errors
    /// Propagates whatever error the wrapped layout reports for the coordinate.
    pub fn memory_region(
        &self,
        block_id: BlockId,
        layer_id: usize,
        outer_id: usize,
    ) -> Result<MemoryRegion> {
        self.layout.memory_region(block_id, layer_id, outer_id)
    }

    /// Serialize this physical layout for transmission to remote nodes.
    ///
    /// This converts the runtime `PhysicalLayout` into a `LayoutDescriptor` that
    /// contains all information needed to reconstruct the layout on a remote node,
    /// including layout configuration, memory descriptors, NIXL metadata, and
    /// layout-type-specific details.
    ///
    /// # Returns
    /// A serializable representation of this layout. The current implementation has
    /// no failing path and always returns `Ok`.
    pub(crate) fn to_descriptor(&self) -> Result<LayoutDescriptor> {
        // Flatten every backing region down to its (addr, size) pair.
        let memory_descriptors = self
            .layout
            .memory_regions()
            .iter()
            .map(|region| MemoryRegion {
                addr: region.addr(),
                size: region.size(),
            })
            .collect();

        // The layout itself knows which type-specific details it needs to ship.
        let layout_type_details = self.layout.serialization_details();

        Ok(LayoutDescriptor {
            version: LayoutDescriptor::CURRENT_VERSION,
            layout_config: self.layout.config().clone(),
            location: self.location,
            nixl_metadata: self.nixl_metadata.clone(),
            memory_descriptors,
            layout_type_details,
        })
    }

    /// Reconstruct a physical layout from serialized data received from a remote node.
    ///
    /// This creates a new `PhysicalLayout` from a `LayoutDescriptor`. The reconstructed
    /// layout will have memory descriptors that point to the remote node's memory,
    /// allowing NIXL to build RDMA descriptors for remote access.
    ///
    /// # Arguments
    /// * `serialized` - Serialized layout data from a remote node
    ///
    /// # Returns
    /// A new `PhysicalLayout` representing the remote layout
    ///
    /// # Errors
    /// * The descriptor's `version` is greater than [`LayoutDescriptor::CURRENT_VERSION`].
    /// * The number of memory descriptors does not match the layout type
    ///   (exactly 1 for fully contiguous, `num_layers` for layer-separate).
    /// * The underlying layout constructor rejects the configuration or regions.
    ///
    /// # Note
    /// The memory regions in the reconstructed layout are not valid for local access;
    /// they represent remote memory addresses and are used to build NIXL transfer descriptors.
    pub(crate) fn from_descriptor(serialized: LayoutDescriptor) -> Result<Self> {
        // Take the descriptor apart once so each owned field can be moved into
        // its final place instead of being cloned.
        let LayoutDescriptor {
            version,
            layout_config,
            location,
            nixl_metadata,
            memory_descriptors,
            layout_type_details,
        } = serialized;

        // Validate version
        if version > LayoutDescriptor::CURRENT_VERSION {
            return Err(anyhow!(
                "Unsupported serialization version: {}. Maximum supported: {}",
                version,
                LayoutDescriptor::CURRENT_VERSION
            ));
        }

        // Create remote memory regions from descriptors
        let remote_regions: Vec<Arc<dyn MemoryDescriptor>> = memory_descriptors
            .iter()
            .map(|desc| {
                Arc::new(RemoteMemoryDescriptor {
                    addr: desc.addr,
                    size: desc.size,
                    storage_kind: location,
                    nixl_metadata: nixl_metadata.clone(),
                }) as Arc<dyn MemoryDescriptor>
            })
            .collect();

        // Reconstruct the layout based on type
        let layout: Arc<dyn Layout> = match layout_type_details {
            LayoutTypeDetails::FullyContiguous(details) => {
                // A fully contiguous layout is backed by exactly one region.
                let [region] = remote_regions.as_slice() else {
                    return Err(anyhow!(
                        "FullyContiguous layout requires exactly 1 memory region, got {}",
                        remote_regions.len()
                    ));
                };
                let layout = FullyContiguousLayout::new_with_format(
                    layout_config,
                    Buffer::from_arc(Arc::clone(region)),
                    details.block_format,
                    details.kv_block_layout,
                )?;
                Arc::new(layout)
            }
            LayoutTypeDetails::LayerSeparate(details) => {
                if remote_regions.len() != layout_config.num_layers {
                    return Err(anyhow!(
                        "LayerSeparate layout requires {} memory regions (one per layer), got {}",
                        layout_config.num_layers,
                        remote_regions.len()
                    ));
                }
                // Fall back to `Unknown` when the KV block layout has no inner shape.
                let inner_shape = details
                    .kv_block_layout
                    .to_inner_shape()
                    .unwrap_or(InnerShape::Unknown);
                let layout = LayerSeparateLayout::builder()
                    .config(layout_config)
                    .memory(remote_regions.into_iter().map(Buffer::from_arc).collect())
                    .block_dim(details.block_dim)
                    .inner_shape(inner_shape)
                    .build()?;
                Arc::new(layout)
            }
        };

        Ok(Self {
            layout,
            location,
            nixl_metadata,
        })
    }
}
/// A memory region that represents remote memory addresses.
///
/// This type is used when reconstructing layouts from serialized data.
/// The addresses are not valid for local access but can be used to
/// build NIXL transfer descriptors for remote memory access.
#[derive(Debug)]
struct RemoteMemoryDescriptor {
/// Start address of the region, as reported by the serialized descriptor.
addr: usize,
/// Size of the region, as reported by the serialized descriptor.
size: usize,
/// Storage kind copied from the descriptor's `location`.
storage_kind: StorageKind,
/// NIXL metadata supplying `mem_type` and `device_id` for `nixl_descriptor`.
nixl_metadata: NixlMetadata,
}
impl MemoryDescriptor for RemoteMemoryDescriptor {
    /// Start address recorded for the region.
    fn addr(&self) -> usize {
        self.addr
    }

    /// Size recorded for the region.
    fn size(&self) -> usize {
        self.size
    }

    /// Storage kind recorded for the region.
    fn storage_kind(&self) -> StorageKind {
        self.storage_kind
    }

    /// Expose `self` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Build a NIXL descriptor from the stored address, size, and NIXL metadata.
    ///
    /// Always returns `Some`.
    fn nixl_descriptor(&self) -> Option<NixlDescriptor> {
        let metadata = &self.nixl_metadata;
        let descriptor = NixlDescriptor {
            addr: self.addr as u64,
            size: self.size,
            mem_type: metadata.mem_type(),
            device_id: metadata.device_id(),
        };
        Some(descriptor)
    }
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Serialization types for physical layouts.
//!
//! This module provides types for serializing and deserializing physical layouts
//! so they can be transmitted to remote nodes and reconstructed there for RDMA operations.
use super::physical::NixlMetadata;
use super::{BlockDimension, KvBlockLayout, LayoutConfig};
use anyhow::Result;
use dynamo_memory::{MemoryRegion, StorageKind};
use serde::{Deserialize, Serialize};
/// Format of blocks in a fully contiguous layout.
///
/// This enum describes how the blocks are organized and formatted in memory.
/// Currently only `Operational` is supported, but future variants may include
/// different compression schemes or memory layouts.
///
/// `BlockFormat::default()` is [`BlockFormat::Operational`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlockFormat {
    /// Standard operational format - blocks are stored in their normal, uncompressed form.
    #[default]
    Operational,
}
/// Details specific to fully contiguous layouts.
///
/// Carried inside [`LayoutTypeDetails::FullyContiguous`] so the receiving side can
/// rebuild the layout with the same block format and KV block layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FullyContiguousDetails {
/// Format of the blocks in memory
pub block_format: BlockFormat,
/// KV block layout describing dimension ordering within blocks.
/// When the field is absent from the serialized input, `KvBlockLayout::default()` is used.
#[serde(default)]
pub kv_block_layout: KvBlockLayout,
}
/// Details specific to layer-separate layouts.
///
/// Carried inside [`LayoutTypeDetails::LayerSeparate`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LayerSeparateDetails {
/// Block dimension ordering (block-first or block-second)
pub block_dim: BlockDimension,
/// KV block layout for the inner tensor format (must be operational: NHD or HND).
/// When the field is absent from the serialized input, `KvBlockLayout::default()` is used.
#[serde(default)]
pub kv_block_layout: KvBlockLayout,
}
/// Layout-type-specific details.
///
/// This enum captures the information that differs between layout types
/// and is needed to reconstruct the layout on a remote node.
/// The variant also selects which concrete layout is rebuilt from a
/// [`LayoutDescriptor`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LayoutTypeDetails {
/// Fully contiguous layout details
FullyContiguous(FullyContiguousDetails),
/// Layer-separate layout details
LayerSeparate(LayerSeparateDetails),
}
/// Serializable representation of a physical layout.
///
/// This structure contains all information needed to reconstruct a layout
/// on a remote node, including:
/// - Layout configuration (dimensions, sizes, etc.)
/// - Storage location and NIXL metadata
/// - Memory descriptors for all regions
/// - Layout-type-specific details
///
/// The serialized form can be transmitted over the network and used to
/// build NIXL transfer descriptors for remote memory access.
///
/// JSON helpers are provided on the type (`to_json`, `to_json_bytes`,
/// `from_json`, `from_json_bytes`). Those helpers do not inspect `version`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LayoutDescriptor {
/// Serialization format version (for future compatibility)
pub version: u32,
/// Layout configuration
pub layout_config: LayoutConfig,
/// Storage location
pub location: StorageKind,
/// NIXL metadata from the source node
pub nixl_metadata: NixlMetadata,
/// Memory descriptors for all regions backing this layout
pub memory_descriptors: Vec<MemoryRegion>,
/// Layout-type-specific details
pub layout_type_details: LayoutTypeDetails,
}
impl LayoutDescriptor {
    /// Version number written into newly created descriptors.
    pub const CURRENT_VERSION: u32 = 1;

    /// Encode this descriptor as a JSON string.
    ///
    /// # Errors
    /// Returns an error wrapping the `serde_json` failure message.
    pub fn to_json(&self) -> Result<String> {
        match serde_json::to_string(self) {
            Ok(text) => Ok(text),
            Err(err) => Err(anyhow::anyhow!(
                "failed to serialize layout to JSON: {}",
                err
            )),
        }
    }

    /// Encode this descriptor as JSON bytes.
    ///
    /// # Errors
    /// Returns an error wrapping the `serde_json` failure message.
    pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
        match serde_json::to_vec(self) {
            Ok(raw) => Ok(raw),
            Err(err) => Err(anyhow::anyhow!(
                "failed to serialize layout to JSON bytes: {}",
                err
            )),
        }
    }

    /// Decode a descriptor from a JSON string.
    ///
    /// # Arguments
    /// * `json` - JSON text produced by [`LayoutDescriptor::to_json`] or equivalent
    ///
    /// # Errors
    /// Returns an error wrapping the `serde_json` failure message.
    pub fn from_json(json: &str) -> Result<Self> {
        match serde_json::from_str(json) {
            Ok(descriptor) => Ok(descriptor),
            Err(err) => Err(anyhow::anyhow!(
                "failed to deserialize layout from JSON: {}",
                err
            )),
        }
    }

    /// Decode a descriptor from JSON bytes.
    ///
    /// # Arguments
    /// * `bytes` - JSON bytes produced by [`LayoutDescriptor::to_json_bytes`] or equivalent
    ///
    /// # Errors
    /// Returns an error wrapping the `serde_json` failure message.
    pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
        match serde_json::from_slice(bytes) {
            Ok(descriptor) => Ok(descriptor),
            Err(err) => Err(anyhow::anyhow!(
                "failed to deserialize layout from JSON bytes: {}",
                err
            )),
        }
    }

    /// Borrow the layout configuration.
    pub fn layout_config(&self) -> &LayoutConfig {
        &self.layout_config
    }

    /// Storage location, returned by value.
    pub fn location(&self) -> StorageKind {
        self.location
    }

    /// Borrow the NIXL metadata recorded by the source node.
    pub fn nixl_metadata(&self) -> &NixlMetadata {
        &self.nixl_metadata
    }

    /// Borrow the memory descriptors as a slice.
    pub fn memory_descriptors(&self) -> &[MemoryRegion] {
        self.memory_descriptors.as_slice()
    }

    /// Borrow the layout-type-specific details.
    pub fn layout_type_details(&self) -> &LayoutTypeDetails {
        &self.layout_type_details
    }
}
#[cfg(all(test, feature = "testing-kvbm"))]
mod tests {
    use dynamo_memory::nixl::MemType;

    use super::*;

    /// Layout configuration shared by the descriptor tests.
    fn make_test_config() -> LayoutConfig {
        let built = LayoutConfig::builder()
            .num_blocks(10)
            .num_layers(4)
            .outer_dim(2)
            .page_size(16)
            .inner_dim(128)
            .dtype_width_bytes(2)
            .build();
        built.unwrap()
    }

    /// Assemble a current-version descriptor located in system memory.
    fn system_descriptor(
        nixl_metadata: NixlMetadata,
        memory_descriptors: Vec<MemoryRegion>,
        layout_type_details: LayoutTypeDetails,
    ) -> LayoutDescriptor {
        LayoutDescriptor {
            version: LayoutDescriptor::CURRENT_VERSION,
            layout_config: make_test_config(),
            location: StorageKind::System,
            nixl_metadata,
            memory_descriptors,
            layout_type_details,
        }
    }

    /// `BlockFormat::default()` is the operational format.
    #[test]
    fn test_block_format_default() {
        assert_eq!(BlockFormat::default(), BlockFormat::Operational);
    }

    /// A fully contiguous descriptor survives a JSON string round trip.
    #[test]
    fn test_serialized_layout_json_roundtrip() {
        let original = system_descriptor(
            NixlMetadata::new("test_agent".to_string(), MemType::Dram, 0),
            vec![MemoryRegion::new(0x1000, 4096)],
            LayoutTypeDetails::FullyContiguous(FullyContiguousDetails {
                block_format: BlockFormat::Operational,
                kv_block_layout: KvBlockLayout::OperationalNHD,
            }),
        );

        // Test to_json/from_json
        let text = original.to_json().unwrap();
        let restored = LayoutDescriptor::from_json(&text).unwrap();

        assert_eq!(restored.version, original.version);
        assert_eq!(restored.layout_config, original.layout_config);
        assert_eq!(restored.location, original.location);
        assert_eq!(
            restored.nixl_metadata.agent_name(),
            original.nixl_metadata.agent_name()
        );
        assert_eq!(restored.memory_descriptors.len(), 1);
    }

    /// A layer-separate descriptor survives a JSON bytes round trip.
    #[test]
    fn test_serialized_layout_json_bytes_roundtrip() {
        let original = system_descriptor(
            NixlMetadata::new("test_agent".to_string(), MemType::Vram, 5),
            vec![
                MemoryRegion::new(0x1000, 2048),
                MemoryRegion::new(0x2000, 2048),
            ],
            LayoutTypeDetails::LayerSeparate(LayerSeparateDetails {
                block_dim: BlockDimension::BlockIsFirstDim,
                kv_block_layout: KvBlockLayout::OperationalNHD,
            }),
        );

        // Test to_json_bytes/from_json_bytes
        let raw = original.to_json_bytes().unwrap();
        let restored = LayoutDescriptor::from_json_bytes(&raw).unwrap();

        assert_eq!(restored.version, original.version);
        assert_eq!(restored.nixl_metadata.device_id(), 5);
        assert_eq!(restored.memory_descriptors.len(), 2);
    }

    /// The fully contiguous variant keeps its payload through serde_json.
    #[test]
    fn test_fully_contiguous_details_serialization() {
        let original = LayoutTypeDetails::FullyContiguous(FullyContiguousDetails {
            block_format: BlockFormat::Operational,
            kv_block_layout: KvBlockLayout::UniversalTP,
        });

        let text = serde_json::to_string(&original).unwrap();
        let restored: LayoutTypeDetails = serde_json::from_str(&text).unwrap();

        let LayoutTypeDetails::FullyContiguous(payload) = restored else {
            panic!("Expected FullyContiguous variant");
        };
        assert_eq!(payload.block_format, BlockFormat::Operational);
        assert_eq!(payload.kv_block_layout, KvBlockLayout::UniversalTP);
    }

    /// The layer-separate variant keeps its payload through serde_json.
    #[test]
    fn test_layer_separate_details_serialization() {
        let original = LayoutTypeDetails::LayerSeparate(LayerSeparateDetails {
            block_dim: BlockDimension::BlockIsSecondDim,
            kv_block_layout: KvBlockLayout::OperationalHND,
        });

        let text = serde_json::to_string(&original).unwrap();
        let restored: LayoutTypeDetails = serde_json::from_str(&text).unwrap();

        let LayoutTypeDetails::LayerSeparate(payload) = restored else {
            panic!("Expected LayerSeparate variant");
        };
        assert_eq!(payload.block_dim, BlockDimension::BlockIsSecondDim);
        assert_eq!(payload.kv_block_layout, KvBlockLayout::OperationalHND);
    }
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Integration tests for layout serialization.
//!
//! These tests verify the complete serialization and deserialization flow,
//! ensuring that layouts can be transmitted to remote nodes and reconstructed
//! with all necessary metadata intact.
use crate::layout::physical::PhysicalLayout;
use crate::layout::{BlockDimension, LayoutConfig, LayoutDescriptor};
use dynamo_memory::nixl::{MemType, NixlAgent, NixlDescriptor};
use dynamo_memory::{Buffer, MemoryDescriptor, MemoryRegion, StorageKind};
use std::any::Any;
use std::sync::Arc;
// Simple mock implementation for testing
/// Minimal memory descriptor that only records an address and a size.
///
/// Its `MemoryDescriptor` implementation reports `StorageKind::System` and
/// provides no NIXL descriptor.
#[derive(Debug)]
pub struct MockMemory {
/// Address reported by `addr()`.
addr: usize,
/// Size reported by `size()`.
size: usize,
}
impl MockMemory {
pub fn new(addr: usize, size: usize) -> Arc<Self> {
Arc::new(Self { addr, size })
}
}
impl MemoryDescriptor for MockMemory {
    /// Address supplied at construction.
    fn addr(&self) -> usize {
        let Self { addr, .. } = self;
        *addr
    }

    /// Size supplied at construction.
    fn size(&self) -> usize {
        let Self { size, .. } = self;
        *size
    }

    /// The mock always reports system memory.
    fn storage_kind(&self) -> StorageKind {
        StorageKind::System
    }

    /// Expose `self` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// The mock carries no NIXL descriptor.
    fn nixl_descriptor(&self) -> Option<NixlDescriptor> {
        Option::None
    }
}
/// Mock memory region for testing serialization
///
/// Unlike `MockMemory`, this region carries a configurable storage kind and a
/// pre-built NIXL descriptor that `nixl_descriptor()` hands out.
#[derive(Debug)]
struct TestMemoryRegion {
/// Address reported by `addr()`.
addr: usize,
/// Size reported by `size()`.
size: usize,
/// Storage kind reported by `storage_kind()`.
kind: StorageKind,
/// Descriptor cloned out by `nixl_descriptor()`.
descriptor: NixlDescriptor,
}
impl TestMemoryRegion {
fn new(addr: usize, size: usize, kind: StorageKind) -> Arc<Self> {
Arc::new(Self {
addr,
size,
kind,
descriptor: NixlDescriptor {
addr: addr as u64,
size,
mem_type: MemType::Dram,
device_id: 0,
},
})
}
}
impl MemoryDescriptor for TestMemoryRegion {
    /// Address supplied at construction.
    fn addr(&self) -> usize {
        let Self { addr, .. } = self;
        *addr
    }

    /// Size supplied at construction.
    fn size(&self) -> usize {
        let Self { size, .. } = self;
        *size
    }

    /// Storage kind supplied at construction.
    fn storage_kind(&self) -> StorageKind {
        let Self { kind, .. } = self;
        *kind
    }

    /// Expose `self` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Hand out a copy of the descriptor built in `new`.
    fn nixl_descriptor(&self) -> Option<NixlDescriptor> {
        let copy = self.descriptor.clone();
        Some(copy)
    }
}
/// Layout configuration shared by the round-trip tests:
/// 10 blocks, 4 layers, outer dim 2, page size 16, inner dim 128, 2-byte dtype.
fn make_test_config() -> LayoutConfig {
    // The builder chain stays a single expression; only the final unwrap is split off.
    let built = LayoutConfig::builder()
        .num_blocks(10)
        .num_layers(4)
        .outer_dim(2)
        .page_size(16)
        .inner_dim(128)
        .dtype_width_bytes(2)
        .build();
    built.unwrap()
}
/// Round-trips a fully contiguous layout:
/// layout -> descriptor -> JSON string -> descriptor -> layout.
#[test]
fn test_fully_contiguous_layout_serialization_roundtrip() {
    let agent = NixlAgent::new("test-fc-serialize").expect("failed to create agent");
    let config = make_test_config();

    // One buffer has to hold every block, layer, and outer slot.
    let required_size: usize = [
        config.num_blocks,
        config.num_layers,
        config.outer_dim,
        config.page_size,
        config.inner_dim,
        config.dtype_width_bytes,
    ]
    .iter()
    .product();

    // Single backing region for the whole layout.
    let backing: Arc<dyn MemoryDescriptor> =
        TestMemoryRegion::new(0x10000, required_size, StorageKind::System);
    let regions = vec![Buffer::from_arc(backing)];

    // Build physical layout
    let original_layout = PhysicalLayout::builder(agent)
        .with_config(config.clone())
        .fully_contiguous()
        .with_registered_regions(regions)
        .expect("failed to provide regions")
        .build()
        .expect("failed to build layout");

    // Layout -> descriptor.
    let descriptor = original_layout
        .to_descriptor()
        .expect("failed to serialize layout");

    assert_eq!(descriptor.version, LayoutDescriptor::CURRENT_VERSION);
    assert_eq!(descriptor.layout_config, config);
    assert_eq!(descriptor.location, StorageKind::System);
    assert_eq!(descriptor.memory_descriptors.len(), 1);
    let only_region = &descriptor.memory_descriptors[0];
    assert_eq!(only_region.addr, 0x10000);
    assert_eq!(only_region.size, required_size);

    // Descriptor -> JSON text.
    let json = descriptor.to_json().expect("failed to serialize to JSON");
    assert!(json.contains("\"version\":1"));
    assert!(json.contains("\"num_blocks\":10"));

    // JSON text -> descriptor.
    let decoded = LayoutDescriptor::from_json(&json).expect("failed to deserialize from JSON");

    assert_eq!(decoded.version, descriptor.version);
    assert_eq!(decoded.layout_config, descriptor.layout_config);
    assert_eq!(decoded.location, descriptor.location);
    assert_eq!(
        decoded.memory_descriptors.len(),
        descriptor.memory_descriptors.len()
    );

    // Descriptor -> layout.
    let rebuilt = PhysicalLayout::from_descriptor(decoded).expect("failed to reconstruct layout");

    let rebuilt_layout = rebuilt.layout();
    assert_eq!(rebuilt_layout.config(), &config);
    assert_eq!(rebuilt.location(), StorageKind::System);
    assert_eq!(rebuilt_layout.num_blocks(), 10);
    assert_eq!(rebuilt_layout.num_layers(), 4);
    assert!(rebuilt_layout.is_fully_contiguous());
}
/// Round-trips a layer-separate layout:
/// layout -> descriptor -> JSON bytes -> descriptor -> layout.
#[test]
fn test_layer_separate_layout_serialization_roundtrip() {
    let agent = NixlAgent::new("test-ls-serialize").expect("failed to create agent");
    let config = make_test_config();

    // Each layer gets its own buffer, so the layer count is left out of the product.
    let per_layer_size: usize = [
        config.num_blocks,
        config.outer_dim,
        config.page_size,
        config.inner_dim,
        config.dtype_width_bytes,
    ]
    .iter()
    .product();

    // Back-to-back regions, one per layer.
    let regions: Vec<Buffer> = (0..config.num_layers)
        .map(|layer| {
            let base = 0x10000 + layer * per_layer_size;
            let region: Arc<dyn MemoryDescriptor> =
                TestMemoryRegion::new(base, per_layer_size, StorageKind::System);
            Buffer::from_arc(region)
        })
        .collect();

    // Build physical layout
    let original_layout = PhysicalLayout::builder(agent)
        .with_config(config.clone())
        .layer_separate(BlockDimension::BlockIsFirstDim)
        .with_registered_regions(regions)
        .expect("failed to provide regions")
        .build()
        .expect("failed to build layout");

    // Layout -> descriptor.
    let descriptor = original_layout
        .to_descriptor()
        .expect("failed to serialize layout");

    assert_eq!(descriptor.version, LayoutDescriptor::CURRENT_VERSION);
    assert_eq!(descriptor.layout_config, config);
    assert_eq!(descriptor.memory_descriptors.len(), 4); // One per layer

    // Every descriptor must match the region created for its layer.
    for (layer, region) in descriptor.memory_descriptors.iter().enumerate() {
        assert_eq!(region.addr, 0x10000 + layer * per_layer_size);
        assert_eq!(region.size, per_layer_size);
    }

    // Descriptor -> JSON bytes.
    let json_bytes = descriptor
        .to_json_bytes()
        .expect("failed to serialize to JSON bytes");

    // JSON bytes -> descriptor.
    let decoded = LayoutDescriptor::from_json_bytes(&json_bytes)
        .expect("failed to deserialize from JSON bytes");

    assert_eq!(decoded.version, descriptor.version);
    assert_eq!(decoded.layout_config, descriptor.layout_config);
    assert_eq!(
        decoded.memory_descriptors.len(),
        descriptor.memory_descriptors.len()
    );

    // Descriptor -> layout.
    let rebuilt = PhysicalLayout::from_descriptor(decoded).expect("failed to reconstruct layout");

    let rebuilt_layout = rebuilt.layout();
    assert_eq!(rebuilt_layout.config(), &config);
    assert_eq!(rebuilt.location(), StorageKind::System);
    assert_eq!(rebuilt_layout.num_blocks(), 10);
    assert_eq!(rebuilt_layout.num_layers(), 4);
    assert!(!rebuilt_layout.is_fully_contiguous());
}
/// Checks that address arithmetic still works on a layout rebuilt from its descriptor.
#[test]
fn test_memory_region_calculation_after_deserialization() {
    let agent = NixlAgent::new("test-memory-calc").expect("failed to create agent");
    let config = LayoutConfig::builder()
        .num_blocks(2)
        .num_layers(2)
        .outer_dim(2)
        .page_size(4)
        .inner_dim(8)
        .dtype_width_bytes(2)
        .build()
        .unwrap();

    let required_size: usize = [
        config.num_blocks,
        config.num_layers,
        config.outer_dim,
        config.page_size,
        config.inner_dim,
        config.dtype_width_bytes,
    ]
    .iter()
    .product();

    let backing: Arc<dyn MemoryDescriptor> =
        TestMemoryRegion::new(0x1000, required_size, StorageKind::System);
    let regions = vec![Buffer::from_arc(backing)];

    let original_layout = PhysicalLayout::builder(agent)
        .with_config(config.clone())
        .fully_contiguous()
        .with_registered_regions(regions)
        .expect("failed to provide regions")
        .build()
        .expect("failed to build layout");

    // Layout -> descriptor -> layout, without going through JSON.
    let descriptor = original_layout
        .to_descriptor()
        .expect("failed to serialize");
    let rebuilt = PhysicalLayout::from_descriptor(descriptor).expect("failed to reconstruct");

    // Size of a single (block, layer, outer) slot.
    let slot_size = config.page_size * config.inner_dim * config.dtype_width_bytes;

    // The first slot sits at the base address.
    let first = rebuilt
        .memory_region(0, 0, 0)
        .expect("failed to get memory region");
    assert_eq!(first.addr, 0x1000);
    assert_eq!(first.size, slot_size);

    // Slot (1, 1, 1): base + one block stride + one layer stride + one outer stride.
    let layer_stride = config.outer_dim * slot_size;
    let block_stride = config.num_layers * layer_stride;
    let expected_addr = 0x1000 + block_stride + layer_stride + slot_size;

    let offset_slot = rebuilt
        .memory_region(1, 1, 1)
        .expect("failed to get memory region");
    assert_eq!(offset_slot.addr, expected_addr);
}
/// A descriptor from a newer format version is rejected; the current version is accepted.
#[test]
fn test_version_check_on_deserialization() {
    let config = make_test_config();

    // Buffer size a fully contiguous layout needs for this configuration.
    let required_size: usize = [
        config.num_blocks,
        config.num_layers,
        config.outer_dim,
        config.page_size,
        config.inner_dim,
        config.dtype_width_bytes,
    ]
    .iter()
    .product();

    let from_the_future = LayoutDescriptor {
        version: 999, // Future version
        layout_config: config.clone(),
        location: StorageKind::System,
        nixl_metadata: crate::layout::physical::NixlMetadata::new(
            "test".to_string(),
            MemType::Dram,
            0,
        ),
        memory_descriptors: vec![],
        layout_type_details: crate::layout::LayoutTypeDetails::FullyContiguous(
            crate::layout::FullyContiguousDetails {
                block_format: crate::layout::BlockFormat::Operational,
                kv_block_layout: crate::layout::KvBlockLayout::OperationalNHD,
            },
        ),
    };

    // Should fail with unsupported version
    let rejected = PhysicalLayout::from_descriptor(from_the_future.clone());
    assert!(rejected.is_err());
    let message = rejected.unwrap_err().to_string();
    assert!(message.contains("Unsupported serialization version"));

    // Same descriptor, but at the supported version and with one region of the right size.
    let supported = LayoutDescriptor {
        version: LayoutDescriptor::CURRENT_VERSION,
        memory_descriptors: vec![MemoryRegion {
            addr: 0x1000,
            size: required_size,
        }],
        ..from_the_future
    };

    let result = PhysicalLayout::from_descriptor(supported);
    if let Err(e) = &result {
        eprintln!("Error during deserialization: {}", e);
    }
    assert!(
        result.is_ok(),
        "Expected successful deserialization, got error: {:?}",
        result.err()
    );

    let layout = result.unwrap();
    assert_eq!(
        layout.layout().block_layout(),
        crate::layout::KvBlockLayout::OperationalNHD
    );
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Tensor validation utilities for layout creation.
use anyhow::{Result, anyhow};
use std::sync::Arc;
use dynamo_memory::TensorDescriptor;
/// Format of tensor layout (for future TP translation).
///
/// Returned by `validate_tensor_strides`, which falls back to `Unknown` when it
/// cannot classify the tensors.
// The variant names mirror the conventional acronyms, hence the lint exemption.
#[allow(clippy::upper_case_acronyms)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TensorFormat {
/// NHD format: [N, H, D] where N=block_size, H=heads, D=hidden
NHD,
/// HND format: [H, N, D] where H=heads, N=block_size, D=hidden
HND,
/// Unknown or ambiguous format
Unknown,
}
/// Validate tensor strides and detect format.
///
/// Every tensor must report at least two strides, and no stride may be larger
/// than the one before it. After validation, tensors whose shape has three or
/// more dimensions are classified by comparing their first two strides.
///
/// NOTE(review): the stride check rejects any tensor with `stride[1] > stride[0]`,
/// so the `HND` classification (`stride[0] < stride[1]`) cannot currently be reached
/// and the result is always `NHD` or `Unknown`. Earlier documentation described the
/// check as "flexible at the inner dimension boundary", which the code does not
/// implement. Confirm the intended heuristic before relying on HND detection.
///
/// # Arguments
/// * `tensors` - Slice of tensors to validate
///
/// # Returns
/// The detected tensor format. When several tensors classify differently, the
/// last classified tensor wins.
///
/// # Errors
/// * `tensors` is empty.
/// * A tensor reports fewer than two strides.
/// * A tensor has a stride larger than its predecessor.
#[expect(dead_code)]
pub fn validate_tensor_strides(tensors: &[Arc<dyn TensorDescriptor>]) -> Result<TensorFormat> {
    if tensors.is_empty() {
        return Err(anyhow!("Cannot validate empty tensor list"));
    }

    let mut detected = TensorFormat::Unknown;

    for tensor in tensors {
        let stride = tensor.stride();
        let shape = tensor.shape();

        if stride.len() < 2 {
            return Err(anyhow!(
                "Tensor must have at least 2 dimensions, got stride: {:?}",
                stride
            ));
        }

        // Pair each stride with its successor and look for the first increase.
        // The reported position is that of the offending (later) stride.
        let first_increase = stride
            .iter()
            .zip(stride.iter().skip(1))
            .position(|(earlier, later)| later > earlier);
        if let Some(pair_index) = first_increase {
            return Err(anyhow!(
                "Tensor strides must be monotonically decreasing (until inner dimension). \
                 Got stride: {:?} at position {}",
                stride,
                pair_index + 1
            ));
        }

        // Heuristic classification from the two outermost strides; equal strides
        // leave the previous classification untouched.
        if shape.len() >= 3 {
            match stride[0].cmp(&stride[1]) {
                std::cmp::Ordering::Less => detected = TensorFormat::HND,
                std::cmp::Ordering::Greater => detected = TensorFormat::NHD,
                std::cmp::Ordering::Equal => {}
            }
        }
    }

    Ok(detected)
}
/// Validate that all tensors have consistent shapes.
///
/// The first tensor's shape is the reference; every other tensor is compared to it.
///
/// # Arguments
/// * `tensors` - Slice of tensors to validate
///
/// # Returns
/// The common shape shared by all tensors
///
/// # Errors
/// * `tensors` is empty.
/// * Some tensor's shape differs from the first tensor's shape (the first
///   mismatch is reported).
#[expect(dead_code)]
pub fn validate_tensor_shapes(tensors: &[Arc<dyn TensorDescriptor>]) -> Result<Vec<usize>> {
    let Some((reference, others)) = tensors.split_first() else {
        return Err(anyhow!("Cannot validate empty tensor list"));
    };

    let first_shape = reference.shape();

    if let Some(odd_one) = others.iter().find(|other| other.shape() != first_shape) {
        return Err(anyhow!(
            "All tensors must have the same shape. Expected {:?}, got {:?}",
            first_shape,
            odd_one.shape()
        ));
    }

    Ok(first_shape.to_vec())
}
/// Collapse a shape into a single extent by multiplying all of its dimensions.
///
/// # Arguments
/// * `shape` - Dimension extents
///
/// # Returns
/// The product of all extents; `1` for an empty shape.
#[allow(dead_code)]
pub fn determine_compressed_shape(shape: &[usize]) -> usize {
    shape.iter().fold(1, |total, &extent| total * extent)
}
#[cfg(all(test, feature = "testing-kvbm"))]
mod tests {
// Intentionally empty for now.
// Note: These tests would require mock tensor implementations (the original note
// names `TorchTensor`; the validators in this file accept `TensorDescriptor`),
// which we can add if needed for testing infrastructure
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
pub mod layout;
pub mod manager;
pub mod transfer;
pub use manager::TransferManager;
pub use transfer::{TransferConfig, TransferOptions};
pub use kvbm_common::BlockId;
pub type SequenceHash = kvbm_common::SequenceHash;
/// Placeholder test module compiled only when tests are built without the
/// `testing-kvbm` feature, so a plain `cargo test` run shows why nothing else ran.
#[cfg(all(test, not(feature = "testing-kvbm")))]
mod sentinel {
    /// Prints how to enable the functional test suite; makes no assertions.
    #[test]
    #[allow(non_snake_case)]
    fn all_functional_tests_skipped___enable_testing_kvbm() {
        let hint = "kvbm-physical functional tests require feature `testing-kvbm`. \
             Run with: cargo test -p kvbm-physical --features testing-kvbm";
        eprintln!("{}", hint);
    }
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Layout handle type encoding worker ID and layout ID.
use bincode::{Decode, Encode};
use serde::{Deserialize, Serialize};
/// Unique handle for a layout combining worker_id and layout_id.
///
/// The handle encodes:
/// - Bits 0-63: worker_id (u64)
/// - Bits 64-79: layout_id (u16)
/// - Bits 80-127: Reserved (48 bits, currently unused)
///
/// Equality and hashing cover the full 128-bit value. Handles built with `new`
/// have the reserved bits cleared, while `from_u128` keeps whatever bits it is
/// given, so two handles with the same ids but different reserved bits compare
/// unequal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Encode, Decode, Serialize, Deserialize)]
pub struct LayoutHandle(u128);
impl LayoutHandle {
/// Create a new layout handle from worker_id and layout_id.
///
/// # Arguments
/// * `worker_id` - Unique identifier for the worker (0-63 bits)
/// * `layout_id` - Layout identifier within the worker (64-79 bits)
pub fn new(worker_id: u64, layout_id: u16) -> Self {
let handle = (worker_id as u128) | ((layout_id as u128) << 64);
Self(handle)
}
/// Extract the worker_id from this handle.
pub fn worker_id(&self) -> u64 {
(self.0 & 0xFFFF_FFFF_FFFF_FFFF) as u64
}
/// Extract the layout_id from this handle.
pub fn layout_id(&self) -> u16 {
((self.0 >> 64) & 0xFFFF) as u16
}
/// Get the raw u128 value.
pub fn as_u128(&self) -> u128 {
self.0
}
/// Reconstruct a handle from a raw u128 value.
///
/// This preserves all bits including reserved bits, and is intended for
/// deserialization roundtrips with `as_u128()`.
pub fn from_u128(value: u128) -> Self {
Self(value)
}
}
impl std::fmt::Display for LayoutHandle {
    /// Render the handle as `LayoutHandle(worker=<worker_id>, layout=<layout_id>)`.
    ///
    /// Only the two decoded fields are shown; the reserved bits are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let worker = self.worker_id();
        let layout = self.layout_id();
        write!(f, "LayoutHandle(worker={}, layout={})", worker, layout)
    }
}
#[cfg(all(test, feature = "testing-kvbm"))]
mod tests {
    use super::*;

    /// Both fields come back unchanged after being packed into a handle.
    #[test]
    fn test_handle_encoding() {
        let (worker, layout) = (0x1234_5678_9ABC_DEF0u64, 0x4242u16);
        let packed = LayoutHandle::new(worker, layout);

        assert_eq!(packed.worker_id(), worker);
        assert_eq!(packed.layout_id(), layout);
    }

    /// `as_u128` followed by `from_u128` yields an equal handle.
    #[test]
    fn test_handle_roundtrip() {
        let original = LayoutHandle::new(42, 100);
        let rebuilt = LayoutHandle::from_u128(original.as_u128());

        assert_eq!(original, rebuilt);
        assert_eq!(rebuilt.worker_id(), 42);
        assert_eq!(rebuilt.layout_id(), 100);
    }

    /// The largest representable worker and layout IDs do not bleed into each other.
    #[test]
    fn test_handle_max_values() {
        let saturated = LayoutHandle::new(u64::MAX, u16::MAX);

        assert_eq!(saturated.worker_id(), u64::MAX);
        assert_eq!(saturated.layout_id(), u16::MAX);
    }

    /// A handle survives a bincode encode/decode cycle with the standard config.
    #[test]
    fn test_handle_bincode_roundtrip() {
        let original = LayoutHandle::new(999, 42);
        let bytes = bincode::encode_to_vec(original, bincode::config::standard()).unwrap();
        let (decoded, _): (LayoutHandle, _) =
            bincode::decode_from_slice(&bytes, bincode::config::standard()).unwrap();

        assert_eq!(original, decoded);
    }

    /// The `Display` output mentions both the worker ID and the layout ID.
    #[test]
    fn test_handle_display() {
        let rendered = LayoutHandle::new(123, 456).to_string();

        assert!(rendered.contains("123"));
        assert!(rendered.contains("456"));
    }
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Local layout wrapper with handle and metadata.
use std::ops::Deref;
use super::handle::LayoutHandle;
use crate::layout::PhysicalLayout;
/// A local physical layout with an assigned handle.
///
/// This wraps a `PhysicalLayout` that exists on the local worker,
/// associating it with a unique handle that combines the worker_id
/// and a locally-assigned layout_id.
///
/// This type is cheap to clone as `PhysicalLayout` contains `Arc` internally.
#[derive(Debug, Clone)]
pub struct LocalLayout {
/// Handle identifying this layout (worker_id plus layout_id).
handle: LayoutHandle,
/// The wrapped physical layout.
layout: PhysicalLayout,
}
#[allow(dead_code)]
impl LocalLayout {
/// Create a new local layout.
///
/// # Arguments
/// * `handle` - Unique handle for this layout
/// * `layout` - The physical layout
pub fn new(handle: LayoutHandle, layout: PhysicalLayout) -> Self {
Self { handle, layout }
}
/// Get the handle for this layout.
pub fn handle(&self) -> LayoutHandle {
self.handle
}
/// Get a reference to the physical layout.
pub fn layout(&self) -> &PhysicalLayout {
&self.layout
}
/// Get the worker_id from the handle.
pub fn worker_id(&self) -> u64 {
self.handle.worker_id()
}
/// Get the layout_id from the handle.
pub fn layout_id(&self) -> u16 {
self.handle.layout_id()
}
/// Consume this local layout and return the physical layout.
pub fn into_layout(self) -> PhysicalLayout {
self.layout
}
}
impl Deref for LocalLayout {
type Target = PhysicalLayout;
fn deref(&self) -> &Self::Target {
&self.layout
}
}
#[cfg(all(test, feature = "testing-kvbm"))]
mod tests {
    use super::*;
    use crate::layout::{LayoutConfig, PhysicalLayout};
    use dynamo_memory::nixl::NixlAgent;

    /// Create a NIXL agent with the given name, panicking if creation fails.
    fn create_test_agent(name: &str) -> NixlAgent {
        NixlAgent::new(name).expect("failed to create agent")
    }

    /// Build a small fully-contiguous layout allocated via `allocate_system()`.
    fn make_test_layout() -> PhysicalLayout {
        let nixl_agent = create_test_agent("test-local");
        let layout_config = LayoutConfig::builder()
            .num_blocks(2)
            .num_layers(2)
            .outer_dim(2)
            .page_size(4)
            .inner_dim(8)
            .dtype_width_bytes(2)
            .build()
            .unwrap();

        PhysicalLayout::builder(nixl_agent)
            .with_config(layout_config)
            .fully_contiguous()
            .allocate_system()
            .build()
            .unwrap()
    }

    /// The wrapper reports the handle it was built with, and its decoded parts.
    #[test]
    fn test_local_layout_creation() {
        let expected_handle = LayoutHandle::new(42, 100);
        let wrapped = LocalLayout::new(expected_handle, make_test_layout());

        assert_eq!(wrapped.handle(), expected_handle);
        assert_eq!(wrapped.worker_id(), 42);
        assert_eq!(wrapped.layout_id(), 100);
    }

    /// `into_layout` consumes the wrapper and hands back the physical layout.
    #[test]
    fn test_local_layout_into_layout() {
        let wrapped = LocalLayout::new(LayoutHandle::new(1, 2), make_test_layout());

        // Reaching this point without a panic is the whole check.
        let _inner = wrapped.into_layout();
    }
}
This diff is collapsed.
This diff is collapsed.
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Remote layout wrapper reconstructed from imported metadata.
use super::handle::LayoutHandle;
use crate::layout::PhysicalLayout;
/// A remote physical layout reconstructed from imported metadata.
///
/// This wraps a `PhysicalLayout` that was deserialized from another worker's
/// exported metadata. The layout's memory regions point to addresses on the
/// remote worker and are used for building NIXL RDMA transfer descriptors.
///
/// This type is cheap to clone as `PhysicalLayout` contains `Arc` internally.
#[derive(Debug, Clone)]
pub struct RemoteLayout {
/// Handle identifying this layout; its worker_id names the remote worker.
handle: LayoutHandle,
/// The physical layout reconstructed from the remote worker's metadata.
layout: PhysicalLayout,
}
#[allow(dead_code)]
impl RemoteLayout {
/// Create a new remote layout.
///
/// # Arguments
/// * `handle` - Unique handle for this layout (from remote worker)
/// * `layout` - The reconstructed physical layout
pub fn new(handle: LayoutHandle, layout: PhysicalLayout) -> Self {
Self { handle, layout }
}
/// Get the handle for this layout.
pub fn handle(&self) -> LayoutHandle {
self.handle
}
/// Get a reference to the physical layout.
pub fn layout(&self) -> &PhysicalLayout {
&self.layout
}
/// Get the worker_id from the handle (identifies the remote worker).
pub fn worker_id(&self) -> u64 {
self.handle.worker_id()
}
/// Get the layout_id from the handle.
pub fn layout_id(&self) -> u16 {
self.handle.layout_id()
}
/// Consume this remote layout and return the physical layout.
pub fn into_layout(self) -> PhysicalLayout {
self.layout
}
}
#[cfg(all(test, feature = "testing-kvbm"))]
mod tests {
    use super::*;
    use crate::layout::{LayoutConfig, LayoutDescriptor, NixlMetadata, PhysicalLayout};

    /// Build a descriptor for a small fully-contiguous layout with a single
    /// memory region at a fixed address.
    fn make_serialized_layout() -> LayoutDescriptor {
        use crate::layout::{BlockFormat, FullyContiguousDetails, LayoutTypeDetails};
        use dynamo_memory::{MemoryRegion, StorageKind, nixl};

        let layout_config = LayoutConfig::builder()
            .num_blocks(2)
            .num_layers(2)
            .outer_dim(2)
            .page_size(4)
            .inner_dim(8)
            .dtype_width_bytes(2)
            .build()
            .unwrap();

        // The region size is the product of every configured dimension.
        let total_bytes = layout_config.num_blocks
            * layout_config.num_layers
            * layout_config.outer_dim
            * layout_config.page_size
            * layout_config.inner_dim
            * layout_config.dtype_width_bytes;

        let metadata = NixlMetadata::new("remote_agent".to_string(), nixl::MemType::Dram, 0);
        let region = MemoryRegion {
            addr: 0x1000,
            size: total_bytes,
        };
        let details = LayoutTypeDetails::FullyContiguous(FullyContiguousDetails {
            block_format: BlockFormat::Operational,
            kv_block_layout: crate::layout::KvBlockLayout::OperationalNHD,
        });

        LayoutDescriptor {
            version: 1,
            layout_config,
            location: StorageKind::System,
            nixl_metadata: metadata,
            memory_descriptors: vec![region],
            layout_type_details: details,
        }
    }

    /// A layout rebuilt from a descriptor keeps its handle and block layout.
    #[test]
    fn test_remote_layout_creation() {
        let expected_handle = LayoutHandle::new(999, 42);
        let rebuilt = PhysicalLayout::from_descriptor(make_serialized_layout()).unwrap();
        let wrapped = RemoteLayout::new(expected_handle, rebuilt);

        assert_eq!(wrapped.handle(), expected_handle);
        assert_eq!(wrapped.worker_id(), 999);
        assert_eq!(wrapped.layout_id(), 42);
        assert_eq!(
            wrapped.layout().layout().block_layout(),
            crate::layout::KvBlockLayout::OperationalNHD
        );
    }

    /// `into_layout` consumes the wrapper and hands back the physical layout.
    #[test]
    fn test_remote_layout_into_layout() {
        let rebuilt = PhysicalLayout::from_descriptor(make_serialized_layout()).unwrap();
        let wrapped = RemoteLayout::new(LayoutHandle::new(100, 200), rebuilt);

        // Reaching this point without a panic is the whole check.
        let _inner = wrapped.into_layout();
    }
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! CUDA event polling-based completion checker.
use anyhow::Result;
use cudarc::driver::{CudaEvent, DriverError, result as cuda_result, sys::CUresult};
use super::CompletionChecker;
/// Completion checker that polls the status of a CUDA event.
///
/// The checker takes ownership of the event it is given.
pub struct CudaEventChecker {
    /// The event whose status is polled.
    event: CudaEvent,
}

impl CudaEventChecker {
    /// Wrap `event` in a checker; the event is moved into the returned value.
    pub fn new(event: CudaEvent) -> Self {
        CudaEventChecker { event }
    }
}
impl CompletionChecker for CudaEventChecker {
    /// Poll the wrapped CUDA event once.
    ///
    /// # Returns
    /// * `Ok(true)` - the driver query succeeded, i.e. the event has completed
    /// * `Ok(false)` - the driver reported `CUDA_ERROR_NOT_READY` (still pending)
    ///
    /// # Errors
    /// Any other driver error is returned as an `anyhow` error carrying its
    /// debug representation.
    fn is_complete(&self) -> Result<bool> {
        // SAFETY: the raw handle passed to `query` is taken from `self.event`, which
        // is borrowed for the whole call. This relies on `CudaEvent` keeping that
        // handle valid while it is alive (assumed from cudarc; not shown here).
        let status = unsafe { cuda_result::event::query(self.event.cu_event()) };

        match status {
            // The event has completed.
            Ok(()) => Ok(true),
            // "Not ready" is the expected pending state, not a failure.
            Err(DriverError(CUresult::CUDA_ERROR_NOT_READY)) => Ok(false),
            Err(other) => Err(anyhow::anyhow!("CUDA event query failed: {:?}", other)),
        }
    }
}
// Functional test for CUDA event notifications; compiled only for test builds
// with the `testing-kvbm` feature enabled.
#[cfg(all(test, feature = "testing-kvbm"))]
mod tests {
use crate::manager::TransferManager;
use crate::transfer::tests::CudaSleep;
use dynamo_memory::nixl::NixlAgent;
use std::time::{Duration, Instant};
/// Launches a 600ms GPU sleep on the H2D stream, records an event after it,
/// and checks that the notification returned by `register_cuda_event` resolves
/// only after roughly that much time has passed.
#[tokio::test]
async fn test_cuda_event_delayed_notification() {
let agent = NixlAgent::new("test_agent").unwrap();
// Build a transfer manager for CUDA device 0.
let manager = TransferManager::builder()
.cuda_device_id(0)
.nixl_agent(agent)
.build()
.unwrap();
let stream = manager.h2d_stream();
let cuda_ctx = manager.cuda_context();
// Get or create the CudaSleep utility (compiles kernel and calibrates on first use)
let cuda_sleep = CudaSleep::for_context(cuda_ctx).unwrap();
// Test 1: Launch sleep and wait via async notification
let t0_queue_start = Instant::now();
cuda_sleep
.launch(Duration::from_millis(600), stream)
.unwrap();
// Time taken by the `launch` call itself to return.
let queue_time = t0_queue_start.elapsed();
// Record an event on the same stream the sleep was launched on.
let event = stream.record_event(None).unwrap();
let notification = manager.register_cuda_event(event);
notification.await.unwrap();
// Everything after the launch returned: recording the event, registering it,
// and awaiting the notification.
let wait_time = t0_queue_start.elapsed() - queue_time;
println!(
"GPU sleep test: queue {:?}, wait {:?}",
queue_time, wait_time
);
// The launch call must return in under 10ms.
assert!(
queue_time < Duration::from_millis(10),
"launching the sleep kernel should be fast: {:?}",
queue_time
);
// The threshold is 500ms rather than the requested 600ms, leaving 100ms of slack.
assert!(
wait_time >= Duration::from_millis(500),
"wait time should reflect >=500ms of GPU work: {:?}",
wait_time
);
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment