Unverified Commit 17db1b6a authored by Ryan Olson's avatar Ryan Olson Committed by GitHub
Browse files

fix: harden numa cpuset lookup (#7060)


Signed-off-by: default avatarRyan Olson <rolson@nvidia.com>
parent b950034b
...@@ -229,7 +229,7 @@ impl PhysicalLayoutBuilder<HasConfig, HasLayout, NoMemory> { ...@@ -229,7 +229,7 @@ impl PhysicalLayoutBuilder<HasConfig, HasLayout, NoMemory> {
/// ///
/// # Arguments /// # Arguments
/// * `device_id` - If `Some(id)`, enables NUMA-aware allocation on the GPU's NUMA node /// * `device_id` - If `Some(id)`, enables NUMA-aware allocation on the GPU's NUMA node
/// (when `DYN_KVBM_ENABLE_NUMA=1` is set). If `None`, uses direct allocation. /// (disable with `DYN_MEMORY_DISABLE_NUMA=1`). If `None`, uses direct allocation.
pub fn allocate_pinned( pub fn allocate_pinned(
self, self,
device_id: Option<u32>, device_id: Option<u32>,
......
...@@ -4,12 +4,22 @@ ...@@ -4,12 +4,22 @@
//! Re-export NUMA utilities from dynamo-memory. //! Re-export NUMA utilities from dynamo-memory.
pub use dynamo_memory::numa::*; pub use dynamo_memory::numa::*;
/// Check if NUMA optimization is explicitly opted-in for the block manager.
///
/// Set `DYN_KVBM_ENABLE_NUMA=1` to enable NUMA-aware allocation in the
/// KV cache block manager. This is opt-in because the block manager
/// manages its own pinned memory allocations separately from `PinnedStorage`.
pub fn is_numa_enabled() -> bool {
matches!(
std::env::var("DYN_KVBM_ENABLE_NUMA").as_deref(),
Ok("1" | "true" | "yes")
)
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
// ── NumaNode tests ──────────────────────────────────────────────────
#[test] #[test]
fn test_numa_node_equality() { fn test_numa_node_equality() {
let node0a = NumaNode(0); let node0a = NumaNode(0);
...@@ -61,116 +71,101 @@ mod tests { ...@@ -61,116 +71,101 @@ mod tests {
#[test] #[test]
fn test_numa_node_copy_clone() { fn test_numa_node_copy_clone() {
let node1 = NumaNode(5); let node1 = NumaNode(5);
let node2 = node1; // Copy let node2 = node1;
let node3 = node1; // Clone let node3 = node1;
assert_eq!(node1, node2); assert_eq!(node1, node2);
assert_eq!(node1, node3); assert_eq!(node1, node3);
assert_eq!(node2, node3); assert_eq!(node2, node3);
} }
// ── System detection tests ──────────────────────────────────────────
#[test] #[test]
fn test_get_current_cpu_numa_node() { fn test_get_current_cpu_numa_node() {
let node = get_current_cpu_numa_node(); let node = get_current_cpu_numa_node();
if !node.is_unknown() { if !node.is_unknown() {
assert!(node.0 < 8, "NUMA node {} seems unreasonably high", node.0); assert!(node.0 < 8, "NUMA node {} seems unreasonably high", node.0);
} }
} }
#[test]
fn test_get_device_numa_node_valid_gpu() {
let node = get_device_numa_node(0);
println!("GPU 0 detected on NUMA node: {}", node.0);
}
// ── Worker pool tests ───────────────────────────────────────────────
//
// NumaWorker and NumaWorkerPool::new() are private in dynamo-memory,
// so these tests go through the public NumaWorkerPool::global() API.
/// Check if CUDA is available for testing
fn is_cuda_available() -> bool {
if std::process::Command::new("nvidia-smi")
.arg("--query-gpu=count")
.arg("--format=csv,noheader")
.output()
.is_err()
{
return false;
}
crate::block_manager::storage::cuda::Cuda::device_or_create(0).is_ok()
}
#[test] #[test]
fn test_worker_pool_singleton() { fn test_worker_pool_singleton() {
let pool1 = worker_pool::NumaWorkerPool::global(); let pool1 = worker_pool::NumaWorkerPool::global();
let pool2 = worker_pool::NumaWorkerPool::global(); let pool2 = worker_pool::NumaWorkerPool::global();
assert!(std::ptr::eq(pool1, pool2)); assert!(std::ptr::eq(pool1, pool2));
} }
}
#[cfg(all(test, feature = "testing-cuda"))]
mod cuda_tests {
use super::*;
#[test] #[test]
fn test_worker_pool_allocate() { fn test_get_device_numa_node_valid_gpu() {
if !is_cuda_available() { match get_device_numa_node(0) {
eprintln!("Skipping test_worker_pool_allocate: CUDA not available"); Some(node) => println!("GPU 0 detected on NUMA node: {}", node.0),
return; None => println!("GPU 0 has no determinable NUMA node"),
}
} }
#[test]
fn test_worker_pool_allocate() {
let pool = worker_pool::NumaWorkerPool::global(); let pool = worker_pool::NumaWorkerPool::global();
unsafe { match pool.allocate_pinned_for_gpu(8192, 0).unwrap() {
let ptr = pool.allocate_pinned_for_gpu(8192, 0).unwrap(); Some(ptr) => unsafe {
assert!(!ptr.is_null()); assert!(!ptr.is_null());
cudarc::driver::result::free_host(ptr as *mut std::ffi::c_void).unwrap(); cudarc::driver::result::free_host(ptr as *mut std::ffi::c_void).unwrap();
},
None => {
println!("NUMA node unknown for GPU 0, allocation skipped");
}
} }
} }
#[test] #[test]
fn test_worker_pool_reuse() { fn test_worker_pool_reuse() {
if !is_cuda_available() {
eprintln!("Skipping test_worker_pool_reuse: CUDA not available");
return;
}
let pool = worker_pool::NumaWorkerPool::global(); let pool = worker_pool::NumaWorkerPool::global();
unsafe { let r1 = pool.allocate_pinned_for_gpu(1024, 0).unwrap();
let ptr1 = pool.allocate_pinned_for_gpu(1024, 0).unwrap(); let r2 = pool.allocate_pinned_for_gpu(1024, 0).unwrap();
let ptr2 = pool.allocate_pinned_for_gpu(1024, 0).unwrap();
match (r1, r2) {
(Some(ptr1), Some(ptr2)) => unsafe {
assert!(!ptr1.is_null()); assert!(!ptr1.is_null());
assert!(!ptr2.is_null()); assert!(!ptr2.is_null());
assert_ne!(ptr1, ptr2); assert_ne!(ptr1, ptr2);
cudarc::driver::result::free_host(ptr1 as *mut std::ffi::c_void).unwrap(); cudarc::driver::result::free_host(ptr1 as *mut std::ffi::c_void).unwrap();
cudarc::driver::result::free_host(ptr2 as *mut std::ffi::c_void).unwrap(); cudarc::driver::result::free_host(ptr2 as *mut std::ffi::c_void).unwrap();
},
(None, None) => {
println!("NUMA node unknown, both allocations skipped");
}
_ => panic!("inconsistent NUMA detection between two calls for same GPU"),
} }
} }
#[test] #[test]
fn test_zero_size_allocation() { fn test_zero_size_allocation() {
if !is_cuda_available() {
eprintln!("Skipping test_zero_size_allocation: CUDA not available");
return;
}
let pool = worker_pool::NumaWorkerPool::global(); let pool = worker_pool::NumaWorkerPool::global();
let result = pool.allocate_pinned_for_gpu(0, 0); let result = pool.allocate_pinned_for_gpu(0, 0);
assert!(result.is_err()); match result {
assert!(result.unwrap_err().contains("zero")); Ok(None) => {
println!("NUMA node unknown, zero-size check not reached");
}
Err(e) => {
assert!(e.contains("zero"));
}
Ok(Some(_)) => panic!("zero-size allocation should not succeed"),
}
} }
#[test] #[test]
fn test_pinned_allocation_api() { fn test_pinned_allocation_api() {
let pool = worker_pool::NumaWorkerPool::global(); let pool = worker_pool::NumaWorkerPool::global();
unsafe { if let Ok(Some(ptr)) = pool.allocate_pinned_for_gpu(1024, 0) {
if let Ok(ptr) = pool.allocate_pinned_for_gpu(1024, 0) {
assert!(!ptr.is_null()); assert!(!ptr.is_null());
unsafe {
cudarc::driver::result::free_host(ptr as *mut std::ffi::c_void).unwrap(); cudarc::driver::result::free_host(ptr as *mut std::ffi::c_void).unwrap();
} }
} }
......
...@@ -216,7 +216,14 @@ impl PinnedStorage { ...@@ -216,7 +216,14 @@ impl PinnedStorage {
match numa_allocator::worker_pool::NumaWorkerPool::global() match numa_allocator::worker_pool::NumaWorkerPool::global()
.allocate_pinned_for_gpu(size, device_id) .allocate_pinned_for_gpu(size, device_id)
{ {
Ok(ptr) => ptr, Ok(Some(ptr)) => ptr,
Ok(None) => {
tracing::debug!(
"NUMA node unknown for GPU {}, using direct allocation",
device_id
);
malloc_host_prefer_writecombined(size)?
}
Err(e) => { Err(e) => {
tracing::warn!("NUMA allocation failed: {}, using direct allocation", e); tracing::warn!("NUMA allocation failed: {}, using direct allocation", e);
malloc_host_prefer_writecombined(size)? malloc_host_prefer_writecombined(size)?
......
...@@ -10,7 +10,7 @@ use std::collections::HashMap; ...@@ -10,7 +10,7 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock}; use std::sync::{Arc, Mutex, OnceLock};
/// Get or create a CUDA context for the given device. /// Get or create a CUDA context for the given device.
fn cuda_context(device_id: u32) -> Result<Arc<CudaContext>> { pub(crate) fn cuda_context(device_id: u32) -> Result<Arc<CudaContext>> {
static CONTEXTS: OnceLock<Mutex<HashMap<u32, Arc<CudaContext>>>> = OnceLock::new(); static CONTEXTS: OnceLock<Mutex<HashMap<u32, Arc<CudaContext>>>> = OnceLock::new();
let mut map = CONTEXTS.get_or_init(Default::default).lock().unwrap(); let mut map = CONTEXTS.get_or_init(Default::default).lock().unwrap();
......
...@@ -43,7 +43,7 @@ pub use device::DeviceStorage; ...@@ -43,7 +43,7 @@ pub use device::DeviceStorage;
pub use disk::DiskStorage; pub use disk::DiskStorage;
pub use external::ExternalDeviceMemory; pub use external::ExternalDeviceMemory;
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
pub use numa::{NumaNode, is_numa_enabled}; pub use numa::{NumaNode, is_numa_disabled};
pub use offset::OffsetBuffer; pub use offset::OffsetBuffer;
pub use pinned::PinnedStorage; pub use pinned::PinnedStorage;
pub use pool::{CudaMemPool, CudaMemPoolBuilder}; pub use pool::{CudaMemPool, CudaMemPoolBuilder};
......
...@@ -15,30 +15,30 @@ ...@@ -15,30 +15,30 @@
//! //!
//! ## Usage //! ## Usage
//! //!
//! NUMA optimization is opt-in via environment variable: //! NUMA optimization is enabled by default. To disable it:
//! ```bash //! ```bash
//! export DYN_KVBM_ENABLE_NUMA=1 //! export DYN_MEMORY_DISABLE_NUMA=1
//! ``` //! ```
//! //!
//! When enabled, pinned memory allocations are routed through NUMA workers //! When enabled, pinned memory allocations are routed through NUMA workers
//! that are pinned to the target GPU's NUMA node, ensuring first-touch policy //! that are pinned to the target GPU's NUMA node, ensuring first-touch policy
//! places pages on the correct node. //! places pages on the correct node. If the GPU's NUMA node cannot be
//! determined, allocation falls back to the non-NUMA path transparently.
pub mod topology; pub mod topology;
pub mod worker_pool; pub mod worker_pool;
use cudarc::driver::sys::CUdevice_attribute_enum;
use nix::libc; use nix::libc;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{mem, process::Command}; use std::{fs, mem, process::Command};
/// Check if NUMA optimization is enabled via environment variable /// Check if NUMA optimization is disabled via environment variable.
/// ///
/// Set `DYN_KVBM_ENABLE_NUMA=1` to enable NUMA-aware allocation. /// NUMA-aware allocation is enabled by default. Set `DYN_MEMORY_DISABLE_NUMA=1`
/// Default: disabled (opt-in) /// (or any truthy value) to disable it.
pub fn is_numa_enabled() -> bool { pub fn is_numa_disabled() -> bool {
std::env::var("DYN_KVBM_ENABLE_NUMA") dynamo_config::env_is_truthy("DYN_MEMORY_DISABLE_NUMA")
.map(|v| v == "1" || v.to_lowercase() == "true")
.unwrap_or(false)
} }
/// Represents a NUMA node identifier. /// Represents a NUMA node identifier.
...@@ -92,88 +92,126 @@ pub fn get_current_cpu_numa_node() -> NumaNode { ...@@ -92,88 +92,126 @@ pub fn get_current_cpu_numa_node() -> NumaNode {
} }
} }
/// Resolve process-local CUDA device index to the physical identifier for nvidia-smi. /// Format a PCI bus address from domain, bus, and device IDs.
/// ///
/// When `CUDA_VISIBLE_DEVICES` is set, the process sees a remapped device space (e.g. only /// Returns a string in the format `"DDDD:BB:DD.0"` suitable for sysfs lookups.
/// GPU 2 visible as device 0). nvidia-smi's `-i` flag expects the *physical* device index or fn format_pci_bus_address(domain: i32, bus: i32, device: i32) -> String {
/// UUID, not the process-local index. This function parses `CUDA_VISIBLE_DEVICES` to map format!("{:04x}:{:02x}:{:02x}.0", domain, bus, device)
/// process-local `device_id` to the correct physical identifier. }
/// Query the PCI bus address for a CUDA device from the CUDA driver API.
/// ///
/// Returns the identifier string to pass to `nvidia-smi -i` (physical index or UUID). /// Uses `CudaContext::attribute()` to read PCI domain, bus, and device IDs.
fn cuda_device_id_to_nvidia_smi_id(device_id: u32) -> String { /// This transparently handles `CUDA_VISIBLE_DEVICES` remapping since
let visible = match std::env::var("CUDA_VISIBLE_DEVICES") { /// `CudaContext::new(ordinal)` operates on the process-local device index.
Ok(v) if !v.trim().is_empty() => v, fn get_pci_bus_address_from_cuda(device_id: u32) -> Option<String> {
_ => return device_id.to_string(), // No remapping: identity let ctx = crate::device::cuda_context(device_id).ok()?;
}; let domain = ctx
.attribute(CUdevice_attribute_enum::CU_DEVICE_ATTRIBUTE_PCI_DOMAIN_ID)
.ok()?;
let bus = ctx
.attribute(CUdevice_attribute_enum::CU_DEVICE_ATTRIBUTE_PCI_BUS_ID)
.ok()?;
let device = ctx
.attribute(CUdevice_attribute_enum::CU_DEVICE_ATTRIBUTE_PCI_DEVICE_ID)
.ok()?;
Some(format_pci_bus_address(domain, bus, device))
}
// Parse comma-separated list. Supports: "0,1,2", "2,3", "GPU-uuid", "2,GPU-uuid", etc. /// Read the NUMA node for a PCI device from sysfs.
let devices: Vec<&str> = visible ///
.split(',') /// Reads `/sys/bus/pci/devices/<pci_address>/numa_node`. Returns `None` if the
.map(|s| s.trim()) /// file doesn't exist, can't be read, or contains `-1` (no NUMA affinity).
.filter(|s| !s.is_empty()) fn read_numa_node_from_sysfs(pci_address: &str) -> Option<NumaNode> {
.collect(); let path = format!("/sys/bus/pci/devices/{}/numa_node", pci_address);
if device_id as usize >= devices.len() { let content = fs::read_to_string(&path).ok()?;
tracing::warn!( let node: i32 = content.trim().parse().ok()?;
"device_id {} out of range for CUDA_VISIBLE_DEVICES ({} devices), using identity", if node < 0 {
device_id, // -1 means no NUMA affinity info available
devices.len() None
); } else {
return device_id.to_string(); Some(NumaNode(node as u32))
}
}
/// Fallback: query NUMA node from nvidia-smi using PCI bus address.
///
/// Uses the PCI BDF address (not env-var-based device index) so it is
/// correct regardless of `CUDA_VISIBLE_DEVICES` remapping.
fn get_numa_node_from_nvidia_smi(pci_address: &str) -> Option<NumaNode> {
let output = Command::new("nvidia-smi")
.args(["topo", "--get-numa-id-of-nearby-cpu", "-i", pci_address])
.output()
.ok()?;
if !output.status.success() {
return None;
} }
let id = devices[device_id as usize]; let stdout = std::str::from_utf8(&output.stdout).ok()?;
id.to_string() let line = stdout.lines().next()?;
let numa_str = line.split(':').nth(1)?;
let node: u32 = numa_str.trim().parse().ok()?;
Some(NumaNode(node))
} }
/// Get NUMA node for a GPU device. /// Get NUMA node for a GPU device.
/// ///
/// For GPU memory, the NUMA affinity depends on which PCIe bus the GPU is attached to. /// Queries the PCI bus address from the CUDA driver API, then reads the NUMA
/// This is queried via nvidia-smi. Falls back to a heuristic (device_id % 2) if nvidia-smi /// node from sysfs. Falls back to nvidia-smi with the PCI address. Returns
/// is unavailable. /// `None` if the NUMA node cannot be determined, signaling the caller to skip
/// NUMA-aware allocation entirely rather than guessing wrong.
/// ///
/// When `CUDA_VISIBLE_DEVICES` is set, the process-local `device_id` is correctly mapped /// `CUDA_VISIBLE_DEVICES` is handled transparently because `CudaContext::new(ordinal)`
/// to the physical GPU identifier before querying nvidia-smi, so NUMA attribution is accurate. /// operates on the process-local device index.
/// ///
/// # Arguments /// # Arguments
/// * `device_id` - CUDA device index (0, 1, 2, ...) as seen by the process /// * `device_id` - CUDA device index (0, 1, 2, ...) as seen by the process
/// ///
/// # Returns /// # Returns
/// The NUMA node closest to the specified GPU, or a heuristic fallback. /// The NUMA node closest to the specified GPU, or `None` if it cannot be determined.
pub fn get_device_numa_node(device_id: u32) -> NumaNode { pub fn get_device_numa_node(device_id: u32) -> Option<NumaNode> {
let nvidia_smi_id = cuda_device_id_to_nvidia_smi_id(device_id); // Step 1: Get PCI bus address from CUDA driver
let pci_address = match get_pci_bus_address_from_cuda(device_id) {
// Use nvidia-smi topo to get NUMA ID of nearest CPU Some(addr) => addr,
// -i must be physical device index or UUID, not process-local index None => {
let output = match Command::new("nvidia-smi")
.args(["topo", "--get-numa-id-of-nearby-cpu", "-i", &nvidia_smi_id])
.output()
{
Ok(out) if out.status.success() => out,
_ => {
tracing::warn!( tracing::warn!(
"nvidia-smi failed for GPU {} (nvidia-smi -i {}), using heuristic", "Failed to get PCI address from CUDA for device {}, skipping NUMA optimization",
device_id, device_id
nvidia_smi_id
); );
return NumaNode(device_id % 2); return None;
} }
}; };
if let Ok(stdout) = std::str::from_utf8(&output.stdout) // Step 2: Read NUMA node from sysfs
&& let Some(line) = stdout.lines().next() if let Some(node) = read_numa_node_from_sysfs(&pci_address) {
&& let Some(numa_str) = line.split(':').nth(1)
&& let Ok(node) = numa_str.trim().parse::<u32>()
{
tracing::trace!( tracing::trace!(
"GPU {} (physical {}) on NUMA node {}", "GPU {} (PCI {}) on NUMA node {} (sysfs)",
device_id, device_id,
nvidia_smi_id, pci_address,
node node.0
); );
return NumaNode(node); return Some(node);
} }
tracing::warn!("Failed to get NUMA node for GPU {}", device_id);
NumaNode::UNKNOWN // Step 3: Fallback to nvidia-smi with PCI address
if let Some(node) = get_numa_node_from_nvidia_smi(&pci_address) {
tracing::trace!(
"GPU {} (PCI {}) on NUMA node {} (nvidia-smi)",
device_id,
pci_address,
node.0
);
return Some(node);
}
// No NUMA info available — caller should skip NUMA optimization
tracing::warn!(
"Could not determine NUMA node for GPU {} (PCI {}), skipping NUMA optimization",
device_id,
pci_address
);
None
} }
/// Pin the current thread to a specific NUMA node's CPUs. /// Pin the current thread to a specific NUMA node's CPUs.
...@@ -257,7 +295,6 @@ mod tests { ...@@ -257,7 +295,6 @@ mod tests {
#[test] #[test]
fn test_numa_node_serialization() { fn test_numa_node_serialization() {
// Verify NumaNode can be serialized (important for benchmarking)
let node = NumaNode(1); let node = NumaNode(1);
let json = serde_json::to_string(&node).unwrap(); let json = serde_json::to_string(&node).unwrap();
let deserialized: NumaNode = serde_json::from_str(&json).unwrap(); let deserialized: NumaNode = serde_json::from_str(&json).unwrap();
...@@ -266,28 +303,14 @@ mod tests { ...@@ -266,28 +303,14 @@ mod tests {
#[test] #[test]
fn test_get_current_cpu_numa_node() { fn test_get_current_cpu_numa_node() {
// Should either return a valid node or UNKNOWN
let node = get_current_cpu_numa_node(); let node = get_current_cpu_numa_node();
// If not unknown, should be a reasonable NUMA node number (< 8 on most systems)
if !node.is_unknown() { if !node.is_unknown() {
assert!(node.0 < 8, "NUMA node {} seems unreasonably high", node.0); assert!(node.0 < 8, "NUMA node {} seems unreasonably high", node.0);
} }
} }
#[test]
fn test_get_device_numa_node_valid_gpu() {
// Test GPU 0 detection
let node = get_device_numa_node(0);
// Should return either a valid node (0-7) or use heuristic (gpu_id % 2)
// On dual-socket systems, GPU 0 typically on node 0 or 1
println!("GPU 0 detected on NUMA node: {}", node.0);
}
#[test] #[test]
fn test_numa_node_hash() { fn test_numa_node_hash() {
// Verify NumaNode can be used as a HashMap key
use std::collections::HashMap; use std::collections::HashMap;
let mut map = HashMap::new(); let mut map = HashMap::new();
...@@ -301,13 +324,78 @@ mod tests { ...@@ -301,13 +324,78 @@ mod tests {
#[test] #[test]
fn test_numa_node_copy_clone() { fn test_numa_node_copy_clone() {
// Verify NumaNode is Copy and Clone
let node1 = NumaNode(5); let node1 = NumaNode(5);
let node2 = node1; // Copy let node2 = node1;
let node3 = node1; // Clone let node3 = node1;
assert_eq!(node1, node2); assert_eq!(node1, node2);
assert_eq!(node1, node3); assert_eq!(node1, node3);
assert_eq!(node2, node3); assert_eq!(node2, node3);
} }
#[test]
fn test_format_pci_bus_address() {
assert_eq!(format_pci_bus_address(0, 0, 0), "0000:00:00.0");
assert_eq!(format_pci_bus_address(0, 0x3b, 0), "0000:3b:00.0");
assert_eq!(format_pci_bus_address(0, 0xaf, 0), "0000:af:00.0");
assert_eq!(format_pci_bus_address(0x10, 0x1a, 0x03), "0010:1a:03.0");
}
#[test]
fn test_read_numa_node_from_sysfs_nonexistent() {
assert!(read_numa_node_from_sysfs("ffff:ff:ff.0").is_none());
}
}
#[cfg(all(test, feature = "testing-cuda"))]
mod cuda_tests {
use super::*;
#[test]
fn test_get_pci_bus_address_from_cuda() {
let addr = get_pci_bus_address_from_cuda(0).expect("should get PCI address for GPU 0");
// Validate BDF format: DDDD:BB:DD.0
let parts: Vec<&str> = addr.split(':').collect();
assert_eq!(
parts.len(),
3,
"PCI address should have 3 colon-separated parts: {}",
addr
);
assert_eq!(parts[0].len(), 4, "domain should be 4 hex chars: {}", addr);
assert!(parts[2].ends_with(".0"), "should end with .0: {}", addr);
println!("GPU 0 PCI address: {}", addr);
}
#[test]
fn test_read_numa_node_from_sysfs_real_gpu() {
let addr = get_pci_bus_address_from_cuda(0).expect("should get PCI address for GPU 0");
if let Some(node) = read_numa_node_from_sysfs(&addr) {
assert!(node.0 < 16, "NUMA node {} seems unreasonably high", node.0);
println!("GPU 0 (PCI {}) sysfs NUMA node: {}", addr, node.0);
} else {
println!(
"GPU 0 (PCI {}) has no sysfs NUMA info (single-socket?)",
addr
);
}
}
#[test]
fn test_get_device_numa_node_returns_some_or_none() {
let result = get_device_numa_node(0);
match result {
Some(node) => {
assert!(node.0 < 16, "NUMA node {} seems unreasonably high", node.0);
assert!(
!node.is_unknown(),
"should never return UNKNOWN inside Some"
);
println!("GPU 0 detected on NUMA node: {}", node.0);
}
None => {
println!("GPU 0 has no determinable NUMA node (single-socket or no sysfs info)");
}
}
}
} }
...@@ -13,11 +13,9 @@ ...@@ -13,11 +13,9 @@
//! - First-touch page allocation ensures correct NUMA placement //! - First-touch page allocation ensures correct NUMA placement
use super::get_current_cpu_numa_node; use super::get_current_cpu_numa_node;
use cudarc::driver::CudaContext;
use cudarc::driver::result::malloc_host; use cudarc::driver::result::malloc_host;
use cudarc::driver::sys::CU_MEMHOSTALLOC_DEVICEMAP; use cudarc::driver::sys::CU_MEMHOSTALLOC_DEVICEMAP;
use nix::libc; use nix::libc;
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, Sender, channel}; use std::sync::mpsc::{Receiver, Sender, channel};
use std::sync::{Arc, Mutex, OnceLock}; use std::sync::{Arc, Mutex, OnceLock};
use std::thread::{self, JoinHandle}; use std::thread::{self, JoinHandle};
...@@ -25,25 +23,6 @@ use std::time::Duration; ...@@ -25,25 +23,6 @@ use std::time::Duration;
use super::{NumaNode, get_device_numa_node}; use super::{NumaNode, get_device_numa_node};
/// Get or create a CUDA context for the given device.
fn cuda_context(device_id: u32) -> Result<Arc<CudaContext>, String> {
static CONTEXTS: OnceLock<Mutex<HashMap<u32, Arc<CudaContext>>>> = OnceLock::new();
let mut map = CONTEXTS.get_or_init(Default::default).lock().unwrap();
if let Some(existing) = map.get(&device_id) {
return Ok(existing.clone());
}
let ctx = CudaContext::new(device_id as usize).map_err(|e| {
format!(
"Failed to create CUDA context for device {}: {:?}",
device_id, e
)
})?;
map.insert(device_id, ctx.clone());
Ok(ctx)
}
/// Wrapper for raw pointer that can be sent between threads. /// Wrapper for raw pointer that can be sent between threads.
/// ///
/// # Safety /// # Safety
...@@ -197,7 +176,8 @@ impl NumaWorker { ...@@ -197,7 +176,8 @@ impl NumaWorker {
} }
// Get or create CUDA context for this GPU // Get or create CUDA context for this GPU
let ctx = cuda_context(gpu_id)?; let ctx = crate::device::cuda_context(gpu_id)
.map_err(|e| format!("Failed to create CUDA context for device {}: {}", gpu_id, e))?;
unsafe { unsafe {
// Bind CUDA context to this worker thread before allocation // Bind CUDA context to this worker thread before allocation
...@@ -370,19 +350,35 @@ impl NumaWorkerPool { ...@@ -370,19 +350,35 @@ impl NumaWorkerPool {
/// Allocate CUDA pinned memory for a specific GPU (auto-detects NUMA node). /// Allocate CUDA pinned memory for a specific GPU (auto-detects NUMA node).
/// ///
/// This method: /// This method:
/// 1. Determines the GPU's NUMA node via nvidia-smi /// 1. Determines the GPU's NUMA node via CUDA driver PCI attributes + sysfs
/// 2. Routes the allocation to a worker pinned to that node /// 2. Routes the allocation to a worker pinned to that node
/// 3. The worker allocates and touches pages to ensure first-touch placement /// 3. The worker allocates and touches pages to ensure first-touch placement
/// ///
/// Returns `None` if the GPU's NUMA node cannot be determined, signaling
/// the caller to fall back to non-NUMA allocation.
///
/// # Arguments /// # Arguments
/// * `size` - Number of bytes to allocate /// * `size` - Number of bytes to allocate
/// * `gpu_id` - CUDA device ID /// * `gpu_id` - CUDA device ID
/// ///
/// # Returns /// # Returns
/// Raw pointer to the allocated memory. Caller is responsible for freeing via /// `Some(ptr)` on success, `None` if NUMA node is unknown (caller should
/// `cudarc::driver::result::free_host`. /// use non-NUMA allocation). Returns `Err` on allocation failure.
pub fn allocate_pinned_for_gpu(&self, size: usize, gpu_id: u32) -> Result<*mut u8, String> { pub fn allocate_pinned_for_gpu(
let node = get_device_numa_node(gpu_id); &self,
size: usize,
gpu_id: u32,
) -> Result<Option<*mut u8>, String> {
let node = match get_device_numa_node(gpu_id) {
Some(node) => node,
None => {
tracing::debug!(
"NUMA node unknown for GPU {}, skipping NUMA-aware allocation",
gpu_id
);
return Ok(None);
}
};
tracing::debug!( tracing::debug!(
"Allocating {} bytes pinned memory for GPU {} (NUMA node {})", "Allocating {} bytes pinned memory for GPU {} (NUMA node {})",
...@@ -392,45 +388,67 @@ impl NumaWorkerPool { ...@@ -392,45 +388,67 @@ impl NumaWorkerPool {
); );
let worker = self.get_or_spawn_worker(node)?; let worker = self.get_or_spawn_worker(node)?;
worker.allocate(size, gpu_id).map(|send_ptr| send_ptr.0) worker
.allocate(size, gpu_id)
.map(|send_ptr| Some(send_ptr.0))
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::numa::{get_current_cpu_numa_node, get_device_numa_node}; use crate::numa::get_current_cpu_numa_node;
/// Check if CUDA is available for testing. #[test]
fn is_cuda_available() -> bool { fn test_worker_spawn() {
// Check if nvidia-smi is available let node = NumaNode(0);
if std::process::Command::new("nvidia-smi") let worker = NumaWorker::spawn(node);
.arg("--query-gpu=count") assert!(worker.is_ok());
.arg("--format=csv,noheader")
.output()
.is_err()
{
return false;
} }
// Try to initialize CUDA context for device 0 #[test]
cuda_context(0).is_ok() fn test_worker_pool_singleton() {
let pool1 = NumaWorkerPool::global();
let pool2 = NumaWorkerPool::global();
assert!(std::ptr::eq(pool1, pool2));
} }
#[test] #[test]
fn test_worker_spawn() { fn test_get_current_cpu_numa_node() {
let node = get_current_cpu_numa_node();
if !node.is_unknown() {
println!("Current CPU on NUMA node: {}", node.0);
} else {
println!("NUMA node detection unavailable (single-node or fake NUMA)");
}
}
#[test]
fn test_numa_node_display() {
let node = NumaNode(0); let node = NumaNode(0);
let worker = NumaWorker::spawn(node); assert_eq!(format!("{}", node), "NumaNode(0)");
assert!(worker.is_ok());
let unknown = NumaNode::UNKNOWN;
assert_eq!(format!("{}", unknown), "UNKNOWN");
} }
#[test] #[test]
fn test_worker_allocate_pinned() { fn test_numa_node_is_unknown() {
if !is_cuda_available() { let valid = NumaNode(0);
eprintln!("Skipping test_worker_allocate_pinned: CUDA not available"); assert!(!valid.is_unknown());
return;
let unknown = NumaNode::UNKNOWN;
assert!(unknown.is_unknown());
} }
}
#[cfg(all(test, feature = "testing-cuda"))]
mod cuda_tests {
use super::*;
use crate::numa::get_device_numa_node;
#[test]
fn test_worker_allocate_pinned() {
let node = NumaNode(0); let node = NumaNode(0);
let worker = NumaWorker::spawn(node).unwrap(); let worker = NumaWorker::spawn(node).unwrap();
...@@ -445,123 +463,83 @@ mod tests { ...@@ -445,123 +463,83 @@ mod tests {
#[test] #[test]
fn test_worker_pool() { fn test_worker_pool() {
if !is_cuda_available() {
eprintln!("Skipping test_worker_pool: CUDA not available");
return;
}
let pool = NumaWorkerPool::new(); let pool = NumaWorkerPool::new();
unsafe { match pool.allocate_pinned_for_gpu(8192, 0).unwrap() {
let ptr = pool.allocate_pinned_for_gpu(8192, 0).unwrap(); Some(ptr) => unsafe {
assert!(!ptr.is_null()); assert!(!ptr.is_null());
cudarc::driver::result::free_host(ptr as *mut std::ffi::c_void).unwrap(); cudarc::driver::result::free_host(ptr as *mut std::ffi::c_void).unwrap();
},
None => {
println!(
"NUMA node unknown for GPU 0, allocation skipped (expected on single-socket)"
);
} }
} }
#[test]
fn test_worker_pool_singleton() {
// Verify that global() returns the same instance
let pool1 = NumaWorkerPool::global();
let pool2 = NumaWorkerPool::global();
// They should be the same static reference
assert!(std::ptr::eq(pool1, pool2));
} }
#[test] #[test]
fn test_worker_reuse() { fn test_worker_reuse() {
if !is_cuda_available() {
eprintln!("Skipping test_worker_reuse: CUDA not available");
return;
}
// Test that subsequent allocations for the same GPU reuse the same worker
let pool = NumaWorkerPool::new(); let pool = NumaWorkerPool::new();
unsafe { // If NUMA node is unknown, both calls return None — that's fine
// First allocation spawns worker for GPU 0 let r1 = pool.allocate_pinned_for_gpu(1024, 0).unwrap();
let ptr1 = pool.allocate_pinned_for_gpu(1024, 0).unwrap(); let r2 = pool.allocate_pinned_for_gpu(1024, 0).unwrap();
// Second allocation should reuse worker for GPU 0
let ptr2 = pool.allocate_pinned_for_gpu(1024, 0).unwrap();
match (r1, r2) {
(Some(ptr1), Some(ptr2)) => unsafe {
assert!(!ptr1.is_null()); assert!(!ptr1.is_null());
assert!(!ptr2.is_null()); assert!(!ptr2.is_null());
assert_ne!(ptr1, ptr2); assert_ne!(ptr1, ptr2);
cudarc::driver::result::free_host(ptr1 as *mut std::ffi::c_void).unwrap(); cudarc::driver::result::free_host(ptr1 as *mut std::ffi::c_void).unwrap();
cudarc::driver::result::free_host(ptr2 as *mut std::ffi::c_void).unwrap(); cudarc::driver::result::free_host(ptr2 as *mut std::ffi::c_void).unwrap();
},
(None, None) => {
println!("NUMA node unknown, both allocations skipped");
}
_ => panic!("inconsistent NUMA detection between two calls for same GPU"),
} }
} }
#[test] #[test]
fn test_zero_size_allocation() { fn test_zero_size_allocation_with_known_node() {
// Test that zero-size allocations are rejected // Zero-size is rejected by the worker, but only if NUMA node is known.
// If NUMA node is unknown, allocate_pinned_for_gpu returns Ok(None) before
// reaching the worker.
let pool = NumaWorkerPool::new(); let pool = NumaWorkerPool::new();
let result = pool.allocate_pinned_for_gpu(0, 0); let result = pool.allocate_pinned_for_gpu(0, 0);
assert!(result.is_err()); match result {
assert!(result.unwrap_err().contains("zero")); Ok(None) => {
println!("NUMA node unknown, zero-size check not reached");
} }
Err(e) => {
#[test] assert!(e.contains("zero"));
fn test_get_current_cpu_numa_node() { }
// Test that we can detect current CPU's NUMA node Ok(Some(_)) => panic!("zero-size allocation should not succeed"),
let node = get_current_cpu_numa_node();
// On a real NUMA system, should return a valid node
// On fake NUMA or single-node, might return 0 or UNKNOWN
if !node.is_unknown() {
println!("Current CPU on NUMA node: {}", node.0);
} else {
println!("NUMA node detection unavailable (single-node or fake NUMA)");
} }
} }
#[test] #[test]
fn test_get_device_numa_node() { fn test_get_device_numa_node() {
// Test GPU NUMA node detection
// This will only work if nvidia-smi is available
let node = get_device_numa_node(0); let node = get_device_numa_node(0);
match node {
if !node.is_unknown() { Some(n) => {
println!("GPU 0 on NUMA node: {}", node.0); assert!(n.0 < 16, "NUMA node {} seems unreasonably high", n.0);
// Node should be 0 or 1 on typical dual-socket systems println!("GPU 0 on NUMA node: {}", n.0);
assert!(node.0 <= 1 || node.0 == u32::MAX);
} else {
println!("GPU NUMA detection unavailable (no nvidia-smi or no GPU)");
} }
None => {
println!("GPU 0 has no determinable NUMA node");
} }
#[test]
fn test_numa_node_display() {
// Test Display implementation for NumaNode
let node = NumaNode(0);
assert_eq!(format!("{}", node), "NumaNode(0)");
let unknown = NumaNode::UNKNOWN;
assert_eq!(format!("{}", unknown), "UNKNOWN");
} }
#[test]
fn test_numa_node_is_unknown() {
let valid = NumaNode(0);
assert!(!valid.is_unknown());
let unknown = NumaNode::UNKNOWN;
assert!(unknown.is_unknown());
} }
#[test] #[test]
fn test_pinned_allocation_api() { fn test_pinned_allocation_api() {
// Verify the public API works for pinned allocation
let pool = NumaWorkerPool::new(); let pool = NumaWorkerPool::new();
unsafe { if let Some(ptr) = pool.allocate_pinned_for_gpu(1024, 0).unwrap() {
// Test that we can allocate pinned memory for a GPU
if let Ok(ptr) = pool.allocate_pinned_for_gpu(1024, 0) {
assert!(!ptr.is_null()); assert!(!ptr.is_null());
unsafe {
cudarc::driver::result::free_host(ptr as *mut std::ffi::c_void).unwrap(); cudarc::driver::result::free_host(ptr as *mut std::ffi::c_void).unwrap();
} }
} }
...@@ -569,22 +547,15 @@ mod tests { ...@@ -569,22 +547,15 @@ mod tests {
#[test] #[test]
fn test_worker_channel_communication() { fn test_worker_channel_communication() {
// Test that worker receives and processes requests
let node = NumaNode(0); let node = NumaNode(0);
let worker = NumaWorker::spawn(node).unwrap(); let worker = NumaWorker::spawn(node).unwrap();
// Send allocation request let send_ptr = worker.allocate(1024, 0).unwrap();
let result = worker.allocate(1024, 0);
// Should get a response (either success or timeout)
assert!(result.is_ok() || result.is_err());
if let Ok(send_ptr) = result {
unsafe {
let ptr = send_ptr.0; let ptr = send_ptr.0;
assert!(!ptr.is_null()); assert!(!ptr.is_null());
unsafe {
cudarc::driver::result::free_host(ptr as *mut std::ffi::c_void).unwrap(); cudarc::driver::result::free_host(ptr as *mut std::ffi::c_void).unwrap();
} }
} }
}
} }
...@@ -7,22 +7,7 @@ use super::{MemoryDescriptor, Result, StorageError, StorageKind, actions, nixl:: ...@@ -7,22 +7,7 @@ use super::{MemoryDescriptor, Result, StorageError, StorageKind, actions, nixl::
use cudarc::driver::CudaContext; use cudarc::driver::CudaContext;
use cudarc::driver::sys; use cudarc::driver::sys;
use std::any::Any; use std::any::Any;
use std::collections::HashMap; use std::sync::Arc;
use std::sync::{Arc, Mutex, OnceLock};
/// Get or create a CUDA context for the given device.
fn cuda_context(device_id: u32) -> Result<Arc<CudaContext>> {
static CONTEXTS: OnceLock<Mutex<HashMap<u32, Arc<CudaContext>>>> = OnceLock::new();
let mut map = CONTEXTS.get_or_init(Default::default).lock().unwrap();
if let Some(existing) = map.get(&device_id) {
return Ok(existing.clone());
}
let ctx = CudaContext::new(device_id as usize)?;
map.insert(device_id, ctx.clone());
Ok(ctx)
}
/// CUDA pinned host memory allocated via cudaHostAlloc. /// CUDA pinned host memory allocated via cudaHostAlloc.
#[derive(Debug)] #[derive(Debug)]
...@@ -51,10 +36,11 @@ impl PinnedStorage { ...@@ -51,10 +36,11 @@ impl PinnedStorage {
/// Allocate pinned memory, optionally NUMA-aware for a specific GPU. /// Allocate pinned memory, optionally NUMA-aware for a specific GPU.
/// ///
/// When `device_id` is `Some`, the allocation is performed on a worker thread /// When `device_id` is `Some`, NUMA-aware allocation is attempted by default:
/// pinned to the GPU's NUMA node, ensuring optimal memory placement via /// a worker thread pinned to the GPU's NUMA node performs the allocation,
/// first-touch policy, However, NUMA is only used if enabled via the /// ensuring optimal memory placement via first-touch policy. If the GPU's
/// `DYN_KVBM_ENABLE_NUMA=1` environment variable. /// NUMA node cannot be determined, allocation falls back to the direct path.
/// Set `DYN_MEMORY_DISABLE_NUMA=1` to skip NUMA optimization entirely.
/// ///
/// When `device_id` is `None`, a direct allocation is performed on device 0. /// When `device_id` is `None`, a direct allocation is performed on device 0.
/// ///
...@@ -75,21 +61,40 @@ impl PinnedStorage { ...@@ -75,21 +61,40 @@ impl PinnedStorage {
} }
let gpu_id = device_id.unwrap_or(0); let gpu_id = device_id.unwrap_or(0);
let ctx = cuda_context(gpu_id)?; let ctx = crate::device::cuda_context(gpu_id)?;
let ptr = match device_id { // Try NUMA-aware allocation unless explicitly disabled
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
Some(gpu_id) if super::numa::is_numa_enabled() => { let numa_ptr = if let Some(gpu_id) = device_id {
if !super::numa::is_numa_disabled() {
match super::numa::worker_pool::NumaWorkerPool::global()
.allocate_pinned_for_gpu(len, gpu_id)
{
Ok(Some(ptr)) => {
tracing::debug!( tracing::debug!(
"Using NUMA-aware allocation for {} bytes on GPU {}", "Using NUMA-aware allocation for {} bytes on GPU {}",
len, len,
gpu_id gpu_id
); );
super::numa::worker_pool::NumaWorkerPool::global() Some(ptr as usize)
.allocate_pinned_for_gpu(len, gpu_id) }
.map_err(StorageError::AllocationFailed)? as usize Ok(None) => None, // NUMA node unknown, fall through
Err(e) => return Err(StorageError::AllocationFailed(e)),
} }
_ => unsafe { } else {
None
}
} else {
None
};
#[cfg(not(target_os = "linux"))]
let numa_ptr: Option<usize> = None;
let ptr = if let Some(ptr) = numa_ptr {
ptr
} else {
unsafe {
ctx.bind_to_thread().map_err(StorageError::Cuda)?; ctx.bind_to_thread().map_err(StorageError::Cuda)?;
let ptr = cudarc::driver::result::malloc_host(len, sys::CU_MEMHOSTALLOC_DEVICEMAP) let ptr = cudarc::driver::result::malloc_host(len, sys::CU_MEMHOSTALLOC_DEVICEMAP)
...@@ -101,7 +106,7 @@ impl PinnedStorage { ...@@ -101,7 +106,7 @@ impl PinnedStorage {
assert!(len < isize::MAX as usize); assert!(len < isize::MAX as usize);
ptr as usize ptr as usize
}, }
}; };
Ok(Self { ptr, len, ctx }) Ok(Self { ptr, len, ctx })
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment