Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delegated the implementations of Lock and Semaphore to the async backend class #761

Merged
merged 13 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/synchronization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ Example::

run(main)

.. tip:: If the performance of semaphores is critical for you, you could pass
``fast_acquire=True`` to :class:`Semaphore`. This has the effect of skipping the
:func:`~.lowlevel.cancel_shielded_checkpoint` call in :meth:`Semaphore.acquire` if
there is no contention (acquisition succeeds immediately). This could, in some cases,
lead to the task never yielding control back to to the event loop if you use the
semaphore in a loop that does not have other yield points.

Locks
-----

Expand All @@ -92,6 +99,12 @@ Example::

run(main)

.. tip:: If the performance of locks is critical for you, you could pass
``fast_acquire=True`` to :class:`Lock`. This has the effect of skipping the
:func:`~.lowlevel.cancel_shielded_checkpoint` call in :meth:`Lock.acquire` if there
is no contention (acquisition succeeds immediately). This could, in some cases, lead
to the task never yielding control back to to the event loop if use the lock in a
loop that does not have other yield points.

Conditions
----------
Expand Down
5 changes: 5 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.

**UNRELEASED**

- Improved the performance of ``anyio.Lock`` and ``anyio.Semaphore`` on asyncio (even up
to 50 %)
- Added the ``fast_acquire`` parameter to ``anyio.Lock`` and ``anyio.Semaphore`` to
further boost performance at the expense of safety (``acquire()`` will not yield
control back if there is no contention)
- Fixed ``__repr__()`` of ``MemoryObjectItemReceiver``, when ``item`` is not defined
(`#767 <https://github.com/agronholm/anyio/pulls/767>`_; PR by @Danipulok)
- Added support for the ``from_uri()``, ``full_match()``, ``parser`` methods/properties
Expand Down
181 changes: 178 additions & 3 deletions src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,13 @@

import sniffio

from .. import CapacityLimiterStatistics, EventStatistics, TaskInfo, abc
from .. import (
CapacityLimiterStatistics,
EventStatistics,
LockStatistics,
TaskInfo,
abc,
)
from .._core._eventloop import claim_worker_thread, threadlocals
from .._core._exceptions import (
BrokenResourceError,
Expand All @@ -70,9 +76,16 @@
)
from .._core._sockets import convert_ipv6_sockaddr
from .._core._streams import create_memory_object_stream
from .._core._synchronization import CapacityLimiter as BaseCapacityLimiter
from .._core._synchronization import (
CapacityLimiter as BaseCapacityLimiter,
)
from .._core._synchronization import Event as BaseEvent
from .._core._synchronization import ResourceGuard
from .._core._synchronization import Lock as BaseLock
from .._core._synchronization import (
ResourceGuard,
SemaphoreStatistics,
)
from .._core._synchronization import Semaphore as BaseSemaphore
from .._core._tasks import CancelScope as BaseCancelScope
from ..abc import (
AsyncBackend,
Expand Down Expand Up @@ -1657,6 +1670,154 @@ def statistics(self) -> EventStatistics:
return EventStatistics(len(self._event._waiters))


class Lock(BaseLock):
def __new__(cls, *, fast_acquire: bool = False) -> Lock:
return object.__new__(cls)

def __init__(self, *, fast_acquire: bool = False) -> None:
self._fast_acquire = fast_acquire
self._owner_task: asyncio.Task | None = None
self._waiters: deque[tuple[asyncio.Task, asyncio.Future]] = deque()

async def acquire(self) -> None:
if self._owner_task is None and not self._waiters:
await AsyncIOBackend.checkpoint_if_cancelled()
self._owner_task = current_task()

# Unless on the "fast path", yield control of the event loop so that other
# tasks can run too
if not self._fast_acquire:
try:
await AsyncIOBackend.cancel_shielded_checkpoint()
except CancelledError:
self.release()
raise

return

task = cast(asyncio.Task, current_task())
fut: asyncio.Future[None] = asyncio.Future()
item = task, fut
self._waiters.append(item)
try:
await fut
except CancelledError:
self._waiters.remove(item)
if self._owner_task is task:
self.release()

raise

self._waiters.remove(item)

def acquire_nowait(self) -> None:
if self._owner_task is None and not self._waiters:
self._owner_task = current_task()
return

raise WouldBlock

def locked(self) -> bool:
return self._owner_task is not None

def release(self) -> None:
if self._owner_task != current_task():
raise RuntimeError("The current task is not holding this lock")

for task, fut in self._waiters:
if not fut.cancelled():
self._owner_task = task
fut.set_result(None)
return

self._owner_task = None

def statistics(self) -> LockStatistics:
task_info = AsyncIOTaskInfo(self._owner_task) if self._owner_task else None
return LockStatistics(self.locked(), task_info, len(self._waiters))


class Semaphore(BaseSemaphore):
def __new__(
cls,
initial_value: int,
*,
max_value: int | None = None,
fast_acquire: bool = False,
) -> Semaphore:
return object.__new__(cls)

def __init__(
self,
initial_value: int,
*,
max_value: int | None = None,
fast_acquire: bool = False,
):
super().__init__(initial_value, max_value=max_value)
self._value = initial_value
self._max_value = max_value
self._fast_acquire = fast_acquire
self._waiters: deque[asyncio.Future[None]] = deque()

async def acquire(self) -> None:
if self._value > 0 and not self._waiters:
await AsyncIOBackend.checkpoint_if_cancelled()
self._value -= 1

# Unless on the "fast path", yield control of the event loop so that other
# tasks can run too
if not self._fast_acquire:
try:
await AsyncIOBackend.cancel_shielded_checkpoint()
except CancelledError:
self.release()
raise

return

fut: asyncio.Future[None] = asyncio.Future()
self._waiters.append(fut)
try:
await fut
except CancelledError:
try:
self._waiters.remove(fut)
except ValueError:
self.release()

raise

def acquire_nowait(self) -> None:
if self._value == 0:
raise WouldBlock

self._value -= 1

def release(self) -> None:
if self._max_value is not None and self._value == self._max_value:
raise ValueError("semaphore released too many times")

for fut in self._waiters:
if not fut.cancelled():
fut.set_result(None)
self._waiters.remove(fut)
return

self._value += 1

@property
def value(self) -> int:
return self._value

@property
def max_value(self) -> int | None:
return self._max_value

def statistics(self) -> SemaphoreStatistics:
return SemaphoreStatistics(len(self._waiters))


class CapacityLimiter(BaseCapacityLimiter):
_total_tokens: float = 0

Expand Down Expand Up @@ -2107,6 +2268,20 @@ def create_task_group(cls) -> abc.TaskGroup:
def create_event(cls) -> abc.Event:
return Event()

@classmethod
def create_lock(cls, *, fast_acquire: bool) -> abc.Lock:
return Lock(fast_acquire=fast_acquire)

@classmethod
def create_semaphore(
cls,
initial_value: int,
*,
max_value: int | None = None,
fast_acquire: bool = False,
) -> abc.Semaphore:
return Semaphore(initial_value, max_value=max_value, fast_acquire=fast_acquire)

@classmethod
def create_capacity_limiter(cls, total_tokens: float) -> abc.CapacityLimiter:
return CapacityLimiter(total_tokens)
Expand Down
Loading
Loading