From 8449b2101df0162529869ffbd5e75f1bbb11faa6 Mon Sep 17 00:00:00 2001 From: Nicolas VERINAUD Date: Mon, 28 Nov 2016 14:56:25 +0100 Subject: [PATCH] Fixes #47 : RACMulticastConnection should reconnect after disposal. --- ReactiveObjC/RACMulticastConnection.m | 15 ++++++-- .../RACMulticastConnectionSpec.m | 35 ++++++++++++++++--- 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/ReactiveObjC/RACMulticastConnection.m b/ReactiveObjC/RACMulticastConnection.m index f59824500..be9a3ddfb 100644 --- a/ReactiveObjC/RACMulticastConnection.m +++ b/ReactiveObjC/RACMulticastConnection.m @@ -28,7 +28,7 @@ @interface RACMulticastConnection () { } @property (nonatomic, readonly, strong) RACSignal *sourceSignal; -@property (strong) RACSerialDisposable *serialDisposable; +@property (atomic, strong) RACSerialDisposable *serialDisposable; @end @implementation RACMulticastConnection @@ -51,10 +51,19 @@ - (instancetype)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)s #pragma mark Connecting - (RACDisposable *)connect { - BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected); + int32_t volatile *hasConnected = &_hasConnected; + BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, hasConnected); if (shouldConnect) { - self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal]; + if ([self.serialDisposable isDisposed]) { + self.serialDisposable = [[RACSerialDisposable alloc] init]; + } + + RACDisposable *d = [self.sourceSignal subscribe:_signal]; + self.serialDisposable.disposable = [RACDisposable disposableWithBlock:^{ + OSAtomicCompareAndSwap32Barrier(1, 0, hasConnected); + [d dispose]; + }]; } return self.serialDisposable; diff --git a/ReactiveObjCTests/RACMulticastConnectionSpec.m b/ReactiveObjCTests/RACMulticastConnectionSpec.m index 5ce04ba5f..7768d9c6f 100644 --- a/ReactiveObjCTests/RACMulticastConnectionSpec.m +++ b/ReactiveObjCTests/RACMulticastConnectionSpec.m @@ -45,15 +45,16 @@ expect(@(subscriptionCount)).to(equal(@1)); }); - qck_it(@"shouldn't reconnect after disposal", ^{ + qck_it(@"should reconnect after disposal", ^{ RACDisposable *disposable1 = [connection connect]; expect(@(subscriptionCount)).to(equal(@1)); [disposable1 dispose]; RACDisposable *disposable2 = [connection connect]; - expect(@(subscriptionCount)).to(equal(@1)); - expect(disposable1).to(equal(disposable2)); + expect(@(subscriptionCount)).to(equal(@2)); + + [disposable2 dispose]; }); qck_it(@"shouldn't race when connecting", ^{ @@ -114,14 +115,38 @@ expect(@(disposed)).to(beTruthy()); }); - qck_it(@"shouldn't reconnect after disposal", ^{ + qck_it(@"should reconnect after disposal", ^{ RACDisposable *disposable = [autoconnectedSignal subscribeNext:^(id x) {}]; expect(@(subscriptionCount)).to(equal(@1)); [disposable dispose]; disposable = [autoconnectedSignal subscribeNext:^(id x) {}]; - expect(@(subscriptionCount)).to(equal(@1)); + expect(@(subscriptionCount)).to(equal(@2)); + [disposable dispose]; + }); + + qck_it(@"shouldn't dispose immediately when reconnecting", ^{ + __block NSUInteger sub = 0; + __block NSUInteger unsub = 0; + RACSignal *signal = [[[RACSignal createSignal:^(id subscriber) { + sub++; + return [RACDisposable disposableWithBlock:^{ + unsub++; + }]; + }] publish] autoconnect]; + + RACDisposable *disposable = [signal subscribeNext:^(id x) {}]; + expect(@(unsub)).to(equal(@0)); + + [disposable dispose]; + expect(@(unsub)).to(equal(@1)); + + disposable = [signal subscribeNext:^(id x) {}]; + expect(@(sub)).to(equal(@2)); + expect(@(unsub)).to(equal(@1)); + [disposable dispose]; + expect(@(unsub)).to(equal(@2)); }); qck_it(@"should replay values after disposal when multicasted to a replay subject", ^{