
add create_priority_memory_object_stream #906


Draft · wants to merge 4 commits into base: master
Changes from 1 commit
3 changes: 3 additions & 0 deletions src/anyio/__init__.py
@@ -40,6 +40,9 @@
from ._core._sockets import wait_socket_writable as wait_socket_writable
from ._core._sockets import wait_writable as wait_writable
from ._core._streams import create_memory_object_stream as create_memory_object_stream
from ._core._streams import (
create_priority_memory_object_stream as create_priority_memory_object_stream,
)
from ._core._subprocesses import open_process as open_process
from ._core._subprocesses import run_process as run_process
from ._core._synchronization import CapacityLimiter as CapacityLimiter
25 changes: 25 additions & 0 deletions src/anyio/_core/_streams.py
@@ -8,6 +8,7 @@
MemoryObjectReceiveStream,
MemoryObjectSendStream,
MemoryObjectStreamState,
PriorityMemoryObjectStreamState,
)

T_Item = TypeVar("T_Item")
@@ -50,3 +51,27 @@ def __new__( # type: ignore[misc]

state = MemoryObjectStreamState[T_Item](max_buffer_size)
return (MemoryObjectSendStream(state), MemoryObjectReceiveStream(state))


class create_priority_memory_object_stream(
tuple[MemoryObjectSendStream[T_Item], MemoryObjectReceiveStream[T_Item]],
):
"""
Create a priority memory object stream.

The stream's item type can be annotated like
:func:`create_priority_memory_object_stream[T_Item]`.

:param max_buffer_size: number of items held in the buffer until ``send()`` starts
blocking
"""

def __new__( # type: ignore[misc]
cls, max_buffer_size: float = 0
Inline review comment from the author (collaborator):
I'm not sure whether max_buffer_size should be allowed to be 0 or 1 for create_priority_memory_object_stream; it would behave the same as create_memory_object_stream, just slower for no benefit.

We could set the default to math.inf and require it to be greater than 2.

) -> tuple[MemoryObjectSendStream[T_Item], MemoryObjectReceiveStream[T_Item]]:
if max_buffer_size != math.inf and not isinstance(max_buffer_size, int):
raise ValueError("max_buffer_size must be either an integer or math.inf")
if max_buffer_size < 0:
raise ValueError("max_buffer_size cannot be negative")
state = PriorityMemoryObjectStreamState[T_Item](max_buffer_size)
return (MemoryObjectSendStream(state), MemoryObjectReceiveStream(state))
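
For orientation (not part of the diff): a minimal usage sketch of the proposed factory, assuming the draft API above lands as shown. Items must be mutually comparable, because the buffer is a heap ordered by the items themselves; (priority, payload) tuples are an obvious choice.

import anyio

async def main() -> None:
    # Hypothetical usage of the draft API: buffered items come out smallest-first.
    send, receive = anyio.create_priority_memory_object_stream[tuple[int, str]](
        max_buffer_size=10
    )
    async with send, receive:
        await send.send((2, "low"))
        await send.send((0, "urgent"))
        await send.send((1, "normal"))
        for _ in range(3):
            print(await receive.receive())
        # prints (0, 'urgent'), then (1, 'normal'), then (2, 'low')

anyio.run(main)

As with the existing FIFO stream, the priority ordering only applies to items that actually sit in the buffer; an item sent while a receiver is already waiting should be handed over directly, since the send path itself is unchanged by this diff.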
45 changes: 40 additions & 5 deletions src/anyio/streams/memory.py
@@ -1,10 +1,12 @@
from __future__ import annotations

import abc
import heapq
import warnings
from collections import OrderedDict, deque
from dataclasses import dataclass, field
from types import TracebackType
from typing import Generic, NamedTuple, TypeVar
from typing import Generic, NamedTuple, Protocol, TypeVar

from .. import (
BrokenResourceError,
@@ -21,6 +23,26 @@
T_contra = TypeVar("T_contra", contravariant=True)


class Buffer(Protocol, Generic[T_Item]):
def append(self, /, v: T_Item) -> None: ...
def popleft(self) -> T_Item: ...
def __len__(self) -> int: ...


@dataclass(eq=False)
class HeapQ(Buffer[T_Item]):
items: list[T_Item] = field(default_factory=list, init=False)

def append(self, v: T_Item) -> None:
heapq.heappush(self.items, v)

def popleft(self) -> T_Item:
return heapq.heappop(self.items)

def __len__(self) -> int:
return len(self.items)


class MemoryObjectStreamStatistics(NamedTuple):
current_buffer_used: int #: number of items stored in the buffer
#: maximum number of items that can be stored on this stream (or :data:`math.inf`)
@@ -46,9 +68,8 @@ def __repr__(self) -> str:


@dataclass(eq=False)
class MemoryObjectStreamState(Generic[T_Item]):
class BaseMemoryObjectStreamState(abc.ABC, Generic[T_Item]):
max_buffer_size: float = field()
buffer: deque[T_Item] = field(init=False, default_factory=deque)
open_send_channels: int = field(init=False, default=0)
open_receive_channels: int = field(init=False, default=0)
waiting_receivers: OrderedDict[Event, MemoryObjectItemReceiver[T_Item]] = field(
@@ -58,6 +79,10 @@ class MemoryObjectStreamState(Generic[T_Item]):
init=False, default_factory=OrderedDict
)

@property
@abc.abstractmethod
def buffer(self) -> Buffer[T_Item]: ...

def statistics(self) -> MemoryObjectStreamStatistics:
return MemoryObjectStreamStatistics(
len(self.buffer),
@@ -69,9 +94,19 @@ def statistics(self) -> MemoryObjectStreamStatistics:
)


@dataclass(eq=False)
class MemoryObjectStreamState(BaseMemoryObjectStreamState[T_Item]):
buffer: deque[T_Item] = field(init=False, default_factory=deque)


@dataclass(eq=False)
class PriorityMemoryObjectStreamState(BaseMemoryObjectStreamState[T_Item]):
buffer: HeapQ[T_Item] = field(init=False, default_factory=HeapQ)


@dataclass(eq=False)
class MemoryObjectReceiveStream(Generic[T_co], ObjectReceiveStream[T_co]):
_state: MemoryObjectStreamState[T_co]
_state: BaseMemoryObjectStreamState[T_co]
_closed: bool = field(init=False, default=False)

def __post_init__(self) -> None:
@@ -189,7 +224,7 @@ def __del__(self) -> None:

@dataclass(eq=False)
class MemoryObjectSendStream(Generic[T_contra], ObjectSendStream[T_contra]):
_state: MemoryObjectStreamState[T_contra]
_state: BaseMemoryObjectStreamState[T_contra]
_closed: bool = field(init=False, default=False)

def __post_init__(self) -> None:
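
To illustrate the refactoring in memory.py (again, not part of the diff): the stream state now depends only on a Buffer protocol with append(), popleft() and __len__(), so a FIFO deque and a heap-backed buffer are interchangeable behind the same interface. A standalone sketch with illustrative names (HeapBuffer and drain are hypothetical, not from the PR):

import heapq
from collections import deque
from typing import Protocol, TypeVar

T = TypeVar("T")

class Buffer(Protocol[T]):
    def append(self, v: T, /) -> None: ...
    def popleft(self) -> T: ...
    def __len__(self) -> int: ...

class HeapBuffer:
    # Heap-backed buffer: popleft() returns the smallest item, not the oldest.
    def __init__(self) -> None:
        self._items: list[int] = []

    def append(self, v: int) -> None:
        heapq.heappush(self._items, v)

    def popleft(self) -> int:
        return heapq.heappop(self._items)

    def __len__(self) -> int:
        return len(self._items)

def drain(buffer: Buffer[int]) -> list[int]:
    # Pop everything in whatever order the buffer defines.
    return [buffer.popleft() for _ in range(len(buffer))]

fifo: deque[int] = deque()
heap = HeapBuffer()
for n in (3, 1, 2):
    fifo.append(n)
    heap.append(n)

print(drain(fifo))  # [3, 1, 2] -> insertion (FIFO) order, as in MemoryObjectStreamState
print(drain(heap))  # [1, 2, 3] -> smallest-first, as in PriorityMemoryObjectStreamState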