runtime.rs 6.2 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3

Graham King's avatar
Graham King committed
4
//! The [Runtime] module is the interface for [crate::component::Component]
Ryan Olson's avatar
Ryan Olson committed
5
6
7
//! to access shared resources, such as the thread pool and memory allocators.
//!
//! The [Runtime] holds the primary [`CancellationToken`] which can be used to terminate all attached
Graham King's avatar
Graham King committed
8
//! [`crate::component::Component`].
Ryan Olson's avatar
Ryan Olson committed
9
10
11
12
13
14
15
//!
//! We expect in the future to offer topologically aware thread and memory resources, but for now the
//! set of resources is limited to the thread pool and cancellation token.
//!
//! Notes: We will need to do an evaluation on what is fully public, what is pub(crate) and what is
//! private; however, for now we are exposing most objects as fully public while the API is maturing.

16
use super::utils::GracefulShutdownTracker;
17
use super::{Result, Runtime, RuntimeType, error};
Ryan Olson's avatar
Ryan Olson committed
18
19
20
21
use crate::config::{self, RuntimeConfig};

use futures::Future;
use once_cell::sync::OnceCell;
22
23
use std::sync::{Arc, atomic::Ordering};
use tokio::{signal, sync::Mutex, task::JoinHandle};
Ryan Olson's avatar
Ryan Olson committed
24
25
26
27

pub use tokio_util::sync::CancellationToken;

impl Runtime {
28
    fn new(runtime: RuntimeType, secondary: Option<RuntimeType>) -> Result<Runtime> {
Ryan Olson's avatar
Ryan Olson committed
29
30
31
32
33
34
        // worker id
        let id = Arc::new(uuid::Uuid::new_v4().to_string());

        // create a cancellation token
        let cancellation_token = CancellationToken::new();

35
36
37
        // create endpoint shutdown token as a child of the main token
        let endpoint_shutdown_token = cancellation_token.child_token();

Ryan Olson's avatar
Ryan Olson committed
38
        // secondary runtime for background ectd/nats tasks
39
40
41
        let secondary = match secondary {
            Some(secondary) => secondary,
            None => {
42
                tracing::debug!("Created secondary runtime with single thread");
43
44
45
                RuntimeType::Shared(Arc::new(RuntimeConfig::single_threaded().create_runtime()?))
            }
        };
Ryan Olson's avatar
Ryan Olson committed
46
47
48
49

        Ok(Runtime {
            id,
            primary: runtime,
50
            secondary,
Ryan Olson's avatar
Ryan Olson committed
51
            cancellation_token,
52
53
            endpoint_shutdown_token,
            graceful_shutdown_tracker: Arc::new(GracefulShutdownTracker::new()),
Ryan Olson's avatar
Ryan Olson committed
54
55
56
        })
    }

57
    pub fn from_current() -> Result<Runtime> {
58
        Runtime::from_handle(tokio::runtime::Handle::current())
59
60
    }

Ryan Olson's avatar
Ryan Olson committed
61
    pub fn from_handle(handle: tokio::runtime::Handle) -> Result<Runtime> {
62
63
64
        let primary = RuntimeType::External(handle.clone());
        let secondary = RuntimeType::External(handle);
        Runtime::new(primary, Some(secondary))
Ryan Olson's avatar
Ryan Olson committed
65
66
67
68
69
70
    }

    /// Create a [`Runtime`] instance from the settings
    /// See [`config::RuntimeConfig::from_settings`]
    pub fn from_settings() -> Result<Runtime> {
        let config = config::RuntimeConfig::from_settings()?;
71
72
73
74
        let runtime = Arc::new(config.create_runtime()?);
        let primary = RuntimeType::Shared(runtime.clone());
        let secondary = RuntimeType::External(runtime.handle().clone());
        Runtime::new(primary, Some(secondary))
Ryan Olson's avatar
Ryan Olson committed
75
76
    }

77
    /// Create a [`Runtime`] with two single-threaded async tokio runtime
Ryan Olson's avatar
Ryan Olson committed
78
79
80
    pub fn single_threaded() -> Result<Runtime> {
        let config = config::RuntimeConfig::single_threaded();
        let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
81
        Runtime::new(owned, None)
Ryan Olson's avatar
Ryan Olson committed
82
83
84
85
86
87
88
89
90
91
92
93
94
    }

    /// Returns the unique identifier for the [`Runtime`]
    pub fn id(&self) -> &str {
        &self.id
    }

    /// Returns a [`tokio::runtime::Handle`] for the primary/application thread pool
    pub fn primary(&self) -> tokio::runtime::Handle {
        self.primary.handle()
    }

    /// Returns a [`tokio::runtime::Handle`] for the secondary/background thread pool
95
96
    pub fn secondary(&self) -> tokio::runtime::Handle {
        self.secondary.handle()
Ryan Olson's avatar
Ryan Olson committed
97
98
99
100
101
102
103
    }

    /// Access the primary [`CancellationToken`] for the [`Runtime`]
    pub fn primary_token(&self) -> CancellationToken {
        self.cancellation_token.clone()
    }

104
    /// Creates a child [`CancellationToken`] tied to the life-cycle of the [`Runtime`]'s endpoint shutdown token.
Ryan Olson's avatar
Ryan Olson committed
105
    pub fn child_token(&self) -> CancellationToken {
106
107
108
109
110
111
        self.endpoint_shutdown_token.child_token()
    }

    /// Get access to the graceful shutdown tracker
    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
        self.graceful_shutdown_tracker.clone()
Ryan Olson's avatar
Ryan Olson committed
112
113
114
115
    }

    /// Shuts down the [`Runtime`] instance
    pub fn shutdown(&self) {
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
        tracing::info!("Runtime shutdown initiated");

        // Spawn the shutdown coordination task BEFORE cancelling tokens
        let tracker = self.graceful_shutdown_tracker.clone();
        let main_token = self.cancellation_token.clone();
        let endpoint_token = self.endpoint_shutdown_token.clone();

        // Use the runtime handle to spawn the task
        let handle = self.primary();
        handle.spawn(async move {
            // Phase 1: Cancel endpoint shutdown token to stop accepting new requests
            tracing::info!("Phase 1: Cancelling endpoint shutdown token");
            endpoint_token.cancel();

            // Phase 2: Wait for all graceful endpoints to complete
            tracing::info!("Phase 2: Waiting for graceful endpoints to complete");

            let count = tracker.get_count();
            tracing::info!("Active graceful endpoints: {}", count);

            if count != 0 {
                tracker.wait_for_completion().await;
            }

            // Phase 3: Now shutdown NATS/ETCD by cancelling the main token
            tracing::info!(
                "Phase 3: All graceful endpoints completed, shutting down NATS/ETCD connections"
            );
            main_token.cancel();
        });
Ryan Olson's avatar
Ryan Olson committed
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
    }
}

impl RuntimeType {
    /// Returns an owned [`tokio::runtime::Handle`] for this runtime, whether it
    /// wraps an external handle or a shared runtime.
    pub fn handle(&self) -> tokio::runtime::Handle {
        // Borrow the handle from whichever variant is present, then clone once.
        let borrowed = match self {
            Self::External(external) => external,
            Self::Shared(shared) => shared.handle(),
        };
        borrowed.clone()
    }
}

impl std::fmt::Debug for RuntimeType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RuntimeType::External(_) => write!(f, "RuntimeType::External"),
            RuntimeType::Shared(_) => write!(f, "RuntimeType::Shared"),
        }
    }
}