Unverified Commit 2a5eb7e7 authored by Abrar Shivani's avatar Abrar Shivani Committed by GitHub
Browse files

feat: Use existing Tokio runtime in components (#941)

The runtime library already provides a from_current method that creates and returns a Runtime object initialized with the current Tokio runtime handle. Since components do not use the runtime library directly but access it through the worker, the worker needs to be updated to create itself using a Runtime instance derived from the current Tokio runtime.
This PR updates the http component and the worker to use the existing Tokio runtime instead of creating a new one. Other components can be similarly updated to run using the existing runtime.
parent 44250d44
...@@ -47,10 +47,11 @@ struct Args { ...@@ -47,10 +47,11 @@ struct Args {
component: String, component: String,
} }
fn main() -> Result<()> { #[tokio::main]
async fn main() -> Result<()> {
logging::init(); logging::init();
let worker = Worker::from_settings()?; let worker = Worker::from_current()?;
worker.execute(app) worker.execute_async(app).await
} }
async fn app(runtime: Runtime) -> Result<()> { async fn app(runtime: Runtime) -> Result<()> {
......
...@@ -40,6 +40,7 @@ use std::{sync::Mutex, time::Duration}; ...@@ -40,6 +40,7 @@ use std::{sync::Mutex, time::Duration};
use tokio::{signal, task::JoinHandle}; use tokio::{signal, task::JoinHandle};
static RT: OnceCell<tokio::runtime::Runtime> = OnceCell::new(); static RT: OnceCell<tokio::runtime::Runtime> = OnceCell::new();
static RTHANDLE: OnceCell<tokio::runtime::Handle> = OnceCell::new();
static INIT: OnceCell<Mutex<Option<tokio::task::JoinHandle<Result<()>>>>> = OnceCell::new(); static INIT: OnceCell<Mutex<Option<tokio::task::JoinHandle<Result<()>>>>> = OnceCell::new();
const SHUTDOWN_MESSAGE: &str = const SHUTDOWN_MESSAGE: &str =
...@@ -71,7 +72,7 @@ impl Worker { ...@@ -71,7 +72,7 @@ impl Worker {
/// Create a new [`Worker`] instance from a provided [`RuntimeConfig`] /// Create a new [`Worker`] instance from a provided [`RuntimeConfig`]
pub fn from_config(config: RuntimeConfig) -> Result<Worker> { pub fn from_config(config: RuntimeConfig) -> Result<Worker> {
// if the runtime is already initialized, return an error // if the runtime is already initialized, return an error
if RT.get().is_some() { if RT.get().is_some() || RTHANDLE.get().is_some() {
return Err(error!("Worker already initialized")); return Err(error!("Worker already initialized"));
} }
...@@ -94,15 +95,37 @@ impl Worker { ...@@ -94,15 +95,37 @@ impl Worker {
&self.runtime &self.runtime
} }
pub fn execute<F, Fut>(self, f: F) -> Result<()>
where
F: FnOnce(Runtime) -> Fut + Send + 'static,
Fut: Future<Output = Result<()>> + Send + 'static,
{
let runtime = self.runtime.clone();
runtime.secondary().block_on(self.execute_internal(f))??;
runtime.shutdown();
Ok(())
}
pub async fn execute_async<F, Fut>(self, f: F) -> Result<()>
where
F: FnOnce(Runtime) -> Fut + Send + 'static,
Fut: Future<Output = Result<()>> + Send + 'static,
{
let runtime = self.runtime.clone();
let task = self.execute_internal(f);
task.await??;
runtime.shutdown();
Ok(())
}
/// Executes the provided application/closure on the [`Runtime`]. /// Executes the provided application/closure on the [`Runtime`].
/// This is designed to be called once from main and will block the calling thread until the application completes. /// This is designed to be called once from main and will block the calling thread until the application completes.
pub fn execute<F, Fut>(self, f: F) -> Result<()> fn execute_internal<F, Fut>(self, f: F) -> JoinHandle<Result<()>>
where where
F: FnOnce(Runtime) -> Fut + Send + 'static, F: FnOnce(Runtime) -> Fut + Send + 'static,
Fut: Future<Output = Result<()>> + Send + 'static, Fut: Future<Output = Result<()>> + Send + 'static,
{ {
let runtime = self.runtime; let runtime = self.runtime.clone();
let local_runtime = runtime.clone();
let primary = runtime.primary(); let primary = runtime.primary();
let secondary = runtime.secondary(); let secondary = runtime.secondary();
...@@ -162,7 +185,7 @@ impl Worker { ...@@ -162,7 +185,7 @@ impl Worker {
result result
})))) }))))
.map_err(|e| error!("Failed to spawn application task: {:?}", e))?; .expect("Failed to spawn application task");
let task = INIT let task = INIT
.get() .get()
...@@ -171,10 +194,15 @@ impl Worker { ...@@ -171,10 +194,15 @@ impl Worker {
.unwrap() .unwrap()
.take() .take()
.expect("Application initialized; but another thread is awaiting it; Worker.execute() can only be called once"); .expect("Application initialized; but another thread is awaiting it; Worker.execute() can only be called once");
task
}
secondary.block_on(task)??; pub fn from_current() -> Result<Worker> {
local_runtime.shutdown(); if RT.get().is_some() || RTHANDLE.get().is_some() {
Ok(()) return Err(error!("Worker already initialized"));
}
let runtime = Runtime::from_current()?;
Ok(Worker { runtime })
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment