Skip to content

Commit fdd66a1

Browse files
authored
Merge pull request #3 from bjorne/fix-subscribe-on
Rename ScheduledDisposable -> ScheduledSubscription
2 parents 9e70cd1 + b527314 commit fdd66a1

File tree

3 files changed

+68
-4
lines changed

3 files changed

+68
-4
lines changed

examples/subscribe_on.rb

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
require 'rx'
2+
3+
def in_a_second(n)
4+
Rx::Observable.create do |observer|
5+
sleep(1)
6+
observer.on_next(n)
7+
observer.on_completed
8+
end
9+
end
10+
11+
# subscribe_on will schedule ancestry on given scheduler; each sleep happens in a separate thread
12+
source = Rx::Observable.of(
13+
in_a_second(1)
14+
.map {|v| v + 1 }
15+
.subscribe_on(Rx::DefaultScheduler.instance),
16+
in_a_second(2)
17+
.map {|v| v + 1 }
18+
.subscribe_on(Rx::DefaultScheduler.instance)
19+
).merge_all
20+
.time_interval
21+
22+
subscription = source.subscribe(
23+
lambda {|x|
24+
puts 'Next: ' + x.to_s
25+
},
26+
lambda {|err|
27+
puts 'Error: ' + err.to_s
28+
},
29+
lambda {
30+
puts 'Completed'
31+
})
32+
33+
# => Next: (3)@(1.004153)
34+
# => Next: (2)@(0.000251)
35+
36+
while Thread.list.size > 1
37+
(Thread.list - [Thread.current]).each &:join
38+
end

lib/rx/subscriptions/scheduled_subscription.rb

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
22

33
module Rx
4-
# Represents a disposable resource whose disposal invocation will be scheduled on the specified scheduler
5-
class ScheduledDisposable
4+
# Represents a subscription that is unsubscribed on the specified scheduler
5+
class ScheduledSubscription
66

77
attr_reader :scheduler, :subscription
88

@@ -21,12 +21,12 @@ def unsubscribed?
2121

2222
# Unsubscribes the wrapped subscription on the provided scheduler.
2323
def unsubscribe
24-
@scheduler.schedule lambda do
24+
@scheduler.schedule lambda {
2525
unless @subscription.nil?
2626
@subscription.unsubscribe
2727
@subscription = nil
2828
end
29-
end
29+
}
3030
end
3131
end
3232
end
+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2+
3+
require "#{File.dirname(__FILE__)}/../../test_helper"
4+
5+
class TestObservableSynchronization < Minitest::Test
6+
include Rx::ReactiveTest
7+
8+
def test_subscribe_on
9+
scheduler = Rx::TestScheduler.new
10+
mock = Rx::MockObserver.new scheduler
11+
Rx::Observable.just(1)
12+
.subscribe_on(scheduler)
13+
.subscribe(mock)
14+
assert_equal 0, mock.messages.length
15+
scheduler.advance_by 100
16+
assert_equal 2, mock.messages.length
17+
end
18+
19+
def test_subscribe_on_default_scheduler_does_not_raise
20+
Rx::Observable.just(1).subscribe_on(Rx::DefaultScheduler.instance).subscribe
21+
end
22+
23+
def test_subscribe_on_current_thread_scheduiler_does_not_raise
24+
Rx::Observable.just(1).subscribe_on(Rx::CurrentThreadScheduler.instance).subscribe
25+
end
26+
end

0 commit comments

Comments
 (0)