Unverified Commit 234a89c0 authored by Ziqi Fan's avatar Ziqi Fan Committed by GitHub
Browse files

feat: enable KVBM Disk allocation support different storage types (#7839)


Signed-off-by: default avatarZiqi Fan <ziqif@nvidia.com>
parent c376655f
......@@ -366,15 +366,29 @@ export DYN_KVBM_LEADER_WORKER_INIT_TIMEOUT_SECS=3600 # 1 hour
**Symptom:** KVBM fails to start when disk offloading is enabled.
**Cause:** `fallocate()` is not supported on the filesystem (e.g., Lustre, certain network filesystems).
**Cause:** `fallocate()` is not supported on the filesystem (e.g., Lustre, certain network filesystems),
or the storage backend requires a different method for setting `O_DIRECT`.
**Solution:** Enable disk zerofill fallback:
**Solution:**
1. If `fallocate()` is not supported, enable the zerofill fallback:
```bash
export DYN_KVBM_DISK_ZEROFILL_FALLBACK=true
```
If you encounter "write all error" or EINVAL (errno 22), also try:
2. If your filesystem ignores `fcntl(F_SETFL, O_DIRECT)` (e.g., IBM Storage Scale), set the
disk allocator type to pass `O_DIRECT` at file open time instead:
```bash
export DYN_KVBM_DISK_ALLOCATOR_TYPE=open-direct
```
Supported values for `DYN_KVBM_DISK_ALLOCATOR_TYPE`:
- `default`: Apply `O_DIRECT` via `fcntl` after file creation. Works on most POSIX filesystems (ext4, XFS, Lustre, etc.).
- `open-direct`: Pass `O_DIRECT` to `mkostemp` at file open time. Required on filesystems where `fcntl(F_SETFL, O_DIRECT)` is ignored (e.g., IBM Storage Scale).
3. If you encounter "write all error" or EINVAL (errno 22), or need to debug without `O_DIRECT`:
```bash
export DYN_KVBM_DISK_DISABLE_O_DIRECT=true
......
......@@ -283,7 +283,7 @@ mod tests {
builder.disk_layout(
KvManagerLayoutConfig::builder()
.num_blocks(disk)
.allocator(storage::DiskAllocator)
.allocator(storage::DiskAllocator::default())
.build()
.unwrap(),
)
......
......@@ -165,7 +165,7 @@ async fn perform_allocation_and_build_handler(
};
// disk
let disk_blocks = if leader_meta.num_disk_blocks > 0 {
let disk_allocator = Arc::new(DiskAllocator);
let disk_allocator = Arc::new(DiskAllocator::from_env()?);
let disk_layout = layout_builder
.num_blocks(leader_meta.num_disk_blocks)
.build()?
......
......@@ -922,7 +922,7 @@ mod tests {
config.clone(),
layout_type,
agent,
&DiskAllocator,
&DiskAllocator::from_env()?,
duplication_setting,
)?)
} else {
......
......@@ -19,6 +19,199 @@ const DISK_CACHE_KEY: &str = "DYN_KVBM_DISK_CACHE_DIR";
const DEFAULT_DISK_CACHE_DIR: &str = "/tmp/";
const DISK_ZEROFILL_FALLBACK_KEY: &str = "DYN_KVBM_DISK_ZEROFILL_FALLBACK";
const DISK_DISABLE_O_DIRECT_KEY: &str = "DYN_KVBM_DISK_DISABLE_O_DIRECT";
const DISK_ALLOCATOR_TYPE_KEY: &str = "DYN_KVBM_DISK_ALLOCATOR_TYPE";
/// Strategy for applying O_DIRECT to disk cache files.
///
/// Different storage backends require different approaches to setting O_DIRECT.
/// For example, IBM Storage Scale ignores `fcntl(F_SETFL, O_DIRECT)`,
/// requiring O_DIRECT to be passed at file open time instead.
///
/// Implementations are selected via the `DYN_KVBM_DISK_ALLOCATOR_TYPE` env var.
/// Customers can set this based on their storage backend, or contribute new
/// implementations for other providers.
pub trait DiskOpenStrategy: Send + Sync + std::fmt::Debug {
/// Return a human-readable name for this strategy (for logging).
fn name(&self) -> &str;
/// Open a temporary file at the given path template and apply O_DIRECT
/// as appropriate for the storage backend.
///
/// `template_bytes` must contain a null-terminated path ending in "XXXXXX".
/// `disable_o_direct` indicates the user has explicitly disabled O_DIRECT.
///
/// Returns the raw file descriptor on success.
fn open_temp_file(
&self,
template_bytes: &mut [u8],
disable_o_direct: bool,
) -> Result<RawFd, StorageError>;
}
/// Default strategy: open with mkostemp(O_CLOEXEC), then apply O_DIRECT via fcntl.
///
/// This works on most POSIX filesystems (ext4, XFS, Lustre, etc.).
#[derive(Debug, Default)]
pub struct DefaultDirectIo;
impl DiskOpenStrategy for DefaultDirectIo {
fn name(&self) -> &str {
"default"
}
fn open_temp_file(
&self,
template_bytes: &mut [u8],
disable_o_direct: bool,
) -> Result<RawFd, StorageError> {
let raw_fd = unsafe {
nix::libc::mkostemp(
template_bytes.as_mut_ptr() as *mut c_char,
nix::libc::O_CLOEXEC,
)
};
if raw_fd < 0 {
let file_name = CStr::from_bytes_with_nul(template_bytes)
.unwrap()
.to_str()
.unwrap_or("<invalid utf8>");
return Err(StorageError::AllocationFailed(format!(
"Failed to create temp file {}: {}",
file_name,
std::io::Error::last_os_error()
)));
}
if !disable_o_direct {
use nix::fcntl::{FcntlArg, OFlag, fcntl};
let current_flags = match fcntl(raw_fd, FcntlArg::F_GETFL) {
Ok(flags) => OFlag::from_bits_truncate(flags),
Err(e) => {
unsafe { nix::libc::close(raw_fd) };
let file_name = CStr::from_bytes_with_nul(template_bytes)
.unwrap()
.to_str()
.unwrap_or("<invalid utf8>");
let _ = unlink(file_name);
return Err(StorageError::AllocationFailed(format!(
"Failed to get file flags for {}: {}",
file_name, e
)));
}
};
let new_flags = current_flags | OFlag::O_DIRECT;
if let Err(e) = fcntl(raw_fd, FcntlArg::F_SETFL(new_flags)) {
tracing::error!(
"Failed to set O_DIRECT on file descriptor {}: {}. \
This may indicate filesystem doesn't support O_DIRECT via fcntl. \
Consider setting {}=open-direct for filesystems like IBM Storage Scale, \
or {}=true to disable O_DIRECT entirely.",
raw_fd,
e,
DISK_ALLOCATOR_TYPE_KEY,
DISK_DISABLE_O_DIRECT_KEY
);
unsafe { nix::libc::close(raw_fd) };
let file_name = CStr::from_bytes_with_nul(template_bytes)
.unwrap()
.to_str()
.unwrap_or("<invalid utf8>");
let _ = unlink(file_name);
return Err(StorageError::AllocationFailed(format!(
"Failed to set O_DIRECT: {}. Try {}=open-direct or {}=true",
e, DISK_ALLOCATOR_TYPE_KEY, DISK_DISABLE_O_DIRECT_KEY
)));
}
tracing::debug!("O_DIRECT enabled via fcntl for disk cache (fd={})", raw_fd);
} else {
tracing::warn!(
"O_DIRECT disabled via {}. GPU DirectStorage performance may be reduced.",
DISK_DISABLE_O_DIRECT_KEY
);
}
Ok(raw_fd)
}
}
/// Open-direct strategy: pass O_DIRECT directly to mkostemp at file open time.
///
/// Some filesystems (e.g., IBM Storage Scale) ignore `fcntl(F_SETFL, O_DIRECT)`,
/// so O_DIRECT must be specified at file open time. This strategy passes O_DIRECT
/// as a flag to mkostemp instead of applying it post-creation via fcntl.
#[derive(Debug, Default)]
pub struct MkostempDirectIo;
impl DiskOpenStrategy for MkostempDirectIo {
fn name(&self) -> &str {
"open-direct"
}
fn open_temp_file(
&self,
template_bytes: &mut [u8],
disable_o_direct: bool,
) -> Result<RawFd, StorageError> {
let flags = if disable_o_direct {
tracing::warn!(
"O_DIRECT disabled via {}. GPU DirectStorage performance may be reduced.",
DISK_DISABLE_O_DIRECT_KEY
);
nix::libc::O_CLOEXEC
} else {
nix::libc::O_CLOEXEC | nix::libc::O_DIRECT
};
let raw_fd =
unsafe { nix::libc::mkostemp(template_bytes.as_mut_ptr() as *mut c_char, flags) };
if raw_fd < 0 {
let file_name = CStr::from_bytes_with_nul(template_bytes)
.unwrap()
.to_str()
.unwrap_or("<invalid utf8>");
return Err(StorageError::AllocationFailed(format!(
"Failed to create temp file {}: {}",
file_name,
std::io::Error::last_os_error()
)));
}
if !disable_o_direct {
tracing::debug!("O_DIRECT enabled via mkostemp at open time (fd={})", raw_fd);
}
Ok(raw_fd)
}
}
/// Create a `DiskOpenStrategy` from the `DYN_KVBM_DISK_ALLOCATOR_TYPE` env var.
///
/// Supported values:
/// - `"default"` (default): Apply O_DIRECT via fcntl after file creation.
/// - `"open-direct"`: Pass O_DIRECT to mkostemp at file open time (required for filesystems
/// like IBM Storage Scale where fcntl-based O_DIRECT is ignored).
fn disk_open_strategy_from_env() -> Result<Box<dyn DiskOpenStrategy>, StorageError> {
match std::env::var(DISK_ALLOCATOR_TYPE_KEY).as_deref() {
Ok("default") | Err(_) => {
tracing::info!("Using default fcntl disk open strategy");
Ok(Box::new(DefaultDirectIo))
}
Ok("open-direct") => {
tracing::info!("Using open-direct disk open strategy (O_DIRECT via mkostemp)");
Ok(Box::new(MkostempDirectIo))
}
Ok(unknown) => Err(StorageError::AllocationFailed(format!(
"Unknown {}={:?}. Supported values: \"default\", \"open-direct\"",
DISK_ALLOCATOR_TYPE_KEY, unknown
))),
}
}
#[derive(Debug)]
pub struct DiskStorage {
......@@ -197,7 +390,7 @@ fn allocate_file(fd: RawFd, size: u64) -> anyhow::Result<()> {
}
impl DiskStorage {
pub fn new(size: usize) -> Result<Self, StorageError> {
pub fn new(size: usize, strategy: &dyn DiskOpenStrategy) -> Result<Self, StorageError> {
// We need to open our file with some special flags that aren't supported by the tempfile crate.
// Instead, we'll use the mkostemp function to create a temporary file with the correct flags.
......@@ -209,93 +402,17 @@ impl DiskStorage {
std::fs::create_dir_all(file_path.parent().unwrap()).unwrap();
}
tracing::debug!("Allocating disk cache file at {}", file_path.display());
tracing::debug!(
"Allocating disk cache file at {} using {} strategy",
file_path.display(),
strategy.name()
);
let template = CString::new(file_path.to_str().unwrap()).unwrap();
let mut template_bytes = template.into_bytes_with_nul();
// mkostemp only supports flags like O_CLOEXEC, not O_RDWR/O_DIRECT.
// The file is always opened O_RDWR by mkostemp.
// We'll use fcntl to set O_DIRECT after creation.
let raw_fd = unsafe {
nix::libc::mkostemp(
template_bytes.as_mut_ptr() as *mut c_char,
nix::libc::O_CLOEXEC,
)
};
if raw_fd < 0 {
let file_name = CStr::from_bytes_with_nul(template_bytes.as_slice())
.unwrap()
.to_str()
.unwrap_or("<invalid utf8>");
return Err(StorageError::AllocationFailed(format!(
"Failed to create temp file {}: {}",
file_name,
std::io::Error::last_os_error()
)));
}
// Determine whether to use O_DIRECT based on environment variable.
// O_DIRECT is required for GPU DirectStorage but has strict alignment requirements.
// For debugging or when filesystems don't support O_DIRECT alignment, it can be disabled.
let disable_o_direct = std::env::var(DISK_DISABLE_O_DIRECT_KEY).is_ok();
if !disable_o_direct {
// Set O_DIRECT via fcntl after file creation.
// For maximum performance, GPU DirectStorage requires O_DIRECT.
// This allows transfers to bypass the kernel page cache.
// It also introduces the restriction that all accesses must be page-aligned.
use nix::fcntl::{FcntlArg, OFlag, fcntl};
// Get current flags
let current_flags = match fcntl(raw_fd, FcntlArg::F_GETFL) {
Ok(flags) => OFlag::from_bits_truncate(flags),
Err(e) => {
unsafe { nix::libc::close(raw_fd) };
let file_name = CStr::from_bytes_with_nul(template_bytes.as_slice())
.unwrap()
.to_str()
.unwrap_or("<invalid utf8>");
let _ = unlink(file_name);
return Err(StorageError::AllocationFailed(format!(
"Failed to get file flags for {}: {}",
file_name, e
)));
}
};
// Add O_DIRECT to existing flags
let new_flags = current_flags | OFlag::O_DIRECT;
if let Err(e) = fcntl(raw_fd, FcntlArg::F_SETFL(new_flags)) {
tracing::error!(
"Failed to set O_DIRECT on file descriptor {}: {}. \
This may indicate filesystem doesn't support O_DIRECT. \
Consider setting {}=true to disable O_DIRECT.",
raw_fd,
e,
DISK_DISABLE_O_DIRECT_KEY
);
unsafe { nix::libc::close(raw_fd) };
let file_name = CStr::from_bytes_with_nul(template_bytes.as_slice())
.unwrap()
.to_str()
.unwrap_or("<invalid utf8>");
let _ = unlink(file_name);
return Err(StorageError::AllocationFailed(format!(
"Failed to set O_DIRECT: {}. Try {}=true",
e, DISK_DISABLE_O_DIRECT_KEY
)));
}
tracing::debug!("O_DIRECT enabled for disk cache (fd={})", raw_fd);
} else {
tracing::warn!(
"O_DIRECT disabled via {}. GPU DirectStorage performance may be reduced.",
DISK_DISABLE_O_DIRECT_KEY
);
}
let raw_fd = strategy.open_temp_file(&mut template_bytes, disable_o_direct)?;
let file_name = CStr::from_bytes_with_nul(template_bytes.as_slice())
.unwrap()
......@@ -424,12 +541,35 @@ impl RegisterableStorage for DiskStorage {
}
}
#[derive(Default)]
pub struct DiskAllocator;
pub struct DiskAllocator {
strategy: Box<dyn DiskOpenStrategy>,
}
impl Default for DiskAllocator {
fn default() -> Self {
Self {
strategy: Box::new(DefaultDirectIo),
}
}
}
impl DiskAllocator {
/// Create a DiskAllocator by reading `DYN_KVBM_DISK_ALLOCATOR_TYPE` from the environment.
pub fn from_env() -> Result<Self, StorageError> {
Ok(Self {
strategy: disk_open_strategy_from_env()?,
})
}
/// Create a DiskAllocator with an explicit strategy.
pub fn with_strategy(strategy: Box<dyn DiskOpenStrategy>) -> Self {
Self { strategy }
}
}
impl StorageAllocator<DiskStorage> for DiskAllocator {
fn allocate(&self, size: usize) -> Result<DiskStorage, StorageError> {
DiskStorage::new(size)
DiskStorage::new(size, self.strategy.as_ref())
}
}
......@@ -621,7 +761,8 @@ mod tests {
for (name, size) in test_cases {
eprintln!("Testing: {} ({} bytes)", name, size);
let storage = DiskStorage::new(size).unwrap_or_else(|e| {
let strategy = DefaultDirectIo;
let storage = DiskStorage::new(size, &strategy).unwrap_or_else(|e| {
panic!("Failed to allocate {} bytes ({}): {:?}", size, name, e)
});
......@@ -660,7 +801,9 @@ mod tests {
}
let size = 1024 * 1024;
let storage = DiskStorage::new(size).expect("Failed to allocate with O_DIRECT disabled");
let strategy = DefaultDirectIo;
let storage =
DiskStorage::new(size, &strategy).expect("Failed to allocate with O_DIRECT disabled");
assert_eq!(storage.size(), size);
......@@ -669,4 +812,41 @@ mod tests {
std::env::remove_var(DISK_ZEROFILL_FALLBACK_KEY);
}
}
/// Test that disk_open_strategy_from_env returns DefaultDirectIo by default.
#[test]
fn test_strategy_from_env_default() {
temp_env::with_var_unset(DISK_ALLOCATOR_TYPE_KEY, || {
let strategy = disk_open_strategy_from_env().expect("default strategy should succeed");
assert_eq!(strategy.name(), "default");
});
}
/// Test that disk_open_strategy_from_env returns DefaultDirectIo for explicit "default".
#[test]
fn test_strategy_from_env_fcntl() {
temp_env::with_var(DISK_ALLOCATOR_TYPE_KEY, Some("default"), || {
let strategy = disk_open_strategy_from_env().expect("fcntl strategy should succeed");
assert_eq!(strategy.name(), "default");
});
}
/// Test that disk_open_strategy_from_env returns MkostempDirectIo for "open-direct".
#[test]
fn test_strategy_from_env_open_direct() {
temp_env::with_var(DISK_ALLOCATOR_TYPE_KEY, Some("open-direct"), || {
let strategy =
disk_open_strategy_from_env().expect("open-direct strategy should succeed");
assert_eq!(strategy.name(), "open-direct");
});
}
/// Test that disk_open_strategy_from_env rejects unknown values.
#[test]
fn test_strategy_from_env_unknown() {
temp_env::with_var(DISK_ALLOCATOR_TYPE_KEY, Some("not-a-real-backend"), || {
let result = disk_open_strategy_from_env();
assert!(result.is_err(), "unknown strategy should fail");
});
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment