Skip to content

Commit 58dd5c5

Browse files
More More P2P Docs (#2525)
* Docs * Clarify relay upgrades * caaaaalapse * Cleanup `sd_p2p_tunnel`
1 parent 735e80a commit 58dd5c5

File tree

12 files changed

+273
-199
lines changed

12 files changed

+273
-199
lines changed

core/src/custom_uri/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,6 @@ pub fn base_router() -> Router<LocalState> {
317317
request_file(
318318
state.node.p2p.p2p.clone(),
319319
node_identity,
320-
&library.id,
321320
&library.identity,
322321
file_path_pub_id,
323322
Range::Full,

core/src/library/manager/mod.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::{
88
Node,
99
};
1010

11+
use futures::future::join_all;
1112
use sd_core_sync::SyncMessage;
1213
use sd_p2p::{Identity, RemoteIdentity};
1314
use sd_prisma::prisma::{crdt_operation, instance, location, SortOrder};
@@ -382,6 +383,39 @@ impl Libraries {
382383
self.libraries.read().await.get(library_id).cloned()
383384
}
384385

386+
// will return the library context for the given instance
387+
pub async fn get_library_for_instance(
388+
&self,
389+
instance: &RemoteIdentity,
390+
) -> Option<Arc<Library>> {
391+
join_all(
392+
self.libraries
393+
.read()
394+
.await
395+
.iter()
396+
.map(|(_, library)| async move {
397+
library
398+
.db
399+
.instance()
400+
.find_many(vec![instance::remote_identity::equals(
401+
instance.get_bytes().to_vec(),
402+
)])
403+
.exec()
404+
.await
405+
.ok()
406+
.iter()
407+
.flatten()
408+
.filter_map(|i| RemoteIdentity::from_bytes(&i.remote_identity).ok())
409+
.into_iter()
410+
.any(|i| i == *instance)
411+
.then(|| Arc::clone(library))
412+
}),
413+
)
414+
.await
415+
.into_iter()
416+
.find_map(|v| v)
417+
}
418+
385419
// get_ctx will return the library context for the given library id.
386420
pub async fn hash_library(&self, library_id: &Uuid) -> bool {
387421
self.libraries.read().await.get(library_id).is_some()

core/src/p2p/manager.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -393,23 +393,22 @@ async fn start(
393393
}) else {
394394
return;
395395
};
396-
let library_id = tunnel.library_id();
397396

398397
let Ok(msg) = SyncMessage::from_stream(&mut tunnel).await.map_err(|err| {
399398
error!("Failed `SyncMessage::from_stream`: {}", err);
400399
}) else {
401400
return;
402401
};
403402

404-
let Ok(library) =
405-
node.libraries
406-
.get_library(&library_id)
407-
.await
408-
.ok_or_else(|| {
409-
error!("Failed to get library '{library_id}'");
403+
let Ok(library) = node
404+
.libraries
405+
.get_library_for_instance(&tunnel.library_remote_identity())
406+
.await
407+
.ok_or_else(|| {
408+
error!("Failed to get library {}", tunnel.library_remote_identity());
410409

411-
// TODO: Respond to remote client with warning!
412-
})
410+
// TODO: Respond to remote client with warning!
411+
})
413412
else {
414413
return;
415414
};

core/src/p2p/operations/library.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use crate::{p2p::Header, Node};
2323
pub async fn request_file(
2424
p2p: Arc<P2P>,
2525
identity: RemoteIdentity,
26-
library_id: &Uuid,
2726
library_identity: &Identity,
2827
file_path_id: Uuid,
2928
range: Range,
@@ -42,7 +41,7 @@ pub async fn request_file(
4241
)
4342
.await?;
4443

45-
let mut stream = sd_p2p_tunnel::Tunnel::initiator(stream, library_id, library_identity).await?;
44+
let mut stream = sd_p2p_tunnel::Tunnel::initiator(stream, library_identity).await?;
4645

4746
let block_size = BlockSize::from_stream(&mut stream).await?;
4847
let size = stream.read_u64_le().await?;
@@ -82,9 +81,9 @@ pub(crate) async fn receiver(
8281

8382
let library = node
8483
.libraries
85-
.get_library(&stream.library_id())
84+
.get_library_for_instance(&stream.library_remote_identity())
8685
.await
87-
.ok_or_else(|| format!("Library not found: {:?}", stream.library_id()))?;
86+
.ok_or_else(|| format!("Library not found: {:?}", stream.library_remote_identity()))?;
8887

8988
let file_path = library
9089
.db
@@ -93,12 +92,7 @@ pub(crate) async fn receiver(
9392
.select(file_path_to_handle_p2p_serve_file::select())
9493
.exec()
9594
.await?
96-
.ok_or_else(|| {
97-
format!(
98-
"File path {file_path_id:?} not found in {:?}",
99-
stream.library_id()
100-
)
101-
})?;
95+
.ok_or_else(|| format!("File path {file_path_id:?} not found in {:?}", library.id))?;
10296

10397
let location = file_path.location.as_ref().expect("included in query");
10498
let location_path = location.path.as_ref().expect("included in query");

core/src/p2p/sync/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,7 @@ mod originator {
107107

108108
stream.write_all(&Header::Sync.to_bytes()).await.unwrap();
109109

110-
let mut tunnel = Tunnel::initiator(stream, &library.id, &library.identity)
111-
.await
112-
.unwrap();
110+
let mut tunnel = Tunnel::initiator(stream, &library.identity).await.unwrap();
113111

114112
tunnel
115113
.write_all(&SyncMessage::NewOperations.to_bytes())
Lines changed: 140 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,143 @@
11
//! A system for creating encrypted tunnels between peers over untrusted connections.
22
3-
mod tunnel;
3+
use std::{
4+
io,
5+
pin::Pin,
6+
task::{Context, Poll},
7+
};
48

5-
pub use sd_p2p::{Identity, IdentityErr, RemoteIdentity};
6-
pub use tunnel::*;
9+
use sd_p2p_proto::{decode, encode};
10+
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
11+
12+
use thiserror::Error;
13+
14+
use sd_p2p::{Identity, IdentityErr, RemoteIdentity, UnicastStream};
15+
16+
#[derive(Debug, Error)]
17+
pub enum TunnelError {
18+
#[error("Error writing discriminator.")]
19+
DiscriminatorWriteError,
20+
#[error("Error reading discriminator. Is this stream actually a tunnel?")]
21+
DiscriminatorReadError,
22+
#[error("Invalid discriminator. Is this stream actually a tunnel?")]
23+
InvalidDiscriminator,
24+
#[error("Error sending library id: {0:?}")]
25+
ErrorSendingLibraryId(io::Error),
26+
#[error("Error receiving library identity: {0:?}")]
27+
ErrorReceivingLibraryIdentity(decode::Error),
28+
#[error("Error decoding library identity: {0:?}")]
29+
ErrorDecodingLibraryIdentity(IdentityErr),
30+
}
31+
32+
/// An encrypted tunnel between two libraries.
33+
///
34+
/// This sits on top of the existing node to node encryption provided by Quic.
35+
///
36+
/// It's primarily designed to avoid an attack where traffic flows:
37+
/// node <-> attacker node <-> node
38+
/// The attackers node can't break TLS but if they get in the middle they can present their own node identity to each side and then intercept library related traffic.
39+
/// To avoid that we use this tunnel to encrypt all library related traffic so it can only be decoded by another instance of the same library.
40+
#[derive(Debug)]
41+
pub struct Tunnel {
42+
stream: UnicastStream,
43+
library_remote_id: RemoteIdentity,
44+
}
45+
46+
impl Tunnel {
47+
/// Create a new tunnel.
48+
///
49+
/// This should be used by the node that initiated the request which this tunnel is used for.
50+
pub async fn initiator(
51+
mut stream: UnicastStream,
52+
library_identity: &Identity,
53+
) -> Result<Self, TunnelError> {
54+
stream
55+
.write_all(&[b'T'])
56+
.await
57+
.map_err(|_| TunnelError::DiscriminatorWriteError)?;
58+
59+
let mut buf = vec![];
60+
encode::buf(&mut buf, &library_identity.to_remote_identity().get_bytes());
61+
stream
62+
.write_all(&buf)
63+
.await
64+
.map_err(TunnelError::ErrorSendingLibraryId)?;
65+
66+
// TODO: Do encryption things
67+
68+
Ok(Self {
69+
stream,
70+
library_remote_id: library_identity.to_remote_identity(),
71+
})
72+
}
73+
74+
/// Create a new tunnel.
75+
///
76+
/// This should be used by the node that responded to the request which this tunnel is used for.
77+
pub async fn responder(mut stream: UnicastStream) -> Result<Self, TunnelError> {
78+
let discriminator = stream
79+
.read_u8()
80+
.await
81+
.map_err(|_| TunnelError::DiscriminatorReadError)?;
82+
if discriminator != b'T' {
83+
return Err(TunnelError::InvalidDiscriminator);
84+
}
85+
86+
// TODO: Blindly decoding this from the stream is not secure. We need a cryptographic handshake here to prove the peer on the other ends is holding the private key.
87+
let library_remote_id = decode::buf(&mut stream)
88+
.await
89+
.map_err(TunnelError::ErrorReceivingLibraryIdentity)?;
90+
91+
let library_remote_id = RemoteIdentity::from_bytes(&library_remote_id)
92+
.map_err(TunnelError::ErrorDecodingLibraryIdentity)?;
93+
94+
// TODO: Do encryption things
95+
96+
Ok(Self {
97+
library_remote_id,
98+
stream,
99+
})
100+
}
101+
102+
/// Get the `RemoteIdentity` of the peer on the other end of the tunnel.
103+
pub fn node_remote_identity(&self) -> RemoteIdentity {
104+
self.stream.remote_identity()
105+
}
106+
107+
/// Get the `RemoteIdentity` of the library instance on the other end of the tunnel.
108+
pub fn library_remote_identity(&self) -> RemoteIdentity {
109+
self.library_remote_id
110+
}
111+
}
112+
113+
impl AsyncRead for Tunnel {
114+
fn poll_read(
115+
self: Pin<&mut Self>,
116+
cx: &mut Context<'_>,
117+
buf: &mut ReadBuf<'_>,
118+
) -> Poll<io::Result<()>> {
119+
// TODO: Do decryption
120+
121+
Pin::new(&mut self.get_mut().stream).poll_read(cx, buf)
122+
}
123+
}
124+
125+
impl AsyncWrite for Tunnel {
126+
fn poll_write(
127+
self: Pin<&mut Self>,
128+
cx: &mut Context<'_>,
129+
buf: &[u8],
130+
) -> Poll<io::Result<usize>> {
131+
// TODO: Do encryption
132+
133+
Pin::new(&mut self.get_mut().stream).poll_write(cx, buf)
134+
}
135+
136+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
137+
Pin::new(&mut self.get_mut().stream).poll_flush(cx)
138+
}
139+
140+
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
141+
Pin::new(&mut self.get_mut().stream).poll_shutdown(cx)
142+
}
143+
}

0 commit comments

Comments
 (0)