Unverified Commit da83f820 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

perf(runtime): Use all available parallelism (#1858)

parent e1ae0f15
...@@ -62,8 +62,9 @@ fn main() -> anyhow::Result<()> { ...@@ -62,8 +62,9 @@ fn main() -> anyhow::Result<()> {
// max_worker_threads and max_blocking_threads from env vars or config file. // max_worker_threads and max_blocking_threads from env vars or config file.
let rt_config = dynamo_runtime::RuntimeConfig::from_settings()?; let rt_config = dynamo_runtime::RuntimeConfig::from_settings()?;
tracing::debug!("Runtime config: {rt_config}");
// One per process. Wraps a Runtime which holds two tokio runtimes. // One per process. Wraps a Runtime which holds one or two tokio runtimes.
let worker = dynamo_runtime::Worker::from_config(rt_config)?; let worker = dynamo_runtime::Worker::from_config(rt_config)?;
worker.execute(wrapper) worker.execute(wrapper)
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::Result; use super::Result;
use derive_builder::Builder; use derive_builder::Builder;
...@@ -20,6 +8,7 @@ use figment::{ ...@@ -20,6 +8,7 @@ use figment::{
Figment, Figment,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fmt;
use validator::Validate; use validator::Validate;
/// Default HTTP server host /// Default HTTP server host
...@@ -66,35 +55,58 @@ impl Default for WorkerConfig { ...@@ -66,35 +55,58 @@ impl Default for WorkerConfig {
pub struct RuntimeConfig { pub struct RuntimeConfig {
/// Number of async worker threads /// Number of async worker threads
/// If set to 1, the runtime will run in single-threaded mode /// If set to 1, the runtime will run in single-threaded mode
/// Set this at runtime with environment variable DYN_RUNTIME_NUM_WORKER_THREADS. Defaults to
/// number of cores.
#[validate(range(min = 1))] #[validate(range(min = 1))]
#[builder(default = "16")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub num_worker_threads: usize, pub num_worker_threads: Option<usize>,
/// Maximum number of blocking threads /// Maximum number of blocking threads
/// Blocking threads are used for blocking operations, this value must be greater than 0. /// Blocking threads are used for blocking operations, this value must be greater than 0.
/// Set this at runtime with environment variable DYN_RUNTIME_MAX_BLOCKING_THREADS. Defaults to
/// 512.
#[validate(range(min = 1))] #[validate(range(min = 1))]
#[builder(default = "512")] #[builder(default = "512")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub max_blocking_threads: usize, pub max_blocking_threads: usize,
/// HTTP server host for health and metrics endpoints /// HTTP server host for health and metrics endpoints
/// Set this at runtime with environment variable DYN_RUNTIME_HTTP_SERVER_HOST
#[builder(default = "DEFAULT_HTTP_SERVER_HOST.to_string()")] #[builder(default = "DEFAULT_HTTP_SERVER_HOST.to_string()")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub http_server_host: String, pub http_server_host: String,
/// HTTP server port for health and metrics endpoints /// HTTP server port for health and metrics endpoints
/// If set to 0, the system will assign a random available port /// If set to 0, the system will assign a random available port
/// Set this at runtime with environment variable DYN_RUNTIME_HTTP_SERVER_PORT
#[builder(default = "DEFAULT_HTTP_SERVER_PORT")] #[builder(default = "DEFAULT_HTTP_SERVER_PORT")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub http_server_port: u16, pub http_server_port: u16,
/// Health and metrics HTTP server enabled /// Health and metrics HTTP server enabled
/// Set this at runtime with environment variable DYN_RUNTIME_HTTP_ENABLED
#[builder(default = "false")] #[builder(default = "false")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub http_enabled: bool, pub http_enabled: bool,
} }
/// Renders the configuration as one line of comma-separated `key=value` pairs.
impl fmt::Display for RuntimeConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // An unset worker-thread count means "number of cores", so say so explicitly.
        if let Some(count) = self.num_worker_threads {
            write!(f, "num_worker_threads={count}, ")?;
        } else {
            write!(f, "num_worker_threads=default (num_cores), ")?;
        }

        let blocking = self.max_blocking_threads;
        write!(f, "max_blocking_threads={}, ", blocking)?;

        let host = &self.http_server_host;
        write!(f, "http_server_host={}, ", host)?;

        let port = self.http_server_port;
        write!(f, "http_server_port={}, ", port)?;

        // Last field: no trailing separator; its result is the result of the whole call.
        write!(f, "http_enabled={}", self.http_enabled)
    }
}
impl RuntimeConfig { impl RuntimeConfig {
pub fn builder() -> RuntimeConfigBuilder { pub fn builder() -> RuntimeConfigBuilder {
RuntimeConfigBuilder::default() RuntimeConfigBuilder::default()
...@@ -138,7 +150,7 @@ impl RuntimeConfig { ...@@ -138,7 +150,7 @@ impl RuntimeConfig {
pub fn single_threaded() -> Self { pub fn single_threaded() -> Self {
RuntimeConfig { RuntimeConfig {
num_worker_threads: 1, num_worker_threads: Some(1),
max_blocking_threads: 1, max_blocking_threads: 1,
http_server_host: DEFAULT_HTTP_SERVER_HOST.to_string(), http_server_host: DEFAULT_HTTP_SERVER_HOST.to_string(),
http_server_port: DEFAULT_HTTP_SERVER_PORT, http_server_port: DEFAULT_HTTP_SERVER_PORT,
...@@ -147,20 +159,24 @@ impl RuntimeConfig { ...@@ -147,20 +159,24 @@ impl RuntimeConfig {
} }
/// Create a new default runtime configuration /// Create a new default runtime configuration
pub(crate) fn create_runtime(&self) -> Result<tokio::runtime::Runtime> { pub(crate) fn create_runtime(&self) -> std::io::Result<tokio::runtime::Runtime> {
Ok(tokio::runtime::Builder::new_multi_thread() tokio::runtime::Builder::new_multi_thread()
.worker_threads(self.num_worker_threads) .worker_threads(
self.num_worker_threads
.unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()),
)
.max_blocking_threads(self.max_blocking_threads) .max_blocking_threads(self.max_blocking_threads)
.enable_all() .enable_all()
.build()?) .build()
} }
} }
impl Default for RuntimeConfig { impl Default for RuntimeConfig {
fn default() -> Self { fn default() -> Self {
let num_cores = std::thread::available_parallelism().unwrap().get();
Self { Self {
num_worker_threads: 16, num_worker_threads: Some(num_cores),
max_blocking_threads: 16, max_blocking_threads: num_cores,
http_server_host: DEFAULT_HTTP_SERVER_HOST.to_string(), http_server_host: DEFAULT_HTTP_SERVER_HOST.to_string(),
http_server_port: DEFAULT_HTTP_SERVER_PORT, http_server_port: DEFAULT_HTTP_SERVER_PORT,
http_enabled: false, http_enabled: false,
...@@ -240,7 +256,7 @@ mod tests { ...@@ -240,7 +256,7 @@ mod tests {
], ],
|| { || {
let config = RuntimeConfig::from_settings()?; let config = RuntimeConfig::from_settings()?;
assert_eq!(config.num_worker_threads, 24); assert_eq!(config.num_worker_threads, Some(24));
assert_eq!(config.max_blocking_threads, 32); assert_eq!(config.max_blocking_threads, 32);
Ok(()) Ok(())
}, },
......
...@@ -47,6 +47,7 @@ impl Runtime { ...@@ -47,6 +47,7 @@ impl Runtime {
let secondary = match secondary { let secondary = match secondary {
Some(secondary) => secondary, Some(secondary) => secondary,
None => { None => {
tracing::debug!("Created secondary runtime with single thread");
RuntimeType::Shared(Arc::new(RuntimeConfig::single_threaded().create_runtime()?)) RuntimeType::Shared(Arc::new(RuntimeConfig::single_threaded().create_runtime()?))
} }
}; };
...@@ -60,26 +61,26 @@ impl Runtime { ...@@ -60,26 +61,26 @@ impl Runtime {
} }
/// Build a [`Runtime`] from the handle returned by `tokio::runtime::Handle::current()`.
///
/// Delegates to [`Runtime::from_handle`].
pub fn from_current() -> Result<Runtime> {
    let current = tokio::runtime::Handle::current();
    Runtime::from_handle(current)
}
pub fn from_handle(handle: tokio::runtime::Handle) -> Result<Runtime> { pub fn from_handle(handle: tokio::runtime::Handle) -> Result<Runtime> {
let runtime = RuntimeType::External(handle); let primary = RuntimeType::External(handle.clone());
Runtime::new(runtime, None) let secondary = RuntimeType::External(handle);
Runtime::new(primary, Some(secondary))
} }
/// Create a [`Runtime`] instance from the settings /// Create a [`Runtime`] instance from the settings
/// See [`config::RuntimeConfig::from_settings`] /// See [`config::RuntimeConfig::from_settings`]
pub fn from_settings() -> Result<Runtime> { pub fn from_settings() -> Result<Runtime> {
let config = config::RuntimeConfig::from_settings()?; let config = config::RuntimeConfig::from_settings()?;
let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?)); let runtime = Arc::new(config.create_runtime()?);
Runtime::new(owned, None) let primary = RuntimeType::Shared(runtime.clone());
let secondary = RuntimeType::External(runtime.handle().clone());
Runtime::new(primary, Some(secondary))
} }
/// Create a [`Runtime`] with a single-threaded primary async tokio runtime /// Create a [`Runtime`] with two single-threaded async tokio runtimes
pub fn single_threaded() -> Result<Runtime> { pub fn single_threaded() -> Result<Runtime> {
let config = config::RuntimeConfig::single_threaded(); let config = config::RuntimeConfig::single_threaded();
let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?)); let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment