Skip to content

Commit 8db3569

Browse files
authored
2.x: Fix switchMap incorrect sync-fusion & error management (#6618)
1 parent 17a8eef commit 8db3569

File tree

4 files changed

+70
-3
lines changed

4 files changed

+70
-3
lines changed

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ void drain() {
314314
if (r != Long.MAX_VALUE) {
315315
requested.addAndGet(-e);
316316
}
317-
inner.get().request(e);
317+
inner.request(e);
318318
}
319319
}
320320

@@ -398,6 +398,7 @@ public void onError(Throwable t) {
398398
if (index == p.unique && p.error.addThrowable(t)) {
399399
if (!p.delayErrors) {
400400
p.upstream.cancel();
401+
p.done = true;
401402
}
402403
done = true;
403404
p.drain();
@@ -418,5 +419,11 @@ public void onComplete() {
418419
public void cancel() {
419420
SubscriptionHelper.cancel(this);
420421
}
422+
423+
public void request(long n) {
424+
if (fusionMode != QueueSubscription.SYNC) {
425+
get().request(n);
426+
}
427+
}
421428
}
422429
}

Diff for: src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java

+1
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ void innerError(SwitchMapInnerObserver<T, R> inner, Throwable ex) {
314314
if (inner.index == unique && errors.addThrowable(ex)) {
315315
if (!delayErrors) {
316316
upstream.dispose();
317+
done = true;
317318
}
318319
inner.done = true;
319320
drain();

Diff for: src/test/java/io/reactivex/internal/operators/flowable/FlowableSwitchTest.java

+28
Original file line numberDiff line numberDiff line change
@@ -1201,4 +1201,32 @@ public Object apply(Integer w) throws Exception {
12011201
.assertNoErrors()
12021202
.assertComplete();
12031203
}
1204+
1205+
@Test
1206+
public void switchMapFusedIterable() {
1207+
Flowable.range(1, 2)
1208+
.switchMap(new Function<Integer, Publisher<Integer>>() {
1209+
@Override
1210+
public Publisher<Integer> apply(Integer v)
1211+
throws Exception {
1212+
return Flowable.fromIterable(Arrays.asList(v * 10));
1213+
}
1214+
})
1215+
.test()
1216+
.assertResult(10, 20);
1217+
}
1218+
1219+
@Test
1220+
public void switchMapHiddenIterable() {
1221+
Flowable.range(1, 2)
1222+
.switchMap(new Function<Integer, Publisher<Integer>>() {
1223+
@Override
1224+
public Publisher<Integer> apply(Integer v)
1225+
throws Exception {
1226+
return Flowable.fromIterable(Arrays.asList(v * 10)).hide();
1227+
}
1228+
})
1229+
.test()
1230+
.assertResult(10, 20);
1231+
}
12041232
}

Diff for: src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java

+33-2
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,19 @@
1414
package io.reactivex.internal.operators.observable;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.*;
1718
import static org.mockito.Mockito.*;
1819

19-
import java.util.List;
20+
import java.util.*;
2021
import java.util.concurrent.TimeUnit;
2122
import java.util.concurrent.atomic.*;
2223

2324
import org.junit.*;
2425
import org.mockito.InOrder;
2526

2627
import io.reactivex.*;
28+
import io.reactivex.Observable;
29+
import io.reactivex.Observer;
2730
import io.reactivex.disposables.*;
2831
import io.reactivex.exceptions.*;
2932
import io.reactivex.functions.*;
@@ -33,7 +36,7 @@
3336
import io.reactivex.observers.TestObserver;
3437
import io.reactivex.plugins.RxJavaPlugins;
3538
import io.reactivex.schedulers.*;
36-
import io.reactivex.subjects.*;
39+
import io.reactivex.subjects.PublishSubject;
3740

3841
public class ObservableSwitchTest {
3942

@@ -1191,4 +1194,32 @@ public Object apply(Integer w) throws Exception {
11911194
.assertNoErrors()
11921195
.assertComplete();
11931196
}
1197+
1198+
@Test
1199+
public void switchMapFusedIterable() {
1200+
Observable.range(1, 2)
1201+
.switchMap(new Function<Integer, Observable<Integer>>() {
1202+
@Override
1203+
public Observable<Integer> apply(Integer v)
1204+
throws Exception {
1205+
return Observable.fromIterable(Arrays.asList(v * 10));
1206+
}
1207+
})
1208+
.test()
1209+
.assertResult(10, 20);
1210+
}
1211+
1212+
@Test
1213+
public void switchMapHiddenIterable() {
1214+
Observable.range(1, 2)
1215+
.switchMap(new Function<Integer, Observable<Integer>>() {
1216+
@Override
1217+
public Observable<Integer> apply(Integer v)
1218+
throws Exception {
1219+
return Observable.fromIterable(Arrays.asList(v * 10)).hide();
1220+
}
1221+
})
1222+
.test()
1223+
.assertResult(10, 20);
1224+
}
11941225
}

0 commit comments

Comments
 (0)