Skip to content

Commit 68600b7

Browse files
committed
fix: listen drop with upgrade mode
1 parent 7d849a9 commit 68600b7

File tree

5 files changed

+237
-19
lines changed

5 files changed

+237
-19
lines changed

tentacle/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ thiserror = "1.0"
3131
nohash-hasher = "0.2"
3232

3333
parking_lot = { version = "0.12", optional = true }
34-
tokio-tungstenite = { version = "0.26", optional = true }
34+
tokio-tungstenite = { version = "0.27", optional = true }
3535
httparse = { version = "1.9", optional = true }
3636
futures-timer = { version = "3.0.2", optional = true }
3737

tentacle/src/runtime/tokio_runtime/mod.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,7 @@ async fn connect_by_proxy(
166166
socks5::connect(proxy_server_url.clone(), target_addr.clone(), target_port)
167167
.await
168168
.map_err(|err| {
169-
io::Error::new(
170-
io::ErrorKind::Other,
169+
io::Error::other(
171170
format!(
172171
"socks5_connect to target_addr: {}, target_port: {} by proxy_server: {} failed, err: {}",
173172
target_addr, target_port, proxy_server_url, err
@@ -196,10 +195,7 @@ pub(crate) async fn connect(
196195
)
197196
.await
198197
.map_err(|err| {
199-
io::Error::new(
200-
io::ErrorKind::Other,
201-
format!("connect_by_proxy: {}, error: {}", proxy_url, err),
202-
)
198+
io::Error::other(format!("connect_by_proxy: {}, error: {}", proxy_url, err))
203199
}),
204200
None => connect_direct(target_addr, socket_transformer).await,
205201
}

tentacle/src/service.rs

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,17 @@ where
227227
if let Some(client) = inner.igd_client.as_mut() {
228228
client.register(&listen_address)
229229
}
230-
inner.listens.insert(listen_address.clone());
230+
let clear_addr = listen_address
231+
.into_iter()
232+
.map(|p| {
233+
if let Protocol::Tls(_) = p {
234+
Protocol::Tls(Default::default())
235+
} else {
236+
p
237+
}
238+
})
239+
.collect::<Multiaddr>();
240+
inner.listens.insert(clear_addr);
231241

232242
if !matches!(incoming, MultiIncoming::TcpUpgrade) {
233243
inner.spawn_listener(incoming, listen_address);
@@ -312,6 +322,7 @@ where
312322
timeout: self.config.timeout,
313323
listen_addr: listen_address,
314324
future_task_sender: self.future_task_sender.clone(),
325+
listens_upgrade_modes: self.multi_transport.listens_upgrade_modes.clone(),
315326
};
316327
let mut sender = self.future_task_sender.clone();
317328
crate::runtime::spawn(async move {
@@ -1071,7 +1082,17 @@ where
10711082
.into(),
10721083
)
10731084
.await;
1074-
self.listens.insert(listen_address.clone());
1085+
let clear_addr = listen_address
1086+
.into_iter()
1087+
.map(|p| {
1088+
if let Protocol::Tls(_) = p {
1089+
Protocol::Tls(Default::default())
1090+
} else {
1091+
p
1092+
}
1093+
})
1094+
.collect::<Multiaddr>();
1095+
self.listens.insert(clear_addr);
10751096
self.state.decrease();
10761097
self.try_update_listens().await;
10771098
#[cfg(feature = "upnp")]
@@ -1136,7 +1157,17 @@ where
11361157
}
11371158
}
11381159
ServiceTask::Listen { address } => {
1139-
if !self.listens.contains(&address) {
1160+
let clear_addr = address
1161+
.into_iter()
1162+
.map(|p| {
1163+
if let Protocol::Tls(_) = p {
1164+
Protocol::Tls(Default::default())
1165+
} else {
1166+
p
1167+
}
1168+
})
1169+
.collect::<Multiaddr>();
1170+
if !self.listens.contains(&clear_addr) {
11401171
if let Err(e) = self.listen_inner(address.clone()) {
11411172
let _ignore = self
11421173
.handle_sender

tentacle/src/service/helper.rs

Lines changed: 199 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,14 @@ pub struct Listener<K> {
163163
pub(crate) timeout: Duration,
164164
pub(crate) listen_addr: Multiaddr,
165165
pub(crate) future_task_sender: mpsc::Sender<BoxedFutureTask>,
166+
pub(crate) listens_upgrade_modes: std::sync::Arc<
167+
crate::lock::Mutex<
168+
std::collections::HashMap<
169+
std::net::SocketAddr,
170+
crate::transports::tcp_base_listen::UpgradeMode,
171+
>,
172+
>,
173+
>,
166174
}
167175

168176
#[cfg(not(target_family = "wasm"))]
@@ -174,15 +182,198 @@ where
174182
let mut event_sender = self.event_sender.clone();
175183
let mut future_sender = self.future_task_sender.clone();
176184
let address = self.listen_addr.clone();
185+
let mode = {
186+
use crate::utils::multiaddr_to_socketaddr;
187+
188+
let global = self.listens_upgrade_modes.lock();
189+
match multiaddr_to_socketaddr(&address) {
190+
Some(net_addr) => global.get(&net_addr).map(|u| u.to_enum()),
191+
None => None,
192+
}
193+
};
194+
#[cfg(any(feature = "ws", feature = "tls"))]
195+
use crate::multiaddr::Protocol;
196+
use crate::transports::tcp_base_listen::UpgradeModeEnum;
177197
let report_task = async move {
178-
if let Err(err) = event_sender
179-
.send(SessionEvent::ListenError {
180-
address,
181-
error: TransportErrorKind::Io(io_err),
182-
})
183-
.await
184-
{
185-
error!("Listen address result send back error: {:?}", err);
198+
match mode {
199+
None => {
200+
if let Err(err) = event_sender
201+
.send(SessionEvent::ListenError {
202+
address,
203+
error: TransportErrorKind::Io(io_err),
204+
})
205+
.await
206+
{
207+
error!("Listen address result send back error: {:?}", err);
208+
}
209+
}
210+
Some(UpgradeModeEnum::OnlyTcp) => {
211+
if let Err(err) = event_sender
212+
.send(SessionEvent::ListenError {
213+
address,
214+
error: TransportErrorKind::Io(io_err),
215+
})
216+
.await
217+
{
218+
error!("Listen address result send back error: {:?}", err);
219+
}
220+
}
221+
#[cfg(feature = "ws")]
222+
Some(UpgradeModeEnum::OnlyWs) => {
223+
if let Err(err) = event_sender
224+
.send(SessionEvent::ListenError {
225+
address,
226+
error: TransportErrorKind::Io(io_err),
227+
})
228+
.await
229+
{
230+
error!("Listen address result send back error: {:?}", err);
231+
}
232+
}
233+
#[cfg(feature = "tls")]
234+
Some(UpgradeModeEnum::OnlyTls) => {
235+
let clear_addr = listen_address
236+
.into_iter()
237+
.map(|p| {
238+
if let Protocol::Tls(_) = p {
239+
Protocol::Tls(Default::default())
240+
} else {
241+
p
242+
}
243+
})
244+
.collect::<Multiaddr>();
245+
if let Err(err) = event_sender
246+
.send(SessionEvent::ListenError {
247+
address: clear_addr,
248+
error: TransportErrorKind::Io(io_err),
249+
})
250+
.await
251+
{
252+
error!("Listen address result send back error: {:?}", err);
253+
}
254+
}
255+
#[cfg(feature = "tls")]
256+
Some(UpgradeModeEnum::TcpAndTls) => {
257+
let base_net: Multiaddr = address
258+
.iter()
259+
.filter_map(|p| {
260+
if matches!(p, Protocol::Tls(_)) {
261+
None
262+
} else {
263+
Some(p)
264+
}
265+
})
266+
.collect();
267+
let mut tls_net = base_net.clone();
268+
tls_net.push(Protocol::Tls(Default::default()));
269+
if let Err(err) = event_sender
270+
.send(SessionEvent::ListenError {
271+
address: base_net,
272+
error: TransportErrorKind::Io(std::io::Error::new(
273+
io_err.kind(),
274+
io_err.to_string(),
275+
)),
276+
})
277+
.await
278+
{
279+
error!("Listen address result send back error: {:?}", err);
280+
}
281+
if let Err(err) = event_sender
282+
.send(SessionEvent::ListenError {
283+
address: tls_net,
284+
error: TransportErrorKind::Io(io_err),
285+
})
286+
.await
287+
{
288+
error!("Listen address result send back error: {:?}", err);
289+
}
290+
}
291+
#[cfg(feature = "ws")]
292+
Some(UpgradeModeEnum::TcpAndWs) => {
293+
let base_net: Multiaddr = address
294+
.iter()
295+
.filter_map(|p| {
296+
if matches!(p, Protocol::Ws) {
297+
None
298+
} else {
299+
Some(p)
300+
}
301+
})
302+
.collect();
303+
let mut ws_net = base_net.clone();
304+
ws_net.push(Protocol::Ws);
305+
if let Err(err) = event_sender
306+
.send(SessionEvent::ListenError {
307+
address: base_net,
308+
error: TransportErrorKind::Io(std::io::Error::new(
309+
io_err.kind(),
310+
io_err.to_string(),
311+
)),
312+
})
313+
.await
314+
{
315+
error!("Listen address result send back error: {:?}", err);
316+
}
317+
if let Err(err) = event_sender
318+
.send(SessionEvent::ListenError {
319+
address: ws_net,
320+
error: TransportErrorKind::Io(io_err),
321+
})
322+
.await
323+
{
324+
error!("Listen address result send back error: {:?}", err);
325+
}
326+
}
327+
#[cfg(all(feature = "ws", feature = "tls"))]
328+
Some(UpgradeModeEnum::All) => {
329+
let base_net: Multiaddr = address
330+
.iter()
331+
.filter_map(|p| {
332+
if matches!(p, Protocol::Ws | Protocol::Tls(_)) {
333+
None
334+
} else {
335+
Some(p)
336+
}
337+
})
338+
.collect();
339+
let mut ws_net = base_net.clone();
340+
let mut tls_net = base_net.clone();
341+
ws_net.push(Protocol::Ws);
342+
tls_net.push(Protocol::Tls(Default::default()));
343+
if let Err(err) = event_sender
344+
.send(SessionEvent::ListenError {
345+
address: base_net,
346+
error: TransportErrorKind::Io(std::io::Error::new(
347+
io_err.kind(),
348+
io_err.to_string(),
349+
)),
350+
})
351+
.await
352+
{
353+
error!("Listen address result send back error: {:?}", err);
354+
}
355+
if let Err(err) = event_sender
356+
.send(SessionEvent::ListenError {
357+
address: ws_net,
358+
error: TransportErrorKind::Io(std::io::Error::new(
359+
io_err.kind(),
360+
io_err.to_string(),
361+
)),
362+
})
363+
.await
364+
{
365+
error!("Listen address result send back error: {:?}", err);
366+
}
367+
if let Err(err) = event_sender
368+
.send(SessionEvent::ListenError {
369+
address: tls_net,
370+
error: TransportErrorKind::Io(io_err),
371+
})
372+
.await
373+
{
374+
error!("Listen address result send back error: {:?}", err);
375+
}
376+
}
186377
}
187378
};
188379
crate::runtime::spawn(async move {

tentacle/src/transports/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ mod onion;
2020
#[cfg(not(target_family = "wasm"))]
2121
mod tcp;
2222
#[cfg(not(target_family = "wasm"))]
23-
mod tcp_base_listen;
23+
pub(crate) mod tcp_base_listen;
2424
#[cfg(all(feature = "tls", not(target_family = "wasm")))]
2525
mod tls;
2626
#[cfg(all(feature = "ws", not(target_family = "wasm")))]

0 commit comments

Comments
 (0)