Skip to content

Commit bc53bb0

Browse files
authored
Add advanced observable extension methods (#112)
* Add advanced observable extension methods Introduces multiple new extension methods for IObservable<T>, including buffering until idle, conditional scheduler switching, error logging, retry strategies, property observables, throttling, debouncing, async mapping, pairwise emission, and busy-dropping. These additions enhance reactive programming capabilities and provide more flexible stream manipulation options. * Add and expand tests for new ReactiveExtensions methods Added comprehensive unit tests for new extension methods in ReactiveExtensions, including ThrottleFirst, BufferUntilIdle, DropIfBusy, Pairwise, ScanWithInitial, SampleLatest, SwitchIfEmpty, ThrottleDistinct, ToReadOnlyBehavior, ToHotTask, and ToPropertyObservable. Also refactored the Pairwise implementation to use Zip for improved clarity and correctness. * Enable and improve SubscribeSynchronous tests Removed Ignore attributes from SubscribeSynchronous tests and improved completion waiting logic for reliability. Also fixed SynchronizeSynchronous to correctly forward OnError and OnCompleted events to observers. * Update README with new operators and usage examples Added documentation and examples for new operators: ThrottleDistinct, ToHotTask, SampleLatest, SwitchIfEmpty, DropIfBusy, BufferUntilIdle, Pairwise, ScanWithInitial, ToReadOnlyBehavior, and ToPropertyObservable. Updated operator tables and changelog to reflect these additions and fixed SynchronizeSynchronous documentation. * Update README.md * Add ObserveOnIf overloads for reactive conditions Introduces new ObserveOnIf extension methods that accept IObservable<bool> as the condition, allowing dynamic switching of schedulers based on reactive conditions. Corresponding unit tests have been added to verify the new overloads' behavior. * Refactor BufferUntilIdle test to use TestScheduler Replaces Thread.Sleep with TestScheduler in BufferUntilIdle_BuffersUntilIdle test for more reliable and deterministic timing. Also adds missing ReactiveUI.Extensions using directive and minor docstring corrections. * Add scheduler support to BufferUntilIdle extension The BufferUntilIdle extension method now accepts an optional IScheduler parameter, allowing callers to specify the scheduler used for idle timeouts. This provides greater control over timing and thread context, improving flexibility for different reactive scenarios. * Refactor DropIfBusy test to use TaskCompletionSource Replaces Task.Delay with TaskCompletionSource in DropIfBusy_DropsWhenBusy test for more deterministic async control. This change improves test reliability by explicitly controlling when the async action completes. * Add tests for ReplayLastOnSubscribe and DebounceUntil Introduces unit tests for the ReplayLastOnSubscribe and DebounceUntil extension methods, verifying correct replay and debounce behavior. Also simplifies error handler signatures in existing OnErrorRetry tests. * Add scheduler overload to ThrottleDistinct extension Introduced a new ThrottleDistinct overload that accepts an IScheduler parameter, allowing for more precise control in reactive scenarios. Updated the corresponding unit test to use TestScheduler instead of Thread.Sleep for improved test reliability.
1 parent 5214ef2 commit bc53bb0

File tree

3 files changed

+896
-95
lines changed

3 files changed

+896
-95
lines changed

README.md

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,16 +88,16 @@ Some overloads omitted for brevity.
8888
| Category | Operators |
8989
|----------|-----------|
9090
| Null & Signal | `WhereIsNotNull`, `AsSignal` |
91-
| Timing & Scheduling | `SyncTimer`, `Schedule` (overloads), `ScheduleSafe`, `ThrottleFirst`, `DebounceImmediate` |
91+
| Timing & Scheduling | `SyncTimer`, `Schedule` (overloads), `ScheduleSafe`, `ThrottleFirst`, `ThrottleDistinct`, `DebounceImmediate` |
9292
| Inactivity / Liveness | `Heartbeat`, `DetectStale`, `BufferUntilInactive` |
9393
| Error Handling | `CatchIgnore`, `CatchAndReturn`, `OnErrorRetry` (overloads), `RetryWithBackoff` |
9494
| Combining & Aggregation | `CombineLatestValuesAreAllTrue`, `CombineLatestValuesAreAllFalse`, `GetMax`, `GetMin`, `Partition` |
9595
| Logical / Boolean | `Not`, `WhereTrue`, `WhereFalse` |
96-
| Async / Task | `SelectAsyncSequential`, `SelectLatestAsync`, `SelectAsyncConcurrent`, `SubscribeAsync` (overloads), `SynchronizeSynchronous`, `SynchronizeAsync`, `SubscribeSynchronous` (overloads) |
96+
| Async / Task | `SelectAsyncSequential`, `SelectLatestAsync`, `SelectAsyncConcurrent`, `SubscribeAsync` (overloads), `SynchronizeSynchronous`, `SynchronizeAsync`, `SubscribeSynchronous` (overloads), `ToHotTask` |
9797
| Backpressure | `Conflate` |
98-
| Filtering / Conditional | `Filter` (Regex), `TakeUntil` (predicate), `WaitUntil` |
99-
| Buffering | `BufferUntil`, `BufferUntilInactive` |
100-
| Transformation & Utility | `Shuffle`, `ForEach`, `FromArray`, `Using`, `While`, `Start`, `OnNext` (params helper), `DoOnSubscribe`, `DoOnDispose` |
98+
| Filtering / Conditional | `Filter` (Regex), `TakeUntil` (predicate), `WaitUntil`, `SampleLatest`, `SwitchIfEmpty`, `DropIfBusy` |
99+
| Buffering | `BufferUntil`, `BufferUntilInactive`, `BufferUntilIdle`, `Pairwise`, `ScanWithInitial` |
100+
| Transformation & Utility | `Shuffle`, `ForEach`, `FromArray`, `Using`, `While`, `Start`, `OnNext` (params helper), `DoOnSubscribe`, `DoOnDispose`, `ToReadOnlyBehavior`, `ToPropertyObservable` |
101101

102102
---
103103
## Operator Categories & Examples
@@ -128,6 +128,10 @@ var throttled = Observable.Interval(TimeSpan.FromMilliseconds(50))
128128
// DebounceImmediate: emit first immediately then debounce rest
129129
var debounced = Observable.Interval(TimeSpan.FromMilliseconds(40))
130130
.DebounceImmediate(TimeSpan.FromMilliseconds(250));
131+
132+
// ThrottleDistinct: throttle but only emit when the value actually changes
133+
var source = Observable.Interval(TimeSpan.FromMilliseconds(50)).Take(20);
134+
var distinctThrottled = source.ThrottleDistinct(TimeSpan.FromMilliseconds(200));
131135
```
132136

133137
### Inactivity / Liveness
@@ -201,6 +205,11 @@ inputs.SubscribeAsync(async i => await Task.Delay(10));
201205

202206
// Synchronous gate: ensures per-item async completion before next is emitted
203207
a inputs.SubscribeSynchronous(async i => await Task.Delay(25));
208+
209+
// ToHotTask: convert an observable to a Task that starts immediately
210+
var source = Observable.Return(42);
211+
var task = source.ToHotTask();
212+
var result = await task; // 42
204213
```
205214

206215
### Backpressure / Conflation
@@ -217,6 +226,20 @@ var untilFive = Observable.Range(1, 100).TakeUntil(x => x == 5);
217226

218227
// WaitUntil first match then complete
219228
var firstEven = Observable.Range(1, 10).WaitUntil(x => x % 2 == 0);
229+
230+
// SampleLatest: sample the latest value whenever a trigger fires
231+
var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(10);
232+
var trigger = Observable.Interval(TimeSpan.FromMilliseconds(300)).Take(3);
233+
var sampled = source.SampleLatest(trigger);
234+
235+
// SwitchIfEmpty: provide a fallback if the source completes without emitting
236+
var empty = Observable.Empty<int>();
237+
var fallback = Observable.Return(42);
238+
var result = empty.SwitchIfEmpty(fallback); // emits 42
239+
240+
// DropIfBusy: drop values if the previous async operation is still running
241+
var inputs = Observable.Range(1, 5);
242+
var processed = inputs.DropIfBusy(async x => { await Task.Delay(200); Console.WriteLine(x); });
220243
```
221244

222245
### Buffering & Transformation
@@ -228,6 +251,18 @@ var frames = chars.BufferUntil('<', '>'); // emits "<a>", "<bc>", "<d>"
228251
// Shuffle arrays in-place
229252
var arrays = Observable.Return(new[] { 1, 2, 3, 4, 5 });
230253
var shuffled = arrays.Shuffle();
254+
255+
// BufferUntilIdle: emit a batch when the stream goes quiet
256+
var events = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(10);
257+
var batches = events.BufferUntilIdle(TimeSpan.FromMilliseconds(250));
258+
259+
// Pairwise: emit consecutive pairs
260+
var numbers = Observable.Range(1, 5);
261+
var pairs = numbers.Pairwise(); // emits (1,2), (2,3), (3,4), (4,5)
262+
263+
// ScanWithInitial: scan that always emits the initial value first
264+
var values = Observable.Return(5);
265+
var accumulated = values.ScanWithInitial(10, (acc, x) => acc + x); // emits 10, then 15
231266
```
232267

233268
### Subscription & Side Effects
@@ -259,6 +294,26 @@ ReactiveExtensions.While(() => counter++ < 3, () => Console.WriteLine(counter))
259294
// Batch push with OnNext params
260295
var subj = new Subject<int>();
261296
subj.OnNext(1, 2, 3, 4);
297+
298+
// ToReadOnlyBehavior: create a read-only behavior subject
299+
var (observable, observer) = ReactiveExtensions.ToReadOnlyBehavior(10);
300+
observer.OnNext(20); // observable emits 10, then 20
301+
302+
// ToPropertyObservable: observe property changes on INotifyPropertyChanged
303+
public class ViewModel : INotifyPropertyChanged
304+
{
305+
private string _name;
306+
public string Name
307+
{
308+
get => _name;
309+
set { _name = value; PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Name))); }
310+
}
311+
public event PropertyChangedEventHandler? PropertyChanged;
312+
}
313+
314+
var vm = new ViewModel();
315+
var nameChanges = vm.ToPropertyObservable(x => x.Name);
316+
vm.Name = "Hello"; // observable emits "Hello"
262317
```
263318

264319
---
@@ -287,7 +342,11 @@ Issues / PRs welcome. Please keep additions dependency–free and focused on bro
287342
- Added async task projection helpers (`SelectAsyncSequential`, `SelectLatestAsync`, `SelectAsyncConcurrent`).
288343
- Added liveness operators (`Heartbeat`, `DetectStale`, `BufferUntilInactive`).
289344
- Added resilience (`RetryWithBackoff`, expanded `OnErrorRetry` overloads).
290-
- Added flow control (`Conflate`, `ThrottleFirst`, `DebounceImmediate`).
345+
- Added flow control (`Conflate`, `ThrottleFirst`, `DebounceImmediate`, `ThrottleDistinct`).
346+
- Added buffering and transformation operators (`BufferUntilIdle`, `Pairwise`, `ScanWithInitial`).
347+
- Added filtering and conditional operators (`SampleLatest`, `SwitchIfEmpty`, `DropIfBusy`).
348+
- Added utility operators (`ToReadOnlyBehavior`, `ToHotTask`, `ToPropertyObservable`).
349+
- Fixed `SynchronizeSynchronous` to properly propagate OnError and OnCompleted events.
291350
- Removed DisposeWith extension use System.Reactive.Disposables.Fluent from System.Reactive.
292351

293352
---

0 commit comments

Comments
 (0)