@@ -23,7 +23,6 @@ use std::os::unix::io::RawFd;
2323use std:: path:: PathBuf ;
2424use std:: sync:: Arc ;
2525use std:: sync:: atomic:: { AtomicU32 , Ordering } ;
26- use std:: time:: Duration as StdDuration ;
2726use tokio:: io:: {
2827 AsyncRead as TokioAsyncRead , AsyncReadExt , AsyncWrite as TokioAsyncWrite , AsyncWriteExt ,
2928} ;
@@ -67,10 +66,10 @@ const MAX_STREAMING_BODY: usize = 1024;
6766/// between "thinking" and output phases. 120s provides headroom while still
6867/// catching genuinely stuck streams.
6968#[ cfg( not( test) ) ]
70- const CHUNK_IDLE_TIMEOUT : StdDuration = StdDuration :: from_secs ( 120 ) ;
69+ const CHUNK_IDLE_TIMEOUT : std :: time :: Duration = std :: time :: Duration :: from_secs ( 120 ) ;
7170// Exercise idle-timeout truncation without slowing the full package test suite.
7271#[ cfg( test) ]
73- const CHUNK_IDLE_TIMEOUT : StdDuration = StdDuration :: from_millis ( 100 ) ;
72+ const CHUNK_IDLE_TIMEOUT : std :: time :: Duration = std :: time :: Duration :: from_millis ( 100 ) ;
7473
7574/// Result of a proxy CONNECT policy decision.
7675struct ConnectDecision {
@@ -178,6 +177,7 @@ pub struct LoopbackProxyHandle {
178177 http_addr : SocketAddr ,
179178 shutdown : Option < tokio:: sync:: oneshot:: Sender < ( ) > > ,
180179 thread : Option < std:: thread:: JoinHandle < ( ) > > ,
180+ host_join : TokioJoinHandle < ( ) > ,
181181}
182182
183183impl ProxyHandle {
@@ -348,39 +348,50 @@ impl LoopbackProxyHandle {
348348
349349 let ( ready_tx, ready_rx) = std:: sync:: mpsc:: channel ( ) ;
350350 let ( shutdown_tx, shutdown_rx) = tokio:: sync:: oneshot:: channel ( ) ;
351+ let ( accepted_tx, accepted_rx) = mpsc:: unbounded_channel ( ) ;
351352
352- let thread = std:: thread:: Builder :: new ( )
353+ let host_join = spawn_loopback_dispatch_task (
354+ accepted_rx,
355+ opa_engine,
356+ identity_cache,
357+ entrypoint_pid,
358+ tls_state,
359+ inference_ctx,
360+ provider_credentials,
361+ policy_local_ctx,
362+ denial_tx,
363+ ) ;
364+
365+ let thread = match std:: thread:: Builder :: new ( )
353366 . name ( "openshell-loopback-proxy" . to_string ( ) )
354367 . spawn ( move || {
355368 run_loopback_proxy_thread (
356369 netns_fd,
357370 listen_addr,
358371 ready_tx,
359372 shutdown_rx,
360- opa_engine,
361- identity_cache,
362- entrypoint_pid,
363- tls_state,
364- inference_ctx,
365- provider_credentials,
366- policy_local_ctx,
367- denial_tx,
373+ accepted_tx,
368374 ) ;
369- } )
370- . into_diagnostic ( ) ?;
375+ } ) {
376+ Ok ( thread) => thread,
377+ Err ( err) => {
378+ host_join. abort ( ) ;
379+ return Err ( err) . into_diagnostic ( ) ;
380+ }
381+ } ;
371382
372- let http_addr = match ready_rx. recv_timeout ( StdDuration :: from_secs ( 5 ) ) {
383+ // Wait for an explicit ready/error signal instead of timing out. The
384+ // thread borrows the sandbox netns fd for setns(), so returning early
385+ // could orphan a thread that later observes a closed or reused fd.
386+ let http_addr = match ready_rx. recv ( ) {
373387 Ok ( Ok ( addr) ) => addr,
374388 Ok ( Err ( message) ) => {
375389 let _ = thread. join ( ) ;
390+ host_join. abort ( ) ;
376391 return Err ( miette:: miette!( "{message}" ) ) ;
377392 }
378- Err ( std:: sync:: mpsc:: RecvTimeoutError :: Timeout ) => {
379- return Err ( miette:: miette!(
380- "Loopback proxy did not start within 5 seconds on {listen_addr}"
381- ) ) ;
382- }
383- Err ( std:: sync:: mpsc:: RecvTimeoutError :: Disconnected ) => {
393+ Err ( std:: sync:: mpsc:: RecvError ) => {
394+ host_join. abort ( ) ;
384395 return Err ( miette:: miette!(
385396 "Loopback proxy thread exited before startup on {listen_addr}"
386397 ) ) ;
@@ -391,6 +402,7 @@ impl LoopbackProxyHandle {
391402 http_addr,
392403 shutdown : Some ( shutdown_tx) ,
393404 thread : Some ( thread) ,
405+ host_join,
394406 } )
395407 }
396408
@@ -401,11 +413,8 @@ impl LoopbackProxyHandle {
401413
402414#[ cfg( target_os = "linux" ) ]
403415#[ allow( clippy:: too_many_arguments) ]
404- fn run_loopback_proxy_thread (
405- netns_fd : RawFd ,
406- listen_addr : SocketAddr ,
407- ready_tx : std:: sync:: mpsc:: Sender < std:: result:: Result < SocketAddr , String > > ,
408- shutdown_rx : tokio:: sync:: oneshot:: Receiver < ( ) > ,
416+ fn spawn_loopback_dispatch_task (
417+ mut accepted_rx : mpsc:: UnboundedReceiver < std:: net:: TcpStream > ,
409418 opa_engine : Arc < OpaEngine > ,
410419 identity_cache : Arc < BinaryIdentityCache > ,
411420 entrypoint_pid : Arc < AtomicU32 > ,
@@ -414,6 +423,68 @@ fn run_loopback_proxy_thread(
414423 provider_credentials : Option < ProviderCredentialState > ,
415424 policy_local_ctx : Option < Arc < PolicyLocalContext > > ,
416425 denial_tx : Option < mpsc:: UnboundedSender < DenialEvent > > ,
426+ ) -> TokioJoinHandle < ( ) > {
427+ tokio:: spawn ( async move {
428+ // Detect the trusted host gateway from the supervisor context, matching
429+ // the primary gateway proxy. The loopback thread only accepts sandbox
430+ // sockets; policy, DNS, credential rewrite, and upstream dialing remain
431+ // on this host-side task.
432+ let trusted_host_gateway: Arc < Option < IpAddr > > = Arc :: new ( detect_trusted_host_gateway ( ) ) ;
433+ while let Some ( stream) = accepted_rx. recv ( ) . await {
434+ if let Err ( err) = stream. set_nonblocking ( true ) {
435+ let event = NetworkActivityBuilder :: new ( crate :: ocsf_ctx ( ) )
436+ . activity ( ActivityId :: Fail )
437+ . severity ( SeverityId :: Low )
438+ . status ( StatusId :: Failure )
439+ . message ( format ! (
440+ "Loopback proxy failed to set stream nonblocking: {err}"
441+ ) )
442+ . build ( ) ;
443+ ocsf_emit ! ( event) ;
444+ continue ;
445+ }
446+
447+ match TcpStream :: from_std ( stream) {
448+ Ok ( stream) => {
449+ spawn_proxy_connection (
450+ stream,
451+ opa_engine. clone ( ) ,
452+ identity_cache. clone ( ) ,
453+ entrypoint_pid. clone ( ) ,
454+ tls_state. clone ( ) ,
455+ inference_ctx. clone ( ) ,
456+ policy_local_ctx. clone ( ) ,
457+ trusted_host_gateway. clone ( ) ,
458+ provider_credentials
459+ . as_ref ( )
460+ . and_then ( ProviderCredentialState :: resolver) ,
461+ denial_tx. clone ( ) ,
462+ ) ;
463+ }
464+ Err ( err) => {
465+ let event = NetworkActivityBuilder :: new ( crate :: ocsf_ctx ( ) )
466+ . activity ( ActivityId :: Fail )
467+ . severity ( SeverityId :: Low )
468+ . status ( StatusId :: Failure )
469+ . message ( format ! (
470+ "Loopback proxy failed to register accepted stream: {err}"
471+ ) )
472+ . build ( ) ;
473+ ocsf_emit ! ( event) ;
474+ }
475+ }
476+ }
477+ } )
478+ }
479+
480+ #[ cfg( target_os = "linux" ) ]
481+ #[ allow( clippy:: too_many_arguments) ]
482+ fn run_loopback_proxy_thread (
483+ netns_fd : RawFd ,
484+ listen_addr : SocketAddr ,
485+ ready_tx : std:: sync:: mpsc:: Sender < std:: result:: Result < SocketAddr , String > > ,
486+ shutdown_rx : tokio:: sync:: oneshot:: Receiver < ( ) > ,
487+ accepted_tx : mpsc:: UnboundedSender < std:: net:: TcpStream > ,
417488) {
418489 let startup_failure_tx = ready_tx. clone ( ) ;
419490 let result = ( || -> std:: result:: Result < ( ) , String > {
@@ -453,7 +524,6 @@ fn run_loopback_proxy_thread(
453524 }
454525 } ;
455526
456- let trusted_host_gateway: Arc < Option < IpAddr > > = Arc :: new ( detect_trusted_host_gateway ( ) ) ;
457527 let _ = ready_tx. send ( Ok ( local_addr) ) ;
458528
459529 let event = NetworkActivityBuilder :: new ( crate :: ocsf_ctx ( ) )
@@ -476,20 +546,31 @@ fn run_loopback_proxy_thread(
476546 accepted = listener. accept( ) => {
477547 match accepted {
478548 Ok ( ( stream, _addr) ) => {
479- spawn_proxy_connection(
480- stream,
481- opa_engine. clone( ) ,
482- identity_cache. clone( ) ,
483- entrypoint_pid. clone( ) ,
484- tls_state. clone( ) ,
485- inference_ctx. clone( ) ,
486- policy_local_ctx. clone( ) ,
487- trusted_host_gateway. clone( ) ,
488- provider_credentials
489- . as_ref( )
490- . and_then( ProviderCredentialState :: resolver) ,
491- denial_tx. clone( ) ,
492- ) ;
549+ match stream. into_std( ) {
550+ Ok ( stream) => {
551+ if accepted_tx. send( stream) . is_err( ) {
552+ let event = NetworkActivityBuilder :: new( crate :: ocsf_ctx( ) )
553+ . activity( ActivityId :: Fail )
554+ . severity( SeverityId :: Low )
555+ . status( StatusId :: Failure )
556+ . message( "Loopback proxy dispatcher exited" . to_string( ) )
557+ . build( ) ;
558+ ocsf_emit!( event) ;
559+ break ;
560+ }
561+ }
562+ Err ( err) => {
563+ let event = NetworkActivityBuilder :: new( crate :: ocsf_ctx( ) )
564+ . activity( ActivityId :: Fail )
565+ . severity( SeverityId :: Low )
566+ . status( StatusId :: Failure )
567+ . message( format!(
568+ "Loopback proxy failed to detach accepted stream: {err}"
569+ ) )
570+ . build( ) ;
571+ ocsf_emit!( event) ;
572+ }
573+ }
493574 }
494575 Err ( err) => {
495576 let event = NetworkActivityBuilder :: new( crate :: ocsf_ctx( ) )
@@ -530,6 +611,7 @@ impl Drop for LoopbackProxyHandle {
530611 if let Some ( thread) = self . thread . take ( ) {
531612 let _ = thread. join ( ) ;
532613 }
614+ self . host_join . abort ( ) ;
533615 }
534616}
535617
0 commit comments