@@ -486,8 +486,8 @@ static void agent_worker_run(mino_state_t *S, agent_pool_kind_t kind,
486486
487487 /* Detach worker ctx from S->threading.worker_ctxs_head and decrement the
488488 * thread count under the brief worker_list_lock. The pthread_t
489- * is left intact for a follow-up pthread_join from
490- * agent_worker_ensure or mino_agent_quiesce_workers (signaled by
489+ * is left intact for a follow-up pthread_join from a later
490+ * agent_enqueue or mino_agent_quiesce_workers (signaled by
491491 * worker_pending_join, which the spawn path set when it created
492492 * us). Joining an already-exited joinable pthread returns
493493 * immediately. */
@@ -554,33 +554,97 @@ static void agent_worker_reap_pending(mino_state_t *S, agent_pool_kind_t kind)
554554 mino_resume_lock (S , saved_depth );
555555}
556556
557- /* Lazy-spawn the agent worker for the given pool. Caller must hold
558- * state_lock. Throws MTH001 if thread budget is exhausted. Returns 0
559- * on success, 1 on throw (caller should propagate NULL). Idempotent:
560- * returns 0 fast when a worker is already running. If a previous
561- * worker exited (queue drained), reap its pthread handle before
562- * spawning a fresh one so thread_count is accurate. */
563- static int agent_worker_ensure (mino_state_t * S , agent_pool_kind_t kind )
557+ /* Enqueue (agent fn extra env dyn_snap) onto the named pool's queue.
558+ * Caller holds state_lock. Spawns the pool's worker if needed and
559+ * enqueues the action under a single agent_mu critical section so a
560+ * newly-spawned worker cannot observe an empty runq before this
561+ * producer publishes its node.
562+ *
563+ * Race that this closes: if ensure released agent_mu before
564+ * agent_enqueue re-acquired it, a freshly-spawned worker could
565+ * sneak in between the two acquisitions, see an empty runq, set
566+ * worker_alive=0, and exit. The subsequent enqueue would then push
567+ * the node + bump in_flight into a runq with no consumer, and
568+ * `(await a)` blocks forever waiting on in_flight=0. Reproduces on
569+ * GHA ubuntu-24.04 (both arches), masked on macos-14 + Apple Silicon
570+ * Docker by stricter effective memory ordering.
571+ *
572+ * Returns 0 on success, 1 on OOM or thread-budget refusal (caller
573+ * propagates NULL). On 1, the diag has been published via
574+ * prim_throw_classified and no observable state change happened
575+ * (no enqueue, no in_flight bump, no pthread spawned). */
576+ static int agent_enqueue (mino_state_t * S , agent_pool_kind_t kind ,
577+ mino_val_t * agent , mino_val_t * fn ,
578+ mino_val_t * extra , mino_env_t * env )
564579{
565- int alive ;
580+ agent_action_node_t * n ;
581+ int need_spawn = 0 ;
582+ int budget_ok = 1 ;
583+
566584 if (!S -> agent .mu_inited ) {
567585 agent_mu_ensure_inited (S );
568586 }
569- agent_mu_lock (& S -> agent .mu );
570- alive = S -> agent .pool [kind ].worker_alive ;
571- agent_mu_unlock (& S -> agent .mu );
572- if (alive ) return 0 ;
573587
574- /* Reap any pthread handle left by a previously-drained worker
575- * for this pool. */
576- agent_worker_reap_pending (S , kind );
588+ n = (agent_action_node_t * )calloc (1 , sizeof (* n ));
589+ if (n == NULL ) return 1 ;
590+ n -> agent = agent ;
591+ n -> fn = fn ;
592+ n -> extra = extra ;
593+ n -> env = env ;
594+ n -> dyn_snap = mino_snapshot_thread_bindings (S );
595+ n -> next = NULL ;
577596
578- /* Gate-and-increment thread_count under worker_list_lock so a
579- * concurrent spawn (e.g. another pool's agent_worker_ensure or
580- * mino_future_spawn) cannot both pass the limit check. */
581- mino_worker_list_lock_acquire (S );
582- if (S -> threading .thread_count >= S -> threading .thread_limit ) {
583- mino_worker_list_lock_release (S );
597+ /* Decide whether a new worker is needed, take the thread-budget
598+ * slot, and publish the node + in_flight bump as one atomic
599+ * critical section. The worker we are about to spawn (if any)
600+ * cannot acquire agent_mu until we release, so its first runq
601+ * check is guaranteed to see this node. */
602+ agent_mu_lock (& S -> agent .mu );
603+ if (!S -> agent .pool [kind ].worker_alive ) {
604+ need_spawn = 1 ;
605+ /* Reap a prior worker's pthread handle synchronously here
606+ * rather than at the spawn step below, so the budget check
607+ * sees the up-to-date thread_count. The previous worker
608+ * exited via the for-loop break and decremented thread_count
609+ * during its detach; its pthread_t still lives in the slot
610+ * until we join it. */
611+ if (S -> agent .pool [kind ].worker_pending_join ) {
612+ S -> agent .pool [kind ].worker_pending_join = 0 ;
613+ agent_mu_unlock (& S -> agent .mu );
614+ {
615+ int saved_depth = mino_yield_lock (S );
616+ #if defined(_WIN32 ) && defined(_MSC_VER )
617+ WaitForSingleObject (S -> agent .pool [kind ].worker , INFINITE );
618+ CloseHandle (S -> agent .pool [kind ].worker );
619+ #else
620+ pthread_join (S -> agent .pool [kind ].worker , NULL );
621+ #endif
622+ mino_resume_lock (S , saved_depth );
623+ }
624+ agent_mu_lock (& S -> agent .mu );
625+ /* While the lock was yielded a concurrent producer could
626+ * have re-spawned the worker. Re-check; if alive again,
627+ * we no longer need to spawn. */
628+ if (S -> agent .pool [kind ].worker_alive ) {
629+ need_spawn = 0 ;
630+ }
631+ }
632+ if (need_spawn ) {
633+ mino_worker_list_lock_acquire (S );
634+ if (S -> threading .thread_count >= S -> threading .thread_limit ) {
635+ mino_worker_list_lock_release (S );
636+ budget_ok = 0 ;
637+ } else {
638+ S -> threading .thread_count ++ ;
639+ mino_worker_list_lock_release (S );
640+ S -> threading .multi_threaded = 1 ;
641+ S -> agent .pool [kind ].worker_alive = 1 ;
642+ }
643+ }
644+ }
645+ if (!budget_ok ) {
646+ agent_mu_unlock (& S -> agent .mu );
647+ free (n );
584648 prim_throw_classified (S , "mino/thread-limit-exceeded" , "MTH001" ,
585649 "agent dispatch requires a host-granted worker thread; "
586650 "raise via mino_set_thread_limit (>= 1 for one agent "
@@ -589,37 +653,50 @@ static int agent_worker_ensure(mino_state_t *S, agent_pool_kind_t kind)
589653 "against the limit -- only spawned workers do." );
590654 return 1 ;
591655 }
592- S -> threading .thread_count ++ ;
593- mino_worker_list_lock_release (S );
594- S -> threading .multi_threaded = 1 ;
595- /* Mark alive BEFORE pthread_create returns so a concurrent send
596- * on the embedder thread (impossible in single-thread mode but
597- * defensive) doesn't race. */
598- agent_mu_lock (& S -> agent .mu );
599- S -> agent .pool [kind ].worker_alive = 1 ;
656+ agent_runq_push_tail (S , kind , n );
657+ if (agent -> as .agent .in_flight < INT_MAX ) {
658+ agent -> as .agent .in_flight ++ ;
659+ }
660+ agent_cv_broadcast (& S -> agent .cv );
600661 agent_mu_unlock (& S -> agent .mu );
662+
663+ /* Spawn the worker AFTER the node is published. The new pthread
664+ * will block on agent_mu_lock at its first iteration if we are
665+ * still holding (we aren't anymore) and immediately observe the
666+ * node we just pushed. */
667+ if (need_spawn ) {
601668#if defined(_WIN32 ) && defined(_MSC_VER )
602- {
603669 unsigned stack = (unsigned )S -> threading .thread_stack_size ;
604670 uintptr_t h = _beginthreadex (NULL , stack ,
605671 kind == AGENT_POOL_POOLED ? agent_worker_entry_pooled
606672 : agent_worker_entry_solo ,
607673 S , 0 , NULL );
608674 if (h == 0 ) {
675+ /* pthread create refused. Roll back the in_flight bump
676+ * and pop the node we just pushed so await is not
677+ * stranded on a permanently-pending action. The runq
678+ * head is what we pushed; pop it back. */
609679 agent_mu_lock (& S -> agent .mu );
680+ if (S -> agent .pool [kind ].run_tail == n ) {
681+ S -> agent .pool [kind ].run_tail = NULL ;
682+ }
683+ S -> agent .pool [kind ].run_head = n -> next ;
684+ if (agent -> as .agent .in_flight > 0 ) {
685+ agent -> as .agent .in_flight -- ;
686+ }
610687 S -> agent .pool [kind ].worker_alive = 0 ;
688+ agent_cv_broadcast (& S -> agent .cv );
611689 agent_mu_unlock (& S -> agent .mu );
612690 mino_worker_list_lock_acquire (S );
613691 if (S -> threading .thread_count > 0 ) { S -> threading .thread_count -- ; }
614692 mino_worker_list_lock_release (S );
693+ free (n );
615694 prim_throw_classified (S , "mino/thread-limit-exceeded" , "MTH001" ,
616695 "host refused agent worker thread" );
617696 return 1 ;
618697 }
619698 S -> agent .pool [kind ].worker = (HANDLE )h ;
620- }
621699#else
622- {
623700 pthread_attr_t attr ;
624701 pthread_attr_t * attrp = NULL ;
625702 int rc ;
@@ -635,45 +712,29 @@ static int agent_worker_ensure(mino_state_t *S, agent_pool_kind_t kind)
635712 S );
636713 if (attrp != NULL ) { pthread_attr_destroy (attrp ); }
637714 if (rc != 0 ) {
715+ /* Same rollback as the Windows arm above. */
638716 agent_mu_lock (& S -> agent .mu );
717+ if (S -> agent .pool [kind ].run_tail == n ) {
718+ S -> agent .pool [kind ].run_tail = NULL ;
719+ }
720+ S -> agent .pool [kind ].run_head = n -> next ;
721+ if (agent -> as .agent .in_flight > 0 ) {
722+ agent -> as .agent .in_flight -- ;
723+ }
639724 S -> agent .pool [kind ].worker_alive = 0 ;
725+ agent_cv_broadcast (& S -> agent .cv );
640726 agent_mu_unlock (& S -> agent .mu );
641727 mino_worker_list_lock_acquire (S );
642728 if (S -> threading .thread_count > 0 ) { S -> threading .thread_count -- ; }
643729 mino_worker_list_lock_release (S );
730+ free (n );
644731 prim_throw_classified (S , "mino/thread-limit-exceeded" , "MTH001" ,
645732 "host refused agent worker thread" );
646733 return 1 ;
647734 }
648- }
649735#endif
650- S -> agent .pool [kind ].worker_pending_join = 1 ;
651- return 0 ;
652- }
653-
654- /* Enqueue (agent fn extra env dyn_snap) onto the named pool's queue.
655- * Caller holds state_lock. agent_worker_ensure has spawned the
656- * pool's worker if needed. Returns 0 on success, 1 on OOM (caller
657- * propagates NULL). */
658- static int agent_enqueue (mino_state_t * S , agent_pool_kind_t kind ,
659- mino_val_t * agent , mino_val_t * fn ,
660- mino_val_t * extra , mino_env_t * env )
661- {
662- agent_action_node_t * n = (agent_action_node_t * )calloc (1 , sizeof (* n ));
663- if (n == NULL ) return 1 ;
664- n -> agent = agent ;
665- n -> fn = fn ;
666- n -> extra = extra ;
667- n -> env = env ;
668- n -> dyn_snap = mino_snapshot_thread_bindings (S );
669- n -> next = NULL ;
670- agent_mu_lock (& S -> agent .mu );
671- agent_runq_push_tail (S , kind , n );
672- if (agent -> as .agent .in_flight < INT_MAX ) {
673- agent -> as .agent .in_flight ++ ;
736+ S -> agent .pool [kind ].worker_pending_join = 1 ;
674737 }
675- agent_cv_broadcast (& S -> agent .cv );
676- agent_mu_unlock (& S -> agent .mu );
677738 return 0 ;
678739}
679740
@@ -868,10 +929,13 @@ static mino_val_t *agent_send_core(mino_state_t *S, agent_pool_kind_t kind,
868929 : mino_empty_list (S ));
869930 return agent ;
870931 }
871- if (agent_worker_ensure (S , kind )) return NULL ;
932+ /* agent_enqueue handles lazy worker spawn under the same
933+ * agent_mu critical section as the push + in_flight bump, so a
934+ * freshly-spawned worker is guaranteed to observe the node on
935+ * its first runq check (see the race note in the function
936+ * comment). */
872937 if (agent_enqueue (S , kind , agent , fn , extra , env )) {
873- prim_throw_classified (S , "eval/state" , "MST002" ,
874- "send: out of memory enqueueing action" );
938+ /* Diag already published by agent_enqueue. */
875939 return NULL ;
876940 }
877941 return agent ;
@@ -925,19 +989,15 @@ void mino_agent_drain_pending(mino_state_t *S, mino_val_t *pending,
925989 }
926990 if (S -> agent .shutdown ) continue ;
927991 /* In-tx send is the JVM-canon shape -- post-commit pending
928- * drains route through the POOLED pool. */
929- if (agent_worker_ensure (S , AGENT_POOL_POOLED )) {
930- /* Worker spawn refused (no thread budget). Pending
931- * sends are silently dropped; the commit has already
932- * gone through. Clear the error that
933- * agent_worker_ensure -> prim_throw_classified set so
934- * the post-commit drain doesn't surface as a failed
935- * dosync. */
936- clear_error (S );
937- return ;
938- }
992+ * drains route through the POOLED pool. agent_enqueue lazy-
993+ * spawns the worker atomically with the push so the worker
994+ * is guaranteed to drain. If the spawn refuses (no thread
995+ * budget) the action is silently dropped: the commit has
996+ * already gone through and throwing here would surprise a
997+ * caller that wasn't holding state_lock. Clear the diag
998+ * that agent_enqueue published. */
939999 if (agent_enqueue (S , AGENT_POOL_POOLED , agent_v , fn , extra , env )) {
940- /* OOM enqueueing: same logic as above. */
1000+ clear_error ( S );
9411001 return ;
9421002 }
9431003 }
0 commit comments