From dabe0672443698e3c91a0848ca4627879c4acaf8 Mon Sep 17 00:00:00 2001 From: Jack Wampler Date: Tue, 24 Oct 2023 16:54:40 -0600 Subject: [PATCH 1/2] first (incomplete) pass on configurably fallible wasm module --- crates/wasm/src/decoder.rs | 4 +- crates/wasm/src/dialer.rs | 2 - crates/wasm/src/encoder.rs | 4 +- examples/water_bins/fallible/.cargo/config | 5 + examples/water_bins/fallible/Cargo.toml | 31 ++ .../fallible/src/async_socks5_listener.rs | 297 ++++++++++++++++++ examples/water_bins/fallible/src/failures.rs | 140 +++++++++ examples/water_bins/fallible/src/lib.rs | 167 ++++++++++ 8 files changed, 644 insertions(+), 6 deletions(-) create mode 100644 examples/water_bins/fallible/.cargo/config create mode 100644 examples/water_bins/fallible/Cargo.toml create mode 100644 examples/water_bins/fallible/src/async_socks5_listener.rs create mode 100644 examples/water_bins/fallible/src/failures.rs create mode 100644 examples/water_bins/fallible/src/lib.rs diff --git a/crates/wasm/src/decoder.rs b/crates/wasm/src/decoder.rs index 3c608a6..002a93e 100644 --- a/crates/wasm/src/decoder.rs +++ b/crates/wasm/src/decoder.rs @@ -6,14 +6,14 @@ use tokio::io::AsyncRead; // A trait for a decoder, developers should implement this trait and pass it to _read_from_outbound pub trait Decoder { - fn decode(&self, input: &[u8], output: &mut [u8]) -> Result; + fn decode(&mut self, input: &[u8], output: &mut [u8]) -> Result; } // A default decoder that does just copy + paste pub struct DefaultDecoder; impl Decoder for DefaultDecoder { - fn decode(&self, input: &[u8], output: &mut [u8]) -> Result { + fn decode(&mut self, input: &[u8], output: &mut [u8]) -> Result { let len = input.len(); output[..len].copy_from_slice(&input[..len]); Ok(len as u32) diff --git a/crates/wasm/src/dialer.rs b/crates/wasm/src/dialer.rs index 6c92b9d..bf47e54 100644 --- a/crates/wasm/src/dialer.rs +++ b/crates/wasm/src/dialer.rs @@ -23,8 +23,6 @@ impl Dialer { pub fn dial(&mut self) -> Result { info!("[WASM] running in dial func..."); - - // FIXME: hardcoded the filename for now, make it a config later let fd: i32 = self.tcp_connect()?; if fd < 0 { diff --git a/crates/wasm/src/encoder.rs b/crates/wasm/src/encoder.rs index 46adf05..8d518e1 100644 --- a/crates/wasm/src/encoder.rs +++ b/crates/wasm/src/encoder.rs @@ -6,14 +6,14 @@ use tokio::io::AsyncWrite; // A trait for a encoder, developers should implement this trait and pass it to _write_to_outbound pub trait Encoder { - fn encode(&self, input: &[u8], output: &mut [u8]) -> Result; + fn encode(&mut self, input: &[u8], output: &mut [u8]) -> Result; } // A default encoder that does just copy + paste pub struct DefaultEncoder; impl Encoder for DefaultEncoder { - fn encode(&self, input: &[u8], output: &mut [u8]) -> Result { + fn encode(&mut self, input: &[u8], output: &mut [u8]) -> Result { let len = input.len(); output[..len].copy_from_slice(&input[..len]); Ok(len as u32) diff --git a/examples/water_bins/fallible/.cargo/config b/examples/water_bins/fallible/.cargo/config new file mode 100644 index 0000000..bca99d0 --- /dev/null +++ b/examples/water_bins/fallible/.cargo/config @@ -0,0 +1,5 @@ +[build] +target = "wasm32-wasi" + +[target.wasm32-wasi] +rustflags = [ "--cfg", "tokio_unstable"] diff --git a/examples/water_bins/fallible/Cargo.toml b/examples/water_bins/fallible/Cargo.toml new file mode 100644 index 0000000..bc3ee53 --- /dev/null +++ b/examples/water_bins/fallible/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "fallible" +version = "0.1.0" +authors.workspace = true +description.workspace = true +edition.workspace = true +publish = false + +[lib] +name = "fallible" +path = "src/lib.rs" +crate-type = ["cdylib"] + +[dependencies] +tokio = { version = "1.24.2", default-features = false, features = ["net", "rt", "macros", "io-util", "io-std", "time", "sync"] } +tokio-util = { version = "0.7.1", features = ["codec"] } + +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0.107" +bincode = "1.3" + +anyhow = "1.0.7" +tracing = "0.1" +tracing-subscriber = "0.3.17" +toml = "0.5.9" +lazy_static = "1.4" +url = { version = "2.2.2", features = ["serde"] } +libc = "0.2.147" + +# water wasm lib import +water-wasm = { path = "../../../crates/wasm/", version = "0.1.0" } diff --git a/examples/water_bins/fallible/src/async_socks5_listener.rs b/examples/water_bins/fallible/src/async_socks5_listener.rs new file mode 100644 index 0000000..7b44a14 --- /dev/null +++ b/examples/water_bins/fallible/src/async_socks5_listener.rs @@ -0,0 +1,297 @@ +use super::*; + +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::{TcpListener, TcpStream}, +}; + +use std::net::{SocketAddr, ToSocketAddrs}; + +// ----------------------- Listener methods ----------------------- +#[export_name = "v1_listen"] +fn listen() { + wrapper().unwrap(); +} + +fn _listener_creation() -> Result { + let global_conn = match DIALER.lock() { + Ok(conf) => conf, + Err(e) => { + eprintln!("[WASM] > ERROR: {}", e); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "failed to lock config", + )); + } + }; + + // FIXME: hardcoded the filename for now, make it a config later + let stream = StreamConfigV1::init( + global_conn.config.local_address.clone(), + global_conn.config.local_port, + "LISTEN".to_string(), + ); + + let encoded: Vec = bincode::serialize(&stream).expect("Failed to serialize"); + + let address = encoded.as_ptr() as u32; + let size = encoded.len() as u32; + + let fd = unsafe { create_listen(address, size) }; + + if fd < 0 { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "failed to create listener", + )); + } + + info!( + "[WASM] ready to start listening at {}:{}", + global_conn.config.local_address, global_conn.config.local_port + ); + + Ok(fd) +} + +#[tokio::main(flavor = "current_thread")] +async fn wrapper() -> std::io::Result<()> { + let fd = _listener_creation().unwrap(); + + // Set up pre-established listening socket. + let standard = unsafe { std::net::TcpListener::from_raw_fd(fd) }; + // standard.set_nonblocking(true).unwrap(); + let listener = TcpListener::from_std(standard)?; + + info!("[WASM] Starting to listen..."); + + loop { + // Accept new sockets in a loop. + let socket = match listener.accept().await { + Ok(s) => s.0, + Err(e) => { + eprintln!("[WASM] > ERROR: {}", e); + continue; + } + }; + + // Spawn a background task for each new connection. + tokio::spawn(async move { + eprintln!("[WASM] > CONNECTED"); + match handle_incoming(socket).await { + Ok(()) => eprintln!("[WASM] > DISCONNECTED"), + Err(e) => eprintln!("[WASM] > ERROR: {}", e), + } + }); + } +} + +// SS handle incoming connections +async fn handle_incoming(mut stream: TcpStream) -> std::io::Result<()> { + let mut buffer = [0; 512]; + + // Read the SOCKS5 greeting + let nbytes = stream + .read(&mut buffer) + .await + .expect("Failed to read from stream"); + + println!("Received {} bytes: {:?}", nbytes, buffer[..nbytes].to_vec()); + + if nbytes < 2 || buffer[0] != 0x05 { + eprintln!("Not a SOCKS5 request"); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Not a SOCKS5 request", + )); + } + + let nmethods = buffer[1] as usize; + if nbytes < 2 + nmethods { + eprintln!("Incomplete SOCKS5 greeting"); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Incomplete SOCKS5 greeting", + )); + } + + // For simplicity, always use "NO AUTHENTICATION REQUIRED" + stream + .write_all(&[0x05, 0x00]) + .await + .expect("Failed to write to stream"); + + // Read the actual request + let nbytes = stream + .read(&mut buffer) + .await + .expect("Failed to read from stream"); + + println!("Received {} bytes: {:?}", nbytes, buffer[..nbytes].to_vec()); + + if nbytes < 7 || buffer[0] != 0x05 || buffer[1] != 0x01 { + println!("Invalid SOCKS5 request"); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Invalid SOCKS5 request", + )); + } + + // Extract address and port + let addr: SocketAddr = match buffer[3] { + 0x01 => { + // IPv4 + if nbytes < 10 { + eprintln!("Incomplete request for IPv4 address"); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Incomplete request for IPv4 address", + )); + } + let addr = std::net::Ipv4Addr::new(buffer[4], buffer[5], buffer[6], buffer[7]); + let port = u16::from_be_bytes([buffer[8], buffer[9]]); + SocketAddr::from((addr, port)) + } + 0x03 => { + // Domain name + let domain_length = buffer[4] as usize; + if nbytes < domain_length + 5 { + eprintln!("Incomplete request for domain name"); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Incomplete request for domain name", + )); + } + let domain = + std::str::from_utf8(&buffer[5..5 + domain_length]).expect("Invalid domain string"); + + println!("Domain: {}", domain); + + let port = + u16::from_be_bytes([buffer[5 + domain_length], buffer[5 + domain_length + 1]]); + + println!("Port: {}", port); + + let domain_with_port = format!("{}:443", domain); // Assuming HTTPS + + // domain.to_socket_addrs().unwrap().next().unwrap() + match domain_with_port.to_socket_addrs() { + Ok(mut addrs) => match addrs.next() { + Some(addr) => addr, + None => { + eprintln!("Domain resolved, but no addresses found for {}", domain); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Domain resolved, but no addresses found for {}", domain), + )); + } + }, + Err(e) => { + eprintln!("Failed to resolve domain {}: {}", domain, e); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Failed to resolve domain {}: {}", domain, e), + )); + } + } + } + _ => { + eprintln!("Address type not supported"); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Address type not supported", + )); + } + }; + + // NOTE: create a new Dialer to dial any target address as it wants to + // Add more features later -- connect to target thru rules (direct / server) + + // Connect to target address + let mut tcp_dialer = Dialer::new(); + tcp_dialer.config.remote_address = addr.ip().to_string(); + tcp_dialer.config.remote_port = addr.port() as u32; + + let _tcp_fd = tcp_dialer.dial().expect("Failed to dial"); + + let target_stream = match tcp_dialer.file_conn.outbound_conn.file.unwrap() { + ConnStream::TcpStream(s) => s, + _ => { + eprintln!("Failed to get outbound tcp stream"); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Failed to get outbound tcp stream", + )); + } + }; + + target_stream + .set_nonblocking(true) + .expect("Failed to set non-blocking"); + + let target_stream = + TcpStream::from_std(target_stream).expect("Failed to convert to tokio stream"); + + // Construct the response based on the target address + let response = match addr { + SocketAddr::V4(a) => { + let mut r = vec![0x05, 0x00, 0x00, 0x01]; + r.extend_from_slice(&a.ip().octets()); + r.extend_from_slice(&a.port().to_be_bytes()); + r + } + SocketAddr::V6(a) => { + let mut r = vec![0x05, 0x00, 0x00, 0x04]; + r.extend_from_slice(&a.ip().octets()); + r.extend_from_slice(&a.port().to_be_bytes()); + r + } + }; + + stream + .write_all(&response) + .await + .expect("Failed to write to stream"); + + let (mut client_read, mut client_write) = tokio::io::split(stream); + let (mut target_read, mut target_write) = tokio::io::split(target_stream); + + let client_to_target = async move { + let mut buffer = vec![0; 4096]; + loop { + match client_read.read(&mut buffer).await { + Ok(0) => { + break; + } + Ok(n) => { + if (target_write.write_all(&buffer[0..n]).await).is_err() { + break; + } + } + Err(_) => break, + } + } + }; + + let target_to_client = async move { + let mut buffer = vec![0; 4096]; + loop { + match target_read.read(&mut buffer).await { + Ok(0) => { + break; + } + Ok(n) => { + if (client_write.write_all(&buffer[0..n]).await).is_err() { + break; + } + } + Err(_) => break, + } + } + }; + + // Run both handlers concurrently + tokio::join!(client_to_target, target_to_client); + + Ok(()) +} diff --git a/examples/water_bins/fallible/src/failures.rs b/examples/water_bins/fallible/src/failures.rs new file mode 100644 index 0000000..00f771f --- /dev/null +++ b/examples/water_bins/fallible/src/failures.rs @@ -0,0 +1,140 @@ +use std::{default, str::FromStr}; + +use water_wasm::{Decoder, DefaultDecoder, DefaultEncoder, Encoder}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Failures { + Success, + ConfigError, + ConfigPanic, + DialError, + DialPanic, + ReadError, + ReadPanic, + ReadTimeout, + ReadHang, + CloseOnRead, + WriteError, + WritePanic, + WriteTimeout, + WriteHang, + CloseOnWrite, + HandshakeError, +} + +impl default::Default for Failures { + fn default() -> Self { + Failures::Success + } +} + +impl FromStr for Failures { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let out = match s { + "ConfigError" => Failures::ConfigError, + "ConfigPanic" => Failures::ConfigPanic, + "DialError" => Failures::DialError, + "DialPanic" => Failures::DialPanic, + "ReadError" => Failures::ReadError, + "ReadPanic" => Failures::ReadPanic, + "ReadTimeout" => Failures::ReadTimeout, + "ReadHang" => Failures::ReadHang, + "CloseOnRead" => Failures::CloseOnRead, + "WriteError" => Failures::WriteError, + "WritePanic" => Failures::WritePanic, + "WriteTimeout" => Failures::WriteTimeout, + "WriteHang" => Failures::WriteHang, + "CloseOnWrite" => Failures::CloseOnWrite, + "HandshakeError" => Failures::HandshakeError, + _ => Failures::Success, + }; + Ok(out) + } +} + +pub trait Configurable { + fn with_config(self, config_str: String) -> anyhow::Result + where + Self: Sized; +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +struct Config { + throw: Failures, +} + +impl FromStr for Config { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + Ok(Config { + throw: Failures::from_str(s)?, + }) + } +} + +impl TryFrom<&str> for Config { + type Error = anyhow::Error; + + fn try_from(s: &str) -> Result { + Ok(Config { + throw: Failures::from_str(s)?, + }) + } +} + +pub struct IdentityTransport { + config: Config, + decoder: DefaultDecoder, + encoder: DefaultEncoder, + n_encodes: i32, + n_decodes: i32, +} + +impl IdentityTransport { + pub fn new() -> Self { + IdentityTransport { + config: Config::default(), + decoder: DefaultDecoder, + encoder: DefaultEncoder, + n_encodes: 0, + n_decodes: 0, + } + } + + fn should_throw(&self, failure: Failures) -> bool { + self.config.throw == failure + } +} + +impl Configurable for IdentityTransport { + fn with_config(mut self, config_str: String) -> anyhow::Result { + self.config = Config::from_str(&config_str)?; + if self.should_throw(Failures::ConfigError) { + return Err(anyhow::anyhow!("throw ConfigError")); + } else if self.should_throw(Failures::ConfigPanic) { + panic!("throw ConfigPanic"); + } + Ok(self) + } +} + +impl Encoder for IdentityTransport { + fn encode(&mut self, input: &[u8], output: &mut [u8]) -> anyhow::Result { + if self.n_encodes == 0 && self.should_throw(Failures::HandshakeError) { + return Err(anyhow::anyhow!("throw HandshakeError")); + } + + self.n_encodes += 1; + self.encoder.encode(input, output) + } +} + +impl Decoder for IdentityTransport { + fn decode(&mut self, input: &[u8], output: &mut [u8]) -> anyhow::Result { + self.n_decodes += 1; + self.decoder.decode(input, output) + } +} diff --git a/examples/water_bins/fallible/src/lib.rs b/examples/water_bins/fallible/src/lib.rs new file mode 100644 index 0000000..89f4595 --- /dev/null +++ b/examples/water_bins/fallible/src/lib.rs @@ -0,0 +1,167 @@ +// =================== Imports & Modules ===================== +use std::{io::Read, os::fd::FromRawFd, sync::Mutex, vec}; + +use anyhow::Result; +use bincode::{self}; +use lazy_static::lazy_static; +use tracing::{info, Level}; + +use water_wasm::*; + +pub mod async_socks5_listener; +pub mod failures; + +// Export the version of this WASM module +#[export_name = "V1"] +pub static V1: i32 = 0; + +// create a mutable global variable stores a pointer to the config +lazy_static! { + static ref DIALER: Mutex = Mutex::new(Dialer::new()); + // static ref CONN: Mutex = Mutex::new(Connection::new()); +} + +#[cfg(target_family = "wasm")] +#[export_name = "_init"] +pub fn _init(debug: bool) { + if debug { + tracing_subscriber::fmt().with_max_level(Level::INFO).init(); + } + + info!("[WASM] running in _init"); +} + +#[cfg(not(target_family = "wasm"))] +pub fn _init(debug: bool) { + if debug { + tracing_subscriber::fmt().with_max_level(Level::INFO).init(); + } + + info!("[WASM] running in _init"); +} + +#[export_name = "_set_inbound"] +pub fn _water_bridging(fd: i32) { + let mut global_dialer = match DIALER.lock() { + Ok(dialer) => dialer, + Err(e) => { + eprintln!("[WASM] > ERROR: {}", e); + return; + } + }; + + global_dialer.file_conn.set_inbound( + fd, + ConnStream::File(unsafe { std::fs::File::from_raw_fd(fd) }), + ); +} + +#[export_name = "_set_outbound"] +pub fn _water_bridging_out(fd: i32) { + let mut global_dialer = match DIALER.lock() { + Ok(dialer) => dialer, + Err(e) => { + eprintln!("[WASM] > ERROR: {}", e); + return; + } + }; + + global_dialer.file_conn.set_outbound( + fd, + ConnStream::TcpStream(unsafe { std::net::TcpStream::from_raw_fd(fd) }), + ); +} + +#[export_name = "_config"] +pub fn _process_config(fd: i32) { + info!("[WASM] running in _process_config"); + + let mut config_file = unsafe { std::fs::File::from_raw_fd(fd) }; + let mut config = String::new(); + match config_file.read_to_string(&mut config) { + Ok(_) => { + let config: Config = match serde_json::from_str(&config) { + Ok(config) => config, + Err(e) => { + eprintln!("[WASM] > _process_config ERROR: {}", e); + return; + } + }; + + let mut global_dialer = match DIALER.lock() { + Ok(dialer) => dialer, + Err(e) => { + eprintln!("[WASM] > ERROR: {}", e); + return; + } + }; + + // global_dialer.file_conn.config = config.clone(); + global_dialer.config = config; + } + Err(e) => { + eprintln!( + "[WASM] > WASM _process_config failed reading path ERROR: {}", + e + ); + } + }; +} + +#[export_name = "_write"] +pub fn _write(bytes_write: i64) -> i64 { + let mut global_dialer = match DIALER.lock() { + Ok(dialer) => dialer, + Err(e) => { + eprintln!("[WASM] > ERROR: {}", e); + return -1; + } + }; + + match global_dialer + .file_conn + ._write_2_outbound(&mut DefaultEncoder, bytes_write) + { + Ok(n) => n, + Err(e) => { + eprintln!("[WASM] > ERROR in _write: {}", e); + -1 + } + } +} + +#[export_name = "_read"] +pub fn _read() -> i64 { + match DIALER.lock() { + Ok(mut global_dialer) => { + match global_dialer + .file_conn + ._read_from_outbound(&mut DefaultDecoder) + { + Ok(n) => n, + Err(e) => { + eprintln!("[WASM] > ERROR in _read: {}", e); + -1 + } + } + } + Err(e) => { + eprintln!("[WASM] > ERROR: {}", e); + -1 + } + } +} + +#[export_name = "_dial"] +pub fn _dial() { + match DIALER.lock() { + Ok(mut global_dialer) => { + if let Err(e) = global_dialer.dial() { + eprintln!("[WASM] > ERROR in _dial: {}", e); + } + } + Err(e) => { + eprintln!("[WASM] > ERROR: {}", e); + } + } +} From 4fc5a23997df1e9f6a694754c63533ba67ffa8ce Mon Sep 17 00:00:00 2001 From: jmwample <8297368+jmwample@users.noreply.github.com> Date: Tue, 24 Oct 2023 23:30:41 -0600 Subject: [PATCH 2/2] clippy lints for now, still incomplete --- examples/water_bins/fallible/src/failures.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/examples/water_bins/fallible/src/failures.rs b/examples/water_bins/fallible/src/failures.rs index 00f771f..7112a6c 100644 --- a/examples/water_bins/fallible/src/failures.rs +++ b/examples/water_bins/fallible/src/failures.rs @@ -1,9 +1,10 @@ -use std::{default, str::FromStr}; +use std::str::FromStr; use water_wasm::{Decoder, DefaultDecoder, DefaultEncoder, Encoder}; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] pub enum Failures { + #[default] Success, ConfigError, ConfigPanic, @@ -22,12 +23,6 @@ pub enum Failures { HandshakeError, } -impl default::Default for Failures { - fn default() -> Self { - Failures::Success - } -} - impl FromStr for Failures { type Err = anyhow::Error; @@ -93,6 +88,12 @@ pub struct IdentityTransport { n_decodes: i32, } +impl Default for IdentityTransport { + fn default() -> Self { + Self::new() + } +} + impl IdentityTransport { pub fn new() -> Self { IdentityTransport {