Commit 1272b10

feat(coprocessor): drift detection in gw-listener (#2096)
* feat(coprocessor): add opt-in drift detection to gw-listener
  Compare local ciphertext digests against on-chain consensus from the CiphertextCommits contract. Enabled via --ciphertext-commits-address. Adds early-warning logging when peer submissions diverge and structured warn logs when local digest mismatches consensus. Four new Prometheus counters provide observability. Detection-only; no recovery action.
* fix(coprocessor): propagate consensus errors and defer unresolved drift checks
  - P1: Propagate DB errors from handle_consensus instead of swallowing them, so the block is retried via the outer backoff loop.
  - P2: Queue unresolved consensus events (local digest not yet computed) in a bounded pending queue (10k cap) and retry each block tick, so late-arriving local digests are still checked.
  - Refactor comparison logic into try_resolve_consensus shared by both the initial check and the retry path.
* refactor(coprocessor): trim drift detection to m1
* chore(coprocessor): drop m1 wording
* feat(coprocessor): track drift variants and lagging coprocessors
* refactor(coprocessor): fetch expected senders from gateway config
* feat(coprocessor): rebuild drift tracking after restart
* fix(coprocessor): silence drift replay on restart
* fix(coprocessor): keep consensus checks until local digests exist
* chore(coprocessor): apply rustfmt cleanup
* Fix drift detector rebuild and sender refresh
* Fix catch-up sender seeding and batch finalization
* Wire drift detection into gw-listener chart
* Bump coprocessor chart version
* Add ciphertext drift e2e workflow
* Remove stray plan file
* Keep e2e drift test env intact
* Tighten drift listener and e2e injector
* Use DB trigger for ciphertext drift e2e
* Fix drift handle completion and e2e exit path
* test(e2e): assert ciphertext drift via listener logs
* feat(gw-listener): add local attribution fields to drift logs
* feat(gw-listener): expose drift tuning histograms
* refactor(gw-listener): simplify drift detector and listener internals
  - Avoid double variant_summaries() calls by binding result once
  - Add has_multiple_variants() for cheap filter without string allocation
  - Lazy-clone current_expected_senders only on new handle insertion
  - Remove dead missing_submission_reported field
  - Embed EventContext in ConsensusState instead of duplicating fields
  - Use fetch_optional instead of fetch_all+assert for single-row query
  - Hoist filter_addresses out of the polling loop
* test(drift-detector): add boundary and edge-case tests
  Cover gaps identified in test audit:
  - Grace period boundary: handle not evicted within grace, evicted at boundary
  - No-consensus timeout boundary: handle not evicted within window, evicted at boundary
  - Consensus before any submission: creates handle, evicts after grace
  - Equivocation: same sender different digests warns but keeps first submission
  - Duplicate submission: same sender same digests silently ignored
  - Local digest not ready: consensus defers check, completes after DB update
  - Local digest match: no false-positive drift when digests agree
  - Local check not ready eviction: consensus with pending check times out
* refactor(tests): extract make_consensus_state helper, remove dead code
  Simplify drift_detector tests by extracting a shared constructor, removing an unused pool variable, and adding metric assertions.
* Fix review findings: metric pollution, DB waste, completion bug, e2e robustness
  - Guard CONSENSUS_LATENCY_BLOCKS_HISTOGRAM behind alerts_enabled to prevent rebuild replay from polluting latency metrics
  - Early-return from try_check_local_consensus when !alerts_enabled to skip unnecessary DB queries during rebuild; handles are re-checked via refresh_pending_consensus_checks once alerts resume
  - Change finish_if_complete threshold from == to >= so unexpected senders don't prevent handle completion (previously hung until timeout eviction)
  - Capture injector exit code in e2e script instead of letting set -e abort on wait; report injector failure explicitly
  - Add test: unexpected_sender_does_not_block_completion
* docs(gw-listener): clarify consensus invariant
* refactor(gw-listener): simplify drift detector helpers and listener dispatch
  Inline trivial free functions (has_multiple_variants, seen_sender_strings, missing_sender_strings, address_strings, digest_pair_from_db_digests) at call sites and delete them. Remove redundant .or_else() in sender_seed_block computation. Convert if-else address dispatch to match. Add submit() test helper to compress repetitive observe_submission boilerplate.
* rename submit() to submit_digest_event_and_drift_check() in tests
* refactor(gw-listener): simplify DriftDetector internals
  - HandleState::new associated function replaces free function
  - Inline DeferredMetrics fields directly into DriftDetector
  - has_multiple_variants() avoids allocation in hot-path variant check
  - classify_handle + HandleDisposition enum replaces continue chains in evict_stale
  - end_of_batch/end_of_rebuild facade methods encapsulate multi-step protocols
  - Fix dead unwrap_or in finish_if_complete (consensus already checked is_some)
* docs(gw-listener): document rebuild_drift_detector and facade methods
* fix(gw-listener): clarify drift counter description
  Address review feedback: the old wording "peer submissions or consensus mismatch" was ambiguous. Reword to make the two detection paths explicit.
* address review feedback: metrics docs, default timeouts, clap requires
  - Clarify drift counter description in metrics.rs
  - Document all 5 drift metrics in docs/metrics/metrics.md
  - Bump drift timeout defaults from 50/10 to 3000/3000 blocks (~5 min at 100ms block time) to accommodate coprocessors stuck for minutes
  - Replace manual bail with clap requires on ciphertext_commits_address
* refactor(gw-listener): rename alerts_enabled to replaying (inverted)
  The field name now reflects what the detector is *doing* (replaying historical logs) rather than a side-effect toggle. Inverted semantics: `replaying: true` suppresses warns and DB queries, `false` is normal operation. Also removes redundant `if self.alerts_enabled` guard in try_check_local_consensus, which is already short-circuited by the early return at the top of the function.
* address review feedback: type alias, flush_metrics, tx-senders signature
  - Add `type CiphertextDigest = FixedBytes<32>` to avoid literal 32
  - Document implicit upper bound on open_handles
  - Remove redundant early-return guard in flush_metrics
  - fetch_expected_coprocessor_tx_senders now requires gateway_config_address parameter; caller decides whether to call based on config
* refactor(gw-listener): rename HandleDisposition to HandleOutcome
  Use domain-meaningful names: HandleOutcome describes what happened to a handle, not an internal mechanism. Variants renamed:
  - KeepWaiting → Pending
  - NoConsensusTimeout → GatewayNeverReachedConsensus
  - ConsensusUncheckedTimeout → LocalDigestNeverAppeared
  - MissingSubmissionPostGrace → NotAllCoprocessorsSubmitted
* address self-review: fix double-counted metrics, remove dead code, improve clarity
  - Fix verify_proof metrics being incremented both inside the function and at the call site (bug: double-counting)
  - Remove unnecessary .clone() on owned event field
  - Replace unreachable Pending return with unreachable!()
  - Replace cross-function .unwrap() with let-else in evict_stale
  - Collapse two identical "local digests not ready" debug branches
  - Use < instead of != in classify_handle (invariant is always <)
  - Rename fetch_expected_coprocessor_tx_senders to fetch_expected_senders
  - Remove redundant replaying guard in finalize_completed_without_consensus
* fix(gw-listener): prevent drift detection errors from killing the listener
  All drift detection code paths now log-and-continue instead of propagating errors to run_get_logs. A transient DB or RPC failure during drift checking should not interrupt processing of VerifyProofRequest, ActivateKey, or ActivateCrs events.
* refactor(gw-listener): replace block-based drift timeouts with wall-clock Duration
  The gateway chain only produces blocks when there are transactions, making block count an unreliable proxy for elapsed time. Switch drift detection from block-number arithmetic to std::time::Instant/Duration:
  - ConfigSettings and CLI args use Duration (humantime format, e.g. "5m")
  - EventContext carries observed_at: Instant
  - HandleState/ConsensusState carry first_seen_at/received_at: Instant
  - classify_handle compares wall-clock elapsed time against Duration thresholds
  - Block numbers retained for logging/metrics (still useful diagnostics)
  - Removes get_block_number RPC call from rebuild_drift_detector (no longer needed)
* docs(gw-listener): update metrics docs for wall-clock timeouts, clarify Default is test-only
* refactor(gw-listener): address review nits on drift detector
  - Prefix DriftDetector fields with `drift_` for consistency with ConfigSettings
  - Move classify_handle from free function to &self method on DriftDetector
1 parent e561ebd commit 1272b10
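
The commit message describes the core check as a comparison of locally computed digests against the consensus digests, with the check deferred while the local digest is not yet available. A minimal sketch of that decision follows. It is not the code from this commit: `DigestPair`, `LocalCheck`, `check_local_against_consensus`, and the field names are hypothetical, and a plain `[u8; 32]` stands in for the `FixedBytes<32>` alias the message mentions.

```rust
/// Stand-in for the 32-byte digest type (the commit aliases `FixedBytes<32>`;
/// a plain byte array is used here to keep the sketch dependency-free).
type CiphertextDigest = [u8; 32];

/// Hypothetical pair of digests tracked per handle; field names are assumed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct DigestPair {
    ciphertext: CiphertextDigest,
    sns_ciphertext: CiphertextDigest,
}

/// Hypothetical result of comparing the local digests with consensus.
#[derive(Debug, PartialEq, Eq)]
enum LocalCheck {
    /// Local digests are not computed yet; retry the check later.
    Deferred,
    /// Local digests agree with consensus; nothing to report.
    Match,
    /// Local digests differ from consensus; this is the drift case.
    Drift,
}

/// Compares the local digests (if already available) with the consensus pair.
fn check_local_against_consensus(
    local: Option<DigestPair>,
    consensus: DigestPair,
) -> LocalCheck {
    match local {
        None => LocalCheck::Deferred,
        Some(pair) if pair == consensus => LocalCheck::Match,
        Some(_) => LocalCheck::Drift,
    }
}
```

Keeping `Deferred` separate from `Drift` mirrors the "local digest not ready" test case listed in the message: a missing local digest is not treated as a mismatch.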

15 files changed, +2476 -113 lines changed

.github/workflows/test-suite-e2e-tests.yml

Lines changed: 5 additions & 0 deletions
@@ -238,6 +238,11 @@ jobs:
         run: |
           ./fhevm-cli test hcu-block-cap
 
+      - name: Ciphertext drift test
+        working-directory: test-suite/fhevm
+        run: |
+          ./fhevm-cli test ciphertext-drift
+
       - name: Host listener poller test
         working-directory: test-suite/fhevm
         run: |

charts/coprocessor/Chart.yaml

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 name: coprocessor
 description: A helm chart to distribute and deploy Zama fhevm Co-Processor services
-version: 0.8.4
+version: 0.8.5
 apiVersion: v2
 keywords:
   - fhevm

charts/coprocessor/values.yaml

Lines changed: 26 additions & 5 deletions
@@ -417,12 +417,31 @@ gwListener:
   replicas: 1
 
   env:
-    # =========================================================================
-    # NEW ENVIRONMENT VARIABLES
-    # =========================================================================
-    # The address of the KMSGeneration contract
+    - name: DATABASE_URL
+      valueFrom:
+        secretKeyRef:
+          name: coprocessor-db-url
+          key: coprocessor-db-url
+    - name: INPUT_VERIFICATION_ADDRESS
+      valueFrom:
+        configMapKeyRef:
+          name: gateway-sc-addresses
+          key: input_verification.address
     - name: KMS_GENERATION_ADDRESS
-      value: ""
+      valueFrom:
+        configMapKeyRef:
+          name: gateway-sc-addresses
+          key: kms_generation.address
+    - name: CIPHERTEXT_COMMITS_ADDRESS
+      valueFrom:
+        configMapKeyRef:
+          name: gateway-sc-addresses
+          key: ciphertext_commits.address
+    - name: GATEWAY_CONFIG_ADDRESS
+      valueFrom:
+        configMapKeyRef:
+          name: gateway-sc-addresses
+          key: gateway_config.address
 
   # Command line arguments for the gateway listener
   args:
@@ -432,6 +451,8 @@ gwListener:
     - --gw-url=ws://gateway-rpc-node:8548
     - --input-verification-address=$(INPUT_VERIFICATION_ADDRESS)
     - --kms-generation-address=$(KMS_GENERATION_ADDRESS)
+    - --ciphertext-commits-address=$(CIPHERTEXT_COMMITS_ADDRESS)
+    - --gateway-config-address=$(GATEWAY_CONFIG_ADDRESS)
     - --error-sleep-initial-secs=1
     - --error-sleep-max-secs=10
     - --health-check-port=8080
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+ALTER TABLE gw_listener_last_block
+ADD COLUMN IF NOT EXISTS earliest_open_ct_commits_block BIGINT
+CHECK (earliest_open_ct_commits_block >= 0);

coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs

Lines changed: 29 additions & 0 deletions
@@ -89,6 +89,31 @@ struct Conf {
         help = "Skip VerifyProofRequest events during replay"
     )]
     pub replay_skip_verify_proof: bool,
+
+    #[arg(
+        long,
+        requires = "gateway_config_address",
+        help = "CiphertextCommits contract address for drift detection"
+    )]
+    ciphertext_commits_address: Option<Address>,
+
+    #[arg(
+        long,
+        requires = "ciphertext_commits_address",
+        help = "GatewayConfig contract address used to fetch coprocessor tx-senders"
+    )]
+    gateway_config_address: Option<Address>,
+
+    /// How long to wait for the gateway to emit a consensus event after the
+    /// first submission is seen. Wall-clock duration; the default of 5 minutes
+    /// accommodates coprocessors that may be stuck for a few minutes.
+    #[arg(long, default_value = "5m", value_parser = parse_duration, requires = "ciphertext_commits_address")]
+    drift_no_consensus_timeout: Duration,
+
+    /// After consensus, how long to wait for the remaining coprocessors to
+    /// submit their ciphertext material. Wall-clock duration.
+    #[arg(long, default_value = "5m", value_parser = parse_duration, requires = "ciphertext_commits_address")]
+    drift_post_consensus_grace: Duration,
 }
 
 fn install_signal_handlers(cancel_token: CancellationToken) -> anyhow::Result<()> {
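
Both timeout arguments take a humantime-style string such as "5m" and pass it through `parse_duration`, whose definition is not part of this hunk. To illustrate the kind of conversion involved, here is a deliberately simplified, standard-library-only stand-in. The name `parse_simple_duration` is hypothetical, it accepts only a single number followed by one of `ms`, `s`, `m`, or `h`, and it should not be read as the parser the commit uses.

```rust
use std::time::Duration;

/// Parses a single-unit duration such as "250ms", "30s", "5m", or "2h".
/// Simplified illustration only; real humantime strings allow more forms.
fn parse_simple_duration(input: &str) -> Result<Duration, String> {
    let s = input.trim();
    // Split at the first non-digit character: "5m" -> ("5", "m").
    let digits_end = s.find(|c: char| !c.is_ascii_digit()).unwrap_or(s.len());
    let (number, unit) = s.split_at(digits_end);
    let value: u64 = number
        .parse()
        .map_err(|_| format!("invalid number in {:?}", input))?;
    let secs_per_unit: u64 = match unit {
        "ms" => return Ok(Duration::from_millis(value)),
        "s" => 1,
        "m" => 60,
        "h" => 3600,
        other => return Err(format!("unknown unit {:?} in {:?}", other, input)),
    };
    // Guard against overflow when converting to seconds.
    value
        .checked_mul(secs_per_unit)
        .map(Duration::from_secs)
        .ok_or_else(|| format!("duration overflow in {:?}", input))
}
```

Under this sketch the default value "5m" maps to `Duration::from_secs(300)`, matching the five-minute default described in the doc comments.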
@@ -171,6 +196,10 @@ async fn main() -> anyhow::Result<()> {
         replay_from_block: conf.replay_from_block,
         replay_skip_verify_proof: conf.replay_skip_verify_proof,
         log_last_processed_every_number_of_updates: conf.log_last_processed_every_number_of_updates,
+        ciphertext_commits_address: conf.ciphertext_commits_address,
+        gateway_config_address: conf.gateway_config_address,
+        drift_no_consensus_timeout: conf.drift_no_consensus_timeout,
+        drift_post_consensus_grace: conf.drift_post_consensus_grace,
     };
 
     let gw_listener = GatewayListener::new(
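
The two `Duration` settings passed into the listener configuration drive the wall-clock eviction logic that the commit message attributes to `classify_handle`. The sketch below shows one plausible shape for that classification, using the `HandleOutcome` variant names from the message. Everything else is an assumption made for illustration: the `HandleState` and `Timeouts` fields, the order of the checks, and the `>=` boundary (chosen to match the "evicted at boundary" test descriptions).

```rust
use std::time::{Duration, Instant};

/// Variant names taken from the commit message; semantics are assumed.
#[derive(Debug, PartialEq, Eq)]
enum HandleOutcome {
    Pending,
    GatewayNeverReachedConsensus,
    LocalDigestNeverAppeared,
    NotAllCoprocessorsSubmitted,
}

/// Hypothetical per-handle tracking state.
struct HandleState {
    first_seen_at: Instant,
    consensus_received_at: Option<Instant>,
    local_check_done: bool,
    seen_senders: usize,
    expected_senders: usize,
}

/// Hypothetical holder for the two CLI timeouts.
struct Timeouts {
    no_consensus_timeout: Duration,
    post_consensus_grace: Duration,
}

/// Classifies a handle by comparing elapsed wall-clock time with the thresholds.
fn classify_handle(state: &HandleState, now: Instant, t: &Timeouts) -> HandleOutcome {
    match state.consensus_received_at {
        // No consensus yet: time out relative to the first observed submission.
        None => {
            if now.saturating_duration_since(state.first_seen_at) >= t.no_consensus_timeout {
                HandleOutcome::GatewayNeverReachedConsensus
            } else {
                HandleOutcome::Pending
            }
        }
        // Consensus seen: wait out the grace period before reporting anything.
        Some(received_at) => {
            if now.saturating_duration_since(received_at) < t.post_consensus_grace {
                HandleOutcome::Pending
            } else if !state.local_check_done {
                HandleOutcome::LocalDigestNeverAppeared
            } else if state.seen_senders < state.expected_senders {
                HandleOutcome::NotAllCoprocessorsSubmitted
            } else {
                // A fully completed handle would normally be finalized elsewhere.
                HandleOutcome::Pending
            }
        }
    }
}
```

Passing `now` as a parameter rather than calling `Instant::now()` inside the function keeps the boundary cases easy to test without sleeping.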
