Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Sink for Sender<T> #25

Closed
wants to merge 1 commit into from

Conversation

black-binary
Copy link

I've read the discussion in #12, and I'm trying to implement Sink for Sender.

Crate futures-sink was added to dependencies, which only includes the trait.

Hope it would help :)

@ghost
Copy link

ghost commented Dec 16, 2020

This looks good, thanks! Would you mind adding a unit test for this feature? :)

@ghost
Copy link

ghost commented Dec 16, 2020

To fix the CI failures, you can just reapply the following changes: smol-rs/smol@77998a9

@black-binary
Copy link
Author

I found some problems after adding some unit tests for it. Need some time to fix them.

@black-binary
Copy link
Author

black-binary commented Dec 17, 2020

I gave up implementing Sink for Send. Instead, I wrote a method named poll_send to do the same thing. Here is the reason:

Here is what SinkExt::send does:

    fn poll(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output> {
        let this = &mut *self;
        if let Some(item) = this.item.take() {
            let mut sink = Pin::new(&mut this.sink);
            match sink.as_mut().poll_ready(cx)? {
                Poll::Ready(()) => sink.as_mut().start_send(item)?,
                Poll::Pending => {
                    this.item = Some(item);
                    return Poll::Pending;
                }
            }
        }

        // we're done sending the item, but want to block on flushing the
        // sink
        ready!(Pin::new(&mut this.sink).poll_flush(cx))?;

        Poll::Ready(Ok(()))
    }
            match sink.as_mut().poll_ready(cx)? {
                Poll::Ready(()) => sink.as_mut().start_send(item)?,

poll_ready can not just simply return Ready when the channel is not full. poll_ready must reserve a slot for the start_send. The result is that I have to add multiple atomic variables and Events, and rewrite the try_send method, to make sure that a slot is reserved when poll_ready return Ready. And that will make the sending operation slower and make the channel much more complicated.

I looked into the discussion here. The problem can be solved if we merge poll_ready and start_send. So I remove the code and add a new method poll_send. It is kind of like @jonhoo 's poll_start_send(), which mentioned in his comment.

    pub fn poll_send(
        &mut self,
        cx: &mut Context<'_>,
        msg: &mut Option<T>,
    ) -> Poll<Result<(), SendError<T>>>;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

1 participant