Unverified Commit cdc9fa77 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

feat(key_value_store): FileStore entries have a lease that auto-expires (#4301)


Signed-off-by: default avatarGraham King <grahamk@nvidia.com>
parent 1fd58e97
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use std::cmp;
use std::collections::HashSet; use std::collections::HashSet;
use std::ffi::OsString; use std::ffi::OsString;
use std::fmt; use std::fmt;
use std::fs; use std::fs;
use std::fs::OpenOptions;
use std::os::unix::ffi::OsStrExt; use std::os::unix::ffi::OsStrExt;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::thread;
use std::time::Duration; use std::time::Duration;
use std::time::SystemTime;
use std::{collections::HashMap, pin::Pin}; use std::{collections::HashMap, pin::Pin};
use anyhow::Context as _; use anyhow::Context as _;
...@@ -21,24 +25,78 @@ use crate::storage::key_value_store::KeyValue; ...@@ -21,24 +25,78 @@ use crate::storage::key_value_store::KeyValue;
use super::{Key, KeyValueBucket, KeyValueStore, StoreError, StoreOutcome, WatchEvent}; use super::{Key, KeyValueBucket, KeyValueStore, StoreError, StoreOutcome, WatchEvent};
/// How long until a key expires. We keep the keys alive by touching the files.
/// 10s is the same as our etcd lease expiry.
const DEFAULT_TTL: Duration = Duration::from_secs(10);
/// Don't do keep-alive any more often than this. Limits the disk write load.
const MIN_KEEP_ALIVE: Duration = Duration::from_secs(1);
/// Treat as a singleton /// Treat as a singleton
#[derive(Clone)] #[derive(Clone)]
pub struct FileStore { pub struct FileStore {
root: PathBuf, root: PathBuf,
connection_id: u64, connection_id: u64,
/// Directories we may have created files in, for shutdown cleanup /// Directories we may have created files in, for shutdown cleanup and keep-alive.
/// Arc so that we only ever have one map here after clone /// Arc so that we only ever have one map here after clone.
active_dirs: Arc<Mutex<HashMap<PathBuf, Directory>>>, active_dirs: Arc<Mutex<HashMap<PathBuf, Directory>>>,
} }
impl FileStore { impl FileStore {
pub(super) fn new<P: Into<PathBuf>>(root_dir: P) -> Self { pub(super) fn new<P: Into<PathBuf>>(root_dir: P) -> Self {
FileStore { let fs = FileStore {
root: root_dir.into(), root: root_dir.into(),
connection_id: rand::random::<u64>(), connection_id: rand::random::<u64>(),
active_dirs: Arc::new(Mutex::new(HashMap::new())), active_dirs: Arc::new(Mutex::new(HashMap::new())),
};
let c = fs.clone();
thread::spawn(move || c.expiry_thread());
fs
}
/// Keep our files alive and delete expired keys. Does not return.
/// We run this in a real thread so it doesn't get delayed by tokio runtime load.
/// It doesn't need any cleanup so we don't use cancellation token.
fn expiry_thread(&self) -> ! {
loop {
let ttl = self.shortest_ttl();
let keep_alive_interval = cmp::max(ttl / 3, MIN_KEEP_ALIVE);
thread::sleep(keep_alive_interval);
self.keep_alive();
if let Err(err) = self.delete_expired_files() {
tracing::error!(error = %err, "FileStore delete_expired_files");
} }
} }
}
/// The shortest TTL of any directory we are using.
fn shortest_ttl(&self) -> Duration {
let mut ttl = DEFAULT_TTL;
let active_dirs = self.active_dirs.lock().clone();
for (_, dir) in active_dirs {
ttl = cmp::min(ttl, dir.ttl);
}
tracing::trace!("FileStore expiry shortest ttl {ttl:?}");
ttl
}
fn keep_alive(&self) {
let active_dirs = self.active_dirs.lock().clone();
for (_, dir) in active_dirs {
dir.keep_alive();
}
}
fn delete_expired_files(&self) -> anyhow::Result<()> {
let active_dirs = self.active_dirs.lock().clone();
for (path, dir) in active_dirs {
dir.delete_expired_files()
.with_context(|| path.display().to_string())?;
}
Ok(())
}
} }
#[async_trait] #[async_trait]
...@@ -49,7 +107,7 @@ impl KeyValueStore for FileStore { ...@@ -49,7 +107,7 @@ impl KeyValueStore for FileStore {
async fn get_or_create_bucket( async fn get_or_create_bucket(
&self, &self,
bucket_name: &str, bucket_name: &str,
_ttl: Option<Duration>, // TODO ttl not used yet ttl: Option<Duration>,
) -> Result<Self::Bucket, StoreError> { ) -> Result<Self::Bucket, StoreError> {
let p = self.root.join(bucket_name); let p = self.root.join(bucket_name);
if let Some(dir) = self.active_dirs.lock().get(&p) { if let Some(dir) = self.active_dirs.lock().get(&p) {
...@@ -67,7 +125,7 @@ impl KeyValueStore for FileStore { ...@@ -67,7 +125,7 @@ impl KeyValueStore for FileStore {
// Create // Create
fs::create_dir_all(&p).map_err(to_fs_err)?; fs::create_dir_all(&p).map_err(to_fs_err)?;
} }
let dir = Directory::new(self.root.clone(), p.clone()); let dir = Directory::new(self.root.clone(), p.clone(), ttl.unwrap_or(DEFAULT_TTL));
self.active_dirs.lock().insert(p, dir.clone()); self.active_dirs.lock().insert(p, dir.clone());
Ok(dir) Ok(dir)
} }
...@@ -87,7 +145,8 @@ impl KeyValueStore for FileStore { ...@@ -87,7 +145,8 @@ impl KeyValueStore for FileStore {
"Bucket name is not a directory".to_string(), "Bucket name is not a directory".to_string(),
)); ));
} }
let dir = Directory::new(self.root.clone(), p.clone()); // The filesystem itself doesn't store the TTL so for now default it
let dir = Directory::new(self.root.clone(), p.clone(), DEFAULT_TTL);
self.active_dirs.lock().insert(p, dir.clone()); self.active_dirs.lock().insert(p, dir.clone());
Ok(Some(dir)) Ok(Some(dir))
} }
...@@ -111,21 +170,93 @@ impl KeyValueStore for FileStore { ...@@ -111,21 +170,93 @@ impl KeyValueStore for FileStore {
pub struct Directory { pub struct Directory {
root: PathBuf, root: PathBuf,
p: PathBuf, p: PathBuf,
ttl: Duration,
/// These are the files we created and hence must delete on shutdown /// These are the files we created and hence must delete on shutdown
owned_files: Arc<Mutex<HashSet<PathBuf>>>, owned_files: Arc<Mutex<HashSet<PathBuf>>>,
} }
impl Directory { impl Directory {
fn new(root: PathBuf, p: PathBuf) -> Self { fn new(root: PathBuf, p: PathBuf, ttl: Duration) -> Self {
// Canonicalize root to handle symlinks (e.g., /var -> /private/var on macOS) // Canonicalize root to handle symlinks (e.g., /var -> /private/var on macOS)
let canonical_root = root.canonicalize().unwrap_or_else(|_| root.clone()); let canonical_root = root.canonicalize().unwrap_or_else(|_| root.clone());
if ttl < MIN_KEEP_ALIVE {
let h_ttl = humantime::format_duration(ttl);
tracing::warn!(path = %p.display(), ttl = %h_ttl, "ttl is too short, increasing to {}", humantime::format_duration(MIN_KEEP_ALIVE));
}
let ttl = cmp::max(ttl, MIN_KEEP_ALIVE);
Directory { Directory {
root: canonical_root, root: canonical_root,
p, p,
ttl,
owned_files: Arc::new(Mutex::new(HashSet::new())), owned_files: Arc::new(Mutex::new(HashSet::new())),
} }
} }
/// touch the files we own so they don't get deleted by a different FileStore
fn keep_alive(&self) {
let owned_files = self.owned_files.lock().clone();
for path in owned_files {
let file = match OpenOptions::new().write(true).open(&path) {
Ok(f) => f,
Err(err) => {
tracing::error!(path = %path.display(), error = %err, "FileStore::keep_alive failed opening owned file");
continue;
}
};
if let Err(err) = file.set_modified(SystemTime::now()) {
tracing::error!(path = %path.display(), error = %err, "FileStore::keep_alive failed set_modified on owned file");
continue;
}
tracing::trace!("FileStore keep_alive set {}", path.display());
}
}
/// Remove any files not touched for longer than TTL.
/// This looks at all files in the directory to catch orphaned files from processes that didn't stop cleanly.
/// Returns an error if we cannot open the directory. Errors inside the directory are logged
/// but non-fatal.
fn delete_expired_files(&self) -> anyhow::Result<()> {
let deadline = SystemTime::now() - self.ttl;
let dirname = self.p.display().to_string();
for entry in fs::read_dir(&self.p).with_context(|| dirname.clone())? {
let entry = match entry {
Ok(p) => p,
Err(err) => {
tracing::warn!(dir = dirname, error = %err, "File store could read directory contents");
continue;
}
};
if !entry.file_type().map(|f| f.is_file()).unwrap_or(false) {
tracing::warn!(dir = dirname, entry = %entry.path().display(), "File store directory should only contain files");
continue;
}
let ctx = entry.path().display().to_string();
let metadata = match entry.metadata() {
Ok(m) => m,
Err(err) => {
tracing::warn!(path = %ctx, error = %err, "Failed fetching metadata");
continue;
}
};
let last_modified = match metadata.modified() {
Ok(lm) => lm,
Err(err) => {
// We should only get an error on platforms with no mtime, which we don't
// support anyway.
tracing::warn!(path = %ctx, error = %err, "Failed reading mtime");
continue;
}
};
if last_modified < deadline {
tracing::info!(path = ctx, ?last_modified, "Expired");
if let Err(err) = fs::remove_file(entry.path()) {
tracing::warn!(path = %ctx, error = %err, "Failed removing");
}
}
}
Ok(())
}
fn delete_owned_files(&mut self) -> anyhow::Result<()> { fn delete_owned_files(&mut self) -> anyhow::Result<()> {
let mut errs = Vec::new(); let mut errs = Vec::new();
for p in self.owned_files.lock().drain() { for p in self.owned_files.lock().drain() {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment