Skip to content

Commit db1a135

Browse files
authored
refactor!: change wire protocol to use uni streams per topic (#75)
* refactor: use uni streams * refactor: improve send and recv loops * cleanups and update ALPN * cleanup * more cleanups * chore: fmt * chore: clippy * chore: update MSRV to 1.85 * refactor: don't recreate closed future * cleanup SendLoop and RecvLoop * cleanup * fix: recv stream loop closing logic * refactor: remove WireStreamHeader
1 parent 1523227 commit db1a135

6 files changed

Lines changed: 343 additions & 89 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ unused-async = "warn"
3434
[dependencies]
3535
blake3 = "1.8"
3636
bytes = { version = "1.7", features = ["serde"] }
37+
data-encoding = "2.6.0"
3738
derive_more = { version = "1.0.0", features = [
3839
"add",
3940
"debug",
@@ -68,7 +69,7 @@ iroh = { git = "https://github.com/n0-computer/iroh", branch = "main", default-f
6869
tokio = { version = "1", optional = true, features = ["io-util", "sync"] }
6970
tokio-util = { version = "0.7.12", optional = true, features = ["codec"] }
7071
tracing = "0.1"
71-
data-encoding = { version = "2.6.0", optional = true }
72+
thiserror = { version = "2.0", optional = true }
7273
irpc = { version = "0.4.0", optional = true, default-features = false, features = [
7374
"derive",
7475
"stream",
@@ -154,7 +155,7 @@ simulator = [
154155
"dep:comfy-table",
155156
]
156157
metrics = ["iroh-metrics/metrics"]
157-
examples = ["net", "dep:data-encoding"]
158+
examples = ["net"]
158159

159160
[[test]]
160161
name = "sim"

src/net.rs

Lines changed: 14 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use std::{
88
task::{Context, Poll},
99
};
1010

11-
use bytes::BytesMut;
1211
use futures_concurrency::stream::{stream_group, StreamGroup};
1312
use futures_util::FutureExt as _;
1413
use iroh::{
@@ -32,19 +31,17 @@ use tokio::sync::{broadcast, mpsc, oneshot};
3231
use tokio_util::sync::CancellationToken;
3332
use tracing::{debug, error, error_span, trace, warn, Instrument};
3433

35-
use self::util::{read_message, write_message, Timers};
34+
use self::util::{RecvLoop, SendLoop, Timers};
3635
use crate::{
37-
api::RpcMessage,
36+
api::{self, Command, Event, GossipApi, RpcMessage},
3837
metrics::Metrics,
3938
proto::{self, HyparviewConfig, PeerData, PlumtreeConfig, Scope, TopicId},
4039
};
4140

42-
pub mod util;
43-
44-
use crate::api::{self, Command, Event, GossipApi};
41+
mod util;
4542

4643
/// ALPN protocol name
47-
pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/0";
44+
pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/1";
4845

4946
/// Channel capacity for the send queue (one per connection)
5047
const SEND_QUEUE_CAP: usize = 64;
@@ -523,10 +520,10 @@ impl Actor {
523520
async move {
524521
let res = connection_loop(
525522
peer_id,
526-
&conn,
523+
conn.clone(),
527524
origin,
528525
send_rx,
529-
&in_event_tx,
526+
in_event_tx,
530527
max_message_size,
531528
queue,
532529
)
@@ -855,48 +852,22 @@ impl<T> From<mpsc::error::SendError<T>> for ConnectionLoopError {
855852

856853
async fn connection_loop(
857854
from: PublicKey,
858-
conn: &Connection,
855+
conn: Connection,
859856
origin: ConnOrigin,
860-
mut send_rx: mpsc::Receiver<ProtoMessage>,
861-
in_event_tx: &mpsc::Sender<InEvent>,
857+
send_rx: mpsc::Receiver<ProtoMessage>,
858+
in_event_tx: mpsc::Sender<InEvent>,
862859
max_message_size: usize,
863860
queue: Vec<ProtoMessage>,
864861
) -> Result<(), ConnectionLoopError> {
865-
let (mut send, mut recv) = match origin {
866-
ConnOrigin::Accept => conn.accept_bi().await?,
867-
ConnOrigin::Dial => conn.open_bi().await?,
868-
};
869862
debug!(?origin, "connection established");
870-
let mut send_buf = BytesMut::new();
871-
let mut recv_buf = BytesMut::new();
872863

873-
let send_loop = async {
874-
for msg in queue {
875-
write_message(&mut send, &mut send_buf, &msg, max_message_size).await?;
876-
}
877-
while let Some(msg) = send_rx.recv().await {
878-
write_message(&mut send, &mut send_buf, &msg, max_message_size).await?;
879-
}
880-
// notify the other node no more data will be sent
881-
let _ = send.finish();
882-
// wait for the other node to ack all the sent data
883-
let _ = send.stopped().await;
884-
Result::<_, ConnectionLoopError>::Ok(())
885-
};
886-
887-
let recv_loop = async {
888-
loop {
889-
let msg = read_message(&mut recv, &mut recv_buf, max_message_size).await?;
864+
let mut send_loop = SendLoop::new(conn.clone(), send_rx, max_message_size);
865+
let mut recv_loop = RecvLoop::new(from, conn, in_event_tx, max_message_size);
890866

891-
match msg {
892-
None => break,
893-
Some(msg) => in_event_tx.send(InEvent::RecvMessage(from, msg)).await?,
894-
}
895-
}
896-
Result::<_, ConnectionLoopError>::Ok(())
897-
};
867+
let send_fut = send_loop.run(queue).instrument(error_span!("send"));
868+
let recv_fut = recv_loop.run().instrument(error_span!("recv"));
898869

899-
let (send_res, recv_res) = tokio::join!(send_loop, recv_loop);
870+
let (send_res, recv_res) = tokio::join!(send_fut, recv_fut);
900871
send_res?;
901872
recv_res?;
902873
Ok(())

0 commit comments

Comments
 (0)