Skip to content

Commit 00dc520

Browse files
committed
fix: adjusted the new blocks from depth subscription mechanism
1 parent 13a31fb commit 00dc520

File tree

4 files changed

+172
-49
lines changed

4 files changed

+172
-49
lines changed

indexer/src/kadena-server/repository/application/block-repository.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ export interface GetLatestBlocksParams {
2828

2929
export type BlockOutput = Omit<Block, 'parent' | 'events' | 'minerAccount' | 'transactions'> & {
3030
parentHash: string;
31+
blockId: number;
3132
};
3233

3334
export type FungibleChainAccountOutput = Omit<
@@ -65,6 +66,13 @@ export default interface BlockRepository {
6566
transactions: TransactionOutput[],
6667
): Promise<TransactionOutput[]>;
6768

69+
getLastBlocksWithDepth(
70+
chainIds: string[],
71+
minimumDepth: number,
72+
startingTimestamp: number,
73+
id?: string,
74+
): Promise<BlockOutput[]>;
75+
6876
// dataloader
6977
getBlocksByEventIds(eventIds: string[]): Promise<BlockOutput[]>;
7078
getBlocksByTransactionIds(transactionIds: string[]): Promise<BlockOutput[]>;

indexer/src/kadena-server/repository/infra/repository/block-db-repository.ts

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,4 +648,126 @@ export default class BlockDbRepository implements BlockRepository {
648648

649649
return output;
650650
}
651+
652+
async getLastBlocksWithDepth(
653+
chainIdsParam: string[],
654+
minimumDepth: number,
655+
startingTimestamp: number,
656+
id?: string,
657+
): Promise<BlockOutput[]> {
658+
const chainIds = chainIdsParam.length ? chainIdsParam.map(Number) : await this.getChainIds();
659+
const maxHeights = await this.getMaxHeights();
660+
661+
const queryParams: any[] = [];
662+
const conditions: string[] = [];
663+
664+
chainIds.forEach(chainId => {
665+
const maxHeight = maxHeights[chainId];
666+
if (maxHeight !== undefined) {
667+
conditions.push(
668+
`("chainId" = $${queryParams.length + 1} AND height <= $${queryParams.length + 2})`,
669+
);
670+
queryParams.push(chainId, maxHeight - minimumDepth);
671+
}
672+
});
673+
674+
queryParams.push(startingTimestamp.toFixed());
675+
676+
let query = `
677+
SELECT *
678+
FROM "Blocks"
679+
WHERE ${conditions.join(' OR ')}
680+
AND "creationTime" > $${queryParams.length}
681+
`;
682+
683+
if (id) {
684+
queryParams.push(id);
685+
query += `
686+
AND id > $${queryParams.length}
687+
ORDER BY id DESC
688+
LIMIT 100
689+
`;
690+
} else {
691+
query += `
692+
ORDER BY id DESC
693+
LIMIT 5
694+
`;
695+
}
696+
697+
const { rows: blockRows } = await rootPgPool.query(query, queryParams);
698+
699+
const blocksToReturn = blockRows.map(row => blockValidator.validate(row));
700+
const blockHashToDepth = await this.createBlockDepthMap(blocksToReturn, 'hash', minimumDepth);
701+
702+
return blocksToReturn.filter(block => blockHashToDepth[block.hash] >= minimumDepth);
703+
}
704+
705+
async getConfirmationDepth(blockHash: string, minimumDepth: number): Promise<number> {
706+
const query = `
707+
WITH RECURSIVE BlockDescendants AS (
708+
SELECT hash, parent, 0 AS depth, height, "chainId"
709+
FROM "Blocks"
710+
WHERE hash = $1
711+
UNION ALL
712+
SELECT b.hash, b.parent, d.depth + 1 AS depth, b.height, b."chainId"
713+
FROM BlockDescendants d
714+
JOIN "Blocks" b ON d.hash = b.parent AND b.height = d.height + 1 AND b."chainId" = d."chainId"
715+
WHERE d.depth <= $2
716+
)
717+
SELECT MAX(depth) AS depth
718+
FROM BlockDescendants;
719+
`;
720+
721+
const { rows } = await rootPgPool.query(query, [blockHash, minimumDepth]);
722+
723+
if (rows.length && rows[0].depth) {
724+
return Number(rows[0].depth);
725+
} else {
726+
return 0;
727+
}
728+
}
729+
730+
/**
731+
*
732+
* @param items - all the items to create a block depth map for
733+
* @param hashProp - the property of the item that contains the block hash
734+
* @returns a map of block hashes to their confirmation depths
735+
*/
736+
async createBlockDepthMap<T>(
737+
items: T[],
738+
hashProp: keyof T,
739+
minimumDepth: number,
740+
): Promise<Record<string, number>> {
741+
const uniqueBlockHashes = [...new Set(items.map(item => item[hashProp]))];
742+
743+
const confirmationDepths = await Promise.all(
744+
uniqueBlockHashes.map(blockHash =>
745+
this.getConfirmationDepth(blockHash as string, minimumDepth),
746+
),
747+
);
748+
749+
return uniqueBlockHashes.reduce((map: Record<string, number>, blockHash, index) => {
750+
map[blockHash as string] = confirmationDepths[index];
751+
return map;
752+
}, {});
753+
}
754+
755+
async getMaxHeights(): Promise<Record<string, number>> {
756+
const chainIds = await this.getChainIds();
757+
758+
const maxHeightsByChainIdPromises = chainIds.map(async chainId => {
759+
const query = `
760+
SELECT MAX(height) as max_height
761+
FROM "Blocks"
762+
WHERE "chainId" = $1
763+
`;
764+
765+
const { rows } = await rootPgPool.query(query, [chainId]);
766+
return { [chainId.toString()]: parseInt(rows[0].max_height, 10) };
767+
});
768+
769+
const maxHeightsArray = await Promise.all(maxHeightsByChainIdPromises);
770+
771+
return Object.assign({}, ...maxHeightsArray);
772+
}
651773
}

indexer/src/kadena-server/repository/infra/schema-validator/block-schema-validator.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ const validate = (row: any): BlockOutput => {
9393
chainId,
9494
hash,
9595
})),
96+
blockId: res.id,
9697
};
9798
};
9899

@@ -126,6 +127,7 @@ const mapFromSequelize = (blockModel: BlockAttributes): BlockOutput => {
126127
chainId,
127128
hash,
128129
})),
130+
blockId: blockModel.id,
129131
};
130132
};
131133

indexer/src/kadena-server/resolvers/subscription/new-blocks-from-depth-subscription-resolver.ts

Lines changed: 40 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,47 @@
55
* which allows clients to receive real-time updates when blocks reach a specified minimum
66
* confirmation depth. This is particularly useful for applications that need to wait for
77
* a certain number of confirmations before considering a transaction finalized.
8-
*
9-
* Unlike the standard newBlocks subscription, this subscription uses a PubSub event
10-
* filtering mechanism to only notify clients when blocks meet their criteria.
118
*/
129

13-
import { withFilter } from 'graphql-subscriptions';
1410
import { ResolverContext } from '../../config/apollo-server-config';
15-
import {
16-
SubscriptionNewBlocksFromDepthArgs,
17-
SubscriptionResolvers,
18-
} from '../../config/graphql-types';
19-
import { NEW_BLOCKS_FROM_DEPTH_EVENT } from './consts';
20-
import zod from 'zod';
21-
import { blockQueryResolver } from '../query/block-query-resolver';
11+
import { SubscriptionResolvers } from '../../config/graphql-types';
12+
import { BlockOutput } from '../../repository/application/block-repository';
2213

23-
/**
24-
* Zod schema for validating incoming block notification events
25-
*
26-
* This schema ensures that the payload from the PubSub system contains
27-
* the required properties (chainId, height, hash) and that they are of
28-
* the correct types before processing the event.
29-
*/
30-
const newBlocksFromDepthSubscriptionSchema = zod.object({
31-
chainId: zod.string(),
32-
height: zod.number(),
33-
hash: zod.string(),
34-
});
14+
async function* iteratorFn(
15+
chainIds: string[],
16+
minimumDepth: number,
17+
context: ResolverContext,
18+
): AsyncGenerator<BlockOutput[], void, unknown> {
19+
const startingTimestamp = new Date().getTime() / 1000000;
20+
const blockResult = await context.blockRepository.getLastBlocksWithDepth(
21+
chainIds,
22+
minimumDepth,
23+
startingTimestamp,
24+
);
25+
26+
let lastBlockId: string | undefined;
27+
28+
if (blockResult.length > 0) {
29+
lastBlockId = blockResult[0].blockId.toString();
30+
yield [];
31+
}
32+
33+
while (context.signal) {
34+
const newBlocks = await context.blockRepository.getLastBlocksWithDepth(
35+
chainIds,
36+
minimumDepth,
37+
startingTimestamp,
38+
lastBlockId,
39+
);
40+
41+
if (newBlocks.length > 0) {
42+
lastBlockId = newBlocks[0].blockId.toString();
43+
yield newBlocks;
44+
}
45+
46+
await new Promise(resolve => setTimeout(resolve, 3000));
47+
}
48+
}
3549

3650
/**
3751
* GraphQL subscription resolver for the 'newBlocksFromDepth' subscription
@@ -47,34 +61,11 @@ const newBlocksFromDepthSubscriptionSchema = zod.object({
4761
* - Blocks on chains specified by the chainIds argument (or all chains if not specified)
4862
* - Blocks with a depth (confirmations) greater than or equal to minimumDepth
4963
*
50-
* This approach allows for a single PubSub event stream to serve multiple clients with
51-
* different filtering requirements.
5264
*/
5365
export const newBlocksFromDepthSubscriptionResolver: SubscriptionResolvers<ResolverContext>['newBlocksFromDepth'] =
5466
{
55-
resolve: async (payload: any, _args: any, context: ResolverContext) => {
56-
const res = await (blockQueryResolver as any)({}, { hash: payload.hash }, context);
57-
return [res];
58-
},
59-
subscribe: (_parent, args: SubscriptionNewBlocksFromDepthArgs, context) => {
60-
return {
61-
[Symbol.asyncIterator]: withFilter(
62-
() => context.pubSub.asyncIterator(NEW_BLOCKS_FROM_DEPTH_EVENT),
63-
payload => {
64-
const res = newBlocksFromDepthSubscriptionSchema.safeParse(payload);
65-
if (!res.success) {
66-
console.error(
67-
'[ERROR][API][BIZ_FLOW] Invalid payload on newBlocksFromDepthSubscription',
68-
payload,
69-
);
70-
return false;
71-
}
72-
const { chainId, height } = res.data;
73-
return (
74-
(!args.chainIds || args.chainIds.includes(chainId)) && height >= args.minimumDepth
75-
);
76-
},
77-
),
78-
};
67+
resolve: (payload: any) => payload,
68+
subscribe: (_root, args, context) => {
69+
return iteratorFn(args.chainIds ?? [], args.minimumDepth, context);
7970
},
8071
};

0 commit comments

Comments
 (0)