discovery.rs 3.84 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/*
 * Copyright 2024-2025 NVIDIA CORPORATION & AFFILIATES
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

use crate::{transports::etcd, Result};

pub use etcd::Lease;

pub struct DiscoveryClient {
    namespace: String,
    etcd_client: etcd::Client,
}

impl DiscoveryClient {
    /// Create a new [`DiscoveryClient`]
    ///
    /// This will establish a connection to the etcd server, create a primary lease,
    /// and spawn a task to keep the lease alive and tie the lifetime of the [`Runtime`]
    /// to the lease.
    ///
    /// If the lease expires, the [`Runtime`] will be shutdown.
    /// If the [`Runtime`] is shutdown, the lease will be revoked.
    pub(crate) fn new(namespace: String, etcd_client: etcd::Client) -> Self {
        DiscoveryClient {
            namespace,
            etcd_client,
        }
    }

    /// Get the primary lease ID
    pub fn primary_lease_id(&self) -> i64 {
        self.etcd_client.lease_id()
    }

    /// Create a [`Lease`] with a given time-to-live (TTL).
    /// This [`Lease`] will be tied to the [`Runtime`], but has its own independent [`crate::CancellationToken`].
    pub async fn create_lease(&self, ttl: i64) -> Result<Lease> {
        self.etcd_client.create_lease(ttl).await
    }

    // the following two commented out codes are not implemented, but are placeholders for proposed ectd usage patterns

    // /// Create an ephemeral key/value pair tied to a lease_id.
    // /// This is an atomic create. If the key already exists, this will fail.
    // /// The [`etcd_client::KeyValue`] will be removed when the lease expires or is revoked.
    // pub async fn create_ephemerial_key(&self, key: &str, value: &str, lease_id: i64) -> Result<()> {
    //     // self.etcd_client.create_ephemeral_key(key, value, lease_id).await
    //     unimplemented!()
    // }

    // /// Create a shared [`etcd_client::KeyValue`] which behaves similar to a C++ `std::shared_ptr` or a
    // /// Rust [std::sync::Arc]. Instead of having one owner of the lease, multiple owners participate in
    // /// maintaining the lease. In this manner, when the last member of the group sharing the lease is gone,
    // /// the lease will be expired.
    // ///
    // /// Implementation notes: At the time of writing, it is unclear if we have atomics that control leases,
    // /// so in our initial implementation, the last member of the group will not revoke the lease, so the object
    // /// will live for upto the TTL after the last member is gone.
    // ///
    // /// Notes
    // /// -----
    // ///
    // /// - Multiple members sharing the lease and contributing to the heartbeat might cause some overheads.
    // ///   The implementation will try to randomize the heartbeat intervals to avoid thundering herd problem,
    // ///   and with any luck, the heartbeat watchers will be able to detect when if a external member triggered
    // ///   the heartbeat checking this interval and skip unnecessary heartbeat messages.
    // ///
    // /// A new lease will be created for this object. If you wish to add an object to a shared group s
    // ///
    // /// The [`etcd_client::KeyValue`] will be removed when the lease expires or is revoked.
    // pub async fn create_shared_key(&self, key: &str, value: &str, lease_id: i64) -> Result<()> {
    //     // self.etcd_client.create_ephemeral_key(key, value, lease_id).await
    //     unimplemented!()
    // }
}