tokio_stream::ReceiverStream and tokio_stream::StreamExt::skip() together behave in an unexpected way #7864

@Erik-Sovereign

I'm not sure whether this is unexpected behavior or a misunderstanding on my part :), but let me explain the issue with an example:

I post 10 backups. Each backup sends a message on a channel when it is done.

let response = test_setup.post(backups_url).await.send().await.unwrap();
assert_eq!(response.status(), StatusCode::OK);

I post an eleventh backup, which sends a different message, because only 10 backups are allowed:

let response = test_setup.post(backups_url).await.send().await.unwrap();
assert_eq!(response.status(), StatusCode::CONFLICT);

I check the eleventh message with a combination of ReceiverStream (wrapping the mpsc receiver) and skip(), both from tokio_stream:

if let Some(params) = receiver_stream
	.skip(10)
	.next()
	.await
{
	assert_eq!(...);
};

Now comes the problem. After this code, I execute another POST request that tries to send to the receiver, but the channel is closed. I think this is because skip() takes self by value, so the ReceiverStream is moved into the adapter that skip() returns and is dropped once the code above has finished.

My expectation was that, since ReceiverStream's purpose is to turn an mpsc channel into a useful Stream, the methods on StreamExt would work on it without consuming the channel's single consumer, since that is quite a drastic thing to do to an mpsc channel.

I can work around the problem by moving the code that calls skip() to the end, after all POST requests and therefore after all sends to the channel. But this makes the code less clean.
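
To make the ownership effect described above concrete, here is a minimal sketch that reproduces it with the standard library only. It is an analogy, not tokio_stream code: std::sync::mpsc and Iterator::skip stand in for the tokio channel and StreamExt::skip, and the function names are made up for illustration. Both functions send 11 messages, read the eleventh, and then report whether one more send still succeeds.

```rust
use std::sync::mpsc;

/// Hypothetical helper: the receiver is moved into the iterator adapter.
/// Once the adapter is dropped, the receiver is gone and the channel is closed.
fn send_after_consuming_skip() -> bool {
    let (tx, rx) = mpsc::channel::<u32>();
    for i in 0..11 {
        tx.send(i).unwrap();
    }
    // `into_iter()` takes `rx` by value; the adapter (and with it `rx`)
    // is dropped at the end of this statement.
    let eleventh = rx.into_iter().skip(10).next();
    assert_eq!(eleventh, Some(10));
    // No receiver is left, so this send fails.
    tx.send(11).is_ok()
}

/// Hypothetical helper: the adapter only borrows the receiver.
/// Dropping the adapter leaves the receiver, and the channel, intact.
fn send_after_borrowing_skip() -> bool {
    let (tx, rx) = mpsc::channel::<u32>();
    for i in 0..11 {
        tx.send(i).unwrap();
    }
    // `iter()` borrows `rx`; only the temporary adapter is dropped here.
    let eleventh = rx.iter().skip(10).next();
    assert_eq!(eleventh, Some(10));
    // `rx` is still alive, so this send succeeds.
    tx.send(11).is_ok()
}
```

With tokio_stream, the borrowing variant should correspond to calling the adapter on a mutable reference, e.g. (&mut receiver_stream).skip(10).next().await, because &mut S implements Stream whenever S is a Stream that is Unpin. I have not verified this against the test setup described here.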

Labels: A-tokio-stream (Area: The tokio-stream crate)