Skip to content

Commit 917e11b

Browse files
committed
Better generic types
1 parent 890c575 commit 917e11b

File tree

11 files changed

+150
-117
lines changed

11 files changed

+150
-117
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ classifiers = [
2121
"Programming Language :: Python :: Implementation :: CPython",
2222
"Topic :: Software Development :: Libraries :: Python Modules",
2323
]
24-
dependencies = ["typing-extensions>=4.1.1,<5"]
24+
dependencies = ["typing-extensions>=4.15.0,<5"]
2525

2626
[project.urls]
2727
Homepage = "http://reactivex.io"

reactivex/abc/observable.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
from collections.abc import Callable
33
from typing import Generic, TypeVar
44

5+
from typing_extensions import TypeAliasType
6+
57
from .disposable import DisposableBase
68
from .observer import ObserverBase, OnCompleted, OnError, OnNext
79
from .scheduler import SchedulerBase
@@ -41,6 +43,10 @@ def subscribe(
4143
raise NotImplementedError
4244

4345

44-
Subscription = Callable[[ObserverBase[_T_out], SchedulerBase | None], DisposableBase]
46+
Subscription = TypeAliasType(
47+
"Subscription",
48+
Callable[[ObserverBase[_T_out], SchedulerBase | None], DisposableBase],
49+
type_params=(_T_out,),
50+
)
4551

4652
__all__ = ["ObservableBase", "Subscription"]

reactivex/operators/_flatmap.py

Lines changed: 79 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
from asyncio import Future
2-
from collections.abc import Callable
32
from typing import Any, TypeVar, Union, cast
43

54
from reactivex import Observable, from_, from_future
65
from reactivex import operators as ops
6+
from reactivex.internal import curry_flip
77
from reactivex.internal.basic import identity
88
from reactivex.typing import Mapper, MapperIndexed
99

@@ -38,95 +38,98 @@ def projection(x: _T1, i: int) -> Observable[Any]:
3838
)
3939

4040

41+
@curry_flip
4142
def flat_map_(
43+
source: Observable[_T1],
4244
mapper: Mapper[_T1, Observable[_T2]] | None = None,
43-
) -> Callable[[Observable[_T1]], Observable[_T2]]:
44-
def flat_map(source: Observable[_T1]) -> Observable[_T2]:
45-
"""One of the Following:
46-
Projects each element of an observable sequence to an observable
47-
sequence and merges the resulting observable sequences into one
48-
observable sequence.
49-
50-
Example:
51-
>>> flat_map(source)
52-
53-
Args:
54-
source: Source observable to flat map.
55-
56-
Returns:
57-
An operator function that takes a source observable and returns
58-
an observable sequence whose elements are the result of invoking
59-
the one-to-many transform function on each element of the
60-
input sequence .
61-
"""
62-
63-
if callable(mapper):
64-
ret = _flat_map_internal(source, mapper=mapper)
65-
else:
66-
ret = _flat_map_internal(source, mapper=lambda _: mapper)
45+
) -> Observable[_T2]:
46+
"""Projects each element of an observable sequence to an observable
47+
sequence and merges the resulting observable sequences into one
48+
observable sequence.
49+
50+
Examples:
51+
>>> source.pipe(flat_map(lambda x: of(x * 2)))
52+
>>> flat_map(lambda x: of(x * 2))(source)
53+
54+
Args:
55+
source: Source observable to flat map.
56+
mapper: Transform function to apply to each element.
6757
68-
return ret
58+
Returns:
59+
An observable sequence whose elements are the result of invoking
60+
the one-to-many transform function on each element of the
61+
input sequence.
62+
"""
6963

70-
return flat_map
64+
if callable(mapper):
65+
ret = _flat_map_internal(source, mapper=mapper)
66+
else:
67+
ret = _flat_map_internal(source, mapper=lambda _: mapper)
7168

69+
return ret
7270

71+
72+
@curry_flip
7373
def flat_map_indexed_(
74+
source: Observable[Any],
7475
mapper_indexed: Any | None = None,
75-
) -> Callable[[Observable[Any]], Observable[Any]]:
76-
def flat_map_indexed(source: Observable[Any]) -> Observable[Any]:
77-
"""One of the Following:
78-
Projects each element of an observable sequence to an observable
79-
sequence and merges the resulting observable sequences into one
80-
observable sequence.
81-
82-
Example:
83-
>>> flat_map_indexed(source)
84-
85-
Args:
86-
source: Source observable to flat map.
87-
88-
Returns:
89-
An observable sequence whose elements are the result of invoking
90-
the one-to-many transform function on each element of the input
91-
sequence.
92-
"""
93-
94-
if callable(mapper_indexed):
95-
ret = _flat_map_internal(source, mapper_indexed=mapper_indexed)
96-
else:
97-
ret = _flat_map_internal(source, mapper=lambda _: mapper_indexed)
98-
return ret
76+
) -> Observable[Any]:
77+
"""Projects each element of an observable sequence to an observable
78+
sequence and merges the resulting observable sequences into one
79+
observable sequence.
9980
100-
return flat_map_indexed
81+
Examples:
82+
>>> source.pipe(flat_map_indexed(lambda x, i: of(x * i)))
83+
>>> flat_map_indexed(lambda x, i: of(x * i))(source)
10184
85+
Args:
86+
source: Source observable to flat map.
87+
mapper_indexed: Transform function with index to apply to each element.
10288
89+
Returns:
90+
An observable sequence whose elements are the result of invoking
91+
the one-to-many transform function on each element of the input
92+
sequence.
93+
"""
94+
95+
if callable(mapper_indexed):
96+
ret = _flat_map_internal(source, mapper_indexed=mapper_indexed)
97+
else:
98+
ret = _flat_map_internal(source, mapper=lambda _: mapper_indexed)
99+
return ret
100+
101+
102+
@curry_flip
103103
def flat_map_latest_(
104+
source: Observable[_T1],
104105
mapper: Mapper[_T1, Union[Observable[_T2], "Future[_T2]"]],
105-
) -> Callable[[Observable[_T1]], Observable[_T2]]:
106-
def flat_map_latest(source: Observable[_T1]) -> Observable[_T2]:
107-
"""Projects each element of an observable sequence into a new
108-
sequence of observable sequences by incorporating the element's
109-
index and then transforms an observable sequence of observable
110-
sequences into an observable sequence producing values only
111-
from the most recent observable sequence.
112-
113-
Args:
114-
source: Source observable to flat map latest.
115-
116-
Returns:
117-
An observable sequence whose elements are the result of
118-
invoking the transform function on each element of source
119-
producing an observable of Observable sequences and that at
120-
any point in time produces the elements of the most recent
121-
inner observable sequence that has been received.
122-
"""
123-
124-
return source.pipe(
125-
ops.map(mapper),
126-
ops.switch_latest(),
127-
)
106+
) -> Observable[_T2]:
107+
"""Projects each element of an observable sequence into a new
108+
sequence of observable sequences by incorporating the element's
109+
index and then transforms an observable sequence of observable
110+
sequences into an observable sequence producing values only
111+
from the most recent observable sequence.
112+
113+
Examples:
114+
>>> source.pipe(flat_map_latest(lambda x: of(x * 2)))
115+
>>> flat_map_latest(lambda x: of(x * 2))(source)
116+
117+
Args:
118+
source: Source observable to flat map latest.
119+
mapper: Transform function to apply to each element.
120+
121+
Returns:
122+
An observable sequence whose elements are the result of
123+
invoking the transform function on each element of source
124+
producing an observable of Observable sequences and that at
125+
any point in time produces the elements of the most recent
126+
inner observable sequence that has been received.
127+
"""
128128

129-
return flat_map_latest
129+
return source.pipe(
130+
ops.map(mapper),
131+
ops.switch_latest(),
132+
)
130133

131134

132135
__all__ = ["flat_map_", "flat_map_latest_", "flat_map_indexed_"]

reactivex/operators/_publish.py

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,35 @@
1-
from collections.abc import Callable
21
from typing import TypeVar
32

43
from reactivex import ConnectableObservable, Observable, abc, compose
54
from reactivex import operators as ops
5+
from reactivex.internal import curry_flip
66
from reactivex.subject import Subject
77
from reactivex.typing import Mapper
88

99
_TSource = TypeVar("_TSource")
1010
_TResult = TypeVar("_TResult")
1111

1212

13+
@curry_flip
1314
def publish_(
15+
source: Observable[_TSource],
1416
mapper: Mapper[Observable[_TSource], Observable[_TResult]] | None = None,
15-
) -> Callable[
16-
[Observable[_TSource]], Observable[_TResult] | ConnectableObservable[_TSource]
17-
]:
17+
) -> Observable[_TResult] | ConnectableObservable[_TSource]:
1818
"""Returns an observable sequence that is the result of invoking the
1919
mapper on a connectable observable sequence that shares a single
2020
subscription to the underlying sequence. This operator is a
2121
specialization of Multicast using a regular Subject.
2222
23-
Example:
24-
>>> res = publish()
25-
>>> res = publish(lambda x: x)
23+
Examples:
24+
>>> source.pipe(publish())
25+
>>> source.pipe(publish(lambda x: x))
26+
>>> publish()(source)
2627
27-
mapper: [Optional] Selector function which can use the
28-
multicasted source sequence as many times as needed, without causing
29-
multiple subscriptions to the source sequence. Subscribers to the
30-
given source will receive all notifications of the source from the
31-
time of the subscription on.
28+
Args:
29+
source: Source observable to publish.
30+
mapper: [Optional] Selector function which can use the
31+
multicasted source sequence as many times as needed, without
32+
causing multiple subscriptions to the source sequence.
3233
3334
Returns:
3435
An observable sequence that contains the elements of a sequence
@@ -41,14 +42,15 @@ def publish_(
4142
def factory(scheduler: abc.SchedulerBase | None = None) -> Subject[_TSource]:
4243
return Subject()
4344

44-
return ops.multicast(subject_factory=factory, mapper=mapper)
45+
return source.pipe(ops.multicast(subject_factory=factory, mapper=mapper))
4546

4647
subject: Subject[_TSource] = Subject()
47-
return ops.multicast(subject=subject)
48+
return source.pipe(ops.multicast(subject=subject))
4849

4950

50-
def share_() -> Callable[[Observable[_TSource]], Observable[_TSource]]:
51-
"""Share a single subscription among multple observers.
51+
@curry_flip
52+
def share_(source: Observable[_TSource]) -> Observable[_TSource]:
53+
"""Share a single subscription among multiple observers.
5254
5355
Returns a new Observable that multicasts (shares) the original
5456
Observable. As long as there is at least one Subscriber this
@@ -57,10 +59,22 @@ def share_() -> Callable[[Observable[_TSource]], Observable[_TSource]]:
5759
Observable.
5860
5961
This is an alias for a composed publish() and ref_count().
62+
63+
Examples:
64+
>>> source.pipe(share())
65+
>>> share()(source)
66+
67+
Args:
68+
source: Source observable to share.
69+
70+
Returns:
71+
An observable that shares a single subscription.
6072
"""
61-
return compose(
62-
ops.publish(),
63-
ops.ref_count(),
73+
return source.pipe(
74+
compose(
75+
ops.publish(),
76+
ops.ref_count(),
77+
)
6478
)
6579

6680

reactivex/operators/_sample.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ def sample_observable(
1313
) -> Observable[_T]:
1414
def subscribe(
1515
observer: abc.ObserverBase[_T], scheduler: abc.SchedulerBase | None = None
16-
):
16+
) -> abc.DisposableBase:
1717
at_end = False
1818
has_value = False
1919
value: _T = cast(_T, None)

reactivex/operators/_sequenceequal.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def sequence_equal(source: Observable[_T]) -> Observable[bool]:
4545
def subscribe(
4646
observer: abc.ObserverBase[bool],
4747
scheduler: abc.SchedulerBase | None = None,
48-
):
48+
) -> abc.DisposableBase:
4949
donel = [False]
5050
doner = [False]
5151
ql: list[_T] = []

reactivex/operators/_singleordefault.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ def single_or_default_async(source: Observable[_T]) -> Observable[_T]:
1616
def subscribe(
1717
observer: abc.ObserverBase[_T],
1818
scheduler: abc.SchedulerBase | None = None,
19-
):
19+
) -> abc.DisposableBase:
2020
value = cast(_T, default_value)
2121
seen_value = False
2222

reactivex/operators/_skip.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def skip_(source: Observable[_T], count: int) -> Observable[_T]:
3232
def subscribe(
3333
observer: abc.ObserverBase[_T],
3434
scheduler: abc.SchedulerBase | None = None,
35-
):
35+
) -> abc.DisposableBase:
3636
remaining = count
3737

3838
def on_next(value: _T) -> None:

reactivex/operators/_skiplast.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def skip_last_(source: Observable[_T], count: int) -> Observable[_T]:
3333
def subscribe(
3434
observer: abc.ObserverBase[_T],
3535
scheduler: abc.SchedulerBase | None = None,
36-
):
36+
) -> abc.DisposableBase:
3737
q: list[_T] = []
3838

3939
def on_next(value: _T) -> None:

0 commit comments

Comments
 (0)