Skip to content

Commit b527314

Browse files
BittranceBjorn Ramberg
Bittrance
authored and
Bjorn Ramberg
committed
Example and proper test for subscribe_on
1 parent ddc0d0e commit b527314

File tree

2 files changed

+49
-0
lines changed

2 files changed

+49
-0
lines changed

examples/subscribe_on.rb

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
require 'rx'
2+
3+
def in_a_second(n)
4+
Rx::Observable.create do |observer|
5+
sleep(1)
6+
observer.on_next(n)
7+
observer.on_completed
8+
end
9+
end
10+
11+
# subscribe_on will schedule ancestry on given scheduler; each sleep happens in a separate thread
12+
source = Rx::Observable.of(
13+
in_a_second(1)
14+
.map {|v| v + 1 }
15+
.subscribe_on(Rx::DefaultScheduler.instance),
16+
in_a_second(2)
17+
.map {|v| v + 1 }
18+
.subscribe_on(Rx::DefaultScheduler.instance)
19+
).merge_all
20+
.time_interval
21+
22+
subscription = source.subscribe(
23+
lambda {|x|
24+
puts 'Next: ' + x.to_s
25+
},
26+
lambda {|err|
27+
puts 'Error: ' + err.to_s
28+
},
29+
lambda {
30+
puts 'Completed'
31+
})
32+
33+
# => Next: (3)@(1.004153)
34+
# => Next: (2)@(0.000251)
35+
36+
while Thread.list.size > 1
37+
(Thread.list - [Thread.current]).each &:join
38+
end

test/rx/operators/test_synchronization.rb

+11
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,17 @@
55
class TestObservableSynchronization < Minitest::Test
66
include Rx::ReactiveTest
77

8+
def test_subscribe_on
9+
scheduler = Rx::TestScheduler.new
10+
mock = Rx::MockObserver.new scheduler
11+
Rx::Observable.just(1)
12+
.subscribe_on(scheduler)
13+
.subscribe(mock)
14+
assert_equal 0, mock.messages.length
15+
scheduler.advance_by 100
16+
assert_equal 2, mock.messages.length
17+
end
18+
819
def test_subscribe_on_default_scheduler_does_not_raise
920
Rx::Observable.just(1).subscribe_on(Rx::DefaultScheduler.instance).subscribe
1021
end

0 commit comments

Comments
 (0)