Unverified commit b6595e24, authored by Graham King, committed by GitHub
Browse files

chore(bindings): Provide a binding to clear etcd namespace (#3094)


Signed-off-by: Graham King <grahamk@nvidia.com>
parent 6dd33261
......@@ -5,7 +5,7 @@ import argparse
import asyncio
import logging
from dynamo.runtime import DistributedRuntime, EtcdKvCache, dynamo_worker
from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.runtime.logging import configure_dynamo_logging
configure_dynamo_logging()
......@@ -13,13 +13,8 @@ configure_dynamo_logging()
@dynamo_worker()
async def clear_namespace(runtime: DistributedRuntime, namespace: str):
etcd_kv_cache = await EtcdKvCache.create(
runtime.etcd_client(),
f"/{namespace}/",
{},
)
await etcd_kv_cache.clear_all()
logging.info(f"Cleared /{namespace} in EtcdKvCache")
await runtime.temp_clear_namespace(f"/{namespace}/")
logging.info(f"Cleared /{namespace}")
if __name__ == "__main__":
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
import argparse
import asyncio
import logging
from dynamo.runtime import DistributedRuntime, EtcdKvCache, dynamo_worker
from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.runtime.logging import configure_dynamo_logging
configure_dynamo_logging()
......@@ -27,13 +14,8 @@ logger = logging.getLogger(__name__)
@dynamo_worker()
async def clear_namespace(runtime: DistributedRuntime, namespace: str):
etcd_kv_cache = await EtcdKvCache.create(
runtime.etcd_client(),
f"/{namespace}/",
{},
)
await etcd_kv_cache.clear_all()
logger.info(f"Cleared /{namespace} in EtcdKvCache")
await runtime.temp_clear_namespace(f"/{namespace}/")
logger.info(f"Cleared /{namespace}")
if __name__ == "__main__":
......
......@@ -69,7 +69,7 @@ async def graceful_shutdown(runtime):
async def worker(runtime: DistributedRuntime):
config = parse_args()
etcd_client = runtime.etcd_client()
etcd_client = runtime.do_not_use_etcd_client()
await configure_ports_with_etcd(config, etcd_client)
overwrite_args(config)
......
......@@ -42,7 +42,7 @@ class VirtualConnector(PlannerConnector):
def __init__(
self, runtime: DistributedRuntime, dynamo_namespace: str, backend: str
):
etcd_client = runtime.etcd_client()
etcd_client = runtime.do_not_use_etcd_client()
if etcd_client is None:
raise RuntimeError("ETCD client is not initialized")
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
......@@ -38,7 +26,6 @@ async def worker(runtime: DistributedRuntime):
namespace_name = "hello_world"
component_name = "backend"
endpoint_name = "generate"
lease_id = runtime.etcd_client().primary_lease_id()
component = runtime.namespace(namespace_name).component(component_name)
await component.create_service()
......@@ -47,7 +34,7 @@ async def worker(runtime: DistributedRuntime):
endpoint = component.endpoint(endpoint_name)
logger.info(f"Serving endpoint {endpoint_name} on lease {lease_id}")
logger.info(f"Serving endpoint {endpoint_name}")
await endpoint.serve_endpoint(content_generator)
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import asyncio
......@@ -432,7 +420,7 @@ async def worker(runtime: DistributedRuntime):
args, config = VllmBaseWorker.parse_args()
# vLLM config overwrites
etcd_client = runtime.etcd_client()
etcd_client = runtime.do_not_use_etcd_client()
await configure_ports_with_etcd(config, etcd_client)
overwrite_args(config)
await init(runtime, args, config)
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import signal
......@@ -35,10 +23,6 @@ class RequestHandler:
@dynamo_worker(static=False)
async def worker(runtime: DistributedRuntime):
print(
f"Primary lease ID: {runtime.etcd_client().primary_lease_id()}/{runtime.etcd_client().primary_lease_id():#x}"
)
# Set up signal handler for graceful shutdown
loop = asyncio.get_running_loop()
......
......@@ -358,6 +358,15 @@ impl DistributedRuntime {
})
}
/// Remove everything stored in etcd under the namespace `name`.
///
/// Temporary binding: it is slated for removal once the MDC can be cleared
/// automatically. The work is delegated to the inner runtime's
/// `temp_clear_namespace`; any error it reports is converted with `to_pyerr`.
/// The async work is handed to `future_into_py`, whose result is returned.
fn temp_clear_namespace<'p>(&self, py: Python<'p>, name: String) -> PyResult<Bound<'p, PyAny>> {
    // Clone the runtime handle so the future owns everything it touches.
    let runtime = self.inner.clone();
    let clear = async move { runtime.temp_clear_namespace(&name).await.map_err(to_pyerr) };
    pyo3_async_runtimes::tokio::future_into_py(py, clear)
}
fn namespace(&self, name: String) -> PyResult<Namespace> {
Ok(Namespace {
inner: self.inner.namespace(name).map_err(to_pyerr)?,
......@@ -365,7 +374,7 @@ impl DistributedRuntime {
})
}
fn etcd_client(&self) -> PyResult<Option<EtcdClient>> {
fn do_not_use_etcd_client(&self) -> PyResult<Option<EtcdClient>> {
match self.inner.etcd_client().clone() {
Some(etcd_client) => Ok(Some(EtcdClient { inner: etcd_client })),
None => Ok(None),
......@@ -500,32 +509,6 @@ impl EtcdKvCache {
Ok(())
})
}
/// Delete every key currently held by this cache.
///
/// The set of keys is captured once up front; each key is then deleted in
/// turn, and the first failing delete aborts the loop with its error
/// converted by `to_pyerr`.
fn clear_all<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
    let cache = self.inner.clone();
    pyo3_async_runtimes::tokio::future_into_py(py, async move {
        // Snapshot the key names first so the delete loop works on owned strings.
        let snapshot: Vec<String> = cache.get_all().await.keys().cloned().collect();

        for full_key in snapshot {
            // Pass the key without the cache prefix when it carries one;
            // otherwise pass it through unchanged.
            match full_key.strip_prefix(&cache.prefix) {
                Some(relative) => cache.delete(relative).await.map_err(to_pyerr)?,
                None => cache.delete(&full_key).await.map_err(to_pyerr)?,
            }
        }

        Ok(())
    })
}
}
#[pymethods]
......
......@@ -41,9 +41,10 @@ class DistributedRuntime:
"""
...
def etcd_client(self) -> Optional[EtcdClient]:
def do_not_use_etcd_client(self) -> Optional[EtcdClient]:
"""
Get the `EtcdClient` object. Not available for static workers.
This will be removed soon, do not use it.
"""
...
......@@ -52,6 +53,7 @@ class DistributedRuntime:
Shutdown the runtime by triggering the cancellation token
"""
...
class EtcdClient:
"""
Etcd is used for discovery in the DistributedRuntime
......@@ -172,12 +174,6 @@ class EtcdKvCache:
"""
...
async def clear_all(self) -> None:
"""
Delete all key-value pairs from the cache and etcd.
"""
...
class Namespace:
"""
A namespace is a collection of components
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
......@@ -27,7 +15,7 @@ async def test_simple_put_get():
runtime = DistributedRuntime(loop, False)
# Get etcd client
etcd = runtime.etcd_client()
etcd = runtime.do_not_use_etcd_client()
# Write some key-value pairs
test_keys = {
......
......@@ -327,6 +327,19 @@ impl DistributedRuntime {
}
}
/// Delete every etcd entry whose key starts with `name`.
///
/// TODO: remove as soon as the MDC is auto-deleted.
///
/// When the runtime has no etcd client this is a no-op that returns `Ok(())`.
///
/// # Errors
///
/// Propagates the first error returned by the prefix lookup or by any
/// individual delete; entries deleted before the failure stay deleted.
pub async fn temp_clear_namespace(&self, name: &str) -> anyhow::Result<()> {
    // No etcd client means there is nothing to clear.
    if let Some(client) = self.etcd_client() {
        for entry in client.kv_get_prefix(name).await? {
            client.kv_delete(entry.key(), None).await?;
        }
    }
    Ok(())
}
/// Get all registered hierarchy keys. Private because it is only used for testing.
fn get_registered_hierarchies(&self) -> Vec<String> {
let registries = self.hierarchy_to_metricsregistry.read().unwrap();
......
......@@ -235,7 +235,7 @@ async def check_registration_in_etcd(
List of registered KV router entries from etcd
"""
runtime = get_runtime()
etcd = runtime.etcd_client()
etcd = runtime.do_not_use_etcd_client()
# Extract component path from endpoint if provided
prefix = "kv_routers/"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment