Skip to content

Commit e1b6cb4

Browse files
authored
3.x: ThrottleWithTimeout+Consumer cleanup (#7511)
1 parent bf8da15 commit e1b6cb4

File tree

4 files changed

+40
-34
lines changed

4 files changed

+40
-34
lines changed

Diff for: src/main/java/io/reactivex/rxjava3/core/Flowable.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -8970,13 +8970,15 @@ public final Flowable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull
89708970
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
89718971
* @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
89728972
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
8973-
* @see #throttleWithTimeout(long, TimeUnit, Scheduler)
8973+
* @see #throttleWithTimeout(long, TimeUnit, Scheduler, Consumer)
8974+
* @since 3.1.6 - Experimental
89748975
*/
89758976
@CheckReturnValue
89768977
@NonNull
89778978
@BackpressureSupport(BackpressureKind.ERROR)
89788979
@SchedulerSupport(SchedulerSupport.CUSTOM)
8979-
public final Flowable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
8980+
@Experimental
8981+
public final Flowable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<? super T> onDropped) {
89808982
Objects.requireNonNull(unit, "unit is null");
89818983
Objects.requireNonNull(scheduler, "scheduler is null");
89828984
Objects.requireNonNull(onDropped, "onDropped is null");
@@ -17640,7 +17642,7 @@ public final Flowable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit uni
1764017642
/**
1764117643
* Returns a {@code Flowable} that mirrors the current {@code Flowable}, except that it drops items emitted by the
1764217644
* current {@code Flowable} that are followed by newer items before a timeout value expires on a specified
17643-
* {@link Scheduler}. The timer resets on each emission (alias to {@link #debounce(long, TimeUnit, Scheduler)}).
17645+
* {@link Scheduler}. The timer resets on each emission (alias to {@link #debounce(long, TimeUnit, Scheduler, Consumer)}).
1764417646
* <p>
1764517647
* <em>Note:</em> If items keep being emitted by the current {@code Flowable} faster than the timeout then no items
1764617648
* will be emitted by the resulting {@code Flowable}.
@@ -17668,13 +17670,15 @@ public final Flowable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit uni
1766817670
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
1766917671
* @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
1767017672
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
17671-
* @see #debounce(long, TimeUnit, Scheduler)
17673+
* @see #debounce(long, TimeUnit, Scheduler, Consumer)
17674+
* @since 3.1.6 - Experimental
1767217675
*/
1767317676
@CheckReturnValue
1767417677
@BackpressureSupport(BackpressureKind.ERROR)
1767517678
@SchedulerSupport(SchedulerSupport.CUSTOM)
1767617679
@NonNull
17677-
public final Flowable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
17680+
@Experimental
17681+
public final Flowable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<? super T> onDropped) {
1767817682
return debounce(timeout, unit, scheduler, onDropped);
1767917683
}
1768017684

Diff for: src/main/java/io/reactivex/rxjava3/core/Observable.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -7933,12 +7933,14 @@ public final Observable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNu
79337933
* @return the new {@code Observable} instance
79347934
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} } or {@code onDropped} is {@code null}
79357935
* @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
7936-
* @see #throttleWithTimeout(long, TimeUnit, Scheduler)
7936+
* @see #throttleWithTimeout(long, TimeUnit, Scheduler, Consumer)
7937+
* @since 3.1.6 - Experimental
79377938
*/
79387939
@CheckReturnValue
79397940
@SchedulerSupport(SchedulerSupport.CUSTOM)
79407941
@NonNull
7941-
public final Observable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
7942+
@Experimental
7943+
public final Observable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<? super T> onDropped) {
79427944
Objects.requireNonNull(unit, "unit is null");
79437945
Objects.requireNonNull(scheduler, "scheduler is null");
79447946
Objects.requireNonNull(onDropped, "onDropped is null");
@@ -14671,12 +14673,14 @@ public final Observable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit u
1467114673
* @return the new {@code Observable} instance
1467214674
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
1467314675
* @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
14674-
* @see #debounce(long, TimeUnit, Scheduler)
14676+
* @see #debounce(long, TimeUnit, Scheduler, Consumer)
14677+
* @since 3.1.6 - Experimental
1467514678
*/
1467614679
@CheckReturnValue
1467714680
@SchedulerSupport(SchedulerSupport.CUSTOM)
1467814681
@NonNull
14679-
public final Observable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
14682+
@Experimental
14683+
public final Observable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<? super T> onDropped) {
1468014684
return debounce(timeout, unit, scheduler, onDropped);
1468114685
}
1468214686

Diff for: src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java

+12-13
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ public final class FlowableDebounceTimed<T> extends AbstractFlowableWithUpstream
3434
final long timeout;
3535
final TimeUnit unit;
3636
final Scheduler scheduler;
37-
final Consumer<T> onDropped;
37+
final Consumer<? super T> onDropped;
3838

39-
public FlowableDebounceTimed(Flowable<T> source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
39+
public FlowableDebounceTimed(Flowable<T> source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer<? super T> onDropped) {
4040
super(source);
4141
this.timeout = timeout;
4242
this.unit = unit;
@@ -58,7 +58,7 @@ static final class DebounceTimedSubscriber<T> extends AtomicLong
5858
final long timeout;
5959
final TimeUnit unit;
6060
final Scheduler.Worker worker;
61-
final Consumer<T> onDropped;
61+
final Consumer<? super T> onDropped;
6262

6363
Subscription upstream;
6464

@@ -68,7 +68,7 @@ static final class DebounceTimedSubscriber<T> extends AtomicLong
6868

6969
boolean done;
7070

71-
DebounceTimedSubscriber(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker, Consumer<T> onDropped) {
71+
DebounceTimedSubscriber(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker, Consumer<? super T> onDropped) {
7272
this.downstream = actual;
7373
this.timeout = timeout;
7474
this.unit = unit;
@@ -93,14 +93,14 @@ public void onNext(T t) {
9393
long idx = index + 1;
9494
index = idx;
9595

96-
Disposable d = timer;
97-
if (d != null) {
98-
d.dispose();
96+
DebounceEmitter<T> currentEmitter = timer;
97+
if (currentEmitter != null) {
98+
currentEmitter.dispose();
9999
}
100100

101-
if (onDropped != null && timer != null) {
101+
if (onDropped != null && currentEmitter != null) {
102102
try {
103-
onDropped.accept(timer.value);
103+
onDropped.accept(currentEmitter.value);
104104
} catch (Throwable ex) {
105105
Exceptions.throwIfFatal(ex);
106106
upstream.cancel();
@@ -110,10 +110,9 @@ public void onNext(T t) {
110110
}
111111
}
112112

113-
DebounceEmitter<T> de = new DebounceEmitter<>(t, idx, this);
114-
timer = de;
115-
d = worker.schedule(de, timeout, unit);
116-
de.setResource(d);
113+
DebounceEmitter<T> newEmitter = new DebounceEmitter<>(t, idx, this);
114+
timer = newEmitter;
115+
newEmitter.setResource(worker.schedule(newEmitter, timeout, unit));
117116
}
118117

119118
@Override

Diff for: src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDebounceTimed.java

+11-12
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ public final class ObservableDebounceTimed<T> extends AbstractObservableWithUpst
2929
final long timeout;
3030
final TimeUnit unit;
3131
final Scheduler scheduler;
32-
final Consumer<T> onDropped;
32+
final Consumer<? super T> onDropped;
3333

34-
public ObservableDebounceTimed(ObservableSource<T> source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
34+
public ObservableDebounceTimed(ObservableSource<T> source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer<? super T> onDropped) {
3535
super(source);
3636
this.timeout = timeout;
3737
this.unit = unit;
@@ -51,7 +51,7 @@ static final class DebounceTimedObserver<T>
5151
final long timeout;
5252
final TimeUnit unit;
5353
final Scheduler.Worker worker;
54-
final Consumer<T> onDropped;
54+
final Consumer<? super T> onDropped;
5555

5656
Disposable upstream;
5757

@@ -61,7 +61,7 @@ static final class DebounceTimedObserver<T>
6161

6262
boolean done;
6363

64-
DebounceTimedObserver(Observer<? super T> actual, long timeout, TimeUnit unit, Worker worker, Consumer<T> onDropped) {
64+
DebounceTimedObserver(Observer<? super T> actual, long timeout, TimeUnit unit, Worker worker, Consumer<? super T> onDropped) {
6565
this.downstream = actual;
6666
this.timeout = timeout;
6767
this.unit = unit;
@@ -85,12 +85,12 @@ public void onNext(T t) {
8585
long idx = index + 1;
8686
index = idx;
8787

88-
Disposable d = timer;
89-
if (d != null) {
90-
d.dispose();
88+
DebounceEmitter<T> currentEmitter = timer;
89+
if (currentEmitter != null) {
90+
currentEmitter.dispose();
9191
}
9292

93-
if (onDropped != null && timer != null) {
93+
if (onDropped != null && currentEmitter != null) {
9494
try {
9595
onDropped.accept(timer.value);
9696
} catch (Throwable ex) {
@@ -101,10 +101,9 @@ public void onNext(T t) {
101101
}
102102
}
103103

104-
DebounceEmitter<T> de = new DebounceEmitter<>(t, idx, this);
105-
timer = de;
106-
d = worker.schedule(de, timeout, unit);
107-
de.setResource(d);
104+
DebounceEmitter<T> newEmitter = new DebounceEmitter<>(t, idx, this);
105+
timer = newEmitter;
106+
newEmitter.setResource(worker.schedule(newEmitter, timeout, unit));
108107
}
109108

110109
@Override

0 commit comments

Comments
 (0)