Skip to content

Commit da20177

Browse files
authored
Capture the concept of a "safe observer" (#597)
Capture the concept of a "safe observer" since it has been used and copied multiple times in the repo. ISafeObserver implements IDisposable and IObserver and can hold onto a resource. Implementors must at least dispose the resource on disposal of the SafeObserver, inheriting classes may choose to dispose the resource on other occasions as well. So far, AnonymousSafeObserver, SafeObserver and ObserverWithToken work like that are have been changed to implement ISafeObserver.
1 parent 46229c5 commit da20177

File tree

8 files changed

+79
-60
lines changed

8 files changed

+79
-60
lines changed

Rx.NET/Source/src/System.Reactive/AnonymousObserver.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,6 @@ public AnonymousObserver(Action<T> onNext, Action onCompleted)
8484
/// </summary>
8585
protected override void OnCompletedCore() => _onCompleted();
8686

87-
internal IObserver<T> MakeSafe(IDisposable disposable) => new AnonymousSafeObserver<T>(_onNext, _onError, _onCompleted, disposable);
87+
internal ISafeObserver<T> MakeSafe() => new AnonymousSafeObserver<T>(_onNext, _onError, _onCompleted);
8888
}
8989
}

Rx.NET/Source/src/System.Reactive/AnonymousSafeObserver.cs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
33
// See the LICENSE file in the project root for more information.
44

5+
using System.Reactive.Disposables;
56
using System.Threading;
67

78
namespace System.Reactive
@@ -18,21 +19,20 @@ namespace System.Reactive
1819
/// that accept delegates for the On* handlers. By doing the fusion, we make the call stack depth shorter which
1920
/// helps debugging and some performance.
2021
/// </summary>
21-
internal sealed class AnonymousSafeObserver<T> : IObserver<T>
22+
internal sealed class AnonymousSafeObserver<T> : ISafeObserver<T>
2223
{
2324
private readonly Action<T> _onNext;
2425
private readonly Action<Exception> _onError;
2526
private readonly Action _onCompleted;
26-
private readonly IDisposable _disposable;
27+
private IDisposable _disposable;
2728

2829
private int isStopped;
2930

30-
public AnonymousSafeObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted, IDisposable disposable)
31+
public AnonymousSafeObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
3132
{
3233
_onNext = onNext;
3334
_onError = onError;
3435
_onCompleted = onCompleted;
35-
_disposable = disposable;
3636
}
3737

3838
public void OnNext(T value)
@@ -48,7 +48,7 @@ public void OnNext(T value)
4848
finally
4949
{
5050
if (!__noError)
51-
_disposable.Dispose();
51+
Dispose();
5252
}
5353
}
5454
}
@@ -57,30 +57,32 @@ public void OnError(Exception error)
5757
{
5858
if (Interlocked.Exchange(ref isStopped, 1) == 0)
5959
{
60-
try
60+
using (this)
6161
{
6262
_onError(error);
6363
}
64-
finally
65-
{
66-
_disposable.Dispose();
67-
}
6864
}
6965
}
7066

7167
public void OnCompleted()
7268
{
7369
if (Interlocked.Exchange(ref isStopped, 1) == 0)
7470
{
75-
try
71+
using (this)
7672
{
7773
_onCompleted();
7874
}
79-
finally
80-
{
81-
_disposable.Dispose();
82-
}
8375
}
8476
}
77+
78+
public void SetResource(IDisposable resource)
79+
{
80+
Disposable.SetSingle(ref _disposable, resource);
81+
}
82+
83+
public void Dispose()
84+
{
85+
Disposable.TryDispose(ref _disposable);
86+
}
8587
}
8688
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information.
4+
5+
namespace System.Reactive
6+
{
7+
/// <summary>
8+
/// Base interface for observers that can dispose of a resource on a terminal notification
9+
/// or when disposed itself.
10+
/// </summary>
11+
/// <typeparam name="T"></typeparam>
12+
internal interface ISafeObserver<in T> : IObserver<T>, IDisposable
13+
{
14+
void SetResource(IDisposable resource);
15+
}
16+
}

Rx.NET/Source/src/System.Reactive/Internal/ObserverWithToken.cs

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,49 +15,46 @@ namespace System.Reactive
1515
/// to be disposed upon termination.
1616
/// </summary>
1717
/// <typeparam name="T">The element type of the sequence.</typeparam>
18-
internal sealed class ObserverWithToken<T> : IObserver<T>
18+
internal sealed class ObserverWithToken<T> : ISafeObserver<T>
1919
{
2020
readonly IObserver<T> _downstream;
2121

2222
IDisposable _tokenDisposable;
2323

2424
public ObserverWithToken(IObserver<T> downstream)
2525
{
26-
this._downstream = downstream;
27-
}
28-
29-
internal void SetTokenDisposable(IDisposable d)
30-
{
31-
Disposable.SetSingle(ref _tokenDisposable, d);
26+
_downstream = downstream;
3227
}
3328

3429
public void OnCompleted()
3530
{
36-
try
31+
using (this)
3732
{
3833
_downstream.OnCompleted();
3934
}
40-
finally
41-
{
42-
Disposable.TryDispose(ref _tokenDisposable);
43-
}
4435
}
4536

4637
public void OnError(Exception error)
4738
{
48-
try
39+
using (this)
4940
{
5041
_downstream.OnError(error);
5142
}
52-
finally
53-
{
54-
Disposable.TryDispose(ref _tokenDisposable);
55-
}
5643
}
5744

5845
public void OnNext(T value)
5946
{
6047
_downstream.OnNext(value);
6148
}
49+
50+
public void SetResource(IDisposable resource)
51+
{
52+
Disposable.SetSingle(ref _tokenDisposable, resource);
53+
}
54+
55+
public void Dispose()
56+
{
57+
Disposable.TryDispose(ref _tokenDisposable);
58+
}
6259
}
6360
}

Rx.NET/Source/src/System.Reactive/Internal/Producer.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ public IDisposable SubscribeRaw(IObserver<TSource> observer, bool enableSafeguar
4646
//
4747
if (enableSafeguard)
4848
{
49-
observer = SafeObserver<TSource>.Create(observer, subscription);
49+
var safeObserver = SafeObserver<TSource>.Create(observer);
50+
safeObserver.SetResource(subscription);
51+
observer = safeObserver;
5052
}
5153

5254
if (CurrentThreadScheduler.IsScheduleRequired)
@@ -90,22 +92,20 @@ public IDisposable Subscribe(IObserver<TTarget> observer)
9092

9193
public IDisposable SubscribeRaw(IObserver<TTarget> observer, bool enableSafeguard)
9294
{
93-
SingleAssignmentDisposable subscription = null;
95+
ISafeObserver<TTarget> safeObserver = null;
9496

9597
//
9698
// See AutoDetachObserver.cs for more information on the safeguarding requirement and
9799
// its implementation aspects.
98100
//
99101
if (enableSafeguard)
100102
{
101-
subscription = new SingleAssignmentDisposable();
102-
observer = SafeObserver<TTarget>.Create(observer, subscription);
103+
observer = safeObserver = SafeObserver<TTarget>.Create(observer);
103104
}
104105

105106
var sink = CreateSink(observer);
106107

107-
if (subscription != null)
108-
subscription.Disposable = sink;
108+
safeObserver?.SetResource(sink);
109109

110110
if (CurrentThreadScheduler.IsScheduleRequired)
111111
{
@@ -118,7 +118,7 @@ public IDisposable SubscribeRaw(IObserver<TTarget> observer, bool enableSafeguar
118118
Run(sink);
119119
}
120120

121-
return (IDisposable)subscription ?? sink;
121+
return sink;
122122
}
123123

124124
/// <summary>

Rx.NET/Source/src/System.Reactive/Internal/SafeObserver.cs

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,34 +2,36 @@
22
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
33
// See the LICENSE file in the project root for more information.
44

5+
using System.Reactive.Disposables;
6+
57
namespace System.Reactive
68
{
79
//
810
// See AutoDetachObserver.cs for more information on the safeguarding requirement and
911
// its implementation aspects.
1012
//
1113

12-
internal sealed class SafeObserver<TSource> : IObserver<TSource>
14+
internal sealed class SafeObserver<TSource> : ISafeObserver<TSource>
1315
{
1416
private readonly IObserver<TSource> _observer;
15-
private readonly IDisposable _disposable;
1617

17-
public static IObserver<TSource> Create(IObserver<TSource> observer, IDisposable disposable)
18+
private IDisposable _disposable;
19+
20+
public static ISafeObserver<TSource> Create(IObserver<TSource> observer)
1821
{
1922
if (observer is AnonymousObserver<TSource> a)
2023
{
21-
return a.MakeSafe(disposable);
24+
return a.MakeSafe();
2225
}
2326
else
2427
{
25-
return new SafeObserver<TSource>(observer, disposable);
28+
return new SafeObserver<TSource>(observer);
2629
}
2730
}
2831

29-
private SafeObserver(IObserver<TSource> observer, IDisposable disposable)
32+
private SafeObserver(IObserver<TSource> observer)
3033
{
3134
_observer = observer;
32-
_disposable = disposable;
3335
}
3436

3537
public void OnNext(TSource value)
@@ -44,33 +46,35 @@ public void OnNext(TSource value)
4446
{
4547
if (!__noError)
4648
{
47-
_disposable.Dispose();
49+
Dispose();
4850
}
4951
}
5052
}
5153

5254
public void OnError(Exception error)
5355
{
54-
try
56+
using (this)
5557
{
5658
_observer.OnError(error);
5759
}
58-
finally
59-
{
60-
_disposable.Dispose();
61-
}
6260
}
6361

6462
public void OnCompleted()
6563
{
66-
try
64+
using (this)
6765
{
6866
_observer.OnCompleted();
6967
}
70-
finally
71-
{
72-
_disposable.Dispose();
73-
}
68+
}
69+
70+
public void SetResource(IDisposable resource)
71+
{
72+
Disposable.SetSingle(ref _disposable, resource);
73+
}
74+
75+
public void Dispose()
76+
{
77+
Disposable.TryDispose(ref _disposable);
7478
}
7579
}
7680
}

Rx.NET/Source/src/System.Reactive/Observable.Extensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ private static void Subscribe_<T>(this IObservable<T> source, IObserver<T> obser
266266
//
267267
var d = source.Subscribe/*Unsafe*/(consumer);
268268

269-
consumer.SetTokenDisposable(token.Register(state => ((IDisposable)state).Dispose(), d));
269+
consumer.SetResource(token.Register(state => ((IDisposable)state).Dispose(), d));
270270
}
271271
}
272272
else

Rx.NET/Source/src/System.Reactive/Platforms/Desktop/Linq/QueryLanguage.Remoting.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public IDisposable Subscribe(IObserver<T> observer)
9292
//
9393
var d = remotableObservable.Subscribe/*Unsafe*/(new RemotableObserver<T>(consumer));
9494

95-
consumer.SetTokenDisposable(d);
95+
consumer.SetResource(d);
9696

9797
return d;
9898
}

0 commit comments

Comments
 (0)