Skip to content

Commit f650dde

Browse files
authored
Merge pull request #727 from akarnokd/CleanupSystemReactive
4.x: System.Reactive fix field names, type args, usings, operator infrastructure
2 parents 45f3cb2 + 8663639 commit f650dde

File tree

76 files changed

+1370
-1405
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

76 files changed

+1370
-1405
lines changed

Rx.NET/Source/src/System.Reactive/AnonymousSafeObserver.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ internal sealed class AnonymousSafeObserver<T> : SafeObserver<T>
2424
private readonly Action<Exception> _onError;
2525
private readonly Action _onCompleted;
2626

27-
private int isStopped;
27+
private int _isStopped;
2828

2929
public AnonymousSafeObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
3030
{
@@ -35,7 +35,7 @@ public AnonymousSafeObserver(Action<T> onNext, Action<Exception> onError, Action
3535

3636
public override void OnNext(T value)
3737
{
38-
if (isStopped == 0)
38+
if (_isStopped == 0)
3939
{
4040
var __noError = false;
4141
try
@@ -55,7 +55,7 @@ public override void OnNext(T value)
5555

5656
public override void OnError(Exception error)
5757
{
58-
if (Interlocked.Exchange(ref isStopped, 1) == 0)
58+
if (Interlocked.Exchange(ref _isStopped, 1) == 0)
5959
{
6060
using (this)
6161
{
@@ -66,7 +66,7 @@ public override void OnError(Exception error)
6666

6767
public override void OnCompleted()
6868
{
69-
if (Interlocked.Exchange(ref isStopped, 1) == 0)
69+
if (Interlocked.Exchange(ref _isStopped, 1) == 0)
7070
{
7171
using (this)
7272
{

Rx.NET/Source/src/System.Reactive/Concurrency/AsyncLock.cs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ namespace System.Reactive.Concurrency
1111
/// </summary>
1212
public sealed class AsyncLock : IDisposable
1313
{
14-
private bool isAcquired;
15-
private bool hasFaulted;
16-
private readonly object guard = new object();
17-
private Queue<(Action<Delegate, object> action, Delegate @delegate, object state)> queue;
14+
private bool _isAcquired;
15+
private bool _hasFaulted;
16+
private readonly object _guard = new object();
17+
private Queue<(Action<Delegate, object> action, Delegate @delegate, object state)> _queue;
1818

1919
/// <summary>
2020
/// Queues the action for execution. If the caller acquires the lock and becomes the owner,
@@ -56,32 +56,32 @@ internal void Wait<TState>(TState state, Action<TState> action)
5656
private void Wait(object state, Delegate @delegate, Action<Delegate, object> action)
5757
{
5858
// allow one thread to update the state
59-
lock (guard)
59+
lock (_guard)
6060
{
6161
// if a previous action crashed, ignore any future actions
62-
if (hasFaulted)
62+
if (_hasFaulted)
6363
{
6464
return;
6565
}
6666

6767
// if the "lock" is busy, queue up the extra work
6868
// otherwise there is no need to queue up "action"
69-
if (isAcquired)
69+
if (_isAcquired)
7070
{
7171
// create the queue if necessary
72-
var q = queue;
72+
var q = _queue;
7373
if (q == null)
7474
{
7575
q = new Queue<(Action<Delegate, object> action, Delegate @delegate, object state)>();
76-
queue = q;
76+
_queue = q;
7777
}
7878
// enqueue the work
7979
q.Enqueue((action, @delegate, state));
8080
return;
8181
}
8282

8383
// indicate there is processing going on
84-
isAcquired = true;
84+
_isAcquired = true;
8585
}
8686

8787
// if we get here, execute the "action" first
@@ -95,25 +95,25 @@ private void Wait(object state, Delegate @delegate, Action<Delegate, object> act
9595
catch
9696
{
9797
// the execution failed, terminate this AsyncLock
98-
lock (guard)
98+
lock (_guard)
9999
{
100100
// throw away the queue
101-
queue = null;
101+
_queue = null;
102102
// report fault
103-
hasFaulted = true;
103+
_hasFaulted = true;
104104
}
105105
throw;
106106
}
107107

108108
// execution succeeded, let's see if more work has to be done
109-
lock (guard)
109+
lock (_guard)
110110
{
111-
var q = queue;
111+
var q = _queue;
112112
// either there is no queue yet or we run out of work
113113
if (q == null || q.Count == 0)
114114
{
115115
// release the lock
116-
isAcquired = false;
116+
_isAcquired = false;
117117
return;
118118
}
119119

@@ -129,10 +129,10 @@ private void Wait(object state, Delegate @delegate, Action<Delegate, object> act
129129
/// </summary>
130130
public void Dispose()
131131
{
132-
lock (guard)
132+
lock (_guard)
133133
{
134-
queue = null;
135-
hasFaulted = true;
134+
_queue = null;
135+
_hasFaulted = true;
136136
}
137137
}
138138
}

Rx.NET/Source/src/System.Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.Windows.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
// See the LICENSE file in the project root for more information.
44

55
#if NO_THREAD && WINDOWS
6-
using System.Reactive.Disposables;
76
using System.Threading;
87

98
namespace System.Reactive.Concurrency

Rx.NET/Source/src/System.Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
// See the LICENSE file in the project root for more information.
44

55
#if !NO_THREAD
6-
using System.Collections.Generic;
76
using System.Reactive.Disposables;
87
using System.Threading;
98

@@ -236,7 +235,7 @@ public void Dispose()
236235
private sealed class FastPeriodicTimer : IDisposable
237236
{
238237
private readonly Action _action;
239-
private volatile bool disposed;
238+
private volatile bool _disposed;
240239

241240
public FastPeriodicTimer(Action action)
242241
{
@@ -254,15 +253,15 @@ private static void Loop(object threadParam)
254253
{
255254
var timer = (FastPeriodicTimer)threadParam;
256255

257-
while (!timer.disposed)
256+
while (!timer._disposed)
258257
{
259258
timer._action();
260259
}
261260
}
262261

263262
public void Dispose()
264263
{
265-
disposed = true;
264+
_disposed = true;
266265
}
267266
}
268267
}

Rx.NET/Source/src/System.Reactive/Concurrency/CurrentThreadScheduler.cs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ namespace System.Reactive.Concurrency
1212
/// <seealso cref="Scheduler.CurrentThread">Singleton instance of this type exposed through this static property.</seealso>
1313
public sealed class CurrentThreadScheduler : LocalScheduler
1414
{
15-
private static readonly Lazy<CurrentThreadScheduler> s_instance = new Lazy<CurrentThreadScheduler>(() => new CurrentThreadScheduler());
15+
private static readonly Lazy<CurrentThreadScheduler> _staticInstance = new Lazy<CurrentThreadScheduler>(() => new CurrentThreadScheduler());
1616

1717
private CurrentThreadScheduler()
1818
{
@@ -21,34 +21,34 @@ private CurrentThreadScheduler()
2121
/// <summary>
2222
/// Gets the singleton instance of the current thread scheduler.
2323
/// </summary>
24-
public static CurrentThreadScheduler Instance => s_instance.Value;
24+
public static CurrentThreadScheduler Instance => _staticInstance.Value;
2525

2626
[ThreadStatic]
27-
private static SchedulerQueue<TimeSpan> s_threadLocalQueue;
27+
private static SchedulerQueue<TimeSpan> _threadLocalQueue;
2828

2929
[ThreadStatic]
30-
private static IStopwatch s_clock;
30+
private static IStopwatch _clock;
3131

3232
[ThreadStatic]
33-
private static bool running;
33+
private static bool _running;
3434

35-
private static SchedulerQueue<TimeSpan> GetQueue() => s_threadLocalQueue;
35+
private static SchedulerQueue<TimeSpan> GetQueue() => _threadLocalQueue;
3636

3737
private static void SetQueue(SchedulerQueue<TimeSpan> newQueue)
3838
{
39-
s_threadLocalQueue = newQueue;
39+
_threadLocalQueue = newQueue;
4040
}
4141

4242
private static TimeSpan Time
4343
{
4444
get
4545
{
46-
if (s_clock == null)
46+
if (_clock == null)
4747
{
48-
s_clock = ConcurrencyAbstractionLayer.Current.StartStopwatch();
48+
_clock = ConcurrencyAbstractionLayer.Current.StartStopwatch();
4949
}
5050

51-
return s_clock.Elapsed;
51+
return _clock.Elapsed;
5252
}
5353
}
5454

@@ -64,7 +64,7 @@ private static TimeSpan Time
6464
/// Gets a value that indicates whether the caller must call a Schedule method.
6565
/// </summary>
6666
[EditorBrowsable(EditorBrowsableState.Advanced)]
67-
public static bool IsScheduleRequired => !running;
67+
public static bool IsScheduleRequired => !_running;
6868

6969
/// <summary>
7070
/// Schedules an action to be executed after dueTime.
@@ -85,9 +85,9 @@ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Fun
8585
var queue = default(SchedulerQueue<TimeSpan>);
8686

8787
// There is no timed task and no task is currently running
88-
if (!running)
88+
if (!_running)
8989
{
90-
running = true;
90+
_running = true;
9191

9292
if (dueTime > TimeSpan.Zero)
9393
{
@@ -103,7 +103,7 @@ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Fun
103103
catch
104104
{
105105
SetQueue(null);
106-
running = false;
106+
_running = false;
107107
throw;
108108
}
109109

@@ -120,12 +120,12 @@ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Fun
120120
finally
121121
{
122122
SetQueue(null);
123-
running = false;
123+
_running = false;
124124
}
125125
}
126126
else
127127
{
128-
running = false;
128+
_running = false;
129129
}
130130

131131
return d;

Rx.NET/Source/src/System.Reactive/Concurrency/DefaultScheduler.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@ namespace System.Reactive.Concurrency
1212
/// <seealso cref="Scheduler.Default">Singleton instance of this type exposed through this static property.</seealso>
1313
public sealed class DefaultScheduler : LocalScheduler, ISchedulerPeriodic
1414
{
15-
private static readonly Lazy<DefaultScheduler> s_instance = new Lazy<DefaultScheduler>(() => new DefaultScheduler());
16-
private static IConcurrencyAbstractionLayer s_cal = ConcurrencyAbstractionLayer.Current;
15+
private static readonly Lazy<DefaultScheduler> _instance = new Lazy<DefaultScheduler>(() => new DefaultScheduler());
16+
private static IConcurrencyAbstractionLayer _cal = ConcurrencyAbstractionLayer.Current;
1717

1818
/// <summary>
1919
/// Gets the singleton instance of the default scheduler.
2020
/// </summary>
21-
public static DefaultScheduler Instance => s_instance.Value;
21+
public static DefaultScheduler Instance => _instance.Value;
2222

2323
private DefaultScheduler()
2424
{
@@ -41,7 +41,7 @@ public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TSta
4141

4242
var workItem = new UserWorkItem<TState>(this, state, action);
4343

44-
workItem.CancelQueueDisposable = s_cal.QueueUserWorkItem(
44+
workItem.CancelQueueDisposable = _cal.QueueUserWorkItem(
4545
closureWorkItem => ((UserWorkItem<TState>)closureWorkItem).Run(),
4646
workItem);
4747

@@ -72,7 +72,7 @@ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Fun
7272

7373
var workItem = new UserWorkItem<TState>(this, state, action);
7474

75-
workItem.CancelQueueDisposable = s_cal.StartTimer(
75+
workItem.CancelQueueDisposable = _cal.StartTimer(
7676
closureWorkItem => ((UserWorkItem<TState>)closureWorkItem).Run(),
7777
workItem,
7878
dt);
@@ -117,7 +117,7 @@ public PeriodicallyScheduledWorkItem(TState state, TimeSpan period, Func<TState,
117117
_state = state;
118118
_action = action;
119119

120-
_cancel = s_cal.StartPeriodicTimer(Tick, period);
120+
_cancel = _cal.StartPeriodicTimer(Tick, period);
121121
}
122122

123123
private void Tick()
@@ -145,7 +145,7 @@ protected override object GetService(Type serviceType)
145145
{
146146
if (serviceType == typeof(ISchedulerLongRunning))
147147
{
148-
if (s_cal.SupportsLongRunning)
148+
if (_cal.SupportsLongRunning)
149149
{
150150
return LongRunning.Instance;
151151
}
@@ -168,7 +168,7 @@ public LongScheduledWorkItem(TState state, Action<TState, ICancelable> action)
168168
_state = state;
169169
_action = action;
170170

171-
s_cal.StartThread(
171+
_cal.StartThread(
172172
@thisObject =>
173173
{
174174
var @this = (LongScheduledWorkItem<TState>)@thisObject;

Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDi
1818
/// <summary>
1919
/// Counter for diagnostic purposes, to name the threads.
2020
/// </summary>
21-
private static int s_counter;
21+
private static int _counter;
2222

2323
/// <summary>
2424
/// Thread factory function.
@@ -82,7 +82,7 @@ public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDi
8282
/// Creates an object that schedules units of work on a designated thread.
8383
/// </summary>
8484
public EventLoopScheduler()
85-
: this(a => new Thread(a) { Name = "Event Loop " + Interlocked.Increment(ref s_counter), IsBackground = true })
85+
: this(a => new Thread(a) { Name = "Event Loop " + Interlocked.Increment(ref _counter), IsBackground = true })
8686
{
8787
}
8888

Rx.NET/Source/src/System.Reactive/Concurrency/HistoricalScheduler.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ protected override DateTimeOffset Add(DateTimeOffset absolute, TimeSpan relative
6969
/// </summary>
7070
public class HistoricalScheduler : HistoricalSchedulerBase
7171
{
72-
private readonly SchedulerQueue<DateTimeOffset> queue = new SchedulerQueue<DateTimeOffset>();
72+
private readonly SchedulerQueue<DateTimeOffset> _queue = new SchedulerQueue<DateTimeOffset>();
7373

7474
/// <summary>
7575
/// Creates a new historical scheduler with the minimum value of <see cref="DateTimeOffset"/> as the initial clock value.
@@ -105,13 +105,13 @@ public HistoricalScheduler(DateTimeOffset initialClock, IComparer<DateTimeOffset
105105
/// <returns>The next scheduled item.</returns>
106106
protected override IScheduledItem<DateTimeOffset> GetNext()
107107
{
108-
while (queue.Count > 0)
108+
while (_queue.Count > 0)
109109
{
110-
var next = queue.Peek();
110+
var next = _queue.Peek();
111111

112112
if (next.IsCanceled)
113113
{
114-
queue.Dequeue();
114+
_queue.Dequeue();
115115
}
116116
else
117117
{
@@ -142,12 +142,12 @@ public override IDisposable ScheduleAbsolute<TState>(TState state, DateTimeOffse
142142

143143
var run = new Func<IScheduler, TState, IDisposable>((scheduler, state1) =>
144144
{
145-
queue.Remove(si);
145+
_queue.Remove(si);
146146
return action(scheduler, state1);
147147
});
148148

149149
si = new ScheduledItem<DateTimeOffset, TState>(this, state, run, dueTime, Comparer);
150-
queue.Enqueue(si);
150+
_queue.Enqueue(si);
151151

152152
return si;
153153
}

0 commit comments

Comments
 (0)