// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use super::*;

/// Create a [`Lease`] with a given time-to-live (TTL) attached to the [`CancellationToken`].
pub async fn create_lease(
    mut lease_client: LeaseClient,
    ttl: i64,
    token: CancellationToken,
) -> Result<Lease> {
    let lease = lease_client.grant(ttl, None).await?;

    let id = lease.id();
    let ttl = lease.ttl();
    let child = token.child_token();
    let clone = token.clone();

    tokio::spawn(async move {
        match keep_alive(lease_client, id, ttl, child).await {
21
            Ok(_) => tracing::trace!("keep alive task exited successfully"),
Ryan Olson's avatar
Ryan Olson committed
22
            Err(e) => {
23
24
25
26
                tracing::error!(
                    error = %e,
                    "Unable to maintain lease. Check etcd server status"
                );
Ryan Olson's avatar
Ryan Olson committed
27
28
29
30
31
32
33
34
35
36
37
                token.cancel();
            }
        }
    });

    Ok(Lease {
        id,
        cancel_token: clone,
    })
}

38
39
40
41
42
43
44
45
46
47
48
/// Revoke the lease identified by `lease_id`.
///
/// Thin wrapper around the client's `revoke` call: on failure the error is logged at
/// `warn` level and then returned (converted via `Into`); on success the response is
/// discarded.
pub async fn revoke_lease(mut lease_client: LeaseClient, lease_id: i64) -> Result<()> {
    if let Err(e) = lease_client.revoke(lease_id).await {
        tracing::warn!("failed to revoke lease: {:?}", e);
        return Err(e.into());
    }

    Ok(())
}

Ryan Olson's avatar
Ryan Olson committed
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/// Task to keep leases alive.
///
/// If this task returns an error, the cancellation token will be invoked on the runtime.
/// If
pub async fn keep_alive(
    client: LeaseClient,
    lease_id: i64,
    ttl: i64,
    token: CancellationToken,
) -> Result<()> {
    let mut ttl = ttl;
    let mut deadline = create_deadline(ttl)?;

    let mut client = client;
    let (mut heartbeat_sender, mut heartbeat_receiver) = client.keep_alive(lease_id).await?;

    loop {
        // if the deadline is exceeded, then we have failed to issue a heartbeat in time
67
        // we may be permanently disconnected from the etcd server, so we are now officially done
Ryan Olson's avatar
Ryan Olson committed
68
        if deadline < std::time::Instant::now() {
69
70
71
            return Err(error!(
                "Unable to refresh lease - deadline exceeded. Check etcd server status"
            ));
Ryan Olson's avatar
Ryan Olson committed
72
73
74
75
76
77
78
        }

        tokio::select! {
            biased;

            status = heartbeat_receiver.message() => {
                if let Some(resp) = status? {
79
                    tracing::trace!(lease_id, "keep alive response received: {:?}", resp);
Ryan Olson's avatar
Ryan Olson committed
80
81
82
83
84
85

                    // update ttl and deadline
                    ttl = resp.ttl();
                    deadline = create_deadline(ttl)?;

                    if resp.ttl() == 0 {
86
                        return Err(error!("Unable to maintain lease - expired or revoked. Check etcd server status"));
Ryan Olson's avatar
Ryan Olson committed
87
88
89
90
91
92
                    }

                }
            }

            _ = token.cancelled() => {
93
                tracing::trace!(lease_id, "cancellation token triggered; revoking lease");
Ryan Olson's avatar
Ryan Olson committed
94
95
96
97
98
                let _ = client.revoke(lease_id).await?;
                return Ok(());
            }

            _ = tokio::time::sleep(tokio::time::Duration::from_secs(ttl as u64 / 2)) => {
99
                tracing::trace!(lease_id, "sending keep alive");
Ryan Olson's avatar
Ryan Olson committed
100
101
102
103
104
105

                // if we get a error issuing the heartbeat, set the ttl to 0
                // this will allow us to poll the response stream once and the cancellation token once, then
                // immediately try to tick the heartbeat
                // this will repeat until either the heartbeat is reestablished or the deadline is exceeded
                if let Err(e) = heartbeat_sender.keep_alive().await {
106
107
108
109
110
                    tracing::warn!(
                        lease_id,
                        error = %e,
                        "Unable to send lease heartbeat. Check etcd server status"
                    );
Ryan Olson's avatar
Ryan Olson committed
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
                    ttl = 0;
                }
            }

        }
    }
}

/// Create a deadline for a given time-to-live (TTL).
fn create_deadline(ttl: i64) -> Result<std::time::Instant> {
    if ttl <= 0 {
        return Err(error!("invalid ttl: {}", ttl));
    }
    Ok(std::time::Instant::now() + std::time::Duration::from_secs(ttl as u64))
}