runtime.rs 6.74 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! The [Runtime] module is the interface for [crate::component::Component]
//! to access shared resources. These include the thread pool, memory allocators and other shared
//! resources.
//!
//! The [Runtime] holds the primary [`CancellationToken`] which can be used to terminate all attached
//! [`crate::component::Component`].
//!
//! We expect in the future to offer topologically aware thread and memory resources, but for now the
//! set of resources is limited to the thread pool and cancellation token.
//!
//! Notes: We will need to do an evaluation on what is fully public, what is pub(crate) and what is
//! private; however, for now we are exposing most objects as fully public while the API is maturing.

28
use super::utils::GracefulShutdownTracker;
29
use super::{Result, Runtime, RuntimeType, error};
Ryan Olson's avatar
Ryan Olson committed
30
31
32
33
use crate::config::{self, RuntimeConfig};

use futures::Future;
use once_cell::sync::OnceCell;
34
35
use std::sync::{Arc, atomic::Ordering};
use tokio::{signal, sync::Mutex, task::JoinHandle};
Ryan Olson's avatar
Ryan Olson committed
36
37
38
39

pub use tokio_util::sync::CancellationToken;

impl Runtime {
40
    fn new(runtime: RuntimeType, secondary: Option<RuntimeType>) -> Result<Runtime> {
Ryan Olson's avatar
Ryan Olson committed
41
42
43
44
45
46
        // worker id
        let id = Arc::new(uuid::Uuid::new_v4().to_string());

        // create a cancellation token
        let cancellation_token = CancellationToken::new();

47
48
49
        // create endpoint shutdown token as a child of the main token
        let endpoint_shutdown_token = cancellation_token.child_token();

Ryan Olson's avatar
Ryan Olson committed
50
        // secondary runtime for background ectd/nats tasks
51
52
53
        let secondary = match secondary {
            Some(secondary) => secondary,
            None => {
54
                tracing::debug!("Created secondary runtime with single thread");
55
56
57
                RuntimeType::Shared(Arc::new(RuntimeConfig::single_threaded().create_runtime()?))
            }
        };
Ryan Olson's avatar
Ryan Olson committed
58
59
60
61

        Ok(Runtime {
            id,
            primary: runtime,
62
            secondary,
Ryan Olson's avatar
Ryan Olson committed
63
            cancellation_token,
64
65
            endpoint_shutdown_token,
            graceful_shutdown_tracker: Arc::new(GracefulShutdownTracker::new()),
Ryan Olson's avatar
Ryan Olson committed
66
67
68
        })
    }

69
    pub fn from_current() -> Result<Runtime> {
70
        Runtime::from_handle(tokio::runtime::Handle::current())
71
72
    }

Ryan Olson's avatar
Ryan Olson committed
73
    pub fn from_handle(handle: tokio::runtime::Handle) -> Result<Runtime> {
74
75
76
        let primary = RuntimeType::External(handle.clone());
        let secondary = RuntimeType::External(handle);
        Runtime::new(primary, Some(secondary))
Ryan Olson's avatar
Ryan Olson committed
77
78
79
80
81
82
    }

    /// Create a [`Runtime`] instance from the settings
    /// See [`config::RuntimeConfig::from_settings`]
    pub fn from_settings() -> Result<Runtime> {
        let config = config::RuntimeConfig::from_settings()?;
83
84
85
86
        let runtime = Arc::new(config.create_runtime()?);
        let primary = RuntimeType::Shared(runtime.clone());
        let secondary = RuntimeType::External(runtime.handle().clone());
        Runtime::new(primary, Some(secondary))
Ryan Olson's avatar
Ryan Olson committed
87
88
    }

89
    /// Create a [`Runtime`] with two single-threaded async tokio runtime
Ryan Olson's avatar
Ryan Olson committed
90
91
92
    pub fn single_threaded() -> Result<Runtime> {
        let config = config::RuntimeConfig::single_threaded();
        let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
93
        Runtime::new(owned, None)
Ryan Olson's avatar
Ryan Olson committed
94
95
96
97
98
99
100
101
102
103
104
105
106
    }

    /// Returns the unique identifier for the [`Runtime`]
    pub fn id(&self) -> &str {
        &self.id
    }

    /// Returns a [`tokio::runtime::Handle`] for the primary/application thread pool
    pub fn primary(&self) -> tokio::runtime::Handle {
        self.primary.handle()
    }

    /// Returns a [`tokio::runtime::Handle`] for the secondary/background thread pool
107
108
    pub fn secondary(&self) -> tokio::runtime::Handle {
        self.secondary.handle()
Ryan Olson's avatar
Ryan Olson committed
109
110
111
112
113
114
115
    }

    /// Access the primary [`CancellationToken`] for the [`Runtime`]
    pub fn primary_token(&self) -> CancellationToken {
        self.cancellation_token.clone()
    }

116
    /// Creates a child [`CancellationToken`] tied to the life-cycle of the [`Runtime`]'s endpoint shutdown token.
Ryan Olson's avatar
Ryan Olson committed
117
    pub fn child_token(&self) -> CancellationToken {
118
119
120
121
122
123
        self.endpoint_shutdown_token.child_token()
    }

    /// Get access to the graceful shutdown tracker
    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
        self.graceful_shutdown_tracker.clone()
Ryan Olson's avatar
Ryan Olson committed
124
125
126
127
    }

    /// Shuts down the [`Runtime`] instance
    pub fn shutdown(&self) {
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
        tracing::info!("Runtime shutdown initiated");

        // Spawn the shutdown coordination task BEFORE cancelling tokens
        let tracker = self.graceful_shutdown_tracker.clone();
        let main_token = self.cancellation_token.clone();
        let endpoint_token = self.endpoint_shutdown_token.clone();

        // Use the runtime handle to spawn the task
        let handle = self.primary();
        handle.spawn(async move {
            // Phase 1: Cancel endpoint shutdown token to stop accepting new requests
            tracing::info!("Phase 1: Cancelling endpoint shutdown token");
            endpoint_token.cancel();

            // Phase 2: Wait for all graceful endpoints to complete
            tracing::info!("Phase 2: Waiting for graceful endpoints to complete");

            let count = tracker.get_count();
            tracing::info!("Active graceful endpoints: {}", count);

            if count != 0 {
                tracker.wait_for_completion().await;
            }

            // Phase 3: Now shutdown NATS/ETCD by cancelling the main token
            tracing::info!(
                "Phase 3: All graceful endpoints completed, shutting down NATS/ETCD connections"
            );
            main_token.cancel();
        });
Ryan Olson's avatar
Ryan Olson committed
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
    }
}

impl RuntimeType {
    /// Returns an owned [`tokio::runtime::Handle`] for the wrapped runtime.
    ///
    /// For `External` the stored handle is cloned. For `Shared` the handle of the shared
    /// runtime is cloned.
    pub fn handle(&self) -> tokio::runtime::Handle {
        let borrowed = match self {
            Self::External(handle) => handle,
            Self::Shared(runtime) => runtime.handle(),
        };
        borrowed.clone()
    }
}

impl std::fmt::Debug for RuntimeType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RuntimeType::External(_) => write!(f, "RuntimeType::External"),
            RuntimeType::Shared(_) => write!(f, "RuntimeType::Shared"),
        }
    }
}