Skip to content

Commit a9df239

Browse files
authored
2.x: Fix blockingIterable hang when force-disposed (#6627)
1 parent 8db3569 commit a9df239

File tree

10 files changed

+85
-14
lines changed

10 files changed

+85
-14
lines changed

Diff for: src/main/java/io/reactivex/Flowable.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@
8989
* </code></pre>
9090
* <p>
9191
* The Reactive Streams specification is relatively strict when defining interactions between {@code Publisher}s and {@code Subscriber}s, so much so
92-
* that there is a significant performance penalty due certain timing requirements and the need to prepare for invalid
92+
* that there is a significant performance penalty due certain timing requirements and the need to prepare for invalid
9393
* request amounts via {@link Subscription#request(long)}.
9494
* Therefore, RxJava has introduced the {@link FlowableSubscriber} interface that indicates the consumer can be driven with relaxed rules.
9595
* All RxJava operators are implemented with these relaxed rules in mind.
@@ -112,7 +112,7 @@
112112
*
113113
* // could be some blocking operation
114114
* Thread.sleep(1000);
115-
*
115+
*
116116
* // the consumer might have cancelled the flow
117117
* if (emitter.isCancelled() {
118118
* return;
@@ -138,7 +138,7 @@
138138
* RxJava reactive sources, such as {@code Flowable}, are generally synchronous and sequential in nature. In the ReactiveX design, the location (thread)
139139
* where operators run is <i>orthogonal</i> to when the operators can work with data. This means that asynchrony and parallelism
140140
* has to be explicitly expressed via operators such as {@link #subscribeOn(Scheduler)}, {@link #observeOn(Scheduler)} and {@link #parallel()}. In general,
141-
* operators featuring a {@link Scheduler} parameter are introducing this type of asynchrony into the flow.
141+
* operators featuring a {@link Scheduler} parameter are introducing this type of asynchrony into the flow.
142142
* <p>
143143
* For more information see the <a href="http://reactivex.io/documentation/Publisher.html">ReactiveX
144144
* documentation</a>.

Diff for: src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ static final class BlockingFlowableIterator<T>
6262
long produced;
6363

6464
volatile boolean done;
65-
Throwable error;
65+
volatile Throwable error;
6666

6767
BlockingFlowableIterator(int batchSize) {
6868
this.queue = new SpscArrayQueue<T>(batchSize);
@@ -75,6 +75,13 @@ static final class BlockingFlowableIterator<T>
7575
@Override
7676
public boolean hasNext() {
7777
for (;;) {
78+
if (isDisposed()) {
79+
Throwable e = error;
80+
if (e != null) {
81+
throw ExceptionHelper.wrapOrThrow(e);
82+
}
83+
return false;
84+
}
7885
boolean d = done;
7986
boolean empty = queue.isEmpty();
8087
if (d) {
@@ -90,7 +97,7 @@ public boolean hasNext() {
9097
BlockingHelper.verifyNonBlocking();
9198
lock.lock();
9299
try {
93-
while (!done && queue.isEmpty()) {
100+
while (!done && queue.isEmpty() && !isDisposed()) {
94101
condition.await();
95102
}
96103
} catch (InterruptedException ex) {
@@ -175,6 +182,7 @@ public void remove() {
175182
@Override
176183
public void dispose() {
177184
SubscriptionHelper.cancel(this);
185+
signalConsumer();
178186
}
179187

180188
@Override

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ public void onComplete() {
419419
public void cancel() {
420420
SubscriptionHelper.cancel(this);
421421
}
422-
422+
423423
public void request(long n) {
424424
if (fusionMode != QueueSubscription.SYNC) {
425425
get().request(n);

Diff for: src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterable.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ static final class BlockingObservableIterator<T>
5353
final Condition condition;
5454

5555
volatile boolean done;
56-
Throwable error;
56+
volatile Throwable error;
5757

5858
BlockingObservableIterator(int batchSize) {
5959
this.queue = new SpscLinkedArrayQueue<T>(batchSize);
@@ -64,6 +64,13 @@ static final class BlockingObservableIterator<T>
6464
@Override
6565
public boolean hasNext() {
6666
for (;;) {
67+
if (isDisposed()) {
68+
Throwable e = error;
69+
if (e != null) {
70+
throw ExceptionHelper.wrapOrThrow(e);
71+
}
72+
return false;
73+
}
6774
boolean d = done;
6875
boolean empty = queue.isEmpty();
6976
if (d) {
@@ -80,7 +87,7 @@ public boolean hasNext() {
8087
BlockingHelper.verifyNonBlocking();
8188
lock.lock();
8289
try {
83-
while (!done && queue.isEmpty()) {
90+
while (!done && queue.isEmpty() && !isDisposed()) {
8491
condition.await();
8592
}
8693
} finally {
@@ -146,6 +153,7 @@ public void remove() {
146153
@Override
147154
public void dispose() {
148155
DisposableHelper.dispose(this);
156+
signalConsumer();
149157
}
150158

151159
@Override

Diff for: src/test/java/io/reactivex/internal/operators/flowable/BlockingFlowableToIteratorTest.java

+28
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,18 @@
1616
import static org.junit.Assert.*;
1717

1818
import java.util.*;
19+
import java.util.concurrent.TimeUnit;
1920

2021
import org.junit.*;
2122
import org.reactivestreams.*;
2223

2324
import io.reactivex.Flowable;
25+
import io.reactivex.disposables.Disposable;
2426
import io.reactivex.exceptions.*;
2527
import io.reactivex.internal.operators.flowable.BlockingFlowableIterable.BlockingFlowableIterator;
2628
import io.reactivex.internal.subscriptions.BooleanSubscription;
29+
import io.reactivex.processors.PublishProcessor;
30+
import io.reactivex.schedulers.Schedulers;
2731

2832
public class BlockingFlowableToIteratorTest {
2933

@@ -185,4 +189,28 @@ protected void subscribeActual(Subscriber<? super Integer> s) {
185189

186190
it.next();
187191
}
192+
193+
@Test(expected = NoSuchElementException.class)
194+
public void disposedIteratorHasNextReturns() {
195+
Iterator<Integer> it = PublishProcessor.<Integer>create()
196+
.blockingIterable().iterator();
197+
((Disposable)it).dispose();
198+
assertFalse(it.hasNext());
199+
it.next();
200+
}
201+
202+
@Test
203+
public void asyncDisposeUnblocks() {
204+
final Iterator<Integer> it = PublishProcessor.<Integer>create()
205+
.blockingIterable().iterator();
206+
207+
Schedulers.single().scheduleDirect(new Runnable() {
208+
@Override
209+
public void run() {
210+
((Disposable)it).dispose();
211+
}
212+
}, 1, TimeUnit.SECONDS);
213+
214+
assertFalse(it.hasNext());
215+
}
188216
}

Diff for: src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -2770,6 +2770,7 @@ public void timedSizeBufferAlreadyCleared() {
27702770
}
27712771

27722772
@Test
2773+
@SuppressWarnings("unchecked")
27732774
public void bufferExactFailingSupplier() {
27742775
Flowable.empty()
27752776
.buffer(1, TimeUnit.SECONDS, Schedulers.computation(), 10, new Callable<List<Object>>() {

Diff for: src/test/java/io/reactivex/internal/operators/observable/BlockingObservableNextTest.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import io.reactivex.exceptions.TestException;
2929
import io.reactivex.internal.operators.observable.BlockingObservableNext.NextObserver;
3030
import io.reactivex.plugins.RxJavaPlugins;
31-
import io.reactivex.processors.BehaviorProcessor;
3231
import io.reactivex.schedulers.Schedulers;
3332
import io.reactivex.subjects.*;
3433

@@ -332,9 +331,9 @@ public void testSingleSourceManyIterators() throws InterruptedException {
332331

333332
@Test
334333
public void testSynchronousNext() {
335-
assertEquals(1, BehaviorProcessor.createDefault(1).take(1).blockingSingle().intValue());
336-
assertEquals(2, BehaviorProcessor.createDefault(2).blockingIterable().iterator().next().intValue());
337-
assertEquals(3, BehaviorProcessor.createDefault(3).blockingNext().iterator().next().intValue());
334+
assertEquals(1, BehaviorSubject.createDefault(1).take(1).blockingSingle().intValue());
335+
assertEquals(2, BehaviorSubject.createDefault(2).blockingIterable().iterator().next().intValue());
336+
assertEquals(3, BehaviorSubject.createDefault(3).blockingNext().iterator().next().intValue());
338337
}
339338

340339
@Test

Diff for: src/test/java/io/reactivex/internal/operators/observable/BlockingObservableToIteratorTest.java

+28-1
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,18 @@
1616
import static org.junit.Assert.*;
1717

1818
import java.util.*;
19+
import java.util.concurrent.TimeUnit;
1920

2021
import org.junit.*;
2122

2223
import io.reactivex.Observable;
2324
import io.reactivex.ObservableSource;
2425
import io.reactivex.Observer;
25-
import io.reactivex.disposables.Disposables;
26+
import io.reactivex.disposables.*;
2627
import io.reactivex.exceptions.TestException;
2728
import io.reactivex.internal.operators.observable.BlockingObservableIterable.BlockingObservableIterator;
29+
import io.reactivex.schedulers.Schedulers;
30+
import io.reactivex.subjects.PublishSubject;
2831

2932
public class BlockingObservableToIteratorTest {
3033

@@ -119,4 +122,28 @@ public void remove() {
119122
BlockingObservableIterator<Integer> it = new BlockingObservableIterator<Integer>(128);
120123
it.remove();
121124
}
125+
126+
@Test(expected = NoSuchElementException.class)
127+
public void disposedIteratorHasNextReturns() {
128+
Iterator<Integer> it = PublishSubject.<Integer>create()
129+
.blockingIterable().iterator();
130+
((Disposable)it).dispose();
131+
assertFalse(it.hasNext());
132+
it.next();
133+
}
134+
135+
@Test
136+
public void asyncDisposeUnblocks() {
137+
final Iterator<Integer> it = PublishSubject.<Integer>create()
138+
.blockingIterable().iterator();
139+
140+
Schedulers.single().scheduleDirect(new Runnable() {
141+
@Override
142+
public void run() {
143+
((Disposable)it).dispose();
144+
}
145+
}, 1, TimeUnit.SECONDS);
146+
147+
assertFalse(it.hasNext());
148+
}
122149
}

Diff for: src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -2137,6 +2137,7 @@ public ObservableSource<List<Object>> apply(Observable<Object> o)
21372137
}
21382138

21392139
@Test
2140+
@SuppressWarnings("unchecked")
21402141
public void bufferExactFailingSupplier() {
21412142
Observable.empty()
21422143
.buffer(1, TimeUnit.SECONDS, Schedulers.computation(), 10, new Callable<List<Object>>() {

Diff for: src/test/java/io/reactivex/schedulers/TrampolineSchedulerTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.reactivestreams.Subscriber;
3232

3333
import static org.junit.Assert.assertEquals;
34-
import static org.junit.Assert.assertTrue;
3534

3635
public class TrampolineSchedulerTest extends AbstractSchedulerTests {
3736

0 commit comments

Comments
 (0)