"...git@developer.sourcefind.cn:2222/OpenDAS/vllm_cscc.git" did not exist on "1da3309acefd156a1a834446b828d5e4065822df"
Unverified Commit 611c213b authored by Graham King's avatar Graham King Committed by GitHub
Browse files

fix(runtime): Handle dropping the tokio runtime in async context (#4343)


Signed-off-by: default avatarGraham King <grahamk@nvidia.com>
parent 7702f49f
...@@ -21,7 +21,10 @@ use crate::{ ...@@ -21,7 +21,10 @@ use crate::{
use futures::Future; use futures::Future;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use std::sync::{Arc, atomic::Ordering}; use std::{
mem::ManuallyDrop,
sync::{Arc, atomic::Ordering},
};
use tokio::{signal, sync::Mutex, task::JoinHandle}; use tokio::{signal, sync::Mutex, task::JoinHandle};
pub use tokio_util::sync::CancellationToken; pub use tokio_util::sync::CancellationToken;
...@@ -29,7 +32,7 @@ pub use tokio_util::sync::CancellationToken; ...@@ -29,7 +32,7 @@ pub use tokio_util::sync::CancellationToken;
/// Types of Tokio runtimes that can be used to construct a Dynamo [Runtime]. /// Types of Tokio runtimes that can be used to construct a Dynamo [Runtime].
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
enum RuntimeType { enum RuntimeType {
Shared(Arc<tokio::runtime::Runtime>), Shared(Arc<ManuallyDrop<tokio::runtime::Runtime>>),
External(tokio::runtime::Handle), External(tokio::runtime::Handle),
} }
...@@ -62,7 +65,9 @@ impl Runtime { ...@@ -62,7 +65,9 @@ impl Runtime {
Some(secondary) => secondary, Some(secondary) => secondary,
None => { None => {
tracing::debug!("Created secondary runtime with single thread"); tracing::debug!("Created secondary runtime with single thread");
RuntimeType::Shared(Arc::new(RuntimeConfig::single_threaded().create_runtime()?)) RuntimeType::Shared(Arc::new(ManuallyDrop::new(
RuntimeConfig::single_threaded().create_runtime()?,
)))
} }
}; };
...@@ -243,7 +248,7 @@ impl Runtime { ...@@ -243,7 +248,7 @@ impl Runtime {
/// See [`config::RuntimeConfig::from_settings`] /// See [`config::RuntimeConfig::from_settings`]
pub fn from_settings() -> anyhow::Result<Runtime> { pub fn from_settings() -> anyhow::Result<Runtime> {
let config = config::RuntimeConfig::from_settings()?; let config = config::RuntimeConfig::from_settings()?;
let runtime = Arc::new(config.create_runtime()?); let runtime = Arc::new(ManuallyDrop::new(config.create_runtime()?));
let primary = RuntimeType::Shared(runtime.clone()); let primary = RuntimeType::Shared(runtime.clone());
let secondary = RuntimeType::External(runtime.handle().clone()); let secondary = RuntimeType::External(runtime.handle().clone());
Runtime::new_with_config(primary, Some(secondary), &config) Runtime::new_with_config(primary, Some(secondary), &config)
...@@ -252,7 +257,7 @@ impl Runtime { ...@@ -252,7 +257,7 @@ impl Runtime {
/// Create a [`Runtime`] with two single-threaded async tokio runtime /// Create a [`Runtime`] with two single-threaded async tokio runtime
pub fn single_threaded() -> anyhow::Result<Runtime> { pub fn single_threaded() -> anyhow::Result<Runtime> {
let config = config::RuntimeConfig::single_threaded(); let config = config::RuntimeConfig::single_threaded();
let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?)); let owned = RuntimeType::Shared(Arc::new(ManuallyDrop::new(config.create_runtime()?)));
Runtime::new(owned, None) Runtime::new(owned, None)
} }
...@@ -324,8 +329,6 @@ impl Runtime { ...@@ -324,8 +329,6 @@ impl Runtime {
"Phase 3: All endpoints ended gracefully. Connections to NATS/ETCD will now be disconnected" "Phase 3: All endpoints ended gracefully. Connections to NATS/ETCD will now be disconnected"
); );
main_token.cancel(); main_token.cancel();
// TODO: We should likely call shutdown_background on tokio rt to stop it cleanly.
}); });
} }
} }
...@@ -339,3 +342,43 @@ impl RuntimeType { ...@@ -339,3 +342,43 @@ impl RuntimeType {
} }
} }
} }
/// Handle dropping a tokio runtime from an async context.
///
/// When used from the Python bindings the runtime will be dropped from (I think) Python's asyncio.
/// Tokio does not allow this and will panic. That panic prevents logging from printing it's last
/// messages, which makes knowing what went wrong very difficult.
///
/// This is the panic:
/// > pyo3_runtime.PanicException: Cannot drop a runtime in a context where blocking is not allowed.
/// > This happens when a runtime is dropped from within an asynchronous context.
///
/// Hence we wrap the runtime in a ManuallyDrop and use tokio's alternative shutdown if we detect
/// that we are inside an async runtime.
impl Drop for RuntimeType {
fn drop(&mut self) {
match self {
RuntimeType::External(_) => {}
RuntimeType::Shared(arc) => {
let Some(md_runtime) = Arc::get_mut(arc) else {
// Only drop if we are the only owner of the shared pointer, meaning
// one strong count and no weak count.
return;
};
if tokio::runtime::Handle::try_current().is_ok() {
// We are inside an async runtime.
let tokio_runtime = unsafe { ManuallyDrop::take(md_runtime) };
tokio_runtime.shutdown_background();
} else {
// We are not inside an async context, dropping the runtime is safe.
//
// We never reach this case. I'm not sure why, something about the interaction
// with pyo3 and Python lifetimes.
//
// Process is gone so doesn't really matter, but TODO now that we realize it.
unsafe { ManuallyDrop::drop(md_runtime) };
}
}
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment