Skip to content

Commit 960b931

Browse files
committed
feat(fetcher): support socks5 proxy
1 parent eeff5b4 commit 960b931

File tree

3 files changed

+67
-4
lines changed

3 files changed

+67
-4
lines changed

Cargo.lock

+13
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rsync-fetcher/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "postgres", "chron
3333
tap = "1.0"
3434
tempfile = "3.3"
3535
tokio = { version = "1.25", features = ["full"] }
36+
tokio-socks = "0.5"
3637
tokio-util = { version = "0.7", features = ["compat"] }
3738
tracing = "0.1"
3839
unix_mode = "0.1"

rsync-fetcher/src/rsync.rs

+53-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
use eyre::{Context, ContextCompat, Result};
1+
use eyre::{bail, Context, ContextCompat, Result};
22
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
33
use tokio::net::TcpStream;
4+
use tokio_socks::tcp::Socks5Stream;
5+
use tracing::info;
46
use url::Url;
57

68
use crate::rsync::downloader::Downloader;
@@ -36,19 +38,66 @@ pub struct TaskBuilders {
3638
pub progress: ProgressDisplay,
3739
}
3840

41+
async fn connect_with_proxy(target: &str) -> Result<TcpStream> {
42+
let proxy = std::env::var("SOCKS5_PROXY")
43+
.ok()
44+
.and_then(|s| (!s.is_empty()).then_some(s))
45+
.or_else(|| {
46+
std::env::var("socks5_proxy")
47+
.ok()
48+
.and_then(|s| (!s.is_empty()).then_some(s))
49+
});
50+
51+
if let Some(proxy) = proxy {
52+
let proxy = Url::parse(&proxy).context("invalid proxy URL")?;
53+
if proxy.scheme().to_lowercase() != "socks5" {
54+
bail!("unsupported proxy scheme: {}", proxy.scheme());
55+
}
56+
let proxy_addr = proxy.host_str().context("missing proxy host")?;
57+
let proxy_port = proxy.port().unwrap_or(1080);
58+
let proxy_username = proxy.username();
59+
let proxy_password = proxy.password().unwrap_or_default();
60+
61+
let stream = if proxy_username.is_empty() {
62+
info!("connecting to {} via SOCKS5 proxy {}", target, proxy);
63+
Socks5Stream::connect((proxy_addr, proxy_port), target)
64+
.await
65+
.context("proxy or rsync server refused connection. Are they running?")?
66+
} else {
67+
info!(
68+
"connecting to {} via SOCKS5 proxy {} as {}",
69+
target, proxy, proxy_username
70+
);
71+
Socks5Stream::connect_with_password(
72+
(proxy_addr, proxy_port),
73+
target,
74+
proxy_username,
75+
proxy_password,
76+
)
77+
.await
78+
.context("proxy or rsync server refused connection. Are they running?")?
79+
};
80+
81+
Ok(stream.into_inner())
82+
} else {
83+
TcpStream::connect(target)
84+
.await
85+
.context("rsync server refused connection. Is it running?")
86+
}
87+
}
88+
3989
pub async fn start_handshake(url: &Url) -> Result<HandshakeConn> {
4090
let port = url.port().unwrap_or(873);
4191
let path = url.path().trim_start_matches('/');
4292
let auth = Auth::from_url_and_env(url);
4393
let module = path.split('/').next().context("empty remote path")?;
4494

45-
let stream = TcpStream::connect(format!(
95+
let stream = connect_with_proxy(&format!(
4696
"{}:{}",
4797
url.host_str().context("missing remote host")?,
4898
port
4999
))
50-
.await
51-
.context("rsync server refused connection. Is it running?")?;
100+
.await?;
52101

53102
let mut handshake = HandshakeConn::new(stream);
54103
handshake.start_inband_exchange(module, path, auth).await?;

0 commit comments

Comments
 (0)