Skip to content

Commit b11e707

Browse files
authored
feat(iroh)!: Retain stats for closed and abandoned paths in the path watcher (#3899)
## Description Improves the path watcher for connections to reliably allow accessing the stats of abandoned paths. * Abandoned paths are not removed from the value of the watchable for a connection's paths, as long as there are active watchers (subscribers) * Only once all watchers are dropped, we drop the abandoned paths from the watchable's value on the next update * This means that if you keep a `PathWatcher` alive for the duration of a connection, it will contain *all* paths the connection ever used. * We use the changes from n0-computer/noq#386 and keep a `WeakPathHandle` for all paths in the watchable's value. This prevents the path stats from being dropped within a quinn connection even if the path is abandoned. * We no longer store path stats anywhere in iroh directly, but only ever access them from quinn. Due to keeping the `WeakPathHandle`, we can ensure that the stats are always available as long as the connection hasn't been dropped. The combination of all this gives us a reliable way to access final path stats for all paths used in a connection, as long as you keep a reference to the connection around, which is quite straightforward to do and document. ## Breaking Changes * `Connection::paths` and `ConnectionInfo::paths` now return a `PathWatcher` (which still implements `n0_watcher::Watcher` but is now also a named struct) * `PathInfo::stats` and `PathInfo::rtt` now return `Option`. They return `None` if the underlying connection has been dropped. ## Notes & open questions I spent quite some time going back and forth over different approaches. Very open to other ideas, but I'm a bit out of further ideas currently and this is the best I could come up with so far. ## Change checklist <!-- Remove any that are not relevant. --> - [ ] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. 
- [ ] All breaking changes documented. - [ ] List all breaking changes in the above "Breaking Changes" section. - [ ] Open an issue or PR on any number0 repos that are affected by this breaking change. Give guidance on how the updates should be handled or do the actual updates themselves. The major ones are: - [ ] [`quic-rpc`](https://github.com/n0-computer/quic-rpc) - [ ] [`iroh-gossip`](https://github.com/n0-computer/iroh-gossip) - [ ] [`iroh-blobs`](https://github.com/n0-computer/iroh-blobs) - [ ] [`dumbpipe`](https://github.com/n0-computer/dumbpipe) - [ ] [`sendme`](https://github.com/n0-computer/sendme)
1 parent 1b4ee2a commit b11e707

10 files changed

Lines changed: 523 additions & 365 deletions

File tree

iroh/examples/monitor-connections.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl Monitor {
111111
Some(conn) = rx.recv() => {
112112
let alpn = String::from_utf8_lossy(conn.alpn()).to_string();
113113
let remote = conn.remote_id().fmt_short();
114-
let rtt = conn.paths().peek().iter().map(|p| p.stats().rtt).min();
114+
let rtt = conn.paths().peek().iter().map(|p| p.stats().expect("conn is not dropped").rtt).min();
115115
info!(%remote, %alpn, ?rtt, "new connection");
116116
tasks.spawn(async move {
117117
match conn.closed().await {

iroh/examples/remote-info.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -217,10 +217,11 @@ mod remote_map {
217217
if path.is_relay() {
218218
self.relay_path = true;
219219
}
220-
let stats = path.stats();
221-
debug!("path update addr {:?} {stats:?}", path.remote_addr());
222-
self.rtt_min = self.rtt_min.min(stats.rtt);
223-
self.rtt_max = self.rtt_max.max(stats.rtt);
220+
if let Some(stats) = path.stats() {
221+
debug!("path update addr {:?} {stats:?}", path.remote_addr());
222+
self.rtt_min = self.rtt_min.min(stats.rtt);
223+
self.rtt_max = self.rtt_max.max(stats.rtt);
224+
}
224225
}
225226
}
226227

@@ -237,8 +238,9 @@ mod remote_map {
237238
/// Returns `None` if there are no active connections.
238239
pub fn current_min_rtt(&self) -> Option<Duration> {
239240
self.connections()
240-
.flat_map(|c| c.paths().get())
241-
.map(|path| path.stats().rtt)
241+
.flat_map(|c| c.paths().get().into_iter())
242+
.flat_map(|p| p.stats())
243+
.map(|s| s.rtt)
242244
.min()
243245
}
244246

iroh/examples/transfer.rs

Lines changed: 31 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
//! ```
2323
2424
use std::{
25-
collections::BTreeMap,
2625
fmt,
2726
fs::File,
2827
net::{SocketAddr, SocketAddrV4, SocketAddrV6},
@@ -49,8 +48,8 @@ use iroh::{
4948
},
5049
dns::{DnsResolver, N0_DNS_ENDPOINT_ORIGIN_PROD, N0_DNS_ENDPOINT_ORIGIN_STAGING},
5150
endpoint::{
52-
BindOpts, Connection, ConnectionError, PathId, PathInfoList, RecvStream, SendStream,
53-
VarInt, WriteError,
51+
BindOpts, Connection, ConnectionError, PathId, PathWatcher, RecvStream, SendStream, VarInt,
52+
WriteError,
5453
},
5554
};
5655
use n0_error::{Result, StackResultExt, StdResultExt, anyerr, ensure_any};
@@ -582,8 +581,8 @@ async fn provide(endpoint: Endpoint, output: Output) -> Result<()> {
582581
async fn handle_connection(conn: Connection, output: Output) {
583582
let start = Instant::now();
584583
let remote_id = conn.remote_id();
585-
let _guard = watch_conn_type(conn.paths(), Some(remote_id), output);
586-
let stats_task = watch_path_stats(conn.clone());
584+
let watcher = conn.paths();
585+
let _guard = watch_conn_type(&conn, Some(remote_id), output);
587586

588587
// Accept incoming streams in a loop until the connection is closed by the remote.
589588
let close_reason = loop {
@@ -615,8 +614,7 @@ async fn handle_connection(conn: Connection, output: Output) {
615614
duration: start.elapsed(),
616615
},
617616
);
618-
let path_stats = stats_task.await.expect("path stats task panicked");
619-
output.emit_with_remote(remote_id, path_stats);
617+
output.emit_with_remote(remote_id, PathStats::from_watcher(watcher));
620618
}
621619

622620
#[instrument("handle", skip_all, fields(id=send.id().index()))]
@@ -658,9 +656,9 @@ async fn fetch(
658656
remote_id,
659657
duration: start.elapsed(),
660658
});
659+
let watcher = conn.paths();
661660
// Spawn a background task that prints connection type changes. Will be aborted on drop.
662-
let _guard = watch_conn_type(conn.paths(), None, output);
663-
let stats_task = watch_path_stats(conn.clone());
661+
let _guard = watch_conn_type(&conn, None, output);
664662

665663
output.emit(StartRequest { mode, length });
666664
// Perform requests depending on the request mode.
@@ -706,8 +704,7 @@ async fn fetch(
706704
});
707705

708706
close_endpoint_with_timeout(&endpoint, output).await;
709-
let path_stats = stats_task.await.expect("path stats task panicked");
710-
output.emit(path_stats);
707+
output.emit(PathStats::from_watcher(watcher));
711708

712709
res
713710
}
@@ -866,7 +863,7 @@ fn parse_byte_size(s: &str) -> std::result::Result<u64, parse_size::Error> {
866863
}
867864

868865
fn watch_conn_type(
869-
paths_watcher: impl Watcher<Value = PathInfoList> + Send + Unpin + 'static,
866+
conn: &Connection,
870867
remote_id: Option<EndpointId>,
871868
output: Output,
872869
) -> AbortOnDropHandle<()> {
@@ -878,8 +875,8 @@ fn watch_conn_type(
878875
output.emit(event)
879876
}
880877
};
878+
let mut stream = conn.paths().stream();
881879
let task = tokio::task::spawn(async move {
882-
let mut stream = paths_watcher.stream();
883880
let mut previous = None;
884881
while let Some(paths) = stream.next().await {
885882
if let Some(path) = paths.iter().find(|p| p.is_selected()) {
@@ -890,7 +887,7 @@ fn watch_conn_type(
890887
print(SelectedPath::Selected {
891888
id: path.id(),
892889
addr: path.remote_addr().clone(),
893-
rtt: path.rtt(),
890+
rtt: path.rtt().expect("conn is not dropped"),
894891
});
895892
previous = Some(path.clone());
896893
} else {
@@ -902,45 +899,6 @@ fn watch_conn_type(
902899
AbortOnDropHandle::new(task)
903900
}
904901

905-
fn watch_path_stats(conn: iroh::endpoint::Connection) -> AbortOnDropHandle<PathStats> {
906-
let task = tokio::spawn(async move {
907-
let mut watcher = conn.paths();
908-
let mut latest_stats_by_path = BTreeMap::new();
909-
while conn.close_reason().is_none() {
910-
n0_future::future::race(
911-
async {
912-
conn.closed().await;
913-
},
914-
async {
915-
let _ = watcher.updated().await;
916-
},
917-
)
918-
.await;
919-
// Insert what could possibly be new path stats.
920-
for path in watcher.get() {
921-
let stats = path.stats();
922-
latest_stats_by_path.insert(path.remote_addr().clone(), (path, stats));
923-
}
924-
// Update all stat values, even for paths that are removed by now.
925-
for (path, stats) in latest_stats_by_path.values_mut() {
926-
*stats = path.stats();
927-
}
928-
}
929-
let list = latest_stats_by_path
930-
.into_iter()
931-
.map(|(addr, (info, stats))| PathData {
932-
id: info.id(),
933-
remote_addr: addr,
934-
rtt: stats.rtt,
935-
bytes_sent: stats.udp_tx.bytes,
936-
bytes_recv: stats.udp_tx.bytes,
937-
})
938-
.collect();
939-
PathStats { paths: list }
940-
});
941-
AbortOnDropHandle::new(task)
942-
}
943-
944902
fn parse_ipv4_net(s: &str) -> Result<(SocketAddrV4, u8)> {
945903
let (net, port) = s.split_once(":").std_context("missing colon")?;
946904
let net: Ipv4Net = net.parse().std_context("invalid net")?;
@@ -1132,6 +1090,26 @@ struct PathStats {
11321090
paths: Vec<PathData>,
11331091
}
11341092

1093+
impl PathStats {
1094+
fn from_watcher(mut watcher: PathWatcher) -> Self {
1095+
let list = watcher
1096+
.get()
1097+
.iter()
1098+
.filter_map(|info| {
1099+
let stats = info.stats()?;
1100+
Some(PathData {
1101+
id: info.id(),
1102+
remote_addr: info.remote_addr().clone(),
1103+
rtt: stats.rtt,
1104+
bytes_sent: stats.udp_tx.bytes,
1105+
bytes_recv: stats.udp_rx.bytes,
1106+
})
1107+
})
1108+
.collect();
1109+
PathStats { paths: list }
1110+
}
1111+
}
1112+
11351113
impl fmt::Display for PathStats {
11361114
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
11371115
write!(f, "Path stats:")?;

iroh/src/endpoint.rs

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,11 @@ use url::Url;
3030

3131
use self::hooks::EndpointHooksList;
3232
pub use super::socket::{
33-
BindError, DirectAddr, DirectAddrType, PathInfo,
34-
remote_map::{PathInfoList, RemoteInfo, Source, TransportAddrInfo, TransportAddrUsage},
33+
BindError, DirectAddr, DirectAddrType,
34+
remote_map::{
35+
PathInfo, PathInfoList, PathInfoListIter, PathWatcher, RemoteInfo, Source,
36+
TransportAddrInfo, TransportAddrUsage,
37+
},
3538
};
3639
#[cfg(wasm_browser)]
3740
use crate::address_lookup::PkarrResolver;
@@ -1677,11 +1680,10 @@ mod tests {
16771680
use iroh_base::{EndpointAddr, EndpointId, RelayUrl, SecretKey, TransportAddr};
16781681
use iroh_relay::endpoint_info::UserData;
16791682
use n0_error::{AnyError as Error, Result, StdResultExt};
1680-
use n0_future::{
1681-
BufferedStreamExt, StreamExt, future::now_or_never, stream, task::AbortOnDropHandle, time,
1682-
};
1683+
use n0_future::{BufferedStreamExt, StreamExt, future::now_or_never, stream, time};
16831684
use n0_tracing_test::traced_test;
16841685
use n0_watcher::Watcher;
1686+
use quinn::PathStats;
16851687
use rand::SeedableRng;
16861688
use rand_chacha::ChaCha8Rng;
16871689
use tokio::sync::oneshot;
@@ -1693,7 +1695,7 @@ mod tests {
16931695
address_lookup::memory::MemoryLookup,
16941696
endpoint::{
16951697
ApplicationClose, BindError, BindOpts, ConnectError, ConnectOptions,
1696-
ConnectWithOptsError, Connection, ConnectionError, PathInfo,
1698+
ConnectWithOptsError, Connection, ConnectionError, PathWatcher,
16971699
},
16981700
protocol::{AcceptError, ProtocolHandler, Router},
16991701
test_utils::{QlogFileGroup, run_relay_server, run_relay_server_with},
@@ -2193,7 +2195,7 @@ mod tests {
21932195
info!("Waiting for connection");
21942196
'outer: while let Some(infos) = paths.next().await {
21952197
info!(?infos, "new PathInfos");
2196-
for info in infos {
2198+
for info in infos.iter() {
21972199
if info.is_ip() {
21982200
panic!("should not happen: {:?}", info);
21992201
}
@@ -3152,15 +3154,17 @@ mod tests {
31523154
));
31533155
let transfer_size = 1_000_000;
31543156

3155-
async fn collect_path_infos(conn: Connection) -> BTreeMap<TransportAddr, PathInfo> {
3156-
let mut path_infos = BTreeMap::new();
3157-
let mut paths = conn.paths().stream();
3158-
while let Some(path_list) = paths.next().await {
3159-
for path in path_list {
3160-
path_infos.insert(path.remote_addr().clone(), path);
3161-
}
3162-
}
3163-
path_infos
3157+
fn collect_stats(mut watcher: PathWatcher) -> BTreeMap<TransportAddr, PathStats> {
3158+
watcher
3159+
.get()
3160+
.iter()
3161+
.map(|info| {
3162+
(
3163+
info.remote_addr().clone(),
3164+
info.stats().expect("conn is not yet dropped"),
3165+
)
3166+
})
3167+
.collect()
31643168
}
31653169

31663170
let client = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
@@ -3178,49 +3182,49 @@ mod tests {
31783182
let server_task = tokio::spawn(async move {
31793183
let incoming = server.accept().await.anyerr()?;
31803184
let conn = incoming.await.anyerr()?;
3181-
let stats_task = AbortOnDropHandle::new(tokio::spawn(collect_path_infos(conn.clone())));
3185+
let watcher = conn.paths();
31823186
let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
31833187
let msg = recv.read_to_end(transfer_size).await.anyerr()?;
31843188
send.write_all(&msg).await.anyerr()?;
31853189
send.finish().anyerr()?;
31863190
conn.closed().await;
3187-
let stats = stats_task.await.std_context("server stats task failed")?;
3191+
let stats = collect_stats(watcher);
31883192
Ok::<_, Error>(stats)
31893193
});
31903194

31913195
let conn = client.connect(server_addr, TEST_ALPN).await?;
3192-
let stats_task = AbortOnDropHandle::new(tokio::spawn(collect_path_infos(conn.clone())));
3196+
let watcher = conn.paths();
31933197
let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
31943198
send.write_all(&vec![42u8; transfer_size]).await.anyerr()?;
31953199
send.finish().anyerr()?;
31963200
recv.read_to_end(transfer_size).await.anyerr()?;
31973201
conn.close(0u32.into(), b"thanks, bye!");
31983202
client.close().await;
3199-
let client_stats = stats_task.await.std_context("client stats task failed")?;
3203+
let client_stats = collect_stats(watcher);
32003204
let server_stats = server_task.await.anyerr()??;
32013205

32023206
info!("client stats: {client_stats:#?}");
32033207
info!("server stats: {server_stats:#?}");
32043208

32053209
let client_total_relay_tx = client_stats
3206-
.values()
3207-
.filter(|p| p.is_relay())
3208-
.map(|p| p.stats().udp_tx.bytes)
3210+
.iter()
3211+
.filter(|(remote, _stats)| remote.is_relay())
3212+
.map(|(_, stats)| stats.udp_tx.bytes)
32093213
.sum::<u64>();
32103214
let client_total_relay_rx = client_stats
3211-
.values()
3212-
.filter(|p| p.is_relay())
3213-
.map(|p| p.stats().udp_rx.bytes)
3215+
.iter()
3216+
.filter(|(remote, _stats)| remote.is_relay())
3217+
.map(|(_, stats)| stats.udp_rx.bytes)
32143218
.sum::<u64>();
32153219
let server_total_relay_tx = server_stats
3216-
.values()
3217-
.filter(|p| p.is_relay())
3218-
.map(|p| p.stats().udp_tx.bytes)
3220+
.iter()
3221+
.filter(|(remote, _stats)| remote.is_relay())
3222+
.map(|(_, stats)| stats.udp_tx.bytes)
32193223
.sum::<u64>();
32203224
let server_total_relay_rx = server_stats
3221-
.values()
3222-
.filter(|p| p.is_relay())
3223-
.map(|p| p.stats().udp_rx.bytes)
3225+
.iter()
3226+
.filter(|(remote, _stats)| remote.is_relay())
3227+
.map(|(_, stats)| stats.udp_rx.bytes)
32243228
.sum::<u64>();
32253229

32263230
info!(?client_total_relay_tx, "total");

0 commit comments

Comments
 (0)