Skip to content

Commit b74da88

Browse files
committed
fix(coprocessor): silence drift replay on restart
1 parent a3c2447 commit b74da88

File tree

2 files changed

+106
-5
lines changed

2 files changed

+106
-5
lines changed

coprocessor/fhevm-engine/gw-listener/src/drift_detector.rs

Lines changed: 103 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ pub(crate) struct DriftDetector {
6767
no_consensus_timeout_blocks: u64,
6868
post_consensus_grace_blocks: u64,
6969
deferred_metrics: DeferredMetrics,
70+
alerts_enabled: bool,
7071
}
7172

7273
impl DriftDetector {
@@ -84,9 +85,14 @@ impl DriftDetector {
8485
no_consensus_timeout_blocks,
8586
post_consensus_grace_blocks,
8687
deferred_metrics: DeferredMetrics::default(),
88+
alerts_enabled: true,
8789
}
8890
}
8991

92+
pub(crate) fn set_alerts_enabled(&mut self, alerts_enabled: bool) {
93+
self.alerts_enabled = alerts_enabled;
94+
}
95+
9096
pub(crate) fn observe_submission(
9197
&mut self,
9298
event: CiphertextCommits::AddCiphertextMaterial,
@@ -116,7 +122,7 @@ impl DriftDetector {
116122
.iter()
117123
.find(|submission| submission.sender == event.coprocessorTxSender)
118124
{
119-
if existing.digests != digests {
125+
if self.alerts_enabled && existing.digests != digests {
120126
warn!(
121127
handle = %handle,
122128
host_chain_id = self.host_chain_id.as_i64(),
@@ -141,7 +147,7 @@ impl DriftDetector {
141147
digests,
142148
});
143149

144-
if !state.drift_reported {
150+
if self.alerts_enabled && !state.drift_reported {
145151
let variants = variant_summaries(&state.submissions);
146152
if variants.len() > 1 {
147153
warn!(
@@ -239,8 +245,9 @@ impl DriftDetector {
239245
return Ok(());
240246
};
241247

242-
if event.ciphertextDigest.as_slice() != local_ciphertext_digest.as_slice()
243-
|| event.snsCiphertextDigest.as_slice() != local_ciphertext128_digest.as_slice()
248+
if self.alerts_enabled
249+
&& (event.ciphertextDigest.as_slice() != local_ciphertext_digest.as_slice()
250+
|| event.snsCiphertextDigest.as_slice() != local_ciphertext128_digest.as_slice())
244251
{
245252
warn!(
246253
handle = %handle,
@@ -348,6 +355,58 @@ impl DriftDetector {
348355
self.deferred_metrics = DeferredMetrics::default();
349356
}
350357

358+
pub(crate) fn evaluate_open_handles(&mut self, current_block: u64) {
359+
if !self.alerts_enabled {
360+
return;
361+
}
362+
363+
let drift_handles = self
364+
.open_handles
365+
.iter()
366+
.filter_map(|(handle, state)| {
367+
(!state.drift_reported && variant_summaries(&state.submissions).len() > 1)
368+
.then_some(*handle)
369+
})
370+
.collect::<Vec<_>>();
371+
372+
for handle in drift_handles {
373+
let Some(state) = self.open_handles.get_mut(&handle) else {
374+
continue;
375+
};
376+
warn!(
377+
handle = %handle,
378+
host_chain_id = self.host_chain_id.as_i64(),
379+
local_node_id = %self.local_node_id,
380+
first_seen_block = state.first_seen_block,
381+
first_seen_block_hash = ?state.first_seen_block_hash,
382+
last_seen_block = state.last_seen_block,
383+
variant_count = variant_summaries(&state.submissions).len(),
384+
variants = ?variant_summaries(&state.submissions),
385+
seen_senders = ?seen_sender_strings(&state.submissions),
386+
missing_senders = ?missing_sender_strings(&self.expected_senders, &state.submissions),
387+
source = "peer_submission",
388+
"Drift detected: observed multiple digest variants for handle"
389+
);
390+
state.drift_reported = true;
391+
self.deferred_metrics.drift_detected += 1;
392+
}
393+
394+
let completed_without_consensus = self
395+
.open_handles
396+
.iter()
397+
.filter_map(|(handle, state)| {
398+
(state.submissions.len() == self.expected_senders.len()
399+
&& state.consensus.is_none())
400+
.then_some(*handle)
401+
})
402+
.collect::<Vec<_>>();
403+
for handle in completed_without_consensus {
404+
self.finish_if_complete(handle);
405+
}
406+
407+
self.evict_stale(current_block);
408+
}
409+
351410
pub(crate) fn earliest_open_block(&self) -> Option<u64> {
352411
self.open_handles
353412
.values()
@@ -369,6 +428,10 @@ impl DriftDetector {
369428
return;
370429
}
371430

431+
if !self.alerts_enabled {
432+
return;
433+
}
434+
372435
warn!(
373436
handle = %handle,
374437
host_chain_id = self.host_chain_id.as_i64(),
@@ -556,6 +619,42 @@ mod tests {
556619
assert_eq!(detector.earliest_open_block(), Some(20));
557620
}
558621

622+
#[test]
623+
fn rebuild_replays_silently_then_alerts_once_on_evaluate() {
624+
let mut detector = detector();
625+
let handle = FixedBytes::from([7u8; 32]);
626+
let senders = senders();
627+
628+
detector.set_alerts_enabled(false);
629+
detector.observe_submission(
630+
make_submission_event(
631+
handle,
632+
FixedBytes::from([8u8; 32]),
633+
FixedBytes::from([9u8; 32]),
634+
senders[0],
635+
),
636+
context(10),
637+
);
638+
detector.observe_submission(
639+
make_submission_event(
640+
handle,
641+
FixedBytes::from([10u8; 32]),
642+
FixedBytes::from([11u8; 32]),
643+
senders[1],
644+
),
645+
context(11),
646+
);
647+
648+
assert_eq!(detector.deferred_metrics.drift_detected, 0);
649+
assert!(!detector.open_handles.get(&handle).unwrap().drift_reported);
650+
651+
detector.set_alerts_enabled(true);
652+
detector.evaluate_open_handles(11);
653+
654+
assert_eq!(detector.deferred_metrics.drift_detected, 1);
655+
assert!(detector.open_handles.get(&handle).unwrap().drift_reported);
656+
}
657+
559658
#[test]
560659
fn matching_submissions_keep_single_variant() {
561660
let mut detector = detector();

coprocessor/fhevm-engine/gw-listener/src/gw_listener.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,7 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface + Clone + 'stati
388388
from_block,
389389
to_block, "Rebuilding drift detector from persisted watermark"
390390
);
391+
drift_detector.set_alerts_enabled(false);
391392

392393
let mut batch_from = from_block;
393394
while batch_from <= to_block {
@@ -417,6 +418,7 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface + Clone + 'stati
417418

418419
batch_from = batch_to.saturating_add(1);
419420
}
421+
drift_detector.set_alerts_enabled(true);
420422

421423
let current_block = self
422424
.provider
@@ -428,7 +430,7 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface + Clone + 'stati
428430
.inspect_err(|_| {
429431
GET_BLOCK_NUM_FAIL_COUNTER.inc();
430432
})?;
431-
drift_detector.evict_stale(current_block);
433+
drift_detector.evaluate_open_handles(current_block);
432434
drift_detector.flush_metrics();
433435
Ok(())
434436
}

0 commit comments

Comments
 (0)