Skip to content

Commit c478097

Browse files
authored
3.x: Fix Observable.flatMap with maxConcurrency hangs (#6946)
* 3.x: Fix Observable.flatMap with maxConcurrency hangs * Verify Flowable
1 parent 14a9525 commit c478097

File tree

3 files changed

+75
-12
lines changed

3 files changed

+75
-12
lines changed

Diff for: src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFlatMap.java

+27-12
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ void drainLoop() {
318318
if (checkTerminate()) {
319319
return;
320320
}
321+
int innerCompleted = 0;
321322
SimplePlainQueue<U> svq = queue;
322323

323324
if (svq != null) {
@@ -333,9 +334,18 @@ void drainLoop() {
333334
}
334335

335336
child.onNext(o);
337+
innerCompleted++;
336338
}
337339
}
338340

341+
if (innerCompleted != 0) {
342+
if (maxConcurrency != Integer.MAX_VALUE) {
343+
subscribeMore(innerCompleted);
344+
innerCompleted = 0;
345+
}
346+
continue;
347+
}
348+
339349
boolean d = done;
340350
svq = queue;
341351
InnerObserver<?, ?>[] inner = observers.get();
@@ -353,7 +363,6 @@ void drainLoop() {
353363
return;
354364
}
355365

356-
int innerCompleted = 0;
357366
if (n != 0) {
358367
int j = Math.min(n - 1, lastIndex);
359368

@@ -415,27 +424,33 @@ void drainLoop() {
415424

416425
if (innerCompleted != 0) {
417426
if (maxConcurrency != Integer.MAX_VALUE) {
418-
while (innerCompleted-- != 0) {
419-
ObservableSource<? extends U> p;
420-
synchronized (this) {
421-
p = sources.poll();
422-
if (p == null) {
423-
wip--;
424-
continue;
425-
}
426-
}
427-
subscribeInner(p);
428-
}
427+
subscribeMore(innerCompleted);
428+
innerCompleted = 0;
429429
}
430430
continue;
431431
}
432+
432433
missed = addAndGet(-missed);
433434
if (missed == 0) {
434435
break;
435436
}
436437
}
437438
}
438439

440+
void subscribeMore(int innerCompleted) {
441+
while (innerCompleted-- != 0) {
442+
ObservableSource<? extends U> p;
443+
synchronized (this) {
444+
p = sources.poll();
445+
if (p == null) {
446+
wip--;
447+
continue;
448+
}
449+
}
450+
subscribeInner(p);
451+
}
452+
}
453+
439454
boolean checkTerminate() {
440455
if (disposed) {
441456
return true;

Diff for: src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapTest.java

+24
Original file line numberDiff line numberDiff line change
@@ -1478,4 +1478,28 @@ public void innerCompletesAfterOnNextInDrainThenCancels() {
14781478
.requestMore(1)
14791479
.assertValuesOnly(1);
14801480
}
1481+
1482+
@Test(timeout = 5000)
1483+
public void mixedScalarAsync() {
1484+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
1485+
Flowable
1486+
.range(0, 20)
1487+
.flatMap(
1488+
integer -> {
1489+
if (integer % 5 != 0) {
1490+
return Flowable
1491+
.just(integer);
1492+
}
1493+
1494+
return Flowable
1495+
.just(-integer)
1496+
.observeOn(Schedulers.computation());
1497+
},
1498+
false,
1499+
1
1500+
)
1501+
.ignoreElements()
1502+
.blockingAwait();
1503+
}
1504+
}
14811505
}

Diff for: src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFlatMapTest.java

+24
Original file line numberDiff line numberDiff line change
@@ -1240,4 +1240,28 @@ public void fusedInnerCrash2() {
12401240

12411241
to.assertFailure(TestException.class, 1, 2);
12421242
}
1243+
1244+
@Test(timeout = 5000)
1245+
public void mixedScalarAsync() {
1246+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
1247+
Observable
1248+
.range(0, 20)
1249+
.flatMap(
1250+
integer -> {
1251+
if (integer % 5 != 0) {
1252+
return Observable
1253+
.just(integer);
1254+
}
1255+
1256+
return Observable
1257+
.just(-integer)
1258+
.observeOn(Schedulers.computation());
1259+
},
1260+
false,
1261+
1
1262+
)
1263+
.ignoreElements()
1264+
.blockingAwait();
1265+
}
1266+
}
12431267
}

0 commit comments

Comments
 (0)