Skip to content

Commit f504c65

Browse files
Merge pull request #437 from benjchristensen/bug-fixes
Fixes: Scheduler and Merge
2 parents 0b3a6f5 + 6ef2530 commit f504c65

File tree

3 files changed

+158
-26
lines changed

3 files changed

+158
-26
lines changed

Diff for: rxjava-core/src/main/java/rx/Scheduler.java

+60-22
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,11 @@
2727
import org.mockito.Mockito;
2828

2929
import rx.concurrency.TestScheduler;
30+
import rx.subscriptions.CompositeSubscription;
31+
import rx.subscriptions.MultipleAssignmentSubscription;
3032
import rx.subscriptions.Subscriptions;
3133
import rx.util.functions.Action0;
34+
import rx.util.functions.Action1;
3235
import rx.util.functions.Func1;
3336
import rx.util.functions.Func2;
3437

@@ -83,23 +86,23 @@ public abstract class Scheduler {
8386
* Schedules a cancelable action to be executed periodically.
8487
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
8588
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
86-
*
87-
* @param state
89+
*
90+
* @param state
8891
* State to pass into the action.
89-
* @param action
92+
* @param action
9093
* The action to execute periodically.
91-
* @param initialDelay
94+
* @param initialDelay
9295
* Time to wait before executing the action for the first time.
93-
* @param period
96+
* @param period
9497
* The time interval to wait each time in between executing the action.
95-
* @param unit
98+
* @param unit
9699
* The time unit the interval above is given in.
97100
* @return A subscription to be able to unsubscribe from action.
98101
*/
99102
public <T> Subscription schedulePeriodically(T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
100103
final long periodInNanos = unit.toNanos(period);
101104
final AtomicBoolean complete = new AtomicBoolean();
102-
105+
103106
final Func2<Scheduler, T, Subscription> recursiveAction = new Func2<Scheduler, T, Subscription>() {
104107
@Override
105108
public Subscription call(Scheduler scheduler, T state0) {
@@ -128,7 +131,7 @@ public void call() {
128131
}
129132
});
130133
}
131-
134+
132135
/**
133136
* Schedules a cancelable action to be executed at dueTime.
134137
*
@@ -150,6 +153,40 @@ public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ?
150153
}
151154
}
152155

156+
/**
157+
* Schedules an action and receives back an action for recursive execution.
158+
*
159+
* @param action
160+
* action
161+
* @return a subscription to be able to unsubscribe from action.
162+
*/
163+
public Subscription schedule(final Action1<Action0> action) {
164+
final CompositeSubscription parentSubscription = new CompositeSubscription();
165+
final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription();
166+
parentSubscription.add(childSubscription);
167+
168+
final Func2<Scheduler, Func2, Subscription> parentAction = new Func2<Scheduler, Func2, Subscription>() {
169+
170+
@Override
171+
public Subscription call(final Scheduler scheduler, final Func2 parentAction) {
172+
action.call(new Action0() {
173+
174+
@Override
175+
public void call() {
176+
if (!parentSubscription.isUnsubscribed()) {
177+
childSubscription.setSubscription(scheduler.schedule(parentAction, parentAction));
178+
}
179+
}
180+
181+
});
182+
return childSubscription;
183+
}
184+
};
185+
186+
parentSubscription.add(schedule(parentAction, parentAction));
187+
188+
return parentSubscription;
189+
}
153190

154191
/**
155192
* Schedules an action to be executed.
@@ -187,17 +224,16 @@ public Subscription call(Scheduler scheduler, Void state) {
187224
}, delayTime, unit);
188225
}
189226

190-
191227
/**
192228
* Schedules an action to be executed periodically.
193229
*
194-
* @param action
230+
* @param action
195231
* The action to execute periodically.
196-
* @param initialDelay
232+
* @param initialDelay
197233
* Time to wait before executing the action for the first time.
198-
* @param period
234+
* @param period
199235
* The time interval to wait each time in between executing the action.
200-
* @param unit
236+
* @param unit
201237
* The time unit the interval above is given in.
202238
* @return A subscription to be able to unsubscribe from action.
203239
*/
@@ -230,39 +266,41 @@ public int degreeOfParallelism() {
230266
}
231267

232268
public static class UnitTest {
233-
@SuppressWarnings("unchecked") // mocking is unchecked, unfortunately
269+
@SuppressWarnings("unchecked")
270+
// mocking is unchecked, unfortunately
234271
@Test
235272
public void testPeriodicScheduling() {
236273
final Func1<Long, Void> calledOp = mock(Func1.class);
237-
274+
238275
final TestScheduler scheduler = new TestScheduler();
239276
Subscription subscription = scheduler.schedulePeriodically(new Action0() {
240-
@Override public void call() {
277+
@Override
278+
public void call() {
241279
System.out.println(scheduler.now());
242280
calledOp.call(scheduler.now());
243281
}
244282
}, 1, 2, TimeUnit.SECONDS);
245-
283+
246284
verify(calledOp, never()).call(anyLong());
247285

248286
InOrder inOrder = Mockito.inOrder(calledOp);
249-
287+
250288
scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS);
251289
inOrder.verify(calledOp, never()).call(anyLong());
252290

253291
scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
254292
inOrder.verify(calledOp, times(1)).call(1000L);
255-
293+
256294
scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS);
257295
inOrder.verify(calledOp, never()).call(3000L);
258-
296+
259297
scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
260298
inOrder.verify(calledOp, times(1)).call(3000L);
261-
299+
262300
scheduler.advanceTimeBy(5L, TimeUnit.SECONDS);
263301
inOrder.verify(calledOp, times(1)).call(5000L);
264302
inOrder.verify(calledOp, times(1)).call(7000L);
265-
303+
266304
subscription.unsubscribe();
267305
scheduler.advanceTimeBy(11L, TimeUnit.SECONDS);
268306
inOrder.verify(calledOp, never()).call(anyLong());

Diff for: rxjava-core/src/main/java/rx/operators/OperationMerge.java

+76-4
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.List;
2525
import java.util.concurrent.ConcurrentHashMap;
2626
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.atomic.AtomicBoolean;
2829
import java.util.concurrent.atomic.AtomicInteger;
2930

@@ -36,6 +37,10 @@
3637
import rx.Observable.OnSubscribeFunc;
3738
import rx.Observer;
3839
import rx.Subscription;
40+
import rx.subscriptions.CompositeSubscription;
41+
import rx.subscriptions.Subscriptions;
42+
import rx.util.functions.Action0;
43+
import rx.util.functions.Action1;
3944

4045
/**
4146
* Flattens a list of Observables into one Observable sequence, without any transformation.
@@ -93,6 +98,7 @@ public Subscription onSubscribe(Observer<? super Observable<? extends T>> observ
9398

9499
@Override
95100
public void unsubscribe() {
101+
System.out.println("unsubscribe from merge");
96102
unsubscribed = true;
97103
}
98104

@@ -125,6 +131,7 @@ private MergeObservable(Observable<? extends Observable<? extends T>> sequences)
125131
}
126132

127133
public Subscription onSubscribe(Observer<? super T> actualObserver) {
134+
CompositeSubscription completeSubscription = new CompositeSubscription();
128135

129136
/**
130137
* We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting.
@@ -134,15 +141,16 @@ public Subscription onSubscribe(Observer<? super T> actualObserver) {
134141
* Bug report: https://github.com/Netflix/RxJava/issues/200
135142
*/
136143
SafeObservableSubscription subscription = new SafeObservableSubscription(ourSubscription);
144+
completeSubscription.add(subscription);
137145
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver, subscription);
138146

139147
/**
140148
* Subscribe to the parent Observable to get to the children Observables
141149
*/
142-
sequences.subscribe(new ParentObserver(synchronizedObserver));
150+
completeSubscription.add(sequences.subscribe(new ParentObserver(synchronizedObserver)));
143151

144152
/* return our subscription to allow unsubscribing */
145-
return subscription;
153+
return completeSubscription;
146154
}
147155

148156
/**
@@ -380,6 +388,70 @@ public void testUnSubscribe() {
380388
verify(stringObserver, never()).onCompleted();
381389
}
382390

391+
@Test
392+
public void testUnSubscribeObservableOfObservables() throws InterruptedException {
393+
394+
final AtomicBoolean unsubscribed = new AtomicBoolean();
395+
final CountDownLatch latch = new CountDownLatch(1);
396+
397+
Observable<Observable<Long>> source = Observable.create(new OnSubscribeFunc<Observable<Long>>() {
398+
399+
@Override
400+
public Subscription onSubscribe(final Observer<? super Observable<Long>> observer) {
401+
// verbose on purpose so I can track the inside of it
402+
final Subscription s = Subscriptions.create(new Action0() {
403+
404+
@Override
405+
public void call() {
406+
System.out.println("*** unsubscribed");
407+
unsubscribed.set(true);
408+
}
409+
410+
});
411+
412+
new Thread(new Runnable() {
413+
414+
@Override
415+
public void run() {
416+
417+
while (!unsubscribed.get()) {
418+
observer.onNext(Observable.from(1L, 2L));
419+
}
420+
System.out.println("Done looping after unsubscribe: " + unsubscribed.get());
421+
observer.onCompleted();
422+
423+
// mark that the thread is finished
424+
latch.countDown();
425+
}
426+
}).start();
427+
428+
return s;
429+
};
430+
431+
});
432+
433+
final AtomicInteger count = new AtomicInteger();
434+
Observable.create(merge(source)).take(6).toBlockingObservable().forEach(new Action1<Long>() {
435+
436+
@Override
437+
public void call(Long v) {
438+
System.out.println("Value: " + v);
439+
int c = count.incrementAndGet();
440+
if (c > 6) {
441+
fail("Should be only 6");
442+
}
443+
444+
}
445+
});
446+
447+
latch.await(1000, TimeUnit.MILLISECONDS);
448+
449+
System.out.println("unsubscribed: " + unsubscribed.get());
450+
451+
assertTrue(unsubscribed.get());
452+
453+
}
454+
383455
@Test
384456
public void testMergeArrayWithThreading() {
385457
final TestASynchronousObservable o1 = new TestASynchronousObservable();
@@ -453,9 +525,9 @@ public void onNext(String v) {
453525
// so I'm unfortunately reverting to using a Thread.sleep to allow the process scheduler time
454526
// to make sure after o1.onNextBeingSent and o2.onNextBeingSent are hit that the following
455527
// onNext is invoked.
456-
528+
457529
Thread.sleep(300);
458-
530+
459531
try { // in try/finally so threads are released via latch countDown even if assertion fails
460532
assertEquals(1, concurrentCounter.get());
461533
} finally {

Diff for: rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java

+22
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import rx.Subscription;
3434
import rx.subscriptions.BooleanSubscription;
3535
import rx.subscriptions.Subscriptions;
36+
import rx.util.functions.Action0;
3637
import rx.util.functions.Action1;
3738
import rx.util.functions.Func1;
3839
import rx.util.functions.Func2;
@@ -473,6 +474,27 @@ public Subscription onSubscribe(final Observer<? super String> observer) {
473474
fail("Error: " + observer.error.get().getMessage());
474475
}
475476
}
477+
478+
@Test
479+
public void testRecursion() {
480+
TestScheduler s = new TestScheduler();
481+
482+
final AtomicInteger counter = new AtomicInteger(0);
483+
484+
Subscription subscription = s.schedule(new Action1<Action0>() {
485+
486+
@Override
487+
public void call(Action0 self) {
488+
counter.incrementAndGet();
489+
System.out.println("counter: " + counter.get());
490+
self.call();
491+
}
492+
493+
});
494+
subscription.unsubscribe();
495+
assertEquals(0, counter.get());
496+
}
497+
476498

477499
/**
478500
* Used to determine if onNext is being invoked concurrently.

0 commit comments

Comments
 (0)