Skip to content

Commit df17246

Browse files
authored
Update SubscribeSynchronous and Add SubscribeAsync (#29)
Update Synchronous and Async Subscribe Rename SynchronizeAsync to SynchronizeSynchronous as is blocking to the source. Add SynchronizeAsync as a non blocking version of the same.
1 parent df99448 commit df17246

File tree

4 files changed

+161
-11
lines changed

4 files changed

+161
-11
lines changed

README.md

+2
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,6 @@ Extensions for concerns found in System.Reactive that make consuming the library
3636
- OnErrorRetry
3737
- TakeUntil
3838
- SyncronizeAsync
39+
- SubscribeAsync
40+
- SyncronizeSynchronous
3941
- SubscribeSynchronous

src/ReactiveMarbles.Extensions.Tests/ReactiveExtensionsTests.cs

+92
Original file line numberDiff line numberDiff line change
@@ -157,4 +157,96 @@ public void SyncronizeAsync_RunsWithAsyncTasksInSubscriptions()
157157
.Should()
158158
.Be(0);
159159
}
160+
161+
/// <summary>
162+
/// Syncronizes the asynchronous runs with asynchronous tasks in subscriptions.
163+
/// </summary>
164+
[Fact]
165+
public void SynchronizeSynchronous_RunsWithAsyncTasksInSubscriptions()
166+
{
167+
// Given, When
168+
var result = 0;
169+
var itterations = 0;
170+
var subject = new Subject<bool>();
171+
using var disposable = subject
172+
.SynchronizeSynchronous()
173+
.Subscribe(async x =>
174+
{
175+
if (x.Value)
176+
{
177+
await Task.Delay(1000);
178+
result++;
179+
}
180+
else
181+
{
182+
await Task.Delay(500);
183+
result--;
184+
}
185+
186+
x.Sync.Dispose();
187+
itterations++;
188+
});
189+
190+
subject.OnNext(true);
191+
subject.OnNext(false);
192+
subject.OnNext(true);
193+
subject.OnNext(false);
194+
subject.OnNext(true);
195+
subject.OnNext(false);
196+
197+
while (itterations < 6)
198+
{
199+
Thread.Yield();
200+
}
201+
202+
// Then
203+
result
204+
.Should()
205+
.Be(0);
206+
}
207+
208+
/// <summary>
209+
/// Syncronizes the asynchronous runs with asynchronous tasks in subscriptions.
210+
/// </summary>
211+
[Fact]
212+
public void SubscribeAsync_RunsWithAsyncTasksInSubscriptions()
213+
{
214+
// Given, When
215+
var result = 0;
216+
var itterations = 0;
217+
var subject = new Subject<bool>();
218+
using var disposable = subject
219+
.SubscribeAsync(async x =>
220+
{
221+
if (x)
222+
{
223+
await Task.Delay(1000);
224+
result++;
225+
}
226+
else
227+
{
228+
await Task.Delay(500);
229+
result--;
230+
}
231+
232+
itterations++;
233+
});
234+
235+
subject.OnNext(true);
236+
subject.OnNext(false);
237+
subject.OnNext(true);
238+
subject.OnNext(false);
239+
subject.OnNext(true);
240+
subject.OnNext(false);
241+
242+
while (itterations < 6)
243+
{
244+
Thread.Yield();
245+
}
246+
247+
// Then
248+
result
249+
.Should()
250+
.Be(0);
251+
}
160252
}

src/ReactiveMarbles.Extensions/Continuation.cs

-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ public class Continuation : IDisposable
3030
/// </summary>
3131
public void Dispose()
3232
{
33-
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
3433
Dispose(disposing: true);
3534
GC.SuppressFinalize(this);
3635
}

src/ReactiveMarbles.Extensions/ReactiveExtensions.cs

+67-10
Original file line numberDiff line numberDiff line change
@@ -913,12 +913,8 @@ public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource>
913913
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
914914
/// <param name="source">The source.</param>
915915
/// <returns>An Observable of T and a release mechanism.</returns>
916-
public static IObservable<(T Value, IDisposable Sync)> SynchronizeAsync<T>(this IObservable<T> source) =>
917-
Observable.Create<(T Value, IDisposable Sync)>(observer =>
918-
{
919-
var gate = new object();
920-
return source.Synchronize(gate).Subscribe(item => new Continuation().Lock(item, observer).Wait());
921-
});
916+
public static IObservable<(T Value, IDisposable Sync)> SynchronizeSynchronous<T>(this IObservable<T> source) =>
917+
Observable.Create<(T Value, IDisposable Sync)>(observer => source.Subscribe(item => new Continuation().Lock(item, observer).Wait()));
922918

923919
/// <summary>
924920
/// Subscribes to the specified source synchronously.
@@ -930,7 +926,7 @@ public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource>
930926
/// <param name="onCompleted">The on completed.</param>
931927
/// <returns><see cref="IDisposable"/> object used to unsubscribe from the observable sequence.</returns>
932928
public static IDisposable SubscribeSynchronous<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted) =>
933-
source.SynchronizeAsync().Subscribe(
929+
source.SynchronizeSynchronous().Subscribe(
934930
async observer =>
935931
{
936932
await onNext(observer.Value);
@@ -948,7 +944,7 @@ public static IDisposable SubscribeSynchronous<T>(this IObservable<T> source, Fu
948944
/// <param name="onError">Action to invoke upon exceptional termination of the observable sequence.</param>
949945
/// <returns><see cref="IDisposable"/> object used to unsubscribe from the observable sequence.</returns>
950946
public static IDisposable SubscribeSynchronous<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError) =>
951-
source.SynchronizeAsync().Subscribe(
947+
source.SynchronizeSynchronous().Subscribe(
952948
async observer =>
953949
{
954950
await onNext(observer.Value);
@@ -966,7 +962,7 @@ public static IDisposable SubscribeSynchronous<T>(this IObservable<T> source, Fu
966962
/// <returns><see cref="IDisposable"/> object used to unsubscribe from the observable sequence.</returns>
967963
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onCompleted"/> is <c>null</c>.</exception>
968964
public static IDisposable SubscribeSynchronous<T>(this IObservable<T> source, Func<T, Task> onNext, Action onCompleted) =>
969-
source.SynchronizeAsync().Subscribe(
965+
source.SynchronizeSynchronous().Subscribe(
970966
async observer =>
971967
{
972968
await onNext(observer.Value);
@@ -982,13 +978,74 @@ public static IDisposable SubscribeSynchronous<T>(this IObservable<T> source, Fu
982978
/// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
983979
/// <returns><see cref="IDisposable"/> object used to unsubscribe from the observable sequence.</returns>
984980
public static IDisposable SubscribeSynchronous<T>(this IObservable<T> source, Func<T, Task> onNext) =>
985-
source.SynchronizeAsync().Subscribe(
981+
source.SynchronizeSynchronous().Subscribe(
986982
async observer =>
987983
{
988984
await onNext(observer.Value);
989985
observer.Sync.Dispose();
990986
});
991987

988+
/// <summary>
989+
/// Synchronizes the asynchronous operations in downstream operations.
990+
/// Use SubscribeSynchronus instead for a simpler version.
991+
/// Call Sync.Dispose() to release the lock in the downstream methods.
992+
/// </summary>
993+
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
994+
/// <param name="source">The source.</param>
995+
/// <returns>An Observable of T and a release mechanism.</returns>
996+
public static IObservable<(T Value, IDisposable Sync)> SynchronizeAsync<T>(this IObservable<T> source) =>
997+
Observable.Create<(T Value, IDisposable Sync)>(observer => source.Select(item => Observable.FromAsync(() => new Continuation().Lock(item, observer))).Concat().Subscribe());
998+
999+
/// <summary>
1000+
/// Subscribes allowing asynchronous operations to be executed without blocking the source.
1001+
/// </summary>
1002+
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
1003+
/// <param name="source">Observable sequence to subscribe to.</param>
1004+
/// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
1005+
/// <returns><see cref="IDisposable"/> object used to unsubscribe from the observable sequence.</returns>
1006+
public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext) =>
1007+
source.Select(o => Observable.FromAsync(() => onNext(o))).Concat().Subscribe();
1008+
1009+
/// <summary>
1010+
/// Subscribes allowing asynchronous operations to be executed without blocking the source.
1011+
/// </summary>
1012+
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
1013+
/// <param name="source">Observable sequence to subscribe to.</param>
1014+
/// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
1015+
/// <param name="onCompleted">The on completed.</param>
1016+
/// <returns>
1017+
/// <see cref="IDisposable" /> object used to unsubscribe from the observable sequence.
1018+
/// </returns>
1019+
public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action onCompleted) =>
1020+
source.Select(o => Observable.FromAsync(() => onNext(o))).Concat().Subscribe(_ => { }, onCompleted);
1021+
1022+
/// <summary>
1023+
/// Subscribes allowing asynchronous operations to be executed without blocking the source.
1024+
/// </summary>
1025+
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
1026+
/// <param name="source">Observable sequence to subscribe to.</param>
1027+
/// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
1028+
/// <param name="onError">The on error.</param>
1029+
/// <returns>
1030+
/// <see cref="IDisposable" /> object used to unsubscribe from the observable sequence.
1031+
/// </returns>
1032+
public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError) =>
1033+
source.Select(o => Observable.FromAsync(() => onNext(o))).Concat().Subscribe(_ => { }, onError);
1034+
1035+
/// <summary>
1036+
/// Subscribes allowing asynchronous operations to be executed without blocking the source.
1037+
/// </summary>
1038+
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
1039+
/// <param name="source">Observable sequence to subscribe to.</param>
1040+
/// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
1041+
/// <param name="onError">The on error.</param>
1042+
/// <param name="onCompleted">The on completed.</param>
1043+
/// <returns>
1044+
/// <see cref="IDisposable" /> object used to unsubscribe from the observable sequence.
1045+
/// </returns>
1046+
public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted) =>
1047+
source.Select(o => Observable.FromAsync(() => onNext(o))).Concat().Subscribe(_ => { }, onError, onCompleted);
1048+
9921049
private static void FastForEach<T>(IObserver<T> observer, IEnumerable<T> source)
9931050
{
9941051
if (source is List<T> fullList)

0 commit comments

Comments
 (0)