Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .github/workflows/test-suite-e2e-operators-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ jobs:
SNS_WORKER_VERSION: ${{ inputs.sns_worker_version }}
ZKPROOF_WORKER_VERSION: ${{ inputs.zkproof_worker_version }}
run: |
./fhevm-cli deploy
./fhevm-cli deploy --coprocessors 2 --coprocessor-threshold 2

- name: All operators tests
working-directory: test-suite/fhevm
Expand All @@ -150,6 +150,15 @@ jobs:
echo "::group::Transaction Sender Logs (filtered)"
./fhevm-cli logs transaction-sender | grep -v "Selected 0 rows to process"
echo "::endgroup::"
echo "::group::Coprocessor 2 - SNS Worker"
./fhevm-cli logs coprocessor-2-sns-worker 2>/dev/null | grep -v "Selected 0 rows to process" || true
echo "::endgroup::"
echo "::group::Coprocessor 2 - Transaction Sender (filtered)"
./fhevm-cli logs coprocessor-2-transaction-sender 2>/dev/null | grep -v "Selected 0 rows to process" || true
echo "::endgroup::"
echo "::group::Coprocessor 2 - TFHE Worker"
./fhevm-cli logs coprocessor-2-tfhe-worker 2>/dev/null || true
echo "::endgroup::"

- name: Cleanup
working-directory: test-suite/fhevm
Expand Down
11 changes: 10 additions & 1 deletion .github/workflows/test-suite-e2e-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ jobs:
- name: Deploy fhevm Stack
working-directory: test-suite/fhevm
run: |
./fhevm-cli deploy
./fhevm-cli deploy --coprocessors 2 --coprocessor-threshold 2

# E2E tests on pausing the Host contracts
- name: Pause Host Contracts
Expand Down Expand Up @@ -247,6 +247,15 @@ jobs:
echo "::group::TFHE Worker"
./fhevm-cli logs coprocessor-tfhe-worker
echo "::endgroup::"
echo "::group::Coprocessor 2 - SNS Worker"
./fhevm-cli logs coprocessor-2-sns-worker 2>/dev/null | grep -v "Selected 0 rows to process" || true
echo "::endgroup::"
echo "::group::Coprocessor 2 - Transaction Sender (filtered)"
./fhevm-cli logs coprocessor-2-transaction-sender 2>/dev/null | grep -v "Selected 0 rows to process" || true
echo "::endgroup::"
echo "::group::Coprocessor 2 - TFHE Worker"
./fhevm-cli logs coprocessor-2-tfhe-worker 2>/dev/null || true
echo "::endgroup::"

- name: Cleanup
working-directory: test-suite/fhevm
Expand Down
1 change: 1 addition & 0 deletions test-suite/e2e/hardhat.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ const config: HardhatUserConfig = {
defaultNetwork: DEFAULT_NETWORK,
mocha: {
timeout: 300000,
rootHooks: require('./test/consensusWatchdog').mochaHooks,
},
gasReporter: {
currency: 'USD',
Expand Down
340 changes: 340 additions & 0 deletions test-suite/e2e/test/consensusWatchdog.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,340 @@
import { expect } from 'chai';

import { ConsensusWatchdog } from './consensusWatchdog';

// Helper: create a minimal fake EventLog with positional args.
// Helper: fabricate the smallest object that can stand in for an ethers
// EventLog — only the positional `args` array is ever read by the watchdog.
function fakeEvent(...args: unknown[]) {
  const stub = { args };
  return stub as any;
}

// Helper: create a mock provider + contracts on a watchdog instance.
// Helper: construct a ConsensusWatchdog and replace its provider and both
// contracts with controllable in-memory stubs, returning setters that let a
// test script the chain state seen by the next flush().
function mockWatchdog(): {
  watchdog: ConsensusWatchdog;
  setBlock: (n: number) => void;
  setCiphertextEvents: (submissions: any[], consensuses: any[]) => void;
  setProofEvents: (responses: any[], consensuses: any[]) => void;
} {
  const watchdog = new ConsensusWatchdog('http://fake:1234', '0x1111', '0x2222');

  // Mutable backing state the stubs read on every call.
  const state = {
    block: 0,
    ctSubmissions: [] as any[],
    ctConsensuses: [] as any[],
    pfResponses: [] as any[],
    pfConsensuses: [] as any[],
  };

  // Tear down the real provider the constructor created, then swap in a stub.
  (watchdog as any).provider.destroy();
  (watchdog as any).provider = {
    getBlockNumber: async () => state.block,
    destroy: () => {},
  };

  // Stub out the ciphertextCommits contract: filters return sentinel strings
  // that queryFilter uses to pick the matching event list.
  (watchdog as any).ciphertextCommits = {
    filters: {
      AddCiphertextMaterial: () => 'ct-sub-filter',
      AddCiphertextMaterialConsensus: () => 'ct-con-filter',
    },
    queryFilter: async (filter: string) =>
      filter === 'ct-sub-filter' ? state.ctSubmissions : state.ctConsensuses,
  };

  // Stub out the inputVerification contract in the same sentinel style.
  (watchdog as any).inputVerification = {
    filters: {
      VerifyProofResponseCall: () => 'pf-sub-filter',
      VerifyProofResponse: () => 'pf-con-filter',
    },
    queryFilter: async (filter: string) =>
      filter === 'pf-sub-filter' ? state.pfResponses : state.pfConsensuses,
  };

  return {
    watchdog,
    setBlock: (n: number) => {
      state.block = n;
    },
    setCiphertextEvents: (submissions, consensuses) => {
      state.ctSubmissions = submissions;
      state.ctConsensuses = consensuses;
    },
    setProofEvents: (responses, consensuses) => {
      state.pfResponses = responses;
      state.pfConsensuses = consensuses;
    },
  };
}

// Unit tests for the ConsensusWatchdog mocha root-hook helper. All chain
// interaction goes through the stubs installed by mockWatchdog(); each test
// scripts events, advances the block, flushes, then inspects health/state.
describe('ConsensusWatchdog', function () {
  // --- checkHealth: mismatched submissions must surface as an error --------
  describe('checkHealth — divergence detection', function () {
    it('should throw on ciphertext digest divergence', async function () {
      const { watchdog, setBlock, setCiphertextEvents } = mockWatchdog();

      // Two coprocessors submit different digests for the same handle.
      setCiphertextEvents(
        [
          fakeEvent('0xhandle1', 1n, '0xdigestA', '0xsnsDigestA', '0xCoprocessor1'),
          fakeEvent('0xhandle1', 1n, '0xdigestB', '0xsnsDigestA', '0xCoprocessor2'),
        ],
        [],
      );

      setBlock(1);
      await watchdog.flush();

      expect(() => watchdog.checkHealth()).to.throw('Consensus divergence detected');
      expect(() => watchdog.checkHealth()).to.not.throw(); // divergences cleared after first throw
    });

    it('should throw on SNS digest divergence', async function () {
      const { watchdog, setBlock, setCiphertextEvents } = mockWatchdog();

      // Same regular digest, diverging SNS digest (4th positional arg).
      setCiphertextEvents(
        [
          fakeEvent('0xhandle1', 1n, '0xdigestA', '0xsnsA', '0xCopro1'),
          fakeEvent('0xhandle1', 1n, '0xdigestA', '0xsnsB', '0xCopro2'),
        ],
        [],
      );

      setBlock(1);
      await watchdog.flush();

      expect(() => watchdog.checkHealth()).to.throw('CIPHERTEXT DIVERGENCE');
    });

    it('should throw on input verification divergence', async function () {
      const { watchdog, setBlock, setProofEvents } = mockWatchdog();

      // Same proof id (42n) but differing handle lists between coprocessors.
      setProofEvents(
        [
          fakeEvent(42n, ['0xhandleA', '0xhandleB'], '0xsig1', '0xCopro1', '0x'),
          fakeEvent(42n, ['0xhandleA', '0xhandleC'], '0xsig2', '0xCopro2', '0x'),
        ],
        [],
      );

      setBlock(1);
      await watchdog.flush();

      expect(() => watchdog.checkHealth()).to.throw('INPUT VERIFICATION DIVERGENCE');
    });
  });

  // --- checkHealth: submissions that never reach consensus in time ---------
  describe('checkHealth — stall detection', function () {
    it('should throw when consensus is not reached within timeout', async function () {
      const { watchdog, setBlock, setCiphertextEvents } = mockWatchdog();

      // Single submission, no consensus.
      setCiphertextEvents([fakeEvent('0xhandle1', 1n, '0xdigest', '0xsns', '0xCopro1')], []);

      setBlock(1);
      await watchdog.flush();

      // Backdate the firstSeenAt to exceed timeout.
      // (implies the stall timeout is under 4 minutes — TODO confirm against
      // the ConsensusWatchdog implementation)
      const pending = (watchdog as any).pendingHandles.get('0xhandle1');
      pending.firstSeenAt = Date.now() - 4 * 60 * 1000; // 4 minutes ago

      expect(() => watchdog.checkHealth()).to.throw('Consensus stall');
      expect(() => watchdog.checkHealth()).to.not.throw();
    });

    it('should not throw when within timeout', async function () {
      const { watchdog, setBlock, setCiphertextEvents } = mockWatchdog();

      setCiphertextEvents([fakeEvent('0xhandle1', 1n, '0xdigest', '0xsns', '0xCopro1')], []);

      setBlock(1);
      await watchdog.flush();

      expect(() => watchdog.checkHealth()).to.not.throw();
    });
  });

  // --- Consensus events must prune the pending maps and bump counters ------
  describe('consensus resolution — map pruning', function () {
    it('should remove handle from pendingHandles on consensus', async function () {
      const { watchdog, setBlock, setCiphertextEvents } = mockWatchdog();

      // First poll: submissions arrive.
      setCiphertextEvents(
        [
          fakeEvent('0xhandle1', 1n, '0xdigest', '0xsns', '0xCopro1'),
          fakeEvent('0xhandle1', 1n, '0xdigest', '0xsns', '0xCopro2'),
        ],
        [],
      );
      setBlock(1);
      await watchdog.flush();
      expect((watchdog as any).pendingHandles.size).to.equal(1);

      // Second poll: consensus event arrives.
      setCiphertextEvents([], [fakeEvent('0xhandle1', 1n, '0xdigest', '0xsns', ['0xCopro1', '0xCopro2'])]);
      setBlock(2);
      await watchdog.flush();

      expect((watchdog as any).pendingHandles.size).to.equal(0);
      expect((watchdog as any).resolvedHandleCount).to.equal(1);
    });

    it('should remove proof from pendingProofs on consensus', async function () {
      const { watchdog, setBlock, setProofEvents } = mockWatchdog();

      // Two matching responses, then a consensus event for the same proof id.
      setProofEvents(
        [fakeEvent(99n, ['0xh1'], '0xsig', '0xCopro1', '0x'), fakeEvent(99n, ['0xh1'], '0xsig', '0xCopro2', '0x')],
        [],
      );
      setBlock(1);
      await watchdog.flush();
      expect((watchdog as any).pendingProofs.size).to.equal(1);

      setProofEvents([], [fakeEvent(99n, ['0xh1'], ['0xsig1', '0xsig2'])]);
      setBlock(2);
      await watchdog.flush();

      expect((watchdog as any).pendingProofs.size).to.equal(0);
      expect((watchdog as any).resolvedProofCount).to.equal(1);
    });
  });

  // --- Baseline: identical submissions plus consensus is healthy -----------
  describe('happy path — matching submissions', function () {
    it('should not throw when all coprocessors agree', async function () {
      const { watchdog, setBlock, setCiphertextEvents } = mockWatchdog();

      setCiphertextEvents(
        [
          fakeEvent('0xhandle1', 1n, '0xdigest', '0xsns', '0xCopro1'),
          fakeEvent('0xhandle1', 1n, '0xdigest', '0xsns', '0xCopro2'),
        ],
        [fakeEvent('0xhandle1', 1n, '0xdigest', '0xsns', ['0xCopro1', '0xCopro2'])],
      );

      setBlock(1);
      await watchdog.flush();

      expect(() => watchdog.checkHealth()).to.not.throw();
    });
  });

  // --- Re-entrancy and partial-failure behavior of the poll loop -----------
  describe('polling guard', function () {
    it('should prevent overlapping polls', async function () {
      const { watchdog, setBlock, setCiphertextEvents } = mockWatchdog();

      // Instrument getBlockNumber to count how many polls run concurrently.
      let activePolls = 0;
      let maxConcurrentPolls = 0;
      const origGetBlock = (watchdog as any).provider.getBlockNumber;
      (watchdog as any).provider.getBlockNumber = async () => {
        activePolls++;
        maxConcurrentPolls = Math.max(maxConcurrentPolls, activePolls);
        // Simulate slow RPC.
        await new Promise((r) => setTimeout(r, 50));
        try {
          return await origGetBlock();
        } finally {
          activePolls--;
        }
      };

      setCiphertextEvents([], []);
      setBlock(1);

      // Launch two concurrent flushes.
      await Promise.all([watchdog.flush(), watchdog.flush()]);

      expect(maxConcurrentPolls).to.equal(1);
    });

    it('should not duplicate state when one sub-poll fails', async function () {
      const { watchdog, setBlock, setCiphertextEvents } = mockWatchdog();

      // First flush: proof-side queryFilter errors out; the poll should be
      // treated as failed and not record the ciphertext submission.
      (watchdog as any).inputVerification.queryFilter = async () => {
        throw new Error('rpc broke');
      };
      setCiphertextEvents([fakeEvent('0xhandle1', 1n, '0xdigest', '0xsns', '0xCopro1')], []);
      setBlock(1);

      await watchdog.flush();
      expect((watchdog as any).pendingHandles.size).to.equal(0);

      // Second flush: proof side healthy again; the submission is recorded
      // exactly once (same block is re-scanned without duplication).
      // NOTE(review): both ternary branches below return [] — the ternary is
      // redundant; `async () => []` would be equivalent.
      (watchdog as any).inputVerification.queryFilter = async (filter: string) => {
        return filter === 'pf-sub-filter' ? [] : [];
      };
      await watchdog.flush();
      expect((watchdog as any).pendingHandles.size).to.equal(1);
    });
  });

  // --- Human-readable end-of-run summary ------------------------------------
  describe('summary', function () {
    it('should report resolved and pending counts', async function () {
      const { watchdog, setBlock, setCiphertextEvents } = mockWatchdog();

      // Resolve one handle.
      // NOTE(review): the consensus event carries only the handle argument —
      // implies the watchdog keys consensus on args[0] alone; TODO confirm.
      setCiphertextEvents(
        [fakeEvent('0xresolved', 1n, '0xd', '0xs', '0xC1'), fakeEvent('0xresolved', 1n, '0xd', '0xs', '0xC2')],
        [fakeEvent('0xresolved')],
      );
      setBlock(1);
      await watchdog.flush();

      // Add one pending handle (no consensus).
      setCiphertextEvents([fakeEvent('0xpending', 1n, '0xd', '0xs', '0xC1')], []);
      setBlock(2);
      await watchdog.flush();

      const summary = watchdog.summary()!;
      expect(summary).to.include('1 ciphertext(s), 0 proof(s), 0 divergence(s), 0 stalled pending item(s)');
      expect(summary).to.include('1 ciphertext handle(s) never reached consensus');
      expect(summary).to.include('0xpending');
    });

    it('should report clean summary when all resolved', async function () {
      const { watchdog } = mockWatchdog();
      // Seed counters directly; no events needed for a clean summary.
      (watchdog as any).resolvedHandleCount = 5;
      (watchdog as any).resolvedProofCount = 3;

      const summary = watchdog.summary()!;
      expect(summary).to.include('5 ciphertext(s), 3 proof(s), 0 divergence(s), 0 stalled pending item(s)');
      expect(summary).to.not.include('WARNING');
    });
  });

  // --- The root hooks must be inert when the feature is not configured -----
  describe('mochaHooks — disabled when env vars not set', function () {
    it('should not start watchdog without GATEWAY_RPC_URL', async function () {
      const { mochaHooks } = require('./consensusWatchdog');
      const origGw = process.env.GATEWAY_RPC_URL;
      const origCt = process.env.CIPHERTEXT_COMMITS_ADDRESS;
      try {
        delete process.env.GATEWAY_RPC_URL;
        delete process.env.CIPHERTEXT_COMMITS_ADDRESS;

        // beforeAll should be a no-op.
        await mochaHooks.beforeAll.call({});
        // afterEach and afterAll should also be no-ops (watchdog is null).
        await mochaHooks.afterEach.call({});
        await mochaHooks.afterAll.call({});
      } finally {
        // Restore env vars even if assertions fail.
        if (origGw !== undefined) process.env.GATEWAY_RPC_URL = origGw;
        else delete process.env.GATEWAY_RPC_URL;
        if (origCt !== undefined) process.env.CIPHERTEXT_COMMITS_ADDRESS = origCt;
        else delete process.env.CIPHERTEXT_COMMITS_ADDRESS;
      }
    });
  });

  // --- Proof monitoring is optional; ciphertext monitoring must survive ----
  describe('optional proof monitoring', function () {
    it('should keep ciphertext monitoring active without input verification address', async function () {
      const { watchdog, setBlock, setCiphertextEvents } = mockWatchdog();

      // Simulate construction without an InputVerification address.
      (watchdog as any).inputVerification = null;
      setCiphertextEvents([fakeEvent('0xhandle1', 1n, '0xdigest', '0xsns', '0xCopro1')], []);
      setBlock(1);

      await watchdog.flush();

      expect((watchdog as any).pendingHandles.size).to.equal(1);
      expect((watchdog as any).pendingProofs.size).to.equal(0);
    });
  });
});
Loading
Loading