@@ -347,6 +347,13 @@ where
347347 let mut durable_available: HashSet < u64 > = HashSet :: new ( ) ;
348348 let mut variant_available: HashSet < u64 > = HashSet :: new ( ) ;
349349 let mut finalized_available: HashSet < u64 > = HashSet :: new ( ) ;
350+ // Heights for which marshal holds a live buffer waiter, created by a
351+ // Subscribe for a block it does not yet have. Only such a waiter makes a
352+ // later PublishViaVariant actor-observable: it fires the subscription,
353+ // marshal ingests the block, and a pending floor anchored on it
354+ // completes. Without a waiter a publish only populates the variant cache
355+ // and marshal never re-checks it. Cleared on Restart with the actor.
356+ let mut subscribed_heights: HashSet < u64 > = HashSet :: new ( ) ;
350357 // Publish order backing `variant_available`'s FIFO eviction.
351358 let mut variant_order: std:: collections:: VecDeque < u64 > = std:: collections:: VecDeque :: new ( ) ;
352359 // ready_prefix is monotone non-decreasing. It advances when the
@@ -662,6 +669,11 @@ where
662669 let block = & canonical[ block_index ( block_idx) ] ;
663670 let height = H :: height ( block) ;
664671 let round = round_for_height ( height) ;
672+ // Marshal registers a buffer waiter only when it lacks the
673+ // block; a hit answers the subscription inline without one.
674+ if !block_available ( & durable_available, & variant_available, height. get ( ) ) {
675+ subscribed_heights. insert ( height. get ( ) ) ;
676+ }
665677 if by_commitment {
666678 subscriptions. push ( handle. mailbox . subscribe_by_commitment (
667679 H :: commitment ( block) ,
@@ -843,6 +855,11 @@ where
843855 height : fresh_height,
844856 } ,
845857 ) ) ;
858+ // The live (non-closed) subscriptions above leave marshal a
859+ // waiter for the fresh block when it is missing locally.
860+ if !block_available ( & durable_available, & variant_available, fresh_height. get ( ) ) {
861+ subscribed_heights. insert ( fresh_height. get ( ) ) ;
862+ }
846863 handle. mailbox . hint_notarized ( fresh_round, fresh_commitment) ;
847864
848865 // Only report consensus certificates for a locally missing
@@ -971,17 +988,28 @@ where
971988 }
972989 }
973990 }
974- // Marshal holds a buffer subscription for a pending
975- // floor anchor; the publish completes it and marshal
976- // ingests the block as the durable floor anchor.
977- repair_wake |= apply_pending_floor (
978- & mut pending_floor,
979- height,
980- & mut durable_available,
981- & mut finalized_available,
982- & mut processed_height,
983- & mut segment_starts,
984- ) ;
991+ // A publish only completes a pending floor anchor when
992+ // marshal holds a live waiter for this block (from a
993+ // prior Subscribe): the publish fires the subscription
994+ // and marshal ingests it. Without a waiter the block
995+ // only lands in the variant cache, which marshal does
996+ // not re-check, so the floor stays parked on the
997+ // resolver. This is independent of the variant.
998+ //
999+ // Marshal subscriptions are one-shot: ingest notifies and
1000+ // removes them. Consume the waiter here unconditionally so
1001+ // a later republish (after the block is FIFO-evicted) does
1002+ // not wake a floor that real marshal can no longer serve.
1003+ if subscribed_heights. remove ( & h) {
1004+ repair_wake |= apply_pending_floor (
1005+ & mut pending_floor,
1006+ height,
1007+ & mut durable_available,
1008+ & mut finalized_available,
1009+ & mut processed_height,
1010+ & mut segment_starts,
1011+ ) ;
1012+ }
9851013 }
9861014 }
9871015 MarshalEvent :: AckNext => {
@@ -1070,6 +1098,9 @@ where
10701098 // visible to marshal.
10711099 variant_available. clear ( ) ;
10721100 variant_order. clear ( ) ;
1101+ // The new actor starts with no buffer waiters; any
1102+ // subscription registered by the prior instance died with it.
1103+ subscribed_heights. clear ( ) ;
10731104 // Marshal's processed_height for the new instance
10741105 // comes from its persistent metadata, which
10751106 // setup.height reflects. Pending deliveries that
0 commit comments