Description
Background and motivation
Last year it seemed like LINQ extensions for IAsyncEnumerable would not make it into .NET, so I tried the existing System.Linq.Async package, but found that it did not cover my use case for IAsyncEnumerable. Since then I have been implementing LINQ operators for IAsyncEnumerable in my project AsyncAssortments.
Looking through the recent PR for System.Linq.AsyncEnumerable, I noticed that my library still covers many more uses for IAsyncEnumerable that I think could be very helpful when this officially ships with .NET. Here I would like to outline what those use cases are, and how they can be handled with minimal additional API surface.
What is an IAsyncEnumerable?
So far, it seems like .NET, Ix.NET, and the new System.Linq.AsyncEnumerable PR all treat IAsyncEnumerable as modeling a linear sequence that could potentially have a delay between elements. This is great for things like retrieving records from a database, but there is also a very common pattern involving IEnumerable and async/await that does not fit this model.
Often you have a regular IEnumerable<T>, and want to apply a series of transformations to the elements concurrently (each item is processed at the same time), where each transformation is an async function. Right now that looks something like this:
var items = new[] { 1, 2, 3, 4, 5 };
var step1 = await Task.WhenAll(items.Select(x => Process1Async(x)));
var step2 = await Task.WhenAll(step1.Select(x => Process2Async(x)));
This is very unwieldy, and this example only involves Select operators. If you have other transformations it gets even more unreadable. It's also inefficient, because all of the first transformations must finish before any of the second start. Ideally, we could write this:
var results = new[] { 1, 2, 3, 4, 5 }
    .ToSomeConcurrentAsyncCollection()
    .AsyncSelect(async x => await Process1Async(x))
    .AsyncSelect(async x => await Process2Async(x));
This is simpler, cleaner, and it's easy to add more operators. Interestingly, if you think about what interface is required to support async transformations that execute concurrently like this, all it takes is the single method defined in IAsyncEnumerable.
Just like an IEnumerable can be used to model different collection types, IAsyncEnumerable can model either a linear sequence of delayed elements, or a collection of tasks executing concurrently and yielding their results.
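To make the second reading concrete, here is a minimal sketch (not taken from AsyncAssortments or the System.Linq.AsyncEnumerable PR) of an IAsyncEnumerable that represents a set of tasks already running concurrently and yields each result as it completes. The names ConcurrentSketch, RunAll, and DelayThenReturn are hypothetical, and Task.WhenAny is used only because it keeps the example short.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Usage: all three delays start together, so shorter delays tend to be yielded first.
await foreach (int ms in ConcurrentSketch.RunAll(new[] { 300, 100, 200 }, DelayThenReturn))
{
    Console.WriteLine(ms);
}

// Hypothetical stand-in for a real async transformation.
static async Task<int> DelayThenReturn(int ms)
{
    await Task.Delay(ms);
    return ms;
}

static class ConcurrentSketch
{
    // Starts every selector up front, then yields each result as soon as its task finishes.
    public static async IAsyncEnumerable<TResult> RunAll<TSource, TResult>(
        IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        // ToList() forces the Select to run now, so every task is started here.
        List<Task<TResult>> pending = source.Select(selector).ToList();

        while (pending.Count > 0)
        {
            Task<TResult> finished = await Task.WhenAny(pending);
            pending.Remove(finished);
            yield return await finished; // rethrows if the task faulted
        }
    }
}
```

The consumer only ever calls GetAsyncEnumerator and MoveNextAsync, so nothing beyond the existing interface is needed to expose concurrent work this way.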
The problem
Like IEnumerable, IAsyncEnumerable should ship with LINQ operators that are compatible with many different underlying collections. Unfortunately, the implementation of operators like AsyncSelect above actually changes depending on whether we're dealing with a sequence or a set of concurrent tasks.
For a sequence, we want to enumerate over the sequence and await the selector as we go, but for a set of concurrent tasks we want to enumerate the sequence and start the selectors in a non-awaiting manner so they can execute at the same time.
This means we actually need different operator implementations depending on the semantics we want, and we need a way of telling the operators which implementation to use.
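The difference between the two implementations can be sketched as follows. This is an illustration under simplifying assumptions, not the code from the PR or from AsyncAssortments: SelectSketch, SelectSequential, SelectConcurrentOrdered, Source, and DelayThenReturn are hypothetical names, and the concurrent version buffers the whole source before yielding, which a real implementation would likely avoid.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

// Usage: the same selector, timed under both strategies.
var clock = Stopwatch.StartNew();
await foreach (int x in SelectSketch.SelectSequential(Source(), DelayThenReturn)) { }
Console.WriteLine($"sequential: {clock.ElapsedMilliseconds} ms"); // roughly the sum of the delays

clock.Restart();
await foreach (int x in SelectSketch.SelectConcurrentOrdered(Source(), DelayThenReturn)) { }
Console.WriteLine($"concurrent: {clock.ElapsedMilliseconds} ms"); // roughly the longest delay

static async IAsyncEnumerable<int> Source()
{
    foreach (int ms in new[] { 300, 200, 100 })
    {
        await Task.Yield();
        yield return ms;
    }
}

static async ValueTask<int> DelayThenReturn(int ms)
{
    await Task.Delay(ms);
    return ms;
}

static class SelectSketch
{
    // Sequential: await each selector before pulling the next element.
    public static async IAsyncEnumerable<TResult> SelectSequential<TSource, TResult>(
        IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TResult>> selector)
    {
        await foreach (TSource item in source)
        {
            yield return await selector(item);
        }
    }

    // Concurrent, order-preserving: start every selector without awaiting it,
    // then await the started tasks in source order.
    public static async IAsyncEnumerable<TResult> SelectConcurrentOrdered<TSource, TResult>(
        IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TResult>> selector)
    {
        var started = new List<Task<TResult>>();
        await foreach (TSource item in source)
        {
            started.Add(selector(item).AsTask()); // started, not awaited
        }

        foreach (Task<TResult> task in started)
        {
            yield return await task;
        }
    }
}
```

Both methods have the same signature and produce the same elements in the same order; only the point at which the selector is awaited differs, which is why the choice has to be communicated to the operator somehow.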
API Proposal
The proposal: IScheduledAsyncEnumerable
The core of this proposal is to add a new interface IScheduledAsyncEnumerable, which represents an IAsyncEnumerable that prefers its async operators to execute in a particular way. There are five options for executing async operators:
- Sequential - This is what the System.Linq.AsyncEnumerable operators do right now. The sequence is enumerated, and async operators are awaited during the enumeration one at a time.
- ConcurrentOrdered - Async operators are allowed to execute concurrently (at the same time but all on the original thread). The resulting sequence is guaranteed to be in the same order as the original.
- ConcurrentUnordered - Async operators are allowed to execute concurrently (at the same time but all on the original thread). The resulting sequence yields items as soon as they are available.
- ParallelOrdered - Async operators are allowed to execute in parallel (run on the threadpool with Task.Run). The resulting sequence is guaranteed to be in the same order as the original.
- ParallelUnordered - Async operators are allowed to execute in parallel (run on the threadpool with Task.Run). The resulting sequence yields items as soon as they are available.
namespace System.Linq;
public interface IScheduledAsyncEnumerable<out T> : IAsyncEnumerable<T> {
    public AsyncEnumerableScheduleMode ScheduleMode { get; }
}
public enum AsyncEnumerableScheduleMode {
    Sequential,
    ConcurrentOrdered,
    ConcurrentUnordered,
    ParallelOrdered,
    ParallelUnordered
}
public static partial class AsyncEnumerable {
    public static IScheduledAsyncEnumerable<TSource> AsConcurrent<TSource>(this IAsyncEnumerable<TSource> source, bool preserveOrder = true);
    public static IScheduledAsyncEnumerable<TSource> AsParallel<TSource>(this IAsyncEnumerable<TSource> source, bool preserveOrder = true);
    public static IScheduledAsyncEnumerable<TSource> AsSequential<TSource>(this IAsyncEnumerable<TSource> source);
}
Any LINQ operators defined for IAsyncEnumerable would have implementations that respect this scheduling preference if called on an IScheduledAsyncEnumerable. Luckily, I already have all of this implemented 🚀
One thing that's important: the default behavior for IAsyncEnumerable and its LINQ operators is sequential, just like they are right now. If the user wants concurrent or parallel execution they have to opt in manually with AsConcurrent() or AsParallel().
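As a rough sketch of how the opt-in could work, the wrapper below only forwards enumeration and carries the preference, and an operator can read the preference with a type check that falls back to Sequential. The interface and enum repeat the proposed shapes so the example compiles on its own; ScheduledWrapper, ScheduleSketch, GetScheduleMode, and Numbers are hypothetical names, and this is not necessarily how AsyncAssortments implements it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Usage: a plain sequence reports Sequential until the caller opts in.
IAsyncEnumerable<int> plain = Numbers();
Console.WriteLine(plain.GetScheduleMode());                                    // Sequential
Console.WriteLine(plain.AsConcurrent(preserveOrder: false).GetScheduleMode()); // ConcurrentUnordered

static async IAsyncEnumerable<int> Numbers()
{
    await Task.Yield();
    yield return 1;
}

public enum AsyncEnumerableScheduleMode
{
    Sequential, ConcurrentOrdered, ConcurrentUnordered, ParallelOrdered, ParallelUnordered
}

public interface IScheduledAsyncEnumerable<out T> : IAsyncEnumerable<T>
{
    AsyncEnumerableScheduleMode ScheduleMode { get; }
}

// Hypothetical wrapper: same elements as the source, plus a scheduling preference.
sealed class ScheduledWrapper<T> : IScheduledAsyncEnumerable<T>
{
    private readonly IAsyncEnumerable<T> _source;

    public ScheduledWrapper(IAsyncEnumerable<T> source, AsyncEnumerableScheduleMode mode)
    {
        _source = source;
        ScheduleMode = mode;
    }

    public AsyncEnumerableScheduleMode ScheduleMode { get; }

    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
        => _source.GetAsyncEnumerator(cancellationToken);
}

static class ScheduleSketch
{
    public static IScheduledAsyncEnumerable<T> AsConcurrent<T>(
        this IAsyncEnumerable<T> source, bool preserveOrder = true)
        => new ScheduledWrapper<T>(source, preserveOrder
            ? AsyncEnumerableScheduleMode.ConcurrentOrdered
            : AsyncEnumerableScheduleMode.ConcurrentUnordered);

    // What an operator could call before choosing its implementation.
    public static AsyncEnumerableScheduleMode GetScheduleMode<T>(this IAsyncEnumerable<T> source)
        => source is IScheduledAsyncEnumerable<T> scheduled
            ? scheduled.ScheduleMode
            : AsyncEnumerableScheduleMode.Sequential;
}
```

Because the preference travels with the sequence rather than with each call, an operator that produces a new sequence would also need to wrap its result with the same mode for the preference to survive a chain of operators.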
Operator Naming
For IAsyncEnumerable there are two versions of the LINQ operators: one that executes synchronously and one that executes asynchronously. I think for clarity, discoverability, and type inference purposes it would be helpful to distinguish these, like Ix.NET did with Select and SelectAwait for example.
However, .NET doesn't use Await as a method suffix, so instead I propose adding Async as a prefix (AsyncXXX). This follows the naming conventions used elsewhere like in IAsyncEnumerable, where Async describes the method itself rather than the return type.
These are not new APIs, but simply a renaming of the operators already in the System.Linq.AsyncEnumerable PR to use the naming conventions above:
namespace System.Linq;
public static partial class AsyncEnumerable
{
public static IAsyncEnumerable<KeyValuePair<TKey, TAccumulate>> AsyncAggregateBy<TSource, TKey, TAccumulate>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, Func<TKey, CancellationToken, ValueTask<TAccumulate>> seedSelector, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> func, IEqualityComparer<TKey>? keyComparer = null) where TKey : notnull;
public static IAsyncEnumerable<KeyValuePair<TKey, TAccumulate>> AsyncAggregateBy<TSource, TKey, TAccumulate>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> func, IEqualityComparer<TKey>? keyComparer = null) where TKey : notnull;
public static IAsyncEnumerable<KeyValuePair<TKey, int>> AsyncCountBy<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IEqualityComparer<TKey>? keyComparer = null) where TKey : notnull;
public static IAsyncEnumerable<TSource> AsyncDistinctBy<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TSource> AsyncExceptBy<TSource, TKey>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TKey> second, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<IGrouping<TKey, TSource>> AsyncGroupBy<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<IGrouping<TKey, TElement>> AsyncGroupBy<TSource, TKey, TElement>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, Func<TSource, CancellationToken, ValueTask<TElement>> elementSelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TResult> AsyncGroupBy<TSource, TKey, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, Func<TKey, IEnumerable<TSource>, CancellationToken, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TResult> AsyncGroupBy<TSource, TKey, TElement, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, Func<TSource, CancellationToken, ValueTask<TElement>> elementSelector, Func<TKey, IEnumerable<TElement>, CancellationToken, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TSource> AsyncIntersectBy<TSource, TKey>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TKey> second, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TResult> AsyncJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, CancellationToken, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TResult> AsyncLeftJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner?, CancellationToken, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey>? comparer = null);
public static IOrderedAsyncEnumerable<TSource> AsyncOrderByDescending<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IComparer<TKey>? comparer = null);
public static IOrderedAsyncEnumerable<TSource> AsyncOrderBy<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TResult> AsyncRightJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter?, TInner, CancellationToken, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TResult> AsyncSelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IEnumerable<TResult>>> selector);
public static IAsyncEnumerable<TResult> AsyncSelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IEnumerable<TResult>>> selector);
public static IAsyncEnumerable<TResult> AsyncSelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<IEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector);
public static IAsyncEnumerable<TResult> AsyncSelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, ValueTask<TResult>> resultSelector);
public static IAsyncEnumerable<TResult> AsyncSelect<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<TResult>> selector);
public static IAsyncEnumerable<TResult> AsyncSelect<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TResult>> selector);
public static IAsyncEnumerable<TSource> AsyncSkipWhile<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<bool>> predicate);
public static IAsyncEnumerable<TSource> AsyncSkipWhile<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<bool>> predicate);
public static IAsyncEnumerable<TSource> AsyncTakeWhile<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<bool>> predicate);
public static IAsyncEnumerable<TSource> AsyncTakeWhile<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<bool>> predicate);
public static IOrderedAsyncEnumerable<TSource> AsyncThenByDescending<TSource, TKey>(this IOrderedAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IComparer<TKey>? comparer = null);
public static IOrderedAsyncEnumerable<TSource> AsyncThenBy<TSource, TKey>(this IOrderedAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TSource> AsyncUnionBy<TSource, TKey>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TSource> AsyncWhere<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<bool>> predicate);
public static IAsyncEnumerable<TSource> AsyncWhere<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<bool>> predicate);
}
Optional CancellationToken
Right now all the async operators take selectors and predicates that require a CancellationToken, because it was required for overload resolution with the non-async operators. Now that we renamed them with the AsyncXXX prefix, we are free to add overloads that don't require a CancellationToken:
namespace System.Linq;
public static partial class AsyncEnumerable
{
public static IAsyncEnumerable<KeyValuePair<TKey, TAccumulate>> AsyncAggregateBy<TSource, TKey, TAccumulate>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TKey, ValueTask<TAccumulate>> seedSelector, Func<TAccumulate, TSource, ValueTask<TAccumulate>> func, IEqualityComparer<TKey>? keyComparer = null) where TKey : notnull;
public static IAsyncEnumerable<KeyValuePair<TKey, TAccumulate>> AsyncAggregateBy<TSource, TKey, TAccumulate>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> func, IEqualityComparer<TKey>? keyComparer = null) where TKey : notnull;
public static IAsyncEnumerable<KeyValuePair<TKey, int>> AsyncCountBy<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TKey>? keyComparer = null) where TKey : notnull;
public static IAsyncEnumerable<TSource> AsyncDistinctBy<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TSource> AsyncExceptBy<TSource, TKey>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TKey> second, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<IGrouping<TKey, TSource>> AsyncGroupBy<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<IGrouping<TKey, TElement>> AsyncGroupBy<TSource, TKey, TElement>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TResult> AsyncGroupBy<TSource, TKey, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TKey, IEnumerable<TSource>, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TResult> AsyncGroupBy<TSource, TKey, TElement, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<TKey, IEnumerable<TElement>, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TSource> AsyncIntersectBy<TSource, TKey>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TKey> second, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TResult> AsyncJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TResult> AsyncLeftJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner?, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey>? comparer = null);
public static IOrderedAsyncEnumerable<TSource> AsyncOrderByDescending<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IComparer<TKey>? comparer = null);
public static IOrderedAsyncEnumerable<TSource> AsyncOrderBy<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TResult> AsyncRightJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter?, TInner, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TResult> AsyncSelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IEnumerable<TResult>>> selector);
public static IAsyncEnumerable<TResult> AsyncSelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IEnumerable<TResult>>> selector);
public static IAsyncEnumerable<TResult> AsyncSelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<IEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector);
public static IAsyncEnumerable<TResult> AsyncSelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, ValueTask<TResult>> resultSelector);
public static IAsyncEnumerable<TResult> AsyncSelect<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<TResult>> selector);
public static IAsyncEnumerable<TResult> AsyncSelect<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TResult>> selector);
public static IAsyncEnumerable<TSource> AsyncSkipWhile<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<bool>> predicate);
public static IAsyncEnumerable<TSource> AsyncSkipWhile<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<bool>> predicate);
public static IAsyncEnumerable<TSource> AsyncTakeWhile<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<bool>> predicate);
public static IAsyncEnumerable<TSource> AsyncTakeWhile<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<bool>> predicate);
public static IOrderedAsyncEnumerable<TSource> AsyncThenByDescending<TSource, TKey>(this IOrderedAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IComparer<TKey>? comparer = null);
public static IOrderedAsyncEnumerable<TSource> AsyncThenBy<TSource, TKey>(this IOrderedAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TSource> AsyncUnionBy<TSource, TKey>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TKey>? comparer = null);
public static IAsyncEnumerable<TSource> AsyncWhere<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<bool>> predicate);
public static IAsyncEnumerable<TSource> AsyncWhere<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<bool>> predicate);
}
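One way the token-free overloads could be layered on top of the token-accepting ones is by forwarding with a lambda that discards the token. The sketch below shows this for a sequential AsyncSelect only; OverloadSketch, SelectCore, and Numbers are hypothetical names, and the real operators may be structured differently.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Usage: with the distinct name, a one-parameter async lambda resolves without ambiguity.
await foreach (int doubled in Numbers().AsyncSelect(async x => { await Task.Delay(10); return x * 2; }))
{
    Console.WriteLine(doubled);
}

static async IAsyncEnumerable<int> Numbers()
{
    for (int i = 1; i <= 3; i++)
    {
        await Task.Yield();
        yield return i;
    }
}

static class OverloadSketch
{
    // Token-accepting overload, matching the shape in the listing above.
    public static IAsyncEnumerable<TResult> AsyncSelect<TSource, TResult>(
        this IAsyncEnumerable<TSource> source,
        Func<TSource, CancellationToken, ValueTask<TResult>> selector)
        => SelectCore(source, selector);

    // Token-free overload: adapts the selector and reuses the same core.
    public static IAsyncEnumerable<TResult> AsyncSelect<TSource, TResult>(
        this IAsyncEnumerable<TSource> source,
        Func<TSource, ValueTask<TResult>> selector)
        => SelectCore(source, (item, _) => selector(item));

    // Sequential core; the token supplied at enumeration time flows in here.
    private static async IAsyncEnumerable<TResult> SelectCore<TSource, TResult>(
        IAsyncEnumerable<TSource> source,
        Func<TSource, CancellationToken, ValueTask<TResult>> selector,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (TSource item in source.WithCancellation(cancellationToken))
        {
            yield return await selector(item, cancellationToken);
        }
    }
}
```

The adapter costs one extra delegate allocation per operator call, which is the main trade-off against writing each overload's body twice.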
API Usage
The upshot
The benefit from all of this is enormous: Now we can use IAsyncEnumerable as the backbone for easy, flexible, and performant task pipelines. Consider the following code with the current LINQ operators:
// 600ms
var results = await new[] { 300, 200, 100 }
.ToAsyncEnumerable()
.Select(async (x, _) => { await Task.Delay(x); return x; })
.ToListAsync();
Here the Task.Delay is mimicking a long-running operation. As it stands, this code will take 600ms to execute. Under this proposal, we can turn this into a pipeline with a single line of code:
// 300ms
var results = await new[] { 300, 200, 100 }
.ToAsyncEnumerable()
.AsConcurrent(preserveOrder: false)
.AsyncSelect(async x => { await Task.Delay(x); return x; })
.ToListAsync();
Now, the async operators will run concurrently and this will finish in 300ms, the time of the longest-running item. And even better, we can chain operations, mix in normal synchronous LINQ operators, and items will move from one step of the pipeline to the next as soon as they are ready. The final list will be [100, 200, 300], and if we had called FirstAsync() it would have finished in only 100ms.
This will vastly simplify any code that deals with both collections and tasks, because Task.WhenAll, List<Task<T>>, and Task<IEnumerable<T>> will no longer be needed.
This proposal will greatly expand the usefulness of IAsyncEnumerable, and it will allow developers to easily manipulate tasks running on collections in a way that's not possible in .NET currently.
Alternative Designs
One thing that has to be decided is whether we want to support schedulers for ParallelOrdered and ParallelUnordered. It's possible that the user would want to provide their own threadpool to run the async operators on, and if so then we will need a way to expose that. I'm not sure if this is needed or worth the extra complexity, however.
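For reference, running an async selector on a caller-supplied TaskScheduler is already possible with existing APIs, so scheduler support would mostly be a question of where to accept the scheduler. The sketch below is one possible shape, not part of the proposal: SchedulerSketch and RunOn are hypothetical names, and ConcurrentExclusiveSchedulerPair is used only as a convenient example of a custom scheduler.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Usage: a scheduler that runs at most two work items at a time.
var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 2);
int result = await SchedulerSketch.RunOn(pair.ConcurrentScheduler, 21, async x =>
{
    await Task.Delay(10);
    return x * 2;
});
Console.WriteLine(result); // 42

static class SchedulerSketch
{
    // Starts the selector on the given scheduler instead of Task.Run's default pool.
    public static Task<TResult> RunOn<TSource, TResult>(
        TaskScheduler scheduler, TSource item, Func<TSource, Task<TResult>> selector)
    {
        return Task.Factory
            .StartNew(() => selector(item), CancellationToken.None,
                      TaskCreationOptions.DenyChildAttach, scheduler)
            .Unwrap(); // flatten Task<Task<TResult>> into Task<TResult>
    }
}
```

A parallel operator could call a helper like this in place of Task.Run, with the scheduler supplied through an extra AsParallel overload or a separate configuration method.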
Risks
Since System.Linq.AsyncEnumerable hasn't shipped yet, there aren't risks to existing code. However, this proposal will increase the complexity of all the operators, which will have to be accounted for in testing.