lease.rs 6.79 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3

4
use super::connector::Connector;
5
use etcd_client::{LeaseKeepAliveStream, LeaseKeeper};
6
use std::sync::Arc;
7
8
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
Ryan Olson's avatar
Ryan Olson committed
9

10
11
12
13
14
/// Create an etcd lease with the given TTL, attach it to the provided cancellation token,
/// spawn a keep-alive task, and return the lease id (u64).
///
/// Note: this function spawns a background task that maintains the lease until the token is
/// cancelled or an unrecoverable error occurs.
Ryan Olson's avatar
Ryan Olson committed
15
pub async fn create_lease(
16
    connector: Arc<Connector>,
17
    ttl: u64,
Ryan Olson's avatar
Ryan Olson committed
18
    token: CancellationToken,
19
) -> anyhow::Result<u64> {
20
    let mut lease_client = connector.get_client().lease_client();
21
    let lease = lease_client.grant(ttl as i64, None).await?;
Ryan Olson's avatar
Ryan Olson committed
22

23
24
    let id = lease.id() as u64;
    let ttl = lease.ttl() as u64;
Ryan Olson's avatar
Ryan Olson committed
25
26
27
    let child = token.child_token();

    tokio::spawn(async move {
28
        match keep_alive(connector, id, ttl, child).await {
29
            Ok(_) => tracing::trace!("keep alive task exited successfully"),
Ryan Olson's avatar
Ryan Olson committed
30
            Err(e) => {
31
32
33
34
                tracing::error!(
                    error = %e,
                    "Unable to maintain lease. Check etcd server status"
                );
Ryan Olson's avatar
Ryan Olson committed
35
36
37
38
39
                token.cancel();
            }
        }
    });

40
    Ok(id)
41
42
}

43
/// Task to keep leases alive with reconnection support.
Ryan Olson's avatar
Ryan Olson committed
44
45
///
/// If this task returns an error, the cancellation token will be invoked on the runtime.
46
47
async fn keep_alive(
    connector: Arc<Connector>,
48
    lease_id: u64,
49
    ttl: u64,
Ryan Olson's avatar
Ryan Olson committed
50
    token: CancellationToken,
51
) -> anyhow::Result<()> {
52
    // Deadline when the lease expires
53
    let mut deadline = Instant::now() + Duration::from_secs(ttl);
Ryan Olson's avatar
Ryan Olson committed
54

55
56
    let mut reconnect = true;
    while reconnect {
57
        // Try to establish or re-establish the keep-alive stream
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
        let (sender, receiver) =
            match new_keep_alive_stream(&connector, lease_id, &deadline, &token).await? {
                Some(stream) => stream,
                None => break, // cancelled
            };

        // Keep-alive loop with the established stream
        reconnect = keep_alive_with_stream(
            &connector,
            sender,
            receiver,
            lease_id,
            &mut deadline,
            &token,
        )
        .await?;
    }
    Ok(())
}

/// Establish a new keep-alive stream with automatic retry and reconnection.
///
/// Returns:
///     `Ok(Some((LeaseKeeper, LeaseKeepAliveStream)))` on success.
///     `Ok(None)` if cancelled.
///     `Err` for unrecoverable errors such as deadline exceeded.
async fn new_keep_alive_stream(
    connector: &Arc<Connector>,
    lease_id: u64,
    deadline: &Instant,
    token: &CancellationToken,
) -> anyhow::Result<Option<(LeaseKeeper, LeaseKeepAliveStream)>> {
    loop {
91
        let mut lease_client = connector.get_client().lease_client();
92
        match lease_client.keep_alive(lease_id as i64).await {
93
94
            Ok((sender, receiver)) => {
                tracing::debug!(lease_id, "Established keep-alive stream");
95
                return Ok(Some((sender, receiver))); // success
96
97
98
            }
            Err(e) => {
                tracing::warn!(lease_id, error = %e, "Failed to establish keep-alive stream");
Ryan Olson's avatar
Ryan Olson committed
99

100
101
102
                // Try to reconnect with the deadline, but also check for cancellation
                tokio::select! {
                    biased;
Ryan Olson's avatar
Ryan Olson committed
103

104
                    reconnect_result = connector.reconnect(*deadline) => {
105
                        match reconnect_result {
106
107
                            Err(e) => return Err(e), // cannot reconnect
                            _ => continue, // retry
108
                        }
Ryan Olson's avatar
Ryan Olson committed
109
110
                    }

111
112
                    _ = token.cancelled() => {
                        tracing::debug!(lease_id, "Cancellation token triggered during reconnection");
113
                        return Ok(None); // cancelled
114
                    }
Ryan Olson's avatar
Ryan Olson committed
115
116
                }
            }
117
        };
118
119
    }
}
120

121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/// Renew a lease over an already-established keep-alive stream.
///
/// Every iteration waits on three events, polled in this order (`biased`):
/// 1. a response on `receiver` — a positive TTL moves `deadline` to "now + TTL seconds";
/// 2. cancellation of `token` — the lease is revoked on a best-effort basis;
/// 3. a timer set to half of the time left until `deadline` — a heartbeat is sent on `sender`.
///
/// A heartbeat that fails to send is only logged; the loop keeps running.
///
/// Returns:
///     `Ok(true)` when the stream ended or yielded an error, so the caller should reconnect.
///     `Ok(false)` when cancelled.
///     `Err` when a response carries a TTL of zero or less (lease expired or revoked).
async fn keep_alive_with_stream(
    connector: &Arc<Connector>,
    mut sender: LeaseKeeper,
    mut receiver: LeaseKeepAliveStream,
    lease_id: u64,
    deadline: &mut Instant,
    token: &CancellationToken,
) -> anyhow::Result<bool> {
    loop {
        // Renew at the halfway point of whatever time remains; zero once the deadline passed.
        let remaining = deadline.saturating_duration_since(Instant::now());
        let next_renewal = remaining.div_f64(2.0);

        tokio::select! {
            biased;

            status = receiver.message() => {
                let resp = match status {
                    Ok(Some(resp)) => resp,
                    Ok(None) => {
                        tracing::warn!(lease_id, "Keep-alive stream unexpectedly ended");
                        return Ok(true); // Exit to reconnect
                    }
                    Err(err) => {
                        tracing::warn!(lease_id, error = %err, "Keep-alive stream error");
                        return Ok(true); // Exit to reconnect
                    }
                };

                tracing::trace!(lease_id, "keep alive response received: {:?}", resp);

                // A non-positive TTL is treated as the lease being gone.
                let ttl = resp.ttl();
                if ttl <= 0 {
                    tracing::error!(lease_id, "Keep-alive lease expired");
                    anyhow::bail!("Unable to maintain lease - expired or revoked. Check etcd server status");
                }

                // Push the deadline out by the TTL from the response.
                *deadline = Instant::now() + Duration::from_secs(ttl as u64);
            }

            _ = token.cancelled() => {
                tracing::debug!(lease_id, "cancellation token triggered; revoking lease");
                let mut lease_client = connector.get_client().lease_client();
                let revoked = lease_client.revoke(lease_id as i64).await;
                // Best effort: a failed revoke is logged, never returned as an error.
                if let Err(err) = revoked {
                    tracing::warn!(
                        lease_id,
                        error = %err,
                        "Failed to revoke lease during cancellation. Cleanup may be incomplete."
                    );
                }
                return Ok(false);
            }

            _ = tokio::time::sleep(next_renewal) => {
                tracing::trace!(lease_id, "sending keep alive");
                let sent = sender.keep_alive().await;
                if let Err(err) = sent {
                    tracing::warn!(
                        lease_id,
                        error = %err,
                        "Unable to send lease heartbeat. Check etcd server status"
                    );
                }
            }
        }
    }
}