Unverified Commit 67d27bcc authored by Olga Andreeva's avatar Olga Andreeva Committed by GitHub
Browse files

fix: fixing the NUMA sensitivity problem in KVBM for TP=1 (#3700)


Signed-off-by: default avatarOlga Andreeva <oandreeva@nvidia.com>
parent f6ed01b1
...@@ -16,6 +16,7 @@ pub mod distributed; ...@@ -16,6 +16,7 @@ pub mod distributed;
pub mod events; pub mod events;
pub mod layout; pub mod layout;
pub mod metrics_kvbm; pub mod metrics_kvbm;
pub mod numa_allocator;
pub mod offload; pub mod offload;
pub mod pool; pub mod pool;
pub mod storage; pub mod storage;
......
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
pub mod topology;
pub mod worker_pool;
use nix::libc;
use serde::{Deserialize, Serialize};
use std::{mem, process::Command};
/// Check if NUMA optimization is enabled via environment variable
///
/// Set `DYN_KVBM_ENABLE_NUMA=1` to enable NUMA-aware allocation.
/// Default: disabled (opt-in)
pub fn is_numa_enabled() -> bool {
std::env::var("DYN_KVBM_ENABLE_NUMA")
.map(|v| v == "1" || v.to_lowercase() == "true")
.unwrap_or(false)
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct NumaNode(pub u32);
impl NumaNode {
pub const UNKNOWN: NumaNode = NumaNode(u32::MAX);
pub fn is_unknown(&self) -> bool {
self.0 == u32::MAX
}
}
impl std::fmt::Display for NumaNode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.is_unknown() {
write!(f, "UNKNOWN")
} else {
write!(f, "NumaNode({})", self.0)
}
}
}
/// Get the current CPU's NUMA node
///
/// Uses the Linux `getcpu` syscall to determine which NUMA node the current CPU belongs to.
/// Returns `NumaNode::UNKNOWN` if the syscall fails.
pub fn get_current_cpu_numa_node() -> NumaNode {
unsafe {
let mut cpu: libc::c_uint = 0;
let mut node: libc::c_uint = 0;
// getcpu syscall: int getcpu(unsigned *cpu, unsigned *node, struct getcpu_cache *tcache);
let result = libc::syscall(
libc::SYS_getcpu,
&mut cpu,
&mut node,
std::ptr::null_mut::<libc::c_void>(),
);
if result == 0 {
NumaNode(node)
} else {
NumaNode::UNKNOWN
}
}
}
/// Get NUMA node for device (GPU) memory
///
/// For GPU memory, the NUMA affinity depends on which PCIe bus the GPU is attached to.
/// This can be queried via nvidia-smi.
pub fn get_device_numa_node(device_id: u32) -> NumaNode {
// Use nvidia-smi topo to get NUMA ID of nearest CPU
// This directly returns the NUMA node
let output = match Command::new("nvidia-smi")
.args([
"topo",
"--get-numa-id-of-nearby-cpu",
"-i",
&device_id.to_string(),
])
.output()
{
Ok(out) if out.status.success() => out,
_ => {
tracing::warn!("nvidia-smi failed for GPU {}, using heuristic", device_id);
return NumaNode(device_id % 2);
}
};
if let Ok(stdout) = std::str::from_utf8(&output.stdout)
&& let Some(line) = stdout.lines().next()
&& let Some(numa_str) = line.split(':').nth(1)
&& let Ok(node) = numa_str.trim().parse::<u32>()
{
tracing::trace!("GPU {} on NUMA node {}", device_id, node);
return NumaNode(node);
}
tracing::warn!("Failed to get NUMA node for GPU {}", device_id);
NumaNode::UNKNOWN
}
/// Pin the current thread to a specific NUMA node's CPUs
///
/// This sets the CPU affinity for the calling thread to only run on CPUs
/// belonging to the specified NUMA node. This is critical for ensuring
/// that memory allocations follow the first-touch policy on the correct node.
pub fn pin_thread_to_numa_node(node: NumaNode) -> Result<(), String> {
let topology =
topology::get_numa_topology().map_err(|e| format!("Can not get NUMA topology: {}", e))?;
let cpus = topology
.cpus_for_node(node.0)
.ok_or_else(|| format!("No CPUs found for NUMA node {}", node.0))?;
if cpus.is_empty() {
return Err(format!("No CPUs found for NUMA node {}", node.0));
}
unsafe {
let mut cpu_set: libc::cpu_set_t = mem::zeroed();
for cpu in cpus {
libc::CPU_SET(*cpu, &mut cpu_set);
}
let result = libc::sched_setaffinity(
0, // current thread
mem::size_of::<libc::cpu_set_t>(),
&cpu_set,
);
if result != 0 {
let err = std::io::Error::last_os_error();
return Err(format!("Failed to set CPU affinity: {}", err));
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_numa_node_equality() {
let node0a = NumaNode(0);
let node0b = NumaNode(0);
let node1 = NumaNode(1);
assert_eq!(node0a, node0b);
assert_ne!(node0a, node1);
}
#[test]
fn test_numa_node_unknown() {
let unknown = NumaNode::UNKNOWN;
assert!(unknown.is_unknown());
assert_eq!(unknown.0, u32::MAX);
let valid = NumaNode(0);
assert!(!valid.is_unknown());
}
#[test]
fn test_numa_node_display() {
assert_eq!(format!("{}", NumaNode(0)), "NumaNode(0)");
assert_eq!(format!("{}", NumaNode(7)), "NumaNode(7)");
assert_eq!(format!("{}", NumaNode::UNKNOWN), "UNKNOWN");
}
#[test]
fn test_numa_node_serialization() {
// Verify NumaNode can be serialized (important for benchmarking)
let node = NumaNode(1);
let json = serde_json::to_string(&node).unwrap();
let deserialized: NumaNode = serde_json::from_str(&json).unwrap();
assert_eq!(node, deserialized);
}
#[test]
fn test_get_current_cpu_numa_node() {
// Should either return a valid node or UNKNOWN
let node = get_current_cpu_numa_node();
// If not unknown, should be a reasonable NUMA node number (< 8 on most systems)
if !node.is_unknown() {
assert!(node.0 < 8, "NUMA node {} seems unreasonably high", node.0);
}
}
#[test]
fn test_get_device_numa_node_valid_gpu() {
// Test GPU 0 detection
let node = get_device_numa_node(0);
// Should return either a valid node (0-7) or use heuristic (gpu_id % 2)
// On dual-socket systems, GPU 0 typically on node 0 or 1
println!("GPU 0 detected on NUMA node: {}", node.0);
}
#[test]
fn test_numa_node_hash() {
// Verify NumaNode can be used as a HashMap key
use std::collections::HashMap;
let mut map = HashMap::new();
map.insert(NumaNode(0), "node0");
map.insert(NumaNode(1), "node1");
assert_eq!(map.get(&NumaNode(0)), Some(&"node0"));
assert_eq!(map.get(&NumaNode(1)), Some(&"node1"));
assert_eq!(map.get(&NumaNode(2)), None);
}
#[test]
fn test_numa_node_copy_clone() {
// Verify NumaNode is Copy and Clone
let node1 = NumaNode(5);
let node2 = node1; // Copy
let node3 = node1; // Clone
assert_eq!(node1, node2);
assert_eq!(node1, node3);
assert_eq!(node2, node3);
}
}
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! NUMA topology detection
//!
//! This module provides utilities to read the actual CPU-to-NUMA mapping from the system,
//! replacing heuristic assumptions with real topology data.
use std::collections::HashMap;
use std::fs;
/// Global cached topology
static TOPOLOGY: std::sync::OnceLock<Result<NumaTopology, String>> = std::sync::OnceLock::new();
/// Represents the CPU topology for NUMA nodes
pub struct NumaTopology {
/// Maps NUMA node ID -> list of CPU IDs
node_to_cpus: HashMap<u32, Vec<usize>>,
/// Maps CPU ID -> NUMA node ID
cpu_to_node: HashMap<usize, u32>,
}
impl NumaTopology {
/// Read NUMA topology from sysfs
pub fn from_sysfs() -> Result<Self, String> {
let mut node_to_cpus: HashMap<u32, Vec<usize>> = HashMap::new();
let mut cpu_to_node: HashMap<usize, u32> = HashMap::new();
// TODO: Read /sys/devices/system/node directory
let node_dir = std::path::Path::new("/sys/devices/system/node");
if !node_dir.exists() {
return Err("Node directory not found".to_string());
}
let entries =
fs::read_dir(node_dir).map_err(|e| format!("Failed to read node directory: {}", e))?;
for entry in entries.flatten() {
let path = entry.path();
let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
// Only process "nodeN" directories
if !name.starts_with("node") {
continue;
}
// Extract node number
let node_id: u32 = name[4..]
.parse()
.map_err(|_| format!("Invalid node dir: {}", name))?;
// Read cpulist file
let cpulist_path = path.join("cpulist");
if !cpulist_path.exists() {
continue;
}
let cpulist = fs::read_to_string(&cpulist_path)
.map_err(|e| format!("Failed to read {}: {}", cpulist_path.display(), e))?;
let cpus = parse_cpulist(cpulist.trim())?;
// Populate both maps
for cpu in &cpus {
cpu_to_node.insert(*cpu, node_id);
}
node_to_cpus.insert(node_id, cpus);
}
if node_to_cpus.is_empty() {
return Err("No NUMA nodes found".to_string());
}
Ok(Self {
node_to_cpus,
cpu_to_node,
})
}
/// Get all CPUs for a NUMA node
pub fn cpus_for_node(&self, node_id: u32) -> Option<&[usize]> {
self.node_to_cpus.get(&node_id).map(|v| v.as_slice())
}
/// Get NUMA node for a CPU
pub fn node_for_cpu(&self, cpu_id: usize) -> Option<u32> {
self.cpu_to_node.get(&cpu_id).copied()
}
/// Get number of NUMA nodes
pub fn num_nodes(&self) -> usize {
self.node_to_cpus.len()
}
/// Check if single-node system
pub fn is_single_node(&self) -> bool {
self.num_nodes() == 1
}
}
/// Parse Linux cpulist format
/// Examples:
/// "0-15" -> [0,1,2,...,15]
/// "0,4,8" -> [0,4,8]
/// "0-3,8-11" -> [0,1,2,3,8,9,10,11]
fn parse_cpulist(cpulist: &str) -> Result<Vec<usize>, String> {
let mut cpus = Vec::new();
for part in cpulist.split(',') {
if part.contains('-') {
// Range: "0-15"
let range: Vec<&str> = part.split('-').collect();
if range.len() != 2 {
return Err(format!("Invalid range: {}", part));
}
let start: usize = range[0]
.parse()
.map_err(|_| format!("Invalid CPU ID: {}", range[0]))?;
let end: usize = range[1]
.parse()
.map_err(|_| format!("Invalid CPU ID: {}", range[1]))?;
for cpu in start..=end {
cpus.push(cpu);
}
} else {
// Single CPU
let cpu: usize = part
.parse()
.map_err(|_| format!("Invalid CPU ID: {}", part))?;
cpus.push(cpu);
}
}
cpus.sort_unstable();
cpus.dedup();
Ok(cpus)
}
/// Get the global NUMA topology (cached after first call)
///
/// Returns an error if NUMA topology cannot be read from sysfs. This indicates either:
/// - System doesn't support NUMA
/// - `/sys` is not mounted (e.g., restricted container)
/// - Kernel NUMA support is disabled
///
/// Callers should handle errors gracefully by disabling NUMA optimizations.
pub fn get_numa_topology() -> Result<&'static NumaTopology, &'static str> {
TOPOLOGY
.get_or_init(NumaTopology::from_sysfs)
.as_ref()
.map_err(|e| {
tracing::warn!("NUMA topology unavailable: {}", e);
"NUMA topology unavailable"
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_cpulist_range() {
let cpus = parse_cpulist("0-3").unwrap();
assert_eq!(cpus, vec![0, 1, 2, 3]);
}
#[test]
fn test_parse_cpulist_list() {
let cpus = parse_cpulist("0,4,8").unwrap();
assert_eq!(cpus, vec![0, 4, 8]);
}
#[test]
fn test_parse_cpulist_mixed() {
let cpus = parse_cpulist("0-2,8,16-17").unwrap();
assert_eq!(cpus, vec![0, 1, 2, 8, 16, 17]);
}
#[test]
fn test_parse_cpulist_ht() {
// Hyperthreading: 0-15,32-47 (physical cores 0-15, HT siblings 32-47)
let cpus = parse_cpulist("0-15,32-47").unwrap();
assert_eq!(cpus.len(), 32);
assert_eq!(cpus[0], 0);
assert_eq!(cpus[15], 15);
assert_eq!(cpus[16], 32);
assert_eq!(cpus[31], 47);
}
#[test]
fn test_parse_cpulist_real_numa_system() {
// Real dual-socket system with hyperthreading (discovered pattern)
// Node 0: CPUs 0-15, 128-143
let cpus = parse_cpulist("0-15,128-143").unwrap();
assert_eq!(cpus.len(), 32);
assert_eq!(cpus[0], 0);
assert_eq!(cpus[15], 15);
assert_eq!(cpus[16], 128);
assert_eq!(cpus[31], 143);
// Node 1: CPUs 16-31, 144-159
let cpus = parse_cpulist("16-31,144-159").unwrap();
assert_eq!(cpus.len(), 32);
assert_eq!(cpus[0], 16);
assert_eq!(cpus[15], 31);
assert_eq!(cpus[16], 144);
assert_eq!(cpus[31], 159);
}
#[test]
fn test_parse_cpulist_out_of_order() {
// Test that parser handles out-of-order input (seen in some systems)
let cpus = parse_cpulist("4,2,0,1,3").unwrap();
assert_eq!(cpus, vec![0, 1, 2, 3, 4]); // Should be sorted
}
#[test]
fn test_parse_cpulist_duplicates() {
// Test deduplication (in case kernel reports duplicates)
let cpus = parse_cpulist("0-2,1-3").unwrap();
assert_eq!(cpus, vec![0, 1, 2, 3]); // Should remove duplicates
}
#[test]
fn test_parse_cpulist_empty() {
// Edge case: empty cpulist
let result = parse_cpulist("");
assert!(result.is_err() || result.unwrap().is_empty());
}
#[test]
fn test_parse_cpulist_single_cpu() {
// Single CPU node (uncommon but valid)
let cpus = parse_cpulist("5").unwrap();
assert_eq!(cpus, vec![5]);
}
#[test]
fn test_topology_bidirectional_lookup() {
// Test that node->cpu and cpu->node mappings are consistent
let mut node_to_cpus = std::collections::HashMap::new();
let mut cpu_to_node = std::collections::HashMap::new();
node_to_cpus.insert(0, vec![0, 1, 2, 3]);
node_to_cpus.insert(1, vec![4, 5, 6, 7]);
for (node, cpus) in &node_to_cpus {
for cpu in cpus {
cpu_to_node.insert(*cpu, *node);
}
}
let topology = NumaTopology {
node_to_cpus,
cpu_to_node,
};
// Verify forward lookup (node -> cpus)
assert_eq!(topology.cpus_for_node(0), Some(&[0, 1, 2, 3][..]));
assert_eq!(topology.cpus_for_node(1), Some(&[4, 5, 6, 7][..]));
// Verify reverse lookup (cpu -> node)
assert_eq!(topology.node_for_cpu(0), Some(0));
assert_eq!(topology.node_for_cpu(3), Some(0));
assert_eq!(topology.node_for_cpu(4), Some(1));
assert_eq!(topology.node_for_cpu(7), Some(1));
// Verify unknown CPU
assert_eq!(topology.node_for_cpu(999), None);
}
}
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! NUMA worker pool for memory allocation with first-touch policy
//!
//! This module provides dedicated worker threads that are pinned to specific NUMA nodes..
//!
//! ## Architecture
//!
//! - One worker thread per NUMA node (spawned lazily)
//! - Workers pin themselves on startup (immune to application thread management)
//! - Channel-based communication for allocation requests
//! - First-touch page allocation ensures correct NUMA placement
use super::get_current_cpu_numa_node;
use crate::block_manager::storage::cuda::Cuda;
use cudarc::driver::result::malloc_host;
use cudarc::driver::sys::CU_MEMHOSTALLOC_WRITECOMBINED;
use nix::libc;
use std::sync::mpsc::{Receiver, Sender, channel};
use std::sync::{Arc, Mutex, OnceLock};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use super::{NumaNode, get_device_numa_node};
/// Wrapper for raw pointer that can be sent between threads
///
/// # Safety
///
/// This wrapper allows sending raw pointers across thread boundaries. The safety contract is:
/// - The pointer is allocated by the worker thread and returned to the caller
/// - The pointer is only dereferenced by the receiver (caller), never by the sender (worker)
/// - Ownership is transferred: the caller is responsible for deallocation
/// - The pointer remains valid for the lifetime expected by the caller
struct SendPtr(*mut u8);
// SAFETY: The pointer ownership is transferred from worker to caller.
// The worker never accesses the pointer after sending it.
unsafe impl Send for SendPtr {}
/// Request to allocate CUDA pinned memory
struct AllocRequest {
size: usize,
node: NumaNode,
gpu_id: u32,
response: Sender<AllocResult>,
}
/// Result of allocation
type AllocResult = Result<SendPtr, String>;
/// A dedicated worker thread pinned to a specific NUMA node
struct NumaWorker {
node: NumaNode,
request_tx: Option<Sender<AllocRequest>>,
handle: Option<JoinHandle<()>>,
}
impl NumaWorker {
/// Spawn a new worker thread pinned to the specified NUMA node
fn spawn(node: NumaNode) -> Result<Self, String> {
let (request_tx, request_rx) = channel();
let handle = thread::Builder::new()
.name(format!("numa-worker-{}", node.0))
.spawn(move || {
Self::worker_loop(node, request_rx);
})
.map_err(|e| format!("Failed to spawn worker thread: {}", e))?;
Ok(Self {
node,
request_tx: Some(request_tx),
handle: Some(handle),
})
}
/// Worker thread main loop
fn worker_loop(node: NumaNode, requests: Receiver<AllocRequest>) {
// First thing: pin this thread to the target NUMA node
tracing::trace!("Pinning worker thread to node {}", node.0);
if let Err(e) = super::pin_thread_to_numa_node(node) {
tracing::error!("Failed to pin worker thread to node {}: {}", node.0, e);
tracing::error!("Worker will continue but allocations may be suboptimal");
} else {
tracing::trace!("Successfully pinned worker thread to node {}", node.0);
// `pin_thread_to_numa_node` uses `sched_setaffinity` to set the CPU affinity mask
// but doesn't immediately migrate the thread. The scheduler will migrate at
// the next opportunity (timer tick, yield, etc).
// We yield once to give the scheduler a chance to migrate before we verify.
// This is primarily for accurate logging - allocations will happen on the right CPU
// regardless since the affinity mask prevents running on wrong CPUs.
thread::yield_now();
thread::sleep(Duration::from_millis(1));
// Verify we're on the right node
let current_node = super::get_current_cpu_numa_node();
tracing::trace!("Current node after pinning: {}", current_node.0);
if current_node != node {
tracing::warn!(
"Worker thread on node {} after pinning (expected {})",
current_node.0,
node.0
);
} else {
tracing::trace!("NUMA worker thread for node {} started and pinned", node.0);
}
}
// Process allocation requests
loop {
tracing::trace!("Worker waiting for request on node {}", node.0);
match requests.recv() {
Ok(req) => {
tracing::trace!(
"Worker received CUDA pinned allocation request on node {}",
node.0
);
let result = Self::do_cuda_pinned_allocation(req.size, req.node, req.gpu_id);
match result {
Ok(SendPtr(ptr)) => {
if let Err(_e) = req.response.send(Ok(SendPtr(ptr))) {
// Receiver gone: free to avoid leak
tracing::warn!(
"Receiver dropped before receiving allocation, freeing {} bytes at {:p}",
req.size,
ptr
);
unsafe {
let _ = cudarc::driver::result::free_host(
ptr as *mut std::ffi::c_void,
);
}
}
}
Err(err) => {
let _ = req.response.send(Err(err));
}
}
}
Err(_) => {
// Channel closed, exit worker
tracing::trace!(
"NUMA worker for node {} shutting down (channel closed)",
node.0
);
break;
}
}
}
}
/// Perform CUDA pinned memory allocation
fn do_cuda_pinned_allocation(size: usize, node: NumaNode, gpu_id: u32) -> AllocResult {
if size == 0 {
return Err("Cannot allocate zero bytes".to_string());
}
// Verify we're on the correct NUMA node BEFORE allocation
let node_before = get_current_cpu_numa_node();
if node_before != node {
tracing::warn!(
"Worker thread moved! Expected NUMA node {}, currently on node {}",
node.0,
node_before.0
);
}
// Get or create CUDA context for this GPU
let ctx = Cuda::device_or_create(gpu_id as usize)
.map_err(|e| format!("Failed to get CUDA context for GPU {}: {:?}", gpu_id, e))?;
unsafe {
// Bind CUDA context to this worker thread before allocation
// This ensures malloc_host has a valid context to work with
ctx.bind_to_thread()
.map_err(|e| format!("Failed to bind CUDA context to worker thread: {:?}", e))?;
// Verify thread is still on correct node after CUDA context binding
let node_after_ctx = get_current_cpu_numa_node();
if node_after_ctx != node {
tracing::warn!(
"Thread moved after CUDA context bind! Expected node {}, now on node {}",
node.0,
node_after_ctx.0
);
}
// Allocate CUDA pinned memory
// This is called from the pinned worker thread, so pages will be
// allocated on the correct NUMA node via first-touch
let ptr = malloc_host(size, CU_MEMHOSTALLOC_WRITECOMBINED)
.map_err(|e| format!("malloc_host failed: {:?}", e))?;
let ptr = ptr as *mut u8;
if ptr.is_null() {
return Err("malloc_host returned null".to_string());
}
// Verify thread is STILL on correct node before touching pages
let node_before_touch = get_current_cpu_numa_node();
if node_before_touch != node {
tracing::error!(
"Thread on wrong node before first-touch! Expected {}, on node {} - memory will be misplaced!",
node.0,
node_before_touch.0
);
}
// Touch one byte per page to trigger first-touch policy efficiently
// This is much faster than zeroing the entire region for large allocations
let page_size = (libc::sysconf(libc::_SC_PAGESIZE) as usize).max(4096);
let mut offset = 0usize;
while offset < size {
std::ptr::write_volatile(ptr.add(offset), 0);
offset = offset.saturating_add(page_size);
}
// Ensure the last page is touched
if size > 0 && !size.is_multiple_of(page_size) {
std::ptr::write_volatile(ptr.add(size - 1), 0);
}
// Verify final node after touching
let node_after_touch = get_current_cpu_numa_node();
tracing::trace!(
"Worker allocated {} bytes (CUDA pinned) on GPU {} (target NUMA node {}) at {:p} - thread nodes: before={} after_ctx={} before_touch={} after_touch={}",
size,
gpu_id,
node.0,
ptr,
node_before.0,
node_after_ctx.0,
node_before_touch.0,
node_after_touch.0
);
Ok(SendPtr(ptr))
}
}
/// Request an allocation from this worker
fn allocate(&self, size: usize, gpu_id: u32) -> AllocResult {
let (response_tx, response_rx) = channel();
let request = AllocRequest {
size,
node: self.node,
gpu_id,
response: response_tx,
};
self.request_tx
.as_ref()
.ok_or_else(|| "Worker has been shut down".to_string())?
.send(request)
.map_err(|_| "Worker thread has died".to_string())?;
// Wait for response with dynamic timeout based on allocation size
// Large allocations take time: we account for ~1 second per GB to touch pages
// Add 10 second base + 1 second per GB
let timeout_secs = 10u64 + (size as u64 / (1024 * 1024 * 1024));
let timeout = Duration::from_secs(timeout_secs.clamp(10, 300)); // Clamp to 10-300 seconds
tracing::trace!(
"Worker pool waiting for allocation of {} MB with timeout of {} seconds",
size / (1024 * 1024),
timeout.as_secs()
);
response_rx
.recv_timeout(timeout)
.map_err(|e| format!("Worker timeout after {} seconds: {}", timeout.as_secs(), e))?
}
}
impl Drop for NumaWorker {
fn drop(&mut self) {
tracing::trace!("Dropping NUMA worker for node {}", self.node.0);
// Drop request_tx FIRST to close the channel
// This causes recv() in worker thread to return Err and exit
self.request_tx.take();
tracing::trace!("Channel closed for worker node {}", self.node.0);
// Now the worker thread will exit its loop
if let Some(handle) = self.handle.take() {
tracing::trace!("Waiting for worker thread {} to join", self.node.0);
let _ = handle.join();
tracing::trace!("Worker thread {} joined", self.node.0);
}
}
}
/// Pool of NUMA workers, one per node
pub struct NumaWorkerPool {
workers: Mutex<std::collections::HashMap<u32, Arc<NumaWorker>>>,
}
impl NumaWorkerPool {
fn new() -> Self {
Self {
workers: Mutex::new(std::collections::HashMap::new()),
}
}
/// Get the global worker pool
pub fn global() -> &'static Self {
static POOL: OnceLock<NumaWorkerPool> = OnceLock::new();
POOL.get_or_init(NumaWorkerPool::new)
}
/// Get or create a worker for a NUMA node
fn get_or_spawn_worker(&self, node: NumaNode) -> Result<Arc<NumaWorker>, String> {
let mut workers = self.workers.lock().unwrap();
if let Some(worker) = workers.get(&node.0) {
return Ok(worker.clone());
}
// Spawn new worker
let worker = NumaWorker::spawn(node)?;
let worker = Arc::new(worker);
workers.insert(node.0, worker.clone());
tracing::trace!("Spawned NUMA worker for node {}", node.0);
Ok(worker)
}
/// Allocate CUDA pinned memory for a specific GPU (auto-detects NUMA node)
pub fn allocate_pinned_for_gpu(&self, size: usize, gpu_id: u32) -> Result<*mut u8, String> {
let node = get_device_numa_node(gpu_id);
tracing::debug!(
"Allocating {} bytes pinned memory for GPU {} (NUMA node {})",
size,
gpu_id,
node.0
);
let worker = self.get_or_spawn_worker(node)?;
worker.allocate(size, gpu_id).map(|send_ptr| send_ptr.0)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::block_manager::numa_allocator::{get_current_cpu_numa_node, get_device_numa_node};
/// Check if CUDA is available for testing
fn is_cuda_available() -> bool {
// Check if nvidia-smi is available
if std::process::Command::new("nvidia-smi")
.arg("--query-gpu=count")
.arg("--format=csv,noheader")
.output()
.is_err()
{
return false;
}
// Try to initialize CUDA context for device 0
use crate::block_manager::storage::cuda::Cuda;
Cuda::device_or_create(0).is_ok()
}
#[test]
fn test_worker_spawn() {
let node = NumaNode(0);
let worker = NumaWorker::spawn(node);
assert!(worker.is_ok());
}
#[test]
fn test_worker_allocate_pinned() {
if !is_cuda_available() {
eprintln!("Skipping test_worker_allocate_pinned: CUDA not available");
return;
}
let node = NumaNode(0);
let worker = NumaWorker::spawn(node).unwrap();
let send_ptr = worker.allocate(4096, 0).unwrap();
let ptr = send_ptr.0;
assert!(!ptr.is_null());
unsafe {
cudarc::driver::result::free_host(ptr as *mut std::ffi::c_void).unwrap();
}
}
#[test]
fn test_worker_pool() {
if !is_cuda_available() {
eprintln!("Skipping test_worker_pool: CUDA not available");
return;
}
let pool = NumaWorkerPool::new();
unsafe {
let ptr = pool.allocate_pinned_for_gpu(8192, 0).unwrap();
assert!(!ptr.is_null());
cudarc::driver::result::free_host(ptr as *mut std::ffi::c_void).unwrap();
}
}
#[test]
fn test_worker_pool_singleton() {
// Verify that global() returns the same instance
let pool1 = NumaWorkerPool::global();
let pool2 = NumaWorkerPool::global();
// They should be the same static reference
assert!(std::ptr::eq(pool1, pool2));
}
#[test]
fn test_worker_reuse() {
if !is_cuda_available() {
eprintln!("Skipping test_worker_reuse: CUDA not available");
return;
}
// Test that subsequent allocations for the same GPU reuse the same worker
let pool = NumaWorkerPool::new();
unsafe {
// First allocation spawns worker for GPU 0
let ptr1 = pool.allocate_pinned_for_gpu(1024, 0).unwrap();
// Second allocation should reuse worker for GPU 0
let ptr2 = pool.allocate_pinned_for_gpu(1024, 0).unwrap();
assert!(!ptr1.is_null());
assert!(!ptr2.is_null());
assert_ne!(ptr1, ptr2);
cudarc::driver::result::free_host(ptr1 as *mut std::ffi::c_void).unwrap();
cudarc::driver::result::free_host(ptr2 as *mut std::ffi::c_void).unwrap();
}
}
#[test]
fn test_zero_size_allocation() {
// Test that zero-size allocations are rejected
let pool = NumaWorkerPool::new();
let result = pool.allocate_pinned_for_gpu(0, 0);
assert!(result.is_err());
assert!(result.unwrap_err().contains("zero"));
}
#[test]
fn test_dynamic_timeout_scaling() {
// Test that timeout scales with allocation size
let pool = NumaWorkerPool::new();
// We can't easily test the actual timeout behavior without sleeping,
// but we can verify that allocations of different sizes work
// The timeout calculation is: 10s + (size_in_GB) seconds, capped at 300s
// Just verify a small allocation works (timeout calculation is internal)
unsafe {
if let Ok(ptr) = pool.allocate_pinned_for_gpu(1024, 0) {
assert!(!ptr.is_null());
cudarc::driver::result::free_host(ptr as *mut std::ffi::c_void).unwrap();
}
}
}
#[test]
fn test_get_current_cpu_numa_node() {
// Test that we can detect current CPU's NUMA node
let node = get_current_cpu_numa_node();
// On a real NUMA system, should return a valid node
// On fake NUMA or single-node, might return 0 or UNKNOWN
if !node.is_unknown() {
println!("Current CPU on NUMA node: {}", node.0);
} else {
println!("NUMA node detection unavailable (single-node or fake NUMA)");
}
}
#[test]
fn test_get_device_numa_node() {
// Test GPU NUMA node detection
// This will only work if nvidia-smi is available
let node = get_device_numa_node(0);
if !node.is_unknown() {
println!("GPU 0 on NUMA node: {}", node.0);
// Node should be 0 or 1 on typical dual-socket systems
assert!(node.0 <= 1 || node.0 == u32::MAX);
} else {
println!("GPU NUMA detection unavailable (no nvidia-smi or no GPU)");
}
}
#[test]
fn test_numa_node_display() {
// Test Display implementation for NumaNode
let node = NumaNode(0);
assert_eq!(format!("{}", node), "NumaNode(0)");
let unknown = NumaNode::UNKNOWN;
assert_eq!(format!("{}", unknown), "UNKNOWN");
}
#[test]
fn test_numa_node_is_unknown() {
let valid = NumaNode(0);
assert!(!valid.is_unknown());
let unknown = NumaNode::UNKNOWN;
assert!(unknown.is_unknown());
}
#[test]
fn test_pinned_allocation_api() {
// Verify the public API works for pinned allocation
let pool = NumaWorkerPool::new();
unsafe {
// Test that we can allocate pinned memory for a GPU
if let Ok(ptr) = pool.allocate_pinned_for_gpu(1024, 0) {
assert!(!ptr.is_null());
cudarc::driver::result::free_host(ptr as *mut std::ffi::c_void).unwrap();
}
}
}
#[test]
fn test_worker_channel_communication() {
// Test that worker receives and processes requests
let node = NumaNode(0);
let worker = NumaWorker::spawn(node).unwrap();
// Send allocation request
let result = worker.allocate(1024, 0);
// Should get a response (either success or timeout)
assert!(result.is_ok() || result.is_err());
if let Ok(send_ptr) = result {
unsafe {
let ptr = send_ptr.0;
assert!(!ptr.is_null());
cudarc::driver::result::free_host(ptr as *mut std::ffi::c_void).unwrap();
}
}
}
}
...@@ -76,6 +76,8 @@ use std::{ ...@@ -76,6 +76,8 @@ use std::{
use cudarc::driver::{CudaContext, sys}; use cudarc::driver::{CudaContext, sys};
use crate::block_manager::numa_allocator;
/// Trait for [Storage] types that can be accessed by CUDA /// Trait for [Storage] types that can be accessed by CUDA
pub trait CudaAccessible: Storage {} pub trait CudaAccessible: Storage {}
...@@ -176,10 +178,27 @@ impl PinnedStorage { ...@@ -176,10 +178,27 @@ impl PinnedStorage {
unsafe { unsafe {
ctx.bind_to_thread().map_err(StorageError::Cuda)?; ctx.bind_to_thread().map_err(StorageError::Cuda)?;
let ptr = cudarc::driver::result::malloc_host(size, sys::CU_MEMHOSTALLOC_WRITECOMBINED) // Try NUMA-aware allocation if enabled, otherwise use direct allocation
.map_err(StorageError::Cuda)?; let ptr = if numa_allocator::is_numa_enabled() {
let device_id = ctx.cu_device() as u32;
match numa_allocator::worker_pool::NumaWorkerPool::global()
.allocate_pinned_for_gpu(size, device_id)
{
Ok(ptr) => ptr,
Err(e) => {
tracing::warn!("NUMA allocation failed: {}, using direct allocation", e);
cudarc::driver::result::malloc_host(
size,
sys::CU_MEMHOSTALLOC_WRITECOMBINED,
)
.map_err(StorageError::Cuda)? as *mut u8
}
}
} else {
cudarc::driver::result::malloc_host(size, sys::CU_MEMHOSTALLOC_WRITECOMBINED)
.map_err(StorageError::Cuda)? as *mut u8
};
let ptr = ptr as *mut u8;
assert!(!ptr.is_null(), "Failed to allocate pinned memory"); assert!(!ptr.is_null(), "Failed to allocate pinned memory");
assert!(ptr.is_aligned(), "Pinned memory is not aligned"); assert!(ptr.is_aligned(), "Pinned memory is not aligned");
assert!(size < isize::MAX as usize); assert!(size < isize::MAX as usize);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment