worker.rs 8.74 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3
4
5
6
7
8

//! The [Worker] class is a convenience wrapper around the construction of the [Runtime]
//! and execution of the user's application.
//!
//! In the future, the [Worker] should probably be moved to a procedural macro similar
//! to the `#[tokio::main]` attribute, where we might annotate an async main function with
//! `#[dynamo::main]` or similar.
//!
//! The [Worker::execute] method is designed to be called once from main and will block
//! the calling thread until the application completes or is canceled. The method initializes
//! the signal handler used to trap `SIGINT` and `SIGTERM` signals and trigger a graceful shutdown.
//!
//! On termination, the user application is given a graceful shutdown period controlled by
//! the [DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT] environment variable. If the application does not
//! shut down in time, the worker will terminate the application with an exit code of 911.
//!
//! The default values of [DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT] differ between the development
//! and release builds. In development, the default is [DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG] and
//! in release, the default is [DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE].

23
use super::{CancellationToken, Runtime, RuntimeConfig};
Ryan Olson's avatar
Ryan Olson committed
24
25
26

use futures::Future;
use once_cell::sync::OnceCell;
27
28
use parking_lot::Mutex;
use std::time::Duration;
Ryan Olson's avatar
Ryan Olson committed
29
30
31
use tokio::{signal, task::JoinHandle};

/// Process-wide Tokio runtime; inserted once by `Worker::from_config`.
static RT: OnceCell<tokio::runtime::Runtime> = OnceCell::new();
/// Handle to a Tokio runtime, consulted as a fallback by `Worker::runtime_from_existing` when
/// `RT` is unset.
// NOTE(review): nothing in this section of the file populates `RTHANDLE` — confirm where it is set.
static RTHANDLE: OnceCell<tokio::runtime::Handle> = OnceCell::new();
/// Join handle of the supervisor task spawned by `Worker::execute_internal`. It is stored once
/// and then taken out of the `Option` by that same call.
static INIT: OnceCell<Mutex<Option<tokio::task::JoinHandle<anyhow::Result<()>>>>> = OnceCell::new();
Ryan Olson's avatar
Ryan Olson committed
34
35
36
37

/// Logged (at debug level) when the worker's cancellation token fires and graceful shutdown starts.
const SHUTDOWN_MESSAGE: &str =
    "Application received shutdown signal; attempting to gracefully shutdown";
/// Logged alongside the effective timeout, in seconds, when graceful shutdown starts.
const SHUTDOWN_TIMEOUT_MESSAGE: &str =
    "Use DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT to control the graceful shutdown timeout";
Ryan Olson's avatar
Ryan Olson committed
39
40

/// Environment variable to control the graceful shutdown timeout.
///
/// The value is parsed as a whole number of seconds (`u64`). When the variable is unset or
/// cannot be parsed, the build-dependent default below is used instead.
pub const DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT: &str = "DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT";

/// Default graceful shutdown timeout in seconds in debug mode
/// (selected when `cfg!(debug_assertions)` is true).
pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG: u64 = 5;

/// Default graceful shutdown timeout in seconds in release mode
/// (selected when `cfg!(debug_assertions)` is false).
pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE: u64 = 30;

49
/// Pairs a [`Runtime`] with a [`RuntimeConfig`] and drives the user's application through
/// [`Worker::execute`] or [`Worker::execute_async`].
#[derive(Debug, Clone)]
pub struct Worker {
    /// Runtime that is cloned and handed to the application closure.
    runtime: Runtime,
    /// Configuration captured when the worker was constructed.
    // NOTE(review): this field is not read anywhere in this section of the file.
    config: RuntimeConfig,
}

impl Worker {
    /// Create a new [`Worker`] instance from [`RuntimeConfig`] settings which is sourced from the environment
57
    pub fn from_settings() -> anyhow::Result<Worker> {
Ryan Olson's avatar
Ryan Olson committed
58
59
60
61
62
        let config = RuntimeConfig::from_settings()?;
        Worker::from_config(config)
    }

    /// Create a new [`Worker`] instance from a provided [`RuntimeConfig`]
63
    pub fn from_config(config: RuntimeConfig) -> anyhow::Result<Worker> {
Ryan Olson's avatar
Ryan Olson committed
64
        // if the runtime is already initialized, return an error
65
        if RT.get().is_some() || RTHANDLE.get().is_some() {
66
            return Err(anyhow::anyhow!("Worker already initialized"));
Ryan Olson's avatar
Ryan Olson committed
67
68
69
70
71
72
        }

        // create a new runtime and insert it into the OnceCell
        // there is still a potential race-condition here, two threads cou have passed the first check
        // but only one will succeed in inserting the runtime
        let rt = RT.try_insert(config.create_runtime()?).map_err(|_| {
73
            anyhow::anyhow!("Failed to create worker; Only a single Worker should ever be created")
Ryan Olson's avatar
Ryan Olson committed
74
75
76
        })?;

        let runtime = Runtime::from_handle(rt.handle().clone())?;
77
        Ok(Worker { runtime, config })
Ryan Olson's avatar
Ryan Olson committed
78
79
    }

80
    pub fn runtime_from_existing() -> anyhow::Result<Runtime> {
Ryan Olson's avatar
Ryan Olson committed
81
82
83
84
85
86
87
88
89
        if let Some(rt) = RT.get() {
            Ok(Runtime::from_handle(rt.handle().clone())?)
        } else if let Some(rt) = RTHANDLE.get() {
            Ok(Runtime::from_handle(rt.clone())?)
        } else {
            Runtime::from_settings()
        }
    }

90
91
92
    pub fn tokio_runtime(&self) -> anyhow::Result<&'static tokio::runtime::Runtime> {
        RT.get()
            .ok_or_else(|| anyhow::anyhow!("Worker not initialized"))
Ryan Olson's avatar
Ryan Olson committed
93
94
95
96
97
98
    }

    /// Returns a reference to the [`Runtime`] held by this worker.
    pub fn runtime(&self) -> &Runtime {
        &self.runtime
    }

99
    pub fn execute<F, Fut>(self, f: F) -> anyhow::Result<()>
100
101
    where
        F: FnOnce(Runtime) -> Fut + Send + 'static,
102
        Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
103
104
105
106
107
108
109
    {
        let runtime = self.runtime.clone();
        runtime.secondary().block_on(self.execute_internal(f))??;
        runtime.shutdown();
        Ok(())
    }

110
    pub async fn execute_async<F, Fut>(self, f: F) -> anyhow::Result<()>
111
112
    where
        F: FnOnce(Runtime) -> Fut + Send + 'static,
113
        Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
114
115
116
117
118
119
120
121
    {
        let runtime = self.runtime.clone();
        let task = self.execute_internal(f);
        task.await??;
        runtime.shutdown();
        Ok(())
    }

Ryan Olson's avatar
Ryan Olson committed
122
123
    /// Executes the provided application/closure on the [`Runtime`].
    /// This is designed to be called once from main and will block the calling thread until the application completes.
124
    fn execute_internal<F, Fut>(self, f: F) -> JoinHandle<anyhow::Result<()>>
Ryan Olson's avatar
Ryan Olson committed
125
126
    where
        F: FnOnce(Runtime) -> Fut + Send + 'static,
127
        Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
Ryan Olson's avatar
Ryan Olson committed
128
    {
129
        let runtime = self.runtime.clone();
Ryan Olson's avatar
Ryan Olson committed
130
        let primary = runtime.primary();
131
        let secondary = runtime.secondary();
Ryan Olson's avatar
Ryan Olson committed
132

133
        let timeout = std::env::var(DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
Ryan Olson's avatar
Ryan Olson committed
134
135
136
137
138
139
140
141
142
143
144
145
            .ok()
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or({
                if cfg!(debug_assertions) {
                    DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG
                } else {
                    DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE
                }
            });

        INIT.set(Mutex::new(Some(secondary.spawn(async move {
            // start signal handler
146
            tokio::spawn(signal_handler(runtime.primary_token().clone()));
Ryan Olson's avatar
Ryan Olson committed
147
148
149
150
151

            let cancel_token = runtime.child_token();
            let (mut app_tx, app_rx) = tokio::sync::oneshot::channel::<()>();

            // spawn a task to run the application
152
            let task: JoinHandle<anyhow::Result<()>> = primary.spawn(async move {
Ryan Olson's avatar
Ryan Olson committed
153
154
155
156
157
158
                let _rx = app_rx;
                f(runtime).await
            });

            tokio::select! {
                _ = cancel_token.cancelled() => {
159
160
                    tracing::debug!("{}", SHUTDOWN_MESSAGE);
                    tracing::debug!("{} {} seconds", SHUTDOWN_TIMEOUT_MESSAGE, timeout);
Ryan Olson's avatar
Ryan Olson committed
161
162
163
164
165
166
167
168
169
170
171
172
                }

                _ = app_tx.closed() => {
                }
            };

            let result = tokio::select! {
                result = task => {
                    result
                }

                _ = tokio::time::sleep(tokio::time::Duration::from_secs(timeout)) => {
173
                    tracing::debug!("Application did not shutdown in time; terminating");
Ryan Olson's avatar
Ryan Olson committed
174
175
176
177
178
179
                    std::process::exit(911);
                }
            }?;

            match &result {
                Ok(_) => {
180
                    tracing::debug!("Application shutdown successfully");
Ryan Olson's avatar
Ryan Olson committed
181
182
                }
                Err(e) => {
183
                    tracing::error!("Application shutdown with error: {:?}", e);
Ryan Olson's avatar
Ryan Olson committed
184
185
186
187
188
                }
            }

            result
        }))))
189
        .expect("Failed to spawn application task");
Ryan Olson's avatar
Ryan Olson committed
190

191
        INIT
Ryan Olson's avatar
Ryan Olson committed
192
193
194
195
            .get()
            .expect("Application task not initialized")
            .lock()
            .take()
196
            .expect("Application initialized; but another thread is awaiting it; Worker.execute() can only be called once")
197
    }
Ryan Olson's avatar
Ryan Olson committed
198

199
    pub fn from_current() -> anyhow::Result<Worker> {
200
        if RT.get().is_some() || RTHANDLE.get().is_some() {
201
            return Err(anyhow::anyhow!("Worker already initialized"));
202
203
        }
        let runtime = Runtime::from_current()?;
204
205
        let config = RuntimeConfig::from_settings()?;
        Ok(Worker { runtime, config })
Ryan Olson's avatar
Ryan Olson committed
206
207
208
209
    }
}

/// Catch signals and trigger a shutdown
210
async fn signal_handler(cancel_token: CancellationToken) -> anyhow::Result<()> {
Ryan Olson's avatar
Ryan Olson committed
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
    let ctrl_c = async {
        signal::ctrl_c().await?;
        anyhow::Ok(())
    };

    let sigterm = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())?
            .recv()
            .await;
        anyhow::Ok(())
    };

    tokio::select! {
        _ = ctrl_c => {
            tracing::info!("Ctrl+C received, starting graceful shutdown");
        },
        _ = sigterm => {
            tracing::info!("SIGTERM received, starting graceful shutdown");
        },
        _ = cancel_token.cancelled() => {
231
            tracing::debug!("CancellationToken triggered; shutting down");
Ryan Olson's avatar
Ryan Olson committed
232
233
234
235
236
237
238
239
        },
    }

    // trigger a shutdown
    cancel_token.cancel();

    Ok(())
}