
feat(coprocessor): drift detection in gw-listener #2096

Merged
mergify[bot] merged 43 commits into main from coproc-drift-heal
Mar 16, 2026
Conversation

Contributor

@Eikix Eikix commented Mar 11, 2026

Summary

  • Add opt-in drift detection to gw-listener by polling CiphertextCommits events
  • Fetch the expected coprocessor tx-sender set from GatewayConfig when drift detection is enabled
  • Track per-handle submissions in memory until the handle is complete or stale
  • Snapshot the expected sender set per handle so open handles are evaluated against the membership they started with
  • Persist the earliest open CiphertextCommits block and rebuild open-handle state from chain logs after restart
  • Detect multiple digest variants for a handle and log the sender breakdown
  • Compare consensus digests against the local ciphertext_digest row, retrying later if local digests are not ready yet
  • Alert if all expected coprocessors submitted but no consensus event was observed
  • Alert if consensus was reached but some expected coprocessors never submitted within a grace window
  • Metrics: drift_detected, consensus_timeout, missing_submission
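The PR buffers counter increments during a batch and flushes them once, rather than incrementing Prometheus counters mid-batch. A minimal sketch of that deferral pattern, with hypothetical struct and field names (only the three metric names come from the PR):

```rust
// Sketch only: counter increments are buffered per batch and flushed once.
// `DeferredDriftMetrics` and the sink type are illustrative, not the actual
// gw-listener API.
#[derive(Default)]
pub struct DeferredDriftMetrics {
    pub drift_detected: u64,
    pub consensus_timeout: u64,
    pub missing_submission: u64,
}

impl DeferredDriftMetrics {
    /// Flush buffered increments into the real counters (represented here
    /// by a plain sink) and reset the buffer for the next batch.
    pub fn flush(&mut self, sink: &mut Vec<(&'static str, u64)>) {
        for (name, v) in [
            ("drift_detected", self.drift_detected),
            ("consensus_timeout", self.consensus_timeout),
            ("missing_submission", self.missing_submission),
        ] {
            if v > 0 {
                sink.push((name, v));
            }
        }
        *self = Self::default();
    }
}
```

Deferring the increments keeps a partially processed (and later retried) batch from double-counting alerts.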

Scope Notes

  • Restart recovery persists only one watermark: the earliest open CiphertextCommits block
  • Open-handle state is rebuilt from chain logs on startup; it is not stored in Postgres as structured detector state
  • Rebuild is silent while historical logs are replayed, then open handles are evaluated once after alerts are re-enabled
  • GatewayConfig updates refresh the current expected sender set for new handles; existing open handles keep their original membership snapshot
  • This PR does not pause processing, fetch canonical ciphertexts, or wipe/recompute state
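The membership-snapshot rule above can be sketched as follows; all names here are hypothetical stand-ins for the detector's internals:

```rust
// Sketch: each open handle freezes the expected sender set at creation
// time; GatewayConfig updates only affect handles opened afterwards.
use std::collections::HashMap;

type Address = String; // stand-in for an on-chain address type

struct HandleState {
    expected_senders: Vec<Address>, // snapshot taken when handle first seen
}

struct Detector {
    current_expected: Vec<Address>,
    open_handles: HashMap<u64, HandleState>,
}

impl Detector {
    fn on_first_submission(&mut self, handle: u64) {
        let snapshot = self.current_expected.clone();
        self.open_handles
            .entry(handle)
            .or_insert_with(|| HandleState { expected_senders: snapshot });
    }

    fn on_gateway_config_update(&mut self, senders: Vec<Address>) {
        // Existing open handles keep their original snapshot untouched.
        self.current_expected = senders;
    }
}
```

Snapshotting avoids misclassifying a handle as "missing submissions" when the expected set grows mid-flight.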

Test Plan

  • SQLX_OFFLINE=true cargo build -p gw-listener --manifest-path coprocessor/fhevm-engine/Cargo.toml
  • SQLX_OFFLINE=true cargo test -p gw-listener --manifest-path coprocessor/fhevm-engine/Cargo.toml
  • ./test-suite/fhevm/fhevm-cli test ciphertext-drift
  • CI: test-suite-e2e-tests with deploy-build=true
  • Manual: restart gw-listener while a handle is still open and verify tracking resumes from the persisted watermark
  • Manual: verify /metrics exposes drift_detected, consensus_timeout, and missing_submission

E2E Drift Scenario

  • The e2e drift test uses the existing multi-coprocessor stack
  • One coprocessor DB installs a temporary one-shot Postgres trigger on ciphertext_digest
  • The trigger flips one byte in ciphertext the first time a row becomes fully ready
  • transaction-sender then submits the already-polluted digest, which should trigger the Rust drift detector
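The corruption the trigger injects amounts to flipping one byte of the ciphertext before its digest is submitted. The real test does this with a one-shot Postgres trigger; this Rust sketch only illustrates the effect:

```rust
// Illustrative only: flip one byte so the submitted digest diverges from
// the other coprocessors' digests for the same handle.
fn flip_first_byte(ciphertext: &mut [u8]) {
    if let Some(b) = ciphertext.first_mut() {
        *b ^= 0xFF;
    }
}
```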

closes https://github.com/zama-ai/fhevm-internal/issues/1147

@cla-bot cla-bot bot added the cla-signed label Mar 11, 2026
@Eikix Eikix force-pushed the coproc-drift-heal branch from 637dca2 to a76cffe on March 11, 2026 at 13:18
@Eikix Eikix marked this pull request as ready for review March 11, 2026 18:04
@Eikix Eikix requested review from a team as code owners March 11, 2026 18:04
Eikix added 14 commits March 11, 2026 19:06
Compare local ciphertext digests against on-chain consensus from the
CiphertextCommits contract. Enabled via --ciphertext-commits-address.

Adds early-warning logging when peer submissions diverge and structured
warn logs when local digest mismatches consensus. Four new Prometheus
counters provide observability. Detection-only — no recovery action.
…ft checks

- P1: Propagate DB errors from handle_consensus instead of swallowing
  them, so the block is retried via the outer backoff loop.
- P2: Queue unresolved consensus events (local digest not yet computed)
  in a bounded pending queue (10k cap) and retry each block tick, so
  late-arriving local digests are still checked.
- Refactor comparison logic into try_resolve_consensus shared by both
  the initial check and the retry path.
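The P2 item above can be sketched as a capped retry queue; the type and method names are hypothetical, only the 10k cap and per-tick retry behavior come from the commit message:

```rust
// Sketch: consensus events whose local digest is not yet in the DB are
// re-queued and retried each block tick, with a hard cap so the queue
// cannot grow without bound.
use std::collections::VecDeque;

const PENDING_CAP: usize = 10_000;

struct PendingQueue<T> {
    inner: VecDeque<T>,
}

impl<T> PendingQueue<T> {
    fn new() -> Self {
        Self { inner: VecDeque::new() }
    }

    /// Returns false (dropping the event) once the cap is reached.
    fn push(&mut self, event: T) -> bool {
        if self.inner.len() >= PENDING_CAP {
            return false;
        }
        self.inner.push_back(event);
        true
    }

    /// Retry every queued event once; unresolved events are re-queued.
    fn retry<F: FnMut(&T) -> bool>(&mut self, mut resolved: F) {
        for _ in 0..self.inner.len() {
            if let Some(ev) = self.inner.pop_front() {
                if !resolved(&ev) {
                    self.inner.push_back(ev);
                }
            }
        }
    }
}
```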
@Eikix Eikix force-pushed the coproc-drift-heal branch from 515f6a5 to 56d267f Compare March 11, 2026 18:07
@zama-ai zama-ai deleted a comment from claude bot Mar 11, 2026

mergify bot commented Mar 11, 2026

🧪 CI Insights

Here's what we observed from your CI run for 26ddfd4.

🟢 All jobs passed!


- Clarify drift counter description in metrics.rs
- Document all 5 drift metrics in docs/metrics/metrics.md
- Bump drift timeout defaults from 50/10 to 3000/3000 blocks (~5 min at
  100ms block time) to accommodate coprocessors stuck for minutes
- Replace manual bail with clap requires on ciphertext_commits_address
dartdart26 previously approved these changes Mar 13, 2026

Collaborator

@dartdart26 dartdart26 left a comment


Thank you for working on it so quickly! I know it's been rushed and I understand the reasoning and the extensive use of AI.

That said, it's been a bit hard for me to verify all works as expected.

I am approving it, knowing that this is just for detection and won't change coprocessor state in any way. If we observe issues, we can fix them, but it won't affect coprocessor functionality.

Thanks @Eikix, good job!

P.S. Maybe the things that could affect the coprocessor are changes in gw_listener.rs, we could double check these, but they do look ok.

The field name now reflects what the detector is *doing* (replaying
historical logs) rather than a side-effect toggle. Inverted semantics:
`replaying: true` suppresses warns and DB queries, `false` is normal
operation.

Also removes redundant `if self.alerts_enabled` guard in
try_check_local_consensus — already short-circuited by the early return
at the top of the function.
Eikix added 4 commits March 13, 2026 15:08
- Add `type CiphertextDigest = FixedBytes<32>` to avoid literal 32
- Document implicit upper bound on open_handles
- Remove redundant early-return guard in flush_metrics
- fetch_expected_coprocessor_tx_senders now requires gateway_config_address
  parameter; caller decides whether to call based on config
Use domain-meaningful names: HandleOutcome describes what happened to
a handle, not an internal mechanism. Variants renamed:
- KeepWaiting → Pending
- NoConsensusTimeout → GatewayNeverReachedConsensus
- ConsensusUncheckedTimeout → LocalDigestNeverAppeared
- MissingSubmissionPostGrace → NotAllCoprocessorsSubmitted
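The renamed variants, with a condensed classifier sketch. The real classify_handle consults per-handle timestamps and two separate timeouts; this version only shows the decision shape, and the boolean inputs are illustrative simplifications:

```rust
// Sketch: outcome classification for an open handle. The actual code
// works from HandleState fields and Instant comparisons.
#[derive(Debug, PartialEq)]
enum HandleOutcome {
    Pending,
    GatewayNeverReachedConsensus,
    LocalDigestNeverAppeared,
    NotAllCoprocessorsSubmitted,
}

fn classify(
    consensus_seen: bool,
    local_checked: bool,
    all_submitted: bool,
    timed_out: bool,
) -> HandleOutcome {
    if consensus_seen {
        if !local_checked {
            // Consensus arrived but we never managed to compare locally.
            if timed_out {
                HandleOutcome::LocalDigestNeverAppeared
            } else {
                HandleOutcome::Pending
            }
        } else if !all_submitted && timed_out {
            // Grace window expired with expected senders still missing.
            HandleOutcome::NotAllCoprocessorsSubmitted
        } else {
            HandleOutcome::Pending
        }
    } else if timed_out {
        HandleOutcome::GatewayNeverReachedConsensus
    } else {
        HandleOutcome::Pending
    }
}
```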
…prove clarity

- Fix verify_proof metrics being incremented both inside the function
  and at the call site (bug: double-counting)
- Remove unnecessary .clone() on owned event field
- Replace unreachable Pending return with unreachable!()
- Replace cross-function .unwrap() with let-else in evict_stale
- Collapse two identical "local digests not ready" debug branches
- Use < instead of != in classify_handle (invariant is always <)
- Rename fetch_expected_coprocessor_tx_senders to fetch_expected_senders
- Remove redundant replaying guard in finalize_completed_without_consensus
…tener

All drift detection code paths now log-and-continue instead of
propagating errors to run_get_logs. A transient DB or RPC failure
during drift checking should not interrupt processing of
VerifyProofRequest, ActivateKey, or ActivateCrs events.
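The log-and-continue pattern this commit applies can be sketched as below; the function names are illustrative, not the gw-listener's actual ones:

```rust
// Sketch: drift checks swallow their own errors so the main event loop
// keeps serving critical events regardless of transient DB/RPC failures.
fn drift_check(block: u64) -> Result<(), String> {
    // Stand-in for a fallible DB/RPC call; fails on even blocks here.
    if block % 2 == 0 {
        Err(format!("transient failure at block {block}"))
    } else {
        Ok(())
    }
}

fn process_blocks(blocks: &[u64]) -> usize {
    let mut processed = 0;
    for &b in blocks {
        if let Err(e) = drift_check(b) {
            // Logged (via `error!` in the real code) but never propagated.
            eprintln!("drift check failed: {e}");
        }
        // Critical event processing continues regardless.
        processed += 1;
    }
    processed
}
```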
Contributor Author

@Eikix Eikix left a comment


[AUTOMATED]

Code Review

Reviewed for bugs, guideline compliance, and error handling. Found 2 issues.

Summary

  • Bug (low severity): Histogram metrics are observed eagerly, breaking the deferred-metric invariant stated in the main loop comment.
  • Bug (low severity): finalize_completed_without_consensus uses == instead of >=, missing handles when unexpected senders submit.

No guideline violations found. Error handling for drift detection paths correctly uses log-and-continue.
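The second issue reduces to a comparison direction: with `==`, a handle where an unexpected sender also submitted (so submissions exceed the expected count) never matches and is never finalized. A minimal sketch of the suggested fix, with an illustrative function name:

```rust
// Sketch: `>=` also catches the case where submissions outnumber the
// expected sender set; `==` would skip such handles forever.
fn all_expected_submitted(submissions: usize, expected: usize) -> bool {
    submissions >= expected
}
```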

@github-actions

github-actions bot commented Mar 13, 2026

Changed Lines Coverage

Coverage of added/modified lines: 86.0%

Per-file breakdown

Diff Coverage

Diff: origin/main...HEAD, staged and unstaged changes

  • coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs (0.0%): Missing lines 199-202
  • coprocessor/fhevm-engine/gw-listener/src/drift_detector.rs (95.3%): Missing lines 156,167,193,201,206,275,278,296,300,326,334-336,339,342,348,363,367,376,382,387,396,399-403,405,414,419-423,425,449,463,468,473,475-478,480,515,521,526-527,529,538,568-572,594,596,609
  • coprocessor/fhevm-engine/gw-listener/src/gw_listener.rs (43.7%): Missing lines 159-161,163-166,188,195,198,259-260,270-273,275,289-291,293-295,317,321,324-329,331-333,335-338,341,346,393-402,405-407,409-410,432-433,435,437-439,441,443,445,447-451,453-461,463-474,476-481,483,487,489-492,495-515,517,520,522-523,525-532,535-544,548,550-551,553-554,770
  • coprocessor/fhevm-engine/gw-listener/src/lib.rs (100%)
  • coprocessor/fhevm-engine/gw-listener/src/metrics.rs (100%)

Summary

  • Total: 1533 lines
  • Missing: 213 lines
  • Coverage: 86%

coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs

  195         get_logs_block_batch_size: conf.get_logs_block_batch_size,
  196         replay_from_block: conf.replay_from_block,
  197         replay_skip_verify_proof: conf.replay_skip_verify_proof,
  198         log_last_processed_every_number_of_updates: conf.log_last_processed_every_number_of_updates,
! 199         ciphertext_commits_address: conf.ciphertext_commits_address,
! 200         gateway_config_address: conf.gateway_config_address,
! 201         drift_no_consensus_timeout: conf.drift_no_consensus_timeout,
! 202         drift_post_consensus_grace: conf.drift_post_consensus_grace,
  203     };
  204 
  205     let gw_listener = GatewayListener::new(
  206         conf.input_verification_address,

coprocessor/fhevm-engine/gw-listener/src/drift_detector.rs

  152         {
  153             if !self.replaying && existing.digests != digests {
  154                 warn!(
  155                     handle = %handle,
! 156                     host_chain_id = self.host_chain_id.as_i64(),
  157                     local_node_id = %self.local_node_id,
  158                     block_number = context.block_number,
  159                     block_hash = ?context.block_hash,
  160                     tx_hash = ?context.tx_hash,

  163                     previous_ciphertext_digest = %existing.digests.ciphertext_digest,
  164                     previous_ciphertext128_digest = %existing.digests.ciphertext128_digest,
  165                     new_ciphertext_digest = %digests.ciphertext_digest,
  166                     new_ciphertext128_digest = %digests.ciphertext128_digest,
! 167                     "Same coprocessor submitted different digests for one handle"
  168                 );
  169             }
  170             return;
  171         }

  189                 .map(ToString::to_string)
  190                 .collect();
  191             warn!(
  192                 handle = %handle,
! 193                 host_chain_id = self.host_chain_id.as_i64(),
  194                 local_node_id = %self.local_node_id,
  195                 first_seen_block = state.first_seen_block,
  196                 first_seen_block_hash = ?state.first_seen_block_hash,
  197                 block_number = context.block_number,

  197                 block_number = context.block_number,
  198                 block_hash = ?context.block_hash,
  199                 tx_hash = ?context.tx_hash,
  200                 log_index = ?context.log_index,
! 201                 variant_count = variants.len(),
  202                 variants = ?variants,
  203                 seen_senders = ?seen,
  204                 missing_senders = ?missing,
  205                 source = "peer_submission",
! 206                 "Drift detected: observed multiple digest variants for handle"
  207             );
  208             state.drift_reported = true;
  209             self.deferred_drift_detected += 1;
  210         }

  271             return Ok(());
  272         }
  273 
  274         let Some(state) = self.open_handles.get(&handle) else {
! 275             return Ok(());
  276         };
  277         let Some(consensus) = &state.consensus else {
! 278             return Ok(());
  279         };
  280 
  281         let row = sqlx::query(
  282             "SELECT ciphertext, ciphertext128 FROM ciphertext_digest WHERE handle = $1",

  292         });
  293         let Some((local_ciphertext_digest, local_ciphertext128_digest)) = local_digests else {
  294             debug!(
  295                 handle = %handle,
! 296                 host_chain_id = self.host_chain_id.as_i64(),
  297                 local_node_id = %self.local_node_id,
  298                 block_number = consensus.context.block_number,
  299                 tx_hash = ?consensus.context.tx_hash,
! 300                 "Local digests not yet available; deferring drift check"
  301             );
  302             return Ok(());
  303         };

  322                 sender_count_for_variant(&state.submissions, consensus.digests);
  323             let observed_variants = variant_summaries(&state.submissions);
  324             warn!(
  325                 handle = %handle,
! 326                 host_chain_id = self.host_chain_id.as_i64(),
  327                 local_node_id = %self.local_node_id,
  328                 block_number = consensus.context.block_number,
  329                 block_hash = ?consensus.context.block_hash,
  330                 tx_hash = ?consensus.context.tx_hash,

  330                 tx_hash = ?consensus.context.tx_hash,
  331                 log_index = ?consensus.context.log_index,
  332                 consensus_ciphertext_digest = %consensus.digests.ciphertext_digest,
  333                 consensus_ciphertext128_digest = %consensus.digests.ciphertext128_digest,
! 334                 local_ciphertext_digest = %to_hex(&local_ciphertext_digest),
! 335                 local_ciphertext128_digest = %to_hex(&local_ciphertext128_digest),
! 336                 local_matches_observed_variant = local_variant_sender_count > 0,
  337                 local_variant_sender_count,
  338                 consensus_variant_sender_count,
! 339                 observed_variant_count = observed_variants.len(),
  340                 observed_variants = ?observed_variants,
  341                 source = "consensus",
! 342                 "Drift detected: local digest does not match consensus"
  343             );
  344             self.deferred_drift_detected += 1;
  345         }

  344             self.deferred_drift_detected += 1;
  345         }
  346 
  347         let Some(state) = self.open_handles.get_mut(&handle) else {
! 348             return Ok(());
  349         };
  350         state.local_consensus_checked = true;
  351         self.finish_if_complete(handle);
  352         Ok(())

  359             match self.classify_handle(state, now) {
  360                 HandleOutcome::Pending => {}
  361                 HandleOutcome::LocalDigestNeverAppeared => {
  362                     let Some(consensus) = state.consensus.as_ref() else {
! 363                         continue;
  364                     };
  365                     warn!(
  366                         handle = %handle,
! 367                         host_chain_id = self.host_chain_id.as_i64(),
  368                         local_node_id = %self.local_node_id,
  369                         first_seen_block = state.first_seen_block,
  370                         first_seen_block_hash = ?state.first_seen_block_hash,
  371                         last_seen_block = state.last_seen_block,

  372                         consensus_block = consensus.context.block_number,
  373                         consensus_block_hash = ?consensus.context.block_hash,
  374                         consensus_tx_hash = ?consensus.context.tx_hash,
  375                         consensus_log_index = ?consensus.context.log_index,
! 376                         "Consensus was observed but local digests never became available for comparison"
  377                     );
  378                     finished.push(*handle);
  379                 }
  380                 HandleOutcome::NotAllCoprocessorsSubmitted => {

  378                     finished.push(*handle);
  379                 }
  380                 HandleOutcome::NotAllCoprocessorsSubmitted => {
  381                     let Some(consensus) = state.consensus.as_ref() else {
! 382                         continue;
  383                     };
  384                     let variants = variant_summaries(&state.submissions);
  385                     warn!(
  386                         handle = %handle,
! 387                         host_chain_id = self.host_chain_id.as_i64(),
  388                         local_node_id = %self.local_node_id,
  389                         first_seen_block = state.first_seen_block,
  390                         first_seen_block_hash = ?state.first_seen_block_hash,
  391                         last_seen_block = state.last_seen_block,

  392                         consensus_block = consensus.context.block_number,
  393                         consensus_block_hash = ?consensus.context.block_hash,
  394                         consensus_tx_hash = ?consensus.context.tx_hash,
  395                         consensus_log_index = ?consensus.context.log_index,
! 396                         consensus_senders = ?consensus.senders.iter().map(ToString::to_string).collect::<Vec<_>>(),
  397                         consensus_ciphertext_digest = %consensus.digests.ciphertext_digest,
  398                         consensus_ciphertext128_digest = %consensus.digests.ciphertext128_digest,
! 399                         seen_senders = ?state.submissions.iter().map(|s| s.sender.to_string()).collect::<Vec<_>>(),
! 400                         missing_senders = ?state.expected_senders.iter()
! 401                             .filter(|s| !state.submissions.iter().any(|sub| sub.sender == **s))
! 402                             .map(ToString::to_string).collect::<Vec<_>>(),
! 403                         variant_count = variants.len(),
  404                         variants = ?variants,
! 405                         "Not all expected coprocessors submitted before post-consensus grace period expired"
  406                     );
  407                     self.deferred_missing_submission += 1;
  408                     finished.push(*handle);
  409                 }

  410                 HandleOutcome::GatewayNeverReachedConsensus => {
  411                     let variants = variant_summaries(&state.submissions);
  412                     warn!(
  413                         handle = %handle,
! 414                         host_chain_id = self.host_chain_id.as_i64(),
  415                         local_node_id = %self.local_node_id,
  416                         first_seen_block = state.first_seen_block,
  417                         first_seen_block_hash = ?state.first_seen_block_hash,
  418                         last_seen_block = state.last_seen_block,
! 419                         seen_senders = ?state.submissions.iter().map(|s| s.sender.to_string()).collect::<Vec<_>>(),
! 420                         missing_senders = ?state.expected_senders.iter()
! 421                             .filter(|s| !state.submissions.iter().any(|sub| sub.sender == **s))
! 422                             .map(ToString::to_string).collect::<Vec<_>>(),
! 423                         variant_count = variants.len(),
  424                         variants = ?variants,
! 425                         "Handle timed out before consensus was observed"
  426                     );
  427                     self.deferred_consensus_timeout += 1;
  428                     finished.push(*handle);
  429                 }

  445     }
  446 
  447     fn evaluate_open_handles(&mut self, now: Instant) {
  448         if self.replaying {
! 449             return;
  450         }
  451 
  452         let drift_handles = self
  453             .open_handles

  459             .collect::<Vec<_>>();
  460 
  461         for handle in drift_handles {
  462             let Some(state) = self.open_handles.get_mut(&handle) else {
! 463                 continue;
  464             };
  465             let variants = variant_summaries(&state.submissions);
  466             warn!(
  467                 handle = %handle,
! 468                 host_chain_id = self.host_chain_id.as_i64(),
  469                 local_node_id = %self.local_node_id,
  470                 first_seen_block = state.first_seen_block,
  471                 first_seen_block_hash = ?state.first_seen_block_hash,
  472                 last_seen_block = state.last_seen_block,
! 473                 variant_count = variants.len(),
  474                 variants = ?variants,
! 475                 seen_senders = ?state.submissions.iter().map(|s| s.sender.to_string()).collect::<Vec<_>>(),
! 476                 missing_senders = ?state.expected_senders.iter()
! 477                     .filter(|s| !state.submissions.iter().any(|sub| sub.sender == **s))
! 478                     .map(ToString::to_string).collect::<Vec<_>>(),
  479                 source = "peer_submission",
! 480                 "Drift detected: observed multiple digest variants for handle"
  481             );
  482             state.drift_reported = true;
  483             self.deferred_drift_detected += 1;
  484         }

  511             .collect::<Vec<_>>();
  512 
  513         for handle in completed_without_consensus {
  514             let Some(state) = self.open_handles.get(&handle) else {
! 515                 continue;
  516             };
  517 
  518             let variants = variant_summaries(&state.submissions);
  519             warn!(

  517 
  518             let variants = variant_summaries(&state.submissions);
  519             warn!(
  520                 handle = %handle,
! 521                 host_chain_id = self.host_chain_id.as_i64(),
  522                 local_node_id = %self.local_node_id,
  523                 first_seen_block = state.first_seen_block,
  524                 first_seen_block_hash = ?state.first_seen_block_hash,
  525                 last_seen_block = state.last_seen_block,
! 526                 seen_senders = ?state.submissions.iter().map(|s| s.sender.to_string()).collect::<Vec<_>>(),
! 527                 variant_count = variants.len(),
  528                 variants = ?variants,
! 529                 "All expected coprocessors submitted but no consensus event was observed"
  530             );
  531             self.deferred_consensus_timeout += 1;
  532             self.open_handles.remove(&handle);
  533         }

  534     }
  535 
  536     fn finish_if_complete(&mut self, handle: CiphertextDigest) {
  537         let Some(state) = self.open_handles.get(&handle) else {
! 538             return;
  539         };
  540 
  541         if state.submissions.len() < state.expected_senders.len() {
  542             return;

  564 
  565     /// Finalize a rebuild replay: check deferred consensus results and evaluate
  566     /// all open handles against the current chain tip. Called by
  567     /// `rebuild_drift_detector` in `gw_listener.rs` after log replay completes.
! 568     pub(crate) async fn end_of_rebuild(&mut self, db_pool: &Pool<Postgres>) -> anyhow::Result<()> {
! 569         self.refresh_pending_consensus_checks(db_pool).await?;
! 570         self.evaluate_open_handles(Instant::now());
! 571         Ok(())
! 572     }
  573 
  574     fn classify_handle(&self, state: &HandleState, now: Instant) -> HandleOutcome {
  575         if let Some(consensus) = &state.consensus {
  576             if !state.local_consensus_checked {

  590                     HandleOutcome::NotAllCoprocessorsSubmitted
  591                 } else {
  592                     HandleOutcome::Pending
  593                 };
! 594             }
  595 
! 596             unreachable!("handle should have been removed by finish_if_complete");
  597         }
  598 
  599         if now.duration_since(state.first_seen_at) >= self.drift_no_consensus_timeout {
  600             HandleOutcome::GatewayNeverReachedConsensus

  605 }
  606 
  607 fn has_multiple_variants(submissions: &[Submission]) -> bool {
  608     let Some(first) = submissions.first() else {
! 609         return false;
  610     };
  611     submissions[1..].iter().any(|s| s.digests != first.digests)
  612 }

coprocessor/fhevm-engine/gw-listener/src/gw_listener.rs

  155         let sender_seed_block = replay_start_block
  156             .map(|block| block.saturating_sub(1))
  157             .or(last_processed_block_num);
  158         let expected_senders = if let Some(gw_config_addr) = self.conf.gateway_config_address {
! 159             match self
! 160                 .fetch_expected_senders(gw_config_addr, sender_seed_block)
! 161                 .await
  162             {
! 163                 Ok(senders) => senders,
! 164                 Err(e) => {
! 165                     error!(error = %e, "Failed to fetch expected tx-senders; drift detection disabled until GatewayConfig event arrives");
! 166                     Vec::new()
  167                 }
  168             }
  169         } else {
  170             Vec::new()

  184                     last_processed_block_num,
  185                 )
  186                 .await
  187             {
! 188                 error!(error = %e, "Failed to rebuild drift detector; continuing with partial state");
  189             }
  190         }
  191 
  192         let filter_addresses = {

  191 
  192         let filter_addresses = {
  193             let mut addrs = vec![self.kms_generation_address, self.input_verification_address];
  194             if let Some(addr) = self.conf.ciphertext_commits_address {
! 195                 addrs.push(addr);
  196             }
  197             if let Some(addr) = self.conf.gateway_config_address {
! 198                 addrs.push(addr);
  199             }
  200             addrs
  201         };

  255                     for log in logs {
  256                         match log.address() {
  257                             a if a == self.input_verification_address => {
  258                                 if replay_from_block.is_some() && self.conf.replay_skip_verify_proof {
! 259                                     debug!(log = ?log, "Skipping VerifyProofRequest during replay");
! 260                                     continue;
  261                                 }
  262                                 if let Ok(event) = InputVerification::InputVerificationEvents::decode_log(&log.inner) {
  263                                     // This listener only reacts to proof requests. Other known InputVerification
  264                                     // events are expected when multiple coprocessors interact with the gateway.

  266                                         self.verify_proof_request(db_pool, request, log.clone()).await.
  267                                             inspect(|_| {
  268                                                 verify_proof_success += 1;
  269                                             }).inspect_err(|e| {
! 270                                                 error!(error = %e, "VerifyProofRequest processing failed");
! 271                                                 VERIFY_PROOF_FAIL_COUNTER.inc();
! 272                                         })?;
! 273                                     }
  274                                 } else {
! 275                                     error!(log = ?log, "Failed to decode InputVerification event log");
  276                                 }
  277                             }
  278                             a if a == self.kms_generation_address => {
  279                                 if let Ok(event) = KMSGeneration::KMSGenerationEvents::decode_log(&log.inner) {

  285                                                 Ok(_) => {
  286                                                     activate_crs_success += 1;
  287                                                     info!("ActivateCrs event successful");
  288                                                 },
! 289                                                 Err(e) if e.is::<DigestMismatchError>() => {
! 290                                                     crs_digest_mismatch += 1;
! 291                                                     error!(error = %e, "CRS digest mismatch, ignoring event");
  292                                                 }
! 293                                                 Err(e) => {
! 294                                                     ACTIVATE_CRS_FAIL_COUNTER.inc();
! 295                                                     return Err(e);
  296                                                 }
  297                                             }
  298                                         },
  299                                         // IMPORTANT: See comment above.

  313                                                 }
  314                                             };
  315                                         },
  316                                         _ => {
! 317                                             error!(log = ?log, "Unknown KMSGeneration event")
  318                                         }
  319                                     }
  320                                 } else {
! 321                                     error!(log = ?log, "Failed to decode KMSGeneration event log");
  322                                 }
  323                             }
! 324                             a if Some(a) == self.conf.ciphertext_commits_address => {
! 325                                 if let Err(e) = self.process_ciphertext_commits_log(
! 326                                     &mut drift_detector,
! 327                                     log,
! 328                                     to_block,
! 329                                     db_pool,
  330                                 )
! 331                                 .await {
! 332                                     error!(error = %e, "Failed to process CiphertextCommits log");
! 333                                 }
  334                             }
! 335                             a if Some(a) == self.conf.gateway_config_address => {
! 336                                 if let Err(e) = self.process_gateway_config_log(&mut drift_detector, log) {
! 337                                     error!(error = %e, "Failed to process GatewayConfig log");
! 338                                 }
  339                             }
  340                             _ => {
! 341                                 error!(log = ?log, "Unexpected log address");
  342                             }
  343                         }
  344                     }
  345                     if let Err(e) = drift_detector.end_of_batch(db_pool).await {
! 346                         error!(error = %e, "Drift detector end_of_batch failed");
  347                     }
  348                     last_processed_block_num = Some(to_block);
  349                     if replay_from_block.is_some() {
  350                         if to_block == current_block {

  389         }
  390         Ok(())
  391     }
  392 
! 393     async fn fetch_expected_senders(
! 394         &self,
! 395         gateway_config_address: Address,
! 396         at_block: Option<u64>,
! 397     ) -> anyhow::Result<Vec<Address>> {
! 398         let gateway_config = GatewayConfig::new(gateway_config_address, self.provider.clone());
! 399         let call = gateway_config.getCoprocessorTxSenders();
! 400         let senders = match at_block {
! 401             Some(block) => call.block(BlockId::number(block)).call().await?,
! 402             None => call.call().await?,
  403         };
  404 
! 405         if senders.is_empty() {
! 406             anyhow::bail!("GatewayConfig returned no coprocessor tx-senders");
! 407         }
  408 
! 409         Ok(senders)
! 410     }
  411 
  412     /// Reconstruct the drift detector's in-memory state after a restart.
  413     ///
  414     /// The detector tracks open ciphertext-commit handles in memory. On restart

  428     ) -> anyhow::Result<()> {
  429         let Some(ciphertext_commits_address) = self.conf.ciphertext_commits_address else {
  430             return Ok(());
  431         };
! 432         let (Some(from_block), Some(to_block)) =
! 433             (earliest_open_ct_commits_block, last_processed_block_num)
  434         else {
! 435             return Ok(());
  436         };
! 437         if from_block > to_block {
! 438             return Ok(());
! 439         }
  440 
! 441         info!(
  442             from_block,
! 443             to_block, "Rebuilding drift detector from persisted watermark"
  444         );
! 445         drift_detector.set_replaying(true);
  446 
! 447         let mut batch_from = from_block;
! 448         while batch_from <= to_block {
! 449             let batch_to = std::cmp::min(
! 450                 batch_from.saturating_add(self.conf.get_logs_block_batch_size.saturating_sub(1)),
! 451                 to_block,
  452             );
! 453             let filter = Filter::new()
! 454                 .address(
! 455                     [
! 456                         Some(ciphertext_commits_address),
! 457                         self.conf.gateway_config_address,
! 458                     ]
! 459                     .into_iter()
! 460                     .flatten()
! 461                     .collect::<Vec<_>>(),
  462                 )
! 463                 .from_block(batch_from)
! 464                 .to_block(batch_to);
! 465             let logs = self
! 466                 .provider
! 467                 .get_logs(&filter)
! 468                 .await
! 469                 .inspect(|_| {
! 470                     GET_LOGS_SUCCESS_COUNTER.inc();
! 471                 })
! 472                 .inspect_err(|_| {
! 473                     GET_LOGS_FAIL_COUNTER.inc();
! 474                 })?;
  475 
! 476             for log in logs {
! 477                 if log.address() == ciphertext_commits_address {
! 478                     self.process_ciphertext_commits_log(drift_detector, log, batch_to, db_pool)
! 479                         .await?;
! 480                 } else if Some(log.address()) == self.conf.gateway_config_address {
! 481                     self.process_gateway_config_log(drift_detector, log)?;
  482                 } else {
! 483                     error!(log = ?log, "Unexpected log address while rebuilding drift detector");
  484                 }
  485             }
  486 
! 487             batch_from = batch_to.saturating_add(1);
  488         }
! 489         drift_detector.set_replaying(false);
! 490         drift_detector.end_of_rebuild(db_pool).await?;
! 491         drift_detector.flush_metrics();
! 492         Ok(())
  493     }
  494 
! 495     async fn process_ciphertext_commits_log(
! 496         &self,
! 497         drift_detector: &mut DriftDetector,
! 498         log: Log,
! 499         fallback_block: u64,
! 500         db_pool: &Pool<Postgres>,
! 501     ) -> anyhow::Result<()> {
! 502         let context = EventContext {
! 503             block_number: log.block_number.unwrap_or(fallback_block),
! 504             block_hash: log.block_hash,
! 505             tx_hash: log.transaction_hash,
! 506             log_index: log.log_index,
! 507             observed_at: Instant::now(),
! 508         };
! 509         if let Ok(event) = CiphertextCommits::CiphertextCommitsEvents::decode_log(&log.inner) {
! 510             match event.data {
! 511                 CiphertextCommits::CiphertextCommitsEvents::AddCiphertextMaterial(e) => {
! 512                     drift_detector.observe_submission(e, context);
! 513                 }
! 514                 CiphertextCommits::CiphertextCommitsEvents::AddCiphertextMaterialConsensus(e) => {
! 515                     drift_detector.handle_consensus(e, context, db_pool).await?;
  516                 }
! 517                 _ => {}
  518             }
  519         } else {
! 520             error!(log = ?log, "Failed to decode CiphertextCommits event log");
  521         }
! 522         Ok(())
! 523     }
  524 
! 525     fn process_gateway_config_log(
! 526         &self,
! 527         drift_detector: &mut DriftDetector,
! 528         log: Log,
! 529     ) -> anyhow::Result<()> {
! 530         let Ok(event) = GatewayConfig::GatewayConfigEvents::decode_log(&log.inner) else {
! 531             error!(log = ?log, "Failed to decode GatewayConfig event log");
! 532             return Ok(());
  533         };
  534 
! 535         if let GatewayConfig::GatewayConfigEvents::UpdateCoprocessors(update) = event.data {
! 536             let expected_senders = update
! 537                 .newCoprocessors
! 538                 .into_iter()
! 539                 .map(|coprocessor| coprocessor.txSenderAddress)
! 540                 .collect::<Vec<_>>();
! 541             if expected_senders.is_empty() {
! 542                 anyhow::bail!("GatewayConfig update removed all coprocessor tx-senders");
! 543             }
! 544             info!(
  545                 block_number = ?log.block_number,
  546                 tx_hash = ?log.transaction_hash,
  547                 expected_senders = ?expected_senders,
! 548                 "Refreshing expected coprocessor tx-senders from GatewayConfig"
  549             );
! 550             drift_detector.set_current_expected_senders(expected_senders);
! 551         }
  552 
! 553         Ok(())
! 554     }
  555 
  556     async fn verify_proof_request(
  557         &self,
  558         db_pool: &Pool<Postgres>,

  766             .is_multiple_of(self.conf.log_last_processed_every_number_of_updates)
  767         {
  768             info!(
  769                 last_block_num,
! 770                 earliest_open_ct_commits_block, "Updated listener progress"
  771             );
  772         }
  773         Ok(())
  774     }

Collaborator

@dartdart26 dartdart26 left a comment


Just one comment re: grace periods; please let me know what you think of it.

Eikix added 3 commits March 16, 2026 13:17
…lock Duration

The gateway chain only produces blocks when there are transactions, making
block count an unreliable proxy for elapsed time. Switch drift detection
from block-number arithmetic to std::time::Instant/Duration:

- ConfigSettings and CLI args use Duration (humantime format, e.g. "5m")
- EventContext carries observed_at: Instant
- HandleState/ConsensusState carry first_seen_at/received_at: Instant
- classify_handle compares wall-clock elapsed time against Duration thresholds
- Block numbers retained for logging/metrics (still useful diagnostics)
- Removes get_block_number RPC call from rebuild_drift_detector (no longer needed)
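The wall-clock classification this commit describes can be sketched in isolation. This is a minimal, hypothetical version: the PR's real `classify_handle` is a method on `DriftDetector` and its `HandleState` carries sender snapshots and digest variants not shown here.

```rust
use std::time::{Duration, Instant};

// Illustrative stand-in for the PR's richer HandleState.
struct HandleState {
    first_seen_at: Instant,
}

#[derive(Debug, PartialEq)]
enum HandleClass {
    Open,
    Stale,
}

// Compare elapsed wall-clock time against a Duration threshold instead of
// block-number deltas, since the gateway chain only mints blocks when there
// are transactions.
fn classify(state: &HandleState, now: Instant, grace: Duration) -> HandleClass {
    if now.duration_since(state.first_seen_at) > grace {
        HandleClass::Stale
    } else {
        HandleClass::Open
    }
}

fn main() {
    let first_seen = Instant::now();
    let grace = Duration::from_secs(300); // a humantime value like "5m"
    let state = HandleState { first_seen_at: first_seen };
    assert_eq!(classify(&state, first_seen, grace), HandleClass::Open);
    // Simulate 10 minutes elapsing without sleeping.
    let later = first_seen + Duration::from_secs(600);
    assert_eq!(classify(&state, later, grace), HandleClass::Stale);
    println!("ok");
}
```

Passing `now` as a parameter (rather than calling `Instant::now()` inside) keeps the threshold logic testable without real waiting, which is presumably why `EventContext` carries `observed_at`.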
Resolve conflict in test-suite/fhevm/fhevm-cli by taking main's version
(tab indentation, negative-acl test type) and re-adding our
ciphertext-drift test type.
dartdart26 previously approved these changes Mar 16, 2026
Collaborator

@dartdart26 dartdart26 left a comment


Added some nits; I'll let you decide whether to address them. Otherwise, approved.

- Prefix DriftDetector fields with `drift_` for consistency with ConfigSettings
- Move classify_handle from free function to &self method on DriftDetector
@Eikix
Contributor Author

Eikix commented Mar 16, 2026

@Mergifyio queue

@mergify

mergify bot commented Mar 16, 2026

Merge Queue Status

  • 🟠 Waiting for queue conditions
  • ⏳ Enter queue
  • ⏳ Run checks
  • ⏳ Merge
Required conditions to enter a queue
  • -closed [📌 queue requirement]
  • -conflict [📌 queue requirement]
  • -draft [📌 queue requirement]
  • any of [📌 queue -> configuration change requirements]:
    • -mergify-configuration-changed
    • check-success = Configuration changed
  • any of [🔀 queue conditions]:
    • all of [📌 queue conditions of queue main]:
      • #approved-reviews-by >= 1 [🛡 GitHub branch protection]
      • #changes-requested-reviews-by = 0 [🛡 GitHub branch protection]
      • #review-threads-unresolved = 0 [🛡 GitHub branch protection]
      • base = main
      • branch-protection-review-decision = APPROVED [🛡 GitHub branch protection]
      • label!=do-not-merge
      • any of [🛡 GitHub branch protection]:
        • check-success = common-pull-request/lint (bpr)
        • check-neutral = common-pull-request/lint (bpr)
        • check-skipped = common-pull-request/lint (bpr)
      • any of [🛡 GitHub branch protection]:
        • check-skipped = coprocessor-cargo-listener-tests/cargo-tests (bpr)
        • check-neutral = coprocessor-cargo-listener-tests/cargo-tests (bpr)
        • check-success = coprocessor-cargo-listener-tests/cargo-tests (bpr)
      • any of [🛡 GitHub branch protection]:
        • check-success = coprocessor-cargo-test/cargo-tests (bpr)
        • check-neutral = coprocessor-cargo-test/cargo-tests (bpr)
        • check-skipped = coprocessor-cargo-test/cargo-tests (bpr)
      • any of [🛡 GitHub branch protection]:
        • check-success = coprocessor-dependency-analysis/dependencies-check (bpr)
        • check-neutral = coprocessor-dependency-analysis/dependencies-check (bpr)
        • check-skipped = coprocessor-dependency-analysis/dependencies-check (bpr)
      • any of [🛡 GitHub branch protection]:
        • check-skipped = gateway-contracts-deployment-tests/sc-deploy (bpr)
        • check-neutral = gateway-contracts-deployment-tests/sc-deploy (bpr)
        • check-success = gateway-contracts-deployment-tests/sc-deploy (bpr)
      • any of [🛡 GitHub branch protection]:
        • check-skipped = kms-connector-tests/test-connector (bpr)
        • check-neutral = kms-connector-tests/test-connector (bpr)
        • check-success = kms-connector-tests/test-connector (bpr)

@mergify

mergify bot commented Mar 16, 2026

Merge Queue Status

This pull request spent 2 hours 49 minutes 13 seconds in the queue, including 1 hour 48 minutes 22 seconds running CI.

Required conditions to merge
  • #approved-reviews-by >= 1 [🛡 GitHub branch protection]
  • #changes-requested-reviews-by = 0 [🛡 GitHub branch protection]
  • #review-threads-unresolved = 0 [🛡 GitHub branch protection]
  • branch-protection-review-decision = APPROVED [🛡 GitHub branch protection]
  • check-success = run-e2e-tests / fhevm-e2e-test
  • any of [🛡 GitHub branch protection]:
    • check-success = common-pull-request/lint (bpr)
    • check-neutral = common-pull-request/lint (bpr)
    • check-skipped = common-pull-request/lint (bpr)
  • any of [🛡 GitHub branch protection]:
    • check-skipped = coprocessor-cargo-listener-tests/cargo-tests (bpr)
    • check-neutral = coprocessor-cargo-listener-tests/cargo-tests (bpr)
    • check-success = coprocessor-cargo-listener-tests/cargo-tests (bpr)
  • any of [🛡 GitHub branch protection]:
    • check-success = coprocessor-cargo-test/cargo-tests (bpr)
    • check-neutral = coprocessor-cargo-test/cargo-tests (bpr)
    • check-skipped = coprocessor-cargo-test/cargo-tests (bpr)
  • any of [🛡 GitHub branch protection]:
    • check-success = coprocessor-dependency-analysis/dependencies-check (bpr)
    • check-neutral = coprocessor-dependency-analysis/dependencies-check (bpr)
    • check-skipped = coprocessor-dependency-analysis/dependencies-check (bpr)
  • any of [🛡 GitHub branch protection]:
    • check-skipped = gateway-contracts-deployment-tests/sc-deploy (bpr)
    • check-neutral = gateway-contracts-deployment-tests/sc-deploy (bpr)
    • check-success = gateway-contracts-deployment-tests/sc-deploy (bpr)
  • any of [🛡 GitHub branch protection]:
    • check-skipped = kms-connector-tests/test-connector (bpr)
    • check-neutral = kms-connector-tests/test-connector (bpr)
    • check-success = kms-connector-tests/test-connector (bpr)

@mergify mergify bot removed the merge-queued label Mar 16, 2026
mergify bot added a commit that referenced this pull request Mar 16, 2026
@mergify mergify bot merged commit 1272b10 into main Mar 16, 2026
67 of 68 checks passed
@mergify mergify bot deleted the coproc-drift-heal branch March 16, 2026 19:38
@mergify mergify bot removed the queued label Mar 16, 2026
