Commit 5b6624bd (unverified), authored by Ryan Olson, committed by GitHub
Browse files

feat: enable kvbm v1 to use the v2 PinnedStorage::new_for_device (#7151)


Signed-off-by: Ryan Olson <rolson@nvidia.com>
parent 1042c552
......@@ -47,5 +47,10 @@ CODEOWNERS @ai-dynamo/Devops
/LICENSE @ai-dynamo/Devops
/SECURITY.md @ai-dynamo/Devops
# KVBM v2
/lib/memory/ @grahamking @ryanolson @oandreeva-nv
/lib/kvbm-*/ @ai-dynamo/kvbm-v2
/lib/kvbm-logical/ @ai-dynamo/kvbm-v2 @PeaBrane
# Claude Code
/.claude/ @ishandhanani @alec-flowers @MatejKosec @dagil-nvidia @athreesh @nv-tusharma @ayushag-nv
......@@ -45,11 +45,11 @@ required-features = ["kv-router-stress"]
[dependencies]
# repo
dynamo-runtime = { workspace = true }
dynamo-tokens = { workspace = true }
dynamo-kv-router = { workspace = true, features = ["metrics", "runtime-protocols"] }
dynamo-memory = { workspace = true }
dynamo-mocker = { workspace = true }
dynamo-runtime = { workspace = true }
dynamo-tokens = { workspace = true }
# workspace
aho-corasick = "1.1"
......
......@@ -97,7 +97,7 @@ fn build_layout(
match storage_kind {
StorageKind::System => builder.allocate_system().build().unwrap(),
StorageKind::Pinned => builder.allocate_pinned(false).build().unwrap(),
StorageKind::Pinned => builder.allocate_pinned(None).build().unwrap(),
StorageKind::Device(device_id) => builder.allocate_device(device_id).build().unwrap(),
StorageKind::Disk(_) => builder.allocate_disk(None).build().unwrap(),
}
......
......@@ -9,7 +9,21 @@ pub use dynamo_memory::numa::*;
/// Set `DYN_KVBM_ENABLE_NUMA=1` to enable NUMA-aware allocation in the
/// KV cache block manager. This is opt-in because the block manager
/// manages its own pinned memory allocations separately from `PinnedStorage`.
///
/// The global kill switch `DYN_MEMORY_DISABLE_NUMA` always takes precedence:
/// if it is set truthy, this function returns `false` regardless of
/// `DYN_KVBM_ENABLE_NUMA`.
///
/// TODO(KVBM-336): remove this function in the future
#[deprecated(
since = "1.0.0",
note = "Use dynamo_memory::numa::is_numa_enabled instead"
)]
pub fn is_numa_enabled() -> bool {
// Global kill switch always wins
if is_numa_disabled() {
return false;
}
matches!(
std::env::var("DYN_KVBM_ENABLE_NUMA").as_deref(),
Ok("1" | "true" | "yes")
......
......@@ -155,6 +155,18 @@ pub enum StorageError {
OutOfBounds(String),
}
impl From<dynamo_memory::StorageError> for StorageError {
fn from(e: dynamo_memory::StorageError) -> Self {
match e {
dynamo_memory::StorageError::AllocationFailed(s) => StorageError::AllocationFailed(s),
dynamo_memory::StorageError::OperationFailed(s) => StorageError::OperationFailed(s),
dynamo_memory::StorageError::Cuda(e) => StorageError::Cuda(e),
dynamo_memory::StorageError::Nixl(e) => StorageError::NixlError(e),
e => StorageError::OperationFailed(e.to_string()),
}
}
}
/// Core storage trait that provides access to memory regions
pub trait Storage: Debug + Send + Sync + 'static {
/// Returns the type of storage
......
......@@ -75,40 +75,7 @@ use std::{
};
use cudarc::driver::CudaContext;
use crate::block_manager::numa_allocator;
/// Allocates `size` bytes of pinned host memory, trying write-combined first.
///
/// A write-combined (WC) allocation is attempted first. If that attempt
/// returns any error, a debug message is logged and the allocation is retried
/// with no flags (regular pinned memory). WC may be unavailable on systems
/// with cache-coherent CPU-GPU interconnects (e.g., Grace Hopper/Blackwell
/// with NVLink-C2C).
///
/// # Errors
///
/// Returns [`StorageError::Cuda`] if the regular pinned fallback also fails.
///
/// # Safety
///
/// Caller must ensure a valid CUDA context is bound to the current thread.
unsafe fn malloc_host_prefer_writecombined(size: usize) -> Result<*mut u8, StorageError> {
    // SAFETY: Caller guarantees a valid CUDA context is bound to the current thread
    let wc_attempt = unsafe {
        cudarc::driver::result::malloc_host(
            size,
            cudarc::driver::sys::CU_MEMHOSTALLOC_WRITECOMBINED,
        )
    };
    if let Ok(wc_ptr) = wc_attempt {
        return Ok(wc_ptr as *mut u8);
    }

    // The write-combined attempt failed; retry as plain pinned memory.
    tracing::debug!("Write-combined memory not supported, using regular pinned memory");
    // SAFETY: Same as above - caller guarantees valid CUDA context
    match unsafe { cudarc::driver::result::malloc_host(size, 0) } {
        Ok(plain_ptr) => Ok(plain_ptr as *mut u8),
        Err(cuda_err) => Err(StorageError::Cuda(cuda_err)),
    }
}
use dynamo_memory::MemoryDescriptor as _;
/// Trait for [Storage] types that can be accessed by CUDA
pub trait CudaAccessible: Storage {}
......@@ -191,13 +158,12 @@ impl Cuda {
}
}
/// Pinned host memory storage using CUDA page-locked memory
/// Pinned host memory storage using CUDA page-locked memory.
/// Wraps [`dynamo_memory::PinnedStorage`] and adds registration handle support.
#[derive(Debug)]
pub struct PinnedStorage {
ptr: u64,
size: usize,
inner: dynamo_memory::PinnedStorage,
handles: RegistrationHandles,
ctx: Arc<CudaContext>,
}
impl Local for PinnedStorage {}
......@@ -205,62 +171,51 @@ impl SystemAccessible for PinnedStorage {}
impl CudaAccessible for PinnedStorage {}
impl PinnedStorage {
/// Create a new pinned storage with the given size
/// Create a new pinned storage with the given size.
///
/// Uses write-combined allocation with NUMA-awareness when enabled.
/// Prefer [`new_for_device`](Self::new_for_device) for new code.
///
/// TODO(KVBM-336): remove PinnedStorage::new in the future
#[deprecated(since = "1.0.0", note = "Use PinnedStorage::new_for_device instead")]
pub fn new(ctx: &Arc<CudaContext>, size: usize) -> Result<Self, StorageError> {
unsafe {
ctx.bind_to_thread().map_err(StorageError::Cuda)?;
let inner =
dynamo_memory::PinnedStorage::new_for_device(size, Some(ctx.cu_device() as u32))?;
Ok(Self {
inner,
handles: RegistrationHandles::new(),
})
}
// Try NUMA-aware allocation if enabled, otherwise use direct allocation.
let ptr = if numa_allocator::is_numa_enabled() {
let device_id = ctx.cu_device() as u32;
match numa_allocator::worker_pool::NumaWorkerPool::global()
.allocate_pinned_for_gpu(size, device_id)
{
Ok(Some(ptr)) => ptr,
Ok(None) => {
tracing::debug!(
"NUMA node unknown for GPU {}, using direct allocation",
device_id
/// Create a new pinned storage, optionally NUMA-aware for a specific GPU.
///
/// Delegates NUMA-aware allocation and write-combined selection to
/// [`dynamo_memory::PinnedStorage::new_for_device`].
///
/// When `device_id` is `None`, allocates on device 0 without NUMA awareness.
pub fn new_for_device(size: usize, device_id: Option<u32>) -> Result<Self, StorageError> {
// Warn once if the legacy opt-in env var is still set.
static DEPRECATION_WARN: std::sync::Once = std::sync::Once::new();
if std::env::var("DYN_KVBM_ENABLE_NUMA").is_ok() {
DEPRECATION_WARN.call_once(|| {
tracing::warn!(
"DYN_KVBM_ENABLE_NUMA is deprecated for PinnedStorage::new_for_device; \
NUMA is now enabled by default. Use DYN_MEMORY_DISABLE_NUMA=1 to disable."
);
malloc_host_prefer_writecombined(size)?
}
Err(e) => {
tracing::warn!("NUMA allocation failed: {}, using direct allocation", e);
malloc_host_prefer_writecombined(size)?
});
}
}
} else {
malloc_host_prefer_writecombined(size)?
};
assert!(!ptr.is_null(), "Failed to allocate pinned memory");
assert!(ptr.is_aligned(), "Pinned memory is not aligned");
assert!(size < isize::MAX as usize);
let ptr = ptr as u64;
let inner = dynamo_memory::PinnedStorage::new_for_device(size, device_id)?;
Ok(Self {
ptr,
size,
inner,
handles: RegistrationHandles::new(),
ctx: ctx.clone(),
})
}
}
}
impl Drop for PinnedStorage {
fn drop(&mut self) {
self.handles.release();
unsafe {
if let Err(e) = cudarc::driver::result::free_host(self.ptr as _) {
tracing::error!(
"Failed to free pinned storage at 0x{:x} (size={}): {}",
self.ptr,
self.size,
e
);
}
}
// inner Drop handles free_host
}
}
......@@ -270,25 +225,25 @@ impl Storage for PinnedStorage {
}
fn addr(&self) -> u64 {
self.ptr
self.inner.addr() as u64
}
fn size(&self) -> usize {
self.size
self.inner.size()
}
unsafe fn as_ptr(&self) -> *const u8 {
self.ptr as *const u8
unsafe { self.inner.as_ptr() }
}
unsafe fn as_mut_ptr(&mut self) -> *mut u8 {
self.ptr as *mut u8
unsafe { self.inner.as_mut_ptr() }
}
}
impl CudaContextProivder for PinnedStorage {
fn cuda_context(&self) -> &Arc<CudaContext> {
&self.ctx
self.inner.ctx()
}
}
......@@ -312,13 +267,13 @@ impl RegisterableStorage for PinnedStorage {
impl StorageMemset for PinnedStorage {
fn memset(&mut self, value: u8, offset: usize, size: usize) -> Result<(), StorageError> {
if offset + size > self.size {
if offset + size > self.inner.size() {
return Err(StorageError::OperationFailed(
"memset: offset + size > storage size".into(),
));
}
unsafe {
let ptr = (self.ptr as *mut u8).add(offset);
let ptr = self.inner.as_mut_ptr().add(offset);
std::ptr::write_bytes(ptr, value, size);
}
Ok(())
......@@ -352,7 +307,7 @@ impl PinnedAllocator {
impl StorageAllocator<PinnedStorage> for PinnedAllocator {
fn allocate(&self, size: usize) -> Result<PinnedStorage, StorageError> {
PinnedStorage::new(&self.ctx, size)
PinnedStorage::new_for_device(size, Some(self.ctx.cu_device() as u32))
}
}
......@@ -617,44 +572,13 @@ mod tests {
assert!(result.is_err());
}
/// Allocates one page through `malloc_host_prefer_writecombined`, checks the
/// pointer is non-null and that a byte written to it reads back, then frees it.
#[test]
fn test_malloc_host_prefer_writecombined_allocates_memory() {
    const PAGE_BYTES: usize = 4096; // One page

    let ctx = Cuda::device_or_create(0).expect("Failed to create CUDA context");
    unsafe {
        ctx.bind_to_thread().expect("Failed to bind CUDA context");

        // Either the write-combined path or the fallback must hand back memory.
        let pinned = malloc_host_prefer_writecombined(PAGE_BYTES)
            .expect("malloc_host_prefer_writecombined should succeed");
        assert!(!pinned.is_null(), "Allocated pointer should not be null");

        // Round-trip one byte to confirm the region is host-accessible.
        pinned.write_volatile(0xAB);
        let read_back = pinned.read_volatile();
        assert_eq!(
            read_back, 0xAB,
            "Should be able to write and read pinned memory"
        );

        // Release the allocation.
        cudarc::driver::result::free_host(pinned as _).expect("Failed to free pinned memory");
    }
}
/// Test PinnedStorage::new with NUMA disabled (the direct allocation path).
/// Test PinnedStorage::new (deprecated) allocates usable pinned memory.
#[allow(deprecated)]
#[test]
fn test_pinned_storage_new_without_numa() {
// Verify NUMA is actually disabled for this test
assert!(
!numa_allocator::is_numa_enabled(),
"NUMA should be disabled for this test"
);
let ctx = Cuda::device_or_create(0).expect("Failed to create CUDA context");
let size = 8192;
// Create PinnedStorage - this should take the non-NUMA path
let mut storage =
PinnedStorage::new(&ctx, size).expect("PinnedStorage::new should succeed");
......
......@@ -48,10 +48,18 @@ pub enum LayoutKind {
#[derive(Debug, Clone)]
enum AllocationKind {
System,
Pinned { numa_aware: bool },
/// Pinned (page-locked) host memory. If `device_id` is Some, NUMA-aware
/// allocation is used on the GPU's NUMA node (when NUMA is enabled).
Pinned {
device_id: Option<u32>,
},
Device { device_id: u32 },
Disk { path: Option<PathBuf> },
Device {
device_id: u32,
},
Disk {
path: Option<PathBuf>,
},
}
/// Memory provisioning plan (either provided regions or an allocation request).
......@@ -222,11 +230,15 @@ impl PhysicalLayoutBuilder<HasConfig, HasLayout, NoMemory> {
}
/// Allocate pinned (page-locked) host memory.
///
/// # Arguments
/// * `device_id` - If `Some(id)`, enables NUMA-aware allocation on the GPU's NUMA node
/// (disable with `DYN_MEMORY_DISABLE_NUMA=1`). If `None`, uses direct allocation.
pub fn allocate_pinned(
self,
numa_aware: bool,
device_id: Option<u32>,
) -> PhysicalLayoutBuilder<HasConfig, HasLayout, HasMemory> {
self.set_memory_plan(MemoryPlan::Allocate(AllocationKind::Pinned { numa_aware }))
self.set_memory_plan(MemoryPlan::Allocate(AllocationKind::Pinned { device_id }))
}
/// Allocate device memory on the specified CUDA device (or the context device if `None`).
......@@ -384,8 +396,8 @@ fn allocate_regions(
let base_entry = match strategy {
AllocationKind::System => allocate_system_entry(reserve_size, agent)?,
AllocationKind::Pinned { numa_aware } => {
allocate_pinned_entry(reserve_size, agent, numa_aware)?
AllocationKind::Pinned { device_id } => {
allocate_pinned_entry(reserve_size, agent, device_id)?
}
AllocationKind::Device { device_id } => {
......@@ -403,8 +415,12 @@ fn allocate_system_entry(size: usize, agent: &NixlAgent) -> Result<MemoryEntry>
register_storage(storage, agent)
}
fn allocate_pinned_entry(size: usize, agent: &NixlAgent, _numa_aware: bool) -> Result<MemoryEntry> {
let storage = PinnedStorage::new(size)
fn allocate_pinned_entry(
size: usize,
agent: &NixlAgent,
device_id: Option<u32>,
) -> Result<MemoryEntry> {
let storage = PinnedStorage::new_for_device(size, device_id)
.map_err(|e| anyhow!("failed to allocate pinned memory ({size} bytes): {e}"))?;
register_storage(storage, agent)
}
......
......@@ -109,7 +109,7 @@ pub fn create_fc_layout(
match storage_kind {
StorageKind::System => builder.allocate_system().build().unwrap(),
StorageKind::Pinned => builder.allocate_pinned(false).build().unwrap(),
StorageKind::Pinned => builder.allocate_pinned(None).build().unwrap(),
StorageKind::Device(device_id) => builder.allocate_device(device_id).build().unwrap(),
StorageKind::Disk(_) => builder.allocate_disk(None).build().unwrap(),
}
......@@ -128,7 +128,7 @@ pub fn create_lw_layout(
match storage_kind {
StorageKind::System => builder.allocate_system().build().unwrap(),
StorageKind::Pinned => builder.allocate_pinned(false).build().unwrap(),
StorageKind::Pinned => builder.allocate_pinned(None).build().unwrap(),
StorageKind::Device(device_id) => builder.allocate_device(device_id).build().unwrap(),
StorageKind::Disk(_) => builder.allocate_disk(None).build().unwrap(),
}
......
......@@ -36,6 +36,10 @@ libc = { version = "0.2" }
nix = { version = "0.30", features = ["fs"] }
offset-allocator = "0.2"
[[bin]]
name = "validate_numa_placement"
path = "bin/validate_numa_placement.rs"
[dev-dependencies]
serde_json = { workspace = true }
tempfile = "3"
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Diagnostic tool for validating NUMA page placement of pinned memory.
//!
//! On a multi-socket machine with multiple GPUs, this binary:
//! 1. Enumerates all visible CUDA devices
//! 2. Maps each GPU to its expected NUMA node (via PCI bus / sysfs)
//! 3. Allocates pinned memory via `PinnedStorage::new_for_device`
//! 4. Uses the `move_pages(2)` syscall to query actual page NUMA placement
//! 5. Reports match/mismatch statistics per GPU
//!
//! # Usage
//! ```bash
//! cargo run -p dynamo-memory --bin validate_numa_placement
//! cargo run -p dynamo-memory --bin validate_numa_placement -- --size 64 # 64 MiB per GPU
//! cargo run -p dynamo-memory --bin validate_numa_placement -- --gpus 0,2 # specific GPUs
//! ```
use std::process;
/// Reports which NUMA node backs each page of a memory region.
///
/// Issues `move_pages(2)` with `nodes = NULL`, which makes the kernel fill the
/// `status` array with the node currently holding each page instead of
/// migrating anything.
///
/// # Arguments
/// * `ptr` - start of the region to inspect
/// * `size` - length of the region in bytes
///
/// # Returns
/// One entry per page (`size` rounded up to whole pages). Entries are NUMA
/// node IDs, or negative values for pages the kernel reported an error on.
/// If the syscall itself fails, the error is printed to stderr and every entry
/// is `-1`. An empty vector is returned when `size` is 0.
fn query_page_nodes(ptr: *const u8, size: usize) -> Vec<i32> {
    // Fall back to 4 KiB if sysconf does not report a positive page size.
    let reported = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
    let page_size = if reported > 0 { reported as usize } else { 4096 };

    let num_pages = size.div_ceil(page_size);
    if num_pages == 0 {
        return Vec::new();
    }

    // One address per page, each `page_size` bytes past the previous one.
    // NOTE(review): assumes `ptr` points at a region of at least `size` bytes;
    // confirm at call sites.
    let mut pages: Vec<*const libc::c_void> = Vec::with_capacity(num_pages);
    for page_idx in 0..num_pages {
        pages.push(unsafe { ptr.add(page_idx * page_size) as *const libc::c_void });
    }

    let mut status = vec![-1i32; num_pages];
    let ret = unsafe {
        libc::syscall(
            libc::SYS_move_pages,
            0i32,                       // pid = 0 (self)
            num_pages as libc::c_ulong, // count
            pages.as_ptr(),             // pages
            std::ptr::null::<i32>(),    // nodes = NULL (query mode)
            status.as_mut_ptr(),        // status (output)
            0i32,                       // flags
        )
    };

    if ret == 0 {
        return status;
    }

    let errno = std::io::Error::last_os_error();
    eprintln!(" move_pages syscall failed: {errno}");
    vec![-1; num_pages]
}
/// Parses a comma-separated list of GPU indices such as `"0,1,3"`.
///
/// Empty entries (e.g. from a trailing comma) are ignored. Any other entry that
/// is not a valid `u32` is an error, as is a list with no indices at all, so a
/// typo cannot silently shrink the set of GPUs being validated.
fn parse_gpu_list(list: &str) -> Result<Vec<u32>, String> {
    let gpus = list
        .split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty())
        .map(|entry| {
            entry
                .parse::<u32>()
                .map_err(|_| format!("--gpus: invalid GPU index '{entry}'"))
        })
        .collect::<Result<Vec<u32>, String>>()?;

    if gpus.is_empty() {
        return Err("--gpus requires at least one GPU index (e.g. 0,1,3)".to_string());
    }
    Ok(gpus)
}

/// Entry point: checks that pinned allocations land on each GPU's NUMA node.
///
/// Steps:
/// 1. Parse `--size`, `--gpus` and `--help`.
/// 2. Initialise the CUDA driver and resolve the list of GPUs to test.
/// 3. Print the GPU -> NUMA node mapping reported by `dynamo_memory`.
/// 4. For each GPU, allocate pinned memory via `PinnedStorage::new_for_device`,
///    query per-page placement with `query_page_nodes`, and compare it with the
///    expected node.
///
/// Exits with status 1 on invalid arguments, when no CUDA device can be
/// enumerated, when an allocation fails, or when any page is not on its
/// expected node. Panics if the CUDA driver cannot be initialised.
fn main() {
    // Parse args
    let args: Vec<String> = std::env::args().collect();
    let mut size_mib: usize = 16; // default 16 MiB
    let mut gpu_filter: Option<Vec<u32>> = None;
    let mut i = 1;
    while i < args.len() {
        match args[i].as_str() {
            "--size" => {
                i += 1;
                size_mib = args.get(i).and_then(|s| s.parse().ok()).unwrap_or_else(|| {
                    eprintln!("--size requires a numeric argument (MiB)");
                    process::exit(1);
                });
            }
            "--gpus" => {
                i += 1;
                let gpus = args.get(i).unwrap_or_else(|| {
                    eprintln!("--gpus requires a comma-separated list (e.g. 0,1,3)");
                    process::exit(1);
                });
                // Reject malformed lists instead of silently dropping entries:
                // an empty filter would otherwise report a vacuous "ALL PASSED".
                gpu_filter = Some(parse_gpu_list(gpus).unwrap_or_else(|msg| {
                    eprintln!("{msg}");
                    process::exit(1);
                }));
            }
            "--help" | "-h" => {
                eprintln!("Usage: validate_numa_placement [--size MiB] [--gpus 0,1,...]");
                eprintln!();
                eprintln!("Options:");
                eprintln!(" --size MiB Allocation size per GPU (default: 16)");
                eprintln!(" --gpus LIST Comma-separated GPU indices (default: all)");
                process::exit(0);
            }
            other => {
                eprintln!("Unknown argument: {other}");
                process::exit(1);
            }
        }
        i += 1;
    }

    // Guard the MiB -> bytes conversion; a huge --size must not wrap around.
    let alloc_size = size_mib.checked_mul(1024 * 1024).unwrap_or_else(|| {
        eprintln!("--size {size_mib} MiB is too large");
        process::exit(1);
    });

    cudarc::driver::result::init().expect("Failed to initialize CUDA driver");
    let gpu_count = match cudarc::driver::result::device::get_count() {
        Ok(n) => n,
        Err(e) => {
            eprintln!("Failed to query CUDA device count: {e}");
            process::exit(1);
        }
    };
    if gpu_count == 0 {
        eprintln!("No CUDA devices found");
        process::exit(1);
    }

    let gpus: Vec<u32> = match gpu_filter {
        Some(list) => {
            for &g in &list {
                if g >= gpu_count as u32 {
                    eprintln!("GPU {g} out of range (have {gpu_count} devices)");
                    process::exit(1);
                }
            }
            list
        }
        None => (0..gpu_count as u32).collect(),
    };

    println!("NUMA Placement Validator");
    println!("=======================");
    println!("GPUs: {gpus:?}");
    println!("Alloc size: {size_mib} MiB ({alloc_size} bytes)");
    println!("NUMA disabled: {}", dynamo_memory::is_numa_disabled());
    println!();

    // Phase 1: Show GPU-to-NUMA mapping
    println!("--- GPU-to-NUMA Topology ---");
    let mut expected_nodes: Vec<Option<u32>> = Vec::with_capacity(gpus.len());
    for &gpu_id in &gpus {
        let numa_node = dynamo_memory::numa::get_device_numa_node(gpu_id);
        let node_str = match numa_node {
            Some(n) => format!("{}", n.0),
            None => "UNKNOWN".to_string(),
        };
        println!(" GPU {gpu_id} -> NUMA node {node_str}");
        expected_nodes.push(numa_node.map(|n| n.0));
    }
    println!();

    // Phase 2: Allocate and validate
    println!("--- Page Placement Validation ---");
    let mut all_ok = true;
    for (idx, &gpu_id) in gpus.iter().enumerate() {
        let expected = expected_nodes[idx];
        print!(" GPU {gpu_id}: allocating {size_mib} MiB via new_for_device... ");
        // Make the progress line visible before the (potentially slow) allocation.
        let _ = std::io::Write::flush(&mut std::io::stdout());

        let storage = match dynamo_memory::PinnedStorage::new_for_device(alloc_size, Some(gpu_id)) {
            Ok(s) => s,
            Err(e) => {
                println!("FAILED: {e}");
                all_ok = false;
                continue;
            }
        };
        let ptr = unsafe { storage.as_ptr() };
        println!("OK (ptr={ptr:p})");

        // Query actual page placement
        let page_nodes = query_page_nodes(ptr, alloc_size);
        let total_pages = page_nodes.len();
        if total_pages == 0 {
            println!(" No pages to check");
            continue;
        }

        // Count pages per NUMA node
        let mut node_counts: std::collections::BTreeMap<i32, usize> =
            std::collections::BTreeMap::new();
        for &node in &page_nodes {
            *node_counts.entry(node).or_insert(0) += 1;
        }

        // Report distribution
        print!(" Pages: {total_pages} total -> ");
        let parts: Vec<String> = node_counts
            .iter()
            .map(|(&node, &count)| {
                let pct = (count as f64 / total_pages as f64) * 100.0;
                if node < 0 {
                    format!("ERROR({node}): {count} ({pct:.1}%)")
                } else {
                    format!("node {node}: {count} ({pct:.1}%)")
                }
            })
            .collect();
        println!("{}", parts.join(", "));

        // Validate against expected
        match expected {
            Some(expected_node) => {
                let correct = node_counts
                    .get(&(expected_node as i32))
                    .copied()
                    .unwrap_or(0);
                let pct = (correct as f64 / total_pages as f64) * 100.0;
                if correct == total_pages {
                    println!(" PASS: 100% pages on expected NUMA node {expected_node}");
                } else {
                    let misplaced = total_pages - correct;
                    println!(
                        " FAIL: {misplaced}/{total_pages} pages ({:.1}%) NOT on expected NUMA node {expected_node}",
                        100.0 - pct
                    );
                    all_ok = false;
                }
            }
            None => {
                println!(" SKIP: NUMA node unknown for GPU {gpu_id}, cannot validate placement");
            }
        }
        // Storage drops here, freeing the pinned memory
    }

    println!();
    if all_ok {
        println!("Result: ALL PASSED");
    } else {
        println!("Result: SOME FAILED (see above)");
        process::exit(1);
    }
}
......@@ -31,8 +31,15 @@ pub mod worker_pool;
use cudarc::driver::sys::CUdevice_attribute_enum;
use nix::libc;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};
use std::{fs, mem, process::Command};
/// Cache for GPU PCI address → NUMA node lookups.
/// The mapping never changes at runtime, so we cache results (including negative
/// lookups) to avoid repeated sysfs reads and nvidia-smi subprocesses.
static NUMA_NODE_CACHE: OnceLock<Mutex<HashMap<String, Option<NumaNode>>>> = OnceLock::new();
/// Check if NUMA optimization is disabled via environment variable.
///
/// NUMA-aware allocation is enabled by default. Set `DYN_MEMORY_DISABLE_NUMA=1`
......@@ -183,35 +190,40 @@ pub fn get_device_numa_node(device_id: u32) -> Option<NumaNode> {
}
};
// Step 2: Read NUMA node from sysfs
if let Some(node) = read_numa_node_from_sysfs(&pci_address) {
tracing::trace!(
"GPU {} (PCI {}) on NUMA node {} (sysfs)",
device_id,
pci_address,
node.0
);
return Some(node);
// Step 2: Check cache (includes negative lookups)
let cache = NUMA_NODE_CACHE.get_or_init(|| Mutex::new(HashMap::new()));
{
let guard = cache.lock().unwrap();
if let Some(cached) = guard.get(&pci_address) {
return *cached;
}
}
// Step 3: Read NUMA node from sysfs
let result = read_numa_node_from_sysfs(&pci_address)
.or_else(|| get_numa_node_from_nvidia_smi(&pci_address));
// Step 3: Fallback to nvidia-smi with PCI address
if let Some(node) = get_numa_node_from_nvidia_smi(&pci_address) {
match result {
Some(node) => {
tracing::trace!(
"GPU {} (PCI {}) on NUMA node {} (nvidia-smi)",
"GPU {} (PCI {}) on NUMA node {}",
device_id,
pci_address,
node.0
);
return Some(node);
}
// No NUMA info available — caller should skip NUMA optimization
None => {
tracing::warn!(
"Could not determine NUMA node for GPU {} (PCI {}), skipping NUMA optimization",
device_id,
pci_address
);
None
}
}
// Cache result (including None for negative lookups)
cache.lock().unwrap().insert(pci_address, result);
result
}
/// Pin the current thread to a specific NUMA node's CPUs.
......
......@@ -5,10 +5,71 @@
use super::{MemoryDescriptor, Result, StorageError, StorageKind, actions, nixl::NixlDescriptor};
use cudarc::driver::CudaContext;
use cudarc::driver::sys;
use std::any::Any;
use std::sync::Arc;
/// Whether to use write-combined pinned allocations.
///
/// Evaluated once, on first access, and cached for the rest of the process:
/// - `false` if `DYN_KVBM_DISABLE_WRITE_COMBINED` is truthy;
/// - otherwise the outcome of a 1-byte write-combined test allocation, which
///   fails on hardware that does not support it (e.g. Grace Hopper / Blackwell
///   with NVLink-C2C).
///
/// Must be accessed only after a CUDA context has been bound to the current
/// thread. Because the result is cached, a probe that fails for any reason
/// disables write-combined allocations until the process restarts; the driver
/// error is therefore included in the log message.
static USE_WRITE_COMBINED: std::sync::LazyLock<bool> = std::sync::LazyLock::new(|| {
    if dynamo_config::env_is_truthy("DYN_KVBM_DISABLE_WRITE_COMBINED") {
        tracing::debug!("DYN_KVBM_DISABLE_WRITE_COMBINED set; write-combined disabled");
        return false;
    }

    // Probe hardware support with a 1-byte test allocation.
    // SAFETY: called from an allocation path that has already bound a CUDA context.
    let probe = unsafe {
        cudarc::driver::result::malloc_host(
            1,
            cudarc::driver::sys::CU_MEMHOSTALLOC_WRITECOMBINED,
        )
    };

    match probe {
        Ok(ptr) => {
            // SAFETY: `ptr` was just returned by `malloc_host` and is not used afterwards.
            if let Err(e) = unsafe { cudarc::driver::result::free_host(ptr) } {
                // The probe itself succeeded, so keep write-combined enabled,
                // but do not hide the failed release.
                tracing::warn!("Failed to free write-combined probe allocation: {}", e);
            }
            true
        }
        Err(e) => {
            tracing::debug!(
                "Write-combined memory not supported on this system ({}); \
                 will use regular pinned memory",
                e
            );
            false
        }
    }
});
/// Allocates `size` bytes of pinned host memory.
///
/// The allocation flag is chosen from [`USE_WRITE_COMBINED`]:
/// `CU_MEMHOSTALLOC_WRITECOMBINED` when it is `true`, otherwise
/// `CU_MEMHOSTALLOC_DEVICEMAP`.
///
/// # Errors
/// Returns [`StorageError::Cuda`] if the driver allocation fails.
///
/// # Safety
/// Caller must ensure a valid CUDA context is bound to the current thread.
unsafe fn malloc_host_prefer_writecombined(size: usize) -> Result<*mut u8> {
    let flags = if *USE_WRITE_COMBINED {
        cudarc::driver::sys::CU_MEMHOSTALLOC_WRITECOMBINED
    } else {
        cudarc::driver::sys::CU_MEMHOSTALLOC_DEVICEMAP
    };

    // SAFETY: caller guarantees a valid CUDA context is bound to the current thread
    let allocation = unsafe { cudarc::driver::result::malloc_host(size, flags) };
    match allocation {
        Ok(raw) => Ok(raw as *mut u8),
        Err(cuda_err) => Err(StorageError::Cuda(cuda_err)),
    }
}
/// CUDA pinned host memory allocated via cudaHostAlloc.
#[derive(Debug)]
pub struct PinnedStorage {
......@@ -97,10 +158,8 @@ impl PinnedStorage {
unsafe {
ctx.bind_to_thread().map_err(StorageError::Cuda)?;
let ptr = cudarc::driver::result::malloc_host(len, sys::CU_MEMHOSTALLOC_DEVICEMAP)
.map_err(StorageError::Cuda)?;
let ptr = malloc_host_prefer_writecombined(len)?;
let ptr = ptr as *mut u8;
assert!(!ptr.is_null(), "Failed to allocate pinned memory");
assert!(ptr.is_aligned(), "Pinned memory is not aligned");
assert!(len < isize::MAX as usize);
......@@ -128,6 +187,11 @@ impl PinnedStorage {
pub unsafe fn as_mut_ptr(&mut self) -> *mut u8 {
self.ptr as *mut u8
}
/// Get a reference to the CUDA context used for this allocation.
///
/// Returns a borrow of the `Arc<CudaContext>` stored in this storage's `ctx`
/// field; the `Arc` is not cloned, so callers that need an owned handle should
/// clone the result themselves.
pub fn ctx(&self) -> &Arc<CudaContext> {
&self.ctx
}
}
impl Drop for PinnedStorage {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment