280280 # help system
281281 -h|-\? |--help|--help=* |--usage) _frun_displayHelp " $1 " ; return 0 ;;
282282
283- -V|--version|--VERSION) echo ' forkrun v3.1.1 ' ; return 0 ;;
283+ -V|--version|--VERSION) echo ' forkrun v3.1.2 ' ; return 0 ;;
284284
285285 --) shift ; break ;;
286286
@@ -477,22 +477,23 @@ toc() { :; }
477477
478478 # --- 2 & 3. THE PRODUCER PLUMBING ---
479479 declare -a fd_scan_death_r fd_scan_death_w SCANNER_P
480+ ring_pipe fd_trap_ack_r fd_trap_ack_w
480481
481482 if (( FORKRUN_NUM_NODES > 1 )) ; then
482- # NUMA TOPOLOGICAL PIPELINE (No Pipes, Lock-Free Meta Ring)
483+ # NUMA TOPOLOGICAL PIPELINE
483484 ordered_flag=0
484485 [[ " ${order_mode} " == " ordered" ]] && ordered_flag=1
485486
486- ( ring_numa_ingest ${fd0} ${fd_write} $FORKRUN_NUM_NODES $ordered_flag ) &
487+ ( exec {fd_trap_ack_w} >& - ; ring_numa_ingest ${fd0} ${fd_write} $FORKRUN_NUM_NODES $ordered_flag ) &
487488
488489 for (( i= 0 ; i< FORKRUN_NUM_NODES; i++ )) ; do
489- ( ring_indexer_numa ${fd_scan} $i ) &
490+ ( exec {fd_trap_ack_w} >& - ; ring_indexer_numa ${fd_scan} $i ) &
490491 done
491492
492493 for (( i= 0 ; i< FORKRUN_NUM_NODES; i++ )) ; do
493494 ring_pipe fd_scan_death_r[$i ] fd_scan_death_w[$i ]
494495 (
495- exec {fd_fallow_w}>& -
496+ exec {fd_fallow_w}>& - {fd_trap_ack_w} >& -
496497 ring_numa_scanner ${fd_scan} $i $fd_spawn_w $FORKRUN_NUM_NODES
497498 ) &
498499 SCANNER_P[$i ]=$!
@@ -501,10 +502,10 @@ toc() { :; }
501502
502503 else
503504 # LEGACY FLAT PIPELINE
504- ( ring_copy ${fd_write} ${fd0} ; ring_signal ) &
505+ ( exec {fd_trap_ack_w} >& - ; ring_copy ${fd_write} ${fd0} ; ring_signal ) &
505506 ring_pipe fd_scan_death_r[0] fd_scan_death_w[0]
506507 (
507- exec {fd_spawn_r}< & -
508+ exec {fd_spawn_r}< & - {fd_trap_ack_w} >& -
508509 ring_scanner ${fd_scan} ${fd_spawn_w}
509510 ) &
510511 SCANNER_P[0]=$!
@@ -514,7 +515,7 @@ toc() { :; }
514515 exec {fd_spawn_w}>& -
515516
516517 (
517- exec {fd_fallow_w}>& -
518+ exec {fd_fallow_w}>& - {fd_trap_ack_w} >& -
518519 fallow_args=( " ${fd_fallow_r} " " ${fd_write} " )
519520 if (( FORKRUN_NUM_NODES > 1 )) ; then
520521 ring_fallow_phys " ${fallow_args[@]} "
@@ -528,7 +529,7 @@ toc() { :; }
528529 [[ " ${order_mode} " == " realtime" ]] || {
529530 ring_pipe fd_order_r fd_order_w
530531 (
531- exec {fd_order_w}>& -
532+ exec {fd_order_w}>& - {fd_trap_ack_w} >& -
532533
533534 order_args=( " ${fd_order_r} " ' memfd' )
534535 [[ " ${order_mode} " == " buffered" ]] && order_args+=( " unordered" )
@@ -637,26 +638,26 @@ toc() { :; }
637638 set +m
638639 export RING_NODE_ID="$2"
639640 export RING_WID="$3"
641+ export FD_TRAP_ACK_W="$4"
640642
641643 _ring_registered=false
642644
643645 trap ' " '" '
644646 status=$?
645647 ${_ring_registered} && { ring_worker dec; ring_cleanup_waiter; }
646648
647- # Only attempt recovery if we actually held an active batch
648649 if (( status != 0 && RING_BATCH_SLOTS > 0 )); then
649650 (( RING_NUM_KILLS++ ))
650-
651- # Revert partial corrupted output from this failed run (unless realtime)
652651 if [[ "$order_mode" != "realtime" && -n "${fd_out[$RING_WID]:-}" ]]; then
653652 ring_revert_output "${fd_out[$RING_WID]}"
654653 fi
655-
656- # Always throw it back into escrow.
657- # If kills >= LIMIT, the NEXT worker will see RING_POISONED=1 and safely skip & ack it.
658654 ring_escrow_put "$RING_NODE_ID" "$RING_BATCH_IDX" "$RING_BATCH_SLOTS" "$RING_NUM_KILLS"
659655 fi
656+
657+ # Tell the parent that this EXIT trap ran to completion (sent only on non-zero exit)
658+ if (( status != 0 )); then
659+ echo "$RING_WID" >&"${FD_TRAP_ACK_W}" 2>/dev/null
660+ fi
660661 exit $status
661662 ' " '" ' EXIT
662663
@@ -672,7 +673,7 @@ toc() { :; }
672673 '
673674 ${insert_id_flag:- false} && worker_func_src+=' W_BATCH=0
674675 '
675- worker_func_src+=' shift 3
676+ worker_func_src+=' shift 4
676677 ring_worker inc $fd_read
677678 _ring_registered=true
678679 while ring_claim; do
@@ -702,6 +703,11 @@ W_NODE[$3]=$2
702703 # --- SPAWN LOOP REACTOR ---
703704 nWorkers=0
704705 local -a node_workers W_NODE fd_worker_r fd_worker_w P wID_free
706+
707+ local -A trap_ack_pending
708+ local _poll_timer_cmd=0
709+ local _timer_armed=false
710+
705711 for (( i= 0 ; i< FORKRUN_NUM_NODES; i++ )) ; do node_workers[i]=0; done
706712 node_worker_max=$(( nWorkersMax / FORKRUN_NUM_NODES ))
707713 (( node_worker_max < 1 )) && node_worker_max=1
@@ -712,9 +718,33 @@ W_NODE[$3]=$2
712718 wID_free[$nn ]=' '
713719 done
714720
715- while ring_poll " $fd_spawn_arg " fd_scan_death_r fd_worker_r; do
721+ while ring_poll " $fd_spawn_arg " fd_scan_death_r fd_worker_r " $_poll_timer_cmd " " $fd_trap_ack_r " ; do
722+ _poll_timer_cmd=0
723+
716724 case " $POLL_EVENT " in
717725 IGNORE) ;;
726+ TIMEOUT)
727+ _timer_armed=false
728+ if (( ${# trap_ack_pending[@]} > 0 )) ; then
729+ echo " forkrun [FATAL]: Worker(s) [ ${! trap_ack_pending[@]} ] exited non-zero and EXIT trap did not confirm recovery within 3s grace period. Aborting." >&2
730+ ring_abort
731+ exit 1
732+ fi
733+ ;;
734+ TRAP_ACK)
735+ wID=$POLL_ARG1
736+ (( trap_ack_pending[$wID ]-- ))
737+
738+ # If it balanced out to 0 (DEATH arrived first), clean it up
739+ if (( trap_ack_pending[$wID ] == 0 )) ; then
740+ unset ' trap_ack_pending[$wID]'
741+ fi
742+
743+ if (( ${# trap_ack_pending[@]} == 0 )) ; then
744+ _poll_timer_cmd=-1 # Cancel timer cleanly
745+ _timer_armed=false
746+ fi
747+ ;;
718748 SPAWN)
719749 spawn_count=$POLL_ARG1
720750 node_idx=$POLL_ARG2
@@ -723,28 +753,20 @@ W_NODE[$3]=$2
723753 (( target > node_worker_max )) && target=$node_worker_max
724754
725755 for (( ; node_workers[node_idx] < target; node_workers[node_idx]++ )) ; do
726- # Find first empty wID slot
727756 for wID in " ${! wID_free[@]} " ; do break ; done
728757 unset ' wID_free[$wID]'
729758
730759 ring_pipe fd_worker_r[$wID ] fd_worker_w[$wID ]
731- spawn_worker " $wID " " $node_idx " " $wID "
760+ spawn_worker " $wID " " $node_idx " " $wID " " ${fd_trap_ack_w} "
732761 exec {fd_worker_w[$wID ]}>& -
733762 (( nWorkers++ ))
734763 done
735764 ;;
736-
737765 WORKER_DEATH)
738766 wID=$POLL_ARG1
739767 wait " ${P[$wID]} " 2> /dev/null
740768 status=$?
741769
742- if (( status > 128 && status != 143 )) ; then
743- echo " forkrun [FATAL]: Worker $wID killed by signal $(( status - 128 )) . Aborting." >&2
744- ring_abort
745- exit 1
746- fi
747-
748770 exec {fd_worker_r[$wID ]}< & -
749771 unset ' fd_worker_r[$wID]' ' fd_worker_w[$wID]' ' P[$wID]'
750772
@@ -753,18 +775,31 @@ W_NODE[$3]=$2
753775
754776 (( node_workers[node_idx]-- ))
755777
756- # ONLY respawn if the worker crashed. Clean exit (0) means no work left.
757778 if (( status != 0 )) ; then
779+ (( trap_ack_pending[$wID ]++ ))
780+
781+ if (( trap_ack_pending[$wID ] == 0 )) ; then
782+ # TRAP_ACK already arrived! Clean up safely.
783+ unset ' trap_ack_pending[$wID]'
784+ elif (( trap_ack_pending[$wID ] > 0 )) ; then
785+ # Death arrived before TRAP_ACK. Arm the 3-second deadline.
786+ if ! $_timer_armed ; then
787+ _poll_timer_cmd=3000
788+ _timer_armed=true
789+ echo " forkrun [WARN]: Worker $wID (node ${node_idx} ) exited with status $status . Waiting up to 3s for EXIT trap confirmation." >&2
790+ fi
791+ fi
792+
793+ # Respawn a replacement worker right away, regardless of whether the TRAP_ACK has arrived yet
758794 ring_pipe fd_worker_r[$wID ] fd_worker_w[$wID ]
759- spawn_worker " $wID " " $node_idx " " $wID "
795+ spawn_worker " $wID " " $node_idx " " $wID " " ${fd_trap_ack_w} "
760796 exec {fd_worker_w[$wID ]}>& -
761797 (( nWorkers++ ))
762798 (( node_workers[node_idx]++ ))
763799 else
764800 wID_free[$wID ]=' '
765801 fi
766802 ;;
767-
768803 SCAN_DEATH)
769804 sID=$POLL_ARG1
770805 wait " ${SCANNER_P[$sID]} " 2> /dev/null
@@ -779,7 +814,6 @@ W_NODE[$3]=$2
779814 exec {fd_scan_death_r[$sID ]}< & -
780815 unset ' fd_scan_death_r[$sID]' ' SCANNER_P[$sID]'
781816 ;;
782-
783817 EOF)
784818 fd_spawn_arg=" -1"
785819 ;;
@@ -789,7 +823,7 @@ W_NODE[$3]=$2
789823 ${verbose_flag} && printf ' \nSPAWNED %s workers\n' " ${nWorkers} " >&2
790824
791825 # --- SHUTDOWN ---
792- exec {fd_spawn_r}< & - {fd_fallow_w}>& -
826+ exec {fd_spawn_r}< & - {fd_fallow_w}>& - {fd_trap_ack_r} < & - {fd_trap_ack_w} >& -
793827 [[ " ${order_mode} " == " realtime" ]] || exec {fd_order_w}>& -
794828
795829 wait
0 commit comments