Description
Question
What is the context of your question or problem?
This is more of a performance-related question, if anything.
What is the question or problem you try to solve?
Would it be worth moving away from locks in the implementation of ForkJoin, given that this would only benefit large numbers of observables (> 1000)?
What were the (original) requirements you tried to solve?
I have an application that produces tens of thousands (usually hundreds) of IObservable instances and needs to execute them in parallel. However, I have to wait for all of them to complete before moving on.
What have you tried so far, what code have you written so far?
I have written a (possibly crude) lock-free implementation of ForkJoinObservable.
sealed class ForkJoinObservable<TSource> : ObservableBase<TSource[]>
{
    private readonly IObservable<TSource>[] _sources;

    public ForkJoinObservable(IObservable<TSource>[] sources)
    {
        _sources = sources;
    }

    protected override IDisposable SubscribeCore(IObserver<TSource[]> observer)
    {
        var count = _sources.Length;
        if (count == 0)
        {
            observer.OnCompleted();
            return Disposable.Empty;
        }

        var group = new CompositeDisposable(count);
        var finished = false;
        var hasResults = new bool[count];
        var atomicRequiredCompletedCount = count;
        var results = new TSource[count];

        Parallel.For(0, count, (index, state) =>
        {
            var currentIndex = index;
            var source = _sources[index];
            group.Add(source.Subscribe(
                value =>
                {
                    if (Volatile.Read(ref finished))
                        return;
                    hasResults[currentIndex] = true;
                    results[currentIndex] = value;
                },
                error =>
                {
                    Volatile.Write(ref finished, true);
                    observer.OnError(error);
                    group.Dispose();
                },
                () =>
                {
                    if (Volatile.Read(ref finished))
                        return;
                    if (!hasResults[currentIndex])
                    {
                        observer.OnCompleted();
                        return;
                    }
                    if (Interlocked.Decrement(ref atomicRequiredCompletedCount) > 0)
                        return;
                    Volatile.Write(ref finished, true);
                    observer.OnNext(results);
                    observer.OnCompleted();
                }));
        });

        return group;
    }
}
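The implementation above depends on two guarantees: exactly one completion callback sees the counter reach zero, and the observer receives a terminal notification only once. A minimal sketch of both, using only the BCL, could look like the following. `JoinGate` and `JoinGateDemo` are hypothetical names that are not part of Rx.NET or of the code above. The sketch also swaps the `Volatile.Read`/`Volatile.Write` flag for an `Interlocked.CompareExchange` claim, which is an assumption about how the terminal step could be made exactly-once, not what the class above does.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of Rx.NET: tracks how many sources are still
// pending and lets only one caller claim the terminal notification.
sealed class JoinGate
{
    private int _remaining;
    private int _terminated; // 0 = still running, 1 = terminal signal claimed

    public JoinGate(int count) => _remaining = count;

    // Called once per completed source. Returns true only for the caller that
    // brings the pending count to zero and also wins the terminal claim.
    public bool SignalCompleted() =>
        Interlocked.Decrement(ref _remaining) == 0 && TryTerminate();

    // Called on error or early completion. Returns true for exactly one caller.
    public bool TryTerminate() =>
        Interlocked.CompareExchange(ref _terminated, 1, 0) == 0;
}

static class JoinGateDemo
{
    // Runs `count` completions in parallel and returns how many callers
    // were told to publish the final result.
    public static int CountWinners(int count)
    {
        var gate = new JoinGate(count);
        var winners = 0;
        Parallel.For(0, count, _ =>
        {
            if (gate.SignalCompleted())
                Interlocked.Increment(ref winners);
        });
        return winners;
    }

    public static void Main()
    {
        Console.WriteLine(CountWinners(10_000)); // prints 1
    }
}
```

In an observer, the caller for which `SignalCompleted()` returns true would emit `OnNext` followed by `OnCompleted`, and an error handler would call `OnError` only if `TryTerminate()` returns true.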
I have also benchmarked it:
BenchmarkDotNet=v0.12.1, OS=Windows 10.0.19042
Intel Core i7-7700HQ CPU 2.80GHz (Kaby Lake), 1 CPU, 8 logical and 4 physical cores
.NET Core SDK=5.0.201
[Host] : .NET Core 5.0.4 (CoreCLR 5.0.421.11614, CoreFX 5.0.421.11614), X64 RyuJIT
DefaultJob : .NET Core 5.0.4 (CoreCLR 5.0.421.11614, CoreFX 5.0.421.11614), X64 RyuJIT
| Method | ObservableCount | Mean | Error | StdDev | Median |
|-------------------- |---------------- |----------------:|--------------:|--------------:|----------------:|
| 'Rx.NET ForkJoin()' | 10 | 15.70 us | 0.300 us | 0.321 us | 15.71 us |
| 'Custom ForkJoin()' | 10 | 18.65 us | 0.361 us | 0.494 us | 18.68 us |
| 'Rx.NET ForkJoin()' | 100 | 157.87 us | 3.004 us | 3.214 us | 157.94 us |
| 'Custom ForkJoin()' | 100 | 132.38 us | 1.227 us | 1.148 us | 132.64 us |
| 'Rx.NET ForkJoin()' | 1000 | 1,625.10 us | 31.486 us | 72.344 us | 1,604.59 us |
| 'Custom ForkJoin()' | 1000 | 1,262.97 us | 23.834 us | 25.502 us | 1,262.70 us |
| 'Rx.NET ForkJoin()' | 10000 | 53,397.78 us | 1,164.106 us | 3,414.120 us | 53,030.44 us |
| 'Custom ForkJoin()' | 10000 | 17,382.86 us | 604.506 us | 1,734.441 us | 16,901.41 us |
| 'Rx.NET ForkJoin()' | 100000 | 3,517,157.55 us | 62,678.544 us | 52,339.423 us | 3,531,034.90 us |
| 'Custom ForkJoin()' | 100000 | 214,621.17 us | 3,792.702 us | 3,547.695 us | 215,474.57 us |
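To make the trend in the table easier to read, the ratio of the two means can be computed per row. This is a small sketch over the mean values copied from the table above; `SpeedupTable` and `Speedup` are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Globalization;

static class SpeedupTable
{
    // Mean times in microseconds, copied from the benchmark table above.
    static readonly (int Count, double RxMean, double CustomMean)[] Rows =
    {
        (10, 15.70, 18.65),
        (100, 157.87, 132.38),
        (1000, 1_625.10, 1_262.97),
        (10000, 53_397.78, 17_382.86),
        (100000, 3_517_157.55, 214_621.17),
    };

    // A ratio above 1 means the custom implementation had the lower mean.
    public static double Speedup(double rxMean, double customMean) => rxMean / customMean;

    public static void Main()
    {
        foreach (var (count, rx, custom) in Rows)
        {
            var ratio = Speedup(rx, custom).ToString("F2", CultureInfo.InvariantCulture);
            Console.WriteLine($"{count,7}: {ratio}x");
        }
    }
}
```

With these figures the ratio goes from roughly 0.84x at 10 observables (the custom version is slower) to roughly 3.07x at 10,000 and 16.39x at 100,000.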
public class ForkJoinBenchmark
{
    [Params(1, 10, 100, 1000, 10000)]
    public int ObservableCount { get; set; }

    [Benchmark(Description = "Rx.NET ForkJoin()")]
    public async Task<long> RxForkJoin()
    {
        var value = await Enumerable.Range(1, ObservableCount)
            .Select(v => Observable.Start(() => (long) v, TaskPoolScheduler.Default))
            .ForkJoin()
            .Select(values => values.Sum());
        // Cast to long so the expected sum cannot overflow int for large counts.
        Debug.Assert(value == (long) ObservableCount * (ObservableCount + 1) / 2);
        return value;
    }

    [Benchmark(Description = "Custom ForkJoin()")]
    public async Task<long> CustomForkJoin()
    {
        var value = await new ForkJoinObservable<long>(
            Enumerable.Range(1, ObservableCount)
                .Select(v => Observable.Start(() => (long) v, TaskPoolScheduler.Default)).ToArray()
        ).Select(values => values.Sum());
        // Cast to long so the expected sum cannot overflow int for large counts.
        Debug.Assert(value == (long) ObservableCount * (ObservableCount + 1) / 2);
        return value;
    }
}
Debug assertions are included to make sure I don't run into parallelism issues and miscount. The results are in line with what I noticed in a real-world test, where I have ~725,000 observables that need to be waited for and ForkJoin() actually takes a few minutes to process them.
An argument could be made that observables are not the best solution for this specific problem, but they are quite convenient.