Skip to content

Commit f0404e9

Browse files
Merge pull request #1150 from benjchristensen/replay-fix
Fix ReplaySubject Terminal State Race Condition
2 parents cdc0c2f + 4f55f95 commit f0404e9

File tree

2 files changed

+46
-18
lines changed

2 files changed

+46
-18
lines changed

Diff for: rxjava-core/src/main/java/rx/subjects/ReplaySubject.java

+28-15
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import rx.Observer;
2626
import rx.functions.Action0;
2727
import rx.functions.Action1;
28+
import rx.operators.NotificationLite;
2829
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
2930

3031
/**
@@ -127,7 +128,7 @@ public void onCompleted() {
127128

128129
@Override
129130
public void call() {
130-
state.history.complete(Notification.<T> createOnCompleted());
131+
state.history.complete();
131132
}
132133
});
133134
if (observers != null) {
@@ -145,7 +146,7 @@ public void onError(final Throwable e) {
145146

146147
@Override
147148
public void call() {
148-
state.history.complete(Notification.<T> createOnError(e));
149+
state.history.complete(e);
149150
}
150151
});
151152
if (observers != null) {
@@ -159,7 +160,7 @@ public void call() {
159160

160161
@Override
161162
public void onNext(T v) {
162-
if (state.history.terminalValue.get() != null) {
163+
if (state.history.terminated) {
163164
return;
164165
}
165166
state.history.next(v);
@@ -200,12 +201,9 @@ private void replayObserver(SubjectObserver<? super T> observer) {
200201

201202
private static <T> int replayObserverFromIndex(History<T> history, Integer l, SubjectObserver<? super T> observer) {
202203
while (l < history.index.get()) {
203-
observer.onNext(history.list.get(l));
204+
history.accept(observer, l);
204205
l++;
205206
}
206-
if (history.terminalValue.get() != null) {
207-
history.terminalValue.get().accept(observer);
208-
}
209207

210208
return l;
211209
}
@@ -217,28 +215,43 @@ private static <T> int replayObserverFromIndex(History<T> history, Integer l, Su
217215
* @param <T>
218216
*/
219217
private static class History<T> {
218+
private final NotificationLite<T> nl = NotificationLite.instance();
220219
private final AtomicInteger index;
221-
private final ArrayList<T> list;
222-
private final AtomicReference<Notification<T>> terminalValue;
220+
private final ArrayList<Object> list;
221+
private boolean terminated;
223222

224223
public History(int initialCapacity) {
225224
index = new AtomicInteger(0);
226-
list = new ArrayList<T>(initialCapacity);
227-
terminalValue = new AtomicReference<Notification<T>>();
225+
list = new ArrayList<Object>(initialCapacity);
228226
}
229227

230228
public boolean next(T n) {
231-
if (terminalValue.get() == null) {
232-
list.add(n);
229+
if (!terminated) {
230+
list.add(nl.next(n));
233231
index.getAndIncrement();
234232
return true;
235233
} else {
236234
return false;
237235
}
238236
}
239237

240-
public void complete(Notification<T> n) {
241-
terminalValue.set(n);
238+
public void accept(Observer<? super T> o, int idx) {
239+
nl.accept(o, list.get(idx));
240+
}
241+
242+
public void complete() {
243+
if (!terminated) {
244+
terminated = true;
245+
list.add(nl.completed());
246+
index.getAndIncrement();
247+
}
248+
}
249+
public void complete(Throwable e) {
250+
if (!terminated) {
251+
terminated = true;
252+
list.add(nl.error(e));
253+
index.getAndIncrement();
254+
}
242255
}
243256
}
244257

Diff for: rxjava-core/src/test/java/rx/subjects/ReplaySubjectConcurrencyTest.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717

1818
import static org.junit.Assert.assertEquals;
1919

20-
import java.util.ArrayList;
21-
import java.util.Collections;
22-
import java.util.List;
20+
import java.util.*;
2321
import java.util.concurrent.CountDownLatch;
2422
import java.util.concurrent.TimeUnit;
2523
import java.util.concurrent.atomic.AtomicReference;
@@ -32,6 +30,8 @@
3230
import rx.Subscriber;
3331
import rx.Subscription;
3432
import rx.functions.Action1;
33+
import rx.observers.TestSubscriber;
34+
import rx.schedulers.Schedulers;
3535
import rx.subscriptions.Subscriptions;
3636

3737
public class ReplaySubjectConcurrencyTest {
@@ -303,6 +303,21 @@ public void run() {
303303
}
304304

305305
}
306+
307+
/**
308+
* https://github.com/Netflix/RxJava/issues/1147
309+
*/
310+
@Test
311+
public void testRaceForTerminalState() {
312+
final List<Integer> expected = Arrays.asList(1);
313+
for (int i = 0; i < 100000; i++) {
314+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
315+
Observable.just(1).subscribeOn(Schedulers.computation()).cache().subscribe(ts);
316+
ts.awaitTerminalEvent();
317+
ts.assertReceivedOnNext(expected);
318+
ts.assertTerminalEvent();
319+
}
320+
}
306321

307322
private static class SubjectObserverThread extends Thread {
308323

0 commit comments

Comments
 (0)