// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use super::connector::Connector;
use std::sync::Arc;
6
7
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
Ryan Olson's avatar
Ryan Olson committed
8

9
10
11
12
13
/// Create an etcd lease with the given TTL, attach it to the provided cancellation token,
/// spawn a keep-alive task, and return the lease id (u64).
///
/// Note: this function spawns a background task that maintains the lease until the token is
/// cancelled or an unrecoverable error occurs.
Ryan Olson's avatar
Ryan Olson committed
14
pub async fn create_lease(
15
    connector: Arc<Connector>,
16
    ttl: u64,
Ryan Olson's avatar
Ryan Olson committed
17
    token: CancellationToken,
18
) -> anyhow::Result<u64> {
19
    let mut lease_client = connector.get_client().lease_client();
20
    let lease = lease_client.grant(ttl as i64, None).await?;
Ryan Olson's avatar
Ryan Olson committed
21

22
23
    let id = lease.id() as u64;
    let ttl = lease.ttl() as u64;
Ryan Olson's avatar
Ryan Olson committed
24
25
26
    let child = token.child_token();

    tokio::spawn(async move {
27
        match keep_alive(connector, id, ttl, child).await {
28
            Ok(_) => tracing::trace!("keep alive task exited successfully"),
Ryan Olson's avatar
Ryan Olson committed
29
            Err(e) => {
30
31
32
33
                tracing::error!(
                    error = %e,
                    "Unable to maintain lease. Check etcd server status"
                );
Ryan Olson's avatar
Ryan Olson committed
34
35
36
37
38
                token.cancel();
            }
        }
    });

39
    Ok(id)
40
41
}

42
/// Task to keep leases alive with reconnection support.
Ryan Olson's avatar
Ryan Olson committed
43
44
///
/// If this task returns an error, the cancellation token will be invoked on the runtime.
45
46
async fn keep_alive(
    connector: Arc<Connector>,
47
    lease_id: u64,
48
    mut ttl: u64,
Ryan Olson's avatar
Ryan Olson committed
49
    token: CancellationToken,
50
51
) -> anyhow::Result<()> {
    let mut deadline = Instant::now() + Duration::from_secs(ttl);
Ryan Olson's avatar
Ryan Olson committed
52
53

    loop {
54
55
56
57
58
59
60
61
62
63
64
65
        // Try to establish or re-establish the keep-alive stream
        let mut lease_client = connector.get_client().lease_client();
        let (mut heartbeat_sender, mut heartbeat_receiver) = match lease_client
            .keep_alive(lease_id as i64)
            .await
        {
            Ok((sender, receiver)) => {
                tracing::debug!(lease_id, "Established keep-alive stream");
                (sender, receiver)
            }
            Err(e) => {
                tracing::warn!(lease_id, error = %e, "Failed to establish keep-alive stream");
Ryan Olson's avatar
Ryan Olson committed
66

67
68
69
                // Try to reconnect with the deadline, but also check for cancellation
                tokio::select! {
                    biased;
Ryan Olson's avatar
Ryan Olson committed
70

71
72
73
74
75
                    reconnect_result = connector.reconnect(deadline) => {
                        match reconnect_result {
                            Err(e) => return Err(e),
                            _ => continue,
                        }
Ryan Olson's avatar
Ryan Olson committed
76
77
                    }

78
79
80
81
                    _ = token.cancelled() => {
                        tracing::debug!(lease_id, "Cancellation token triggered during reconnection");
                        return Ok(());
                    }
Ryan Olson's avatar
Ryan Olson committed
82
83
                }
            }
84
85
86
87
88
        };

        // Keep-alive loop with the established stream
        loop {
            if deadline < std::time::Instant::now() {
89
                anyhow::bail!(
90
                    "Unable to refresh lease - deadline exceeded. Check etcd server status"
91
                );
Ryan Olson's avatar
Ryan Olson committed
92
93
            }

94
95
96
97
98
99
100
101
102
103
            tokio::select! {
                biased;

                status = heartbeat_receiver.message() => {
                    match status {
                        Ok(Some(resp)) => {
                            tracing::trace!(lease_id, "keep alive response received: {:?}", resp);

                            // Update ttl and deadline from response
                            ttl = resp.ttl() as u64;
104
                            deadline = Instant::now() + Duration::from_secs(ttl);
105
106

                            if resp.ttl() == 0 {
107
                                anyhow::bail!("Unable to maintain lease - expired or revoked. Check etcd server status");
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
                            }
                        }
                        Ok(None) => {
                            tracing::warn!(lease_id, "Keep-alive stream unexpectedly ended");
                            break;
                        }
                        Err(e) => {
                            tracing::warn!(lease_id, error = %e, "Keep-alive stream error");
                            break;
                        }
                    }
                }

                _ = token.cancelled() => {
                    tracing::debug!(lease_id, "cancellation token triggered; revoking lease");
                    if let Err(e) = lease_client.revoke(lease_id as i64).await {
                        tracing::warn!(
                            lease_id,
                            error = %e,
                            "Failed to revoke lease during cancellation. Cleanup may be incomplete."
                        );
                    }
                    return Ok(());
Ryan Olson's avatar
Ryan Olson committed
131
132
                }

133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
                _ = tokio::time::sleep(Duration::from_secs(ttl / 2)) => {
                    tracing::trace!(lease_id, "sending keep alive");

                    // if we get a error issuing the heartbeat, set the ttl to 0
                    // this will allow us to poll the response stream once and the cancellation
                    // token once, then immediately try to tick the heartbeat
                    // this will repeat until either the heartbeat is reestablished or the deadline
                    // is exceeded
                    if let Err(e) = heartbeat_sender.keep_alive().await {
                        tracing::warn!(
                            lease_id,
                            error = %e,
                            "Unable to send lease heartbeat. Check etcd server status"
                        );
                        ttl = 0;
                    }
                }
            }
Ryan Olson's avatar
Ryan Olson committed
151
152
153
        }
    }
}