Unverified Commit aa4ac947 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

chore: atomic file write (#5809)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 69fdc9dd
...@@ -31,6 +31,10 @@ const DEFAULT_TTL: Duration = Duration::from_secs(10); ...@@ -31,6 +31,10 @@ const DEFAULT_TTL: Duration = Duration::from_secs(10);
/// Don't do keep-alive any more often than this. Limits the disk write load. /// Don't do keep-alive any more often than this. Limits the disk write load.
const MIN_KEEP_ALIVE: Duration = Duration::from_secs(1); const MIN_KEEP_ALIVE: Duration = Duration::from_secs(1);
/// Prefix for temporary files used in atomic writes.
/// Files with this prefix are ignored by the watcher.
const TEMP_FILE_PREFIX: &str = ".tmp_";
/// Treat as a singleton /// Treat as a singleton
#[derive(Clone)] #[derive(Clone)]
pub struct FileStore { pub struct FileStore {
...@@ -291,7 +295,8 @@ impl fmt::Display for Directory { ...@@ -291,7 +295,8 @@ impl fmt::Display for Directory {
#[async_trait] #[async_trait]
impl Bucket for Directory { impl Bucket for Directory {
/// Write a file to the directory /// Write a file to the directory using atomic write (temp file + rename).
/// This ensures watchers never see a partially written file.
async fn insert( async fn insert(
&self, &self,
key: &Key, key: &Key,
...@@ -300,11 +305,24 @@ impl Bucket for Directory { ...@@ -300,11 +305,24 @@ impl Bucket for Directory {
) -> Result<StoreOutcome, StoreError> { ) -> Result<StoreOutcome, StoreError> {
let safe_key = key.url_safe(); let safe_key = key.url_safe();
let full_path = self.p.join(safe_key.as_ref()); let full_path = self.p.join(safe_key.as_ref());
self.owned_files.lock().insert(full_path.clone());
let str_path = full_path.display().to_string(); let str_path = full_path.display().to_string();
fs::write(&full_path, &value)
.context(str_path) // Use atomic write: write to temp file, then rename.
// This prevents watchers from seeing partially written files.
let temp_name = format!("{TEMP_FILE_PREFIX}{:016x}", rand::random::<u64>());
let temp_path = self.p.join(&temp_name);
// Write to temp file first
fs::write(&temp_path, &value)
.with_context(|| format!("writing temp file {}", temp_path.display()))
.map_err(a_to_fs_err)?;
// Atomic rename to target path
fs::rename(&temp_path, &full_path)
.with_context(|| format!("renaming {} to {}", temp_path.display(), str_path))
.map_err(a_to_fs_err)?; .map_err(a_to_fs_err)?;
self.owned_files.lock().insert(full_path.clone());
Ok(StoreOutcome::Created(0)) Ok(StoreOutcome::Created(0))
} }
...@@ -399,8 +417,19 @@ impl Bucket for Directory { ...@@ -399,8 +417,19 @@ impl Bucket for Directory {
} }
}; };
// Skip temp files used for atomic writes
if item_path.file_name()
.map(|n| n.to_string_lossy().starts_with(TEMP_FILE_PREFIX))
.unwrap_or(false)
{
continue;
}
match event.kind { match event.kind {
EventKind::Create(event::CreateKind::File) | EventKind::Modify(event::ModifyKind::Data(event::DataChange::Content)) => { // Handle file creation, modification, and rename-to (from atomic writes)
EventKind::Create(event::CreateKind::File)
| EventKind::Modify(event::ModifyKind::Data(event::DataChange::Content))
| EventKind::Modify(event::ModifyKind::Name(event::RenameMode::To)) => {
let data: bytes::Bytes = match fs::read(&item_path) { let data: bytes::Bytes = match fs::read(&item_path) {
Ok(data) => data.into(), Ok(data) => data.into(),
Err(err) => { Err(err) => {
...@@ -439,6 +468,15 @@ impl Bucket for Directory { ...@@ -439,6 +468,15 @@ impl Bucket for Directory {
continue; continue;
} }
// Skip temp files used for atomic writes
if entry
.file_name()
.to_string_lossy()
.starts_with(TEMP_FILE_PREFIX)
{
continue;
}
// Canonicalize paths to handle symlinks (e.g., /var -> /private/var on macOS) // Canonicalize paths to handle symlinks (e.g., /var -> /private/var on macOS)
let canonical_entry_path = match entry.path().canonicalize() { let canonical_entry_path = match entry.path().canonicalize() {
Ok(p) => p, Ok(p) => p,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment