Implement IAsyncEnumerable on CosmosLinqQuery #4355

Open
wants to merge 5 commits into base: master

Conversation

onionhammer
Contributor

@onionhammer onionhammer commented Mar 18, 2024

Pull Request Template

Description

This updates CosmosLinqQuery to implement IAsyncEnumerable, and adds a new 'AsAsyncEnumerable' extension method plus a test method that exercises it.

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

@adityasa
Contributor

    public IEnumerator<T> GetEnumerator()

This should now delegate to the Async implementation (after the L106-L110 check).


Refers to: Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQuery.cs:104 in 9b513c7.

…ts. Dispose feed iterator in GetEnumerator
@bartelink
Contributor

How about if it yielded an IAsyncEnumerable<FeedResponse<T>> instead? My problem with an API like this is that it makes it impossible to dig down and get relevant context, such as the request charges incurred.

For instance, I'd be able to use such a slightly lower level mechanism as a building block in wrappers that I find critical in successful usage:

    let private taskEnum (iterator: FeedIterator<'T>) = taskSeq {
        while iterator.HasMoreResults do
            let! response = iterator.ReadNextAsync()
            yield response.Diagnostics.GetClientElapsedTime(), response.RequestCharge, response.Resource }

    let fetch<'T> (desc: string) (container: Container) (queryDefinition: QueryDefinition) = taskSeq {
        if Log.IsEnabled Serilog.Events.LogEventLevel.Debug then Log.Debug("CosmosQuery.fetch {desc} {query}", desc, queryDefinition.QueryText)
        let sw = System.Diagnostics.Stopwatch.StartNew()
        let iterator = container.GetItemQueryIterator<'T>(queryDefinition)
        let mutable responses, items, totalRtt, totalRu = 0, 0, TimeSpan.Zero, 0.
        try for rtt, rc, response in taskEnum iterator do
                responses <- responses + 1
                totalRu <- totalRu + rc
                totalRtt <- totalRtt + rtt
                for item in response do
                    items <- items + 1
                    yield item
        finally Log.Information("CosmosQuery.fetch {desc} found {count} ({trips}r, {totalRtt:N0}ms) {rc}RU {latency}ms",
                                desc, items, responses, totalRtt.TotalMilliseconds, totalRu, sw.ElapsedMilliseconds) }

Same for GetItemStreamQueryIterator?

@onionhammer
Contributor Author

onionhammer commented Mar 23, 2024

How about if it yielded an IAsyncEnumerable<FeedResponse<T>> instead ? My problem with an API like this is that it makes it impossible to dig down and get relevant context like e.g. the request charges incurred.

You're not forced to use it? And this opens up capabilities for improved integration with other libraries, such as HotChocolate. Enumerating a "FeedResponse" would make observers have to know about the Cosmos SDK in a way that the proposed implementation does not.

You can already do foreach() on a query (it implements IEnumerable) - but it will almost certainly error because of requiring the "allow synchronous" flag, this just adds the IAsyncEnumerable counterpart.

@bartelink
Contributor

I am not saying this isn't useful. But exposing IAsyncEnumerable<Response<T>> instead of IAsyncEnumerable<T> would be a very useful counterpart, as it gives you a way to access all that context info, as you inevitably wind up wanting to do, without having to go and program things in an entirely different way.

Not sure if implementing two IAsyncEnumerable interfaces will make a mess; if it did, maybe that could be await foreach(var r in query.Responses<T>())
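For readers who want the C# shape of the page-level alternative being discussed, here is a minimal sketch. It assumes the existing SDK types `FeedIterator<T>` and `FeedResponse<T>`; the extension method name `ReadPagesAsync` and the class name are hypothetical and are not part of this PR or of the SDK.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using Microsoft.Azure.Cosmos;

public static class FeedIteratorPagesSketch
{
    // Hypothetical helper (not an SDK API): surfaces each page as a
    // FeedResponse<T> so callers keep access to RequestCharge and Diagnostics.
    public static async IAsyncEnumerable<FeedResponse<T>> ReadPagesAsync<T>(
        this FeedIterator<T> iterator,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // FeedIterator<T> is IDisposable; dispose it once enumeration ends,
        // including when the consumer stops early.
        using (iterator)
        {
            while (iterator.HasMoreResults)
            {
                // One round trip per iteration; the whole page is yielded as-is.
                yield return await iterator.ReadNextAsync(cancellationToken);
            }
        }
    }
}
```

A caller could then write `await foreach (FeedResponse<T> page in iterator.ReadPagesAsync())`, add up `page.RequestCharge` per page, and iterate the items inside each page, which is roughly what the F# `taskEnum` wrapper above does.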

My point is simply to consider what happens the minute you need to add instrumentation. It's not dissimilar to #692 (comment): think about what happens after someone hits the limits of what is exposed. Do they fall off a cliff and have to work out lots of things in one big jump?

None of this is to invalidate the usefulness of what you're adding; having this in the box is absolutely transformative and a no brainer to add.

@onionhammer
Contributor Author

I am not saying this isn't useful. But exposing IAsyncEnumerable<Response<T>> instead of IAsyncEnumerable<T> would be a very useful counterpart, as it gives you a way to access all that context info, as you inevitably wind up wanting to do, without having to go and program things in an entirely different way.

I think this would be useful as an additional extension method to elide some of the iterator ugliness

@dersia

dersia commented Mar 29, 2024

@onionhammer

Thanks for this, and I appreciate your work here. However, this PR would not close #903, because it adds IAsyncEnumerable only to the LINQ provider. Please change that in your description, or the GitHub bot might accidentally close that issue.

I would also love for IAsyncEnumerable to work all the way to the database. By that I mean it would be great if the service used HTTP streaming and IAsyncEnumerable could sit directly on top of it. That would really improve performance and make this a top-tier API.

Having said that, I don't know whether the server supports HTTP streaming yet.

@adityasa
Contributor

    public IEnumerator<T> GetEnumerator()

Is this being addressed?


In reply to: 2012904380


Refers to: Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQuery.cs:104 in 9b513c7.

@onionhammer
Contributor Author

onionhammer commented Apr 19, 2024

Hi @adityasa I must have missed that one - apologies. I'm not sure if I agree that this is necessary/good; I assume to be DRY?

The code is different enough that it would be awkward to call synchronously, unless you have a better way:

            IAsyncEnumerator<T> asyncEnumerator = this.GetAsyncEnumerator();

            while (true)
            {
                ValueTask<bool> moveNextTask = asyncEnumerator.MoveNextAsync();

                if (!moveNextTask.IsCompletedSuccessfully)
                {
                    // Still pending: block until MoveNextAsync finishes.
                    #pragma warning disable VSTHRD002 // Avoid problematic synchronous waits
                    if (!TaskHelper.InlineIfPossible(() => moveNextTask.AsTask(), null).GetAwaiter().GetResult())
                    {
                        break;
                    }
                    #pragma warning restore VSTHRD002 // Avoid problematic synchronous waits
                }
                else if (!moveNextTask.Result)
                {
                    break;
                }

                yield return asyncEnumerator.Current;
            }

            ValueTask disposeTask = asyncEnumerator.DisposeAsync();

            if (!disposeTask.IsCompletedSuccessfully)
            {
                // Wait on the pending dispose rather than calling DisposeAsync a second time.
                #pragma warning disable VSTHRD002 // Avoid problematic synchronous waits
                TaskHelper.InlineIfPossible(() => disposeTask.AsTask().ContinueWith(t => true), null).GetAwaiter().GetResult();
                #pragma warning restore VSTHRD002 // Avoid problematic synchronous waits
            }

…se `CosmosLinqQuery<T>` instead of `IAsyncEnumerable<T>`
@adityasa
Contributor

No worries. I have a simple sample below. Is something like this possible?

    public IEnumerator<int> GetEnumerator()
    {
        IAsyncEnumerator<int> asyncEnumerator = this.GetAsyncEnumerator();
        while (asyncEnumerator.MoveNextAsync().Result)
        {
            yield return asyncEnumerator.Current;
        }
    }

    public async IAsyncEnumerator<int> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        yield return 1;
        yield return 2;
        yield return 3;
    }


In reply to: 2066546588

@onionhammer
Contributor Author

onionhammer commented Apr 25, 2024

No worries. I have a simple sample below. Is something like this possible?

    public IEnumerator<int> GetEnumerator()
    {
        IAsyncEnumerator<int> asyncEnumerator = this.GetAsyncEnumerator();
        while (asyncEnumerator.MoveNextAsync().Result)
        {
            yield return asyncEnumerator.Current;
        }
    }

    public async IAsyncEnumerator<int> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        yield return 1;
        yield return 2;
        yield return 3;
    }

In reply to: 2066546588

This code does not dispose the enumerator (which is IAsyncDisposable).

@adityasa
Contributor

The example above does not achieve disposal, so it may not be sufficient.

Maybe use https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskasyncenumerableextensions.toblockingenumerable?view=net-8.0
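As a rough illustration of that suggestion, the sketch below lets the synchronous enumerator delegate to the asynchronous one through `ToBlockingEnumerable`, which is available in .NET 7 and later. `NumbersQuery` is a hypothetical stand-in for the query type, not the SDK's `CosmosLinqQuery<T>`, and whether the SDK's target frameworks can use this API is not something this sketch settles.

```csharp
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in type used only to show the delegation pattern.
public sealed class NumbersQuery : IEnumerable<int>, IAsyncEnumerable<int>
{
    // The async iterator is the single source of truth for the sequence.
    public async IAsyncEnumerator<int> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        for (int i = 1; i <= 3; i++)
        {
            await Task.Yield(); // simulate an asynchronous page fetch
            yield return i;
        }
    }

    // The sync path blocks on the async path instead of duplicating its logic.
    // Disposing the returned enumerator also disposes the async enumerator.
    public IEnumerator<int> GetEnumerator() => this.ToBlockingEnumerable().GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => this.GetEnumerator();
}
```

With this shape, `foreach (int n in new NumbersQuery())` visits 1, 2 and 3 by blocking on each `MoveNextAsync` call, so the usual cautions about synchronous waits on asynchronous work still apply.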


In reply to: 2077739958

@onionhammer
Contributor Author

@adityasa are you just trying to make it DRY? I think the current implementation (keeping GetEnumerator as it is today) is much simpler

@adityasa
Contributor

Yes. In general, I would like to avoid inheritance (and favor composition) and for cases like this where there isn't a suitable option (since we already implemented IEnumerable and IAsyncEnumerable is more general), at least avoid code duplication.


In reply to: 2077756442

@onionhammer
Contributor Author

Yes. In general, I would like to avoid inheritance (and favor composition) and for cases like this where there isn't a suitable option (since we already implemented IEnumerable and IAsyncEnumerable is more general), at least avoid code duplication.

In reply to: 2077756442

Okay; if you still want me to move ahead with that change, let me know. I think it increases the code size and complexity just to be "DRY".

Contributor

@adityasa adityasa left a comment


:shipit:

@adityasa
Contributor

I agree that the full implementation increases the complexity, and compared to that, the current implementation is sufficient.
So unless we can replace the sync implementation by delegating to a blocking enumerable over the async method (or something simpler), I am OK with leaving it as is.


In reply to: 2108024057

@adityasa adityasa added the auto-merge Enables automation to merge PRs label May 14, 2024
@onionhammer
Contributor Author

@adityasa any status update on this?

@onionhammer
Contributor Author

@adityasa any update?

Labels
auto-merge Enables automation to merge PRs LINQ QUERY

5 participants