Skip to content

Commit 23a2ae9

Browse files
authored
Merge pull request #4 from fluentassertions/fix-unit-test
fix unit test
2 parents 89fa586 + 1d55f2e commit 23a2ae9

File tree

3 files changed

+97
-49
lines changed

3 files changed

+97
-49
lines changed

Src/FluentAssertions.Reactive/FluentTestObserver.cs

+18-18
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ namespace FluentAssertions.Reactive
1515
/// <typeparam name="TPayload"></typeparam>
1616
public class FluentTestObserver<TPayload> : IObserver<TPayload>, IDisposable
1717
{
18-
private readonly IDisposable subscription;
19-
private readonly IScheduler observeScheduler;
20-
private readonly RollingReplaySubject<Recorded<Notification<TPayload>>> rollingReplaySubject = new RollingReplaySubject<Recorded<Notification<TPayload>>>();
18+
private readonly IDisposable _subscription;
19+
private readonly IScheduler _observeScheduler;
20+
private readonly RollingReplaySubject<Recorded<Notification<TPayload>>> _rollingReplaySubject = new RollingReplaySubject<Recorded<Notification<TPayload>>>();
2121

2222
/// <summary>
2323
/// The observable which is observed by this instance
@@ -27,13 +27,13 @@ public class FluentTestObserver<TPayload> : IObserver<TPayload>, IDisposable
2727
/// <summary>
2828
/// The stream of recorded <see cref="Notification{T}"/>s
2929
/// </summary>
30-
public IObservable<Recorded<Notification<TPayload>>> RecordedNotificationStream => rollingReplaySubject.AsObservable();
30+
public IObservable<Recorded<Notification<TPayload>>> RecordedNotificationStream => _rollingReplaySubject.AsObservable();
3131

3232
/// <summary>
3333
/// The recorded <see cref="Notification{T}"/>s
3434
/// </summary>
3535
public IEnumerable<Recorded<Notification<TPayload>>> RecordedNotifications =>
36-
rollingReplaySubject.GetSnapshot();
36+
_rollingReplaySubject.GetSnapshot();
3737

3838
/// <summary>
3939
/// The recorded messages
@@ -64,8 +64,8 @@ public class FluentTestObserver<TPayload> : IObserver<TPayload>, IDisposable
6464
public FluentTestObserver(IObservable<TPayload> subject)
6565
{
6666
Subject = subject;
67-
observeScheduler = new EventLoopScheduler();
68-
subscription = new CompositeDisposable(); subject.ObserveOn(observeScheduler).Subscribe(this);
67+
_observeScheduler = new EventLoopScheduler();
68+
_subscription = subject.ObserveOn(_observeScheduler).Subscribe(this);
6969
}
7070

7171
/// <summary>
@@ -76,8 +76,8 @@ public FluentTestObserver(IObservable<TPayload> subject)
7676
public FluentTestObserver(IObservable<TPayload> subject, IScheduler scheduler)
7777
{
7878
Subject = subject;
79-
observeScheduler = scheduler;
80-
subscription = subject.ObserveOn(scheduler).Subscribe(this);
79+
_observeScheduler = scheduler;
80+
_subscription = subject.ObserveOn(scheduler).Subscribe(this);
8181
}
8282

8383
/// <summary>
@@ -88,35 +88,35 @@ public FluentTestObserver(IObservable<TPayload> subject, IScheduler scheduler)
8888
public FluentTestObserver(IObservable<TPayload> subject, TestScheduler testScheduler)
8989
{
9090
Subject = subject;
91-
observeScheduler = testScheduler;
92-
subscription = subject.ObserveOn(Scheduler.CurrentThread).Subscribe(this);
91+
_observeScheduler = testScheduler;
92+
_subscription = subject.ObserveOn(Scheduler.CurrentThread).Subscribe(this);
9393
}
9494

9595
/// <summary>
9696
/// Clears the recorded notifications and messages as well as the recorded notifications stream buffer
9797
/// </summary>
98-
public void Clear() => rollingReplaySubject.Clear();
98+
public void Clear() => _rollingReplaySubject.Clear();
9999

100100
/// <inheritdoc />
101101
public void OnNext(TPayload value)
102102
{
103-
rollingReplaySubject.OnNext(
104-
new Recorded<Notification<TPayload>>(observeScheduler.Now.UtcTicks, Notification.CreateOnNext(value)));
103+
_rollingReplaySubject.OnNext(
104+
new Recorded<Notification<TPayload>>(_observeScheduler.Now.UtcTicks, Notification.CreateOnNext(value)));
105105
}
106106

107107
/// <inheritdoc />
108108
public void OnError(Exception exception) =>
109-
rollingReplaySubject.OnNext(new Recorded<Notification<TPayload>>(observeScheduler.Now.UtcTicks, Notification.CreateOnError<TPayload>(exception)));
109+
_rollingReplaySubject.OnNext(new Recorded<Notification<TPayload>>(_observeScheduler.Now.UtcTicks, Notification.CreateOnError<TPayload>(exception)));
110110

111111
/// <inheritdoc />
112112
public void OnCompleted() =>
113-
rollingReplaySubject.OnNext(new Recorded<Notification<TPayload>>(observeScheduler.Now.UtcTicks, Notification.CreateOnCompleted<TPayload>()));
113+
_rollingReplaySubject.OnNext(new Recorded<Notification<TPayload>>(_observeScheduler.Now.UtcTicks, Notification.CreateOnCompleted<TPayload>()));
114114

115115
/// <inheritdoc />
116116
public void Dispose()
117117
{
118-
subscription?.Dispose();
119-
rollingReplaySubject?.Dispose();
118+
_subscription?.Dispose();
119+
_rollingReplaySubject?.Dispose();
120120
}
121121

122122
/// <summary>
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,100 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Reactive.Disposables;
34
using System.Reactive.Linq;
45
using System.Reactive.Subjects;
56

67
namespace FluentAssertions.Reactive
78
{
89
/// <summary>
9-
/// Clearable <see cref="ReplaySubject{T}"/> taken from James World: https://stackoverflow.com/a/28945444/4340541
10+
/// Clearable <see cref="ReplaySubject{T}"/> taken from James World: https://gist.github.com/james-world/c46f09f32e2d4f338b07
1011
/// </summary>
1112
/// <typeparam name="T"></typeparam>
12-
public class RollingReplaySubject<T> : ISubject<T>, IDisposable
13+
public class RollingReplaySubject
1314
{
14-
private readonly ReplaySubject<IObservable<T>> subjects;
15-
private readonly IObservable<T> concatenatedSubjects;
16-
private ISubject<T> currentSubject;
15+
protected class NopSubject<TSource> : ISubject<TSource>
16+
{
17+
public static readonly NopSubject<TSource> Default = new NopSubject<TSource>();
18+
19+
public void OnCompleted()
20+
{
21+
}
22+
23+
public void OnError(Exception error)
24+
{
25+
}
26+
27+
public void OnNext(TSource value)
28+
{
29+
}
30+
31+
public IDisposable Subscribe(IObserver<TSource> observer)
32+
{
33+
return Disposable.Empty;
34+
}
35+
}
36+
}
37+
38+
public class RollingReplaySubject<TSource> : RollingReplaySubject, ISubject<TSource>
39+
{
40+
private readonly ReplaySubject<IObservable<TSource>> _subjects;
41+
private readonly IObservable<TSource> _concatenatedSubjects;
42+
private ISubject<TSource> _currentSubject;
43+
private readonly object _gate = new object();
1744

1845
public RollingReplaySubject()
1946
{
20-
subjects = new ReplaySubject<IObservable<T>>(1);
21-
concatenatedSubjects = subjects.Concat();
22-
currentSubject = new ReplaySubject<T>();
23-
subjects.OnNext(currentSubject);
47+
_subjects = new ReplaySubject<IObservable<TSource>>(1);
48+
_concatenatedSubjects = _subjects.Concat();
49+
_currentSubject = new ReplaySubject<TSource>();
50+
_subjects.OnNext(_currentSubject);
2451
}
25-
52+
2653
public void Clear()
2754
{
28-
currentSubject.OnCompleted();
29-
currentSubject = new ReplaySubject<T>();
30-
subjects.OnNext(currentSubject);
55+
lock (_gate)
56+
{
57+
_currentSubject.OnCompleted();
58+
_currentSubject = new ReplaySubject<TSource>();
59+
_subjects.OnNext(_currentSubject);
60+
}
3161
}
3262

33-
public void OnNext(T value) => currentSubject.OnNext(value);
63+
public void OnNext(TSource value)
64+
{
65+
lock (_gate)
66+
{
67+
_currentSubject.OnNext(value);
68+
}
69+
}
3470

35-
public void OnError(Exception error) => currentSubject.OnError(error);
71+
public void OnError(Exception error)
72+
{
73+
lock (_gate)
74+
{
75+
_currentSubject.OnError(error);
76+
_currentSubject = NopSubject<TSource>.Default;
77+
}
78+
}
3679

3780
public void OnCompleted()
3881
{
39-
currentSubject.OnCompleted();
40-
subjects.OnCompleted();
41-
// a quick way to make the current ReplaySubject unreachable
42-
// except to in-flight observers, and not hold up collection
43-
currentSubject = new Subject<T>();
82+
lock (_gate)
83+
{
84+
_currentSubject.OnCompleted();
85+
_subjects.OnCompleted();
86+
_currentSubject = NopSubject<TSource>.Default;
87+
}
4488
}
4589

46-
public IDisposable Subscribe(IObserver<T> observer) => concatenatedSubjects.Subscribe(observer);
90+
public IDisposable Subscribe(IObserver<TSource> observer)
91+
{
92+
return _concatenatedSubjects.Subscribe(observer);
93+
}
4794

48-
public IEnumerable<T> GetSnapshot()
95+
public IEnumerable<TSource> GetSnapshot()
4996
{
50-
var snapshot = new List<T>();
97+
var snapshot = new List<TSource>();
5198
using (this.Subscribe(item => snapshot.Add(item)))
5299
{
53100
// Deliberately empty; subscribing will add everything to the list.
@@ -58,7 +105,7 @@ public IEnumerable<T> GetSnapshot()
58105
public void Dispose()
59106
{
60107
OnCompleted();
61-
subjects?.Dispose();
108+
_subjects?.Dispose();
62109
}
63110
}
64111
}

Tests/FluentAssertions.Reactive.Specs/ReactiveAssertionSpecs.cs

+8-7
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Diagnostics;
23
using System.Linq;
34
using System.Reactive;
45
using System.Reactive.Linq;
@@ -50,20 +51,20 @@ public void When_the_expected_number_of_notifications_where_not_pushed_it_should
5051

5152
// assert a single notification
5253
// Act
53-
Action act = () => observer.Should().Push(1, TimeSpan.Zero);
54+
Action act = () => observer.Should().Push(1, TimeSpan.FromMilliseconds(1));
5455
// Assert
5556
act.Should().Throw<XunitException>().WithMessage(
56-
$"Expected observable to push at least 1 notification, but 0 were received within {Formatter.ToString(TimeSpan.Zero)}.");
57+
$"Expected observable to push at least 1 notification, but 0 were received within {Formatter.ToString(TimeSpan.FromMilliseconds(1))}.");
5758
observer.RecordedNotifications.Should().BeEmpty("because no messages have been pushed");
5859

5960
// assert multiple notifications
6061
scheduler.AdvanceTo(250);
6162

6263
// Act
63-
act = () => observer.Should().Push(3, TimeSpan.Zero);
64+
act = () => observer.Should().Push(3, TimeSpan.FromMilliseconds(1));
6465
// Assert
6566
act.Should().Throw<XunitException>().WithMessage(
66-
$"Expected observable to push at least 3 notifications, but 2 were received within {Formatter.ToString(TimeSpan.Zero)}.");
67+
$"Expected observable to push at least 3 notifications, but 2 were received within {Formatter.ToString(TimeSpan.FromMilliseconds(1))}.");
6768
observer.RecordedNotifications.Should().BeEquivalentTo(observable.Messages.Take(2));
6869
}
6970

@@ -117,7 +118,7 @@ public void When_the_observable_is_expected_to_fail_but_does_not_it_should_throw
117118
using var observer = observable.Observe();
118119

119120
// Act
120-
Action act = () => observer.Should().Throw<ArgumentException>(TimeSpan.Zero);
121+
Action act = () => observer.Should().Throw<ArgumentException>(TimeSpan.FromMilliseconds(1));
121122

122123
// Assert
123124
act.Should().Throw<XunitException>().WithMessage(
@@ -151,10 +152,10 @@ public void When_the_observable_is_expected_to_complete_but_does_not_it_should_t
151152
using var observer = observable.Observe();
152153

153154
// Act
154-
Action act = () => observer.Should().Complete(TimeSpan.Zero);
155+
Action act = () => observer.Should().Complete(TimeSpan.FromMilliseconds(1));
155156
// Assert
156157
act.Should().Throw<XunitException>().WithMessage(
157-
$"Expected observable to complete within {Formatter.ToString(TimeSpan.Zero)}, but it did not.");
158+
$"Expected observable to complete within {Formatter.ToString(TimeSpan.FromMilliseconds(1))}, but it did not.");
158159
observer.Error.Should().BeNull();
159160
}
160161

0 commit comments

Comments
 (0)