Skip to content

Commit 7baf03b

Browse files
committed
test(sandbox): prove loopback proxy netns boundary
1 parent 083c066 commit 7baf03b

2 files changed

Lines changed: 282 additions & 21 deletions

File tree

crates/openshell-sandbox/src/proxy.rs

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,8 @@ pub struct ProxyHandle {
176176
pub struct LoopbackProxyHandle {
177177
http_addr: SocketAddr,
178178
shutdown: Option<tokio::sync::oneshot::Sender<()>>,
179-
thread: Option<std::thread::JoinHandle<()>>,
180-
host_join: TokioJoinHandle<()>,
179+
sandbox_acceptor_thread: Option<std::thread::JoinHandle<()>>,
180+
supervisor_dispatcher_join: TokioJoinHandle<()>,
181181
}
182182

183183
impl ProxyHandle {
@@ -323,10 +323,15 @@ fn spawn_proxy_connection(
323323

324324
#[cfg(target_os = "linux")]
325325
impl LoopbackProxyHandle {
326-
/// Start a managed loopback proxy listener inside the sandbox network
327-
/// namespace. The listener uses the same L7 proxy handler as the gateway
328-
/// proxy, preserving policy evaluation and credential rewrite behavior for
329-
/// clients that can only be configured with a loopback proxy URL.
326+
/// Start a managed loopback proxy for clients that can only be configured
327+
/// with a loopback proxy URL.
328+
///
329+
/// This is intentionally split across two network namespaces:
330+
/// - the sandbox-netns acceptor thread enters `setns()`, binds loopback,
331+
/// accepts client sockets, and sends accepted sockets out;
332+
/// - the supervisor dispatcher task stays in the supervisor namespace and
333+
/// invokes the normal proxy path for policy, DNS, SSRF checks, TLS/L7,
334+
/// WebSocket rewrite, credential resolution, and upstream connect.
330335
#[allow(clippy::too_many_arguments)]
331336
pub(crate) fn start_in_netns(
332337
netns_fd: RawFd,
@@ -350,7 +355,7 @@ impl LoopbackProxyHandle {
350355
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
351356
let (accepted_tx, accepted_rx) = mpsc::unbounded_channel();
352357

353-
let host_join = spawn_loopback_dispatch_task(
358+
let supervisor_dispatcher_join = spawn_loopback_supervisor_dispatcher_task(
354359
accepted_rx,
355360
opa_engine,
356361
identity_cache,
@@ -363,9 +368,9 @@ impl LoopbackProxyHandle {
363368
);
364369

365370
let thread = match std::thread::Builder::new()
366-
.name("openshell-loopback-proxy".to_string())
371+
.name("openshell-loopback-netns-acceptor".to_string())
367372
.spawn(move || {
368-
run_loopback_proxy_thread(
373+
run_loopback_sandbox_netns_acceptor_thread(
369374
netns_fd,
370375
listen_addr,
371376
ready_tx,
@@ -375,7 +380,7 @@ impl LoopbackProxyHandle {
375380
}) {
376381
Ok(thread) => thread,
377382
Err(err) => {
378-
host_join.abort();
383+
supervisor_dispatcher_join.abort();
379384
return Err(err).into_diagnostic();
380385
}
381386
};
@@ -387,11 +392,11 @@ impl LoopbackProxyHandle {
387392
Ok(Ok(addr)) => addr,
388393
Ok(Err(message)) => {
389394
let _ = thread.join();
390-
host_join.abort();
395+
supervisor_dispatcher_join.abort();
391396
return Err(miette::miette!("{message}"));
392397
}
393398
Err(std::sync::mpsc::RecvError) => {
394-
host_join.abort();
399+
supervisor_dispatcher_join.abort();
395400
return Err(miette::miette!(
396401
"Loopback proxy thread exited before startup on {listen_addr}"
397402
));
@@ -401,8 +406,8 @@ impl LoopbackProxyHandle {
401406
Ok(Self {
402407
http_addr,
403408
shutdown: Some(shutdown_tx),
404-
thread: Some(thread),
405-
host_join,
409+
sandbox_acceptor_thread: Some(thread),
410+
supervisor_dispatcher_join,
406411
})
407412
}
408413

@@ -413,7 +418,7 @@ impl LoopbackProxyHandle {
413418

414419
#[cfg(target_os = "linux")]
415420
#[allow(clippy::too_many_arguments)]
416-
fn spawn_loopback_dispatch_task(
421+
fn spawn_loopback_supervisor_dispatcher_task(
417422
mut accepted_rx: mpsc::UnboundedReceiver<std::net::TcpStream>,
418423
opa_engine: Arc<OpaEngine>,
419424
identity_cache: Arc<BinaryIdentityCache>,
@@ -426,9 +431,11 @@ fn spawn_loopback_dispatch_task(
426431
) -> TokioJoinHandle<()> {
427432
tokio::spawn(async move {
428433
// Detect the trusted host gateway from the supervisor context, matching
429-
// the primary gateway proxy. The loopback thread only accepts sandbox
430-
// sockets; policy, DNS, credential rewrite, and upstream dialing remain
431-
// on this host-side task.
434+
// the primary gateway proxy.
435+
//
436+
// Invariant: this task is the only half allowed to run policy, DNS,
437+
// SSRF checks, TLS/L7 handling, WebSocket rewrite, credential
438+
// resolution, or upstream dialing for managed loopback traffic.
432439
let trusted_host_gateway: Arc<Option<IpAddr>> = Arc::new(detect_trusted_host_gateway());
433440
while let Some(stream) = accepted_rx.recv().await {
434441
if let Err(err) = stream.set_nonblocking(true) {
@@ -479,7 +486,7 @@ fn spawn_loopback_dispatch_task(
479486

480487
#[cfg(target_os = "linux")]
481488
#[allow(clippy::too_many_arguments)]
482-
fn run_loopback_proxy_thread(
489+
fn run_loopback_sandbox_netns_acceptor_thread(
483490
netns_fd: RawFd,
484491
listen_addr: SocketAddr,
485492
ready_tx: std::sync::mpsc::Sender<std::result::Result<SocketAddr, String>>,
@@ -488,6 +495,10 @@ fn run_loopback_proxy_thread(
488495
) {
489496
let startup_failure_tx = ready_tx.clone();
490497
let result = (|| -> std::result::Result<(), String> {
498+
// Invariant: after setns(), this dedicated thread only binds loopback,
499+
// accepts sandbox client sockets, and hands those accepted sockets back
500+
// to the supervisor dispatcher. It must not perform DNS, policy checks,
501+
// credential rewrite, or upstream dialing from the sandbox netns.
491502
// SAFETY: setns is called on a dedicated OS thread that only serves
492503
// this listener and exits with the sandbox supervisor.
493504
#[allow(unsafe_code)]
@@ -608,10 +619,10 @@ impl Drop for LoopbackProxyHandle {
608619
if let Some(shutdown) = self.shutdown.take() {
609620
let _ = shutdown.send(());
610621
}
611-
if let Some(thread) = self.thread.take() {
622+
if let Some(thread) = self.sandbox_acceptor_thread.take() {
612623
let _ = thread.join();
613624
}
614-
self.host_join.abort();
625+
self.supervisor_dispatcher_join.abort();
615626
}
616627
}
617628

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! E2E proof that the managed loopback proxy accepts inside the sandbox
5+
//! network namespace but dispatches upstream dialing from the supervisor side.
6+
7+
#![cfg(feature = "e2e-host-gateway")]
8+
9+
use std::io::Write;
10+
11+
use openshell_e2e::harness::sandbox::SandboxGuard;
12+
use tempfile::NamedTempFile;
13+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
14+
use tokio::net::TcpListener;
15+
use tokio::task::JoinHandle;
16+
17+
const TEST_HOST: &str = "host.openshell.internal";
18+
19+
struct HostServer {
20+
port: u16,
21+
task: JoinHandle<()>,
22+
}
23+
24+
impl HostServer {
25+
async fn start() -> Result<Self, String> {
26+
let listener = TcpListener::bind(("0.0.0.0", 0))
27+
.await
28+
.map_err(|e| format!("bind host test server: {e}"))?;
29+
let port = listener
30+
.local_addr()
31+
.map_err(|e| format!("read host test server address: {e}"))?
32+
.port();
33+
let task = tokio::spawn(async move {
34+
loop {
35+
let Ok((mut stream, _)) = listener.accept().await else {
36+
break;
37+
};
38+
tokio::spawn(async move {
39+
let mut request = Vec::new();
40+
let mut buf = [0_u8; 1024];
41+
loop {
42+
let Ok(read) = stream.read(&mut buf).await else {
43+
return;
44+
};
45+
if read == 0 {
46+
return;
47+
}
48+
request.extend_from_slice(&buf[..read]);
49+
if request.windows(4).any(|window| window == b"\r\n\r\n") {
50+
break;
51+
}
52+
}
53+
54+
let body = br#"{"message":"loopback-supervisor-dispatch-ok"}"#;
55+
let response = format!(
56+
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
57+
body.len()
58+
);
59+
if stream.write_all(response.as_bytes()).await.is_err() {
60+
return;
61+
}
62+
let _ = stream.write_all(body).await;
63+
let _ = stream.shutdown().await;
64+
});
65+
}
66+
});
67+
68+
Ok(Self { port, task })
69+
}
70+
}
71+
72+
impl Drop for HostServer {
73+
fn drop(&mut self) {
74+
self.task.abort();
75+
}
76+
}
77+
78+
fn write_policy(port: u16) -> Result<NamedTempFile, String> {
79+
let mut file = NamedTempFile::new().map_err(|e| format!("create temp policy file: {e}"))?;
80+
let policy = format!(
81+
r#"version: 1
82+
83+
filesystem_policy:
84+
include_workdir: true
85+
read_only:
86+
- /usr
87+
- /lib
88+
- /proc
89+
- /dev/urandom
90+
- /app
91+
- /etc
92+
- /var/log
93+
read_write:
94+
- /sandbox
95+
- /tmp
96+
- /dev/null
97+
98+
landlock:
99+
compatibility: best_effort
100+
101+
process:
102+
run_as_user: sandbox
103+
run_as_group: sandbox
104+
105+
network_policies:
106+
loopback_proxy_netns:
107+
name: loopback_proxy_netns
108+
endpoints:
109+
- host: {TEST_HOST}
110+
port: {port}
111+
allowed_ips:
112+
- "10.0.0.0/8"
113+
- "172.0.0.0/8"
114+
- "192.168.0.0/16"
115+
- "fc00::/7"
116+
binaries:
117+
- path: /usr/bin/python*
118+
- path: /usr/local/bin/python*
119+
- path: /sandbox/.uv/python/*/bin/python*
120+
"#
121+
);
122+
file.write_all(policy.as_bytes())
123+
.map_err(|e| format!("write temp policy file: {e}"))?;
124+
file.flush()
125+
.map_err(|e| format!("flush temp policy file: {e}"))?;
126+
Ok(file)
127+
}
128+
129+
fn netns_boundary_script(port: u16) -> String {
130+
format!(
131+
r#"
132+
import json
133+
import os
134+
import socket
135+
import urllib.parse
136+
137+
HOST = {TEST_HOST:?}
138+
PORT = {port}
139+
140+
def recv_until(sock, marker):
141+
data = b""
142+
while marker not in data:
143+
chunk = sock.recv(4096)
144+
if not chunk:
145+
break
146+
data += chunk
147+
return data
148+
149+
def read_response(sock):
150+
response = recv_until(sock, b"\r\n\r\n")
151+
headers, _, body = response.partition(b"\r\n\r\n")
152+
content_length = 0
153+
for line in headers.split(b"\r\n")[1:]:
154+
if line.lower().startswith(b"content-length:"):
155+
content_length = int(line.split(b":", 1)[1].strip())
156+
break
157+
while len(body) < content_length:
158+
chunk = sock.recv(4096)
159+
if not chunk:
160+
break
161+
body += chunk
162+
return response.decode("iso-8859-1", "replace"), body.decode("utf-8", "replace")
163+
164+
def direct_connect_result():
165+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
166+
sock.settimeout(5)
167+
try:
168+
sock.connect((HOST, PORT))
169+
sock.sendall(f"GET /direct HTTP/1.1\r\nHost: {{HOST}}:{{PORT}}\r\nConnection: close\r\n\r\n".encode("ascii"))
170+
response, body = read_response(sock)
171+
return {{"result": "connected", "response": response.splitlines()[0] if response else "", "body": body}}
172+
except ConnectionRefusedError as error:
173+
return {{"result": "refused", "error": str(error)}}
174+
except socket.timeout as error:
175+
return {{"result": "timeout", "error": str(error)}}
176+
except OSError as error:
177+
return {{"result": "error", "errno": error.errno, "error": str(error)}}
178+
finally:
179+
sock.close()
180+
181+
def loopback_connect_result():
182+
proxy_url = os.environ.get("OPENSHELL_LOOPBACK_PROXY_URL")
183+
if not proxy_url:
184+
return {{"result": "missing_proxy_url"}}
185+
parsed = urllib.parse.urlparse(proxy_url)
186+
if parsed.hostname not in ("127.0.0.1", "localhost", "::1"):
187+
return {{"result": "non_loopback_proxy_url", "proxy_url": proxy_url}}
188+
189+
target = f"{{HOST}}:{{PORT}}"
190+
with socket.create_connection((parsed.hostname, parsed.port or 80), timeout=10) as sock:
191+
sock.sendall(f"CONNECT {{target}} HTTP/1.1\r\nHost: {{target}}\r\n\r\n".encode("ascii"))
192+
connect_response = recv_until(sock, b"\r\n\r\n").decode("iso-8859-1", "replace")
193+
if not (connect_response.startswith("HTTP/1.1 200") or connect_response.startswith("HTTP/1.0 200")):
194+
return {{"result": "connect_failed", "response": connect_response.splitlines()[0] if connect_response else ""}}
195+
sock.sendall(f"GET /proxied HTTP/1.1\r\nHost: {{target}}\r\nConnection: close\r\n\r\n".encode("ascii"))
196+
response, body = read_response(sock)
197+
return {{"result": "ok", "response": response.splitlines()[0] if response else "", "body": body}}
198+
199+
print(json.dumps({{
200+
"direct": direct_connect_result(),
201+
"loopback": loopback_connect_result(),
202+
}}, sort_keys=True), flush=True)
203+
"#
204+
)
205+
}
206+
207+
#[tokio::test]
208+
async fn loopback_proxy_connect_uses_supervisor_namespace_for_upstream_dial() {
209+
let server = HostServer::start().await.expect("start host test server");
210+
let policy = write_policy(server.port).expect("write custom policy");
211+
let policy_path = policy
212+
.path()
213+
.to_str()
214+
.expect("temp policy path should be utf-8")
215+
.to_string();
216+
let script = netns_boundary_script(server.port);
217+
218+
let guard = SandboxGuard::create(&["--policy", &policy_path, "--", "python3", "-c", &script])
219+
.await
220+
.expect("sandbox create");
221+
222+
let output = guard
223+
.create_output
224+
.lines()
225+
.find(|line| line.contains("\"direct\"") && line.contains("\"loopback\""))
226+
.unwrap_or_else(|| {
227+
panic!(
228+
"expected netns boundary JSON in output:\n{}",
229+
guard.create_output
230+
)
231+
});
232+
let parsed: serde_json::Value = serde_json::from_str(output.trim())
233+
.unwrap_or_else(|err| panic!("failed to parse JSON '{output}': {err}"));
234+
235+
assert_eq!(
236+
parsed["direct"]["result"], "refused",
237+
"expected direct sandbox egress to be rejected before reaching host server:\n{}",
238+
guard.create_output
239+
);
240+
assert_eq!(
241+
parsed["loopback"]["result"], "ok",
242+
"expected CONNECT through OPENSHELL_LOOPBACK_PROXY_URL to reach host server:\n{}",
243+
guard.create_output
244+
);
245+
assert_eq!(
246+
parsed["loopback"]["body"], r#"{"message":"loopback-supervisor-dispatch-ok"}"#,
247+
"expected loopback proxy path to receive host server response:\n{}",
248+
guard.create_output
249+
);
250+
}

0 commit comments

Comments
 (0)