Unverified Commit 41b1db69 authored by Lianmin Zheng's avatar Lianmin Zheng Committed by GitHub
Browse files

A better aio rwlock that guarantees the order (#2547)

parent 84967019
...@@ -2,93 +2,99 @@ import asyncio ...@@ -2,93 +2,99 @@ import asyncio
class RWLock: class RWLock:
"""
A Read-Write Lock for asyncio:
- Multiple readers can hold the lock in parallel if no writer holds it.
- A writer has exclusive access.
"""
def __init__(self): def __init__(self):
self._readers = 0 # How many readers currently hold the lock # Protects internal state
self._lock = asyncio.Lock()
# Condition variable used to wait for state changes
self._cond = asyncio.Condition(self._lock)
# Number of readers currently holding the lock
self._readers = 0
# Whether a writer is currently holding the lock
self._writer_active = False self._writer_active = False
self._lock = asyncio.Lock() # Internal mutex to protect state
# Conditions associated with _lock:
self._readers_ok = asyncio.Condition(self._lock) # Notify blocked readers
self._writers_ok = asyncio.Condition(self._lock) # Notify blocked writers
# Expose two async context-manager helpers: # How many writers are queued waiting for a turn
self.reader_lock = self._ReaderLock(self) self._waiting_writers = 0
self.writer_lock = self._WriterLock(self)
async def _acquire_reader(self): @property
def reader_lock(self):
""" """
Wait until there is no active writer. A context manager for acquiring a shared (reader) lock.
Then increment the count of active readers.
Example:
async with rwlock.reader_lock:
# read-only access
""" """
async with self._lock: return _ReaderLock(self)
# If a writer is active, wait until it's done.
while self._writer_active:
await self._readers_ok.wait()
self._readers += 1
async def _release_reader(self): @property
def writer_lock(self):
""" """
Decrement the count of active readers. A context manager for acquiring an exclusive (writer) lock.
If this was the last active reader, wake up a possible waiting writer.
Example:
async with rwlock.writer_lock:
# exclusive access
""" """
return _WriterLock(self)
async def acquire_reader(self):
async with self._lock:
# Wait until there is no active writer or waiting writer
# to ensure fairness.
while self._writer_active or self._waiting_writers > 0:
await self._cond.wait()
self._readers += 1
async def release_reader(self):
async with self._lock: async with self._lock:
self._readers -= 1 self._readers -= 1
# If no more readers, a writer could proceed. # If this was the last reader, wake up anyone waiting
# (potentially a writer or new readers).
if self._readers == 0: if self._readers == 0:
self._writers_ok.notify() self._cond.notify_all()
async def _acquire_writer(self): async def acquire_writer(self):
"""
Wait until there is no active writer and no active readers.
Then mark a writer as active.
"""
async with self._lock: async with self._lock:
while self._writer_active or self._readers > 0: # Increment the count of writers waiting
await self._writers_ok.wait() self._waiting_writers += 1
self._writer_active = True try:
# Wait while either a writer is active or readers are present
async def _release_writer(self): while self._writer_active or self._readers > 0:
""" await self._cond.wait()
Mark the writer as done and notify readers and writers. self._writer_active = True
""" finally:
# Decrement waiting writers only after we've acquired the writer lock
self._waiting_writers -= 1
async def release_writer(self):
async with self._lock: async with self._lock:
self._writer_active = False self._writer_active = False
# Allow any waiting readers to proceed: # Wake up anyone waiting (readers or writers)
self._readers_ok.notify_all() self._cond.notify_all()
# Allow next waiting writer to proceed:
self._writers_ok.notify()
class _ReaderLock:
"""
A simple async context manager that acquires a reader lock
on entering and releases it on exit.
"""
def __init__(self, parent: "RWLock"): class _ReaderLock:
self._parent = parent def __init__(self, rwlock: RWLock):
self._rwlock = rwlock
async def __aenter__(self): async def __aenter__(self):
await self._parent._acquire_reader() await self._rwlock.acquire_reader()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb): async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._parent._release_reader() await self._rwlock.release_reader()
class _WriterLock:
"""
A simple async context manager that acquires a writer lock
on entering and releases it on exit.
"""
def __init__(self, parent: "RWLock"): class _WriterLock:
self._parent = parent def __init__(self, rwlock: RWLock):
self._rwlock = rwlock
async def __aenter__(self): async def __aenter__(self):
await self._parent._acquire_writer() await self._rwlock.acquire_writer()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb): async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._parent._release_writer() await self._rwlock.release_writer()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment