Skip to content

Commit f389a54

Browse files
authored
Add additional functions. (#23)
* Add additional functions * Tidy foreach to make more performant * Fix missing headers * Update Copyright to 2023
1 parent 27b92c1 commit f389a54

16 files changed

+474
-20
lines changed

src/ReactiveMarbles.Extensions.Tests/DisposableExtensionsTests.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2019-2022 ReactiveUI Association Incorporated. All rights reserved.
1+
// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
22
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for full license information.
44

src/ReactiveMarbles.Extensions.Tests/ReactiveExtensionsTests.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2019-2022 ReactiveUI Association Incorporated. All rights reserved.
1+
// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
22
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for full license information.
44

src/ReactiveMarbles.Extensions/DisposableExtensions.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2019-2022 ReactiveUI Association Incorporated. All rights reserved.
1+
// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
22
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for full license information.
44

src/ReactiveMarbles.Extensions/IHeartbeat.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2019-2022 ReactiveUI Association Incorporated. All rights reserved.
1+
// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
22
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for full license information.
44

src/ReactiveMarbles.Extensions/IStale.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2019-2022 ReactiveUI Association Incorporated. All rights reserved.
1+
// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
22
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for full license information.
44

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
2+
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for full license information.
4+
5+
using System;
6+
using System.Collections.Generic;
7+
using System.Reactive.Disposables;
8+
using System.Reactive.Linq;
9+
using System.Threading.Tasks;
10+
11+
namespace ReactiveMarbles.Extensions.Internal;
12+
13+
internal class ConcurrencyLimiter<T>
14+
{
15+
private readonly object _locker = new();
16+
private readonly object _disposalLocker = new();
17+
private bool _disposed;
18+
private int _outstanding;
19+
private IEnumerator<Task<T>>? _rator;
20+
21+
/// <summary>
22+
/// Initializes a new instance of the <see cref="ConcurrencyLimiter{T}"/> class.
23+
/// </summary>
24+
/// <param name="taskFunctions">The task functions.</param>
25+
/// <param name="maxConcurrency">The maximum concurrency.</param>
26+
public ConcurrencyLimiter(IEnumerable<Task<T>> taskFunctions, int maxConcurrency)
27+
{
28+
_rator = taskFunctions.GetEnumerator();
29+
IObservable = Observable.Create<T>(observer =>
30+
{
31+
for (var i = 0; i < maxConcurrency; i++)
32+
{
33+
PullNextTask(observer);
34+
}
35+
36+
return Disposable.Create(() => Disposed = true);
37+
});
38+
}
39+
40+
/// <summary>
41+
/// Gets the i observable.
42+
/// </summary>
43+
public IObservable<T> IObservable { get; }
44+
45+
/// <summary>
46+
/// Gets or sets a value indicating whether this <see cref="ConcurrencyLimiter{T}"/> is disposed.
47+
/// </summary>
48+
/// <value><c>true</c> if disposed; otherwise, <c>false</c>.</value>
49+
private bool Disposed
50+
{
51+
get
52+
{
53+
lock (_disposalLocker)
54+
{
55+
return _disposed;
56+
}
57+
}
58+
59+
set
60+
{
61+
lock (_disposalLocker)
62+
{
63+
_disposed = value;
64+
}
65+
}
66+
}
67+
68+
/// <summary>
69+
/// Clears the rator.
70+
/// </summary>
71+
private void ClearRator()
72+
{
73+
_rator?.Dispose();
74+
_rator = null;
75+
}
76+
77+
/// <summary>
78+
/// Processes the task completion.
79+
/// </summary>
80+
/// <param name="observer">The observer.</param>
81+
/// <param name="decendantTask">The decendant Task.</param>
82+
private void ProcessTaskCompletion(IObserver<T> observer, Task<T> decendantTask)
83+
{
84+
lock (_locker)
85+
{
86+
if (Disposed || decendantTask.IsFaulted || decendantTask.IsCanceled)
87+
{
88+
ClearRator();
89+
if (!Disposed)
90+
{
91+
observer.OnError((decendantTask.Exception == null ? new OperationCanceledException() : decendantTask.Exception.InnerException)!);
92+
}
93+
}
94+
else
95+
{
96+
observer.OnNext(decendantTask.Result);
97+
if (--_outstanding == 0 && _rator == null)
98+
{
99+
observer.OnCompleted();
100+
}
101+
else
102+
{
103+
PullNextTask(observer);
104+
}
105+
}
106+
}
107+
}
108+
109+
/// <summary>
110+
/// Pulls the next task.
111+
/// </summary>
112+
/// <param name="observer">The observer.</param>
113+
private void PullNextTask(IObserver<T> observer)
114+
{
115+
lock (_locker)
116+
{
117+
if (Disposed)
118+
{
119+
ClearRator();
120+
}
121+
122+
if (_rator == null)
123+
{
124+
return;
125+
}
126+
127+
if (!_rator.MoveNext())
128+
{
129+
ClearRator();
130+
if (_outstanding == 0)
131+
{
132+
observer.OnCompleted();
133+
}
134+
135+
return;
136+
}
137+
138+
_outstanding++;
139+
_rator?.Current?.ContinueWith(ant => ProcessTaskCompletion(observer, ant));
140+
}
141+
}
142+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
2+
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for full license information.
4+
5+
using System.Collections.Generic;
6+
7+
namespace ReactiveMarbles.Extensions.Internal;
8+
9+
internal static class EnumerableIList
10+
{
11+
/// <summary>
12+
/// Creates the specified list.
13+
/// </summary>
14+
/// <typeparam name="T">The type.</typeparam>
15+
/// <param name="list">The list.</param>
16+
/// <returns>Enumerable IList.</returns>
17+
public static EnumerableIList<T> Create<T>(IList<T> list) => new(list);
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
2+
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for full license information.
4+
5+
using System.Collections;
6+
using System.Collections.Generic;
7+
8+
namespace ReactiveMarbles.Extensions.Internal;
9+
10+
internal readonly struct EnumerableIList<T>(IList<T> list) : IEnumerableIList<T>, IList<T>
11+
{
12+
public static EnumerableIList<T> Empty { get; }
13+
14+
/// <inheritdoc />
15+
public int Count => list.Count;
16+
17+
/// <inheritdoc />
18+
public bool IsReadOnly => list.IsReadOnly;
19+
20+
/// <inheritdoc />
21+
public T this[int index]
22+
{
23+
get => list[index];
24+
set => list[index] = value;
25+
}
26+
27+
public static implicit operator EnumerableIList<T>(List<T> list) => new(list);
28+
29+
public static implicit operator EnumerableIList<T>(T[] array) => new(array);
30+
31+
public EnumeratorIList<T> GetEnumerator() => new(list);
32+
33+
/// <inheritdoc />
34+
public void Add(T item) => list.Add(item);
35+
36+
/// <inheritdoc />
37+
public void Clear() => list.Clear();
38+
39+
/// <inheritdoc />
40+
public bool Contains(T item) => list.Contains(item);
41+
42+
/// <inheritdoc />
43+
public void CopyTo(T[] array, int arrayIndex) => list.CopyTo(array, arrayIndex);
44+
45+
/// <inheritdoc />
46+
public int IndexOf(T item) => list.IndexOf(item);
47+
48+
/// <inheritdoc />
49+
public void Insert(int index, T item) => list.Insert(index, item);
50+
51+
/// <inheritdoc />
52+
public bool Remove(T item) => list.Remove(item);
53+
54+
/// <inheritdoc />
55+
public void RemoveAt(int index) => list.RemoveAt(index);
56+
57+
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
58+
59+
IEnumerator<T> IEnumerable<T>.GetEnumerator() => GetEnumerator();
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
2+
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for full license information.
4+
5+
using System.Collections;
6+
using System.Collections.Generic;
7+
8+
namespace ReactiveMarbles.Extensions.Internal;
9+
10+
internal struct EnumeratorIList<T>(IList<T> list) : IEnumerator<T>
11+
{
12+
private int _index = -1;
13+
14+
public readonly T Current => list[_index];
15+
16+
readonly object? IEnumerator.Current => Current;
17+
18+
public bool MoveNext()
19+
{
20+
_index++;
21+
22+
return _index < list.Count;
23+
}
24+
25+
public readonly void Dispose() => list.Clear();
26+
27+
public void Reset() => _index = -1;
28+
}

src/ReactiveMarbles.Extensions/Internal/Heartbeat.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2019-2022 ReactiveUI Association Incorporated. All rights reserved.
1+
// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
22
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for full license information.
44

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
2+
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for full license information.
4+
5+
using System.Collections.Generic;
6+
7+
namespace ReactiveMarbles.Extensions.Internal;
8+
9+
/// <summary>
10+
/// A enumerable that also contains the enumerable list.
11+
/// </summary>
12+
/// <typeparam name="T">The type of items.</typeparam>
13+
internal interface IEnumerableIList<T> : IEnumerable<T>
14+
{
15+
/// <summary>
16+
/// Gets the enumerator.
17+
/// </summary>
18+
/// <returns>The enumerator.</returns>
19+
new EnumeratorIList<T> GetEnumerator();
20+
}

src/ReactiveMarbles.Extensions/Internal/Stale.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2019-2022 ReactiveUI Association Incorporated. All rights reserved.
1+
// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
22
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for full license information.
44

0 commit comments

Comments
 (0)