Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ The binary requires these environment variables:
- `SPICEIO_SMB_DOMAIN` — SMB domain (default empty)
- `SPICEIO_BUCKET` — virtual S3 bucket name (defaults to `SPICEIO_SMB_SHARE`)
- `SPICEIO_REGION` — AWS region to advertise (default `us-east-1`)
- `SPICEIO_SMB_CONNECTIONS` — number of SMB TCP connections in the pool (default `4`)
- `SPICEIO_SMB_MAX_IO` — max standalone read/write I/O size in bytes (default `65536`; raise for servers that handle larger I/O)
- `SPICEIO_LOG_FILE` — append logs to this file in addition to stderr (optional; non-blocking, never stalls the proxy)

## Architecture
Expand All @@ -42,7 +44,7 @@ The codebase has three modules:

- **`s3`** — HTTP layer. Parses incoming S3 API requests and produces XML responses. `router.rs` is the central dispatch (path-style bucket routing). Covers GetObject, PutObject, CopyObject, DeleteObject, HeadObject, ListObjectsV1/V2, multipart uploads, and stub endpoints for ACL/tagging/versioning. `xml.rs` is a hand-rolled XML builder. `multipart.rs` manages upload state in-memory, with parts stored as temp files under `.spiceio-uploads/` on the SMB share. `body.rs` implements `SpiceioBody`, a zero-copy streaming response body (channel-backed for large reads, inline for XML/errors).

- **`smb`** — Wire protocol client. `protocol.rs` defines SMB 3.1.x packet structures (little-endian). `client.rs` manages the TCP connection, negotiate/session-setup handshake, and exposes operations (tree connect, create, read, write, close, query directory). `auth.rs` implements NTLMv2 challenge-response. `ops.rs` provides the high-level `ShareSession` abstraction the S3 layer consumes (list, read, write, delete, stat, copy).
- **`smb`** — Wire protocol client. `protocol.rs` defines SMB 3.1.x packet structures (little-endian). `client.rs` manages a TCP connection, negotiate/session-setup handshake, and exposes operations (tree connect, create, read, write, close, query directory, pipelined read). `pool.rs` manages N authenticated connections for concurrent request fan-out. `auth.rs` implements NTLMv2 challenge-response. `ops.rs` provides the high-level `ShareSession` abstraction the S3 layer consumes (list, read, write, delete, stat, copy).

- **`crypto`** — FFI bindings to macOS CommonCrypto (`Security.framework`/`libcommonCrypto`). Exposes MD4, SHA-256, and HMAC-MD5. No Rust crypto crates.

Expand All @@ -52,6 +54,9 @@ The codebase has three modules:

- Zero external crypto dependencies — all crypto goes through `crypto::ffi` to CommonCrypto.
- No `async-trait` — the SMB client uses `tokio::sync::Mutex` around the TCP stream with manual `async` methods.
- Connection pool — N TCP connections (default 4) to the same SMB server, round-robin dispatched. Concurrent S3 requests fan out across connections instead of serializing on a single mutex. File handles are pinned to the connection that opened them.
- Pipelined reads — streaming GetObject sends batches of 8 read requests before collecting responses, hiding per-request round-trip latency.
- Configurable I/O cap — standalone read/write ops default to 64 KB (safe for commodity NAS); raisable via `SPICEIO_SMB_MAX_IO` for compliant servers. Compound operations always cap at 64 KB.
- GetObject streams SMB read chunks directly to the HTTP response via `SpiceioBody::channel` — no full-file buffering.
- PutObject streams HTTP request body chunks directly to SMB write calls — no full-body collection.
- Body is collected into `Bytes` only for operations that require the full payload (multi-delete, multipart complete, upload-part for ETag hashing).
Expand Down
25 changes: 20 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use s3::multipart::MultipartStore;
use s3::router::AppState;
use smb::client::SmbConfig;
use smb::ops::ShareSession;
use smb::pool::SmbPool;

/// Runtime configuration parsed from environment variables.
struct Config {
Expand All @@ -42,6 +43,10 @@ struct Config {
bucket_name: String,
/// AWS region to advertise
region: String,
/// Number of SMB TCP connections in the pool (default 4)
smb_connections: usize,
/// Max I/O size for standalone read/write operations (default 1MB)
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field comment says the default max I/O is 1MB, but SPICEIO_SMB_MAX_IO defaults to 0 here (which the SMB client interprets as the internal default of 64KB). Update the comment to match the actual default behavior to avoid confusion.

Suggested change
/// Max I/O size for standalone read/write operations (default 1MB)
/// Max I/O size for standalone read/write operations (default 0, meaning the SMB client uses its internal default of 64KB)

Copilot uses AI. Check for mistakes.
smb_max_io: u32,
}

impl Config {
Expand All @@ -64,6 +69,14 @@ impl Config {
env::var("SPICEIO_SMB_SHARE").unwrap_or_else(|_| "data".into())
}),
region: env::var("SPICEIO_REGION").unwrap_or_else(|_| "us-east-1".into()),
smb_connections: env::var("SPICEIO_SMB_CONNECTIONS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(4),
smb_max_io: env::var("SPICEIO_SMB_MAX_IO")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(0),
}
}
}
Expand All @@ -80,28 +93,30 @@ async fn main() {
let config = Config::from_env();

slog!(
"[spiceio] connecting to smb://****@{}:{}/{}",
"[spiceio] connecting to smb://****@{}:{}/{} ({}x)",
config.smb_server,
config.smb_port,
config.smb_share
config.smb_share,
config.smb_connections,
);

// Connect to SMB server
// Connect SMB connection pool
let smb_config = SmbConfig {
server: config.smb_server.clone(),
port: config.smb_port,
username: config.smb_username.clone(),
password: config.smb_password.clone(),
domain: config.smb_domain.clone(),
workstation: "SPICEIO".into(),
max_io_size: config.smb_max_io,
};

let client = smb::SmbClient::connect(smb_config)
let pool = SmbPool::connect(smb_config, config.smb_connections)
.await
.expect("failed to connect to SMB server");

let share = Arc::new(
ShareSession::connect(client, &config.smb_share)
ShareSession::connect(pool, &config.smb_share)
.await
.expect("failed to connect to SMB share"),
);
Expand Down
31 changes: 19 additions & 12 deletions src/s3/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ async fn handle_get_object(
// ── Fast path: compound Create+Read+Close for small files ───────
// Tries to read the entire file in one SMB round trip. Falls back to
// streaming for large files or range requests.
let max_read = share.max_read_size();
let max_read = share.compound_max_read_size();
let no_range = range_header.is_none();

if no_range {
Expand Down Expand Up @@ -595,19 +595,26 @@ async fn handle_get_object(
let (body, tx) = SpiceioBody::channel(4);
let chunk_size = handle.max_chunk;

// Spawn background task to stream SMB reads into the channel
// Spawn background task to stream pipelined SMB reads into the channel.
// Sends batches of read requests to fill the network pipe, then pushes
// each chunk to the HTTP response body as it arrives.
tokio::spawn(async move {
let mut offset = start;
let stream_end = end + 1;
while offset < stream_end {
let to_read = ((stream_end - offset) as u32).min(chunk_size);
match handle.read_chunk(offset, to_read).await {
Ok(chunk) if chunk.is_empty() => break,
Ok(chunk) => {
offset += chunk.len() as u64;
if tx.send(chunk).await.is_err() {
crate::serr!("[spiceio] getobject client disconnected");
break;
'outer: while offset < stream_end {
let remaining = stream_end - offset;
match handle.read_pipeline(offset, chunk_size, remaining).await {
Comment on lines +604 to +606
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This loop can send more bytes than requested for HTTP Range requests: read_pipeline always requests chunk_size bytes per SMB read, even when remaining < chunk_size, and the returned chunk is forwarded as-is. For non-EOF ranges, SMB will happily return the full chunk_size, causing the response body to exceed Content-Length/Content-Range. Ensure the last request in a batch uses min(chunk_size, remaining) (and similarly for later requests in the batch) and/or truncate the final chunk(s) to remaining before sending.

Copilot uses AI. Check for mistakes.
Ok(chunks) if chunks.is_empty() => break,
Ok(chunks) => {
for chunk in chunks {
if chunk.is_empty() {
break 'outer;
}
offset += chunk.len() as u64;
if tx.send(chunk).await.is_err() {
crate::serr!("[spiceio] getobject client disconnected");
break 'outer;
}
}
}
Err(e) => {
Expand Down Expand Up @@ -671,7 +678,7 @@ async fn handle_put_object(
// ── Fast path: collect small bodies and use compound write ──────
let content_length: Option<u64> =
get_header(hdrs, "content-length").and_then(|s| s.parse().ok());
let max_write = share.max_write_size() as u64;
let max_write = share.compound_max_write_size() as u64;

if let Some(cl) = content_length
&& cl <= max_write
Expand Down
169 changes: 163 additions & 6 deletions src/smb/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub struct SmbConfig {
pub password: String,
pub domain: String,
pub workstation: String,
/// Cap for standalone read/write I/O (0 = use DEFAULT_MAX_IO).
pub max_io_size: u32,
}

impl SmbConfig {
Expand All @@ -30,14 +32,28 @@ impl SmbConfig {
}
}

/// Default I/O cap for standalone (non-compound) read/write operations.
/// Many NAS servers advertise multi-MB maximums in negotiate but fail at sizes
/// well below the advertised limit. 64 KB is the safe conservative default;
/// override via `SPICEIO_SMB_MAX_IO` for servers that handle larger I/O
/// (e.g., Windows Server, enterprise NAS). Even at 64 KB the connection pool
/// and pipelined reads still deliver major throughput gains.
const DEFAULT_MAX_IO: u32 = 65536;

/// An authenticated SMB2 session.
pub struct SmbClient {
stream: Mutex<TcpStream>,
message_id: AtomicU64,
session_id: u64,
config: SmbConfig,
/// Effective max read size for standalone (non-compound) reads.
pub max_read_size: u32,
/// Effective max write size for standalone (non-compound) writes.
pub max_write_size: u32,
/// Capped max for compound operations (64KB — some NAS servers reject
/// larger payloads inside compound requests).
pub compound_max_read_size: u32,
pub compound_max_write_size: u32,
/// 16-byte client GUID
client_guid: [u8; 16],
/// SMB 3.1.1 signing key (derived after auth)
Expand All @@ -60,6 +76,34 @@ impl SmbClient {
};
stream.set_nodelay(true)?;

// Enlarge socket buffers to 1 MB for large read/write throughput.
{
use std::os::fd::AsRawFd;

unsafe extern "C" {
fn setsockopt(
socket: i32,
level: i32,
option_name: i32,
option_value: *const u8,
option_len: u32,
) -> i32;
}

const SOL_SOCKET: i32 = 0xffff;
const SO_SNDBUF: i32 = 0x1001;
const SO_RCVBUF: i32 = 0x1002;

let fd = stream.as_raw_fd();
let buf_size: i32 = 1024 * 1024;
let ptr = std::ptr::from_ref(&buf_size).cast();
let len = size_of::<i32>() as u32;
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

size_of isn’t in scope here, so this won’t compile. Import it (e.g., use std::mem::size_of;) or fully-qualify the call (std::mem::size_of::<i32>()).

Suggested change
let len = size_of::<i32>() as u32;
let len = std::mem::size_of::<i32>() as u32;

Copilot uses AI. Check for mistakes.
unsafe {
setsockopt(fd, SOL_SOCKET, SO_SNDBUF, ptr, len);
setsockopt(fd, SOL_SOCKET, SO_RCVBUF, ptr, len);
}
}

let mut client_guid = [0u8; 16];
unsafe extern "C" {
fn arc4random_buf(buf: *mut u8, nbytes: usize);
Expand All @@ -76,6 +120,8 @@ impl SmbClient {
config,
max_read_size: 65536,
max_write_size: 65536,
compound_max_read_size: 65536,
compound_max_write_size: 65536,
client_guid,
signing_key: None,
};
Expand Down Expand Up @@ -176,10 +222,16 @@ impl SmbClient {
crate::serr!("[spiceio] smb invalid negotiate response");
io::Error::new(io::ErrorKind::InvalidData, "invalid negotiate response")
})?;
let io_cap = if self.config.max_io_size > 0 {
self.config.max_io_size
} else {
DEFAULT_MAX_IO
};
crate::slog!(
"[spiceio] negotiated SMB 0x{:04X}, max_rw={}K",
"[spiceio] negotiated SMB 0x{:04X}, server_max={}K io_cap={}K",
neg_resp.dialect_revision,
neg_resp.max_write_size.min(65536) / 1024,
neg_resp.max_read_size / 1024,
io_cap / 1024,
);

// ── Step 2: Session Setup (NTLM Negotiate) ──
Expand Down Expand Up @@ -246,10 +298,20 @@ impl SmbClient {
crate::slog!("[spiceio] authenticated, signing key derived");

self.session_id = resp_hdr.session_id;
// Cap at 64KB — the maximum reliably supported by common NAS servers.
// Some servers advertise larger max sizes but fail on compound requests.
self.max_read_size = neg_resp.max_read_size.min(65536);
self.max_write_size = neg_resp.max_write_size.min(65536);
// Cap standalone I/O by: min(server_advertised, max_transact, configured_cap).
// Many NAS servers advertise multi-MB limits but fail at much smaller sizes.
let transact = neg_resp.max_transact_size;
let io_cap = if self.config.max_io_size > 0 {
self.config.max_io_size
} else {
DEFAULT_MAX_IO
};
self.max_read_size = neg_resp.max_read_size.min(transact).min(io_cap);
self.max_write_size = neg_resp.max_write_size.min(transact).min(io_cap);
// Cap at 64KB for compound requests — some NAS servers reject larger
// payloads inside compound (chained) operations.
self.compound_max_read_size = self.max_read_size.min(65536);
self.compound_max_write_size = self.max_write_size.min(65536);
self.signing_key = Some(signing_key);
Ok(())
}
Expand Down Expand Up @@ -387,6 +449,101 @@ impl SmbClient {
})
}

/// Pipelined read: send `count` read requests, then receive all responses.
///
/// Holds the stream lock for the entire batch, eliminating per-request
/// round-trip latency. Returns chunks in offset order. Stops early on EOF.
pub async fn pipelined_read(
&self,
tree_id: u32,
file_id: &[u8; 16],
start_offset: u64,
chunk_size: u32,
count: usize,
) -> io::Result<Vec<bytes::Bytes>> {
if count == 0 {
return Ok(Vec::new());
}

// Build all request packets
let mut packets = Vec::with_capacity(count);
for i in 0..count {
let offset = start_offset + (i as u64) * (chunk_size as u64);
let msg_id = self.next_message_id();
let mut hdr = Header::new(Command::Read, msg_id).with_credit_charge(chunk_size);
hdr.session_id = self.session_id;
hdr.tree_id = tree_id;
let packet = build_request(&hdr, |buf| {
encode_read_request(buf, file_id, offset, chunk_size);
});
packets.push(packet);
}

let mut stream = self.stream.lock().await;

// Send all requests
for packet in &packets {
if let Some(ref key) = self.signing_key {
let mut signed = packet.to_vec();
sign_packet(&mut signed, key);
stream.write_all(&signed).await?;
} else {
stream.write_all(packet).await?;
}
}
stream.flush().await?;

// Receive all responses
let mut results = Vec::with_capacity(count);
for _ in 0..count {
loop {
let mut len_buf = [0u8; 4];
stream.read_exact(&mut len_buf).await?;
let msg_len = u32::from_be_bytes(len_buf) as usize;

if !(SMB2_HEADER_SIZE..=16 * 1024 * 1024).contains(&msg_len) {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("invalid SMB2 message length: {msg_len}"),
));
}

let mut msg = vec![0u8; msg_len];
stream.read_exact(&mut msg).await?;

let header = Header::decode(&msg).ok_or_else(|| {
io::Error::new(io::ErrorKind::InvalidData, "invalid SMB2 header")
})?;

// Skip STATUS_PENDING interim responses
if header.status == 0x0000_0103 {
continue;
}

let status = NtStatus::from_u32(header.status);
if status == NtStatus::EndOfFile {
// EOF — no more data, return what we have
return Ok(results);
}
if status.is_error() {
return Err(io::Error::other(format!(
"pipelined read failed: 0x{:08X}",
header.status
)));
}

let body = msg[SMB2_HEADER_SIZE..].to_vec();
let data = decode_read_response_owned(body).ok_or_else(|| {
io::Error::new(io::ErrorKind::InvalidData, "invalid read response")
})?;
results.push(data);
Comment on lines +535 to +539
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pipelined_read assumes responses arrive in the same order requests were sent (it just pushes each chunk). SMB2 responses can be returned out-of-order, so this can reorder/corrupt the byte stream (and can also prematurely stop if an EOF response for a later offset arrives early). Track message_id→request index/offset when sending and place each decoded chunk into the correct slot before returning (or otherwise reorder by offset).

Copilot uses AI. Check for mistakes.
break;
}
}

Ok(results)
}

/// Write to an open file.
pub async fn write(
&self,
Expand Down
2 changes: 2 additions & 0 deletions src/smb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
pub mod auth;
pub mod client;
pub mod ops;
pub mod pool;
pub mod protocol;

pub use client::SmbClient;
pub use pool::SmbPool;
Loading
Loading