// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use super::*;

/// Create a [`Lease`] with a given time-to-live (TTL) attached to the [`CancellationToken`].
pub async fn create_lease(
    mut lease_client: LeaseClient,
9
    ttl: u64,
Ryan Olson's avatar
Ryan Olson committed
10
11
    token: CancellationToken,
) -> Result<Lease> {
12
    let lease = lease_client.grant(ttl as i64, None).await?;
Ryan Olson's avatar
Ryan Olson committed
13

14
15
    let id = lease.id() as u64;
    let ttl = lease.ttl() as u64;
Ryan Olson's avatar
Ryan Olson committed
16
17
18
19
20
    let child = token.child_token();
    let clone = token.clone();

    tokio::spawn(async move {
        match keep_alive(lease_client, id, ttl, child).await {
21
            Ok(_) => tracing::trace!("keep alive task exited successfully"),
Ryan Olson's avatar
Ryan Olson committed
22
            Err(e) => {
23
24
25
26
                tracing::error!(
                    error = %e,
                    "Unable to maintain lease. Check etcd server status"
                );
Ryan Olson's avatar
Ryan Olson committed
27
28
29
30
31
32
33
34
35
36
37
                token.cancel();
            }
        }
    });

    Ok(Lease {
        id,
        cancel_token: clone,
    })
}

38
/// Revoke a lease given its lease id. A wrapper over etcd_client::LeaseClient::revoke
39
40
pub async fn revoke_lease(mut lease_client: LeaseClient, lease_id: u64) -> Result<()> {
    match lease_client.revoke(lease_id as i64).await {
41
42
43
44
45
46
47
48
        Ok(_) => Ok(()),
        Err(e) => {
            tracing::warn!("failed to revoke lease: {:?}", e);
            Err(e.into())
        }
    }
}

Ryan Olson's avatar
Ryan Olson committed
49
50
51
52
53
54
/// Task to keep leases alive.
///
/// If this task returns an error, the cancellation token will be invoked on the runtime.
/// If
pub async fn keep_alive(
    client: LeaseClient,
55
56
    lease_id: u64,
    ttl: u64,
Ryan Olson's avatar
Ryan Olson committed
57
58
59
60
61
62
    token: CancellationToken,
) -> Result<()> {
    let mut ttl = ttl;
    let mut deadline = create_deadline(ttl)?;

    let mut client = client;
63
    let (mut heartbeat_sender, mut heartbeat_receiver) = client.keep_alive(lease_id as i64).await?;
Ryan Olson's avatar
Ryan Olson committed
64
65
66

    loop {
        // if the deadline is exceeded, then we have failed to issue a heartbeat in time
67
        // we may be permanently disconnected from the etcd server, so we are now officially done
Ryan Olson's avatar
Ryan Olson committed
68
        if deadline < std::time::Instant::now() {
69
70
71
            return Err(error!(
                "Unable to refresh lease - deadline exceeded. Check etcd server status"
            ));
Ryan Olson's avatar
Ryan Olson committed
72
73
74
75
76
77
78
        }

        tokio::select! {
            biased;

            status = heartbeat_receiver.message() => {
                if let Some(resp) = status? {
79
                    tracing::trace!(lease_id, "keep alive response received: {:?}", resp);
Ryan Olson's avatar
Ryan Olson committed
80
81

                    // update ttl and deadline
82
                    ttl = resp.ttl() as u64;
Ryan Olson's avatar
Ryan Olson committed
83
84
85
                    deadline = create_deadline(ttl)?;

                    if resp.ttl() == 0 {
86
                        return Err(error!("Unable to maintain lease - expired or revoked. Check etcd server status"));
Ryan Olson's avatar
Ryan Olson committed
87
88
89
90
91
92
                    }

                }
            }

            _ = token.cancelled() => {
93
                tracing::trace!(lease_id, "cancellation token triggered; revoking lease");
94
                let _ = client.revoke(lease_id as i64).await?;
Ryan Olson's avatar
Ryan Olson committed
95
96
97
                return Ok(());
            }

98
            _ = tokio::time::sleep(tokio::time::Duration::from_secs(ttl / 2)) => {
99
                tracing::trace!(lease_id, "sending keep alive");
Ryan Olson's avatar
Ryan Olson committed
100
101
102
103
104
105

                // if we get a error issuing the heartbeat, set the ttl to 0
                // this will allow us to poll the response stream once and the cancellation token once, then
                // immediately try to tick the heartbeat
                // this will repeat until either the heartbeat is reestablished or the deadline is exceeded
                if let Err(e) = heartbeat_sender.keep_alive().await {
106
107
108
109
110
                    tracing::warn!(
                        lease_id,
                        error = %e,
                        "Unable to send lease heartbeat. Check etcd server status"
                    );
Ryan Olson's avatar
Ryan Olson committed
111
112
113
114
115
116
117
118
119
                    ttl = 0;
                }
            }

        }
    }
}

/// Create a deadline for a given time-to-live (TTL).
120
121
fn create_deadline(ttl: u64) -> Result<std::time::Instant> {
    Ok(std::time::Instant::now() + std::time::Duration::from_secs(ttl))
Ryan Olson's avatar
Ryan Olson committed
122
}