4.x: Intercepting the chaining of operators #540

Closed
@akarnokd

There is a plan to inline the IQueryLanguage interface and the QueryLanguage class into the static Observable class (see #526 and #539) to save an indirection and an interface lookup. IQueryLanguage has always been internal, so in theory nothing should depend on it or on the original indirection.

In practice, though, tools such as RxSpy appear to latch onto this indirection: they override the s_impl field that every operator goes through with a proxy implementation of the IQueryLanguage interface. Obviously, if s_impl is no longer used, this and similar tools will break.

I'm convinced the performance benefit of the inlining is worth it, but supporting external diagnostic tools is also a significant requirement.

I can think of a few approaches that could re-enable overriding the standard implementations, but each has caveats.

1. Keep the IQueryLanguage interface

The IQueryLanguage interface is retained, and an implementation that takes precedence over the default can be installed globally. Example (using IQueryLanguageEx for brevity):

namespace System.Reactive.Linq
{
    /// <summary>
    /// Provides a set of static methods for writing in-memory queries over observable sequences.
    /// </summary>
    public static class ObservableEx
    {
        private static IQueryLanguageEx _override;

        /// <summary>
        /// Override the default implementation of operators.
        /// </summary>
        public static IQueryLanguageEx Override
        {
            get { return Volatile.Read(ref _override); }
            set { Interlocked.Exchange(ref _override, value);  }
        }

        [Experimental]
        public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod)
        {
            if (iteratorMethod == null)
                throw new ArgumentNullException(nameof(iteratorMethod));
// vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
            var ovr = Override;
            if (ovr != null)
            {
                return ovr.Create(iteratorMethod);
            }
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
            return new AnonymousObservable<TResult>(observer =>
                iteratorMethod(observer).Concat().Subscribe(_ => { }, observer.OnError, observer.OnCompleted));
        }
    }
}

Concerns:

  • Will the property access be inlined, minimizing the overhead of this approach to a simple volatile read and an uncommon branch?
  • This requires a full implementation of the IQueryLanguage interface, and there is no public way to delegate back to the standard implementation.
  • Security concerns about who can override a chain.

2. Use type-specific operator-assembly hooks

The alternative, which RxJava also uses, is to define function hooks that are called before an implementation is returned, allowing custom logic to be inserted and/or the implementation to be replaced completely:

namespace System.Reactive.Linq
{
    /// <summary>
    /// Holds type specific assembly hooks to intercept
    /// the chaining of standard operators.
    /// </summary>
    /// <typeparam name="T">The element type of the observable returned.</typeparam>
    public static class QueryLanguageHooks<T>
    {
        private static Func<IObservable<T>, IObservable<T>> _onAssembly;

        public static Func<IObservable<T>, IObservable<T>> OnAssembly
        {
            get { return Volatile.Read(ref _onAssembly); }
            set { Interlocked.Exchange(ref _onAssembly, value); }
        }
        
        public static IObservable<T> Hook(IObservable<T> source)
        {
            var f = OnAssembly;
            if (f == null)
            {
                return source;
            }
            return f(source);
        }
    }

    /// <summary>
    /// Provides a set of static methods for writing in-memory queries over observable sequences.
    /// </summary>
    public static class ObservableEx
    {
        [Experimental]
        public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod)
        {
            if (iteratorMethod == null)
                throw new ArgumentNullException(nameof(iteratorMethod));
// vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
            return QueryLanguageHooks<TResult>.Hook(
                new AnonymousObservable<TResult>(observer =>
                    iteratorMethod(observer).Concat().Subscribe(_ => { }, observer.OnError, observer.OnCompleted))
            );
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        }
    }
}

Concerns:

  • As I understand struct/class generics, this approach requires the programmer to explicitly define typed callbacks for every type used in the host program, e.g. QueryLanguageHooks<int>.OnAssembly = intobs => intobs;
  • Calling any standard Observable method inside the Func may lead to infinite recursion.
  • Security concerns about who can hook into a chain.

(None of these is an issue with RxJava: Java currently supports only class-based generics, so a single Observable<Object> hook is all one can define; the default implementations are semi-publicly available; and the Java security manager infrastructure can limit access, so we don't have to deal with that at all.)
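
To make the first two concerns concrete, here is a minimal sketch that uses only BCL types. QueryLanguageHooks<T> is copied from the proposal above; DemoObservable, its Return operator, and AssemblyCounter are hypothetical names invented for this illustration and are not part of Rx.NET. It shows that a hook must be installed once per element type, and one possible way (a thread-static re-entrancy guard, my assumption rather than anything the proposal prescribes) to keep a hook that calls hooked operators from recursing forever.

```csharp
using System;
using System.Threading;

// Copy of the hook holder proposed above, so the sketch compiles on its own.
public static class QueryLanguageHooks<T>
{
    private static Func<IObservable<T>, IObservable<T>> _onAssembly;

    public static Func<IObservable<T>, IObservable<T>> OnAssembly
    {
        get { return Volatile.Read(ref _onAssembly); }
        set { Interlocked.Exchange(ref _onAssembly, value); }
    }

    public static IObservable<T> Hook(IObservable<T> source)
    {
        var f = OnAssembly;
        return f == null ? source : f(source);
    }
}

// Hypothetical stand-in for a standard operator that routes its result through Hook.
public static class DemoObservable
{
    public static IObservable<T> Return<T>(T value)
    {
        return QueryLanguageHooks<T>.Hook(new ReturnObservable<T>(value));
    }

    private sealed class ReturnObservable<T> : IObservable<T>
    {
        private readonly T _value;
        public ReturnObservable(T value) { _value = value; }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            observer.OnNext(_value);
            observer.OnCompleted();
            return NoopDisposable.Instance;
        }
    }

    private sealed class NoopDisposable : IDisposable
    {
        public static readonly NoopDisposable Instance = new NoopDisposable();
        public void Dispose() { }
    }
}

// Hypothetical diagnostic tool that counts how many observables get assembled.
public static class AssemblyCounter
{
    [ThreadStatic] private static bool _inHook;
    public static int Count;

    // Each closed generic type has its own static state, so every element
    // type of interest needs its own Install<T>() call.
    public static void Install<T>()
    {
        QueryLanguageHooks<T>.OnAssembly = source =>
        {
            if (_inHook) return source; // re-entrant call from inside the hook: pass through
            _inHook = true;
            try
            {
                Interlocked.Increment(ref Count);
                // A hooked operator called here would re-enter this lambda;
                // without the guard above, that would never terminate.
                DemoObservable.Return(default(T));
                return source;
            }
            finally { _inHook = false; }
        };
    }
}
```

With only AssemblyCounter.Install<int>() called, DemoObservable.Return(1) is counted while DemoObservable.Return("x") is not, which is the per-type registration burden described in the first bullet.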

3. Keep both IQueryLanguage & QueryLanguage but do the override approach

Establish a static path via QueryLanguage.SomeMethod() instead of s_impl.SomeMethod(), and keep the override check from approach 1 in front of it:

namespace System.Reactive.Linq
{
    /// <summary>
    /// Provides a set of static methods for writing in-memory queries over observable sequences.
    /// </summary>
    public static class ObservableEx
    {
        private static IQueryLanguageEx _override;

        /// <summary>
        /// Override the default implementation of operators.
        /// </summary>
        public static IQueryLanguageEx Override
        {
            get { return Volatile.Read(ref _override); }
            set { Interlocked.Exchange(ref _override, value);  }
        }

        [Experimental]
        public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod)
        {
            if (iteratorMethod == null)
                throw new ArgumentNullException(nameof(iteratorMethod));
// vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
            var ovr = Override;
            if (ovr != null)
            {
                return ovr.Create(iteratorMethod);
            }
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
            return QueryLanguage.Create(iteratorMethod);
        }
    }
}
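
The sketch below shows what this variant could buy a diagnostic tool, assuming the static default implementation is reachable by the tool (the proposal does not say whether QueryLanguage would become public). It uses only BCL types; IMiniQueryLanguage, MiniQueryLanguage, MiniObservable and CountingQueryLanguage are hypothetical one-operator stand-ins for IQueryLanguage, QueryLanguage, Observable and an external proxy.

```csharp
using System;
using System.Threading;

// Hypothetical one-operator stand-in for the real IQueryLanguage interface.
public interface IMiniQueryLanguage
{
    IObservable<T> Return<T>(T value);
}

// Stand-in for QueryLanguage: the default implementation, reachable statically.
public static class MiniQueryLanguage
{
    public static IObservable<T> Return<T>(T value)
    {
        return new ReturnObservable<T>(value);
    }

    private sealed class ReturnObservable<T> : IObservable<T>
    {
        private readonly T _value;
        public ReturnObservable(T value) { _value = value; }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            observer.OnNext(_value);
            observer.OnCompleted();
            return NoopDisposable.Instance;
        }
    }

    private sealed class NoopDisposable : IDisposable
    {
        public static readonly NoopDisposable Instance = new NoopDisposable();
        public void Dispose() { }
    }
}

// Stand-in for the Observable entry point: override first, static default otherwise.
public static class MiniObservable
{
    private static IMiniQueryLanguage _override;

    public static IMiniQueryLanguage Override
    {
        get { return Volatile.Read(ref _override); }
        set { Interlocked.Exchange(ref _override, value); }
    }

    public static IObservable<T> Return<T>(T value)
    {
        var ovr = Override;
        if (ovr != null)
        {
            return ovr.Return(value);
        }
        return MiniQueryLanguage.Return(value);
    }
}

// Hypothetical diagnostic proxy: records the call, then delegates to the default.
public sealed class CountingQueryLanguage : IMiniQueryLanguage
{
    private int _calls;

    public int Calls { get { return Volatile.Read(ref _calls); } }

    public IObservable<T> Return<T>(T value)
    {
        Interlocked.Increment(ref _calls);
        // Delegate to the static default, not to MiniObservable.Return,
        // which would route straight back into this proxy.
        return MiniQueryLanguage.Return(value);
    }
}
```

Under that assumption, a proxy only has to add its own behaviour per operator and can forward everything else, which addresses the "full implementation with no way to delegate back" concern from approach 1. The volatile read and branch on every operator call remain.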
