lock.rs 13.6 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! S3-based distributed lock manager implementation.
//!
//! This module provides [`S3LockManager`], an implementation of [`ObjectLockManager`]
//! that uses S3 conditional PUT operations for atomic lock acquisition.

use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use futures::future::BoxFuture;

use super::S3ObjectBlockClient;
use crate::SequenceHash;
use crate::object::{LockFileContent, ObjectLockManager};

/// S3-based implementation of [`ObjectLockManager`].
///
/// Uses conditional PUT (If-None-Match: *) for atomic lock acquisition.
/// Lock files contain instance_id and deadline; stale locks (past deadline)
/// can be overwritten.
///
/// # Lock File Format
///
/// Lock files are stored at `{hash}.lock` as JSON:
/// ```json
/// {
///   "instance_id": "uuid-of-leader-instance",
///   "acquired_at": "2025-12-14T10:30:00Z",
///   "deadline": "2025-12-14T10:35:00Z"
/// }
/// ```
///
/// Both timestamps are written as RFC 3339 strings; `deadline` is
/// `acquired_at` plus the manager's configured lock timeout.
///
/// # Meta File Format
///
/// Meta files are stored at `{hash}.meta` as empty objects (presence-only).
pub struct S3LockManager {
    /// Shared S3 client through which every lock and meta object is read,
    /// written and deleted.
    client: Arc<S3ObjectBlockClient>,
    /// Identifier written into lock files created by this manager and compared
    /// against existing lock files to recognise locks this instance owns.
    instance_id: String,
    /// Added to the acquisition time to compute a new lock's deadline.
    /// Initialised to `DEFAULT_LOCK_TIMEOUT` and overridable via `with_timeout`.
    lock_timeout: Duration,
}

impl S3LockManager {
    /// Lock lifetime applied by [`S3LockManager::new`]: 300 seconds (5 minutes).
    pub const DEFAULT_LOCK_TIMEOUT: Duration = Duration::from_secs(300);

    /// Build a lock manager that uses [`Self::DEFAULT_LOCK_TIMEOUT`].
    ///
    /// # Arguments
    /// * `client` - S3 client used for all object operations
    /// * `instance_id` - Unique identifier for this instance (e.g., UUID)
    pub fn new(client: Arc<S3ObjectBlockClient>, instance_id: String) -> Self {
        let lock_timeout = Self::DEFAULT_LOCK_TIMEOUT;
        Self {
            client,
            instance_id,
            lock_timeout,
        }
    }

    /// Builder-style setter: consume the manager and return it with
    /// `timeout` as the lock timeout.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.lock_timeout = timeout;
        self
    }

    /// Object key of the lock file for `hash` (`{hash}.lock`).
    fn lock_key(&self, hash: &SequenceHash) -> String {
        format!("{hash}.lock")
    }

    /// Object key of the meta file for `hash` (`{hash}.meta`).
    fn meta_key(&self, hash: &SequenceHash) -> String {
        format!("{hash}.meta")
    }

    /// Build the content of a fresh lock file: our instance id, the current
    /// UTC time, and a deadline `lock_timeout` after it.
    fn create_lock_content(&self) -> LockFileContent {
        let acquired = chrono::Utc::now();
        // A timeout that chrono cannot represent falls back to chrono's
        // default (zero) duration, i.e. the deadline equals the acquisition time.
        let ttl = chrono::Duration::from_std(self.lock_timeout).unwrap_or_default();
        let expires = acquired + ttl;

        LockFileContent {
            instance_id: self.instance_id.clone(),
            acquired_at: acquired.to_rfc3339(),
            deadline: expires.to_rfc3339(),
        }
    }

    /// Report whether the current UTC time is past the lock's deadline.
    ///
    /// A deadline that does not parse as RFC 3339 is treated as expired.
    fn is_lock_expired(lock: &LockFileContent) -> bool {
        match chrono::DateTime::parse_from_rfc3339(&lock.deadline) {
            Ok(deadline) => chrono::Utc::now() > deadline.with_timezone(&chrono::Utc),
            Err(_) => true,
        }
    }
}

impl ObjectLockManager for S3LockManager {
    fn has_meta(&self, hash: SequenceHash) -> BoxFuture<'static, Result<bool>> {
        let client = self.client.clone();
        let meta_key = self.meta_key(&hash);

        Box::pin(async move { client.has_object(&meta_key).await })
    }

    fn try_acquire_lock(&self, hash: SequenceHash) -> BoxFuture<'static, Result<bool>> {
        let client = self.client.clone();
        let lock_key = self.lock_key(&hash);
        let lock_content = self.create_lock_content();
        let our_instance_id = self.instance_id.clone();

        Box::pin(async move {
            // Serialize lock content
            let lock_data = serde_json::to_vec(&lock_content)
                .map_err(|e| anyhow::anyhow!("failed to serialize lock content: {}", e))?;

            // Try conditional PUT (If-None-Match: *)
            match client
                .put_if_not_exists(&lock_key, bytes::Bytes::from(lock_data.clone()))
                .await
            {
                Ok(true) => {
                    // Successfully acquired lock
                    tracing::debug!(lock_key = %lock_key, "Acquired lock");
                    Ok(true)
                }
                Ok(false) => {
                    // Lock exists, read it with ETag for CAS-style takeover
                    tracing::debug!(lock_key = %lock_key, "Lock exists, checking deadline");
                    match client.get_object_with_etag(&lock_key).await? {
                        Some((existing_data, etag)) => {
                            match serde_json::from_slice::<LockFileContent>(&existing_data) {
                                Ok(existing_lock) => {
                                    // Check if we own the lock
                                    if existing_lock.instance_id == our_instance_id {
                                        tracing::debug!(lock_key = %lock_key, "We own this lock");
                                        return Ok(true);
                                    }

                                    // Check if the lock is expired
                                    if Self::is_lock_expired(&existing_lock) {
                                        tracing::debug!(
                                            lock_key = %lock_key,
                                            old_instance = %existing_lock.instance_id,
                                            deadline = %existing_lock.deadline,
                                            "Lock expired, attempting atomic takeover"
                                        );
                                        // Atomically overwrite the expired lock using ETag
                                        if let Some(etag) = etag {
                                            let won = client
                                                .put_object_if_match(
                                                    &lock_key,
                                                    bytes::Bytes::from(lock_data),
                                                    &etag,
                                                )
                                                .await?;
                                            if !won {
                                                tracing::debug!(
                                                    lock_key = %lock_key,
                                                    "Lost race for expired lock takeover"
                                                );
                                            }
                                            Ok(won)
                                        } else {
                                            // No ETag available, fall back to unconditional put
                                            tracing::warn!(
                                                lock_key = %lock_key,
                                                "No ETag on expired lock, falling back to unconditional overwrite"
                                            );
                                            client
                                                .put_object(
                                                    &lock_key,
                                                    bytes::Bytes::from(lock_data),
                                                )
                                                .await?;
                                            Ok(true)
                                        }
                                    } else {
                                        tracing::debug!(
                                            lock_key = %lock_key,
                                            owner = %existing_lock.instance_id,
                                            deadline = %existing_lock.deadline,
                                            "Lock held by another instance"
                                        );
                                        Ok(false)
                                    }
                                }
                                Err(e) => {
                                    // Malformed lock file, attempt atomic overwrite
                                    tracing::warn!(
                                        lock_key = %lock_key,
                                        error = %e,
                                        "Malformed lock file, attempting atomic overwrite"
                                    );
                                    if let Some(etag) = etag {
                                        let won = client
                                            .put_object_if_match(
                                                &lock_key,
                                                bytes::Bytes::from(lock_data),
                                                &etag,
                                            )
                                            .await?;
                                        if !won {
                                            tracing::debug!(
                                                lock_key = %lock_key,
                                                "Lost race for malformed lock takeover"
                                            );
                                        }
                                        Ok(won)
                                    } else {
                                        tracing::warn!(
                                            lock_key = %lock_key,
                                            "No ETag on malformed lock, falling back to unconditional overwrite"
                                        );
                                        client
                                            .put_object(&lock_key, bytes::Bytes::from(lock_data))
                                            .await?;
                                        Ok(true)
                                    }
                                }
                            }
                        }
                        None => {
                            // Lock was deleted between checks, try to acquire again
                            tracing::debug!(lock_key = %lock_key, "Lock disappeared, retrying");
                            match client
                                .put_if_not_exists(&lock_key, bytes::Bytes::from(lock_data))
                                .await
                            {
                                Ok(created) => Ok(created),
                                Err(e) => Err(e),
                            }
                        }
                    }
                }
                Err(e) => Err(e),
            }
        })
    }

    fn create_meta(&self, hash: SequenceHash) -> BoxFuture<'static, Result<()>> {
        let client = self.client.clone();
        let meta_key = self.meta_key(&hash);

        Box::pin(async move {
            // Create empty meta file to mark block as offloaded
            client.put_object(&meta_key, bytes::Bytes::new()).await?;
            tracing::debug!(meta_key = %meta_key, "Created meta file");
            Ok(())
        })
    }

    fn release_lock(&self, hash: SequenceHash) -> BoxFuture<'static, Result<()>> {
        let client = self.client.clone();
        let lock_key = self.lock_key(&hash);

        Box::pin(async move {
            client.delete_object(&lock_key).await?;
            tracing::debug!(lock_key = %lock_key, "Released lock");
            Ok(())
        })
    }
}

#[cfg(all(test, feature = "testing-s3"))]
mod s3_integration {
    use super::*;
    use crate::object::s3::client::s3_integration::create_test_client;

    /// Two instances racing to take over an expired lock must never both win.
    ///
    /// Cleanup is performed before any assertion on the race outcome so that a
    /// failing run does not leave a lock object behind for the next run.
    #[tokio::test]
    async fn test_lock_expired_takeover_is_atomic() {
        let client = Arc::new(create_test_client("test-lock-atomic").await);
        let hash = SequenceHash::new(0xDEAD_BEEF_u64, None, 0);
        let lock_key = format!("{}.lock", hash);

        // Best-effort removal of a lock left behind by an earlier aborted run;
        // otherwise instance A could not acquire it until that lock expires.
        client.delete_object(&lock_key).await.ok();

        // Create a lock manager whose locks expire almost immediately (1ms timeout)
        let manager_a = S3LockManager::new(client.clone(), "instance-a".into())
            .with_timeout(Duration::from_millis(1));

        // Acquire the lock with instance A
        let acquired = manager_a.try_acquire_lock(hash).await.unwrap();
        assert!(acquired, "instance A should acquire lock");

        // Wait for the lock to expire
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Now race two instances trying to take over the expired lock
        let manager_b = S3LockManager::new(client.clone(), "instance-b".into())
            .with_timeout(Duration::from_secs(60));
        let manager_c = S3LockManager::new(client.clone(), "instance-c".into())
            .with_timeout(Duration::from_secs(60));

        let (result_b, result_c) = tokio::join!(
            manager_b.try_acquire_lock(hash),
            manager_c.try_acquire_lock(hash),
        );

        let won_b = matches!(result_b, Ok(true));
        let won_c = matches!(result_c, Ok(true));

        // Cleanup first: release through a winner (exercising `release_lock`),
        // then sweep the key directly in case neither won or the release failed.
        // The release outcome is checked only after the sweep.
        let release_result = if won_b {
            manager_b.release_lock(hash).await
        } else if won_c {
            manager_c.release_lock(hash).await
        } else {
            Ok(())
        };
        client.delete_object(&lock_key).await.ok();

        // Neither attempt may have failed outright
        result_b.expect("instance B lock attempt failed");
        result_c.expect("instance C lock attempt failed");

        // At most one should win. Both could fail if timing is unlucky (B wins the
        // conditional put, then C sees B's non-expired lock). The key invariant is
        // that they can't BOTH win.
        assert!(
            !(won_b && won_c),
            "both instances won the lock — race condition!"
        );

        release_result.expect("winner failed to release the lock");
    }
}