Skip to content

Commit 890c575

Browse files
committed
Use curry-flipping operators
1 parent 054b37a commit 890c575

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+2166
-1938
lines changed

reactivex/internal/curry.py

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import functools
99
from collections.abc import Callable
10-
from typing import Any, Concatenate, TypeVar
10+
from typing import Concatenate, TypeVar
1111

1212
from typing_extensions import ParamSpec
1313

@@ -16,28 +16,6 @@
1616
_B = TypeVar("_B")
1717

1818

19-
def _curry(
20-
args: tuple[Any, ...], arity: int, fun: Callable[..., Any]
21-
) -> Callable[..., Any]:
22-
"""Internal curry implementation.
23-
24-
Args:
25-
args: Accumulated arguments
26-
arity: Number of arguments remaining to curry
27-
fun: The function to curry
28-
29-
Returns:
30-
A curried function wrapper
31-
"""
32-
33-
def wrapper(*args_: Any, **kw: Any) -> Any:
34-
if arity == 1:
35-
return fun(*args, *args_, **kw)
36-
return _curry(args + args_, arity - 1, fun)
37-
38-
return wrapper
39-
40-
4119
def curry_flip(
4220
fun: Callable[Concatenate[_A, _P], _B],
4321
) -> Callable[_P, Callable[[_A], _B]]:

reactivex/operators/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1810,9 +1810,9 @@ def materialize() -> Callable[[Observable[_T]], Observable[Notification[_T]]]:
18101810
returns an observable sequence containing the materialized
18111811
notification values from the source sequence.
18121812
"""
1813-
from ._materialize import materialize
1813+
from ._materialize import materialize_
18141814

1815-
return materialize()
1815+
return materialize_()
18161816

18171817

18181818
def max(
Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,34 @@
1-
from collections.abc import Callable
21
from typing import TypeVar
32

43
from reactivex import Observable, abc
4+
from reactivex.internal import curry_flip
55

66
_T = TypeVar("_T")
77

88

9-
def as_observable_() -> Callable[[Observable[_T]], Observable[_T]]:
10-
def as_observable(source: Observable[_T]) -> Observable[_T]:
11-
"""Hides the identity of an observable sequence.
9+
@curry_flip
10+
def as_observable_(source: Observable[_T]) -> Observable[_T]:
11+
"""Hides the identity of an observable sequence.
1212
13-
Args:
14-
source: Observable source to hide identity from.
13+
Examples:
14+
>>> res = source.pipe(as_observable())
15+
>>> res = as_observable()(source)
1516
16-
Returns:
17-
An observable sequence that hides the identity of the
18-
source sequence.
19-
"""
17+
Args:
18+
source: Observable source to hide identity from.
2019
21-
def subscribe(
22-
observer: abc.ObserverBase[_T],
23-
scheduler: abc.SchedulerBase | None = None,
24-
) -> abc.DisposableBase:
25-
return source.subscribe(observer, scheduler=scheduler)
20+
Returns:
21+
An observable sequence that hides the identity of the
22+
source sequence.
23+
"""
2624

27-
return Observable(subscribe)
25+
def subscribe(
26+
observer: abc.ObserverBase[_T],
27+
scheduler: abc.SchedulerBase | None = None,
28+
) -> abc.DisposableBase:
29+
return source.subscribe(observer, scheduler=scheduler)
2830

29-
return as_observable
31+
return Observable(subscribe)
3032

3133

3234
__all__ = ["as_observable_"]

reactivex/operators/_buffer.py

Lines changed: 78 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,81 +1,126 @@
11
from collections.abc import Callable
22
from typing import Any, TypeVar
33

4-
from reactivex import Observable, compose
4+
from reactivex import Observable
55
from reactivex import operators as ops
6+
from reactivex.internal import curry_flip
67

78
_T = TypeVar("_T")
89

910

11+
@curry_flip
1012
def buffer_(
13+
source: Observable[_T],
1114
boundaries: Observable[Any],
12-
) -> Callable[[Observable[_T]], Observable[list[_T]]]:
13-
return compose(
15+
) -> Observable[list[_T]]:
16+
"""Buffers elements from the source based on boundary notifications.
17+
18+
Examples:
19+
>>> res = source.pipe(buffer(boundaries))
20+
>>> res = buffer(boundaries)(source)
21+
22+
Args:
23+
source: Source observable to buffer.
24+
boundaries: Observable that triggers buffer emissions.
25+
26+
Returns:
27+
Observable of lists of buffered elements.
28+
"""
29+
return source.pipe(
1430
ops.window(boundaries),
1531
ops.flat_map(ops.to_list()),
1632
)
1733

1834

35+
@curry_flip
1936
def buffer_when_(
37+
source: Observable[_T],
2038
closing_mapper: Callable[[], Observable[Any]],
21-
) -> Callable[[Observable[_T]], Observable[list[_T]]]:
22-
return compose(
39+
) -> Observable[list[_T]]:
40+
"""Buffers elements using a closing mapper function.
41+
42+
Examples:
43+
>>> res = source.pipe(buffer_when(lambda: timer(1.0)))
44+
>>> res = buffer_when(lambda: timer(1.0))(source)
45+
46+
Args:
47+
source: Source observable to buffer.
48+
closing_mapper: Function that returns an observable signaling buffer close.
49+
50+
Returns:
51+
Observable of lists of buffered elements.
52+
"""
53+
return source.pipe(
2354
ops.window_when(closing_mapper),
2455
ops.flat_map(ops.to_list()),
2556
)
2657

2758

59+
@curry_flip
2860
def buffer_toggle_(
29-
openings: Observable[Any], closing_mapper: Callable[[Any], Observable[Any]]
30-
) -> Callable[[Observable[_T]], Observable[list[_T]]]:
31-
return compose(
61+
source: Observable[_T],
62+
openings: Observable[Any],
63+
closing_mapper: Callable[[Any], Observable[Any]],
64+
) -> Observable[list[_T]]:
65+
"""Buffers elements using opening/closing observables.
66+
67+
Examples:
68+
>>> res = source.pipe(buffer_toggle(opens, lambda x: timer(x)))
69+
>>> res = buffer_toggle(opens, lambda x: timer(x))(source)
70+
71+
Args:
72+
source: Source observable to buffer.
73+
openings: Observable that triggers buffer opening.
74+
closing_mapper: Function to create closing observable.
75+
76+
Returns:
77+
Observable of lists of buffered elements.
78+
"""
79+
return source.pipe(
3280
ops.window_toggle(openings, closing_mapper),
3381
ops.flat_map(ops.to_list()),
3482
)
3583

3684

85+
@curry_flip
3786
def buffer_with_count_(
38-
count: int, skip: int | None = None
39-
) -> Callable[[Observable[_T]], Observable[list[_T]]]:
87+
source: Observable[_T],
88+
count: int,
89+
skip: int | None = None,
90+
) -> Observable[list[_T]]:
4091
"""Projects each element of an observable sequence into zero or more
4192
buffers which are produced based on element count information.
4293
4394
Examples:
44-
>>> res = buffer_with_count(10)(xs)
45-
>>> res = buffer_with_count(10, 1)(xs)
95+
>>> res = source.pipe(buffer_with_count(10))
96+
>>> res = buffer_with_count(10)(source)
97+
>>> res = source.pipe(buffer_with_count(10, 1))
4698
4799
Args:
100+
source: Source observable to buffer.
48101
count: Length of each buffer.
49102
skip: [Optional] Number of elements to skip between
50103
creation of consecutive buffers. If not provided, defaults to
51104
the count.
52105
53106
Returns:
54-
A function that takes an observable source and returns an
55-
observable sequence of buffers.
107+
An observable sequence of buffers.
56108
"""
109+
skip_ = skip if skip is not None else count
57110

58-
def buffer_with_count(source: Observable[_T]) -> Observable[list[_T]]:
59-
nonlocal skip
60-
61-
if skip is None:
62-
skip = count
63-
64-
def mapper(value: Observable[_T]) -> Observable[list[_T]]:
65-
return value.pipe(
66-
ops.to_list(),
67-
)
68-
69-
def predicate(value: list[_T]) -> bool:
70-
return len(value) > 0
71-
72-
return source.pipe(
73-
ops.window_with_count(count, skip),
74-
ops.flat_map(mapper),
75-
ops.filter(predicate),
111+
def mapper(value: Observable[_T]) -> Observable[list[_T]]:
112+
return value.pipe(
113+
ops.to_list(),
76114
)
77115

78-
return buffer_with_count
116+
def predicate(value: list[_T]) -> bool:
117+
return len(value) > 0
118+
119+
return source.pipe(
120+
ops.window_with_count(count, skip_),
121+
ops.flat_map(mapper),
122+
ops.filter(predicate),
123+
)
79124

80125

81126
__all__ = ["buffer_", "buffer_with_count_", "buffer_when_", "buffer_toggle_"]

reactivex/operators/_bufferwithtime.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,43 @@
1-
from collections.abc import Callable
21
from typing import TypeVar
32

43
from reactivex import Observable, abc, compose, typing
54
from reactivex import operators as ops
5+
from reactivex.internal import curry_flip
66

77
_T = TypeVar("_T")
88

99

10+
@curry_flip
1011
def buffer_with_time_(
12+
source: Observable[_T],
1113
timespan: typing.RelativeTime,
1214
timeshift: typing.RelativeTime | None = None,
1315
scheduler: abc.SchedulerBase | None = None,
14-
) -> Callable[[Observable[_T]], Observable[list[_T]]]:
16+
) -> Observable[list[_T]]:
17+
"""Buffers elements based on timing information.
18+
19+
Examples:
20+
>>> source.pipe(buffer_with_time(1.0))
21+
>>> source.pipe(buffer_with_time(1.0, 0.5))
22+
>>> buffer_with_time(1.0)(source)
23+
24+
Args:
25+
source: Source observable to buffer.
26+
timespan: Length of each buffer.
27+
timeshift: Interval between creation of consecutive buffers.
28+
scheduler: Scheduler to use for timing.
29+
30+
Returns:
31+
An observable sequence of buffers.
32+
"""
1533
if not timeshift:
1634
timeshift = timespan
1735

18-
return compose(
19-
ops.window_with_time(timespan, timeshift, scheduler),
20-
ops.flat_map(ops.to_list()),
36+
return source.pipe(
37+
compose(
38+
ops.window_with_time(timespan, timeshift, scheduler),
39+
ops.flat_map(ops.to_list()),
40+
)
2141
)
2242

2343

reactivex/operators/_bufferwithtimeorcount.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,39 @@
1-
from collections.abc import Callable
21
from typing import TypeVar
32

43
from reactivex import Observable, abc, compose, typing
54
from reactivex import operators as ops
5+
from reactivex.internal import curry_flip
66

77
_T = TypeVar("_T")
88

99

10+
@curry_flip
1011
def buffer_with_time_or_count_(
12+
source: Observable[_T],
1113
timespan: typing.RelativeTime,
1214
count: int,
1315
scheduler: abc.SchedulerBase | None = None,
14-
) -> Callable[[Observable[_T]], Observable[list[_T]]]:
15-
return compose(
16-
ops.window_with_time_or_count(timespan, count, scheduler),
17-
ops.flat_map(ops.to_iterable()),
16+
) -> Observable[list[_T]]:
17+
"""Buffers elements based on timing and count information.
18+
19+
Examples:
20+
>>> source.pipe(buffer_with_time_or_count(1.0, 10))
21+
>>> buffer_with_time_or_count(1.0, 10)(source)
22+
23+
Args:
24+
source: Source observable to buffer.
25+
timespan: Maximum time length of each buffer.
26+
count: Maximum element count of each buffer.
27+
scheduler: Scheduler to use for timing.
28+
29+
Returns:
30+
An observable sequence of buffers.
31+
"""
32+
return source.pipe(
33+
compose(
34+
ops.window_with_time_or_count(timespan, count, scheduler),
35+
ops.flat_map(ops.to_iterable()),
36+
)
1837
)
1938

2039

0 commit comments

Comments
 (0)