Commit 13a48f0

Add SMB connection pool, pipelined reads, and TCP socket tuning (#11)
Eliminate the single-TCP-connection serialization bottleneck by pooling N authenticated SMB sessions (default 4, configurable via SPICEIO_SMB_CONNECTIONS). Concurrent S3 requests now fan out across connections via round-robin instead of queuing behind one mutex.

Streaming GetObject sends batches of 8 read requests before collecting responses (pipelined reads), hiding per-request round-trip latency.

Additional changes:
- Enlarge SO_SNDBUF/SO_RCVBUF to 1 MB on SMB TCP sockets
- Add configurable standalone I/O cap via SPICEIO_SMB_MAX_IO (default 64 KB; raisable for servers that handle larger I/O)
- Parse MaxTransactSize from negotiate response
- Separate compound-safe sizes (64 KB) from standalone I/O sizes
- New smb::pool module with SmbPool round-robin dispatcher
- ShareSession backed by pool; FileHandle pinned to opening connection
1 parent 24275b3 commit 13a48f0

8 files changed

Lines changed: 453 additions & 129 deletions

CLAUDE.md

Lines changed: 6 additions & 1 deletion
@@ -34,6 +34,8 @@ The binary requires these environment variables:
 - `SPICEIO_SMB_DOMAIN` — SMB domain (default empty)
 - `SPICEIO_BUCKET` — virtual S3 bucket name (defaults to `SPICEIO_SMB_SHARE`)
 - `SPICEIO_REGION` — AWS region to advertise (default `us-east-1`)
+- `SPICEIO_SMB_CONNECTIONS` — number of SMB TCP connections in the pool (default `4`)
+- `SPICEIO_SMB_MAX_IO` — max standalone read/write I/O size in bytes (default `65536`; raise for servers that handle larger I/O)
 - `SPICEIO_LOG_FILE` — append logs to this file in addition to stderr (optional; non-blocking, never stalls the proxy)

 ## Architecture
@@ -42,7 +44,7 @@ The codebase has three modules:

 - **`s3`** — HTTP layer. Parses incoming S3 API requests and produces XML responses. `router.rs` is the central dispatch (path-style bucket routing). Covers GetObject, PutObject, CopyObject, DeleteObject, HeadObject, ListObjectsV1/V2, multipart uploads, and stub endpoints for ACL/tagging/versioning. `xml.rs` is a hand-rolled XML builder. `multipart.rs` manages upload state in-memory, with parts stored as temp files under `.spiceio-uploads/` on the SMB share. `body.rs` implements `SpiceioBody`, a zero-copy streaming response body (channel-backed for large reads, inline for XML/errors).

-- **`smb`** — Wire protocol client. `protocol.rs` defines SMB 3.1.x packet structures (little-endian). `client.rs` manages the TCP connection, negotiate/session-setup handshake, and exposes operations (tree connect, create, read, write, close, query directory). `auth.rs` implements NTLMv2 challenge-response. `ops.rs` provides the high-level `ShareSession` abstraction the S3 layer consumes (list, read, write, delete, stat, copy).
+- **`smb`** — Wire protocol client. `protocol.rs` defines SMB 3.1.x packet structures (little-endian). `client.rs` manages a TCP connection, negotiate/session-setup handshake, and exposes operations (tree connect, create, read, write, close, query directory, pipelined read). `pool.rs` manages N authenticated connections for concurrent request fan-out. `auth.rs` implements NTLMv2 challenge-response. `ops.rs` provides the high-level `ShareSession` abstraction the S3 layer consumes (list, read, write, delete, stat, copy).

 - **`crypto`** — FFI bindings to macOS CommonCrypto (`Security.framework`/`libcommonCrypto`). Exposes MD4, SHA-256, and HMAC-MD5. No Rust crypto crates.

@@ -52,6 +54,9 @@ The codebase has three modules:

 - Zero external crypto dependencies — all crypto goes through `crypto::ffi` to CommonCrypto.
 - No `async-trait` — the SMB client uses `tokio::sync::Mutex` around the TCP stream with manual `async` methods.
+- Connection pool — N TCP connections (default 4) to the same SMB server, round-robin dispatched. Concurrent S3 requests fan out across connections instead of serializing on a single mutex. File handles are pinned to the connection that opened them.
+- Pipelined reads — streaming GetObject sends batches of 8 read requests before collecting responses, hiding per-request round-trip latency.
+- Configurable I/O cap — standalone read/write ops default to 64 KB (safe for commodity NAS); raisable via `SPICEIO_SMB_MAX_IO` for compliant servers. Compound operations always cap at 64 KB.
 - GetObject streams SMB read chunks directly to the HTTP response via `SpiceioBody::channel` — no full-file buffering.
 - PutObject streams HTTP request body chunks directly to SMB write calls — no full-body collection.
 - Body is collected into `Bytes` only for operations that require the full payload (multi-delete, multipart complete, upload-part for ETag hashing).
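
The connection-pool bullet above can be illustrated with a minimal sketch. The diff does not include `pool.rs`, so the `RoundRobin` type and its `pick` method are hypothetical names, not the actual `SmbPool` API. The sketch assumes an atomic counter taken modulo the pool size, which is one common way to implement round-robin dispatch.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for a pool of N authenticated connections.
/// `T` could be an `Arc<SmbClient>` in a real pool; any type works here.
pub struct RoundRobin<T> {
    items: Vec<T>,
    next: AtomicUsize,
}

impl<T> RoundRobin<T> {
    /// Returns `None` for an empty pool so `pick` never divides by zero.
    pub fn new(items: Vec<T>) -> Option<Self> {
        if items.is_empty() {
            return None;
        }
        Some(Self { items, next: AtomicUsize::new(0) })
    }

    /// Hand out connections in rotation. `fetch_add` wraps on overflow,
    /// so the computed index is always in range and no lock is needed.
    pub fn pick(&self) -> &T {
        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.items.len();
        &self.items[i]
    }
}
```

Under this scheme a file handle would store the connection returned by `pick` when the file is opened and send every later read, write, and close through that same connection, which is the pinning rule the bullet describes.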

src/main.rs

Lines changed: 20 additions & 5 deletions
@@ -21,6 +21,7 @@ use s3::multipart::MultipartStore;
 use s3::router::AppState;
 use smb::client::SmbConfig;
 use smb::ops::ShareSession;
+use smb::pool::SmbPool;

 /// Runtime configuration parsed from environment variables.
 struct Config {
@@ -42,6 +43,10 @@ struct Config {
     bucket_name: String,
     /// AWS region to advertise
     region: String,
+    /// Number of SMB TCP connections in the pool (default 4)
+    smb_connections: usize,
+    /// Max I/O size for standalone read/write operations (0 = built-in default of 64 KB)
+    smb_max_io: u32,
 }

 impl Config {
@@ -64,6 +69,14 @@ impl Config {
                 env::var("SPICEIO_SMB_SHARE").unwrap_or_else(|_| "data".into())
             }),
             region: env::var("SPICEIO_REGION").unwrap_or_else(|_| "us-east-1".into()),
+            smb_connections: env::var("SPICEIO_SMB_CONNECTIONS")
+                .ok()
+                .and_then(|s| s.parse().ok())
+                .unwrap_or(4),
+            smb_max_io: env::var("SPICEIO_SMB_MAX_IO")
+                .ok()
+                .and_then(|s| s.parse().ok())
+                .unwrap_or(0),
         }
     }
 }
@@ -80,28 +93,30 @@ async fn main() {
     let config = Config::from_env();

     slog!(
-        "[spiceio] connecting to smb://****@{}:{}/{}",
+        "[spiceio] connecting to smb://****@{}:{}/{} ({}x)",
         config.smb_server,
         config.smb_port,
-        config.smb_share
+        config.smb_share,
+        config.smb_connections,
     );

-    // Connect to SMB server
+    // Connect SMB connection pool
     let smb_config = SmbConfig {
         server: config.smb_server.clone(),
         port: config.smb_port,
         username: config.smb_username.clone(),
         password: config.smb_password.clone(),
         domain: config.smb_domain.clone(),
         workstation: "SPICEIO".into(),
+        max_io_size: config.smb_max_io,
     };

-    let client = smb::SmbClient::connect(smb_config)
+    let pool = SmbPool::connect(smb_config, config.smb_connections)
         .await
         .expect("failed to connect to SMB server");

     let share = Arc::new(
-        ShareSession::connect(client, &config.smb_share)
+        ShareSession::connect(pool, &config.smb_share)
             .await
             .expect("failed to connect to SMB share"),
     );
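
The two new settings are read with the same `ok().and_then(parse).unwrap_or(default)` chain. A minimal sketch of that pattern as a reusable helper follows. `parse_or` and `pool_size` are hypothetical names, and the clamp to at least one connection is an assumption added for illustration; the diff shows no such check.

```rust
use std::str::FromStr;

/// Parse an optional raw value, falling back to `default` when the value
/// is missing or does not parse.
pub fn parse_or<T: FromStr>(raw: Option<&str>, default: T) -> T {
    raw.and_then(|s| s.parse().ok()).unwrap_or(default)
}

/// Hypothetical guard: a pool with zero connections cannot serve requests,
/// so clamp the parsed value to at least one. Default is 4, as in the diff.
pub fn pool_size(raw: Option<&str>) -> usize {
    parse_or(raw, 4usize).max(1)
}
```

A caller could pass the environment value directly, for example `pool_size(std::env::var("SPICEIO_SMB_CONNECTIONS").ok().as_deref())`. Taking `Option<&str>` instead of reading the environment inside the helper keeps it easy to test.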

src/s3/router.rs

Lines changed: 19 additions & 12 deletions
@@ -427,7 +427,7 @@ async fn handle_get_object(
     // ── Fast path: compound Create+Read+Close for small files ───────
     // Tries to read the entire file in one SMB round trip. Falls back to
     // streaming for large files or range requests.
-    let max_read = share.max_read_size();
+    let max_read = share.compound_max_read_size();
     let no_range = range_header.is_none();

     if no_range {
@@ -595,19 +595,26 @@ async fn handle_get_object(
     let (body, tx) = SpiceioBody::channel(4);
     let chunk_size = handle.max_chunk;

-    // Spawn background task to stream SMB reads into the channel
+    // Spawn background task to stream pipelined SMB reads into the channel.
+    // Sends batches of read requests to fill the network pipe, then pushes
+    // each chunk to the HTTP response body as it arrives.
     tokio::spawn(async move {
         let mut offset = start;
         let stream_end = end + 1;
-        while offset < stream_end {
-            let to_read = ((stream_end - offset) as u32).min(chunk_size);
-            match handle.read_chunk(offset, to_read).await {
-                Ok(chunk) if chunk.is_empty() => break,
-                Ok(chunk) => {
-                    offset += chunk.len() as u64;
-                    if tx.send(chunk).await.is_err() {
-                        crate::serr!("[spiceio] getobject client disconnected");
-                        break;
+        'outer: while offset < stream_end {
+            let remaining = stream_end - offset;
+            match handle.read_pipeline(offset, chunk_size, remaining).await {
+                Ok(chunks) if chunks.is_empty() => break,
+                Ok(chunks) => {
+                    for chunk in chunks {
+                        if chunk.is_empty() {
+                            break 'outer;
+                        }
+                        offset += chunk.len() as u64;
+                        if tx.send(chunk).await.is_err() {
+                            crate::serr!("[spiceio] getobject client disconnected");
+                            break 'outer;
+                        }
                     }
                 }
                 Err(e) => {
@@ -671,7 +678,7 @@ async fn handle_put_object(
     // ── Fast path: collect small bodies and use compound write ──────
     let content_length: Option<u64> =
         get_header(hdrs, "content-length").and_then(|s| s.parse().ok());
-    let max_write = share.compound_max_write_size() as u64;
+    let max_write = share.compound_max_write_size() as u64;

     if let Some(cl) = content_length
         && cl <= max_write
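
The streaming loop above hands `read_pipeline` an offset, a chunk size, and the bytes remaining, but `ops.rs` is not part of the visible diff. The sketch below shows one way such a call might split the range into a batch of at most 8 requests. `plan_batch` and `PIPELINE_DEPTH` are hypothetical names, and trimming the final request to the remaining byte count is an assumption; `pipelined_read` in `client.rs` issues equal-sized requests instead.

```rust
/// Number of reads kept in flight per batch, per the commit message.
pub const PIPELINE_DEPTH: usize = 8;

/// Plan one batch: up to `PIPELINE_DEPTH` `(offset, length)` pairs covering
/// at most `remaining` bytes starting at `offset`.
pub fn plan_batch(offset: u64, chunk_size: u32, remaining: u64) -> Vec<(u64, u32)> {
    let mut plan = Vec::new();
    if chunk_size == 0 {
        return plan; // a zero chunk size would never make progress
    }
    let mut pos = offset;
    let mut left = remaining;
    while left > 0 && plan.len() < PIPELINE_DEPTH {
        // The last request may be shorter than a full chunk.
        let len = left.min(u64::from(chunk_size)) as u32;
        plan.push((pos, len));
        pos += u64::from(len);
        left -= u64::from(len);
    }
    plan
}
```

With 64 KB chunks a 200,000-byte range becomes four requests, the last one covering 3,392 bytes, while any range of 512 KB or more fills the whole batch of 8 and the outer loop asks for the next batch.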

src/smb/client.rs

Lines changed: 163 additions & 6 deletions
@@ -22,6 +22,8 @@ pub struct SmbConfig {
     pub password: String,
     pub domain: String,
     pub workstation: String,
+    /// Cap for standalone read/write I/O (0 = use DEFAULT_MAX_IO).
+    pub max_io_size: u32,
 }

 impl SmbConfig {
@@ -30,14 +32,28 @@ impl SmbConfig {
     }
 }

+/// Default I/O cap for standalone (non-compound) read/write operations.
+/// Many NAS servers advertise multi-MB maximums in negotiate but fail at sizes
+/// well below the advertised limit. 64 KB is the safe conservative default;
+/// override via `SPICEIO_SMB_MAX_IO` for servers that handle larger I/O
+/// (e.g., Windows Server, enterprise NAS). Even at 64 KB the connection pool
+/// and pipelined reads still deliver major throughput gains.
+const DEFAULT_MAX_IO: u32 = 65536;
+
 /// An authenticated SMB2 session.
 pub struct SmbClient {
     stream: Mutex<TcpStream>,
     message_id: AtomicU64,
     session_id: u64,
     config: SmbConfig,
+    /// Effective max read size for standalone (non-compound) reads.
     pub max_read_size: u32,
+    /// Effective max write size for standalone (non-compound) writes.
     pub max_write_size: u32,
+    /// Capped max for compound operations (64KB — some NAS servers reject
+    /// larger payloads inside compound requests).
+    pub compound_max_read_size: u32,
+    pub compound_max_write_size: u32,
     /// 16-byte client GUID
     client_guid: [u8; 16],
     /// SMB 3.1.1 signing key (derived after auth)
@@ -60,6 +76,34 @@ impl SmbClient {
         };
         stream.set_nodelay(true)?;

+        // Enlarge socket buffers to 1 MB for large read/write throughput.
+        {
+            use std::os::fd::AsRawFd;
+
+            unsafe extern "C" {
+                fn setsockopt(
+                    socket: i32,
+                    level: i32,
+                    option_name: i32,
+                    option_value: *const u8,
+                    option_len: u32,
+                ) -> i32;
+            }
+
+            const SOL_SOCKET: i32 = 0xffff;
+            const SO_SNDBUF: i32 = 0x1001;
+            const SO_RCVBUF: i32 = 0x1002;
+
+            let fd = stream.as_raw_fd();
+            let buf_size: i32 = 1024 * 1024;
+            let ptr = std::ptr::from_ref(&buf_size).cast();
+            let len = size_of::<i32>() as u32;
+            unsafe {
+                setsockopt(fd, SOL_SOCKET, SO_SNDBUF, ptr, len);
+                setsockopt(fd, SOL_SOCKET, SO_RCVBUF, ptr, len);
+            }
+        }
+
         let mut client_guid = [0u8; 16];
         unsafe extern "C" {
             fn arc4random_buf(buf: *mut u8, nbytes: usize);
@@ -76,6 +120,8 @@ impl SmbClient {
             config,
             max_read_size: 65536,
             max_write_size: 65536,
+            compound_max_read_size: 65536,
+            compound_max_write_size: 65536,
             client_guid,
             signing_key: None,
         };
@@ -176,10 +222,16 @@ impl SmbClient {
                 crate::serr!("[spiceio] smb invalid negotiate response");
                 io::Error::new(io::ErrorKind::InvalidData, "invalid negotiate response")
             })?;
+        let io_cap = if self.config.max_io_size > 0 {
+            self.config.max_io_size
+        } else {
+            DEFAULT_MAX_IO
+        };
         crate::slog!(
-            "[spiceio] negotiated SMB 0x{:04X}, max_rw={}K",
+            "[spiceio] negotiated SMB 0x{:04X}, server_max={}K io_cap={}K",
             neg_resp.dialect_revision,
-            neg_resp.max_write_size.min(65536) / 1024,
+            neg_resp.max_read_size / 1024,
+            io_cap / 1024,
         );

         // ── Step 2: Session Setup (NTLM Negotiate) ──
@@ -246,10 +298,20 @@ impl SmbClient {
         crate::slog!("[spiceio] authenticated, signing key derived");

         self.session_id = resp_hdr.session_id;
-        // Cap at 64KB — the maximum reliably supported by common NAS servers.
-        // Some servers advertise larger max sizes but fail on compound requests.
-        self.max_read_size = neg_resp.max_read_size.min(65536);
-        self.max_write_size = neg_resp.max_write_size.min(65536);
+        // Cap standalone I/O by: min(server_advertised, max_transact, configured_cap).
+        // Many NAS servers advertise multi-MB limits but fail at much smaller sizes.
+        let transact = neg_resp.max_transact_size;
+        let io_cap = if self.config.max_io_size > 0 {
+            self.config.max_io_size
+        } else {
+            DEFAULT_MAX_IO
+        };
+        self.max_read_size = neg_resp.max_read_size.min(transact).min(io_cap);
+        self.max_write_size = neg_resp.max_write_size.min(transact).min(io_cap);
+        // Cap at 64KB for compound requests — some NAS servers reject larger
+        // payloads inside compound (chained) operations.
+        self.compound_max_read_size = self.max_read_size.min(65536);
+        self.compound_max_write_size = self.max_write_size.min(65536);
         self.signing_key = Some(signing_key);
         Ok(())
     }
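
The cap logic in the hunk above is a chain of `min` calls, and it can be checked in isolation as a pure function. The sketch below restates it under that assumption. `IoLimits`, `effective_limits`, and `COMPOUND_CAP` are hypothetical names introduced for illustration; the constants and the `0 = use default` rule are taken from the diff.

```rust
/// Values mirrored from the diff: both caps are 64 KB.
pub const DEFAULT_MAX_IO: u32 = 65536;
pub const COMPOUND_CAP: u32 = 65536;

#[derive(Debug, PartialEq, Eq)]
pub struct IoLimits {
    pub read: u32,
    pub write: u32,
    pub compound_read: u32,
    pub compound_write: u32,
}

/// `configured == 0` selects `DEFAULT_MAX_IO`, as with `SmbConfig::max_io_size`.
pub fn effective_limits(
    server_read: u32,
    server_write: u32,
    max_transact: u32,
    configured: u32,
) -> IoLimits {
    let io_cap = if configured > 0 { configured } else { DEFAULT_MAX_IO };
    // Standalone I/O: min(server advertised, MaxTransactSize, configured cap).
    let read = server_read.min(max_transact).min(io_cap);
    let write = server_write.min(max_transact).min(io_cap);
    IoLimits {
        read,
        write,
        // Compound requests never exceed 64 KB, whatever the standalone cap is.
        compound_read: read.min(COMPOUND_CAP),
        compound_write: write.min(COMPOUND_CAP),
    }
}
```

For a server advertising 8 MB reads with a 1 MB `MaxTransactSize`, the default configuration yields 64 KB everywhere. Setting `SPICEIO_SMB_MAX_IO` to 4 MB raises standalone I/O to 1 MB, because the transact size becomes the binding limit, while compound sizes stay at 64 KB.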
@@ -387,6 +449,101 @@ impl SmbClient {
         })
     }

+    /// Pipelined read: send `count` read requests, then receive all responses.
+    ///
+    /// Holds the stream lock for the entire batch, eliminating per-request
+    /// round-trip latency. Returns chunks in offset order. Stops early on EOF.
+    pub async fn pipelined_read(
+        &self,
+        tree_id: u32,
+        file_id: &[u8; 16],
+        start_offset: u64,
+        chunk_size: u32,
+        count: usize,
+    ) -> io::Result<Vec<bytes::Bytes>> {
+        if count == 0 {
+            return Ok(Vec::new());
+        }
+
+        // Build all request packets
+        let mut packets = Vec::with_capacity(count);
+        for i in 0..count {
+            let offset = start_offset + (i as u64) * (chunk_size as u64);
+            let msg_id = self.next_message_id();
+            let mut hdr = Header::new(Command::Read, msg_id).with_credit_charge(chunk_size);
+            hdr.session_id = self.session_id;
+            hdr.tree_id = tree_id;
+            let packet = build_request(&hdr, |buf| {
+                encode_read_request(buf, file_id, offset, chunk_size);
+            });
+            packets.push(packet);
+        }
+
+        let mut stream = self.stream.lock().await;
+
+        // Send all requests
+        for packet in &packets {
+            if let Some(ref key) = self.signing_key {
+                let mut signed = packet.to_vec();
+                sign_packet(&mut signed, key);
+                stream.write_all(&signed).await?;
+            } else {
+                stream.write_all(packet).await?;
+            }
+        }
+        stream.flush().await?;
+
+        // Receive all responses
+        let mut results = Vec::with_capacity(count);
+        for _ in 0..count {
+            loop {
+                let mut len_buf = [0u8; 4];
+                stream.read_exact(&mut len_buf).await?;
+                let msg_len = u32::from_be_bytes(len_buf) as usize;
+
+                if !(SMB2_HEADER_SIZE..=16 * 1024 * 1024).contains(&msg_len) {
+                    return Err(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        format!("invalid SMB2 message length: {msg_len}"),
+                    ));
+                }
+
+                let mut msg = vec![0u8; msg_len];
+                stream.read_exact(&mut msg).await?;
+
+                let header = Header::decode(&msg).ok_or_else(|| {
+                    io::Error::new(io::ErrorKind::InvalidData, "invalid SMB2 header")
+                })?;
+
+                // Skip STATUS_PENDING interim responses
+                if header.status == 0x0000_0103 {
+                    continue;
+                }
+
+                let status = NtStatus::from_u32(header.status);
+                if status == NtStatus::EndOfFile {
+                    // EOF — no more data, return what we have
+                    return Ok(results);
+                }
+                if status.is_error() {
+                    return Err(io::Error::other(format!(
+                        "pipelined read failed: 0x{:08X}",
+                        header.status
+                    )));
+                }
+
+                let body = msg[SMB2_HEADER_SIZE..].to_vec();
+                let data = decode_read_response_owned(body).ok_or_else(|| {
+                    io::Error::new(io::ErrorKind::InvalidData, "invalid read response")
+                })?;
+                results.push(data);
+                break;
+            }
+        }
+
+        Ok(results)
+    }
+
     /// Write to an open file.
     pub async fn write(
         &self,
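
The receive loop in `pipelined_read` reads each response as a 4-byte big-endian length followed by the message, and rejects lengths outside the range from the SMB2 header size up to 16 MiB. A minimal synchronous sketch of that framing step follows. It uses `std::io::Read` rather than the async stream in the diff, and `read_frame` and `MAX_MESSAGE` are hypothetical names.

```rust
use std::io::{self, Read};

/// Size of the fixed SMB2 header in bytes.
pub const SMB2_HEADER_SIZE: usize = 64;
/// Upper bound on one message, matching the 16 MiB check in the diff.
pub const MAX_MESSAGE: usize = 16 * 1024 * 1024;

/// Read one length-prefixed message: a 4-byte big-endian length, then
/// exactly that many payload bytes.
pub fn read_frame<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    reader.read_exact(&mut len_buf)?;
    let msg_len = u32::from_be_bytes(len_buf) as usize;

    // Reject frames too short to hold a header or implausibly large.
    if !(SMB2_HEADER_SIZE..=MAX_MESSAGE).contains(&msg_len) {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("invalid SMB2 message length: {msg_len}"),
        ));
    }

    let mut msg = vec![0u8; msg_len];
    reader.read_exact(&mut msg)?;
    Ok(msg)
}
```

When several requests are in flight, each one produces a response frame on the socket. If a batch returns early on EOF or on an error, any frames still pending may be picked up by the next operation on that connection, so a drain step may be worth considering. Whether that applies here depends on parts of `client.rs` that the diff does not show.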

src/smb/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -5,6 +5,8 @@
 pub mod auth;
 pub mod client;
 pub mod ops;
+pub mod pool;
 pub mod protocol;

 pub use client::SmbClient;
+pub use pool::SmbPool;
