11use tokio:: {
22 io:: { AsyncBufReadExt , AsyncWriteExt , BufReader } ,
33 net:: { UnixListener , UnixStream } ,
4+ time:: { Duration , timeout} ,
45} ;
56
6- /// Spawns a new thread that listens for connections to a UNIX socket
7+ /// Spawns a new async task that listens for connections to a UNIX socket
78/// at "/tmp/heap_dump_<process_name>.sock".
89/// When "dump" command is sent, it generates a heap profile using
910/// jemalloc_pprof and streams the binary protobuf data back through the socket.
@@ -31,7 +32,7 @@ pub fn spawn_heap_dump_handler() {
3132 }
3233
3334 tokio:: spawn ( async move {
34- let name = binary_name ( ) . unwrap_or_default ( ) ;
35+ let name = binary_name ( ) . unwrap_or ( "unknown" . to_string ( ) ) ;
3536 let socket_path = format ! ( "/tmp/heap_dump_{name}.sock" ) ;
3637
3738 tracing:: info!( socket = socket_path, "heap dump handler started" ) ;
@@ -43,16 +44,29 @@ pub fn spawn_heap_dump_handler() {
4344 } ;
4445
4546 loop {
46- // Catch any panics in connection handling to prevent the handler from crashing
47- let result = std:: panic:: catch_unwind ( std:: panic:: AssertUnwindSafe ( || {
48- let listener = & handle. listener ;
49- async move { handle_connection ( listener) . await }
50- } ) ) ;
51-
52- match result {
53- Ok ( future) => future. await ,
47+ // Accept connection in main loop, then spawn task for each connection
48+ // Sequential processing prevents multiple simultaneous expensive heap dumps
49+ match handle. listener . accept ( ) . await {
50+ Ok ( ( socket, _addr) ) => {
51+ let handle = tokio:: spawn ( async move {
52+ handle_connection_with_socket ( socket) . await ;
53+ } ) ;
54+
55+ // 5-minute timeout to prevent stuck dumps from blocking future requests
56+ match timeout ( Duration :: from_secs ( 300 ) , handle) . await {
57+ Ok ( Ok ( ( ) ) ) => {
58+ // Task completed successfully
59+ }
60+ Ok ( Err ( err) ) => {
61+ tracing:: error!( ?err, "panic in heap dump connection handler" ) ;
62+ }
63+ Err ( _) => {
64+ tracing:: error!( "heap dump request timed out after 5 minutes" ) ;
65+ }
66+ }
67+ }
5468 Err ( err) => {
55- tracing:: error !( ?err, "panic in heap dump connection handler " ) ;
69+ tracing:: debug !( ?err, "failed to accept connection" ) ;
5670 }
5771 }
5872 }
@@ -80,12 +94,7 @@ fn binary_name() -> Option<String> {
8094 )
8195}
8296
83- async fn handle_connection ( listener : & UnixListener ) {
84- let Ok ( ( mut socket, _addr) ) = listener. accept ( ) . await else {
85- tracing:: debug!( "failed to accept connection" ) ;
86- return ;
87- } ;
88-
97+ async fn handle_connection_with_socket ( mut socket : UnixStream ) {
8998 let message = read_line ( & mut socket) . await ;
9099 match message. as_deref ( ) {
91100 Some ( "dump" ) => {
0 commit comments