Skip to content

Commit 16e98b5

Browse files
[marshal] Pipeline Ack Handling (#3151)
1 parent af7fee2 commit 16e98b5

5 files changed

Lines changed: 315 additions & 86 deletions

File tree

consensus/src/marshal/actor.rs

Lines changed: 168 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,15 @@ use commonware_utils::{
4545
sequence::U64,
4646
Acknowledgement, BoxedError,
4747
};
48-
use futures::{future::join_all, try_join};
48+
use futures::{future::join_all, try_join, FutureExt};
4949
use pin_project::pin_project;
5050
use prometheus_client::metrics::gauge::Gauge;
5151
use rand_core::CryptoRngCore;
5252
use std::{
53-
collections::{btree_map::Entry, BTreeMap},
53+
collections::{btree_map::Entry, BTreeMap, VecDeque},
5454
future::Future,
5555
num::NonZeroUsize,
56+
pin::Pin,
5657
sync::Arc,
5758
};
5859
use tracing::{debug, error, info, warn};
@@ -74,7 +75,7 @@ enum PendingVerification<S: CertificateScheme, B: Block> {
7475
},
7576
}
7677

77-
/// A pending acknowledgement from the application for processing a block at the contained height/commitment.
78+
/// A pending acknowledgement from the application for a block at the contained height/commitment.
7879
#[pin_project]
7980
struct PendingAck<B: Block, A: Acknowledgement> {
8081
height: Height,
@@ -94,6 +95,80 @@ impl<B: Block, A: Acknowledgement> Future for PendingAck<B, A> {
9495
}
9596
}
9697

98+
/// Tracks in-flight application acknowledgements with FIFO semantics.
99+
struct PendingAcks<B: Block, A: Acknowledgement> {
100+
current: OptionFuture<PendingAck<B, A>>,
101+
queue: VecDeque<PendingAck<B, A>>,
102+
max: usize,
103+
}
104+
105+
impl<B: Block, A: Acknowledgement> PendingAcks<B, A> {
106+
/// Creates a new pending-ack tracker with a maximum in-flight capacity.
107+
fn new(max: usize) -> Self {
108+
Self {
109+
current: None.into(),
110+
queue: VecDeque::with_capacity(max),
111+
max,
112+
}
113+
}
114+
115+
/// Drops the current ack and all queued acks.
116+
fn clear(&mut self) {
117+
self.current = None.into();
118+
self.queue.clear();
119+
}
120+
121+
/// Returns the currently armed ack future (if any) for `select_loop!`.
122+
const fn current(&mut self) -> &mut OptionFuture<PendingAck<B, A>> {
123+
&mut self.current
124+
}
125+
126+
/// Returns whether we can dispatch another block without exceeding capacity.
127+
fn has_capacity(&self) -> bool {
128+
let reserved = usize::from(self.current.is_some());
129+
self.queue.len() < self.max - reserved
130+
}
131+
132+
/// Returns the next height to dispatch while preserving sequential order.
133+
fn next_dispatch_height(&self, last_processed_height: Height) -> Height {
134+
self.queue
135+
.back()
136+
.map(|ack| ack.height.next())
137+
.or_else(|| self.current.as_ref().map(|ack| ack.height.next()))
138+
.unwrap_or_else(|| last_processed_height.next())
139+
}
140+
141+
/// Enqueues a newly dispatched ack, arming it immediately when idle.
142+
fn enqueue(&mut self, ack: PendingAck<B, A>) {
143+
if self.current.is_none() {
144+
self.current.replace(ack);
145+
return;
146+
}
147+
self.queue.push_back(ack);
148+
}
149+
150+
/// Returns metadata for a completed current ack and arms the next queued ack.
151+
fn complete_current(
152+
&mut self,
153+
result: <A::Waiter as Future>::Output,
154+
) -> (Height, B::Commitment, <A::Waiter as Future>::Output) {
155+
let PendingAck {
156+
height, commitment, ..
157+
} = self.current.take().expect("ack state must be present");
158+
if let Some(next) = self.queue.pop_front() {
159+
self.current.replace(next);
160+
}
161+
(height, commitment, result)
162+
}
163+
164+
/// If the current ack is already resolved, takes it and arms the next ack.
165+
fn pop_ready(&mut self) -> Option<(Height, B::Commitment, <A::Waiter as Future>::Output)> {
166+
let pending = self.current.as_mut()?;
167+
let result = Pin::new(&mut pending.receiver).now_or_never()?;
168+
Some(self.complete_current(result))
169+
}
170+
}
171+
97172
/// A struct that holds multiple subscriptions for a block.
98173
struct BlockSubscription<B: Block> {
99174
// The subscribers that are waiting for the block
@@ -151,8 +226,8 @@ where
151226
last_processed_round: Round,
152227
// Last height processed by the application
153228
last_processed_height: Height,
154-
// Pending application acknowledgement, if any
155-
pending_ack: OptionFuture<PendingAck<B, A>>,
229+
// Pending application acknowledgements
230+
pending_acks: PendingAcks<B, A>,
156231
// Highest known finalized height
157232
tip: Height,
158233
// Outstanding subscriptions for blocks
@@ -253,7 +328,7 @@ where
253328
strategy: config.strategy,
254329
last_processed_round: Round::zero(),
255330
last_processed_height,
256-
pending_ack: None.into(),
331+
pending_acks: PendingAcks::new(config.max_pending_acks.get()),
257332
tip: Height::zero(),
258333
block_subscriptions: BTreeMap::new(),
259334
cache,
@@ -312,7 +387,7 @@ where
312387
}
313388

314389
// Attempt to dispatch finalized blocks to the application until the pipeline is full or no more blocks are available.
315-
self.try_dispatch_block(&mut application).await;
390+
self.try_dispatch_blocks(&mut application).await;
316391

317392
// Attempt to repair any gaps in the finalized blocks archive, if there are any.
318393
if self
@@ -338,28 +413,41 @@ where
338413
Ok((commitment, block)) = waiters.next_completed() else continue => {
339414
self.notify_subscribers(commitment, &block);
340415
},
341-
// Handle application acknowledgements next
342-
ack = &mut self.pending_ack => {
343-
let PendingAck {
344-
height, commitment, ..
345-
} = self.pending_ack.take().expect("ack state must be present");
346-
347-
match ack {
348-
Ok(()) => {
349-
if let Err(e) = self
350-
.handle_block_processed(height, commitment, &mut resolver)
351-
.await
352-
{
353-
error!(?e, %height, "failed to update application progress");
416+
// Handle application acknowledgements (drain all ready acks, sync once)
417+
result = self.pending_acks.current() => {
418+
// Start with the ack that woke this `select_loop!` arm.
419+
let mut pending = Some(self.pending_acks.complete_current(result));
420+
loop {
421+
let (height, commitment, result) = pending.take().expect("pending ack must exist");
422+
match result {
423+
Ok(()) => {
424+
// Apply in-memory progress updates for this acknowledged block.
425+
self.handle_block_processed(height, commitment, &mut resolver)
426+
.await;
427+
}
428+
Err(e) => {
429+
// Ack failures are fatal for marshal/application coordination.
430+
error!(e = ?e, height = %height, "application did not acknowledge block");
354431
return;
355432
}
356-
self.try_dispatch_block(&mut application).await;
357-
}
358-
Err(e) => {
359-
error!(?e, %height, "application did not acknowledge block");
360-
return;
361433
}
434+
435+
// Opportunistically drain any additional already-ready acks so we
436+
// can persist one metadata sync for the whole batch below.
437+
let Some(next) = self.pending_acks.pop_ready() else {
438+
break;
439+
};
440+
pending = Some(next);
441+
}
442+
443+
// Persist buffered processed-height updates once after draining all ready acks.
444+
if let Err(e) = self.application_metadata.sync().await {
445+
error!(?e, "failed to sync application progress");
446+
return;
362447
}
448+
449+
// Fill the pipeline
450+
self.try_dispatch_blocks(&mut application).await;
363451
},
364452
// Handle consensus inputs before backfill or resolver traffic
365453
Some(message) = self.mailbox.recv() else {
@@ -552,15 +640,16 @@ where
552640
}
553641

554642
// Update the processed height
555-
if let Err(err) = self.set_processed_height(height, &mut resolver).await {
643+
self.update_processed_height(height, &mut resolver).await;
644+
if let Err(err) = self.application_metadata.sync().await {
556645
error!(?err, %height, "failed to update floor");
557646
return;
558647
}
559648

560-
// Drop the pending acknowledgement, if one exists. We must do this to prevent
649+
// Drop all pending acknowledgements. We must do this so that a late ack for
561650
// an in-flight block below the new floor cannot overwrite
562651
// `last_processed_height` with a stale, lower height.
563-
self.pending_ack = None.into();
652+
self.pending_acks.clear();
564653

565654
// Prune the finalized block and finalization certificate archives in parallel.
566655
if let Err(err) = self.prune_finalized_archives(height).await {
@@ -941,67 +1030,75 @@ where
9411030

9421031
// -------------------- Application Dispatch --------------------
9431032

944-
/// Attempt to dispatch the next finalized block to the application if ready.
1033+
/// Dispatch finalized blocks to the application until the pipeline is full
1034+
/// or no more blocks are available.
9451035
///
9461036
/// This does NOT advance `last_processed_height` or sync metadata. It only
947-
/// sends the block to the application and stores a [PendingAck]. The
948-
/// metadata is updated later, in a subsequent `select_loop!` iteration,
949-
/// when the ack arrives and [`Self::handle_block_processed`] calls
950-
/// [`Self::set_processed_height`].
1037+
/// sends blocks to the application and enqueues pending acks. Metadata is
1038+
/// updated later, in a subsequent `select_loop!` iteration, when acks
1039+
/// arrive and [`Self::handle_block_processed`] calls
1040+
/// [`Self::update_processed_height`].
1041+
///
1042+
/// Acks are processed in FIFO order so `last_processed_height` always
1043+
/// advances sequentially.
9511044
///
9521045
/// # Crash safety
9531046
///
9541047
/// Because `select_loop!` arms run to completion, the caller's
955-
/// [`Self::sync_finalized`] always executes before the ack handler runs. This
956-
/// guarantees archive data is durable before `last_processed_height`
1048+
/// [`Self::sync_finalized`] always executes before the ack handler runs.
1049+
/// This guarantees archive data is durable before `last_processed_height`
9571050
/// advances:
9581051
///
9591052
/// ```text
9601053
/// Iteration N (caller):
9611054
/// store_finalization -> Archive::put (buffered)
962-
/// try_dispatch_block -> sends block to app, sets pending_ack
1055+
/// try_dispatch_blocks -> sends blocks to app, enqueues pending acks
9631056
/// sync_finalized -> archive durable
9641057
///
965-
/// Iteration N+1 (ack handler):
966-
/// handle_block_processed -> set_processed_height -> metadata durable
1058+
/// Iteration M (ack handler, M > N):
1059+
/// handle_block_processed -> update_processed_height -> metadata buffered
1060+
/// application_metadata.sync -> metadata durable
9671061
/// ```
968-
async fn try_dispatch_block(
1062+
async fn try_dispatch_blocks(
9691063
&mut self,
9701064
application: &mut impl Reporter<Activity = Update<B, A>>,
9711065
) {
972-
if self.pending_ack.is_some() {
973-
return;
974-
}
975-
976-
let next_height = self.last_processed_height.next();
977-
let Some(block) = self.get_finalized_block(next_height).await else {
978-
return;
979-
};
980-
assert_eq!(
981-
block.height(),
982-
next_height,
983-
"finalized block height mismatch"
984-
);
1066+
while self.pending_acks.has_capacity() {
1067+
let next_height = self
1068+
.pending_acks
1069+
.next_dispatch_height(self.last_processed_height);
1070+
let Some(block) = self.get_finalized_block(next_height).await else {
1071+
return;
1072+
};
1073+
assert_eq!(
1074+
block.height(),
1075+
next_height,
1076+
"finalized block height mismatch"
1077+
);
9851078

986-
let (height, commitment) = (block.height(), block.commitment());
987-
let (ack, ack_waiter) = A::handle();
988-
application.report(Update::Block(block, ack)).await;
989-
self.pending_ack.replace(PendingAck {
990-
height,
991-
commitment,
992-
receiver: ack_waiter,
993-
});
1079+
let (height, commitment) = (block.height(), block.commitment());
1080+
let (ack, ack_waiter) = A::handle();
1081+
application.report(Update::Block(block, ack)).await;
1082+
self.pending_acks.enqueue(PendingAck {
1083+
height,
1084+
commitment,
1085+
receiver: ack_waiter,
1086+
});
1087+
}
9941088
}
9951089

9961090
/// Handle acknowledgement from the application that a block has been processed.
1091+
///
1092+
/// Buffers the processed height update but does NOT sync to durable storage.
1093+
/// The caller must sync metadata after processing all ready acks.
9971094
async fn handle_block_processed(
9981095
&mut self,
9991096
height: Height,
10001097
commitment: B::Commitment,
10011098
resolver: &mut impl Resolver<Key = Request<B>>,
1002-
) -> Result<(), metadata::Error> {
1003-
// Update the processed height
1004-
self.set_processed_height(height, resolver).await?;
1099+
) {
1100+
// Update the processed height (buffered, not synced)
1101+
self.update_processed_height(height, resolver).await;
10051102

10061103
// Cancel any useless requests
10071104
resolver.cancel(Request::<B>::Block(commitment)).await;
@@ -1026,8 +1123,6 @@ where
10261123
.retain(Request::<B>::Notarized { round }.predicate())
10271124
.await;
10281125
}
1029-
1030-
Ok(())
10311126
}
10321127

10331128
// -------------------- Prunable Storage --------------------
@@ -1049,7 +1144,7 @@ where
10491144
/// Must be called within the same `select_loop!` arm as any preceding
10501145
/// [`Self::store_finalization`] / [`Self::try_repair_gaps`] writes, before yielding back
10511146
/// to the loop. This ensures archives are durable before the ack handler
1052-
/// advances `last_processed_height`. See [`Self::try_dispatch_block`] for details.
1147+
/// advances `last_processed_height`. See [`Self::try_dispatch_blocks`] for details.
10531148
async fn sync_finalized(&mut self) {
10541149
if let Err(e) = try_join!(
10551150
async {
@@ -1103,7 +1198,7 @@ where
11031198
/// Writes are buffered and not synced. The caller must call
11041199
/// [sync_finalized](Self::sync_finalized) before yielding to the
11051200
/// `select_loop!` so that archive data is durable before the ack handler
1106-
/// advances `last_processed_height`. See [`Self::try_dispatch_block`] for the
1201+
/// advances `last_processed_height`. See [`Self::try_dispatch_blocks`] for the
11071202
/// crash safety invariant.
11081203
async fn store_finalization(
11091204
&mut self,
@@ -1148,7 +1243,7 @@ where
11481243
let _ = self.finalized_height.try_set(height.get());
11491244
}
11501245

1151-
self.try_dispatch_block(application).await;
1246+
self.try_dispatch_blocks(application).await;
11521247
}
11531248

11541249
/// Get the latest finalized block information (height and commitment tuple).
@@ -1268,20 +1363,14 @@ where
12681363
wrote
12691364
}
12701365

1271-
/// Sets the processed height in storage, metrics, and in-memory state. Also cancels any
1272-
/// outstanding requests below the new processed height.
1273-
///
1274-
/// This durably syncs `last_processed_height` via [`Metadata::put_sync`]. It must only
1275-
/// be called after [`Self::sync_finalized`] has made the corresponding archive
1276-
/// writes durable. See [`Self::try_dispatch_block`] for the crash safety invariant.
1277-
async fn set_processed_height(
1366+
/// Buffers a processed height update in metadata and applies it to in-memory state and metrics.
1367+
/// Does NOT sync to durable storage; the caller must sync metadata afterwards to make it durable.
1368+
async fn update_processed_height(
12781369
&mut self,
12791370
height: Height,
12801371
resolver: &mut impl Resolver<Key = Request<B>>,
1281-
) -> Result<(), metadata::Error> {
1282-
self.application_metadata
1283-
.put_sync(LATEST_KEY, height)
1284-
.await?;
1372+
) {
1373+
self.application_metadata.put(LATEST_KEY, height);
12851374
self.last_processed_height = height;
12861375
let _ = self
12871376
.processed_height
@@ -1291,8 +1380,6 @@ where
12911380
resolver
12921381
.retain(Request::<B>::Finalized { height }.predicate())
12931382
.await;
1294-
1295-
Ok(())
12961383
}
12971384

12981385
/// Prunes finalized blocks and certificates below the given height.

0 commit comments

Comments
 (0)