Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,19 @@ Line wrap the file at 100 chars. Th


## [Unreleased]
### Added
- Add `snafu` as dependency. `snafu` provide macros for creating domain specific errors and adding
context to the underlying source errors.

### Changed
- Change the public API of `ApplyTcpOptionsError`. So this is a breaking change. This stops
exposing the internal details of the type which allows future changes to not be breaking.
- MSRV bumps to 1.85
- Rust language edition upgraded to 2024.

### Removed
- Remove dependency `err-context`.


## [0.4.0] - 2024-01-02
### Changed
Expand Down
31 changes: 23 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ statsd = ["cadence"]

[dependencies]
tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "net", "time", "io-util"] }
err-context = "0.1.0"
snafu = "0.8"
log = "0.4.11"
futures = "0.3.31"
clap = { version = "4.0", features = ["derive"], optional = true }
Expand Down
3 changes: 1 addition & 2 deletions src/bin/tcp2udp.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![forbid(unsafe_code)]

use clap::Parser;
use err_context::ErrorExt as _;
use std::num::NonZeroU8;

use udp_over_tcp::tcp2udp;
Expand Down Expand Up @@ -31,7 +30,7 @@ fn main() {
let runtime = create_runtime(options.threads);

let Err(error) = runtime.block_on(tcp2udp::run(options.tcp2udp_options));
log::error!("Error: {}", error.display("\nCaused by: "));
log::error!("Error: {error}");
std::process::exit(1);
}

Expand Down
3 changes: 1 addition & 2 deletions src/bin/udp2tcp.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![forbid(unsafe_code)]

use clap::Parser;
use err_context::BoxedErrorExt as _;
use std::net::SocketAddr;

use udp_over_tcp::udp2tcp;
Expand Down Expand Up @@ -32,7 +31,7 @@ async fn main() {

let options = Options::parse();
if let Err(error) = run(options).await {
log::error!("Error: {}", error.display("\nCaused by: "));
log::error!("Error: {error}");
std::process::exit(1);
}
}
Expand Down
31 changes: 22 additions & 9 deletions src/forward_traffic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use err_context::BoxedErrorExt as _;
use err_context::ResultExt as _;
use futures::future::select;
use futures::pin_mut;
use snafu::prelude::*;
use std::convert::Infallible;
use std::future::Future;
use std::io;
Expand Down Expand Up @@ -36,12 +35,12 @@ pub async fn process_udp_over_tcp(

let tcp2udp = async move {
if let Err(error) = process_tcp2udp(tcp_in, udp_out, tcp_recv_timeout).await {
log::error!("Error: {}", error.display("\nCaused by: "));
log::error!("Error: {error}");
}
};
let udp2tcp = async move {
let Err(error) = process_udp2tcp(udp_in, tcp_out).await;
log::error!("Error: {}", error.display("\nCaused by: "));
log::error!("Error: {error}");
};

pin_mut!(tcp2udp);
Expand All @@ -65,16 +64,16 @@ async fn process_tcp2udp(
let tcp_read_len =
maybe_timeout(tcp_recv_timeout, tcp_in.read(&mut buffer[unprocessed_i..]))
.await
.context("Timeout while reading from TCP")?
.context("Failed reading from TCP")?;
.context(TcpReadTimeoutSnafu)?
.context(TcpReadSnafu)?;
if tcp_read_len == 0 {
break;
}
unprocessed_i += tcp_read_len;

let processed_i = forward_datagrams_in_buffer(&udp_out, &buffer[..unprocessed_i])
.await
.context("Failed writing to UDP")?;
.context(UdpWriteSnafu)?;

// If we have read data that was not forwarded, because it was not a complete datagram,
// move it to the start of the buffer and start over
Expand All @@ -87,6 +86,20 @@ async fn process_tcp2udp(
Ok(())
}

#[derive(Debug, snafu::Snafu)]
enum ProcessSocketError {
#[snafu(display("Timeout while reading from TCP"))]
TcpReadTimeout { source: tokio::time::error::Elapsed },
#[snafu(display("Failed reading from TCP"))]
TcpRead { source: io::Error },
#[snafu(display("Failed writing to TCP"))]
TcpWrite { source: io::Error },
#[snafu(display("Failed reading from UDP"))]
UdpRead { source: io::Error },
#[snafu(display("Failed writing to UDP"))]
UdpWrite { source: io::Error },
}

async fn maybe_timeout<F: Future>(
duration: Option<Duration>,
future: F,
Expand Down Expand Up @@ -141,7 +154,7 @@ async fn process_udp2tcp(
let udp_read_len = udp_in
.recv(&mut buffer[HEADER_LEN..])
.await
.context("Failed reading from UDP")?;
.context(UdpReadSnafu)?;

// Set the "header" to the length of the datagram.
let datagram_len =
Expand All @@ -151,7 +164,7 @@ async fn process_udp2tcp(
tcp_out
.write_all(&buffer[..HEADER_LEN + udp_read_len])
.await
.context("Failed writing to TCP")?;
.context(TcpWriteSnafu)?;

log::trace!("Forwarded {} bytes UDP->TCP", udp_read_len);
}
Expand Down
29 changes: 20 additions & 9 deletions src/tcp2udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::exponential_backoff::ExponentialBackoff;
use crate::logging::Redact;
use err_context::{BoxedErrorExt as _, ErrorExt as _, ResultExt as _};
use snafu::prelude::*;
use std::convert::Infallible;
use std::fmt;
use std::io;
Expand Down Expand Up @@ -135,7 +135,6 @@ impl std::error::Error for Tcp2UdpError {
}
}
}

/// Sets up TCP listening sockets on all addresses in `Options::tcp_listen_addrs`.
/// If binding a listening socket fails this returns an error. Otherwise the function
/// will continue indefinitely to accept incoming connections and forward to UDP.
Expand Down Expand Up @@ -226,7 +225,7 @@ async fn process_tcp_listener(
Ok((tcp_stream, tcp_peer_addr)) => {
log::debug!("Incoming connection from {}/TCP", Redact(tcp_peer_addr));
if let Err(error) = crate::tcp_options::set_nodelay(&tcp_stream, tcp_nodelay) {
log::error!("Error: {}", error.display("\nCaused by: "));
log::error!("Error: {error}");
}
let statsd = statsd.clone();
tokio::spawn(async move {
Expand All @@ -240,7 +239,7 @@ async fn process_tcp_listener(
)
.await
{
log::error!("Error: {}", error.display("\nCaused by: "));
log::error!("Error: {error}");
}
statsd.decr_connections();
});
Expand All @@ -260,7 +259,6 @@ async fn process_tcp_listener(
}
}
}

/// Sets up a UDP socket bound to `udp_bind_ip` and connected to `udp_peer_addr` and forwards
/// traffic between that UDP socket and the given `tcp_stream` until the `tcp_stream` is closed.
/// `tcp_peer_addr` should be the remote addr that `tcp_stream` is connected to.
Expand All @@ -271,15 +269,17 @@ async fn process_socket(
udp_peer_addr: SocketAddr,
tcp_recv_timeout: Option<Duration>,
) -> Result<(), Box<dyn std::error::Error>> {
let udp_bind_addr = SocketAddr::new(udp_bind_ip, 0);
let bind_addr = SocketAddr::new(udp_bind_ip, 0);

let udp_socket = UdpSocket::bind(udp_bind_addr)
let udp_socket = UdpSocket::bind(bind_addr)
.await
.with_context(|_| format!("Failed to bind UDP socket to {}", udp_bind_addr))?;
.context(UdpBindSnafu { bind_addr })?;
udp_socket
.connect(udp_peer_addr)
.await
.with_context(|_| format!("Failed to connect UDP socket to {}", udp_peer_addr))?;
.context(UdpConnectSnafu {
addr: udp_peer_addr,
})?;

log::debug!(
"UDP socket bound to {} and connected to {}",
Expand All @@ -301,3 +301,14 @@ async fn process_socket(

Ok(())
}

#[derive(Debug, snafu::Snafu)]
enum ProcessSocketError {
#[snafu(display("Failed to bind UDP socket to {bind_addr}"))]
UdpBind {
bind_addr: SocketAddr,
source: io::Error,
},
#[snafu(display("Failed to connect UDP socket to {addr}"))]
UdpConnect { addr: SocketAddr, source: io::Error },
}
Loading