Unverified commit 8a14f9ed, authored by Graham King, committed by GitHub
Browse files

chore: Remove DistributedRuntime::etcd_client (#4489)


Signed-off-by: Graham King <grahamk@nvidia.com>
parent d1ce4236
......@@ -14,8 +14,8 @@ use parking_lot::Mutex;
use pyo3::{exceptions::PyException, prelude::*};
use super::to_pyerr;
use dynamo_runtime::CancellationToken;
use dynamo_runtime::transports::etcd::{Client, KvCache};
use dynamo_runtime::transports::etcd::{self, Client, KvCache};
use tokio_util::sync::CancellationToken;
// All three AIs I asked agreed, this is the way.
// Used below as the initial value of the coordinator's `AtomicUsize` fields
// (presumably meaning "no value recorded yet" -- confirm against the readers of those fields).
const NONE_SENTINEL: usize = usize::MAX;
......@@ -45,32 +45,39 @@ pub struct VirtualConnectorCoordinator(Arc<InnerConnector>);
impl VirtualConnectorCoordinator {
// Python-visible constructor: builds the coordinator state and wraps it in an `Arc`.
//
// NOTE(review): this span is a rendered diff with no +/- markers, so every changed line
// shows up twice, old form first and replacement second (`runtime:` then `drt:`,
// `-> Self` then `-> PyResult<Self>`, `Self(...)` then `Ok(Self(...))`).
#[new]
pub fn new(
runtime: super::DistributedRuntime,
drt: super::DistributedRuntime,
dynamo_namespace: &str,
check_interval_secs: usize,
max_wait_time_secs: usize,
max_retries: usize,
) -> Self {
) -> PyResult<Self> {
// Both timing arguments are whole seconds; convert them to `Duration`s up front.
let check_interval = Duration::from_secs(check_interval_secs as u64);
let max_wait_time = Duration::from_secs(max_wait_time_secs as u64);
// default reads from environment variables
let etcd_config = etcd::ClientOptions::default();
// etcd client construction is async, but async python constructors are not allowed
// The future is therefore driven to completion with a blocking `block_on` call. A failure
// to create the client is converted with `to_pyerr` and returned to the caller by `?`.
let etcd_client = drt
.inner
.runtime()
.secondary()
.block_on(
async move { etcd::Client::new(etcd_config, drt.inner.runtime().clone()).await },
)
.map_err(to_pyerr)?;
let c = InnerConnector {
check_interval,
max_wait_time,
max_retries,
namespace: dynamo_namespace.to_string(),
// Pre-change form: took the client from the runtime and panicked through `expect`
// when `etcd_client()` returned `None`.
etcd_client: runtime
.inner()
.etcd_client()
.expect("Planner cannot run without etcd / in static mode"),
// Replacement form: uses the client created above.
etcd_client,
// The cache starts empty and every atomic starts at `NONE_SENTINEL`.
kv_cache: Mutex::new(None),
num_prefill_workers: AtomicUsize::new(NONE_SENTINEL),
num_decode_workers: AtomicUsize::new(NONE_SENTINEL),
decision_id: AtomicUsize::new(NONE_SENTINEL),
first_skip_timestamp: AtomicUsize::new(NONE_SENTINEL),
};
Self(Arc::new(c))
Ok(Self(Arc::new(c)))
}
#[pyo3(signature = ())]
......@@ -365,16 +372,24 @@ pub struct VirtualConnectorClient(Arc<InnerClient>);
#[pymethods]
impl VirtualConnectorClient {
// Python-visible constructor: builds the client state (etcd client, root key for the
// namespace, child cancellation token) and wraps it in an `Arc`.
//
// NOTE(review): this span is a rendered diff with no +/- markers, so every changed line
// shows up twice, old form first and replacement second (the two `pub fn new` signatures,
// the two `etcd_client` / `cancellation_token` initialisers, the two return expressions).
#[new]
pub fn new(runtime: super::DistributedRuntime, dynamo_namespace: &str) -> Self {
pub fn new(drt: super::DistributedRuntime, dynamo_namespace: &str) -> PyResult<Self> {
let runtime = drt.inner.runtime();
// Taken before `drt` is moved into the async block below.
let cancellation_token = runtime.child_token();
// default reads from environment variables
let etcd_config = etcd::ClientOptions::default();
// etcd client construction is async, but async python constructors are not allowed
// The future is therefore driven to completion with a blocking `block_on` call. A failure
// to create the client is converted with `to_pyerr` and returned to the caller by `?`.
let etcd_client = runtime
.secondary()
.block_on(
async move { etcd::Client::new(etcd_config, drt.inner.runtime().clone()).await },
)
.map_err(to_pyerr)?;
let c = InnerClient {
// Pre-change form: took the client from the runtime and panicked through `expect`
// when `etcd_client()` returned `None`.
etcd_client: runtime
.inner
.etcd_client()
.expect("Planner cannot run without etcd / in static mode"),
// Replacement form: uses the client created above.
etcd_client,
key: root_key(dynamo_namespace),
// Pre-change form of the token, followed by the replacement that reuses the local.
cancellation_token: runtime.inner().child_token(),
cancellation_token,
};
Self(Arc::new(c))
Ok(Self(Arc::new(c)))
}
/// Get the current values as a PlannerDecision
......
......@@ -43,8 +43,6 @@ pub struct DistributedRuntime {
// local runtime
runtime: Runtime,
// Unified transport manager
etcd_client: Option<transports::etcd::Client>,
nats_client: Option<transports::nats::Client>,
store: KeyValueStoreManager,
tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
......@@ -101,17 +99,16 @@ impl DistributedRuntime {
let runtime_clone = runtime.clone();
let (etcd_client, store) = match selected_kv_store {
let store = match selected_kv_store {
KeyValueStoreSelect::Etcd(etcd_config) => {
let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
// The returned error doesn't show because of a dropped runtime error, so
// log it first.
tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?;
let store = KeyValueStoreManager::etcd(etcd_client.clone());
(Some(etcd_client), store)
KeyValueStoreManager::etcd(etcd_client)
}
KeyValueStoreSelect::File(root) => (None, KeyValueStoreManager::file(root)),
KeyValueStoreSelect::Memory => (None, KeyValueStoreManager::memory()),
KeyValueStoreSelect::File(root) => KeyValueStoreManager::file(root),
KeyValueStoreSelect::Memory => KeyValueStoreManager::memory(),
};
let nats_client = match nats_config {
......@@ -176,7 +173,6 @@ impl DistributedRuntime {
let distributed_runtime = Self {
runtime,
etcd_client,
store,
nats_client,
tcp_server: Arc::new(OnceCell::new()),
......@@ -423,15 +419,7 @@ impl DistributedRuntime {
self.system_status_server.get().cloned()
}
// todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
//
// Try to use `store()` instead of this. Only use this if you have not been able to migrate
// yet, or if you require etcd-specific features like distributed locking (rare).
//
// Returns a clone of the stored optional client, so callers must handle `None`.
pub fn etcd_client(&self) -> Option<etcd::Client> {
self.etcd_client.clone()
}
/// An interface to store things. Will eventually replace `etcd_client`.
/// An interface to store things outside of the process. Usually backed by something like etcd.
/// Currently does key-value, but will grow to include whatever we need to store.
pub fn store(&self) -> &KeyValueStoreManager {
&self.store
......
......@@ -273,8 +273,7 @@ mod concurrent_create_tests {
}
async fn test_concurrent_create(drt: DistributedRuntime) -> Result<(), StoreError> {
let etcd_client = drt.etcd_client().expect("etcd client should be available");
let storage = EtcdStore::new(etcd_client);
let storage = drt.store();
// Create a bucket for testing
let bucket = Arc::new(tokio::sync::Mutex::new(
......
......@@ -764,7 +764,9 @@ mod tests {
let key = "__integration_test_key";
let value = b"test_value";
let client = drt.etcd_client().expect("etcd client should be available");
let client = Client::new(ClientOptions::default(), drt.runtime().clone())
.await
.expect("etcd client should be available");
let lease_id = drt.connection_id();
// Create the key
......@@ -804,8 +806,10 @@ mod tests {
}
async fn test_kv_cache_operations(drt: DistributedRuntime) -> Result<()> {
// Get the client and unwrap it
let client = drt.etcd_client().expect("etcd client should be available");
// Make the client and unwrap it
let client = Client::new(ClientOptions::default(), drt.runtime().clone())
.await
.expect("etcd client should be available");
// Create a unique test prefix to avoid conflicts with other tests
let test_id = uuid::Uuid::new_v4().to_string();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment