Implement Sink for Sender #12

Open
korrat opened this issue Aug 2, 2020 · 18 comments

Comments

@korrat

korrat commented Aug 2, 2020

The futures crate contains a Sink trait. This trait allows for a convenient interaction between its implementors and streams using the forward method on the StreamExt trait.

So I think implementing Sink for Sender would improve the ergonomics of the library.
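For readers who have not used the trait, here is a rough std-only sketch of what is being asked for. The `Sink` trait below is a local copy of the four-method shape of `futures_sink::Sink`. `VecSink`, `NoopWake` and `forward_all` are hypothetical names invented for this illustration. `forward_all` follows the same protocol that `StreamExt::forward` uses (wait for readiness, send each item, then flush and close), but it takes an iterator and busy-polls instead of running on an executor.

```rust
use std::convert::Infallible;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local copy of the method set of `futures_sink::Sink` (std-only stand-in).
pub trait Sink<Item> {
    type Error;
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}

/// Hypothetical sink that is always ready and collects items into a `Vec`.
pub struct VecSink<T> {
    pub items: Vec<T>,
    pub closed: bool,
}

// Nothing in `VecSink` is pin-projected, so marking it `Unpin` is sound.
impl<T> Unpin for VecSink<T> {}

impl<T> Sink<T> for VecSink<T> {
    type Error = Infallible;

    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
        Poll::Ready(Ok(()))
    }
    fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Infallible> {
        self.items.push(item);
        Ok(())
    }
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
        Poll::Ready(Ok(()))
    }
    fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
        self.closed = true;
        Poll::Ready(Ok(()))
    }
}

/// Waker that does nothing; enough for the busy-polling driver below.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Pushes every item into the sink, then closes it (which also flushes).
pub fn forward_all<S, I>(sink: &mut S, items: I) -> Result<(), <S as Sink<I::Item>>::Error>
where
    S: Sink<I::Item> + Unpin,
    I: IntoIterator,
{
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut sink = Pin::new(sink);
    for item in items {
        // Wait until the sink reports that it can accept one item.
        loop {
            match sink.as_mut().poll_ready(&mut cx) {
                Poll::Ready(ready) => {
                    ready?;
                    break;
                }
                Poll::Pending => std::thread::yield_now(),
            }
        }
        sink.as_mut().start_send(item)?;
    }
    // Close the sink once the input is exhausted.
    loop {
        match sink.as_mut().poll_close(&mut cx) {
            Poll::Ready(result) => return result,
            Poll::Pending => std::thread::yield_now(),
        }
    }
}
```

With the real crates this whole driver collapses to something like `stream.forward(sink).await`, which is the ergonomic win the request is about.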

@ghost

ghost commented Aug 2, 2020

This could be easily implemented. I think my main hesitation is that the Sink trait is not very popular and my impression is that its future is uncertain.

But maybe it doesn’t matter. Adding a Sink impl probably doesn’t hurt - it can only make things better.

cc @yoshuawuyts do you perhaps have an opinion on this?

@yoshuawuyts
Member

yoshuawuyts commented Aug 2, 2020

I don't particularly like the Sink trait in both shape and functionality, but am sympathetic to those who do. My main concern with adding the trait is that it pulls in everything and the kitchen sink (ha) from the futures-util crate. If I'm not mistaken this includes the futures-rs channel impl too.

I would like to recommend that if we add Sink to async-channel, we do so exclusively behind a feature flag to ensure async-channel continues to compile quickly.

edit (2020-08-24): A good post on sinks vs iterators in C++ was posted by a WG21 member; I think the issues raised translate to Future Sinks vs async Iteration in Rust as well (link).

@korrat
Author

korrat commented Aug 2, 2020

There is a futures-sink crate that only includes the trait and has no dependencies. That should keep compile times down.
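Combining the two suggestions, the gating could look roughly like the sketch below. The feature name `sink`, the module name `sink_support` and the helper `sink_support_enabled` are assumptions made up for this example, and the Cargo.toml lines in the comment are likewise assumed rather than taken from async-channel.

```rust
// Assumed Cargo.toml entries (hypothetical feature name `sink`):
//
//   [dependencies]
//   futures-sink = { version = "0.3", optional = true }
//
//   [features]
//   sink = ["dep:futures-sink"]

/// Compiled only when the `sink` feature is enabled, so default builds
/// never download or compile `futures-sink`.
#[cfg(feature = "sink")]
pub mod sink_support {
    pub use futures_sink::Sink;
    // The `impl Sink<T> for Sender<T>` would live here.
}

/// Reports whether the optional support was compiled into this build.
pub fn sink_support_enabled() -> bool {
    cfg!(feature = "sink")
}
```

Users who want the impl would opt in with `features = ["sink"]`; everyone else pays nothing at compile time.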

@najamelan

I had a look at the Sink impl on piper, and probably that can be ported quite easily. However, I don't really understand the reasoning behind the event-listeners, like

  • why are there send_ops, sink_ops and recv_ops on the inner, as well as a listener on the Sender/Receiver?
  • why distinguish between send_ops and sink_ops at all, since they both just try to send?
  • why does try_recv call both send_ops.notify_one() and sink_ops.notify_all()?
  • where is the link being made between the sink_ops on the Inner and the one on the Sender?

So I don't really feel confident proposing a PR on code I don't quite understand. Also the VecDeque is an understandable choice to avoid a panic in Sink::start_send, but it does have the downside of making the buffer size unbounded. Maybe at least initialize with VecDeque::with_capacity(1)?
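One caveat on that last idea: `VecDeque::with_capacity(1)` only sets the initial allocation and does not stop the queue from growing. A bounded alternative, sketched below under several assumptions, is to hold at most one item and report "not ready" from `poll_ready` while the slot is occupied. This is not the piper implementation: `SlotSink`, `Closed`, `NoopWake` and `demo` are hypothetical names, the `Sink` trait is a local copy of the `futures_sink::Sink` shape, and a `std::sync::mpsc::sync_channel` stands in for the async channel. Unlike the `VecDeque` approach, this version panics if `start_send` is called without a successful `poll_ready`.

```rust
use std::pin::Pin;
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local copy of the method set of `futures_sink::Sink` (std-only stand-in).
pub trait Sink<Item> {
    type Error;
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}

/// Error returned once the receiving side is gone.
#[derive(Debug, PartialEq)]
pub struct Closed;

/// Hypothetical sink that buffers at most one item in front of a bounded channel.
pub struct SlotSink<T> {
    tx: SyncSender<T>,
    slot: Option<T>,
}

// Nothing in `SlotSink` is pin-projected, so marking it `Unpin` is sound.
impl<T> Unpin for SlotSink<T> {}

impl<T> SlotSink<T> {
    /// Tries to move the buffered item into the channel without blocking.
    fn try_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Closed>> {
        match self.slot.take() {
            None => Poll::Ready(Ok(())),
            Some(item) => match self.tx.try_send(item) {
                Ok(()) => Poll::Ready(Ok(())),
                Err(TrySendError::Full(item)) => {
                    // Keep the item and request another poll (busy polling;
                    // a real channel would wake the task when space frees up).
                    self.slot = Some(item);
                    cx.waker().wake_by_ref();
                    Poll::Pending
                }
                Err(TrySendError::Disconnected(_)) => Poll::Ready(Err(Closed)),
            },
        }
    }
}

impl<T> Sink<T> for SlotSink<T> {
    type Error = Closed;

    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Closed>> {
        // Ready only once the slot is empty, so the buffer never exceeds one item.
        self.try_flush(cx)
    }
    fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Closed> {
        assert!(self.slot.is_none(), "start_send called without poll_ready");
        self.slot = Some(item);
        Ok(())
    }
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Closed>> {
        self.try_flush(cx)
    }
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Closed>> {
        self.try_flush(cx)
    }
}

/// Waker that does nothing; the demo polls by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns whether backpressure was observed, plus the items received in order.
pub fn demo() -> (bool, Vec<u32>) {
    let (tx, rx) = sync_channel(1);
    let mut sink = SlotSink { tx, slot: None };
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut pinned = Pin::new(&mut sink);
    let mut received = Vec::new();

    assert!(pinned.as_mut().poll_ready(&mut cx).is_ready()); // slot is empty
    pinned.as_mut().start_send(1).unwrap(); // slot holds 1
    assert!(pinned.as_mut().poll_ready(&mut cx).is_ready()); // 1 moves into the channel
    pinned.as_mut().start_send(2).unwrap(); // slot holds 2
    // The channel (capacity 1) still holds 1, so the sink pushes back.
    let blocked = pinned.as_mut().poll_ready(&mut cx).is_pending();
    received.push(rx.try_recv().unwrap());
    assert!(pinned.as_mut().poll_flush(&mut cx).is_ready()); // 2 moves into the channel
    received.push(rx.try_recv().unwrap());
    (blocked, received)
}
```

The trade-off is that the sink's own buffering stays at one item, at the cost of making the `poll_ready` contract mandatory for callers.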

@stsydow

stsydow commented Oct 21, 2020

I would propose to add an adapter to consume a Stream, like:
Sender::drain(self, s: Stream) -> DrainFuture

Would that help?
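To make the proposal concrete, a std-only sketch of such an adapter might look like the following. Only the `drain` and `DrainFuture` names come from the signature above; everything else is assumed. `Stream` is a local copy of the `futures_core::Stream` shape, `Iter`, `NoopWake` and `demo` are hypothetical helpers, and `std::sync::mpsc::Sender` stands in for `async_channel::Sender`. Because the std sender is unbounded, `send` never has to wait here, which a bounded async channel would need to handle.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{channel, SendError, Sender};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local copy of the shape of `futures_core::Stream` (std-only stand-in).
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical stream that yields an iterator's items and is always ready.
pub struct Iter<I>(pub I);
impl<I> Unpin for Iter<I> {}

impl<I: Iterator> Stream for Iter<I> {
    type Item = I::Item;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.0.next())
    }
}

/// Future returned by `drain`: forwards every stream item to the sender.
pub struct DrainFuture<T, S> {
    sender: Sender<T>,
    stream: S,
}

// The sender is never pin-projected, so only the stream decides `Unpin`.
impl<T, S: Unpin> Unpin for DrainFuture<T, S> {}

impl<T, S> Future for DrainFuture<T, S>
where
    S: Stream<Item = T> + Unpin,
{
    type Output = Result<(), SendError<T>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        loop {
            match Pin::new(&mut this.stream).poll_next(cx) {
                // The stream has nothing yet; it is responsible for waking us.
                Poll::Pending => return Poll::Pending,
                // The stream is exhausted, so draining is complete.
                Poll::Ready(None) => return Poll::Ready(Ok(())),
                Poll::Ready(Some(item)) => {
                    if let Err(err) = this.sender.send(item) {
                        return Poll::Ready(Err(err));
                    }
                }
            }
        }
    }
}

/// Consumes the sender and the stream, as in the proposed `Sender::drain`.
pub fn drain<T, S>(sender: Sender<T>, stream: S) -> DrainFuture<T, S>
where
    S: Stream<Item = T> + Unpin,
{
    DrainFuture { sender, stream }
}

/// Waker that does nothing; the demo polls by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn demo() -> Vec<u32> {
    let (tx, rx) = channel();
    let mut fut = drain(tx, Iter(1..=3));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let done = Pin::new(&mut fut).poll(&mut cx);
    assert!(matches!(done, Poll::Ready(Ok(()))));
    drop(fut); // drops the sender so the receiving iterator terminates
    rx.iter().collect()
}
```

Note that this covers the "pipe a stream into a channel" case only; it does not give callers a type they can pass to code that expects a `Sink`.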

@najamelan

@stsydow I don't think that helps when you need an interface, that is, when you want to pass the channel to something that takes a Sink. For me, being able to use it as an interface is the whole reason. It allows writing crates that use channels but can transparently support different channel implementations.

@stsydow

stsydow commented Oct 26, 2020

I proposed it because Sink is quite controversial and I don't expect it to become a standard soon.
Tokio for example recently dropped Sink support.
I know the pain and am searching for a solution for my own library.

@najamelan

I personally don't like how Sink works either, but until we propose something better, it's the interface we have, and most async projects will have the futures lib in their deps anyway, so it doesn't add too much strain. Some discussion was started here, but it hasn't seen much action.

@schell

schell commented Sep 28, 2021

Is there any movement here? I have a project that takes Streams as view input, and I would love to take Sink as view output to abstract out the channel implementation and keep the API balanced. Sink provides Sink::with, which is the contravariant counterpart of Stream::map, so this keeps things nice and balanced. Instead, my API must take an impl Stream as input and then take async_channel::Sender<Event> as output, mapping output events using the Receiver<Event>. It seems clunky and quite a leaky abstraction.
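To illustrate the symmetry: `map` transforms items after they leave a stream, while `with` transforms items before they enter a sink. The sketch below is a simplified, synchronous stand-in for that idea, not the futures implementation (the real adapter lives on `SinkExt` and takes a closure that returns a future). The `Sink` trait is a local copy of the `futures_sink::Sink` shape, and `VecSink`, `With`, `send_now`, `NoopWake` and `demo` are hypothetical names.

```rust
use std::convert::Infallible;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local copy of the method set of `futures_sink::Sink` (std-only stand-in).
pub trait Sink<Item> {
    type Error;
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}

/// Hypothetical always-ready sink that collects items into a `Vec`.
pub struct VecSink<T>(pub Vec<T>);
impl<T> Unpin for VecSink<T> {}

impl<T> Sink<T> for VecSink<T> {
    type Error = Infallible;
    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
        Poll::Ready(Ok(()))
    }
    fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Infallible> {
        self.0.push(item);
        Ok(())
    }
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
        Poll::Ready(Ok(()))
    }
    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
        Poll::Ready(Ok(()))
    }
}

/// Adapter that maps each incoming `U` into the inner sink's item type.
pub struct With<S, F> {
    pub sink: S,
    pub f: F,
}
impl<S: Unpin, F> Unpin for With<S, F> {}

impl<S, F, T, U> Sink<U> for With<S, F>
where
    S: Sink<T> + Unpin,
    F: FnMut(U) -> T,
{
    type Error = <S as Sink<T>>::Error;

    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.sink).poll_ready(cx)
    }
    fn start_send(mut self: Pin<&mut Self>, item: U) -> Result<(), Self::Error> {
        let this = &mut *self;
        // Map on the way in, then hand the result to the inner sink.
        let mapped = (this.f)(item);
        Pin::new(&mut this.sink).start_send(mapped)
    }
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.sink).poll_flush(cx)
    }
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.sink).poll_close(cx)
    }
}

/// Waker that does nothing; the helper below polls by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Sends one item to a sink that is expected to be ready immediately.
pub fn send_now<S, U>(sink: &mut S, item: U) -> Result<(), <S as Sink<U>>::Error>
where
    S: Sink<U> + Unpin,
{
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut pinned = Pin::new(sink);
    match pinned.as_mut().poll_ready(&mut cx) {
        Poll::Ready(ready) => ready?,
        Poll::Pending => panic!("this sketch only handles sinks that are ready immediately"),
    }
    pinned.as_mut().start_send(item)
}

pub fn demo() -> Vec<String> {
    // The caller sends `u32` ids; the inner sink only ever sees `String`s.
    let mut sink = With {
        sink: VecSink(Vec::new()),
        f: |n: u32| format!("event-{}", n),
    };
    for n in 1u32..=2 {
        send_now(&mut sink, n).unwrap();
    }
    sink.sink.0
}
```

An API built this way can accept any sink for its output type and let callers adapt their own event types at the boundary, mirroring what `map` does on the input side.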

@bugeats

bugeats commented Mar 6, 2024

Hi from the future. In the future we still have Futures, and Sink is alive and well.

futures::channel::mpsc::Sender implements Sink and it's no big deal.

@Frando

Frando commented Jul 30, 2024

I needed this too. The following seems to work. It does not use any internals of async_channel, but it needs one allocation per sink and one clone of the sender per send operation.

use std::{
    pin::Pin,
    task::{Context, Poll},
};

use async_channel::{SendError, Sender};
use futures_util::Sink;
use tokio_util::sync::ReusableBoxFuture;

/// A wrapper around [`Sender`] that implements [`Sink`].
///
/// Uses a [`ReusableBoxFuture`] under the hood, so this will allocate once.
/// The [`Sender`] is internally cloned for each send operation.
#[derive(Debug)]
pub struct SenderSink<T> {
    sender: Sender<T>,
    fut: Option<ReusableBoxFuture<'static, Result<(), SendError<T>>>>,
    flushed: bool,
}

impl<T> SenderSink<T> {
    /// Creates a new [`SenderSink`].
    pub fn new(sender: Sender<T>) -> Self {
        Self {
            sender,
            fut: None,
            flushed: true,
        }
    }
}

impl<T: Send + 'static> Sink<T> for SenderSink<T> {
    type Error = SendError<T>;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.poll_flush(cx)
    }

    fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        let sender = self.sender.clone();
        let fut = async move { sender.send(item).await };
        match self.fut.as_mut() {
            None => self.fut = Some(ReusableBoxFuture::new(fut)),
            Some(f) => f.set(fut),
        }
        self.flushed = false;
        Ok(())
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if self.flushed {
            Poll::Ready(Ok(()))
        } else {
            match self.fut.as_mut() {
                None => Poll::Ready(Ok(())),
                Some(fut) => match fut.poll(cx) {
                    Poll::Pending => Poll::Pending,
                    Poll::Ready(output) => {
                        self.flushed = true;
                        Poll::Ready(output)
                    }
                },
            }
        }
    }

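    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Flush any pending send first: `Sink::poll_close` is expected to
        // flush remaining output before closing.
        match self.as_mut().poll_flush(cx) {
            Poll::Ready(Ok(())) => {
                // Note: this closes the channel for every clone of the sender.
                self.sender.close();
                Poll::Ready(Ok(()))
            }
            other => other,
        }
    }
}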

@notgull
Member

notgull commented Jul 31, 2024

My understanding is that futures is awaiting a pending rework, potentially involving the reworking or removal of the Sink trait. So I would be hesitant to add this, especially given the longstanding design problems involved with Sink.

@Frando

Frando commented Jul 31, 2024

Is there any information on an ongoing or planned rework of futures?
I agree about the issues with Sink.
I posted the above here because it works without any changes to async_channel: it is a pure wrapper that only uses public APIs from async_channel, so it can be copy-pasted into any downstream library that needs Sink for async_channel::Sender.

@dignifiedquire

> So I would be hesitant to add this, especially given the longstanding design problems involved with Sink.

I understand the hesitancy, but futures is moving quite slowly, and it will be quite a while until the ecosystem has updated, so having this behind a feature flag would be very helpful until then. And I would gladly PR its removal once everyone has moved on to greener grass.

@notgull
Member

notgull commented Aug 1, 2024

> Is there any information on an ongoing or planned rework of futures?

rust-lang/futures-rs#2207 (cc @taiki-e)

> I understand the hesitancy, but futures is moving quite slowly, and it will be quite a while until the ecosystem has updated, so having this behind a feature flag would be very helpful until then.

Could you clarify what the use case behind this is? Code these days really shouldn't be using the Sink trait.

The Sink trait is a historical mess; there's a reason why it isn't in futures-core (and why there isn't a corresponding non-async version in the standard library). I just can't help but imagine there is a better way of solving this problem.

@dignifiedquire

The last real activity on that issue seems to be from 2021. What indication is there that this is moving forward?

@dignifiedquire

The biggest use case for Sink that I have, and have seen in other places, is tokio codec, which is really useful but unfortunately built on the Sink trait.

@notgull
Member

notgull commented Aug 2, 2024

> The last real activity on that issue seems to be from 2021. What indication is there that this is moving forward?

At the moment it is blocked on stabilization of the Stream/AsyncIterator trait.

> The biggest use case for Sink that I have, and have seen in other places, is tokio codec, which is really useful but unfortunately built on the Sink trait.

The biggest factor differentiating smol from tokio is the fact that smol intentionally limits itself to reduce feature creep. Sink crosses the line in my opinion.

9 participants