Unverified Commit dcfa87be authored by jthomson04's avatar jthomson04 Committed by GitHub
Browse files

fix: Fix ETCD and NATS starvation under massive request concurrency (#2384)


Signed-off-by: default avatarjthomson04 <jwillthomson19@gmail.com>
parent 3411bda8
...@@ -44,7 +44,6 @@ impl MetricsRegistry for DistributedRuntime { ...@@ -44,7 +44,6 @@ impl MetricsRegistry for DistributedRuntime {
impl DistributedRuntime { impl DistributedRuntime {
pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> { pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
let secondary = runtime.secondary();
let (etcd_config, nats_config, is_static) = config.dissolve(); let (etcd_config, nats_config, is_static) = config.dissolve();
let runtime_clone = runtime.clone(); let runtime_clone = runtime.clone();
...@@ -52,30 +51,10 @@ impl DistributedRuntime { ...@@ -52,30 +51,10 @@ impl DistributedRuntime {
let etcd_client = if is_static { let etcd_client = if is_static {
None None
} else { } else {
Some( Some(etcd::Client::new(etcd_config.clone(), runtime_clone).await?)
secondary
.spawn(async move {
let client = etcd::Client::new(etcd_config.clone(), runtime_clone)
.await
.context(format!(
"Failed to connect to etcd server with config {:?}",
etcd_config
))?;
OK(client)
})
.await??,
)
}; };
let nats_client = secondary let nats_client = nats_config.clone().connect().await?;
.spawn(async move {
let client = nats_config.clone().connect().await.context(format!(
"Failed to connect to NATS server with config {:?}",
nats_config
))?;
anyhow::Ok(client)
})
.await??;
// Start system status server for health and metrics if enabled in configuration // Start system status server for health and metrics if enabled in configuration
let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default(); let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
......
...@@ -21,4 +21,5 @@ ...@@ -21,4 +21,5 @@
pub mod etcd; pub mod etcd;
pub mod nats; pub mod nats;
pub mod tcp; pub mod tcp;
mod utils;
pub mod zmq; pub mod zmq;
...@@ -37,6 +37,8 @@ mod path; ...@@ -37,6 +37,8 @@ mod path;
use lease::*; use lease::*;
pub use path::*; pub use path::*;
use super::utils::build_in_runtime;
//pub use etcd::ConnectOptions as EtcdConnectOptions; //pub use etcd::ConnectOptions as EtcdConnectOptions;
/// ETCD Client /// ETCD Client
...@@ -45,6 +47,7 @@ pub struct Client { ...@@ -45,6 +47,7 @@ pub struct Client {
client: etcd_client::Client, client: etcd_client::Client,
primary_lease: i64, primary_lease: i64,
runtime: Runtime, runtime: Runtime,
rt: Arc<tokio::runtime::Runtime>,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
...@@ -101,33 +104,36 @@ impl Client { ...@@ -101,33 +104,36 @@ impl Client {
/// If the lease expires, the [`Runtime`] will be shutdown. /// If the lease expires, the [`Runtime`] will be shutdown.
/// If the [`Runtime`] is shutdown, the lease will be revoked. /// If the [`Runtime`] is shutdown, the lease will be revoked.
pub async fn new(config: ClientOptions, runtime: Runtime) -> Result<Self> { pub async fn new(config: ClientOptions, runtime: Runtime) -> Result<Self> {
runtime
.secondary()
.spawn(Self::create(config, runtime.clone()))
.await?
}
/// Create a new etcd client and tie the primary [`CancellationToken`] to the primary etcd lease.
async fn create(config: ClientOptions, runtime: Runtime) -> Result<Self> {
let token = runtime.primary_token(); let token = runtime.primary_token();
let client =
etcd_client::Client::connect(config.etcd_url, config.etcd_connect_options).await?;
let lease_id = if config.attach_lease { let ((client, lease_id), rt) = build_in_runtime(
let lease_client = client.lease_client(); async move {
let client =
etcd_client::Client::connect(config.etcd_url, config.etcd_connect_options)
.await?;
let lease = create_lease(lease_client, 10, token) let lease_id = if config.attach_lease {
.await let lease_client = client.lease_client();
.context("creating primary lease")?;
lease.id let lease = create_lease(lease_client, 10, token)
} else { .await
0 .context("creating primary lease")?;
};
lease.id
} else {
0
};
Ok((client, lease_id))
},
1,
)
.await?;
Ok(Client { Ok(Client {
client, client,
primary_lease: lease_id, primary_lease: lease_id,
rt,
runtime, runtime,
}) })
} }
...@@ -155,8 +161,7 @@ impl Client { ...@@ -155,8 +161,7 @@ impl Client {
pub async fn create_lease(&self, ttl: i64) -> Result<Lease> { pub async fn create_lease(&self, ttl: i64) -> Result<Lease> {
let token = self.runtime.child_token(); let token = self.runtime.child_token();
let lease_client = self.client.lease_client(); let lease_client = self.client.lease_client();
self.runtime self.rt
.secondary()
.spawn(create_lease(lease_client, ttl, token)) .spawn(create_lease(lease_client, ttl, token))
.await? .await?
} }
...@@ -164,10 +169,7 @@ impl Client { ...@@ -164,10 +169,7 @@ impl Client {
// Revoke an etcd lease given its lease id. A wrapper over etcd_client::LeaseClient::revoke // Revoke an etcd lease given its lease id. A wrapper over etcd_client::LeaseClient::revoke
pub async fn revoke_lease(&self, lease_id: i64) -> Result<()> { pub async fn revoke_lease(&self, lease_id: i64) -> Result<()> {
let lease_client = self.client.lease_client(); let lease_client = self.client.lease_client();
self.runtime self.rt.spawn(revoke_lease(lease_client, lease_id)).await?
.secondary()
.spawn(revoke_lease(lease_client, lease_id))
.await?
} }
pub async fn kv_create(&self, key: &str, value: Vec<u8>, lease_id: Option<i64>) -> Result<()> { pub async fn kv_create(&self, key: &str, value: Vec<u8>, lease_id: Option<i64>) -> Result<()> {
...@@ -340,7 +342,7 @@ impl Client { ...@@ -340,7 +342,7 @@ impl Client {
let (tx, rx) = mpsc::channel(32); let (tx, rx) = mpsc::channel(32);
self.runtime.secondary().spawn(async move { self.rt.spawn(async move {
for kv in kvs { for kv in kvs {
if tx.send(WatchEvent::Put(kv)).await.is_err() { if tx.send(WatchEvent::Put(kv)).await.is_err() {
// receiver is already closed // receiver is already closed
......
...@@ -35,6 +35,7 @@ use bytes::Bytes; ...@@ -35,6 +35,7 @@ use bytes::Bytes;
use derive_builder::Builder; use derive_builder::Builder;
use futures::{StreamExt, TryStreamExt}; use futures::{StreamExt, TryStreamExt};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs::File as TokioFile; use tokio::fs::File as TokioFile;
use tokio::io::AsyncRead; use tokio::io::AsyncRead;
use tokio::time; use tokio::time;
...@@ -44,6 +45,8 @@ use validator::{Validate, ValidationError}; ...@@ -44,6 +45,8 @@ use validator::{Validate, ValidationError};
pub use crate::slug::Slug; pub use crate::slug::Slug;
use tracing as log; use tracing as log;
use super::utils::build_in_runtime;
pub const URL_PREFIX: &str = "nats://"; pub const URL_PREFIX: &str = "nats://";
#[derive(Clone)] #[derive(Clone)]
...@@ -236,7 +239,9 @@ fn validate_nats_server(server: &str) -> Result<(), ValidationError> { ...@@ -236,7 +239,9 @@ fn validate_nats_server(server: &str) -> Result<(), ValidationError> {
} }
} }
#[allow(dead_code)] // TODO(jthomson04): We really shouldn't be hardcoding this.
const NATS_WORKER_THREADS: usize = 4;
impl ClientOptions { impl ClientOptions {
/// Create a new [`ClientOptionsBuilder`] /// Create a new [`ClientOptionsBuilder`]
pub fn builder() -> ClientOptionsBuilder { pub fn builder() -> ClientOptionsBuilder {
...@@ -258,7 +263,17 @@ impl ClientOptions { ...@@ -258,7 +263,17 @@ impl ClientOptions {
} }
}; };
let client = client.connect(self.server).await?; let (client, _) = build_in_runtime(
async move {
client
.connect(self.server)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to NATS: {e}"))
},
NATS_WORKER_THREADS,
)
.await?;
let js_ctx = jetstream::new(client.clone()); let js_ctx = jetstream::new(client.clone());
Ok(Client { client, js_ctx }) Ok(Client { client, js_ctx })
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::{future::Future, sync::Arc};
use anyhow::Result;
pub async fn build_in_runtime<
T: Send + Sync + 'static,
F: Future<Output = Result<T>> + Send + 'static,
>(
f: F,
num_threads: usize,
) -> Result<(T, Arc<tokio::runtime::Runtime>)> {
let (tx, rx) = tokio::sync::oneshot::channel();
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(num_threads)
.enable_all()
.build()?,
);
let runtime_clone = runtime.clone();
std::thread::spawn(move || {
runtime_clone.block_on(async move {
let result = f.await;
tx.send(result)
.unwrap_or_else(|_| panic!("This should never happen!"));
std::future::pending::<()>().await;
})
});
let result = rx.await??;
Ok((result, runtime))
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment