Skip to content

Commit 6ef2530

Browse files
BugFix: unsubscribe was not propagating to parent Observable on merge(Observable<Observable<T>>)
1 parent dc7a3f8 commit 6ef2530

File tree

1 file changed

+76
-4
lines changed

1 file changed

+76
-4
lines changed

Diff for: rxjava-core/src/main/java/rx/operators/OperationMerge.java

+76-4
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.List;
2525
import java.util.concurrent.ConcurrentHashMap;
2626
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.atomic.AtomicBoolean;
2829
import java.util.concurrent.atomic.AtomicInteger;
2930

@@ -36,6 +37,10 @@
3637
import rx.Observable.OnSubscribeFunc;
3738
import rx.Observer;
3839
import rx.Subscription;
40+
import rx.subscriptions.CompositeSubscription;
41+
import rx.subscriptions.Subscriptions;
42+
import rx.util.functions.Action0;
43+
import rx.util.functions.Action1;
3944

4045
/**
4146
* Flattens a list of Observables into one Observable sequence, without any transformation.
@@ -93,6 +98,7 @@ public Subscription onSubscribe(Observer<? super Observable<? extends T>> observ
9398

9499
@Override
95100
public void unsubscribe() {
101+
System.out.println("unsubscribe from merge");
96102
unsubscribed = true;
97103
}
98104

@@ -125,6 +131,7 @@ private MergeObservable(Observable<? extends Observable<? extends T>> sequences)
125131
}
126132

127133
public Subscription onSubscribe(Observer<? super T> actualObserver) {
134+
CompositeSubscription completeSubscription = new CompositeSubscription();
128135

129136
/**
130137
* We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting.
@@ -134,15 +141,16 @@ public Subscription onSubscribe(Observer<? super T> actualObserver) {
134141
* Bug report: https://github.com/Netflix/RxJava/issues/200
135142
*/
136143
SafeObservableSubscription subscription = new SafeObservableSubscription(ourSubscription);
144+
completeSubscription.add(subscription);
137145
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver, subscription);
138146

139147
/**
140148
* Subscribe to the parent Observable to get to the children Observables
141149
*/
142-
sequences.subscribe(new ParentObserver(synchronizedObserver));
150+
completeSubscription.add(sequences.subscribe(new ParentObserver(synchronizedObserver)));
143151

144152
/* return our subscription to allow unsubscribing */
145-
return subscription;
153+
return completeSubscription;
146154
}
147155

148156
/**
@@ -380,6 +388,70 @@ public void testUnSubscribe() {
380388
verify(stringObserver, never()).onCompleted();
381389
}
382390

391+
@Test
392+
public void testUnSubscribeObservableOfObservables() throws InterruptedException {
393+
394+
final AtomicBoolean unsubscribed = new AtomicBoolean();
395+
final CountDownLatch latch = new CountDownLatch(1);
396+
397+
Observable<Observable<Long>> source = Observable.create(new OnSubscribeFunc<Observable<Long>>() {
398+
399+
@Override
400+
public Subscription onSubscribe(final Observer<? super Observable<Long>> observer) {
401+
// verbose on purpose so I can track the inside of it
402+
final Subscription s = Subscriptions.create(new Action0() {
403+
404+
@Override
405+
public void call() {
406+
System.out.println("*** unsubscribed");
407+
unsubscribed.set(true);
408+
}
409+
410+
});
411+
412+
new Thread(new Runnable() {
413+
414+
@Override
415+
public void run() {
416+
417+
while (!unsubscribed.get()) {
418+
observer.onNext(Observable.from(1L, 2L));
419+
}
420+
System.out.println("Done looping after unsubscribe: " + unsubscribed.get());
421+
observer.onCompleted();
422+
423+
// mark that the thread is finished
424+
latch.countDown();
425+
}
426+
}).start();
427+
428+
return s;
429+
};
430+
431+
});
432+
433+
final AtomicInteger count = new AtomicInteger();
434+
Observable.create(merge(source)).take(6).toBlockingObservable().forEach(new Action1<Long>() {
435+
436+
@Override
437+
public void call(Long v) {
438+
System.out.println("Value: " + v);
439+
int c = count.incrementAndGet();
440+
if (c > 6) {
441+
fail("Should be only 6");
442+
}
443+
444+
}
445+
});
446+
447+
latch.await(1000, TimeUnit.MILLISECONDS);
448+
449+
System.out.println("unsubscribed: " + unsubscribed.get());
450+
451+
assertTrue(unsubscribed.get());
452+
453+
}
454+
383455
@Test
384456
public void testMergeArrayWithThreading() {
385457
final TestASynchronousObservable o1 = new TestASynchronousObservable();
@@ -453,9 +525,9 @@ public void onNext(String v) {
453525
// so I'm unfortunately reverting to using a Thread.sleep to allow the process scheduler time
454526
// to make sure after o1.onNextBeingSent and o2.onNextBeingSent are hit that the following
455527
// onNext is invoked.
456-
528+
457529
Thread.sleep(300);
458-
530+
459531
try { // in try/finally so threads are released via latch countDown even if assertion fails
460532
assertEquals(1, concurrentCounter.get());
461533
} finally {

0 commit comments

Comments
 (0)