"recipes/llama-3-70b/vllm/vscode:/vscode.git/clone" did not exist on "3718da8c689a558b7958f462bc3d00a1bbcced3e"
disk.rs 6.7 KB
Newer Older
1
2
3
4
5
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use super::*;

6
use anyhow::Context;
7
use core::ffi::c_char;
8
use nix::fcntl::{FallocateFlags, fallocate};
9
use nix::unistd::{ftruncate, unlink};
Ryan Olson's avatar
Ryan Olson committed
10
use std::ffi::CStr;
11
use std::ffi::CString;
12
13
14
use std::fs::File;
use std::io::Write;
use std::os::unix::io::{FromRawFd, RawFd};
Ryan Olson's avatar
Ryan Olson committed
15
16
17
18
use std::path::Path;

/// Name of the environment variable that selects the directory in which the
/// disk cache temp file is created.
const DISK_CACHE_KEY: &str = "DYN_KVBM_DISK_CACHE_DIR";
/// Directory used when [`DISK_CACHE_KEY`] is not set.
const DEFAULT_DISK_CACHE_DIR: &str = "/tmp/";
/// Name of the environment variable that opts in to the zero-fill fallback
/// when `fallocate()` is not supported by the filesystem.
const DISK_ZEROFILL_FALLBACK_KEY: &str = "DYN_KVBM_DISK_ZEROFILL_FALLBACK";
20
21
22

#[derive(Debug)]
pub struct DiskStorage {
Ryan Olson's avatar
Ryan Olson committed
23
    fd: u64,
24
25
26
    file_name: String,
    size: usize,
    handles: RegistrationHandles,
Ryan Olson's avatar
Ryan Olson committed
27
    unlinked: bool,
28
29
30
31
32
}

// Marker-trait implementations: both impls are empty and only tag
// `DiskStorage` with the `Local` and `SystemAccessible` traits.
impl Local for DiskStorage {}
impl SystemAccessible for DiskStorage {}

33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/// Size in bytes (16 MiB) of the chunks written by the zero-fill fallback.
const ZERO_BUF_SIZE: usize = 16 << 20;

fn allocate_file(fd: RawFd, size: u64) -> anyhow::Result<()> {
    match fallocate(fd, FallocateFlags::empty(), 0, size as i64) {
        Ok(_) => Ok(()),
        Err(err) => match err {
            nix::errno::Errno::EOPNOTSUPP => {
                let do_zero_fill = std::env::var(DISK_ZEROFILL_FALLBACK_KEY).is_ok();
                if do_zero_fill {
                    tracing::warn!(
                        "fallocate() not supported on this filesystem, using zero-fill fallback. \
                         This may be slower but provides actual disk space allocation."
                    );
                    // optional fallback: append zeros until reaching size
                    let mut written = 0;
                    let zeros = vec![0u8; ZERO_BUF_SIZE];

                    let mut file =
                        unsafe { File::from_raw_fd(nix::unistd::dup(fd).context("dup error")?) };

                    while written < size {
                        let to_write = std::cmp::min(ZERO_BUF_SIZE as u64, size - written) as usize;
                        file.write_all(&zeros[..to_write])
                            .context("write all error")?;
                        written += to_write as u64;
                    }
                    file.flush().context("flush error")?;
                    Ok(())
                } else {
                    tracing::warn!(
                        "fallocate() not supported on this filesystem, using truncate fallback. \
                         This may may not actually allocate disk space. \
                         Consider setting {}=true for slower zero-fill fallback.",
                        DISK_ZEROFILL_FALLBACK_KEY
                    );
                    // default fallback: set file length without zero-filling (does not really
                    // allocate)
                    ftruncate(fd, size as i64).context("truncate error")
                }
            }
            _ => Err(err.into()),
        },
    }
}

78
79
80
81
82
impl DiskStorage {
    pub fn new(size: usize) -> Result<Self, StorageError> {
        // We need to open our file with some special flags that aren't supported by the tempfile crate.
        // Instead, we'll use the mkostemp function to create a temporary file with the correct flags.

Ryan Olson's avatar
Ryan Olson committed
83
84
85
86
87
88
89
90
91
92
93
        let specified_dir =
            std::env::var(DISK_CACHE_KEY).unwrap_or_else(|_| DEFAULT_DISK_CACHE_DIR.to_string());
        let file_path = Path::new(&specified_dir).join("dynamo-kvbm-disk-cache-XXXXXX");

        if !file_path.exists() {
            std::fs::create_dir_all(file_path.parent().unwrap()).unwrap();
        }

        tracing::debug!("Allocating disk cache file at {}", file_path.display());

        let template = CString::new(file_path.to_str().unwrap()).unwrap();
94
95
96
97
        let mut template_bytes = template.into_bytes_with_nul();

        let raw_fd = unsafe {
            nix::libc::mkostemp(
98
                template_bytes.as_mut_ptr() as *mut c_char,
99
100
101
102
103
104
105
                // For maximum performance, GPU DirectStorage requires O_DIRECT.
                // This allows transfers to bypass the kernel page cache.
                // It also introduces the restriction that all accesses must be page-aligned.
                nix::libc::O_RDWR | nix::libc::O_DIRECT,
            )
        };

Ryan Olson's avatar
Ryan Olson committed
106
107
108
109
110
111
        let file_name = CStr::from_bytes_with_nul(template_bytes.as_slice())
            .unwrap()
            .to_str()
            .map_err(|e| {
                StorageError::AllocationFailed(format!("Failed to read temp file name: {}", e))
            })?
112
113
114
            .to_string();

        // We need to use fallocate to actually allocate the storage and create the blocks on disk.
115
        allocate_file(raw_fd, size as u64).map_err(|e| {
Ryan Olson's avatar
Ryan Olson committed
116
            StorageError::AllocationFailed(format!("Failed to allocate temp file: {}", e))
117
118
119
        })?;

        Ok(Self {
Ryan Olson's avatar
Ryan Olson committed
120
            fd: raw_fd as u64,
121
122
123
            file_name,
            size,
            handles: RegistrationHandles::new(),
Ryan Olson's avatar
Ryan Olson committed
124
            unlinked: false,
125
126
127
128
        })
    }

    pub fn fd(&self) -> u64 {
Ryan Olson's avatar
Ryan Olson committed
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
        self.fd
    }

    /// Unlink our temp file.
    /// This means that when this process terminates, the file will be automatically deleted by the OS.
    /// Unfortunately, GDS requires that files we try to register must be linked.
    /// To get around this, we unlink the file only after we've registered it with NIXL.
    pub fn unlink(&mut self) -> Result<(), StorageError> {
        if self.unlinked {
            return Ok(());
        }

        self.unlinked = true;

        unlink(self.file_name.as_str()).map_err(|e| {
            StorageError::AllocationFailed(format!("Failed to unlink temp file: {}", e))
        })
    }

    pub fn unlinked(&self) -> bool {
        self.unlinked
150
151
152
153
154
    }
}

impl Drop for DiskStorage {
    fn drop(&mut self) {
155
        self.handles.release();
Ryan Olson's avatar
Ryan Olson committed
156
        let _ = self.unlink();
157
158
159
160
161
    }
}

impl Storage for DiskStorage {
    fn storage_type(&self) -> StorageType {
Ryan Olson's avatar
Ryan Olson committed
162
        StorageType::Disk(self.fd())
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
    }

    fn addr(&self) -> u64 {
        0
    }

    fn size(&self) -> usize {
        self.size
    }

    unsafe fn as_ptr(&self) -> *const u8 {
        std::ptr::null()
    }

    unsafe fn as_mut_ptr(&mut self) -> *mut u8 {
        std::ptr::null_mut()
    }
}

// Every method below forwards directly to the `handles` field.
impl RegisterableStorage for DiskStorage {
    /// Stores `handle` under `key` by delegating to `handles.register`.
    fn register(
        &mut self,
        key: &str,
        handle: Box<dyn RegistationHandle>,
    ) -> Result<(), StorageError> {
        self.handles.register(key, handle)
    }

    /// Returns whatever `handles.is_registered` reports for `key`.
    fn is_registered(&self, key: &str) -> bool {
        self.handles.is_registered(key)
    }

    /// Returns whatever `handles.registration_handle` yields for `key`.
    fn registration_handle(&self, key: &str) -> Option<&dyn RegistationHandle> {
        self.handles.registration_handle(key)
    }
}

/// Stateless allocator that produces [`DiskStorage`] instances.
#[derive(Default)]
pub struct DiskAllocator;

impl StorageAllocator<DiskStorage> for DiskAllocator {
    /// Allocates a new [`DiskStorage`] of `size` bytes via `DiskStorage::new`,
    /// propagating its error unchanged.
    fn allocate(&self, size: usize) -> Result<DiskStorage, StorageError> {
        DiskStorage::new(size)
    }
}