Skip to content

Commit d22838b

Browse files
authored
Merge pull request #405 from nervosnetwork/support-session-init-from-user
feat: support session init from user
2 parents 2a3e74a + fd99879 commit d22838b

File tree

3 files changed

+111
-2
lines changed

3 files changed

+111
-2
lines changed

tentacle/src/service.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ pub use crate::service::{
4848
HandshakeType, ProtocolHandle, ProtocolMeta, TargetProtocol, TargetSession, TcpSocket,
4949
},
5050
control::{ServiceAsyncControl, ServiceControl},
51-
event::{ServiceError, ServiceEvent},
51+
event::{RawSessionInfo, ServiceError, ServiceEvent},
5252
helper::SessionType,
5353
};
5454
use bytes::Bytes;
@@ -1150,6 +1150,23 @@ where
11501150
}
11511151
}
11521152
}
1153+
ServiceTask::RawSession {
1154+
remote_address,
1155+
raw_session,
1156+
session_info,
1157+
} => {
1158+
let (ty, listen_addr) = match session_info {
1159+
RawSessionInfo::Inbound { listen_addr } => {
1160+
(SessionType::Inbound, Some(listen_addr))
1161+
}
1162+
RawSessionInfo::Outbound { target } => {
1163+
self.dial_protocols.insert(remote_address.clone(), target);
1164+
self.state.increase();
1165+
(SessionType::Outbound, None)
1166+
}
1167+
};
1168+
self.handshake(raw_session, ty, remote_address, listen_addr);
1169+
}
11531170
ServiceTask::Disconnect { session_id } => {
11541171
self.session_close(session_id, Source::External).await
11551172
}

tentacle/src/service/control.rs

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
use futures::prelude::*;
22

3+
use std::fmt::Debug;
34
use std::sync::{atomic::Ordering, Arc};
45
use std::time::Duration;
56

67
use crate::{
78
channel::mpsc,
89
error::SendErrorKind,
910
multiaddr::Multiaddr,
10-
service::{event::ServiceTask, TargetProtocol, TargetSession},
11+
service::{
12+
event::{RawSessionInfo, ServiceTask},
13+
TargetProtocol, TargetSession,
14+
},
1115
ProtocolId, SessionId,
1216
};
1317
use bytes::Bytes;
@@ -72,6 +76,25 @@ impl ServiceControl {
7276
self.quick_send(ServiceTask::Dial { address, target })
7377
}
7478

79+
/// Receive an established connection session
80+
/// and build the tentacle protocol on top of it.
81+
#[inline]
82+
pub fn raw_session<T>(
83+
&self,
84+
raw_session: T,
85+
remote_address: Multiaddr,
86+
info: RawSessionInfo,
87+
) -> Result
88+
where
89+
T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
90+
{
91+
self.quick_send(ServiceTask::RawSession {
92+
raw_session: Box::new(raw_session),
93+
remote_address,
94+
session_info: info,
95+
})
96+
}
97+
7598
/// Disconnect a connection
7699
#[inline]
77100
pub fn disconnect(&self, session_id: SessionId) -> Result {
@@ -255,6 +278,18 @@ impl From<ServiceAsyncControl> for ServiceControl {
255278
}
256279
}
257280

281+
impl Debug for ServiceControl {
282+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
283+
write!(f, "ServiceControl")
284+
}
285+
}
286+
287+
impl Debug for ServiceAsyncControl {
288+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
289+
write!(f, "ServiceAsyncControl")
290+
}
291+
}
292+
258293
/// Service control, used to send commands externally at runtime, All interfaces are async methods
259294
#[derive(Clone)]
260295
pub struct ServiceAsyncControl {
@@ -301,6 +336,26 @@ impl ServiceAsyncControl {
301336
self.quick_send(ServiceTask::Dial { address, target }).await
302337
}
303338

339+
/// Receive an established connection session
340+
/// and build the tentacle protocol on top of it.
341+
#[inline]
342+
pub async fn raw_session<T>(
343+
&self,
344+
raw_session: T,
345+
remote_address: Multiaddr,
346+
info: RawSessionInfo,
347+
) -> Result
348+
where
349+
T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
350+
{
351+
self.quick_send(ServiceTask::RawSession {
352+
raw_session: Box::new(raw_session),
353+
remote_address,
354+
session_info: info,
355+
})
356+
.await
357+
}
358+
304359
/// Disconnect a connection
305360
#[inline]
306361
pub async fn disconnect(&self, session_id: SessionId) -> Result {

tentacle/src/service/event.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,46 @@ pub(crate) enum ServiceTask {
228228
/// Listen address
229229
address: Multiaddr,
230230
},
231+
/// Receive an established connection session
232+
/// and build the tentacle protocol on top of it.
233+
RawSession {
234+
/// Remote address
235+
remote_address: Multiaddr,
236+
/// Raw session
237+
raw_session: Box<dyn crate::session::AsyncRw + Send + Unpin + 'static>,
238+
/// Session info
239+
session_info: RawSessionInfo,
240+
},
231241
/// Shutdown service
232242
Shutdown(bool),
233243
}
234244

245+
/// Raw session info
246+
pub enum RawSessionInfo {
247+
/// Inbound session
248+
Inbound {
249+
/// Session provenance
250+
listen_addr: Multiaddr,
251+
},
252+
/// Outbound session
253+
Outbound {
254+
/// Dial protocols
255+
target: TargetProtocol,
256+
},
257+
}
258+
259+
impl RawSessionInfo {
260+
/// Inbound session info
261+
pub fn inbound(listen_addr: Multiaddr) -> Self {
262+
RawSessionInfo::Inbound { listen_addr }
263+
}
264+
265+
/// Outbound session info
266+
pub fn outbound(target: TargetProtocol) -> Self {
267+
RawSessionInfo::Outbound { target }
268+
}
269+
}
270+
235271
impl fmt::Debug for ServiceTask {
236272
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
237273
use self::ServiceTask::*;
@@ -269,6 +305,7 @@ impl fmt::Debug for ServiceTask {
269305
Disconnect { session_id } => write!(f, "Disconnect session [{}]", session_id),
270306
Dial { address, .. } => write!(f, "Dial address: {}", address),
271307
Listen { address } => write!(f, "Listen address: {}", address),
308+
RawSession { remote_address, .. } => write!(f, "Raw session from: {}", remote_address),
272309
ProtocolOpen { session_id, .. } => write!(f, "Open session [{}] proto", session_id),
273310
ProtocolClose {
274311
session_id,

0 commit comments

Comments
 (0)