From ddc0d0ef0a5404f892aa19a8f65f4ba84befb488 Mon Sep 17 00:00:00 2001 From: Bjorn Ramberg Date: Mon, 13 Nov 2017 16:21:55 +0100 Subject: [PATCH 1/2] Rename ScheduledDisposable -> ScheduledSubscription Observable#subscribe_on incorrectly referred to the latter, but the class was actually called the former. Looking at similar implementation it seems to me to be more in line with a subscription than a disposable. --- lib/rx/subscriptions/scheduled_subscription.rb | 8 ++++---- test/rx/operators/test_synchronization.rb | 15 +++++++++++++++ 2 files changed, 19 insertions(+), 4 deletions(-) create mode 100644 test/rx/operators/test_synchronization.rb diff --git a/lib/rx/subscriptions/scheduled_subscription.rb b/lib/rx/subscriptions/scheduled_subscription.rb index cada291..ae0d25a 100644 --- a/lib/rx/subscriptions/scheduled_subscription.rb +++ b/lib/rx/subscriptions/scheduled_subscription.rb @@ -1,8 +1,8 @@ # Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. module Rx - # Represents a disposable resource whose disposal invocation will be scheduled on the specified scheduler - class ScheduledDisposable + # Represents a subscription that is unsubscribed on the specified scheduler + class ScheduledSubscription attr_reader :scheduler, :subscription @@ -21,12 +21,12 @@ def unsubscribed? # Unsubscribes the wrapped subscription on the provided scheduler. def unsubscribe - @scheduler.schedule lambda do + @scheduler.schedule lambda { unless @subscription.nil? @subscription.unsubscribe @subscription = nil end - end + } end end end \ No newline at end of file diff --git a/test/rx/operators/test_synchronization.rb b/test/rx/operators/test_synchronization.rb new file mode 100644 index 0000000..36cb1ac --- /dev/null +++ b/test/rx/operators/test_synchronization.rb @@ -0,0 +1,15 @@ +# Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +require "#{File.dirname(__FILE__)}/../../test_helper" + +class TestObservableSynchronization < Minitest::Test + include Rx::ReactiveTest + + def test_subscribe_on_default_scheduler_does_not_raise + Rx::Observable.just(1).subscribe_on(Rx::DefaultScheduler.instance).subscribe + end + + def test_subscribe_on_current_thread_scheduiler_does_not_raise + Rx::Observable.just(1).subscribe_on(Rx::CurrentThreadScheduler.instance).subscribe + end +end From b527314f7c0c1053ab67e4f80d8130ed2e1e5473 Mon Sep 17 00:00:00 2001 From: Bittrance Date: Tue, 14 Nov 2017 09:33:40 +0100 Subject: [PATCH 2/2] Example and proper test for subscribe_on --- examples/subscribe_on.rb | 38 +++++++++++++++++++++++ test/rx/operators/test_synchronization.rb | 11 +++++++ 2 files changed, 49 insertions(+) create mode 100644 examples/subscribe_on.rb diff --git a/examples/subscribe_on.rb b/examples/subscribe_on.rb new file mode 100644 index 0000000..d733d0f --- /dev/null +++ b/examples/subscribe_on.rb @@ -0,0 +1,38 @@ +require 'rx' + +def in_a_second(n) + Rx::Observable.create do |observer| + sleep(1) + observer.on_next(n) + observer.on_completed + end +end + +# subscribe_on will schedule ancestry on given scheduler; each sleep happens in a separate thread +source = Rx::Observable.of( + in_a_second(1) + .map {|v| v + 1 } + .subscribe_on(Rx::DefaultScheduler.instance), + in_a_second(2) + .map {|v| v + 1 } + .subscribe_on(Rx::DefaultScheduler.instance) +).merge_all +.time_interval + +subscription = source.subscribe( + lambda {|x| + puts 'Next: ' + x.to_s + }, + lambda {|err| + puts 'Error: ' + err.to_s + }, + lambda { + puts 'Completed' + }) + +# => Next: (3)@(1.004153) +# => Next: (2)@(0.000251) + +while Thread.list.size > 1 + (Thread.list - [Thread.current]).each &:join +end diff --git a/test/rx/operators/test_synchronization.rb b/test/rx/operators/test_synchronization.rb index 36cb1ac..3a3ef6f 100644 --- a/test/rx/operators/test_synchronization.rb +++ b/test/rx/operators/test_synchronization.rb @@ -5,6 +5,17 @@ class TestObservableSynchronization < Minitest::Test include Rx::ReactiveTest + def test_subscribe_on + scheduler = Rx::TestScheduler.new + mock = Rx::MockObserver.new scheduler + Rx::Observable.just(1) + .subscribe_on(scheduler) + .subscribe(mock) + assert_equal 0, mock.messages.length + scheduler.advance_by 100 + assert_equal 2, mock.messages.length + end + def test_subscribe_on_default_scheduler_does_not_raise Rx::Observable.just(1).subscribe_on(Rx::DefaultScheduler.instance).subscribe end