Skip to content

Commit e36ac77

Browse files
authored
feat(iroh)!: make ProtocolHandler use async functions (#3320)
## Description This changes the `ProtocolHandler` trait so that the functions return `impl Future` instead of `BoxFuture`, which means implementors can simply implement the trait with `async fn`. We don't even need a `'static` bound on these futures, so you can use `&self` freely without cloning and moving. Internally, we still box the futures return by `accept`, `on_connecting` and `shutdown`. So performance wise this change should not have any effects. But the boxing is hidden from the user and implementing the trait now feels a lot more normal and simpler. ## Breaking Changes * `iroh::protocol::ProtocolHandler` methods now return `impl Future` instead of `BoxFuture`. You can simply remove the `Box::pin(async move {})` from the implementations and instead implement the methods as `async fn`. See the updated documentation for the `iroh::protocol` module for an example. * `iroh::protocol::ProtocolHandler` is no longer dyn-compatible. If you need a dyn-compatible version, you need to build your own dyn-compatible wrapper trait. See the (non-public) `DynProtocolHandler` in `iroh::protocol` as an example. ## Notes & open questions <!-- Any notes, remarks or open questions you have to make about the PR. --> ## Change checklist <!-- Remove any that are not relevant. --> - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. - [x] List all breaking changes in the above "Breaking Changes" section. - [ ] Open an issue or PR on any number0 repos that are affected by this breaking change. Give guidance on how the updates should be handled or do the actual updates themselves. The major ones are: - [ ] [`quic-rpc`](https://github.com/n0-computer/quic-rpc) - [ ] [`iroh-gossip`](https://github.com/n0-computer/iroh-gossip) - [ ] [`iroh-blobs`](https://github.com/n0-computer/iroh-blobs) - [ ] [`dumbpipe`](https://github.com/n0-computer/dumbpipe) - [ ] [`sendme`](https://github.com/n0-computer/sendme)
1 parent b0b11f0 commit e36ac77

5 files changed

Lines changed: 179 additions & 146 deletions

File tree

README.md

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,18 +97,16 @@ let router = Router::builder(endpoint)
9797
struct Echo;
9898

9999
impl ProtocolHandler for Echo {
100-
fn accept(&self, connection: Connection) -> BoxedFuture<Result<()>> {
101-
Box::pin(async move {
102-
let (mut send, mut recv) = connection.accept_bi().await?;
100+
async fn accept(&self, connection: Connection) -> Result<()> {
101+
let (mut send, mut recv) = connection.accept_bi().await?;
103102

104-
// Echo any bytes received back directly.
105-
let bytes_sent = tokio::io::copy(&mut recv, &mut send).await?;
103+
// Echo any bytes received back directly.
104+
let bytes_sent = tokio::io::copy(&mut recv, &mut send).await?;
106105

107-
send.finish()?;
108-
connection.closed().await;
106+
send.finish()?;
107+
connection.closed().await;
109108

110-
Ok(())
111-
})
109+
Ok(())
112110
}
113111
}
114112
```

iroh/examples/echo.rs

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use iroh::{
1313
watcher::Watcher as _,
1414
Endpoint, NodeAddr,
1515
};
16-
use n0_future::boxed::BoxFuture;
1716

1817
/// Each protocol is identified by its ALPN string.
1918
///
@@ -84,31 +83,28 @@ impl ProtocolHandler for Echo {
8483
///
8584
/// The returned future runs on a newly spawned tokio task, so it can run as long as
8685
/// the connection lasts.
87-
fn accept(&self, connection: Connection) -> BoxFuture<Result<()>> {
88-
// We have to return a boxed future from the handler.
89-
Box::pin(async move {
90-
// We can get the remote's node id from the connection.
91-
let node_id = connection.remote_node_id()?;
92-
println!("accepted connection from {node_id}");
93-
94-
// Our protocol is a simple request-response protocol, so we expect the
95-
// connecting peer to open a single bi-directional stream.
96-
let (mut send, mut recv) = connection.accept_bi().await?;
97-
98-
// Echo any bytes received back directly.
99-
// This will keep copying until the sender signals the end of data on the stream.
100-
let bytes_sent = tokio::io::copy(&mut recv, &mut send).await?;
101-
println!("Copied over {bytes_sent} byte(s)");
102-
103-
// By calling `finish` on the send stream we signal that we will not send anything
104-
// further, which makes the receive stream on the other end terminate.
105-
send.finish()?;
106-
107-
// Wait until the remote closes the connection, which it does once it
108-
// received the response.
109-
connection.closed().await;
110-
111-
Ok(())
112-
})
86+
async fn accept(&self, connection: Connection) -> Result<()> {
87+
// We can get the remote's node id from the connection.
88+
let node_id = connection.remote_node_id()?;
89+
println!("accepted connection from {node_id}");
90+
91+
// Our protocol is a simple request-response protocol, so we expect the
92+
// connecting peer to open a single bi-directional stream.
93+
let (mut send, mut recv) = connection.accept_bi().await?;
94+
95+
// Echo any bytes received back directly.
96+
// This will keep copying until the sender signals the end of data on the stream.
97+
let bytes_sent = tokio::io::copy(&mut recv, &mut send).await?;
98+
println!("Copied over {bytes_sent} byte(s)");
99+
100+
// By calling `finish` on the send stream we signal that we will not send anything
101+
// further, which makes the receive stream on the other end terminate.
102+
send.finish()?;
103+
104+
// Wait until the remote closes the connection, which it does once it
105+
// received the response.
106+
connection.closed().await;
107+
108+
Ok(())
113109
}
114110
}

iroh/examples/search.rs

Lines changed: 30 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ use iroh::{
3838
protocol::{ProtocolHandler, Router},
3939
Endpoint, NodeId,
4040
};
41-
use n0_future::boxed::BoxFuture;
4241
use tokio::sync::Mutex;
4342
use tracing_subscriber::{prelude::*, EnvFilter};
4443

@@ -127,40 +126,36 @@ impl ProtocolHandler for BlobSearch {
127126
///
128127
/// The returned future runs on a newly spawned tokio task, so it can run as long as
129128
/// the connection lasts.
130-
fn accept(&self, connection: Connection) -> BoxFuture<Result<()>> {
131-
let this = self.clone();
132-
// We have to return a boxed future from the handler.
133-
Box::pin(async move {
134-
// We can get the remote's node id from the connection.
135-
let node_id = connection.remote_node_id()?;
136-
println!("accepted connection from {node_id}");
137-
138-
// Our protocol is a simple request-response protocol, so we expect the
139-
// connecting peer to open a single bi-directional stream.
140-
let (mut send, mut recv) = connection.accept_bi().await?;
141-
142-
// We read the query from the receive stream, while enforcing a max query length.
143-
let query_bytes = recv.read_to_end(64).await?;
144-
145-
// Now, we can perform the actual query on our local database.
146-
let query = String::from_utf8(query_bytes)?;
147-
let num_matches = this.query_local(&query).await;
148-
149-
// We want to return a list of hashes. We do the simplest thing possible, and just send
150-
// one hash after the other. Because the hashes have a fixed size of 32 bytes, this is
151-
// very easy to parse on the other end.
152-
send.write_all(&num_matches.to_le_bytes()).await?;
153-
154-
// By calling `finish` on the send stream we signal that we will not send anything
155-
// further, which makes the receive stream on the other end terminate.
156-
send.finish()?;
157-
158-
// Wait until the remote closes the connection, which it does once it
159-
// received the response.
160-
connection.closed().await;
161-
162-
Ok(())
163-
})
129+
async fn accept(&self, connection: Connection) -> Result<()> {
130+
// We can get the remote's node id from the connection.
131+
let node_id = connection.remote_node_id()?;
132+
println!("accepted connection from {node_id}");
133+
134+
// Our protocol is a simple request-response protocol, so we expect the
135+
// connecting peer to open a single bi-directional stream.
136+
let (mut send, mut recv) = connection.accept_bi().await?;
137+
138+
// We read the query from the receive stream, while enforcing a max query length.
139+
let query_bytes = recv.read_to_end(64).await?;
140+
141+
// Now, we can perform the actual query on our local database.
142+
let query = String::from_utf8(query_bytes)?;
143+
let num_matches = self.query_local(&query).await;
144+
145+
// We want to return a list of hashes. We do the simplest thing possible, and just send
146+
// one hash after the other. Because the hashes have a fixed size of 32 bytes, this is
147+
// very easy to parse on the other end.
148+
send.write_all(&num_matches.to_le_bytes()).await?;
149+
150+
// By calling `finish` on the send stream we signal that we will not send anything
151+
// further, which makes the receive stream on the other end terminate.
152+
send.finish()?;
153+
154+
// Wait until the remote closes the connection, which it does once it
155+
// received the response.
156+
connection.closed().await;
157+
158+
Ok(())
164159
}
165160
}
166161

iroh/src/endpoint.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1991,7 +1991,7 @@ impl Connection {
19911991
/// [`Connecting::handshake_data()`] succeeds. See that method's documentations for
19921992
/// details on the returned value.
19931993
///
1994-
/// [`Connection::handshake_data()`]: crate::Connecting::handshake_data
1994+
/// [`Connection::handshake_data()`]: crate::endpoint::Connecting::handshake_data
19951995
#[inline]
19961996
pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
19971997
self.inner.handshake_data()

0 commit comments

Comments
 (0)