runtime.rs 4.94 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
Ryan Olson's avatar
Ryan Olson committed
15

Graham King's avatar
Graham King committed
16
//! The [Runtime] module is the interface for [crate::component::Component]
//! to access shared resources. These include the thread pool, memory allocators and other shared resources.
//!
//! The [Runtime] holds the primary [`CancellationToken`], which can be used to terminate all attached
//! [`crate::component::Component`]s.
//!
//! We expect in the future to offer topologically aware thread and memory resources, but for now the
//! set of resources is limited to the thread pool and cancellation token.
//!
//! Note: We will need to do an evaluation on what is fully public, what is pub(crate) and what is
//! private; however, for now we are exposing most objects as fully public while the API is maturing.

28
use super::{error, Result, Runtime, RuntimeType};
Ryan Olson's avatar
Ryan Olson committed
29
30
31
32
33
34
35
36
37
38
use crate::config::{self, RuntimeConfig};

use futures::Future;
use once_cell::sync::OnceCell;
use std::sync::{Arc, Mutex};
use tokio::{signal, task::JoinHandle};

pub use tokio_util::sync::CancellationToken;

impl Runtime {
39
    fn new(runtime: RuntimeType, secondary: Option<RuntimeType>) -> Result<Runtime> {
Ryan Olson's avatar
Ryan Olson committed
40
41
42
43
44
45
46
        // worker id
        let id = Arc::new(uuid::Uuid::new_v4().to_string());

        // create a cancellation token
        let cancellation_token = CancellationToken::new();

        // secondary runtime for background ectd/nats tasks
47
48
49
50
51
52
        let secondary = match secondary {
            Some(secondary) => secondary,
            None => {
                RuntimeType::Shared(Arc::new(RuntimeConfig::single_threaded().create_runtime()?))
            }
        };
Ryan Olson's avatar
Ryan Olson committed
53
54
55
56

        Ok(Runtime {
            id,
            primary: runtime,
57
            secondary,
Ryan Olson's avatar
Ryan Olson committed
58
59
60
61
            cancellation_token,
        })
    }

62
63
64
65
66
67
68
    pub fn from_current() -> Result<Runtime> {
        let handle = tokio::runtime::Handle::current();
        let primary = RuntimeType::External(handle.clone());
        let secondary = RuntimeType::External(handle);
        Runtime::new(primary, Some(secondary))
    }

Ryan Olson's avatar
Ryan Olson committed
69
70
    pub fn from_handle(handle: tokio::runtime::Handle) -> Result<Runtime> {
        let runtime = RuntimeType::External(handle);
71
        Runtime::new(runtime, None)
Ryan Olson's avatar
Ryan Olson committed
72
73
74
75
76
77
78
    }

    /// Create a [`Runtime`] instance from the settings
    /// See [`config::RuntimeConfig::from_settings`]
    pub fn from_settings() -> Result<Runtime> {
        let config = config::RuntimeConfig::from_settings()?;
        let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
79
        Runtime::new(owned, None)
Ryan Olson's avatar
Ryan Olson committed
80
81
82
83
84
85
    }

    /// Create a [`Runtime`] with a single-threaded primary async tokio runtime
    pub fn single_threaded() -> Result<Runtime> {
        let config = config::RuntimeConfig::single_threaded();
        let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
86
        Runtime::new(owned, None)
Ryan Olson's avatar
Ryan Olson committed
87
88
89
90
91
92
93
94
95
96
97
98
99
    }

    /// Returns the unique identifier for the [`Runtime`]
    pub fn id(&self) -> &str {
        &self.id
    }

    /// Returns a [`tokio::runtime::Handle`] for the primary/application thread pool
    pub fn primary(&self) -> tokio::runtime::Handle {
        self.primary.handle()
    }

    /// Returns a [`tokio::runtime::Handle`] for the secondary/background thread pool
100
101
    pub fn secondary(&self) -> tokio::runtime::Handle {
        self.secondary.handle()
Ryan Olson's avatar
Ryan Olson committed
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
    }

    /// Access the primary [`CancellationToken`] for the [`Runtime`]
    pub fn primary_token(&self) -> CancellationToken {
        self.cancellation_token.clone()
    }

    /// Creates a child [`CancellationToken`] tied to the life-cycle of the [`Runtime`]'s root [`CancellationToken::child_token`] method.
    pub fn child_token(&self) -> CancellationToken {
        self.cancellation_token.child_token()
    }

    /// Shuts down the [`Runtime`] instance
    pub fn shutdown(&self) {
        self.cancellation_token.cancel();
    }
}

impl RuntimeType {
    /// Returns an owned [`tokio::runtime::Handle`] for the wrapped runtime.
    ///
    /// A `Shared` runtime hands out a clone of its own handle; an `External`
    /// variant clones the handle it stores.
    pub fn handle(&self) -> tokio::runtime::Handle {
        match self {
            Self::Shared(owned) => owned.handle().clone(),
            Self::External(existing) => existing.clone(),
        }
    }
}

impl std::fmt::Debug for RuntimeType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RuntimeType::External(_) => write!(f, "RuntimeType::External"),
            RuntimeType::Shared(_) => write!(f, "RuntimeType::Shared"),
        }
    }
}