diff --git a/qa/tests/tests/e2e/shielded-transactions.test.ts b/qa/tests/tests/e2e/shielded-transactions.test.ts
index 5471b6f3..e4dd1e48 100644
--- a/qa/tests/tests/e2e/shielded-transactions.test.ts
+++ b/qa/tests/tests/e2e/shielded-transactions.test.ts
@@ -21,8 +21,16 @@ import { ToolkitWrapper, type ToolkitTransactionResult } from '@utils/toolkit/to
 import type { Transaction } from '@utils/indexer/indexer-types';
 import { getBlockByHashWithRetry, getTransactionByHashWithRetry } from './test-utils';
 import { TestContext } from 'vitest';
+import { collectValidZswapEvents } from 'tests/shared/zswap-events-utils';
+import { RegularTransactionSchema, ZswapLedgerEventSchema } from '@utils/indexer/graphql/schema';
+import { IndexerWsClient } from '@utils/indexer/websocket-client';
+import { EventCoordinator } from '@utils/event-coordinator';
+import { collectValidDustLedgerEvents } from 'tests/shared/dust-ledger-utils';

 describe('shielded transactions', () => {
+  let indexerWsClient: IndexerWsClient;
+  let indexerEventCoordinator: EventCoordinator;
+  let previousMaxZswapId: number;
   let toolkit: ToolkitWrapper;
   let transactionResult: ToolkitTransactionResult;

@@ -33,6 +41,9 @@ describe('shielded transactions', () => {
   let destinationAddress: string;

   beforeAll(async () => {
+    indexerWsClient = new IndexerWsClient();
+    indexerEventCoordinator = new EventCoordinator();
+    await indexerWsClient.connectionInit();
     // Start a one-off toolkit container
     toolkit = new ToolkitWrapper({});

@@ -41,6 +52,14 @@ describe('shielded transactions', () => {
     // Derive shielded addresses from seeds
     destinationAddress = (await toolkit.showAddress(destinationSeed)).shielded;

+    const beforeZswapEvents = await collectValidZswapEvents(
+      indexerWsClient,
+      indexerEventCoordinator,
+      1,
+    );
+    previousMaxZswapId = beforeZswapEvents[0].data!.zswapLedgerEvents.maxId;
+    log.debug(`Previous max zswap ledger ID before tx = ${previousMaxZswapId}`);
+
     // Submit one shielded->shielded transfer (1 STAR)
     transactionResult = await toolkit.generateSingleTx(
       sourceSeed,
@@ -59,7 +78,7 @@ describe('shielded transactions', () => {
   }, 200_000);

   afterAll(async () => {
-    await Promise.all([toolkit.stop()]);
+    await Promise.all([toolkit.stop(), indexerWsClient.connectionClose()]);
   });

   describe('a successful shielded transaction transferring 1 Shielded Token between two wallets', async () => {
@@ -115,13 +134,79 @@ describe('shielded transactions', () => {
       );
       // The expected transaction might take a bit more to show up by indexer, so we retry a few times
       const transactionResponse = await getTransactionByHashWithRetry(transactionResult.txHash!);
-
       expect(transactionResponse).toBeSuccess();
       expect(transactionResponse?.data?.transactions).toBeDefined();
       expect(transactionResponse?.data?.transactions?.length).toBeGreaterThan(0);
-      expect(
-        transactionResponse?.data?.transactions?.map((tx: Transaction) => `${tx.hash}`),
-      ).toContain(transactionResult.txHash);
+
+      const tx = transactionResponse!.data!.transactions!.find(
+        (t: Transaction) => t.hash === transactionResult.txHash,
+      );
+
+      expect(tx).toBeDefined();
+
+      // Validate transaction shape and narrow type using schema
+      const parsed = RegularTransactionSchema.safeParse(tx);
+      expect(parsed.success, JSON.stringify(parsed.error?.format(), null, 2)).toBe(true);
+
+      const regularTx = parsed.data!;
+
+      // Shielded transactions do NOT expose unshielded details
+      expect(regularTx.unshieldedCreatedOutputs).toEqual([]);
+      expect(regularTx.unshieldedSpentOutputs).toEqual([]);
+
+      // Shielded transactions do NOT expose fees, so both fields report '0'
+      expect(regularTx.fees.paidFees).toBe('0');
+      expect(regularTx.fees.estimatedFees).toBe('0');
+    });
+
+    /**
+     * After a shielded transaction is confirmed, the indexer streams the Zswap
+     * events in sequence, followed by a DustSpendProcessed event.
+     *
+     * @given a confirmed shielded transaction
+     * @when we subscribe to Zswap events starting from (previousMaxId + 1)
+     * @then the Zswap events are delivered in order
+     * @and the following event is DustSpendProcessed
+     */
+    test('should stream Zswap events followed by DustSpendProcessed after a shielded transaction', async () => {
+      const received = await collectValidZswapEvents(
+        indexerWsClient,
+        indexerEventCoordinator,
+        3,
+        previousMaxZswapId + 1,
+      );
+      expect(received).toHaveLength(3);
+
+      received.forEach((msg) => {
+        const event = msg.data!.zswapLedgerEvents;
+        const parsed = ZswapLedgerEventSchema.safeParse(event);
+        expect(
+          parsed.success,
+          `Schema error: ${JSON.stringify(parsed.error?.format(), null, 2)}`,
+        ).toBe(true);
+      });
+
+      // Validate Zswap event grouping and ordering
+      const events = received.map((m) => m.data!.zswapLedgerEvents);
+      expect(new Set(events.map((e) => e.maxId)).size).toBe(1);
+
+      events.slice(1).forEach((e, i) => {
+        expect(e.id).toBe(events[i].id + 1);
+      });
+
+      const lastZswapMaxId = received.at(-1)!.data!.zswapLedgerEvents.maxId;
+
+      // Verify the Dust event directly follows the Zswap events
+      const dustEvents = await collectValidDustLedgerEvents(
+        indexerWsClient,
+        indexerEventCoordinator,
+        1,
+        lastZswapMaxId + 1,
+      );
+      expect(dustEvents).toHaveLength(1);
+      const dust = dustEvents[0].data!.dustLedgerEvents;
+      expect(dust.__typename).toBe('DustSpendProcessed');
+      expect(dust.id).toBe(lastZswapMaxId + 1);
     });

     /**
diff --git a/qa/tests/tests/e2e/unshielded-transactions.test.ts b/qa/tests/tests/e2e/unshielded-transactions.test.ts
index ab8765e3..f48c7d2a 100644
--- a/qa/tests/tests/e2e/unshielded-transactions.test.ts
+++ b/qa/tests/tests/e2e/unshielded-transactions.test.ts
@@ -29,10 +29,43 @@ import {
   UnshieldedUtxo,
 } from '@utils/indexer/indexer-types';
 import { IndexerWsClient, UnshieldedTxSubscriptionResponse } from '@utils/indexer/websocket-client';
-import { collectValidDustEvents } from 'tests/shared/dust-utils';
+import { collectValidDustLedgerEvents } from 'tests/shared/dust-ledger-utils';
 import { EventCoordinator } from '@utils/event-coordinator';
 import { DustLedgerEventsUnionSchema } from '@utils/indexer/graphql/schema';

+/**
+ * Helper function to find a progress update event with an incremented transaction ID.
+ * This is the logic used inside the retry function for both source and destination address tests.
+ *
+ * @param events - The events array to search
+ * @param baselineTransactionId - The transaction ID to compare against
+ * @param addressLabel - Label for error messages (e.g., 'source' or 'destination')
+ * @returns The found event
+ * @throws Error if no matching event is found
+ */
+function findProgressUpdateEvent(
+  events: UnshieldedTxSubscriptionResponse[],
+  baselineTransactionId: number,
+  addressLabel: string,
+): UnshieldedTxSubscriptionResponse {
+  const event = events.find((event) => {
+    const txEvent = event.data?.unshieldedTransactions as UnshieldedTransactionEvent;
+
+    log.debug(`waiting for UnshieldedTransactionsProgress event`);
+    if (txEvent.__typename === 'UnshieldedTransactionsProgress') {
+      const progressUpdate = txEvent;
+      log.debug(`progressUpdate received: ${JSON.stringify(progressUpdate, null, 2)}`);
+      if (progressUpdate.highestTransactionId > baselineTransactionId) {
+        return true;
+      }
+    }
+  });
+  if (!event) {
+    throw new Error(`${addressLabel} address progress update event not found yet`);
+  }
+  return event;
+}
+
 describe('unshielded transactions', { timeout: 200_000 }, () => {
   let indexerWsClient: IndexerWsClient;
   let indexerHttpClient: IndexerHttpClient;
@@ -74,7 +107,11 @@ describe('unshielded transactions', { timeout: 200_000 }, () => {

     destinationAddress = walletFixture.destinations[0].destinationAddress;

-    const beforeEvents = await collectValidDustEvents(indexerWsClient, indexerEventCoordinator, 1);
+    const beforeEvents = await collectValidDustLedgerEvents(
+      indexerWsClient,
+      indexerEventCoordinator,
+      1,
+    );
     previousMaxDustId = beforeEvents[0].data!.dustLedgerEvents.maxId;
     log.debug(`Previous max dust ID before tx = ${previousMaxDustId}`);

@@ -96,39 +133,6 @@ describe('unshielded transactions', { timeout: 200_000 }, () => {
     await Promise.all([toolkit.stop(), indexerWsClient.connectionClose()]);
   });

-  /**
-   * Helper function to find a progress update event with an incremented transaction ID.
-   * This is the logic used inside the retry function for both source and destination address tests.
-   *
-   * @param events - The events array to search
-   * @param baselineTransactionId - The transaction ID to compare against
-   * @param addressLabel - Label for error messages (e.g., 'source' or 'destination')
-   * @returns The found event
-   * @throws Error if no matching event is found
-   */
-  function findProgressUpdateEvent(
-    events: UnshieldedTxSubscriptionResponse[],
-    baselineTransactionId: number,
-    addressLabel: string,
-  ): UnshieldedTxSubscriptionResponse {
-    const event = events.find((event) => {
-      const txEvent = event.data?.unshieldedTransactions as UnshieldedTransactionEvent;
-
-      log.debug(`waiting for UnshieldedTransactionsProgress event`);
-      if (txEvent.__typename === 'UnshieldedTransactionsProgress') {
-        const progressUpdate = txEvent as UnshieldedTransactionsProgress;
-        log.debug(`progressUpdate received: ${JSON.stringify(progressUpdate, null, 2)}`);
-        if (progressUpdate.highestTransactionId > baselineTransactionId) {
-          return true;
-        }
-      }
-    });
-    if (!event) {
-      throw new Error(`${addressLabel} address progress update event not found yet`);
-    }
-    return event;
-  }
-
   describe('a successful unshielded transaction transferring 1 STAR between two addresses', async () => {
     /**
      * Once an unshielded transaction has been submitted to node and confirmed, the indexer should report
@@ -150,7 +154,7 @@ describe('unshielded transactions', { timeout: 200_000 }, () => {
       );
       // The expected block might take a bit more to show up by indexer, so we retry a few times
-      const blockResponse = await getBlockByHashWithRetry(transactionResult.blockHash!);
+      const blockResponse = await getBlockByHashWithRetry(transactionResult.blockHash);

       // Verify the transaction appears in the block
       expect(blockResponse?.data?.block?.transactions).toBeDefined();
@@ -339,7 +343,7 @@ describe('unshielded transactions', { timeout: 200_000 }, () => {
       );

       // The expected block might take a bit more to show up by indexer, so we retry a few times
-      const blockResponse = await getBlockByHashWithRetry(transactionResult.blockHash!);
+      const blockResponse = await getBlockByHashWithRetry(transactionResult.blockHash);

       // Find the transaction with unshielded outputs
       const unshieldedTx = blockResponse.data?.block?.transactions?.find((tx: Transaction) => {
@@ -390,12 +394,12 @@ describe('unshielded transactions', { timeout: 200_000 }, () => {
       );

       log.debug('Progress updates before transaction:');
-      progressUpdatesBeforeTransaction!.forEach((update) => {
+      progressUpdatesBeforeTransaction.forEach((update) => {
         log.debug(`${JSON.stringify(update, null, 2)}`);
       });

       const highestTransactionIdBeforeTransaction = (
-        progressUpdatesBeforeTransaction![progressUpdatesBeforeTransaction!.length - 1].data
+        progressUpdatesBeforeTransaction.at(-1)?.data
           ?.unshieldedTransactions as UnshieldedTransactionsProgress
       ).highestTransactionId;
       log.info(
@@ -407,7 +411,7 @@ describe('unshielded transactions', { timeout: 200_000 }, () => {
       });

       log.debug('Progress updates after transaction:');
-      progressUpdatesAfterTransaction!.forEach((update) => {
+      progressUpdatesAfterTransaction.forEach((update) => {
         log.debug(`${JSON.stringify(update, null, 2)}`);
       });

@@ -454,12 +458,12 @@ describe('unshielded transactions', { timeout: 200_000 }, () => {
       });

       log.debug('Progress updates before transaction:');
-      progressUpdatesBeforeTransaction!.forEach((update) => {
+      progressUpdatesBeforeTransaction.forEach((update) => {
         log.debug(`${JSON.stringify(update, null, 2)}`);
       });

       const highestTransactionIdBeforeTransaction = (
-        progressUpdatesBeforeTransaction![progressUpdatesBeforeTransaction!.length - 1].data
+        progressUpdatesBeforeTransaction.at(-1)?.data
           ?.unshieldedTransactions as UnshieldedTransactionsProgress
       ).highestTransactionId;
       log.info(
@@ -473,7 +477,7 @@ describe('unshielded transactions', { timeout: 200_000 }, () => {
       );

       log.debug('Progress updates after transaction:');
-      progressUpdatesAfterTransaction!.forEach((update) => {
+      progressUpdatesAfterTransaction.forEach((update) => {
         log.debug(`${JSON.stringify(update, null, 2)}`);
       });

@@ -513,7 +517,7 @@ describe('unshielded transactions', { timeout: 200_000 }, () => {
      * DustGenerationDtimeUpdate, DustInitialUtxo, DustSpendProcessed
      */
     test('should deliver dust events in correct sequence after unshielded transaction', async () => {
-      const received = await collectValidDustEvents(
+      const received = await collectValidDustLedgerEvents(
         indexerWsClient,
         indexerEventCoordinator,
         3,
@@ -572,7 +576,7 @@ describe('unshielded transactions', { timeout: 200_000 }, () => {
         break;
       }

-      // A single tx hash can have multiple records because failed txs don't reserve hashes. 
+      // A single tx hash can have multiple records because failed txs don't reserve hashes.
       // This means the same hash might appear as:
       // - a failed attempt (no created/spent UTXOs), or
       // - a later successful attempt (with created/spent UTXOs).
diff --git a/qa/tests/tests/integration/basic/queries/dust-queries.test.ts b/qa/tests/tests/integration/basic/queries/dust-queries.test.ts
index 8f1e7cb0..ee146e18 100644
--- a/qa/tests/tests/integration/basic/queries/dust-queries.test.ts
+++ b/qa/tests/tests/integration/basic/queries/dust-queries.test.ts
@@ -586,11 +586,6 @@ describe('dust generation status queries', () => {
       testKey: 'PM-18911',
     };

-    ctx.skip?.(
-      true,
-      'Skipping this test for when this has been delivered by developers https://shielded.atlassian.net/browse/PM-20789',
-    );
-
     let rewardAddress: string;
     const connectedCardanoNetworkType = env.getCardanoNetworkType();
diff --git a/qa/tests/tests/integration/basic/subscriptions/block-subscriptions.test.ts b/qa/tests/tests/integration/basic/subscriptions/block-subscriptions.test.ts
index 68f92164..e5307471 100644
--- a/qa/tests/tests/integration/basic/subscriptions/block-subscriptions.test.ts
+++ b/qa/tests/tests/integration/basic/subscriptions/block-subscriptions.test.ts
@@ -62,10 +62,6 @@ describe('block subscriptions', () => {
         if (receivedBlocks.length === expectedCount) {
           eventCoordinator.notify(eventName);
           log.debug(`${expectedCount} blocks received`);
-          indexerWsClient.send({
-            id: '1',
-            type: 'complete',
-          });
         }
       },
     };
diff --git a/qa/tests/tests/integration/basic/subscriptions/dust-ledger-subscriptions.test.ts b/qa/tests/tests/integration/basic/subscriptions/dust-ledger-subscriptions.test.ts
index 22be2348..4c91d0b8 100644
--- a/qa/tests/tests/integration/basic/subscriptions/dust-ledger-subscriptions.test.ts
+++ b/qa/tests/tests/integration/basic/subscriptions/dust-ledger-subscriptions.test.ts
@@ -15,7 +15,10 @@ import '@utils/logging/test-logging-hooks';

 import { IndexerWsClient } from '@utils/indexer/websocket-client';
-import { collectValidDustEvents, collectDustEventError } from '../../../shared/dust-utils';
+import {
+  collectValidDustLedgerEvents,
+  collectDustLedgerEventError,
+} from '../../../shared/dust-ledger-utils';
 import { EventCoordinator } from '@utils/event-coordinator';
 import { DustLedgerEventsUnionSchema } from '@utils/indexer/graphql/schema';

@@ -39,22 +42,25 @@ describe('dust ledger event subscriptions', () => {
      * Subscribing to DustLedger events without providing an offset should replay
      * historical events in the correct ledger order.
      *
+     * Note:
+     * - Event IDs are allocated from a single global ledger sequence shared across DustLedger and ZswapLedger
+     * - As a result, Dust ledger event IDs are not guaranteed to be contiguous.
+     *
      * @given no dust event offset parameters are provided
      * @when we subscribe to dust ledger events
      * @then events must be applied sequentially in order
-     * @and the subscription must maintain strict event ordering via monotonic IDs
+     * @and event IDs must be globally increasing and must not go backwards
      */
-    test('streams events in strictly increasing order', async () => {
-      const received = await collectValidDustEvents(indexerWsClient, eventCoordinator, 3);
+    test('streams events in ledger order', async () => {
+      const received = await collectValidDustLedgerEvents(indexerWsClient, eventCoordinator, 3);
       expect(received.length === 3, `Expected 3 events, got: ${received.length}`).toBe(true);
       const ids = received.map((e) => e.data!.dustLedgerEvents.id);
-      const isStrict = ids.every((id, i) => i === 0 || id > ids[i - 1]);
-
-      expect(isStrict, `Dust event IDs must be strictly increasing, got: ${ids.join(', ')}`).toBe(
-        true,
-      );
+      const inLedgerOrder = ids.every((id, i) => i === 0 || id > ids[i - 1]);
+      expect(
+        inLedgerOrder,
+        `Dust event IDs must be delivered in ledger order (IDs must not go backwards), got: ${ids.join(', ')}`,
+      ).toBe(true);
     });
   });

@@ -66,27 +72,25 @@ describe('dust ledger event subscriptions', () => {
      * @given a dust event offset parameter is provided
      * @when we subscribe to dust ledger events with that offset
      * @then events must be applied sequentially in order
-     * @and the subscription must maintain strict event ordering via monotonic IDs
+     * @and event IDs must be globally increasing and must not go backwards
      */
-    test('streams events starting from the specified ID', async () => {
-      const firstEvent = await collectValidDustEvents(indexerWsClient, eventCoordinator, 3);
+    test('streams events in ledger order starting from the specified ID', async () => {
+      const firstEvent = await collectValidDustLedgerEvents(indexerWsClient, eventCoordinator, 2);
       const latestId = firstEvent[0].data!.dustLedgerEvents.maxId;
-
-      const startId = Math.max(latestId - 5, 0);
-      const received = await collectValidDustEvents(indexerWsClient, eventCoordinator, 3, startId);
-      expect(received.length).toBe(3);
+      const startId = Math.max(latestId - 1, 0);
+      const received = await collectValidDustLedgerEvents(
+        indexerWsClient,
+        eventCoordinator,
+        2,
+        startId,
+      );
+      expect(received.length === 2, `Expected 2 events, got: ${received.length}`).toBe(true);

       const ids = received.map((e) => e.data!.dustLedgerEvents.id);
-
-      expect(
-        ids[0] >= startId,
-        `Expected first event ID >= startId (${startId}), got: ${ids[0]}`,
-      ).toBe(true);
-      const isStrictlyIncreasing = ids.every((id, i) => i === 0 || id > ids[i - 1]);
-
+      const inLedgerOrder = ids.every((id, i) => i === 0 || id > ids[i - 1]);
       expect(
-        isStrictlyIncreasing,
-        `Dust event IDs must be strictly increasing, got: ${ids.join(', ')}`,
+        inLedgerOrder,
+        `Dust ledger event IDs must be in ledger order (IDs must not go backwards), got: ${ids.join(', ')}`,
       ).toBe(true);
     });

@@ -98,11 +102,16 @@ describe('dust ledger event subscriptions', () => {
      * @then each received event must match the DustLedgerEventsUnionSchema definition
      */
     test('validates historical dust events against schema', async () => {
-      const firstEvent = await collectValidDustEvents(indexerWsClient, eventCoordinator, 1);
+      const firstEvent = await collectValidDustLedgerEvents(indexerWsClient, eventCoordinator, 3);
       const latestId = firstEvent[0].data!.dustLedgerEvents.maxId;
-      const fromId = Math.max(latestId - 5, 0);
-      const received = await collectValidDustEvents(indexerWsClient, eventCoordinator, 5, fromId);
+      const fromId = Math.max(latestId - 3, 0);
+      const received = await collectValidDustLedgerEvents(
+        indexerWsClient,
+        eventCoordinator,
+        3,
+        fromId,
+      );
       received
         .filter((msg) => msg.data?.dustLedgerEvents)
         .forEach((msg) => {
@@ -129,7 +138,7 @@ describe('dust ledger event subscriptions', () => {
      * @and no dust ledger events should be streamed
      */
     test('should return an error for unknown field', async () => {
-      const errorMessage = await collectDustEventError(indexerWsClient, null, true);
+      const errorMessage = await collectDustLedgerEventError(indexerWsClient, null, true);

       expect(errorMessage).toBe(`Unknown field "unknownField" on type "DustLedgerEvent".`);
     });
@@ -141,8 +150,8 @@ describe('dust ledger event subscriptions', () => {
      * @then an error should be returned
      */
     test('rejects negative offset ID with an error', async () => {
-      const errorMessage = await collectDustEventError(indexerWsClient, { id: -50 });
+      const errorMessage = await collectDustLedgerEventError(indexerWsClient, { id: -50 });

       expect(errorMessage).toBe(`Failed to parse "Int": Invalid number`);
     });
   });
-});
\ No newline at end of file
+});
diff --git a/qa/tests/tests/integration/basic/subscriptions/unshielded-transaction-subscriptions.test.ts b/qa/tests/tests/integration/basic/subscriptions/unshielded-transaction-subscriptions.test.ts
index 8952d891..839d48aa 100644
--- a/qa/tests/tests/integration/basic/subscriptions/unshielded-transaction-subscriptions.test.ts
+++ b/qa/tests/tests/integration/basic/subscriptions/unshielded-transaction-subscriptions.test.ts
@@ -51,7 +51,7 @@ let indexerWsClient: IndexerWsClient;
 async function subscribeToUnshieldedTransactionEvents(
   subscriptionParams: UnshieldedTransactionSubscriptionParams,
   stopCondition: (message: UnshieldedTxSubscriptionResponse[]) => boolean,
-  timeout: number = 500,
+  timeout: number = 5000,
 ): Promise<UnshieldedTxSubscriptionResponse[]> {
   const receivedUnshieldedTransactions: UnshieldedTxSubscriptionResponse[] = [];
@@ -138,7 +138,6 @@ describe('unshielded transaction subscriptions', async () => {
     messages = await subscribeToUnshieldedTransactionEvents(
       { address: unshieldedAddress },
       (messages) => messages.length >= 10,
-      500,
     );

     // we want at least one message ...
@@ -180,7 +179,6 @@ describe('unshielded transaction subscriptions', async () => {
     messages = await subscribeToUnshieldedTransactionEvents(
       { address: unshieldedAddress },
       () => false,
-      3000,
     );

     messages.forEach((message) => {
@@ -236,7 +234,6 @@ describe('unshielded transaction subscriptions', async () => {
     const messages = await subscribeToUnshieldedTransactionEvents(
       { address: unshieldedAddress },
       (messages) => messages.length >= 5,
-      500,
     );

     expect(messages.length).toBeGreaterThanOrEqual(1);
@@ -286,7 +283,6 @@ describe('unshielded transaction subscriptions', async () => {
     messages = await subscribeToUnshieldedTransactionEvents(
       { address: unshieldedAddress },
       (messages) => messages.length >= 10,
-      5000,
     );

     // We expect exactly one (non-error) message ...
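[Reviewer note] The per-call timeout arguments removed in this file now fall back to the 5-second default on subscribeToUnshieldedTransactionEvents. For readers outside the repo, here is a minimal hedged sketch of the collect-until-stop-or-timeout pattern that helper is assumed to implement; all names and timings are illustrative, not the repo's actual implementation:

```typescript
// Hypothetical sketch: poll a growing message buffer until the stop condition
// fires or the timeout elapses, then resolve with whatever was collected.
async function collectUntil<T>(
  buffer: T[],
  stopCondition: (messages: T[]) => boolean,
  timeoutMs: number = 5000,
  pollMs: number = 50,
): Promise<T[]> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline && !stopCondition(buffer)) {
    await new Promise((resolve) => setTimeout(resolve, pollMs));
  }
  return buffer; // possibly shorter than hoped; callers assert on length
}
```

A longer shared default trades a little test latency for fewer flaky under-collection failures, which is presumably why the explicit 500 ms overrides are dropped rather than tuned per test.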
@@ -322,7 +318,6 @@ describe('unshielded transaction subscriptions', async () => {
     messages = await subscribeToUnshieldedTransactionEvents(
       { address: unshieldedAddress },
       (messages) => messages.length >= 10,
-      500,
     );

     expect(messages.length).toBe(1);
@@ -362,7 +357,6 @@ describe('unshielded transaction subscriptions', async () => {
     messages = await subscribeToUnshieldedTransactionEvents(
       { address: targetAddress },
       (messages) => messages.length >= 10,
-      500,
     );

     expect(messages.length).toBe(1);
@@ -397,7 +391,6 @@ describe('unshielded transaction subscriptions', async () => {
     const messages = await subscribeToUnshieldedTransactionEvents(
       { address: targetAddress, transactionId: targetTransactionId },
       (messages) => messages[0].errors !== undefined,
-      2000,
     );

     expect(messages.length).toBeGreaterThanOrEqual(2);
@@ -452,7 +445,6 @@ describe('unshielded transaction subscriptions', async () => {
     const messages = await subscribeToUnshieldedTransactionEvents(
       { address: targetAddress, transactionId: targetTransactionId },
       (messages) => messages.length >= 10,
-      1000,
     );

     expect(messages.length).toBe(1);
@@ -479,7 +471,6 @@ describe('unshielded transaction subscriptions', async () => {
     const messages = await subscribeToUnshieldedTransactionEvents(
       { address: targetAddress, transactionId: targetTransactionId },
       (messages) => messages.length >= 10,
-      1000,
     );

     // We expect exactly one (non-error) message ...
@@ -511,7 +502,6 @@ describe('unshielded transaction subscriptions', async () => {
     let messages = await subscribeToUnshieldedTransactionEvents(
       { address: targetAddress },
       (messages) => messages.length >= 10,
-      500,
     );

     let foundTransactionIds: number[] = [];
@@ -546,7 +536,6 @@ describe('unshielded transaction subscriptions', async () => {
     messages = await subscribeToUnshieldedTransactionEvents(
       { address: targetAddress, transactionId: randomTransactionId },
       (_messages: UnshieldedTxSubscriptionResponse[]) => false,
-      1000,
     );

     expect(messages.length).toBeGreaterThanOrEqual(1);
@@ -579,7 +568,6 @@ describe('unshielded transaction subscriptions', async () => {
     const messages = await subscribeToUnshieldedTransactionEvents(
       { address: unshieldedAddress, transactionId: 0 },
       (_messages: UnshieldedTxSubscriptionResponse[]) => false,
-      1000,
     );

     expect(Array.isArray(messages)).toBe(true);
diff --git a/qa/tests/tests/integration/basic/subscriptions/zswap-events-subscriptions.test.ts b/qa/tests/tests/integration/basic/subscriptions/zswap-events-subscriptions.test.ts
new file mode 100644
index 00000000..03db0699
--- /dev/null
+++ b/qa/tests/tests/integration/basic/subscriptions/zswap-events-subscriptions.test.ts
@@ -0,0 +1,146 @@
+// This file is part of midnightntwrk/midnight-indexer
+// Copyright (C) 2025 Midnight Foundation
+// SPDX-License-Identifier: Apache-2.0
+// Licensed under the Apache License, Version 2.0 (the "License");
+// You may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+import '@utils/logging/test-logging-hooks';
+import { IndexerWsClient } from '@utils/indexer/websocket-client';
+import { EventCoordinator } from '@utils/event-coordinator';
+import { ZswapLedgerEventSchema } from '@utils/indexer/graphql/schema';
+import {
+  collectValidZswapEvents,
+  collectZswapEventError,
+} from '../../../shared/zswap-events-utils';
+
+describe('zswap ledger event subscriptions', () => {
+  let indexerWsClient: IndexerWsClient;
+  let eventCoordinator: EventCoordinator;
+
+  beforeEach(async () => {
+    indexerWsClient = new IndexerWsClient();
+    eventCoordinator = new EventCoordinator();
+    await indexerWsClient.connectionInit();
+  });
+
+  afterEach(async () => {
+    await indexerWsClient.connectionClose();
+    eventCoordinator.clear();
+  });
+
+  describe('a subscription to zswap ledger events without offset (default replay)', () => {
+    /**
+     * Subscribing to ZswapLedger events without providing an offset should replay
+     * historical events in the correct ledger order.
+     *
+     * Note:
+     * - Event IDs are allocated from a single global ledger sequence shared across DustLedger and ZswapLedger
+     * - As a result, Zswap ledger event IDs are not guaranteed to be contiguous.
+     *
+     * @given no zswap event offset parameters are provided
+     * @when we subscribe to zswap ledger events
+     * @then events must be applied sequentially in order
+     * @and event IDs must be globally increasing and must not go backwards
+     */
+    test('streams events in ledger order', async () => {
+      const received = await collectValidZswapEvents(indexerWsClient, eventCoordinator, 3);
+      expect(received.length === 3, `Expected 3 events, got: ${received.length}`).toBe(true);
+      const ids = received.map((e) => e.data!.zswapLedgerEvents.id);
+      const inLedgerOrder = ids.every((id, i) => i === 0 || id > ids[i - 1]);
+      expect(
+        inLedgerOrder,
+        `Zswap event IDs must be in ledger order (IDs must not go backwards), got: ${ids.join(', ')}`,
+      ).toBe(true);
+    });
+  });
+
+  describe('subscription with explicit offset', () => {
+    /**
+     * Subscribing to ZswapLedger events with an explicit offset should replay
+     * historical events beginning from the provided event ID.
+     *
+     * @given a zswap ledger event offset parameter is provided
+     * @when we subscribe to zswap ledger events with that offset
+     * @then events must be applied sequentially in order
+     * @and event IDs must be globally increasing and must not go backwards
+     */
+    test('streams events starting from the specified ID', async () => {
+      const firstEvent = await collectValidZswapEvents(indexerWsClient, eventCoordinator, 3);
+      const latestId = firstEvent[0].data!.zswapLedgerEvents.maxId;
+      const startId = Math.max(latestId - 3, 0);
+      const received = await collectValidZswapEvents(indexerWsClient, eventCoordinator, 3, startId);
+      expect(received.length === 3, `Expected 3 events, got: ${received.length}`).toBe(true);
+
+      const ids = received.map((e) => e.data!.zswapLedgerEvents.id);
+      const inLedgerOrder = ids.every((id, i) => i === 0 || id > ids[i - 1]);
+      expect(
+        inLedgerOrder,
+        `Zswap event IDs must be in ledger order (IDs must not go backwards), got: ${ids.join(', ')}`,
+      ).toBe(true);
+    });
+
+    /**
+     * Validates that all replayed zswap ledger events conform to the expected schema.
+     *
+     * @given a zswap ledger subscription with an explicit offset ID
+     * @when historical zswap events are streamed starting from that offset
+     * @then each received event must match the ZswapLedgerEventSchema definition
+     */
+    test('validates historical zswap events against schema', async () => {
+      const firstEvent = await collectValidZswapEvents(indexerWsClient, eventCoordinator, 3);
+      const latestId = firstEvent[0].data!.zswapLedgerEvents.maxId;
+      const fromId = Math.max(latestId - 3, 0);
+      const received = await collectValidZswapEvents(indexerWsClient, eventCoordinator, 3, fromId);
+      received
+        .filter((msg) => msg.data?.zswapLedgerEvents)
+        .forEach((msg) => {
+          expect.soft(msg).toBeSuccess();
+
+          const event = msg.data!.zswapLedgerEvents;
+          const parsed = ZswapLedgerEventSchema.safeParse(event);
+          expect(
+            parsed.success,
+            `Zswap ledger event schema validation failed:\n${JSON.stringify(parsed.error?.format(), null, 2)}`,
+          ).toBe(true);
+        });
+    });
+  });
+
+  describe('subscription error handling', () => {
+    /**
+     * Subscribing with a query that references a nonexistent field should return
+     * a GraphQL validation error instead of streaming zswap ledger events.
+     *
+     * @given a zswap ledger subscription whose selection set contains an unknown field
+     * @when the subscription request is sent to the indexer GraphQL endpoint
+     * @then the server must return a validation error indicating the field does not exist
+     * @and no zswap ledger events should be streamed
+     */
+    test('should return an error for unknown field', async () => {
+      const errorMessage = await collectZswapEventError(indexerWsClient, null, true);
+      expect(errorMessage).toBe(`Unknown field "unknownField" on type "ZswapLedgerEvent".`);
+    });
+
+    /**
+     * Providing a negative offset should result in an error response instead of
+     * a stream of zswap ledger events.
+     *
+     * @given a zswap ledger subscription with an explicit offset parameter
+     * @when the offset value is negative
+     * @then an error should be returned
+     */
+    test('rejects negative offset ID with an error', async () => {
+      const errorMessage = await collectZswapEventError(indexerWsClient, { id: -50 });
+      expect(errorMessage).toBe(`Failed to parse "Int": Invalid number`);
+    });
+  });
+});
diff --git a/qa/tests/tests/shared/dust-utils.ts b/qa/tests/tests/shared/dust-ledger-utils.ts
similarity index 53%
rename from qa/tests/tests/shared/dust-utils.ts
rename to qa/tests/tests/shared/dust-ledger-utils.ts
index bee21b11..44293211 100644
--- a/qa/tests/tests/shared/dust-utils.ts
+++ b/qa/tests/tests/shared/dust-ledger-utils.ts
@@ -4,12 +4,13 @@ import {
   IndexerWsClient,
 } from '@utils/indexer/websocket-client';
 import { EventCoordinator } from '@utils/event-coordinator';
+import log from '@utils/logging/logger';

 /**
  * Helper to subscribe to dust ledger events and collect a specific number of valid responses.
  * Supports optional ID-based historical replay via `fromId`.
  */
-export async function collectValidDustEvents(
+export async function collectValidDustLedgerEvents(
   indexerWsClient: IndexerWsClient,
   eventCoordinator: EventCoordinator,
   expectedCount: number,
@@ -18,24 +19,27 @@ export async function collectValidDustLedgerEvents(
   const received: DustLedgerEventSubscriptionResponse[] = [];
   const eventName = `${expectedCount} DustLedger Events`;

-  let unsubscribe: (() => void) | null = null;
-  let finished = false;
+  const handler = {
+    next: (payload: DustLedgerEventSubscriptionResponse) => {
+      if (received.length >= expectedCount) return;

-  const handler: SubscriptionHandlers = {
-    next: (payload) => {
-      if (finished) return;
       received.push(payload);
-
-      if (received.length >= expectedCount) {
-        finished = true;
-        unsubscribe?.();
+      log.debug(
+        `Received event ${received.length}/${expectedCount}:\n${JSON.stringify(payload, null, 2)}`,
+      );
+      if (received.length === expectedCount) {
         eventCoordinator.notify(eventName);
+        log.debug(`${expectedCount} Dust Ledger events received`);
       }
     },
   };

-  const offset = fromId !== undefined ? { id: fromId } : undefined;
-  unsubscribe = indexerWsClient.subscribeToDustLedgerEvents(handler, offset);
-  await eventCoordinator.waitForAll([eventName], 10000);
+  const offset = fromId !== undefined ? { id: fromId } : undefined;
+  const maxTimeBetweenIds = fromId !== undefined ? 4_000 : 10_000;
+  const subscription = indexerWsClient.subscribeToDustLedgerEvents(handler, offset);
+
+  await eventCoordinator.waitForAll([eventName], maxTimeBetweenIds);
+  subscription.unsubscribe();
   return received;
 }

@@ -43,7 +47,7 @@ export async function collectValidDustLedgerEvents(
  * Helper to subscribe to dust ledger events and capture GraphQL error responses.
  * Used for testing invalid variables (e.g. negative offsets) or invalid fields.
  */
-export async function collectDustEventError(
+export async function collectDustLedgerEventError(
   indexerWsClient: IndexerWsClient,
   variables: Record<string, unknown> | null,
   unknownField: boolean = false,
@@ -66,27 +70,45 @@ export async function collectDustLedgerEventError(
     `;

     const query = unknownField ? invalidFieldQuery : validQuery;
-    let unsubscribe: (() => void) | null = null;
+
+    let resolved = false;
+
     const handler: SubscriptionHandlers = {
       next: (payload) => {
+        if (resolved) return;
         if (typeof payload === 'object' && payload !== null && 'errors' in payload) {
           const p = payload as { errors: { message: string }[] };
+          resolved = true;
+          subscription.unsubscribe();
+          clearTimeout(timeout);
           resolve(p.errors[0].message);
-          unsubscribe?.();
         }
       },
       error: (err) => {
+        if (resolved) return;
+        resolved = true;
+        subscription.unsubscribe();
+        clearTimeout(timeout);
         resolve(String(err));
-        unsubscribe?.();
       },
     };
-    unsubscribe = indexerWsClient.subscribeToDustLedgerEvents(handler, undefined, query);
-    if (variables) {
-      indexerWsClient.send({
-        id: '0',
-        type: 'start',
-        payload: { query, variables },
-      });
+
+    let offset: { id: number } | undefined;
+    if (variables?.id !== undefined) {
+      offset = { id: variables.id as number };
     }
+
+    const subscription = indexerWsClient.subscribeToDustLedgerEvents(
+      handler as SubscriptionHandlers,
+      offset,
+      query,
+    );
+
+    const timeout = setTimeout(() => {
+      if (resolved) return;
+      resolved = true;
+      subscription.unsubscribe();
+      resolve('Timeout: No error received');
+    }, 3000);
   });
-}
\ No newline at end of file
+}
diff --git a/qa/tests/tests/shared/zswap-events-utils.ts b/qa/tests/tests/shared/zswap-events-utils.ts
new file mode 100644
index 00000000..8e66c4a0
--- /dev/null
+++ b/qa/tests/tests/shared/zswap-events-utils.ts
@@ -0,0 +1,105 @@
+import {
+  ZswapLedgerEventSubscriptionResponse,
+  IndexerWsClient,
+  SubscriptionHandlers,
+} from '@utils/indexer/websocket-client';
+import { EventCoordinator } from '@utils/event-coordinator';
+import log from '@utils/logging/logger';
+
+export async function collectValidZswapEvents(
+  indexerWsClient: IndexerWsClient,
+  eventCoordinator: EventCoordinator,
+  expectedCount: number,
+  fromId?: number,
+): Promise<ZswapLedgerEventSubscriptionResponse[]> {
+  const received: ZswapLedgerEventSubscriptionResponse[] = [];
+  const eventName = `${expectedCount} ZswapLedger Events`;
+
+  const handler = {
+    next: (payload: ZswapLedgerEventSubscriptionResponse) => {
+      if (received.length >= expectedCount) return;
+
+      received.push(payload);
+      log.debug(
+        `Received Zswap event ${received.length}/${expectedCount}:\n${JSON.stringify(payload, null, 2)}`,
+      );
+      if (received.length === expectedCount) {
+        eventCoordinator.notify(eventName);
+      }
+    },
+  };
+
+  const offset = fromId !== undefined ? { id: fromId } : undefined;
+  const maxTimeBetweenIds = fromId !== undefined ? 4_000 : 10_000;
+  const subscription = indexerWsClient.subscribeToZswapLedgerEvents(handler, offset);
+
+  await eventCoordinator.waitForAll([eventName], maxTimeBetweenIds);
+  subscription.unsubscribe();
+  return received;
+}
+
+export async function collectZswapEventError(
+  indexerWsClient: IndexerWsClient,
+  variables: Record<string, unknown> | null,
+  unknownField: boolean = false,
+): Promise<string> {
+  return new Promise((resolve) => {
+    const validQuery = `
+      subscription ZswapEvents($id: Int) {
+        zswapLedgerEvents(id: $id) {
+          id
+        }
+      }
+    `;
+
+    const invalidFieldQuery = `
+      subscription ZswapEvents {
+        zswapLedgerEvents {
+          unknownField
+        }
+      }
+    `;
+
+    const query = unknownField ? invalidFieldQuery : validQuery;
+
+    let resolved = false;
+
+    const handler: SubscriptionHandlers = {
+      next: (payload) => {
+        if (resolved) return;
+        if (typeof payload === 'object' && payload !== null && 'errors' in payload) {
+          const p = payload as { errors: { message: string }[] };
+          resolved = true;
+          subscription.unsubscribe();
+          clearTimeout(timeout);
+          resolve(p.errors[0].message);
+        }
+      },
+      error: (err) => {
+        if (resolved) return;
+        resolved = true;
+        subscription.unsubscribe();
+        clearTimeout(timeout);
+        resolve(String(err));
+      },
+    };
+
+    let offset: { id: number } | undefined;
+    if (variables?.id !== undefined) {
+      offset = { id: variables.id as number };
+    }
+
+    const subscription = indexerWsClient.subscribeToZswapLedgerEvents(
+      handler as SubscriptionHandlers,
+      offset,
+      query,
+    );
+
+    const timeout = setTimeout(() => {
+      if (resolved) return;
+      resolved = true;
+      subscription.unsubscribe();
+      resolve('Timeout: No error received');
+    }, 3000);
+  });
+}
diff --git a/qa/tests/utils/indexer/graphql/subscriptions.ts b/qa/tests/utils/indexer/graphql/subscriptions.ts
index 597b2636..7fcb2115 100644
--- a/qa/tests/utils/indexer/graphql/subscriptions.ts
+++ b/qa/tests/utils/indexer/graphql/subscriptions.ts
@@ -290,7 +290,7 @@ export const DUST_LEDGER_EVENTS_SUBSCRIPTION_DEFAULT = `
   }
 `;

-export const DUST_LEDGER_EVENTS_SUBSCRIPTION_FROM_OFFSET = `
+export const DUST_LEDGER_EVENTS_SUBSCRIPTION_FROM_ID = `
   subscription DustLedgerEvents($id: Int) {
     dustLedgerEvents(id: $id) {
       __typename
@@ -304,4 +304,24 @@ export const DUST_LEDGER_EVENTS_SUBSCRIPTION_FROM_ID = `
     }
   }
-`;
\ No newline at end of file
+`;
+
+export const ZSWAP_LEDGER_EVENTS_SUBSCRIPTION_DEFAULT = `
+  subscription ZswapEvents {
+    zswapLedgerEvents {
+      id
+      raw
+      maxId
+    }
+  }
+`;
+
+export const ZSWAP_LEDGER_EVENTS_SUBSCRIPTION_FROM_ID = `
+  subscription ZswapEvents($id: Int) {
+    zswapLedgerEvents(id: $id) {
+      id
+      raw
+      maxId
+    }
+  }
+`;
diff --git a/qa/tests/utils/indexer/websocket-client.ts b/qa/tests/utils/indexer/websocket-client.ts
index 8b77a0e7..44241deb 100644
--- a/qa/tests/utils/indexer/websocket-client.ts
+++ b/qa/tests/utils/indexer/websocket-client.ts
@@ -21,6 +21,7 @@ import type {
   Block,
   BlockOffset,
   DustLedgerEvent,
+  ZswapLedgerEvent,
   ShieldedTransactionsEvent,
   UnshieldedTransactionEvent,
   ContractAction,
@@ -35,7 +36,9 @@ import {
   CONTRACT_ACTIONS_SUBSCRIPTION_FROM_LATEST_BLOCK,
   CONTRACT_ACTIONS_SUBSCRIPTION_FROM_BLOCK_BY_OFFSET,
   DUST_LEDGER_EVENTS_SUBSCRIPTION_DEFAULT,
-  DUST_LEDGER_EVENTS_SUBSCRIPTION_FROM_OFFSET,
+  DUST_LEDGER_EVENTS_SUBSCRIPTION_FROM_ID,
+  ZSWAP_LEDGER_EVENTS_SUBSCRIPTION_DEFAULT,
+  ZSWAP_LEDGER_EVENTS_SUBSCRIPTION_FROM_ID,
 } from './graphql/subscriptions';

 export type BlockSubscriptionResponse = GraphQLResponse<{ blocks: Block }>;
@@ -56,6 +59,10 @@ export type DustLedgerEventSubscriptionResponse = GraphQLResponse<{
   dustLedgerEvents: DustLedgerEvent;
 }>;

+export type ZswapLedgerEventSubscriptionResponse = GraphQLResponse<{
+  zswapLedgerEvents: ZswapLedgerEvent;
+}>;
+
 /**
  * Handlers used to respond to incoming GraphQL subscription messages.
  */
@@ -743,7 +750,7 @@ export class IndexerWsClient {
   /**
    * Subscribes to dust ledger events.
-   * 
+   *
    * This method starts a GraphQL subscription that streams DustLedgerEvent updates from the indexer:
    *
    * - Without an offset: streams all new dust events from the latest position.
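[Reviewer note] Both subscribe methods in the hunk below now return a `{ id, unsubscribe }` handle instead of a bare teardown function. A hedged call-site sketch of the new shape (the helper name `sampleOneZswapEvent` is invented for illustration):

```typescript
import { IndexerWsClient } from '@utils/indexer/websocket-client';
import log from '@utils/logging/logger';

// Hypothetical usage: the handle bundles the GraphQL operation id with its
// teardown, so a caller can log exactly which operation it is stopping.
function sampleOneZswapEvent(client: IndexerWsClient): void {
  const subscription = client.subscribeToZswapLedgerEvents({
    next: (payload) => {
      log.debug(`event on subscription ${subscription.id}: ${JSON.stringify(payload)}`);
      subscription.unsubscribe(); // stop the server-side stream after the first delivery
    },
  });
}
```

Carrying the id on the handle also lets callers send the `stop` message for exactly the operation they started, rather than the hard-coded `id: '1'` complete that this patch removes from the block-subscription test.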
@@ -763,21 +770,83 @@ export class IndexerWsClient {
     handlers: SubscriptionHandlers,
     offset?: { id: number },
     queryOverride?: string,
-  ): () => void {
+  ): { unsubscribe: () => void; id: string } {
+    const hasOffset = offset !== undefined && offset.id !== undefined;
+
+    let query = queryOverride;
+    if (!query) {
+      query = hasOffset
+        ? DUST_LEDGER_EVENTS_SUBSCRIPTION_FROM_ID
+        : DUST_LEDGER_EVENTS_SUBSCRIPTION_DEFAULT;
+    }
+    const variables = hasOffset ? { id: offset!.id } : undefined;
+
     const subscriptionId = this.getNextId();
-    const isOffset = !!offset;
+    const payload: GraphQLStartMessage = {
+      id: subscriptionId,
+      type: 'start',
+      payload: {
+        query,
+        variables,
+      },
+    };

-    const query =
-      queryOverride ??
-      (isOffset
-        ? DUST_LEDGER_EVENTS_SUBSCRIPTION_FROM_OFFSET
-        : DUST_LEDGER_EVENTS_SUBSCRIPTION_DEFAULT);
+    log.debug(`Dust Ledger Events payload:\n${JSON.stringify(payload, null, 2)}`);

-    const variables = offset;
+    this.handlersMap.set(subscriptionId, handlers as SubscriptionHandlers);
+    this.ws.send(JSON.stringify(payload));

-    log.debug(`Dust Ledger Events query:\n${query}`);
-    log.debug(`Dust Ledger Events variables:\n${JSON.stringify(variables, null, 2)}`);
+    return {
+      id: subscriptionId,
+      unsubscribe: () => {
+        const stopMessage: GraphQLStopMessage = {
+          id: subscriptionId,
+          type: 'stop',
+        };
+        this.ws.send(JSON.stringify(stopMessage));
+        this.handlersMap.delete(subscriptionId);
+      },
+    };
+  }
+
+  /**
+   * Subscribes to zswap ledger events.
+   *
+   * This method starts a GraphQL subscription that streams ZswapLedgerEvent updates from the indexer:
+   *
+   * - Without an offset: streams historical zswap events from the beginning.
+   * - With an offset (id): streams zswap events starting from that event ID.
+   *
+   * The correct GraphQL query is selected automatically unless a custom
+   * queryOverride is provided. All incoming messages are routed to the given
+   * handlers, and the returned object's unsubscribe function stops the stream.
+   *
+   * Note: Like dust ledger events, zswap event IDs come from a global ledger
+   * sequence shared across both streams, so they may be non-contiguous.
+   *
+   * @param handlers - Callback functions for handling incoming zswap events
+   * @param offset - Optional object containing an event ID to start from
+   * @param queryOverride - Optional custom GraphQL subscription query
+   *
+   * @returns An object with subscription ID and unsubscribe function
+   */
+  subscribeToZswapLedgerEvents(
+    handlers: SubscriptionHandlers,
+    offset?: { id: number },
+    queryOverride?: string,
+  ): { unsubscribe: () => void; id: string } {
+    const hasOffset = offset !== undefined && offset.id !== undefined;
+
+    let query = queryOverride;
+    if (!query) {
+      query = hasOffset
+        ? ZSWAP_LEDGER_EVENTS_SUBSCRIPTION_FROM_ID
+        : ZSWAP_LEDGER_EVENTS_SUBSCRIPTION_DEFAULT;
+    }
+    const variables = hasOffset ? { id: offset!.id } : undefined;
+
+    const subscriptionId = this.getNextId();

     const payload: GraphQLStartMessage = {
       id: subscriptionId,
@@ -788,18 +857,21 @@ export class IndexerWsClient {
       },
     };

-    log.debug(`Dust Ledger Events payload:\n${JSON.stringify(payload, null, 2)}`);
+    log.debug(`Zswap Ledger Events payload:\n${JSON.stringify(payload, null, 2)}`);

     this.handlersMap.set(subscriptionId, handlers as SubscriptionHandlers);
     this.ws.send(JSON.stringify(payload));

-    return () => {
-      const stopMessage: GraphQLStopMessage = {
-        id: subscriptionId,
-        type: 'stop',
-      };
-      this.ws.send(JSON.stringify(stopMessage));
-      this.handlersMap.delete(subscriptionId);
+    return {
+      id: subscriptionId,
+      unsubscribe: () => {
+        const stopMessage: GraphQLStopMessage = {
+          id: subscriptionId,
+          type: 'stop',
+        };
+        this.ws.send(JSON.stringify(stopMessage));
+        this.handlersMap.delete(subscriptionId);
+      },
     };
   }
-}
\ No newline at end of file
+}
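[Reviewer note] For quickly exercising the new zswap helpers against a running indexer, a minimal hedged sketch (not part of the patch; assumes this PR's exports and a reachable websocket endpoint):

```typescript
import { IndexerWsClient } from '@utils/indexer/websocket-client';
import { EventCoordinator } from '@utils/event-coordinator';
import { collectValidZswapEvents } from 'tests/shared/zswap-events-utils';

// Replay three zswap ledger events and print their globally allocated IDs.
// Because dust and zswap events share one ledger sequence, the printed IDs
// should be increasing but may be non-contiguous.
async function main(): Promise<void> {
  const client = new IndexerWsClient();
  await client.connectionInit();
  try {
    const events = await collectValidZswapEvents(client, new EventCoordinator(), 3);
    const ids = events.map((e) => e.data!.zswapLedgerEvents.id);
    console.log(`zswap ledger event IDs: ${ids.join(', ')}`);
  } finally {
    await client.connectionClose();
  }
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});
```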