RFE: Sequential Merge #332

Open
@atifaziz

Description

Like Merge, but merges a sequence of observables into a single observable sequence while preserving the source order of the observables. This is sometimes desirable when observables model asynchronous operations whose results must be delivered in order but which can otherwise run concurrently.

Here's an example of a hypothetical SequentialMerge in action (with a concurrency limit of 3):

var q =
    Observable
        .Range(1, 9)
        .Select(x => Observable.FromAsync(async () =>
        {
            await Task.Delay(TimeSpan.FromSeconds(1));
            return x;
        }))
        .SequentialMerge(3);

The query will complete in approximately 3 seconds, yielding 3 items each second. No matter how you vary the delay or the maximum concurrency argument, the results will always be in the order of the original range (i.e. [1…9]). In contrast, Merge(3) would also run at most 3 at a time, but it yields each item as soon as it becomes available, with no guarantee of order.
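For readers without Rx at hand, the same ordering guarantee can be illustrated with plain tasks. This is a minimal sketch, not the proposed operator: it swaps Rx for SemaphoreSlim throttling and simply awaits the tasks in source order. RunOrderedAsync is a hypothetical name, and the delays are shortened to milliseconds and reversed so that later items finish first.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: schedules every job but lets at most maxConcurrency run at once,
// then awaits the tasks in source order, so results come back in the order given.
static async Task<List<T>> RunOrderedAsync<T>(IReadOnlyList<Func<Task<T>>> jobs, int maxConcurrency)
{
    if (maxConcurrency <= 0) throw new ArgumentOutOfRangeException(nameof(maxConcurrency));
    var gate = new SemaphoreSlim(maxConcurrency);
    var tasks = jobs.Select(async job =>
    {
        await gate.WaitAsync();            // wait for a free concurrency slot
        try { return await job(); }
        finally { gate.Release(); }        // free the slot for the next job
    }).ToList();                           // materialize so every job is scheduled
    var results = new List<T>(tasks.Count);
    foreach (var task in tasks)
        results.Add(await task);           // source order, not completion order
    return results;
}

var sync = new object();
int inFlight = 0, peak = 0;                // track how many jobs run at the same time
var demoJobs = Enumerable.Range(1, 9)
    .Select(x => (Func<Task<int>>)(async () =>
    {
        lock (sync) { inFlight++; peak = Math.Max(peak, inFlight); }
        await Task.Delay((10 - x) * 20);   // later items finish sooner
        lock (sync) { inFlight--; }
        return x;
    }))
    .ToList();

var ordered = await RunOrderedAsync(demoJobs, 3);
Console.WriteLine(string.Join(", ", ordered)); // 1, 2, 3, 4, 5, 6, 7, 8, 9
Console.WriteLine($"peak concurrency: {peak}");
```

Unlike the proposal, this sketch waits for all results before returning instead of emitting each consecutive run of ready items as soon as it is available.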

Implementation Proposal

Here's what the implementation (minus optimizations) could look like:

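using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class ObservableExtensions
{
    public static IObservable<T> SequentialMerge<T>(this IObservable<IObservable<T>> source, int maxConcurrency)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (maxConcurrency <= 0) throw new ArgumentOutOfRangeException(nameof(maxConcurrency));

        return Observable.Create<T>(observer =>
        {
            var @lock = new object();
            var queue = new Queue<KeyValuePair<int, IObservable<T>>>();
            var items = new T[0];         // last value observed from each inner observable
            var hasValues = new bool[0];  // whether each inner observable produced any value
            var joins = new bool[0];      // whether each inner observable has completed
            var ii = 0;                   // index of the next inner observable to notify
            var root = new SingleAssignmentDisposable();
            var subscriptions = new CompositeDisposable { root };
            var flights = 0;
            var completed = false;

            // Grows the bookkeeping arrays to hold index i; must be called under @lock.
            void EnsureCapacity(int i)
            {
                var size = Math.Max(items.Length, i + 1);
                Array.Resize(ref items, size);
                Array.Resize(ref hasValues, size);
                Array.Resize(ref joins, size);
            }

            void SubscribeInner(KeyValuePair<int, IObservable<T>> observable)
            {
                var i = observable.Key;
                var subscription = new SingleAssignmentDisposable();
                subscriptions.Add(subscription);
                subscription.Disposable = observable.Value.Subscribe(
                    item =>
                    {
                        lock (@lock)
                        {
                            EnsureCapacity(i);
                            items[i] = item; // remember only the last observation
                            hasValues[i] = true;
                        }
                    },
                    onCompleted: () =>
                    {
                        subscriptions.Remove(subscription);
                        lock (@lock)
                        {
                            // Join only on completion so that a later OnNext is never lost.
                            EnsureCapacity(i);
                            joins[i] = true;
                            // Notify the consecutive run of completed observables in source
                            // order, skipping any that completed without a value.
                            for (; ii < joins.Length && joins[ii]; ii++)
                            {
                                if (hasValues[ii])
                                    observer.OnNext(items[ii]);
                            }
                            if (queue.Count > 0)
                            {
                                SubscribeInner(queue.Dequeue());
                            }
                            else
                            {
                                flights--;
                                if (completed && flights == 0)
                                    observer.OnCompleted();
                            }
                        }
                    },
                    onError: ex => { lock (@lock) { observer.OnError(ex); } });
            }

            root.Disposable =
                source
                    .Select((e, i) => new KeyValuePair<int, IObservable<T>>(i, e))
                    .Subscribe(
                        observable =>
                        {
                            lock (@lock)
                            {
                                if (flights < maxConcurrency)
                                {
                                    flights++;
                                    SubscribeInner(observable);
                                }
                                else
                                {
                                    queue.Enqueue(observable);
                                }
                            }
                        },
                        onCompleted: () =>
                        {
                            lock (@lock)
                            {
                                completed = true;
                                if (flights == 0)
                                    observer.OnCompleted();
                            }
                        },
                        onError: ex => { lock (@lock) { observer.OnError(ex); } });

            return subscriptions;
        });
    }
}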
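The ordering bookkeeping in the proposal (a cursor ii that advances over a consecutive run of joined slots) is essentially a reorder buffer. Below is a minimal standalone sketch of that idea, assuming results arrive keyed by their source index; Complete, pending and next are hypothetical names, not part of Rx.

```csharp
using System;
using System.Collections.Generic;

var pending = new Dictionary<int, string>(); // results that arrived ahead of their turn
var next = 0;                                // index of the next result allowed out (like ii)

// Records the result for `index` and returns the consecutive run that is now ready.
List<string> Complete(int index, string value)
{
    if (index < next || !pending.TryAdd(index, value))
        throw new InvalidOperationException($"Index {index} was already completed.");
    var ready = new List<string>();
    while (pending.Remove(next, out var item))
    {
        ready.Add(item);
        next++;
    }
    return ready;
}

// Completions arrive out of order; releases happen strictly in index order.
var released = new List<string>();
foreach (var (i, v) in new[] { (2, "c"), (0, "a"), (3, "d"), (1, "b") })
{
    var batch = Complete(i, v);
    Console.WriteLine($"completed {i}: released [{string.Join(", ", batch)}]");
    released.AddRange(batch);
}
Console.WriteLine(string.Join(", ", released)); // a, b, c, d
```

Completing index 1 last releases b, c and d in one batch, which mirrors how a single inner completion in the proposal can flush several buffered items at once.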

Notes

The implementation, like ForkJoin, collects only the last element from each observable and is therefore somewhat different from Merge. One could then debate whether it's a SequentialMerge or a ForkSequentialJoin. Also, "sequential" in the name could be confusing, as one may be led to think that execution is sequential. One could use the word "ordered" instead, but that usually evokes a sort order, which is not the case here. What matters is the sequential order of the observables in the source.
