Skip to content

Commit 942c183

Browse files
committed
ci(test-suite): run e2e tests with 2-of-2 coprocessor consensus
Deploy with --coprocessors 2 --coprocessor-threshold 2 so both coprocessors must independently compute identical ciphertext digests for on-chain consensus to be reached. All existing tests pass unchanged — consensus enforcement is transparent. Adds a consensus watchdog (Mocha root hook) that monitors gateway chain events during tests: - Detects ciphertext digest divergence immediately - Detects consensus stalls within 3 minutes - No-op when GATEWAY_RPC_URL is unset (single-coprocessor runs) Closes zama-ai/fhevm-internal#1132
1 parent 8899d43 commit 942c183

File tree

5 files changed

+348
-3
lines changed

5 files changed

+348
-3
lines changed

.github/workflows/test-suite-e2e-operators-tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ jobs:
125125
SNS_WORKER_VERSION: ${{ inputs.sns_worker_version }}
126126
ZKPROOF_WORKER_VERSION: ${{ inputs.zkproof_worker_version }}
127127
run: |
128-
./fhevm-cli deploy
128+
./fhevm-cli deploy --coprocessors 2 --coprocessor-threshold 2
129129
130130
- name: All operators tests
131131
working-directory: test-suite/fhevm

.github/workflows/test-suite-e2e-tests.yml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ jobs:
140140
- name: Deploy fhevm Stack
141141
working-directory: test-suite/fhevm
142142
run: |
143-
./fhevm-cli deploy
143+
./fhevm-cli deploy --coprocessors 2 --coprocessor-threshold 2
144144
145145
# E2E tests on pausing the Host contracts
146146
- name: Pause Host Contracts
@@ -247,6 +247,15 @@ jobs:
247247
echo "::group::TFHE Worker"
248248
./fhevm-cli logs coprocessor-tfhe-worker
249249
echo "::endgroup::"
250+
echo "::group::Coprocessor 2 - SNS Worker"
251+
./fhevm-cli logs coprocessor-2-sns-worker 2>/dev/null | grep -v "Selected 0 rows to process" || true
252+
echo "::endgroup::"
253+
echo "::group::Coprocessor 2 - Transaction Sender (filtered)"
254+
./fhevm-cli logs coprocessor-2-transaction-sender 2>/dev/null | grep -v "Selected 0 rows to process" || true
255+
echo "::endgroup::"
256+
echo "::group::Coprocessor 2 - TFHE Worker"
257+
./fhevm-cli logs coprocessor-2-tfhe-worker 2>/dev/null || true
258+
echo "::endgroup::"
250259
251260
- name: Cleanup
252261
working-directory: test-suite/fhevm

test-suite/e2e/hardhat.config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ const config: HardhatUserConfig = {
165165
defaultNetwork: DEFAULT_NETWORK,
166166
mocha: {
167167
timeout: 300000,
168+
rootHooks: require('./test/consensusWatchdog').mochaHooks,
168169
},
169170
gasReporter: {
170171
currency: 'USD',
Lines changed: 330 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,330 @@
1+
import { ethers } from 'ethers';
2+
3+
// Minimal ABIs — only the events we need to monitor.
4+
const CIPHERTEXT_COMMITS_ABI = [
5+
'event AddCiphertextMaterial(bytes32 indexed ctHandle, uint256 keyId, bytes32 ciphertextDigest, bytes32 snsCiphertextDigest, address coprocessorTxSender)',
6+
'event AddCiphertextMaterialConsensus(bytes32 indexed ctHandle, uint256 keyId, bytes32 ciphertextDigest, bytes32 snsCiphertextDigest, address[] coprocessorTxSenders)',
7+
];
8+
9+
const INPUT_VERIFICATION_ABI = [
10+
'event VerifyProofResponseCall(uint256 indexed zkProofId, bytes32[] ctHandles, bytes signature, address coprocessorTxSender, bytes extraData)',
11+
'event VerifyProofResponse(uint256 indexed zkProofId, bytes32[] ctHandles, bytes[] signatures)',
12+
];
13+
14+
const CONSENSUS_TIMEOUT_MS = 3 * 60 * 1000; // 3 minutes
15+
const POLL_INTERVAL_MS = 2_000;
16+
17+
interface CiphertextSubmission {
18+
coprocessor: string;
19+
ciphertextDigest: string;
20+
snsCiphertextDigest: string;
21+
keyId: bigint;
22+
}
23+
24+
interface ProofSubmission {
25+
coprocessor: string;
26+
ctHandles: string[];
27+
}
28+
29+
interface PendingHandle {
30+
firstSeenAt: number;
31+
submissions: CiphertextSubmission[];
32+
consensusReached: boolean;
33+
}
34+
35+
interface PendingProof {
36+
firstSeenAt: number;
37+
submissions: ProofSubmission[];
38+
consensusReached: boolean;
39+
}
40+
41+
class ConsensusWatchdog {
42+
private provider: ethers.JsonRpcProvider;
43+
private ciphertextCommits: ethers.Contract;
44+
private inputVerification: ethers.Contract;
45+
private pendingHandles = new Map<string, PendingHandle>();
46+
private pendingProofs = new Map<string, PendingProof>();
47+
private divergences: string[] = [];
48+
private pollTimer: ReturnType<typeof setInterval> | null = null;
49+
private lastBlock = 0;
50+
51+
constructor(gatewayRpcUrl: string, ciphertextCommitsAddress: string, inputVerificationAddress: string) {
52+
this.provider = new ethers.JsonRpcProvider(gatewayRpcUrl);
53+
this.ciphertextCommits = new ethers.Contract(ciphertextCommitsAddress, CIPHERTEXT_COMMITS_ABI, this.provider);
54+
this.inputVerification = new ethers.Contract(inputVerificationAddress, INPUT_VERIFICATION_ABI, this.provider);
55+
}
56+
57+
async start(): Promise<void> {
58+
this.lastBlock = await this.provider.getBlockNumber();
59+
this.pollTimer = setInterval(() => this.poll(), POLL_INTERVAL_MS);
60+
}
61+
62+
async stop(): Promise<void> {
63+
if (this.pollTimer) {
64+
clearInterval(this.pollTimer);
65+
this.pollTimer = null;
66+
}
67+
this.provider.destroy();
68+
}
69+
70+
private async poll(): Promise<void> {
71+
try {
72+
const currentBlock = await this.provider.getBlockNumber();
73+
if (currentBlock <= this.lastBlock) return;
74+
75+
const fromBlock = this.lastBlock + 1;
76+
const toBlock = currentBlock;
77+
78+
await Promise.all([
79+
this.pollCiphertextEvents(fromBlock, toBlock),
80+
this.pollInputVerificationEvents(fromBlock, toBlock),
81+
]);
82+
83+
this.lastBlock = toBlock;
84+
} catch (err) {
85+
// Transient RPC errors shouldn't crash the watchdog — log and retry next poll.
86+
console.warn('[consensus-watchdog] poll error:', (err as Error).message);
87+
}
88+
}
89+
90+
private async pollCiphertextEvents(fromBlock: number, toBlock: number): Promise<void> {
91+
const [submissions, consensuses] = await Promise.all([
92+
this.ciphertextCommits.queryFilter(
93+
this.ciphertextCommits.filters.AddCiphertextMaterial(),
94+
fromBlock,
95+
toBlock,
96+
),
97+
this.ciphertextCommits.queryFilter(
98+
this.ciphertextCommits.filters.AddCiphertextMaterialConsensus(),
99+
fromBlock,
100+
toBlock,
101+
),
102+
]);
103+
104+
for (const event of submissions) {
105+
const log = event as ethers.EventLog;
106+
const ctHandle = log.args[0] as string;
107+
const keyId = log.args[1] as bigint;
108+
const ciphertextDigest = log.args[2] as string;
109+
const snsCiphertextDigest = log.args[3] as string;
110+
const coprocessor = log.args[4] as string;
111+
112+
if (!this.pendingHandles.has(ctHandle)) {
113+
this.pendingHandles.set(ctHandle, {
114+
firstSeenAt: Date.now(),
115+
submissions: [],
116+
consensusReached: false,
117+
});
118+
}
119+
120+
const pending = this.pendingHandles.get(ctHandle)!;
121+
pending.submissions.push({ coprocessor, ciphertextDigest, snsCiphertextDigest, keyId });
122+
123+
// Check for divergence: compare all submissions for this handle.
124+
this.checkCiphertextDivergence(ctHandle, pending);
125+
}
126+
127+
for (const event of consensuses) {
128+
const log = event as ethers.EventLog;
129+
const ctHandle = log.args[0] as string;
130+
const pending = this.pendingHandles.get(ctHandle);
131+
if (pending) {
132+
pending.consensusReached = true;
133+
}
134+
}
135+
}
136+
137+
private async pollInputVerificationEvents(fromBlock: number, toBlock: number): Promise<void> {
138+
const [responses, consensuses] = await Promise.all([
139+
this.inputVerification.queryFilter(
140+
this.inputVerification.filters.VerifyProofResponseCall(),
141+
fromBlock,
142+
toBlock,
143+
),
144+
this.inputVerification.queryFilter(
145+
this.inputVerification.filters.VerifyProofResponse(),
146+
fromBlock,
147+
toBlock,
148+
),
149+
]);
150+
151+
for (const event of responses) {
152+
const log = event as ethers.EventLog;
153+
const zkProofId = String(log.args[0]);
154+
const ctHandles = log.args[1] as string[];
155+
const coprocessor = log.args[3] as string;
156+
157+
if (!this.pendingProofs.has(zkProofId)) {
158+
this.pendingProofs.set(zkProofId, {
159+
firstSeenAt: Date.now(),
160+
submissions: [],
161+
consensusReached: false,
162+
});
163+
}
164+
165+
const pending = this.pendingProofs.get(zkProofId)!;
166+
pending.submissions.push({ coprocessor, ctHandles: [...ctHandles] });
167+
168+
this.checkProofDivergence(zkProofId, pending);
169+
}
170+
171+
for (const event of consensuses) {
172+
const log = event as ethers.EventLog;
173+
const zkProofId = String(log.args[0]);
174+
const pending = this.pendingProofs.get(zkProofId);
175+
if (pending) {
176+
pending.consensusReached = true;
177+
}
178+
}
179+
}
180+
181+
private checkCiphertextDivergence(ctHandle: string, pending: PendingHandle): void {
182+
if (pending.submissions.length < 2) return;
183+
184+
const first = pending.submissions[0];
185+
for (let i = 1; i < pending.submissions.length; i++) {
186+
const sub = pending.submissions[i];
187+
if (sub.ciphertextDigest !== first.ciphertextDigest || sub.snsCiphertextDigest !== first.snsCiphertextDigest) {
188+
const msg =
189+
`[consensus-watchdog] CIPHERTEXT DIVERGENCE for handle ${ctHandle}\n` +
190+
` Coprocessor ${first.coprocessor}: ctDigest=${first.ciphertextDigest} snsDigest=${first.snsCiphertextDigest}\n` +
191+
` Coprocessor ${sub.coprocessor}: ctDigest=${sub.ciphertextDigest} snsDigest=${sub.snsCiphertextDigest}`;
192+
console.error(msg);
193+
this.divergences.push(msg);
194+
}
195+
}
196+
}
197+
198+
private checkProofDivergence(zkProofId: string, pending: PendingProof): void {
199+
if (pending.submissions.length < 2) return;
200+
201+
const first = pending.submissions[0];
202+
const firstHandles = first.ctHandles.join(',');
203+
for (let i = 1; i < pending.submissions.length; i++) {
204+
const sub = pending.submissions[i];
205+
const subHandles = sub.ctHandles.join(',');
206+
if (firstHandles !== subHandles) {
207+
const msg =
208+
`[consensus-watchdog] INPUT VERIFICATION DIVERGENCE for zkProofId ${zkProofId}\n` +
209+
` Coprocessor ${first.coprocessor}: handles=[${firstHandles}]\n` +
210+
` Coprocessor ${sub.coprocessor}: handles=[${subHandles}]`;
211+
console.error(msg);
212+
this.divergences.push(msg);
213+
}
214+
}
215+
}
216+
217+
/**
218+
* Check for divergences (instant) and stalls (3-minute timeout).
219+
* Called in afterEach to fail the current test if consensus is broken.
220+
*/
221+
checkHealth(): void {
222+
// Force a sync check of divergences accumulated since last poll.
223+
if (this.divergences.length > 0) {
224+
const msg = this.divergences.join('\n\n');
225+
this.divergences = [];
226+
throw new Error(`Consensus divergence detected:\n\n${msg}`);
227+
}
228+
229+
// Check for stalls: handles that received a first submission but no consensus within timeout.
230+
const now = Date.now();
231+
232+
for (const [ctHandle, pending] of this.pendingHandles) {
233+
if (pending.consensusReached) continue;
234+
const elapsed = now - pending.firstSeenAt;
235+
if (elapsed > CONSENSUS_TIMEOUT_MS) {
236+
const coprocessors = pending.submissions.map((s) => s.coprocessor).join(', ');
237+
throw new Error(
238+
`Consensus stall for ciphertext handle ${ctHandle}: ` +
239+
`only ${pending.submissions.length} coprocessor(s) submitted after ${Math.round(elapsed / 1000)}s ` +
240+
`(submitters: ${coprocessors})`,
241+
);
242+
}
243+
}
244+
245+
for (const [zkProofId, pending] of this.pendingProofs) {
246+
if (pending.consensusReached) continue;
247+
const elapsed = now - pending.firstSeenAt;
248+
if (elapsed > CONSENSUS_TIMEOUT_MS) {
249+
const coprocessors = pending.submissions.map((s) => s.coprocessor).join(', ');
250+
throw new Error(
251+
`Consensus stall for input verification zkProofId ${zkProofId}: ` +
252+
`only ${pending.submissions.length} coprocessor(s) submitted after ${Math.round(elapsed / 1000)}s ` +
253+
`(submitters: ${coprocessors})`,
254+
);
255+
}
256+
}
257+
}
258+
259+
/** Summary for afterAll — reports any remaining pending handles. */
260+
summary(): string | null {
261+
const pendingCt = [...this.pendingHandles.entries()].filter(([, p]) => !p.consensusReached);
262+
const pendingPf = [...this.pendingProofs.entries()].filter(([, p]) => !p.consensusReached);
263+
const resolvedCt = [...this.pendingHandles.values()].filter((p) => p.consensusReached).length;
264+
const resolvedPf = [...this.pendingProofs.values()].filter((p) => p.consensusReached).length;
265+
266+
const lines: string[] = [];
267+
lines.push(`[consensus-watchdog] Summary: ${resolvedCt} ciphertext(s) and ${resolvedPf} proof(s) reached consensus.`);
268+
269+
if (pendingCt.length > 0) {
270+
lines.push(` WARNING: ${pendingCt.length} ciphertext handle(s) never reached consensus:`);
271+
for (const [handle, p] of pendingCt) {
272+
lines.push(` - ${handle} (${p.submissions.length} submission(s))`);
273+
}
274+
}
275+
276+
if (pendingPf.length > 0) {
277+
lines.push(` WARNING: ${pendingPf.length} proof(s) never reached consensus:`);
278+
for (const [id, p] of pendingPf) {
279+
lines.push(` - zkProofId ${id} (${p.submissions.length} submission(s))`);
280+
}
281+
}
282+
283+
return lines.join('\n');
284+
}
285+
}
286+
287+
// Singleton — shared across all tests in a Mocha run.
288+
let watchdog: ConsensusWatchdog | null = null;
289+
290+
function isEnabled(): boolean {
291+
return !!(process.env.GATEWAY_RPC_URL && process.env.CIPHERTEXT_COMMITS_ADDRESS);
292+
}
293+
294+
export const mochaHooks = {
295+
async beforeAll(this: Mocha.Context) {
296+
if (!isEnabled()) return;
297+
298+
const gatewayRpcUrl = process.env.GATEWAY_RPC_URL!;
299+
const ciphertextCommitsAddress = process.env.CIPHERTEXT_COMMITS_ADDRESS!;
300+
const inputVerificationAddress = process.env.INPUT_VERIFICATION_ADDRESS!;
301+
302+
if (!inputVerificationAddress) {
303+
console.warn('[consensus-watchdog] INPUT_VERIFICATION_ADDRESS not set, skipping proof monitoring');
304+
}
305+
306+
console.log(`[consensus-watchdog] Starting — gateway=${gatewayRpcUrl} ciphertextCommits=${ciphertextCommitsAddress}`);
307+
watchdog = new ConsensusWatchdog(gatewayRpcUrl, ciphertextCommitsAddress, inputVerificationAddress);
308+
await watchdog.start();
309+
},
310+
311+
async afterEach(this: Mocha.Context) {
312+
if (!watchdog) return;
313+
314+
// Force one last poll before checking health so we catch recent events.
315+
await (watchdog as any).poll();
316+
watchdog.checkHealth();
317+
},
318+
319+
async afterAll(this: Mocha.Context) {
320+
if (!watchdog) return;
321+
322+
// Final poll + summary.
323+
await (watchdog as any).poll();
324+
const summary = watchdog.summary();
325+
if (summary) console.log(summary);
326+
327+
await watchdog.stop();
328+
watchdog = null;
329+
},
330+
};

test-suite/fhevm/env/staging/.env.test-suite

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,9 @@ FHEVM_EXECUTOR_CONTRACT_ADDRESS=0xcCAe95fF1d11656358E782570dF0418F59fA40e1
3030
# =============================================================================
3131
# SERVICE ENDPOINTS
3232
# =============================================================================
33-
RELAYER_URL=http://fhevm-relayer:3000/v2
33+
RELAYER_URL=http://fhevm-relayer:3000/v2
34+
# =============================================================================
35+
# CONSENSUS WATCHDOG (active when both vars are set)
36+
# =============================================================================
37+
GATEWAY_RPC_URL=http://gateway-node:8546
38+
CIPHERTEXT_COMMITS_ADDRESS=0xF0bFB159C7381F7CB332586004d8247252C5b816

0 commit comments

Comments
 (0)