Skip to content

Commit fa406d1

Browse files
authored
2.x: Fix refCount() not resetting when cross-canceled (#6629)
* 2.x: Fix refCount() not resetting when cross-canceled * Undo test timeout comment
1 parent a9df239 commit fa406d1

File tree

6 files changed

+137
-20
lines changed

6 files changed

+137
-20
lines changed

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java

+30-10
Original file line numberDiff line numberDiff line change
@@ -115,22 +115,42 @@ void cancel(RefConnection rc) {
115115

116116
void terminated(RefConnection rc) {
117117
synchronized (this) {
118-
if (connection != null && connection == rc) {
119-
connection = null;
120-
if (rc.timer != null) {
121-
rc.timer.dispose();
118+
if (source instanceof FlowablePublishClassic) {
119+
if (connection != null && connection == rc) {
120+
connection = null;
121+
clearTimer(rc);
122122
}
123-
}
124-
if (--rc.subscriberCount == 0) {
125-
if (source instanceof Disposable) {
126-
((Disposable)source).dispose();
127-
} else if (source instanceof ResettableConnectable) {
128-
((ResettableConnectable)source).resetIf(rc.get());
123+
124+
if (--rc.subscriberCount == 0) {
125+
reset(rc);
126+
}
127+
} else {
128+
if (connection != null && connection == rc) {
129+
clearTimer(rc);
130+
if (--rc.subscriberCount == 0) {
131+
connection = null;
132+
reset(rc);
133+
}
129134
}
130135
}
131136
}
132137
}
133138

139+
void clearTimer(RefConnection rc) {
140+
if (rc.timer != null) {
141+
rc.timer.dispose();
142+
rc.timer = null;
143+
}
144+
}
145+
146+
void reset(RefConnection rc) {
147+
if (source instanceof Disposable) {
148+
((Disposable)source).dispose();
149+
} else if (source instanceof ResettableConnectable) {
150+
((ResettableConnectable)source).resetIf(rc.get());
151+
}
152+
}
153+
134154
void timeout(RefConnection rc) {
135155
synchronized (this) {
136156
if (rc.subscriberCount == 0 && rc == connection) {

Diff for: src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java

+30-10
Original file line numberDiff line numberDiff line change
@@ -112,22 +112,42 @@ void cancel(RefConnection rc) {
112112

113113
void terminated(RefConnection rc) {
114114
synchronized (this) {
115-
if (connection != null && connection == rc) {
116-
connection = null;
117-
if (rc.timer != null) {
118-
rc.timer.dispose();
115+
if (source instanceof ObservablePublishClassic) {
116+
if (connection != null && connection == rc) {
117+
connection = null;
118+
clearTimer(rc);
119119
}
120-
}
121-
if (--rc.subscriberCount == 0) {
122-
if (source instanceof Disposable) {
123-
((Disposable)source).dispose();
124-
} else if (source instanceof ResettableConnectable) {
125-
((ResettableConnectable)source).resetIf(rc.get());
120+
121+
if (--rc.subscriberCount == 0) {
122+
reset(rc);
123+
}
124+
} else {
125+
if (connection != null && connection == rc) {
126+
clearTimer(rc);
127+
if (--rc.subscriberCount == 0) {
128+
connection = null;
129+
reset(rc);
130+
}
126131
}
127132
}
128133
}
129134
}
130135

136+
void clearTimer(RefConnection rc) {
137+
if (rc.timer != null) {
138+
rc.timer.dispose();
139+
rc.timer = null;
140+
}
141+
}
142+
143+
void reset(RefConnection rc) {
144+
if (source instanceof Disposable) {
145+
((Disposable)source).dispose();
146+
} else if (source instanceof ResettableConnectable) {
147+
((ResettableConnectable)source).resetIf(rc.get());
148+
}
149+
}
150+
131151
void timeout(RefConnection rc) {
132152
synchronized (this) {
133153
if (rc.subscriberCount == 0 && rc == connection) {

Diff for: src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountAltTest.java

+19
Original file line numberDiff line numberDiff line change
@@ -1443,4 +1443,23 @@ public void publishRefCountShallBeThreadSafe() {
14431443
.assertComplete();
14441444
}
14451445
}
1446+
1447+
@Test
1448+
public void upstreamTerminationTriggersAnotherCancel() throws Exception {
1449+
ReplayProcessor<Integer> rp = ReplayProcessor.create();
1450+
rp.onNext(1);
1451+
rp.onComplete();
1452+
1453+
Flowable<Integer> shared = rp.share();
1454+
1455+
shared
1456+
.buffer(shared.debounce(5, TimeUnit.SECONDS))
1457+
.test()
1458+
.assertValueCount(2);
1459+
1460+
shared
1461+
.buffer(shared.debounce(5, TimeUnit.SECONDS))
1462+
.test()
1463+
.assertValueCount(2);
1464+
}
14461465
}

Diff for: src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java

+20
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.reactivex.internal.operators.flowable;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.any;
1718
import static org.mockito.Mockito.*;
1819

1920
import java.io.IOException;
@@ -1436,4 +1437,23 @@ public void disconnectBeforeConnect() {
14361437

14371438
flowable.take(1).test().assertResult(2);
14381439
}
1440+
1441+
@Test
1442+
public void upstreamTerminationTriggersAnotherCancel() throws Exception {
1443+
ReplayProcessor<Integer> rp = ReplayProcessor.create();
1444+
rp.onNext(1);
1445+
rp.onComplete();
1446+
1447+
Flowable<Integer> shared = rp.share();
1448+
1449+
shared
1450+
.buffer(shared.debounce(5, TimeUnit.SECONDS))
1451+
.test()
1452+
.assertValueCount(2);
1453+
1454+
shared
1455+
.buffer(shared.debounce(5, TimeUnit.SECONDS))
1456+
.test()
1457+
.assertValueCount(2);
1458+
}
14391459
}

Diff for: src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountAltTest.java

+19
Original file line numberDiff line numberDiff line change
@@ -1399,4 +1399,23 @@ public void publishRefCountShallBeThreadSafe() {
13991399
.assertComplete();
14001400
}
14011401
}
1402+
1403+
@Test
1404+
public void upstreamTerminationTriggersAnotherCancel() throws Exception {
1405+
ReplaySubject<Integer> rs = ReplaySubject.create();
1406+
rs.onNext(1);
1407+
rs.onComplete();
1408+
1409+
Observable<Integer> shared = rs.share();
1410+
1411+
shared
1412+
.buffer(shared.debounce(5, TimeUnit.SECONDS))
1413+
.test()
1414+
.assertValueCount(2);
1415+
1416+
shared
1417+
.buffer(shared.debounce(5, TimeUnit.SECONDS))
1418+
.test()
1419+
.assertValueCount(2);
1420+
}
14021421
}

Diff for: src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java

+19
Original file line numberDiff line numberDiff line change
@@ -1380,4 +1380,23 @@ public void disconnectBeforeConnect() {
13801380

13811381
observable.take(1).test().assertResult(2);
13821382
}
1383+
1384+
@Test
1385+
public void upstreamTerminationTriggersAnotherCancel() throws Exception {
1386+
ReplaySubject<Integer> rs = ReplaySubject.create();
1387+
rs.onNext(1);
1388+
rs.onComplete();
1389+
1390+
Observable<Integer> shared = rs.share();
1391+
1392+
shared
1393+
.buffer(shared.debounce(5, TimeUnit.SECONDS))
1394+
.test()
1395+
.assertValueCount(2);
1396+
1397+
shared
1398+
.buffer(shared.debounce(5, TimeUnit.SECONDS))
1399+
.test()
1400+
.assertValueCount(2);
1401+
}
13831402
}

0 commit comments

Comments
 (0)