Commit 26f6008a authored by Ryan Olson's avatar Ryan Olson Committed by GitHub
Browse files

fix: simplifying the ability to test using from_current (#242)

Enables `#[tokio::test]` via `Runtime::from_current()`

This uses the current handle as both the primary and secondary.
parent ccd153af
...@@ -49,22 +49,19 @@ impl EventPublisher for Namespace { ...@@ -49,22 +49,19 @@ impl EventPublisher for Namespace {
} }
} }
#[cfg(feature = "integration")]
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
// todo - make a distributed runtime fixture // todo - make a distributed runtime fixture
// todo - two options - fully mocked or integration test // todo - two options - fully mocked or integration test
#[cfg(feature = "integration")]
#[tokio::test] #[tokio::test]
async fn test_publish() { async fn test_publish() {
// todo - use rtest - make fixtures let rt = Runtime::from_current().unwrap();
let dtr = DistributedRuntime::from_settings(Runtime::single_threaded().unwrap()) let dtr = DistributedRuntime::from_settings(rt.clone()).await.unwrap();
.await
.unwrap();
let ns = dtr.namespace("test".to_string()).unwrap(); let ns = dtr.namespace("test".to_string()).unwrap();
ns.publish("test", &"test".to_string()).await.unwrap(); ns.publish("test", &"test".to_string()).await.unwrap();
rt.shutdown();
} }
} }
...@@ -61,7 +61,7 @@ impl ServiceConfigBuilder { ...@@ -61,7 +61,7 @@ impl ServiceConfigBuilder {
} }
// create service on the secondary runtime // create service on the secondary runtime
let secondary = component.drt.runtime.secondary.clone(); let secondary = component.drt.runtime.secondary();
let builder = component.drt.nats_client.client().service_builder(); let builder = component.drt.nats_client.client().service_builder();
let service = secondary let service = secondary
.spawn(async move { .spawn(async move {
......
...@@ -61,7 +61,7 @@ enum RuntimeType { ...@@ -61,7 +61,7 @@ enum RuntimeType {
pub struct Runtime { pub struct Runtime {
id: Arc<String>, id: Arc<String>,
primary: RuntimeType, primary: RuntimeType,
secondary: Arc<tokio::runtime::Runtime>, secondary: RuntimeType,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
} }
......
...@@ -56,7 +56,7 @@ use tracing_subscriber::{filter::Directive, fmt}; ...@@ -56,7 +56,7 @@ use tracing_subscriber::{filter::Directive, fmt};
const FILTER_ENV: &str = "TRD_LOG"; const FILTER_ENV: &str = "TRD_LOG";
/// Default log level /// Default log level
const DEFAULT_FILTER_LEVEL: &str = "debug"; const DEFAULT_FILTER_LEVEL: &str = "error";
/// ENV used to set the path to the logging configuration file /// ENV used to set the path to the logging configuration file
const CONFIG_PATH_ENV: &str = "TRD_LOGGING_CONFIG_PATH"; const CONFIG_PATH_ENV: &str = "TRD_LOGGING_CONFIG_PATH";
......
...@@ -36,7 +36,7 @@ use tokio::{signal, task::JoinHandle}; ...@@ -36,7 +36,7 @@ use tokio::{signal, task::JoinHandle};
pub use tokio_util::sync::CancellationToken; pub use tokio_util::sync::CancellationToken;
impl Runtime { impl Runtime {
fn new(runtime: RuntimeType) -> Result<Runtime> { fn new(runtime: RuntimeType, secondary: Option<RuntimeType>) -> Result<Runtime> {
// worker id // worker id
let id = Arc::new(uuid::Uuid::new_v4().to_string()); let id = Arc::new(uuid::Uuid::new_v4().to_string());
...@@ -44,19 +44,31 @@ impl Runtime { ...@@ -44,19 +44,31 @@ impl Runtime {
let cancellation_token = CancellationToken::new(); let cancellation_token = CancellationToken::new();
// secondary runtime for background ectd/nats tasks // secondary runtime for background ectd/nats tasks
let secondary = RuntimeConfig::single_threaded().create_runtime()?; let secondary = match secondary {
Some(secondary) => secondary,
None => {
RuntimeType::Shared(Arc::new(RuntimeConfig::single_threaded().create_runtime()?))
}
};
Ok(Runtime { Ok(Runtime {
id, id,
primary: runtime, primary: runtime,
secondary: Arc::new(secondary), secondary,
cancellation_token, cancellation_token,
}) })
} }
pub fn from_current() -> Result<Runtime> {
let handle = tokio::runtime::Handle::current();
let primary = RuntimeType::External(handle.clone());
let secondary = RuntimeType::External(handle);
Runtime::new(primary, Some(secondary))
}
pub fn from_handle(handle: tokio::runtime::Handle) -> Result<Runtime> { pub fn from_handle(handle: tokio::runtime::Handle) -> Result<Runtime> {
let runtime = RuntimeType::External(handle); let runtime = RuntimeType::External(handle);
Runtime::new(runtime) Runtime::new(runtime, None)
} }
/// Create a [`Runtime`] instance from the settings /// Create a [`Runtime`] instance from the settings
...@@ -64,14 +76,14 @@ impl Runtime { ...@@ -64,14 +76,14 @@ impl Runtime {
pub fn from_settings() -> Result<Runtime> { pub fn from_settings() -> Result<Runtime> {
let config = config::RuntimeConfig::from_settings()?; let config = config::RuntimeConfig::from_settings()?;
let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?)); let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
Runtime::new(owned) Runtime::new(owned, None)
} }
/// Create a [`Runtime`] with a single-threaded primary async tokio runtime /// Create a [`Runtime`] with a single-threaded primary async tokio runtime
pub fn single_threaded() -> Result<Runtime> { pub fn single_threaded() -> Result<Runtime> {
let config = config::RuntimeConfig::single_threaded(); let config = config::RuntimeConfig::single_threaded();
let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?)); let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
Runtime::new(owned) Runtime::new(owned, None)
} }
/// Returns the unique identifier for the [`Runtime`] /// Returns the unique identifier for the [`Runtime`]
...@@ -85,8 +97,8 @@ impl Runtime { ...@@ -85,8 +97,8 @@ impl Runtime {
} }
/// Returns a [`tokio::runtime::Handle`] for the secondary/background thread pool /// Returns a [`tokio::runtime::Handle`] for the secondary/background thread pool
pub fn secondary(&self) -> &Arc<tokio::runtime::Runtime> { pub fn secondary(&self) -> tokio::runtime::Handle {
&self.secondary self.secondary.handle()
} }
/// Access the primary [`CancellationToken`] for the [`Runtime`] /// Access the primary [`CancellationToken`] for the [`Runtime`]
......
...@@ -102,8 +102,9 @@ impl Worker { ...@@ -102,8 +102,9 @@ impl Worker {
Fut: Future<Output = Result<()>> + Send + 'static, Fut: Future<Output = Result<()>> + Send + 'static,
{ {
let runtime = self.runtime; let runtime = self.runtime;
let local_runtime = runtime.clone();
let primary = runtime.primary(); let primary = runtime.primary();
let secondary = runtime.secondary.clone(); let secondary = runtime.secondary();
let timeout = std::env::var(TRD_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT) let timeout = std::env::var(TRD_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
.ok() .ok()
...@@ -171,7 +172,9 @@ impl Worker { ...@@ -171,7 +172,9 @@ impl Worker {
.take() .take()
.expect("Application initialized; but another thread is awaiting it; Worker.execute() can only be called once"); .expect("Application initialized; but another thread is awaiting it; Worker.execute() can only be called once");
secondary.block_on(task)? secondary.block_on(task)??;
local_runtime.shutdown();
Ok(())
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment