Skip to content

Commit 229f05f

Browse files
davidtaikochaclaude
andcommitted
fix(taiko-client-rs): retry aggregation errors instead of dropping proofs
aggregate_proofs_by_type cleared the buffer and returned on any non-Pending error, permanently dropping already-generated proofs whose Proposed events were already marked handled (no scanner replay). Go retries all aggregation errors with constant backoff and only clears on shutdown. Mirror that: retry non-terminal errors, keeping the buffer. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 6aedd9c commit 229f05f

1 file changed

Lines changed: 39 additions & 11 deletions

File tree

  • packages/taiko-client-rs/crates/prover/src/submitter

packages/taiko-client-rs/crates/prover/src/submitter/submitter.rs

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -288,13 +288,18 @@ impl Pipeline {
288288
let _ = self.channels.batch_proofs_tx.send(batch).await;
289289
return Ok(());
290290
}
291+
// Retry every non-success outcome with constant backoff, keeping
292+
// the buffered proofs (Go `AggregateProofsByType` wraps the
293+
// aggregation in `backoff.Retry`; the in-progress vs. error
294+
// distinction is logging-only). Dropping the buffer here would
295+
// lose already-generated proofs whose `Proposed` events are
296+
// already marked handled, so they would never be re-proven.
291297
Err(ProverError::Raiko(RaikoError::Pending(_))) => {
292298
tokio::time::sleep(self.cfg.proof_polling_interval).await;
293299
}
294300
Err(err) => {
295-
let ids: Vec<u64> = items.iter().map(ProofResponse::proposal_id).collect();
296-
buffer.clear_items(&ids);
297-
return Err(err);
301+
tracing::warn!(%err, ?proof_type, "aggregate proofs failed, retrying");
302+
tokio::time::sleep(self.cfg.proof_polling_interval).await;
298303
}
299304
}
300305
}
@@ -572,9 +577,27 @@ impl ProofSubmitter {
572577
return Ok(());
573578
}
574579

575-
if let Err(err) = self.tx_manager.send(candidate).await {
576-
ProverMetrics::submission_errors().inc();
577-
return Err(ProverError::from(err));
580+
let receipt = match self.tx_manager.send(candidate).await {
581+
Ok(receipt) => receipt,
582+
Err(err) => {
583+
ProverMetrics::submission_errors().inc();
584+
return Err(ProverError::from(err));
585+
}
586+
};
587+
588+
// base-tx-manager returns `Ok(receipt)` even for a transaction that reached
589+
// confirmation depth but reverted, so the status must be checked explicitly.
590+
// A revert means the proofs did not land; surface it so the caller resends
591+
// the requests rather than dropping them, matching Go `prover.go:302-314`
592+
// (reverted/unretryable -> `ClearProofBuffers(batchProof, true)`).
593+
if !receipt.status() {
594+
ProverMetrics::submission_reverted().inc();
595+
tracing::error!(
596+
tx_hash = %receipt.transaction_hash,
597+
?batch.batch_ids,
598+
"prove transaction reverted on-chain; resending proof requests"
599+
);
600+
return Err(ProverError::SubmissionReverted);
578601
}
579602

580603
ProverMetrics::proofs_sent().inc_by(batch.batch_ids.len() as u64);
@@ -927,20 +950,25 @@ mod tests {
927950
}
928951

929952
#[tokio::test]
930-
async fn aggregate_clears_buffer_on_terminal_error() {
953+
async fn aggregate_retries_error_without_dropping_buffer() {
931954
let producer = Arc::new(MockProducer::default());
955+
// First aggregation attempt fails; the retry (empty queue -> Ok) succeeds.
932956
producer
933957
.batch
934958
.lock()
935959
.await
936960
.push_back(Err(ProverError::Other(anyhow::anyhow!("raiko down"))));
937-
let h = harness(producer, Duration::from_secs(3_600), 2);
961+
let mut h = harness(producer, Duration::from_secs(3_600), 2);
938962
let buffer = h.pipeline.buffers().get(&ProofType::Sgx).unwrap();
939963
buffer.write(response(5, ProofType::Sgx)).unwrap();
964+
buffer.write(response(6, ProofType::Sgx)).unwrap();
940965

941-
let err = h.pipeline.aggregate_proofs_by_type(ProofType::Sgx).await.unwrap_err();
942-
assert!(err.to_string().contains("raiko down"));
943-
assert_eq!(buffer.len(), 0, "terminal error clears the buffer");
966+
// The transient error is retried, not surfaced, and the proofs are kept.
967+
h.pipeline.aggregate_proofs_by_type(ProofType::Sgx).await.unwrap();
968+
969+
let batch = h.batch_rx.try_recv().unwrap();
970+
assert_eq!(batch.batch_ids, vec![5, 6], "buffered proofs aggregated on retry, not dropped");
971+
assert_eq!(buffer.len(), 2, "buffer retained until post-submission clear");
944972
}
945973

946974
#[tokio::test]

0 commit comments

Comments
 (0)