Skip to content

Sequence from ToAsyncEnumerable is not unsubscribed when break or throw #361

@aka-nse

Description

@aka-nse

version

v1.3.0

description

Await-foreach against IAsyncEnumerable<T> which is created from ObservableExtensions.ToAsyncEnumerable<T> does not unsubscribe from source observable when the loop has been got out by break or throw.

reproduction code

var stream = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Select(static (_, x) =>
    {
        Console.WriteLine($"Select: {x}");
        return x;
    });
await foreach(var x in stream.ToAsyncEnumerable())
{
    Console.WriteLine($"await-foreach: {x}");
    if(x >= 3)
    {
        break;
    }
}
Console.WriteLine("await-foreach has exited.");
await Task.Delay(TimeSpan.FromSeconds(3));

expected behavior

Select: 0
await-foreach: 0
Select: 1
await-foreach: 1
Select: 2
await-foreach: 2
Select: 3
await-foreach: 3
await-foreach has exited.

[process has exited with code 0.]

If I use the same code with System.Reactive, it behaviors as expected.

actual behaviour

Select: 0
await-foreach: 0
Select: 1
await-foreach: 1
Select: 2
await-foreach: 2
Select: 3
await-foreach: 3
await-foreach has exited.
Select: 4
Select: 5
Select: 6

[process has exited with code 0.]

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions