Unverified commit 7ebbd001, authored by Graham King and committed by GitHub
Browse files

chore: Remove etcd from python bindings (#3238)


Signed-off-by: Graham King <grahamk@nvidia.com>
parent 53f3d2af
...@@ -5,8 +5,7 @@ use futures::StreamExt; ...@@ -5,8 +5,7 @@ use futures::StreamExt;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use pyo3::IntoPyObjectExt; use pyo3::IntoPyObjectExt;
use pyo3::exceptions::PyStopAsyncIteration; use pyo3::exceptions::PyStopAsyncIteration;
use pyo3::types::PyBytes; use pyo3::types::{PyDict, PyString};
use pyo3::types::{PyDict, PyList, PyString};
use pyo3::{exceptions::PyException, prelude::*}; use pyo3::{exceptions::PyException, prelude::*};
use rand::seq::IteratorRandom as _; use rand::seq::IteratorRandom as _;
use rs::pipeline::network::Ingress; use rs::pipeline::network::Ingress;
...@@ -141,7 +140,6 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> { ...@@ -141,7 +140,6 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<Component>()?; m.add_class::<Component>()?;
m.add_class::<Endpoint>()?; m.add_class::<Endpoint>()?;
m.add_class::<Client>()?; m.add_class::<Client>()?;
m.add_class::<EtcdClient>()?;
m.add_class::<AsyncResponseStream>()?; m.add_class::<AsyncResponseStream>()?;
m.add_class::<llm::disagg_router::DisaggregatedRouter>()?; m.add_class::<llm::disagg_router::DisaggregatedRouter>()?;
m.add_class::<llm::entrypoint::EntrypointArgs>()?; m.add_class::<llm::entrypoint::EntrypointArgs>()?;
...@@ -170,7 +168,6 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> { ...@@ -170,7 +168,6 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<http::HttpError>()?; m.add_class::<http::HttpError>()?;
m.add_class::<http::HttpAsyncEngine>()?; m.add_class::<http::HttpAsyncEngine>()?;
m.add_class::<context::Context>()?; m.add_class::<context::Context>()?;
m.add_class::<EtcdKvCache>()?;
m.add_class::<ModelType>()?; m.add_class::<ModelType>()?;
m.add_class::<ModelInput>()?; m.add_class::<ModelInput>()?;
m.add_class::<llm::kv::ForwardPassMetrics>()?; m.add_class::<llm::kv::ForwardPassMetrics>()?;
...@@ -283,12 +280,6 @@ fn register_llm<'p>( ...@@ -283,12 +280,6 @@ fn register_llm<'p>(
}) })
} }
// Python-exposed wrapper around the Rust `rs::transports::etcd::KvCache`.
// The cache is held behind an `Arc`, so the derived `Clone` shares the same
// underlying cache instead of duplicating it.
#[pyclass]
#[derive(Clone)]
struct EtcdKvCache {
// Shared handle to the wrapped cache.
inner: Arc<rs::transports::etcd::KvCache>,
}
#[pyclass] #[pyclass]
#[derive(Clone)] #[derive(Clone)]
pub struct DistributedRuntime { pub struct DistributedRuntime {
...@@ -303,12 +294,6 @@ impl DistributedRuntime { ...@@ -303,12 +294,6 @@ impl DistributedRuntime {
} }
} }
// Python-exposed wrapper around the Rust `rs::transports::etcd::Client`.
#[pyclass]
#[derive(Clone)]
struct EtcdClient {
// The wrapped client; the wrapper's methods clone it before doing async work.
inner: rs::transports::etcd::Client,
}
#[pyclass] #[pyclass]
#[derive(Clone)] #[derive(Clone)]
struct CancellationToken { struct CancellationToken {
...@@ -580,13 +565,6 @@ impl DistributedRuntime { ...@@ -580,13 +565,6 @@ impl DistributedRuntime {
}) })
} }
/// Return the runtime's etcd client wrapped for Python, or `None` when the
/// runtime does not have one.
fn do_not_use_etcd_client(&self) -> PyResult<Option<EtcdClient>> {
    let client = self.inner.etcd_client().clone();
    Ok(client.map(|inner| EtcdClient { inner }))
}
fn shutdown(&self) { fn shutdown(&self) {
self.inner.runtime().shutdown(); self.inner.runtime().shutdown();
} }
...@@ -623,127 +601,6 @@ fn local_ip() -> Result<IpAddr, local_ip_address::Error> { ...@@ -623,127 +601,6 @@ fn local_ip() -> Result<IpAddr, local_ip_address::Error> {
}) })
} }
#[pymethods]
impl EtcdKvCache {
    /// Direct construction is not supported and always raises `RuntimeError`.
    ///
    /// Building the underlying cache is asynchronous, which a synchronous
    /// constructor cannot await. Use the `create` static method instead.
    #[new]
    fn py_new(
        _etcd_client: &EtcdClient,
        _prefix: String,
        _initial_values: &Bound<'_, PyDict>,
    ) -> PyResult<Self> {
        // Point callers at `create`, the async factory this class actually exports.
        Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
            "EtcdKvCache must be created using the 'create' static method",
        ))
    }

    /// Asynchronously build a cache for `prefix`, seeded with `initial_values`.
    ///
    /// `initial_values` must map `str` keys to `bytes` or `str` values; string
    /// values are stored as their UTF-8 bytes. Raises `TypeError` for any other
    /// value type. Returns an awaitable resolving to the new `EtcdKvCache`.
    #[staticmethod]
    fn create<'p>(
        py: Python<'p>,
        etcd_client: &EtcdClient,
        prefix: String,
        initial_values: &Bound<'p, PyDict>,
    ) -> PyResult<Bound<'p, PyAny>> {
        let client = etcd_client.inner.clone();

        // Convert the Python dict to a Rust map up front, while we still hold
        // the GIL; the future below must not touch Python objects.
        let mut rust_initial_values =
            std::collections::HashMap::with_capacity(initial_values.len());
        for (key, value) in initial_values.iter() {
            let key_str = key.extract::<String>()?;

            // Try bytes first, then fall back to str.
            let value_bytes = if let Ok(bytes) = value.extract::<Vec<u8>>() {
                bytes
            } else if let Ok(string) = value.extract::<String>() {
                string.into_bytes()
            } else {
                return Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
                    "Values must be either strings or bytes",
                ));
            };

            rust_initial_values.insert(key_str, value_bytes);
        }

        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            let kv_cache = rs::transports::etcd::KvCache::new(client, prefix, rust_initial_values)
                .await
                .map_err(to_pyerr)?;

            Ok(EtcdKvCache {
                inner: Arc::new(kv_cache),
            })
        })
    }

    /// Look up `key`; the awaitable resolves to `bytes`, or `None` if absent.
    fn get<'p>(&self, py: Python<'p>, key: String) -> PyResult<Bound<'p, PyAny>> {
        let inner = self.inner.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            let value = inner.get(&key).await;
            // Re-acquire the GIL only to build the Python return value.
            Ok(Python::with_gil(|py| match value {
                Some(bytes) => PyBytes::new(py, &bytes).into_any().unbind(),
                None => py.None(),
            }))
        })
    }

    /// Return every cached pair as a `dict` of `str -> bytes`.
    ///
    /// Keys have the cache prefix removed when they start with it; any other
    /// key is returned unchanged.
    fn get_all<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
        let inner = self.inner.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            let all_values = inner.get_all().await;

            Python::with_gil(|py| {
                let dict = PyDict::new(py);
                for (key, value) in all_values {
                    // Borrow the stripped slice instead of allocating a new String.
                    let stripped_key = key.strip_prefix(&inner.prefix).unwrap_or(key.as_str());
                    dict.set_item(stripped_key, PyBytes::new(py, &value))?;
                }
                Ok(dict.into_any().unbind())
            })
        })
    }

    /// Store `value` under `key`, optionally attached to `lease_id`.
    ///
    /// The awaitable resolves to `None`; failures of the underlying `put` are
    /// converted with `to_pyerr`.
    #[pyo3(signature = (key, value, lease_id=None))]
    fn put<'p>(
        &self,
        py: Python<'p>,
        key: String,
        value: Vec<u8>,
        lease_id: Option<i64>,
    ) -> PyResult<Bound<'p, PyAny>> {
        let inner = self.inner.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            inner.put(&key, value, lease_id).await.map_err(to_pyerr)?;
            Ok(())
        })
    }

    /// Delete `key`. The awaitable resolves to `None`; failures of the
    /// underlying `delete` are converted with `to_pyerr`.
    fn delete<'p>(&self, py: Python<'p>, key: String) -> PyResult<Bound<'p, PyAny>> {
        let inner = self.inner.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            inner.delete(&key).await.map_err(to_pyerr)?;
            Ok(())
        })
    }
}
#[pymethods] #[pymethods]
impl CancellationToken { impl CancellationToken {
fn cancel(&self) { fn cancel(&self) {
...@@ -872,103 +729,6 @@ impl Namespace { ...@@ -872,103 +729,6 @@ impl Namespace {
} }
} }
#[pymethods]
impl EtcdClient {
    // Awaitable wrapper over the client's `kv_create`; resolves to `None`.
    #[pyo3(signature = (key, value, lease_id=None))]
    fn kv_create<'p>(
        &self,
        py: Python<'p>,
        key: String,
        value: Vec<u8>,
        lease_id: Option<i64>,
    ) -> PyResult<Bound<'p, PyAny>> {
        let etcd = self.inner.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            let outcome = etcd.kv_create(&key, value, lease_id).await;
            outcome.map(|_| ()).map_err(to_pyerr)
        })
    }

    // Awaitable wrapper over the client's `kv_create_or_validate`; resolves to `None`.
    #[pyo3(signature = (key, value, lease_id=None))]
    fn kv_create_or_validate<'p>(
        &self,
        py: Python<'p>,
        key: String,
        value: Vec<u8>,
        lease_id: Option<i64>,
    ) -> PyResult<Bound<'p, PyAny>> {
        let etcd = self.inner.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            let outcome = etcd.kv_create_or_validate(key, value, lease_id).await;
            outcome.map(|_| ()).map_err(to_pyerr)
        })
    }

    // Synchronous accessor: returns the wrapped client's `lease_id()`.
    fn primary_lease_id(&self) -> i64 {
        let etcd = &self.inner;
        etcd.lease_id()
    }

    // Awaitable wrapper over the client's `kv_put`; resolves to `None`.
    #[pyo3(signature = (key, value, lease_id=None))]
    fn kv_put<'p>(
        &self,
        py: Python<'p>,
        key: String,
        value: Vec<u8>,
        lease_id: Option<i64>,
    ) -> PyResult<Bound<'p, PyAny>> {
        let etcd = self.inner.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            let outcome = etcd.kv_put(key, value, lease_id).await;
            outcome.map(|_| ()).map_err(to_pyerr)
        })
    }

    // Awaitable that resolves to a list with one dict per entry returned by the
    // client's `kv_get_prefix`. Lookup failures surface as `RuntimeError`.
    fn kv_get_prefix<'p>(&self, py: Python<'p>, prefix: String) -> PyResult<Bound<'p, PyAny>> {
        let etcd = self.inner.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            let entries = match etcd.kv_get_prefix(prefix).await {
                Ok(entries) => entries,
                Err(e) => return Err(pyo3::exceptions::PyRuntimeError::new_err(e.to_string())),
            };

            // Build the Python list under the GIL: one dict per key-value entry.
            Python::with_gil(|py| {
                let rows = PyList::empty(py);
                for kv in entries {
                    let row = PyDict::new(py);
                    row.set_item("key", String::from_utf8_lossy(kv.key()).to_string())?;
                    row.set_item("value", PyBytes::new(py, kv.value()))?;
                    row.set_item("create_revision", kv.create_revision())?;
                    row.set_item("mod_revision", kv.mod_revision())?;
                    row.set_item("version", kv.version())?;
                    row.set_item("lease", kv.lease())?;
                    rows.append(row)?;
                }
                Ok::<Py<PyList>, PyErr>(rows.into())
            })
        })
    }

    // Awaitable wrapper over the client's `revoke_lease`; resolves to `None`.
    fn revoke_lease<'p>(&self, py: Python<'p>, lease_id: i64) -> PyResult<Bound<'p, PyAny>> {
        let etcd = self.inner.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            let outcome = etcd.revoke_lease(lease_id).await;
            outcome.map(|_| ()).map_err(to_pyerr)
        })
    }
}
#[pymethods] #[pymethods]
impl Client { impl Client {
/// Get list of current instances. /// Get list of current instances.
......
...@@ -10,7 +10,6 @@ from typing import ( ...@@ -10,7 +10,6 @@ from typing import (
List, List,
Optional, Optional,
Tuple, Tuple,
Union,
) )
def log_message(level: str, message: str, module: str, file: str, line: int) -> None: def log_message(level: str, message: str, module: str, file: str, line: int) -> None:
...@@ -41,13 +40,6 @@ class DistributedRuntime: ...@@ -41,13 +40,6 @@ class DistributedRuntime:
""" """
... ...
def do_not_use_etcd_client(self) -> Optional[EtcdClient]:
    """
    Return the runtime's `EtcdClient`, or `None` for static workers.

    Slated for removal soon; do not use it in new code.
    """
    ...
def allocate_port_block(self, namespace, port_min, port_max, block_size, context=None) -> List[int]: def allocate_port_block(self, namespace, port_min, port_max, block_size, context=None) -> List[int]:
""" """
Allocate a contiguous block of ports from the specified range and atomically reserve them. Allocate a contiguous block of ports from the specified range and atomically reserve them.
...@@ -61,126 +53,6 @@ class DistributedRuntime: ...@@ -61,126 +53,6 @@ class DistributedRuntime:
""" """
... ...
class EtcdClient:
    """
    Client for etcd, which the DistributedRuntime uses for discovery.
    """

    def primary_lease_id(self) -> int:
        """
        Return the id of the primary lease.
        """
        ...

    async def kv_create(
        self, key: str, value: bytes, lease_id: Optional[int] = None
    ) -> None:
        """
        Create `key` atomically; fails if the key already exists.
        """
        ...

    async def kv_create_or_validate(
        self, key: str, value: bytes, lease_id: Optional[int] = None
    ) -> None:
        """
        Create `key` atomically if it is absent; if it exists, validate that
        the stored value is identical to `value`.
        """
        ...

    async def kv_put(
        self, key: str, value: bytes, lease_id: Optional[int] = None
    ) -> None:
        """
        Write a key-value pair to etcd.
        """
        ...

    async def kv_get_prefix(self, prefix: str) -> List[Dict[str, JsonLike]]:
        """
        Return every entry whose key starts with `prefix`.
        """
        ...

    async def revoke_lease(self, lease_id: int) -> None:
        """
        Revoke the lease identified by `lease_id`.
        """
        ...
class EtcdKvCache:
    """
    A cache for key-value pairs stored in etcd.

    Instances cannot be built with `EtcdKvCache(...)`; await
    `EtcdKvCache.create(...)` instead.
    """

    @staticmethod
    async def create(
        etcd_client: EtcdClient,
        prefix: str,
        initial_values: Dict[str, Union[str, bytes]]
    ) -> "EtcdKvCache":
        """
        Create a new EtcdKvCache instance.

        Args:
            etcd_client: The etcd client to use for operations
            prefix: The prefix to use for all keys in this cache.
                EtcdKvCache will continuously watch the changes of the keys under this prefix.
            initial_values: Initial key-value pairs to populate the cache with
                NOTE: if the key already exists, it won't be updated

        Returns:
            A new EtcdKvCache instance
        """
        ...

    async def get(self, key: str) -> Optional[bytes]:
        """
        Get a value from the cache.

        Args:
            key: The key to retrieve

        Returns:
            The value as bytes if found, None otherwise

        NOTE: this get is cheap because internally there is a cache that holds the latest kv pairs.
        To prevent race conditions, there is a lock when reading/writing the internal cache.
        """
        ...

    async def get_all(self) -> Dict[str, bytes]:
        """
        Get all key-value pairs from the cache.

        Returns:
            A dictionary of all key-value pairs, with keys stripped of the prefix
            (i.e., in the same format as in `initial_values`.keys())
        """
        ...

    async def put(
        self,
        key: str,
        value: bytes,
        lease_id: Optional[int] = None
    ) -> None:
        """
        Put a key-value pair into the cache and etcd.

        Args:
            key: The key to store
            value: The value to store
            lease_id: Optional lease ID to associate with this key-value pair
        """
        ...

    async def delete(self, key: str) -> None:
        """
        Delete a key-value pair from the cache and etcd.

        Args:
            key: The key to delete
        """
        ...
class Namespace: class Namespace:
""" """
A namespace is a collection of components A namespace is a collection of components
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio import asyncio
from functools import wraps from functools import wraps
...@@ -28,7 +15,6 @@ from dynamo._core import Component as Component ...@@ -28,7 +15,6 @@ from dynamo._core import Component as Component
from dynamo._core import Context as Context from dynamo._core import Context as Context
from dynamo._core import DistributedRuntime as DistributedRuntime from dynamo._core import DistributedRuntime as DistributedRuntime
from dynamo._core import Endpoint as Endpoint from dynamo._core import Endpoint as Endpoint
from dynamo._core import EtcdKvCache as EtcdKvCache
from dynamo._core import ModelDeploymentCard as ModelDeploymentCard from dynamo._core import ModelDeploymentCard as ModelDeploymentCard
from dynamo._core import OAIChatPreprocessor as OAIChatPreprocessor from dynamo._core import OAIChatPreprocessor as OAIChatPreprocessor
......
...@@ -222,51 +222,6 @@ def get_runtime(): ...@@ -222,51 +222,6 @@ def get_runtime():
return _runtime_instance return _runtime_instance
async def check_registration_in_etcd(
    expected_count: int, endpoint: Optional[str] = None
):
    """Check that the expected number of KV routers are registered in etcd.

    Args:
        expected_count: The number of KV routers expected to be registered
        endpoint: The endpoint string to extract component path from (e.g., "dyn://namespace.component.generate")

    Returns:
        List of registered KV router entries from etcd

    Raises:
        AssertionError: If the runtime has no etcd client, or if the number of
            registered KV routers differs from `expected_count`.
    """
    runtime = get_runtime()
    etcd = runtime.do_not_use_etcd_client()
    # The accessor returns None when no etcd client is available; fail with a
    # clear message instead of an AttributeError on the lookup below.
    assert etcd is not None, "etcd client is not available on this runtime"

    # Default to every KV router; narrow to one component when an endpoint is given.
    prefix = "kv_routers/"
    if endpoint:
        # Parse endpoint format: dyn://namespace.component.endpoint_suffix
        # Extract namespace and component, ignoring the endpoint suffix (e.g., "generate")
        endpoint_parts = endpoint.replace("dyn://", "").split(".")
        if len(endpoint_parts) >= 2:
            namespace, component = endpoint_parts[:2]
            component_path = f"{namespace}/{component}"
            prefix = f"kv_routers/{component_path}/"
            logger.info(
                f"Checking for KV routers with component path: {component_path}"
            )

    # Check for kv_routers in etcd
    # The KV router registers itself with key format: kv_routers/{component_path}/{uuid}
    kv_routers = await etcd.kv_get_prefix(prefix)
    logger.info(
        f"Found {len(kv_routers)} KV router(s) registered in etcd under prefix: {prefix}"
    )

    # Assert we have the expected number of KV routers registered
    assert (
        len(kv_routers) == expected_count
    ), f"Expected {expected_count} KV router(s) in etcd, found {len(kv_routers)}"

    return kv_routers
async def send_inflight_requests(urls: list, payload: dict, num_requests: int): async def send_inflight_requests(urls: list, payload: dict, num_requests: int):
"""Send multiple requests concurrently, alternating between URLs if multiple provided""" """Send multiple requests concurrently, alternating between URLs if multiple provided"""
...@@ -365,12 +320,6 @@ def test_mocker_kv_router(request, runtime_services, predownload_tokenizers): ...@@ -365,12 +320,6 @@ def test_mocker_kv_router(request, runtime_services, predownload_tokenizers):
logger.info(f"Successfully completed {NUM_REQUESTS} requests") logger.info(f"Successfully completed {NUM_REQUESTS} requests")
# Check etcd registration - expect 1 KV router
# Use the mockers' endpoint since all mockers share the same component path
asyncio.run(
check_registration_in_etcd(expected_count=1, endpoint=mockers.endpoint)
)
finally: finally:
# Clean up # Clean up
if "kv_router" in locals(): if "kv_router" in locals():
...@@ -432,12 +381,6 @@ def test_mocker_two_kv_router(request, runtime_services, predownload_tokenizers) ...@@ -432,12 +381,6 @@ def test_mocker_two_kv_router(request, runtime_services, predownload_tokenizers)
f"Successfully completed {NUM_REQUESTS} requests across {len(router_ports)} routers" f"Successfully completed {NUM_REQUESTS} requests across {len(router_ports)} routers"
) )
# Check etcd registration - expect 2 KV routers
# Use the mockers' endpoint since all mockers share the same component path
asyncio.run(
check_registration_in_etcd(expected_count=2, endpoint=mockers.endpoint)
)
finally: finally:
# Clean up routers # Clean up routers
for kv_router in kv_routers: for kv_router in kv_routers:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment