Unverified Commit 9d643f1e authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

fix: use primary lease for NixlMetadataStore (#928)

parent 960ee927
...@@ -73,7 +73,14 @@ class NixlMetadataStore: ...@@ -73,7 +73,14 @@ class NixlMetadataStore:
async def put(self, engine_id, metadata: NixlMetadata): async def put(self, engine_id, metadata: NixlMetadata):
serialized_metadata = msgspec.msgpack.encode(metadata) serialized_metadata = msgspec.msgpack.encode(metadata)
key = "/".join([self._key_prefix, engine_id]) key = "/".join([self._key_prefix, engine_id])
await self._client.kv_put(key, serialized_metadata, None) # create with primary lease so that the kv entry will be deleted when the worker shutdowns
try:
# TODO: should we create a series of function in etcd client to use primary lease?
await self._client.kv_create_or_validate(
key, serialized_metadata, self._client.primary_lease_id()
)
except Exception as e:
logger.warning(f"A different metadata exists for engine {engine_id}: {e}")
self._stored.add(engine_id) self._stored.add(engine_id)
async def get(self, engine_id) -> NixlMetadata: async def get(self, engine_id) -> NixlMetadata:
......
...@@ -34,6 +34,9 @@ class RequestHandler: ...@@ -34,6 +34,9 @@ class RequestHandler:
@dynamo_worker(static=False) @dynamo_worker(static=False)
async def worker(runtime: DistributedRuntime): async def worker(runtime: DistributedRuntime):
print(
f"Primary lease ID: {runtime.etcd_client().primary_lease_id()}/{runtime.etcd_client().primary_lease_id():#x}"
)
await init(runtime, "dynamo") await init(runtime, "dynamo")
......
...@@ -481,6 +481,24 @@ impl Namespace { ...@@ -481,6 +481,24 @@ impl Namespace {
#[pymethods] #[pymethods]
impl EtcdClient { impl EtcdClient {
#[pyo3(signature = (key, value, lease_id=None))]
fn kv_create<'p>(
&self,
py: Python<'p>,
key: String,
value: Vec<u8>,
lease_id: Option<i64>,
) -> PyResult<Bound<'p, PyAny>> {
let client = self.inner.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client
.kv_create(key, value, lease_id)
.await
.map_err(to_pyerr)?;
Ok(())
})
}
#[pyo3(signature = (key, value, lease_id=None))] #[pyo3(signature = (key, value, lease_id=None))]
fn kv_create_or_validate<'p>( fn kv_create_or_validate<'p>(
&self, &self,
...@@ -499,6 +517,10 @@ impl EtcdClient { ...@@ -499,6 +517,10 @@ impl EtcdClient {
}) })
} }
fn primary_lease_id(&self) -> i64 {
self.inner.lease_id()
}
#[pyo3(signature = (key, value, lease_id=None))] #[pyo3(signature = (key, value, lease_id=None))]
fn kv_put<'p>( fn kv_put<'p>(
&self, &self,
......
...@@ -79,6 +79,20 @@ class EtcdClient: ...@@ -79,6 +79,20 @@ class EtcdClient:
Etcd is used for discovery in the DistributedRuntime Etcd is used for discovery in the DistributedRuntime
""" """
def primary_lease_id(self) -> int:
"""
return the primary lease id.
"""
...
async def kv_create(
self, key: str, value: bytes, lease_id: Optional[int] = None
) -> None:
"""
Atomically create a key in etcd, fail if the key already exists.
"""
...
async def kv_create_or_validate( async def kv_create_or_validate(
self, key: str, value: bytes, lease_id: Optional[int] = None self, key: str, value: bytes, lease_id: Optional[int] = None
) -> None: ) -> None:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment