diff --git a/examples/subscribe_on.rb b/examples/subscribe_on.rb new file mode 100644 index 0000000..d733d0f --- /dev/null +++ b/examples/subscribe_on.rb @@ -0,0 +1,38 @@ +require 'rx' + +def in_a_second(n) + Rx::Observable.create do |observer| + sleep(1) + observer.on_next(n) + observer.on_completed + end +end + +# subscribe_on will schedule ancestry on given scheduler; each sleep happens in a separate thread +source = Rx::Observable.of( + in_a_second(1) + .map {|v| v + 1 } + .subscribe_on(Rx::DefaultScheduler.instance), + in_a_second(2) + .map {|v| v + 1 } + .subscribe_on(Rx::DefaultScheduler.instance) +).merge_all +.time_interval + +subscription = source.subscribe( + lambda {|x| + puts 'Next: ' + x.to_s + }, + lambda {|err| + puts 'Error: ' + err.to_s + }, + lambda { + puts 'Completed' + }) + +# => Next: (3)@(1.004153) +# => Next: (2)@(0.000251) + +while Thread.list.size > 1 + (Thread.list - [Thread.current]).each &:join +end diff --git a/lib/rx/subscriptions/scheduled_subscription.rb b/lib/rx/subscriptions/scheduled_subscription.rb index cada291..ae0d25a 100644 --- a/lib/rx/subscriptions/scheduled_subscription.rb +++ b/lib/rx/subscriptions/scheduled_subscription.rb @@ -1,8 +1,8 @@ # Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. module Rx - # Represents a disposable resource whose disposal invocation will be scheduled on the specified scheduler - class ScheduledDisposable + # Represents a subscription that is unsubscribed on the specified scheduler + class ScheduledSubscription attr_reader :scheduler, :subscription @@ -21,12 +21,12 @@ def unsubscribed? # Unsubscribes the wrapped subscription on the provided scheduler. def unsubscribe - @scheduler.schedule lambda do + @scheduler.schedule lambda { unless @subscription.nil? @subscription.unsubscribe @subscription = nil end - end + } end end end \ No newline at end of file diff --git a/test/rx/operators/test_synchronization.rb b/test/rx/operators/test_synchronization.rb new file mode 100644 index 0000000..3a3ef6f --- /dev/null +++ b/test/rx/operators/test_synchronization.rb @@ -0,0 +1,26 @@ +# Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +require "#{File.dirname(__FILE__)}/../../test_helper" + +class TestObservableSynchronization < Minitest::Test + include Rx::ReactiveTest + + def test_subscribe_on + scheduler = Rx::TestScheduler.new + mock = Rx::MockObserver.new scheduler + Rx::Observable.just(1) + .subscribe_on(scheduler) + .subscribe(mock) + assert_equal 0, mock.messages.length + scheduler.advance_by 100 + assert_equal 2, mock.messages.length + end + + def test_subscribe_on_default_scheduler_does_not_raise + Rx::Observable.just(1).subscribe_on(Rx::DefaultScheduler.instance).subscribe + end + + def test_subscribe_on_current_thread_scheduiler_does_not_raise + Rx::Observable.just(1).subscribe_on(Rx::CurrentThreadScheduler.instance).subscribe + end +end