Skip to content

Conversation

@muhamadazmy
Copy link

@muhamadazmy muhamadazmy commented Oct 28, 2025

[ChunksTimeout] Consumes the stream reminder and return the items immediately

Summary:
When the underlying stream is an exclusive reference (&mut stream), and we need to drop the ChunksTimout stream without losing the buffered items.

@muhamadazmy
Copy link
Author

muhamadazmy commented Oct 28, 2025

Use case

use std::time::Duration;

use tokio::sync::mpsc;
use tokio_stream::{StreamExt, wrappers::ReceiverStream};

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<u64>(20);

    let mut receiver = ReceiverStream::new(rx);

    let chunked = (&mut receiver).chunks_timeout(3, Duration::from_secs(3));
    tokio::pin!(chunked);

    tx.send(10).await.unwrap();
    tx.send(20).await.unwrap();

    tokio::select! {
        Some(batch) = chunked.next() => {
            println!("Got: {:?}", batch);
        }
        _ = tokio::time::sleep(Duration::from_secs(1)) => {
            // another condition
        }
    }

    let reminder = chunked.into_reminder();

    // use `receiver` here
}

@ADD-SP ADD-SP added A-tokio Area: The main tokio crate A-tokio-stream Area: The tokio-stream crate and removed A-tokio Area: The main tokio crate labels Oct 28, 2025
@muhamadazmy muhamadazmy force-pushed the pr7715 branch 3 times, most recently from aaa757e to 0fb2203 Compare October 30, 2025 13:51
Copy link
Member

@ADD-SP ADD-SP left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a similar interface in the standard library: std::slice::ChunksExactMut:into_remainder. This might be a better interface for your requirements. What do you think?

@ADD-SP ADD-SP added the S-waiting-on-author Status: awaiting some action (such as code changes) from the PR or issue author. label Nov 5, 2025
@muhamadazmy
Copy link
Author

muhamadazmy commented Nov 5, 2025

Thank you @ADD-SP for your review. The original plan was to completely consume the stream indeed and just return the reminder. This however won't work with the Pinned ChunksTimeout stream which is required when polling next() in a select statement.

This means an into_reminder won't actually be useful because the only way for this to be useful is if the ChunksTimeout stream is not pinned. And in that case, the reminder will always guaranteed to be empty since there is no way to interrupt a next().await call (unless you are select statement which requires the stream to be pinned)

I will still rename the method to reminder instead of remaining to match the std interface.

@muhamadazmy muhamadazmy force-pushed the pr7715 branch 2 times, most recently from f2c8ed1 to 4187d7b Compare November 5, 2025 08:19
@muhamadazmy muhamadazmy changed the title [ChunksTimeout] Consumes the stream and return the buffered items immediately [ChunksTimeout] Consumes the stream reminder and return the items immediately Nov 5, 2025
@ADD-SP ADD-SP removed the S-waiting-on-author Status: awaiting some action (such as code changes) from the PR or issue author. label Nov 5, 2025
}

/// Drains the buffered items, returning them without waiting for the timeout or capacity limit.
pub fn reminder(mut self: Pin<&mut Self>) -> Vec<S::Item> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn reminder(mut self: Pin<&mut Self>) -> Vec<S::Item> {
pub fn into_remainder(mut self: Pin<&mut Self>) -> Vec<S::Item> {

Does this make more sense?

}
}

/// Drains the buffered items, returning them without waiting for the timeout or capacity limit.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Drains the buffered items, returning them without waiting for the timeout or capacity limit.
/// Consumes the [`ChunksTimeout`] and then returns all buffered items.

@ADD-SP ADD-SP added the S-waiting-on-author Status: awaiting some action (such as code changes) from the PR or issue author. label Nov 5, 2025
…ediately

Summary:
When the underlying stream is an exclusive reference (&mut stream), and we need to drop the `ChunksTimout` stream without losing the buffered items.
@muhamadazmy
Copy link
Author

Thank you so much @ADD-SP for your review. I applied all your comments. Let me know if you have more comments.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

A-tokio-stream Area: The tokio-stream crate S-waiting-on-author Status: awaiting some action (such as code changes) from the PR or issue author.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants